Source code for formation_manager_ros

#!/usr/bin/env python

"""Module to compute the position of agents in a formation.

A formation is defined as a certain number of agents moving in a specific shape.

.. note::
    An agent is NOT a crazyflie. A crazyflie is the actual robot. An agent is a position in a
    formation

This package uses two classes:

    - :py:class:`FormationManager` to switch between formations
    - :py:class:`FormationClass` to compute the position of each agent for a formation.
      One for each formation.

To set a new formation, ``set_formation`` service is called. The new positions of agents will be
computed using the desired :py:class:`FormationClass` child class.

Based on the position of each crazyflie, :py:class:`FormationManager` will then link each CF to an
agent. Once a CF is linked to an agent, this CF will follow this agent's position in the formation.

``FormationManager`` class
--------------------------
"""
from math import sin, cos, pi
import ast
import numpy as np
from numpy.linalg import norm
import pandas as pd

import rospy
from geometry_msgs.msg import Pose, Twist
from crazyflie_driver.msg import Position
from std_srvs.srv import Empty
from formation_manager.srv import SetFormation, GetFormationList

from square_formation import SquareFormation
from line_formation import LineFormation
from circle_formation import CircleFormation
from pyramid_formation import PyramidFormation
from v_formation import VFormation

[docs]class FormationManager(object): """To change between formations and associate agents and CFs Args: cf_list (:obj:`list` of :obj:`str`): List of all CFs in swarm min_dist (:obj:`float`): Minimum distance between agents in formation start_goal (:obj:`list` of :obj:`float`): Formation start goal Attributes: abs_ctrl_mode (:obj:`bool`): In abs ctrl mode, moves in world/ In rel ctrl mode, moves relative to yaw scale (:obj:`float`): Formation scale formation (:obj:`str`): Current formation extra_agents (:obj:`list` of :obj:`str`): Id of CFs in extra """ def __init__(self, cf_list, min_dist, start_goal): self.abs_ctrl_mode = False self.scale = 1.0 self.formation = None self.extra_agents = [] self._min_dist = min_dist #: (float) Minimum distance between agents in formation self._cf_list = cf_list self._n_cf = len(cf_list) #: (int) Number of CF in the swarm self._pose_cnt = 0 #: (int) To know when compute pose self._rate = rospy.Rate(100) _initial_formation_goal = Position() #: Position: formation start position _initial_formation_goal.x = start_goal[0] _initial_formation_goal.y = start_goal[1] _initial_formation_goal.z = start_goal[2] _initial_formation_goal.yaw = start_goal[3] #: All possible formations self._formations = {"square": SquareFormation(self._min_dist), "v": VFormation(self._min_dist), "pyramid": PyramidFormation(self._min_dist), "circle": CircleFormation(self._min_dist), "line": LineFormation(self._min_dist),} # Publisher self._formation_pose_pub = rospy.Publisher('/formation_pose', Pose, queue_size=1) self._formation_goal_pub = rospy.Publisher('/formation_goal', Position, queue_size=1) self._formation_pose = Pose() self._formation_goal = Position() self._formation_goal_vel = Twist() self._formation_goal = _initial_formation_goal # Subscribers rospy.Subscriber("/formation_goal_vel", Twist, self._formation_goal_vel_handler) self._crazyflies = {} #: dict of list: Information of each CF # Initialize each CF for cf_id in cf_list: self._crazyflies[cf_id] = {"formation_goal": Position(), # msg "swarm_id": 0, # int, id in the swarm "formation_goal_pub": None, # publisher "initial_position": None} # Add goal publisher self._crazyflies[cf_id]["formation_goal_pub"] =\ rospy.Publisher('/%s/formation_goal' % cf_id, Position, queue_size=1) # Start services rospy.Service('/set_formation', SetFormation, self._set_formation) rospy.Service('/toggle_ctrl_mode', Empty, self._toggle_ctrl_mode) rospy.Service('/formation_inc_scale', Empty, self._formation_inc_scale) rospy.Service('/formation_dec_scale', Empty, self._formation_dec_scale) rospy.Service('/get_formations_list', GetFormationList, self._return_formation_list) # Services and subscriptions def _formation_goal_vel_handler(self, goal_vel): """To change formation goal based on a velocity Depends on ctrl mode Args: goal_vel (Twist): Swarm goal velocity """ self._formation_goal_vel = goal_vel # Moves relative to world if self.abs_ctrl_mode: self._formation_goal.x += self._formation_goal_vel.linear.x self._formation_goal.y += self._formation_goal_vel.linear.y self._formation_goal.z += self._formation_goal_vel.linear.z self._formation_goal.yaw += self._formation_goal_vel.angular.z # Moves relative to orientation. X axis in front, y axis on toward the left, z axis up else: x_vel = self._formation_goal_vel.linear.x y_vel = self._formation_goal_vel.linear.y theta = self._formation_goal.yaw x_dist = x_vel * cos(theta) + y_vel * cos(theta + pi/2.0) y_dist = x_vel * sin(theta) + y_vel * sin(theta + pi/2.0) self._formation_goal.x += x_dist self._formation_goal.y += y_dist self._formation_goal.yaw += self._formation_goal_vel.angular.z # Make sure formation stays above ground new_z = self._formation_goal.z + self._formation_goal_vel.linear.z if new_z > self.formation._min_height: self._formation_goal.z = new_z else: self._formation_goal.z = self._formation_goal.z # Update formation goal of each CF if self.formation is not None: self.formation.update_agents_positions(self._formation_goal, self._crazyflies) def _set_formation(self, srv_call): """Set formation Args: srv_call (SetFormation): Formation to set Returns: bool: success """ new_formation = srv_call.formation cf_initial_positions = ast.literal_eval(srv_call.positions) new_goal = ast.literal_eval(srv_call.goal) valid_formation = True if new_formation in self._formations.keys(): rospy.loginfo("Formation: Setting formation to %s" % new_formation) if new_goal is not None: self._formation_goal.x = new_goal[0] self._formation_goal.y = new_goal[1] self._formation_goal.z = new_goal[2] self._formation_goal.yaw = new_goal[3] self.formation = self._formations[new_formation] self.init_formation(cf_initial_positions) self.link_swarm_and_formation() self.formation.update_agents_positions(self._formation_goal, self._crazyflies) else: rospy.logerr("Formation: Invalid formation: %s" % new_formation) valid_formation = False return {"success": valid_formation, "extra_cf": ','.join(self.extra_agents)} def _toggle_ctrl_mode(self, _): """Toggle control mode Absolute mode: Controls in world reference Relative mode: Controls depending of swarm orientation """ self.abs_ctrl_mode = not self.abs_ctrl_mode if self.abs_ctrl_mode: rospy.loginfo("Formation: Control mode set to absolute") else: rospy.loginfo("Formation: Control mode set to relative") return {} def _formation_inc_scale(self, _): """Service to increase scale of the formation """ self.scale += 0.5 self.formation.set_scale(self.scale) self._check_goal_height() # Find new agents positions around goal self.formation.compute_formation_positions() # Update CFs positions self.formation.update_agents_positions(self._formation_goal, self._crazyflies) return {} def _formation_dec_scale(self, _): """Service to reduce scale of the formation """ self.scale -= 0.5 self.formation.set_scale(self.scale) self._check_goal_height() # Find new agents positions around goal self.formation.compute_formation_positions() # Update CFs positions self.formation.update_agents_positions(self._formation_goal, self._crazyflies) return {} def _return_formation_list(self, _): """To get a list of all possible formations Returns: list of str: Possible formation """ possible_formations = self._formations.keys() return {"formations": ','.join(possible_formations)} # Formation initialization methods
[docs] def init_formation(self, initial_positions): """Initialize formation goal and cf positions. ``initial_positions`` used to associate CF and agents Args: initial_positions (dict of list): Keys: Id of CF, Items: Initial position [x, y, z] """ self.formation.set_n_agents(len(self._cf_list)) self.formation.set_scale(self.scale) self._check_goal_height() self.formation.compute_formation_positions() self.formation.update_agents_positions(self._formation_goal) for cf_id, initial_position in initial_positions.items(): self._crazyflies[cf_id]["initial_position"] = initial_position
def _check_goal_height(self): """Make sure formation goal is above formation minimum height. If it's not the case, set formation goal height to mimimum """ if self.formation._min_height > self._formation_goal.z: self._formation_goal.z = self.formation._min_height def _create_goal_matrix(self): """Create a matrix with all the goals stacked vertically Returns: tuple: (Agent id corresponding to each row, goal matrix) """ agents_goals = self.formation.get_agents_goals() agents_id_list = [] goal_mat = None for agent_id, agent_goal in agents_goals.items(): agents_id_list.append(agent_id) if goal_mat is None: goal_mat = np.array([agent_goal]).reshape(3, 1) else: goal_mat = np.vstack((goal_mat, np.array(agent_goal).reshape(3, 1))) return agents_id_list, goal_mat def _create_initial_pos_matrix(self, n_goals): """Create initial position matrix: cols Initial position of cf, rows: for each goal Returns: tuple: (List of cf ids, initial position matrix) """ initial_position_mat = None cf_id_list = [] # list of str: Cf id corresponding to each col for cf_id, cf_vals in self._crazyflies.items(): cf_id_list.append(cf_id) initial_pos = np.array([cf_vals["initial_position"]]).reshape(3, 1) current_pos = initial_pos for _ in range(1, n_goals): current_pos = np.vstack((current_pos, initial_pos)) if initial_position_mat is None: initial_position_mat = current_pos else: initial_position_mat = np.hstack((initial_position_mat, current_pos)) return cf_id_list, initial_position_mat def _compute_distances(self, cf_id_list, initial_position_mat, agents_id_list, goal_mat): """Compute distance from each goal and CF Args: cf_id_list (list): Ids of all CF in swarm initial_position_mat (arrray): Initial position matrix agents_id_list (list): Ids of all agents in formation goal_mat (array): Goal matrix Returns: pd.DataFrame: Index: Agent id, Cols: CF id, vals: Distance """ n_goals = len(agents_id_list) # Find distances all_distances = np.zeros((n_goals, self._n_cf)) for cf_idx in range(self._n_cf): # col for goal_idx in range(n_goals): # row rows = slice(3*goal_idx, 3*goal_idx + 3) dist = norm(initial_position_mat[rows, cf_idx] - goal_mat[rows, 0]) all_distances[goal_idx, cf_idx] = dist all_distances = pd.DataFrame(all_distances, index=agents_id_list, columns=cf_id_list) return all_distances def _find_association(self, all_distances): """To minimze total distance traveled when linking cf and formation agents """ n_goals = len(all_distances.index) linked_goals = [] #: list of str: Ids of Agents (goals) that have been linked match_positions = [] while len(linked_goals) < n_goals: close_cf = {} #: dict: Keys: Id of close cf, vals: list of (goal, goal_dist) # Find closest CF to each goal for agent_idx in all_distances.index: if agent_idx not in linked_goals: goal_dist = all_distances.loc[agent_idx].sort_values() closest_cf_id = goal_dist.index[0] try: close_cf[closest_cf_id].append((agent_idx, goal_dist[closest_cf_id])) except KeyError: close_cf[closest_cf_id] = [(agent_idx, goal_dist[closest_cf_id])] # Link each close CF to farthest goal for cf_id, close_goals in close_cf.items(): goal_to_link = None max_dist = 0.0 for each_close_goal in close_goals: dist = each_close_goal[1] if goal_to_link is None: goal_to_link = each_close_goal[0] max_dist = dist elif dist > max_dist: goal_to_link = each_close_goal[0] max_dist = dist all_distances = all_distances.drop(cf_id, axis='columns') linked_goals.append(goal_to_link) match_positions.append((cf_id, goal_to_link)) return match_positions def _update_associations(self, cf_id_list, match_positions): """Update CF associations and extra agents list """ linked_cf = [] for each_match in match_positions: cf_id = each_match[0] agent_id = each_match[1] self._crazyflies[cf_id]['swarm_id'] = agent_id linked_cf.append(cf_id) self.extra_agents = [cf_id for cf_id in cf_id_list if cf_id not in linked_cf] # Publishers def _publish_cf_formation_goal(self): """Publish formation goal of each CF """ for _, cf_attrs in self._crazyflies.items(): if rospy.is_shutdown(): break cf_attrs["formation_goal_pub"].publish(cf_attrs["formation_goal"]) def _publish_formation_pose(self): """Publish current position of the formation (center) """ self._formation_pose_pub.publish(self._formation_pose) def _publish_formation_goal(self): """Publish current goal of the formation """ self._formation_goal_pub.publish(self._formation_goal)
[docs] def run_formation(self): """Execute formation """ while not rospy.is_shutdown(): if self.formation is not None: self._publish_cf_formation_goal() self._publish_formation_pose() self._publish_formation_goal() self._rate.sleep()
if __name__ == '__main__': # Launch node rospy.init_node('swarm_formation_manager', anonymous=False) rospy.loginfo('Formation: Initialization...') # Get params while True: # Make sure cf_list has been set by `swarm_controller` try: CF_LIST = rospy.get_param("cf_list") break except KeyError: pass MIN_DIST = rospy.get_param("formation")["formation_min_dist"] START_GOAL = rospy.get_param("formation")["formation_start_pos"] # Initialize swarm FORMATION_MANAGER = FormationManager(CF_LIST, MIN_DIST, START_GOAL) FORMATION_MANAGER.run_formation() rospy.spin()