#!/usr/bin/env python
"""Package to compute collision free trajectories for each agent.
The algorithm [CIT3]_ used is based on Distributed Model Predictive Control.
Usage
-----
1. Set start position and goals of each agent to compute trajectories\
(``/set_planner_positions`` srv)
2. Start trajectory solver (``/plan_trajectories`` srv)
3. Wait for trajectory solver to be done
4. Start trajectory publishing (``/pub_trajectories`` srv)
5. Wait for trajectory publishing to be done
ROS Features
------------
Subscribed Topics
^^^^^^^^^^^^^^^^^
None
Published Topics
^^^^^^^^^^^^^^^^
/cfx/trajectory_goal(crazyflie_driver/Position)
Position of the CF on the trajectory, at each time step
Services
^^^^^^^^
/set_planner_positions(crazyflie_charles/SetPositions)
Set position (start or goal) of each crazyflie.
See :py:meth:`TrajectoryPlanner.set_positions` for more information.
/plan_trajectories(`std_srvs/Empty`_)
Start solver to find a trajectory for each crazyflie.
/pub_trajectories(`std_srvs/Empty`_)
Call to start publishing all trajectories
Services Called
^^^^^^^^^^^^^^^
/traj_found(`std_srvs/SetBool`_)
Service called once a trajectory is found.
`data` is True if valid trajectories are found.
`data` is false otherwise (collision, no solution in constraints).
/traj_done(`std_srvs/Empty`_)
Service called once the last step of the trajectory is reached.
Parameters
^^^^^^^^^^
~cf_list(str, default: ['cf1'])
`TrajectoryPlanner` Class
-------------------------
.. _std_srvs/Empty: http://docs.ros.org/api/std_srvs/html/srv/Empty.html
.. _std_srvs/SetBool: http://docs.ros.org/api/std_srvs/html/srv/SetBool.html
"""
import ast
import rospy
from std_srvs.srv import Empty, SetBool
from crazyflie_driver.msg import Position
from trajectory_planner.srv import SetPositions
from trajectory_solver import TrajectorySolver
from agent import Agent
[docs]class TrajectoryPlanner(object):
"""To plan trajectories of CFs
"""
def __init__(self, cf_list, solver_args):
"""
Args:
cf_list (list of str): List of all CF in the swarm
"""
self.agents_dict = {}
"""dict of str: dict of str: Keys are the id of the CF.
Items are a dict containing: ``Agent``, trajectory_publisher, start_yaw"""
agents_args = {'r_min': solver_args['r_min'],
'col_radius_ratio': solver_args['col_radius_ratio'],
'goal_dist_thres': solver_args['goal_dist_thres'],
'goal_speed_thres': solver_args['goal_speed_thres']}
for each_cf in cf_list:
self.agents_dict[each_cf] = {}
self.agents_dict[each_cf]['agent'] = Agent(agents_args)
self.agents_dict[each_cf]['trajectory_pub'] =\
rospy.Publisher('/' + each_cf + '/trajectory_goal', Position, queue_size=1)
self.agents_dict[each_cf]['start_yaw'] = 0
agent_list = [agent_dict['agent'] for (_, agent_dict) in self.agents_dict.items()]
self.solver = TrajectorySolver(agent_list, solver_args, verbose=True)
#: bool: True if a trajectories are to be planned
self.to_plan_trajectories = False
#: bool: True if a trajectory has been found for current position
self.trajectory_found = False
#: bool: True if a trajectories are to be published
self.to_publish_traj = False
# Start services
rospy.Service('/set_planner_positions', SetPositions, self.set_positions)
rospy.Service('/plan_trajectories', Empty, self.plan_trajectories)
rospy.Service('/pub_trajectories', Empty, self.start_publishing_srv)
rospy.loginfo("Planner: waiting for swarm manager services")
self.send_result = rospy.ServiceProxy("/traj_found", SetBool)
self.traj_done = rospy.ServiceProxy("/traj_done", Empty)
rospy.loginfo("Planner: swarm manager services found")
self._rate = rospy.Rate(1/rospy.get_param("trajectory_solver")["interp_step"])
# Services
[docs] def set_positions(self, srv_req):
"""Set start position or goal of each agent
Args:
srv_req (SetPositions): List /w position type(start or goal) and positions of each agent
"""
pos_type = srv_req.position_type
cf_positions = ast.literal_eval(srv_req.positions)
self.trajectory_found = False
if pos_type == "start_position":
# print "Setting start positions to:"
# print cf_positions
rospy.loginfo("Planner: Setting start positions")
for cf_id, start_pos in cf_positions.items():
start_coord = start_pos[0:3]
start_yaw = start_pos[3]
self.agents_dict[cf_id]['agent'].set_starting_position(start_coord)
self.agents_dict[cf_id]['start_yaw'] = start_yaw
elif pos_type == "goal":
# print "Setting goals to:"
# print cf_positions
rospy.loginfo("Planner: Setting goals")
for cf_id, goal in cf_positions.items():
goal_coord = goal[0:3]
# goal_yaw = goal[3]
self.agents_dict[cf_id]['agent'].set_goal(goal_coord)
else:
rospy.logerr("Invalid position type")
return {"success": True}
[docs] def plan_trajectories(self, _):
"""Start trajectory planning of each agent
"""
self.solver.update_agents_info()
if not self.to_plan_trajectories and not self.trajectory_found:
self.to_plan_trajectories = True
elif self.to_plan_trajectories:
rospy.logwarn("Solver already running")
elif self.trajectory_found:
rospy.logwarn("Trajectory already found for current positions")
return {}
[docs] def start_publishing_srv(self, _):
"""Start publishing CF trajectories
"""
self.to_publish_traj = True
return {}
# Methods
[docs] def publish_trajectories(self):
"""Publish computed trajectory
"""
# Set swarm manager to follow trajectory
rospy.loginfo("Planner: Publishing trajectories...")
time_step = 0
max_time_step = self.agents_dict[self.agents_dict.keys()[0]]['agent'].final_traj.shape[1]
while time_step < max_time_step:
if rospy.is_shutdown():
break
for _, agent_dict in self.agents_dict.items():
agent_pos = agent_dict["agent"].final_traj[:, time_step]
goal_msg = Position()
goal_msg.x = agent_pos[0]
goal_msg.y = agent_pos[1]
goal_msg.z = agent_pos[2]
goal_msg.yaw = agent_dict["start_yaw"]
agent_dict['trajectory_pub'].publish(goal_msg)
time_step += 1
self._rate.sleep()
rospy.loginfo("Planner: Trajectory completed")
self.traj_done()
[docs] def run_planner(self):
"""Execute the correct method based on booleen states
"""
if self.to_plan_trajectories and not self.trajectory_found:
self.to_plan_trajectories = False
rospy.loginfo("Planner: Planning trajectories...")
planner_successfull, _ = self.solver.solve_trajectories()
if planner_successfull:
self.trajectory_found = True
rospy.loginfo("Planner: Trajectory found")
# self.solver.plot_trajectories()
else:
self.trajectory_found = False
rospy.logerr("Planner: No trajectory found")
# self.solver.plot_trajectories()
self.send_result(self.trajectory_found)
if self.to_publish_traj:
self.to_publish_traj = False
self.publish_trajectories()
if __name__ == '__main__':
# Launch node
rospy.init_node('trajectory_planner', anonymous=False)
# Get params
while True: # Make sure cf_list has been set by `swarm_controller`
try:
CF_LIST = rospy.get_param("cf_list")
break
except KeyError:
pass
SOLVER_ARGS = rospy.get_param("trajectory_solver")
# Initialize planner
PLANNER = TrajectoryPlanner(CF_LIST, SOLVER_ARGS)
while not rospy.is_shutdown():
PLANNER.run_planner()