import gym
import tensorflow as tf
import numpy as np
GAMMA = 0.95          # discount factor for computing returns
LEARNING_RATE = 0.01  # Adam step size for the policy network


class Policy_Gradient():
    """Monte-Carlo policy-gradient (REINFORCE) agent with a softmax policy.

    A two-layer network maps a state to a probability distribution over
    discrete actions. After each complete episode, the discounted returns
    weight a cross-entropy loss between the network's output and the
    sampled actions, so actions followed by high reward become more likely.
    (Comments translated from the original Chinese.)
    """

    def __init__(self, env):
        """Initialize dimensions from the gym env, build the network, start a session.

        Args:
            env: a gym environment with a Box observation space and a
                Discrete action space (e.g. CartPole-v0).
        """
        self.time_step = 0
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n
        # Per-episode rollout buffers: observations, actions, rewards.
        self.ep_obs, self.ep_as, self.ep_rs = [], [], []
        self.create_softmax_network()
        # TF1-style session; variables must be initialized before use.
        self.session = tf.InteractiveSession()
        self.session.run(tf.global_variables_initializer())

    def create_softmax_network(self):
        """Build the two-layer softmax policy network and its training op.

        The per-step cross-entropy between the policy's output distribution
        and the sampled action is small when the network already agrees with
        the sampled action. Multiplying it by that step's return (tf_vt)
        scales the gradient so the policy is pushed hardest toward actions
        that preceded high reward; minimizing the mean of this product is
        the REINFORCE update.
        """
        # Network parameters: state_dim -> 20 (ReLU) -> action_dim (softmax).
        W1 = self.weight_variable([self.state_dim, 20])
        b1 = self.bias_variable([20])
        W2 = self.weight_variable([20, self.action_dim])
        b2 = self.bias_variable([self.action_dim])
        # Placeholders: batch of states, sampled action indices, and
        # the (normalized, discounted) returns for each step.
        self.state_input = tf.placeholder("float", [None, self.state_dim])
        self.tf_acts = tf.placeholder(tf.int32, [None, ], name="actions_num")
        self.tf_vt = tf.placeholder(tf.float32, [None, ], name="actions_value")
        h_layer = tf.nn.relu(tf.matmul(self.state_input, W1) + b1)
        self.softmax_input = tf.matmul(h_layer, W2) + b2
        self.all_act_prob = tf.nn.softmax(self.softmax_input, name='act_prob')
        # Cross-entropy of the logits against the taken action == -log pi(a|s).
        self.neg_log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits(
            logits=self.softmax_input, labels=self.tf_acts)
        # REINFORCE loss: mean over the episode of -log pi(a|s) * return.
        self.loss = tf.reduce_mean(self.neg_log_prob * self.tf_vt)
        self.train_op = tf.train.AdamOptimizer(LEARNING_RATE).minimize(self.loss)

    def weight_variable(self, shape):
        """Return a TF variable of `shape` initialized from a truncated normal."""
        initial = tf.truncated_normal(shape)
        return tf.Variable(initial)

    def bias_variable(self, shape):
        """Return a TF bias variable of `shape` initialized to 0.01."""
        initial = tf.constant(0.01, shape=shape)
        return tf.Variable(initial)

    def choose_action(self, observation):
        """Sample an action from the policy's distribution for `observation`.

        The state is fed through the softmax network to get per-action
        probabilities, then an action index is drawn stochastically with
        np.random.choice (NOT a greedy argmax — the original comment
        claiming "pick the most probable action" was inaccurate).

        Args:
            observation: 1-D state vector of length state_dim.

        Returns:
            int action index in [0, action_dim).
        """
        prob_weights = self.session.run(
            self.all_act_prob,
            feed_dict={self.state_input: observation[np.newaxis, :]})
        action = np.random.choice(range(prob_weights.shape[1]),
                                  p=prob_weights.ravel())
        return action

    def store_transition(self, s, a, r):
        """Append one (state, action, reward) step to the episode buffers."""
        self.ep_obs.append(s)
        self.ep_as.append(a)
        self.ep_rs.append(r)

    def learn(self):
        """Run one REINFORCE update from the buffered complete episode.

        Computes discounted returns backwards through the episode,
        normalizes them (zero mean, unit variance) to reduce gradient
        variance, runs one training step, then clears the buffers.
        """
        # FIX: force a float dtype. np.zeros_like inherits the reward dtype,
        # and with integer rewards the in-place `/=` below would fail.
        discounted_ep_rs = np.zeros_like(self.ep_rs, dtype=np.float64)
        running_add = 0
        for t in reversed(range(0, len(self.ep_rs))):
            running_add = running_add * GAMMA + self.ep_rs[t]
            discounted_ep_rs[t] = running_add
        # Normalize returns as a variance-reduction baseline.
        # NOTE(review): std can be 0 for a constant-return episode — the
        # original did not guard this either; confirm against usage.
        discounted_ep_rs -= np.mean(discounted_ep_rs)
        discounted_ep_rs /= np.std(discounted_ep_rs)

        self.session.run(self.train_op, feed_dict={
            self.state_input: np.vstack(self.ep_obs),
            self.tf_acts: np.array(self.ep_as),
            self.tf_vt: discounted_ep_rs,
        })
        self.ep_obs, self.ep_as, self.ep_rs = [], [], []
ENV_NAME = 'CartPole-v0'
EPISODE = 3000  # number of training episodes
STEP = 3000     # max steps per episode
TEST = 20       # episodes averaged per evaluation


def main():
    """Train the REINFORCE agent on CartPole; evaluate every 100 episodes.

    Training: roll out one episode, storing each transition; when the
    episode ends, perform one policy-gradient update from the full
    trajectory. Evaluation: every 100 episodes, run TEST rendered
    episodes and print the average total reward.
    """
    env = gym.make(ENV_NAME)
    agent = Policy_Gradient(env)
    for episode in range(EPISODE):
        state = env.reset()
        for step in range(STEP):
            action = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.store_transition(state, action, reward)
            state = next_state
            if done:
                # Monte-Carlo update needs a complete episode.
                agent.learn()
                break
        if episode % 100 == 0:
            total_reward = 0
            for i in range(TEST):
                state = env.reset()
                for j in range(STEP):
                    env.render()
                    # BUG FIX: original wrote `action == agent.choose_action(...)`
                    # (a discarded comparison), so the evaluation loop never
                    # assigned a fresh action. Use assignment.
                    action = agent.choose_action(state)
                    state, reward, done, _ = env.step(action)
                    total_reward += reward
                    if done:
                        break
            ave_reward = total_reward / TEST
            print('episode:', episode, 'Evaluation Average Reward:', ave_reward)


if __name__ == '__main__':
    main()