明华的作业
由bq5u2rhb创建,最终由bq5u2rhb 被浏览 4 用户
import pandas as pd
import os
import glob
from datetime import datetime, timedelta
def calculate_15day_change_and_turn(stock_data_dir):
"""计算每只股票15日涨跌幅和20日均换手率"""
stock_changes = []
for csv_file in glob.glob(os.path.join(stock_data_dir, "*.csv")):
try:
df = pd.read_csv(csv_file, encoding='gbk')
if len(df) < 20:
continue
df = df.sort_values('交易日期')
latest_close = df.iloc[-1]['收盘价']
past15_close = df.iloc[-15]['收盘价']
change_15 = (latest_close - past15_close) / past15_close * 100
# 检查是否存在 '换手率' 或 '换手%' 列
if '换手率' in df.columns:
turnover_col = '换手率'
elif '换手%' in df.columns:
turnover_col = '换手%'
else:
print(f"文件 {csv_file} 缺少 '换手率' 或 '换手%' 列,跳过计算")
continue
avg_turn_20 = df.iloc[-20:][turnover_col].mean()
stock_code = os.path.basename(csv_file).split('.')[0]
stock_changes.append({'完整代码': stock_code, '15日涨跌幅': change_15, '20日均换手': avg_turn_20})
except Exception as e:
print(f"处理文件 {csv_file} 出错: {e}")
return pd.DataFrame(stock_changes)
def filter_and_select_stocks(stock_data_dir, all_stock_t_path, top_pct=0.5, top_turn=20):
print("计算15日涨跌幅和20日均换手...")
stock_changes = calculate_15day_change_and_turn(stock_data_dir)
if stock_changes.empty:
print("无数据,跳过")
return pd.DataFrame() # 返回空 DataFrame
stock_changes = stock_changes.sort_values('15日涨跌幅', ascending=True)
top_50pct_count = int(len(stock_changes) * top_pct)
top_50 = stock_changes.head(top_50pct_count)
try:
df_all = pd.read_csv(all_stock_t_path, encoding='utf-8', dtype={'完整代码': str})
except UnicodeDecodeError:
df_all = pd.read_csv(all_stock_t_path, encoding='gbk', dtype={'完整代码': str})
df_all['最新价'] = pd.to_numeric(df_all['最新价'], errors='coerce')
df_all['总市值'] = pd.to_numeric(df_all['总市值'], errors='coerce')
merged = pd.merge(top_50, df_all, on='完整代码', how='inner')
if merged.empty:
print("合并无数据,请检查代码格式")
return pd.DataFrame() # 返回空 DataFrame
# 剔除科创、北证的新股及ST
merged = merged[~merged['完整代码'].str[2:].str.startswith(('8', '43'))]
merged = merged[merged['最新价'] >= 3]
merged = merged[~merged['股票名称'].str.contains(r'ST|\*ST|退', case=False, regex=True)]
merged = merged.dropna(subset=['总市值'])
merged = merged[merged['总市值'] > 0]
# 再根据20日均换手率降序排,取前20
merged = merged.sort_values(['20日均换手', '总市值'], ascending=[False, True]).head(top_turn)
# 输出
output_path = os.path.join(os.path.dirname(all_stock_t_path), 'filtered_stocks.csv')
cols = ['完整代码', '股票名称', '最新价', '涨跌幅', '15日涨跌幅', '20日均换手', '总市值', '流通市值', '市盈率', '市净率']
for c in cols:
if c not in merged.columns:
merged[c] = ""
merged[cols].to_csv(output_path, index=False, encoding='utf_8_sig')
print(f"结果保存至:{output_path}")
return merged
if __name__ == "__main__":
# 获取当前脚本所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 可配置的参数
stock_data_dir = os.path.join(current_dir, 'stocks_data') # 数据文件夹
all_stock_t_path = os.path.join(current_dir, 'all_stock_t.csv') # 股票列表文件
top_pct = 0.5 # 15日涨跌幅最小的前 top_pct
top_turn = 20 # 换手率最大的前 top_turn 只股票
# 执行选股
filtered = filter_and_select_stocks(stock_data_dir, all_stock_t_path, top_pct, top_turn)
if not filtered.empty:
print(filtered)
else:
print("没有筛选到符合条件的股票")
\