A Review of Reinforcement Learning

Reinforcement Learning Algorithms

Q-Learning

The most basic algorithm. It maintains a Q-table, where Q(s,a) is the expected return of taking action a in state s. The Q-table update rule is:

\[Q(s,a) = Q(s,a) + \alpha(r + \gamma \max_{a'}Q(s',a') - Q(s,a))\]

Here $\alpha$ is the learning rate, $\gamma$ is the discount factor with $0 < \gamma < 1$, $Q(s',a')$ is the expected return of taking action $a'$ in the next state $s'$, and $r$ is the immediate reward obtained by taking action $a$ in the current state $s$.
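
For a concrete (made-up) example: with $\alpha=0.5$, $\gamma=0.9$, $r=1$, $Q(s,a)=0$ and $\max_{a'}Q(s',a')=2$, the update gives $Q(s,a) \leftarrow 0 + 0.5\,(1 + 0.9 \times 2 - 0) = 1.4$.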

Q-Learning selects actions with an $\epsilon$-greedy policy: with probability $\epsilon$ it picks a random action, and with probability $1-\epsilon$ it picks the action with the largest Q-value in the current state.

DQN

Deep Q-Networks (DQN) use a neural network to approximate the Q-function $Q(s,a)$; in the implementation below, the network takes the state $s$ as input and outputs one Q-value per action $a$.

DQN also uses experience replay: each transition $(s,a,r,s')$ is stored in a replay buffer, and every update randomly samples transitions from the buffer to update the Q-values. The reason is that neural-network training expects samples to be i.i.d., whereas the data collected by reinforcement learning is strongly correlated in time.

DQN also uses a target network and an evaluate network. The target network computes the Q-value of the next state, the evaluate network computes the Q-value of the chosen action, and gradient descent is applied only to the evaluate network. The target network's parameters are kept fixed and are periodically overwritten with a copy of the evaluate network's parameters. Because the target network changes only occasionally, training becomes more stable and less noisy.

  • Observed transition: $(s_t, a_t, r_t, s_{t+1})$
  • Q-Learning update: $Q(s,a) \leftarrow Q(s,a) + \alpha(r + \gamma \max_{a'}Q(s',a') - Q(s,a))$
  • At convergence: $r + \gamma \max_{a'}Q(s',a') - Q(s,a) = 0$
  • Temporal-difference (TD) target: $y_t = r + \gamma \max_{a'}Q(s',a')$
  • Loss: $L(\theta) = f(r + \gamma \max_{a'}Q(s',a') - Q(s,a))$
  • Gradient descent: $\theta \leftarrow \theta - \alpha \nabla_\theta L(\theta)$

PG

Problems with DQN: it cannot represent a stochastic policy, since it always takes the action with the maximum value and its choices change discontinuously; and it cannot represent continuous actions.

Policy gradient (PG) methods make the neural network output the policy $\pi(s,a;\theta)$ directly (i.e., the probability of each action). Training maximizes the cumulative return $J(\theta) = V_\pi(s_0)$.

The policy gradient theorem:

\[\nabla J(\theta)=E_{s,a\sim\pi}[q_{\pi_\theta}(s,a)\nabla_\theta\ln\pi_\theta(s,a)]\]

$q$ is the expected final return; applying Monte Carlo, this term is replaced with a sample:

\[\nabla J(\theta)=E_{s,a\sim\pi}[G_t\nabla_\theta\ln\pi_\theta(s,a)]\]

$G_t$ is the final return of one random trajectory starting from $(s,a)$.

AC

Temporal-difference methods replace $V(S_t)\leftarrow V(S_t)+\alpha[G_t-V(S_t)]$ with $V(S_t)\leftarrow V(S_t)+\alpha[R_{t+1}+\gamma V(S_{t+1})-V(S_t)]$, i.e., $R_{t+1}+\gamma V(S_{t+1})$ serves as an estimate of $G_t$.

In the Actor-Critic algorithm, the Actor learns the policy $\pi$ to obtain as high a return as possible, while the Critic learns to estimate the policy's value function $V$ and evaluates the Actor's performance. In other words, the Critic is like Q-learning (it learns to judge the current situation), while the Actor is like PG (it outputs the policy distribution).

The Critic's training objective is to minimize the mean squared TD error:

\[L_\text{critic}=\mathbb{E}[(V(s_t)-\text{TD Target})^2]\] \[\text{TD Target}=r_t+\gamma V(s_{t+1})\times(1-\text{done})\]

The Actor's training objective is to maximize the advantage times the log probability:

\[L_\text{actor}=-\mathbb{E}[\text{Advantage}\times\ln\pi_\theta(a_t|s_t)]\]

Here the advantage is taken to be the TD error:

\[L=-\sum\ln\pi_\theta(s_t,a_t)(r+\gamma V(s_{t+1})-V(s_t))\]

TRPO

TRPO ensures stable training by limiting how far the new policy can deviate from the old policy.

Optimization objective:

\[\max_\theta\mathbb{E}_{s,a\sim\pi_\theta}[\frac{\pi_\theta(a|s)}{\pi_{\theta_\text{old}}(a|s)}A^{\pi_{\theta_\text{old}}}(s,a)]\]

$A$ is the advantage function: the expected return $Q(s,a)$ of taking action $a$ in state $s$, minus the expected return $V(s)$ of state $s$.

(Read this as: find a new policy that is better than the old one while taking only a small step.)

Each step is kept small:

\[\mathbb{E}_{s\sim\pi_\theta}[D_\text{KL}(\pi_{\theta_\text{old}}||\pi_\theta)]\leq\delta\]

Taking a Taylor expansion, the KL divergence is approximated by its second-order term:

\[D_\text{KL}(\theta'\|\theta)\approx\frac{1}{2}(\theta'-\theta)^TH(\theta'-\theta)\]

Each iteration solves \(\theta\leftarrow\arg\max_{\theta'} g^T(\theta'-\theta)\) subject to the KL constraint above, where $g$ is the gradient of the surrogate objective.

Introducing a Lagrange multiplier and solving gives:

\[\theta\leftarrow\theta+\frac{1}{\lambda}H^{-1}g=\theta+\sqrt{\frac{2\delta}{g^TH^{-1}g}}H^{-1}g\]
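
There is no TRPO implementation later in this post; as a rough sketch (all names below are illustrative, not from the original), the natural-gradient direction $H^{-1}g$ can be approximated with conjugate gradients given only a Hessian-vector product:

import numpy as np

def conjugate_gradient(hvp, g, n_iters=10, tol=1e-10):
    """Approximately solve H x = g, where hvp(v) returns H @ v."""
    x = np.zeros_like(g)
    r = g.copy()
    p = g.copy()
    rs_old = r @ r
    for _ in range(n_iters):
        Hp = hvp(p)
        alpha = rs_old / (p @ Hp + 1e-8)
        x += alpha * p
        r -= alpha * Hp
        rs_new = r @ r
        if rs_new < tol:
            break
        p = r + (rs_new / rs_old) * p
        rs_old = rs_new
    return x

def trpo_step(theta, g, hvp, delta=0.01):
    """One trust-region step: theta + sqrt(2*delta / (g^T H^-1 g)) * H^-1 g."""
    x = conjugate_gradient(hvp, g)               # x approximates H^{-1} g
    step_size = np.sqrt(2 * delta / (g @ x + 1e-8))
    return theta + step_size * x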

PPO

PPO maximizes the policy's expected cumulative return; the core formula is:

\[L^\text{PPO}_\text{actor} = -\mathbb{E}[\min(r_tA_t,\text{clip}(r_t, 1-\epsilon,1+\epsilon)A_t)]\]

Proximal ratio clipping: the ratio $r_t=\frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_\text{old}}(a_t|s_t)}$ is clipped to $[1-\epsilon,1+\epsilon]$ to avoid overly large updates.

The advantage is estimated with GAE (Generalized Advantage Estimation):

\[A_t = \sum_{i=0}^{T-t-1}(\gamma\lambda)^i\delta_{t+i}\]

$\delta_t$ is the TD error, $\gamma$ is the discount factor, $\lambda$ is the GAE parameter, and $T$ is the trajectory length.
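
Equivalently, the advantages can be computed with the backward recursion used in the PPO code later in this post:

\[A_t = \delta_t + \gamma\lambda A_{t+1},\qquad A_T = 0\]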

DDPG

DDPG (Deep Deterministic Policy Gradient) trains an actor and a critic: the actor directly outputs a deterministic policy $a=\mu(s)$, while the critic estimates the value function $Q(s,a)$.

Other tricks DDPG uses: target networks, experience replay, soft updates, and OU noise.

OU (Ornstein-Uhlenbeck) noise is temporally correlated noise that suits systems with inertia better. Its differential equation is:

\[dx_t = \theta(\mu-x_t)dt+\sigma dW_t\]

Its discrete form is:

\[x(t+\Delta t) = x(t) + \theta(\mu-x(t))\Delta t + \sigma \sqrt{\Delta t} \epsilon_t\]

A soft update mixes the new parameters with the old ones at every update instead of replacing them outright. This avoids drastic changes in the policy and improves training stability.

\[\theta^\text{target} \leftarrow \tau\theta + (1-\tau)\theta^\text{target}\]

Gym

A gym environment class implements:

  • step(): execute an action and return the next state information
  • reset(): reset the environment and return the initial state information
  • render(): render the environment for visualization
  • close(): close the environment
  • action_space: the action space
  • observation_space: the observation (state) space
  • spec: the initialization information used by make()
  • metadata: metadata
  • np_random: the random number generator

Take the simple CartPole (inverted pendulum on a cart) environment as an example:

env = gym.make('CartPole-v1')

env.observation_space
# Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)
# cart position, cart velocity, pole angle, pole angular velocity

env.action_space
# Discrete(2)
# push left or push right

env.reset()
# (array([-0.02547014, -0.02451906, -0.03074073, -0.01085381], dtype=float32),
#  {})
# observation: ObsType, info: Dict

env.step(env.action_space.sample())
# (array([-0.02596052, -0.21918696, -0.03095781,  0.27197373], dtype=float32),
#  1.0,
#  False,
#  False,
#  {})
# observation: ObsType, reward: float, terminated: bool, truncated: bool, info: dict

Q-Learning

class QLearningAgent:
    def __init__(self, state_dim: int, action_dim: int, learning_rate=0.5, reward_decay=0.99):
        """Q-Learning Agent
        Args:
            state_dim (int): dimension of the (discretized) state space
            action_dim (int): dimension of the action space
            learning_rate (float, optional): learning rate. Defaults to 0.5.
            reward_decay (float, optional): discount factor for future rewards. Defaults to 0.99.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.lr = learning_rate
        self.gamma = reward_decay
        self.q_table = np.random.uniform(0, 1, size=(state_dim, action_dim))
    
    
    def choose_action(self, state, epsilon):
        # epsilon is the exploration probability
        if np.random.uniform() < epsilon:
            action = np.random.randint(self.action_dim)
        else:
            action = np.argmax(self.q_table[state])
        return action

    def update(self, state, action, reward, state_next):
        self.q_table[state, action] += self.lr * (reward + self.gamma * np.max(self.q_table[state_next]) - self.q_table[state, action])
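
The test code below discretizes the continuous CartPole observation with a digitize_state helper that the post does not show. A minimal sketch, assuming N_BINS bins per dimension and hand-picked clipping ranges (the bounds below are assumptions):

import numpy as np

N_BINS = 6
# Assumed ranges for (cart position, cart velocity, pole angle, pole angular velocity);
# the velocity bounds are guesses since the true observation bounds are unbounded.
STATE_BOUNDS = [(-2.4, 2.4), (-3.0, 3.0), (-0.21, 0.21), (-2.0, 2.0)]

def digitize_state(observation):
    """Map a 4-dim continuous observation to a single integer in [0, N_BINS**4)."""
    index = 0
    for value, (low, high) in zip(observation, STATE_BOUNDS):
        bins = np.linspace(low, high, N_BINS - 1)  # N_BINS - 1 cut points -> N_BINS bins
        index = index * N_BINS + int(np.digitize(value, bins))
    return index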

Test code:

N_BINS = 6

env = gym.make('CartPole-v1')
action_space = env.action_space
state_space = env.observation_space
n_state = N_BINS ** 4
n_action = 2

q_learning_agent = QLearningAgent(n_state, n_action, learning_rate=0.5, reward_decay=1)

all_steps = [] # number of steps the pole stayed up in each episode
successive_success_episodes = 0 # number of consecutive successful episodes

n_episodes = 2000
n_steps_per_episode = 200

for i in tqdm(range(n_episodes)):
    observation, info = env.reset()
    done = False

    for j in range(n_steps_per_episode):
        
        state = digitize_state(observation)
        action = q_learning_agent.choose_action(state, 0.5 / (i+500))
        observation_next, reward, terminated, truncated, info = env.step(action)
        state_next = digitize_state(observation_next)
        done = terminated or truncated

        # reshaped reward: positive only if the pole stayed up for more than 195 steps
        if done:
            if j > 195:
                reward = 1
                successive_success_episodes += 1
            else:
                reward = -1
                successive_success_episodes = 0
        else:
            reward = 0
        
        q_learning_agent.update(state, action, reward, state_next)
        if done:
            break
        observation = observation_next
    
    all_steps.append(j)
    if successive_success_episodes >= 10:
        break
env.close()

np.save("./q_table.npy", q_learning_agent.q_table)

plt.plot(all_steps)
plt.xlabel("Episode")
plt.ylabel("steps_persevered")
plt.title("Q-Learning Training")
plt.grid()
plt.show()

A Blackjack example:

import gym
import numpy as np

from matplotlib import pyplot as plt
from tqdm import tqdm

class QLearningAgent:
    def __init__(self, state_dim: int, action_dim: int, learning_rate=0.01, reward_decay=0.95):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.lr = learning_rate
        self.gamma = reward_decay
        self.q_table = np.random.uniform(0, 1, size=(state_dim, action_dim))
    
    def choose_action(self, state, epsilon):
        if np.random.uniform() < epsilon:
            action = np.random.randint(self.action_dim)
        else:
            action = np.argmax(self.q_table[state])
        return action

    def update(self, state, action, reward, state_next, done):
        self.q_table[state, action] += self.lr * (reward + self.gamma * (np.max(self.q_table[state_next]) if not done else 0) - self.q_table[state, action])


def observation_to_index(observation):
    return int(observation[0] * 22 + observation[1] * 2 + observation[2])


env = gym.make('Blackjack-v1')
action_space = env.action_space
state_space = env.observation_space
n_state = 32 * 11 * 2
n_action = 2

q_learning_agent = QLearningAgent(n_state, n_action)

all_rewards = []
n_episodes = 1000000

for i in tqdm(range(n_episodes)):
    observation, info = env.reset()
    state = observation_to_index(observation)
    done = False
    total_rewards = 0
    while not done:
        action = q_learning_agent.choose_action(state, 0.5 / (i // (n_episodes // 20) + 1))
        observation_next, reward, terminated, truncated, info = env.step(action)
        state_next = observation_to_index(observation_next)
        done = terminated or truncated
        q_learning_agent.update(state, action, reward, state_next, done)
        state = state_next
        total_rewards += reward
    all_rewards.append(total_rewards)
env.close()

print(np.average(all_rewards[-1000:]))

all_rewards = np.array(all_rewards)
all_rewards = np.convolve(all_rewards, np.ones((1000,))/1000, mode='valid')
plt.plot(all_rewards)

plt.xlabel("Episode")
plt.ylabel("Rewards")
plt.title("Q-Learning Training")
plt.grid()
plt.show()

DQN

Experience replay:

class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, next_state, reward):
        if next_state is None:
            next_state = np.zeros_like(state)
        self.buffer.append((state, action, next_state, reward))

    def sample(
        self, batch_size
    ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        batch = random.sample(self.buffer, batch_size)
        (
            states,
            actions,
            next_states,
            rewards,
        ) = map(np.array, zip(*batch))
        return (
            torch.tensor(states),
            torch.tensor(actions, dtype=torch.int64).unsqueeze(1),
            torch.tensor(next_states),
            torch.tensor(rewards).unsqueeze(1),
        )

    def __len__(self):
        return len(self.buffer)

DQN

class DQNAgent:
    def __init__(
        self,
        state_dim,
        action_dim,
        model_cls,
        learning_rate=0.0001,
        gamma=0.99,
        batch_size=32,
        replay_buffer_capacity=5000,
    ):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.model = model_cls(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.learning_rate = learning_rate
        self.loss_fn = F.smooth_l1_loss
        self.gamma = gamma
        self.batch_size = batch_size
        self.replay_buffer = ReplayBuffer(replay_buffer_capacity)

    def choose_action(self, state: torch.Tensor, epsilon) -> int:
        # epsilon is the exploration probability
        if np.random.uniform() < epsilon:
            action = random.randrange(self.action_dim)
        else:
            self.model.eval()
            with torch.no_grad():
                action = np.argmax(self.model(state))
        return int(action)

    def learn(self):

        if len(self.replay_buffer) < self.batch_size:
            return
        states, actions, next_states, rewards = self.replay_buffer.sample(self.batch_size)

        self.model.eval()
        q = self.model(states).gather(dim=1, index=actions)
        q1 = self.model(next_states).max(dim=1)[0].detach().unsqueeze(1)
        target = rewards + self.gamma * q1
        loss = self.loss_fn(q, target)

        self.model.train()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
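
Note that learn() above uses self.model both to evaluate the chosen action and to build the TD target, rather than the separate target network described earlier. A minimal sketch of adding one (the class name, target_model and sync_every are assumptions, not from the original code):

import copy
import torch

class DQNAgentWithTarget(DQNAgent):
    def __init__(self, *args, sync_every=100, **kwargs):
        super().__init__(*args, **kwargs)
        self.target_model = copy.deepcopy(self.model)  # frozen copy of the evaluate network
        self.sync_every = sync_every
        self.learn_steps = 0

    def learn(self):
        if len(self.replay_buffer) < self.batch_size:
            return
        states, actions, next_states, rewards = self.replay_buffer.sample(self.batch_size)
        q = self.model(states).gather(dim=1, index=actions)            # evaluate network
        with torch.no_grad():                                          # target network, no gradient
            q_next = self.target_model(next_states).max(dim=1, keepdim=True).values
        target = rewards.float() + self.gamma * q_next                 # TD target from the target network
        loss = self.loss_fn(q, target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.learn_steps += 1
        if self.learn_steps % self.sync_every == 0:                    # copy evaluate -> target
            self.target_model.load_state_dict(self.model.state_dict())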

Testing DQN:

class SimpleMLP(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(SimpleMLP, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim),
        )

    def forward(self, x):
        return self.net(x)

env = gym.make("CartPole-v0")
state_space = env.observation_space
action_space = env.action_space
state_dim = state_space.shape[0]
action_dim = action_space.n

n_episodes = 500
n_steps_per_episode = 200
agent = DQNAgent(state_dim, action_dim, SimpleMLP)

all_steps = []  # number of steps the pole stayed up in each episode
successive_success_episodes = 0  # number of consecutive successful episodes


for i in tqdm(range(n_episodes)):
    observation, info = env.reset()

    for j in range(n_steps_per_episode):
        action = agent.choose_action(torch.Tensor(observation), 0.5 / (i + 1))
        observation_next, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        if done:
            next_state = None
            if j > 195:
                reward = 1
                successive_success_episodes += 1
            else:
                reward = -1
                successive_success_episodes = 0
        else:
            reward = 0

        agent.replay_buffer.push(observation, action, next_state if done else observation_next, reward)
        agent.learn()
        if done:
            break
        observation = observation_next
    all_steps.append(j)

    if successive_success_episodes >= 10:
        break

torch.save(agent.model.state_dict(), "cartpole_dqn.pth")

plt.plot(all_steps)

plt.xlabel("Episode")
plt.ylabel("steps_persevered")
plt.title("DQN Training")
plt.grid()
plt.show()

PG

First compute the discounted future return $G_t = \sum_{i=t}^{T} \gamma^{i-t} r_i$:

def discounted_future_reward(rewards, gamma) -> list[float]:
    """计算未来奖励的折现值
    >>> discounted_future_reward([0.8, 0.8, 0.8, 0.8], 0.5)
    array([2.7512, 2.168 , 1.52  , 0.8   ])
    """
    discounted_future_reward = np.zeros_like(rewards)
    running_add = 0
    for t in reversed(range(0, len(rewards))):
        running_add = running_add * gamma + rewards[t]
        discounted_future_reward[t] = running_add
    return discounted_future_reward

The PG model needs to output probabilities:

class SimpleMLPSoftmax(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(SimpleMLPSoftmax, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim),
            nn.Softmax(dim=-1)
        )
    
    def forward(self, x):
        return self.net(x)

Training:

class PGAgent:
    def __init__(
        self, 
        state_dim,
        action_dim,
        model_cls,
        learning_rate=3e-4,
        gamma=0.9):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.model = model_cls(state_dim, action_dim)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)
        self.gamma = gamma
    
    def choose_action(self, state: torch.Tensor) -> Tuple[int, torch.Tensor]:
        probs = self.model(state)
        choice = np.random.choice(self.action_dim, p=probs.detach().numpy())
        return choice, torch.log(probs[choice])

    def learn(self, rewards: list[float], log_probs: list[torch.Tensor]):
        discounted_rewards = torch.Tensor(discounted_future_reward(rewards, self.gamma))
        discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-9)
        
        policy_grads = - torch.stack(log_probs) * discounted_rewards
        
        self.optimizer.zero_grad()
        policy_grads = policy_grads.sum()
        policy_grads.backward()
        self.optimizer.step()

Test:

env = gym.make("CartPole-v0")
state_space = env.observation_space
action_space = env.action_space
state_dim = state_space.shape[0]
action_dim = action_space.n

n_episodes = 2000
n_steps_per_episode = 200
agent = PGAgent(state_dim, action_dim, SimpleMLPSoftmax)

all_steps = []

for i in tqdm(range(n_episodes)):
    observation, info = env.reset()
    rewards = []
    log_probs = []
    for j in range(n_steps_per_episode):
        action, log_prob = agent.choose_action(torch.Tensor(observation))
        observation_next, reward, terminated, truncated, info = env.step(action)
        rewards.append(reward)
        log_probs.append(log_prob)
        done = terminated or truncated
        
        if done:
            agent.learn(rewards, log_probs)
            all_steps.append(j)
            break
        observation = observation_next

torch.save(agent.model.state_dict(), "cartpole_pg.pth")

plt.plot(all_steps)

plt.xlabel("Episode")
plt.ylabel("steps_persevered")
plt.title("PG Training")
plt.grid()
plt.show()

AC

class ActorCritic:
    def __init__(self, state_dim, action_dim, actor_lr, critic_lr, gamma, device):
        self.device = device
        
        self.actor = SimpleMLPSoftmax(state_dim, action_dim)
        self.critic = SimpleMLP(state_dim, 1)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma # discount factor for the TD target
        
        self.loss_fn = nn.MSELoss()
        
        self.transition_dict = {}
        self.transition_dict_clear()
    
    
    def transition_dict_push(self, state, action, next_state, reward, done):
        """将状态、动作、下一个状态、奖励和是否完成添加到transition_dict字典中
        """
        self.transition_dict["states"].append(state)
        self.transition_dict["actions"].append(action)
        self.transition_dict["next_states"].append(next_state)
        self.transition_dict["rewards"].append(reward)
        self.transition_dict["dones"].append(done)
    
    def transition_dict_clear(self):
        """清空transition_dict字典中每一项的内容
        """
        self.transition_dict = {
            "states": [],
            "actions": [],
            "next_states": [],
            "rewards": [],
            "dones": []
        }
    
    def transition_dict_to_tensor(self) -> Tuple[torch.FloatTensor, torch.LongTensor, torch.FloatTensor, torch.FloatTensor, torch.FloatTensor]:
        """将transition_dict字典中的内容转换为张量
        """
        states = torch.FloatTensor(self.transition_dict["states"]).to(self.device)
        actions = torch.LongTensor(self.transition_dict["actions"]).to(self.device).view(-1, 1)
        next_states = torch.FloatTensor(self.transition_dict["next_states"]).to(self.device)
        rewards = torch.FloatTensor(self.transition_dict["rewards"]).to(self.device).view(-1, 1)
        dones = torch.FloatTensor(self.transition_dict["dones"]).to(self.device).view(-1, 1)
        return states, actions, next_states, rewards, dones
    
    def zero_back_step(self, actor_loss: torch.FloatTensor, critic_loss: torch.FloatTensor):
        self.actor_optimizer.zero_grad()
        self.critic_optimizer.zero_grad()
        actor_loss.backward()
        critic_loss.backward()
        self.actor_optimizer.step()
        self.critic_optimizer.step()
        
    
    def choose_action(self, state: np.ndarray, is_action_sample=True) -> int:
        """
        Args:
            state (np.ndarray): current state
            is_action_sample (bool, optional): sample an action from the policy's probabilities; otherwise take the most probable action. Defaults to True.

        Returns:
            int: the chosen action
        """
        state = torch.FloatTensor(state).to(self.device).unsqueeze(0)
        probs = self.actor(state)
        if is_action_sample:
            action = torch.multinomial(probs, 1).item()
        else:
            action = torch.argmax(probs).item()
        return action
    
    def learn(self):
        states, actions, next_states, rewards, dones = self.transition_dict_to_tensor()
        
        # value estimate for the current states
        td_value = self.critic(states)
        # TD target
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        # TD error
        td_error = td_target - td_value
        
        # log probabilities of the taken actions
        log_probs = torch.log(self.actor(states).gather(1, actions))
        
        actor_loss = torch.mean(-log_probs * td_error.detach())
        critic_loss = torch.mean(self.loss_fn(td_value, td_target.detach()))
        self.zero_back_step(actor_loss, critic_loss)

PPO

class PPO(ActorCritic):
    def __init__(self, state_dim, action_dim, actor_lr, critic_lr, gamma, device, lamda, n_epochs_per_learn, clip_ratio):
        super(PPO, self).__init__(state_dim, action_dim, actor_lr, critic_lr, gamma, device)
        self.lamda = lamda # GAE lambda
        self.n_epochs_per_learn = n_epochs_per_learn # number of training epochs per batch of collected data
        self.clip_ratio = clip_ratio # clipping ratio


    def learn(self):
        states, actions, next_states, rewards, dones = self.transition_dict_to_tensor()

        # value estimate for the current states
        td_value = self.critic(states)
        # TD target
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        # TD error
        td_error = td_target - td_value

        # log probabilities under the old policy (detached)
        log_probs = torch.log(self.actor(states).gather(1, actions)).detach()
        
        # compute the GAE advantages
        td_error = td_error.cpu().detach().numpy()
        advantages = []
        advantage = 0.0
        for delta in td_error[::-1]:
            advantage = self.gamma * self.lamda * advantage + delta
            advantages.append(advantage)
        advantages.reverse()
        advantages = torch.tensor(advantages).float().to(self.device)
        
        for _ in range(self.n_epochs_per_learn):
            # log probabilities under the current policy
            new_log_probs = torch.log(self.actor(states).gather(1, actions))
            # ratio between the new and old policies
            ratio = torch.exp(new_log_probs - log_probs)
            # clipped surrogate objective
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1.0 - self.clip_ratio, 1.0 + self.clip_ratio) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            
            new_td_value = self.critic(states)
            critic_loss = self.loss_fn(new_td_value, td_target.detach())
            
            self.zero_back_step(actor_loss, critic_loss)

Test code:

env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

agent = PPO(
    state_dim=state_dim,
    action_dim=action_dim,
    actor_lr=1e-3,
    critic_lr=1e-3,
    gamma=0.99,
    device=device,
    lamda=0.95,
    n_epochs_per_learn=10,
    clip_ratio=0.2,
)

num_episodes = 500
all_rewards = []
for episode in range(num_episodes):
    state, _ = env.reset()
    agent.transition_dict_clear()
    total_rewards = 0

    while True:
        action = agent.choose_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        total_rewards += reward

        agent.transition_dict_push(state, action, next_state, reward, terminated)

        if terminated or truncated:
            break
        state = next_state

    agent.learn()
    all_rewards.append(total_rewards)
    print(f"Episode {episode}, Return: {total_rewards}")

env.close()

Test code (rendering the trained policy):

env = gym.make("CartPole-v1", render_mode="human")
state, info = env.reset()
for _ in range(1000):
    action = agent.choose_action(observation, False)
    observation, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        state, info = env.reset()
    time.sleep(0.02)
env.close()

The reward can be rescaled so that it is distributed around 0:

rewards = a * rewards + b
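
For Pendulum-v1, for example, where the raw reward lies roughly in $[-16, 0]$, a common choice (an assumption, not from the original post) is:

rewards = (rewards + 8.0) / 8.0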

An Actor for a continuous action space needs to adjust the center and scale of the mean according to the range of the action space:

class SimpleMLPNormal(nn.Module):
    def __init__(self, n_states, n_actions):
        super(SimpleMLPNormal, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(n_states, 64),
            nn.ReLU(),
        )
        self.mean = nn.Sequential(
            nn.Linear(64, n_actions),
            nn.Tanh()
        )
        self.log_std = nn.Sequential(
            nn.Linear(64, n_actions),
            nn.Softplus()
        )

    def forward(self, x):
        x = self.net(x)
        mean = self.mean(x) # scale the mean (Tanh keeps it in [-1, 1])
        log_std = torch.clamp(self.log_std(x), -20, 2) # limit the range of the standard deviation
        std = torch.exp(log_std)
        return mean, std

Log probabilities:

from torch.distributions import Normal

mean, std = self.actor(states)
dist = Normal(mean, std)
log_probs = dist.log_prob(actions)
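
The discrete choose_action above samples from a categorical distribution; with the Gaussian actor, action selection could look roughly like this (a sketch assuming a Pendulum-style action bound of 2.0; not from the original post):

import torch
from torch.distributions import Normal

def choose_action(self, state, is_action_sample=True):
    state = torch.FloatTensor(state).unsqueeze(0)
    mean, std = self.actor(state)
    if is_action_sample:
        action = Normal(mean, std).sample()  # explore by sampling
    else:
        action = mean                        # act greedily with the mean
    action = 2.0 * action                    # Tanh keeps the mean in [-1, 1]; scale to the bound
    return action.clamp(-2.0, 2.0).detach().squeeze(0).numpy()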

Training example:

env = gym.make("Pendulum-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

agent = PPO(
    state_dim=state_dim,
    action_dim=action_dim,
    actor_lr=1e-4,
    critic_lr=5e-3,
    gamma=0.9,
    device=device,
    lamda=0.9,
    n_epochs_per_learn=10,
    clip_ratio=0.2,
)

For discrete state spaces, an embedding layer can be placed in front:

class SimpleMLPEmbedding(nn.Module):
    def __init__(self, n_states, n_actions):
        super(SimpleMLPEmbedding, self).__init__()
        self.embedding = nn.Embedding(n_states, 64)
        self.net = nn.Sequential(
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, n_actions)
        )

    def forward(self, x):
        return self.net(self.embedding(x.long()))

# usage
dist = torch.distributions.Categorical(probs)
action = dist.sample()

DDPG

OU noise

class OUNoise:
    def __init__(self, size, mu=0.0, theta=0.15, sigma=0.2):
        self.size = size
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        self.reset()

    def reset(self):
        self.state = np.ones(self.size) * self.mu

    def sample(self):
        dx = self.theta * (self.mu - self.state) + self.sigma * np.random.randn(self.size)
        self.state += dx
        return self.state

Actor and Critic networks

# Actor network
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 400), nn.ReLU(),
            nn.Linear(400, 300), nn.ReLU(),
            nn.Linear(300, action_dim), nn.Tanh()
        )
        self.max_action = max_action

    def forward(self, state):
        return self.max_action * self.net(state)

# Critic network
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim + action_dim, 400), nn.ReLU(),
            nn.Linear(400, 300), nn.ReLU(),
            nn.Linear(300, 1)
        )

    def forward(self, state, action):
        x = torch.cat([state, action], 1)
        return self.net(x)
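
The ReplayBuffer from the DQN section stores 4-tuples and returns tensors, while the DDPG class below samples 5-tuples as numpy arrays. A minimal buffer matching what the DDPG code expects (an assumed helper, not shown in the original post):

import random
from collections import deque
import numpy as np

class ReplayBuffer:
    """Stores (state, action, reward, next_state, done) and samples numpy arrays."""
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)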

The DDPG algorithm

class DDPG:
    def __init__(self, state_dim, action_dim, action_bondary, actor_lr, critic_lr, batch_size, gamma, tau):
        self.actor = Actor(state_dim, action_dim, action_bondary)
        self.actor_target = Actor(state_dim, action_dim, action_bondary)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic = Critic(state_dim, action_dim)
        self.critic_target = Critic(state_dim, action_dim)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        
        self.buffer = ReplayBuffer(50000)
        self.noise = OUNoise(action_dim)
        self.action_bondary = action_bondary # the action space is [-action_bondary, action_bondary]
        self.batch_size = batch_size # number of samples drawn from the replay buffer per update

        self.gamma = gamma # discount factor for the TD target
        self.tau = tau # soft update ratio
    
    def choose_action(self, state, noise=True):
        state = torch.FloatTensor(state).unsqueeze(0)
        action = self.actor(state).detach().numpy()[0]
        if noise:
            action += self.noise.sample()
        return np.clip(action, -self.action_bondary, self.action_bondary)

    def learn(self):
        if len(self.buffer) < self.batch_size:
            return
        states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)

        states = torch.FloatTensor(states)
        actions = torch.FloatTensor(actions)
        rewards = torch.FloatTensor(rewards).unsqueeze(1)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones).unsqueeze(1)
        
        # update the Critic network
        with torch.no_grad():
            next_actions = self.actor_target(next_states)
            target_q = self.critic_target(next_states, next_actions)
            target_q = rewards + (1 - dones) * self.gamma * target_q
        
        current_q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_q, target_q)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # update the Actor network
        actor_loss = -self.critic(states, self.actor(states)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # soft-update the target networks
        for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
        for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

Training example:

env = gym.make("Pendulum-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
action_bondary = env.action_space.high[0]

agent = DDPG(
    state_dim,
    action_dim,
    action_bondary=action_bondary,
    actor_lr=1e-3,
    critic_lr=1e-3,
    batch_size=64,
    gamma=0.99,
    tau = 0.005,
)

all_rewards = []
for episode in range(100):
    state = env.reset()[0]
    agent.noise.reset()
    episode_reward = 0
    for step in range(200):
        action = agent.choose_action(state)
        next_state, reward, done, _, _ = env.step(action)
        agent.buffer.push(state, action, reward, next_state, float(done))
        state = next_state

        agent.learn()
        episode_reward += reward
        if done:
            break
    all_rewards.append(episode_reward)
    print(f"Episode {episode}, Reward: {episode_reward:.2f}")

env.close()