alg: 图论相关算法

重申一下,alg 系列博文(本文在内)中的伪代码部分复制或改动自清华大学数据结构课程(邓俊辉课堂的)讲义,它可以从这个链接获取。

关键词:
最小支撑树 Minimum Spanning Tree:Prim 算法,Kruskal 算法,其中 Kruskal 算法用到了并查集
单源最短路径树 Shortest Path Tree / SSSP Single Source Shortest Path:Dijkstra 算法
所有点对间的最短距离:Floyd Warshall 算法

图的基础算法

图的存储方式:邻接矩阵 Adjacency Matrix(点-点)、邻接表 Incidence Matrix(点-边)

邻接矩阵的空间复杂度是 O(n^2) 的,查找是否存在 / 增加 / 删除 节点 i 到 j 的边是 O(1) 的,查找节点 i 的入度 / 出度是 O(1) 的(额外维护数组),遍历节点 i 的所有出边或入边是 O(n) 的。

邻接表,即每个节点存储一个列表记录由它引出的边(和 / 或以它结束的边),其空间复杂度是 O(n+e) 的。查找是否存在 / 增加 / 删除 节点 u 到 v 的边是 O(deg(u)) 或 O(deg(v)) 的,查找节点 i 的入度 / 出度是 O(1) 的,遍历节点 v 的出边 / 入边需要 O(deg(v))。

广度优先搜索 BFS:

初始化节点状态

for vertex v in graph:
  status[v] = "undiscovered"

从节点 s 做 BFS

status[s] = "discovered"
queue q
q.enqueue(s)
while queue is not empty
  v = q.dequeue()
  for every neighbor u of v:
    if status[u] == "undiscovered":
      status[u] = "discovered"
      q.enqueue(u)
      # (v, u) is an edge in the BFS tree

如果图是连通的,则一次 BFS 就足够了;否则,可以遍历节点列表,只要遇到未被发现的节点,就从它开始做 BFS,这样,BFS 访问一遍,可以构造出每个连通分支的一棵生成树。

c = 0 # number of connected components
for vertex v in graph:
  if status[v] == "undiscovered":
    BFS(v)
    c = c + 1

由于每个节点仅会入队一次,每条边仅会被检查一次,所以算法是 O(n+e) 的。

注:BFS 生成的森林包含 c 棵树、共 n-c 条边

深度优先搜索 DFS

从节点 v 做 DFS

def dfs(v)
  status[v] = "discovered"
  for every neighbor u of v:
    if status[u] == "undiscovered":
      dfs(u)
      # (v, u) is an edge in the BFS tree

拓扑排序 Topological Sort

接口:给定有向图 G,尝试将其所有顶点排成一个线性序列,使其与原图相容,即对于每条有向边 (x, y), x 的编号都要小于 y。如果 G 是 DAG,应该返回这样一个序列,否则应该报告不是 DAG。

引理:DAG 必然有零入度的节点

算法:动态地修改图 G 并维护一个栈 S 存储所有入度为 0 的节点;重复从栈中弹出节点 v,从 G 中删除 v,直到 G 空(成功)或者 S 空(失败)

initialize stack S with all vertices with indegree 0
seq = []
while S is not empty:
  v = S.pop()
  seq.append(v)
  for each neighbor u of v:
    if u.indegree == 1:
      S.push(u)
  G = G \ {v} # delete v from graph
if G is empty:
  return seq
else:
  return NOT_A_DAG

从图 G 中删除 v,即删除每条 v 的出边,是 O(deg_out(v)) 的,所以整个算法是 O(n+e) 的。

图的进阶算法

优先级搜索

我们想要按照一个用户自定义的优先级遍历 / 搜索图中每个节点,这里的优先级可以随着算法的推进而调整。具体地,遍历每个节点时只能影响其邻接节点的优先级。

initialize priority queue
while priority queue is not empty
  pop node v with top priority
  for each neighbor u of v:
    update u's priority

关于优先级队列的实现:

  • 若使用向量来实现优先级队列(最简单易行),则更新优先级耗时 O(1),但遍历查找最优先节点耗时 O(n),因此算法的复杂度为 O(n^2 + e)
  • 若采用平衡二叉搜索树来实现优先级队列,则更新优先级耗时 O(log n),因此算法的复杂度为 O( (n+e) log n )。
  • 若使用堆来实现优先级队列,复杂度相同,但需要额外维护一个索引记录每个节点在堆中的哪里,这样才能实现“更新下标为 i 的节点的优先级“(优先级升高则上滤,优先级下降则下滤,所有上下滤操作要同时更新索引)【见附录 1 的代码】。
  • 使用 Fibonacci 堆,可以优化到 O(e + n log n) (来源来源)

很多问题或算法都可以通过优先级搜索实现,如:

  • PRIM 算法、Dijkstra 算法
  • A* 寻路算法(优先级是开销估计 f(n) = g(n)+h(n),优先搜索开销小的节点。其中 g(n) 为到节点 n 已经产生的开销,h(n) 为从节点 n 到终点的开销估计值)。Dijkstra 算法可以看成 A* 算法的在 h(n) 为零时的特例。
  • BFS 可以看成 Dijkstra 算法在边权全部为 1 时的特例;DFS 可以看成新节点优先级等于它被发现的时刻的优先级搜索(维护全局“计时器”)。

最小支撑树问题

我们希望得到连通无向图的最小支撑树(MST,Minimum Spanning Tree)。

最小生成树的表示很简单,比如用一个长度为 n 的数组 A 来表示,A[i] 存储第 i 个节点在最小支撑树中父节点的编号即可。

Prim 算法

引理:连通图任意割切中的最短跨边必然包含在一颗 MST 中。反之,连通图的任意 MST 都必然通过每个割切的最短跨边。

思路:不断扩大一个节点集合 V_k 及其对应的支撑树(集合可以初始化为任意一个节点),每一步将割切 ( V_k ; V \ V_k ) 中的最短跨边及其对应的新节点加入。

实现:利用优先级搜索,节点的优先级定义为它与 V_k 的距离(最短跨边权重)。每个循环仅需遍历当前节点的邻居,即可正确地更新优先级。如果跨边 (i, j) 使得 j 的优先级得到更新,则赋值 A[j] = i (画家算法)。复杂度与优先级队列的实现有关,见前面关于优先级搜索的分析。

Kruskal 算法

思路:依据权重从小到大依次尝试各边,若安全(仍为树),则加入。

实现:维护一个森林,包含 n 个单节点树和 0 条边。按照边权顺序从小到大遍历所有边,若其顶点来自不同的树,则将两侧的树合二为一。正确性很容易说明:将引入的边 e 一侧的树 T 和 V \ T 作为图的割切,那么 e 必然是最短跨边。

如何快速“合并”两棵树(或者说两个等价类)?需要利用并查集的算法。

一种做法是维护一个长度为 n 的数组 T,始终保证第 i 个节点属于第 T[i] 棵树,初始化 T[i] = i 即可。遍历到边 (i, j) 时,如果 T[i] != T[j],则说明两侧的节点来自不同的树,可以合并,合并的方法为扫描一遍向量 T,将所有的值为 T[i] 的元素赋值为 T[j],同时赋值 A[i] = j。这样合并的复杂度是 O(n) 的,使得 Kruskal 算法的复杂度成为 O(n*e)

还有一种做法是维护一个长度为 n 的数组 parent,仅仅表示并查集中的关系,与支撑树中的父子节点关系区分,初始化 parent[i] = i。找到元素所属的等价类 find(i) 函数的方法是沿着 parent 逐层上行,直到 parent[x] == x,表明 i 属于第 x 类。将第 i 类合并到第 j 类,只需设置 parent[i] = j。为了控制树的高度,防止查找用时过多,需要保证每次合并将矮树合并到高树,并在查找时实现“路径压缩”,即将“上行”沿途的节点全部接入上行的终点树根中。这样,每次查找的用时近似常数的(反阿克曼函数)

最短距离问题

DIJKSTRA 算法

单源最短路问题,即给定节点 s,给出 s 到其余各个顶点的最短路径树。最短路径树可以用一个长度为 n 的数组 A 来表示,A[i] 存储第 i 个节点在最短路径树中父节点的编号即可。

利用优先级搜索,节点的优先级定义为它与 s 间的目前已知的最短路径的距离。每个循环仅需遍历当前节点的邻居,即可正确地更新优先级。如果边 (i, j) 使得 j 的优先级得到更新,则赋值 A[j] = i (画家算法)。复杂度见优先级搜索的分析。

正确性:Dijkstra算法的正确性依赖于下列前缀特性:一条最短路径的前缀也是一条最短路径,且长度不超过原先的最短路径。对于边权非负的图,它是正确的(权重为零的边没问题);对于有负权重边的图,它不正确。

Floyd-Warshall 算法

计算所有点对之间的最短路径的长度。这个问题的输出复杂度就是 O(n^2) 的。将每个节点作为源,调用 n 次 Dijkstra 算法是 O(n^3) 的;下面要介绍的 Floyd 算法也是 O(n^3) 的。不过 Floyd 算法形式上更简单,且允许出现负的单向边(不允许负权重的环路)。

将节点从 1 到 n 编号,定义 d^k(u, v) 为直接相连,或中途仅通过前 k 个节点中转的最短路径长度。它满足:

d^k(u, v)
= w(u, v) if k = 0
  min( d^{k-1}(u,v), d^{k-1}(u,k)+d^{k-1}(k,v) ) if k > 0

注意到 d^{k-1}(u, k) = d^k(u, k) 且 d^{k-1}(k, v) = d^k(k, v),可以按照这个算式从外到内三层循环进行计算,复用同一块存储空间 A[1..n][1..n] (可以看成状态压缩了的动态规划):

for k in 1..n
  for u in 1..n
    for v in 1..n
      A[u][v] = min(A[u][v], A[u][k]+A[k][v])

附录

附录 1,可以改变既有事件优先级的优先级队列

class PQMod:
    """
    A priority queue that lets you:
    1) Insert event with priority
    2) Remove an existing event
    3) Pull or peek the highest priority event
    4) Change the priority of an existing event
    each operation is O(log n) time.
    """

    def __init__(self):
        # The heap invariant is heap[k] <= heap[2*k+1] and heap[k] <= heap[2*k+2]
        self.heap = list()  # Each element is a (event, priority) tuple
        self.idxof = dict()  # Maps event to index in the heap

    def _swap(self, i, j):
        # Swap two elements in the heap, update the mapping
        self.idxof[self.heap[i][0]], self.idxof[self.heap[j][0]] = j, i
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]

    def _sift_up_at(self, idx: int):
        while idx > 0 and self.heap[idx][1] < self.heap[(idx-1)//2][1]:
            self._swap(idx, (idx-1)//2)
            idx = (idx-1)//2

    def _children_status(self, idx: int):
        if 2*idx+2 <= len(self.heap)-1 and self.heap[idx][1] > self.heap[2*idx+2][1] \
           and self.heap[2*idx+1][1] > self.heap[2*idx+2][1]:
            return 2  # right child goes up
        elif 2*idx+1 <= len(self.heap)-1 and self.heap[idx][1] > self.heap[2*idx+1][1]:
            return 1  # left child goes up
        else:
            return 0  # neither child goes up

    def _sift_down_at(self, idx: int):
        c = self._children_status(idx)
        while c > 0:
            self._swap(idx, 2*idx+c)
            idx = 2*idx+c
            c = self._children_status(idx)

    def _adjust_at(self, idx: int):
        self._sift_up_at(idx)  # does nothing if should not sift up
        self._sift_down_at(idx)  # does nothing if should not sift down

    def insert(self, e, p):
        if e in self.idxof:
            raise ValueError("Duplicate Events are not allowed")
        self.heap.append((e, p))
        self.idxof[e] = len(self.heap) - 1
        self._sift_up_at(len(self.heap)-1)

    def remove(self, e):
        idx = self.idxof[e]
        self._swap(idx, len(self.heap)-1)
        del self.heap[len(self.heap)-1]
        del self.idxof[e]
        if idx == len(self.heap):  # we removed the last element
            return
        self._adjust_at(idx)

    def pull_highest(self, peek=False):
        if len(self.heap) == 0:
            raise IndexError("Cannot Pull from Empty Queue")
        e, p = self.heap[0]
        if not peek:
            self.remove(e)
        return e, p

    def change_priority(self, e, new_p):
        idx = self.idxof[e]
        self.heap[idx] = (e, new_p)
        self._adjust_at(idx)

使用示例

def pq_mod_example():
    pq = PQMod()
    events = [("code", 1), ("sleep", 0), ("eat", 2), ("run", 5), ("drink", 3)]
    for e, p in events:
        pq.insert(e, p)
    print(pq.heap)
    print(pq.pull_highest())
    print(pq.heap)
    pq.change_priority("drink", -1)
    print(pq.heap)

对拍测试的脚本

class PQList:
    """
    Reference implementation of PQMod using a List and linear scan in O(n) time
    """

    def __init__(self):
        self.events = list()  # (event, priority) tuple

    def insert(self, e, p):
        self.events.append((e, p))

    def remove(self, event):
        for e, p in self.events:
            if e == event:
                self.events.remove((e, p))

    def pull_highest(self):
        e_hi, p_hi = self.events[0]
        for e, p in self.events:
            if p < p_hi:
                e_hi, p_hi = e, p
        self.remove(e_hi)
        return e_hi, p_hi

    def change_priority(self, e, new_p):
        for i in range(len(self.events)):
            if self.events[i][0] == e:
                self.events[i] = (e, new_p)


def p_generator(n):
    # Generate distinct priorities
    # When there exists same priorties, the results of PQMod and PQList may differ
    import random
    ps = list(range(n))
    random.shuffle(ps)
    for p in ps:
        yield p


def pq_mod_test(n):
    import random
    pq = PQMod()
    pq_ref = PQList()
    cnt = (2*n)//3
    p_gen = p_generator(100+5*n)  # big enough so that it does not run out
    priorities = [next(p_gen) for _ in range(cnt)]
    events = [str(i) for i in range(cnt)]
    print("events: " + str(events))
    print("priorities: "+str(priorities))
    for i in range(cnt):
        pq.insert(events[i], priorities[i])
        pq_ref.insert(events[i], priorities[i])
    # print(pq.heap)
    while len(pq.heap) < n:
        r = random.random()
        if r < 0.3:
            p = next(p_gen)
            print("inserting event "+str(cnt)+" with priority "+str(p))
            pq.insert(str(cnt), p)
            pq_ref.insert(str(cnt), p)
            events.append(str(cnt))
            cnt += 1
        elif r < 0.8:
            e = random.choice(events)
            new_p = next(p_gen)
            print("changing event "+e+" 's priority to "+str(new_p))
            pq.change_priority(str(e), new_p)
            pq_ref.change_priority(str(e), new_p)
        elif r < 0.9:
            if len(pq.heap) > 0:
                e, p = pq.pull_highest()
                e_ref, p_ref = pq_ref.pull_highest()
                print("pulled highest priority event " +
                      e+" with priority "+str(p))
                try:
                    assert e == e_ref
                    assert p == p_ref
                except AssertionError:
                    print("ERROR: e, e_ref, p, p_ref is")
                    print(e, e_ref)
                    print(p, p_ref)
                    exit()
                events.remove(e)
        else:
            if len(pq.heap) > 0:
                e = random.choice(events)
                print("cancelling event "+e)
                events.remove(e)
                pq.remove(e)
                pq_ref.remove(e)
        # print(pq.heap)
        # print(pq_ref.events)
    print("cleaning...")
    while len(pq.heap) > 0:
        e, p = pq.pull_highest()
        e_ref, p_ref = pq_ref.pull_highest()
        print("pulled highest priority event "+e+" with priority "+str(p))
        try:
            assert e == e_ref
            assert p == p_ref
        except AssertionError:
            print("ERROR: e, e_ref, p, p_ref is")
            print(e, e_ref)
            print(p, p_ref)
            exit()
        # print(pq.heap)
        # print(pq_ref.events)
    print("TEST PASSED")


pq_mod_test(300)


Posted

in

by

Comments

Leave a Reply

Your email address will not be published. Required fields are marked *