引言
目前车联网和车路协同 (V2V, V2I, V2P... 统称 V2X) 是自动驾驶和智能交通领域研究的核心问题之一。简单来说,V2X 就是为车辆提供了上帝视角,可以获取更大范围、更准确的信息,从而优化驾驶决策。
在各种智能交通场景中,platooning (车队编排) 是一个很热门的话题。借助 V2X, platooning 可以减小车辆之间的安全距离。较小的安全距离可以增加道路的吞吐量,对于在高速公路上行驶的卡车编队,还可以减小风阻,降低能耗。
对于一般的研究机构,由于各方面的限制,很难在实车、实路上测试智能交通算法的性能。在这种情况下,仿真测试可以在一定程度上验证算法的有效性。
V2X 仿真中涉及到两方面:交通场景和网络通讯。
SUMO 可以实现交通场景的仿真,网络通讯方面的仿真软件有OMNeT++, NS3 等.
Veins (vehicles in network simulation) = SUMO + OMNeT++
从名字就可以看出来, Veins 是针对车联网的仿真软件。
Plexe (platooning extension for veins) = Plexe-SUMO + Plexe-Veins
Plexe 是在 Veins 中加入了 platooning 的元素。
在做 V2X 仿真时,如果对网络通讯方面的真实度要求不高,不需要模拟实际联网中的延迟、丢包等情况,可以只用 Plexe-SUMO,这样程序编写要简单很多。
Plexe-SUMO 是在 SUMO 基础上加入了 platooning 相关的元素,更方便 platooning 场景的搭建。另外,Plexe-SUMO 提供了 Python API,可以在 python 中以 module 的方式调用,程序书写更简单。
本文首先介绍了 Plexe-SUMO python API 的安装方法,然后分析 Plexe-SUMO 官方给的 demo 程序,并在此基础上尝试构造我们自己的仿真场景。官方 demo 包括
- brakedemo.py: platoon 协同刹车
- joindemo.py: 从中间加入 platoon
- enginedemo.py: 三车( alfa-147, audi-r8, bugatti-veyron ) 启动性能对比
- platoon-lane-change-test.py: platoon 协同换道超车
- overtake-and-keep-right-test.py: 单辆车超车
本文重点分析 brakedemo.py 和 joindemo.py 两个程序。
安装 Plexe-SUMO python API
cd
git clone --recursive https://github.com/eclipse/sumo
sudo apt-get install cmake python libxerces-c-dev libfox-1.6-dev libgl1-mesa-dev libglu1-mesa-dev libgdal-dev libproj-dev
cd ~/sumo/build/cmake_modules
cmake ../..
make -j8
echo "export SUMO_HOME=~/sumo" >> ~/.bashrc
echo "export PATH=$SUMO_HOME/bin:$SUMO_HOME/tools:$PATH" >> ~/.bashrc
cd
git clone https://github.com/michele-segata/plexe-pyapi.git
cd plexe-pyapi
pip install --user .
brakedemo.py
在 platoon 场景中,车辆的速度控制一般采用 CACC (cooperative adaptive cruise control),这里的 cooperative 就意味着车辆之间可以协作,共享信息。在 brakedemo.py 程序中,platoon 实现协同刹车,即使跟车距离比较短,也不会发生碰撞。
程序及注释如下:
import os
import sys
import random
# 调用 utils module,里面包含了 platooning 的中层实现函数
from utils import add_platooning_vehicle, start_sumo, running, communicate
# 确保路径设置正确,python 能够搜索到 traci module
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)
else:
sys.exit("please declare environment variable 'SUMO_HOME'")
import traci
# 调用 plexe module, 包含了 platooning 的底层实现函数
from plexe import Plexe, ACC, CACC, RPM, GEAR, RADAR_DISTANCE
# vehicle length
LENGTH = 4
# inter-vehicle distance
DISTANCE = 5
# cruising speed
# 120 km/h = 33.3 m/s
SPEED = 120/3.6
# distance between multiple platoons
# 不同 Platoon 之间的距离,在该 demo 中只有一个 platoon
PLATOON_DISTANCE = SPEED * 1.5 + 2
# vehicle who starts to brake
# 最初开始刹车的是头车,编号为 "v.0.0",即 0 号 platoon 中的 0 号车辆
# 由于只有一个 platoon,所以所有车辆编号都是 "v.0.* "的形式
BRAKING_VEHICLE = "v.0.0"
# 构造 platoon,返回 platoon 中车辆之间的通信连接关系
def add_vehicles(plexe, n, n_platoons, real_engine=False):
"""
Adds a set of platoons of n vehicles each to the simulation
:param plexe: API instance
:param n: number of vehicles of the platoon
:param n_platoons: number of platoons
:param real_engine: set to true to use the realistic engine model,
false to use a first order lag model
:return: returns the topology of the platoon, i.e., a dictionary which
indicates, for each vehicle, who is its leader and who is its front
vehicle. The topology can the be used by the data exchange logic to
automatically fetch data from leading and front vehicle to feed the CACC
"""
# add a platoon of n vehicles
# topology 中以 dictionary 的形式存储车辆之间的通信拓扑结构,包括每辆车的前车和头车编号
topology = {}
p_length = n * LENGTH + (n - 1) * DISTANCE
for p in range(n_platoons):
for i in range(n):
vid = "v.%d.%d" % (p, i)
# 调用 utils module 中的函数,将车辆编排成 platoon 的形式
add_platooning_vehicle(plexe, vid, (n-p+1) * (p_length + PLATOON_DISTANCE) + (n-i+1) *
(DISTANCE+LENGTH), 0, SPEED, DISTANCE, real_engine)
# 将车辆保持在 lane 0 ,并忽略安全距离限制
plexe.set_fixed_lane(vid, 0, False)
# 设置车辆速度模式。车辆的速度有 5 个影响因素
# https://sumo.dlr.de/wiki/TraCI/Change_Vehicle_State#speed_mode_.280xb3.29
# 1\. safe speed
# 2\. max accel
# 3\. max speed
# 4\. right of way at intersections
# 5\. brake hard to avoid passing red light
# 如果全都不考虑,则设置为 [0, 0, 0, 0, 0] = 0, 此时,车速完全由 traci 控制
# 如果全都考虑,则设置为 [1, 1, 1, 1, 1] = 31
# 如果只考虑 safe speed,则设置为 [0, 0, 0, 0, 1] = 1
traci.vehicle.setSpeedMode(vid, 0)
# 在 platoon 中头车采用 adaptive cruise control 的控制方式
# 后边的跟车采用 cooperative adative cruise control 的控制方式
plexe.use_controller_acceleration(vid, False)
if i == 0:
plexe.set_active_controller(vid, ACC)
else:
plexe.set_active_controller(vid, CACC)
if i > 0:
topology[vid] = {"front": "v.%d.%d" % (p, i - 1), "leader": "v.%d.0" % p}
else:
topology[vid] = {}
return topology
def main(demo_mode, real_engine, setter=None):
# used to randomly color the vehicles
# 具体着色是在 utils module 的 add_platooning_vehicle 函数中实现的
random.seed(1)
# 运行 SUMO 的配置文件,后边的参数 False / True 表示 SUMO server 是否已经在运行了。
# 若为 False,则打开 SUMO 并加载配置文件
# 若为 True,则重新加载配置文件
# freeway.sumo.cfg 中仿真步长为 0.01s
start_sumo("cfg/freeway.sumo.cfg", False)
# 以下设置可以使得每次 traci.simulationStep() 之后都调用一次 plexe
plexe = Plexe()
traci.addStepListener(plexe)
step = 0
topology = dict()
min_dist = 1e6
# 主循环
while running(demo_mode, step, 1500):
# when reaching 15 seconds, reset the simulation when in demo_mode
if demo_mode and step == 1500:
print("Min dist: %f" % min_dist)
start_sumo("cfg/freeway.sumo.cfg", True)
step = 0
random.seed(1)
traci.simulationStep()
# 仿真初始化阶段,构造含有 8 辆车的 platoon
# 设置 GUI 中画面在整个仿真过程中始终聚焦在 v.0.0, 即头车
# 镜头缩放参数 20000, 这个可以根据具体场景设置,使得镜头既不会拉的太近,也不会拉的太远。
if step == 0:
# create vehicles and track the braking vehicle
topology = add_vehicles(plexe, 8, 1, real_engine)
traci.gui.trackVehicle("View #0", BRAKING_VEHICLE)
traci.gui.setZoom("View #0", 20000)
# 每隔 10 步车辆之间通信一次,获得其他车辆的位置、速度、加速度等信息
if step % 10 == 1:
# simulate vehicle communication every 100 ms
communicate(plexe, topology)
# 是否使用 plexe 中改进的更加逼真的引擎模型
if real_engine and setter is not None:
# if we are running with the dashboard, update its values
tracked_id = traci.gui.getTrackedVehicle("View #0")
if tracked_id != "":
ed = plexe.get_engine_data(tracked_id)
vd = plexe.get_vehicle_data(tracked_id)
setter(ed[RPM], ed[GEAR], vd.speed, vd.acceleration)
# 在第 500 步时头车刹车,由于是 车联网 状态,所以其他车辆也会及时得到头车信息,几乎同步刹车,即使跟车距离很小,也不会追尾。这就是 车辆网 的优势。
if step == 500:
plexe.set_fixed_acceleration(BRAKING_VEHICLE, True, -6)
# 记录在整个仿真过程中车辆间隔的最小距离,有需要的话可以随后进行分析
if step > 1:
radar = plexe.get_radar_data("v.0.1")
if radar[RADAR_DISTANCE] < min_dist:
min_dist = radar[RADAR_DISTANCE]
step += 1
traci.close()
if __name__ == "__main__":
main(True, True)
joindemo.py
该程序实现了单个车辆从中部加入 platoon 的过程
程序及注释如下:
import os
import sys
import random
from utils import add_platooning_vehicle, communicate, get_distance, start_sumo, running
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)
else:
sys.exit("please declare environment variable 'SUMO_HOME'")
import traci
from plexe import Plexe, ACC, CACC, FAKED_CACC, RPM, GEAR, ACCELERATION, SPEED
# vehicle length
LENGTH = 4
# inter-vehicle distance
DISTANCE = 5
# inter-vehicle distance when leaving space for joining
JOIN_DISTANCE = DISTANCE * 2
# cruising speed
SPEED = 120 / 3.6
# maneuver states:
GOING_TO_POSITION = 0
OPENING_GAP = 1
COMPLETED = 2
# maneuver actors
# 这里只考虑 1 个 platoon,其中车辆编号依次为 v.0, v.1, v.2, ...
LEADER = "v.0"
JOIN_POSITION = 3
FRONT_JOIN = "v.%d" % (JOIN_POSITION - 1)
BEHIND_JOIN = "v.%d" % JOIN_POSITION
N_VEHICLES = 8
JOINER = "v.%d" % N_VEHICLES
# 该程序与 brakedemo.py 中类似,向场景中加入车辆,这里包含 1 个 platoon 和 1 个单独的车辆
# 返回 platoon 的 topology,这个 topology 中不包含后加入的单个车辆
def add_vehicles(plexe, n, real_engine=False):
"""
Adds a platoon of n vehicles to the simulation, plus an additional one
farther away that wants to join the platoon
:param plexe: API instance
:param n: number of vehicles of the platoon
:param real_engine: set to true to use the realistic engine model,
false to use a first order lag model
:return: returns the topology of the platoon, i.e., a dictionary which
indicates, for each vehicle, who is its leader and who is its front
vehicle. The topology can the be used by the data exchange logic to
automatically fetch data from leading and front vehicle to feed the CACC
"""
# add a platoon of n vehicles
topology = {}
for i in range(n):
vid = "v.%d" % i
add_platooning_vehicle(plexe, vid, (n - i + 1) * (DISTANCE + LENGTH) +
50, 0, SPEED, DISTANCE, real_engine)
plexe.set_fixed_lane(vid, 0, safe=False)
traci.vehicle.setSpeedMode(vid, 0)
if i == 0:
plexe.set_active_controller(vid, ACC)
else:
plexe.set_active_controller(vid, CACC)
if i > 0:
topology[vid] = {"front": "v.%d" % (i - 1), "leader": LEADER}
# add a vehicle that wants to join the platoon
vid = "v.%d" % n
add_platooning_vehicle(plexe, vid, 10, 1, SPEED, DISTANCE, real_engine)
plexe.set_fixed_lane(vid, 1, safe=False)
traci.vehicle.setSpeedMode(vid, 0)
plexe.set_active_controller(vid, ACC)
plexe.set_path_cacc_parameters(vid, distance=JOIN_DISTANCE)
return topology
# 该函数的作用是通过改变待加入车辆的通讯拓扑,令其到达指定位置,准备变道加入 platoon
def get_in_position(plexe, jid, fid, topology):
"""
Makes the joining vehicle get close to the join position. This is done by
changing the topology and setting the leader and the front vehicle for
the joiner. In addition, we increase the cruising speed and we switch to
the "fake" CACC, which uses a given GPS distance instead of the radar
distance to compute the control action
:param plexe: API instance
:param jid: id of the joiner
:param fid: id of the vehicle that will become the predecessor of the joiner
:param topology: the current platoon topology
:return: the modified topology
"""
topology[jid] = {"leader": LEADER, "front": fid}
plexe.set_cc_desired_speed(jid, SPEED + 15)
plexe.set_active_controller(jid, FAKED_CACC)
return topology
# 该函数的作用是当待加入车辆到达指定位置时,后车为其留出足够的距离
# 在此过程中,platoon 分裂,后车成为新的 leader,带领它的 follower 减速,为待加入车辆留出足够的空间
def open_gap(plexe, vid, jid, topology, n):
"""
Makes the vehicle that will be behind the joiner open a gap to let the
joiner in. This is done by creating a temporary platoon, i.e., setting
the leader of all vehicles behind to the one that opens the gap and then
setting the front vehicle of the latter to be the joiner. To properly
open the gap, the vehicle leaving space switches to the "fake" CACC,
to consider the GPS distance to the joiner
:param plexe: API instance
:param vid: vehicle that should open the gap
:param jid: id of the joiner
:param topology: the current platoon topology
:param n: total number of vehicles currently in the platoon
:return: the modified topology
"""
index = int(vid.split(".")[1])
for i in range(index + 1, n):
# temporarily change the leader
topology["v.%d" % i]["leader"] = vid
# the front vehicle if the vehicle opening the gap is the joiner
topology[vid]["front"] = jid
plexe.set_active_controller(vid, FAKED_CACC)
plexe.set_path_cacc_parameters(vid, distance=JOIN_DISTANCE)
return topology
# 该函数的作用是,完成 join 操作后,修改通讯网络拓扑,回归最初的 CACC 情形,都以头车作为 leader
def reset_leader(vid, topology, n):
"""
After the maneuver is completed, the vehicles behind the one that opened
the gap, reset the leader to the initial one
:param vid: id of the vehicle that let the joiner in
:param topology: the current platoon topology
:param n: total number of vehicles in the platoon (before the joiner)
:return: the modified topology
"""
index = int(vid.split(".")[1])
for i in range(index + 1, n):
# restore the real leader
topology["v.%d" % i]["leader"] = LEADER
return topology
def main(demo_mode, real_engine, setter=None):
# used to randomly color the vehicles
random.seed(1)
start_sumo("cfg/freeway.sumo.cfg", False)
plexe = Plexe()
traci.addStepListener(plexe)
step = 0
state = GOING_TO_POSITION
while running(demo_mode, step, 6000):
# when reaching 60 seconds, reset the simulation when in demo_mode
if demo_mode and step == 6000:
start_sumo("cfg/freeway.sumo.cfg", True)
step = 0
state = GOING_TO_POSITION
random.seed(1)
traci.simulationStep()
if step == 0:
# create vehicles and track the joiner
topology = add_vehicles(plexe, N_VEHICLES, real_engine)
traci.gui.trackVehicle("View #0", JOINER)
traci.gui.setZoom("View #0", 20000)
if step % 10 == 1:
# simulate vehicle communication every 100 ms
communicate(plexe, topology)
# 程序执行 100 步 (0.01s * 100 = 1s) 后,令单个车辆驶近目标位置
if step == 100:
# at 1 second, let the joiner get closer to the platoon
topology = get_in_position(plexe, JOINER, FRONT_JOIN, topology)
# 当车辆达到指定位置,令后车留出足够的空间,以便汇入。
if state == GOING_TO_POSITION and step > 0:
# when the distance of the joiner is small enough, let the others
# open a gap to let the joiner enter the platoon
if get_distance(plexe, JOINER, FRONT_JOIN) < JOIN_DISTANCE + 1:
state = OPENING_GAP
topology = open_gap(plexe, BEHIND_JOIN, JOINER, topology, N_VEHICLES)
# 当空间足够大时,完成汇入
if state == OPENING_GAP:
# when the gap is large enough, complete the maneuver
if get_distance(plexe, BEHIND_JOIN, FRONT_JOIN) > 2 * JOIN_DISTANCE + 2:
state = COMPLETED
plexe.set_fixed_lane(JOINER, 0, safe=False)
plexe.set_active_controller(JOINER, CACC)
plexe.set_path_cacc_parameters(JOINER, distance=DISTANCE)
plexe.set_active_controller(BEHIND_JOIN, CACC)
plexe.set_path_cacc_parameters(BEHIND_JOIN, distance=DISTANCE)
topology = reset_leader(BEHIND_JOIN, topology, N_VEHICLES)
if real_engine and setter is not None:
# if we are running with the dashboard, update its values
tracked_id = traci.gui.getTrackedVehicle("View #0")
if tracked_id != "":
ed = plexe.get_engine_data(tracked_id)
vd = plexe.get_vehicle_data(tracked_id)
setter(ed[RPM], ed[GEAR], vd.speed, vd.acceleration)
step += 1
traci.close()
if __name__ == "__main__":
main(True, False)
构造自己的仿真场景
以上 demo 涵盖了很多基本场景模块,如构造编队、队列分割、添加新 leader、换道、刹车等。基于这些模块,我们可以构造不同的仿真场景,测试自己的车联网和车路协同策略。
这里我们考虑一个 platoon 通过交通路口的场景。交通灯可以向一定范围内的车辆发送信号,信号中包含当前交通灯状态以及剩余时间等信息,platoon leader 基于这些信息作出决策,platoon 中哪些车辆可以通过,哪些车辆需要提前刹车避免闯红灯。
为了实现上述 V2X 场景,可以在 brakedemo.py 基础上做简单修改。需要注意的是,这里的 sumo 网络和配置文件不再是之前的 freeway,而是需要自己设计,加入交通灯等元素。
在车辆颜色方面,为了区分 leader 和 follower,我对 utils 中的车辆颜色做了一些修改,不再使用 random,而是将 leader 设定为红色, follower 设定为黄色。
platoon leader 在进入交通灯 100m 范围内就可以获得交通灯信息,知道当前状态和剩余时间,然后进行后续的 platoon split 操作,分裂成两个 platoon。前部的 platoon 会正常行驶通过路口,后部的 platoon 在新的 leader 带领下刹车,停在路口处,等待下个绿灯通行。
程序及注释如下:
import os
import sys
from math import floor
import random
from utils import add_platooning_vehicle, start_sumo, running, communicate
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)
else:
sys.exit("please declare environment variable 'SUMO_HOME'")
import traci
from plexe import Plexe, ACC, CACC, RPM, GEAR, RADAR_DISTANCE
# vehicle length
LENGTH = 4
# inter-vehicle distance
DISTANCE = 5
# platoon size
PLATOON_SIZE = 8
# number of platoon
PLATOON_NUM = 1
# cruising speed
SPEED = 33 # m/s
# distance between multiple platoons
PLATOON_DISTANCE = SPEED * 1.5 + 2
# vehicle who starts to brake
BRAKING_VEHICLE = "v.0.0"
# the original leader ID
ORI_LEADER_ID ="v.0.0"
# traffic light ID
TL_ID = "tl_0"
# range of traffic light broadcast
RANGE = 100
# traffic light position
TL_POS = 558
def add_vehicles(plexe, n, n_platoons, real_engine=False):
"""
Adds a set of platoons of n vehicles each to the simulation
:param plexe: API instance
:param n: number of vehicles of the platoon
:param n_platoons: number of platoons
:param real_engine: set to true to use the realistic engine model,
false to use a first order lag model
:return: returns the topology of the platoon, i.e., a dictionary which
indicates, for each vehicle, who is its leader and who is its front
vehicle. The topology can the be used by the data exchange logic to
automatically fetch data from leading and front vehicle to feed the CACC
"""
# add a platoon of n vehicles
topology = {}
p_length = n * LENGTH + (n - 1) * DISTANCE
for p in range(n_platoons):
for i in range(n):
vid = "v.%d.%d" % (p, i)
add_platooning_vehicle(plexe, vid, 200 + (n-i+1) * (DISTANCE+LENGTH), 0, SPEED, DISTANCE, real_engine)
plexe.set_fixed_lane(vid, 0, False)
traci.vehicle.setSpeedMode(vid, 0)
plexe.use_controller_acceleration(vid, False)
if i == 0:
plexe.set_active_controller(vid, ACC)
else:
plexe.set_active_controller(vid, CACC)
if i > 0:
topology[vid] = {"front": "v.%d.%d" % (p, i - 1), "leader": "v.%d.0" % p}
else:
topology[vid] = {}
return topology
def main(demo_mode, real_engine, setter=None):
# used to randomly color the vehicles
random.seed(1)
start_sumo("cfg/intersection/intersection.sumo.cfg", False)
plexe = Plexe()
traci.addStepListener(plexe)
step = 0
topology = dict()
min_dist = 1e6
split = False
while running(demo_mode, step, 3000):
traci.simulationStep()
if step == 0:
# create vehicles and track the braking vehicle
topology = add_vehicles(plexe, PLATOON_SIZE, PLATOON_NUM, real_engine)
tracked_veh = "v.0.%d" %(PLATOON_SIZE-1)
traci.gui.trackVehicle("View #0", tracked_veh)
traci.gui.setZoom("View #0", 2000)
# when the leader is 100m away from the traffic light, it will receive the current phase of the traffic light
# Accordingly, it computes which vehicles could pass safely.
leader_data = plexe.get_vehicle_data(ORI_LEADER_ID)
# the structure of vehicle data is defined in vehicle_data.py file in plexe folder
# self.acceleration, self.speed, self.pos_x, self.pos_y
if leader_data.pos_x >= TL_POS - RANGE and not split:
current_phase = traci.trafficlight.getPhase(TL_ID)
if current_phase == 0:
absolute_time = traci.trafficlight.getNextSwitch(TL_ID)
time_left = absolute_time - traci.simulation.getTime()
new_leader = floor((leader_data.speed * time_left - RANGE)/(LENGTH + DISTANCE))
new_leader_id = "v.0.%d" % new_leader
# change topology: add new leader and decelerate.
for i in range(new_leader+1,PLATOON_SIZE):
topology["v.0.%d" %i]["leader"] = new_leader_id
topology[new_leader_id] = {}
new_leader_data = plexe.get_vehicle_data(new_leader_id)
decel = new_leader_data.speed**2 / (2* (RANGE + new_leader * (LENGTH + DISTANCE)))
plexe.set_fixed_acceleration(new_leader_id, True, -1 * decel)
split = True
# set color for leader
for i in range(PLATOON_SIZE):
vid = "v.0.%d" % i
if topology[vid] == {}:
traci.vehicle.setColor(vid, (250,0,0, 255))
if step % 10 == 1:
# simulate vehicle communication every 100 ms
communicate(plexe, topology)
if real_engine and setter is not None:
# if we are running with the dashboard, update its values
tracked_id = traci.gui.getTrackedVehicle("View #0")
if tracked_id != "":
ed = plexe.get_engine_data(tracked_id)
vd = plexe.get_vehicle_data(tracked_id)
setter(ed[RPM], ed[GEAR], vd.speed, vd.acceleration)
if split == True:
new_leader_data = plexe.get_vehicle_data(new_leader_id)
current_phase = traci.trafficlight.getPhase(TL_ID)
if TL_POS - new_leader_data.pos_x < 10 and current_phase == 0:
plexe.set_fixed_acceleration(new_leader_id, True, 3)
if step > 1:
radar = plexe.get_radar_data("v.0.1")
if radar[RADAR_DISTANCE] < min_dist:
min_dist = radar[RADAR_DISTANCE]
step += 1
if step > 3000:
break
traci.close()
if __name__ == "__main__":
main(True, True)