|
| 1 | +"""Python 3 implementation of Djikstra's algorithm for finding the shortest |
| 2 | +path between nodes in a graph. Written as a learning exercise, so lots of |
| 3 | +comments and no error handling. |
| 4 | +""" |
| 5 | +from collections import deque |
| 6 | + |
| 7 | +INFINITY = float("inf") |
| 8 | + |
| 9 | + |
| 10 | +class Graph: |
| 11 | + def __init__(self, filename): |
| 12 | + """Reads graph definition and stores it. Each line of the graph |
| 13 | + definition file defines an edge by specifying the start node, |
| 14 | + end node, and distance, delimited by spaces. |
| 15 | +
|
| 16 | + Stores the graph definition in two properties which are used by |
| 17 | + Dijkstra's algorithm in the shortest_path method: |
| 18 | + self.nodes = set of all unique nodes in the graph |
| 19 | + self.adjacency_list = dict that maps each node to an unordered set of |
| 20 | + (neighbor, distance) tuples. |
| 21 | + """ |
| 22 | + |
| 23 | + # Read the graph definition file and store in graph_edges as a list of |
| 24 | + # lists of [from_node, to_node, distance]. This data structure is not |
| 25 | + # used by Dijkstra's algorithm, it's just an intermediate step in the |
| 26 | + # create of self.nodes and self.adjacency_list. |
| 27 | + graph_edges = [] |
| 28 | + with open(filename) as fhandle: |
| 29 | + for line in fhandle: |
| 30 | + edge_from, edge_to, cost, *_ = line.strip().split(" ") |
| 31 | + graph_edges.append((edge_from, edge_to, float(cost))) |
| 32 | + |
| 33 | + self.nodes = set() |
| 34 | + for edge in graph_edges: |
| 35 | + self.nodes.update([edge[0], edge[1]]) |
| 36 | + |
| 37 | + self.adjacency_list = {node: set() for node in self.nodes} |
| 38 | + for edge in graph_edges: |
| 39 | + self.adjacency_list[edge[0]].add((edge[1], edge[2])) |
| 40 | + |
| 41 | + def shortest_path(self, start_node, end_node): |
| 42 | + """Uses Dijkstra's algorithm to determine the shortest path from |
| 43 | + start_node to end_node. Returns (path, distance). |
| 44 | + """ |
| 45 | + |
| 46 | + unvisited_nodes = self.nodes.copy() # All nodes are initially unvisited. |
| 47 | + |
| 48 | + # Create a dictionary of each node's distance from start_node. We will |
| 49 | + # update each node's distance whenever we find a shorter path. |
| 50 | + distance_from_start = { |
| 51 | + node: (0 if node == start_node else INFINITY) for node in self.nodes |
| 52 | + } |
| 53 | + |
| 54 | + # Initialize previous_node, the dictionary that maps each node to the |
| 55 | + # node it was visited from when the the shortest path to it was found. |
| 56 | + previous_node = {node: None for node in self.nodes} |
| 57 | + |
| 58 | + while unvisited_nodes: |
| 59 | + # Set current_node to the unvisited node with shortest distance |
| 60 | + # calculated so far. |
| 61 | + current_node = min( |
| 62 | + unvisited_nodes, key=lambda node: distance_from_start[node] |
| 63 | + ) |
| 64 | + unvisited_nodes.remove(current_node) |
| 65 | + |
| 66 | + # If current_node's distance is INFINITY, the remaining unvisited |
| 67 | + # nodes are not connected to start_node, so we're done. |
| 68 | + if distance_from_start[current_node] == INFINITY: |
| 69 | + break |
| 70 | + |
| 71 | + # For each neighbor of current_node, check whether the total distance |
| 72 | + # to the neighbor via current_node is shorter than the distance we |
| 73 | + # currently have for that node. If it is, update the neighbor's values |
| 74 | + # for distance_from_start and previous_node. |
| 75 | + for neighbor, distance in self.adjacency_list[current_node]: |
| 76 | + new_path = distance_from_start[current_node] + distance |
| 77 | + if new_path < distance_from_start[neighbor]: |
| 78 | + distance_from_start[neighbor] = new_path |
| 79 | + previous_node[neighbor] = current_node |
| 80 | + |
| 81 | + if current_node == end_node: |
| 82 | + break # we've visited the destination node, so we're done |
| 83 | + |
| 84 | + # To build the path to be returned, we iterate through the nodes from |
| 85 | + # end_node back to start_node. Note the use of a deque, which can |
| 86 | + # appendleft with O(1) performance. |
| 87 | + path = deque() |
| 88 | + current_node = end_node |
| 89 | + while previous_node[current_node] is not None: |
| 90 | + path.appendleft(current_node) |
| 91 | + current_node = previous_node[current_node] |
| 92 | + path.appendleft(start_node) |
| 93 | + |
| 94 | + return path, distance_from_start[end_node] |
| 95 | + |
| 96 | + |
| 97 | +def main(): |
| 98 | + """Runs a few simple tests to verify the implementation. |
| 99 | + """ |
| 100 | + """verify_algorithm( |
| 101 | + filename="", |
| 102 | + start="", |
| 103 | + end="", |
| 104 | + path=["", "", "", ""], |
| 105 | + distance=, |
| 106 | + )""" |
| 107 | + |
| 108 | + |
| 109 | +def verify_algorithm(filename, start, end, path, distance): |
| 110 | + """Helper function to run simple tests and print results to console. |
| 111 | +
|
| 112 | + filename = graph definition file |
| 113 | + start/end = path to be calculated |
| 114 | + path = expected shorted path |
| 115 | + distance = expected distance of path |
| 116 | + """ |
| 117 | + graph = Graph(filename) |
| 118 | + returned_path, returned_distance = graph.shortest_path(start, end) |
| 119 | + |
| 120 | + assert list(returned_path) == path |
| 121 | + assert returned_distance == distance |
| 122 | + |
| 123 | + print('\ngraph definition file: {0}'.format(filename)) |
| 124 | + print(' start/end nodes: {0} -> {1}'.format(start, end)) |
| 125 | + print(' shortest path: {0}'.format(path)) |
| 126 | + print(' total distance: {0}'.format(distance)) |
| 127 | + |
| 128 | + |
| 129 | +if __name__ == "__main__": |
| 130 | + main() |
0 commit comments