"""
Agent Class
-----------
"""
from math import sqrt
import numpy as np
from numpy import array, dot, hstack, vstack
from numpy.linalg import norm, inv
from scipy.special import binom
[docs]class Agent(object):
"""Represents a single agent
"""
def __init__(self, agent_args, start_pos=None, goal=None):
"""Initialize agent class
Args:
start_pos (list of float, optional): Starting position [x, y, z]. Defaults to None.
goal (list of float, optional): Target position [x, y, z]. Defaults to None.
"""
# Attributes
self.start_position = None
self.goal = None
self.set_starting_position(start_pos) #: 3x1 np.array: Starting position
self.set_goal(goal) #: 3x1 np.array: Goal
self.r_min = agent_args['r_min']
self.collision_check_radius = self.r_min * agent_args['col_radius_ratio']
self.goal_dist_thres = agent_args['goal_dist_thres']
self.goal_speed_thres = agent_args['goal_speed_thres']
self.at_goal = False
self.agent_idx = 0 #: Index of agent in positions
self.n_steps = 0 #: int: Number of steps in horizon
# For testing
self.acc_cst = array([[0.5, 0.5, 0]]).T
#: np.array of (6*k)x(Kmax): Position and speed trajectory at each time step.
# Columns: predicted [p, v], Rows: Each k
self.states = None
self.final_traj = None #: Agent final trajectory
self.prev_input = [0, 0, 0]
self.scaling_matrix = np.diag([1, 1, 2])
self.scaling_matrix_inv = inv(self.scaling_matrix)
#: (dict of int: float): Distance of each agent within a certain radius
self.close_agents = {}
self.collision_step = 0 #: Step of prediction where collision happens
self.all_agents_traj = None
# Setter
[docs] def set_starting_position(self, position):
"""Set starting position
Args:
position (list of float): [x, y, z]
"""
if position is not None:
self.start_position = array(position).reshape(3, 1)
else:
self.start_position = array([0.0, 0.0, 0.0]).reshape(3, 1)
[docs] def set_goal(self, goal):
"""Set agent goal
Args:
goal (list of float): [x, y, z]
"""
if goal is not None:
self.goal = array(goal).reshape(3, 1)
else:
self.goal = array([0.0, 0.0, 0.0]).reshape(3, 1)
[docs] def set_all_traj(self, all_trajectories):
"""Set last predicted trajectories of all agents
Args:
all_trajectories (6*k x n_agents array)
"""
self.all_agents_traj = all_trajectories
[docs] def add_state(self, new_state):
"""Add new state to list of positions
Args:
new_state (array): Trajectory at time step
"""
self.states = hstack((self.states, new_state))
# Initialization
[docs] def initialize_position(self, n_steps, all_agents_traj):
"""Initialize position of the agent.
Sets first horizon as a straight line to goal at a cst speed
Args:
n_steps (int): Number of time steps of horizon
all_agents_traj (3k x n_agents array): Last predicted traj of each agent (ptr)
"""
self.n_steps = n_steps
self.all_agents_traj = all_agents_traj.view()
self.at_goal = False
speed = 0.1
# Compute speeds
dist = norm(self.goal - self.start_position)
dist_z = norm(self.goal[2, 0] - self.start_position[2, 0])
speed_z = dist_z * speed / dist if dist != 0 else 0
d_speed = speed**2 - speed_z**2
speed_xy = np.sqrt(d_speed) if d_speed > 0 else 0
dist_x = norm(self.goal[0, 0] - self.start_position[0, 0])
dist_y = norm(self.goal[1, 0] - self.start_position[1, 0])
speed_x = speed_xy*dist_x/dist if dist != 0 else 0
speed_y = speed_xy*dist_y/dist if dist != 0 else 0
# Check signs
if self.goal[0, 0] - self.start_position[0, 0] < 0:
speed_x = -speed_x
if self.goal[1, 0] - self.start_position[1, 0] < 0:
speed_y = -speed_y
if self.goal[2, 0] - self.start_position[2, 0] < 0:
speed_z = -speed_z
speed = array([[speed_x, speed_y, speed_z]]).reshape(3, 1)
speed_position = vstack((speed, np.zeros((3, 1))))
# Compute positions
start_pos = vstack((self.start_position, speed))
self.states = start_pos
last_pos = start_pos
for _ in range(1, self.n_steps):
new_pos = last_pos + speed_position
self.states = vstack((self.states, new_pos))
last_pos = new_pos
# Compute methods
[docs] def check_goal(self):
"""Check if agent reached it's goal.
Goal is considered reach when the agent is in a radius smaller than ``goal_dist_thres`` at
a speed lower than ``goal_speed_thres``.
Returns:
bool: True if goal reached
"""
current_position = self.states[0:3, -1]
current_speed = self.states[3:6, -1]
goal = self.goal.reshape(3)
dist = norm(goal - current_position)
speed = norm(current_speed)
if dist < self.goal_dist_thres and speed < self.goal_speed_thres:
self.at_goal = True
return self.at_goal
[docs] def check_collisions(self):
"""Check current predicted trajectory for collisions.
1 - For all predicted trajectory, check distance of all the other agents
2 - If distance < Rmin: In collision
3 - If collision: Find all close agents
Returns:
(int): Step of collision (-1 if no collision)
(dict of float): Close agents and their distance at collision step
"""
collision_detected = False
n_agents = self.all_agents_traj.shape[1]
close_agents = {}
collision_step = -1
agents_dist = {}
# Find time step of collision
for each_step in range(self.n_steps):
# Predicted position of agent at time_step
rows = slice(3*each_step, 3*(each_step+1))
predicted_pos = self.all_agents_traj[rows, self.agent_idx]
# At time step, check distance of other agents
for j in range(n_agents):
if j != self.agent_idx:
# Position of the other agent at time step
other_agent_pos = self.all_agents_traj[rows, j]
# Faster than norm
scaled = dot(self.scaling_matrix_inv, predicted_pos - other_agent_pos)
dist = sqrt(scaled[0]**2 + scaled[1]**2 + scaled[2]**2)
# dist = norm(scaled)
agents_dist[j] = dist
if dist < self.r_min and not collision_detected:
# For step 0, check a smaller radius
if each_step == 0 and dist > self.r_min - 0.05:
break
collision_step = each_step
collision_detected = True
# break
if collision_detected:
break
# Find all close agents at collision
if collision_detected:
# At collision, check distance of other agents
for j in range(n_agents):
if j != self.agent_idx:
dist = agents_dist[j]
if dist < self.collision_check_radius:
close_agents[j] = dist # Set agent distance at collision
return collision_step, close_agents
[docs] def interpolate_traj(self, time_step_initial, time_step_interp):
"""Interpolate agent's trajectory using a Bezier curbe.
Args:
time_step_initial (float): Period between samples
time_step_interp (float): Period between interpolation samples
"""
# 1 - Trajectory parameters
n_sample = self.states.shape[1] - 1
n_sample_interp = int(n_sample*time_step_initial/time_step_interp)
end_time = n_sample*time_step_initial
traj_times = np.linspace(0, end_time, n_sample_interp, endpoint=False)
# 2 - Build bezier curve
if n_sample != 0:
self.final_traj = np.zeros((3, n_sample_interp))
for i in range(n_sample + 1):
point = self.states[0:3, i].reshape(3, 1)
self.final_traj +=\
binom(n_sample, i) * (1 - (traj_times/end_time))**(n_sample - i) *\
(traj_times/end_time)**i * point
else:
self.final_traj = self.states[0:3, 0]
self.final_traj = self.final_traj.reshape(3, 1)