From 8aca891d8f12bd7822ee498a11a7009a43220abc Mon Sep 17 00:00:00 2001 From: anishsuvarna11 <71928805+anishsuvarna11@users.noreply.github.com> Date: Sat, 24 Sep 2022 14:54:45 -0400 Subject: [PATCH] a*, d*, 2d map to grid, slam --- README.md | 2 +- a_star.py | 271 +++++++++++++++++++++++++++++++++ d_star_lite.py | 387 +++++++++++++++++++++++++++++++++++++++++++++++ ekf_slam.py | 263 ++++++++++++++++++++++++++++++++ lidar01.csv | 154 +++++++++++++++++++ lidar_to_grid.py | 225 +++++++++++++++++++++++++++ 6 files changed, 1301 insertions(+), 1 deletion(-) create mode 100644 a_star.py create mode 100644 d_star_lite.py create mode 100644 ekf_slam.py create mode 100644 lidar01.csv create mode 100644 lidar_to_grid.py diff --git a/README.md b/README.md index ba182e8..219fa42 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ # ComputerVision -The computer vision software for the Olympian robot. Ram and Ryan +The computer vision software for the Olympian robot. Ram, Ryan, and Anish diff --git a/a_star.py b/a_star.py new file mode 100644 index 0000000..5c43c03 --- /dev/null +++ b/a_star.py @@ -0,0 +1,271 @@ +import math + +import matplotlib.pyplot as plt + +show_animation = True + + +class AStarPlanner: + + def __init__(self, ox, oy, resolution, rr): + """ + Initialize grid map for a star planning + + ox: x position list of Obstacles [m] + oy: y position list of Obstacles [m] + resolution: grid resolution [m] + rr: robot radius[m] + """ + + self.resolution = resolution + self.rr = rr + self.min_x, self.min_y = 0, 0 + self.max_x, self.max_y = 0, 0 + self.obstacle_map = None + self.x_width, self.y_width = 0, 0 + self.motion = self.get_motion_model() + self.calc_obstacle_map(ox, oy) + + class Node: + def __init__(self, x, y, cost, parent_index): + self.x = x # index of grid + self.y = y # index of grid + self.cost = cost + self.parent_index = parent_index + + def __str__(self): + return str(self.x) + "," + str(self.y) + "," + str( + self.cost) + "," + str(self.parent_index) + + def planning(self, sx, sy, gx, gy): + """ + A star path search + + input: + s_x: start x position [m] + s_y: start y position [m] + gx: goal x position [m] + gy: goal y position [m] + + output: + rx: x position list of the final path + ry: y position list of the final path + """ + + start_node = self.Node(self.calc_xy_index(sx, self.min_x), + self.calc_xy_index(sy, self.min_y), 0.0, -1) + goal_node = self.Node(self.calc_xy_index(gx, self.min_x), + self.calc_xy_index(gy, self.min_y), 0.0, -1) + + open_set, closed_set = dict(), dict() + open_set[self.calc_grid_index(start_node)] = start_node + + while 1: + if len(open_set) == 0: + print("Open set is empty..") + break + + c_id = min( + open_set, + key=lambda o: open_set[o].cost + self.calc_heuristic(goal_node, + open_set[ + o])) + current = open_set[c_id] + + # show graph + if show_animation: # pragma: no cover + plt.plot(self.calc_grid_position(current.x, self.min_x), + self.calc_grid_position(current.y, self.min_y), "xc") + # for stopping simulation with the esc key. + plt.gcf().canvas.mpl_connect('key_release_event', + lambda event: [exit( + 0) if event.key == 'escape' else None]) + if len(closed_set.keys()) % 10 == 0: + plt.pause(0.001) + + if current.x == goal_node.x and current.y == goal_node.y: + print("Find goal") + goal_node.parent_index = current.parent_index + goal_node.cost = current.cost + break + + # Remove the item from the open set + del open_set[c_id] + + # Add it to the closed set + closed_set[c_id] = current + + # expand_grid search grid based on motion model + for i, _ in enumerate(self.motion): + node = self.Node(current.x + self.motion[i][0], + current.y + self.motion[i][1], + current.cost + self.motion[i][2], c_id) + n_id = self.calc_grid_index(node) + + # If the node is not safe, do nothing + if not self.verify_node(node): + continue + + if n_id in closed_set: + continue + + if n_id not in open_set: + open_set[n_id] = node # discovered a new node + else: + if open_set[n_id].cost > node.cost: + # This path is the best until now. record it + open_set[n_id] = node + + rx, ry = self.calc_final_path(goal_node, closed_set) + + return rx, ry + + def calc_final_path(self, goal_node, closed_set): + # generate final course + rx, ry = [self.calc_grid_position(goal_node.x, self.min_x)], [ + self.calc_grid_position(goal_node.y, self.min_y)] + parent_index = goal_node.parent_index + while parent_index != -1: + n = closed_set[parent_index] + rx.append(self.calc_grid_position(n.x, self.min_x)) + ry.append(self.calc_grid_position(n.y, self.min_y)) + parent_index = n.parent_index + + return rx, ry + + @staticmethod + def calc_heuristic(n1, n2): + w = 1.0 # weight of heuristic + d = w * math.hypot(n1.x - n2.x, n1.y - n2.y) + return d + + def calc_grid_position(self, index, min_position): + """ + calc grid position + + :param index: + :param min_position: + :return: + """ + pos = index * self.resolution + min_position + return pos + + def calc_xy_index(self, position, min_pos): + return round((position - min_pos) / self.resolution) + + def calc_grid_index(self, node): + return (node.y - self.min_y) * self.x_width + (node.x - self.min_x) + + def verify_node(self, node): + px = self.calc_grid_position(node.x, self.min_x) + py = self.calc_grid_position(node.y, self.min_y) + + if px < self.min_x: + return False + elif py < self.min_y: + return False + elif px >= self.max_x: + return False + elif py >= self.max_y: + return False + + # collision check + if self.obstacle_map[node.x][node.y]: + return False + + return True + + def calc_obstacle_map(self, ox, oy): + + self.min_x = round(min(ox)) + self.min_y = round(min(oy)) + self.max_x = round(max(ox)) + self.max_y = round(max(oy)) + print("min_x:", self.min_x) + print("min_y:", self.min_y) + print("max_x:", self.max_x) + print("max_y:", self.max_y) + + self.x_width = round((self.max_x - self.min_x) / self.resolution) + self.y_width = round((self.max_y - self.min_y) / self.resolution) + print("x_width:", self.x_width) + print("y_width:", self.y_width) + + # obstacle map generation + self.obstacle_map = [[False for _ in range(self.y_width)] + for _ in range(self.x_width)] + for ix in range(self.x_width): + x = self.calc_grid_position(ix, self.min_x) + for iy in range(self.y_width): + y = self.calc_grid_position(iy, self.min_y) + for iox, ioy in zip(ox, oy): + d = math.hypot(iox - x, ioy - y) + if d <= self.rr: + self.obstacle_map[ix][iy] = True + break + + @staticmethod + def get_motion_model(): + # dx, dy, cost + motion = [[1, 0, 1], + [0, 1, 1], + [-1, 0, 1], + [0, -1, 1], + [-1, -1, math.sqrt(2)], + [-1, 1, math.sqrt(2)], + [1, -1, math.sqrt(2)], + [1, 1, math.sqrt(2)]] + + return motion + + +def main(): + print(__file__ + " start!!") + + # start and goal position + sx = 10.0 # [m] + sy = 10.0 # [m] + gx = 50.0 # [m] + gy = 50.0 # [m] + grid_size = 2.0 # [m] + robot_radius = 1.0 # [m] + + # set obstacle positions + ox, oy = [], [] + for i in range(-10, 60): + ox.append(i) + oy.append(-10.0) + for i in range(-10, 60): + ox.append(60.0) + oy.append(i) + for i in range(-10, 61): + ox.append(i) + oy.append(60.0) + for i in range(-10, 61): + ox.append(-10.0) + oy.append(i) + for i in range(-10, 40): + ox.append(20.0) + oy.append(i) + for i in range(0, 40): + ox.append(40.0) + oy.append(60.0 - i) + + if show_animation: # pragma: no cover + plt.plot(ox, oy, ".k") + plt.plot(sx, sy, "og") + plt.plot(gx, gy, "xb") + plt.grid(True) + plt.axis("equal") + + a_star = AStarPlanner(ox, oy, grid_size, robot_radius) + rx, ry = a_star.planning(sx, sy, gx, gy) + + if show_animation: # pragma: no cover + plt.plot(rx, ry, "-r") + plt.pause(0.001) + plt.show() + + +if __name__ == '__main__': + main() diff --git a/d_star_lite.py b/d_star_lite.py new file mode 100644 index 0000000..2edbdd1 --- /dev/null +++ b/d_star_lite.py @@ -0,0 +1,387 @@ +import math +import matplotlib.pyplot as plt +import random +from typing import Tuple +show_animation = True +pause_time = 0.001 +p_create_random_obstacle = 0 + + +class Node: + def __init__(self, x: int = 0, y: int = 0, cost: float = 0.0): + self.x = x + self.y = y + self.cost = cost + + +def add_coordinates(node1: Node, node2: Node): + new_node = Node() + new_node.x = node1.x + node2.x + new_node.y = node1.y + node2.y + new_node.cost = node1.cost + node2.cost + return new_node + + +def compare_coordinates(node1: Node, node2: Node): + return node1.x == node2.x and node1.y == node2.y + + +class DStarLite: + + # Please adjust the heuristic function (h) if you change the list of + # possible motions + motions = [ + Node(1, 0, 1), + Node(0, 1, 1), + Node(-1, 0, 1), + Node(0, -1, 1), + Node(1, 1, math.sqrt(2)), + Node(1, -1, math.sqrt(2)), + Node(-1, 1, math.sqrt(2)), + Node(-1, -1, math.sqrt(2)) + ] + + def __init__(self, ox: list, oy: list): + # Ensure that within the algorithm implementation all node coordinates + # are indices in the grid and extend + # from 0 to abs(_max - _min) + self.x_min_world = int(min(ox)) + self.y_min_world = int(min(oy)) + self.x_max = int(abs(max(ox) - self.x_min_world)) + self.y_max = int(abs(max(oy) - self.y_min_world)) + self.obstacles = [Node(x - self.x_min_world, y - self.y_min_world) + for x, y in zip(ox, oy)] + self.start = Node(0, 0) + self.goal = Node(0, 0) + self.U = list() # type: ignore + self.km = 0.0 + self.kold = 0.0 + self.rhs = list() # type: ignore + self.g = list() # type: ignore + self.detected_obstacles = list() # type: ignore + if show_animation: + self.detected_obstacles_for_plotting_x = list() # type: ignore + self.detected_obstacles_for_plotting_y = list() # type: ignore + + def create_grid(self, val: float): + grid = list() + for _ in range(0, self.x_max): + grid_row = list() + for _ in range(0, self.y_max): + grid_row.append(val) + grid.append(grid_row) + return grid + + def is_obstacle(self, node: Node): + return any([compare_coordinates(node, obstacle) + for obstacle in self.obstacles]) or \ + any([compare_coordinates(node, obstacle) + for obstacle in self.detected_obstacles]) + + def c(self, node1: Node, node2: Node): + if self.is_obstacle(node2): + # Attempting to move from or to an obstacle + return math.inf + new_node = Node(node1.x-node2.x, node1.y-node2.y) + detected_motion = list(filter(lambda motion: + compare_coordinates(motion, new_node), + self.motions)) + return detected_motion[0].cost + + def h(self, s: Node): + # Cannot use the 2nd euclidean norm as this might sometimes generate + # heuristics that overestimate the cost, making them inadmissible, + # due to rounding errors etc (when combined with calculate_key) + # To be admissible heuristic should + # never overestimate the cost of a move + # hence not using the line below + # return math.hypot(self.start.x - s.x, self.start.y - s.y) + + # Below is the same as 1; modify if you modify the cost of each move in + # motion + # return max(abs(self.start.x - s.x), abs(self.start.y - s.y)) + return 1 + + def calculate_key(self, s: Node): + return (min(self.g[s.x][s.y], self.rhs[s.x][s.y]) + self.h(s) + + self.km, min(self.g[s.x][s.y], self.rhs[s.x][s.y])) + + def is_valid(self, node: Node): + if 0 <= node.x < self.x_max and 0 <= node.y < self.y_max: + return True + return False + + def get_neighbours(self, u: Node): + return [add_coordinates(u, motion) for motion in self.motions + if self.is_valid(add_coordinates(u, motion))] + + def pred(self, u: Node): + # Grid, so each vertex is connected to the ones around it + return self.get_neighbours(u) + + def succ(self, u: Node): + # Grid, so each vertex is connected to the ones around it + return self.get_neighbours(u) + + def initialize(self, start: Node, goal: Node): + self.start.x = start.x - self.x_min_world + self.start.y = start.y - self.y_min_world + self.goal.x = goal.x - self.x_min_world + self.goal.y = goal.y - self.y_min_world + self.U = list() # Would normally be a priority queue + self.km = 0.0 + self.rhs = self.create_grid(math.inf) + self.g = self.create_grid(math.inf) + self.rhs[self.goal.x][self.goal.y] = 0 + self.U.append((self.goal, self.calculate_key(self.goal))) + self.detected_obstacles = list() + + def update_vertex(self, u: Node): + if not compare_coordinates(u, self.goal): + self.rhs[u.x][u.y] = min([self.c(u, sprime) + + self.g[sprime.x][sprime.y] + for sprime in self.succ(u)]) + if any([compare_coordinates(u, node) for node, key in self.U]): + self.U = [(node, key) for node, key in self.U + if not compare_coordinates(node, u)] + self.U.sort(key=lambda x: x[1]) + if self.g[u.x][u.y] != self.rhs[u.x][u.y]: + self.U.append((u, self.calculate_key(u))) + self.U.sort(key=lambda x: x[1]) + + def compare_keys(self, key_pair1: Tuple[float, float], + key_pair2: Tuple[float, float]): + return key_pair1[0] < key_pair2[0] or \ + (key_pair1[0] == key_pair2[0] and key_pair1[1] < key_pair2[1]) + + def compute_shortest_path(self): + self.U.sort(key=lambda x: x[1]) + while (len(self.U) > 0 and + self.compare_keys(self.U[0][1], + self.calculate_key(self.start))) or \ + self.rhs[self.start.x][self.start.y] != \ + self.g[self.start.x][self.start.y]: + self.kold = self.U[0][1] + u = self.U[0][0] + self.U.pop(0) + if self.compare_keys(self.kold, self.calculate_key(u)): + self.U.append((u, self.calculate_key(u))) + self.U.sort(key=lambda x: x[1]) + elif self.g[u.x][u.y] > self.rhs[u.x][u.y]: + self.g[u.x][u.y] = self.rhs[u.x][u.y] + for s in self.pred(u): + self.update_vertex(s) + else: + self.g[u.x][u.y] = math.inf + for s in self.pred(u) + [u]: + self.update_vertex(s) + self.U.sort(key=lambda x: x[1]) + + def detect_changes(self): + changed_vertices = list() + if len(self.spoofed_obstacles) > 0: + for spoofed_obstacle in self.spoofed_obstacles[0]: + if compare_coordinates(spoofed_obstacle, self.start) or \ + compare_coordinates(spoofed_obstacle, self.goal): + continue + changed_vertices.append(spoofed_obstacle) + self.detected_obstacles.append(spoofed_obstacle) + if show_animation: + self.detected_obstacles_for_plotting_x.append( + spoofed_obstacle.x + self.x_min_world) + self.detected_obstacles_for_plotting_y.append( + spoofed_obstacle.y + self.y_min_world) + plt.plot(self.detected_obstacles_for_plotting_x, + self.detected_obstacles_for_plotting_y, ".k") + plt.pause(pause_time) + self.spoofed_obstacles.pop(0) + + # Allows random generation of obstacles + random.seed() + if random.random() > 1 - p_create_random_obstacle: + x = random.randint(0, self.x_max) + y = random.randint(0, self.y_max) + new_obs = Node(x, y) + if compare_coordinates(new_obs, self.start) or \ + compare_coordinates(new_obs, self.goal): + return changed_vertices + changed_vertices.append(Node(x, y)) + self.detected_obstacles.append(Node(x, y)) + if show_animation: + self.detected_obstacles_for_plotting_x.append(x + + self.x_min_world) + self.detected_obstacles_for_plotting_y.append(y + + self.y_min_world) + plt.plot(self.detected_obstacles_for_plotting_x, + self.detected_obstacles_for_plotting_y, ".k") + plt.pause(pause_time) + return changed_vertices + + def compute_current_path(self): + path = list() + current_point = Node(self.start.x, self.start.y) + while not compare_coordinates(current_point, self.goal): + path.append(current_point) + current_point = min(self.succ(current_point), + key=lambda sprime: + self.c(current_point, sprime) + + self.g[sprime.x][sprime.y]) + path.append(self.goal) + return path + + def compare_paths(self, path1: list, path2: list): + if len(path1) != len(path2): + return False + for node1, node2 in zip(path1, path2): + if not compare_coordinates(node1, node2): + return False + return True + + def display_path(self, path: list, colour: str, alpha: float = 1.0): + px = [(node.x + self.x_min_world) for node in path] + py = [(node.y + self.y_min_world) for node in path] + drawing = plt.plot(px, py, colour, alpha=alpha) + plt.pause(pause_time) + return drawing + + def main(self, start: Node, goal: Node, + spoofed_ox: list, spoofed_oy: list): + self.spoofed_obstacles = [[Node(x - self.x_min_world, + y - self.y_min_world) + for x, y in zip(rowx, rowy)] + for rowx, rowy in zip(spoofed_ox, spoofed_oy) + ] + pathx = [] + pathy = [] + self.initialize(start, goal) + last = self.start + self.compute_shortest_path() + pathx.append(self.start.x + self.x_min_world) + pathy.append(self.start.y + self.y_min_world) + + if show_animation: + current_path = self.compute_current_path() + previous_path = current_path.copy() + previous_path_image = self.display_path(previous_path, ".c", + alpha=0.3) + current_path_image = self.display_path(current_path, ".c") + + while not compare_coordinates(self.goal, self.start): + if self.g[self.start.x][self.start.y] == math.inf: + print("No path possible") + return False, pathx, pathy + self.start = min(self.succ(self.start), + key=lambda sprime: + self.c(self.start, sprime) + + self.g[sprime.x][sprime.y]) + pathx.append(self.start.x + self.x_min_world) + pathy.append(self.start.y + self.y_min_world) + if show_animation: + current_path.pop(0) + plt.plot(pathx, pathy, "-r") + plt.pause(pause_time) + changed_vertices = self.detect_changes() + if len(changed_vertices) != 0: + print("New obstacle detected") + self.km += self.h(last) + last = self.start + for u in changed_vertices: + if compare_coordinates(u, self.start): + continue + self.rhs[u.x][u.y] = math.inf + self.g[u.x][u.y] = math.inf + self.update_vertex(u) + self.compute_shortest_path() + + if show_animation: + new_path = self.compute_current_path() + if not self.compare_paths(current_path, new_path): + current_path_image[0].remove() + previous_path_image[0].remove() + previous_path = current_path.copy() + current_path = new_path.copy() + previous_path_image = self.display_path(previous_path, + ".c", + alpha=0.3) + current_path_image = self.display_path(current_path, + ".c") + plt.pause(pause_time) + print("Path found") + return True, pathx, pathy + + +def main(): + + # start and goal position + sx = 10 # [m] + sy = 10 # [m] + gx = 50 # [m] + gy = 50 # [m] + + # set obstacle positions + ox, oy = [], [] + for i in range(-10, 60): + ox.append(i) + oy.append(-10.0) + for i in range(-10, 60): + ox.append(60.0) + oy.append(i) + for i in range(-10, 61): + ox.append(i) + oy.append(60.0) + for i in range(-10, 61): + ox.append(-10.0) + oy.append(i) + for i in range(-10, 40): + ox.append(20.0) + oy.append(i) + for i in range(0, 40): + ox.append(40.0) + oy.append(60.0 - i) + + if show_animation: + plt.plot(ox, oy, ".k") + plt.plot(sx, sy, "og") + plt.plot(gx, gy, "xb") + plt.grid(True) + plt.axis("equal") + label_column = ['Start', 'Goal', 'Path taken', + 'Current computed path', 'Previous computed path', + 'Obstacles'] + columns = [plt.plot([], [], symbol, color=colour, alpha=alpha)[0] + for symbol, colour, alpha in [['o', 'g', 1], + ['x', 'b', 1], + ['-', 'r', 1], + ['.', 'c', 1], + ['.', 'c', 0.3], + ['.', 'k', 1]]] + plt.legend(columns, label_column, bbox_to_anchor=(1, 1), title="Key:", + fontsize="xx-small") + plt.plot() + #plt.pause(pause_time) + + # Obstacles discovered at time = row + # time = 1, obstacles discovered at (0, 2), (9, 2), (4, 0) + # time = 2, obstacles discovered at (0, 1), (7, 7) + # ... + # when the spoofed obstacles are: + # spoofed_ox = [[0, 9, 4], [0, 7], [], [], [], [], [], [5]] + # spoofed_oy = [[2, 2, 0], [1, 7], [], [], [], [], [], [4]] + + # Reroute + # spoofed_ox = [[], [], [], [], [], [], [], [40 for _ in range(10, 21)]] + # spoofed_oy = [[], [], [], [], [], [], [], [i for i in range(10, 21)]] + + # Obstacles that demostrate large rerouting + spoofed_ox = [[], [], [], + [i for i in range(0, 21)] + [0 for _ in range(0, 20)]] + spoofed_oy = [[], [], [], + [20 for _ in range(0, 21)] + [i for i in range(0, 20)]] + + dstarlite = DStarLite(ox, oy) + dstarlite.main(Node(x=sx, y=sy), Node(x=gx, y=gy), + spoofed_ox=spoofed_ox, spoofed_oy=spoofed_oy) + + +if __name__ == "__main__": + main() diff --git a/ekf_slam.py b/ekf_slam.py new file mode 100644 index 0000000..bb648ce --- /dev/null +++ b/ekf_slam.py @@ -0,0 +1,263 @@ +""" +Extended Kalman Filter SLAM example + +author: Atsushi Sakai (@Atsushi_twi) +""" + +import math + +import matplotlib.pyplot as plt +import numpy as np + +# EKF state covariance +Cx = np.diag([0.5, 0.5, np.deg2rad(30.0)]) ** 2 + +# Simulation parameter +Q_sim = np.diag([0.2, np.deg2rad(1.0)]) ** 2 +R_sim = np.diag([1.0, np.deg2rad(10.0)]) ** 2 + +DT = 0.1 # time tick [s] +SIM_TIME = 50.0 # simulation time [s] +MAX_RANGE = 20.0 # maximum observation range +M_DIST_TH = 2.0 # Threshold of Mahalanobis distance for data association. +STATE_SIZE = 3 # State size [x,y,yaw] +LM_SIZE = 2 # LM state size [x,y] + +show_animation = True + + +def ekf_slam(xEst, PEst, u, z): + # Predict + S = STATE_SIZE + G, Fx = jacob_motion(xEst[0:S], u) + xEst[0:S] = motion_model(xEst[0:S], u) + PEst[0:S, 0:S] = G.T @ PEst[0:S, 0:S] @ G + Fx.T @ Cx @ Fx + initP = np.eye(2) + + # Update + for iz in range(len(z[:, 0])): # for each observation + min_id = search_correspond_landmark_id(xEst, PEst, z[iz, 0:2]) + + nLM = calc_n_lm(xEst) + if min_id == nLM: + print("New LM") + # Extend state and covariance matrix + xAug = np.vstack((xEst, calc_landmark_position(xEst, z[iz, :]))) + PAug = np.vstack((np.hstack((PEst, np.zeros((len(xEst), LM_SIZE)))), + np.hstack((np.zeros((LM_SIZE, len(xEst))), initP)))) + xEst = xAug + PEst = PAug + lm = get_landmark_position_from_state(xEst, min_id) + y, S, H = calc_innovation(lm, xEst, PEst, z[iz, 0:2], min_id) + + K = (PEst @ H.T) @ np.linalg.inv(S) + xEst = xEst + (K @ y) + PEst = (np.eye(len(xEst)) - (K @ H)) @ PEst + + xEst[2] = pi_2_pi(xEst[2]) + + return xEst, PEst + + +def calc_input(): + v = 1.0 # [m/s] + yaw_rate = 0.1 # [rad/s] + u = np.array([[v, yaw_rate]]).T + return u + + +def observation(xTrue, xd, u, RFID): + xTrue = motion_model(xTrue, u) + + # add noise to gps x-y + z = np.zeros((0, 3)) + + for i in range(len(RFID[:, 0])): + + dx = RFID[i, 0] - xTrue[0, 0] + dy = RFID[i, 1] - xTrue[1, 0] + d = math.hypot(dx, dy) + angle = pi_2_pi(math.atan2(dy, dx) - xTrue[2, 0]) + if d <= MAX_RANGE: + dn = d + np.random.randn() * Q_sim[0, 0] ** 0.5 # add noise + angle_n = angle + np.random.randn() * Q_sim[1, 1] ** 0.5 # add noise + zi = np.array([dn, angle_n, i]) + z = np.vstack((z, zi)) + + # add noise to input + ud = np.array([[ + u[0, 0] + np.random.randn() * R_sim[0, 0] ** 0.5, + u[1, 0] + np.random.randn() * R_sim[1, 1] ** 0.5]]).T + + xd = motion_model(xd, ud) + return xTrue, z, xd, ud + + +def motion_model(x, u): + F = np.array([[1.0, 0, 0], + [0, 1.0, 0], + [0, 0, 1.0]]) + + B = np.array([[DT * math.cos(x[2, 0]), 0], + [DT * math.sin(x[2, 0]), 0], + [0.0, DT]]) + + x = (F @ x) + (B @ u) + return x + + +def calc_n_lm(x): + n = int((len(x) - STATE_SIZE) / LM_SIZE) + return n + + +def jacob_motion(x, u): + Fx = np.hstack((np.eye(STATE_SIZE), np.zeros( + (STATE_SIZE, LM_SIZE * calc_n_lm(x))))) + + jF = np.array([[0.0, 0.0, -DT * u[0, 0] * math.sin(x[2, 0])], + [0.0, 0.0, DT * u[0, 0] * math.cos(x[2, 0])], + [0.0, 0.0, 0.0]], dtype=float) + + G = np.eye(STATE_SIZE) + Fx.T @ jF @ Fx + + return G, Fx, + + +def calc_landmark_position(x, z): + zp = np.zeros((2, 1)) + + zp[0, 0] = x[0, 0] + z[0] * math.cos(x[2, 0] + z[1]) + zp[1, 0] = x[1, 0] + z[0] * math.sin(x[2, 0] + z[1]) + + return zp + + +def get_landmark_position_from_state(x, ind): + lm = x[STATE_SIZE + LM_SIZE * ind: STATE_SIZE + LM_SIZE * (ind + 1), :] + + return lm + + +def search_correspond_landmark_id(xAug, PAug, zi): + """ + Landmark association with Mahalanobis distance + """ + + nLM = calc_n_lm(xAug) + + min_dist = [] + + for i in range(nLM): + lm = get_landmark_position_from_state(xAug, i) + y, S, H = calc_innovation(lm, xAug, PAug, zi, i) + min_dist.append(y.T @ np.linalg.inv(S) @ y) + + min_dist.append(M_DIST_TH) # new landmark + + min_id = min_dist.index(min(min_dist)) + + return min_id + + +def calc_innovation(lm, xEst, PEst, z, LMid): + delta = lm - xEst[0:2] + q = (delta.T @ delta)[0, 0] + z_angle = math.atan2(delta[1, 0], delta[0, 0]) - xEst[2, 0] + zp = np.array([[math.sqrt(q), pi_2_pi(z_angle)]]) + y = (z - zp).T + y[1] = pi_2_pi(y[1]) + H = jacob_h(q, delta, xEst, LMid + 1) + S = H @ PEst @ H.T + Cx[0:2, 0:2] + + return y, S, H + + +def jacob_h(q, delta, x, i): + sq = math.sqrt(q) + G = np.array([[-sq * delta[0, 0], - sq * delta[1, 0], 0, sq * delta[0, 0], sq * delta[1, 0]], + [delta[1, 0], - delta[0, 0], - q, - delta[1, 0], delta[0, 0]]]) + + G = G / q + nLM = calc_n_lm(x) + F1 = np.hstack((np.eye(3), np.zeros((3, 2 * nLM)))) + F2 = np.hstack((np.zeros((2, 3)), np.zeros((2, 2 * (i - 1))), + np.eye(2), np.zeros((2, 2 * nLM - 2 * i)))) + + F = np.vstack((F1, F2)) + + H = G @ F + + return H + + +def pi_2_pi(angle): + return (angle + math.pi) % (2 * math.pi) - math.pi + + +def main(): + print(__file__ + " start!!") + + time = 0.0 + + # RFID positions [x, y] + RFID = np.array([[10.0, -2.0], + [15.0, 10.0], + [3.0, 15.0], + [-5.0, 20.0]]) + + # State Vector [x y yaw v]' + xEst = np.zeros((STATE_SIZE, 1)) + xTrue = np.zeros((STATE_SIZE, 1)) + PEst = np.eye(STATE_SIZE) + + xDR = np.zeros((STATE_SIZE, 1)) # Dead reckoning + + # history + hxEst = xEst + hxTrue = xTrue + hxDR = xTrue + + while SIM_TIME >= time: + time += DT + u = calc_input() + + xTrue, z, xDR, ud = observation(xTrue, xDR, u, RFID) + + xEst, PEst = ekf_slam(xEst, PEst, ud, z) + + x_state = xEst[0:STATE_SIZE] + + # store data history + hxEst = np.hstack((hxEst, x_state)) + hxDR = np.hstack((hxDR, xDR)) + hxTrue = np.hstack((hxTrue, xTrue)) + + if show_animation: # pragma: no cover + plt.cla() + # for stopping simulation with the esc key. + plt.gcf().canvas.mpl_connect( + 'key_release_event', + lambda event: [exit(0) if event.key == 'escape' else None]) + + plt.plot(RFID[:, 0], RFID[:, 1], "*k") + plt.plot(xEst[0], xEst[1], ".r") + + # plot landmark + for i in range(calc_n_lm(xEst)): + plt.plot(xEst[STATE_SIZE + i * 2], + xEst[STATE_SIZE + i * 2 + 1], "xg") + + plt.plot(hxTrue[0, :], + hxTrue[1, :], "-b") + plt.plot(hxDR[0, :], + hxDR[1, :], "-k") + plt.plot(hxEst[0, :], + hxEst[1, :], "-r") + plt.axis("equal") + plt.grid(True) + plt.pause(0.001) + + +if __name__ == '__main__': + main() diff --git a/lidar01.csv b/lidar01.csv new file mode 100644 index 0000000..1f6b3f5 --- /dev/null +++ b/lidar01.csv @@ -0,0 +1,154 @@ +0.008450416037156572,0.5335 +0.046902201120156306,0.5345 +0.08508127850753233,0.537 +0.1979822644959155,0.2605 +0.21189035697274505,0.2625 +0.2587960806200922,0.26475 +0.3043382657893199,0.2675 +0.34660795861105775,0.27075 +0.43632879047139106,0.59 +0.4739624524675188,0.60025 +0.5137777760286397,0.611 +0.5492297764597742,0.6265 +0.5895905154121426,0.64 +0.6253152235389017,0.6565 +0.6645851317087743,0.676 +0.6997644244442851,0.6975 +0.7785769484796541,0.3345 +0.7772134100015329,0.74575 +0.8652979956881222,0.3315 +0.8996591653367609,0.31775 +0.9397471965935056,0.31275 +0.9847439663714841,0.31125 +1.0283771976713423,0.31325 +1.0641019057981014,0.31975 +1.1009174447073562,0.3335 +1.2012738766970301,0.92275 +1.2397256617800307,0.95325 +1.2779047391674068,0.9865 +1.316629231946031,1.01775 +1.3561718478115274,1.011 +1.3948963405901518,1.0055 +1.4330754179775278,1.00225 +1.4731634492342724,0.99975 +1.5113425266216485,0.9975 +1.5517032655740168,1.001 +1.5896096352657691,1.00275 +1.6288795434356418,1.008 +1.6684221593011381,1.0135 +1.7066012366885142,1.022 +1.7453257294671385,1.02875 +1.7862318838107551,0.9935 +1.8257744996762515,1.0025 +1.8661352386286207,0.96075 +1.9045870237116205,0.92125 +1.9465840088377355,0.8855 +1.986944747790103,0.85725 +2.025669240568728,0.832 +2.065757271825472,0.80675 +2.1066634261690886,0.78875 +2.1464787497302105,0.7705 +2.1865667809869542,0.75625 +2.2261093968524506,0.74475 +2.2683790896741876,0.68275 +2.3090125363221823,0.6375 +2.3510095214482956,0.59925 +2.3916429680962885,0.5665 +2.4330945378311526,0.538 +2.4783640153047557,0.50825 +2.5203610004308707,0.4875 +2.562903400948233,0.46825 +2.599173524466238,0.45 +2.642806755766097,0.4355 +2.685076448587836,0.42275 +2.722437402888339,0.4125 +2.766888757275069,0.40125 +2.8007045115324587,0.39525 +2.841883373571701,0.385 +2.8819714048284446,0.3805 +2.922332143780814,0.38575 +2.9637837135156797,0.38425 +3.0005992524249336,0.36575 +3.0401418682904318,0.3765 +3.079957191851552,0.3915 +3.115409192282687,0.408 +3.154679100452558,0.4265 +3.184949654666836,0.447 +3.2242195628367085,0.4715 +3.2574899017028507,0.49875 +3.2959416867858504,0.52875 +3.3292120256519926,0.564 +3.3665729799524957,0.6055 +3.4031158111661277,0.6515 +3.438022396206014,0.70675 +3.4756560582021407,0.771 +3.513562427893893,0.77075 +3.5522869206725183,0.7785 +3.621827383056667,0.79575 +3.65918833735717,0.8045 +3.697367414744546,0.81725 +3.7377281536969154,0.83325 +3.775634523388667,0.8415 +3.8135408930804187,0.85575 +3.8522653858590425,0.87325 +3.8898990478551703,0.88725 +3.9299870791119154,0.906 +3.9665299103255465,0.9265 +4.006072526191043,0.94575 +4.043978895882795,0.97175 +4.081885265574547,1.02075 +4.1206097583531704,1.046 +4.1587888357405465,1.07025 +4.196422497736674,1.097 +4.234874282819675,1.12575 +4.286688744988257,0.73475 +4.324322406984384,0.72225 +4.364410438241129,0.731 +4.405862007975994,0.7405 +4.44267754688525,0.749 +4.484129116620115,0.76025 +4.522853609398739,0.76825 +4.560759979090491,0.77125 +4.5989390564778665,0.77725 +4.640117918517108,0.782 +4.679115118991357,0.78425 +4.717294196378733,0.789 +4.757109519939853,0.78825 +4.796652135805349,0.7855 +4.8337403824102285,0.786 +4.871646752101981,0.78275 +4.9109166602718535,0.7785 +4.950186568441726,0.7635 +4.990274599698471,0.74725 +5.028180969390222,0.737 +5.0677235852557185,0.72575 +5.109720570381833,0.71525 +5.149263186247329,0.70625 +5.1863514328522085,0.69975 +5.230530079543315,0.693 +5.269799987713188,0.68925 +5.307979065100563,0.68425 +5.347248973270435,0.68275 +5.386518881440308,0.68075 +5.426606912697053,0.68175 +5.465604113171301,0.67825 +5.502419652080556,0.6835 +5.543871221815422,0.6885 +5.580959468420302,0.67925 +5.624319992024535,0.6555 +5.660044700151294,0.639 +5.700950854494911,0.623 +5.740220762664784,0.6075 +5.783581286269018,0.59475 +5.820124117482649,0.58475 +5.861848394913139,0.57325 +5.899209349213642,0.565 +5.938751965079138,0.55525 +5.9782945809446355,0.55175 +6.017564489114507,0.546 +6.059288766544997,0.5405 +6.097467843932373,0.53825 +6.139464829058487,0.534 +6.176825783358991,0.5325 +6.215822983833238,0.53125 +6.252911230438118,0.53075 \ No newline at end of file diff --git a/lidar_to_grid.py b/lidar_to_grid.py new file mode 100644 index 0000000..d22e03c --- /dev/null +++ b/lidar_to_grid.py @@ -0,0 +1,225 @@ +import math +from collections import deque + +import matplotlib.pyplot as plt +import numpy as np + +EXTEND_AREA = 1.0 + + +def file_read(f): + """ + Reading LIDAR laser beams (angles and corresponding distance data) + """ + with open(f) as data: + measures = [line.split(",") for line in data] + angles = [] + distances = [] + for measure in measures: + angles.append(float(measure[0])) + distances.append(float(measure[1])) + angles = np.array(angles) + distances = np.array(distances) + return angles, distances + + +def bresenham(start, end): + """ + >>> points1 = ((4, 4), (6, 10)) + >>> print(points1) + np.array([[4,4], [4,5], [5,6], [5,7], [5,8], [6,9], [6,10]]) + """ + # setup initial conditions + x1, y1 = start + x2, y2 = end + dx = x2 - x1 + dy = y2 - y1 + is_steep = abs(dy) > abs(dx) # determine how steep the line is + if is_steep: # rotate line + x1, y1 = y1, x1 + x2, y2 = y2, x2 + # swap start and end points if necessary and store swap state + swapped = False + if x1 > x2: + x1, x2 = x2, x1 + y1, y2 = y2, y1 + swapped = True + dx = x2 - x1 # recalculate differentials + dy = y2 - y1 # recalculate differentials + error = int(dx / 2.0) # calculate error + y_step = 1 if y1 < y2 else -1 + # iterate over bounding box generating points between start and end + y = y1 + points = [] + for x in range(x1, x2 + 1): + coord = [y, x] if is_steep else (x, y) + points.append(coord) + error -= abs(dy) + if error < 0: + y += y_step + error += dx + if swapped: # reverse the list if the coordinates were swapped + points.reverse() + points = np.array(points) + return points + + +def calc_grid_map_config(ox, oy, xy_resolution): + """ + Calculates the size, and the maximum distances according to the the + measurement center + """ + min_x = round(min(ox) - EXTEND_AREA / 2.0) + min_y = round(min(oy) - EXTEND_AREA / 2.0) + max_x = round(max(ox) + EXTEND_AREA / 2.0) + max_y = round(max(oy) + EXTEND_AREA / 2.0) + xw = int(round((max_x - min_x) / xy_resolution)) + yw = int(round((max_y - min_y) / xy_resolution)) + print("The grid map is ", xw, "x", yw, ".") + return min_x, min_y, max_x, max_y, xw, yw + + +def atan_zero_to_twopi(y, x): + angle = math.atan2(y, x) + if angle < 0.0: + angle += math.pi * 2.0 + return angle + + +def init_flood_fill(center_point, obstacle_points, xy_points, min_coord, + xy_resolution): + """ + center_point: center point + obstacle_points: detected obstacles points (x,y) + xy_points: (x,y) point pairs + """ + center_x, center_y = center_point + prev_ix, prev_iy = center_x - 1, center_y + ox, oy = obstacle_points + xw, yw = xy_points + min_x, min_y = min_coord + occupancy_map = (np.ones((xw, yw))) * 0.5 + for (x, y) in zip(ox, oy): + # x coordinate of the the occupied area + ix = int(round((x - min_x) / xy_resolution)) + # y coordinate of the the occupied area + iy = int(round((y - min_y) / xy_resolution)) + free_area = bresenham((prev_ix, prev_iy), (ix, iy)) + for fa in free_area: + occupancy_map[fa[0]][fa[1]] = 0 # free area 0.0 + prev_ix = ix + prev_iy = iy + return occupancy_map + + +def flood_fill(center_point, occupancy_map): + """ + center_point: starting point (x,y) of fill + occupancy_map: occupancy map generated from Bresenham ray-tracing + """ + # Fill empty areas with queue method + sx, sy = occupancy_map.shape + fringe = deque() + fringe.appendleft(center_point) + while fringe: + n = fringe.pop() + nx, ny = n + # West + if nx > 0: + if occupancy_map[nx - 1, ny] == 0.5: + occupancy_map[nx - 1, ny] = 0.0 + fringe.appendleft((nx - 1, ny)) + # East + if nx < sx - 1: + if occupancy_map[nx + 1, ny] == 0.5: + occupancy_map[nx + 1, ny] = 0.0 + fringe.appendleft((nx + 1, ny)) + # North + if ny > 0: + if occupancy_map[nx, ny - 1] == 0.5: + occupancy_map[nx, ny - 1] = 0.0 + fringe.appendleft((nx, ny - 1)) + # South + if ny < sy - 1: + if occupancy_map[nx, ny + 1] == 0.5: + occupancy_map[nx, ny + 1] = 0.0 + fringe.appendleft((nx, ny + 1)) + + +def generate_ray_casting_grid_map(ox, oy, xy_resolution, breshen=True): + + min_x, min_y, max_x, max_y, x_w, y_w = calc_grid_map_config( + ox, oy, xy_resolution) + # default 0.5 -- [[0.5 for i in range(y_w)] for i in range(x_w)] + occupancy_map = np.ones((x_w, y_w)) / 2 + center_x = int( + round(-min_x / xy_resolution)) # center x coordinate of the grid map + center_y = int( + round(-min_y / xy_resolution)) # center y coordinate of the grid map + # occupancy grid computed with bresenham ray casting + if breshen: + for (x, y) in zip(ox, oy): + # x coordinate of the the occupied area + ix = int(round((x - min_x) / xy_resolution)) + # y coordinate of the the occupied area + iy = int(round((y - min_y) / xy_resolution)) + laser_beams = bresenham((center_x, center_y), ( + ix, iy)) # line form the lidar to the occupied point + for laser_beam in laser_beams: + occupancy_map[laser_beam[0]][ + laser_beam[1]] = 0.0 # free area 0.0 + occupancy_map[ix][iy] = 1.0 # occupied area 1.0 + occupancy_map[ix + 1][iy] = 1.0 # extend the occupied area + occupancy_map[ix][iy + 1] = 1.0 # extend the occupied area + occupancy_map[ix + 1][iy + 1] = 1.0 # extend the occupied area + # occupancy grid computed with with flood fill + else: + occupancy_map = init_flood_fill((center_x, center_y), (ox, oy), + (x_w, y_w), + (min_x, min_y), xy_resolution) + flood_fill((center_x, center_y), occupancy_map) + occupancy_map = np.array(occupancy_map, dtype=float) + for (x, y) in zip(ox, oy): + ix = int(round((x - min_x) / xy_resolution)) + iy = int(round((y - min_y) / xy_resolution)) + occupancy_map[ix][iy] = 1.0 # occupied area 1.0 + occupancy_map[ix + 1][iy] = 1.0 # extend the occupied area + occupancy_map[ix][iy + 1] = 1.0 # extend the occupied area + occupancy_map[ix + 1][iy + 1] = 1.0 # extend the occupied area + return occupancy_map, min_x, max_x, min_y, max_y, xy_resolution + + +def main(): + """ + Example usage + """ + print(__file__, "start") + xy_resolution = 0.02 # x-y grid resolution + ang, dist = file_read("lidar01.csv") + ox = np.sin(ang) * dist + oy = np.cos(ang) * dist + occupancy_map, min_x, max_x, min_y, max_y, xy_resolution = \ + generate_ray_casting_grid_map(ox, oy, xy_resolution, True) + xy_res = np.array(occupancy_map).shape + plt.figure(1, figsize=(10, 4)) + plt.subplot(122) + plt.imshow(occupancy_map, cmap="PiYG_r") + # cmap = "binary" "PiYG_r" "PiYG_r" "bone" "bone_r" "RdYlGn_r" + plt.clim(-0.4, 1.4) + plt.gca().set_xticks(np.arange(-.5, xy_res[1], 1), minor=True) + plt.gca().set_yticks(np.arange(-.5, xy_res[0], 1), minor=True) + plt.grid(True, which="minor", color="r", linewidth=0.6, alpha=0.5) + plt.colorbar() + plt.subplot(121) + plt.plot([oy, np.zeros(np.size(oy))], [ox, np.zeros(np.size(oy))], "ro-") + plt.axis("equal") + plt.plot(0.0, 0.0, "ob") + plt.gca().set_aspect("equal", "box") + bottom, top = plt.ylim() # return the current y-lim + plt.ylim((top, bottom)) # rescale y axis, to match the grid orientation + plt.grid(True) + plt.show() + + +if __name__ == '__main__': + main() \ No newline at end of file