本来想直接写A* 的,不过看完最佳路径优先搜索算法后觉得还是要先理解一下这个算法后才能更好的理解A* 算法,所以把这篇文章放到A* 前面。
基本概念
最佳优先搜索算法(Best-first-searching)是一种启发式搜索算法(Heuristic Algorithm),其基于广度优先搜索算法,不同点是其依赖于估价函数对将要遍历的节点进行估价,选择代价小的节点进行遍历,直到找到目标点为止。BFS算法不能保证找到的路径是一条最短路径,但是其计算过程相对于Dijkstra算法会快很多。
BFS算法的启发估价函数公式为:
f ( n ) = h ( n ) f(n) = h(n) f(n)=h(n)
n表示当前的点,g(n)为从起始点到点n的实际代价,h(n)为从点n到目标点的估价。
从上述公式我们可以看出,BFS的搜索方式非常简单粗暴,直接从起点开始连接终点,对线上的点进行搜索,遇到障碍物时向四周展开。在没有障碍物的场景下,BFS是一种快速有效的路径搜索方式:
但是在大部分情况下,我们的场景中肯定会存在不同的障碍物,此时它的搜索路径就不再是最优的了:
代码实现
import os
import sys
from collections import deque
import matplotlib.pyplot as pltimport math
import heapqclass BFS:"""BFS add the new visited node in the end of the openset"""def __init__(self, s_start, s_goal, heuristic_type,xI, xG):self.s_start = s_startself.s_goal = s_goalself.heuristic_type = heuristic_typeself.u_set = [(-1, 0), (-1, 1), (0, 1), (1, 1),(1, 0), (1, -1), (0, -1), (-1, -1)] # feasible input setself.obs = self.obs_map() # position of obstaclesself.OPEN = [] # priority queue / OPEN setself.CLOSED = [] # CLOSED set / VISITED orderself.PARENT = dict() # recorded parentself.g = dict() # cost to comeself.x_range = 51 # size of backgroundself.y_range = 31self.xI, self.xG = xI, xGself.obs = self.obs_map()def update_obs(self, obs):self.obs = obsdef obs_map(self):"""Initialize obstacles' positions:return: map of obstacles"""x = 51y = 31obs = set()for i in range(x):obs.add((i, 0))for i in range(x):obs.add((i, y - 1))for i in range(y):obs.add((0, i))for i in range(y):obs.add((x - 1, i))for i in range(10, 21):obs.add((i, 15))for i in range(15):obs.add((20, i))for i in range(15, 30):obs.add((30, i))for i in range(16):obs.add((40, i))return obsdef update_obs(self, obs):self.obs = obsdef animation(self, path, visited, name):self.plot_grid(name)self.plot_visited(visited)self.plot_path(path)plt.show()def plot_grid(self, name):obs_x = [x[0] for x in self.obs]obs_y = [x[1] for x in self.obs]plt.plot(self.xI[0], self.xI[1], "bs")plt.plot(self.xG[0], self.xG[1], "gs")plt.plot(obs_x, obs_y, "sk")plt.title(name)plt.axis("equal")def plot_visited(self, visited, cl='gray'):if self.xI in visited:visited.remove(self.xI)if self.xG in visited:visited.remove(self.xG)count = 0for x in visited:count += 1plt.plot(x[0], x[1], color=cl, marker='o')plt.gcf().canvas.mpl_connect('key_release_event',lambda event: [exit(0) if event.key == 'escape' else None])if count < len(visited) / 3:length = 20elif count < len(visited) * 2 / 3:length = 30else:length = 40## length = 15if count % length == 0:plt.pause(0.001)plt.pause(0.01)def plot_path(self, path, cl='r', flag=False):path_x = [path[i][0] for i in range(len(path))]path_y = [path[i][1] for i in range(len(path))]if not flag:plt.plot(path_x, path_y, linewidth='3', color='r')else:plt.plot(path_x, path_y, linewidth='3', color=cl)plt.plot(self.xI[0], self.xI[1], "bs")plt.plot(self.xG[0], self.xG[1], "gs")plt.pause(0.01)def searching(self):"""Breadth-first Searching.:return: path, visited order"""self.PARENT[self.s_start] = self.s_startself.g[self.s_start] = 0self.g[self.s_goal] = math.infheapq.heappush(self.OPEN,(self.heuristic(self.s_start), self.s_start))while self.OPEN:_, s = heapq.heappop(self.OPEN)self.CLOSED.append(s)if s == self.s_goal:breakfor s_n in self.get_neighbor(s):new_cost = self.g[s] + self.cost(s, s_n)if s_n not in self.g:self.g[s_n] = math.infif new_cost < self.g[s_n]: # conditions for updating Costself.g[s_n] = new_costself.PARENT[s_n] = s# best first set the heuristics as the priority heapq.heappush(self.OPEN, (self.heuristic(s_n), s_n))return self.extract_path(self.PARENT), self.CLOSEDdef get_neighbor(self, s):"""find neighbors of state s that not in obstacles.:param s: state:return: neighbors"""return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]def cost(self, s_start, s_goal):"""Calculate Cost for this motion:param s_start: starting node:param s_goal: end node:return: Cost for this motion:note: Cost function could be more complicate!"""if self.is_collision(s_start, s_goal):return math.infreturn math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])def is_collision(self, s_start, s_end):"""check if the line segment (s_start, s_end) is collision.:param s_start: start node:param s_end: end node:return: True: is collision / False: not collision"""if s_start in self.obs or s_end in self.obs:return Trueif s_start[0] != s_end[0] and s_start[1] != s_end[1]:if s_end[0] - s_start[0] == s_start[1] - s_end[1]:s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))else:s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))if s1 in self.obs or s2 in self.obs:return Truereturn Falsedef extract_path(self, PARENT):"""Extract the path based on the PARENT set.:return: The planning path"""path = [self.s_goal]s = self.s_goalwhile True:s = PARENT[s]path.append(s)if s == self.s_start:breakreturn list(path)def heuristic(self, s):"""Calculate heuristic.:param s: current node (state):return: heuristic function value"""heuristic_type = self.heuristic_type # heuristic typegoal = self.s_goal # goal nodeif heuristic_type == "manhattan":return abs(goal[0] - s[0]) + abs(goal[1] - s[1])else:#sqrt(x^2+y^2)return math.hypot(goal[0] - s[0], goal[1] - s[1])def main():s_start = (5, 5)s_goal = (45, 25)bfs = BFS(s_start, s_goal, 'None',s_start,s_goal)path, visited = bfs.searching()bfs.animation(path, visited, "Breadth-first Searching (BFS)")if __name__ == '__main__':main()
简单讲解一下原理(跟A*那篇类似):
简单解析一下:
在获取起点后,算法维护了两个字典:
self.PARENT[self.s_start] = self.s_startself.g[self.s_start] = 0self.g[self.s_goal] = math.inf
PARENT字典中存取的是这个点由哪个点延伸出来的,即它的上一个节点是谁,用于最后的路径的返回;g这个字典中存储的是每个坐标的代价值,即g[s]。
然后,算法通过堆栈的概念维护了一个堆栈:
heapq.heappush(self.OPEN,(self.heuristic(self.s_start), self.s_start))
将初始值的f[s]存储在这里,然后进行while循环:
首先从堆栈中取出栈顶元素:
_, s = heapq.heappop(self.OPEN)
注意到这里使用的heapq堆栈功能中的两个函数heapq.heappop与heapq.heappush。heappop会取出栈顶元素并将原始数据从堆栈中删除,而heappush则是对插入的数据按大小排序并存储在堆栈中。所以每一个遍历的点都会按照它的代价值放入堆栈中,同时每次取出的都是代价值最小的那个。
然后判断出栈顶元素是否为目标点,如果为目标点,则退出:
if s == self.s_goal: # stop conditionbreak
如果不是,则更新该点附近点的代价值:
for s_n in self.get_neighbor(s):new_cost = self.g[s] + self.cost(s, s_n)if s_n not in self.g:self.g[s_n] = math.infif new_cost < self.g[s_n]: # conditions for updating Costself.g[s_n] = new_costself.PARENT[s_n] = s# best first set the heuristics as the priority heapq.heappush(self.OPEN, (self.heuristic(s_n), s_n))
get_neighbor为获取该点周围的点的坐标。heappush入栈时需要存储的该点的代价值的计算方式为:
def heuristic(self, s):"""Calculate heuristic.:param s: current node (state):return: heuristic function value"""heuristic_type = self.heuristic_type # heuristic typegoal = self.s_goal # goal nodeif heuristic_type == "manhattan":return abs(goal[0] - s[0]) + abs(goal[1] - s[1])else:#sqrt(x^2+y^2)return math.hypot(goal[0] - s[0], goal[1] - s[1])
这里计算了一个启发函数的代价值h(n),h的计算可以根据实际情况进行选择,例如在栅格地图中,计算h的方式一般可以分为两种:曼哈顿距离与欧几里德距离。
另外这里注意一个非常巧妙的问题,对于同一个点,如果它在计算其左边的点的时候会将其添加到堆栈中,计算其本身时会出栈,再计算其右边时是否还需要重新入栈呢?
这个是不一定的,是否重新入栈的关键取决于该点在右边的点的计算代价值是否小于左边:
if new_cost < self.g[s_n]:
如果代价值比原来的小,则会重新入栈,否则,这个点就不需要重新计算,这样子就避免了大量的重复计算以及死循环的问题。
最后得到结果如下:
从结果可以看出,虽然最佳路径优先搜索算法得到的结果不是最优的,但是在搜索过程中确实效率还是非常高的。
参考:
1、最佳优先搜索和A*搜索算法
2、路径规划算法