# -*- coding: utf-8 -*-
"""
Created on Wed Aug 28 08:29:27 2019
@author: Administrator
"""
import os
import io; type(io)  # bare reference; presumably silences "unused import" warnings
from collections import OrderedDict
from copy import deepcopy
import datetime
import numpy as np; type(np)
import pandas as pd
import pathlib
datapath = 'd:/db/amipy/data/'
import toolkit.pf as pf; type(pf)
import toolkit.mydef as md; type(md)
import matplotlib as mpl
# The backend is selected before pyplot is imported below.
mpl.use('Qt5Agg')  # the 'agg' backend does not support interactive figure display
#mpl.get_backend() # 'Qt5Agg'
import matplotlib.pyplot as plt; type(plt)
#from matplotlib.font_manager import FontProperties
#myfont = FontProperties(fname=r"c:\windows\fonts\msyh.ttf")  # size need not be given
# Matplotlib settings so that plots can render Chinese text:
mpl.rcParams['axes.unicode_minus'] = False
mpl.rcParams['font.sans-serif'] = ['Microsoft YaHei']  # default font; fixes Chinese text not showing in plots
# mpl also has an rc() function dedicated to setting rendering parameters
#pip3 uninstall matplotlib
#pip3 install --user matplotlib
import util.tdxhq as hq; type(hq)
import util.jqds as jq; type(jq)
import jqdatasdk as jqd
import util.ttr as ttr
#from util.stock import Stock, Context
import util.stock as qss  # qingshan stock module
import util.plotter as pl
#import imp
#imp.reload(hq)
#%%
class G:
    """Empty attribute container used as the module-wide settings object."""


# Module-wide settings shared by the helper functions in this file.
g = G()
#%%
g.code = ' 000069.XSHE 000300.XSHG 510300.XSHG '.split()
g.today = datetime.datetime.now().date()  # datetime.date.today()
g.nMA = 13
g.n1EMA, g.n2EMA = 13, 21
g.nDMA = 34
# Price fields without / with a leading 'date' column.
g.fields = ' open high low close volume money'.split()
g.fields_d = 'date open high low close volume money'.split()
#%%
def load_data(
        symbol='000069.sz',
        start='2017-01-01',
        with_plot=False,
        with_plot_k=False, savefig=False,
        ):
    """Load one stock into the module globals ``ctx`` and ``stk``.

    Builds a ``qss.Context`` and ``qss.Stock``, grabs the stock and index
    data from tdxhq, applies ``qfq()`` and computes the indicators.

    Parameters
    ----------
    symbol : str
        Symbol passed to ``qss.Context``.
    start : str
        Start date passed to ``qss.Stock``.
    with_plot : bool
        Also draw the raw-vs-adjusted, candlestick and ``plot_bh_eq`` figures.
    with_plot_k : bool
        Also draw the candle chart with ``amtc34``/``bandTop``/``bandBot``.
    savefig : bool
        Forwarded to every plotting call.

    Returns
    -------
    tuple
        ``(ctx, stk)``, the same objects stored in the module globals.
    """
    global ctx
    global stk
    ctx = qss.Context(symbol=symbol)
    stk = qss.Stock(ctx, start=start)
    stk.grab_data_from_tdxhq()
    stk.qfq()
    stk.grab_index_from_tdxhq()
    # stk.aligned_to_dp()
    print('\nLoaded data(raw+qfq+aligned): {} {}'.format(ctx.name, ctx.code))
    stk.indicator()
    if with_plot:
        stk.plot_raw_vs_adjusted(savefig=savefig)
        fig, ax = plt.subplots(1, 1)
        stk.mycandlestick_ohlc(ax, n_trend=(20, 60), savefig=savefig)
        stk.plot_bh_eq(savefig=savefig)
    if with_plot_k:
        # No subset is passed to the plotter; a window such as
        # slice(-120, None) could be used here instead.
        subset = None
        draw = pl.Plotter(stk.context, stk, subset)  # plot stk data
        draw.plot_candle_only('lday', names=['amtc34', 'bandTop', 'bandBot'],
                              savefig=savefig)
        #draw.plot_candle_vol('lday', savefig=True)
    return ctx, stk
#%%
def get_signal():
    """Compute the boolean buy/sell signal series and store them on ``ctx``.

    Reads the module globals ``stk`` (``ohlc.close``, ``amtc34``, ``bandTop``,
    ``bandBot``) and writes ``ctx.bsig`` and ``ctx.ssig``.

    * ``ctx.bsig`` is true where the close is below both ``bandBot`` and
      ``amtc34``, or above both ``amtc34`` and ``bandTop``.
    * ``ctx.ssig`` is true where the close is above ``bandTop`` but below
      ``amtc34``, or above ``amtc34`` but below ``bandBot``.
    """
    global ctx
    global stk
    c = stk.ohlc.close
    below_cost = c < stk.amtc34
    above_cost = c > stk.amtc34
    ctx.bsig = ((c < stk.bandBot) & below_cost) | (above_cost & (c > stk.bandTop))
    ctx.ssig = ((c > stk.bandTop) & below_cost) | (above_cost & (c < stk.bandBot))
#%%
def plot_signal(subset=(-100,-1), factor=1.005, savefig=False,
                plot_action=False,):
    """Plot candles, the cost-average/channel lines and the trade signals.

    Reads the module globals ``stk`` and ``ctx`` (``ctx.bsig``/``ctx.ssig``
    must already be pandas Series) and, when ``plot_action`` is true, the
    global positions frame ``pos`` indexed by datetime.

    Parameters
    ----------
    subset : tuple
        ``(start, stop)`` positional slice bounds applied to ``stk.ohlc`` and
        to the indicator/signal series.
    factor : float
        Each signal tick spans ``guide/factor`` .. ``guide*factor``, where
        the guide is ``amtc34/1.1`` for buys and ``amtc34*1.1`` for sells.
    savefig : bool
        Save the figure as a time-stamped PNG in the working directory.
    plot_action : bool
        Also draw the executed open/close orders and the buy-price line.

    Example
    -------
    >>> plot_signal()
    """
    assert isinstance(ctx.bsig, pd.Series)
    assert isinstance(ctx.ssig, pd.Series)
    window = slice(subset[0], subset[1])
    ohlc = stk.ohlc[window]
    sdt = ohlc.index[0].date().isoformat()
    edt = ohlc.index[-1].date().isoformat()

    fig, ax1 = plt.subplots(1, 1)
    # Bars are plotted against their ordinal position, not against dates.
    xpos = range(len(ohlc.index))
    quotes = zip(xpos, ohlc.open, ohlc.high, ohlc.low, ohlc.close)
    _candlestick(ax1, quotes, colorup='r', colordown='k', width=0.7, ochl=False)

    _djx = stk.amtc34[window]
    _ddd, _ggg = _djx/1.1, _djx*1.1
    ax1.plot(xpos, _djx, label='成本均线34', lw=0.3)
    ax1.plot(xpos, stk.bandTop[window], label='bandTop', lw=0.3)
    ax1.plot(xpos, stk.bandBot[window], label='bandBot', lw=0.3)
    ax1.plot(xpos, _ddd, lw=0.3)
    ax1.plot(xpos, _ggg, lw=0.3)

    # Buy signals: short vertical ticks around the lower guide line.
    bsig = ctx.bsig[window]
    buy_dates = bsig[bsig].index
    xx = bsig.index.get_indexer(buy_dates)
    buy_level = _ddd.loc[buy_dates].to_numpy()
    ax1.vlines(xx, buy_level/factor, buy_level*factor,
               color='m', label='buy_sig', lw=3.5)

    # Sell signals: short vertical ticks around the upper guide line.
    ssig = ctx.ssig[window]
    sell_dates = ssig[ssig].index
    xx = ssig.index.get_indexer(sell_dates)
    sell_level = _ggg.loc[sell_dates].to_numpy()
    ax1.vlines(xx, sell_level/factor, sell_level*factor,
               color='cyan', label='sell_sig')

    if plot_action:
        buyAction = pos[pos['order'] == '开仓'].loc[sdt:edt]
        sellAction = pos[pos['order'] == '清仓'].loc[sdt:edt]
        # Order timestamps carry an intraday offset, so map each one back to
        # the bar at or before it. get_indexer(method='ffill') replaces
        # Index.get_loc(label, method='ffill'), which pandas 2.0 removed.
        xx = ohlc.index.get_indexer(buyAction.index, method='ffill')
        ax1.hlines(buyAction.buy_price, xx - 1, xx + 1,
                   color='g', label='buy_act', lw=2.8)
        xx = ohlc.index.get_indexer(sellAction.index, method='ffill')
        ax1.hlines(sellAction.sell_price, xx - 1, xx + 1,
                   color='cyan', label='sell_act', lw=2.8)
        # NOTE(review): no order label visible in this file equals 'c', so
        # that clause presumably never matches - confirm.
        bp = pos[(pos.hold_days >= 0) | (pos.order == 'c')]['buy_price'][sdt:edt]
        xx_bp = ohlc.index.get_indexer(bp.index, method='ffill')
        ax1.plot(xx_bp, bp, ':x', lw=0.8, label='buy_price')  # fmt = line_style marker color

    ax1.legend()
    pl.set_xticklabels(ax1, _djx)
    title = 'K线均线通道线和交易信号\n'
    title += '{o.name}({o.code}), 时间段: {dt1} -- {dt2}'.format(
        o=ctx, dt1=_djx.index[0].date(), dt2=_djx.index[-1].date())
    ax1.set_title(title)
    if savefig:
        tstamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        fn = '{}_{}_5_k线指标线和交易信号.png'.format(ctx.name, tstamp)
        fig.savefig(fn, dpi=300)
#plot_signal(subset=(-100, -1), savefig=False, plot_action=True )
#plot_signal(subset=(-100, -1), savefig=False, plot_action=True )
#%%
def simulation():
    """Replay the signals day by day and return the position log.

    Reads the module globals ``stk`` and ``ctx`` (``ctx.bsig``/``ctx.ssig``
    must exist). For every bar in ``stk.ohlc.index[40:-1]`` the previous
    bar's signals decide one order, executed at today's open:

    * flat and buy signal         -> '开仓' (open a position)
    * flat, no buy signal         -> '空仓寻进' (stay flat)
    * holding and sell signal     -> '清仓' (close the position)
    * holding and fpnl < -0.20    -> '止损' (stop loss)
    * holding otherwise           -> '满仓寻出' (keep holding)

    Side effects: resets and fills ``ctx.check`` (one dict per day with the
    inputs of the decision) and ``ctx.pos`` (one snapshot per day).

    Returns
    -------
    pandas.DataFrame
        ``ctx.pos`` as a frame, one row per simulated day.
    """
    index = stk.ohlc.index
    close = stk.ohlc.close
    ctx.check, hold_dict, ctx.pos, pos_dict = [], {}, [], {}
    order = 'empty'
    days = index[40:-1]
    _9hours = datetime.timedelta(0, 9*3600)
    _930hours = datetime.timedelta(0, 9.5*3600)
    for tdate in days:
        loc = index.get_loc(tdate)
        today = index[loc]          # the day being simulated
        yesterday = index[loc - 1]  # the previous trading day
        print('''+++++++++++++++++++++ 交易日期是: {}, {}'''.format(today,yesterday))

        # ---- inspect the signals (always taken from yesterday) ----
        ssig = ctx.ssig.loc[yesterday]
        bsig = ctx.bsig.loc[yesterday]
        preclose = close.loc[yesterday]
        todayopen_ = stk.ohlc.open[today]
        todayclose = stk.ohlc.close[today]
        amtc34 = stk.amtc34[yesterday]
        ctx.check.append(OrderedDict(
            dt=today.to_pydatetime() + _9hours,
            bsig=bsig,
            ssig=ssig,
            preclose=preclose,
            todayopen=todayopen_,
            todayclose=todayclose,
            comment='牛股:伺机做多' if preclose > amtc34 else '熊股:少碰',
        ))

        # ---- decide the order; the branches cover every case ----
        if not hold_dict:
            order = '开仓' if bsig else '空仓寻进'
        elif ssig:
            order = '清仓'
        elif pos_dict and pos_dict['fpnl'] < -0.20:
            order = '止损'
        else:
            order = '满仓寻出'
        # NOTE: three further exit rules (sideways too long, quick 5% loss,
        # quick 20% gain) were computed here but never used; they are not
        # part of the decision above.

        # ---- execute the order ----
        if order == '开仓':
            buy_date = today.to_pydatetime() + _930hours
            pos_dict = OrderedDict(
                order=order,
                dt=buy_date,
                buy_date=buy_date,
                buy_price=todayopen_,
                hold_days=0,
                fpnl=0.0,
            )
            hold_dict[ctx.code] = [buy_date, todayopen_]
        elif order in ('清仓', '止损'):
            sell_dt = today.to_pydatetime() + _930hours
            pos_dict['order'] = order
            pos_dict['dt'] = sell_dt
            pos_dict['sell_date'] = sell_dt
            pos_dict['sell_price'] = round(todayopen_, 3)
            pos_dict['ratio'] = round(100*((todayopen_/pos_dict['buy_price']) - 1), 2)
            pos_dict['fpnl'] = todayopen_/pos_dict['buy_price'] - 1
            pos_dict['params'] = (3, 20, 1.5, 1, 5)
            del hold_dict[ctx.code]
        elif order == '满仓寻出':
            pos_dict['order'] = order
            pos_dict['dt'] = today.to_pydatetime() + _9hours
            pos_dict['hold_days'] += 1
            pos_dict['fpnl'] = preclose/pos_dict['buy_price'] - 1
        elif order == '空仓寻进':
            pos_dict = OrderedDict()
            pos_dict['order'] = order
            pos_dict['dt'] = today.to_pydatetime() + _930hours
        # pos_dict keeps changing from day to day, so a deep copy is stored
        # to keep each logged snapshot independent of later updates.
        ctx.pos.append(deepcopy(pos_dict))
    pos = pd.DataFrame(ctx.pos)
    return pos
#%%
def _candlestick(ax, quotes, width=0.2, colorup='r', colordown='k',
                 alpha=1.0, ochl=True):
    """
    Draw candlesticks on ``ax``.

    Each quote becomes a rectangle spanning open..close plus thin wicks.
    When close >= open the wicks are drawn from low to open and from close
    to high, so no line crosses the body; otherwise a single low-to-high
    line is drawn.

    Parameters
    ----------
    ax : `Axes`
        an Axes instance to plot to
    quotes : iterable of sequences
        data to plot; the first five items of each quote are
        (time, open, high, low, close) or (time, open, close, high, low),
        selected by `ochl`. `time` is used directly as the x coordinate.
    width : float
        rectangle width in x units
    colorup, colordown : color
        accepted for interface compatibility but currently ignored: all
        lines and edges are black, the body is white when close > open
        and black otherwise
    alpha : float
        the rectangle alpha level
    ochl : bool
        True for (time, open, close, high, low) ordering, False for
        (time, open, high, low, close)

    Returns
    -------
    ret : tuple
        (lines, patches): the list of Line2D wicks added and the list of
        Rectangle patches added
    """
    OFFSET = width / 2.0
    lines = []
    patches = []

    def _add_wick(t, y_from, y_to):
        # One thin black vertical segment at x = t.
        wick = plt.Line2D(
            xdata=(t, t), ydata=(y_from, y_to),
            color='k',
            linewidth=0.5,
            antialiased=True,
        )
        ax.add_line(wick)
        lines.append(wick)

    for q in quotes:
        if ochl:
            t, open_, close, high, low = q[:5]
        else:
            t, open_, high, low, close = q[:5]

        if close >= open_:
            lower = open_
            height = close - open_
            _add_wick(t, low, open_)
            _add_wick(t, close, high)
        else:
            lower = close
            height = open_ - close
            _add_wick(t, low, high)

        rect = plt.Rectangle(
            xy=(t - OFFSET, lower),
            width=width,
            height=height,
            facecolor='w' if close > open_ else 'k',
            edgecolor='k',
            linewidth=0.5,
        )
        rect.set_alpha(alpha)
        ax.add_patch(rect)
        patches.append(rect)
    ax.autoscale_view()
    return lines, patches
#%%
def go(symbol, with_plot_k=False, with_plot=False, savefig=False):
    """Load ``symbol`` from 2013-01-01, compute signals and run the simulation.

    ``load_data`` fills the module globals ``ctx`` and ``stk``; the position
    log returned by ``simulation`` is stored in the module global ``pos``,
    indexed by its ``dt`` column (the column itself is kept).

    Parameters
    ----------
    symbol : str
        Symbol forwarded to ``load_data``.
    with_plot_k, with_plot, savefig : bool
        Plot switches forwarded to ``load_data``.
    """
    global pos
    load_data(symbol=symbol, start='2013-01-01',
              with_plot_k=with_plot_k, with_plot=with_plot, savefig=savefig)
    get_signal()
    # plot_signal(subset=(-100, -1), savefig=True )
    pos = simulation().set_index('dt', drop=False)
    # plot_signal(subset=(-100, -1), savefig=False, plot_action=True )
    # fig,ax=plt.subplots()
    # (pos.ratio.fillna(0)/100 + 1).cumprod().plot(ax=ax)
#%%
# Script cell: run the whole pipeline for one symbol and plot it.
symbol = '000402.sz'
go(symbol, False, False, False)
plot_signal(subset=(-100, -1), savefig=False, plot_action=True)
plot_signal(subset=(-500, -400), savefig=False, plot_action=True)

# Walk through the history in consecutive frames of frame_len bars.
# A trailing frame shorter than frame_len is not plotted.
frame_len = 150
li = list(range(0, len(stk.ohlc), frame_len))
subs = list(zip(li, li[1:]))  # (start, stop) pairs of neighbouring frame edges
for sub in subs:
    subset = slice(sub[0], sub[1])
    draw = pl.Plotter(stk.context, stk, subset)  # plot stk data
    draw.plot_candle_only('lday', names=['amtc34', 'bandTop', 'bandBot'], savefig=False)
for sub in subs:
    plot_signal(sub, savefig=False)
#%%
def study_candle_pattern():
    """Exploratory helper: run one TA-Lib pattern and list all pattern names.

    Evaluates ``talib.CDL2CROWS`` on the global ``stk.ohlc`` (the result is
    not used further), then prints every ``talib`` attribute whose name
    starts with ``CDL``.
    """
    import talib as ta
    ohlc = stk.ohlc
    o, h, l, c = ohlc.open.values, ohlc.high.values, ohlc.low.values, ohlc.close.values
    cdl = ta.CDL2CROWS(o, h, l, c)
    _2crows = cdl[cdl != 0]; type(_2crows)  # non-zero entries; kept for inspection only
    for name in dir(ta):
        if name.startswith('CDL'):
            print(name)
#%%
# Shared helper functions
def get_ohlc(code=None, n=None, end_dt=None):
    """Fetch the last ``n`` rows of price data for ``code`` via ``jqd.get_price``.

    Parameters
    ----------
    code : str, optional
        Security code; defaults to ``g.code[0]``.
    n : int, optional
        Number of rows (``count``); defaults to ``g.nMA``.
    end_dt : date, optional
        End date; defaults to ``g.today``.

    Returns
    -------
    The object returned by ``jqd.get_price`` for the fields in ``g.fields``,
    requested with ``fq='pre'`` and ``skip_paused=False``.
    """
    code = g.code[0] if code is None else code
    n = g.nMA if n is None else n
    end_dt = g.today if end_dt is None else end_dt
    return jqd.get_price(code, count=n, end_date=end_dt, fields=g.fields,
                         skip_paused=False, fq='pre')
def get_candlePattern(code=None, n=None, end_dt=None, last_row=True):
    """Flag up/down closes and three-in-a-row runs on the global ``stk.ohlc``.

    ``code``, ``n`` and ``end_dt`` are accepted for interface compatibility
    but are not used: the data always comes from ``stk.ohlc`` (fetching via
    ``get_ohlc(code, n, end_dt)`` is disabled).

    Columns of the result:

    * ``up``  - close is higher than the previous close
    * ``up3`` - ``up`` on this bar and on the two previous bars
    * ``dn``  - not ``up`` (an unchanged close and the first bar count as down)
    * ``dn3`` - ``dn`` on this bar and on the two previous bars

    Parameters
    ----------
    last_row : bool
        If true return only the last row as a Series indexed by
        ``up up3 dn dn3``; otherwise return the whole DataFrame.

    ref:
        d:\\Documents\\Pictures\\ta-lib\\c\\src\\ta_func\\ta_utility.h
        d:\\Documents\\Pictures\\ta-lib\\c\\src\\ta_func\\ta_CDL2CROWS.c
    """
    global stk
    ohlc = stk.ohlc
    c = ohlc.close

    def _three_in_a_row(flag):
        # fill_value=False keeps the shifted series boolean, so the "&" never
        # sees the NaN that a plain shift() would introduce.
        return (flag
                & flag.shift(1, fill_value=False)
                & flag.shift(2, fill_value=False))

    up = c > c.shift(1)
    dn = ~up  # c<c.shift(1)
    pattern = pd.DataFrame(index=ohlc.index).assign(
        up=up,
        up3=_three_in_a_row(up),
        dn=dn,
        dn3=_three_in_a_row(dn),
    )
    return pattern.iloc[-1, :] if last_row else pattern
def get_sma(code=None, n=None, end_dt=None):
    """Return the last value of ``ttr.sma`` over the last ``n`` closes.

    Parameters
    ----------
    code : str, optional
        Security code; defaults to ``g.code[0]``.
    n : int, optional
        Row count and averaging period; defaults to ``g.nMA``.
    end_dt : date, optional
        End date; defaults to ``g.today``.
    """
    code = g.code[0] if code is None else code
    n = g.nMA if n is None else n
    end_dt = g.today if end_dt is None else end_dt
    closes = get_ohlc(code, n, end_dt).close
    # NOTE(review): [-1] assumes ttr.sma returns something that supports
    # positional indexing from the end - confirm against util.ttr.
    return ttr.sma(closes, n)[-1]
# Get the MA value
def get_ma(stock=None, n=None, end_dt=None, ):
    """Return the mean of the last ``n`` daily closes of ``stock``.

    ``n + 10`` rows are requested from ``jqd.get_price`` and the last ``n``
    of them are averaged.

    Parameters
    ----------
    stock : str, optional
        Security code; defaults to ``g.code[0]``.
    n : int, optional
        Averaging window; defaults to ``g.nMA``.
    end_dt : date, optional
        End date; defaults to ``g.today``.
    """
    stock = g.code[0] if stock is None else stock
    n = g.nMA if n is None else n
    end_dt = g.today if end_dt is None else end_dt
    price = jqd.get_price(security=stock,
                          end_date=end_dt,
                          frequency='daily',
                          fields=['close'],
                          skip_paused=False,
                          fq='pre',
                          count=n+10)['close']
    # Average over the requested window n; this used g.nMA before, which
    # silently ignored the argument.
    ma = price.iloc[-n:].mean()
    return ma
# Get the EMA value
def get_ema(stock=None, n=None, end_dt=None):
    """Return the last value of ``ttr.ema`` over the last ``n`` closes.

    Prints ``n`` before fetching the data.

    Parameters
    ----------
    stock : str, optional
        Security code; defaults to ``g.code[0]``.
    n : int, optional
        Row count and EMA period; defaults to ``g.nMA``.
    end_dt : date, optional
        End date; defaults to ``g.today``.

    Example
    -------
    >>> res = get_ema(), get_sma(), get_ma(g.code[0], g.nMA, g.today)
    """
    #ema = jqd.technical_analysis.EMA(stock, check_date=end_dt, timeperiod=ma_value)
    #return ema[stock]
    stock = g.code[0] if stock is None else stock
    n = g.nMA if n is None else n
    end_dt = g.today if end_dt is None else end_dt
    print('n={}'.format(n))
    closes = get_ohlc(stock, n, end_dt).close
    # NOTE(review): [-1] assumes ttr.ema returns something that supports
    # positional indexing from the end - confirm against util.ttr.
    return ttr.ema(closes, n)[-1]
# Check whether a buy signal has appeared
def get_buy_sig(stock, yesterday, before_yesterday,):  # *ema):
    """Return True when the latest candle pattern row has ``dn3`` set.

    The pattern row is printed; a message is printed when the signal fires.
    ``before_yesterday`` is unused.
    """
    # weekday = yesterday.weekday()
    # if weekday==3:  # Thursday
    pattern = get_candlePattern(stock, 10, yesterday)
    print(pattern)
    if pattern['dn3']:
        # The condition tested is dn3, so the message names dn3 (it used to
        # say up3).
        print('得到了买入信号:dn3')
        return True
    return False
# Check whether a sell signal has appeared
def get_sell_sig(stock, yesterday, before_yesterday, ):  # *ema):
    """Return True when the latest candle pattern row has ``up3`` set.

    ``before_yesterday`` is unused.
    """
    # weekday = yesterday.weekday()
    # if weekday==4:  # Friday
    pattern = get_candlePattern(stock, 10, yesterday)
    if pattern['up3']:
        return True
    return False
# Check whether a stop-loss signal has appeared
def get_stoploss_sig(stock, yesterday, *ema):
    """Return True when the last close is below EMA(n2) and EMA(n2) < EMA(n3).

    Parameters
    ----------
    stock : str
        Security code.
    yesterday : date
        End date of the data used (the previous trading day).
    *ema : int
        Three EMA periods ``n1, n2, n3``. Only ``n2`` and ``n3`` take part in
        the test; ``n1`` is accepted but not used.

    Raises
    ------
    IndexError
        If fewer than three periods are passed.
    """
    n2, n3 = ema[1], ema[2]
    # Close of the previous trading day (last row of the fetched window).
    close = jqd.get_price(security=stock,
                          end_date=yesterday,
                          frequency='daily',
                          fields=['open', 'close'],
                          skip_paused=False,
                          fq='pre',
                          count=10)['close'].iloc[-1]
    # EMA(n1) used to be fetched here too, but its value never entered the
    # test, so that extra data request has been dropped.
    ma2 = get_ema(stock, n2, yesterday)
    ma3 = get_ema(stock, n3, yesterday)
    if (close < ma2) and (ma2 < ma3):
        return True
    return False
# Filter out moments with an upper shadow and a small-bodied bullish candle
def is_high_line(stock, end_dt):
    """Return True when the bar at ``end_dt`` closed below its open and its
    high is less than 2% above the close.

    NOTE(review): the heading comment talks about bullish candles while the
    test requires open > close - confirm which one is intended.
    """
    price = jqd.get_price(security=stock,
                          end_date=end_dt,
                          frequency='daily',
                          fields=['open', 'close', 'high', 'low', 'volume', 'money'],
                          skip_paused=False,
                          fq='pre',
                          count=1)
    # .iloc[0] reads the single fetched row by position.
    open_price = price['open'].iloc[0]
    close = price['close'].iloc[0]
    high = price['high'].iloc[0]
    o_c_ratio = (open_price - close)/close
    h_c_ratio = (high - close)/close
    if o_c_ratio > 0 and h_c_ratio < 0.02:
        return True
    return False
#%%
# Trading functions
def _trade_record(code, info, sell_date, sell_price, params, weight, count):
    """Build one closed-trade dict from an open-position ``info`` dict."""
    buy_price = info['buy_price']
    return {'stock': code,
            'buy_date': info['buy_date'],
            'buy_price': buy_price,
            'sell_date': sell_date,
            'sell_price': sell_price,
            'ratio': (sell_price - buy_price)/buy_price,
            'MA': params,
            'weight': weight,
            'count': count}


def trade(stock, n1, n2, n3, trade_days):
    """Backtest the channel strategy over ``trade_days`` for one stock.

    Signals come from the module global ``stk`` and are always read for the
    previous trading day; orders are filled at today's open. On every day an
    open position is first marked to market, then the sell signal is
    checked, then (when flat) the buy signal. A position still open after
    the last day is recorded as sold at that day's open.

    Parameters
    ----------
    stock : str
        Key under which the position is held and reported.
    n1, n2, n3 : int
        Only recorded in each trade's ``'MA'`` field; they do not influence
        the signals.
    trade_days : sequence of timestamps
        Days to replay; each must be in ``stk.ohlc.index`` and have a
        previous bar.

    Side effects
    ------------
    Sets ``ctx.pnl_hist`` (``[date, pnl%]`` pairs), ``ctx.hold_history`` and
    the module global ``hold_history`` (independent snapshots of the
    holdings).

    Returns
    -------
    list of dict
        One record per closed trade with the keys ``stock buy_date buy_price
        sell_date sell_price ratio MA weight count``.

    Raises
    ------
    IndexError
        If a trade day is the first bar of ``stk.ohlc`` (no previous day).
    """
    global stk
    global ctx
    global hold_history
    pnl_history = []     # closed trades
    hold_dict = {}       # code -> OrderedDict describing the open position
    hold_history = []
    weight = 0           # running count of processed trade days
    total_days = len(trade_days)
    params = (n1, n2, n3)

    ohlc = stk.ohlc
    index = ohlc.index
    c = ohlc.close
    buy_sig = ((c < stk.amtc34) & (c < stk.bandBot)) | ((c > stk.amtc34) & (c > stk.bandTop))
    sell_sig = ((c < stk.amtc34) & (c > stk.bandTop)) | ((c > stk.amtc34) & (c < stk.bandBot))
    ctx.pnl_hist = []
    today = None

    # Replay every trading day; sells are handled before buys.
    for trade_day in trade_days:
        weight += 1
        loc = index.get_loc(trade_day)
        if loc < 1:
            raise IndexError(
                'trade day {} has no previous bar in stk.ohlc'.format(trade_day))
        today = index[loc]
        yesterday = index[loc - 1]
        print('''+++++++++++++++++++++ 交易日期是: {}'''.format(today))

        if hold_dict:
            # Mark the open position to market before today's open.
            # NOTE(review): this uses yesterday's OPEN although the nearby
            # comments speak of yesterday's close - confirm the intent.
            lprice = ohlc.open[yesterday]
            pnl = round((lprice/hold_dict[stock]['buy_price'] - 1)*100, 2)
            hold_dict[stock].update(dict(
                tdate=yesterday,
                amtc34=stk.amtc34[yesterday],
                last_price=lprice,
                bandTop=stk.bandTop[yesterday],
                bandBot=stk.bandBot[yesterday],
                pnl=pnl,
                stoploss=None,
            ))
            print('记录头寸的盈亏', yesterday)
            print('pnl={}'.format(pnl))
            # Store a deep copy: hold_dict and its inner dict keep being
            # mutated and emptied, which would otherwise rewrite the
            # history entries retroactively.
            hold_history.append(deepcopy(hold_dict))
            ctx.pnl_hist.append([yesterday, pnl])

        # ======================== sell ========================
        sell_list = []
        for held_code, info in hold_dict.items():
            re_value = sell_sig.loc[yesterday]
            print('check sell sig: re_value=', re_value)
            if re_value:
                print('todays signal is sell')
                sell_list.append(held_code)
                pnl_history.append(_trade_record(
                    held_code, info, today, ohlc.open[today],
                    params, weight, total_days))
        # Remove the sold positions only after iterating over hold_dict.
        for held_code in sell_list:
            del hold_dict[held_code]

        # ======================== buy =========================
        if not hold_dict:
            # Yesterday's data decides what is done at today's open.
            re_value = buy_sig.loc[yesterday]
            print('check buy signal ', re_value)
            if re_value:
                lprice = ohlc.open[yesterday]  # see NOTE(review) above
                price = ohlc.open[today]
                amtc34 = stk.amtc34[yesterday]
                reson = 'BT' if lprice > amtc34 else 'BB'
                hold_dict[stock] = OrderedDict(
                    buy_date=today,
                    buy_price=price,
                    costp=price-0.01,
                    nowp=price,
                    # one of: BT - bull market, buy the upper edge;
                    # BB - ranging market, buy the lower edge;
                    # BM - wild bear market, wait for a rebound
                    reson=reson,
                    amtc34=amtc34,
                    last_price=lprice,
                    bandTop=stk.bandTop[yesterday],
                    bandBot=stk.bandBot[yesterday],
                    pnl=0.0,
                    stoploss=price/1.03,
                )
                print(hold_dict)
                hold_history.append(deepcopy(hold_dict))

    # ==== a position still open after the last day is recorded as sold ====
    # Buy date and price come from the position itself, never from variables
    # left over from an earlier sell check. The position stays in hold_dict.
    for held_code, info in hold_dict.items():
        pnl_history.append(_trade_record(
            held_code, info, today, ohlc.open[today],
            params, weight, total_days))

    ctx.hold_history = hold_history
    return pnl_history
#%%
def backtest():
    """Run ``trade`` over ``stk.ohlc.index[40:-1]`` and plot the outcome.

    Uses the module global ``stk``. The closed trades are put into a frame
    indexed by sell date with a cumulative ``returns`` column; the close
    fetched through ``jqd.get_price`` and the forward-filled strategy
    returns are drawn in two stacked axes. The elapsed time and the last
    trades are printed.

    Returns
    -------
    pandas.DataFrame
        One row per closed trade (columns ``buy_date sell_date buy_price
        sell_price ratio MA count weight returns``).
    """
    global stk
    days = stk.ohlc.index[40:-1]
    train_days = days[:]
    # NOTE(review): the code is hard-coded here while stk holds whatever
    # symbol was loaded earlier - confirm that they are meant to match.
    trade_stock = '000069.XSHE'

    bt_start_time = datetime.datetime.now()
    re_dic = trade(trade_stock, 5, 10, 50, train_days)
    bt_end_time = datetime.datetime.now()
    print('\n---- 回测耗时: {}'.format(bt_end_time-bt_start_time))

    columns = 'buy_date sell_date buy_price sell_price ratio MA count weight'.split()
    re_df = pd.DataFrame(re_dic,
                         index=pd.DatetimeIndex([d['sell_date'] for d in re_dic]),
                         columns=columns)
    re_df = re_df.assign(returns=(re_df.ratio+1).cumprod())
    cols = 'buy_date sell_date buy_price sell_price ratio returns'.split()
    print(re_df.loc[:, cols].tail())

    pData = jqd.get_price(trade_stock, start_date=train_days[0], end_date=train_days[-1],
                          fields='open high low close volume money'.split())
    fig, (ax1, ax2) = plt.subplots(2, 1)
    pData.close.plot(ax=ax1, legend=True)
    # Plot the frame built in this call; the previous code referenced the
    # module-level name res_df, which does not exist yet on the first run.
    re_df['returns'].reindex(stk.ohlc.index).ffill().plot(ax=ax2, legend=True)
    tit = '收盘价和策略收益率\n'
    tit += '{} {}--{}'.format(jqd.get_security_info(trade_stock).display_name,
                              train_days[0], train_days[-1])
    ax1.set_title(tit)
    return re_df
#%%
#%%
# Script cell: run the backtest and tabulate the recorded holdings.
res_df = backtest()
hold_df = pd.DataFrame([d['000069.XSHE'] for d in ctx.hold_history])
hold_df
#res_df.to_csv(‘res_tmp.csv‘)
#o=stk.ohlc.open
#buy_price = [o[date] for date in res_df.buy_date]
#sell_price = [o[date] for date in res_df.sell_date]
#res_df[‘b_price_from_tdxhq‘] = list(map(lambda x:round(x,2),buy_price))
#res_df[‘s_price_from_tdxhq‘] = list(map(lambda x:round(x,2),sell_price))
#cols=‘buy_price b_price_from_tdxhq sell_price s_price_from_tdxhq‘.split()
#res_df.loc[:, cols]
#%%
def write_report_md(py_fn=‘dasu_simu_trade.py‘):
‘‘‘借助markdown模块, 用md语法来写报告, 输出被解析为html文档
‘‘‘
#import markdown as markd
from mistune import Markdown
title = ‘DMA34均线+ATR(20,1.2)通道(以华侨城A为例) 策略研究‘
reports=‘‘‘
### %s
# 处理流程
1. 加载数据
1. 历史数据回放
1. 计算技术指标
1. 计算交易信号
1. 模拟交易
# 质检图和成果图
1. 检查复权价: 原始价 vs 前复权价
1. K线和SMA均线
1. 收盘价和BH收益率线条图
1. 蜡烛图DMA均线ATR通道
1. 蜡烛图和交易信号
#这就是所谓的代吗块了: 缩进一个tab, 然后就可随便写注释了(而不再是md文本了)
#下一行是md格式的图片, 因为图片太大, 按实际大小显示的话, 不方便浏览
#
# {:height="100px" width="500px"}
#<img src="http://pp.myapp.com /ma_pic2/0/shot_42391053_1_1488499316/550" height="330" width="150" >
# 用html语言的img标签来显示并控制显示图片的大小
### 备注
1. md里强制换行时, 需要在行尾写上2个空格.
1. md里写尖括号时, 需要用转义符. 因为不转义的话, 会被解析为html的标签.
1. md里写代码段: 用一对左单引号围起来.
1. 在Markdown中要生成一个代码块,只需要在代码块内容的每一行缩进至少四个空格或者一个TAB。
Markdown不会解析代码块中的Markdown标记。如代码块中的星号就是星号,
失去了它原来的Markdown含义
直接markdown.markdown(text)生成的html文本,非常粗略,只是单纯的html内容。
而且在浏览器内查看的时候中文有乱码(在chrome中).
因为没有设置css样式文件, 外观也太丑了。
解决办法也很简单,在保存文件的时候,将
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> 和
`<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />` 和
css样式添加上。就这么简单解决了。
‘‘‘%(title)
# html = markd.markdown(reports)
# html = markd.markdown(reports, extensions=[‘markdown.extensions.codehilite‘])
markd = Markdown(escape=False, hard_wrap=False)
markd = Markdown(escape=True, hard_wrap=True)
html = markd(reports)
img_name=[‘./华侨城A_20190901_175109_1_原始价与前复权价.png‘,
‘./华侨城A_20190901_175121_2_k线均线.png‘,
‘./华侨城A_20190901_175129_3_收盘价和BH收益率.png‘,
‘./华侨城A_lday_20190901_180000_4_k线和指标线.png‘,
‘./华侨城A_20190901_175134_5_k线指标线和交易信号.png‘,
]
html_img = ‘‘‘
<p><img alt="" src="{0}" height="60%" width="60%"/></p>
<p><img src="{1}" alt="candle plot with signal" height="60%" width="60%"/ ></p>
<p><img alt="" src="{2}" height="60%" width="60%"/></p>
<p><img alt="" src="{3}" height="100%" width="200%" /></p>
<p><img alt="" src="{4}" height="60%" width="60%" /></p>
‘‘‘.format(*img_name)
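##### The "<" and ">" characters are needed often, so here is a summary of the HTML entities.
#
# | Displayed | Description    | Entity name                 | Entity number |
# |:---------:|:--------------:|:---------------------------:|:-------------:|
# |           | space          | &nbsp;                      | &#160;        |
# | <         | less-than      | &lt;                        | &#60;         |
# | >         | greater-than   | &gt;                        | &#62;         |
# | &         | ampersand      | &amp;                       | &#38;         |
# | "         | quotation mark | &quot;                      | &#34;         |
# | '         | apostrophe     | &apos; (not supported by IE) | &#39;        |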
css = ‘‘‘
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<style type="text/css">
<!-- 此处省略掉markdown的css样式,因为太长了 -->
</style>
‘‘‘; type(css)
html = ‘‘‘
<html lang="zh-cn">
<head>
<meta content="text/html; charset=utf-8" http-equiv="content-type" />
<link href="default.css" rel="stylesheet">
<link href="github.css" rel="stylesheet">
</head>
<body>
<title> %s </title>
%s
</body>
</html>
‘‘‘
html = html%( title, markd(reports))
fn = py_fn # ‘dasu_simu_tradpypy‘
fn = pathlib.Path(‘‘.join([os.getcwd(), ‘\\‘, fn]))
if fn.exists():
with open(fn, mode=‘rt‘, encoding=‘utf‘) as f:
lines = f.read().splitlines()
big_code_str = ‘\n‘.join([‘>%5d %s‘%(i,line) for i,line in enumerate(lines)])
#c_str = ‘‘‘<h3>python 源代码 </h3>
# <div><pre><code><p>{} </p> </code></pre></div>‘‘‘.format(big_code_str)
md_txt = ‘‘‘
### python源代码
#%%
原文:https://www.cnblogs.com/duan-qs/p/12913330.html