#!/usr/bin/env python
"""
Module to move the swarm and use ``TrajectoryPlanner`` and ``FormationManager`` packages.
``SwarmController`` class
-------------------------
"""
import ast
import Queue
import rospy
import numpy as np
from numpy import dot
from numpy.linalg import norm, inv
from geometry_msgs.msg import Twist, PoseStamped
from std_msgs.msg import String
from crazyflie_driver.msg import Position
from std_srvs.srv import Empty, SetBool
from swarm_manager.srv import SetParam, SetGoals, GetPositions, SetMode
from formation_manager.srv import SetFormation, GetFormationList
from trajectory_planner.srv import SetPositions
from swarm_api.launch_file_api import launch_swarm
from state_machine import StateMachine
from crazyflie import yaw_from_quat
[docs]class SwarmController(object):
"""Main class
"""
def __init__(self):
rospy.loginfo("Swarm: Initialization...")
self._crazyflies = {} #: Dict of :py:class:`CrazyfliePy`
self._cf_list = rospy.get_param("cf_list") #: List of str: List of all cf names in the swarm
self._rate = rospy.Rate(10) #: Rate: Rate to publish messages
# Initialize each Crazyflie
for each_cf in self._cf_list:
self._crazyflies[each_cf] = CrazyfliePy(each_cf)
self._init_params()
self._stabilize_position()
# Init services
self.formation_services = {}
self.formation_services = {"set_formation": None,
"get_list": None,
"inc_scale": None,
"dec_scale": None}
self.traj_services = {}
self.traj_services = {"set_planner_positions": None,
"plan_trajectories": None,
"pub_trajectories": None,}
self.start_rec_service = None
self._init_services()
# Subscriber
self._formation_goal_vel_pub = rospy.Publisher('/formation_goal_vel',
Twist, queue_size=1)
self._formation_goal_vel = Twist()
self._joy_swarm_vel = Twist() #: Twist: Velocity received from joystick
rospy.Subscriber("/joy_swarm_vel", Twist, self._joy_swarm_vel_handler)
# Initialize formation
self.formation = "line"
self._formation_list = self.formation_services["get_list"]().formations.split(',')
self._extra_cf_list = [] #: list of str: ID of extra CF
self._landed_cf_ids = [] #: list of str: Swarm Id of landed CFs
# Initialize state machine
_state_list = {"landed": self._landed_state,
"follow_traj": self._follow_traj_state,
"in_formation": self._in_formation_state,
"hover": self._hover_state,
"landing": self._landing_state,}
self._state_machine = StateMachine(_state_list)
self._state_machine.set_state("landed")
# Other
self.ctrl_mode = "automatic"
self._traj_found = False
self._traj_successfull = False
self._after_traj_state = "" # State to go once the trajectory is completed
self.start_rec_service()
rospy.loginfo("Swarm: Ready to start")
def _init_services(self):
# Services
rospy.Service('/swarm_emergency', Empty, self._swarm_emergency_srv)
rospy.Service('/stop_swarm', Empty, self._stop_swarm_srv)
rospy.Service('/take_off_swarm', Empty, self._take_off_swarm_srv)
rospy.Service('/land_swarm', Empty, self._land_swarm_srv)
rospy.Service('/set_mode', SetMode, self._set_mode_srv)
rospy.Service('/go_to', SetGoals, self._go_to_srv)
rospy.Service('/get_positions', GetPositions, self._get_positions_srv)
rospy.Service('/set_swarm_formation', SetFormation, self._set_formation_srv)
rospy.Service('/next_swarm_formation', Empty, self._next_swarm_formation_srv)
rospy.Service('/prev_swarm_formation', Empty, self._prev_swarm_formation_srv)
rospy.Service('/inc_swarm_scale', Empty, self._inc_swarm_scale_srv)
rospy.Service('/dec_swarm_scale', Empty, self._dec_swarm_scale_srv)
rospy.Service('/traj_found', SetBool, self._traj_found_srv)
rospy.Service('/traj_done', Empty, self._traj_done_srv) # Trajectory done
# Subscribe to services
# Formation manager
rospy.loginfo("Swarm: waiting for formation services")
rospy.wait_for_service("/set_formation")
rospy.wait_for_service("/get_formations_list")
rospy.wait_for_service("/formation_inc_scale")
rospy.wait_for_service("/formation_dec_scale")
self.formation_services["set_formation"] = rospy.ServiceProxy("/set_formation",
SetFormation)
self.formation_services["get_list"] = rospy.ServiceProxy("/get_formations_list",
GetFormationList)
self.formation_services["inc_scale"] = rospy.ServiceProxy("/formation_inc_scale", Empty)
self.formation_services["dec_scale"] = rospy.ServiceProxy("/formation_dec_scale", Empty)
rospy.loginfo("Swarm: formation services found")
# Trajectory planner
rospy.loginfo("Swarm: waiting for trajectory planner services")
self.traj_services["set_planner_positions"] = rospy.ServiceProxy("/set_planner_positions",
SetPositions)
self.traj_services["plan_trajectories"] = rospy.ServiceProxy("/plan_trajectories", Empty)
self.traj_services["pub_trajectories"] = rospy.ServiceProxy("/pub_trajectories", Empty)
rospy.loginfo("Swarm: trajectory planner services found")
# Flight recorder
rospy.loginfo("Swarm: waiting for recorder services")
rospy.wait_for_service("/start_recorder")
self.start_rec_service = rospy.ServiceProxy("/start_recorder", Empty)
rospy.loginfo("Swarm: recorder services found")
def _init_params(self):
self.update_swarm_param("commander/enHighLevel", 1)
self.update_swarm_param("stabilizer/estimator", 2) # Use EKF
self.update_swarm_param("stabilizer/controller", 1) # 1: High lvl, 2: Mellinger
self.update_swarm_param("kalman/resetEstimation", 1)
def _stabilize_position(self):
"""To wait until position of all CFs is stable.
A CF position is considered stable when the variance of it's position in x, y and z is less
than a threshold.
The variance is calculated over the past 10 sec (10 data)
"""
rospy.loginfo("Swarm: Waiting for position to stabilize...")
# rospy.sleep(1.0)
threshold = 0.01
n_data = 50
positions = {}
var_list = {}
position_stable = False
queue_full = False
# Initialize position history
for cf_id in self._crazyflies:
queue_dict = {}
queue_dict['x'] = Queue.Queue(n_data)
queue_dict['y'] = Queue.Queue(n_data)
queue_dict['z'] = Queue.Queue(n_data)
positions[cf_id] = queue_dict
while not position_stable:
for cf_id, cf_info in self._crazyflies.items():
cf_pose = cf_info.pose.pose
if positions[cf_id]['x'].full():
queue_full = True
positions[cf_id]['x'].get()
positions[cf_id]['y'].get()
positions[cf_id]['z'].get()
positions[cf_id]['x'].put(cf_pose.position.x)
positions[cf_id]['y'].put(cf_pose.position.y)
positions[cf_id]['z'].put(cf_pose.position.z)
if queue_full:
all_vars = [np.var(positions[cf_id]['x'].queue),
np.var(positions[cf_id]['y'].queue),
np.var(positions[cf_id]['z'].queue)]
var_list[cf_id] = max(all_vars)
if queue_full:
max_var = max([v for _, v in var_list.items()])
if max_var < threshold:
position_stable = True
self._rate.sleep()
rospy.loginfo("Swarm: All CF position stable")
# Methods
[docs] def control_swarm(self):
"""Main method, publish on topics depending of current state
"""
# Execute function depending on current state
state_function = self._state_machine.run_state()
state_function()
if len(self._cf_list) > 1:
self.check_collisions()
# Publish goal of each CF
self._pub_cf_goals()
self._rate.sleep()
[docs] def update_swarm_param(self, param, value):
"""Update parameter of all swarm
Args:
param (str): Name of parameter
value (int64): Parameter value
"""
rospy.loginfo("Swarm: Set %s param to %s" % (param, str(value)))
self._call_all_cf_service("set_param", kwargs={'param': param, 'value': value})
[docs] def check_collisions(self):
"""To check that the minimal distance between each CF is okay.
If actual distance between CF is too small, calls emergency service.
"""
position_dist = []
goal_dist = []
scaling_matrix_inv = inv(np.diag([1, 1, 2]))
for cf_id, each_cf in self._crazyflies.items():
cf_position = each_cf.pose.pose
cf_position = np.array([cf_position.position.x,
cf_position.position.y,
cf_position.position.z])
cf_goal = each_cf.goals["goal"]
cf_goal = np.array([cf_goal.x,
cf_goal.y,
cf_goal.z])
for other_id, other_cf in self._crazyflies.items():
if other_id != cf_id:
other_pos = other_cf.pose.pose
other_pos = np.array([other_pos.position.x,
other_pos.position.y,
other_pos.position.z])
other_goal = other_cf.goals["goal"]
other_goal = np.array([other_goal.x,
other_goal.y,
other_goal.z])
p_dist = norm(dot(scaling_matrix_inv, cf_position - other_pos))
g_dist = norm(dot(scaling_matrix_inv, cf_goal - other_goal))
position_dist.append(p_dist)
goal_dist.append(g_dist)
min_distances = [min(goal_dist), min(position_dist)]
if min_distances[0] < MIN_GOAL_DIST:
rospy.logwarn("Goals are too close")
if min_distances[1] < MIN_CF_DIST:
rospy.logerr("CF too close, emergency")
self._swarm_emergency_srv(Empty())
[docs] def go_to_goal(self, target_goals=None, land_swarm=False):
"""Controls swarm to move each CF to it's desired goal while avoiding collisions.
If some CFs are in extra (in formation) will land them.
Handles extra CF landing.
Args:
land_swarm (bool, optional): If true, land swarm to initial pos. Defaults to False.
goals (dict, optional): To specify target goals. Used when not in formation mode.
Defaults to None.
"""
rospy.loginfo("Swarm: Going to new goals")
self._traj_found = False
self._landed_cf_ids = []
rospy.sleep(0.1)
# Update goal and send positions to planner
self._update_cf_goal(land_swarm)
self._send_start_positions()
self._send_goals(land_swarm=land_swarm, target_goals=target_goals)
# Start solver
self.traj_services["plan_trajectories"]()
# Take off landed CF that are not in extra
take_off_list = [cf for cf in self._landed_cf_ids if cf not in self._extra_cf_list]
self._call_all_cf_service("take_off", cf_list=take_off_list)
self._wait_for_take_off()
self._wait_for_planner()
# Follow traj
if self._traj_successfull:
self.traj_services["pub_trajectories"]()
self._state_machine.set_state("follow_traj")
# If traj. planner fails
else:
self._swarm_emergency_srv(None)
def _send_formation(self, formation_goal=None):
"""Send desired formation to formation manager.
`formation_manager` package will compute the new `formation_goal` of each CF and return all
CFs in extra.
Args:
formation_goal (list, optional): New formation goal: [x, y, z, yaw]. Defaults to None.
"""
start_positions = {}
for cf_id, cf_vals in self._crazyflies.items():
cf_pose = cf_vals.pose.pose
start_positions[cf_id] = [cf_pose.position.x,
cf_pose.position.y,
cf_pose.position.z]
srv_res = self.formation_services["set_formation"](formation=self.formation,
positions=str(start_positions),
goal=str(formation_goal))
self._extra_cf_list = srv_res.extra_cf.split(',')
def _update_cf_goal(self, land_swarm=False):
"""Update goal and initial position of landed CFs
Also find Id of landed CFs
Notes:
- If land_swarm: Set all goals 0.5m above initial pose
- If landed and not in extra: Set goal 0.5m above initial pose and add to landed list
- If landed and in extra: Set goal to initial pose and add to landed list
Args:
land_swarm (bool): When True, land all CFs in swarm
"""
self._landed_cf_ids = []
for cf_id, cf_vals in self._crazyflies.items():
if land_swarm:
cf_initial_pose = cf_vals.initial_pose
cf_vals.goals['goal'].x = cf_initial_pose.position.x
cf_vals.goals['goal'].y = cf_initial_pose.position.y
cf_vals.goals['goal'].z = cf_initial_pose.position.z + TAKE_OFF_HEIGHT
# If CF is landed and not in extra
elif cf_vals.state in ["stop", "landed", "land"] and cf_id not in self._extra_cf_list:
self._landed_cf_ids.append(cf_id)
cf_pose = cf_vals.pose.pose
if cf_vals.initial_pose is None:
cf_vals.update_initial_pose()
cf_vals.goals["goal"].x = cf_pose.position.x
cf_vals.goals["goal"].y = cf_pose.position.y
cf_vals.goals["goal"].z = GND_HEIGHT + TAKE_OFF_HEIGHT
# If CF is landed and in extra
elif cf_vals.state in ["stop", "landed", "land"] and cf_id in self._extra_cf_list:
self._landed_cf_ids.append(cf_id)
cf_pose = cf_vals.pose.pose
if cf_vals.initial_pose is None:
cf_vals.update_initial_pose()
cf_vals.goals["goal"].x = cf_pose.position.x
cf_vals.goals["goal"].y = cf_pose.position.y
cf_vals.goals["goal"].z = cf_pose.position.z
def _send_start_positions(self):
"""Send start positions to trajectory planner
Notes:
- Doesn't send positon of extra CF if it's landed
"""
# Send each CF starting position to planner
start_positions = {}
for cf_id, cf_vals in self._crazyflies.items():
cf_pose = cf_vals.pose.pose
# If landed, starting position is 0.5m above
if cf_id in self._landed_cf_ids and cf_id not in self._extra_cf_list:
start_positions[cf_id] = [cf_pose.position.x,
cf_pose.position.y,
cf_pose.position.z + TAKE_OFF_HEIGHT,
yaw_from_quat(cf_pose.orientation)]
else:
start_positions[cf_id] = [cf_pose.position.x,
cf_pose.position.y,
cf_pose.position.z,
yaw_from_quat(cf_pose.orientation)]
self.traj_services["set_planner_positions"](position_type="start_position",
positions=str(start_positions))
def _send_goals(self, land_swarm=False, target_goals=None):
"""Send goal of each CF to trajectory planner
Notes:
- If CF is in extra, goal is above initial position
- If CF is in extra and landed, no goal is sent
Args:
land_swarm (bool, optional): If True, land all CF in the swarm. Defaults to False.
target_goals (dict, optional): To specify each_cf goal. Defaults to None.
"""
goals = {}
for cf_id, cf_vals in self._crazyflies.items():
# Land all CFs in air
if land_swarm and cf_id not in self._landed_cf_ids:
cf_initial_pose = cf_vals.initial_pose
goals[cf_id] = [cf_initial_pose.position.x,
cf_initial_pose.position.y,
GND_HEIGHT + TAKE_OFF_HEIGHT,
yaw_from_quat(cf_initial_pose.orientation)]
# Goal of CF in formation
elif cf_id not in self._extra_cf_list: # If CF in formation
target_pose = None
if target_goals is None or cf_id not in target_goals.keys():
target_pose = cf_vals.goals["formation"]
else:
target_pose = target_goals[cf_id]
goals[cf_id] = [target_pose.x,
target_pose.y,
target_pose.z,
target_pose.yaw]
# If CF in extra and not landed, go to initial position
elif cf_id not in self._landed_cf_ids:
cf_initial_pose = cf_vals.initial_pose
goals[cf_id] = [cf_initial_pose.position.x,
cf_initial_pose.position.y,
GND_HEIGHT + TAKE_OFF_HEIGHT,
yaw_from_quat(cf_initial_pose.orientation),]
self.traj_services["set_planner_positions"](position_type="goal", positions=str(goals))
def _wait_for_take_off(self):
"""Wait that all CF of formation are in hover state
"""
all_cf_in_air = False #: bool: True if all CFs are done taking off
while not all_cf_in_air:
all_cf_in_air = True
if rospy.is_shutdown():
break
for cf_id, each_cf in self._crazyflies.items():
if each_cf.state != "hover" and cf_id not in self._extra_cf_list:
all_cf_in_air = False
self._rate.sleep()
def _wait_for_planner(self):
"""Wait until planner finds a trajectory
"""
rospy.sleep(0.1) # Make sure first traj messages are received
while not self._traj_found:
if rospy.is_shutdown():
break
for _, each_cf in self._crazyflies.items():
each_cf.update_goal()
self._rate.sleep()
self._traj_found = False
def _update_landing_cf_goal(self, land_swarm=False):
"""Set goal of CF to land to initial position
Args:
land_swarm (bool, optional): True if all swarm is to be landed. Default: False
"""
for cf_id, cf_vals in self._crazyflies.items():
# If land all swarm or (cf in extra and not landed)
if land_swarm or\
(cf_vals.state not in ["landed", "land", "stop"] and\
cf_id in self._extra_cf_list):
cf_initial_pose = cf_vals.initial_pose
cf_vals.goals["goal"].x = cf_initial_pose.position.x
cf_vals.goals["goal"].y = cf_initial_pose.position.y
cf_vals.goals["goal"].z = GND_HEIGHT
# cf_vals["goal_msg"].yaw = yaw_from_quat(cf_initial_pose.orientation)
# States
def _landed_state(self):
"""All CF are on the ground
"""
pass
def _follow_traj_state(self):
"""All cf follow a specified trajectory
"""
# Set CF goal to trajectory goal
for cf_id, each_cf in self._crazyflies.items():
traj_goal = each_cf.goals["trajectory"]
# Make sure first msg was sent
if traj_goal is not None:
#If CF in extra and landed, don't follow goal
if each_cf.state in ["landed", "land", "stop"] and cf_id in self._extra_cf_list:
each_cf.update_goal()
else:
each_cf.update_goal("trajectory")
def _in_formation_state(self):
"""Swarm is in a specific formation. Lands extra CFs
Formation can be moved /w the joystick
"""
# Publish goal velocity
self._formation_goal_vel = self._joy_swarm_vel
self._pub_formation_goal_vel(self._formation_goal_vel)
# Set CF goal to formation goal
for cf_id, each_cf in self._crazyflies.items():
# If CF part of formation, set goal to formation_goal
if cf_id not in self._extra_cf_list:
each_cf.update_goal("formation")
# IF CF is not landed
elif each_cf.state not in ["landed", "land", "stop"]:
self._update_landing_cf_goal()
self._call_all_cf_service("land", cf_list=self._extra_cf_list)
# If CF is already landed
else:
each_cf.update_goal()
def _hover_state(self):
"""CFs hover in place
"""
# Set CF goal to current pose
for _, each_cf in self._crazyflies.items():
each_cf.update_goal()
def _landing_state(self):
"""Land CF to their starting position
"""
rospy.loginfo("Swarm: land")
self._update_landing_cf_goal(True)
self._call_all_cf_service("land")
self._state_machine.set_state("landed")
# Publisher and subscription
def _joy_swarm_vel_handler(self, joy_swarm_vel):
"""Update swarm goal velocity
Args:
swarm_goal_vel (Twist): Swarm goal velocity
"""
self._joy_swarm_vel = joy_swarm_vel
def _pub_formation_goal_vel(self, formation_goal_vel):
"""Publish swarm_goal speed
Args:
goal_spd (Twist): Speed variation of goal
"""
self._formation_goal_vel_pub.publish(formation_goal_vel)
def _pub_cf_goals(self):
"""Publish goal of each CF
"""
for _, each_cf in self._crazyflies.items():
each_cf.publish_goal()
# Services
def _call_all_cf_service(self, service_name, args=None, kwargs=None, cf_list=None):
"""Call a service for all the CF in the swarm.
If a list /w CF name is specified, will only call the srv for CF in the list.
Args:
service_name (str): Name of the service to call
service_msg (srv_msg, optional): Message to send. Defaults to None.
cf_list (list of str, optional): Only call service of CF in list. Defaults to None.
"""
for cf_id, each_cf in self._crazyflies.items():
if cf_list is None or cf_id in cf_list:
each_cf.call_srv(service_name, args, kwargs)
return {}
def _swarm_emergency_srv(self, _):
"""Call emergency service """
rospy.logerr("Swarm: EMERGENCY")
self._call_all_cf_service("emergency")
return {}
def _stop_swarm_srv(self, _):
"""Call stop service """
rospy.loginfo("Swarm: stop")
self._call_all_cf_service("stop")
return {}
def _take_off_swarm_srv(self, _):
"""Take off all cf in swarm
"""
self._update_cf_goal()
self._call_all_cf_service("take_off", cf_list=self._landed_cf_ids)
self._wait_for_take_off()
self._state_machine.set_state("hover")
if self.ctrl_mode == "formation":
self._after_traj_state = "in_formation"
self.update_formation()
return {}
def _land_swarm_srv(self, _):
"""Land all cf in swarm
"""
# Bring swarm to initial pose
self.go_to_goal(land_swarm=True)
# Land
self._after_traj_state = "landing"
return {}
def _set_mode_srv(self, mode_req):
new_mode = mode_req.new_mode.lower()
avalaible_mode = False
if new_mode in ["formation", "automatic"]:
self.ctrl_mode = new_mode
avalaible_mode = True
return {'success': avalaible_mode}
def _go_to_srv(self, srv_req):
goals = ast.literal_eval(srv_req.goals)
target_goals = {}
formation_goal = None
# Make sure swarm is in hover or formation state
if self._state_machine.in_state("in_formation") or self._state_machine.in_state("hover"):
for goal_id, goal_val in goals.items():
if goal_id == "formation":
# print "Formation goal: {}".format(goal_val)
formation_goal = goal_val
elif goal_id in self._cf_list:
# print "{} goal: {}".format(goal_id, goal_val)
new_goal = Position()
new_goal.x = goal_val[0]
new_goal.y = goal_val[1]
new_goal.z = goal_val[2]
new_goal.yaw = goal_val[3]
target_goals[goal_id] = new_goal
else:
rospy.logerr("%s: Invalid goal name" % goal_id)
if self.ctrl_mode == "automatic":
self._after_traj_state = "hover"
self.go_to_goal(target_goals=target_goals)
elif self.ctrl_mode == "formation":
self._after_traj_state = "in_formation"
self.update_formation(formation_goal=formation_goal)
else:
rospy.logerr("Swarm not ready to move")
return {}
def _get_positions_srv(self, srv_req):
cf_list = ast.literal_eval(srv_req.cf_list)
# If not list is passed, send position of all CFs
if not cf_list:
cf_list = self._cf_list
positions = {}
for each_cf in cf_list:
if each_cf in self._cf_list:
pose = self._crazyflies[each_cf].pose.pose
positions[each_cf] = [pose.position.x,
pose.position.y,
pose.position.z,
yaw_from_quat(pose.orientation)]
else:
rospy.logerr("%s: Invalid crazyflie name" % each_cf)
return {"positions":str(positions)}
def _follow_traj_srv(self, _):
"""Follow a trajectory
"""
self._state_machine.set_state("follow_traj")
return {}
def _set_formation_srv(self, srv_req):
new_formation = srv_req.formation
if self._state_machine.in_state("in_formation"):
if new_formation not in self._formation_list:
rospy.logerr("%s: Invalid formation" % new_formation)
else:
self.formation = new_formation
self.update_formation()
else:
self.formation = new_formation
return {}
def _next_swarm_formation_srv(self, _):
"""Change swarm formation to next the next one
"""
if self._state_machine.in_state("in_formation"):
idx = self._formation_list.index(self.formation)
next_idx = idx + 1
if idx == (len(self._formation_list) - 1):
next_idx = 0
self.formation = self._formation_list[next_idx]
self.update_formation()
return {}
def _prev_swarm_formation_srv(self, _):
"""Change swarm formation to the previous one
"""
if self._state_machine.in_state("in_formation"):
idx = self._formation_list.index(self.formation)
prev_idx = idx - 1
if prev_idx < 0:
prev_idx = len(self._formation_list) - 1
self.formation = self._formation_list[prev_idx]
self.update_formation()
return {}
def _inc_swarm_scale_srv(self, _):
"""Increase formation scale
"""
if self._state_machine.in_state("in_formation"):
# Change state to make sure CF don't 'teleport' to new formation
self._state_machine.set_state("hover")
rospy.sleep(0.1)
self.formation_services["inc_scale"]()
self.go_to_goal()
return {}
def _dec_swarm_scale_srv(self, _):
"""Decrease formation scale
"""
if self._state_machine.in_state("in_formation"):
# Change state to make sure CF don't 'teleport' to new formation
self._state_machine.set_state("hover")
rospy.sleep(0.1)
self.formation_services["dec_scale"]()
self.go_to_goal()
return {}
def _traj_found_srv(self, srv_req):
"""Called when trajectory solver is done.
Args:
srv_req.data (bool): True if a non colliding trajectory is found
"""
self._traj_found = True
self._traj_successfull = srv_req.data
if not self._traj_successfull:
rospy.logerr("Swarm: No trajectory found")
return {'success': True, "message": ""}
def _traj_done_srv(self, _):
"""Called when trajectory has all been executed.
Goes to formation?
"""
self._state_machine.set_state(self._after_traj_state)
return {}
[docs]class CrazyfliePy(object):
""" Link between the swarm manager and a crazyflie.
Controls one robot.
"""
# Initialisation
def __init__(self, cf_id):
self.cf_id = cf_id
# Subscribed services
self._services = {"emergency": None,
"take_off": None,
"hover": None,
"land": None,
"stop": None,
"toggle_teleop": None,
"set_param": None}
self.pose = None
self.initial_pose = None
self.goals = {"goal": Position(),
"formation": Position(),
"trajectory": Position()}
self._goal_publisher = None
self.state = ""
self._init_cf()
def _init_cf(self):
"""Initialize each CF
Initialize services, publisher and subscriber
"""
# Subscribe to services
rospy.loginfo("Swarm: waiting services of %s " % self.cf_id)
self._link_service("take_off", Empty)
self._link_service("land", Empty)
self._link_service("hover", Empty)
self._link_service("stop", Empty)
self._link_service("emergency", Empty)
self._link_service("set_param", SetParam)
rospy.loginfo("Swarm: found services of %s " % self.cf_id)
# CF pose
rospy.Subscriber("/%s/pose" % self.cf_id, PoseStamped, self._pose_handler)
# Wait to receive first position
while self.pose is None:
pass
# CF goal
self._goal_publisher = rospy.Publisher('/' + self.cf_id + '/goal', Position, queue_size=1)
self.goals["goal"].x = self.pose.pose.position.x
self.goals["goal"].y = self.pose.pose.position.y
self.goals["goal"].z = self.pose.pose.position.z
rospy.Subscriber("/%s/formation_goal" % self.cf_id, Position, self._goal_handler, 0)
rospy.Subscriber("/%s/trajectory_goal" % self.cf_id, Position, self._goal_handler, 1)
rospy.Subscriber("/%s/state" % self.cf_id, String, self._state_handler)
def _link_service(self, service_name, service_type):
"""Link service
Args:
service_name (str): Name of the serviec
service_type (_): Type of the service
"""
rospy.wait_for_service('/%s/%s' % (self.cf_id, service_name))
self._services[service_name] = rospy.ServiceProxy('/%s/%s' % (self.cf_id, service_name),
service_type)
# Topic Handlers
def _pose_handler(self, pose_stamped):
"""Update current position of a cf
Args:
pose_stamped (PoseStamped): New pose of CF
"""
self.pose = pose_stamped
def _goal_handler(self, goal, goal_type):
"""Update formation or trajectory goal of CF.
Args:
goal (Position): Goal
goal_type (int): Goal type: 0 -> formation_goal, 1 -> trajectory_goal
"""
if goal_type == 0:
self.goals["formation"] = goal
elif goal_type == 1:
self.goals["trajectory"] = goal
else:
rospy.logerr("Invalid goal type (%i) for CF" % goal_type)
def _state_handler(self, state):
self.state = state.data
# Publisher
[docs] def publish_goal(self):
"""Publish current CF goal
"""
self.goals["goal"].header.seq += 1
self.goals["goal"].header.stamp = rospy.Time.now()
self._goal_publisher.publish(self.goals["goal"])
[docs] def call_srv(self, srv_name, args=None, kwargs=None):
"""Call a CF service
Args:
srv_name (str): Name of srv to call
args (list, optional): Service args. Defaults to None.
kwargs (dict, optional): Service kwargs. Defaults to None.
"""
if args is None:
args = []
if kwargs is None:
kwargs = {}
self._services[srv_name](*args, **kwargs)
[docs] def update_initial_pose(self):
"""Update initial position to current pose
"""
self.initial_pose = self.pose.pose
[docs] def update_goal(self, goal_name=""):
"""Update current goal to one of other CF goal (trajectory, formation or current goal)
Notes:
"" or "goal": Set to current goal
"traj" or "trajectory": Set to goal to trajectory goal
"formation: Set goal to formation goal
Args:
goal_name (str, optional): To specify goal to set. Defaults to "".
"""
new_goal = None
if goal_name == "" or goal_name == "goal":
new_goal = self.goals["goal"]
elif goal_name == "traj" or goal_name == "trajectory":
new_goal = self.goals["trajectory"]
elif goal_name == "formation":
new_goal = self.goals["formation"]
self.goals["goal"] = new_goal
def generate_cf_list(n_cf):
"""Generate a list with the names of CFs
Names are 'cf_id' i.e: cf_0, cf_1...
Args:
n_cf (int): Number of CF in the swarm
Returns:
list: List of CFs names
"""
return [('cf_' + str(i)) for i in range(n_cf)]
if __name__ == '__main__':
# Launch node
rospy.init_node('swarm_controller', anonymous=False)
# Get params
CF_LIST = generate_cf_list(rospy.get_param("swarm")["n_cf"])
rospy.set_param("cf_list", CF_LIST)
TAKE_OFF_HEIGHT = rospy.get_param("swarm")["take_off_height"]
GND_HEIGHT = rospy.get_param("swarm")["gnd_height"]
MIN_CF_DIST = rospy.get_param("swarm")["min_dist"]
MIN_GOAL_DIST = rospy.get_param("swarm")["min_goal_dist"]
# Launch all CFs nodes
launch_swarm(CF_LIST)
# Initialize swarm
SWARM = SwarmController()
while not rospy.is_shutdown():
SWARM.control_swarm()
rospy.delete_param("cf_list")
for cf_to_del in CF_LIST:
rospy.delete_param(cf_to_del)