heapq — 堆佇列演算法

原始碼: Lib/heapq.py


此模組提供了堆佇列演算法(也稱為優先順序佇列演算法)的實現。

堆是二叉樹,其中每個父節點的值都小於或等於其任何子節點的值。我們將此條件稱為堆不變性。

此實現使用陣列,對於所有 *k*,從零開始計數元素,滿足 heap[k] <= heap[2*k+1]heap[k] <= heap[2*k+2]。 為了方便比較,不存在的元素被認為是無限大的。 堆的一個有趣的屬性是其最小元素始終是根,即 heap[0]

下面的 API 在兩個方面不同於教科書中的堆演算法:(a) 我們使用從零開始的索引。這使得節點索引與其子節點索引之間的關係稍微不那麼明顯,但更適合,因為 Python 使用從零開始的索引。(b) 我們的 pop 方法返回最小的項,而不是最大的項(在教科書中稱為“最小堆”;由於其適合就地排序,“最大堆”在文字中更常見)。

這兩個特性使得可以將堆視為普通的 Python 列表而不會感到意外:heap[0] 是最小的項,並且 heap.sort() 保持堆不變性!

要建立一個堆,請使用初始化為 [] 的列表,或者你可以透過函式 heapify() 將填充的列表轉換為堆。

提供了以下函式

heapq.heappush(heap, item)

將值 *item* 推入 *heap*,並保持堆不變性。

heapq.heappop(heap)

彈出並返回 *heap* 中最小的項,並保持堆不變性。 如果堆為空,則引發 IndexError。 要在不彈出的情況下訪問最小的項,請使用 heap[0]

heapq.heappushpop(heap, item)

將 *item* 推入堆中,然後從 *heap* 中彈出並返回最小的項。 此組合操作的執行效率高於 heappush(),然後再單獨呼叫 heappop()

heapq.heapify(x)

線上性時間內將列表 *x* 就地轉換為堆。

heapq.heapreplace(heap, item)

從 *heap* 中彈出並返回最小的項,並且還推送新的 *item*。 堆的大小不會更改。 如果堆為空,則引發 IndexError

此單步操作比 heappop() 後跟 heappush() 更有效,並且在使用固定大小的堆時可能更合適。 pop/push 組合始終從堆中返回一個元素,並將其替換為 *item*。

返回的值可能大於新增的 *item*。 如果不希望這樣,請考慮改用 heappushpop()。 其 push/pop 組合會返回兩個值中較小的一個,而將較大的值留在堆上。

該模組還提供了三個基於堆的通用函式。

heapq.merge(*iterables, key=None, reverse=False)

將多個已排序的輸入合併為單個已排序的輸出(例如,合併來自多個日誌檔案的時間戳條目)。 返回排序值的 迭代器

類似於 sorted(itertools.chain(*iterables)),但返回一個可迭代物件,不會立即將所有資料拉入記憶體,並假設每個輸入流都已排序(從小到大)。

具有兩個必須指定為關鍵字引數的可選引數。

*key* 指定一個接受一個引數的 鍵函式,該引數用於從每個輸入元素中提取比較鍵。 預設值為 None(直接比較元素)。

*reverse* 是一個布林值。 如果設定為 True,則合併輸入元素,就像每個比較都被反轉一樣。 為了實現類似於 sorted(itertools.chain(*iterables), reverse=True) 的行為,所有可迭代物件都必須從最大到最小排序。

在 3.5 版本中更改: 添加了可選的 *key* 和 *reverse* 引數。

heapq.nlargest(n, iterable, key=None)

返回一個列表,其中包含由 *iterable* 定義的資料集中最大的 *n* 個元素。 如果提供了 *key*,則它指定一個接受一個引數的函式,該函式用於從 *iterable* 中的每個元素中提取比較鍵(例如,key=str.lower)。 等效於:sorted(iterable, key=key, reverse=True)[:n]

heapq.nsmallest(n, iterable, key=None)

返回一個列表,其中包含由iterable定義的資料集中最小的 n 個元素。如果提供了 key,則它指定一個單引數函式,用於從 iterable 中的每個元素提取比較鍵(例如,key=str.lower)。等效於: sorted(iterable, key=key)[:n]

後兩個函式對於較小的 n 值效果最佳。對於較大的值,使用 sorted() 函式效率更高。此外,當 n==1 時,使用內建的 min()max() 函式效率更高。如果需要重複使用這些函式,請考慮將可迭代物件轉換為實際的堆。

基本示例

可以透過將所有值壓入堆中,然後一次彈出最小值來實現 堆排序

>>> def heapsort(iterable):
...     h = []
...     for value in iterable:
...         heappush(h, value)
...     return [heappop(h) for i in range(len(h))]
...
>>> heapsort([1, 3, 5, 7, 9, 2, 4, 6, 8, 0])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

這類似於 sorted(iterable),但與 sorted() 不同,此實現是不穩定的。

堆元素可以是元組。這對於將比較值(例如任務優先順序)與正在跟蹤的主記錄一起分配非常有用。

>>> h = []
>>> heappush(h, (5, 'write code'))
>>> heappush(h, (7, 'release product'))
>>> heappush(h, (1, 'write spec'))
>>> heappush(h, (3, 'create tests'))
>>> heappop(h)
(1, 'write spec')

優先順序佇列實現說明

優先順序佇列是堆的常見用途,它帶來了一些實現挑戰。

  • 排序穩定性:如何使兩個具有相同優先順序的任務按照它們最初新增的順序返回?

  • 如果優先順序相等且任務沒有預設比較順序,則元組比較對於(優先順序,任務)對會中斷。

  • 如果任務的優先順序發生變化,如何將其移動到堆中的新位置?

  • 或者,如果需要刪除待處理的任務,如何找到它並將其從佇列中移除?

前兩個挑戰的解決方案是將條目儲存為包含優先順序、條目計數和任務的 3 元素列表。條目計數充當決勝器,以便兩個具有相同優先順序的任務按照新增的順序返回。並且由於沒有兩個條目計數相同,因此元組比較永遠不會嘗試直接比較兩個任務。

解決不可比較任務問題的另一個解決方案是建立一個包裝類,該類忽略任務項並且僅比較優先順序欄位。

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)

其餘的挑戰圍繞著查詢待處理的任務以及更改其優先順序或完全刪除它。可以使用指向佇列中條目的字典來完成查詢任務。

刪除條目或更改其優先順序更加困難,因為它會破壞堆結構的不變數。因此,一個可能的解決方案是將條目標記為已刪除,並新增一個具有修訂優先順序的新條目。

pq = []                         # list of entries arranged in a heap
entry_finder = {}               # mapping of tasks to entries
REMOVED = '<removed-task>'      # placeholder for a removed task
counter = itertools.count()     # unique sequence count

def add_task(task, priority=0):
    'Add a new task or update the priority of an existing task'
    if task in entry_finder:
        remove_task(task)
    count = next(counter)
    entry = [priority, count, task]
    entry_finder[task] = entry
    heappush(pq, entry)

def remove_task(task):
    'Mark an existing task as REMOVED.  Raise KeyError if not found.'
    entry = entry_finder.pop(task)
    entry[-1] = REMOVED

def pop_task():
    'Remove and return the lowest priority task. Raise KeyError if empty.'
    while pq:
        priority, count, task = heappop(pq)
        if task is not REMOVED:
            del entry_finder[task]
            return task
    raise KeyError('pop from an empty priority queue')

理論

堆是陣列,對於所有 k,從 0 開始計數元素,a[k] <= a[2*k+1]a[k] <= a[2*k+2]。為了比較起見,不存在的元素被認為是無限的。堆的有趣特性是 a[0] 始終是其最小的元素。

上面的奇怪不變數旨在成為錦標賽的有效記憶體表示形式。下面的數字是 k,而不是 a[k]

                               0

              1                                 2

      3               4                5               6

  7       8       9       10      11      12      13      14

15 16   17 18   19 20   21 22   23 24   25 26   27 28   29 30

在上面的樹中,每個單元格 k 都位於 2*k+12*k+2 之上。在我們在體育比賽中看到的常見二元錦標賽中,每個單元格都是它所處的兩個單元格的獲勝者,我們可以沿著樹向下追溯獲勝者,以檢視他/她所有的對手。但是,在許多此類錦標賽的計算機應用中,我們不需要追溯獲勝者的歷史。為了更節省記憶體,當獲勝者被提升時,我們嘗試用較低級別的其他東西替換它,規則變成一個單元格和它所在的兩個單元格包含三個不同的專案,但頂部單元格“勝出” 位於頂部的兩個單元格之上。

如果始終保護此堆不變數,則索引 0 顯然是總冠軍。刪除它並找到“下一個”獲勝者的最簡單演算法方法是將一些失敗者(比如上圖中的單元格 30)移動到 0 位置,然後將這個新的 0 下滲到樹中,交換值,直到重新建立不變數。這顯然是對樹中專案總數的對數。透過迭代所有專案,您將獲得 O(n log n) 排序。

這種排序的一個優點是,只要插入的專案不“優於”您提取的最後一個 0'th 元素,您就可以在排序進行時有效地插入新專案。這在模擬環境中特別有用,在模擬環境中,樹儲存所有傳入的事件,“獲勝”條件表示最小的計劃時間。當一個事件計劃其他事件執行時,它們會被安排到未來,因此它們可以輕鬆進入堆中。因此,堆是實現排程程式的好結構(這就是我用於 MIDI 音序器的原因 :-))。

已經對實現排程程式的各種結構進行了廣泛的研究,堆在這方面很不錯,因為它們的速度相當快,速度幾乎是恆定的,而且最壞的情況與平均情況沒有太大差異。但是,還有其他更有效的表示形式,但最壞的情況可能很糟糕。

堆在大磁碟排序中也非常有用。您很可能都知道,大排序意味著產生“執行”(即預先排序的序列,其大小通常與 CPU 記憶體量有關),然後對這些執行進行合併傳遞,合併通常組織得非常巧妙 [1]。初始排序產生儘可能長的執行非常重要。錦標賽是實現這一目標的好方法。如果使用所有可用於儲存錦標賽的記憶體,替換並滲入適合當前執行的專案,您將產生執行,對於隨機輸入,執行的大小是記憶體的兩倍,而對於模糊有序的輸入,執行會更好得多。

此外,如果您在磁碟上輸出第 0 個專案並獲得一個可能不適合當前錦標賽的輸入(因為該值“勝過”最後一個輸出值),它就無法放入堆中,因此堆的大小會減小。可以巧妙地立即將釋放的記憶體用於逐步構建第二個堆,第二個堆的增長速度與第一個堆的熔化速度完全相同。當第一個堆完全消失時,您將切換堆並開始新的執行。巧妙且非常有效!

總而言之,堆是有用的記憶體結構,需要了解。我在一些應用程式中使用它們,我認為保留一個“堆”模組是件好事。 :-)

腳註