Preface:
RNN regression is well worth learning, so here is an example for everyone. Look at it carefully and you will see it is actually quite simple.
The code is as follows:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
batch_start = 0
time_steps = 20    # number of time steps per sequence
batch_size = 50    # number of sequences per training batch
input_size = 1
output_size = 1
cell_size = 10
lr = 0.006
# Generate the samples and targets here: the input is sin(x) and the target is cos(x).
# A linear function kx could also be used as the input to fit cos(x), but it converges much more slowly.
def graph_get():
    global batch_start, time_steps
    xs = np.arange(batch_start, batch_start + time_steps * batch_size).reshape(batch_size, time_steps) / (10 * np.pi)
    start = np.sin(xs)
    result = np.cos(xs)
    # plt.plot(xs[0, :], start[0, :], 'r--', xs[0, :], result[0, :], 'b--')
    # plt.show()
    batch_start += time_steps   # important: this pushes the regression segment forward,
                                # so the fitted curve keeps unfolding over time
    return [start[:, :, np.newaxis], result[:, :, np.newaxis], xs]
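# (Added note, not in the original post.) A quick sanity check of graph_get(),
# assuming the default hyperparameters above: start and result come back with
# shape (batch_size, time_steps, 1) = (50, 20, 1), and xs with shape (50, 20).
# Uncomment to verify, and reset batch_start = 0 afterwards so training still
# starts from the beginning of the curve:
# _s, _r, _x = graph_get()
# print(_s.shape, _r.shape, _x.shape)   # (50, 20, 1) (50, 20, 1) (50, 20)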
class m_LSTM_RNN(object):
    # Wrap all of the RNN operations in a single class
    def __init__(self, time_steps, in_size, out_size, cell_size, batch_size, **kwargs):
        self.time_steps = time_steps
        self.in_size = in_size
        self.out_size = out_size
        self.cell_size = cell_size
        self.batch_size = batch_size
        # Define the placeholders; their shapes match the output of graph_get()
        with tf.name_scope('inputs'):
            self.xs = tf.placeholder(tf.float32, [None, time_steps, in_size], name='xs')
            self.ys = tf.placeholder(tf.float32, [None, time_steps, out_size], name='ys')
        # The network is built as: input layer -> RNN cell -> output layer
        with tf.variable_scope('layer_in'):
            self.layer_in()
        with tf.variable_scope('LSTM_CELL'):
            self.rnn_cell()
        with tf.variable_scope('layer_out'):
            self.layer_end()
        # Compute the cost (the difference between prediction and target)
        with tf.name_scope('cost'):
            self.calculate_cost()
        # Optimize with Adam
        with tf.name_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.cost)
        super().__init__(**kwargs)
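    # (Added note, not in the original post.) Shapes as data flows through the
    # three stages, assuming the defaults above: xs [batch, 20, 1] is flattened
    # to [batch*20, 1] and projected to [batch*20, 10] in layer_in, reshaped to
    # [batch, 20, 10] for the LSTM (which preserves that shape), then flattened
    # again and projected to [batch*20, 1] in layer_end as the prediction.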
    def layer_in(self):
        # Flatten the 3D input to 2D so it can pass through a fully connected layer
        l_in_x = tf.reshape(self.xs, [-1, self.in_size], name='2_2D')
        W_l_in = self.weight_variable([self.in_size, self.cell_size])
        b_l_in = self.bais_variable([self.cell_size])
        with tf.name_scope('layer_in'):
            W_plus_b = tf.matmul(l_in_x, W_l_in) + b_l_in
        # Reshape back to 3D as input for the LSTM cell
        self.layer_in_end = tf.reshape(W_plus_b, [-1, self.time_steps, self.cell_size], name='2_3D')
    def rnn_cell(self):
        lstm_cell = tf.contrib.rnn.BasicLSTMCell(self.cell_size, forget_bias=1.0, state_is_tuple=True)
        with tf.name_scope('rnn_cell'):
            self.cell_init_state = lstm_cell.zero_state(self.batch_size, dtype=tf.float32)
            self.cell_out, self.cell_final_state = tf.nn.dynamic_rnn(
                lstm_cell, self.layer_in_end, initial_state=self.cell_init_state, time_major=False)
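    # (Added note, not in the original post.) With time_major=False, self.cell_out
    # has shape [batch_size, time_steps, cell_size], and self.cell_final_state is
    # an LSTMStateTuple (c, h) that the training loop feeds back in as the next
    # batch's initial state.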
    def layer_end(self):
        # Flatten the LSTM output and project it down to the output size
        l_from_cell = tf.reshape(self.cell_out, [-1, self.cell_size], name='2_2D')
        W_l_end = self.weight_variable([self.cell_size, self.out_size])
        b_l_end = self.bais_variable([self.out_size])
        with tf.name_scope('layer_end'):
            self.prediction = tf.matmul(l_from_cell, W_l_end) + b_l_end
    def calculate_cost(self):
        losses = tf.contrib.legacy_seq2seq.sequence_loss_by_example(
            # This computes the loss per element of the batch, so it is averaged below
            [tf.reshape(self.prediction, [-1], name='reshape_prediction')],
            [tf.reshape(self.ys, [-1], name='reshape_ys')],
            [tf.ones([self.batch_size * self.time_steps], dtype=tf.float32)],
            average_across_timesteps=True,
            softmax_loss_function=self.ms_error,
            name='losses'
        )
        with tf.name_scope('average_cost'):
            self.cost = tf.div(
                tf.reduce_sum(losses, name='losses_sum'),
                self.batch_size,
                name='average_cost'
            )
        tf.summary.scalar('cost', self.cost)
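    # (Added note, not in the original post.) Because ms_error returns the
    # element-wise squared error and the weights above are all ones, this cost
    # is just the sum of squared errors divided by batch_size, roughly:
    #   err = tf.reshape(self.prediction, [-1]) - tf.reshape(self.ys, [-1])
    #   self.cost = tf.reduce_sum(tf.square(err)) / self.batch_size
    # so a plain mean-squared-error style loss would behave the same up to a
    # constant factor.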
    @staticmethod
    def ms_error(labels, logits):
        return tf.square(tf.subtract(labels, logits))
    def weight_variable(self, shape, name='weights'):
        init = tf.random_normal_initializer(mean=0, stddev=1.0)
        return tf.get_variable(shape=shape, initializer=init, name=name)
    def bais_variable(self, shape, name='biases'):
        init = tf.constant_initializer(0.1)
        return tf.get_variable(shape=shape, initializer=init, name=name)
if __name__ == '__main__':
    rnn_model = m_LSTM_RNN(time_steps, input_size, output_size, cell_size, batch_size)
    sess = tf.Session()
    merged = tf.summary.merge_all()
    writer = tf.summary.FileWriter('logs2/', sess.graph)
    init = tf.global_variables_initializer()
    sess.run(init)
    plt.ion()
    plt.show()
    for i in range(200):
        start, result, xs = graph_get()
        # On the first run there is no self.cell_final_state yet; from the second
        # run onward, self.cell_init_state must inherit the previous run's
        # self.cell_final_state. Without this, the fit still converges, but only
        # over a short segment and much more slowly.
        if i == 0:
            feed_dict = {
                rnn_model.xs: start,
                rnn_model.ys: result,
            }
        else:
            feed_dict = {
                rnn_model.xs: start,
                rnn_model.ys: result,
                rnn_model.cell_init_state: state,
            }
        train, cost, state, pred = sess.run(
            [rnn_model.train_op,
             rnn_model.cost,
             rnn_model.cell_final_state,
             rnn_model.prediction],
            feed_dict=feed_dict
        )
        plt.plot(xs[0, :], result[0].flatten(), 'r',
                 xs[0, :], pred.flatten()[:time_steps], 'b--',
                 xs[0, :], start[0].flatten(), 'g')
        plt.ylim((-1.2, 1.2))
        plt.draw()
        plt.pause(0.3)
        if i % 20 == 0:
            print('cost:', cost)
            summary_result = sess.run(merged, feed_dict)
            writer.add_summary(summary_result, i)
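Once training has run, the summaries written to logs2/ can be viewed with TensorBoard (assuming TensorBoard was installed along with TensorFlow). From the directory containing the script, run:
tensorboard --logdir logs2/
and open the address it prints in a browser to see the cost curve and the computation graph.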
Above I have only explained the key parts. If you want to understand the rest, you can refer to the simple TensorFlow RNN classification example.
Everyone is welcome to study and discuss!