日誌處理手冊¶
- 作者:
Vinay Sajip <vinay_sajip at red-dove dot com>
本頁面包含一些與日誌處理相關的實用技巧,它們在過去已被證明很有用。有關教程和參考資訊的連結,請參閱 其他資源。
在多個模組中使用日誌¶
多次呼叫 logging.getLogger('someLogger')
會返回對同一個日誌記錄器物件的引用。這不僅在同一個模組中如此,在同一個 Python 直譯器程序中跨模組也是如此。它適用於對同一個物件的引用;此外,應用程式程式碼可以在一個模組中定義和配置父日誌記錄器,並在一個單獨的模組中建立(但不配置)子日誌記錄器,所有對子日誌記錄器的呼叫都將傳遞給父日誌記錄器。這是一個主模組
import logging
import auxiliary_module
# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)
logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')
這是輔助模組
import logging
# create logger
module_logger = logging.getLogger('spam_application.auxiliary')
class Auxiliary:
def __init__(self):
self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
self.logger.info('creating an instance of Auxiliary')
def do_something(self):
self.logger.info('doing something')
a = 1 + 1
self.logger.info('done doing something')
def some_function():
module_logger.info('received a call to "some_function"')
輸出如下所示
2005-03-23 23:47:11,663 - spam_application - INFO -
creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
done with auxiliary_module.some_function()
從多個執行緒記錄日誌¶
從多個執行緒記錄日誌不需要特殊操作。以下示例顯示了從主(初始)執行緒和另一個執行緒記錄日誌
import logging
import threading
import time
def worker(arg):
while not arg['stop']:
logging.debug('Hi from myfunc')
time.sleep(0.5)
def main():
logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
info = {'stop': False}
thread = threading.Thread(target=worker, args=(info,))
thread.start()
while True:
try:
logging.debug('Hello from main')
time.sleep(0.75)
except KeyboardInterrupt:
info['stop'] = True
break
thread.join()
if __name__ == '__main__':
main()
執行時,指令碼應該打印出類似以下內容
0 Thread-1 Hi from myfunc
3 MainThread Hello from main
505 Thread-1 Hi from myfunc
755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc
這顯示了日誌輸出如預期般交錯。當然,這種方法適用於比此處所示更多的執行緒。
多個處理程式和格式化程式¶
日誌記錄器是普通的 Python 物件。addHandler()
方法對您可以新增的處理程式數量沒有最低或最高配額。有時,應用程式將所有嚴重級別的所有訊息記錄到文字檔案,同時將錯誤或更高級別的訊息記錄到控制檯,這會很有益。要設定此功能,只需配置適當的處理程式即可。應用程式程式碼中的日誌記錄呼叫將保持不變。這是對以前基於模組的簡單配置示例的稍微修改
import logging
logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)
# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
請注意,“應用程式”程式碼不關心多個處理程式。唯一改變的是新增和配置了一個名為 *fh* 的新處理程式。
建立具有更高或更低嚴重性過濾器的新處理程式的能力在編寫和測試應用程式時非常有用。與其使用許多 print
語句進行除錯,不如使用 logger.debug
:與您以後必須刪除或註釋掉的 print 語句不同,logger.debug 語句可以在原始碼中保持完整並保持休眠狀態,直到您再次需要它們。屆時,唯一需要做的更改是修改日誌記錄器和/或處理程式的嚴重性級別以進行除錯。
日誌記錄到多個目的地¶
假設您想在不同情況下以不同的訊息格式將日誌記錄到控制檯和檔案。假設您想將 DEBUG 及更高級別的訊息記錄到檔案,並將 INFO 及更高級別的訊息記錄到控制檯。我們還假設檔案應包含時間戳,但控制檯訊息不應包含。以下是您實現此目的的方法
import logging
# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M',
filename='/tmp/myapp.log',
filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)
# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')
# Now, define a couple of other loggers which might represent areas in your
# application:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
執行此程式時,您將在控制檯上看到
root : INFO Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO How quickly daft jumping zebras vex.
myapp.area2 : WARNING Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR The five boxing wizards jump quickly.
在檔案中,您將看到類似以下內容
10-22 22:19 root INFO Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1 INFO How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2 ERROR The five boxing wizards jump quickly.
如您所見,DEBUG 訊息只顯示在檔案中。其他訊息則傳送到兩個目的地。
此示例使用控制檯和檔案處理程式,但您可以使用任意數量和組合的處理程式。
請注意,上面選擇的日誌檔名 /tmp/myapp.log
意味著在 POSIX 系統上使用臨時檔案的標準位置。在 Windows 上,您可能需要為日誌選擇一個不同的目錄名 - 只需確保該目錄存在,並且您有許可權在該目錄中建立和更新檔案。
自定義級別處理¶
有時,您可能希望在處理程式中對級別進行與標準處理略有不同的操作,即所有高於某個閾值的級別都由處理程式處理。為此,您需要使用過濾器。讓我們看一個您希望按以下方式安排場景的示例
將嚴重性為
INFO
和WARNING
的訊息傳送到sys.stdout
將嚴重性為
ERROR
及以上的資訊傳送到sys.stderr
將
DEBUG
及以上嚴重級別的訊息傳送到檔案app.log
假設您使用以下 JSON 配置日誌記錄
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"simple": {
"format": "%(levelname)-8s - %(message)s"
}
},
"handlers": {
"stdout": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout"
},
"stderr": {
"class": "logging.StreamHandler",
"level": "ERROR",
"formatter": "simple",
"stream": "ext://sys.stderr"
},
"file": {
"class": "logging.FileHandler",
"formatter": "simple",
"filename": "app.log",
"mode": "w"
}
},
"root": {
"level": "DEBUG",
"handlers": [
"stderr",
"stdout",
"file"
]
}
}
這個配置 幾乎 達到了我們想要的效果,除了 sys.stdout
會顯示嚴重級別為 ERROR
的訊息,並且只有此嚴重級別及更高的事件以及 INFO
和 WARNING
訊息才會被跟蹤。為了防止這種情況,我們可以設定一個過濾器來排除這些訊息,並將其新增到相關的處理程式中。這可以透過在 formatters
和 handlers
旁邊新增一個 filters
部分來配置
{
"filters": {
"warnings_and_below": {
"()" : "__main__.filter_maker",
"level": "WARNING"
}
}
}
並修改 stdout
處理程式的部分以新增它
{
"stdout": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
"filters": ["warnings_and_below"]
}
}
過濾器只是一個函式,所以我們可以將 filter_maker
(一個工廠函式)定義如下
def filter_maker(level):
level = getattr(logging, level)
def filter(record):
return record.levelno <= level
return filter
這將傳入的字串引數轉換為數字級別,並返回一個函式,該函式僅在傳入記錄的級別等於或低於指定級別時返回 True
。請注意,在此示例中,我在一個從命令列執行的測試指令碼 main.py
中定義了 filter_maker
,因此其模組將是 __main__
- 因此過濾器配置中的 __main__.filter_maker
。如果您在不同的模組中定義它,則需要更改它。
新增過濾器後,我們可以執行完整的 main.py
指令碼,如下所示
import json
import logging
import logging.config
CONFIG = '''
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"simple": {
"format": "%(levelname)-8s - %(message)s"
}
},
"filters": {
"warnings_and_below": {
"()" : "__main__.filter_maker",
"level": "WARNING"
}
},
"handlers": {
"stdout": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
"filters": ["warnings_and_below"]
},
"stderr": {
"class": "logging.StreamHandler",
"level": "ERROR",
"formatter": "simple",
"stream": "ext://sys.stderr"
},
"file": {
"class": "logging.FileHandler",
"formatter": "simple",
"filename": "app.log",
"mode": "w"
}
},
"root": {
"level": "DEBUG",
"handlers": [
"stderr",
"stdout",
"file"
]
}
}
'''
def filter_maker(level):
level = getattr(logging, level)
def filter(record):
return record.levelno <= level
return filter
logging.config.dictConfig(json.loads(CONFIG))
logging.debug('A DEBUG message')
logging.info('An INFO message')
logging.warning('A WARNING message')
logging.error('An ERROR message')
logging.critical('A CRITICAL message')
並像這樣執行它之後
python main.py 2>stderr.log >stdout.log
我們可以看到結果符合預期
$ more *.log
::::::::::::::
app.log
::::::::::::::
DEBUG - A DEBUG message
INFO - An INFO message
WARNING - A WARNING message
ERROR - An ERROR message
CRITICAL - A CRITICAL message
::::::::::::::
stderr.log
::::::::::::::
ERROR - An ERROR message
CRITICAL - A CRITICAL message
::::::::::::::
stdout.log
::::::::::::::
INFO - An INFO message
WARNING - A WARNING message
配置伺服器示例¶
這是一個使用日誌配置伺服器的模組示例
import logging
import logging.config
import time
import os
# read initial config file
logging.config.fileConfig('logging.conf')
# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()
logger = logging.getLogger('simpleExample')
try:
# loop through logging calls to see the difference
# new configurations make, until Ctrl+C is pressed
while True:
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
time.sleep(5)
except KeyboardInterrupt:
# cleanup
logging.config.stopListening()
t.join()
這是一個指令碼,它接受一個檔名,並將該檔案傳送到伺服器,前面適當地加上二進位制編碼的長度,作為新的日誌配置
#!/usr/bin/env python
import socket, sys, struct
with open(sys.argv[1], 'rb') as f:
data_to_send = f.read()
HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')
處理阻塞的處理程式¶
有時您必須讓日誌處理程式完成其工作,而不會阻塞您正在記錄的執行緒。這在 Web 應用程式中很常見,當然也發生在其他場景中。
一個常見的罪魁禍首,表現出緩慢行為的是 SMTPHandler
:傳送電子郵件可能需要很長時間,原因有很多,超出了開發人員的控制範圍(例如,效能不佳的郵件或網路基礎設施)。但幾乎任何基於網路的處理程式都可能阻塞:即使是 SocketHandler
操作也可能會在底層執行過慢的 DNS 查詢(此查詢可能深入到套接字型檔程式碼中,在 Python 層之下,並且超出您的控制)。
一種解決方案是採用兩步法。第一步,僅將 QueueHandler
附加到那些從效能關鍵型執行緒訪問的日誌記錄器。它們只是寫入它們的佇列,佇列的大小可以設定得足夠大,或者初始化時沒有大小上限。寫入佇列通常會很快被接受,儘管您可能需要在程式碼中捕獲 queue.Full
異常作為預防措施。如果您是庫開發人員,並且程式碼中存在效能關鍵型執行緒,請務必記錄這一點(以及建議僅將 QueueHandlers
附加到您的日誌記錄器),以造福使用您程式碼的其他開發人員。
解決方案的第二部分是 QueueListener
,它被設計為 QueueHandler
的對應物。QueueListener
非常簡單:它被傳遞一個佇列和一些處理程式,然後啟動一個內部執行緒,該執行緒監聽其佇列中來自 QueueHandlers
(或任何其他 LogRecords
來源)傳送的 LogRecords。LogRecords
從佇列中移除並傳遞給處理程式進行處理。
擁有一個單獨的 QueueListener
類的好處是,您可以使用同一個例項來服務多個 QueueHandlers
。這比擁有現有處理程式類的執行緒版本(每個處理程式將消耗一個執行緒而沒有任何特殊好處)更節省資源。
下面是一個使用這兩個類的示例(省略了匯入)
que = queue.Queue(-1) # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()
執行時將生成
MainThread: Look out!
備註
儘管前面的討論並非專門針對非同步程式碼,而是關於慢速日誌處理程式,但應該注意的是,從非同步程式碼記錄日誌時,網路甚至檔案處理程式都可能導致問題(阻塞事件迴圈),因為某些日誌記錄是從 asyncio
內部完成的。如果應用程式中使用任何非同步程式碼,最好採用上述方法進行日誌記錄,這樣任何阻塞程式碼只在 QueueListener
執行緒中執行。
版本 3.5 中的變更: 在 Python 3.5 之前,QueueListener
總是將從佇列接收到的每條訊息傳遞給它所初始化的每個處理程式。(這是因為假設所有級別過濾都在另一端完成,即佇列被填充的地方。)從 3.5 開始,可以透過向監聽器的建構函式傳遞關鍵字引數 respect_handler_level=True
來改變此行為。這樣做後,監聽器會將每條訊息的級別與處理程式的級別進行比較,並且只有在適當的情況下才將訊息傳遞給處理程式。
版本 3.14 中的變更: QueueListener
可以透過 with
語句啟動(和停止)。例如
with QueueListener(que, handler) as listener:
# The queue listener automatically starts
# when the 'with' block is entered.
pass
# The queue listener automatically stops once
# the 'with' block is exited.
跨網路傳送和接收日誌事件¶
假設您想透過網路傳送日誌事件,並在接收端進行處理。一種簡單的方法是在傳送端將 SocketHandler
例項附加到根日誌記錄器
import logging, logging.handlers
rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)
# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')
# Now, define a couple of other loggers which might represent areas in your
# application:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
在接收端,您可以使用 socketserver
模組設定接收器。這是一個基本的實用示例
import pickle
import logging
import logging.handlers
import socketserver
import struct
class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""Handler for a streaming logging request.
This basically logs the record using whatever logging policy is
configured locally.
"""
def handle(self):
"""
Handle multiple requests - each expected to be a 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally.
"""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack('>L', chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
def unPickle(self, data):
return pickle.loads(data)
def handleLogRecord(self, record):
# if a name is specified, we use the named logger rather than the one
# implied by the record.
if self.server.logname is not None:
name = self.server.logname
else:
name = record.name
logger = logging.getLogger(name)
# N.B. EVERY record gets logged. This is because Logger.handle
# is normally called AFTER logger-level filtering. If you want
# to do filtering, do it at the client end to save wasting
# cycles and network bandwidth!
logger.handle(record)
class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
"""
Simple TCP socket-based logging receiver suitable for testing.
"""
allow_reuse_address = True
def __init__(self, host='localhost',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
self.abort = 0
self.timeout = 1
self.logname = None
def serve_until_stopped(self):
import select
abort = 0
while not abort:
rd, wr, ex = select.select([self.socket.fileno()],
[], [],
self.timeout)
if rd:
self.handle_request()
abort = self.abort
def main():
logging.basicConfig(
format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
tcpserver = LogRecordSocketReceiver()
print('About to start TCP server...')
tcpserver.serve_until_stopped()
if __name__ == '__main__':
main()
首先執行伺服器,然後執行客戶端。在客戶端,控制檯上不列印任何內容;在伺服器端,您應該會看到類似以下內容
About to start TCP server...
59 root INFO Jackdaws love my big sphinx of quartz.
59 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
69 myapp.area1 INFO How quickly daft jumping zebras vex.
69 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
69 myapp.area2 ERROR The five boxing wizards jump quickly.
請注意,在某些場景中,pickle 存在一些安全問題。如果這些問題影響到您,您可以透過覆蓋 makePickle()
方法並在其中實現您的替代方案,以及調整上述指令碼以使用您的替代序列化方案,來使用替代的序列化方案。
在生產環境中執行日誌套接字監聽器¶
要在生產環境中執行日誌監聽器,您可能需要使用像 Supervisor 這樣的程序管理工具。這裡是一個 Gist,它提供了使用 Supervisor 執行上述功能的骨幹檔案。它包含以下檔案
檔案 |
用途 |
---|---|
|
一個用於準備測試環境的 Bash 指令碼 |
|
Supervisor 配置檔案,其中包含監聽器和多程序 Web 應用程式的條目 |
|
一個 Bash 指令碼,用於確保 Supervisor 正在執行上述配置 |
|
接收日誌事件並將其記錄到檔案的套接字監聽程式 |
|
一個透過連線到監聽器的套接字執行日誌記錄的簡單 Web 應用程式 |
|
Web 應用程式的 JSON 配置檔案 |
|
一個用於執行 Web 應用程式的 Python 指令碼 |
這個網路應用程式使用 Gunicorn,它是一個流行的網路應用程式伺服器,會啟動多個工作程序來處理請求。這個示例設定展示了工作程序如何寫入同一個日誌檔案而不會相互衝突——它們都透過套接字監聽器。
要在 POSIX 環境中測試這些檔案,請執行以下操作
使用 Download ZIP 按鈕將 Gist 下載為 ZIP 歸檔檔案。
將上述檔案從歸檔檔案中解壓到一個臨時目錄中。
在臨時目錄中,執行
bash prepare.sh
進行準備。這將建立一個run
子目錄,用於存放 Supervisor 相關檔案和日誌檔案,以及一個venv
子目錄,用於存放一個虛擬環境,其中安裝了bottle
、gunicorn
和supervisor
。執行
bash ensure_app.sh
以確保 Supervisor 正在執行上述配置。執行
venv/bin/python client.py
來執行 Web 應用程式,這將導致記錄被寫入日誌。檢查
run
子目錄中的日誌檔案。您應該會在與模式app.log*
匹配的檔案中看到最新的日誌行。它們不會以任何特定順序排列,因為它們已由不同的工作程序以非確定性方式併發處理。您可以透過執行
venv/bin/supervisorctl -c supervisor.conf shutdown
來關閉監聽器和 Web 應用程式。
萬一配置的埠與測試環境中的其他東西發生衝突,您可能需要調整配置檔案。
預設配置使用埠 9020 上的 TCP 套接字。您可以改為使用 Unix 域套接字而不是 TCP 套接字,方法如下
在
listener.json
中,新增一個socket
鍵,其值為您要使用的域套接字路徑。如果存在此鍵,則監聽器將監聽相應的域套接字而不是 TCP 套接字(port
鍵將被忽略)。在
webapp.json
中,更改套接字處理程式配置字典,使host
值為域套接字路徑,並將port
值設定為null
。
向日志輸出新增上下文資訊¶
有時,您希望日誌輸出除了傳遞給日誌記錄呼叫的引數外,還包含上下文資訊。例如,在網路應用程式中,可能希望在日誌中記錄客戶端特定的資訊(例如,遠端客戶端的使用者名稱或 IP 地址)。儘管您可以使用 *extra* 引數來實現此目的,但以這種方式傳遞資訊並不總是方便。雖然建立每個連線的 Logger
例項可能很誘人,但這不是一個好主意,因為這些例項不會被垃圾回收。雖然這在實踐中不是問題,但當 Logger
例項的數量取決於您希望在日誌記錄應用程式中使用的粒度級別時,如果 Logger
例項的數量變得實際上無限大,則可能難以管理。
使用 LoggerAdapters 傳遞上下文資訊¶
將上下文資訊與日誌事件資訊一起輸出的一種簡單方法是使用 LoggerAdapter
類。此類的設計類似於 Logger
,因此您可以呼叫 debug()
、info()
、warning()
、error()
、exception()
、critical()
和 log()
。這些方法具有與其在 Logger
中的對應方法相同的簽名,因此您可以互換使用這兩種型別的例項。
當您建立 LoggerAdapter
例項時,您會傳遞給它一個 Logger
例項和一個包含您的上下文資訊的類字典物件。當您在 LoggerAdapter
例項上呼叫其中一個日誌記錄方法時,它會將呼叫委託給傳遞給其建構函式的底層 Logger
例項,並安排在委託呼叫中傳遞上下文資訊。以下是 LoggerAdapter
程式碼片段
def debug(self, msg, /, *args, **kwargs):
"""
Delegate a debug call to the underlying logger, after adding
contextual information from this adapter instance.
"""
msg, kwargs = self.process(msg, kwargs)
self.logger.debug(msg, *args, **kwargs)
LoggerAdapter
的 process()
方法是上下文資訊被新增到日誌輸出的地方。它被傳遞日誌呼叫訊息和關鍵字引數,並傳回(可能)修改後的版本,以便在呼叫底層日誌記錄器時使用。此方法的預設實現保持訊息不變,但在關鍵字引數中插入一個 'extra' 鍵,其值是傳遞給建構函式的類字典物件。當然,如果您在呼叫介面卡時傳遞了 'extra' 關鍵字引數,它將被靜默覆蓋。
使用 'extra' 的優點是,類字典物件中的值會合併到 LogRecord
例項的 __dict__ 中,從而允許您使用自定義字串與瞭解類字典物件鍵的 Formatter
例項。如果您需要不同的方法,例如,如果您想將上下文資訊前置或附加到訊息字串,您只需要子類化 LoggerAdapter
並重寫 process()
以實現您所需的功能。這是一個簡單的示例
class CustomAdapter(logging.LoggerAdapter):
"""
This example adapter expects the passed in dict-like object to have a
'connid' key, whose value in brackets is prepended to the log message.
"""
def process(self, msg, kwargs):
return '[%s] %s' % (self.extra['connid'], msg), kwargs
你可以這樣使用它
logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})
然後,您記錄到介面卡的所有事件都將把 some_conn_id
的值前置到日誌訊息中。
使用非字典物件傳遞上下文資訊¶
您不需要將實際的字典傳遞給 LoggerAdapter
- 您可以傳遞一個實現了 __getitem__
和 __iter__
的類的例項,這樣它在日誌記錄看來就像一個字典。如果您想動態生成值(而字典中的值將是常量),這將很有用。
使用過濾器傳遞上下文資訊¶
您還可以使用使用者定義的 Filter
向日志輸出新增上下文資訊。Filter
例項允許修改傳遞給它們的 LogRecords
,包括新增額外的屬性,然後可以使用合適的格式字串輸出這些屬性,或者如果需要,可以使用自定義 Formatter
。
例如,在 Web 應用程式中,正在處理的請求(或至少是其中有趣的部分)可以儲存線上程本地 (threading.local
) 變數中,然後從 Filter
中訪問,以新增例如來自請求的資訊——例如,遠端 IP 地址和遠端使用者的使用者名稱——到 LogRecord
中,使用屬性名 'ip' 和 'user',如上面的 LoggerAdapter
示例所示。在這種情況下,可以使用相同的格式字串獲得與上面所示類似的輸出。這是一個示例指令碼
import logging
from random import choice
class ContextFilter(logging.Filter):
"""
This is a filter which injects contextual information into the log.
Rather than use actual contextual information, we just use random
data in this demo.
"""
USERS = ['jim', 'fred', 'sheila']
IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']
def filter(self, record):
record.ip = choice(ContextFilter.IPS)
record.user = choice(ContextFilter.USERS)
return True
if __name__ == '__main__':
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
a1 = logging.getLogger('a.b.c')
a2 = logging.getLogger('d.e.f')
f = ContextFilter()
a1.addFilter(f)
a2.addFilter(f)
a1.debug('A debug message')
a1.info('An info message with %s', 'some parameters')
for x in range(10):
lvl = choice(levels)
lvlname = logging.getLevelName(lvl)
a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')
執行時,會生成類似如下的內容
2010-09-06 22:38:15,292 a.b.c DEBUG IP: 123.231.231.123 User: fred A debug message
2010-09-06 22:38:15,300 a.b.c INFO IP: 192.168.0.1 User: sheila An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 127.0.0.1 User: jim A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 127.0.0.1 User: sheila A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 123.231.231.123 User: fred A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1 User: jim A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 192.168.0.1 User: jim A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR IP: 127.0.0.1 User: sheila A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG IP: 123.231.231.123 User: fred A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO IP: 123.231.231.123 User: fred A message at INFO level with 2 parameters
使用 contextvars
¶
自 Python 3.7 起,contextvars
模組提供了上下文字地儲存,它適用於 threading
和 asyncio
處理需求。因此,這種型別的儲存通常優於執行緒本地儲存。以下示例展示了在多執行緒環境中,日誌如何透過上下文資訊(例如,Web 應用程式處理的請求屬性)填充。
為了說明目的,假設您有不同的 Web 應用程式,它們彼此獨立但在同一個 Python 程序中執行,並使用它們共用的一個庫。這些應用程式如何擁有自己的日誌,其中所有來自庫(和其他請求處理程式碼)的日誌訊息都指向適當的應用程式日誌檔案,同時在日誌中包含額外的上下文資訊,例如客戶端 IP、HTTP 請求方法和客戶端使用者名稱?
我們假設庫可以透過以下程式碼進行模擬
# webapplib.py
import logging
import time
logger = logging.getLogger(__name__)
def useful():
# Just a representative event logged from the library
logger.debug('Hello from webapplib!')
# Just sleep for a bit so other threads get to run
time.sleep(0.01)
我們可以透過兩個簡單的類 Request
和 WebApp
來模擬多個 Web 應用程式。這些類模擬了真實的執行緒化 Web 應用程式的工作方式——每個請求都由一個執行緒處理
# main.py
import argparse
from contextvars import ContextVar
import logging
import os
from random import choice
import threading
import webapplib
logger = logging.getLogger(__name__)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
class Request:
"""
A simple dummy request class which just holds dummy HTTP request method,
client IP address and client username
"""
def __init__(self, method, ip, user):
self.method = method
self.ip = ip
self.user = user
# A dummy set of requests which will be used in the simulation - we'll just pick
# from this list randomly. Note that all GET requests are from 192.168.2.XXX
# addresses, whereas POST requests are from 192.16.3.XXX addresses. Three users
# are represented in the sample requests.
REQUESTS = [
Request('GET', '192.168.2.20', 'jim'),
Request('POST', '192.168.3.20', 'fred'),
Request('GET', '192.168.2.21', 'sheila'),
Request('POST', '192.168.3.21', 'jim'),
Request('GET', '192.168.2.22', 'fred'),
Request('POST', '192.168.3.22', 'sheila'),
]
# Note that the format string includes references to request context information
# such as HTTP method, client IP and username
formatter = logging.Formatter('%(threadName)-11s %(appName)s %(name)-9s %(user)-6s %(ip)s %(method)-4s %(message)s')
# Create our context variables. These will be filled at the start of request
# processing, and used in the logging that happens during that processing
ctx_request = ContextVar('request')
ctx_appname = ContextVar('appname')
class InjectingFilter(logging.Filter):
"""
A filter which injects context-specific information into logs and ensures
that only information for a specific webapp is included in its log
"""
def __init__(self, app):
self.app = app
def filter(self, record):
request = ctx_request.get()
record.method = request.method
record.ip = request.ip
record.user = request.user
record.appName = appName = ctx_appname.get()
return appName == self.app.name
class WebApp:
"""
A dummy web application class which has its own handler and filter for a
webapp-specific log.
"""
def __init__(self, name):
self.name = name
handler = logging.FileHandler(name + '.log', 'w')
f = InjectingFilter(self)
handler.setFormatter(formatter)
handler.addFilter(f)
root.addHandler(handler)
self.num_requests = 0
def process_request(self, request):
"""
This is the dummy method for processing a request. It's called on a
different thread for every request. We store the context information into
the context vars before doing anything else.
"""
ctx_request.set(request)
ctx_appname.set(self.name)
self.num_requests += 1
logger.debug('Request processing started')
webapplib.useful()
logger.debug('Request processing finished')
def main():
fn = os.path.splitext(os.path.basename(__file__))[0]
adhf = argparse.ArgumentDefaultsHelpFormatter
ap = argparse.ArgumentParser(formatter_class=adhf, prog=fn,
description='Simulate a couple of web '
'applications handling some '
'requests, showing how request '
'context can be used to '
'populate logs')
aa = ap.add_argument
aa('--count', '-c', type=int, default=100, help='How many requests to simulate')
options = ap.parse_args()
# Create the dummy webapps and put them in a list which we can use to select
# from randomly
app1 = WebApp('app1')
app2 = WebApp('app2')
apps = [app1, app2]
threads = []
# Add a common handler which will capture all events
handler = logging.FileHandler('app.log', 'w')
handler.setFormatter(formatter)
root.addHandler(handler)
# Generate calls to process requests
for i in range(options.count):
try:
# Pick an app at random and a request for it to process
app = choice(apps)
request = choice(REQUESTS)
# Process the request in its own thread
t = threading.Thread(target=app.process_request, args=(request,))
threads.append(t)
t.start()
except KeyboardInterrupt:
break
# Wait for the threads to terminate
for t in threads:
t.join()
for app in apps:
print('%s processed %s requests' % (app.name, app.num_requests))
if __name__ == '__main__':
main()
如果您執行上述程式碼,您會發現大約一半的請求進入 app1.log
,其餘的進入 app2.log
,並且所有請求都記錄到 app.log
。每個特定於 Web 應用程式的日誌將只包含該 Web 應用程式的日誌條目,並且請求資訊將一致地顯示在日誌中(即,每個模擬請求中的資訊將始終一起出現在日誌行中)。這由以下 shell 輸出所示
~/logging-contextual-webapp$ python main.py
app1 processed 51 requests
app2 processed 49 requests
~/logging-contextual-webapp$ wc -l *.log
153 app1.log
147 app2.log
300 app.log
600 total
~/logging-contextual-webapp$ head -3 app1.log
Thread-3 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
Thread-3 (process_request) app1 webapplib jim 192.168.3.21 POST Hello from webapplib!
Thread-5 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
~/logging-contextual-webapp$ head -3 app2.log
Thread-1 (process_request) app2 __main__ sheila 192.168.2.21 GET Request processing started
Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET Hello from webapplib!
Thread-2 (process_request) app2 __main__ jim 192.168.2.20 GET Request processing started
~/logging-contextual-webapp$ head app.log
Thread-1 (process_request) app2 __main__ sheila 192.168.2.21 GET Request processing started
Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET Hello from webapplib!
Thread-2 (process_request) app2 __main__ jim 192.168.2.20 GET Request processing started
Thread-3 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
Thread-2 (process_request) app2 webapplib jim 192.168.2.20 GET Hello from webapplib!
Thread-3 (process_request) app1 webapplib jim 192.168.3.21 POST Hello from webapplib!
Thread-4 (process_request) app2 __main__ fred 192.168.2.22 GET Request processing started
Thread-5 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
Thread-4 (process_request) app2 webapplib fred 192.168.2.22 GET Hello from webapplib!
Thread-6 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
~/logging-contextual-webapp$ grep app1 app1.log | wc -l
153
~/logging-contextual-webapp$ grep app2 app2.log | wc -l
147
~/logging-contextual-webapp$ grep app1 app.log | wc -l
153
~/logging-contextual-webapp$ grep app2 app.log | wc -l
147
在處理程式中傳遞上下文資訊¶
每個 Handler
都有自己的過濾器鏈。如果您想向 LogRecord
新增上下文資訊而不將其洩漏給其他處理程式,您可以使用一個返回新的 LogRecord
而不是就地修改它的過濾器,如下面的指令碼所示
import copy
import logging
def filter(record: logging.LogRecord):
record = copy.copy(record)
record.user = 'jim'
return record
if __name__ == '__main__':
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(message)s from %(user)-8s')
handler.setFormatter(formatter)
handler.addFilter(filter)
logger.addHandler(handler)
logger.info('A log message')
從多個程序記錄到單個檔案¶
儘管日誌記錄是執行緒安全的,並且在一個程序中從多個執行緒記錄到單個檔案 是 支援的,但從 多個程序 記錄到單個檔案 不 受支援,因為在 Python 中沒有標準方法可以跨多個程序序列化對單個檔案的訪問。如果您需要從多個程序記錄到單個檔案,一種方法是讓所有程序記錄到 SocketHandler
,並有一個單獨的程序實現一個套接字伺服器,該伺服器從套接字讀取並記錄到檔案。(如果您願意,可以將其中一個現有程序中的一個執行緒專門用於執行此功能。)本節 更詳細地記錄了這種方法,幷包含一個可工作的套接字接收器,可以作為您在自己的應用程式中進行調整的起點。
您也可以編寫自己的處理程式,該處理程式使用 Lock
類從 multiprocessing
模組來序列化程序對檔案的訪問。標準庫 FileHandler
及其子類不使用 multiprocessing
。
或者,您可以使用 Queue
和 QueueHandler
將所有日誌事件傳送到多程序應用程式中的一個程序。以下示例指令碼演示瞭如何做到這一點;在示例中,一個單獨的監聽器程序監聽其他程序傳送的事件,並根據自己的日誌配置進行記錄。儘管該示例只演示了一種方法(例如,您可能希望使用監聽器執行緒而不是單獨的監聽器程序——實現方式類似),但它允許監聽器和應用程式中的其他程序使用完全不同的日誌配置,並可以作為滿足您特定要求的程式碼的基礎
# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing
# Next two import lines for this demo only
from random import choice, random
import time
#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
root = logging.getLogger()
h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
configurer()
while True:
try:
record = queue.get()
if record is None: # We send this as a sentinel to tell the listener to quit.
break
logger = logging.getLogger(record.name)
logger.handle(record) # No level or filter logic applied - just do it!
except Exception:
import sys, traceback
print('Whoops! Problem:', file=sys.stderr)
traceback.print_exc(file=sys.stderr)
# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL]
LOGGERS = ['a.b.c', 'd.e.f']
MESSAGES = [
'Random message #1',
'Random message #2',
'Random message #3',
]
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
h = logging.handlers.QueueHandler(queue) # Just the one handler needed
root = logging.getLogger()
root.addHandler(h)
# send all messages, for demo; no other level or filter logic applied.
root.setLevel(logging.DEBUG)
# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
configurer(queue)
name = multiprocessing.current_process().name
print('Worker started: %s' % name)
for i in range(10):
time.sleep(random())
logger = logging.getLogger(choice(LOGGERS))
level = choice(LEVELS)
message = choice(MESSAGES)
logger.log(level, message)
print('Worker finished: %s' % name)
# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process,
args=(queue, listener_configurer))
listener.start()
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
queue.put_nowait(None)
listener.join()
if __name__ == '__main__':
main()
上述指令碼的一個變體將日誌記錄儲存在主程序中,在一個單獨的執行緒中
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time
def logger_thread(q):
while True:
record = q.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
def worker_process(q):
qh = logging.handlers.QueueHandler(q)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(qh)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
if __name__ == '__main__':
q = Queue()
d = {
'version': 1,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed',
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed',
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'level': 'ERROR',
'formatter': 'detailed',
},
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console', 'file', 'errors']
},
}
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
workers.append(wp)
wp.start()
logging.config.dictConfig(d)
lp = threading.Thread(target=logger_thread, args=(q,))
lp.start()
# At this point, the main process could do some useful work of its own
# Once it's done that, it can wait for the workers to terminate...
for wp in workers:
wp.join()
# And now tell the logging thread to finish up, too
q.put(None)
lp.join()
這個變體展示了您如何例如為特定的日誌記錄器應用配置——例如,foo
日誌記錄器有一個特殊的處理程式,它將 foo
子系統中的所有事件儲存在一個檔案 mplog-foo.log
中。這將由主程序中的日誌記錄機制使用(即使日誌事件是在工作程序中生成的)來將訊息定向到適當的目的地。
使用 concurrent.futures.ProcessPoolExecutor¶
如果您想使用 concurrent.futures.ProcessPoolExecutor
來啟動您的工作程序,您需要稍微不同地建立佇列。而不是
queue = multiprocessing.Queue(-1)
你應該使用
queue = multiprocessing.Manager().Queue(-1) # also works with the examples above
然後你可以將 worker 建立從這裡替換
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
到這裡(記得首先匯入 concurrent.futures
)
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
for i in range(10):
executor.submit(worker_process, queue, worker_configurer)
使用 Gunicorn 和 uWSGI 部署 Web 應用程式¶
當使用 Gunicorn 或 uWSGI(或類似工具)部署 Web 應用程式時,會建立多個工作程序來處理客戶端請求。在這種環境中,避免直接在 Web 應用程式中建立基於檔案的處理程式。相反,使用 SocketHandler
從 Web 應用程式向單獨程序中的監聽器記錄日誌。這可以使用 Supervisor 等程序管理工具進行設定——有關詳細資訊,請參閱 在生產環境中執行日誌套接字監聽器。
使用檔案輪換¶
有時,您希望讓日誌檔案增長到一定大小,然後開啟一個新檔案並記錄到其中。您可能希望保留一定數量的這些檔案,並且當建立了這麼多檔案時,輪換檔案,以便檔案數量和檔案大小都保持有界。對於這種使用模式,日誌記錄包提供了 RotatingFileHandler
import glob
import logging
import logging.handlers
LOG_FILENAME = 'logging_rotatingfile_example.out'
# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)
# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
LOG_FILENAME, maxBytes=20, backupCount=5)
my_logger.addHandler(handler)
# Log some messages
for i in range(20):
my_logger.debug('i = %d' % i)
# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)
for filename in logfiles:
print(filename)
結果應該是 6 個單獨的檔案,每個檔案都包含應用程式日誌歷史的一部分
logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5
最新檔案始終是 logging_rotatingfile_example.out
,每次達到大小限制時,它都會重新命名並加上字尾 .1
。每個現有備份檔案都會重新命名以遞增字尾(.1
變為 .2
,依此類推),而 .6
檔案則被刪除。
顯然,這個例子將日誌長度設定得太小,作為一個極端的例子。您需要將 *maxBytes* 設定為適當的值。
使用替代格式樣式¶
當日志記錄被新增到 Python 標準庫時,唯一一種用於格式化帶有可變內容的訊息的方法是使用 %-格式化方法。從那時起,Python 獲得了兩種新的格式化方法:string.Template
(在 Python 2.4 中新增)和 str.format()
(在 Python 2.6 中新增)。
日誌記錄(從 3.2 版開始)為這兩種額外的格式樣式提供了改進的支援。Formatter
類已得到增強,可以接受一個額外的可選關鍵字引數 style
。它預設為 '%'
,但其他可能的值是 '{'
和 '$'
,它們對應於另外兩種格式樣式。預設情況下保持向後相容性(如您所料),但透過顯式指定樣式引數,您可以指定與 str.format()
或 string.Template
配合使用的格式字串。這是一個示例控制檯會話,以展示可能性
>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
... style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
... style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>
請注意,日誌訊息的最終輸出格式與單個日誌訊息的構建方式完全獨立。它仍然可以使用 %-格式化,如下所示
>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>
日誌呼叫(logger.debug()
、logger.info()
等)僅接受實際日誌訊息本身的位置引數,關鍵字引數僅用於確定如何處理實際日誌呼叫的選項(例如,exc_info
關鍵字引數表示應記錄回溯資訊,或 extra
關鍵字引數表示要新增到日誌中的額外上下文資訊)。因此,您不能直接使用 str.format()
或 string.Template
語法進行日誌呼叫,因為在內部日誌包使用 %-格式化來合併格式字串和可變引數。在保持向後相容性的情況下無法更改這一點,因為現有程式碼中的所有日誌呼叫都將使用 %-格式字串。
然而,有一種方法可以使用 {} 和 $ 格式來構造您的單個日誌訊息。請記住,對於訊息,您可以使用任意物件作為訊息格式字串,並且日誌包將對該物件呼叫 str()
以獲取實際的格式字串。考慮以下兩個類
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
這些類中的任何一個都可以代替格式字串使用,以允許使用 {} 或 $ 格式來構建在格式化日誌輸出中代替 “%(message)s” 或 “{message}” 或 “$message” 出現的實際“訊息”部分。每次您想記錄一些東西時都使用類名會有點笨拙,但是如果您使用別名(例如 __(雙下劃線——不要與 _ 混淆,_ 是 gettext.gettext()
或其同類的同義詞/別名)),那將非常可口。
上述類不包含在 Python 中,儘管它們很容易複製貼上到您自己的程式碼中。它們可以按如下方式使用(假設它們在名為 wherever
的模組中宣告)
>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
... point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
雖然上述示例使用 print()
來展示格式化如何工作,但您當然會使用 logger.debug()
或類似方法來實際使用這種方法進行日誌記錄。
需要注意的一點是,這種方法不會帶來顯著的效能損失:實際的格式化並不是在您進行日誌記錄呼叫時發生的,而是在(如果)日誌訊息實際即將由處理程式輸出到日誌時發生的。因此,唯一可能讓您感到有點不尋常的是括號環繞著格式字串和引數,而不僅僅是格式字串。這是因為 __ 符號只是對其中一個 XXXMessage
類的建構函式呼叫的語法糖。
如果您願意,可以使用 LoggerAdapter
來達到與上述類似的效果,如以下示例所示
import logging
class Message:
def __init__(self, fmt, args):
self.fmt = fmt
self.args = args
def __str__(self):
return self.fmt.format(*self.args)
class StyleAdapter(logging.LoggerAdapter):
def log(self, level, msg, /, *args, stacklevel=1, **kwargs):
if self.isEnabledFor(level):
msg, kwargs = self.process(msg, kwargs)
self.logger.log(level, Message(msg, args), **kwargs,
stacklevel=stacklevel+1)
logger = StyleAdapter(logging.getLogger(__name__))
def main():
logger.debug('Hello, {}', 'world!')
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
main()
當使用 Python 3.8 或更高版本執行上述指令碼時,它應該記錄訊息 Hello, world!
。
自定義 LogRecord
¶
每個日誌事件都由一個 LogRecord
例項表示。當一個事件被記錄並且未被日誌記錄器級別過濾掉時,會建立一個 LogRecord
,其中填充了有關事件的資訊,然後傳遞給該日誌記錄器(及其祖先,包括停用進一步向上級傳播的日誌記錄器)的處理程式。在 Python 3.2 之前,此建立僅在兩個地方完成
Logger.makeRecord()
,在正常日誌記錄事件過程中呼叫。這直接呼叫LogRecord
來建立例項。makeLogRecord()
,用一個包含要新增到 LogRecord 的屬性的字典呼叫。這通常在透過網路接收到合適的字典時呼叫(例如,透過SocketHandler
以 pickle 形式,或透過HTTPHandler
以 JSON 形式)。
這通常意味著如果您需要對 LogRecord
進行任何特殊操作,您必須執行以下操作之一。
建立您自己的
Logger
子類,它覆蓋Logger.makeRecord()
,並在例項化任何您關心的日誌記錄器之前使用setLoggerClass()
設定它。
第一種方法在(例如)幾個不同的庫想要做不同的事情的場景中會有點笨拙。每個庫都會嘗試設定自己的 Logger
子類,而最後設定的那個會獲勝。
第二種方法對於許多情況來說效果很好,但不允許您例如使用 LogRecord
的專門子類。庫開發人員可以在其日誌記錄器上設定合適的過濾器,但他們必須記住每次引入新的日誌記錄器時都這樣做(他們只需透過新增新的包或模組並執行
logger = logging.getLogger(__name__)
在模組級別)。這可能需要考慮的事情太多了。開發人員還可以將過濾器新增到附加到其頂級日誌記錄器的 NullHandler
,但如果應用程式開發人員將處理程式附加到較低級別的庫日誌記錄器,則不會呼叫此過濾器——因此該處理程式的輸出將無法反映庫開發人員的意圖。
在 Python 3.2 及更高版本中,LogRecord
的建立是透過工廠完成的,您可以指定該工廠。工廠只是一個您可以使用 setLogRecordFactory()
設定並使用 getLogRecordFactory()
查詢的可呼叫物件。工廠以與 LogRecord
建構函式相同的簽名被呼叫,因為 LogRecord
是工廠的預設設定。
這種方法允許自定義工廠控制 LogRecord 建立的所有方面。例如,您可以返回一個子類,或者在建立後向記錄新增一些額外的屬性,使用類似以下的模式
old_factory = logging.getLogRecordFactory()
def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.custom_attribute = 0xdecafbad
return record
logging.setLogRecordFactory(record_factory)
這種模式允許不同的庫將工廠連結在一起,只要它們不覆蓋彼此的屬性或無意中覆蓋標準提供的屬性,就不會出現意外。但是,應該記住,鏈中的每個連結都會增加所有日誌操作的執行時開銷,並且該技術只應在使用 Filter
無法提供所需結果時才使用。
子類化 QueueHandler 和 QueueListener - 一個 ZeroMQ 示例¶
子類化 QueueHandler
¶
您可以使用 QueueHandler
子類將訊息傳送到其他型別的佇列,例如 ZeroMQ “釋出”套接字。在下面的示例中,套接字是單獨建立的,並作為其“佇列”傳遞給處理程式
import zmq # using pyzmq, the Python binding for ZeroMQ
import json # for serializing records portably
ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB) # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556') # or wherever
class ZeroMQSocketHandler(QueueHandler):
def enqueue(self, record):
self.queue.send_json(record.__dict__)
handler = ZeroMQSocketHandler(sock)
當然,還有其他組織方式,例如傳入處理程式建立套接字所需的資料
class ZeroMQSocketHandler(QueueHandler):
def __init__(self, uri, socktype=zmq.PUB, ctx=None):
self.ctx = ctx or zmq.Context()
socket = zmq.Socket(self.ctx, socktype)
socket.bind(uri)
super().__init__(socket)
def enqueue(self, record):
self.queue.send_json(record.__dict__)
def close(self):
self.queue.close()
子類化 QueueListener
¶
您還可以子類化 QueueListener
以從其他型別的佇列獲取訊息,例如 ZeroMQ “訂閱”套接字。這是一個示例
class ZeroMQSocketListener(QueueListener):
def __init__(self, uri, /, *handlers, **kwargs):
self.ctx = kwargs.get('ctx') or zmq.Context()
socket = zmq.Socket(self.ctx, zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, '') # subscribe to everything
socket.connect(uri)
super().__init__(socket, *handlers, **kwargs)
def dequeue(self):
msg = self.queue.recv_json()
return logging.makeLogRecord(msg)
子類化 QueueHandler 和 QueueListener - 一個 pynng
示例¶
與上述部分類似,我們可以使用 pynng(一個 Python 繫結到 NNG,被譽為 ZeroMQ 的精神繼承者)實現一個監聽器和處理程式。以下程式碼片段展示了這一點——您可以在安裝了 pynng
的環境中測試它們。為了多樣化,我們首先介紹監聽器。
子類化 QueueListener
¶
# listener.py
import json
import logging
import logging.handlers
import pynng
DEFAULT_ADDR = "tcp://:13232"
interrupted = False
class NNGSocketListener(logging.handlers.QueueListener):
def __init__(self, uri, /, *handlers, **kwargs):
# Have a timeout for interruptability, and open a
# subscriber socket
socket = pynng.Sub0(listen=uri, recv_timeout=500)
# The b'' subscription matches all topics
topics = kwargs.pop('topics', None) or b''
socket.subscribe(topics)
# We treat the socket as a queue
super().__init__(socket, *handlers, **kwargs)
def dequeue(self, block):
data = None
# Keep looping while not interrupted and no data received over the
# socket
while not interrupted:
try:
data = self.queue.recv(block=block)
break
except pynng.Timeout:
pass
except pynng.Closed: # sometimes happens when you hit Ctrl-C
break
if data is None:
return None
# Get the logging event sent from a publisher
event = json.loads(data.decode('utf-8'))
return logging.makeLogRecord(event)
def enqueue_sentinel(self):
# Not used in this implementation, as the socket isn't really a
# queue
pass
logging.getLogger('pynng').propagate = False
listener = NNGSocketListener(DEFAULT_ADDR, logging.StreamHandler(), topics=b'')
listener.start()
print('Press Ctrl-C to stop.')
try:
while True:
pass
except KeyboardInterrupt:
interrupted = True
finally:
listener.stop()
子類化 QueueHandler
¶
# sender.py
import json
import logging
import logging.handlers
import time
import random
import pynng
DEFAULT_ADDR = "tcp://:13232"
class NNGSocketHandler(logging.handlers.QueueHandler):
def __init__(self, uri):
socket = pynng.Pub0(dial=uri, send_timeout=500)
super().__init__(socket)
def enqueue(self, record):
# Send the record as UTF-8 encoded JSON
d = dict(record.__dict__)
data = json.dumps(d)
self.queue.send(data.encode('utf-8'))
def close(self):
self.queue.close()
logging.getLogger('pynng').propagate = False
handler = NNGSocketHandler(DEFAULT_ADDR)
# Make sure the process ID is in the output
logging.basicConfig(level=logging.DEBUG,
handlers=[logging.StreamHandler(), handler],
format='%(levelname)-8s %(name)10s %(process)6s %(message)s')
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL)
logger_names = ('myapp', 'myapp.lib1', 'myapp.lib2')
msgno = 1
while True:
# Just randomly select some loggers and levels and log away
level = random.choice(levels)
logger = logging.getLogger(random.choice(logger_names))
logger.log(level, 'Message no. %5d' % msgno)
msgno += 1
delay = random.random() * 2 + 0.5
time.sleep(delay)
您可以在不同的命令列 shell 中執行上述兩個程式碼片段。如果我們在一個 shell 中執行監聽器,並在兩個不同的 shell 中執行傳送器,我們應該會看到類似以下的內容。在第一個傳送器 shell 中
$ python sender.py
DEBUG myapp 613 Message no. 1
WARNING myapp.lib2 613 Message no. 2
CRITICAL myapp.lib2 613 Message no. 3
WARNING myapp.lib2 613 Message no. 4
CRITICAL myapp.lib1 613 Message no. 5
DEBUG myapp 613 Message no. 6
CRITICAL myapp.lib1 613 Message no. 7
INFO myapp.lib1 613 Message no. 8
(and so on)
在第二個傳送器 shell 中
$ python sender.py
INFO myapp.lib2 657 Message no. 1
CRITICAL myapp.lib2 657 Message no. 2
CRITICAL myapp 657 Message no. 3
CRITICAL myapp.lib1 657 Message no. 4
INFO myapp.lib1 657 Message no. 5
WARNING myapp.lib2 657 Message no. 6
CRITICAL myapp 657 Message no. 7
DEBUG myapp.lib1 657 Message no. 8
(and so on)
在監聽器 shell 中
$ python listener.py
Press Ctrl-C to stop.
DEBUG myapp 613 Message no. 1
WARNING myapp.lib2 613 Message no. 2
INFO myapp.lib2 657 Message no. 1
CRITICAL myapp.lib2 613 Message no. 3
CRITICAL myapp.lib2 657 Message no. 2
CRITICAL myapp 657 Message no. 3
WARNING myapp.lib2 613 Message no. 4
CRITICAL myapp.lib1 613 Message no. 5
CRITICAL myapp.lib1 657 Message no. 4
INFO myapp.lib1 657 Message no. 5
DEBUG myapp 613 Message no. 6
WARNING myapp.lib2 657 Message no. 6
CRITICAL myapp 657 Message no. 7
CRITICAL myapp.lib1 613 Message no. 7
INFO myapp.lib1 613 Message no. 8
DEBUG myapp.lib1 657 Message no. 8
(and so on)
如您所見,來自兩個傳送程序的日誌記錄在監聽器的輸出中交錯。
一個基於字典的配置示例¶
下面是一個日誌配置字典的示例——它取自 Django 專案的文件。此字典被傳遞給 dictConfig()
以使配置生效
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
'simple': {
'format': '{levelname} {message}',
'style': '{',
},
},
'filters': {
'special': {
'()': 'project.logging.SpecialFilter',
'foo': 'bar',
},
},
'handlers': {
'console': {
'level': 'INFO',
'class': 'logging.StreamHandler',
'formatter': 'simple',
},
'mail_admins': {
'level': 'ERROR',
'class': 'django.utils.log.AdminEmailHandler',
'filters': ['special']
}
},
'loggers': {
'django': {
'handlers': ['console'],
'propagate': True,
},
'django.request': {
'handlers': ['mail_admins'],
'level': 'ERROR',
'propagate': False,
},
'myproject.custom': {
'handlers': ['console', 'mail_admins'],
'level': 'INFO',
'filters': ['special']
}
}
}
有關此配置的更多資訊,您可以檢視 Django 文件的 相關部分。
使用輪換器和命名器自定義日誌輪換處理¶
以下可執行指令碼提供了一個如何定義命名器和輪換器的示例,該指令碼演示了日誌檔案的 gzip 壓縮
import gzip
import logging
import logging.handlers
import os
import shutil
def namer(name):
return name + ".gz"
def rotator(source, dest):
with open(source, 'rb') as f_in:
with gzip.open(dest, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(source)
rh = logging.handlers.RotatingFileHandler('rotated.log', maxBytes=128, backupCount=5)
rh.rotator = rotator
rh.namer = namer
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(rh)
f = logging.Formatter('%(asctime)s %(message)s')
rh.setFormatter(f)
for i in range(1000):
root.info(f'Message no. {i + 1}')
執行此程式後,您將看到六個新檔案,其中五個已壓縮
$ ls rotated.log*
rotated.log rotated.log.2.gz rotated.log.4.gz
rotated.log.1.gz rotated.log.3.gz rotated.log.5.gz
$ zcat rotated.log.1.gz
2023-01-20 02:28:17,767 Message no. 996
2023-01-20 02:28:17,767 Message no. 997
2023-01-20 02:28:17,767 Message no. 998
一個更精細的多程序示例¶
以下工作示例展示瞭如何使用配置檔案在多程序中使用日誌記錄。配置相當簡單,但足以說明在真實的多程序場景中如何實現更復雜的配置。
在示例中,主程序生成一個監聽程序和一些工作程序。主程序、監聽器和工作程序都有三個獨立的配置(所有工作程序共享相同的配置)。我們可以看到主程序中的日誌記錄、工作程序如何記錄到 QueueHandler 以及監聽器如何實現 QueueListener 和更復雜的日誌配置,並安排將透過佇列接收的事件分派給配置中指定的處理程式。請注意,這些配置純粹是說明性的,但您應該能夠根據自己的場景調整此示例。
這是指令碼——文件字串和註釋希望能夠解釋它是如何工作的
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time
class MyHandler:
"""
A simple handler for logging events. It runs in the listener process and
dispatches events to loggers based on the name in the received record,
which then get dispatched, by the logging system, to the handlers
configured for those loggers.
"""
def handle(self, record):
if record.name == "root":
logger = logging.getLogger()
else:
logger = logging.getLogger(record.name)
if logger.isEnabledFor(record.levelno):
# The process name is transformed just to show that it's the listener
# doing the logging to files and console
record.processName = '%s (for %s)' % (current_process().name, record.processName)
logger.handle(record)
def listener_process(q, stop_event, config):
"""
This could be done in the main process, but is just done in a separate
process for illustrative purposes.
This initialises logging according to the specified configuration,
starts the listener and waits for the main process to signal completion
via the event. The listener is then stopped, and the process exits.
"""
logging.config.dictConfig(config)
listener = logging.handlers.QueueListener(q, MyHandler())
listener.start()
if os.name == 'posix':
# On POSIX, the setup logger will have been configured in the
# parent process, but should have been disabled following the
# dictConfig call.
# On Windows, since fork isn't used, the setup logger won't
# exist in the child, so it would be created and the message
# would appear - hence the "if posix" clause.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
stop_event.wait()
listener.stop()
def worker_process(config):
"""
A number of these are spawned for the purpose of illustration. In
practice, they could be a heterogeneous bunch of processes rather than
ones which are identical to each other.
This initialises logging according to the specified configuration,
and logs a hundred messages with random levels to randomly selected
loggers.
A small sleep is added to allow other processes a chance to run. This
is not strictly needed, but it mixes the output from the different
processes a bit more than if it's left out.
"""
logging.config.dictConfig(config)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
if os.name == 'posix':
# On POSIX, the setup logger will have been configured in the
# parent process, but should have been disabled following the
# dictConfig call.
# On Windows, since fork isn't used, the setup logger won't
# exist in the child, so it would be created and the message
# would appear - hence the "if posix" clause.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
time.sleep(0.01)
def main():
q = Queue()
# The main process gets a simple configuration which prints to the console.
config_initial = {
'version': 1,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO'
}
},
'root': {
'handlers': ['console'],
'level': 'DEBUG'
}
}
# The worker process configuration is just a QueueHandler attached to the
# root logger, which allows all messages to be sent to the queue.
# We disable existing loggers to disable the "setup" logger used in the
# parent process. This is needed on POSIX because the logger will
# be there in the child following a fork().
config_worker = {
'version': 1,
'disable_existing_loggers': True,
'handlers': {
'queue': {
'class': 'logging.handlers.QueueHandler',
'queue': q
}
},
'root': {
'handlers': ['queue'],
'level': 'DEBUG'
}
}
# The listener process configuration shows that the full flexibility of
# logging configuration is available to dispatch events to handlers however
# you want.
# We disable existing loggers to disable the "setup" logger used in the
# parent process. This is needed on POSIX because the logger will
# be there in the child following a fork().
config_listener = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
},
'simple': {
'class': 'logging.Formatter',
'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'simple',
'level': 'INFO'
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed'
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed'
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'formatter': 'detailed',
'level': 'ERROR'
}
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'handlers': ['console', 'file', 'errors'],
'level': 'DEBUG'
}
}
# Log some initial events, just to show that logging in the parent works
# normally.
logging.config.dictConfig(config_initial)
logger = logging.getLogger('setup')
logger.info('About to create workers ...')
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1),
args=(config_worker,))
workers.append(wp)
wp.start()
logger.info('Started worker: %s', wp.name)
logger.info('About to create listener ...')
stop_event = Event()
lp = Process(target=listener_process, name='listener',
args=(q, stop_event, config_listener))
lp.start()
logger.info('Started listener')
# We now hang around for the workers to finish their work.
for wp in workers:
wp.join()
# Workers all done, listening can now stop.
# Logging in the parent still works normally.
logger.info('Telling listener to stop ...')
stop_event.set()
lp.join()
logger.info('All done.')
if __name__ == '__main__':
main()
在傳送給 SysLogHandler 的訊息中插入 BOM¶
RFC 5424 要求將 Unicode 訊息作為一組位元組傳送到 syslog 守護程式,這些位元組具有以下結構:一個可選的純 ASCII 元件,後跟一個 UTF-8 位元組序標記 (BOM),後跟使用 UTF-8 編碼的 Unicode。(請參閱規範的相關部分。)
在 Python 3.1 中,程式碼被新增到 SysLogHandler
以在訊息中插入 BOM,但不幸的是,它的實現不正確,BOM 出現在訊息的開頭,因此不允許在它之前出現任何純 ASCII 元件。
由於這種行為已損壞,不正確的 BOM 插入程式碼將從 Python 3.2.4 及更高版本中移除。但是,它不會被替換,如果您想生成包含 BOM、之前可選的純 ASCII 序列以及之後任意 Unicode 的 RFC 5424 相容訊息,並使用 UTF-8 編碼,那麼您需要執行以下操作
將
Formatter
例項附加到您的SysLogHandler
例項,格式字串如下'ASCII section\ufeffUnicode section'
Unicode 程式碼點 U+FEFF,當使用 UTF-8 編碼時,將被編碼為 UTF-8 BOM——位元組字串
b'\xef\xbb\xbf'
。用您喜歡的任何佔位符替換 ASCII 部分,但請確保替換後出現在其中的資料始終是 ASCII(這樣,它在 UTF-8 編碼後將保持不變)。
用您喜歡的任何佔位符替換 Unicode 部分;如果替換後出現的資料包含 ASCII 範圍之外的字元,那也沒關係——它將使用 UTF-8 編碼。
格式化後的訊息 將 由 SysLogHandler
使用 UTF-8 編碼。如果您遵循上述規則,您應該能夠生成 RFC 5424 相容訊息。如果您不這樣做,日誌記錄可能不會抱怨,但您的訊息將不符合 RFC 5424,並且您的 syslog 守護程式可能會抱怨。
實現結構化日誌¶
儘管大多數日誌訊息旨在供人閱讀,因此不易被機器解析,但在某些情況下,您可能希望以結構化格式輸出訊息,該格式 能夠 被程式解析(無需複雜的正則表示式來解析日誌訊息)。使用日誌包可以輕鬆實現這一點。實現此目的有多種方法,但以下是一種簡單的方法,它使用 JSON 以機器可解析的方式序列化事件
import json
import logging
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
return '%s >>> %s' % (self.message, json.dumps(self.kwargs))
_ = StructuredMessage # optional, to improve readability
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))
如果執行上述指令碼,它會列印
message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}
請注意,專案順序可能因使用的 Python 版本而異。
如果您需要更專業的處理,您可以使用自定義 JSON 編碼器,如以下完整示例所示
import json
import logging
class Encoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, set):
return tuple(o)
elif isinstance(o, str):
return o.encode('unicode_escape').decode('ascii')
return super().default(o)
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
s = Encoder().encode(self.kwargs)
return '%s >>> %s' % (self.message, s)
_ = StructuredMessage # optional, to improve readability
def main():
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))
if __name__ == '__main__':
main()
當執行上述指令碼時,它會列印
message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}
請注意,專案順序可能因使用的 Python 版本而異。
使用 dictConfig()
定製處理程式¶
有時您希望以特定方式定製日誌處理程式,如果您使用 dictConfig()
,您可能無需子類化即可實現此目的。例如,考慮您可能希望設定日誌檔案的所有權。在 POSIX 上,這可以透過使用 shutil.chown()
輕鬆完成,但 stdlib 中的檔案處理程式不提供內建支援。您可以使用簡單的函式(例如)自定義處理程式建立:
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
然後,您可以在傳遞給 dictConfig()
的日誌配置中指定透過呼叫此函式來建立日誌處理程式:
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# The values below are popped from this dictionary and
# used to create the handler, set the handler's level and
# its formatter.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# The values below are passed to the handler creator callable
# as keyword arguments.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
在此示例中,為了說明目的,我使用 pulse
使用者和組來設定所有權。將其整合到一個可執行的指令碼 chowntest.py
中:
import logging, logging.config, os, shutil
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# The values below are popped from this dictionary and
# used to create the handler, set the handler's level and
# its formatter.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# The values below are passed to the handler creator callable
# as keyword arguments.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')
要執行此程式,您可能需要以 root
身份執行:
$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log
請注意,此示例使用 Python 3.3,因為 shutil.chown()
是在該版本中出現的。此方法應該適用於任何支援 dictConfig()
的 Python 版本——即 Python 2.7、3.2 或更高版本。對於 3.3 之前的版本,您需要使用例如 os.chown()
來實現實際的所有權更改。
實際上,處理程式建立函式可能位於專案中的某個實用程式模組中。配置中的那一行:
'()': owned_file_handler,
您可以使用例如:
'()': 'ext://project.util.owned_file_handler',
其中 project.util
可以替換為函式所在的包的實際名稱。在上面的工作指令碼中,使用 'ext://__main__.owned_file_handler'
應該可以工作。在這裡,實際的可呼叫物件由 dictConfig()
從 ext://
規範解析。
此示例也希望能指明如何以相同的方式實現其他型別的檔案更改——例如設定特定的 POSIX 許可權位——使用 os.chmod()
。
當然,這種方法也可以擴充套件到 FileHandler
以外的其他型別的處理程式——例如,一個輪轉檔案處理程式,或完全不同型別的處理程式。
在整個應用程式中使用特定的格式樣式¶
在 Python 3.2 中,Formatter
獲得了一個 style
關鍵字引數,它在預設情況下為了向後相容性設定為 %
,但允許指定 {
或 $
以支援 str.format()
和 string.Template
支援的格式化方法。請注意,這控制了日誌訊息的最終輸出格式,並且與單個日誌訊息的構建方式完全正交。
日誌呼叫(debug()
、info()
等)只接受實際日誌訊息本身的位置引數,關鍵字引數僅用於確定如何處理日誌呼叫的選項(例如,exc_info
關鍵字引數用於指示應記錄回溯資訊,或 extra
關鍵字引數用於指示要新增到日誌的額外上下文資訊)。因此,您不能直接使用 str.format()
或 string.Template
語法進行日誌呼叫,因為日誌包內部使用 %-格式化來合併格式字串和可變引數。在保持向後相容性的情況下,無法更改此行為,因為現有程式碼中的所有日誌呼叫都將使用 %-格式字串。
有人建議將格式樣式與特定日誌記錄器關聯起來,但這種方法也遇到了向後相容性問題,因為任何現有程式碼都可能正在使用給定的日誌記錄器名稱並使用 % 格式化。
為了使日誌記錄在任何第三方庫和您的程式碼之間可互操作,關於格式的決定需要在單個日誌呼叫級別做出。這提供了幾種容納替代格式樣式的方法。
使用 LogRecord 工廠¶
在 Python 3.2 中,除了上面提到的 Formatter
更改之外,日誌包還增加了允許使用者使用 setLogRecordFactory()
函式設定自己的 LogRecord
子類的功能。您可以利用此功能設定自己的 LogRecord
子類,透過重寫 getMessage()
方法來做正確的事情。此方法的基類實現是 msg % args
格式化發生的地方,您可以在此處替換您的替代格式;但是,您應該注意支援所有格式樣式並將 % 格式化作為預設值,以確保與其他程式碼的互操作性。還應注意呼叫 str(self.msg)
,就像基類實現一樣。
有關更多資訊,請參閱 setLogRecordFactory()
和 LogRecord
的參考文件。
使用自定義訊息物件¶
還有另一種可能更簡單的方法,您可以使用 {} 和 $ 格式來構造您的單個日誌訊息。您可能還記得(從 使用任意物件作為訊息)在日誌記錄時,您可以使用任意物件作為訊息格式字串,日誌包將對該物件呼叫 str()
以獲取實際的格式字串。考慮以下兩個類:
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
其中任何一個都可以代替格式字串使用,以允許使用 {}- 或 $- 格式來構建實際的“訊息”部分,該部分在格式化日誌輸出中出現在“%(message)s”或“{message}”或“$message”的位置。如果您覺得每次要記錄某些內容時都使用類名有點笨拙,那麼如果您為訊息使用別名,例如 M
或 _
(或者可能是 __
,如果您將 _
用於本地化),則可以使其更易於接受。
下面給出了這種方法的例子。首先,使用 str.format()
進行格式化:
>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)
其次,使用 string.Template
進行格式化:
>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
需要注意的一點是,這種方法不會造成明顯的效能損失:實際的格式化操作不是在您進行日誌呼叫時發生的,而是在(如果)日誌訊息即將由處理程式輸出到日誌時發生的。因此,唯一可能讓您感到有點不尋常的是,括號環繞著格式字串和引數,而不僅僅是格式字串。這是因為 __ 符號只是上面所示的 XXXMessage
類之一的建構函式呼叫的語法糖。
使用 dictConfig()
配置過濾器¶
您 可以 使用 dictConfig()
配置過濾器,儘管乍一看可能不明顯如何操作(因此有了這個示例)。由於 Filter
是標準庫中唯一的過濾器類,並且它不太可能滿足許多要求(它只作為基類存在),您通常需要定義自己的 Filter
子類並重寫 filter()
方法。為此,請在過濾器的配置字典中指定 ()
鍵,指定一個可呼叫物件,該物件將用於建立過濾器(類是最明顯的選擇,但您可以提供任何返回 Filter
例項的可呼叫物件)。以下是一個完整的示例:
import logging
import logging.config
import sys
class MyFilter(logging.Filter):
def __init__(self, param=None):
self.param = param
def filter(self, record):
if self.param is None:
allow = True
else:
allow = self.param not in record.msg
if allow:
record.msg = 'changed: ' + record.msg
return allow
LOGGING = {
'version': 1,
'filters': {
'myfilter': {
'()': MyFilter,
'param': 'noshow',
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'filters': ['myfilter']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console']
},
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.debug('hello')
logging.debug('hello - noshow')
此示例展示瞭如何以關鍵字引數的形式將配置資料傳遞給構造例項的可呼叫物件。執行時,上述指令碼將列印:
changed: hello
這表明過濾器正在按配置工作。
另外幾點注意事項:
如果無法在配置中直接引用可呼叫物件(例如,如果它位於不同的模組中,並且您無法在配置字典中直接匯入它),則可以使用 訪問外部物件 中描述的
ext://...
形式。例如,在上面的示例中,您可以使用文字'ext://__main__.MyFilter'
代替MyFilter
。除了過濾器之外,此技術還可用於配置自定義處理程式和格式化程式。有關日誌記錄如何支援在其配置中使用使用者定義物件的更多資訊,請參閱 使用者定義物件,並參閱上面的另一本食譜 使用 dictConfig() 定製處理程式。
自定義異常格式化¶
有時您可能希望進行自定義異常格式化——例如,假設您希望每個日誌事件只有一行,即使存在異常資訊。您可以使用自定義格式化程式類來實現這一點,如下例所示:
import logging
class OneLineExceptionFormatter(logging.Formatter):
def formatException(self, exc_info):
"""
Format an exception so that it prints on a single line.
"""
result = super().formatException(exc_info)
return repr(result) # or format into one line however you want to
def format(self, record):
s = super().format(record)
if record.exc_text:
s = s.replace('\n', '') + '|'
return s
def configure_logging():
fh = logging.FileHandler('output.txt', 'w')
f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
'%d/%m/%Y %H:%M:%S')
fh.setFormatter(f)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(fh)
def main():
configure_logging()
logging.info('Sample message')
try:
x = 1 / 0
except ZeroDivisionError as e:
logging.exception('ZeroDivisionError: %s', e)
if __name__ == '__main__':
main()
執行時,這將生成一個只有兩行的檔案:
28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: division by zero|'Traceback (most recent call last):\n File "logtest7.py", line 30, in main\n x = 1 / 0\nZeroDivisionError: division by zero'|
雖然上述處理方式很簡單,但它指明瞭如何根據您的喜好格式化異常資訊。traceback
模組可能對更專業的需要有所幫助。
播報日誌訊息¶
在某些情況下,希望日誌訊息以可聽而非可見的格式呈現。如果您的系統中有文字轉語音(TTS)功能,即使它沒有 Python 繫結,這也很容易實現。大多數 TTS 系統都有一個可以執行的命令列程式,可以透過處理程式使用 subprocess
來呼叫。這裡假設 TTS 命令列程式不會期望與使用者互動或花費很長時間才能完成,並且日誌訊息的頻率不會太高以至於讓使用者訊息氾濫,並且可以接受訊息一個接一個地播報而不是同時播報。下面的示例實現等待一個訊息播報完畢後才處理下一個訊息,這可能會導致其他處理程式等待。這是一個簡短的示例,展示了這種方法,它假設 espeak
TTS 包可用:
import logging
import subprocess
import sys
class TTSHandler(logging.Handler):
def emit(self, record):
msg = self.format(record)
# Speak slowly in a female English voice
cmd = ['espeak', '-s150', '-ven+f3', msg]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# wait for the program to finish
p.communicate()
def configure_logging():
h = TTSHandler()
root = logging.getLogger()
root.addHandler(h)
# the default formatter just returns the message
root.setLevel(logging.DEBUG)
def main():
logging.info('Hello')
logging.debug('Goodbye')
if __name__ == '__main__':
configure_logging()
sys.exit(main())
執行此指令碼時,它應該以女性聲音說出“Hello”和“Goodbye”。
當然,上述方法可以適用於其他 TTS 系統,甚至可以適用於透過命令列執行外部程式來處理訊息的完全不同的系統。
緩衝日誌訊息並有條件地輸出它們¶
有時,您可能希望將日誌訊息儲存在臨時區域中,並且只有在滿足特定條件時才輸出它們。例如,您可能希望在函式中開始記錄除錯事件,如果函式無錯誤完成,您不想用收集到的除錯資訊來雜亂日誌,但如果發生錯誤,您希望將所有除錯資訊以及錯誤資訊都輸出。
下面是一個示例,展示瞭如何使用裝飾器來實現這種日誌行為。它利用了 logging.handlers.MemoryHandler
,它允許緩衝日誌事件直到某個條件發生,此時緩衝的事件會被 flush
——傳遞給另一個處理程式(target
處理程式)進行處理。預設情況下,MemoryHandler
在其緩衝區滿或遇到級別大於或等於指定閾值的事件時進行 flush。如果您希望自定義 flush 行為,可以使用 MemoryHandler
的更專業的子類。
示例指令碼有一個簡單的函式 foo
,它只是遍歷所有日誌級別,向 sys.stderr
寫入要記錄的級別,然後實際在該級別記錄一條訊息。您可以向 foo
傳遞一個引數,如果為真,將記錄 ERROR 和 CRITICAL 級別——否則,它只記錄 DEBUG、INFO 和 WARNING 級別。
該指令碼只安排用一個裝飾器來裝飾 foo
,這個裝飾器將完成所需的條件日誌記錄。裝飾器接受一個日誌記錄器作為引數,並在被裝飾函式呼叫的持續時間內附加一個記憶體處理程式。裝飾器可以額外使用目標處理程式、觸發 flush 的級別和緩衝區容量(記錄數量)進行引數化。這些引數預設分別為寫入 sys.stderr
的 StreamHandler
、logging.ERROR
和 100
。
這是指令碼:
import logging
from logging.handlers import MemoryHandler
import sys
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
if target_handler is None:
target_handler = logging.StreamHandler()
if flush_level is None:
flush_level = logging.ERROR
if capacity is None:
capacity = 100
handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)
def decorator(fn):
def wrapper(*args, **kwargs):
logger.addHandler(handler)
try:
return fn(*args, **kwargs)
except Exception:
logger.exception('call failed')
raise
finally:
super(MemoryHandler, handler).flush()
logger.removeHandler(handler)
return wrapper
return decorator
def write_line(s):
sys.stderr.write('%s\n' % s)
def foo(fail=False):
write_line('about to log at DEBUG ...')
logger.debug('Actually logged at DEBUG')
write_line('about to log at INFO ...')
logger.info('Actually logged at INFO')
write_line('about to log at WARNING ...')
logger.warning('Actually logged at WARNING')
if fail:
write_line('about to log at ERROR ...')
logger.error('Actually logged at ERROR')
write_line('about to log at CRITICAL ...')
logger.critical('Actually logged at CRITICAL')
return fail
decorated_foo = log_if_errors(logger)(foo)
if __name__ == '__main__':
logger.setLevel(logging.DEBUG)
write_line('Calling undecorated foo with False')
assert not foo(False)
write_line('Calling undecorated foo with True')
assert foo(True)
write_line('Calling decorated foo with False')
assert not decorated_foo(False)
write_line('Calling decorated foo with True')
assert decorated_foo(True)
執行此指令碼時,應觀察到以下輸出:
Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL
如您所見,只有當事件的嚴重性為 ERROR 或更高時,才會發生實際的日誌輸出,但在這種情況下,之前所有嚴重性較低的事件也會被記錄。
當然,您可以使用傳統的裝飾方法:
@log_if_errors(logger)
def foo(fail=False):
...
將日誌訊息透過電子郵件傳送,並進行緩衝¶
為了說明如何透過電子郵件傳送日誌訊息,以便每封電子郵件傳送指定數量的訊息,您可以子類化 BufferingHandler
。在以下示例中,您可以根據自己的特定需求進行調整,提供了一個簡單的測試工具,允許您使用命令列引數執行指令碼,這些引數指定您通常需要透過 SMTP 傳送郵件的內容。(執行下載的指令碼時帶上 -h
引數以檢視所需和可選引數。)
import logging
import logging.handlers
import smtplib
class BufferingSMTPHandler(logging.handlers.BufferingHandler):
def __init__(self, mailhost, port, username, password, fromaddr, toaddrs,
subject, capacity):
logging.handlers.BufferingHandler.__init__(self, capacity)
self.mailhost = mailhost
self.mailport = port
self.username = username
self.password = password
self.fromaddr = fromaddr
if isinstance(toaddrs, str):
toaddrs = [toaddrs]
self.toaddrs = toaddrs
self.subject = subject
self.setFormatter(logging.Formatter("%(asctime)s %(levelname)-5s %(message)s"))
def flush(self):
if len(self.buffer) > 0:
try:
smtp = smtplib.SMTP(self.mailhost, self.mailport)
smtp.starttls()
smtp.login(self.username, self.password)
msg = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n" % (self.fromaddr, ','.join(self.toaddrs), self.subject)
for record in self.buffer:
s = self.format(record)
msg = msg + s + "\r\n"
smtp.sendmail(self.fromaddr, self.toaddrs, msg)
smtp.quit()
except Exception:
if logging.raiseExceptions:
raise
self.buffer = []
if __name__ == '__main__':
import argparse
ap = argparse.ArgumentParser()
aa = ap.add_argument
aa('host', metavar='HOST', help='SMTP server')
aa('--port', '-p', type=int, default=587, help='SMTP port')
aa('user', metavar='USER', help='SMTP username')
aa('password', metavar='PASSWORD', help='SMTP password')
aa('to', metavar='TO', help='Addressee for emails')
aa('sender', metavar='SENDER', help='Sender email address')
aa('--subject', '-s',
default='Test Logging email from Python logging module (buffering)',
help='Subject of email')
options = ap.parse_args()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
h = BufferingSMTPHandler(options.host, options.port, options.user,
options.password, options.sender,
options.to, options.subject, 10)
logger.addHandler(h)
for i in range(102):
logger.info("Info index = %d", i)
h.flush()
h.close()
如果您執行此指令碼並且您的 SMTP 伺服器設定正確,您應該會發現它會向您指定的收件人傳送十一封電子郵件。前十封電子郵件每封將包含十條日誌訊息,第十一封將包含兩條訊息。這構成了指令碼中指定的 102 條訊息。
透過配置使用 UTC(GMT)格式化時間¶
有時您希望使用 UTC 格式化時間,這可以透過使用 UTCFormatter
這樣的類來完成,如下所示:
import logging
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
然後您可以在程式碼中使用 UTCFormatter
代替 Formatter
。如果您想透過配置來實現這一點,您可以使用 dictConfig()
API,其方法示例如下:
import logging
import logging.config
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'utc': {
'()': UTCFormatter,
'format': '%(asctime)s %(message)s',
},
'local': {
'format': '%(asctime)s %(message)s',
}
},
'handlers': {
'console1': {
'class': 'logging.StreamHandler',
'formatter': 'utc',
},
'console2': {
'class': 'logging.StreamHandler',
'formatter': 'local',
},
},
'root': {
'handlers': ['console1', 'console2'],
}
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.warning('The local time is %s', time.asctime())
執行此指令碼時,它應該列印類似以下內容:
2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015
顯示時間如何以本地時間和 UTC 兩種格式顯示,每個處理程式各一個。
使用上下文管理器進行選擇性日誌記錄¶
有時,暫時更改日誌配置並在完成某些操作後將其恢復會很有用。為此,上下文管理器是儲存和恢復日誌上下文最明顯的方式。下面是這樣一個上下文管理器的簡單示例,它允許您純粹在上下文管理器的範圍內可選地更改日誌級別並新增日誌處理程式:
import logging
import sys
class LoggingContext:
def __init__(self, logger, level=None, handler=None, close=True):
self.logger = logger
self.level = level
self.handler = handler
self.close = close
def __enter__(self):
if self.level is not None:
self.old_level = self.logger.level
self.logger.setLevel(self.level)
if self.handler:
self.logger.addHandler(self.handler)
def __exit__(self, et, ev, tb):
if self.level is not None:
self.logger.setLevel(self.old_level)
if self.handler:
self.logger.removeHandler(self.handler)
if self.handler and self.close:
self.handler.close()
# implicit return of None => don't swallow exceptions
如果您指定了級別值,則在上下文管理器所覆蓋的 with
塊範圍內,日誌記錄器的級別將設定為該值。如果您指定了處理程式,則在進入塊時將其新增到日誌記錄器,並在退出塊時將其刪除。您還可以要求管理器在塊退出時為您關閉處理程式——如果您不再需要該處理程式,就可以這樣做。
為了說明它的工作原理,我們可以在上面新增以下程式碼塊:
if __name__ == '__main__':
logger = logging.getLogger('foo')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
logger.info('1. This should appear just once on stderr.')
logger.debug('2. This should not appear.')
with LoggingContext(logger, level=logging.DEBUG):
logger.debug('3. This should appear once on stderr.')
logger.debug('4. This should not appear.')
h = logging.StreamHandler(sys.stdout)
with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
logger.debug('5. This should appear twice - once on stderr and once on stdout.')
logger.info('6. This should appear just once on stderr.')
logger.debug('7. This should not appear.')
我們最初將日誌記錄器的級別設定為 INFO
,因此訊息 #1 出現而訊息 #2 不出現。然後,我們在接下來的 with
塊中臨時將級別更改為 DEBUG
,因此訊息 #3 出現。塊退出後,日誌記錄器的級別恢復到 INFO
,因此訊息 #4 不出現。在下一個 with
塊中,我們再次將級別設定為 DEBUG
,但同時也添加了一個寫入 sys.stdout
的處理程式。因此,訊息 #5 在控制檯上出現兩次(一次透過 stderr
,一次透過 stdout
)。with
語句完成後,狀態恢復到以前的樣子,因此訊息 #6 出現(像訊息 #1),而訊息 #7 不出現(就像訊息 #2)。
如果我們執行生成的指令碼,結果如下:
$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
如果我們再次執行它,但將 stderr
重定向到 /dev/null
,我們將看到以下內容,這是唯一寫入 stdout
的訊息:
$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.
再次執行,但將 stdout
重定向到 /dev/null
,我們得到:
$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
在這種情況下,列印到 stdout
的訊息 #5 不會出現,正如預期的那樣。
當然,這裡描述的方法可以泛化,例如暫時附加日誌過濾器。請注意,上述程式碼在 Python 2 和 Python 3 中都適用。
CLI 應用程式入門模板¶
這是一個示例,展示瞭如何:
根據命令列引數使用日誌級別
以一致的方式將請求分派到單獨檔案中的多個子命令,所有子命令都以相同的級別記錄日誌
利用簡單、最少的配置
假設我們有一個命令列應用程式,其任務是停止、啟動或重啟某些服務。為了說明目的,這可以組織為一個檔案 app.py
,它是應用程式的主指令碼,各個命令在 start.py
、stop.py
和 restart.py
中實現。進一步假設我們希望透過命令列引數控制應用程式的詳細程度,預設值為 logging.INFO
。這是 app.py
的一種編寫方式:
import argparse
import importlib
import logging
import os
import sys
def main(args=None):
scriptname = os.path.basename(__file__)
parser = argparse.ArgumentParser(scriptname)
levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
parser.add_argument('--log-level', default='INFO', choices=levels)
subparsers = parser.add_subparsers(dest='command',
help='Available commands:')
start_cmd = subparsers.add_parser('start', help='Start a service')
start_cmd.add_argument('name', metavar='NAME',
help='Name of service to start')
stop_cmd = subparsers.add_parser('stop',
help='Stop one or more services')
stop_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to stop')
restart_cmd = subparsers.add_parser('restart',
help='Restart one or more services')
restart_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to restart')
options = parser.parse_args()
# the code to dispatch commands could all be in this file. For the purposes
# of illustration only, we implement each command in a separate module.
try:
mod = importlib.import_module(options.command)
cmd = getattr(mod, 'command')
except (ImportError, AttributeError):
print('Unable to find the code for command \'%s\'' % options.command)
return 1
# Could get fancy here and load configuration from file or dictionary
logging.basicConfig(level=options.log_level,
format='%(levelname)s %(name)s %(message)s')
cmd(options)
if __name__ == '__main__':
sys.exit(main())
而 start
、stop
和 restart
命令可以在單獨的模組中實現,例如啟動模組如下:
# start.py
import logging
logger = logging.getLogger(__name__)
def command(options):
logger.debug('About to start %s', options.name)
# actually do the command processing here ...
logger.info('Started the \'%s\' service.', options.name)
因此,停止模組如下:
# stop.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to stop %s', services)
# actually do the command processing here ...
logger.info('Stopped the %s service%s.', services, plural)
重啟模組也類似:
# restart.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to restart %s', services)
# actually do the command processing here ...
logger.info('Restarted the %s service%s.', services, plural)
如果我們使用預設日誌級別執行此應用程式,我們將獲得如下輸出:
$ python app.py start foo
INFO start Started the 'foo' service.
$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
第一個詞是日誌級別,第二個詞是事件記錄位置的模組或包名稱。
如果我們更改日誌級別,那麼我們可以更改傳送到日誌的資訊。例如,如果我們想要更多資訊:
$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.
$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
如果我們想要更少的資訊:
$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz
在這種情況下,命令不會向控制檯列印任何內容,因為它們沒有記錄任何 WARNING
級別或更高級別的日誌。
日誌記錄的 Qt GUI¶
一個時常出現的問題是如何將日誌記錄到 GUI 應用程式中。Qt 框架是一個流行的跨平臺 UI 框架,它透過 PySide2 或 PyQt5 庫提供 Python 繫結。
以下示例展示瞭如何將日誌記錄到 Qt GUI。這引入了一個簡單的 QtHandler
類,它接受一個可呼叫物件,該物件應該是主執行緒中進行 GUI 更新的槽。還建立了一個工作執行緒,以展示如何從 UI 本身(透過按鈕進行手動日誌記錄)以及在後臺進行工作的工作執行緒(此處只是以隨機級別和隨機短延遲記錄訊息)向 GUI 記錄日誌。
工作執行緒是使用 Qt 的 QThread
類而不是 threading
模組實現的,因為在某些情況下必須使用 QThread
,它提供了與其它 Qt
元件更好的整合。
該程式碼應適用於 PySide6
、PyQt6
、PySide2
或 PyQt5
的任何最新版本。您應該能夠將此方法應用於早期版本的 Qt。請參閱程式碼片段中的註釋以獲取更詳細的資訊。
import datetime
import logging
import random
import sys
import time
# Deal with minor differences between different Qt packages
try:
from PySide6 import QtCore, QtGui, QtWidgets
Signal = QtCore.Signal
Slot = QtCore.Slot
except ImportError:
try:
from PyQt6 import QtCore, QtGui, QtWidgets
Signal = QtCore.pyqtSignal
Slot = QtCore.pyqtSlot
except ImportError:
try:
from PySide2 import QtCore, QtGui, QtWidgets
Signal = QtCore.Signal
Slot = QtCore.Slot
except ImportError:
from PyQt5 import QtCore, QtGui, QtWidgets
Signal = QtCore.pyqtSignal
Slot = QtCore.pyqtSlot
logger = logging.getLogger(__name__)
#
# Signals need to be contained in a QObject or subclass in order to be correctly
# initialized.
#
class Signaller(QtCore.QObject):
signal = Signal(str, logging.LogRecord)
#
# Output to a Qt GUI is only supposed to happen on the main thread. So, this
# handler is designed to take a slot function which is set up to run in the main
# thread. In this example, the function takes a string argument which is a
# formatted log message, and the log record which generated it. The formatted
# string is just a convenience - you could format a string for output any way
# you like in the slot function itself.
#
# You specify the slot function to do whatever GUI updates you want. The handler
# doesn't know or care about specific UI elements.
#
class QtHandler(logging.Handler):
def __init__(self, slotfunc, *args, **kwargs):
super().__init__(*args, **kwargs)
self.signaller = Signaller()
self.signaller.signal.connect(slotfunc)
def emit(self, record):
s = self.format(record)
self.signaller.signal.emit(s, record)
#
# This example uses QThreads, which means that the threads at the Python level
# are named something like "Dummy-1". The function below gets the Qt name of the
# current thread.
#
def ctname():
return QtCore.QThread.currentThread().objectName()
#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL)
#
# This worker class represents work that is done in a thread separate to the
# main thread. The way the thread is kicked off to do work is via a button press
# that connects to a slot in the worker.
#
# Because the default threadName value in the LogRecord isn't much use, we add
# a qThreadName which contains the QThread name as computed above, and pass that
# value in an "extra" dictionary which is used to update the LogRecord with the
# QThread name.
#
# This example worker just outputs messages sequentially, interspersed with
# random delays of the order of a few seconds.
#
class Worker(QtCore.QObject):
@Slot()
def start(self):
extra = {'qThreadName': ctname() }
logger.debug('Started work', extra=extra)
i = 1
# Let the thread run until interrupted. This allows reasonably clean
# thread termination.
while not QtCore.QThread.currentThread().isInterruptionRequested():
delay = 0.5 + random.random() * 2
time.sleep(delay)
try:
if random.random() < 0.1:
raise ValueError('Exception raised: %d' % i)
else:
level = random.choice(LEVELS)
logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
except ValueError as e:
logger.exception('Failed: %s', e, extra=extra)
i += 1
#
# Implement a simple UI for this cookbook example. This contains:
#
# * A read-only text edit window which holds formatted log messages
# * A button to start work and log stuff in a separate thread
# * A button to log something from the main thread
# * A button to clear the log window
#
class Window(QtWidgets.QWidget):
COLORS = {
logging.DEBUG: 'black',
logging.INFO: 'blue',
logging.WARNING: 'orange',
logging.ERROR: 'red',
logging.CRITICAL: 'purple',
}
def __init__(self, app):
super().__init__()
self.app = app
self.textedit = te = QtWidgets.QPlainTextEdit(self)
# Set whatever the default monospace font is for the platform
f = QtGui.QFont('nosuchfont')
if hasattr(f, 'Monospace'):
f.setStyleHint(f.Monospace)
else:
f.setStyleHint(f.StyleHint.Monospace) # for Qt6
te.setFont(f)
te.setReadOnly(True)
PB = QtWidgets.QPushButton
self.work_button = PB('Start background work', self)
self.log_button = PB('Log a message at a random level', self)
self.clear_button = PB('Clear log window', self)
self.handler = h = QtHandler(self.update_status)
# Remember to use qThreadName rather than threadName in the format string.
fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
formatter = logging.Formatter(fs)
h.setFormatter(formatter)
logger.addHandler(h)
# Set up to terminate the QThread when we exit
app.aboutToQuit.connect(self.force_quit)
# Lay out all the widgets
layout = QtWidgets.QVBoxLayout(self)
layout.addWidget(te)
layout.addWidget(self.work_button)
layout.addWidget(self.log_button)
layout.addWidget(self.clear_button)
self.setFixedSize(900, 400)
# Connect the non-worker slots and signals
self.log_button.clicked.connect(self.manual_update)
self.clear_button.clicked.connect(self.clear_display)
# Start a new worker thread and connect the slots for the worker
self.start_thread()
self.work_button.clicked.connect(self.worker.start)
# Once started, the button should be disabled
self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))
def start_thread(self):
self.worker = Worker()
self.worker_thread = QtCore.QThread()
self.worker.setObjectName('Worker')
self.worker_thread.setObjectName('WorkerThread') # for qThreadName
self.worker.moveToThread(self.worker_thread)
# This will start an event loop in the worker thread
self.worker_thread.start()
def kill_thread(self):
# Just tell the worker to stop, then tell it to quit and wait for that
# to happen
self.worker_thread.requestInterruption()
if self.worker_thread.isRunning():
self.worker_thread.quit()
self.worker_thread.wait()
else:
print('worker has already exited.')
def force_quit(self):
# For use when the window is closed
if self.worker_thread.isRunning():
self.kill_thread()
# The functions below update the UI and run in the main thread because
# that's where the slots are set up
@Slot(str, logging.LogRecord)
def update_status(self, status, record):
color = self.COLORS.get(record.levelno, 'black')
s = '<pre><font color="%s">%s</font></pre>' % (color, status)
self.textedit.appendHtml(s)
@Slot()
def manual_update(self):
# This function uses the formatted message passed in, but also uses
# information from the record to format the message in an appropriate
# color according to its severity (level).
level = random.choice(LEVELS)
extra = {'qThreadName': ctname() }
logger.log(level, 'Manually logged!', extra=extra)
@Slot()
def clear_display(self):
self.textedit.clear()
def main():
QtCore.QThread.currentThread().setObjectName('MainThread')
logging.getLogger().setLevel(logging.DEBUG)
app = QtWidgets.QApplication(sys.argv)
example = Window(app)
example.show()
if hasattr(app, 'exec'):
rc = app.exec()
else:
rc = app.exec_()
sys.exit(rc)
if __name__=='__main__':
main()
支援 RFC5424 的 syslog 日誌記錄¶
儘管 RFC 5424 釋出於 2009 年,但大多數 syslog 伺服器預設配置為使用較舊的 RFC 3164,該協議可追溯到 2001 年。當 Python 在 2003 年新增 logging
時,它支援當時較早(也是唯一存在)的協議。由於 RFC5424 釋出後,它在 syslog 伺服器中並未得到廣泛部署,因此 SysLogHandler
的功能尚未更新。
RFC 5424 包含一些有用的功能,例如支援結構化資料,如果您需要能夠向支援它的 syslog 伺服器記錄日誌,您可以使用如下所示的子類化處理程式:
import datetime
import logging.handlers
import re
import socket
import time
class SysLogHandler5424(logging.handlers.SysLogHandler):
tz_offset = re.compile(r'([+-]\d{2})(\d{2})$')
escaped = re.compile(r'([\]"\\])')
def __init__(self, *args, **kwargs):
self.msgid = kwargs.pop('msgid', None)
self.appname = kwargs.pop('appname', None)
super().__init__(*args, **kwargs)
def format(self, record):
version = 1
asctime = datetime.datetime.fromtimestamp(record.created).isoformat()
m = self.tz_offset.match(time.strftime('%z'))
has_offset = False
if m and time.timezone:
hrs, mins = m.groups()
if int(hrs) or int(mins):
has_offset = True
if not has_offset:
asctime += 'Z'
else:
asctime += f'{hrs}:{mins}'
try:
hostname = socket.gethostname()
except Exception:
hostname = '-'
appname = self.appname or '-'
procid = record.process
msgid = '-'
msg = super().format(record)
sdata = '-'
if hasattr(record, 'structured_data'):
sd = record.structured_data
# This should be a dict where the keys are SD-ID and the value is a
# dict mapping PARAM-NAME to PARAM-VALUE (refer to the RFC for what these
# mean)
# There's no error checking here - it's purely for illustration, and you
# can adapt this code for use in production environments
parts = []
def replacer(m):
g = m.groups()
return '\\' + g[0]
for sdid, dv in sd.items():
part = f'[{sdid}'
for k, v in dv.items():
s = str(v)
s = self.escaped.sub(replacer, s)
part += f' {k}="{s}"'
part += ']'
parts.append(part)
sdata = ''.join(parts)
return f'{version} {asctime} {hostname} {appname} {procid} {msgid} {sdata} {msg}'
您需要熟悉 RFC 5424 才能完全理解上述程式碼,而且您可能有一些稍微不同的需求(例如,如何將結構化資料傳遞到日誌中)。儘管如此,上述內容應該能夠適應您的特定需求。使用上述處理程式,您可以透過以下方式傳遞結構化資料:
sd = {
'foo@12345': {'bar': 'baz', 'baz': 'bozz', 'fizz': r'buzz'},
'foo@54321': {'rab': 'baz', 'zab': 'bozz', 'zzif': r'buzz'}
}
extra = {'structured_data': sd}
i = 1
logger.debug('Message %d', i, extra=extra)
如何將日誌記錄器視為輸出流¶
有時,您需要與需要寫入檔案類物件的第三方 API 進行介面,但您希望將 API 的輸出定向到日誌記錄器。您可以使用一個將日誌記錄器封裝在檔案類 API 中的類來實現此目的。以下是一個簡短的指令碼,演示了這樣一個類:
import logging
class LoggerWriter:
def __init__(self, logger, level):
self.logger = logger
self.level = level
def write(self, message):
if message != '\n': # avoid printing bare newlines, if you like
self.logger.log(self.level, message)
def flush(self):
# doesn't actually do anything, but might be expected of a file-like
# object - so optional depending on your situation
pass
def close(self):
# doesn't actually do anything, but might be expected of a file-like
# object - so optional depending on your situation. You might want
# to set a flag so that later calls to write raise an exception
pass
def main():
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('demo')
info_fp = LoggerWriter(logger, logging.INFO)
debug_fp = LoggerWriter(logger, logging.DEBUG)
print('An INFO message', file=info_fp)
print('A DEBUG message', file=debug_fp)
if __name__ == "__main__":
main()
執行此指令碼時,它會列印:
INFO:demo:An INFO message
DEBUG:demo:A DEBUG message
您還可以使用 LoggerWriter
來重定向 sys.stdout
和 sys.stderr
,方法如下:
import sys
sys.stdout = LoggerWriter(logger, logging.INFO)
sys.stderr = LoggerWriter(logger, logging.WARNING)
您應該在根據需要配置日誌記錄 之後 執行此操作。在上面的示例中,basicConfig()
呼叫執行此操作(使用 sys.stderr
值,在其被 LoggerWriter
例項覆蓋之前)。然後,您將得到如下結果:
>>> print('Foo')
INFO:demo:Foo
>>> print('Bar', file=sys.stderr)
WARNING:demo:Bar
>>>
當然,上面的示例顯示了根據 basicConfig()
使用的格式的輸出,但您可以在配置日誌記錄時使用不同的格式化程式。
請注意,使用上述方案,您在某種程度上受制於正在攔截的緩衝和寫入呼叫序列。例如,使用上面 LoggerWriter
的定義,如果您有以下程式碼片段:
sys.stderr = LoggerWriter(logger, logging.WARNING)
1 / 0
然後執行指令碼的結果是:
WARNING:demo:Traceback (most recent call last):
WARNING:demo: File "/home/runner/cookbook-loggerwriter/test.py", line 53, in <module>
WARNING:demo:
WARNING:demo:main()
WARNING:demo: File "/home/runner/cookbook-loggerwriter/test.py", line 49, in main
WARNING:demo:
WARNING:demo:1 / 0
WARNING:demo:ZeroDivisionError
WARNING:demo::
WARNING:demo:division by zero
如您所見,此輸出並不理想。這是因為寫入 sys.stderr
的底層程式碼進行了多次寫入,每次寫入都會產生一個單獨的日誌行(例如,上面的最後三行)。為了解決這個問題,您需要緩衝內容,並且只在看到換行符時才輸出日誌行。讓我們使用一個稍微更好的 LoggerWriter
實現:
class BufferingLoggerWriter(LoggerWriter):
def __init__(self, logger, level):
super().__init__(logger, level)
self.buffer = ''
def write(self, message):
if '\n' not in message:
self.buffer += message
else:
parts = message.split('\n')
if self.buffer:
s = self.buffer + parts.pop(0)
self.logger.log(self.level, s)
self.buffer = parts.pop()
for part in parts:
self.logger.log(self.level, part)
這只是將內容緩衝起來直到看到換行符,然後記錄完整的行。使用這種方法,您可以獲得更好的輸出:
WARNING:demo:Traceback (most recent call last):
WARNING:demo: File "/home/runner/cookbook-loggerwriter/main.py", line 55, in <module>
WARNING:demo: main()
WARNING:demo: File "/home/runner/cookbook-loggerwriter/main.py", line 52, in main
WARNING:demo: 1/0
WARNING:demo:ZeroDivisionError: division by zero
如何在日誌輸出中統一處理換行符¶
通常,記錄的訊息(例如到控制檯或檔案)由單行文字組成。但是,有時需要處理多行訊息——無論是由於日誌格式字串包含換行符,還是記錄的資料包含換行符。如果您想統一處理此類訊息,以便記錄訊息中的每一行都像單獨記錄一樣統一格式化,您可以使用處理程式 mixin 來實現,如下面的程式碼片段所示:
# Assume this is in a module mymixins.py
import copy
class MultilineMixin:
def emit(self, record):
s = record.getMessage()
if '\n' not in s:
super().emit(record)
else:
lines = s.splitlines()
rec = copy.copy(record)
rec.args = None
for line in lines:
rec.msg = line
super().emit(rec)
您可以按如下指令碼使用 mixin:
import logging
from mymixins import MultilineMixin
logger = logging.getLogger(__name__)
class StreamHandler(MultilineMixin, logging.StreamHandler):
pass
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-9s %(message)s',
handlers = [StreamHandler()])
logger.debug('Single line')
logger.debug('Multiple lines:\nfool me once ...')
logger.debug('Another single line')
logger.debug('Multiple lines:\n%s', 'fool me ...\ncan\'t get fooled again')
該指令碼執行時,會打印出類似以下內容:
2025-07-02 13:54:47,234 DEBUG Single line
2025-07-02 13:54:47,234 DEBUG Multiple lines:
2025-07-02 13:54:47,234 DEBUG fool me once ...
2025-07-02 13:54:47,234 DEBUG Another single line
2025-07-02 13:54:47,234 DEBUG Multiple lines:
2025-07-02 13:54:47,234 DEBUG fool me ...
2025-07-02 13:54:47,234 DEBUG can't get fooled again
另一方面,如果您擔心 日誌注入,您可以使用一個轉義換行符的格式化程式,如下例所示:
import logging
logger = logging.getLogger(__name__)
class EscapingFormatter(logging.Formatter):
def format(self, record):
s = super().format(record)
return s.replace('\n', r'\n')
if __name__ == '__main__':
h = logging.StreamHandler()
h.setFormatter(EscapingFormatter('%(asctime)s %(levelname)-9s %(message)s'))
logging.basicConfig(level=logging.DEBUG, handlers = [h])
logger.debug('Single line')
logger.debug('Multiple lines:\nfool me once ...')
logger.debug('Another single line')
logger.debug('Multiple lines:\n%s', 'fool me ...\ncan\'t get fooled again')
當然,您可以使用任何對您來說最有意義的轉義方案。該指令碼執行後,應該會產生如下輸出:
2025-07-09 06:47:33,783 DEBUG Single line
2025-07-09 06:47:33,783 DEBUG Multiple lines:\nfool me once ...
2025-07-09 06:47:33,783 DEBUG Another single line
2025-07-09 06:47:33,783 DEBUG Multiple lines:\nfool me ...\ncan't get fooled again
轉義行為不能作為標準庫的預設行為,因為它會破壞向後相容性。
需要避免的模式¶
儘管前面的章節描述了您可能需要做或處理事情的方法,但值得一提的是一些 無益 的使用模式,因此在大多數情況下應避免這些模式。以下章節沒有特定順序。
多次開啟同一個日誌檔案¶
在 Windows 上,您通常無法多次開啟同一個檔案,因為這會導致“檔案被其他程序使用”錯誤。然而,在 POSIX 平臺上,如果您多次開啟同一個檔案,則不會收到任何錯誤。這可能是意外發生的,例如:
多次新增引用同一檔案的檔案處理程式(例如,由於複製/貼上/忘記更改錯誤)。
開啟兩個看起來不同的檔案,因為它們有不同的名稱,但由於一個是另一個的符號連結而實際上是同一個檔案。
派生一個程序後,父程序和子程序都引用同一個檔案。這可能透過使用
multiprocessing
模組來實現。
多次開啟檔案可能 看起來 大部分時間都能正常工作,但在實踐中會導致許多問題:
日誌輸出可能會混亂,因為多個執行緒或程序嘗試寫入同一個檔案。儘管日誌記錄會防止多個執行緒併發使用同一個處理程式例項,但如果兩個不同的執行緒使用兩個不同的處理程式例項(恰好指向同一個檔案)併發嘗試寫入,則沒有這種保護。
刪除檔案的嘗試(例如在檔案輪轉期間)靜默失敗,因為有另一個引用指向它。這可能導致混亂和浪費除錯時間——日誌條目最終出現在意想不到的位置,或完全丟失。或者一個本應移動的檔案仍然保留在原位,並且儘管 supposedly 有基於大小的輪轉,但其大小意外增長。
使用 從多個程序記錄到單個檔案 中概述的技術來規避此類問題。
將日誌記錄器用作類中的屬性或將其作為引數傳遞¶
雖然在某些不尋常的情況下可能需要這樣做,但一般來說沒有意義,因為日誌記錄器是單例。程式碼始終可以透過名稱使用 logging.getLogger(name)
訪問給定的日誌記錄器例項,因此傳遞例項並將其作為例項屬性儲存是毫無意義的。請注意,在其他語言(如 Java 和 C#)中,日誌記錄器通常是靜態類屬性。但是,這種模式在 Python 中沒有意義,因為模組(而不是類)是軟體分解的單位。
向庫中的日誌記錄器新增除 NullHandler
之外的處理程式¶
配置日誌記錄(透過新增處理程式、格式化程式和過濾器)是應用程式開發者的責任,而不是庫開發者的責任。如果您正在維護一個庫,請確保除了 NullHandler
例項之外,不要向您的任何日誌記錄器新增處理程式。
建立大量日誌記錄器¶
日誌記錄器是單例,在指令碼執行期間永不釋放,因此建立大量日誌記錄器將耗盡無法釋放的記憶體。與其為每個(例如)處理的檔案或建立的網路連線建立一個日誌記錄器,不如使用 現有機制 將上下文資訊傳遞到您的日誌中,並將建立的日誌記錄器限制為描述應用程式內區域的日誌記錄器(通常是模組,但偶爾會更細粒度)。
其他資源¶
參見
- 模組
logging
logging 模組的 API 參考。
- 模組
logging.config
日誌模組的配置 API。
- 模組
logging.handlers
日誌模組附帶的有用的處理器。