【历史文档】高阶技巧-获取市场涨跌停统计数据,并入库,提交定时任务
由bq5bun29创建,最终由bq5bun29 被浏览 34 用户
简介
bigqunat强大的人工智能分析平台,提供了多元化的数据,方便的数据回测等,今天介绍bigqunat获取市场涨跌停统计数据,并入库,提交定时任务。
编写策略
1.点击编写策略
2.点击新建立空白代码策略,我们命名为市场统计(点击回车键即可创建) \n
注意⚠️:提供的源代码仅仅用于交流学习,案例的介绍,不用于获取数据
3.导入需要库dai平台开发的数据读取工具
import pandas as pd
import requests
import dai
4.我们需要按照日期处理交易日获取最近的一个交易日期,读取数据交易市场选择CN
在这里我们使用了dai.DataSource的方式来读取信息,其实也可以运用SQL函数来进行读取数据。
df = dai.DataSource('trading_days').read()
df = df[df['market_code'] == 'CN']
5.获取最近交易日的函数
def get_last_trader_date():
'''
获取最近的交易时间
'''
df = dai.DataSource('trading_days').read()
df = df[df['market_code'] == 'CN']
'''
获取筛选后的data frame
'''
print(df)
latest_trading_date = df['date'].tolist()[-1]
trader_date_str = ''.join(str(latest_trading_date).split('-'))[:8]
return trader_date_str
6.获取市场涨停数据函数
def stock_zt_pool_em(date: str = "20230621") -> pd.DataFrame:
"""
:return: 涨停股池
:rtype: pandas.DataFrame
"""
url = "http://push2ex.eastmoney.com/getTopicZTPool"
params = {
"ut": "7eea3edcaed734bea9cbfc24409ed989",
"dpt": "wz.ztzt",
"Pageindex": "0",
"pagesize": "10000",
"sort": "fbt:asc",
"date": date,
"_": "1621590489736",
}
r = requests.get(url, params=params)
data_json = r.json()
if data_json["data"] is None:
return pd.DataFrame()
temp_df = pd.DataFrame(data_json["data"]["pool"])
temp_df.reset_index(inplace=True)
temp_df["index"] = range(1, len(temp_df) + 1)
temp_df.columns = [
"序号",
"代码",
"_",
"名称",
"最新价",
"涨跌幅",
"成交额",
"流通市值",
"总市值",
"换手率",
"连板数",
"首次封板时间",
"最后封板时间",
"封板资金",
"炸板次数",
"所属行业",
"涨停统计",
]
temp_df["涨停统计"] = (
temp_df["涨停统计"].apply(lambda x: dict(x)["days"]).astype(str)
+ "/"
+ temp_df["涨停统计"].apply(lambda x: dict(x)["ct"]).astype(str)
)
temp_df = temp_df[
[
"序号",
"代码",
"名称",
"涨跌幅",
"最新价",
"成交额",
"流通市值",
"总市值",
"换手率",
"封板资金",
"首次封板时间",
"最后封板时间",
"炸板次数",
"涨停统计",
"连板数",
"所属行业",
]
]
temp_df["首次封板时间"] = temp_df["首次封板时间"].astype(str).str.zfill(6)
temp_df["最后封板时间"] = temp_df["最后封板时间"].astype(str).str.zfill(6)
temp_df["最新价"] = temp_df["最新价"] / 1000
temp_df["涨跌幅"] = pd.to_numeric(temp_df["涨跌幅"])
temp_df["最新价"] = pd.to_numeric(temp_df["最新价"])
temp_df["成交额"] = pd.to_numeric(temp_df["成交额"])
temp_df["流通市值"] = pd.to_numeric(temp_df["流通市值"])
temp_df["总市值"] = pd.to_numeric(temp_df["总市值"])
temp_df["换手率"] = pd.to_numeric(temp_df["换手率"])
temp_df["封板资金"] = pd.to_numeric(temp_df["封板资金"])
temp_df["炸板次数"] = pd.to_numeric(temp_df["炸板次数"])
temp_df["连板数"] = pd.to_numeric(temp_df["连板数"])
temp_df['date']=date
7.获取市场跌停数据的函数
#跌停数据
def stock_zt_pool_dtgc_em(date: str = "20220425") -> pd.DataFrame:
"""
:param date: 交易日
:type date: str
:return: 跌停股池
:rtype: pandas.DataFrame
"""
url = "http://push2ex.eastmoney.com/getTopicDTPool"
params = {
"ut": "7eea3edcaed734bea9cbfc24409ed989",
"dpt": "wz.ztzt",
"Pageindex": "0",
"pagesize": "10000",
"sort": "fund:asc",
"date": date,
"_": "1621590489736",
}
r = requests.get(url, params=params)
data_json = r.json()
if data_json["data"] is None:
return pd.DataFrame()
temp_df = pd.DataFrame(data_json["data"]["pool"])
temp_df.reset_index(inplace=True)
temp_df["index"] = range(1, len(temp_df) + 1)
temp_df.columns = [
"序号",
"代码",
"_",
"名称",
"最新价",
"涨跌幅",
"成交额",
"流通市值",
"总市值",
"动态市盈率",
"换手率",
"封单资金",
"最后封板时间",
"板上成交额",
"连续跌停",
"开板次数",
"所属行业",
]
temp_df = temp_df[
[
"序号",
"代码",
"名称",
"涨跌幅",
"最新价",
"成交额",
"流通市值",
"总市值",
"动态市盈率",
"换手率",
"封单资金",
"最后封板时间",
"板上成交额",
"连续跌停",
"开板次数",
"所属行业",
]
]
temp_df["最新价"] = temp_df["最新价"] / 1000
temp_df["最后封板时间"] = temp_df["最后封板时间"].astype(str).str.zfill(6)
temp_df["涨跌幅"] = pd.to_numeric(temp_df["涨跌幅"])
temp_df["最新价"] = pd.to_numeric(temp_df["最新价"])
temp_df["成交额"] = pd.to_numeric(temp_df["成交额"])
temp_df["流通市值"] = pd.to_numeric(temp_df["流通市值"])
temp_df["总市值"] = pd.to_numeric(temp_df["总市值"])
temp_df["动态市盈率"] = pd.to_numeric(temp_df["动态市盈率"])
temp_df["换手率"] = pd.to_numeric(temp_df["换手率"])
temp_df["封单资金"] = pd.to_numeric(temp_df["封单资金"])
temp_df["板上成交额"] = pd.to_numeric(temp_df["板上成交额"])
temp_df["连续跌停"] = pd.to_numeric(temp_df["连续跌停"])
temp_df["开板次数"] = pd.to_numeric(temp_df["开板次数"])
temp_df["开板次数"] = pd.to_numeric(temp_df["开板次数"])
temp_df['date']=date
return temp_df
8.市场分析统计函数
def count_analysis_zt_dt_data(date='20230621'):
'''
市场涨停跌停统计可能和同花顺不一样
date要求是交易日
'''
#涨停
df=stock_zt_pool_em(date=date)
#跌停数据
df1=stock_zt_pool_dtgc_em(date=date)
zt_amount=df.shape[0]
dt_amount=df1.shape[0]
text='交易日{} 涨停数量{} 跌停数量{}'.format(date,zt_amount,dt_amount)
return df,df1,zt_amount,dt_amount,text
9.主程序入口
if __name__=='__main__':
last_trader_date=get_last_trader_date()
print('交易日 {}'.format(last_trader_date))
df,df1,zt_zmount,dt_amount,text=count_analysis_zt_dt_data(date=last_trader_date)
print(df,df1,zt_zmount,dt_amount,text)
#数据入库
#市场涨停 Limit up of market
dai.DataSource.write_bdb(
data=df,
# datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
id="limit_up_of_market001",
# 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
unique_together=["date", "代码"],
indexes=["date"],
)
#跌停数据入库
#市场跌停Market limit
dai.DataSource.write_bdb(
data=df,
# datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
id="market_limit001",
# 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
unique_together=["date", "代码"],
indexes=["date"],
)
10.数据写入个人的数据,利用dai输入写入,id是数据库名称,unique_together用来去掉重复的数据,indexs数据的索引。
dai.DataSource.write_bdb(
data=df,
# datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
id="market_limit001",
# 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
unique_together=["date", "代码"],
indexes=["date"],
运行结果
点击数据平台可看到自己的数据
提交定时任务
在任务类型中选择数据任务或者每日任务并且根据需要选择触发事件e.g.如图所示23:00
在高级设置中指定触发事件即可定时完成任务
https://bigquant.com/codesharev2/ead198e2-1995-443a-b3fa-b22ae31b975b
\