heapq
— 堆佇列演算法¶
原始碼: Lib/heapq.py
此模組提供了堆佇列演算法的一種實現,也稱為優先佇列演算法。
最小堆(Min-heap)是二叉樹,其中每個父節點的值都小於或等於其任何子節點的值。我們將此條件稱為堆不變數。
對於最小堆,此實現使用列表,其中對於所有存在的比較元素*k*,heap[k] <= heap[2*k+1]
並且 heap[k] <= heap[2*k+2]
。元素從零開始計數。最小堆的一個有趣特性是,其最小的元素始終是根,即 heap[0]
。
最大堆(Max-heap)滿足相反的不變數:每個父節點的值都*大於*其任何子節點的值。它們被實現為列表,其中對於所有存在的比較元素*k*,maxheap[2*k+1] <= maxheap[k]
並且 maxheap[2*k+2] <= maxheap[k]
。根,即 maxheap[0]
,包含*最大*的元素;heap.sort(reverse=True)
維護了最大堆的不變數。
heapq
API 在兩個方面與教科書中的堆演算法不同:(a) 我們使用基於零的索引。這使得節點索引與其子節點索引之間的關係不那麼明顯,但更適合 Python,因為它使用基於零的索引。(b) 教科書通常關注最大堆,因為它們適合原地排序。我們的實現偏向於最小堆,因為它們更好地對應 Python 的 列表
。
這兩個方面使得可以將堆視為一個普通的 Python 列表,而不會有任何意外:heap[0]
是最小的項,並且 heap.sort()
維護了堆不變數!
與 list.sort()
類似,此實現僅使用 <
運算子進行比較,無論是最小堆還是最大堆。
在下面的 API 和本文件中,未限定的術語*堆*通常指最小堆。最大堆的 API 以 _max
字尾命名。
要建立一個堆,可以使用初始化為 []
的列表,或者使用 heapify()
或 heapify_max()
函式將現有列表轉換為最小堆或最大堆。
為最小堆提供了以下函式
- heapq.heappush(heap, item)¶
將值 *item* 推入*堆*中,並保持最小堆不變數。
- heapq.heappop(heap)¶
從*堆*中彈出並返回最小的項,並保持最小堆不變數。如果堆為空,則引發
IndexError
。要訪問最小的項而不彈出它,請使用heap[0]
。
- heapq.heappushpop(heap, item)¶
將 *item* 推入堆中,然後從*堆*中彈出並返回最小的項。這個組合操作比先呼叫
heappush()
再單獨呼叫heappop()
更高效。
- heapq.heapify(x)¶
以線性時間將列表 *x* 原地轉換為最小堆。
- heapq.heapreplace(heap, item)¶
從*堆*中彈出並返回最小的項,同時推入新的*item*。堆的大小不變。如果堆為空,則引發
IndexError
。這個一步到位的操作比先
heappop()
後heappush()
更高效,並且在使用固定大小的堆時可能更合適。彈出/推入的組合總是從堆中返回一個元素並用 *item* 替換它。返回的值可能比新增的 *item* 更大。如果不希望這樣,請考慮改用
heappushpop()
。它的推入/彈出組合會返回兩個值中較小的一個,將較大的值留在堆上。
對於最大堆,提供了以下函式
- heapq.heapify_max(x)¶
以線性時間將列表 *x* 原地轉換為最大堆。
在 3.14 版本加入。
- heapq.heappush_max(heap, item)¶
將值 *item* 推入最大堆 *heap* 中,並保持最大堆不變數。
在 3.14 版本加入。
- heapq.heappop_max(heap)¶
從最大堆 *heap* 中彈出並返回最大的項,並保持最大堆不變數。如果最大堆為空,則引發
IndexError
。要訪問最大的項而不彈出它,請使用maxheap[0]
。在 3.14 版本加入。
- heapq.heappushpop_max(heap, item)¶
將 *item* 推入最大堆 *heap* 中,然後從 *heap* 中彈出並返回最大的項。這個組合操作比先呼叫
heappush_max()
再單獨呼叫heappop_max()
更高效。在 3.14 版本加入。
- heapq.heapreplace_max(heap, item)¶
從最大堆 *heap* 中彈出並返回最大的項,同時推入新的 *item*。最大堆的大小不變。如果最大堆為空,則引發
IndexError
。返回的值可能比新增的 *item* 更小。有關詳細用法說明,請參考類似的函式
heapreplace()
。在 3.14 版本加入。
該模組還提供了三個基於堆的通用函式。
- heapq.merge(*iterables, key=None, reverse=False)¶
將多個已排序的輸入合併為單個已排序的輸出(例如,合併來自多個日誌檔案的時間戳條目)。返回一個包含已排序值的迭代器。
類似於
sorted(itertools.chain(*iterables))
,但它返回一個可迭代物件,不會一次性將所有資料載入到記憶體中,並假定每個輸入流都已經排序(從小到大)。有兩個可選引數,必須以關鍵字引數的形式指定。
key 指定一個接受一個引數的鍵函式,用於從每個輸入元素中提取比較鍵。預設值為
None
(直接比較元素)。reverse 是一個布林值。如果設定為
True
,則輸入元素會按每次比較都反向的方式合併。為了達到與sorted(itertools.chain(*iterables), reverse=True)
類似的行為,所有可迭代物件必須從大到小排序。在 3.5 版本發生變更:添加了可選的 key 和 reverse 引數。
- heapq.nlargest(n, iterable, key=None)¶
從 *iterable* 定義的資料集中返回一個包含 *n* 個最大元素的列表。如果提供了 *key*,它指定了一個接受一個引數的函式,用於從 *iterable* 中的每個元素提取比較鍵(例如,
key=str.lower
)。等價於:sorted(iterable, key=key, reverse=True)[:n]
。
- heapq.nsmallest(n, iterable, key=None)¶
從 *iterable* 定義的資料集中返回一個包含 *n* 個最小元素的列表。如果提供了 *key*,它指定了一個接受一個引數的函式,用於從 *iterable* 中的每個元素提取比較鍵(例如,
key=str.lower
)。等價於:sorted(iterable, key=key)[:n]
。
後兩個函式在 *n* 值較小時效能最佳。對於較大的值,使用 sorted()
函式更高效。此外,當 n==1
時,使用內建的 min()
和 max()
函式更高效。如果需要重複使用這些函式,可以考慮將可迭代物件轉換為一個真正的堆。
基本示例¶
可以透過將所有值推入堆中,然後一次一個地彈出最小值來實現堆排序
>>> def heapsort(iterable):
... h = []
... for value in iterable:
... heappush(h, value)
... return [heappop(h) for i in range(len(h))]
...
>>> heapsort([1, 3, 5, 7, 9, 2, 4, 6, 8, 0])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
這類似於 sorted(iterable)
,但與 sorted()
不同,此實現是不穩定的。
堆元素可以是元組。這對於在跟蹤主記錄的同時分配比較值(例如任務優先順序)非常有用
>>> h = []
>>> heappush(h, (5, 'write code'))
>>> heappush(h, (7, 'release product'))
>>> heappush(h, (1, 'write spec'))
>>> heappush(h, (3, 'create tests'))
>>> heappop(h)
(1, 'write spec')
優先佇列實現說明¶
優先佇列是堆的常見用途,它帶來了幾個實現上的挑戰
排序穩定性:如何讓兩個具有相同優先順序的任務按照它們最初新增的順序返回?
如果優先順序相等且任務沒有預設的比較順序,元組比較對於 (priority, task) 對會失敗。
如果任務的優先順序發生變化,如何將其移動到堆中的新位置?
或者如果需要刪除一個待處理的任務,如何找到並將其從佇列中移除?
前兩個挑戰的一個解決方案是將條目儲存為包含優先順序、條目計數和任務的 3 元素列表。條目計數作為一個決勝局,這樣具有相同優先順序的兩個任務會按它們新增的順序返回。並且由於沒有兩個條目計數是相同的,元組比較永遠不會嘗試直接比較兩個任務。
對於不可比較任務問題的另一個解決方案是建立一個包裝類,它忽略任務項,只比較優先順序欄位
from dataclasses import dataclass, field
from typing import Any
@dataclass(order=True)
class PrioritizedItem:
priority: int
item: Any=field(compare=False)
剩下的挑戰圍繞著找到一個待處理的任務並更改其優先順序或完全移除它。可以透過一個指向佇列中條目的字典來找到任務。
移除條目或更改其優先順序更困難,因為它會破壞堆結構的不變數。因此,一個可能的解決方案是將該條目標記為已移除,並新增一個具有修改後優先順序的新條目
pq = [] # list of entries arranged in a heap
entry_finder = {} # mapping of tasks to entries
REMOVED = '<removed-task>' # placeholder for a removed task
counter = itertools.count() # unique sequence count
def add_task(task, priority=0):
'Add a new task or update the priority of an existing task'
if task in entry_finder:
remove_task(task)
count = next(counter)
entry = [priority, count, task]
entry_finder[task] = entry
heappush(pq, entry)
def remove_task(task):
'Mark an existing task as REMOVED. Raise KeyError if not found.'
entry = entry_finder.pop(task)
entry[-1] = REMOVED
def pop_task():
'Remove and return the lowest priority task. Raise KeyError if empty.'
while pq:
priority, count, task = heappop(pq)
if task is not REMOVED:
del entry_finder[task]
return task
raise KeyError('pop from an empty priority queue')
理論¶
堆是陣列,對於所有的 *k*,都滿足 a[k] <= a[2*k+1]
和 a[k] <= a[2*k+2]
,元素從 0 開始計數。為了比較,不存在的元素被認為是無限大。堆的一個有趣特性是 a[0]
永遠是其最小的元素。
上面這個奇怪的不變數旨在成為一種高效的錦標賽記憶體表示。下面的數字是 *k*,而不是 a[k]
0
1 2
3 4 5 6
7 8 9 10 11 12 13 14
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
在上面的樹中,每個單元格 *k* 都位於 2*k+1
和 2*k+2
之上。在體育比賽中常見的二元錦標賽中,每個單元格是它上面兩個單元格的勝者,我們可以沿著樹追溯勝者,看到他/她所有的對手。然而,在許多此類錦標賽的計算機應用中,我們不需要追溯勝者的歷史。為了更節省記憶體,當一個勝者被晉升時,我們嘗試用一個較低級別的其他東西來替換它,規則就變成了:一個單元格和它上面的兩個單元格包含三個不同的項,但頂部的單元格“勝過”下面的兩個單元格。
如果這個堆不變數始終保持,那麼索引 0 顯然是總冠軍。移除它並找到“下一個”冠軍的最簡單的演算法方法是,將某個失敗者(比如上圖中的單元格 30)移動到位置 0,然後將這個新的 0 向下滲透到樹中,交換值,直到不變數重新建立。這顯然在樹中專案總數上是對數級的。透過遍歷所有專案,你會得到一個 *O*(*n* log *n*) 的排序。
這種排序的一個很好的特性是,你可以在排序進行中高效地插入新專案,前提是插入的專案不比你提取的最後一個第 0 個元素“更好”。這在模擬場景中特別有用,其中樹持有所有傳入的事件,而“獲勝”條件意味著最小的排程時間。當一個事件排程其他事件執行時,它們被安排在未來,所以它們可以輕鬆地進入堆。因此,堆是實現排程器的好結構(我就是用它來做我的 MIDI 音序器 :-))。
實現排程器的各種結構已經被廣泛研究,而堆在這方面很不錯,因為它們速度相當快,速度幾乎是恆定的,並且最壞情況與平均情況相差不大。然而,還有其他表示方式在整體上更高效,但其最壞情況可能會非常糟糕。
堆在大型磁碟排序中也非常有用。你們很可能都知道,大型排序意味著要產生“歸併段”(它們是預排序的序列,其大小通常與 CPU 記憶體量有關),然後是對這些歸併段進行合併遍(merge pass),這種合併通常被組織得非常巧妙[1]。非常重要的一點是,初始排序要產生儘可能長的歸併段。錦標賽是實現這一目標的好方法。如果使用所有可用記憶體來舉行一場錦標賽,你替換並滲透那些恰好適合當前歸併段的專案,你將產生對於隨機輸入而言是記憶體大小兩倍的歸併段,對於模糊有序的輸入則更好。
此外,如果你將第 0 項輸出到磁碟,並得到一個可能不適合當前錦標賽的輸入(因為該值“勝過”最後一個輸出值),它就不能放入堆中,所以堆的大小會減小。釋放的記憶體可以立即被巧妙地重用,逐步構建第二個堆,它以與第一個堆融化完全相同的速度增長。當第一個堆完全消失時,你切換堆並開始一個新的歸併段。巧妙且相當有效!
總之,堆是值得了解的有用的記憶體結構。我在一些應用程式中使用了它們,我認為手邊有一個“heap”模組是件好事。:-)
腳註