离散动作 vs. 连续动作
- 离散动作
  - 随机性策略
  - softmax输出离散动作的概率值
- 连续动作
  - 确定性策略
  - tanh输出连续的浮点数
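下面是一个极简的对比示意(其中 state_dim、n_actions、action_dim 均为假设的维度,仅用于说明两种输出方式):

import torch
import torch.nn as nn

state_dim, n_actions, action_dim = 4, 2, 1  # 假设的维度,仅作演示

# 随机性策略(离散动作):softmax输出每个离散动作的概率
discrete_head = nn.Sequential(
    nn.Linear(state_dim, n_actions), nn.Softmax(dim=-1))

# 确定性策略(连续动作):tanh输出(-1, 1)之间的连续浮点数
continuous_head = nn.Sequential(
    nn.Linear(state_dim, action_dim), nn.Tanh())

state = torch.randn(1, state_dim)
print(discrete_head(state))    # 各动作的概率,和为1
print(continuous_head(state))  # 连续动作值,范围(-1, 1)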
深度确定性策略梯度(Deep Deterministic Policy Gradient,DDPG)
- DDPG将DQN扩展到连续动作空间
- DDPG在DQN的基础上加入一个策略网络(Actor),根据状态输出动作(Action)
- Q网络(Critic)根据状态和Actor输出的动作进行打分
import torch
import torch.nn as nn
import torch.nn.functional as F


class Actor(nn.Module):
    def __init__(
        self,
        in_dim: int,
        out_dim: int,
        init_w: float = 3e-3
    ) -> None:
        super().__init__()
        self.hidden1 = nn.Linear(in_dim, 128)
        self.hidden2 = nn.Linear(128, 128)
        self.out = nn.Linear(128, out_dim)
        self.out.weight.data.uniform_(-init_w, init_w)
        self.out.bias.data.uniform_(-init_w, init_w)

    def forward(self, state: torch.Tensor) -> torch.Tensor:
        x = F.relu(self.hidden1(state))
        x = F.relu(self.hidden2(x))
        action = self.out(x).tanh()
        return action
class Critic(nn.Module):
    def __init__(
        self,
        in_dim: int,
        init_w: float = 3e-3
    ) -> None:
        super().__init__()
        self.hidden1 = nn.Linear(in_dim, 128)
        self.hidden2 = nn.Linear(128, 128)
        self.out = nn.Linear(128, 1)
        self.out.weight.data.uniform_(-init_w, init_w)
        self.out.bias.data.uniform_(-init_w, init_w)

    def forward(
        self,
        state: torch.Tensor,
        action: torch.Tensor
    ) -> torch.Tensor:
        # Critic的输入是状态和动作的拼接
        x = torch.cat((state, action), dim=1)
        x = F.relu(self.hidden1(x))
        x = F.relu(self.hidden2(x))
        value = self.out(x)
        return value
- DDPG求解让Q值最大的动作
  - 策略网络(Actor):让Q值最大化,损失可以写成 -Q(s, P(s))
  - Q网络(Critic):用真实的Reward和下一步的Q值(即Q_next)来拟合未来的收益,拟合目标为 r + γ·Q'(s', P'(s'))
  - 拟合目标中包含会不断变化的Q值和策略,直接拿来训练是不稳定的
  - 构造target_P网络:目标策略网络,用于计算下一状态的动作
  - 构造target_Q网络:目标Q网络,用于计算拟合目标中的Q值
  - 这两个目标网络的参数保持相对固定,每隔一段时间(或以软更新的方式缓慢地)再与评估网络同步最新的参数,具体见下方的示意代码
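下面给出一个极简的示意(沿用上文定义的Actor和Critic,批量数据用随机张量代替,obs_dim、act_dim、gamma、tau 均为假设取值),说明策略网络损失、Q网络的拟合目标以及目标网络的软更新:

import copy
import torch
import torch.nn.functional as F

obs_dim, act_dim, gamma, tau = 3, 1, 0.99, 0.005  # 假设的维度与超参数

actor, critic = Actor(obs_dim, act_dim), Critic(obs_dim + act_dim)
actor_target, critic_target = copy.deepcopy(actor), copy.deepcopy(critic)

state = torch.randn(32, obs_dim)
action = torch.randn(32, act_dim).clamp(-1.0, 1.0)
reward = torch.randn(32, 1)
next_state = torch.randn(32, obs_dim)
mask = torch.ones(32, 1)  # 即 1 - done

# Q网络(Critic)的损失:以 r + γ·Q'(s', P'(s')) 作为拟合目标
with torch.no_grad():
    target_q = reward + gamma * critic_target(
        next_state, actor_target(next_state)) * mask
critic_loss = F.mse_loss(critic(state, action), target_q)

# 策略网络(Actor)的损失:让Q(s, P(s))最大化,即最小化其相反数
actor_loss = -critic(state, actor(state)).mean()

# 软更新:目标网络的参数缓慢地跟随评估网络
for t_param, param in zip(actor_target.parameters(), actor.parameters()):
    t_param.data.copy_(tau * param.data + (1.0 - tau) * t_param.data)
for t_param, param in zip(critic_target.parameters(), critic.parameters()):
    t_param.data.copy_(tau * param.data + (1.0 - tau) * t_param.data)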
- 经验回放(Replay Memory)
- Off-policy
import numpy as np
from typing import Dict


class ReplayBuffer:
    def __init__(
        self,
        obs_dim: int,
        size: int,
        batch_size: int = 32
    ) -> None:
        self.obs_buf = np.zeros([size, obs_dim], dtype=np.float32)
        self.next_obs_buf = np.zeros(
            [size, obs_dim], dtype=np.float32)
        self.acts_buf = np.zeros([size], dtype=np.float32)
        self.rews_buf = np.zeros([size], dtype=np.float32)
        self.done_buf = np.zeros([size], dtype=np.float32)
        self.max_size, self.batch_size = size, batch_size
        self.ptr, self.size = 0, 0

    def store(
        self,
        obs: np.ndarray,
        act: np.ndarray,
        rew: float,
        next_obs: np.ndarray,
        done: bool,
    ) -> None:
        """store the transition (s, a, r, s', done) in buffer"""
        self.obs_buf[self.ptr] = obs
        self.acts_buf[self.ptr] = act
        self.rews_buf[self.ptr] = rew
        self.next_obs_buf[self.ptr] = next_obs
        self.done_buf[self.ptr] = done
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample_batch(self) -> Dict[str, np.ndarray]:
        """Randomly sample a batch of experiences from memory"""
        idxs = np.random.choice(self.size,
                                size=self.batch_size,
                                replace=False)
        return dict(
            obs=self.obs_buf[idxs],
            acts=self.acts_buf[idxs],
            rews=self.rews_buf[idxs],
            next_obs=self.next_obs_buf[idxs],
            done=self.done_buf[idxs],
        )

    def __len__(self) -> int:
        return self.size
- 噪声
  - 为了让DDPG的策略更好地探索,在训练的时候给Action加噪声,常见的有两种:
    - Ornstein-Uhlenbeck噪声
    - 不相关的、均值为零的高斯噪声
import copy
import random
import numpy as np


class OUNoise:
    def __init__(
        self,
        size: int,
        mu: float = 0.0,
        theta: float = 0.15,
        sigma: float = 0.2
    ):
        """initialize parameters and noise process"""
        self.state = np.float64(0.0)
        self.mu = mu * np.ones(size)
        self.theta = theta
        self.sigma = sigma
        self.reset()

    def reset(self):
        """reset the internal state (=noise) to mean (mu)"""
        self.state = copy.copy(self.mu)

    def sample(self) -> np.ndarray:
        """update internal state and return it as a noise sample"""
        x = self.state
        dx = self.theta * (self.mu - x) + self.sigma * np.array(
            [random.random() for _ in range(len(x))]
        )
        self.state = x + dx
        return self.state
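作为对照,上面提到的另一种做法是不相关、均值为零的高斯噪声。下面是一个极简的示意实现(GaussianNoise、sigma 等命名为假设,并非上文代码的一部分):

import numpy as np

class GaussianNoise:
    """零均值、各维度不相关的高斯噪声(示意实现)"""

    def __init__(self, size: int, sigma: float = 0.1):
        self.size = size
        self.sigma = sigma

    def sample(self) -> np.ndarray:
        # 每次独立采样,不依赖上一步的噪声状态
        return np.random.normal(0.0, self.sigma, size=self.size)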
from typing import Tuple, List
import numpy as np
import matplotlib.pyplot as plt
import gym
import torch
import torch.optim as optim
import torch.nn.functional as F


class DDPGAgent:
    def __init__(
        self,
        env: gym.Env,
        memory_size: int,
        batch_size: int,
        ou_noise_theta: float,
        ou_noise_sigma: float,
        gamma: float = 0.99,
        tau: float = 5e-3,
        initial_random_steps: int = 1e4,
    ) -> None:
        obs_dim = env.observation_space.shape[0]
        action_dim = env.action_space.shape[0]
        self.env = env
        self.memory = ReplayBuffer(obs_dim,
                                   memory_size,
                                   batch_size)
        self.batch_size = batch_size
        self.gamma = gamma
        self.tau = tau
        self.initial_random_steps = initial_random_steps
        # noise
        self.noise = OUNoise(size=action_dim,
                             theta=ou_noise_theta,
                             sigma=ou_noise_sigma)
        # CPU/GPU
        self.device = torch.device(
            "cuda" if torch.cuda.is_available() else "cpu")
        # networks
        self.actor = Actor(
            obs_dim, action_dim).to(self.device)
        self.actor_target = Actor(
            obs_dim, action_dim).to(self.device)
        self.actor_target.load_state_dict(
            self.actor.state_dict())
        self.critic = Critic(
            obs_dim + action_dim).to(self.device)
        self.critic_target = Critic(
            obs_dim + action_dim).to(self.device)
        self.critic_target.load_state_dict(
            self.critic.state_dict())
        # optimizer
        self.actor_optimizer = optim.Adam(
            self.actor.parameters(), lr=3e-4)
        self.critic_optimizer = optim.Adam(
            self.critic.parameters(), lr=1e-3)
        self.transition = list()
        self.total_step = 0
        self.is_test = False
    def select_action(
        self,
        state: np.ndarray
    ) -> np.ndarray:
        """select an action from the input state"""
        if (self.total_step < self.initial_random_steps
                and not self.is_test):
            selected_action = self.env.action_space.sample()
        else:
            selected_action = (
                self.actor(torch.FloatTensor(state).to(self.device))
                .detach()
                .cpu()
                .numpy()
            )
        # 训练阶段给动作加噪声以增加探索
        if not self.is_test:
            noise = self.noise.sample()
            selected_action = np.clip(
                selected_action + noise, -1.0, 1.0)
        self.transition = [state, selected_action]
        return selected_action

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, np.float64, bool]:
        """take an action and return the response of the env"""
        next_state, reward, done, _ = self.env.step(action)
        if not self.is_test:
            self.transition += [reward, next_state, done]
            self.memory.store(*self.transition)
        return next_state, reward, done
    def update_model(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """update the model by gradient descent"""
        device = self.device
        samples = self.memory.sample_batch()
        state = torch.FloatTensor(samples["obs"]).to(device)
        action = torch.FloatTensor(
            samples["acts"].reshape(-1, 1)).to(device)
        reward = torch.FloatTensor(
            samples["rews"].reshape(-1, 1)).to(device)
        next_state = torch.FloatTensor(samples["next_obs"]).to(device)
        done = torch.FloatTensor(samples["done"].reshape(-1, 1)).to(device)
        masks = 1 - done
        next_action = self.actor_target(next_state)
        next_value = self.critic_target(next_state, next_action)
        curr_return = reward + self.gamma * next_value * masks
        # train critic
        values = self.critic(state, action)
        critic_loss = F.mse_loss(values, curr_return)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        # train actor
        actor_loss = -self.critic(state, self.actor(state)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        # target update
        self._target_soft_update()
        return actor_loss.data, critic_loss.data
    def _target_soft_update(self):
        tau = self.tau
        for t_param, l_param in zip(
            self.actor_target.parameters(),
            self.actor.parameters()
        ):
            t_param.data.copy_(
                tau * l_param.data + (1.0 - tau) * t_param.data)
        for t_param, l_param in zip(
            self.critic_target.parameters(),
            self.critic.parameters()
        ):
            t_param.data.copy_(
                tau * l_param.data + (1.0 - tau) * t_param.data)
    def train(
        self,
        num_frames: int,
        plotting_interval: int = 200
    ):
        # train the agent
        self.is_test = False
        state = self.env.reset()
        actor_losses = []
        critic_losses = []
        scores = []
        score = 0
        for self.total_step in range(1, num_frames + 1):
            action = self.select_action(state)
            next_state, reward, done = self.step(action)
            state = next_state
            score += reward
            if done:
                state = self.env.reset()
                scores.append(score)
                score = 0
            if (
                len(self.memory) >= self.batch_size
                and self.total_step > self.initial_random_steps
            ):
                actor_loss, critic_loss = self.update_model()
                actor_losses.append(actor_loss)
                critic_losses.append(critic_loss)
            # plotting
            if self.total_step % plotting_interval == 0:
                self._plot(self.total_step,
                           scores,
                           actor_losses,
                           critic_losses)
        self.env.close()
    def test(self):
        """test the agent"""
        self.is_test = True
        state = self.env.reset()
        done = False
        score = 0
        frames = []
        while not done:
            frames.append(self.env.render(mode="rgb_array"))
            action = self.select_action(state)
            next_state, reward, done = self.step(action)
            state = next_state
            score += reward
        print("score:", score)
        self.env.close()
        return frames
    def _plot(
        self,
        frame_idx: int,
        scores: List[float],
        actor_losses: List[float],
        critic_losses: List[float],
    ):
        """plot the training progresses"""
        def subplot(loc: int, title: str, values: List[float]):
            plt.subplot(loc)
            plt.title(title)
            plt.plot(values)

        subplot_params = [
            (131, f"frame {frame_idx}, score: {np.mean(scores[-10:])}", scores),
            (132, "actor_loss", actor_losses),
            (133, "critic_loss", critic_losses),
        ]
        plt.figure(figsize=(30, 5))
        for loc, title, values in subplot_params:
            subplot(loc, title, values)
        plt.show()
使用Pendulum-v1环境来验证:
class ActionNormalizer(gym.ActionWrapper):
    """rescale and relocate the actions"""

    def action(self, action: np.ndarray) -> np.ndarray:
        """change the range (-1, 1) to (low, high)"""
        low = self.action_space.low
        high = self.action_space.high
        scale_factor = (high - low) / 2
        reloc_factor = high - scale_factor
        action = action * scale_factor + reloc_factor
        action = np.clip(action, low, high)
        return action

    def reverse_action(self, action: np.ndarray) -> np.ndarray:
        """change the range (low, high) to (-1, 1)"""
        low = self.action_space.low
        high = self.action_space.high
        scale_factor = (high - low) / 2
        reloc_factor = high - scale_factor
        action = (action - reloc_factor) / scale_factor
        action = np.clip(action, -1.0, 1.0)
        return action
if __name__ == "__main__":
    # environment
    env_id = "Pendulum-v1"
    env = gym.make(env_id)
    env = ActionNormalizer(env)

    # set random seed
    def seed_torch(seed):
        torch.manual_seed(seed)
        if torch.backends.cudnn.enabled:
            torch.backends.cudnn.benchmark = False
            torch.backends.cudnn.deterministic = True

    seed = 42
    random.seed(seed)
    np.random.seed(seed)
    seed_torch(seed)
    env.seed(seed)

    # parameters
    num_frames = 50000
    memory_size = 100000
    batch_size = 128
    ou_noise_theta = 1.0
    ou_noise_sigma = 0.1
    initial_random_steps = 10000

    agent = DDPGAgent(
        env,
        memory_size,
        batch_size,
        ou_noise_theta,
        ou_noise_sigma,
        initial_random_steps=initial_random_steps,
    )

    # train
    agent.train(num_frames)
测试(test):
frames = agent.test()

from matplotlib import animation

def display_frames_as_gif(frames, filename):
    patch = plt.imshow(frames[0])
    plt.axis("off")

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(fig=plt.gcf(),
                                   func=animate,
                                   frames=len(frames),
                                   interval=5)
    anim.save(filename, writer="pillow", fps=30)

display_frames_as_gif(frames, "ddpg.gif")
双延迟深度确定性策略梯度(Twin Delayed DDPG,TD3)
- 截断的双Q学习(Clipped Double Q-learning):TD3学习两个Q函数(因此称为twin),并利用这两个Q函数中较小的那个Q值来构造贝尔曼误差中的目标值。
- 延迟的策略更新("Delayed" Policy Updates):策略(包括目标策略网络)的更新频率低于Q函数的更新频率。论文建议Q网络每更新两次,策略网络才更新一次。
- 目标策略平滑(Target Policy Smoothing):TD3在目标动作中也加入噪声,通过平滑Q函数沿不同动作的变化,使策略更难利用Q函数的错误。这三点的核心改动见下方的示意代码。
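下面给出一个极简的示意函数(函数名 td3_targets_sketch 以及 critic1、critic2、policy_noise、noise_clip 等参数命名均为假设,网络结构沿用上文DDPG的Actor/Critic形式),说明这三点在计算目标值和损失时的核心改动:

import torch
import torch.nn.functional as F

def td3_targets_sketch(
    critic1, critic2, critic1_target, critic2_target,
    actor, actor_target,
    state, action, reward, next_state, mask,
    gamma=0.99, policy_noise=0.2, noise_clip=0.5,
):
    """示意TD3相对DDPG的三处核心改动(参数命名均为假设)"""
    with torch.no_grad():
        # 目标策略平滑:给目标动作加上截断后的噪声
        noise = (torch.randn_like(action) * policy_noise).clamp(
            -noise_clip, noise_clip)
        next_action = (actor_target(next_state) + noise).clamp(-1.0, 1.0)
        # 截断的双Q学习:取两个目标Q值中较小的那个来构造目标
        target_q = torch.min(
            critic1_target(next_state, next_action),
            critic2_target(next_state, next_action),
        )
        target_q = reward + gamma * target_q * mask

    # 两个Q网络都向同一个目标拟合
    critic_loss = F.mse_loss(critic1(state, action), target_q) + \
                  F.mse_loss(critic2(state, action), target_q)

    # 延迟的策略更新:下面的策略损失只在Q网络每更新若干次后才计算并反向传播一次
    actor_loss = -critic1(state, actor(state)).mean()
    return critic_loss, actor_loss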