复制链接
克隆策略
In [1]:
import pandas as pd
import numpy as np
from biglearning.module2.common.data import Outputs
from zipline.finance.commission import PerOrder
import os
from bigdatasource.api import DataSource
from biglearning.api import M
from biglearning.api import tools as T
from biglearning.module2.common.data import Outputs

import warnings
warnings.filterwarnings('ignore')
from datetime import date

from joblib import Parallel, delayed
In [2]:
# Date window for the hf_tide update: a fixed start date through today,
# both as 'YYYY-MM-DD' strings.
sd = '2023-05-15'
ed = date.today().isoformat()
In [2]:
#sd = '2023-05-12'
#ed = '2023-05-15'
In [3]:
def calc_data(instrument,sd,ed):
    """Compute daily "tide" factors for one instrument and write them to ``hf_tide``.

    Reads the instrument's 1-minute bars from ``bar1m_CN_STOCK_A`` for
    ``sd``..``ed``, computes one row of factors per trading day with the nested
    ``calc_one_day``, and writes the columns ``instrument, date, w_factor,
    s_factor, factor`` to the ``hf_tide`` table via
    ``dai.DataSource.write_bdb`` (``unique_together=["date", "instrument"]``).

    Meant to be run once per instrument under ``joblib.Parallel``: every
    failure is printed and swallowed so one bad instrument does not abort
    the whole batch.

    Args:
        instrument: instrument code, e.g. ``'000001.SZA'``.
        sd: start date, ``'YYYY-MM-DD'``.
        ed: end date, ``'YYYY-MM-DD'``.

    Returns:
        None on every path; results are persisted, not returned.
    """
    # Re-applied here, presumably because joblib worker processes do not
    # share the notebook's warning filters.
    import warnings
    warnings.filterwarnings('ignore')
    print(instrument)

    # The read is inside the try so that an exception from DataSource (or a
    # None result, which fails on .copy()) is reported for this instrument
    # instead of aborting the whole Parallel batch.
    try:
        df = DataSource('bar1m_CN_STOCK_A').read(start_date=sd,end_date=ed,instruments = instrument)
        df_ = df.copy()
        df_['day'] = df_['date'].dt.to_period('D')
    except Exception as e:
        print(instrument,'数据读取失败',repr(e))
        return

    def calc_one_day(df):
        """Compute the tide factors for one trading day.

        Args:
            df: that day's minute bars (must contain ``volume`` and ``close``).

        Returns:
            The day's bars minus the 4 bars at each end whose 9-bar volume
            window is incomplete, with constant columns ``vm, cm, vn, cn, nm,
            w_factor, s_factor, factor`` added; or ``None`` when no bar has a
            complete window (fewer than 9 usable bars).
        """
        df = df.reset_index(drop=True)
        if df.empty:
            return None

        # Volume/close of the day's very first bar, captured before the edge
        # rows are dropped below; used as the "m" point when the volume peak
        # is the first remaining bar.
        vs = df.loc[0,'volume']
        cs = df.loc[0,'close']

        # Centred 9-bar volume sum (4 bars on each side); NaN where the window
        # is incomplete, i.e. the first and last 4 bars of the day.
        df['邻域'] = df['volume'].rolling(9, center=True).sum()

        # NOTE(review): dropna() without `subset` also drops rows where any
        # *other* column is NaN, not only the incomplete-window rows — confirm.
        df = df.dropna().reset_index(drop=True)
        if df.empty:
            # Too few bars for a full window: idxmax() on an empty Series
            # raises, which would abort the update of every day of this instrument.
            return None

        idxmax = df['邻域'].idxmax()  # first bar of the volume peak
        price_top = df.loc[idxmax,'close']
        first_price = df['close'].iloc[0]
        last_price = df['close'].iloc[-1]

        # NaN default so both columns always exist, including the vm == vn
        # tie where neither branch below assigns them.
        weak = strong = np.nan

        if idxmax != 0:
            # Quietest neighbourhood before (m) and after (n) the peak; both
            # slices include the peak bar itself.
            idxmin_m = df.loc[:idxmax,'邻域'].idxmin()
            idxmin_n = df.loc[idxmax:,'邻域'].idxmin()
            vm = df.loc[idxmin_m,'volume']
            cm = df.loc[idxmin_m,'close']
            vn = df.loc[idxmin_n,'volume']
            cn = df.loc[idxmin_n,'close']
            nm = idxmin_n - idxmin_m

            # Per-bar return from the first remaining bar up to the peak, and
            # from the peak to the last bar.
            rise = (price_top - first_price)/first_price/idxmax
            fall = (last_price - price_top)/price_top/(len(df) - idxmax)
            if vm > vn:
                strong, weak = fall, rise
            elif vn > vm:
                strong, weak = rise, fall
        else:
            # Peak at the first remaining bar: use the day's first bar as "m"
            # and the global quietest neighbourhood as "n".
            idxmin = df['邻域'].idxmin()
            vm = vs
            cm = cs
            vn = df.loc[idxmin,'volume']
            cn = df.loc[idxmin,'close']
            nm = idxmin
            strong = weak = (last_price - first_price)/first_price/len(df)

        df['vm'] = vm
        df['cm'] = cm
        df['vn'] = vn
        df['cn'] = cn
        df['nm'] = nm
        df['w_factor'] = weak
        df['s_factor'] = strong
        # nm == 0 only when every remaining bar has the same neighbourhood
        # sum (e.g. a single remaining bar); write NaN instead of ±inf.
        df['factor'] = (cn - cm)/cm/nm if nm != 0 else np.nan
        return df

    try:
        # calc_one_day's added columns are constant within a day, so only the
        # first bar of each day that produced a result is kept.
        rows = []
        for _, day_bars in df_.groupby('day'):
            result = calc_one_day(day_bars)
            if result is not None:
                rows.append(result.iloc[:1])
        if not rows:
            print(instrument,'无可用数据')
            return
        df_ = pd.concat(rows, ignore_index=True)

        # Strip the intraday time so `date` is the trading day.
        df_['date'] = df_['date'].dt.strftime('%Y-%m-%d')
        df_['date'] = pd.to_datetime(df_['date'])
        df_ = df_[['instrument','date','w_factor','s_factor','factor']].copy()

        import dai
        # Optional: partition field, can be used to speed up data access.
        df_[dai.DEFAULT_PARTITION_FIELD] = df_["date"].apply(lambda x: f"{x.year}")
        # NOTE(review): every Parallel worker writes to the same "hf_tide"
        # table — confirm write_bdb tolerates concurrent writers.
        dai.DataSource.write_bdb(
            data=df_,
            id="hf_tide",
            unique_together=["date", "instrument"],
            indexes=["date"],
        )
    except Exception as e:
        print(instrument,'数据更新失败',repr(e))
    return
In [4]:
# Instrument universe: every code that has at least one daily bar in the
# sd..ed window (the daily table is read only to collect the codes).
tmp = DataSource('bar1d_CN_STOCK_A').read(
    start_date=sd,
    end_date=ed,
)
ins_list = tmp['instrument'].unique()
In [5]:
# Fan out one calc_data job per instrument across all available cores.
# calc_data persists its own output and returns None, so `results` is a
# list of Nones.
results = Parallel(n_jobs=-1)(
    delayed(calc_data)(code, sd, ed)
    for code in ins_list
)
000001.SZA
000004.SZA
000002.SZA
000005.SZA
000006.SZA
000007.SZA
000008.SZA
000009.SZA
000010.SZA
000011.SZA
000012.SZA
000014.SZA
000016.SZA
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
Cell In[5], line 1
----> 1 results = Parallel(n_jobs=-1)(delayed(calc_data)(ins,sd,ed) for ins in ins_list)

File /usr/local/python3/lib/python3.8/site-packages/joblib/parallel.py:1056, in Parallel.__call__(self, iterable)
   1053     self._iterating = False
   1055 with self._backend.retrieval_context():
-> 1056     self.retrieve()
   1057 # Make sure that we get a last message telling us we are done
   1058 elapsed_time = time.time() - self._start_time

File /usr/local/python3/lib/python3.8/site-packages/joblib/parallel.py:935, in Parallel.retrieve(self)
    933 try:
    934     if getattr(self._backend, 'supports_timeout', False):
--> 935         self._output.extend(job.get(timeout=self.timeout))
    936     else:
    937         self._output.extend(job.get())

File /usr/local/python3/lib/python3.8/site-packages/joblib/_parallel_backends.py:542, in LokyBackend.wrap_future_result(future, timeout)
    539 """Wrapper for Future.result to implement the same behaviour as
    540 AsyncResults.get from multiprocessing."""
    541 try:
--> 542     return future.result(timeout=timeout)
    543 except CfTimeoutError as e:
    544     raise TimeoutError from e

File /usr/local/python3/lib/python3.8/concurrent/futures/_base.py:439, in Future.result(self, timeout)
    436 elif self._state == FINISHED:
    437     return self.__get_result()
--> 439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    442     raise CancelledError()

File /usr/local/python3/lib/python3.8/threading.py:302, in Condition.wait(self, timeout)
    300 try:    # restore state no matter what (e.g., KeyboardInterrupt)
    301     if timeout is None:
--> 302         waiter.acquire()
    303         gotit = True
    304     else:

KeyboardInterrupt: