Reinforcement Learning Review
Reinforcement Learning Algorithms
Q-Learning
The most basic algorithm. It maintains a Q-table, where Q(s,a) is the expected return of taking action a in state s. The Q-table update rule is:
\[Q(s,a) = Q(s,a) + \alpha(r + \gamma \max_{a'}Q(s',a') - Q(s,a))\]where $\alpha$ is the learning rate, $\gamma$ is the discount factor with $0 \lt \gamma \lt 1$, $Q(s',a')$ is the expected return of taking action $a'$ in state $s'$, and $r$ is the immediate reward obtained by taking action $a$ in state $s$.
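A quick numerical check of this update, with assumed values $\alpha=0.5$, $\gamma=0.9$, $Q(s,a)=1$, $r=1$, and $\max_{a'}Q(s',a')=2$:
\[Q(s,a) \leftarrow 1 + 0.5\,(1 + 0.9\cdot 2 - 1) = 1.9\]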
Q-Learning uses an $\epsilon$-greedy policy to select actions: with probability $\epsilon$ it picks a random action, and with probability $1-\epsilon$ it picks the action with the largest Q value in the current state.
DQN
Deep Q-Networks (DQN) use a neural network to fit the Q function, taking the state s and action a as input and outputting the Q value.
DQN also uses experience replay: every transition $(s,a,r,s')$ is stored in a replay buffer, and each Q update samples transitions at random from this buffer. The reason is that neural network training expects samples to be i.i.d., whereas the data collected by reinforcement learning is strongly correlated in time.
DQN also uses a target network and an evaluate network. The target network computes the Q value of the next state, the evaluate network computes the Q value of the action chosen by the policy, and gradient descent is applied to the evaluate network only. The target network's parameters are held fixed and are overwritten with a copy of the evaluate network's parameters every so often. Since the target network changes infrequently, training is less unstable and less noisy.
Observed transition: $(s_t, a_t, r_t, s_{t+1})$. Q-Learning target: $Q(s,a) = Q(s,a) + \alpha(r + \gamma \max_{a'}Q(s',a') - Q(s,a))$. At the fixed point: $r + \gamma \max_{a'}Q(s',a') - Q(s,a)=0$. Write this as the temporal-difference (TD) target: $y_t = r + \gamma \max_{a'}Q(s',a')$. Loss function: $L(\theta) = f(r + \gamma \max_{a'}Q(s',a') - Q(s,a))$. Gradient descent: $\theta = \theta - \alpha \nabla_\theta L(\theta)$.
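A minimal sketch of the target/evaluate network trick described above (the network sizes and the sync interval are assumptions; note that the DQN implementation later in this post omits the target network):

import copy
import torch
import torch.nn as nn

state_dim, action_dim = 4, 2  # e.g. CartPole
eval_net = nn.Sequential(nn.Linear(state_dim, 128), nn.ReLU(), nn.Linear(128, action_dim))
target_net = copy.deepcopy(eval_net)  # frozen copy; gradient descent never touches it

def td_target(rewards, next_states, gamma=0.99):
    # next-state Q values come from the target network
    with torch.no_grad():
        q_next = target_net(next_states).max(dim=1, keepdim=True)[0]
    return rewards + gamma * q_next

def sync_target(step, sync_every=100):
    # every sync_every steps, copy the evaluate network into the target network
    if step % sync_every == 0:
        target_net.load_state_dict(eval_net.state_dict())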
PG
Problems with DQN: it cannot represent a stochastic policy, since it always takes the maximum and the chosen action changes discontinuously; and it cannot represent continuous actions.
Policy gradient (PG) has the neural network output the policy $\pi(s,a;\theta)$ directly (i.e. the probability of each action), and training maximizes the cumulative return $J(\theta) = V_\pi(s_0)$.
The policy gradient theorem:
\[\nabla J(\theta)=E_{s,a\sim\pi}[q_{\pi_\theta}(s,a)\nabla_\theta\ln\pi_\theta(s,a)]\]where $q_{\pi_\theta}(s,a)$ is the expected final return. Applying Monte Carlo, this term is replaced by a sample:
\[\nabla J(\theta)=E_{s,a\sim\pi}[G_t\nabla_\theta\ln\pi_\theta(s,a)]\]where $G_t$ is the final return of a single random trajectory starting from $(s,a)$.
AC
The temporal-difference method replaces $V(S_t)\leftarrow V(S_t)+\alpha[G_t-V(S_t)]$ with $V(S_t)\leftarrow V(S_t)+\alpha[R_{t+1}+\gamma V(S_{t+1})-V(S_t)]$, i.e. $R_{t+1}+\gamma V(S_{t+1})$ is treated as an estimate of $G_t$.
In Actor-Critic, the Actor learns the policy $\pi$ so as to obtain as high a return as possible, while the Critic learns to estimate the value function $V$ of that policy and evaluates the Actor's performance. In other words, the Critic resembles Q-learning (it learns to evaluate the current situation), while the Actor resembles PG (it outputs a policy distribution).
The Critic's training objective is to minimize the mean squared TD error:
\[L_\text{critic}=\mathbb{E}[(V(s_t)-\text{TD Target})^2]\] \[\text{TD Target}=r_t+\gamma V(s_{t+1})\times(1-\text{done})\]The Actor's training objective is to maximize the advantage times the log probability:
\[L_\text{actor}=-\mathbb{E}[\text{Advantage}\times\ln\pi_\theta(a_t|s_t)]\]Here the advantage is taken to be the TD error:
\[L=-\sum\ln\pi_\theta(s_t,a_t)(r+\gamma V(s_{t+1})-V(s_t))\]
TRPO
TRPO ensures training stability by constraining the difference between the new policy and the old policy.
Optimization objective:
\[\max_\theta\mathbb{E}_{s,a\sim\pi_\theta}[\frac{\pi_\theta(a|s)}{\pi_{\theta_\text{old}}(a|s)}A^{\pi_{\theta_\text{old}}}(s,a)]\]where $A$ is the advantage function: the expected return $Q(s,a)$ of taking action $a$ in state $s$, minus the expected return $V(s)$ at $s$.
(Read this as: find a new policy that is better than the old one, while keeping the step small.)
The step taken at each update is kept small:
\[\mathbb{E}_{s\sim\pi_\theta}[D_\text{KL}(\pi_{\theta_\text{old}}||\pi_\theta)]\leq\delta\]Taking a Taylor expansion, the KL divergence is approximated by its second-order term:
\[D_\text{KL}(\theta'||\theta)\approx\frac{1}{2}(\theta'-\theta)^TH(\theta'-\theta)\]Each iteration then solves \(\theta\leftarrow\argmax_{\theta'} g^T(\theta'-\theta)\) subject to this constraint.
Introducing a Lagrange multiplier and solving gives:
\[\theta\leftarrow\theta+\frac{1}{\lambda}H^{-1}g=\theta+\sqrt{\frac{2\delta}{g^TH^{-1}g}}H^{-1}g\]
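For completeness, the short derivation behind that step (writing $s=\theta'-\theta$ and using the quadratic KL approximation with trust-region radius $\delta$):
\[\max_s\; g^Ts \quad \text{s.t.} \quad \frac{1}{2}s^THs\leq\delta\]
\[\mathcal{L}(s,\lambda)=g^Ts-\lambda\left(\frac{1}{2}s^THs-\delta\right),\qquad \nabla_s\mathcal{L}=g-\lambda Hs=0\;\Rightarrow\; s=\frac{1}{\lambda}H^{-1}g\]
\[\frac{1}{2}s^THs=\delta\;\Rightarrow\;\frac{1}{\lambda}=\sqrt{\frac{2\delta}{g^TH^{-1}g}}\]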
PPO
PPO maximizes the expected cumulative return of the policy. The core formula is:
\[L^\text{PPO}_\text{actor} = -\mathbb{E}[\min(r_tA_t,\text{clip}(r_t, 1-\epsilon,1+\epsilon)A_t)]\]Proximal ratio clipping: $r_t=\frac{\pi_\theta(a | s)}{\pi_{\theta_\text{old}}(a | s)}$ is clipped to $(1-\epsilon,1+\epsilon)$, which prevents excessively large updates.
The advantage function uses GAE (Generalized Advantage Estimation):
\[A_t = \sum_{i=0}^{T-t-1}(\gamma\lambda)^i\delta_{t+i}\]where $\delta_t$ is the TD error, $\gamma$ is the discount factor, $\lambda$ is the GAE parameter, and $T$ is the trajectory length.
DDPG
DDPG (Deep Deterministic Policy Gradient) trains an actor and a critic: the actor outputs a deterministic policy $a=\mu(s)$ directly, and the critic estimates the value function $Q(s,a)$.
DDPG additionally uses these tricks: target networks, experience replay, soft updates, and OU exploration noise.
OU noise is temporally correlated noise that is better suited to systems with inertia. Its stochastic differential equation is:
\[dx_t = \theta(\mu-x_t)dt+\sigma dW_t\]Its discretized form is:
\[x(t+\Delta t) = x(t) + \theta(\mu-x(t))\Delta t + \sigma \sqrt{\Delta t}\, \epsilon_t\]A soft update mixes the new parameters with the old target parameters at each update instead of replacing them outright, which avoids drastic policy changes and improves training stability.
\[\theta^\text{target} \leftarrow \tau\theta + (1-\tau)\theta^\text{target}\]
Gym
A Gym environment class implements:
step(): execute an action and return the next state information
reset(): reset the environment and return the initial state information
render(): render the environment, to help visualize the agent
close(): close the environment
action_space: the action space
observation_space: the observation (state) space
spec: the initialization information used by make()
metadata: metadata
np_random: the random number generator
Take the simple CartPole (inverted pendulum on a cart) as an example:
import gym

env = gym.make('CartPole-v1')
env.observation_space
# Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)
# cart position, cart velocity, pole angle, pole angular velocity
env.action_space
# Discrete(2)
# push the cart to the left or to the right
env.reset()
# (array([-0.02547014, -0.02451906, -0.03074073, -0.01085381], dtype=float32),
# {})
# observation: ObsType, info: Dict
env.step(env.action_space.sample())
# (array([-0.02596052, -0.21918696, -0.03095781, 0.27197373], dtype=float32),
# 1.0,
# False,
# False,
# {})
# observation: ObsType, reward: Float, terminated: Bool, truncated: bool, info: Dict
Q-Learning
import numpy as np

class QLearningAgent:
    def __init__(self, state_dim: int, action_dim: int, learning_rate=0.5, reward_decay=0.99):
        """Q-Learning Agent
        Args:
            state_dim (int): dimension of the (discretized) state space
            action_dim (int): dimension of the action space
            learning_rate (float, optional): learning rate. Defaults to 0.5.
            reward_decay (float, optional): discount applied to future rewards. Defaults to 0.99.
        """
self.state_dim = state_dim
self.action_dim = action_dim
self.lr = learning_rate
self.gamma = reward_decay
self.q_table = np.random.uniform(0, 1, size=(state_dim, action_dim))
def choose_action(self, state, epsilon):
        # epsilon is the probability of exploring (choosing a random action)
if np.random.uniform() < epsilon:
action = np.random.randint(self.action_dim)
else:
action = np.argmax(self.q_table[state])
return action
def update(self, state, action, reward, state_next):
self.q_table[state, action] += self.lr * (reward + self.gamma * np.max(self.q_table[state_next]) - self.q_table[state, action])
Test code:
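The test code below calls a digitize_state helper that isn't shown here. A minimal sketch of what it could look like, assuming each of the four CartPole observation dimensions is clipped to a finite range (the velocity bounds below are assumptions, since Gym reports them as unbounded) and binned into N_BINS equal-width buckets:

import numpy as np

def digitize_state(observation, n_bins=6):  # n_bins matches N_BINS = 6 below
    # assumed clipping ranges: position, velocity, angle, angular velocity
    bounds = [(-2.4, 2.4), (-3.0, 3.0), (-0.21, 0.21), (-2.0, 2.0)]
    index = 0
    for value, (low, high) in zip(observation, bounds):
        cut_points = np.linspace(low, high, n_bins + 1)[1:-1]  # n_bins - 1 cut points
        index = index * n_bins + int(np.digitize(value, cut_points))
    return index  # an integer in [0, n_bins ** 4)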
N_BINS = 6
env = gym.make('CartPole-v1')
action_space = env.action_space
state_space = env.observation_space
n_state = N_BINS ** 4
n_action = 2
q_learning_agent = QLearningAgent(n_state, n_action, learning_rate=0.5, reward_decay=1)
all_steps = []  # record how many steps each episode lasted
successive_success_episodes = 0  # number of consecutive successful episodes
n_episodes = 2000
n_steps_per_episode = 200
for i in tqdm(range(n_episodes)):
observation, info = env.reset()
done = False
for j in range(n_steps_per_episode):
state = digitize_state(observation)
action = q_learning_agent.choose_action(state, 0.5 / (i+500))
observation_next, reward, terminated, truncated, info = env.step(action)
state_next = digitize_state(observation_next)
done = terminated or truncated
        # reshaped reward: only reward the agent if it lasted more than 195 steps
if done:
if j > 195:
reward = 1
successive_success_episodes += 1
else:
reward = -1
successive_success_episodes = 0
else:
reward = 0
q_learning_agent.update(state, action, reward, state_next)
if done:
break
observation = observation_next
all_steps.append(j)
if successive_success_episodes >= 10:
break
env.close()
np.save("./q_table.npy", q_learning_agent.q_table)
plt.plot(all_steps)
plt.xlabel("Episode")
plt.ylabel("steps_persevered")
plt.title("Q-Learning Training")
plt.grid()
plt.show()
A Blackjack (21) example:
import gym
import numpy as np
from matplotlib import pyplot as plt
from tqdm import tqdm
class QLearningAgent:
def __init__(self, state_dim: int, action_dim: int, learning_rate=0.01, reward_decay=0.95):
self.state_dim = state_dim
self.action_dim = action_dim
self.lr = learning_rate
self.gamma = reward_decay
self.q_table = np.random.uniform(0, 1, size=(state_dim, action_dim))
def choose_action(self, state, epsilon):
if np.random.uniform() < epsilon:
action = np.random.randint(self.action_dim)
else:
action = np.argmax(self.q_table[state])
return action
def update(self, state, action, reward, state_next, done):
        self.q_table[state, action] += self.lr * (reward + self.gamma * (np.max(self.q_table[state_next]) if not done else 0) - self.q_table[state, action])
def observation_to_index(observation):
return int(observation[0] * 22 + observation[1] * 2 + observation[2])
env = gym.make('Blackjack-v1')
action_space = env.action_space
state_space = env.observation_space
n_state = 32 * 11 * 2
n_action = 2
q_learning_agent = QLearningAgent(n_state, n_action)
all_rewards = []
n_episodes = 1000000
for i in tqdm(range(n_episodes)):
observation, info = env.reset()
state = observation_to_index(observation)
done = False
total_rewards = 0
while not done:
action = q_learning_agent.choose_action(state, 0.5 / (i // (n_episodes // 20) + 1))
observation_next, reward, terminated, truncated, info = env.step(action)
state_next = observation_to_index(observation_next)
done = terminated or truncated
q_learning_agent.update(state, action, reward, state_next, done)
state = state_next
total_rewards += reward
all_rewards.append(total_rewards)
env.close()
print(np.average(all_rewards[-1000:]))
all_rewards = np.array(all_rewards)
all_rewards = np.convolve(all_rewards, np.ones((1000,))/1000, mode='valid')
plt.plot(all_rewards)
plt.xlabel("Episode")
plt.ylabel("Rewards")
plt.title("Q-Learning Training")
plt.grid()
plt.show()
DQN
Experience replay:
from collections import deque
import random

import numpy as np
import torch

class ReplayBuffer:
def __init__(self, capacity):
self.buffer = deque(maxlen=capacity)
def push(self, state, action, next_state, reward):
if next_state is None:
next_state = np.zeros_like(state)
self.buffer.append((state, action, next_state, reward))
def sample(
self, batch_size
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
batch = random.sample(self.buffer, batch_size)
(
states,
actions,
next_states,
rewards,
) = map(np.array, zip(*batch))
return (
torch.tensor(states),
torch.tensor(actions, dtype=torch.int64).unsqueeze(1),
torch.tensor(next_states),
torch.tensor(rewards).unsqueeze(1),
)
def __len__(self):
return len(self.buffer)
DQN
import torch.nn.functional as F
import torch.optim as optim

class DQNAgent:
def __init__(
self,
state_dim,
action_dim,
model_cls,
learning_rate=0.0001,
gamma=0.99,
batch_size=32,
replay_buffer_capacity=5000,
):
self.state_dim = state_dim
self.action_dim = action_dim
self.model = model_cls(state_dim, action_dim)
self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
self.learning_rate = learning_rate
self.loss_fn = F.smooth_l1_loss
self.gamma = gamma
self.batch_size = batch_size
self.replay_buffer = ReplayBuffer(replay_buffer_capacity)
def choose_action(self, state: torch.Tensor, epsilon) -> int:
        # epsilon is the probability of exploring (choosing a random action)
if np.random.uniform() < epsilon:
action = random.randrange(self.action_dim)
else:
self.model.eval()
with torch.no_grad():
action = np.argmax(self.model(state))
return int(action)
def learn(self):
if len(self.replay_buffer) < self.batch_size:
return
states, actions, next_states, rewards = self.replay_buffer.sample(self.batch_size)
self.model.eval()
q = self.model(states).gather(dim=1, index=actions)
q1 = self.model(next_states).max(dim=1)[0].detach().unsqueeze(1)
target = rewards + self.gamma * q1
loss = self.loss_fn(q, target)
self.model.train()
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
Test DQN:
class SimpleMLP(nn.Module):
def __init__(self, state_dim, action_dim):
super(SimpleMLP, self).__init__()
self.net = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, action_dim),
)
def forward(self, x):
return self.net(x)
env = gym.make("CartPole-v0")
state_space = env.observation_space
action_space = env.action_space
state_dim = state_space.shape[0]
action_dim = action_space.n
n_episodes = 500
n_steps_per_episode = 200
agent = DQNAgent(state_dim, action_dim, SimpleMLP)
all_steps = []  # record how many steps each episode lasted
successive_success_episodes = 0  # number of consecutive successful episodes
for i in tqdm(range(n_episodes)):
observation, info = env.reset()
for j in range(n_steps_per_episode):
action = agent.choose_action(torch.Tensor(observation), 0.5 / (i + 1))
observation_next, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
if done:
next_state = None
if j > 195:
reward = 1
successive_success_episodes += 1
else:
reward = -1
successive_success_episodes = 0
        else:
            next_state = observation_next
            reward = 0
        agent.replay_buffer.push(observation, action, next_state, reward)
agent.learn()
if done:
break
observation = observation_next
all_steps.append(j)
if successive_success_episodes >= 10:
break
torch.save(agent.model.state_dict(), "cartpole_dqn.pth")
plt.plot(all_steps)
plt.xlabel("Episode")
plt.ylabel("steps_persevered")
plt.title("DQN Training")
plt.grid()
plt.show()
PG
First compute $G_t = \sum_{i=t}^{T} \gamma^{i-t} r_i$:
def discounted_future_reward(rewards, gamma) -> list[float]:
    """Compute the discounted future reward at every time step.
    >>> discounted_future_reward([0.8, 0.8, 0.8, 0.8], 0.9)
    array([2.7512, 2.168 , 1.52  , 0.8   ])
    """
discounted_future_reward = np.zeros_like(rewards)
running_add = 0
for t in reversed(range(0, len(rewards))):
running_add = running_add * gamma + rewards[t]
discounted_future_reward[t] = running_add
return discounted_future_reward
The PG model needs to output action probabilities:
class SimpleMLPSoftmax(nn.Module):
def __init__(self, state_dim, action_dim):
super(SimpleMLPSoftmax, self).__init__()
self.net = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, action_dim),
            nn.Softmax(dim=-1)
)
def forward(self, x):
return self.net(x)
Training:
from typing import Tuple

class PGAgent:
def __init__(
self,
state_dim,
action_dim,
model_cls,
learning_rate=3e-4,
gamma=0.9):
self.state_dim = state_dim
self.action_dim = action_dim
self.model = model_cls(state_dim, action_dim)
self.optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)
self.gamma = gamma
def choose_action(self, state: torch.Tensor) -> Tuple[int, torch.Tensor]:
probs = self.model(state)
choice = np.random.choice(self.action_dim, p=probs.detach().numpy())
return choice, torch.log(probs[choice])
def learn(self, rewards: list[float], log_probs: list[torch.Tensor]):
discounted_rewards = torch.Tensor(discounted_future_reward(rewards, self.gamma))
discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-9)
policy_grads = - torch.stack(log_probs) * discounted_rewards
self.optimizer.zero_grad()
policy_grads = policy_grads.sum()
policy_grads.backward()
self.optimizer.step()
Test:
env = gym.make("CartPole-v0")
state_space = env.observation_space
action_space = env.action_space
state_dim = state_space.shape[0]
action_dim = action_space.n
n_episodes = 2000
n_steps_per_episode = 200
agent = PGAgent(state_dim, action_dim, SimpleMLPSoftmax)
all_steps = []
for i in tqdm(range(n_episodes)):
observation, info = env.reset()
rewards = []
log_probs = []
for j in range(n_steps_per_episode):
action, log_prob = agent.choose_action(torch.Tensor(observation))
observation_next, reward, terminated, truncated, info = env.step(action)
rewards.append(reward)
log_probs.append(log_prob)
done = terminated or truncated
if done:
agent.learn(rewards, log_probs)
all_steps.append(j)
break
observation = observation_next
torch.save(agent.model.state_dict(), "cartpole_pg.pth")
plt.plot(all_steps)
plt.xlabel("Episode")
plt.ylabel("steps_persevered")
plt.title("PG Training")
plt.grid()
plt.show()
AC
class ActorCritic:
def __init__(self, state_dim, action_dim, actor_lr, critic_lr, gamma, device):
self.device = device
self.actor = SimpleMLPSoftmax(state_dim, action_dim)
self.critic = SimpleMLP(state_dim, 1)
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma  # discount factor for the TD target
self.loss_fn = nn.MSELoss()
self.transition_dict = {}
self.transition_dict_clear()
def transition_dict_push(self, state, action, next_state, reward, done):
"""将状态、动作、下一个状态、奖励和是否完成添加到transition_dict字典中
"""
self.transition_dict["states"].append(state)
self.transition_dict["actions"].append(action)
self.transition_dict["next_states"].append(next_state)
self.transition_dict["rewards"].append(reward)
self.transition_dict["dones"].append(done)
def transition_dict_clear(self):
"""清空transition_dict字典中每一项的内容
"""
self.transition_dict = {
"states": [],
"actions": [],
"next_states": [],
"rewards": [],
"dones": []
}
def transition_dict_to_tensor(self) -> Tuple[torch.FloatTensor, torch.LongTensor, torch.FloatTensor, torch.FloatTensor, torch.FloatTensor]:
"""将transition_dict字典中的内容转换为张量
"""
states = torch.FloatTensor(self.transition_dict["states"]).to(self.device)
actions = torch.LongTensor(self.transition_dict["actions"]).to(self.device).view(-1, 1)
next_states = torch.FloatTensor(self.transition_dict["next_states"]).to(self.device)
rewards = torch.FloatTensor(self.transition_dict["rewards"]).to(self.device).view(-1, 1)
dones = torch.FloatTensor(self.transition_dict["dones"]).to(self.device).view(-1, 1)
return states, actions, next_states, rewards, dones
def zero_back_step(self, actor_loss: torch.FloatTensor, critic_loss: torch.FloatTensor):
self.actor_optimizer.zero_grad()
self.critic_optimizer.zero_grad()
actor_loss.backward()
critic_loss.backward()
self.actor_optimizer.step()
self.critic_optimizer.step()
    def choose_action(self, state: Arraylike, is_action_sample=True) -> int:
        """
        Args:
            state (Arraylike): the current state
            is_action_sample (bool, optional): whether to sample the action from the policy
                distribution; if False, take the action with the highest probability. Defaults to True.
        Returns:
            int: the chosen action
        """
state = torch.FloatTensor(state).to(self.device).unsqueeze(0)
probs = self.actor(state)
if is_action_sample:
action = torch.multinomial(probs, 1).item()
else:
action = torch.argmax(probs).item()
return action
def learn(self):
states, actions, next_states, rewards, dones = self.transition_dict_to_tensor()
        # predicted value of the current states
        td_value = self.critic(states)
        # TD target
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        # TD error
        td_error = td_target - td_value
        # log probability of the taken actions
        log_probs = torch.log(self.actor(states).gather(1, actions))
actor_loss = torch.mean(-log_probs * td_error.detach())
critic_loss = torch.mean(self.loss_fn(td_value, td_target.detach()))
self.zero_back_step(actor_loss, critic_loss)
PPO
class PPO(ActorCritic):
def __init__(self, state_dim, action_dim, actor_lr, critic_lr, gamma, device, lamda, n_epochs_per_learn, clip_ratio):
super(PPO, self).__init__(state_dim, action_dim, actor_lr, critic_lr, gamma, device)
        self.lamda = lamda  # lambda for GAE
        self.n_epochs_per_learn = n_epochs_per_learn  # number of training epochs per batch of collected data
        self.clip_ratio = clip_ratio  # clipping ratio
def learn(self):
states, actions, next_states, rewards, dones = self.transition_dict_to_tensor()
        # predicted value of the current states
        td_value = self.critic(states)
        # TD target
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        # TD error
        td_error = td_target - td_value
        # log probability of the taken actions under the old policy
        log_probs = torch.log(self.actor(states).gather(1, actions)).detach()
        # compute the GAE advantage
        td_error = td_error.cpu().detach().numpy()
advantages = []
advantage = 0.0
for delta in td_error[::-1]:
advantage = self.gamma * self.lamda * advantage + delta
advantages.append(advantage)
advantages.reverse()
advantages = torch.tensor(advantages).float().to(self.device)
for _ in range(self.n_epochs_per_learn):
            # log probability under the new policy
            new_log_probs = torch.log(self.actor(states).gather(1, actions))
            # ratio between the new and old policies
            ratio = torch.exp(new_log_probs - log_probs)
            # clipped surrogate objective
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1.0 - self.clip_ratio, 1.0 + self.clip_ratio) * advantages
actor_loss = -torch.min(surr1, surr2).mean()
new_td_value = self.critic(states)
critic_loss = self.loss_fn(new_td_value, td_target.detach())
self.zero_back_step(actor_loss, critic_loss)
Test code:
env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
agent = PPO(
state_dim=state_dim,
action_dim=action_dim,
actor_lr=1e-3,
critic_lr=1e-3,
gamma=0.99,
device=device,
lamda=0.95,
n_epochs_per_learn=10,
clip_ratio=0.2,
)
num_episodes = 500
all_rewards = []
for episode in range(num_episodes):
state, _ = env.reset()
agent.transition_dict_clear()
total_rewards = 0
while True:
action = agent.choose_action(state)
next_state, reward, done, truncated, _ = env.step(action)
total_rewards += reward
agent.transition_dict_push(state, action, next_state, reward, done)
if done or truncated:
break
state = next_state
agent.learn()
all_rewards.append(total_rewards)
print(f"Episode {episode}, Return: {total_rewards}")
env.close()
Test code (run the trained agent with rendering):
env = gym.make("CartPole-v1", render_mode="human")
state, info = env.reset()
for _ in range(1000):
    action = agent.choose_action(state, False)
    state, reward, terminated, truncated, info = env.step(action)
if terminated or truncated:
state, info = env.reset()
time.sleep(0.02)
env.close()
The reward can be normalized so that it is centered around 0:
rewards = a * rewards + b
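For instance (an illustrative assumption, not from the original): Pendulum-v1's raw reward lies roughly in $[-16.3, 0]$, so a common linear rescaling is:

# hypothetical rescaling for Pendulum-v1; shifts and scales the reward towards 0
reward = (reward + 8.0) / 8.0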
For a continuous action space, the Actor needs to adjust the center and scale of the mean according to the range of the action space:
class SimpleMLPNormal(nn.Module):
def __init__(self, n_states, n_actions):
super(SimpleMLPNormal, self).__init__()
self.net = nn.Sequential(
nn.Linear(n_states, 64),
nn.ReLU(),
)
self.mean = nn.Sequential(
nn.Linear(64, n_actions),
nn.Tanh()
)
self.log_std = nn.Sequential(
nn.Linear(64, n_actions),
nn.Softplus()
)
def forward(self, x):
x = self.net(x)
        mean = self.mean(x)  # Tanh keeps the mean in [-1, 1]; scale it to the action range
        log_std = torch.clamp(self.log_std(x), -20, 2)  # limit the range of the standard deviation
std = torch.exp(log_std)
return mean, std
Log probability:
mean, std = self.actor(states)
dist = Normal(mean, std)
log_probs = dist.log_prob(actions)
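A minimal sketch of action selection for the continuous case (an assumption about how the pieces fit together, not code from the original post): the sampled action is clamped and scaled from Tanh's $[-1,1]$ range to the environment's action bounds.

# Hypothetical continuous-action variant of PPO.choose_action (an assumption).
# max_action would be env.action_space.high[0], e.g. 2.0 for Pendulum-v1.
import torch
from torch.distributions import Normal

def choose_action_continuous(self, state, max_action=2.0):
    state = torch.FloatTensor(state).to(self.device).unsqueeze(0)
    mean, std = self.actor(state)  # SimpleMLPNormal returns (mean, std)
    dist = Normal(mean, std)
    action = dist.sample()
    # clamp the sample back into [-1, 1], then scale to the action range
    action = torch.clamp(action, -1.0, 1.0) * max_action
    return action.squeeze(0).cpu().numpy()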
Training example:
env = gym.make("Pendulum-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
agent = PPO(
state_dim=state_dim,
action_dim=action_dim,
actor_lr=1e-4,
critic_lr=5e-3,
gamma=0.9,
device=device,
lamda=0.9,
n_epochs_per_learn=10,
clip_ratio=0.2,
)
For a discrete state space, you can wrap the input in an embedding layer:
class SimpleMLPEmbedding(nn.Module):
def __init__(self, n_states, n_actions):
super(SimpleMLPEmbedding, self).__init__()
self.embedding = nn.Embedding(n_states, 64)
self.net = nn.Sequential(
nn.Linear(64, 64),
nn.ReLU(),
nn.Linear(64, n_actions)
)
def forward(self, x):
return self.net(self.embedding(x.long()))
# usage:
dist = torch.distributions.Categorical(probs)
action = dist.sample()
DDPG
OU noise:
class OUNoise:
def __init__(self, size, mu=0.0, theta=0.15, sigma=0.2):
self.size = size
self.mu = mu
self.theta = theta
self.sigma = sigma
self.reset()
def reset(self):
self.state = np.ones(self.size) * self.mu
def sample(self):
dx = self.theta * (self.mu - self.state) + self.sigma * np.random.randn(self.size)
self.state += dx
return self.state
ActorCritic
# Actor network
class Actor(nn.Module):
def __init__(self, state_dim, action_dim, max_action):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim, 400), nn.ReLU(),
nn.Linear(400, 300), nn.ReLU(),
nn.Linear(300, action_dim), nn.Tanh()
)
self.max_action = max_action
def forward(self, state):
return self.max_action * self.net(state)
# Critic network
class Critic(nn.Module):
def __init__(self, state_dim, action_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim + action_dim, 400), nn.ReLU(),
nn.Linear(400, 300), nn.ReLU(),
nn.Linear(300, 1)
)
def forward(self, state, action):
x = torch.cat([state, action], 1)
return self.net(x)
The DDPG algorithm:
class DDPG:
def __init__(self, state_dim, action_dim, action_bondary, actor_lr, critic_lr, batch_size, gamma, tau):
self.actor = Actor(state_dim, action_dim, action_bondary)
self.actor_target = Actor(state_dim, action_dim, action_bondary)
self.actor_target.load_state_dict(self.actor.state_dict())
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
self.critic = Critic(state_dim, action_dim)
self.critic_target = Critic(state_dim, action_dim)
self.critic_target.load_state_dict(self.critic.state_dict())
self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
self.buffer = ReplayBuffer(50000)
self.noise = OUNoise(action_dim)
        self.action_bondary = action_bondary  # the action space range is [-action_bondary, action_bondary]
        self.batch_size = batch_size  # number of samples drawn from the replay buffer per update
        self.gamma = gamma  # discount factor for the TD target
        self.tau = tau  # soft update ratio
def choose_action(self, state, noise=True):
state = torch.FloatTensor(state).unsqueeze(0)
action = self.actor(state).detach().numpy()[0]
if noise:
action += self.noise.sample()
return np.clip(action, -self.action_bondary, self.action_bondary)
def learn(self):
if len(self.buffer) < self.batch_size:
return
states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)
states = torch.FloatTensor(states)
actions = torch.FloatTensor(actions)
rewards = torch.FloatTensor(rewards).unsqueeze(1)
next_states = torch.FloatTensor(next_states)
dones = torch.FloatTensor(dones).unsqueeze(1)
        # update the Critic network
with torch.no_grad():
next_actions = self.actor_target(next_states)
target_q = self.critic_target(next_states, next_actions)
target_q = rewards + (1 - dones) * self.gamma * target_q
current_q = self.critic(states, actions)
critic_loss = nn.MSELoss()(current_q, target_q)
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
        # update the Actor network
actor_loss = -self.critic(states, self.actor(states)).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
        # soft-update the target networks
for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
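Note that the ReplayBuffer from the DQN section stores 4-tuples (state, action, next_state, reward) and already returns tensors, while this DDPG code pushes 5-tuples and converts to tensors itself, so that buffer would not work here as-is. A minimal compatible buffer sketch (an assumption about what was actually used):

# Minimal replay buffer compatible with the DDPG code above (assumed, not from the original post).
from collections import deque
import random
import numpy as np

class DDPGReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

Swapping ReplayBuffer(50000) for DDPGReplayBuffer(50000) in __init__ then makes the training example below run as written.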
Training example:
env = gym.make("Pendulum-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
action_bondary = env.action_space.high[0]
agent = DDPG(
state_dim,
action_dim,
action_bondary=action_bondary,
actor_lr=1e-3,
critic_lr=1e-3,
batch_size=64,
gamma=0.99,
tau = 0.005,
)
all_rewards = []
for episode in range(100):
state = env.reset()[0]
agent.noise.reset()
episode_reward = 0
for step in range(200):
action = agent.choose_action(state)
next_state, reward, done, _, _ = env.step(action)
agent.buffer.push(state, action, reward, next_state, float(done))
state = next_state
agent.learn()
episode_reward += reward
if done:
break
all_rewards.append(episode_reward)
print(f"Episode {episode}, Reward: {episode_reward:.2f}")
env.close()