最近重看了一遍大作手回忆录,就想能否让机器识别一下 M 顶、 W 底这种相当主观的技术分析。
假设各种技术分析图形可以看作是各种技术分析因子在一段时间内的相对比例变化,
这样可以认为将技术分析因子进行标准化之后不会损失太多的信息。
将因子随交易时间推进的变换看作时间序列 in_length 表示时间序列的长度,
in_width 看作时间序列的宽度也就是并列的多种技术分析因子数值。
不考虑时间推进因素的影响可以使用 CNN 等非时间序列 DNN 处理。
考虑时间推进对因子变化有影响,此时刻因子变化和股价涨跌受到前面状态的影响的时间序列可以使用 HMM 或 RNN 处理。
HMM 常见的假设为状态 i_t 只受到状态 i_{t-1}时刻的影响, RNN 则泛泛认为可以使用训练集来学习到时间序列的非线性关系。
本帖对于 RNN 结构进行简单探索。
训练 RNN 模型定长序列,假设在 T_i 交易日可以通过前 m 交易日技术分析走势预判后第 n 交易日股价涨跌幅度,并且 RNN 模型可以自动从 T_{i-m}到 T_i 时间序列学习到这种预测关系。
输入数据格式[批次,步长,多因子] 其中步长表示从 T_{i-m}到 T_i 时间序列
class.fit(trainX, trainY)训练模型
clf.pred_prob(trainX) 预测返回概率矩阵
clf.pred_signal(trainX) 预测返回标签
trainX 输入格式 [row, in_length, in_width]
trainY 输入格式 [row]
batch_size=128 喂入批次大小
display_step=5 显示步长
layer_units_num=2000 隐藏层单元数目
training_epoch=100 训练次数
class test_1(object): def __init__(self, batch_size = 128, learning_rate = 0.001, training_epoch = 10, display_step = 5, layer_units_num = 100): self.batch_size = batch_size self.learning_rate = learning_rate self.training_epoch = training_epoch self.display_step = display_step self.layer_units_num = layer_units_num def dense_to_one_hot(self,labels_dense): """标签 转换 one hot 编码 输入 labels_dense 必须为非负数 2016-11-21 """ num_classes = len(np.unique(labels_dense)) # np.unique 去掉重复函数 raws_labels = labels_dense.shape[0] index_offset = np.arange(raws_labels) * num_classes labels_one_hot = np.zeros((raws_labels, num_classes)) labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1 return labels_one_hot def Preprocessing(self, trainX, trainY, seed=False): trainY = self.dense_to_one_hot(trainY) self.in_length= in_length= trainX.shape[1] self.in_width= in_width= trainX.shape[2] self.out_classes= out_classes= trainY.shape[1] if seed: tf.set_random_seed(20170204) weights = { 'out': tf.Variable(tf.truncated_normal(shape=[self.layer_units_num, out_classes], mean=0., stddev=1., seed=None, dtype=tf.float32), trainable=True, name='Weight_full_out') } biases = { 'out': tf.Variable(tf.truncated_normal([out_classes]), trainable=True, name= 'Biases_full_out') } self.weights = weights self.biases = biases X = tf.placeholder(dtype=tf.float32, shape=[None, in_length, in_width], name='trainX') # 批次,时间序列,多因子 Y = tf.placeholder(dtype= tf.float32, shape=[None, out_classes], name='trainY') keep_prob = tf.placeholder(dtype= tf.float32) self.X = X self.Y = Y self.keep_prob = keep_prob def Network(self): with tf.name_scope('layer_1'): monolayer_1 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num, forget_bias=1., state_is_tuple=True, activation=tf.tanh) monolayer_1 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_1, output_keep_prob= keep_prob) with tf.name_scope('layer_2'): monolayer_2 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num, forget_bias=1., state_is_tuple=True, activation=tf.tanh) monolayer_2 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_2, output_keep_prob= keep_prob) with tf.name_scope('layer_3'): monolayer_3 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num, forget_bias=1., state_is_tuple=True, activation=tf.tanh) monolayer_3 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_3, output_keep_prob= keep_prob) with tf.name_scope('layer_Final'): monolayer_final = tf.nn.rnn_cell.BasicLSTMCell(num_units=self.layer_units_num, forget_bias=1., state_is_tuple=True, activation=tf.tanh) with tf.name_scope('Layer_Structure_Combination'): layer_units_num = self.layer_units_num Layers = tf.nn.rnn_cell.MultiRNNCell(cells=[monolayer_1,monolayer_2,monolayer_3,monolayer_final], state_is_tuple = True) self.Layers = Layers return Layers def Model(self): X = self.X keep_prob = self.keep_prob X = tf.transpose(X, [1, 0, 2]) X = tf.reshape(X, [-1,self.in_width]) X = tf.split(split_dim=0, num_split=self.in_length, value=X) with tf.name_scope('layer_1'): monolayer_1 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num, forget_bias=1., state_is_tuple=True, activation=tf.tanh) monolayer_1 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_1, output_keep_prob= keep_prob) with tf.name_scope('layer_2'): monolayer_2 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num, forget_bias=1., state_is_tuple=True, activation=tf.tanh) monolayer_2 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_2, output_keep_prob= keep_prob) with tf.name_scope('layer_3'): monolayer_3 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num, forget_bias=1., state_is_tuple=True, activation=tf.tanh) monolayer_3 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_3, output_keep_prob= keep_prob) with tf.name_scope('layer_Final'): monolayer_final = tf.nn.rnn_cell.BasicLSTMCell(num_units=self.layer_units_num, forget_bias=1., state_is_tuple=True, activation=tf.tanh) with tf.name_scope('Layer_Structure_Combination'): layer_units_num = self.layer_units_num Layers = tf.nn.rnn_cell.MultiRNNCell(cells=[monolayer_1,monolayer_2,monolayer_3,monolayer_final], state_is_tuple = True) outputs,_ = tf.nn.rnn(cell=monolayer_final, inputs=X, dtype=tf.float32) output = outputs[-1] return tf.nn.bias_add(value= tf.matmul(output, self.weights['out']), bias= self.biases['out']) def train(self, trainX, trainY, seed=False): self.sess = tf.InteractiveSession() self.Preprocessing(trainX, trainY, seed) tmp = self.Model() self.predict = tf.nn.softmax(tmp) self.cost = tf.reduce_sum(tf.nn.softmax_cross_entropy_with_logits(tmp, self.Y)) optimizer = tf.train.AdamOptimizer(learning_rate= self.learning_rate) # 0 设置训练器 grads_and_vars = optimizer.compute_gradients(self.cost) for i, (grid, var) in enumerate(grads_and_vars): if grid != None: grid = tf.clip_by_value(grid, -1., 1.) grads_and_vars[i] = (grid, var) optimizer = optimizer.apply_gradients(grads_and_vars) self.optimizer = optimizer self.correct_pred = tf.equal(tf.argmax(tmp,1), tf.argmax(sel.Y,1)) accuracy = tf.reduce_mean(tf.cast(self.correct_pred, tf.float32)) self.accuracy = accuracy #self.init = tf.global_variables_initializer() self.init = tf.initialize_all_variables() def fit(self,trainX, trainY, dropout = 0.3, seed=True): self.train(trainX, trainY, seed=True) sess = self.sess sess.run(self.init) batch_size = self.batch_size trainY = self.dense_to_one_hot(trainY) for ep in range(self.training_epoch): for i in range(int(len(trainX)/batch_size)+1): if i < int(len(trainX)/batch_size)+1: batch_x = trainX[i*batch_size : (i+1)*batch_size] batch_y = trainY[i*batch_size : (i+1)*batch_size] elif i== int(len(trainX)/batch_size)+1: batch_x = trainX[-batch_size:] batch_y = trainY[-batch_size:] sess.run(self.optimizer, feed_dict={self.X:batch_x, self.Y:batch_y, self.keep_prob:(1.-dropout)}) if ep%self.display_step==0: loss, acc = sess.run([self.cost,self.accuracy], feed_dict={self.X:trainX, self.Y:trainY, self.keep_prob:1.}) print (str(ep)+"th "+'Epoch Loss = {:.5f}'.format(loss)+" Training Accuracy={:.5f}".format(acc)) self.sess= sess print("Optimization Finished!") def pred_prob(self, testX): sess = self.sess batch_size = self.batch_size trainX = testX predict_output = np.zeros([1,self.out_classes]) for i in range(int(len(trainX)/batch_size)+1): if i < int(len(trainX)/batch_size)+1: batch_x = trainX[i*batch_size : (i+1)*batch_size] batch_y = trainY[i*batch_size : (i+1)*batch_size] elif i== int(len(trainX)/batch_size)+1: batch_x = trainX[-batch_size:] batch_y = trainY[-batch_size:] tp = sess.run(self.predict,feed_dict={self.X:batch_x, self.keep_prob:1.}) predict_output = np.row_stack([predict_output, tp]) predict_output = np.delete(predict_output, obj=0, axis=0) return predict_output def pred(self, testX): pred_prob = self.pred_prob(testX) return np.argmax(pred_prob, axis=1)
训练 RNN 模型定长序列,假设在 T_i 交易日可以通过前 m 交易日技术分析走势预判后第 n 交易日股价涨跌幅度, RNN 模型可以从 T_{i-m}到 T_i 时间序列学习到这种预测关系,但是无法训练得到准确的稀疏权重。对应 T 交易日进行预判的时候考虑前 2 周走势或者前 2 个月的走势,这个时候模型无法隐式学习,需要指定训练集为变长序列对应涨跌标签。
输入 对于变长时间序列 设格式为
[batch_size, real_length, in_width]
其中 real_length 表示变长时间序列输入步长,为一个变化值,使用 LSTM 进行预测的时候将输入数据修改格式为
batch_size, in_length, in_width
其中 in_length 使用 real_length 中的最大步长值,空余部分使用 0 填充
修改原始稠密矩阵为稀疏矩阵统一格式进行输入。
输出裁剪 由于输入 tensor 为稀疏矩阵,则对应的 RNN 网络计算得到的矩阵为稀疏矩阵(对于全为 0 的填充稀疏部分进行数值优化的时候梯度为 0 实际不变化)
将拓扑结构图得到的矩阵进行裁剪,使得输出 tensor 格式从
[batch_size, in_length, layer_units_num]
转换为
[batch_size, 1, layer_units_num]
softmax [batch_size, out_classes]
这里 1 表示这里对输入步长取实际输入步长最后一步。
import functools from functools import reduce import numpy as np import tensorflow as tf def lazy_property(function): attribute = '_' + function.__name__ @property @functools.wraps(function) def wrapper(self): if not hasattr(self, attribute): setattr(self, attribute, function(self)) return getattr(self, attribute) return wrapper class test(object): def __init__(self, batch_size = 128, learning_rate = 0.001, error = .01, display_step = 5, layer_units_num = 200): self.batch_size = batch_size self.learning_rate = learning_rate self.error = error self.display_step = display_step self.layer_units_num = layer_units_num def dense_to_one_hot(self,labels_dense): """标签 转换 one hot 编码 输入 labels_dense 必须为非负数 2016-11-21 """ num_classes = len(np.unique(labels_dense)) # np.unique 去掉重复函数 raws_labels = labels_dense.shape[0] index_offset = np.arange(raws_labels) * num_classes labels_one_hot = np.zeros((raws_labels, num_classes)) labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1 return labels_one_hot # 获得权重和偏置 @staticmethod def _weight_and_bias(in_size, out_size): weight = tf.truncated_normal([in_size, out_size], stddev=0.01) bias = tf.constant(0.1, shape=[out_size]) return tf.Variable(weight), tf.Variable(bias) @lazy_property def length(self): dense_sign = tf.sign(tf.reduce_max(tf.abs(self.X),reduction_indices=2)) length = tf.reduce_sum(input_tensor=dense_sign, reduction_indices=1) length = tf.cast(length, tf.int32) return length @staticmethod def _final_relevant(output, length): # length 输入时间序列的实际长度 # in_length 表示输入时间序列长度 # max_length 表示最大时间序列长度,也就是稀疏矩阵 最大时间序列长度 batch_size = tf.shape(output)[0] max_length = int(output.get_shape()[1]) output_size = int(output.get_shape()[2]) index = tf.range(start=0, limit=batch_size)*max_length + (length-1) # 这里使用 max_length 开创间隔,使用 length-1 表示实际位置,最后一个输出的位置 flat = tf.reshape(output, [-1,output_size]) # 将输出展平, batch_size*length in_width relevant = tf.gather(flat, index) # 根据实际长度选出最后一个输出 output 状态使用 return relevant def Preprocessing(self, trainX, trainY): self.in_length= in_length= trainX.shape[1] self.in_width= in_width= trainX.shape[2] self.out_classes= out_classes= trainY.shape[1] self.X = tf.placeholder(dtype=tf.float32, shape=[None, in_length, in_width], name='trainX') # 批次,时间序列,多因子 self.Y = tf.placeholder(dtype= tf.float32, shape=[None, out_classes], name='trainY') self.keep_prob = tf.placeholder(dtype= tf.float32) def str2float(self,s): def char2num(s): return {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9}[s] n = s.index('.') return reduce(lambda x,y:x*10+y,map(char2num,s[:n]+s[n+1:]))/(10**n) def Interface(self): # 4 层 GRU 结构描述 mOnolayer= tf.nn.rnn_cell.GRUCell(num_units= self.layer_units_num) mOnolayer= tf.nn.rnn_cell.DropoutWrapper(cell=monolayer, output_keep_prob=self.keep_prob) monolayer_final = tf.nn.rnn_cell.GRUCell(num_units= self.layer_units_num) layers = tf.nn.rnn_cell.MultiRNNCell([monolayer]*3+[monolayer_final]) # 激活 注意 in_length 表示输入序列步长, length 表示实际步长 output,_ = tf.nn.dynamic_rnn(cell= layers, inputs= self.X, dtype= tf.float32, sequence_length= self.length) output = self._final_relevant(output, self.length) weights, biases = self._weight_and_bias(self.layer_units_num, self.out_classes) Prediction = tf.nn.bias_add(tf.matmul(output, weights),biases) return Prediction def Graph(self, trainX, trainY): try: tf.InteractiveSession.close() except: pass self.sess = tf.InteractiveSession() tf.get_default_session() self.Preprocessing(trainX, trainY) tmp = self.Interface() self.pred = tf.nn.softmax(tmp) self.cost = tf.reduce_sum(tf.nn.softmax_cross_entropy_with_logits(tmp, self.Y)) optimizer = tf.train.AdamOptimizer(learning_rate= self.learning_rate) # 0 设置训练器 grads_and_vars = optimizer.compute_gradients(self.cost) for i, (grid, var) in enumerate(grads_and_vars): if grid != None: grid = tf.clip_by_value(grid, -1., 1.) grads_and_vars[i] = (grid, var) optimizer = optimizer.apply_gradients(grads_and_vars) self.optimizer = optimizer self.correct_pred = tf.equal(tf.argmax(tmp,1), tf.argmax(self.Y,1)) self.accuracy = tf.reduce_mean(tf.cast(self.correct_pred, tf.float32)) #self.init = tf.global_variables_initializer() self.init = tf.initialize_all_variables() def fit(self, trainX, trainY, dropout= 0.618): # 对标签 one_hot 编码 trainY = self.dense_to_one_hot(trainY) self.Graph(trainX, trainY) self.sess.run(self.init) batch_size = self.batch_size sig =10. ep = 0 while (sig > self.error): for i in range(int(len(trainX)/batch_size)+1): if i < int(len(trainX)/batch_size)+1: batch_x = trainX[i*batch_size : (i+1)*batch_size] batch_y = trainY[i*batch_size : (i+1)*batch_size] elif i== int(len(trainX)/batch_size)+1: batch_x = trainX[-batch_size:] batch_y = trainY[-batch_size:] self.sess.run(self.optimizer,feed_dict={self.X:batch_x, self.Y:batch_y, self.keep_prob:(1.-dropout)}) sig = self.sess.run(self.accuracy, feed_dict={self.X:trainX, self.Y:trainY, self.keep_prob:1.}) if ep%self.display_step==0: loss = self.sess.run(self.cost, feed_dict={self.X:trainX, self.Y:trainY, self.keep_prob:1.}) print (str(ep)+"th "+'Epoch Loss = {:.5f}'.format(loss)+" Training Accuracy={:.5f}".format(sig)) ep += 1 print("Optimization Finished!") def pred_prob(self, testX): batch_size = self.batch_size trainX = testX predict_output = np.zeros([1,self.out_classes]) for i in range(int(len(trainX)/batch_size)+1): if i < int(len(trainX)/batch_size)+1: batch_x = trainX[i*batch_size : (i+1)*batch_size] batch_y = trainY[i*batch_size : (i+1)*batch_size] elif i== int(len(trainX)/batch_size)+1: batch_x = trainX[-batch_size:] batch_y = trainY[-batch_size:] tp = self.sess.run(self.pred, feed_dict={self.X:batch_x, self.keep_prob:1.}) predict_output = np.row_stack([predict_output, tp]) predict_output = np.delete(predict_output, obj=0, axis=0) return predict_output def pred_signal(self, testX): pred_prob = self.pred_prob(testX) return np.argmax(pred_prob, axis=1)
因限制原因,图片与源代码戳这里: https://uqer.io/community/share/589d3cc2c1e3cc00567fdbea
![]() | 1 tomleader0828 2017-02-10 14:17:33 +08:00 mark |
![]() | 2 pming1 2017-02-10 14:19:37 +08:00 不明觉厉 |
![]() | 3 pathbox 2017-02-10 21:54:54 +08:00 在考虑要不要学 Python |