Chapter 12: Deep Deterministic Policy Gradient (DDPG)

Discrete Actions vs. Continuous Actions

  • Discrete actions
    • Stochastic policy \pi_\theta(a_t|s_t)
    • A softmax layer outputs a probability for each discrete action
  • Continuous actions
    • Deterministic policy \mu_\theta(s_t)
    • A tanh layer outputs continuous floating-point values (see the sketch below)
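
A minimal sketch of the two kinds of output heads (the class names here are illustrative and not part of this chapter's code):

import torch
import torch.nn as nn
import torch.nn.functional as F


class DiscretePolicy(nn.Module):
    """stochastic policy: softmax over a finite set of actions"""
    def __init__(self, obs_dim: int, n_actions: int) -> None:
        super().__init__()
        self.fc = nn.Linear(obs_dim, n_actions)

    def forward(self, state: torch.Tensor) -> torch.Tensor:
        # probability of each discrete action, sums to 1
        return F.softmax(self.fc(state), dim=-1)


class ContinuousPolicy(nn.Module):
    """deterministic policy: tanh squashes outputs into [-1, 1]"""
    def __init__(self, obs_dim: int, action_dim: int) -> None:
        super().__init__()
        self.fc = nn.Linear(obs_dim, action_dim)

    def forward(self, state: torch.Tensor) -> torch.Tensor:
        # one continuous value per action dimension
        return torch.tanh(self.fc(state))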

Deep Deterministic Policy Gradient (DDPG)

  • DDPG extends DQN to continuous action spaces
    • On top of DQN, DDPG adds a policy network (Actor) that outputs an action a=\mu_\theta(s) from the state s
    • A Q network (Critic) scores the state together with the actor's action: Q_w(s,a)
Actor-Critic
import torch
import torch.nn as nn
import torch.nn.functional as F


class Actor(nn.Module):
    def __init__(
        self, 
        in_dim: int, 
        out_dim: int, 
        init_w: float = 3e-3
    ) -> None:
        super().__init__()

        self.hidden1 = nn.Linear(in_dim, 128)
        self.hidden2 = nn.Linear(128, 128)
        self.out = nn.Linear(128, out_dim)

        self.out.weight.data.uniform_(-init_w, init_w)
        self.out.bias.data.uniform_(-init_w, init_w)

    def forward(self, state: torch.Tensor) -> torch.Tensor:
        x = F.relu(self.hidden1(state))
        x = F.relu(self.hidden2(x))
        action = self.out(x).tanh()

        return action


class Critic(nn.Module):
    def __init__(
        self, 
        in_dim: int, 
        init_w: float = 3e-3
    ) -> None:
        super().__init__()

        self.hidden1 = nn.Linear(in_dim, 128)
        self.hidden2 = nn.Linear(128, 128)
        self.out = nn.Linear(128, 1)

        self.out.weight.data.uniform_(-init_w, init_w)
        self.out.bias.data.uniform_(-init_w, init_w)

    def forward(
        self, 
        state: torch.Tensor, 
        action: torch.Tensor
    ) -> torch.Tensor:
        x = torch.cat((state, action), dim=1)
        x = F.relu(self.hidden1(x))
        x = F.relu(self.hidden2(x))
        value = self.out(x)

        return value
  • DDPG solves for the action that maximizes the Q value: a=\mu_\theta(s)=\underset{a}{\operatorname{argmax}}\,Q_w(s,a)
    • Policy network: maximize the Q value, so Loss=-Q (see the gradient below)
    • Q network: use the real reward r and the next-step value Q' to fit the future return Q_{target}, with Loss=MSE(Q_{estimate}, Q_{target})
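
For reference, minimizing Loss=-Q for the policy network amounts to ascending the deterministic policy gradient:

\nabla_\theta J(\theta) \approx \mathbb{E}_{s\sim\mathcal{D}}\left[\nabla_a Q_w(s,a)\big|_{a=\mu_\theta(s)}\,\nabla_\theta\mu_\theta(s)\right]

where \mathcal{D} is the replay buffer; in code this is simply backpropagation through -Q_w(s,\mu_\theta(s)).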
Optimization
  • Q_{target} is unstable
    • Build a target policy network: a'=\mu_{\overline{\theta}}(s')
    • Build a target Q network: Q_{\overline{w}}(s',a')
    • Both target networks keep their parameters frozen for a while and are then synchronized with the latest parameters of the online (evaluation) networks; the resulting target and soft update are written out below
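
Concretely, the target value and the soft update with rate \tau (implemented later in update_model and _target_soft_update) are:

Q_{target}=r+\gamma\,(1-done)\,Q_{\overline{w}}\big(s',\mu_{\overline{\theta}}(s')\big)

\overline{\theta}\leftarrow\tau\theta+(1-\tau)\overline{\theta},\qquad \overline{w}\leftarrow\tau w+(1-\tau)\overline{w}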
Target Networks
  • Experience replay (Replay Memory)
    • Off-policy
    • Stores transitions (s,a,r,s')
import numpy as np
from typing import Dict


class ReplayBuffer:
    def __init__(
        self, 
        obs_dim: int, 
        size: int, 
        batch_size: int = 32
    ) -> None:
        self.obs_buf = np.zeros([size, obs_dim], dtype=np.float32)
        self.next_obs_buf = np.zeros(
            [size, obs_dim], dtype=np.float32)
        self.acts_buf = np.zeros([size], dtype=np.float32)
        self.rews_buf = np.zeros([size], dtype=np.float32)
        self.done_buf = np.zeros([size], dtype=np.float32)
        self.max_size, self.batch_size = size, batch_size
        self.ptr, self.size = 0, 0

    def store(
        self,
        obs: np.ndarray,
        act: np.ndarray,
        rew: float,
        next_obs: np.ndarray,
        done: bool,
    ) -> None:
        """store the transition (s, a, r, s') in buffer"""
        self.obs_buf[self.ptr] = obs
        self.acts_buf[self.ptr] = act
        self.rews_buf[self.ptr] = rew
        self.next_obs_buf[self.ptr] = next_obs
        self.done_buf[self.ptr] = done
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample_batch(self) -> Dict[str, np.ndarray]:
        """Randomly sample a batch of experiences from memory"""
        idxs = np.random.choice(self.size, 
                                size=self.batch_size, 
                                replace=False)
        return dict(
            obs=self.obs_buf[idxs],
            acts=self.acts_buf[idxs],
            rews=self.rews_buf[idxs],
            next_obs=self.next_obs_buf[idxs],
            done=self.done_buf[idxs],
        )

    def __len__(self) -> int:
        return self.size
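
A quick, hypothetical usage of the buffer above (a 3-dimensional observation and a scalar action, matching Pendulum later in this chapter):

buffer = ReplayBuffer(obs_dim=3, size=1000, batch_size=4)
for _ in range(10):
    s = np.random.randn(3).astype(np.float32)
    a = np.random.uniform(-1.0, 1.0)  # scalar action, matching acts_buf's shape
    buffer.store(s, a, rew=0.0, next_obs=s, done=False)

# sample_batch() samples without replacement, so len(buffer) must be >= batch_size
batch = buffer.sample_batch()
print(batch["obs"].shape, batch["acts"].shape)  # (4, 3) (4,)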
  • Noise
    • To make the DDPG policy explore better, noise is added to the action during training:
import copy
import random
import numpy as np


class OUNoise:
    def __init__(
        self, 
        size: int, 
        mu: float = 0.0, 
        theta: float = 0.15, 
        sigma: float = 0.2
    ):
        """initialize parameters and noise process"""
        self.state = np.float64(0.0)
        self.mu = mu * np.ones(size)
        self.theta = theta
        self.sigma = sigma
        self.reset()

    def reset(self):
        """reset the internal state (=noise) to mean (mu)"""
        self.state = copy.copy(self.mu)

    def sample(self) -> np.ndarray:
        """update internal state and return it as a noise sample"""
        x = self.state
        # note: the random term here uses uniform noise in [0, 1) rather than
        # Gaussian increments
        dx = self.theta * (self.mu - x) + self.sigma * np.array(
            [random.random() for _ in range(len(x))]
        )
        self.state = x + dx
        return self.state
DDPG
from typing import Tuple, List
import numpy as np
import matplotlib.pyplot as plt
import gym
import torch
import torch.optim as optim
import torch.nn.functional as F


class DDPGAgent:
    def __init__(
        self,
        env: gym.Env,
        memory_size: int,
        batch_size: int,
        ou_noise_theta: float,
        ou_noise_sigma: float,
        gamma: float = 0.99,
        tau: float = 5e-3,
        initial_random_steps: int = 1e4,
    ) -> None:
        obs_dim = env.observation_space.shape[0]
        action_dim = env.action_space.shape[0]

        self.env = env
        self.memory = ReplayBuffer(obs_dim, 
                                   memory_size, 
                                   batch_size)
        self.batch_size = batch_size
        self.gamma = gamma
        self.tau = tau
        self.initial_random_steps = initial_random_steps

        # noise
        self.noise = OUNoise(size=action_dim, 
                             theta=ou_noise_theta,
                             sigma=ou_noise_sigma)

        # CPU/GPU
        self.device = torch.device(
            "cuda" if torch.cuda.is_available() else "cpu")

        # networks
        self.actor = Actor(
            obs_dim, action_dim).to(self.device)
        self.actor_target = Actor(
            obs_dim, action_dim).to(self.device)
        self.actor_target.load_state_dict(
            self.actor.state_dict())

        self.critic = Critic(
             obs_dim + action_dim).to(self.device)
        self.critic_target = Critic(
            obs_dim + action_dim).to(self.device)
        self.critic_target.load_state_dict(
            self.critic.state_dict())

        # optimizer
        self.actor_optimizer = optim.Adam(
            self.actor.parameters(), lr=3e-4)
        self.critic_optimizer = optim.Adam(
            self.critic.parameters(), lr=1e-3)

        self.transition = list()
        self.total_step = 0
        self.is_test = False

    def select_action(
        self, 
        state: np.ndarray
    ) -> np.ndarray:
        """select an action from the input state"""
        if (self.total_step < self.initial_random_steps 
            and not self.is_test):
            selected_action = self.env.action_space.sample()
        else:
            selected_action = (
                self.actor(torch.FloatTensor(state).to(self.device))
                .detach()
                .cpu()
                .numpy()
            )

        if not self.is_test:
            noise = self.noise.sample()
            selected_action = np.clip(
                selected_action + noise, -1.0, 1.0)

        self.transition = [state, selected_action]

        return selected_action

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, np.float64, bool]:
        """"take an action and return the response of the env"""
        next_state, reward, done, _ = self.env.step(action)

        if not self.is_test:
            self.transition += [reward, next_state, done]
            self.memory.store(*self.transition)

        return next_state, reward, done

    def update_model(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """update the model by gradient descent"""
        device = self.device

        samples = self.memory.sample_batch()
        state = torch.FloatTensor(samples["obs"]).to(device)
        action = torch.FloatTensor(samples["acts"]).to(device)
        reward = torch.FloatTensor(samples["rews"]).to(device)
        next_state = torch.FloatTensor(samples["next_obs"]).to(device)
        done = torch.FloatTensor(samples["done"].reshape(-1, 1)).to(device)

        masks = 1 - done
        next_action = self.actor_target(next_state)
        next_value = self.critic_target(next_state, next_action)
        curr_return = reward + self.gamma * next_value * masks
  
        # train critic
        values = self.critic(state, action)
        critic_loss = F.mse_loss(values, curr_return)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # train actor
        actor_loss = -self.critic(state, self.actor(state)).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # target update
        self._target_soft_update()

        return actor_loss.data, critic_loss.data

    def _target_soft_update(self):
        tau = self.tau

        for t_param, l_param in zip(
            self.actor_target.parameters(), 
            self.actor.parameters()
        ):
            t_param.data.copy_(
                tau * l_param.data + (1.0 - tau) * t_param.data)

        for t_param, l_param in zip(
            self.critic_target.parameters(), 
            self.critic.parameters()
        ):
            t_param.data.copy_(
                tau * l_param.data + (1.0 - tau) * t_param.data)

    def train(
            self, 
            num_frames: int, 
            plotting_interval: int = 200
    ):
        # train the agent
        self.is_test = False

        state = self.env.reset()
        actor_losses = []
        critic_losses = []
        scores = []
        score = 0

        for self.total_step in range(1, num_frames + 1):
            action = self.select_action(state)
            next_state, reward, done = self.step(action)

            state = next_state
            score += reward

            if done:
                state = self.env.reset()
                scores.append(score)
                score = 0

            if (
                len(self.memory) >= self.batch_size
                and self.total_step > self.initial_random_steps
            ):
                actor_loss, critic_loss = self.update_model()
                actor_losses.append(actor_loss)
                critic_losses.append(critic_loss)

            # plotting
            if self.total_step % plotting_interval == 0:
                self._plot(self.total_step, 
                           scores, 
                           actor_losses, 
                           critic_losses)

        self.env.close()

    def test(self):
        """test the agent"""
        self.is_test = True

        state = self.env.reset()
        done = False
        score = 0

        frames = []
        while not done:
            frames.append(self.env.render(mode="rgb_array"))
            action = self.select_action(state)
            next_state, reward, done = self.step(action)

            state = next_state
            score += reward

        print("score:", score)
        self.env.close()

        return frames

    def _plot(
        self,
        frame_idx: int,
        scores: List[float],
        actor_losses: List[float],
        critic_losses: List[float],
    ):
        """plot the training progresses"""
        def subplot(loc: int, title: str, values: List[float]):
            plt.subplot(loc)
            plt.title(title)
            plt.plot(values)

        subplot_params = [
            (131, f"frame {frame_idx}, score: {np.mean(scores[-10:])}", scores),
            (132, "actor_loss", actor_losses),
            (133, "critic_loss", critic_losses),
        ]

        plt.figure(figsize=(30, 5))
        for loc, title, values in subplot_params:
            subplot(loc, title, values)
        plt.show()

Verify with the Pendulum-v1 environment:

class ActionNormalizer(gym.ActionWrapper):
    """rescale and relocate the actions"""

    def action(self, action: np.ndarray) -> np.ndarray:
        """change the range (-1, 1) to (low, high)"""
        low = self.action_space.low
        high = self.action_space.high

        scale_factor = (high - low) / 2
        reloc_factor = high - scale_factor

        action = action * scale_factor + reloc_factor
        action = np.clip(action, low, high)

        return action

    def reverse_action(self, action: np.ndarray) -> np.ndarray:
        """change the range (low, high) to (-1, 1)"""
        low = self.action_space.low
        high = self.action_space.high

        scale_factor = (high - low) / 2
        reloc_factor = high - scale_factor

        action = (action - reloc_factor) / scale_factor
        action = np.clip(action, -1.0, 1.0)

        return action

if __name__ == "__main__":
    # environment
    env_id = "Pendulum-v1"
    env = gym.make(env_id)
    env = ActionNormalizer(env)

    # set random seed
    def seed_torch(seed):
        torch.manual_seed(seed)
        if torch.backends.cudnn.enabled:
            torch.backends.cudnn.benchmark = False
            torch.backends.cudnn.deterministic = True

    seed = 42
    random.seed(seed)
    np.random.seed(seed)
    seed_torch(seed)
    env.seed(seed)

    # parameters
    num_frames = 50000
    memory_size = 100000
    batch_size = 128
    ou_noise_theta = 1.0
    ou_noise_sigma = 0.1
    initial_random_steps = 10000

    agent = DDPGAgent(
        env,
        memory_size,
        batch_size,
        ou_noise_theta,
        ou_noise_sigma,
        initial_random_steps=initial_random_steps,
    )

    # train
    agent.train(num_frames)
DDPG training process

Test the agent:

frames = agent.test()

from matplotlib import animation

def display_frames_as_gif(frames, filename):
    patch = plt.imshow(frames[0])
    plt.axis("off")

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(fig=plt.gcf(),
                                   func=animate,
                                   frames=len(frames),
                                   interval=5)
    anim.save(filename, writer="pillow", fps=30)

display_frames_as_gif(frames, "ddpg.gif")
Pendulum-DDPG

Twin Delayed Deep Deterministic Policy Gradient (Twin Delayed DDPG, TD3)

  • Clipped Double Q-learning: TD3 learns two Q functions (hence "twin") and uses the smaller of the two Q values to build the target in the Bellman error (see the sketch below).
  • Delayed policy updates: the policy (including the target policy network) is updated less frequently than the Q functions; the paper recommends one policy update for every two Q updates.
  • Target policy smoothing: TD3 also adds noise to the target action, smoothing the Q function's variation across actions and making it harder for the policy to exploit errors in the Q function.
TD3
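
A minimal sketch of how the TD3 target differs from the DDPG target above (illustrative, not part of this chapter's code; actor_t, critic1_t, critic2_t stand for target networks, and policy_noise, noise_clip, policy_update_freq are hypothetical hyperparameter names):

import torch


def td3_target(
    actor_t, critic1_t, critic2_t,
    reward: torch.Tensor, next_state: torch.Tensor, masks: torch.Tensor,
    gamma: float = 0.99, policy_noise: float = 0.2, noise_clip: float = 0.5,
) -> torch.Tensor:
    """clipped double-Q target with target policy smoothing"""
    next_action = actor_t(next_state)
    # target policy smoothing: perturb the target action with clipped noise
    noise = (torch.randn_like(next_action) * policy_noise).clamp(-noise_clip, noise_clip)
    next_action = (next_action + noise).clamp(-1.0, 1.0)
    # clipped double Q-learning: take the smaller of the two target Q values
    q1 = critic1_t(next_state, next_action)
    q2 = critic2_t(next_state, next_action)
    return reward + gamma * masks * torch.min(q1, q2)


# delayed policy updates: the actor and all target networks are updated only
# once every policy_update_freq critic updates, e.g.
#   if total_step % policy_update_freq == 0:
#       actor_loss = -critic1(state, actor(state)).mean()
#       ...  # update the actor, then soft-update the target networks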