[AI Notes] Section 5: Based on TensorFlow 2 (Part 3)


5. Stock prediction model: training runs in JIT (graph) mode, while prediction can run in either JIT or Eager mode. JIT mode suits large-scale prediction; Eager mode loads faster for a single prediction. The difference between the two modes is sketched below.
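The distinction comes from how TensorFlow 2 executes a function: a function decorated with @tf.function is traced and compiled into a graph on its first call (the "JIT" mode used here), while a plain Python function runs op by op in Eager mode. A minimal sketch of the same toy computation in both modes (not taken from the model code):

import tensorflow as tf

def toy_predict_eager(x):
    # Eager mode: executes immediately, no tracing cost, good for a single call.
    return tf.reduce_sum(x * 2.0)

@tf.function
def toy_predict_jit(x):
    # JIT (graph) mode: traced and compiled on the first call,
    # then the cached graph is reused for later calls with the same signature.
    return tf.reduce_sum(x * 2.0)

x = tf.ones((4, 15))
print(toy_predict_eager(x).numpy())  # runs eagerly every time
print(toy_predict_jit(x).numpy())    # first call traces, later calls reuse the graph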
The model code is as follows:
import os
import time

import numpy as np
import tensorflow as tf

# Assumes the Encoder and Decoder classes from the earlier parts of this note are already defined.


class GuPiaoModel():
    '''Stock prediction model'''

    def __init__(self, output_num, model_path='./data/gupiao_model'):
        # Dimension of the predicted data
        self.output_num = output_num
        # Model save/load path
        if not os.path.exists(model_path):
            os.makedirs(model_path)
        self.model_path = model_path
        # Build the model
        self.build_model()
        # Load the model
        self.load_model()

    def build_model(self):
        '''Build the model'''
        self.encoder_model = Encoder()
        self.decoder_model = Decoder(self.output_num)
        # Optimizer
        self.optimizer = tf.keras.optimizers.RMSprop(clipvalue=1.0, learning_rate=0.001)
        # Loss function
        self.loss_object = tf.keras.losses.MeanAbsoluteError()
        # Checkpointing for saving/loading the model
        self.checkpoint = tf.train.Checkpoint(optimizer=self.optimizer,
                                              encoder=self.encoder_model,
                                              decoder=self.decoder_model)
        self.checkpoint_manager = tf.train.CheckpointManager(self.checkpoint, self.model_path, max_to_keep=3)

    @tf.function(input_signature=(
        tf.TensorSpec(shape=(None, None, 15), dtype=tf.float32),
        tf.TensorSpec(shape=(None, None, 15), dtype=tf.float32),
    ))
    def train_step(self, input_data, target_data):
        '''Training step
        input_data: (batch_size, history_size, 15)
        target_data: (batch_size, target_size, 15)
        '''
        print('Tracing with train_step', type(input_data), type(target_data))
        print('Tracing with train_step', input_data.shape, target_data.shape)
        loss = tf.constant(0.0)
        with tf.GradientTape() as tape:
            # Encode
            # encoder_output: (history_size, 128)
            # encoder_state1: (history_size, 128)
            # encoder_state2: (history_size, 128)
            encoder_output, encoder_state1, encoder_state2 = self.encoder_model(input_data)
            decoder_state1 = encoder_state1
            decoder_state2 = encoder_state2
            decoder_input = input_data[:, -1, :]
            # Decode
            for target_index in tf.range(tf.shape(target_data)[1]):
                # Ground truth (columns 3: are the values to predict)
                true_target = target_data[:, target_index, 3:]
                # Decode one step
                decoder_output, decoder_state1, decoder_state2 = self.decoder_model(
                    decoder_input, decoder_state1, decoder_state2, encoder_output)
                # Accumulate the loss
                batch_loss = self.loss_object(y_true=true_target, y_pred=decoder_output)
                loss += batch_loss
                # Teacher forcing: feed the true target as the next decoder input
                decoder_input = target_data[:, target_index, :]
        total_loss = loss / tf.cast(tf.shape(target_data)[1], tf.float32)
        trainable_variables = self.encoder_model.trainable_variables + self.decoder_model.trainable_variables
        gradients = tape.gradient(loss, trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, trainable_variables))
        return loss, total_loss

    def fit_generator(self, generator, steps_per_epoch, epochs, initial_epoch=1, auto_save=False):
        '''Training loop'''
        for epoch in range(initial_epoch, epochs + 1):
            start = time.process_time()
            epoch_loss = 0
            for steps in range(1, steps_per_epoch + 1):
                x, y = next(generator)
                # print('generator', x.shape, y.shape)
                loss, total_loss = self.train_step(x, y)
                epoch_loss += total_loss
                print('\rsteps:%d/%d, epochs:%d/%d, loss:%0.4f, total_loss:%0.4f'
                      % (steps, steps_per_epoch, epoch, epochs, loss, total_loss), end='')
            end = time.process_time()
            print('\rsteps:%d/%d, epochs:%d/%d, %0.4f S, loss:%0.4f, total_loss:%0.4f, epoch_loss:%0.4f'
                  % (steps, steps_per_epoch, epoch, epochs, (end - start), loss, total_loss, epoch_loss))
            if auto_save:
                self.save_model()

    @tf.function(input_signature=(
        tf.TensorSpec(shape=(None, None, 15), dtype=tf.float32),
        tf.TensorSpec(shape=(None, None, 3), dtype=tf.float32),
        tf.TensorSpec(shape=None, dtype=tf.int32),
    ))
    def predict_jit(self, input_data, time_step, output_size):
        '''Prediction (JIT / graph mode)
        input_data: (1, history_size, 15)
        time_step: time features of the steps to predict, (1, target_size, 3)
        output_size: number of steps to predict
        '''
        predict_data = tf.TensorArray(dtype=tf.float32, size=output_size, dynamic_size=True)
        # Encode
        # encoder_output: (history_size, 128)
        # encoder_state1: (history_size, 128)
        # encoder_state2: (history_size, 128)
        encoder_output, encoder_state1, encoder_state2 = self.encoder_model(input_data)
        decoder_state1 = encoder_state1
        decoder_state2 = encoder_state2
        decoder_input = input_data[:, -1, :]
        # Decode
        for i in tf.range(output_size):
            # Decode one step
            decoder_output, decoder_state1, decoder_state2 = self.decoder_model(
                decoder_input, decoder_state1, decoder_state2, encoder_output)
            # Next input: time features of step i plus the predicted values
            decoder_input = tf.concat([time_step[:, i, :], decoder_output], axis=1)
            # Record the prediction
            predict_data = predict_data.write(i, decoder_input)
        # Swap dimensions to (batch, target_size, features)
        predict_data = predict_data.stack()
        predict_data = tf.transpose(predict_data, perm=[1, 0, 2])
        return predict_data

    def predict_eager(self, input_data, time_step, output_size):
        '''Prediction (Eager mode)
        input_data: (1, history_size, 15)
        time_step: time features of the steps to predict, (1, target_size, 3)
        output_size: number of steps to predict
        '''
        predict_data = []
        input_data = tf.constant(input_data, dtype=tf.float32)
        # Encode
        # encoder_output: (history_size, 128)
        # encoder_state1: (history_size, 128)
        # encoder_state2: (history_size, 128)
        encoder_output, encoder_state1, encoder_state2 = self.encoder_model(input_data)
        decoder_state1 = encoder_state1
        decoder_state2 = encoder_state2
        decoder_input = input_data[:, -1, :]
        # Decode
        for i in range(output_size):
            # Decode one step
            decoder_output, decoder_state1, decoder_state2 = self.decoder_model(
                decoder_input, decoder_state1, decoder_state2, encoder_output)
            # Next input: time features of step i plus the predicted values
            decoder_input = tf.concat([time_step[:, i, :], decoder_output], axis=1)
            # Record the prediction
            predict_data.append(decoder_input.numpy())
        predict_data = np.array(predict_data)
        # Swap dimensions to (batch, target_size, features)
        predict_data = predict_data.swapaxes(0, 1)
        return predict_data

    def save_model(self):
        '''Save the model'''
        save_path = self.checkpoint_manager.save()
        print('Model saved: {}'.format(save_path))

    def load_model(self):
        '''Load the model'''
        self.checkpoint.restore(self.checkpoint_manager.latest_checkpoint)
        if self.checkpoint_manager.latest_checkpoint:
            print('Model loaded: {}'.format(self.checkpoint_manager.latest_checkpoint))
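A hypothetical usage sketch follows. The dummy generator, window sizes, and output_num=12 are assumptions for illustration only (the decoder predicts the 12 non-time features of a step, which are then concatenated with the 3 time features to form the next 15-feature input); they are not values fixed by the article.

import numpy as np
import tensorflow as tf

def dummy_generator(batch_size=8, history_size=30, target_size=5):
    # Hypothetical data source: yields random (input, target) batches
    # shaped like the model's 15-feature windows.
    while True:
        x = np.random.rand(batch_size, history_size, 15).astype(np.float32)
        y = np.random.rand(batch_size, target_size, 15).astype(np.float32)
        yield x, y

# output_num=12 (assumed): the decoder predicts the 12 non-time features of each step.
model = GuPiaoModel(output_num=12)

# Training: every step calls the compiled (JIT) train_step.
model.fit_generator(dummy_generator(), steps_per_epoch=10, epochs=2, auto_save=True)

# Prediction: a history window plus the time features of the 5 steps to forecast.
history = np.random.rand(1, 30, 15).astype(np.float32)
future_time = np.random.rand(1, 5, 3).astype(np.float32)

jit_result = model.predict_jit(tf.constant(history), tf.constant(future_time), tf.constant(5))
eager_result = model.predict_eager(history, future_time, 5)
print(jit_result.shape, eager_result.shape)  # both (1, 5, 15)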