PLUS会员

明华的作业

由bq5u2rhb创建,最终由bq5u2rhb 被浏览 4 用户

import pandas as pd
import os
import glob
from datetime import datetime, timedelta

def calculate_15day_change_and_turn(stock_data_dir):
    """计算每只股票15日涨跌幅和20日均换手率"""
    stock_changes = []
    for csv_file in glob.glob(os.path.join(stock_data_dir, "*.csv")):
        try:
            df = pd.read_csv(csv_file, encoding='gbk')
            if len(df) < 20:
                continue
            df = df.sort_values('交易日期')
            latest_close = df.iloc[-1]['收盘价']
            past15_close = df.iloc[-15]['收盘价']
            change_15 = (latest_close - past15_close) / past15_close * 100

            # 检查是否存在 '换手率' 或 '换手%' 列
            if '换手率' in df.columns:
                turnover_col = '换手率'
            elif '换手%' in df.columns:
                turnover_col = '换手%'
            else:
                print(f"文件 {csv_file} 缺少 '换手率' 或 '换手%' 列,跳过计算")
                continue

            avg_turn_20 = df.iloc[-20:][turnover_col].mean()

            stock_code = os.path.basename(csv_file).split('.')[0]
            stock_changes.append({'完整代码': stock_code, '15日涨跌幅': change_15, '20日均换手': avg_turn_20})
        except Exception as e:
            print(f"处理文件 {csv_file} 出错: {e}")
    return pd.DataFrame(stock_changes)

def filter_and_select_stocks(stock_data_dir, all_stock_t_path, top_pct=0.5, top_turn=20):
    print("计算15日涨跌幅和20日均换手...")
    stock_changes = calculate_15day_change_and_turn(stock_data_dir)
    if stock_changes.empty:
        print("无数据,跳过")
        return pd.DataFrame()  # 返回空 DataFrame

    stock_changes = stock_changes.sort_values('15日涨跌幅', ascending=True)
    top_50pct_count = int(len(stock_changes) * top_pct)
    top_50 = stock_changes.head(top_50pct_count)

    try:
        df_all = pd.read_csv(all_stock_t_path, encoding='utf-8', dtype={'完整代码': str})
    except UnicodeDecodeError:
        df_all = pd.read_csv(all_stock_t_path, encoding='gbk', dtype={'完整代码': str})

    df_all['最新价'] = pd.to_numeric(df_all['最新价'], errors='coerce')
    df_all['总市值'] = pd.to_numeric(df_all['总市值'], errors='coerce')

    merged = pd.merge(top_50, df_all, on='完整代码', how='inner')

    if merged.empty:
        print("合并无数据,请检查代码格式")
        return pd.DataFrame()  # 返回空 DataFrame

    # 剔除科创、北证的新股及ST
    merged = merged[~merged['完整代码'].str[2:].str.startswith(('8', '43'))]
    merged = merged[merged['最新价'] >= 3]
    merged = merged[~merged['股票名称'].str.contains(r'ST|\*ST|退', case=False, regex=True)]
    merged = merged.dropna(subset=['总市值'])
    merged = merged[merged['总市值'] > 0]

    # 再根据20日均换手率降序排,取前20
    merged = merged.sort_values(['20日均换手', '总市值'], ascending=[False, True]).head(top_turn)

    # 输出
    output_path = os.path.join(os.path.dirname(all_stock_t_path), 'filtered_stocks.csv')
    cols = ['完整代码', '股票名称', '最新价', '涨跌幅', '15日涨跌幅', '20日均换手', '总市值', '流通市值', '市盈率', '市净率']
    for c in cols:
        if c not in merged.columns:
            merged[c] = ""
    merged[cols].to_csv(output_path, index=False, encoding='utf_8_sig')
    print(f"结果保存至:{output_path}")
    return merged

if __name__ == "__main__":
    # 获取当前脚本所在目录
    current_dir = os.path.dirname(os.path.abspath(__file__))

    # 可配置的参数
    stock_data_dir = os.path.join(current_dir, 'stocks_data')  # 数据文件夹
    all_stock_t_path = os.path.join(current_dir, 'all_stock_t.csv')  # 股票列表文件
    top_pct = 0.5  # 15日涨跌幅最小的前 top_pct
    top_turn = 20  # 换手率最大的前 top_turn 只股票

    # 执行选股
    filtered = filter_and_select_stocks(stock_data_dir, all_stock_t_path, top_pct, top_turn)
    if not filtered.empty:
        print(filtered)
    else:
        print("没有筛选到符合条件的股票")

\

标签

数据处理
评论
  • C:\ProgramData\anaconda3\python.exe D:\自动化交易\换手率低位精选.py
  • 计算15日涨跌幅和20日均换手...
{link}