itertools
— 用於高效迴圈的迭代器建立函式¶
該模組實現了一些受 APL、Haskell 和 SML 構造啟發的 迭代器 構建塊。每個構建塊都以適合 Python 的形式進行了重鑄。
該模組標準化了一組核心的快速、記憶體高效的工具,這些工具本身或組合使用都很有用。它們共同形成了一個“迭代器代數”,使得可以用純 Python 簡潔高效地構建專門的工具。
例如,SML 提供了一個製表工具:tabulate(f)
,它產生一個序列 f(0), f(1), ...
。在 Python 中,可以透過組合 map()
和 count()
來形成 map(f, count())
來實現相同的效果。
無限迭代器
迭代器 |
引數 |
結果 |
示例 |
---|---|---|---|
[start[, step]] |
start, start+step, start+2*step, … |
|
|
p |
p0, p1, … plast, p0, p1, … |
|
|
elem [,n] |
elem, elem, elem, … 無限次或最多 n 次 |
|
在最短輸入序列上終止的迭代器
迭代器 |
引數 |
結果 |
示例 |
---|---|---|---|
p [,func] |
p0, p0+p1, p0+p1+p2, … |
|
|
p, n |
(p0, p1, …, p_n-1), … |
|
|
p, q, … |
p0, p1, … plast, q0, q1, … |
|
|
iterable |
p0, p1, … plast, q0, q1, … |
|
|
data, selectors |
(d[0] if s[0]), (d[1] if s[1]), … |
|
|
predicate, seq |
seq[n], seq[n+1], 從謂詞失敗時開始 |
|
|
predicate, seq |
seq 中 predicate(elem) 失敗的元素 |
|
|
iterable[, key] |
按 key(v) 的值分組的子迭代器 |
|
|
seq, [start,] stop [, step] |
來自 seq[start:stop:step] 的元素 |
|
|
iterable |
(p[0], p[1]), (p[1], p[2]) |
|
|
func, seq |
func(*seq[0]), func(*seq[1]), … |
|
|
predicate, seq |
seq[0], seq[1], 直到謂詞失敗 |
|
|
it, n |
it1, it2, … itn 將一個迭代器分成 n 個 |
||
p, q, … |
(p[0], q[0]), (p[1], q[1]), … |
|
組合迭代器
迭代器 |
引數 |
結果 |
---|---|---|
p, q, … [repeat=1] |
笛卡爾積,等效於巢狀的 for 迴圈 |
|
p[, r] |
r 長度的元組,所有可能的排序,沒有重複元素 |
|
p, r |
r 長度的元組,按排序順序,沒有重複元素 |
|
p, r |
r 長度的元組,按排序順序,帶有重複元素 |
示例 |
結果 |
---|---|
|
|
|
|
|
|
|
|
Itertool 函式¶
以下函式都構造並返回迭代器。有些函式提供無限長度的流,因此它們只能由截斷流的函式或迴圈訪問。
- itertools.accumulate(iterable[, function, *, initial=None])¶
建立一個返回累積和或來自其他二元函式的累積結果的迭代器。
function 預設為加法。function 應該接受兩個引數,一個累積的總數和一個來自 iterable 的值。
如果提供 initial 值,則累積將從該值開始,並且輸出將比輸入 iterable 多一個元素。
大致等效於
def accumulate(iterable, function=operator.add, *, initial=None): 'Return running totals' # accumulate([1,2,3,4,5]) → 1 3 6 10 15 # accumulate([1,2,3,4,5], initial=100) → 100 101 103 106 110 115 # accumulate([1,2,3,4,5], operator.mul) → 1 2 6 24 120 iterator = iter(iterable) total = initial if initial is None: try: total = next(iterator) except StopIteration: return yield total for element in iterator: total = function(total, element) yield total
要計算執行最小值,請將 function 設定為
min()
。對於執行最大值,請將 function 設定為max()
。或者對於執行乘積,請將 function 設定為operator.mul()
。要構建攤銷表,請累積利息並應用付款>>> data = [3, 4, 6, 2, 1, 9, 0, 7, 5, 8] >>> list(accumulate(data, max)) # running maximum [3, 4, 6, 6, 6, 9, 9, 9, 9, 9] >>> list(accumulate(data, operator.mul)) # running product [3, 12, 72, 144, 144, 1296, 0, 0, 0, 0] # Amortize a 5% loan of 1000 with 10 annual payments of 90 >>> update = lambda balance, payment: round(balance * 1.05) - payment >>> list(accumulate(repeat(90, 10), update, initial=1_000)) [1000, 960, 918, 874, 828, 779, 728, 674, 618, 559, 497]
有關僅返回最終累積值的類似函式,請參閱
functools.reduce()
。在版本 3.2 中新增。
在版本 3.3 中更改: 添加了可選的 function 引數。
在版本 3.8 中更改: 添加了可選的 initial 引數。
- itertools.batched(iterable, n, *, strict=False)¶
將來自 iterable 的資料分批處理成長度為 n 的元組。最後一個批次可能比 n 短。
如果 strict 為 true,當最後一批的長度小於 n 時,會引發
ValueError
異常。迴圈遍歷輸入的可迭代物件,並將資料累積到大小為 n 的元組中。輸入以惰性方式消耗,只消耗足夠填滿一批的資料。一旦批次填滿或輸入的可迭代物件耗盡,就會立即產生結果。
>>> flattened_data = ['roses', 'red', 'violets', 'blue', 'sugar', 'sweet'] >>> unflattened = list(batched(flattened_data, 2)) >>> unflattened [('roses', 'red'), ('violets', 'blue'), ('sugar', 'sweet')]
大致等效於
def batched(iterable, n, *, strict=False): # batched('ABCDEFG', 3) → ABC DEF G if n < 1: raise ValueError('n must be at least one') iterator = iter(iterable) while batch := tuple(islice(iterator, n)): if strict and len(batch) != n: raise ValueError('batched(): incomplete batch') yield batch
在 3.12 版本中新增。
在 3.13 版本中更改: 增加了 strict 選項。
- itertools.chain(*iterables)¶
建立一個迭代器,該迭代器從第一個可迭代物件返回元素,直到它耗盡,然後繼續處理下一個可迭代物件,直到所有可迭代物件都耗盡。這會將多個數據源組合成一個迭代器。大致等效於
def chain(*iterables): # chain('ABC', 'DEF') → A B C D E F for iterable in iterables: yield from iterable
- classmethod chain.from_iterable(iterable)¶
chain()
的替代建構函式。從一個惰性求值的可迭代引數中獲取鏈式輸入。大致等效於def from_iterable(iterables): # chain.from_iterable(['ABC', 'DEF']) → A B C D E F for iterable in iterables: yield from iterable
- itertools.combinations(iterable, r)¶
返回來自輸入 iterable 的元素長度為 r 的子序列。
輸出是
product()
的子序列,僅保留 iterable 的子序列的條目。輸出的長度由math.comb()
給出,它計算n! / r! / (n - r)!
,當0 ≤ r ≤ n
時,或當r > n
時為零。組合元組按照輸入 iterable 的順序以字典序發出。如果輸入 iterable 已排序,則輸出元組將按排序順序生成。
元素根據它們的位置而不是它們的值被視為唯一。如果輸入元素是唯一的,則每個組合中都不會有重複的值。
大致等效於
def combinations(iterable, r): # combinations('ABCD', 2) → AB AC AD BC BD CD # combinations(range(4), 3) → 012 013 023 123 pool = tuple(iterable) n = len(pool) if r > n: return indices = list(range(r)) yield tuple(pool[i] for i in indices) while True: for i in reversed(range(r)): if indices[i] != i + n - r: break else: return indices[i] += 1 for j in range(i+1, r): indices[j] = indices[j-1] + 1 yield tuple(pool[i] for i in indices)
- itertools.combinations_with_replacement(iterable, r)¶
返回來自輸入 iterable 的元素長度為 r 的子序列,允許單個元素重複多次。
輸出是
product()
的子序列,僅保留 iterable 的子序列(具有可能的重複元素)的條目。返回的子序列的數量為(n + r - 1)! / r! / (n - 1)!
,當n > 0
時。組合元組按照輸入 iterable 的順序以字典序發出。如果輸入 iterable 已排序,則輸出元組將按排序順序生成。
元素根據它們的位置而不是它們的值被視為唯一。如果輸入元素是唯一的,則生成的組合也將是唯一的。
大致等效於
def combinations_with_replacement(iterable, r): # combinations_with_replacement('ABC', 2) → AA AB AC BB BC CC pool = tuple(iterable) n = len(pool) if not n and r: return indices = [0] * r yield tuple(pool[i] for i in indices) while True: for i in reversed(range(r)): if indices[i] != n - 1: break else: return indices[i:] = [indices[i] + 1] * (r - i) yield tuple(pool[i] for i in indices)
在 3.1 版本中新增。
- itertools.compress(data, selectors)¶
建立一個迭代器,該迭代器從 data 返回元素,其中 selectors 中對應的元素為 true。當 data 或 selectors 可迭代物件耗盡時停止。大致等效於
def compress(data, selectors): # compress('ABCDEF', [1,0,1,0,1,1]) → A C E F return (datum for datum, selector in zip(data, selectors) if selector)
在 3.1 版本中新增。
- itertools.count(start=0, step=1)¶
建立一個迭代器,該迭代器返回以 start 開頭的均勻間隔的值。可以與
map()
一起使用以生成連續的資料點,或者與zip()
一起使用以新增序列號。大致等效於def count(start=0, step=1): # count(10) → 10 11 12 13 14 ... # count(2.5, 0.5) → 2.5 3.0 3.5 ... n = start while True: yield n n += step
當使用浮點數計數時,有時可以透過替換乘法程式碼來獲得更好的精度,例如:
(start + step * i for i in count())
。在 3.1 版本中更改: 增加了 step 引數並允許非整數引數。
- itertools.cycle(iterable)¶
建立一個迭代器,該迭代器從 iterable 返回元素,並儲存每個元素的副本。當可迭代物件耗盡時,從儲存的副本返回元素。無限重複。大致等效於
def cycle(iterable): # cycle('ABCD') → A B C D A B C D A B C D ... saved = [] for element in iterable: yield element saved.append(element) while saved: for element in saved: yield element
此迭代工具可能需要大量輔助儲存空間(取決於可迭代物件的長度)。
- itertools.dropwhile(predicate, iterable)¶
建立一個迭代器,當 predicate 為 true 時,該迭代器從 iterable 中刪除元素,然後返回每個元素。大致等效於
def dropwhile(predicate, iterable): # dropwhile(lambda x: x<5, [1,4,6,3,8]) → 6 3 8 iterator = iter(iterable) for x in iterator: if not predicate(x): yield x break for x in iterator: yield x
請注意,在謂詞首次變為 false 之前,這不會產生 任何 輸出,因此此迭代工具的啟動時間可能很長。
- itertools.filterfalse(predicate, iterable)¶
建立一個迭代器,該迭代器從 iterable 中篩選元素,僅返回那些 predicate 返回 false 值的元素。如果 predicate 是
None
,則返回那些為 false 的專案。大致等效於def filterfalse(predicate, iterable): # filterfalse(lambda x: x<5, [1,4,6,3,8]) → 6 8 if predicate is None: predicate = bool for x in iterable: if not predicate(x): yield x
- itertools.groupby(iterable, key=None)¶
建立一個迭代器,該迭代器從 iterable 返回連續的鍵和組。key 是一個函式,用於計算每個元素的鍵值。如果未指定或為
None
,則 key 預設為恆等函式並返回未更改的元素。通常,可迭代物件需要已按相同的鍵函式排序。groupby()
的操作類似於 Unix 中的uniq
過濾器。它每次鍵函式的值更改時生成一箇中斷或一個新組(這就是為什麼通常需要使用相同的鍵函式對資料進行排序的原因)。這種行為與 SQL 的 GROUP BY 不同,後者會聚合公共元素,而不管它們的輸入順序如何。返回的組本身是一個迭代器,它與
groupby()
共享底層可迭代物件。由於源是共享的,因此當groupby()
物件前進時,之前的組將不再可見。因此,如果以後需要該資料,則應將其儲存為列表groups = [] uniquekeys = [] data = sorted(data, key=keyfunc) for k, g in groupby(data, keyfunc): groups.append(list(g)) # Store group iterator as a list uniquekeys.append(k)
groupby()
大致等效於def groupby(iterable, key=None): # [k for k, g in groupby('AAAABBBCCDAABBB')] → A B C D A B # [list(g) for k, g in groupby('AAAABBBCCD')] → AAAA BBB CC D keyfunc = (lambda x: x) if key is None else key iterator = iter(iterable) exhausted = False def _grouper(target_key): nonlocal curr_value, curr_key, exhausted yield curr_value for curr_value in iterator: curr_key = keyfunc(curr_value) if curr_key != target_key: return yield curr_value exhausted = True try: curr_value = next(iterator) except StopIteration: return curr_key = keyfunc(curr_value) while not exhausted: target_key = curr_key curr_group = _grouper(target_key) yield curr_key, curr_group if curr_key == target_key: for _ in curr_group: pass
- itertools.islice(iterable, stop)¶
- itertools.islice(iterable, start, stop[, step])
建立一個迭代器,該迭代器返回來自可迭代物件的選定元素。工作方式類似於序列切片,但不支援 start、stop 或 step 的負值。
如果 start 為零或
None
,則迭代從零開始。否則,將跳過可迭代物件中的元素,直到到達 start。如果 stop 為
None
,則迭代會一直進行到輸入耗盡為止(如果會耗盡)。否則,它會在指定位置停止。如果 step 為
None
,則步長預設為 1。除非 step 設定為大於 1 的值,從而跳過一些項,否則會連續返回元素。大致等效於
def islice(iterable, *args): # islice('ABCDEFG', 2) → A B # islice('ABCDEFG', 2, 4) → C D # islice('ABCDEFG', 2, None) → C D E F G # islice('ABCDEFG', 0, None, 2) → A C E G s = slice(*args) start = 0 if s.start is None else s.start stop = s.stop step = 1 if s.step is None else s.step if start < 0 or (stop is not None and stop < 0) or step <= 0: raise ValueError indices = count() if stop is None else range(max(start, stop)) next_i = start for i, element in zip(indices, iterable): if i == next_i: yield element next_i += step
如果輸入是一個迭代器,則完全消耗 islice 會使輸入迭代器前進
max(start, stop)
步,而與 step 值無關。
- itertools.pairwise(iterable)¶
返回從輸入 iterable 中提取的連續重疊的對。
輸出迭代器中的 2 元組的數量將比輸入項的數量少一個。如果輸入可迭代物件的值少於兩個,則輸出為空。
大致等效於
def pairwise(iterable): # pairwise('ABCDEFG') → AB BC CD DE EF FG iterator = iter(iterable) a = next(iterator, None) for b in iterator: yield a, b a = b
在 3.10 版本中新增。
- itertools.permutations(iterable, r=None)¶
返回來自 iterable 的長度為 r 的連續元素排列。
如果未指定 r 或 r 為
None
,則 r 預設為 iterable 的長度,並生成所有可能的完整長度排列。輸出是
product()
的子序列,其中已過濾掉具有重複元素的條目。輸出的長度由math.perm()
給出,它計算n! / (n - r)!
(當0 ≤ r ≤ n
時)或零 (當r > n
時)。排列元組根據輸入 iterable 的順序以字典順序發出。如果輸入 iterable 已排序,則輸出元組將按排序順序生成。
元素被視為基於其位置而不是其值的唯一元素。如果輸入元素是唯一的,則排列中不會有重複的值。
大致等效於
def permutations(iterable, r=None): # permutations('ABCD', 2) → AB AC AD BA BC BD CA CB CD DA DB DC # permutations(range(3)) → 012 021 102 120 201 210 pool = tuple(iterable) n = len(pool) r = n if r is None else r if r > n: return indices = list(range(n)) cycles = list(range(n, n-r, -1)) yield tuple(pool[i] for i in indices[:r]) while n: for i in reversed(range(r)): cycles[i] -= 1 if cycles[i] == 0: indices[i:] = indices[i+1:] + indices[i:i+1] cycles[i] = n - i else: j = cycles[i] indices[i], indices[-j] = indices[-j], indices[i] yield tuple(pool[i] for i in indices[:r]) break else: return
- itertools.product(*iterables, repeat=1)¶
輸入可迭代物件的笛卡爾積。
大致等同於生成器表示式中的巢狀 for 迴圈。例如,
product(A, B)
的返回值與((x,y) for x in A for y in B)
相同。巢狀迴圈像里程錶一樣迴圈,最右邊的元素在每次迭代時前進。此模式建立了字典順序,因此如果輸入的迭代器已排序,則產品元組將按排序順序發出。
要計算可迭代物件與其自身的乘積,請使用可選的 repeat 關鍵字引數指定重複次數。例如,
product(A, repeat=4)
與product(A, A, A, A)
的含義相同。此函式大致等效於以下程式碼,但實際實現不會在記憶體中構建中間結果
def product(*iterables, repeat=1): # product('ABCD', 'xy') → Ax Ay Bx By Cx Cy Dx Dy # product(range(2), repeat=3) → 000 001 010 011 100 101 110 111 if repeat < 0: raise ValueError('repeat argument cannot be negative') pools = [tuple(pool) for pool in iterables] * repeat result = [[]] for pool in pools: result = [x+[y] for x in result for y in pool] for prod in result: yield tuple(prod)
在
product()
執行之前,它會完全消耗輸入的可迭代物件,將值池儲存在記憶體中以生成產品。因此,它僅適用於有限的輸入。
- itertools.repeat(object[, times])¶
建立一個迭代器,該迭代器會一次又一次地返回 object。無限期執行,除非指定了 times 引數。
大致等效於
def repeat(object, times=None): # repeat(10, 3) → 10 10 10 if times is None: while True: yield object else: for i in range(times): yield object
repeat 的常見用法是為 map 或 zip 提供常量值流
>>> list(map(pow, range(10), repeat(2))) [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
- itertools.starmap(function, iterable)¶
建立一個迭代器,該迭代器使用從 iterable 獲取的引數計算 function。當引數引數已“預先壓縮”到元組中時,將使用此方法而不是
map()
。map()
和starmap()
之間的區別類似於function(a,b)
和function(*c)
之間的區別。大致等效於def starmap(function, iterable): # starmap(pow, [(2,5), (3,2), (10,3)]) → 32 9 1000 for args in iterable: yield function(*args)
- itertools.takewhile(predicate, iterable)¶
建立一個迭代器,只要 predicate 為真,該迭代器就會從 iterable 返回元素。大致等效於
def takewhile(predicate, iterable): # takewhile(lambda x: x<5, [1,4,6,3,8]) → 1 4 for x in iterable: if not predicate(x): break yield x
注意,第一個未透過謂詞條件的元素將從輸入迭代器中消耗掉,並且無法訪問該元素。如果應用程式希望在 takewhile 執行到耗盡後進一步消耗輸入迭代器,這可能會成為一個問題。要解決此問題,請考慮改用 more-itertools before_and_after()。
- itertools.tee(iterable, n=2)¶
從單個可迭代物件返回 n 個獨立的迭代器。
大致等效於
def tee(iterable, n=2): if n < 0: raise ValueError if n == 0: return () iterator = _tee(iterable) result = [iterator] for _ in range(n - 1): result.append(_tee(iterator)) return tuple(result) class _tee: def __init__(self, iterable): it = iter(iterable) if isinstance(it, _tee): self.iterator = it.iterator self.link = it.link else: self.iterator = it self.link = [None, None] def __iter__(self): return self def __next__(self): link = self.link if link[1] is None: link[0] = next(self.iterator) link[1] = [None, None] value, self.link = link return value
當輸入 iterable 已經是 tee 迭代器物件時,返回元組的所有成員的構造方式就好像它們是由上游的
tee()
呼叫產生的。此“展平步驟”允許巢狀的tee()
呼叫共享相同的基礎資料鏈,並且具有單個更新步驟,而不是一系列呼叫。展平屬性使 tee 迭代器能夠有效地進行窺視
def lookahead(tee_iterator): "Return the next value without moving the input forward" [forked_iterator] = tee(tee_iterator, 1) return next(forked_iterator)
>>> iterator = iter('abcdef') >>> [iterator] = tee(iterator, 1) # Make the input peekable >>> next(iterator) # Move the iterator forward 'a' >>> lookahead(iterator) # Check next value 'b' >>> next(iterator) # Continue moving forward 'b'
tee
迭代器不是執行緒安全的。當同時使用由同一個tee()
呼叫返回的迭代器時,即使原始的 iterable 是執行緒安全的,也可能會引發RuntimeError
。此迭代工具可能需要大量的輔助儲存(取決於需要儲存多少臨時資料)。通常,如果一個迭代器在另一個迭代器啟動之前使用大部分或所有資料,則使用
list()
比tee()
快。
- itertools.zip_longest(*iterables, fillvalue=None)¶
建立一個迭代器,該迭代器會聚合來自每個 iterables 的元素。
如果可迭代物件的長度不均勻,則缺少的值將用 fillvalue 填充。如果未指定,fillvalue 預設為
None
。迭代將一直持續到最長的可迭代物件耗盡為止。
大致等效於
def zip_longest(*iterables, fillvalue=None): # zip_longest('ABCD', 'xy', fillvalue='-') → Ax By C- D- iterators = list(map(iter, iterables)) num_active = len(iterators) if not num_active: return while True: values = [] for i, iterator in enumerate(iterators): try: value = next(iterator) except StopIteration: num_active -= 1 if not num_active: return iterators[i] = repeat(fillvalue) value = fillvalue values.append(value) yield tuple(values)
如果其中一個可迭代物件可能是無限的,則應使用某些限制呼叫次數的方法(例如
islice()
或takewhile()
)包裝zip_longest()
函式。
Itertools 示例¶
本節顯示了使用現有 itertools 作為構建塊建立擴充套件工具集的示例。
itertools 配方的主要目的是教育性的。這些配方展示了思考各個工具的多種方式——例如,chain.from_iterable
與扁平化的概念相關。這些配方還提供了關於如何組合這些工具的想法——例如,starmap()
和 repeat()
如何協同工作。這些配方還展示瞭如何將 itertools 與 operator
和 collections
模組,以及內建的 itertools(如 map()
、filter()
、reversed()
和 enumerate()
)一起使用。
這些配方的第二個目的是作為孵化器。accumulate()
、compress()
和 pairwise()
這些 itertools 最初都是以配方的形式出現的。目前,sliding_window()
、iter_index()
和 sieve()
配方正在測試中,以檢驗它們的價值。
基本上所有這些配方以及許多其他配方都可以從 Python 包索引上的 more-itertools 專案中安裝。
python -m pip install more-itertools
許多配方都提供了與底層工具集相同的高效能。透過一次處理一個元素而不是一次將整個可迭代物件載入到記憶體中來保持卓越的記憶體效能。透過以函式式風格將工具連結在一起,程式碼量保持較小。透過優先使用“向量化”構建塊而不是使用 for 迴圈和會產生直譯器開銷的生成器來保持高速度。
from collections import deque
from contextlib import suppress
from functools import reduce
from math import sumprod, isqrt
from operator import itemgetter, getitem, mul, neg
def take(n, iterable):
"Return first n items of the iterable as a list."
return list(islice(iterable, n))
def prepend(value, iterable):
"Prepend a single value in front of an iterable."
# prepend(1, [2, 3, 4]) → 1 2 3 4
return chain([value], iterable)
def tabulate(function, start=0):
"Return function(0), function(1), ..."
return map(function, count(start))
def repeatfunc(function, times=None, *args):
"Repeat calls to a function with specified arguments."
if times is None:
return starmap(function, repeat(args))
return starmap(function, repeat(args, times))
def flatten(list_of_lists):
"Flatten one level of nesting."
return chain.from_iterable(list_of_lists)
def ncycles(iterable, n):
"Returns the sequence elements n times."
return chain.from_iterable(repeat(tuple(iterable), n))
def loops(n):
"Loop n times. Like range(n) but without creating integers."
# for _ in loops(100): ...
return repeat(None, n)
def tail(n, iterable):
"Return an iterator over the last n items."
# tail(3, 'ABCDEFG') → E F G
return iter(deque(iterable, maxlen=n))
def consume(iterator, n=None):
"Advance the iterator n-steps ahead. If n is None, consume entirely."
# Use functions that consume iterators at C speed.
if n is None:
deque(iterator, maxlen=0)
else:
next(islice(iterator, n, n), None)
def nth(iterable, n, default=None):
"Returns the nth item or a default value."
return next(islice(iterable, n, None), default)
def quantify(iterable, predicate=bool):
"Given a predicate that returns True or False, count the True results."
return sum(map(predicate, iterable))
def first_true(iterable, default=False, predicate=None):
"Returns the first true value or the *default* if there is no true value."
# first_true([a,b,c], x) → a or b or c or x
# first_true([a,b], x, f) → a if f(a) else b if f(b) else x
return next(filter(predicate, iterable), default)
def all_equal(iterable, key=None):
"Returns True if all the elements are equal to each other."
# all_equal('4٤௪౪໔', key=int) → True
return len(take(2, groupby(iterable, key))) <= 1
def unique_justseen(iterable, key=None):
"Yield unique elements, preserving order. Remember only the element just seen."
# unique_justseen('AAAABBBCCDAABBB') → A B C D A B
# unique_justseen('ABBcCAD', str.casefold) → A B c A D
if key is None:
return map(itemgetter(0), groupby(iterable))
return map(next, map(itemgetter(1), groupby(iterable, key)))
def unique_everseen(iterable, key=None):
"Yield unique elements, preserving order. Remember all elements ever seen."
# unique_everseen('AAAABBBCCDAABBB') → A B C D
# unique_everseen('ABBcCAD', str.casefold) → A B c D
seen = set()
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen.add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen.add(k)
yield element
def unique(iterable, key=None, reverse=False):
"Yield unique elements in sorted order. Supports unhashable inputs."
# unique([[1, 2], [3, 4], [1, 2]]) → [1, 2] [3, 4]
sequenced = sorted(iterable, key=key, reverse=reverse)
return unique_justseen(sequenced, key=key)
def sliding_window(iterable, n):
"Collect data into overlapping fixed-length chunks or blocks."
# sliding_window('ABCDEFG', 4) → ABCD BCDE CDEF DEFG
iterator = iter(iterable)
window = deque(islice(iterator, n - 1), maxlen=n)
for x in iterator:
window.append(x)
yield tuple(window)
def grouper(iterable, n, *, incomplete='fill', fillvalue=None):
"Collect data into non-overlapping fixed-length chunks or blocks."
# grouper('ABCDEFG', 3, fillvalue='x') → ABC DEF Gxx
# grouper('ABCDEFG', 3, incomplete='strict') → ABC DEF ValueError
# grouper('ABCDEFG', 3, incomplete='ignore') → ABC DEF
iterators = [iter(iterable)] * n
match incomplete:
case 'fill':
return zip_longest(*iterators, fillvalue=fillvalue)
case 'strict':
return zip(*iterators, strict=True)
case 'ignore':
return zip(*iterators)
case _:
raise ValueError('Expected fill, strict, or ignore')
def roundrobin(*iterables):
"Visit input iterables in a cycle until each is exhausted."
# roundrobin('ABC', 'D', 'EF') → A D E B F C
# Algorithm credited to George Sakkis
iterators = map(iter, iterables)
for num_active in range(len(iterables), 0, -1):
iterators = cycle(islice(iterators, num_active))
yield from map(next, iterators)
def subslices(seq):
"Return all contiguous non-empty subslices of a sequence."
# subslices('ABCD') → A AB ABC ABCD B BC BCD C CD D
slices = starmap(slice, combinations(range(len(seq) + 1), 2))
return map(getitem, repeat(seq), slices)
def iter_index(iterable, value, start=0, stop=None):
"Return indices where a value occurs in a sequence or iterable."
# iter_index('AABCADEAF', 'A') → 0 1 4 7
seq_index = getattr(iterable, 'index', None)
if seq_index is None:
iterator = islice(iterable, start, stop)
for i, element in enumerate(iterator, start):
if element is value or element == value:
yield i
else:
stop = len(iterable) if stop is None else stop
i = start
with suppress(ValueError):
while True:
yield (i := seq_index(value, i, stop))
i += 1
def iter_except(function, exception, first=None):
"Convert a call-until-exception interface to an iterator interface."
# iter_except(d.popitem, KeyError) → non-blocking dictionary iterator
with suppress(exception):
if first is not None:
yield first()
while True:
yield function()
以下配方具有更強的數學風味
def powerset(iterable):
"Subsequences of the iterable from shortest to longest."
# powerset([1,2,3]) → () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)
s = list(iterable)
return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))
def sum_of_squares(iterable):
"Add up the squares of the input values."
# sum_of_squares([10, 20, 30]) → 1400
return sumprod(*tee(iterable))
def reshape(matrix, columns):
"Reshape a 2-D matrix to have a given number of columns."
# reshape([(0, 1), (2, 3), (4, 5)], 3) → (0, 1, 2), (3, 4, 5)
return batched(chain.from_iterable(matrix), columns, strict=True)
def transpose(matrix):
"Swap the rows and columns of a 2-D matrix."
# transpose([(1, 2, 3), (11, 22, 33)]) → (1, 11) (2, 22) (3, 33)
return zip(*matrix, strict=True)
def matmul(m1, m2):
"Multiply two matrices."
# matmul([(7, 5), (3, 5)], [(2, 5), (7, 9)]) → (49, 80), (41, 60)
n = len(m2[0])
return batched(starmap(sumprod, product(m1, transpose(m2))), n)
def convolve(signal, kernel):
"""Discrete linear convolution of two iterables.
Equivalent to polynomial multiplication.
Convolutions are mathematically commutative; however, the inputs are
evaluated differently. The signal is consumed lazily and can be
infinite. The kernel is fully consumed before the calculations begin.
Article: https://betterexplained.com/articles/intuitive-convolution/
Video: https://www.youtube.com/watch?v=KuXjwB4LzSA
"""
# convolve([1, -1, -20], [1, -3]) → 1 -4 -17 60
# convolve(data, [0.25, 0.25, 0.25, 0.25]) → Moving average (blur)
# convolve(data, [1/2, 0, -1/2]) → 1st derivative estimate
# convolve(data, [1, -2, 1]) → 2nd derivative estimate
kernel = tuple(kernel)[::-1]
n = len(kernel)
padded_signal = chain(repeat(0, n-1), signal, repeat(0, n-1))
windowed_signal = sliding_window(padded_signal, n)
return map(sumprod, repeat(kernel), windowed_signal)
def polynomial_from_roots(roots):
"""Compute a polynomial's coefficients from its roots.
(x - 5) (x + 4) (x - 3) expands to: x³ -4x² -17x + 60
"""
# polynomial_from_roots([5, -4, 3]) → [1, -4, -17, 60]
factors = zip(repeat(1), map(neg, roots))
return list(reduce(convolve, factors, [1]))
def polynomial_eval(coefficients, x):
"""Evaluate a polynomial at a specific value.
Computes with better numeric stability than Horner's method.
"""
# Evaluate x³ -4x² -17x + 60 at x = 5
# polynomial_eval([1, -4, -17, 60], x=5) → 0
n = len(coefficients)
if not n:
return type(x)(0)
powers = map(pow, repeat(x), reversed(range(n)))
return sumprod(coefficients, powers)
def polynomial_derivative(coefficients):
"""Compute the first derivative of a polynomial.
f(x) = x³ -4x² -17x + 60
f'(x) = 3x² -8x -17
"""
# polynomial_derivative([1, -4, -17, 60]) → [3, -8, -17]
n = len(coefficients)
powers = reversed(range(1, n))
return list(map(mul, coefficients, powers))
def sieve(n):
"Primes less than n."
# sieve(30) → 2 3 5 7 11 13 17 19 23 29
if n > 2:
yield 2
data = bytearray((0, 1)) * (n // 2)
for p in iter_index(data, 1, start=3, stop=isqrt(n) + 1):
data[p*p : n : p+p] = bytes(len(range(p*p, n, p+p)))
yield from iter_index(data, 1, start=3)
def factor(n):
"Prime factors of n."
# factor(99) → 3 3 11
# factor(1_000_000_000_000_007) → 47 59 360620266859
# factor(1_000_000_000_000_403) → 1000000000000403
for prime in sieve(isqrt(n) + 1):
while not n % prime:
yield prime
n //= prime
if n == 1:
return
if n > 1:
yield n
def is_prime(n):
"Return True if n is prime."
# is_prime(1_000_000_000_000_403) → True
return n > 1 and next(factor(n)) == n
def totient(n):
"Count of natural numbers up to n that are coprime to n."
# https://mathworld.wolfram.com/TotientFunction.html
# totient(12) → 4 because len([1, 5, 7, 11]) == 4
for prime in set(factor(n)):
n -= n // prime
return n