【历史文档】高阶技巧-获取市场涨跌停统计数据,并入库,提交定时任务
由qxiao创建,最终由small_q 被浏览 163 用户
更新
本文内容对应旧版平台与旧版资源,其内容不再适合最新版平台,请查看新版平台的使用说明
新版量化开发IDE(AIStudio):
https://bigquant.com/wiki/doc/aistudio-aiide-NzAjgKapzW
新版模版策略:
https://bigquant.com/wiki/doc/demos-ecdRvuM1TU
新版数据平台:
https://bigquant.com/data/home
https://bigquant.com/wiki/doc/dai-PLSbc1SbZX
新版表达式算子:
https://bigquant.com/wiki/doc/dai-sql-Rceb2JQBdS
新版因子平台:
https://bigquant.com/wiki/doc/bigalpha-EOVmVtJMS5
\
简介
bigqunat强大的人工智能分析平台,提供了多元化的数据,方便的数据回测等,今天介绍bigqunat获取市场涨跌停统计数据,并入库,提交定时任务。分析链接
https://bigquant.com/codeshare/c8597026-33a1-46ed-b35b-034a79f38514
第一步。点击编写策略
点击新建立空白代码策略,我们命名为市场统计
命名为市场统计按回车就可用
提供的源代码仅仅用于交流学习,案例的介绍,不用于获取数据
导入需要库dai平台开发的数据读取工具
import pandas as pd
import requests
import dai
from bigdatasource import DataSource
我们需要安日期处理交易日获取最近的一个交易日期,读取数据交易市场选择CN
df=DataSource('trading_days').read()
print(df)
获取最近交易日的函数
def get_last_trader_date():
'''
获取最近的交易时间
'''
df=DataSource('trading_days').read()
print(df)
df=df[df['country_code']=='CN']
trader_date=''.join(str(df['date'].tolist()[-1]).split('-'))[:8]
return trader_date
获取市场涨停数据函数
def stock_zt_pool_em(date: str = "20230621") -> pd.DataFrame:
"""
:return: 涨停股池
:rtype: pandas.DataFrame
"""
url = "http://push2ex.eastmoney.com/getTopicZTPool"
params = {
"ut": "7eea3edcaed734bea9cbfc24409ed989",
"dpt": "wz.ztzt",
"Pageindex": "0",
"pagesize": "10000",
"sort": "fbt:asc",
"date": date,
"_": "1621590489736",
}
r = requests.get(url, params=params)
data_json = r.json()
if data_json["data"] is None:
return pd.DataFrame()
temp_df = pd.DataFrame(data_json["data"]["pool"])
temp_df.reset_index(inplace=True)
temp_df["index"] = range(1, len(temp_df) + 1)
temp_df.columns = [
"序号",
"代码",
"_",
"名称",
"最新价",
"涨跌幅",
"成交额",
"流通市值",
"总市值",
"换手率",
"连板数",
"首次封板时间",
"最后封板时间",
"封板资金",
"炸板次数",
"所属行业",
"涨停统计",
]
temp_df["涨停统计"] = (
temp_df["涨停统计"].apply(lambda x: dict(x)["days"]).astype(str)
+ "/"
+ temp_df["涨停统计"].apply(lambda x: dict(x)["ct"]).astype(str)
)
temp_df = temp_df[
[
"序号",
"代码",
"名称",
"涨跌幅",
"最新价",
"成交额",
"流通市值",
"总市值",
"换手率",
"封板资金",
"首次封板时间",
"最后封板时间",
"炸板次数",
"涨停统计",
"连板数",
"所属行业",
]
]
temp_df["首次封板时间"] = temp_df["首次封板时间"].astype(str).str.zfill(6)
temp_df["最后封板时间"] = temp_df["最后封板时间"].astype(str).str.zfill(6)
temp_df["最新价"] = temp_df["最新价"] / 1000
temp_df["涨跌幅"] = pd.to_numeric(temp_df["涨跌幅"])
temp_df["最新价"] = pd.to_numeric(temp_df["最新价"])
temp_df["成交额"] = pd.to_numeric(temp_df["成交额"])
temp_df["流通市值"] = pd.to_numeric(temp_df["流通市值"])
temp_df["总市值"] = pd.to_numeric(temp_df["总市值"])
temp_df["换手率"] = pd.to_numeric(temp_df["换手率"])
temp_df["封板资金"] = pd.to_numeric(temp_df["封板资金"])
temp_df["炸板次数"] = pd.to_numeric(temp_df["炸板次数"])
temp_df["连板数"] = pd.to_numeric(temp_df["连板数"])
temp_df['date']=date
程序运行的效果
获取市场跌停数据的函数
#跌停数据
def stock_zt_pool_dtgc_em(date: str = "20220425") -> pd.DataFrame:
"""
:param date: 交易日
:type date: str
:return: 跌停股池
:rtype: pandas.DataFrame
"""
url = "http://push2ex.eastmoney.com/getTopicDTPool"
params = {
"ut": "7eea3edcaed734bea9cbfc24409ed989",
"dpt": "wz.ztzt",
"Pageindex": "0",
"pagesize": "10000",
"sort": "fund:asc",
"date": date,
"_": "1621590489736",
}
r = requests.get(url, params=params)
data_json = r.json()
if data_json["data"] is None:
return pd.DataFrame()
temp_df = pd.DataFrame(data_json["data"]["pool"])
temp_df.reset_index(inplace=True)
temp_df["index"] = range(1, len(temp_df) + 1)
temp_df.columns = [
"序号",
"代码",
"_",
"名称",
"最新价",
"涨跌幅",
"成交额",
"流通市值",
"总市值",
"动态市盈率",
"换手率",
"封单资金",
"最后封板时间",
"板上成交额",
"连续跌停",
"开板次数",
"所属行业",
]
temp_df = temp_df[
[
"序号",
"代码",
"名称",
"涨跌幅",
"最新价",
"成交额",
"流通市值",
"总市值",
"动态市盈率",
"换手率",
"封单资金",
"最后封板时间",
"板上成交额",
"连续跌停",
"开板次数",
"所属行业",
]
]
temp_df["最新价"] = temp_df["最新价"] / 1000
temp_df["最后封板时间"] = temp_df["最后封板时间"].astype(str).str.zfill(6)
temp_df["涨跌幅"] = pd.to_numeric(temp_df["涨跌幅"])
temp_df["最新价"] = pd.to_numeric(temp_df["最新价"])
temp_df["成交额"] = pd.to_numeric(temp_df["成交额"])
temp_df["流通市值"] = pd.to_numeric(temp_df["流通市值"])
temp_df["总市值"] = pd.to_numeric(temp_df["总市值"])
temp_df["动态市盈率"] = pd.to_numeric(temp_df["动态市盈率"])
temp_df["换手率"] = pd.to_numeric(temp_df["换手率"])
temp_df["封单资金"] = pd.to_numeric(temp_df["封单资金"])
temp_df["板上成交额"] = pd.to_numeric(temp_df["板上成交额"])
temp_df["连续跌停"] = pd.to_numeric(temp_df["连续跌停"])
temp_df["开板次数"] = pd.to_numeric(temp_df["开板次数"])
temp_df["开板次数"] = pd.to_numeric(temp_df["开板次数"])
temp_df['date']=date
return temp_df
运行效果
市场分析统计函数
def count_analysis_zt_dt_data(date='20230621'):
'''
市场涨停跌停统计可能和同花顺不一样
date要求是交易日
'''
#涨停
df=stock_zt_pool_em(date=date)
#跌停数据
df1=stock_zt_pool_dtgc_em(date=date)
zt_amount=df.shape[0]
dt_amount=df1.shape[0]
text='交易日{} 涨停数量{} 跌停数量{}'.format(date,zt_amount,dt_amount)
return df,df1,zt_amount,dt_amount,text
运行的结果
主程序入口
if __name__=='__main__':
last_trader_date=get_last_trader_date()
print('交易日 {}'.format(last_trader_date))
df,df1,zt_zmount,dt_amount,text=count_analysis_zt_dt_data(date=last_trader_date)
print(df,df1,zt_zmount,dt_amount,text)
#数据入库
#市场涨停 Limit up of market
dai.DataSource.write_bdb(
data=df,
# datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
id="limit_up_of_market",
# 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
unique_together=["date", "代码"],
indexes=["date"],
)
#跌停数据入库
#市场跌停Market limit
dai.DataSource.write_bdb(
data=df,
# datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
id="market_limit",
# 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
unique_together=["date", "代码"],
indexes=["date"],
)
数据写入个人的数据,利用dai输入写入,id是数据库名称,unique_together用来去掉重复的数据,indexs数据的索引
#市场跌停Market limit
dai.DataSource.write_bdb(
data=df,
# datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
id="market_limit",
# 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
unique_together=["date", "代码"],
indexes=["date"],
)
自己写入的数据可用进行查看,也可用读取,利润dai数据自己的数据库
select * form 数据名称
import dai
df=dai.query('select * from limit_up_of_market').df()
print(df)
运行结果
点击数据beta平台可用看到自己的数据
提交定时任务
import pandas as pd
import requests
import dai
from bigdatasource import DataSource
def get_last_trader_date():
'''
获取最近的交易时间
'''
df=DataSource('trading_days').read()
df=df[df['country_code']=='CN']
trader_date=''.join(str(df['date'].tolist()[-1]).split('-'))[:8]
return trader_date
def stock_zt_pool_em(date: str = "20230621") -> pd.DataFrame:
"""
:return: 涨停股池
:rtype: pandas.DataFrame
"""
url = "http://push2ex.eastmoney.com/getTopicZTPool"
params = {
"ut": "7eea3edcaed734bea9cbfc24409ed989",
"dpt": "wz.ztzt",
"Pageindex": "0",
"pagesize": "10000",
"sort": "fbt:asc",
"date": date,
"_": "1621590489736",
}
r = requests.get(url, params=params)
data_json = r.json()
if data_json["data"] is None:
return pd.DataFrame()
temp_df = pd.DataFrame(data_json["data"]["pool"])
temp_df.reset_index(inplace=True)
temp_df["index"] = range(1, len(temp_df) + 1)
temp_df.columns = [
"序号",
"代码",
"_",
"名称",
"最新价",
"涨跌幅",
"成交额",
"流通市值",
"总市值",
"换手率",
"连板数",
"首次封板时间",
"最后封板时间",
"封板资金",
"炸板次数",
"所属行业",
"涨停统计",
]
temp_df["涨停统计"] = (
temp_df["涨停统计"].apply(lambda x: dict(x)["days"]).astype(str)
+ "/"
+ temp_df["涨停统计"].apply(lambda x: dict(x)["ct"]).astype(str)
)
temp_df = temp_df[
[
"序号",
"代码",
"名称",
"涨跌幅",
"最新价",
"成交额",
"流通市值",
"总市值",
"换手率",
"封板资金",
"首次封板时间",
"最后封板时间",
"炸板次数",
"涨停统计",
"连板数",
"所属行业",
]
]
temp_df["首次封板时间"] = temp_df["首次封板时间"].astype(str).str.zfill(6)
temp_df["最后封板时间"] = temp_df["最后封板时间"].astype(str).str.zfill(6)
temp_df["最新价"] = temp_df["最新价"] / 1000
temp_df["涨跌幅"] = pd.to_numeric(temp_df["涨跌幅"])
temp_df["最新价"] = pd.to_numeric(temp_df["最新价"])
temp_df["成交额"] = pd.to_numeric(temp_df["成交额"])
temp_df["流通市值"] = pd.to_numeric(temp_df["流通市值"])
temp_df["总市值"] = pd.to_numeric(temp_df["总市值"])
temp_df["换手率"] = pd.to_numeric(temp_df["换手率"])
temp_df["封板资金"] = pd.to_numeric(temp_df["封板资金"])
temp_df["炸板次数"] = pd.to_numeric(temp_df["炸板次数"])
temp_df["连板数"] = pd.to_numeric(temp_df["连板数"])
temp_df['date']=date
return temp_df
#跌停数据
def stock_zt_pool_dtgc_em(date: str = "20220425") -> pd.DataFrame:
"""
:param date: 交易日
:type date: str
:return: 跌停股池
:rtype: pandas.DataFrame
"""
url = "http://push2ex.eastmoney.com/getTopicDTPool"
params = {
"ut": "7eea3edcaed734bea9cbfc24409ed989",
"dpt": "wz.ztzt",
"Pageindex": "0",
"pagesize": "10000",
"sort": "fund:asc",
"date": date,
"_": "1621590489736",
}
r = requests.get(url, params=params)
data_json = r.json()
if data_json["data"] is None:
return pd.DataFrame()
temp_df = pd.DataFrame(data_json["data"]["pool"])
temp_df.reset_index(inplace=True)
temp_df["index"] = range(1, len(temp_df) + 1)
temp_df.columns = [
"序号",
"代码",
"_",
"名称",
"最新价",
"涨跌幅",
"成交额",
"流通市值",
"总市值",
"动态市盈率",
"换手率",
"封单资金",
"最后封板时间",
"板上成交额",
"连续跌停",
"开板次数",
"所属行业",
]
temp_df = temp_df[
[
"序号",
"代码",
"名称",
"涨跌幅",
"最新价",
"成交额",
"流通市值",
"总市值",
"动态市盈率",
"换手率",
"封单资金",
"最后封板时间",
"板上成交额",
"连续跌停",
"开板次数",
"所属行业",
]
]
temp_df["最新价"] = temp_df["最新价"] / 1000
temp_df["最后封板时间"] = temp_df["最后封板时间"].astype(str).str.zfill(6)
temp_df["涨跌幅"] = pd.to_numeric(temp_df["涨跌幅"])
temp_df["最新价"] = pd.to_numeric(temp_df["最新价"])
temp_df["成交额"] = pd.to_numeric(temp_df["成交额"])
temp_df["流通市值"] = pd.to_numeric(temp_df["流通市值"])
temp_df["总市值"] = pd.to_numeric(temp_df["总市值"])
temp_df["动态市盈率"] = pd.to_numeric(temp_df["动态市盈率"])
temp_df["换手率"] = pd.to_numeric(temp_df["换手率"])
temp_df["封单资金"] = pd.to_numeric(temp_df["封单资金"])
temp_df["板上成交额"] = pd.to_numeric(temp_df["板上成交额"])
temp_df["连续跌停"] = pd.to_numeric(temp_df["连续跌停"])
temp_df["开板次数"] = pd.to_numeric(temp_df["开板次数"])
temp_df["开板次数"] = pd.to_numeric(temp_df["开板次数"])
temp_df['date']=date
return temp_df
def count_analysis_zt_dt_data(date='20230621'):
'''
市场涨停跌停统计可能和同花顺不一样
date要求是交易日
'''
#涨停
df=stock_zt_pool_em(date=date)
#跌停数据
df1=stock_zt_pool_dtgc_em(date=date)
zt_amount=df.shape[0]
dt_amount=df1.shape[0]
text='交易日{} 涨停数量{} 跌停数量{}'.format(date,zt_amount,dt_amount)
return df,df1,zt_amount,dt_amount,text
if __name__=='__main__':
last_trader_date=get_last_trader_date()
print('交易日 {}'.format(last_trader_date))
df,df1,zt_zmount,dt_amount,text=count_analysis_zt_dt_data(date=last_trader_date)
print(df,df1,zt_zmount,dt_amount,text)
#数据入库
#市场涨停 Limit up of market
dai.DataSource.write_bdb(
data=df,
# datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
id="limit_up_of_market",
# 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
unique_together=["date", "代码"],
indexes=["date"],
)
#跌停数据入库
#市场跌停Market limit
dai.DataSource.write_bdb(
data=df,
# datasource id是全局唯一的,支持小写字母、数字、下划线,以字母开始
id="market_limit",
# 数据插入时,根据unique_together如果有重复的,会去重.如果有分区,则需要传入索引参数indexes
unique_together=["date", "代码"],
indexes=["date"],
)
\