#!/usr/bin/env python
"""
Package used to find a collision-free trajectory between a number of agents.
Algorithm
----------
The algorithm used is the one presented in this paper: [CIT3]_
Example
-------
::
A1 = Agent([0.0, 2.0, 0.0])
A1.set_goal([4.0, 2.0, 0.0])
A2 = Agent([4.0, 1.9999, 0.0], goal=[0.0, 1.9999, 0.0])
solver = TrajectorySolver([A1, A2])
solver.set_obstacle(obstacles)
solver.set_wait_for_input(False)
solver.set_slow_rate(1.0)
solver.solve_trajectories()
Run Demo Scripts
----------------
It's possible to test the trajectory algorithm by running demo scripts
(``.../trajectory_planner/demos``.)
To execute a demo, simply uncomment the desired demonstration in ``demos.py`` -> ``demo`` and run
the script (``demo.py``).
It's possible to add new demonstrations by adding them in either: ``demos_formation.py``,
``demos_positions.py`` or ``demos_random.py``.
Benchmark Algo Performances
---------------------------
The algorithm benchmarks can be calculated using ``.../trajectory_planner/demos/performance.py``.
The script will output:
* The success rate (%)
* Total compute time (sec)
* Total travel time (sec)
Those statistics are always calculated by running the same 9 demos.
.. todo:: Script to compare and analyse results of different configurations (issue #86). With a
graphic interface?
Profile algorithm
-----------------
To find hotspots in the algorithm::
$ cd .../trajectory_planner/demos
$ python -m cProfile -o perfo.prof performance.py
$ snakeviz perfo.prof
.. note:: Snakeviz is required to visualize results: ``pip install snakeviz``
Solver Class
------------
"""
from math import sqrt
import numpy as np
from numpy import array, dot, hstack, vstack
from numpy.linalg import inv, matrix_power
from qpsolvers import solve_qp
from trajectory_plotting import TrajPlot
IN_DEBUG = False  # Module-level switch: enables verbose per-step debug printing
class TrajectorySolver(object):
    """To solve trajectories of all agents

    Args:
        agent_list (list of Agents): Compute trajectory of all agents in the list
        solver_args (:obj:`dict`): Solver configuration parameters (see ``__init__``)
        verbose (:obj:`bool`): If True, prints information

    Attributes:
        horizon_time(:obj:`float`): Horizon to predict trajectory [sec]
        step_interval(:obj:`float`): Time steps interval (h) [sec]
        steps_in_horizon(:obj:`int`): Number of time steps in horizon (k)
        interp_time_step(:obj:`float`): Interpolation time step [sec]
        k_t(:obj:`int`): Current time step
        k_max(:obj:`int`): Max time step
        at_goal(:obj:`bool`): True when all agents reached their goal
        in_collision(:obj:`bool`): True when a collision is detected between two agents
        agents(:obj:`list` of :py:class:`Agent`): List of all agents
        n_agents(:obj:`int`): Number of agents
        all_agents_traj(3k x n_agents :obj:`array`): Latest predicted trajectory of each agent
            over horizon
        agents_distances(:obj:`list` of :obj:`float`): Scaled distances between agents,
            recorded at every solved time step
        kapa(:obj:`int`): For more aggressive trajectories
        error_weight(:obj:`float`): Error weight
        effort_weight(:obj:`float`): Effort weight
        input_weight(:obj:`float`): Input variation weight
        relaxation_weight_p(:obj:`float`): Quadratic relaxation weight
        relaxation_weight_q(:obj:`float`): Linear relaxation weight
        relaxation_max_bound(:obj:`float`): Maximum relaxation, 0
        relaxation_min_bound(:obj:`float`): Minimum relaxation
        r_min(:obj:`float`): Minimum distance between agents [m]
        a_max(:obj:`float`): Maximum acceleration of an agent [m/s^2]
        a_min(:obj:`float`): Minimum acceleration of an agent [m/s^2]
        has_fix_obstacle(:obj:`bool`): True if there are fix obstacles to avoid
        obstacle_positions(:obj:`list` of :obj:`tuple`): Coordinates of obstacles to avoid
            (x, y, z)
    """
def __init__(self, agent_list=None, solver_args=None, verbose=True):
    """Init solver

    Args:
        agent_list (list of Agents): Compute trajectory of all agents in the list
        solver_args (dict): Solver configuration; keys read here:
            ``horizon_time``, ``step_interval``, ``interp_step``, ``max_time``,
            ``goal_agg``, ``error_weight``, ``effort_weight``, ``input_weight``,
            ``relax_max``, ``relax_min``, ``relax_inc``, ``relax_weight_sq``,
            ``relax_weight_lin``, ``r_min``, ``max_acc``, ``min_acc``,
            ``min_pos``, ``max_pos``
        verbose (bool): If True, prints information
    """
    self.verbose = verbose
    if self.verbose:
        print "Initializing solver..."
    # Time related attributes
    self.horizon_time = solver_args['horizon_time']  # Horizon length [sec]
    self.step_interval = solver_args['step_interval']  # Step h [sec]
    # Number of discrete steps (k) in the prediction horizon
    self.steps_in_horizon = int(self.horizon_time/self.step_interval)
    self.interp_time_step = solver_args['interp_step']
    self.k_t = 0  # Current time step
    self.k_max = int(solver_args['max_time']/self.step_interval)  # Max time step
    self.at_goal = False
    self.in_collision = False
    # Initialize agents
    self.agents = agent_list
    self.n_agents = 0
    self.all_agents_traj = None
    self._new_agents_traj = None  # Temp dict to share between processes
    self.agents_distances = []
    if self.agents is not None:
        self.initialize_agents()
    # Error weights
    self.kapa = solver_args['goal_agg']  # Only the last kapa steps are error-penalized
    self.error_weight = solver_args['error_weight']
    self.effort_weight = solver_args['effort_weight']
    self.input_weight = solver_args['input_weight']
    # Relaxation of the collision constraint
    self.relaxation_max_bound = solver_args['relax_max']
    self.relaxation_min_bound = solver_args['relax_min']
    self.relaxation_inc = solver_args['relax_inc']
    self.relaxation_weight_p = solver_args['relax_weight_sq']
    self.relaxation_weight_q = solver_args['relax_weight_lin']
    # Constraints
    self.r_min = solver_args['r_min']
    self.a_max = solver_args['max_acc']
    self.a_min = solver_args['min_acc']
    p_min = solver_args['min_pos']
    p_max = solver_args['max_pos']
    self.p_min = [p_min, p_min, p_min]
    self.p_max = [p_max, p_max, p_max]
    self.obstacle_positions = []
    self.has_fix_obstacle = False
    # 3x1 column vectors of the acceleration and position bounds
    self.a_min_mat = array([[self.a_min, self.a_min, self.a_min]]).T
    self.a_max_mat = array([[self.a_max, self.a_max, self.a_max]]).T
    self.p_min_mat = array([self.p_min]).T
    self.p_max_mat = array([self.p_max]).T
    # State matrices, filled by initialize_matrices()
    self.a_mat = array([[]])
    self.b_mat = array([[]])
    self.a0_mat = array([[]])
    self.lambda_mat = array([[]])
    self.lambda_accel = array([[]])
    self.a0_accel = array([[]])
    # Objective matrices, filled by initialize_matrices()
    self.q_tilde = array([[]])
    self.r_tilde = array([[]])
    self.delta = array([[]])
    self.prev_input_mat = array([[]])
    self.s_tilde = array([[]])
    # Constraint matrices, filled by initialize_matrices()
    self.h_constraint = array([[]])
    self.g_constraint = array([[]])
    self.lb_constraint = self.a_min_mat  # NOTE: alias of a_min_mat until built
    self.ub_constraint = self.a_max_mat  # NOTE: alias of a_max_mat until built
    self.initialize_matrices()
    # Graph parameters
    self.trajectory_plotter = TrajPlot(self.agents, self.step_interval, self.interp_time_step)
    if self.verbose:
        print "Solver ready"
# Setters methods
def set_obstacles(self, obstacle_positions):
    """Add each obstacle as an agent with a constant position

    Every point of an obstacle becomes one extra column of
    ``all_agents_traj`` whose position is repeated over the whole horizon.

    Args:
        obstacle_positions (:obj:`list` of :obj:`tuple`): Position of each obstacle (x, y, z)
    """
    if obstacle_positions:  # If not empty
        self.has_fix_obstacle = True
        self.obstacle_positions = obstacle_positions
        for each_obstacle in self.obstacle_positions:
            # Each point of the wall is considered as an agent /w cst position over horizon
            obstacle_trajectories = None
            for each_obstacle_pos in each_obstacle:
                each_coord_array = array([each_obstacle_pos]).reshape(3, 1)
                # Repeat the fixed coordinate for every step of the horizon
                coord = each_coord_array
                for _ in range(1, self.steps_in_horizon):
                    coord = vstack((coord, each_coord_array))
                if obstacle_trajectories is None:
                    obstacle_trajectories = coord
                else:
                    obstacle_trajectories = hstack((obstacle_trajectories, coord))
            # Append the obstacle columns to the shared trajectory array
            self.all_agents_traj = hstack((self.all_agents_traj, obstacle_trajectories))
        # Update all agents informations
        for agent in self.agents:
            agent.set_all_traj(self.all_agents_traj)
# Initialization methods
def initialize_agents(self):
    """Initialize positions and starting trajectory of all agents"""
    self.n_agents = len(self.agents)
    # Set idx of all agents
    for i in range(self.n_agents):
        self.agents[i].agent_idx = i
    #: 3k x n_agents array: Latest predicted trajectory of each agent over horizon
    self.all_agents_traj = np.zeros((3*self.steps_in_horizon, self.n_agents))
    for each_agent in self.agents:
        each_agent.initialize_position(self.steps_in_horizon, self.all_agents_traj)
        # Set initial trajectory: stack the position rows (first 3 entries
        # of each 6-entry state block) of the agent's latest state column
        slc = slice(0, 3)
        traj = each_agent.states[:, -1]
        p_traj = traj[slc].reshape(3, 1)
        for i in range(1, self.steps_in_horizon):
            slc = slice(i*6, i*6+3)
            p_k = traj[slc].reshape(3, 1)
            p_traj = vstack((p_traj, p_k))
        self.all_agents_traj[:, each_agent.agent_idx] = p_traj[:, 0]
def initialize_matrices(self):
    r"""Compute matrices used to determine new states

    .. math::
        :nowrap:

        \begin{align*}
        A &= \begin{bmatrix}
        I_3 & hI_3\\
        0_3 & I_3
        \end{bmatrix}\\
        \\
        B &= \begin{bmatrix}
        \frac{h^2}{2}I_3\\
        hI_3
        \end{bmatrix}\\
        \\
        \Lambda &= \begin{bmatrix}
        B & 0_3 & ... & 0_3\\
        AB & B & ... & 0_3\\
        ... & ... & ... & ...\\
        A^{k-1}B & A^{k-2}B & ... & B
        \end{bmatrix}\\
        \\
        A_0 &= \begin{bmatrix}
        A^T & (A^2)^T & ... & (A^k)^T
        \end{bmatrix}^T\\
        \end{align*}
    """
    # Build prediction matrices (A, B, Lambda, A0 and their accel variants)
    self._build_prediction_matrices()
    # Objective function matrices (Q_tilde, R_tilde, Delta, S_tilde)
    self._build_objective_fct_matrices()
    # Constraint matrices (acceleration and position bounds)
    self._build_constraint_matrices()
def _build_prediction_matrices(self):
    """Build all matrix used to predict trajectories

    Builds the double-integrator state matrix A (6x6), input matrix B
    (6x3), the horizon stacks Lambda (6k x 3k) and A0 (6k x 6), and the
    position-only variants ``lambda_accel`` (3k x 3k) and ``a0_accel``
    (3k x 6) that keep only the top 3 (position) rows of each 6-row block.
    """
    # A, 6x6: [[I, h*I], [0, I]] with h = step_interval
    matrix_a1 = hstack((np.eye(3), np.eye(3)*self.step_interval))
    matrix_a2 = hstack((np.zeros((3, 3)), np.eye(3)))
    self.a_mat = vstack((matrix_a1, matrix_a2))  # 6x6
    # B, 6x3: [[h^2/2 * I], [h * I]]
    n_dim = 6
    m_dim = 3
    self.b_mat = vstack(((self.step_interval**2/2)*np.eye(3),
                         self.step_interval*np.eye(3)))  # 6x3
    # Lambda, 6k x 3k (see the docstring of initialize_matrices):
    # row-block i is [A^i B, A^(i-1) B, ..., B, 0, ...]
    self.lambda_mat = np.zeros((n_dim*self.steps_in_horizon, m_dim*self.steps_in_horizon))
    rsl = slice(0, n_dim)
    self.lambda_mat[rsl, :m_dim] = self.b_mat
    for i in range(1, self.steps_in_horizon):
        rsl_p, rsl = rsl, slice(i * n_dim, (i + 1) * n_dim)
        # First column block: A times the block above it (A^i * B)
        self.lambda_mat[rsl, :m_dim] = dot(self.a_mat, self.lambda_mat[rsl_p, :m_dim])
        # Remaining blocks: previous row-block shifted right by one block
        self.lambda_mat[rsl, m_dim : (i + 1) * m_dim] = self.lambda_mat[rsl_p, : i * m_dim]
    # A0, 6k x 6: [A; A^2; ...; A^k], built transposed then flipped back
    n_dim = m_dim = 6
    self.a0_mat = np.zeros((6, 6*self.steps_in_horizon))
    rsl = slice(0, m_dim)
    self.a0_mat[:, rsl] = self.a_mat.T
    for i in range(1, self.steps_in_horizon):
        rsl_p, rsl = rsl, slice(i * m_dim, (i + 1) * m_dim)
        # Next block holds (A^(i+1)).T, computed from the previous one
        self.a0_mat[:, rsl] = dot(self.a_mat, self.a0_mat[:, rsl_p].T).T
    self.a0_mat = self.a0_mat.T
    # Lambda accel, 3k x 3k: position rows only of each 6-row block
    rsl = slice(3)
    self.lambda_accel = self.lambda_mat[rsl]
    for i in range(1, self.steps_in_horizon):
        rsl = slice(i * 6, 6*i + 3)  # Select top three rows of block
        rows = self.lambda_mat[rsl]
        self.lambda_accel = vstack((self.lambda_accel, rows))
    # A0 accel, 3k x 6, select first three cols of each block (of the transpose)
    a0_trans = self.a0_mat.T
    rsl = slice(3)
    self.a0_accel = a0_trans[:, rsl]
    for i in range(1, self.steps_in_horizon):
        rsl = slice(i * 6, 6*i + 3)  # Select every other 3 cols
        cols = a0_trans[:, rsl]
        self.a0_accel = hstack((self.a0_accel, cols))
    self.a0_accel = self.a0_accel.T
def _build_objective_fct_matrices(self):
"""Build all matrix used in objective functions
"""
# 1 - Trajectory error penalty
# Q_tilde
q_mat = self.error_weight*np.eye(3)
self.q_tilde = np.zeros((3*self.steps_in_horizon, 3*self.steps_in_horizon))
for i in range(self.steps_in_horizon - self.kapa, self.steps_in_horizon):
rsl = slice(i*3, (i+1)*3)
self.q_tilde[rsl, rsl] = q_mat
# 2 - Control Effort penalty
self.r_tilde = np.zeros((3*self.steps_in_horizon, 3*self.steps_in_horizon))
r_mat = self.effort_weight*np.eye(3)
for i in range(self.steps_in_horizon):
rsl = slice(i*3, (i+1)*3)
self.r_tilde[rsl, rsl] = r_mat
# 3 - Input Variaton penalty
self.delta = np.zeros((3*self.steps_in_horizon, 3*self.steps_in_horizon))
slc = slice(3)
self.delta[slc, slc] = np.eye(3)
for i in range(self.steps_in_horizon - 1): # For all rows
rows = slice(3*(i+1), (i+2)*3)
cols_neg = slice(i*3, (i+1)*3)
cols_pos = slice((i+1)*3, (i+2)*3)
self.delta[rows, cols_neg] = -1 * np.eye(3)
self.delta[rows, cols_pos] = np.eye(3)
self.prev_input_mat = np.zeros((3*self.steps_in_horizon, 1))
self.s_tilde = np.zeros((3*self.steps_in_horizon, 3*self.steps_in_horizon))
s_mat = self.input_weight*np.eye(3)
for i in range(self.steps_in_horizon):
rsl = slice(i*3, (i+1)*3)
self.s_tilde[rsl, rsl] = s_mat
def _build_constraint_matrices(self):
"""Build all matrix used in constraints
"""
for _ in range(1, self.steps_in_horizon):
self.lb_constraint = vstack((self.lb_constraint, self.a_min_mat))
self.ub_constraint = vstack((self.ub_constraint, self.a_max_mat))
acc_min_mat = self.a_min_mat
acc_max_mat = self.a_max_mat
for _ in range(1, self.steps_in_horizon):
acc_min_mat = vstack((acc_min_mat, self.a_min_mat))
acc_max_mat = vstack((acc_max_mat, self.a_max_mat))
# Add lower bound
self.g_constraint = -np.eye(self.steps_in_horizon*3)
self.h_constraint = -1*acc_min_mat
# Add upper bound
self.g_constraint = vstack((self.g_constraint, np.eye(self.steps_in_horizon*3)))
self.h_constraint = vstack((self.h_constraint, acc_max_mat))
# Positions constraints
self.p_min_mat = array([self.p_min]).T
self.p_max_mat = array([self.p_max]).T
p_min = self.p_min_mat
p_max = self.p_max_mat
for _ in range(1, self.steps_in_horizon):
self.p_min_mat = vstack((self.p_min_mat, p_min))
self.p_max_mat = vstack((self.p_max_mat, p_max))
def update_agents_info(self):
    """To update agents information

    Has to be called when starting position or goal of an agent changed.
    Rebuilds the starting trajectories and all solver matrices, then
    resets the solver progress flags.
    """
    self.initialize_agents()
    self.initialize_matrices()
    self.k_t = 0
    self.at_goal = False
    self.in_collision = False
# Trajectory solvers
def solve_trajectories(self):
    """Compute a collision free trajectory for each agent.

    Core of the algorithm.

    Returns:
        :obj:`bool`: If the algorithm was successful
        :obj:`float`: Total trajectory time
    """
    if self.verbose:
        print("Solving trajectories...")
    # For each time step
    while not self.at_goal and self.k_t < self.k_max and not self.in_collision:
        # Compute new accel input of every agent over the horizon
        accel_dict = {}
        for agent in self.agents:
            accel_input = self._solve_accel(agent)
            if accel_input is None:
                # No feasible solution for this agent: flag the failure
                self.in_collision = True
            accel_dict[agent.agent_idx] = accel_input
        # Update agents trajectories
        if not self.in_collision:
            for agent in self.agents:
                agent_idx, agent_traj = self._solve_agent(agent, accel_dict[agent.agent_idx])
                self.all_agents_traj[:, agent_idx] = agent_traj
            self._check_goals()
            self._compute_agents_dist()
        self.k_t += 1
    self.print_final_positions()
    self._interpolate_agents_traj()
    if not self.at_goal:
        print("Solver failed with starting positions:")
        print([agt.start_position for agt in self.agents])
        print("at time step: %i" % self.k_t)
    return self.at_goal, (self.k_t*self.step_interval)
def _solve_agent(self, agent, accel_input):
"""To compute new agent trajectory based on accel input.
Args:
agent (:obj:`Agent`): Agent to solve trajectory
accel_input (:obj:`Array`): Agent's accel input
"""
# If new acceleration feasible
current_state = agent.states[0:6, -1].reshape(6, 1)
x_pred = self._predict_trajectory(current_state, accel_input) # Find new state
agent.prev_input = accel_input[0:3, 0]
# Extract predicted positions
p_pred = np.zeros((self. steps_in_horizon*3, 1))
for n_dim in range(0, self. steps_in_horizon):
slc_x = slice(n_dim*6, n_dim*6+3)
slc_p = slice(n_dim*3, n_dim*3+3)
p_pred[slc_p, 0] = x_pred[slc_x, 0]
# Update trajectory of current agent
agent_idx = self.agents.index(agent)
agent_traj = p_pred.reshape(3*self.steps_in_horizon)
agent.add_state(x_pred)
return agent_idx, agent_traj
def _solve_accel(self, agent):
    """Optimize acceleration input for the horizon

    Args:
        agent (:obj:`Agent`): Current agent

    Returns:
        array 3*hor_steps x 1: Acceleration over the trajectory, or None
        when the agent is already in collision
    """
    # Check if there is a collision
    coll_info = list(agent.check_collisions())  #: True if trajectory in collision
    avoid_collision = (coll_info[0] != -1)  # First element -1 means no collision ahead
    coll_info.append(avoid_collision)
    if avoid_collision and IN_DEBUG:
        print "\nTime %.2f at step %i: Agent %i" % (self.k_t*self.step_interval,\
            self.k_t, agent.agent_idx)
    # Build optimization problem: 1/2 x.T * p_mat * x + q_mat.T * x s.t. g_mat*x <= h_mat
    try:
        p_mat, q_mat, g_mat, h_mat = self._build_optimization_matrices(agent, coll_info)
    # _build_optimization_matrices returns None (not a 4-tuple) when two
    # agents already collided; unpacking None raises this TypeError
    except TypeError:
        if self.verbose:
            print ''
            print "ERROR: In collision"
        return None
    # Solve optimization problem (h is passed as a 1-D column slice)
    accel_input, relax_vals = self._solve_optimization(agent, avoid_collision,
                                                       p_mat, q_mat, g_mat, h_mat[:, 0])
    if IN_DEBUG and avoid_collision:
        print "\t\t Relaxation: {}".format(relax_vals)
    return accel_input
def _solve_optimization(self, agent, avoid_collision, p_mat, q_mat, g_mat, h_mat):
    """To find acceleration input by solving the qp problem.

    If no solution is found within constraints, will try to lower relaxation.

    1/2 x.T * p * x + q.T * x s.t. g*x <= h

    Args:
        agent (:obj:`Agent`): Current agent
        avoid_collision (:obj:`bool`): If solving for collision or not
        p_mat (:obj:`np.Array`)
        q_mat (:obj:`np.Array`)
        g_mat (:obj:`np.Array`)
        h_mat (:obj:`np.Array`): 1-D constraint vector (the caller passes
            a column slice ``h_mat[:, 0]``)

    Returns:
        :obj:`np.Array`: Acceleration input, None when unfeasible
        :obj:`np.Array`: Relaxation values
    """
    cur_relaxation = self.relaxation_max_bound  # To locally increase relaxation bound
    find_solution = True
    accel_input = None
    relax_vals = None
    while find_solution:
        try:
            accel_input = solve_qp(p_mat, q_mat, G=g_mat, h=h_mat, solver='quadprog')
            # Some qpsolvers versions return None instead of raising on an
            # unfeasible problem; route both cases through the same path
            if accel_input is None:
                raise ValueError("QP problem is unfeasible")
            relax_vals = accel_input[3*self.steps_in_horizon:]
            accel_input = accel_input[0:3*self.steps_in_horizon]
            accel_input = accel_input.reshape(3*self.steps_in_horizon, 1)
            find_solution = False
        # No solution within constraints
        except ValueError:
            cur_relaxation -= self.relaxation_inc
            # Relax until 2*min is reached
            if cur_relaxation > self.relaxation_min_bound*2 and avoid_collision:
                print("No solution, relaxing constraints: %.2f" % cur_relaxation)
                # Update the relaxation lower bound of each collision in h.
                # h_mat is 1-D here; the original 2-D indexing
                # ``h_mat[(-1 - i*3), 0]`` raised IndexError and crashed
                # the retry path
                n_collision = len(agent.close_agents.keys())
                for i in range(n_collision):
                    h_mat[-1 - i*3] = -cur_relaxation
            # Max relaxation reached
            else:
                self.in_collision = True
                find_solution = False
                if self.verbose:
                    print("ERROR: No solution in constraints, Check max space")
    return accel_input, relax_vals
def _build_optimization_matrices(self, agent, coll_info):
"""Build optimization matrices depending on collision state
1/2 x.T * p_mat * x + q_mat.T * x s.t. g_mat*x <= h_mat
Args:
agent (Agent)
coll_info (list): collision_step, close_agents, avoid_collision
Returns:
list of matrix: see description
"""
avoid_collision = coll_info[2]
try:
if not avoid_collision:
p_mat, q_mat, g_mat, h_mat = self._solve_accel_no_coll(agent)
else:
p_mat, q_mat, g_mat, h_mat = self._solve_accel_coll(agent, coll_info)
# If two agents are in collision
except TypeError:
return None
return p_mat, q_mat, g_mat, h_mat
def _solve_accel_no_coll(self, agent, agent_goal=None):
    """Compute acceleration over horizon when no collision are detected

    Args:
        agent(:obj:`Agent`): Current agent
        agent_goal (:obj:`Array`, optional): Corrected agent goal. Defaults to None.

    Returns:
        p, q, g, h: matrices of QP problem
    """
    if agent_goal is None:
        agent_goal = agent.goal
    prev_input = agent.prev_input
    initial_state = agent.states[0:6, -1].reshape(6, 1)
    # 1 - Trajectory error penalty
    # Goal, repeated for every step of the horizon (3k x 1)
    agent_goal = agent_goal.reshape(3)
    goal_matrix = np.zeros((self.steps_in_horizon*3, 1))
    for i in range(0, self.steps_in_horizon):
        slc = slice(i*3, i*3+3)
        goal_matrix[slc, 0] = agent_goal
    # P_e = Lambda.T * Q_tilde * Lambda
    q_lam_prod = dot(self.q_tilde, self.lambda_accel)
    p_error = 2 * dot(self.lambda_accel.T, q_lam_prod)
    # Linear error term: -(goal - free response A0*x0) projected on Lambda
    q_error = -2*(dot(goal_matrix.T, q_lam_prod)-
                  dot(dot(self.a0_accel, initial_state).T,
                      q_lam_prod))
    # 2 - Control Effort penalty
    p_effort = self.r_tilde
    q_effort = 0
    # 3 - Input Variation penalty
    # NOTE: prev_input_mat is an alias of self.prev_input_mat, so this
    # assignment mutates the shared buffer in place
    prev_input_mat = self.prev_input_mat
    prev_input_mat[0:3, 0] = prev_input
    p_input = dot(self.delta.T, dot(self.s_tilde, self.delta))
    q_input = -2*dot(prev_input_mat.T, dot(self.s_tilde, self.delta))
    # Position constraints: keep predicted positions within p_min/p_max
    g_matrix = self.g_constraint
    h_matrix = self.h_constraint
    # Lower bound: -Lambda*u <= -(p_min - A0*x0)
    lb_position = self.p_min_mat - dot(self.a0_accel, initial_state)
    g_matrix = vstack((g_matrix, -1*self.lambda_accel))
    h_matrix = vstack((h_matrix, -1*lb_position))
    # Upper bound: Lambda*u <= p_max - A0*x0
    ub_position = self.p_max_mat - dot(self.a0_accel, initial_state)
    g_matrix = vstack((g_matrix, self.lambda_accel))
    h_matrix = vstack((h_matrix, ub_position))
    # Combine the three objective terms
    p_tot = p_error + p_effort + p_input
    q_tot = (q_error + q_effort + q_input).T
    q_tot = q_tot.reshape(q_tot.shape[0])  # Reshape it to work /w library
    return p_tot, q_tot, g_matrix, h_matrix
def _solve_accel_coll(self, agent, coll_info):
    """Compute acceleration over horizon when a collision is detected

    Builds the no-collision QP and augments it with one relaxation
    variable (e_ij) per close agent plus the linearized collision
    constraints of [CIT3]_.

    Args:
        agent (Agent)
        coll_info (list): collision_step, close_agents, avoid_collision

    Returns:
        array, 3k x 1: Acceleration vector, or None when two agents
        already collided
    """
    # Check number of close agents
    coll_step = coll_info[0]
    close_agents = coll_info[1]
    collisions_list = close_agents.keys()  # list of int: Idx of all other agents colliding
    n_collisions = len(collisions_list)
    initial_state = agent.states[0:6, -1].reshape(6, 1)
    if IN_DEBUG:
        print "\t\t Close agents: {}".format(collisions_list)
        print "\t\t At step of horizon: %i" % coll_step
    # Collision at step 0 mean two agents collided
    if coll_step == 0:
        if self.verbose:
            print "Agent %i in collision" % agent.agent_idx
            print "Min Distance: %.2f" % min([dist for _, dist in close_agents.items()])
        return None
    # Agent start_position at collision time step
    agent_goal = np.copy(agent.goal)
    start_rows = slice(0, 3)
    agent_start_pos = self.all_agents_traj[start_rows, agent.agent_idx]
    agent_dy = agent_goal[1, 0] - agent_start_pos[1]
    agent_dx = agent_goal[0, 0] - agent_start_pos[0]
    # Slope of the straight line from start to goal (0 when vertical)
    goal_line = agent_dy/agent_dx if agent_dx != 0 else 0
    # Check if an agent is on a direct line to goal, used to avoid deadlocks
    for agent_idx in close_agents:
        # Other agent position
        other_start_pos = self.all_agents_traj[start_rows, agent_idx]
        other_dx = other_start_pos[0] - agent_start_pos[0]
        other_dy = other_start_pos[1] - agent_start_pos[1]
        other_line = other_dy/other_dx if other_dx != 0 else 0
        # Nearly identical slopes: nudge the goal sideways to break symmetry
        if abs(goal_line - other_line) < 0.01:
            # If agents are not vertically aligned
            if agent_dx != 0 and other_dx != 0:
                agent_goal[1, 0] += 0.5 if agent_dx > 0 else -0.5
            else:
                agent_goal[0, 0] += 0.5 if agent_dy > 0 else -0.5
    # Get no collision problem
    p_no_coll, q_no_coll, g_no_coll, h_no_coll =\
        self._solve_accel_no_coll(agent, agent_goal=agent_goal)
    # Augment matrices with one relaxation variable per close agent
    g_coll = np.c_[g_no_coll, np.zeros((g_no_coll.shape[0], n_collisions))]
    h_coll = h_no_coll
    p_aug = np.c_[p_no_coll, np.zeros((p_no_coll.shape[0], n_collisions))]
    p_aug = np.r_[p_aug, np.zeros((n_collisions, p_aug.shape[1]))]
    q_aug = np.concatenate([q_no_coll, np.zeros((n_collisions))])
    # Agent position at collision time step
    collision_rows = slice(coll_step*3, (coll_step+1)*3)
    agent_position_coll = self.all_agents_traj[collision_rows, agent.agent_idx]
    for agent_j_idx, dist in close_agents.items():
        collision_idx = collisions_list.index(agent_j_idx)
        # Other agent position at collision time step
        other_position_coll = self.all_agents_traj[collision_rows, agent_j_idx]
        # Build matrices
        #: 3x1 array, v_ij = scaling_matrix**-2 @ (p_i - p_j)
        v_matrix = dot(matrix_power(agent.scaling_matrix, -2),
                       agent_position_coll - other_position_coll)
        #: 1x1 array, rho_ij = r_min*ksi_ij - ksi_ij**2 + v_ij' * p_i
        rho_constraint = self.r_min*dist - dist**2 + dot(v_matrix.T, agent_position_coll)
        #: 3k x 1 array: v_ij placed at the collision step of the horizon
        slc = slice((coll_step)*3, (coll_step + 1)*3)
        mu_matrix = np.zeros((3*self.steps_in_horizon, 1))
        mu_matrix[slc] = v_matrix.reshape(3, 1)
        # Constraints
        accel_constraint = dot(mu_matrix.T, self.lambda_accel)[0]
        h_relaxation_const = rho_constraint - dot(mu_matrix.T,
                                                  dot(self.a0_accel, initial_state))[0, 0]
        other_agents_mat = np.zeros((n_collisions))
        other_agents_mat[collision_idx] = 1*dist
        g_relaxation_const = hstack((-1*accel_constraint, other_agents_mat))
        # Add constraints
        g_coll = vstack((g_coll, g_relaxation_const))
        h_coll = vstack((h_coll, -h_relaxation_const))
        # Add limit to e_ij: relaxation_min_bound <= e_ij <= relaxation_max_bound
        e_bound = np.zeros((1, (3*self.steps_in_horizon) + n_collisions))
        e_bound[0, 3*self.steps_in_horizon + collision_idx] = 1
        g_coll = np.r_[g_coll, e_bound]
        g_coll = np.r_[g_coll, -1*e_bound]
        h_coll = np.r_[h_coll, array([[self.relaxation_max_bound]])]
        h_coll = np.r_[h_coll, array([[-self.relaxation_min_bound]])]
    # Add relaxation penalty (quadratic and linear weights on the e_ij)
    total_size = 3*self.steps_in_horizon + n_collisions
    p_relaxation = np.zeros((total_size, total_size))
    p_relaxation[3*self.steps_in_horizon:, 3*self.steps_in_horizon:] = np.eye(n_collisions)
    p_relaxation = 2*p_relaxation*self.relaxation_weight_p
    q_relaxation = np.zeros((total_size))
    q_relaxation[3*self.steps_in_horizon:] = np.ones(n_collisions)
    q_relaxation = -1*q_relaxation * self.relaxation_weight_q
    p_coll = p_aug + p_relaxation
    q_coll = q_aug + q_relaxation
    return p_coll, q_coll, g_coll, h_coll
def _predict_trajectory(self, current_state, accel):
"""Predict an agent trajectory based on it's position and acceleration
Args:
x (np.array, 6x1): Current state
u (np.array, 6*k x 1): Acceleration for horizon
Returns:
x_pred (np.array, k*6x1): Predicted states over the horizon
"""
# Computing predicted trajectory
x_pred = dot(self.a0_mat, current_state) + dot(self.lambda_mat, accel)
return x_pred
def _check_goals(self):
"""Verify if all agents are in a small radius around their goal
"""
all_goal_reached = True
for each_agent in self.agents:
if not each_agent.check_goal():
all_goal_reached = False
self.at_goal = all_goal_reached
def _compute_agents_dist(self):
"""Print distance of each agent at new position
"""
scaling_matrix = np.diag([1, 1, 2])
scaling_matrix_inv = inv(scaling_matrix)
for i in range(self.all_agents_traj.shape[1]):
pos_agent = self.all_agents_traj[0:3, i]
for j in range(self.n_agents):
if j != i:
pos_col = self.all_agents_traj[0:3, j]
scaled = dot(scaling_matrix_inv, pos_agent - pos_col)
# dist = norm(scaled)
dist = sqrt(scaled[0]**2 + scaled[1]**2 + scaled[2]**2)
self.agents_distances.append(dist)
def _interpolate_agents_traj(self):
"""Interpolate trajectory of all agents using bezier curves
"""
for each_agent in self.agents:
each_agent.interpolate_traj(self.step_interval, self.interp_time_step)
# UI and printing methods
def print_final_positions(self):
    """Print final position of all agents

    Output depends on the outcome: success (minimal distance, final
    positions and total time), collision, or max time reached. Silent
    when ``verbose`` is False.
    """
    if self.verbose:
        if self.at_goal:
            print("Trajectory succesfull")
            if self.agents_distances:
                print("Minimal distance between agents: %.2f" % min(self.agents_distances))
            for each_agent in self.agents:
                print("Final pos, agent %s : %s" % (self.agents.index(each_agent),
                                                    each_agent.states[0:3, -1]))
            print("Time to reach goal: %.2f" % (self.k_t*self.step_interval))
        elif self.in_collision:
            print("Trajectory failed: Collision")
        else:
            print("Trajectory failed: Max time reached")
def plot_trajectories(self):
    """Plot all computed trajectories

    Draws fixed obstacles first (when present), then the agents, then
    starts the plot loop.
    """
    if self.has_fix_obstacle:
        self.trajectory_plotter.plot_obstacle(self.obstacle_positions)
    self.trajectory_plotter.update_objects(self.agents)
    self.trajectory_plotter.run()