当前位置: 代码迷 >> 综合 >> Python实现最短路问题常见求解算法2——Label Correcting Algorithm(deque)
  详细解决方案

Python实现最短路问题常见求解算法2——Label Correcting Algorithm(deque)

热度:64   发布时间:2023-10-27 19:56:51.0

原文链接

1. 适用场景

  • 可识别网络中存在的负环
  • 起点与终点间至少存在一条可行路径
  • 可求解单一起点到多点的最短路径

2. 负环识别方法

在介绍网络负环识别方法之前,先定义几个符号:
nnn :网络节点数量
CCC :网络弧费用绝对值的最大值
根据《netwrok flow》文献介绍,针对label correcting algorithms有以下识别方法:

  • 如果某一节点的距离标签小于 ?nC?nC?nC 则说明存在负环(在最坏情况下源点到此点经过所有节点,所有的弧费用值的绝对值都是C,则最小的cost为 ?nC?nC?nC

  • 首先将源节点指定为已标记节点,而将所有其他节点指定为未标记的。然后,我们一个接一个检查每个未标记节点 kkk,并执行以下操作:标记节点 kkk,溯回节点kkk 的前向节点并将其标记,直到我们到达第一个标记节点lll,如果k=lk=lk=l 则网络中存在负环。

  • 标记每个网络节点被检查最优性条件的次数,如果次数超过了网络节点数-1,则网络中存在负环。

3. 测试网络

这里在测试网络中人为设立一个负环结构,如下图红色示意图。
在这里插入图片描述

4. 代码实现

(1)定义网络数据结构

class Network():def __init__(self):self.node_neighbors={
    }self.node_x_coord={
    }self.node_y_coord={
    }self.node_id_list=[]self.number_of_nodes=0self.number_of_check={
    }self.link_cost={
    }self.link_list=[]self.max_link_cost=Noneself.source_node_id=0self.sink_node_id=1

(2)读取网络文件

# read network csv files
def read_network_file(node_file,link_file,network):# read node csv filewith open(node_file) as node_f:node_reader=csv.reader(node_f)next(node_reader)for row in node_reader:network.node_id_list.append(int(row[0]))network.node_x_coord[int(row[0])] = float(row[1])network.node_y_coord[int(row[0])] = float(row[2])network.node_neighbors[int(row[0])] = []network.number_of_check[int(row[0])]=0# read link csv filewith open(link_file) as f:link_reader = csv.reader(f)next(link_reader)for row in link_reader:from_node_id = int(row[1])to_node_id = int(row[2])cost = float(row[3])network.node_neighbors[from_node_id].append(to_node_id)network.link_list.append([from_node_id, to_node_id])network.link_cost[from_node_id, to_node_id] = costnetwork.number_of_nodes = len(network.node_id_list)network.max_link_cost=max(network.link_cost.values())

(3)可视化最短路径

def show_shortest_path(net,path_node_id_list):for from_node_id,to_node_id in net.link_list:x_coords=[net.node_x_coord[from_node_id],net.node_x_coord[to_node_id]]y_coords=[net.node_y_coord[from_node_id],net.node_y_coord[to_node_id]]plt.plot(x_coords,y_coords,color='black',linewidth=0.5)path_x_coord=[]path_y_coord=[]for node_id in path_node_id_list:path_x_coord.append(net.node_x_coord[node_id])path_y_coord.append(net.node_y_coord[node_id])plt.plot(path_x_coord,path_y_coord,color='b')plt.xlabel('x_coord')plt.ylabel('y_coord')plt.show()

(4)保存最短路径信息

# save the shortest path information to csv file
def save_to_file(network,path_node_id_list,path_cost,node_predecessor=None,node_label_cost=None):outfile = open('shortest_path.csv', 'w', newline='', errors='ignore')write = csv.writer(outfile)write.writerow(['source_node_id', 'sink_node_id', 'total cost', 'path node id list'])path = '-'.join([str(i) for i in path_node_id_list])line = [network.source_node_id, network.sink_node_id,path_cost,path]write.writerow(line)# whether save the shortest path information from  the source node to other nodesif node_predecessor and node_label_cost:try:for node_id in network.node_id_list:if node_id!=network.source_node_id and node_id!=network.sink_node_id:path_node_id_list = [node_id]pre_node_id = node_predecessor[node_id]path_cost = 0if node_label_cost[node_id]<float('inf'):while pre_node_id != network.source_node_id:path_node_id_list.insert(0, pre_node_id)path_cost += network.link_cost[path_node_id_list[0], path_node_id_list[1]]pre_node_id = node_predecessor[pre_node_id]path_node_id_list.insert(0, network.source_node_id)path_cost += network.link_cost[path_node_id_list[0], path_node_id_list[1]]path = '-'.join([str(i) for i in path_node_id_list])line = [network.source_node_id, node_id, path_cost, path]write.writerow(line)else:line = [network.source_node_id, node_id, 'inf', 'nan']write.writerow(line)except Exception as e:passoutfile.close()

(5)采用label correcting算法的deque实现寻找最短路径(含负环识别)

# find the shortest path from source node to sink node using deque label correcting algorithm
def find_shortest_path(network):node_predecessor = {
    }node_predecessor[network.source_node_id]=-1node_label_cost = {
     node_id:float('inf') for node_id in network.node_id_list}node_label_cost[network.source_node_id] = 0SEList = deque()SEList_all = []SEList.append(network.source_node_id)SEList_all.append(network.source_node_id)while len(SEList)>0:current_node_id=SEList[0]SEList.popleft()current_node_label_cost = node_label_cost[current_node_id]for to_node_id in network.node_neighbors[current_node_id]:new_label=current_node_label_cost+network.link_cost[current_node_id,to_node_id]network.number_of_check[to_node_id]+=1if new_label<node_label_cost[to_node_id]:node_label_cost[to_node_id]=new_labelnode_predecessor[to_node_id]=current_node_idif to_node_id in SEList_all:SEList.insert(0,to_node_id)else:SEList.append(to_node_id)SEList_all.append(to_node_id)# check whether the new label has fallen below -n*max_link_cost(detecting negative cycle)if new_label<-network.max_link_cost or network.number_of_check[to_node_id]>network.number_of_nodes-1:path_node_id_list = [to_node_id]pre_node_id = node_predecessor[to_node_id]while pre_node_id != to_node_id:path_node_id_list.insert(0, pre_node_id)pre_node_id = node_predecessor[pre_node_id]path_node_id_list.insert(0, to_node_id)path = '-'.join([str(i) for i in path_node_id_list])print("there exists a negative cycle:%s"%path)sys.exit(0)path_node_id_list = [network.sink_node_id]pre_node_id = node_predecessor[network.sink_node_id]path_cost = 0if node_label_cost[network.sink_node_id]<float('inf'):while pre_node_id != network.source_node_id:path_node_id_list.insert(0, pre_node_id)path_cost += network.link_cost[path_node_id_list[0], path_node_id_list[1]]pre_node_id = node_predecessor[pre_node_id]path_node_id_list.insert(0, network.source_node_id)path_cost += network.link_cost[path_node_id_list[0], path_node_id_list[1]]path = '-'.join([str(i) for i in path_node_id_list])print("the trave cost from node id=%s to node id=%s is: %s" % (network.source_node_id, network.sink_node_id, path_cost))print("the shortest path from node id=%s to node id=%s is: %s" % (network.source_node_id, network.sink_node_id, path))show_shortest_path(network, path_node_id_list)save_to_file(network, path_node_id_list, path_cost,node_predecessor,node_label_cost)else:print("there is no feasible path from node id=%s to node id=%s"%(network.source_node_id,network.sink_node_id))

(6)负环识别实现

这里实现了两种负环检测方法,一种是通过节点的距离标签判断,一种是通过节点最优性条件检查次数判断,满足任一条件即可。

 # check whether the new label has fallen below -n*max_link_cost(detecting negative cycle)if new_label<-network.max_link_cost or network.number_of_check[to_node_id]>network.number_of_nodes-1:path_node_id_list = [to_node_id]pre_node_id = node_predecessor[to_node_id]while pre_node_id != to_node_id:path_node_id_list.insert(0, pre_node_id)pre_node_id = node_predecessor[pre_node_id]path_node_id_list.insert(0, to_node_id)path = '-'.join([str(i) for i in path_node_id_list])print("there exists a negative cycle:%s"%path)sys.exit(0)

(7)主程序

if __name__=='__main__':# init network data structnetwork=Network()# setting source node  id and sink node idnetwork.source_node_id=1network.sink_node_id=81# read network filesread_network_file('./node_2.csv','./link_2.csv',network)# check whether the source node id and sink node id is in the networkif network.source_node_id not in network.node_id_list:print(" %s not found"%network.source_node_id)sys.exit(0)if network.sink_node_id not in network.node_id_list:print(" %s not found"%network.sink_node_id)sys.exit(0)# find the shortest pathfind_shortest_path(network)

5. 求解结果

(1)无负环
在这里插入图片描述
(2)存在负环
在这里插入图片描述

https://github.com/PariseC/Shortest_Path_Algorithm/tree/master/label_correcting_algorithm
参考
R.K. Ahuja, T.L. Magnanti and J.B. Orlin, Network Flows: Theory, Algorithms, and Applications. Prentice-Hall, 1993. A comprehensive recent survey of the topic.

  相关解决方案