日誌記錄食譜¶
- 作者:
Vinay Sajip <vinay_sajip at red-dove dot com>
本頁包含許多與日誌記錄相關的配方,這些配方在過去被發現很有用。有關教程和參考資訊的連結,請參閱其他資源。
在多個模組中使用日誌記錄¶
多次呼叫 logging.getLogger('someLogger')
返回對同一記錄器物件的引用。這不僅在同一模組中成立,而且在同一 Python 直譯器程序中的不同模組之間也成立。對於對同一物件的引用來說是這樣;此外,應用程式程式碼可以在一個模組中定義和配置父記錄器,並在單獨的模組中建立(但不配置)子記錄器,並且所有對子記錄器的記錄器呼叫都會傳遞給父記錄器。這是一個主模組
import logging
import auxiliary_module
# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)
logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')
這是輔助模組
import logging
# create logger
module_logger = logging.getLogger('spam_application.auxiliary')
class Auxiliary:
def __init__(self):
self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
self.logger.info('creating an instance of Auxiliary')
def do_something(self):
self.logger.info('doing something')
a = 1 + 1
self.logger.info('done doing something')
def some_function():
module_logger.info('received a call to "some_function"')
輸出如下所示
2005-03-23 23:47:11,663 - spam_application - INFO -
creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
done with auxiliary_module.some_function()
從多個執行緒記錄日誌¶
從多個執行緒記錄日誌不需要特殊的操作。以下示例顯示了從主(初始)執行緒和另一個執行緒記錄日誌
import logging
import threading
import time
def worker(arg):
while not arg['stop']:
logging.debug('Hi from myfunc')
time.sleep(0.5)
def main():
logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
info = {'stop': False}
thread = threading.Thread(target=worker, args=(info,))
thread.start()
while True:
try:
logging.debug('Hello from main')
time.sleep(0.75)
except KeyboardInterrupt:
info['stop'] = True
break
thread.join()
if __name__ == '__main__':
main()
執行時,該指令碼應列印如下內容
0 Thread-1 Hi from myfunc
3 MainThread Hello from main
505 Thread-1 Hi from myfunc
755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc
這顯示了日誌輸出像人們預期的那樣交錯出現。當然,這種方法適用於比此處顯示的更多執行緒。
多個處理器和格式器¶
記錄器是普通的 Python 物件。addHandler()
方法對於可以新增的處理器數量沒有最小或最大配額。有時,應用程式將所有嚴重性級別的訊息記錄到文字檔案,同時將錯誤或更高級別的訊息記錄到控制檯,這將很有益處。要設定此項,只需配置相應的處理器。應用程式程式碼中的日誌記錄呼叫將保持不變。以下是對先前基於簡單模組的配置示例的略微修改
import logging
logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)
# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
請注意,“應用程式”程式碼不關心多個處理器。所有更改只是新增和配置了一個名為 fh 的新處理器。
在編寫和測試應用程式時,建立具有更高或更低嚴重性過濾器的新處理器非常有幫助。與其使用許多 print
語句進行除錯,不如使用 logger.debug
:與您稍後必須刪除或註釋掉的 print 語句不同,logger.debug 語句可以保留在原始碼中,並在您再次需要它們之前保持休眠狀態。那時,唯一需要發生的更改是修改記錄器和/或處理器的嚴重性級別為除錯。
記錄到多個目標¶
假設您想要以不同的訊息格式並在不同的情況下記錄到控制檯和檔案。假設您想要將級別為 DEBUG 和更高級別的訊息記錄到檔案,並將級別為 INFO 和更高級別的訊息記錄到控制檯。我們還假設該檔案應包含時間戳,但控制檯訊息不應包含。以下是如何實現此目的
import logging
# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M',
filename='/tmp/myapp.log',
filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)
# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')
# Now, define a couple of other loggers which might represent areas in your
# application:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
當您執行它時,在控制檯上您將看到
root : INFO Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO How quickly daft jumping zebras vex.
myapp.area2 : WARNING Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR The five boxing wizards jump quickly.
並且在檔案中您將看到類似如下的內容
10-22 22:19 root INFO Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1 INFO How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2 ERROR The five boxing wizards jump quickly.
如您所見,DEBUG 訊息僅顯示在檔案中。其他訊息將傳送到兩個目標。
此示例使用控制檯和檔案處理器,但您可以根據需要使用任意數量和組合的處理器。
請注意,上面選擇的日誌檔名 /tmp/myapp.log
意味著在 POSIX 系統上使用臨時檔案的標準位置。在 Windows 上,您可能需要為日誌選擇不同的目錄名稱 - 只需確保該目錄存在並且您具有在其中建立和更新檔案的許可權。
自定義級別的處理¶
有時,您可能希望執行與處理器中級別的標準處理略有不同的操作,其中閾值以上的所有級別都由處理器處理。為此,您需要使用過濾器。讓我們看一個您希望按如下方式安排事情的場景
將嚴重性為
INFO
和WARNING
的訊息傳送到sys.stdout
將嚴重性為
ERROR
及以上的訊息傳送到sys.stderr
將嚴重性為
DEBUG
及以上的訊息傳送到檔案app.log
假設您使用以下 JSON 配置日誌記錄
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"simple": {
"format": "%(levelname)-8s - %(message)s"
}
},
"handlers": {
"stdout": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout"
},
"stderr": {
"class": "logging.StreamHandler",
"level": "ERROR",
"formatter": "simple",
"stream": "ext://sys.stderr"
},
"file": {
"class": "logging.FileHandler",
"formatter": "simple",
"filename": "app.log",
"mode": "w"
}
},
"root": {
"level": "DEBUG",
"handlers": [
"stderr",
"stdout",
"file"
]
}
}
此配置幾乎達到了我們的要求,只是 sys.stdout
將顯示嚴重性為 ERROR
的訊息,並且只有此嚴重性及更高的事件以及 INFO
和 WARNING
訊息才會被跟蹤。為防止這種情況,我們可以設定一個過濾器來排除這些訊息並將其新增到相關的處理器。可以透過新增一個與 formatters
和 handlers
並行的 filters
部分來配置
{
"filters": {
"warnings_and_below": {
"()" : "__main__.filter_maker",
"level": "WARNING"
}
}
}
並將 stdout
處理器上的部分更改為新增它
{
"stdout": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
"filters": ["warnings_and_below"]
}
}
過濾器只是一個函式,因此我們可以將 filter_maker
(一個工廠函式)定義如下
def filter_maker(level):
level = getattr(logging, level)
def filter(record):
return record.levelno <= level
return filter
這會將傳入的字串引數轉換為數值級別,並返回一個函式,該函式僅在傳入記錄的級別等於或低於指定的級別時才返回 True
。請注意,在此示例中,我已在從命令列執行的測試指令碼 main.py
中定義了 filter_maker
,因此其模組將為 __main__
- 因此在過濾器配置中為 __main__.filter_maker
。如果您在其他模組中定義它,則需要更改它。
新增過濾器後,我們可以執行 main.py
,其完整內容如下
import json
import logging
import logging.config
CONFIG = '''
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"simple": {
"format": "%(levelname)-8s - %(message)s"
}
},
"filters": {
"warnings_and_below": {
"()" : "__main__.filter_maker",
"level": "WARNING"
}
},
"handlers": {
"stdout": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
"filters": ["warnings_and_below"]
},
"stderr": {
"class": "logging.StreamHandler",
"level": "ERROR",
"formatter": "simple",
"stream": "ext://sys.stderr"
},
"file": {
"class": "logging.FileHandler",
"formatter": "simple",
"filename": "app.log",
"mode": "w"
}
},
"root": {
"level": "DEBUG",
"handlers": [
"stderr",
"stdout",
"file"
]
}
}
'''
def filter_maker(level):
level = getattr(logging, level)
def filter(record):
return record.levelno <= level
return filter
logging.config.dictConfig(json.loads(CONFIG))
logging.debug('A DEBUG message')
logging.info('An INFO message')
logging.warning('A WARNING message')
logging.error('An ERROR message')
logging.critical('A CRITICAL message')
像這樣執行之後
python main.py 2>stderr.log >stdout.log
我們可以看到結果符合預期
$ more *.log
::::::::::::::
app.log
::::::::::::::
DEBUG - A DEBUG message
INFO - An INFO message
WARNING - A WARNING message
ERROR - An ERROR message
CRITICAL - A CRITICAL message
::::::::::::::
stderr.log
::::::::::::::
ERROR - An ERROR message
CRITICAL - A CRITICAL message
::::::::::::::
stdout.log
::::::::::::::
INFO - An INFO message
WARNING - A WARNING message
配置伺服器示例¶
這是一個使用日誌配置伺服器的模組示例
import logging
import logging.config
import time
import os
# read initial config file
logging.config.fileConfig('logging.conf')
# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()
logger = logging.getLogger('simpleExample')
try:
# loop through logging calls to see the difference
# new configurations make, until Ctrl+C is pressed
while True:
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
time.sleep(5)
except KeyboardInterrupt:
# cleanup
logging.config.stopListening()
t.join()
這是一個指令碼,它接受一個檔名,並將該檔案傳送到伺服器,並在前面正確加上二進位制編碼的長度,作為新的日誌配置
#!/usr/bin/env python
import socket, sys, struct
with open(sys.argv[1], 'rb') as f:
data_to_send = f.read()
HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')
處理阻塞的處理器¶
有時,您必須讓日誌處理器在不阻塞您正在記錄的執行緒的情況下完成它們的工作。這在 Web 應用程式中很常見,當然在其他場景中也會發生。
一個常見的導致行為遲緩的罪魁禍首是 SMTPHandler
:傳送電子郵件可能需要很長時間,這有很多原因,且不受開發人員控制(例如,效能不佳的郵件或網路基礎設施)。但是幾乎任何基於網路的處理器都可能阻塞:即使是 SocketHandler
操作也可能在後臺執行 DNS 查詢,速度太慢(並且此查詢可能位於套接字型檔程式碼的深處,低於 Python 層,且不受您的控制)。
一種解決方案是使用兩部分方法。對於第一部分,僅將 QueueHandler
附加到從效能關鍵執行緒訪問的那些記錄器。它們只是寫入它們的佇列,該佇列可以調整為足夠大的容量,或者初始化為對其大小沒有上限。寫入佇列通常會被快速接受,儘管您可能需要捕獲 queue.Full
異常,以在您的程式碼中採取預防措施。如果您是庫開發人員,並且在您的程式碼中具有效能關鍵執行緒,請務必記錄這一點(以及僅將 QueueHandlers
附加到您的記錄器的建議),以方便使用您的程式碼的其他開發人員。
解決方案的第二部分是 QueueListener
,它被設計為 QueueHandler
的對應項。QueueListener
非常簡單:它會傳遞一個佇列和一些處理器,並且它會啟動一個內部執行緒,該執行緒會監聽其佇列中從 QueueHandlers
(或任何其他 LogRecords
的來源)傳送的 LogRecords。 LogRecords
從佇列中刪除並傳遞給處理器進行處理。
擁有單獨的 QueueListener
類的優點是,您可以使用相同的例項來服務多個 QueueHandlers
。這比擁有現有處理器類的執行緒版本更節省資源,因為每個處理器會佔用一個執行緒,而沒有任何特殊好處。
以下是使用這兩個類的示例(省略了匯入)
que = queue.Queue(-1) # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()
執行時,將產生
MainThread: Look out!
注意
儘管之前的討論並非專門針對非同步程式碼,而是針對慢速日誌處理器,但應注意,當從非同步程式碼記錄時,網路甚至檔案處理器都可能導致問題(阻塞事件迴圈),因為某些日誌記錄是從 asyncio
內部完成的。如果應用程式中使用了任何非同步程式碼,最好使用上述方法進行日誌記錄,以便任何阻塞程式碼僅在 QueueListener
執行緒中執行。
在 3.5 版本中更改:在 Python 3.5 之前,QueueListener
始終將從佇列收到的每條訊息傳遞給它初始化的每個處理器。(這是因為假設級別過濾都在另一側完成,即填充佇列的位置。)從 3.5 開始,可以透過將關鍵字引數 respect_handler_level=True
傳遞給偵聽器的建構函式來更改此行為。完成此操作後,偵聽器會將每條訊息的級別與處理器的級別進行比較,並且僅在適合的情況下才將訊息傳遞給處理器。
跨網路傳送和接收日誌事件¶
假設您要跨網路傳送日誌事件,並在接收端處理它們。一種簡單的方法是在傳送端將 SocketHandler
例項附加到根記錄器
import logging, logging.handlers
rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)
# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')
# Now, define a couple of other loggers which might represent areas in your
# application:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
在接收端,您可以使用 socketserver
模組設定接收器。這是一個基本的工作示例
import pickle
import logging
import logging.handlers
import socketserver
import struct
class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""Handler for a streaming logging request.
This basically logs the record using whatever logging policy is
configured locally.
"""
def handle(self):
"""
Handle multiple requests - each expected to be a 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally.
"""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack('>L', chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
def unPickle(self, data):
return pickle.loads(data)
def handleLogRecord(self, record):
# if a name is specified, we use the named logger rather than the one
# implied by the record.
if self.server.logname is not None:
name = self.server.logname
else:
name = record.name
logger = logging.getLogger(name)
# N.B. EVERY record gets logged. This is because Logger.handle
# is normally called AFTER logger-level filtering. If you want
# to do filtering, do it at the client end to save wasting
# cycles and network bandwidth!
logger.handle(record)
class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
"""
Simple TCP socket-based logging receiver suitable for testing.
"""
allow_reuse_address = True
def __init__(self, host='localhost',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
self.abort = 0
self.timeout = 1
self.logname = None
def serve_until_stopped(self):
import select
abort = 0
while not abort:
rd, wr, ex = select.select([self.socket.fileno()],
[], [],
self.timeout)
if rd:
self.handle_request()
abort = self.abort
def main():
logging.basicConfig(
format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
tcpserver = LogRecordSocketReceiver()
print('About to start TCP server...')
tcpserver.serve_until_stopped()
if __name__ == '__main__':
main()
首先執行伺服器,然後執行客戶端。在客戶端,控制檯上不列印任何內容;在伺服器端,您應該看到類似以下內容
About to start TCP server...
59 root INFO Jackdaws love my big sphinx of quartz.
59 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
69 myapp.area1 INFO How quickly daft jumping zebras vex.
69 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
69 myapp.area2 ERROR The five boxing wizards jump quickly.
請注意,在某些情況下,pickle 存在一些安全問題。如果這些問題影響到您,您可以透過覆蓋 makePickle()
方法並在其中實現您的替代方案,以及調整上述指令碼以使用您的替代序列化方案,來使用替代的序列化方案。
在生產環境中執行日誌套接字偵聽器¶
要在生產環境中執行日誌偵聽器,您可能需要使用程序管理工具,例如 Supervisor。這是一個 Gist,它提供了使用 Supervisor 執行上述功能的基本檔案。它由以下檔案組成
檔案 |
目的 |
---|---|
|
用於準備測試環境的 Bash 指令碼 |
|
Supervisor 配置檔案,其中包含偵聽器和多程序 Web 應用程式的條目 |
|
一個 Bash 指令碼,用於確保 Supervisor 以上述配置執行 |
|
接收日誌事件並將其記錄到檔案的套接字偵聽器程式 |
|
透過連線到偵聽器的套接字執行日誌記錄的簡單 Web 應用程式 |
|
Web 應用程式的 JSON 配置檔案 |
|
用於練習 Web 應用程式的 Python 指令碼 |
Web 應用程式使用 Gunicorn,這是一個流行的 Web 應用程式伺服器,它啟動多個工作程序來處理請求。此示例設定顯示了工作程序如何寫入同一日誌檔案而不會相互衝突——它們都透過套接字偵聽器進行處理。
要測試這些檔案,請在 POSIX 環境中執行以下操作
使用 下載 ZIP 按鈕將 Gist 下載為 ZIP 存檔。
將上述檔案從存檔解壓到一個臨時目錄。
在臨時目錄中,執行
bash prepare.sh
以準備就緒。這將建立一個run
子目錄以包含 Supervisor 相關的檔案和日誌檔案,以及一個venv
子目錄以包含一個虛擬環境,其中安裝了bottle
、gunicorn
和supervisor
。執行
bash ensure_app.sh
以確保 Supervisor 以上述配置執行。執行
venv/bin/python client.py
以練習 Web 應用程式,這將導致記錄寫入日誌。檢查
run
子目錄中的日誌檔案。您應該在與模式app.log*
匹配的檔案中看到最新的日誌行。它們不會按任何特定順序排列,因為它們是由不同的工作程序以不確定的方式同時處理的。您可以透過執行
venv/bin/supervisorctl -c supervisor.conf shutdown
來關閉偵聽器和 Web 應用程式。
在不太可能的情況下,如果配置的埠與測試環境中的其他埠衝突,您可能需要調整配置檔案。
向日志輸出新增上下文資訊¶
有時,您希望日誌輸出除了傳遞給日誌呼叫的引數之外,還包含上下文資訊。例如,在網路應用程式中,可能需要在日誌中記錄特定於客戶端的資訊(例如,遠端客戶端的使用者名稱或 IP 地址)。雖然您可以使用 extra 引數來實現這一點,但以這種方式傳遞資訊並不總是很方便。雖然在每個連線的基礎上建立 Logger
例項可能很誘人,但這並不是一個好主意,因為這些例項不會被垃圾回收。雖然這在實踐中不是問題,但當 Logger
例項的數量取決於您希望在應用程式日誌中使用的粒度級別時,如果 Logger
例項的數量變得實際上無界,則可能難以管理。
使用 LoggerAdapters 傳遞上下文資訊¶
一種可以輕鬆地傳遞上下文資訊以與日誌事件資訊一起輸出的方法是使用 LoggerAdapter
類。這個類被設計得看起來像一個 Logger
,因此您可以呼叫 debug()
、info()
、warning()
、error()
、exception()
、critical()
和 log()
。這些方法的簽名與其在 Logger
中的對應方法相同,因此您可以互換使用這兩種型別的例項。
當您建立 LoggerAdapter
的例項時,您將向其傳遞一個 Logger
例項和一個包含您的上下文資訊的類似字典的物件。當您在 LoggerAdapter
的例項上呼叫其中一個日誌記錄方法時,它會將呼叫委託給傳遞給其建構函式的底層 Logger
例項,並安排在委託呼叫中傳遞上下文資訊。以下是 LoggerAdapter
程式碼中的片段
def debug(self, msg, /, *args, **kwargs):
"""
Delegate a debug call to the underlying logger, after adding
contextual information from this adapter instance.
"""
msg, kwargs = self.process(msg, kwargs)
self.logger.debug(msg, *args, **kwargs)
process()
方法是 LoggerAdapter
將上下文資訊新增到日誌輸出的地方。它會傳遞日誌呼叫的訊息和關鍵字引數,並返回(可能)修改後的版本,以用於呼叫底層日誌記錄器。此方法的預設實現會保持訊息不變,但在關鍵字引數中插入一個“extra”鍵,其值是傳遞給建構函式的類字典物件。當然,如果您在呼叫介面卡時傳遞了“extra”關鍵字引數,它將被靜默覆蓋。
使用“extra”的優點是,類字典物件中的值會合併到 LogRecord
例項的 __dict__ 中,允許您將自定義字串與瞭解類字典物件鍵的 Formatter
例項一起使用。如果您需要其他方法,例如,如果您想將上下文資訊新增到訊息字串之前或之後,您只需要子類化 LoggerAdapter
並覆蓋 process()
以執行您需要的操作。這是一個簡單的例子
class CustomAdapter(logging.LoggerAdapter):
"""
This example adapter expects the passed in dict-like object to have a
'connid' key, whose value in brackets is prepended to the log message.
"""
def process(self, msg, kwargs):
return '[%s] %s' % (self.extra['connid'], msg), kwargs
您可以像這樣使用它
logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})
然後,您記錄到介面卡的任何事件都將在日誌訊息前面加上 some_conn_id
的值。
使用字典以外的物件傳遞上下文資訊¶
您不需要將實際的字典傳遞給 LoggerAdapter
- 您可以傳遞一個實現了 __getitem__
和 __iter__
的類的例項,以便它看起來像一個日誌記錄的字典。如果您想動態生成值(而字典中的值是常量),這將非常有用。
使用過濾器傳遞上下文資訊¶
您還可以使用使用者定義的 Filter
將上下文資訊新增到日誌輸出中。Filter
例項允許修改傳遞給它們的 LogRecords
,包括新增額外的屬性,然後可以使用合適的格式字串輸出這些屬性,或者如果需要,可以使用自定義的 Formatter
。
例如,在 Web 應用程式中,正在處理的請求(或至少是它的有趣部分)可以儲存線上程本地 (threading.local
) 變數中,然後從 Filter
中訪問,以將請求中的資訊(例如,遠端 IP 地址和遠端使用者的使用者名稱)新增到 LogRecord
,使用屬性名稱“ip”和“user”,就像上面的 LoggerAdapter
示例一樣。在這種情況下,可以使用相同的格式字串來獲得與上面顯示的類似的輸出。這是一個示例指令碼
import logging
from random import choice
class ContextFilter(logging.Filter):
"""
This is a filter which injects contextual information into the log.
Rather than use actual contextual information, we just use random
data in this demo.
"""
USERS = ['jim', 'fred', 'sheila']
IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']
def filter(self, record):
record.ip = choice(ContextFilter.IPS)
record.user = choice(ContextFilter.USERS)
return True
if __name__ == '__main__':
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
a1 = logging.getLogger('a.b.c')
a2 = logging.getLogger('d.e.f')
f = ContextFilter()
a1.addFilter(f)
a2.addFilter(f)
a1.debug('A debug message')
a1.info('An info message with %s', 'some parameters')
for x in range(10):
lvl = choice(levels)
lvlname = logging.getLevelName(lvl)
a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')
執行時,會產生如下內容
2010-09-06 22:38:15,292 a.b.c DEBUG IP: 123.231.231.123 User: fred A debug message
2010-09-06 22:38:15,300 a.b.c INFO IP: 192.168.0.1 User: sheila An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 127.0.0.1 User: jim A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 127.0.0.1 User: sheila A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 123.231.231.123 User: fred A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1 User: jim A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 192.168.0.1 User: jim A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR IP: 127.0.0.1 User: sheila A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG IP: 123.231.231.123 User: fred A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO IP: 123.231.231.123 User: fred A message at INFO level with 2 parameters
使用 contextvars
¶
自 Python 3.7 起,contextvars
模組提供了上下文字地儲存,它適用於 threading
和 asyncio
處理需求。因此,這種型別的儲存通常比執行緒本地更可取。以下示例展示瞭如何在多執行緒環境中,使用上下文資訊(例如,Web 應用程式處理的請求屬性)填充日誌。
為了便於說明,假設您有不同的 Web 應用程式,每個應用程式都是獨立的,但在同一個 Python 程序中執行,並使用它們通用的庫。如何讓這些應用程式中的每一個都有自己的日誌,其中來自庫(和其他請求處理程式碼)的所有日誌訊息都被定向到相應應用程式的日誌檔案,同時在日誌中包含其他上下文資訊,例如客戶端 IP、HTTP 請求方法和客戶端使用者名稱?
讓我們假設該庫可以透過以下程式碼模擬
# webapplib.py
import logging
import time
logger = logging.getLogger(__name__)
def useful():
# Just a representative event logged from the library
logger.debug('Hello from webapplib!')
# Just sleep for a bit so other threads get to run
time.sleep(0.01)
我們可以透過兩個簡單的類 Request
和 WebApp
來模擬多個 Web 應用程式。這些模擬了實際的執行緒 Web 應用程式的工作方式 - 每個請求都由一個執行緒處理
# main.py
import argparse
from contextvars import ContextVar
import logging
import os
from random import choice
import threading
import webapplib
logger = logging.getLogger(__name__)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
class Request:
"""
A simple dummy request class which just holds dummy HTTP request method,
client IP address and client username
"""
def __init__(self, method, ip, user):
self.method = method
self.ip = ip
self.user = user
# A dummy set of requests which will be used in the simulation - we'll just pick
# from this list randomly. Note that all GET requests are from 192.168.2.XXX
# addresses, whereas POST requests are from 192.16.3.XXX addresses. Three users
# are represented in the sample requests.
REQUESTS = [
Request('GET', '192.168.2.20', 'jim'),
Request('POST', '192.168.3.20', 'fred'),
Request('GET', '192.168.2.21', 'sheila'),
Request('POST', '192.168.3.21', 'jim'),
Request('GET', '192.168.2.22', 'fred'),
Request('POST', '192.168.3.22', 'sheila'),
]
# Note that the format string includes references to request context information
# such as HTTP method, client IP and username
formatter = logging.Formatter('%(threadName)-11s %(appName)s %(name)-9s %(user)-6s %(ip)s %(method)-4s %(message)s')
# Create our context variables. These will be filled at the start of request
# processing, and used in the logging that happens during that processing
ctx_request = ContextVar('request')
ctx_appname = ContextVar('appname')
class InjectingFilter(logging.Filter):
"""
A filter which injects context-specific information into logs and ensures
that only information for a specific webapp is included in its log
"""
def __init__(self, app):
self.app = app
def filter(self, record):
request = ctx_request.get()
record.method = request.method
record.ip = request.ip
record.user = request.user
record.appName = appName = ctx_appname.get()
return appName == self.app.name
class WebApp:
"""
A dummy web application class which has its own handler and filter for a
webapp-specific log.
"""
def __init__(self, name):
self.name = name
handler = logging.FileHandler(name + '.log', 'w')
f = InjectingFilter(self)
handler.setFormatter(formatter)
handler.addFilter(f)
root.addHandler(handler)
self.num_requests = 0
def process_request(self, request):
"""
This is the dummy method for processing a request. It's called on a
different thread for every request. We store the context information into
the context vars before doing anything else.
"""
ctx_request.set(request)
ctx_appname.set(self.name)
self.num_requests += 1
logger.debug('Request processing started')
webapplib.useful()
logger.debug('Request processing finished')
def main():
fn = os.path.splitext(os.path.basename(__file__))[0]
adhf = argparse.ArgumentDefaultsHelpFormatter
ap = argparse.ArgumentParser(formatter_class=adhf, prog=fn,
description='Simulate a couple of web '
'applications handling some '
'requests, showing how request '
'context can be used to '
'populate logs')
aa = ap.add_argument
aa('--count', '-c', type=int, default=100, help='How many requests to simulate')
options = ap.parse_args()
# Create the dummy webapps and put them in a list which we can use to select
# from randomly
app1 = WebApp('app1')
app2 = WebApp('app2')
apps = [app1, app2]
threads = []
# Add a common handler which will capture all events
handler = logging.FileHandler('app.log', 'w')
handler.setFormatter(formatter)
root.addHandler(handler)
# Generate calls to process requests
for i in range(options.count):
try:
# Pick an app at random and a request for it to process
app = choice(apps)
request = choice(REQUESTS)
# Process the request in its own thread
t = threading.Thread(target=app.process_request, args=(request,))
threads.append(t)
t.start()
except KeyboardInterrupt:
break
# Wait for the threads to terminate
for t in threads:
t.join()
for app in apps:
print('%s processed %s requests' % (app.name, app.num_requests))
if __name__ == '__main__':
main()
如果您執行以上程式碼,您應該會發現大約一半的請求進入 app1.log
,其餘的請求進入 app2.log
,並且所有請求都記錄到 app.log
。每個 Web 應用程式特定的日誌將僅包含該 Web 應用程式的日誌條目,並且請求資訊將一致地顯示在日誌中(即,每個虛擬請求中的資訊將始終一起顯示在日誌行中)。以下 shell 輸出說明了這一點
~/logging-contextual-webapp$ python main.py
app1 processed 51 requests
app2 processed 49 requests
~/logging-contextual-webapp$ wc -l *.log
153 app1.log
147 app2.log
300 app.log
600 total
~/logging-contextual-webapp$ head -3 app1.log
Thread-3 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
Thread-3 (process_request) app1 webapplib jim 192.168.3.21 POST Hello from webapplib!
Thread-5 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
~/logging-contextual-webapp$ head -3 app2.log
Thread-1 (process_request) app2 __main__ sheila 192.168.2.21 GET Request processing started
Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET Hello from webapplib!
Thread-2 (process_request) app2 __main__ jim 192.168.2.20 GET Request processing started
~/logging-contextual-webapp$ head app.log
Thread-1 (process_request) app2 __main__ sheila 192.168.2.21 GET Request processing started
Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET Hello from webapplib!
Thread-2 (process_request) app2 __main__ jim 192.168.2.20 GET Request processing started
Thread-3 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
Thread-2 (process_request) app2 webapplib jim 192.168.2.20 GET Hello from webapplib!
Thread-3 (process_request) app1 webapplib jim 192.168.3.21 POST Hello from webapplib!
Thread-4 (process_request) app2 __main__ fred 192.168.2.22 GET Request processing started
Thread-5 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
Thread-4 (process_request) app2 webapplib fred 192.168.2.22 GET Hello from webapplib!
Thread-6 (process_request) app1 __main__ jim 192.168.3.21 POST Request processing started
~/logging-contextual-webapp$ grep app1 app1.log | wc -l
153
~/logging-contextual-webapp$ grep app2 app2.log | wc -l
147
~/logging-contextual-webapp$ grep app1 app.log | wc -l
153
~/logging-contextual-webapp$ grep app2 app.log | wc -l
147
在處理程式中傳遞上下文資訊¶
每個 Handler
都有自己的過濾器鏈。如果你想向 LogRecord
新增上下文資訊,而不洩露給其他處理器,你可以使用一個過濾器,它返回一個新的 LogRecord
,而不是像以下指令碼中顯示的那樣就地修改它。
import copy
import logging
def filter(record: logging.LogRecord):
record = copy.copy(record)
record.user = 'jim'
return record
if __name__ == '__main__':
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(message)s from %(user)-8s')
handler.setFormatter(formatter)
handler.addFilter(filter)
logger.addHandler(handler)
logger.info('A log message')
從多個程序記錄到單個檔案¶
儘管日誌記錄是執行緒安全的,並且支援從單個程序中的多個執行緒記錄到單個檔案,但不支援從多個程序記錄到單個檔案,因為在 Python 中沒有標準方法可以序列化對多個程序中單個檔案的訪問。如果你需要從多個程序記錄到單個檔案,一種方法是讓所有程序記錄到 SocketHandler
,並讓一個單獨的程序實現一個套接字伺服器,該伺服器從套接字讀取並記錄到檔案。(如果願意,你可以將現有程序中的一個執行緒專用於執行此功能。) 本節 更詳細地記錄了此方法,幷包含一個可用的套接字接收器,你可以將其用作在自己的應用程式中進行適配的起點。
你也可以編寫自己的處理器,它使用 Lock
類,該類來自 multiprocessing
模組,以序列化你的程序對檔案的訪問。stdlib 的 FileHandler
及其子類不使用 multiprocessing
。
或者,你可以使用 Queue
和 QueueHandler
將所有日誌事件傳送到你的多程序應用程式中的一個程序。以下示例指令碼演示瞭如何執行此操作;在該示例中,一個單獨的偵聽器程序偵聽其他程序傳送的事件,並根據其自身的日誌配置記錄它們。雖然該示例僅演示了一種執行方式(例如,你可能希望使用偵聽器執行緒而不是單獨的偵聽器程序——實現方式是類似的),但它允許為偵聽器和應用程式中的其他程序使用完全不同的日誌配置,並且可以用作滿足你自身特定要求的程式碼的基礎。
# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing
# Next two import lines for this demo only
from random import choice, random
import time
#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
root = logging.getLogger()
h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
configurer()
while True:
try:
record = queue.get()
if record is None: # We send this as a sentinel to tell the listener to quit.
break
logger = logging.getLogger(record.name)
logger.handle(record) # No level or filter logic applied - just do it!
except Exception:
import sys, traceback
print('Whoops! Problem:', file=sys.stderr)
traceback.print_exc(file=sys.stderr)
# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL]
LOGGERS = ['a.b.c', 'd.e.f']
MESSAGES = [
'Random message #1',
'Random message #2',
'Random message #3',
]
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
h = logging.handlers.QueueHandler(queue) # Just the one handler needed
root = logging.getLogger()
root.addHandler(h)
# send all messages, for demo; no other level or filter logic applied.
root.setLevel(logging.DEBUG)
# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
configurer(queue)
name = multiprocessing.current_process().name
print('Worker started: %s' % name)
for i in range(10):
time.sleep(random())
logger = logging.getLogger(choice(LOGGERS))
level = choice(LEVELS)
message = choice(MESSAGES)
logger.log(level, message)
print('Worker finished: %s' % name)
# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process,
args=(queue, listener_configurer))
listener.start()
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
queue.put_nowait(None)
listener.join()
if __name__ == '__main__':
main()
上述指令碼的一個變體將日誌記錄保留在主程序中,在一個單獨的執行緒中。
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time
def logger_thread(q):
while True:
record = q.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
def worker_process(q):
qh = logging.handlers.QueueHandler(q)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(qh)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
if __name__ == '__main__':
q = Queue()
d = {
'version': 1,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed',
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed',
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'level': 'ERROR',
'formatter': 'detailed',
},
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console', 'file', 'errors']
},
}
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
workers.append(wp)
wp.start()
logging.config.dictConfig(d)
lp = threading.Thread(target=logger_thread, args=(q,))
lp.start()
# At this point, the main process could do some useful work of its own
# Once it's done that, it can wait for the workers to terminate...
for wp in workers:
wp.join()
# And now tell the logging thread to finish up, too
q.put(None)
lp.join()
此變體顯示瞭如何為特定記錄器應用配置,例如 foo
記錄器具有一個特殊的處理器,該處理器將所有事件儲存在 foo
子系統中的檔案 mplog-foo.log
中。這將由主程序中的日誌記錄機制使用(即使日誌事件是在工作程序中生成的),以將訊息定向到適當的目標位置。
使用 concurrent.futures.ProcessPoolExecutor¶
如果你想使用 concurrent.futures.ProcessPoolExecutor
來啟動你的工作程序,你需要以稍微不同的方式建立佇列。而不是
queue = multiprocessing.Queue(-1)
你應該使用
queue = multiprocessing.Manager().Queue(-1) # also works with the examples above
然後你可以將工作程序的建立從這裡替換
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
到這裡(請記住首先匯入 concurrent.futures
)
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
for i in range(10):
executor.submit(worker_process, queue, worker_configurer)
使用 Gunicorn 和 uWSGI 部署 Web 應用程式¶
當使用 Gunicorn 或 uWSGI(或類似)部署 Web 應用程式時,會建立多個工作程序來處理客戶端請求。在此類環境中,請避免直接在你的 Web 應用程式中建立基於檔案的處理器。相反,請使用 SocketHandler
從 Web 應用程式記錄到單獨程序中的偵聽器。可以使用諸如 Supervisor 之類的程序管理工具進行設定,有關更多詳細資訊,請參閱在生產環境中執行日誌記錄套接字偵聽器。
使用檔案輪換¶
有時你希望讓日誌檔案增長到一定大小,然後開啟一個新檔案並記錄到該檔案。你可能希望保留一定數量的這些檔案,並且當建立了那麼多檔案後,輪換這些檔案,以便檔案的數量和檔案的大小都保持有界。對於此使用模式,日誌記錄包提供了 RotatingFileHandler
import glob
import logging
import logging.handlers
LOG_FILENAME = 'logging_rotatingfile_example.out'
# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)
# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
LOG_FILENAME, maxBytes=20, backupCount=5)
my_logger.addHandler(handler)
# Log some messages
for i in range(20):
my_logger.debug('i = %d' % i)
# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)
for filename in logfiles:
print(filename)
結果應該是 6 個單獨的檔案,每個檔案都包含應用程式的部分日誌歷史記錄。
logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5
當前檔案始終是 logging_rotatingfile_example.out
,並且每次達到大小限制時,都會使用字尾 .1
重新命名它。每個現有的備份檔案都會被重新命名以遞增字尾(.1
變為 .2
等),並且 .6
檔案會被刪除。
顯然,此示例將日誌長度設定得太小,作為一個極端的示例。你可能希望將 maxBytes 設定為適當的值。
使用替代格式化樣式¶
當將日誌記錄新增到 Python 標準庫時,格式化帶有可變內容的訊息的唯一方法是使用 %-格式化方法。此後,Python 獲得了兩種新的格式化方法:string.Template
(在 Python 2.4 中新增)和 str.format()
(在 Python 2.6 中新增)。
日誌記錄(從 3.2 開始)為這兩種額外的格式化樣式提供了改進的支援。Formatter
類已得到增強,可以使用一個名為 style
的附加可選關鍵字引數。此引數預設為 '%'
,但其他可能的值為 '{'
和 '$'
,它們對應於另外兩種格式化樣式。預設情況下(正如你所期望的那樣)保持向後相容性,但透過顯式指定 style 引數,你可以指定使用 str.format()
或 string.Template
的格式字串。這是一個示例控制檯會話,顯示了各種可能性
>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
... style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
... style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>
請注意,最終輸出到日誌的日誌訊息的格式完全獨立於單個日誌訊息的構造方式。它仍然可以使用 %-格式化,如此處所示
>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>
日誌記錄呼叫(logger.debug()
、logger.info()
等)僅將位置引數用於實際的日誌訊息本身,而關鍵字引數僅用於確定如何處理實際日誌記錄呼叫的選項(例如,exc_info
關鍵字引數用於指示應記錄回溯資訊,或者 extra
關鍵字引數用於指示要新增到日誌的其他上下文資訊)。因此,你不能直接使用 str.format()
或 string.Template
語法進行日誌記錄呼叫,因為在內部,日誌記錄包使用 %-格式化來合併格式字串和可變引數。在保留向後相容性的同時,不可能更改此行為,因為現有程式碼中存在的所有日誌記錄呼叫都將使用 %-格式字串。
但是,你可以使用 {}- 和 $- 格式化來構造你的單個日誌訊息。回想一下,對於訊息,你可以使用任意物件作為訊息格式字串,並且日誌記錄包將呼叫該物件的 str()
來獲取實際的格式字串。請考慮以下兩個類
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
這兩個類中的任何一個都可以用來代替格式字串,以允許使用 {}- 或 $-格式化來構建實際的“訊息”部分,該部分將出現在格式化的日誌輸出中,而不是“%(message)s”或“{message}”或“$message”。每次你想記錄某些內容時都使用類名有點笨拙,但如果你使用諸如 __ (雙下劃線——不要與 _ 混淆,_ 是用作 gettext.gettext()
或其同類的同義詞/別名的單下劃線)的別名,則會非常容易接受。
以上類不包含在 Python 中,儘管它們很容易複製並貼上到你自己的程式碼中。它們可以按如下方式使用(假設它們是在一個名為 wherever
的模組中宣告的)
>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
... point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
雖然以上示例使用 print()
來顯示格式化的工作方式,但你當然會使用 logger.debug()
或類似的方法來實際記錄使用此方法。
需要注意的一點是,使用這種方法您不會付出明顯的效能代價:實際的格式化不是在您進行日誌呼叫時發生的,而是在(如果)已記錄的訊息實際即將被處理程式輸出到日誌時發生的。因此,唯一稍微不尋常且可能讓您困惑的是,括號圍繞的是格式化字串和引數,而不僅僅是格式化字串。這是因為 `__` 符號只是對 `XXXMessage` 類之一的建構函式呼叫的語法糖。
如果您願意,可以使用 LoggerAdapter
來達到與上述類似的效果,如下面的示例所示
import logging
class Message:
def __init__(self, fmt, args):
self.fmt = fmt
self.args = args
def __str__(self):
return self.fmt.format(*self.args)
class StyleAdapter(logging.LoggerAdapter):
def log(self, level, msg, /, *args, stacklevel=1, **kwargs):
if self.isEnabledFor(level):
msg, kwargs = self.process(msg, kwargs)
self.logger.log(level, Message(msg, args), **kwargs,
stacklevel=stacklevel+1)
logger = StyleAdapter(logging.getLogger(__name__))
def main():
logger.debug('Hello, {}', 'world!')
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
main()
上面的指令碼在 Python 3.8 或更高版本中執行時,應記錄訊息 Hello, world!
。
自定義 LogRecord
¶
每個日誌事件都由一個 LogRecord
例項表示。當一個事件被記錄且未被記錄器的級別過濾掉時,會建立一個 LogRecord
,其中填充有關該事件的資訊,然後將其傳遞給該記錄器的處理程式(及其祖先,直到包括停用進一步向層次結構傳播的記錄器)。在 Python 3.2 之前,只有兩個地方會進行此建立
Logger.makeRecord()
,在記錄事件的正常過程中呼叫。這將直接呼叫LogRecord
來建立一個例項。makeLogRecord()
,使用包含要新增到 LogRecord 的屬性的字典呼叫。這通常在透過網路收到合適的字典時呼叫(例如,透過SocketHandler
以 pickle 形式,或透過HTTPHandler
以 JSON 形式)。
這通常意味著如果您需要對 LogRecord
進行任何特殊處理,則必須執行以下操作之一。
建立您自己的
Logger
子類,它覆蓋Logger.makeRecord()
,並在例項化任何您關心的記錄器之前使用setLoggerClass()
設定它。
第一種方法在(例如)多個不同的庫想要執行不同的操作的情況下會有些笨拙。每個庫都會嘗試設定自己的 Logger
子類,最後執行此操作的庫將獲勝。
第二種方法在許多情況下效果相當好,但它不允許您例如使用 LogRecord
的專門子類。庫開發人員可以在其記錄器上設定合適的過濾器,但他們必須記住每次引入新記錄器時都這樣做(他們只需新增新軟體包或模組並執行
logger = logging.getLogger(__name__)
在模組級別)。這可能需要考慮的事情太多了。開發人員還可以將過濾器新增到附加到其頂級記錄器的 NullHandler
,但如果應用程式開發人員將處理程式附加到較低級別的庫記錄器,則不會呼叫此過濾器 — 因此該處理程式的輸出將無法反映庫開發人員的意圖。
在 Python 3.2 及更高版本中,LogRecord
的建立是透過您可以指定的工廠完成的。該工廠只是一個可以使用 setLogRecordFactory()
設定和使用 getLogRecordFactory()
查詢的可呼叫物件。該工廠的呼叫簽名與 LogRecord
的建構函式相同,因為 LogRecord
是工廠的預設設定。
此方法允許自定義工廠控制 LogRecord 建立的所有方面。例如,您可以返回一個子類,或者僅在建立後使用類似於此的模式向記錄新增一些額外的屬性
old_factory = logging.getLogRecordFactory()
def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.custom_attribute = 0xdecafbad
return record
logging.setLogRecordFactory(record_factory)
此模式允許不同的庫將工廠連結在一起,只要它們不覆蓋彼此的屬性或無意中覆蓋作為標準提供的屬性,就不應該有任何意外。但是,應該記住,鏈中的每個連結都會向所有日誌記錄操作增加執行時開銷,並且只有在 Filter
的使用未提供所需結果時才應使用該技術。
QueueHandler 和 QueueListener 的子類化 - 一個 ZeroMQ 示例¶
子類化 QueueHandler
¶
您可以使用 QueueHandler
子類將訊息傳送到其他型別的佇列,例如 ZeroMQ “釋出”套接字。在下面的示例中,套接字是單獨建立的並傳遞給處理程式(作為其“佇列”)
import zmq # using pyzmq, the Python binding for ZeroMQ
import json # for serializing records portably
ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB) # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556') # or wherever
class ZeroMQSocketHandler(QueueHandler):
def enqueue(self, record):
self.queue.send_json(record.__dict__)
handler = ZeroMQSocketHandler(sock)
當然,還有其他組織方法,例如傳遞處理程式建立套接字所需的資料
class ZeroMQSocketHandler(QueueHandler):
def __init__(self, uri, socktype=zmq.PUB, ctx=None):
self.ctx = ctx or zmq.Context()
socket = zmq.Socket(self.ctx, socktype)
socket.bind(uri)
super().__init__(socket)
def enqueue(self, record):
self.queue.send_json(record.__dict__)
def close(self):
self.queue.close()
子類化 QueueListener
¶
您還可以子類化 QueueListener
從其他型別的佇列中獲取訊息,例如 ZeroMQ “訂閱”套接字。這是一個例子
class ZeroMQSocketListener(QueueListener):
def __init__(self, uri, /, *handlers, **kwargs):
self.ctx = kwargs.get('ctx') or zmq.Context()
socket = zmq.Socket(self.ctx, zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, '') # subscribe to everything
socket.connect(uri)
super().__init__(socket, *handlers, **kwargs)
def dequeue(self):
msg = self.queue.recv_json()
return logging.makeLogRecord(msg)
QueueHandler 和 QueueListener 的子類化 - 一個 pynng
示例¶
與上一節類似,我們可以使用 pynng 來實現偵聽器和處理程式,pynng 是 NNG 的 Python 繫結,被譽為 ZeroMQ 的精神繼承者。以下程式碼片段說明了這一點 – 您可以在安裝了 pynng
的環境中測試它們。為了多樣化,我們首先介紹偵聽器。
子類化 QueueListener
¶
# listener.py
import json
import logging
import logging.handlers
import pynng
DEFAULT_ADDR = "tcp://:13232"
interrupted = False
class NNGSocketListener(logging.handlers.QueueListener):
def __init__(self, uri, /, *handlers, **kwargs):
# Have a timeout for interruptability, and open a
# subscriber socket
socket = pynng.Sub0(listen=uri, recv_timeout=500)
# The b'' subscription matches all topics
topics = kwargs.pop('topics', None) or b''
socket.subscribe(topics)
# We treat the socket as a queue
super().__init__(socket, *handlers, **kwargs)
def dequeue(self, block):
data = None
# Keep looping while not interrupted and no data received over the
# socket
while not interrupted:
try:
data = self.queue.recv(block=block)
break
except pynng.Timeout:
pass
except pynng.Closed: # sometimes happens when you hit Ctrl-C
break
if data is None:
return None
# Get the logging event sent from a publisher
event = json.loads(data.decode('utf-8'))
return logging.makeLogRecord(event)
def enqueue_sentinel(self):
# Not used in this implementation, as the socket isn't really a
# queue
pass
logging.getLogger('pynng').propagate = False
listener = NNGSocketListener(DEFAULT_ADDR, logging.StreamHandler(), topics=b'')
listener.start()
print('Press Ctrl-C to stop.')
try:
while True:
pass
except KeyboardInterrupt:
interrupted = True
finally:
listener.stop()
子類化 QueueHandler
¶
# sender.py
import json
import logging
import logging.handlers
import time
import random
import pynng
DEFAULT_ADDR = "tcp://:13232"
class NNGSocketHandler(logging.handlers.QueueHandler):
def __init__(self, uri):
socket = pynng.Pub0(dial=uri, send_timeout=500)
super().__init__(socket)
def enqueue(self, record):
# Send the record as UTF-8 encoded JSON
d = dict(record.__dict__)
data = json.dumps(d)
self.queue.send(data.encode('utf-8'))
def close(self):
self.queue.close()
logging.getLogger('pynng').propagate = False
handler = NNGSocketHandler(DEFAULT_ADDR)
# Make sure the process ID is in the output
logging.basicConfig(level=logging.DEBUG,
handlers=[logging.StreamHandler(), handler],
format='%(levelname)-8s %(name)10s %(process)6s %(message)s')
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL)
logger_names = ('myapp', 'myapp.lib1', 'myapp.lib2')
msgno = 1
while True:
# Just randomly select some loggers and levels and log away
level = random.choice(levels)
logger = logging.getLogger(random.choice(logger_names))
logger.log(level, 'Message no. %5d' % msgno)
msgno += 1
delay = random.random() * 2 + 0.5
time.sleep(delay)
您可以在單獨的命令 shell 中執行以上兩個程式碼片段。如果我們在一個 shell 中執行偵聽器,並在兩個單獨的 shell 中執行傳送者,我們應該會看到類似以下的內容。在第一個傳送者 shell 中
$ python sender.py
DEBUG myapp 613 Message no. 1
WARNING myapp.lib2 613 Message no. 2
CRITICAL myapp.lib2 613 Message no. 3
WARNING myapp.lib2 613 Message no. 4
CRITICAL myapp.lib1 613 Message no. 5
DEBUG myapp 613 Message no. 6
CRITICAL myapp.lib1 613 Message no. 7
INFO myapp.lib1 613 Message no. 8
(and so on)
在第二個傳送者 shell 中
$ python sender.py
INFO myapp.lib2 657 Message no. 1
CRITICAL myapp.lib2 657 Message no. 2
CRITICAL myapp 657 Message no. 3
CRITICAL myapp.lib1 657 Message no. 4
INFO myapp.lib1 657 Message no. 5
WARNING myapp.lib2 657 Message no. 6
CRITICAL myapp 657 Message no. 7
DEBUG myapp.lib1 657 Message no. 8
(and so on)
在偵聽器 shell 中
$ python listener.py
Press Ctrl-C to stop.
DEBUG myapp 613 Message no. 1
WARNING myapp.lib2 613 Message no. 2
INFO myapp.lib2 657 Message no. 1
CRITICAL myapp.lib2 613 Message no. 3
CRITICAL myapp.lib2 657 Message no. 2
CRITICAL myapp 657 Message no. 3
WARNING myapp.lib2 613 Message no. 4
CRITICAL myapp.lib1 613 Message no. 5
CRITICAL myapp.lib1 657 Message no. 4
INFO myapp.lib1 657 Message no. 5
DEBUG myapp 613 Message no. 6
WARNING myapp.lib2 657 Message no. 6
CRITICAL myapp 657 Message no. 7
CRITICAL myapp.lib1 613 Message no. 7
INFO myapp.lib1 613 Message no. 8
DEBUG myapp.lib1 657 Message no. 8
(and so on)
如您所見,來自兩個傳送者程序的日誌記錄在偵聽器的輸出中交錯顯示。
基於字典的配置示例¶
下面是一個日誌配置字典的示例 - 它取自 Django 專案的文件。此字典傳遞給 dictConfig()
以使配置生效
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
'simple': {
'format': '{levelname} {message}',
'style': '{',
},
},
'filters': {
'special': {
'()': 'project.logging.SpecialFilter',
'foo': 'bar',
},
},
'handlers': {
'console': {
'level': 'INFO',
'class': 'logging.StreamHandler',
'formatter': 'simple',
},
'mail_admins': {
'level': 'ERROR',
'class': 'django.utils.log.AdminEmailHandler',
'filters': ['special']
}
},
'loggers': {
'django': {
'handlers': ['console'],
'propagate': True,
},
'django.request': {
'handlers': ['mail_admins'],
'level': 'ERROR',
'propagate': False,
},
'myproject.custom': {
'handlers': ['console', 'mail_admins'],
'level': 'INFO',
'filters': ['special']
}
}
}
有關此配置的更多資訊,您可以檢視 Django 文件的相關部分。
使用旋轉器和命名器來自定義日誌旋轉處理¶
以下可執行指令碼給出瞭如何定義命名器和旋轉器的示例,該指令碼顯示了日誌檔案的 gzip 壓縮
import gzip
import logging
import logging.handlers
import os
import shutil
def namer(name):
return name + ".gz"
def rotator(source, dest):
with open(source, 'rb') as f_in:
with gzip.open(dest, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(source)
rh = logging.handlers.RotatingFileHandler('rotated.log', maxBytes=128, backupCount=5)
rh.rotator = rotator
rh.namer = namer
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(rh)
f = logging.Formatter('%(asctime)s %(message)s')
rh.setFormatter(f)
for i in range(1000):
root.info(f'Message no. {i + 1}')
執行此操作後,您將看到六個新檔案,其中五個已壓縮
$ ls rotated.log*
rotated.log rotated.log.2.gz rotated.log.4.gz
rotated.log.1.gz rotated.log.3.gz rotated.log.5.gz
$ zcat rotated.log.1.gz
2023-01-20 02:28:17,767 Message no. 996
2023-01-20 02:28:17,767 Message no. 997
2023-01-20 02:28:17,767 Message no. 998
一個更詳細的多程序示例¶
以下工作示例顯示瞭如何使用配置檔案將日誌記錄與多程序結合使用。配置相當簡單,但可以說明如何在實際的多程序場景中實現更復雜的配置。
在示例中,主程序會生成一個監聽器程序和一些工作程序。主程序、監聽器和工作程序各自都有三個獨立的配置(所有工作程序共享同一個配置)。我們可以看到主程序中的日誌記錄,工作程序如何將日誌記錄到 QueueHandler,以及監聽器如何實現 QueueListener 和更復雜的日誌配置,並安排將透過佇列接收到的事件分派到配置中指定的處理程式。請注意,這些配置純粹是說明性的,但您應該能夠根據自己的場景調整此示例。
這是指令碼 - 文件字串和註釋有望解釋其工作原理
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time
class MyHandler:
"""
A simple handler for logging events. It runs in the listener process and
dispatches events to loggers based on the name in the received record,
which then get dispatched, by the logging system, to the handlers
configured for those loggers.
"""
def handle(self, record):
if record.name == "root":
logger = logging.getLogger()
else:
logger = logging.getLogger(record.name)
if logger.isEnabledFor(record.levelno):
# The process name is transformed just to show that it's the listener
# doing the logging to files and console
record.processName = '%s (for %s)' % (current_process().name, record.processName)
logger.handle(record)
def listener_process(q, stop_event, config):
"""
This could be done in the main process, but is just done in a separate
process for illustrative purposes.
This initialises logging according to the specified configuration,
starts the listener and waits for the main process to signal completion
via the event. The listener is then stopped, and the process exits.
"""
logging.config.dictConfig(config)
listener = logging.handlers.QueueListener(q, MyHandler())
listener.start()
if os.name == 'posix':
# On POSIX, the setup logger will have been configured in the
# parent process, but should have been disabled following the
# dictConfig call.
# On Windows, since fork isn't used, the setup logger won't
# exist in the child, so it would be created and the message
# would appear - hence the "if posix" clause.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
stop_event.wait()
listener.stop()
def worker_process(config):
"""
A number of these are spawned for the purpose of illustration. In
practice, they could be a heterogeneous bunch of processes rather than
ones which are identical to each other.
This initialises logging according to the specified configuration,
and logs a hundred messages with random levels to randomly selected
loggers.
A small sleep is added to allow other processes a chance to run. This
is not strictly needed, but it mixes the output from the different
processes a bit more than if it's left out.
"""
logging.config.dictConfig(config)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
if os.name == 'posix':
# On POSIX, the setup logger will have been configured in the
# parent process, but should have been disabled following the
# dictConfig call.
# On Windows, since fork isn't used, the setup logger won't
# exist in the child, so it would be created and the message
# would appear - hence the "if posix" clause.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
time.sleep(0.01)
def main():
q = Queue()
# The main process gets a simple configuration which prints to the console.
config_initial = {
'version': 1,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO'
}
},
'root': {
'handlers': ['console'],
'level': 'DEBUG'
}
}
# The worker process configuration is just a QueueHandler attached to the
# root logger, which allows all messages to be sent to the queue.
# We disable existing loggers to disable the "setup" logger used in the
# parent process. This is needed on POSIX because the logger will
# be there in the child following a fork().
config_worker = {
'version': 1,
'disable_existing_loggers': True,
'handlers': {
'queue': {
'class': 'logging.handlers.QueueHandler',
'queue': q
}
},
'root': {
'handlers': ['queue'],
'level': 'DEBUG'
}
}
# The listener process configuration shows that the full flexibility of
# logging configuration is available to dispatch events to handlers however
# you want.
# We disable existing loggers to disable the "setup" logger used in the
# parent process. This is needed on POSIX because the logger will
# be there in the child following a fork().
config_listener = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
},
'simple': {
'class': 'logging.Formatter',
'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'simple',
'level': 'INFO'
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed'
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed'
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'formatter': 'detailed',
'level': 'ERROR'
}
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'handlers': ['console', 'file', 'errors'],
'level': 'DEBUG'
}
}
# Log some initial events, just to show that logging in the parent works
# normally.
logging.config.dictConfig(config_initial)
logger = logging.getLogger('setup')
logger.info('About to create workers ...')
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1),
args=(config_worker,))
workers.append(wp)
wp.start()
logger.info('Started worker: %s', wp.name)
logger.info('About to create listener ...')
stop_event = Event()
lp = Process(target=listener_process, name='listener',
args=(q, stop_event, config_listener))
lp.start()
logger.info('Started listener')
# We now hang around for the workers to finish their work.
for wp in workers:
wp.join()
# Workers all done, listening can now stop.
# Logging in the parent still works normally.
logger.info('Telling listener to stop ...')
stop_event.set()
lp.join()
logger.info('All done.')
if __name__ == '__main__':
main()
將 BOM 插入傳送到 SysLogHandler 的訊息中¶
RFC 5424 要求將 Unicode 訊息作為一組位元組傳送到 syslog 守護程式,該位元組具有以下結構:一個可選的純 ASCII 元件,後跟一個 UTF-8 位元組順序標記 (BOM),後跟使用 UTF-8 編碼的 Unicode。(請參閱 規範的相關部分。)
在 Python 3.1 中,程式碼被新增到 SysLogHandler
中以在訊息中插入 BOM,但不幸的是,它的實現不正確,BOM 出現在訊息的開頭,因此不允許任何純 ASCII 元件出現在它之前。
由於此行為已損壞,因此不正確的 BOM 插入程式碼將從 Python 3.2.4 及更高版本中刪除。但是,它不會被替換,如果您想生成符合 RFC 5424 的訊息,其中包含 BOM,其前有一個可選的純 ASCII 序列,後跟使用 UTF-8 編碼的任意 Unicode,那麼您需要執行以下操作
將一個
Formatter
例項附加到您的SysLogHandler
例項,並使用如下格式字串'ASCII section\ufeffUnicode section'
Unicode 程式碼點 U+FEFF,當使用 UTF-8 編碼時,將被編碼為 UTF-8 BOM – 位元組字串
b'\xef\xbb\xbf'
。用您喜歡的任何佔位符替換 ASCII 部分,但請確保替換後出現在其中的資料始終為 ASCII(這樣,它在 UTF-8 編碼後將保持不變)。
用您喜歡的任何佔位符替換 Unicode 部分;如果替換後出現在那裡的資料包含 ASCII 範圍之外的字元,那也沒關係 - 它將使用 UTF-8 編碼。
格式化後的訊息將由 SysLogHandler
使用 UTF-8 編碼進行編碼。如果遵循上述規則,您應該能夠生成符合 RFC 5424 的訊息。如果您不這樣做,日誌記錄可能不會報錯,但您的訊息將不符合 RFC 5424,並且您的 syslog 守護程式可能會報錯。
實現結構化日誌記錄¶
儘管大多數日誌訊息旨在供人類閱讀,因此不易被機器解析,但在某些情況下,您可能希望以結構化格式輸出訊息,該格式是能夠被程式解析的(無需複雜的正則表示式來解析日誌訊息)。使用 logging 包很容易實現這一點。可以透過多種方式來實現,但以下是一種簡單的方法,它使用 JSON 以機器可解析的方式序列化事件
import json
import logging
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
return '%s >>> %s' % (self.message, json.dumps(self.kwargs))
_ = StructuredMessage # optional, to improve readability
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))
如果執行上述指令碼,它會列印
message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}
請注意,專案的順序可能因使用的 Python 版本而異。
如果您需要更專業的處理,可以使用自定義的 JSON 編碼器,如下面的完整示例所示
import json
import logging
class Encoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, set):
return tuple(o)
elif isinstance(o, str):
return o.encode('unicode_escape').decode('ascii')
return super().default(o)
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
s = Encoder().encode(self.kwargs)
return '%s >>> %s' % (self.message, s)
_ = StructuredMessage # optional, to improve readability
def main():
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))
if __name__ == '__main__':
main()
當執行上述指令碼時,它會列印
message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}
請注意,專案的順序可能因使用的 Python 版本而異。
使用 dictConfig()
自定義處理程式¶
有時您可能希望以特定的方式自定義日誌處理程式,如果您使用 dictConfig()
,您或許可以做到這一點而無需子類化。例如,假設您可能希望設定日誌檔案的所有權。在 POSIX 上,使用 shutil.chown()
可以輕鬆完成,但 stdlib 中的檔案處理程式不提供內建支援。您可以使用如下的普通函式自定義處理程式的建立
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
然後,您可以在傳遞給 dictConfig()
的日誌配置中指定,透過呼叫此函式建立日誌處理程式
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# The values below are popped from this dictionary and
# used to create the handler, set the handler's level and
# its formatter.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# The values below are passed to the handler creator callable
# as keyword arguments.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
在此示例中,我只是為了說明目的,使用 pulse
使用者和組設定所有權。將其組合成一個可執行的指令碼,chowntest.py
import logging, logging.config, os, shutil
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# The values below are popped from this dictionary and
# used to create the handler, set the handler's level and
# its formatter.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# The values below are passed to the handler creator callable
# as keyword arguments.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')
要執行此指令碼,您可能需要以 root
身份執行
$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log
請注意,此示例使用 Python 3.3,因為 shutil.chown()
在此版本中出現。此方法應適用於任何支援 dictConfig()
的 Python 版本 - 即 Python 2.7、3.2 或更高版本。對於 pre-3.3 版本,您需要使用例如 os.chown()
來實現實際的所有權更改。
實際上,處理程式建立函式可能位於專案中的某個實用程式模組中。您可以使用例如以下程式碼來替換配置中的行
'()': owned_file_handler,
您可以使用例如
'()': 'ext://project.util.owned_file_handler',
其中 project.util
可以替換為函式所在的包的實際名稱。在上面的工作指令碼中,使用 'ext://__main__.owned_file_handler'
應該可以工作。在這裡,實際的可呼叫物件由 dictConfig()
從 ext://
規範中解析。
此示例有望指示您如何以相同的方式,使用 os.chmod()
實現其他型別的檔案更改 - 例如設定特定的 POSIX 許可權位。
當然,該方法也可以擴充套件到 FileHandler
以外的處理程式型別 - 例如,輪換檔案處理程式之一,或完全不同型別的處理程式。
在整個應用程式中使用特定的格式樣式¶
在 Python 3.2 中,Formatter
獲得了一個 style
關鍵字引數,該引數在預設情況下為 %
以實現向後相容性,但允許指定 {
或 $
以支援 str.format()
和 string.Template
支援的格式化方法。請注意,這控制日誌訊息的格式,以便最終輸出到日誌,並且與如何構造單個日誌訊息完全正交。
日誌呼叫(debug()
、info()
等)僅採用實際日誌訊息本身的位置引數,關鍵字引數僅用於確定如何處理日誌呼叫的選項(例如,exc_info
關鍵字引數指示應記錄回溯資訊,或 extra
關鍵字引數指示要新增到日誌的其他上下文資訊)。因此,您無法直接使用 str.format()
或 string.Template
語法進行日誌呼叫,因為內部日誌包使用 % 格式來合併格式字串和可變引數。在保持向後相容性的同時,無法更改此行為,因為現有程式碼中所有已有的日誌呼叫都將使用 % 格式字串。
有人建議將格式樣式與特定的記錄器關聯,但這種方法也會遇到向後相容性問題,因為任何現有程式碼都可能正在使用給定的記錄器名稱並使用 % 格式。
為了使日誌記錄在任何第三方庫和您的程式碼之間互操作,需要在單個日誌呼叫的級別上做出關於格式的決策。這提供了幾種可以容納替代格式樣式的方法。
使用 LogRecord 工廠¶
在 Python 3.2 中,除了上面提到的 Formatter
更改之外,logging 包還新增了允許使用者使用 setLogRecordFactory()
函式來設定自己的 LogRecord
子類的功能。 你可以使用它來設定你自己的 LogRecord
子類,透過重寫 getMessage()
方法來完成正確的事情。 此方法的基本類實現是進行 msg % args
格式化的地方,你可以在這裡替換你自己的格式化方式;但是,你應該小心支援所有格式化樣式並允許 % 格式作為預設格式,以確保與其他程式碼的互操作性。還應注意呼叫 str(self.msg)
,就像基本實現一樣。
有關更多資訊,請參閱關於 setLogRecordFactory()
和 LogRecord
的參考文件。
使用自定義訊息物件¶
還有另一種可能更簡單的方法,你可以使用 {}- 和 $- 格式化來構造你的單獨日誌訊息。 你可能還記得(來自 使用任意物件作為訊息),在記錄日誌時,你可以使用任意物件作為訊息格式字串,並且 logging 包會呼叫該物件的 str()
來獲取實際的格式字串。 考慮以下兩個類
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
這兩個類中的任何一個都可以用來代替格式字串,以允許使用 {}- 或 $- 格式化來構建實際的“訊息”部分,該部分會出現在格式化的日誌輸出中,代替 “%(message)s” 或 “{message}” 或 “$message”。 如果你覺得每次想要記錄某些內容時都使用類名有點笨拙,那麼如果你為訊息使用一個別名(例如 M
或 _
,或者如果你將 _
用於本地化,則可以使用 __
),那麼就可以使其更易於使用。
下面給出了這種方法的示例。 首先,使用 str.format()
進行格式化
>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)
其次,使用 string.Template
進行格式化
>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
需要注意的一點是,使用這種方法你不會付出顯著的效能損失:實際的格式化不會在你進行日誌記錄呼叫時發生,而是在(並且如果)記錄的訊息即將由處理程式輸出到日誌時發生。 因此,唯一可能讓你感到困惑的稍微不尋常的事情是,括號會圍繞格式字串和引數,而不僅僅是格式字串。 這是因為 __ 表示法只是對上面顯示的 XXXMessage
類之一的建構函式呼叫的語法糖。
使用 dictConfig()
配置過濾器¶
你可以使用 dictConfig()
配置過濾器,儘管乍一看可能不太明顯如何操作(因此提供了此方法)。 由於 Filter
是標準庫中包含的唯一過濾器類,並且它不太可能滿足許多要求(它僅作為基類存在),因此你通常需要使用重寫的 filter()
方法定義你自己的 Filter
子類。 為此,請在過濾器的配置字典中指定 ()
鍵,指定一個可呼叫物件,該物件將用於建立過濾器(類是最明顯的選擇,但是你可以提供任何返回 Filter
例項的可呼叫物件)。 這是一個完整的示例
import logging
import logging.config
import sys
class MyFilter(logging.Filter):
def __init__(self, param=None):
self.param = param
def filter(self, record):
if self.param is None:
allow = True
else:
allow = self.param not in record.msg
if allow:
record.msg = 'changed: ' + record.msg
return allow
LOGGING = {
'version': 1,
'filters': {
'myfilter': {
'()': MyFilter,
'param': 'noshow',
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'filters': ['myfilter']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console']
},
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.debug('hello')
logging.debug('hello - noshow')
此示例顯示瞭如何以關鍵字引數的形式將配置資料傳遞給構造例項的可呼叫物件。 執行時,上面的指令碼將列印
changed: hello
這表明該過濾器正在按配置工作。
需要注意的幾個額外點
如果無法在配置中直接引用可呼叫物件(例如,如果它位於不同的模組中,並且你無法在配置字典所在的位置直接匯入它),則可以使用 訪問外部物件 中描述的
ext://...
形式。 例如,你可以在上面的示例中使用文字'ext://__main__.MyFilter'
代替MyFilter
。除了用於過濾器外,此技術還可以用於配置自定義處理程式和格式化程式。 有關 logging 如何支援在其配置中使用使用者定義物件的資訊,請參閱 使用者定義的物件,並參閱上面的其他菜譜 使用 dictConfig() 自定義處理程式。
自定義異常格式化¶
有時你可能想要進行自定義異常格式化,例如,假設你希望每個記錄的事件都只有一行,即使存在異常資訊也是如此。你可以使用自定義格式化程式類來執行此操作,如以下示例所示
import logging
class OneLineExceptionFormatter(logging.Formatter):
def formatException(self, exc_info):
"""
Format an exception so that it prints on a single line.
"""
result = super().formatException(exc_info)
return repr(result) # or format into one line however you want to
def format(self, record):
s = super().format(record)
if record.exc_text:
s = s.replace('\n', '') + '|'
return s
def configure_logging():
fh = logging.FileHandler('output.txt', 'w')
f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
'%d/%m/%Y %H:%M:%S')
fh.setFormatter(f)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(fh)
def main():
configure_logging()
logging.info('Sample message')
try:
x = 1 / 0
except ZeroDivisionError as e:
logging.exception('ZeroDivisionError: %s', e)
if __name__ == '__main__':
main()
執行時,這將生成一個包含兩行的檔案
28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n File "logtest7.py", line 30, in main\n x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|
雖然上述處理方法很簡單,但它指明瞭如何根據自己的喜好格式化異常資訊。 traceback
模組可能對更專業的需要有所幫助。
語音播報日誌訊息¶
在某些情況下,可能需要以可聽見而非可見的格式呈現日誌訊息。 如果你的系統中有文字轉語音 (TTS) 功能,即使它沒有 Python 繫結,也很容易做到這一點。 大多數 TTS 系統都有一個可以執行的命令列程式,可以使用 subprocess
從處理程式呼叫它。 此處假定 TTS 命令列程式不希望與使用者互動或花費很長時間才能完成,並且日誌訊息的頻率不會太高以至於使使用者被訊息淹沒,並且可以接受一次說出一條訊息而不是併發地說出訊息,以下示例實現等待一條訊息被說出後才處理下一條訊息,這可能會導致其他處理程式保持等待狀態。 這是一個簡短的示例,展示了該方法,該方法假定可以使用 espeak
TTS 包
import logging
import subprocess
import sys
class TTSHandler(logging.Handler):
def emit(self, record):
msg = self.format(record)
# Speak slowly in a female English voice
cmd = ['espeak', '-s150', '-ven+f3', msg]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# wait for the program to finish
p.communicate()
def configure_logging():
h = TTSHandler()
root = logging.getLogger()
root.addHandler(h)
# the default formatter just returns the message
root.setLevel(logging.DEBUG)
def main():
logging.info('Hello')
logging.debug('Goodbye')
if __name__ == '__main__':
configure_logging()
sys.exit(main())
執行時,此指令碼應以女聲說 “Hello”,然後說 “Goodbye”。
當然,上述方法可以適用於其他 TTS 系統,甚至可以適用於其他可以透過命令列執行的外部程式處理訊息的系統。
緩衝日誌訊息並有條件地輸出它們¶
在某些情況下,你可能希望將訊息記錄在臨時區域中,並且僅在發生特定條件時才輸出它們。 例如,你可能想要開始在函式中記錄除錯事件,如果函式完成且沒有錯誤,則你不想用收集的除錯資訊來混亂日誌,但是如果有錯誤,則你希望輸出所有除錯資訊以及錯誤。
這裡有一個示例,展示瞭如何使用裝飾器來實現你希望的日誌記錄行為。它使用了logging.handlers.MemoryHandler
,它允許緩衝記錄的事件,直到滿足某個條件時,緩衝的事件會被重新整理
- 傳遞給另一個處理器(target
處理器)進行處理。預設情況下,當 MemoryHandler
的緩衝區被填滿或遇到級別大於或等於指定閾值的事件時,它會重新整理。如果你想要自定義重新整理行為,可以將此方法與 MemoryHandler
的更專門的子類一起使用。
示例指令碼有一個簡單的函式 foo
,它只是迴圈遍歷所有日誌級別,向 sys.stderr
寫入即將記錄的級別,然後在該級別實際記錄一條訊息。你可以向 foo
傳遞一個引數,如果為 true,它將在 ERROR 和 CRITICAL 級別記錄 - 否則,它只會在 DEBUG、INFO 和 WARNING 級別記錄。
該指令碼只是安排用一個裝飾器來裝飾 foo
,該裝飾器將執行所需的條件日誌記錄。裝飾器接收一個記錄器作為引數,並在呼叫被裝飾函式期間附加一個記憶體處理器。裝飾器還可以使用目標處理器、應發生重新整理的級別和緩衝區容量(緩衝記錄的數量)進行引數化。這些引數預設分別為寫入 sys.stderr
的 StreamHandler
、logging.ERROR
和 100
。
這是指令碼:
import logging
from logging.handlers import MemoryHandler
import sys
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
if target_handler is None:
target_handler = logging.StreamHandler()
if flush_level is None:
flush_level = logging.ERROR
if capacity is None:
capacity = 100
handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)
def decorator(fn):
def wrapper(*args, **kwargs):
logger.addHandler(handler)
try:
return fn(*args, **kwargs)
except Exception:
logger.exception('call failed')
raise
finally:
super(MemoryHandler, handler).flush()
logger.removeHandler(handler)
return wrapper
return decorator
def write_line(s):
sys.stderr.write('%s\n' % s)
def foo(fail=False):
write_line('about to log at DEBUG ...')
logger.debug('Actually logged at DEBUG')
write_line('about to log at INFO ...')
logger.info('Actually logged at INFO')
write_line('about to log at WARNING ...')
logger.warning('Actually logged at WARNING')
if fail:
write_line('about to log at ERROR ...')
logger.error('Actually logged at ERROR')
write_line('about to log at CRITICAL ...')
logger.critical('Actually logged at CRITICAL')
return fail
decorated_foo = log_if_errors(logger)(foo)
if __name__ == '__main__':
logger.setLevel(logging.DEBUG)
write_line('Calling undecorated foo with False')
assert not foo(False)
write_line('Calling undecorated foo with True')
assert foo(True)
write_line('Calling decorated foo with False')
assert not decorated_foo(False)
write_line('Calling decorated foo with True')
assert decorated_foo(True)
當執行此指令碼時,應觀察到以下輸出:
Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL
如你所見,只有當記錄的事件的嚴重性為 ERROR 或更高時,才會發生實際的日誌輸出,但在這種情況下,任何先前較低嚴重性的事件也會被記錄。
你當然可以使用傳統的裝飾方式:
@log_if_errors(logger)
def foo(fail=False):
...
將帶有緩衝的日誌訊息傳送到電子郵件¶
為了說明如何透過電子郵件傳送日誌訊息,以便每封郵件傳送一定數量的訊息,你可以繼承 BufferingHandler
。在以下示例中,你可以根據你的特定需求進行調整,它提供了一個簡單的測試工具,允許你使用命令列引數來執行指令碼,指定透過 SMTP 傳送郵件時通常需要的內容。(使用 -h
引數執行下載的指令碼,檢視所需的和可選的引數。)
import logging
import logging.handlers
import smtplib
class BufferingSMTPHandler(logging.handlers.BufferingHandler):
def __init__(self, mailhost, port, username, password, fromaddr, toaddrs,
subject, capacity):
logging.handlers.BufferingHandler.__init__(self, capacity)
self.mailhost = mailhost
self.mailport = port
self.username = username
self.password = password
self.fromaddr = fromaddr
if isinstance(toaddrs, str):
toaddrs = [toaddrs]
self.toaddrs = toaddrs
self.subject = subject
self.setFormatter(logging.Formatter("%(asctime)s %(levelname)-5s %(message)s"))
def flush(self):
if len(self.buffer) > 0:
try:
smtp = smtplib.SMTP(self.mailhost, self.mailport)
smtp.starttls()
smtp.login(self.username, self.password)
msg = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n" % (self.fromaddr, ','.join(self.toaddrs), self.subject)
for record in self.buffer:
s = self.format(record)
msg = msg + s + "\r\n"
smtp.sendmail(self.fromaddr, self.toaddrs, msg)
smtp.quit()
except Exception:
if logging.raiseExceptions:
raise
self.buffer = []
if __name__ == '__main__':
import argparse
ap = argparse.ArgumentParser()
aa = ap.add_argument
aa('host', metavar='HOST', help='SMTP server')
aa('--port', '-p', type=int, default=587, help='SMTP port')
aa('user', metavar='USER', help='SMTP username')
aa('password', metavar='PASSWORD', help='SMTP password')
aa('to', metavar='TO', help='Addressee for emails')
aa('sender', metavar='SENDER', help='Sender email address')
aa('--subject', '-s',
default='Test Logging email from Python logging module (buffering)',
help='Subject of email')
options = ap.parse_args()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
h = BufferingSMTPHandler(options.host, options.port, options.user,
options.password, options.sender,
options.to, options.subject, 10)
logger.addHandler(h)
for i in range(102):
logger.info("Info index = %d", i)
h.flush()
h.close()
如果你執行此指令碼並且你的 SMTP 伺服器設定正確,你應該發現它向你指定的收件人傳送了 11 封電子郵件。前 10 封電子郵件將分別包含 10 條日誌訊息,第 11 封電子郵件將包含 2 條訊息。這構成了指令碼中指定的 102 條訊息。
透過配置使用 UTC (GMT) 格式化時間¶
有時你希望使用 UTC 格式化時間,這可以使用如下所示的 UTCFormatter
類來完成:
import logging
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
然後你可以在程式碼中使用 UTCFormatter
而不是 Formatter
。如果你想透過配置來做到這一點,你可以使用 dictConfig()
API,其方法如下面的完整示例所示:
import logging
import logging.config
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'utc': {
'()': UTCFormatter,
'format': '%(asctime)s %(message)s',
},
'local': {
'format': '%(asctime)s %(message)s',
}
},
'handlers': {
'console1': {
'class': 'logging.StreamHandler',
'formatter': 'utc',
},
'console2': {
'class': 'logging.StreamHandler',
'formatter': 'local',
},
},
'root': {
'handlers': ['console1', 'console2'],
}
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.warning('The local time is %s', time.asctime())
執行此指令碼時,應列印如下內容:
2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015
顯示時間如何以本地時間和 UTC 格式化,每個處理器一個。
使用上下文管理器進行選擇性日誌記錄¶
有時,臨時更改日誌記錄配置並在執行某些操作後恢復它會很有用。為此,上下文管理器是儲存和恢復日誌記錄上下文的最明顯方式。這是一個簡單的上下文管理器示例,它允許你選擇性地更改日誌記錄級別,並在上下文管理器的範圍內純粹地新增日誌記錄處理器:
import logging
import sys
class LoggingContext:
def __init__(self, logger, level=None, handler=None, close=True):
self.logger = logger
self.level = level
self.handler = handler
self.close = close
def __enter__(self):
if self.level is not None:
self.old_level = self.logger.level
self.logger.setLevel(self.level)
if self.handler:
self.logger.addHandler(self.handler)
def __exit__(self, et, ev, tb):
if self.level is not None:
self.logger.setLevel(self.old_level)
if self.handler:
self.logger.removeHandler(self.handler)
if self.handler and self.close:
self.handler.close()
# implicit return of None => don't swallow exceptions
如果你指定了級別值,則在上下文管理器覆蓋的 with 塊的範圍內,記錄器的級別將設定為該值。如果你指定了一個處理器,則在進入該塊時將其新增到記錄器,並在退出該塊時將其刪除。你還可以要求管理器在塊退出時為你關閉處理器 - 如果你不再需要該處理器,則可以這樣做。
為了說明它是如何工作的,我們可以將以下程式碼塊新增到上面:
if __name__ == '__main__':
logger = logging.getLogger('foo')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
logger.info('1. This should appear just once on stderr.')
logger.debug('2. This should not appear.')
with LoggingContext(logger, level=logging.DEBUG):
logger.debug('3. This should appear once on stderr.')
logger.debug('4. This should not appear.')
h = logging.StreamHandler(sys.stdout)
with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
logger.debug('5. This should appear twice - once on stderr and once on stdout.')
logger.info('6. This should appear just once on stderr.')
logger.debug('7. This should not appear.')
我們最初將記錄器的級別設定為 INFO
,因此顯示訊息 #1,而不顯示訊息 #2。然後,我們在下面的 with
塊中臨時將級別更改為 DEBUG
,因此顯示訊息 #3。在該塊退出後,記錄器的級別將恢復為 INFO
,因此不顯示訊息 #4。在下一個 with
塊中,我們再次將級別設定為 DEBUG
,但還添加了一個寫入 sys.stdout
的處理器。因此,訊息 #5 在控制檯上顯示兩次(一次透過 stderr
,一次透過 stdout
)。在 with
語句完成後,狀態恢復到之前的狀態,因此顯示訊息 #6(如訊息 #1),而訊息 #7 不顯示(如訊息 #2)。
如果我們執行生成的指令碼,結果如下:
$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
如果我們再次執行它,但將 stderr
管道到 /dev/null
,我們會看到以下內容,這是唯一寫入 stdout
的訊息:
$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.
再次,但將 stdout
管道到 /dev/null
,我們會得到:
$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
在這種情況下,按預期,列印到 stdout
的訊息 #5 不會顯示。
當然,這裡描述的方法可以推廣,例如臨時附加日誌記錄過濾器。請注意,上面的程式碼在 Python 2 和 Python 3 中都可以工作。
CLI 應用程式啟動模板¶
這是一個示例,展示瞭如何:
使用基於命令列引數的日誌級別
在單獨的檔案中排程到多個子命令,所有子命令都以一致的方式在同一級別進行日誌記錄
利用簡單、最小的配置
假設我們有一個命令列應用程式,其任務是停止、啟動或重新啟動某些服務。為了便於說明,可以將其組織為一個檔案 app.py
,它是應用程式的主指令碼,各個命令在 start.py
、stop.py
和 restart.py
中實現。進一步假設我們想透過命令列引數控制應用程式的詳細程度,預設為 logging.INFO
。以下是編寫 app.py
的一種方法:
import argparse
import importlib
import logging
import os
import sys
def main(args=None):
scriptname = os.path.basename(__file__)
parser = argparse.ArgumentParser(scriptname)
levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
parser.add_argument('--log-level', default='INFO', choices=levels)
subparsers = parser.add_subparsers(dest='command',
help='Available commands:')
start_cmd = subparsers.add_parser('start', help='Start a service')
start_cmd.add_argument('name', metavar='NAME',
help='Name of service to start')
stop_cmd = subparsers.add_parser('stop',
help='Stop one or more services')
stop_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to stop')
restart_cmd = subparsers.add_parser('restart',
help='Restart one or more services')
restart_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to restart')
options = parser.parse_args()
# the code to dispatch commands could all be in this file. For the purposes
# of illustration only, we implement each command in a separate module.
try:
mod = importlib.import_module(options.command)
cmd = getattr(mod, 'command')
except (ImportError, AttributeError):
print('Unable to find the code for command \'%s\'' % options.command)
return 1
# Could get fancy here and load configuration from file or dictionary
logging.basicConfig(level=options.log_level,
format='%(levelname)s %(name)s %(message)s')
cmd(options)
if __name__ == '__main__':
sys.exit(main())
並且 start
、stop
和 restart
命令可以在單獨的模組中實現,如下所示用於啟動:
# start.py
import logging
logger = logging.getLogger(__name__)
def command(options):
logger.debug('About to start %s', options.name)
# actually do the command processing here ...
logger.info('Started the \'%s\' service.', options.name)
用於停止的:
# stop.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to stop %s', services)
# actually do the command processing here ...
logger.info('Stopped the %s service%s.', services, plural)
同樣用於重新啟動:
# restart.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to restart %s', services)
# actually do the command processing here ...
logger.info('Restarted the %s service%s.', services, plural)
如果我們以預設日誌級別執行此應用程式,我們將得到如下輸出:
$ python app.py start foo
INFO start Started the 'foo' service.
$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
第一個單詞是日誌級別,第二個單詞是記錄事件的位置的模組或包名稱。
如果我們更改日誌級別,則可以更改傳送到日誌的資訊。例如,如果我們想要更多資訊:
$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.
$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
如果我們想要更少的資訊:
$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz
在這種情況下,這些命令不會向控制檯列印任何內容,因為它們沒有記錄任何 WARNING
級別或更高級別的訊息。
用於日誌記錄的 Qt GUI¶
不時出現的一個問題是如何記錄到 GUI 應用程式。 Qt 框架是一個流行的跨平臺 UI 框架,它使用 PySide2 或 PyQt5 庫進行 Python 繫結。
以下示例展示瞭如何記錄到 Qt GUI。這引入了一個簡單的 QtHandler
類,它接受一個可呼叫物件,該物件應是主執行緒中執行 GUI 更新的槽。還建立了一個工作執行緒,以展示如何從 UI 本身(透過按鈕進行手動日誌記錄)以及在後臺執行工作的工作執行緒(此處只是以隨機級別記錄訊息,並在其間隨機短暫延遲)向 GUI 記錄日誌。
工作執行緒使用 Qt 的 QThread
類實現,而不是 threading
模組,因為在某些情況下必須使用 QThread
,它可以更好地與其他 Qt
元件整合。
此程式碼應適用於 PySide6
、PyQt6
、PySide2
或 PyQt5
的最新版本。您應該能夠將此方法應用於早期版本的 Qt。請參閱程式碼片段中的註釋以獲取更詳細的資訊。
import datetime
import logging
import random
import sys
import time
# Deal with minor differences between different Qt packages
try:
from PySide6 import QtCore, QtGui, QtWidgets
Signal = QtCore.Signal
Slot = QtCore.Slot
except ImportError:
try:
from PyQt6 import QtCore, QtGui, QtWidgets
Signal = QtCore.pyqtSignal
Slot = QtCore.pyqtSlot
except ImportError:
try:
from PySide2 import QtCore, QtGui, QtWidgets
Signal = QtCore.Signal
Slot = QtCore.Slot
except ImportError:
from PyQt5 import QtCore, QtGui, QtWidgets
Signal = QtCore.pyqtSignal
Slot = QtCore.pyqtSlot
logger = logging.getLogger(__name__)
#
# Signals need to be contained in a QObject or subclass in order to be correctly
# initialized.
#
class Signaller(QtCore.QObject):
signal = Signal(str, logging.LogRecord)
#
# Output to a Qt GUI is only supposed to happen on the main thread. So, this
# handler is designed to take a slot function which is set up to run in the main
# thread. In this example, the function takes a string argument which is a
# formatted log message, and the log record which generated it. The formatted
# string is just a convenience - you could format a string for output any way
# you like in the slot function itself.
#
# You specify the slot function to do whatever GUI updates you want. The handler
# doesn't know or care about specific UI elements.
#
class QtHandler(logging.Handler):
def __init__(self, slotfunc, *args, **kwargs):
super().__init__(*args, **kwargs)
self.signaller = Signaller()
self.signaller.signal.connect(slotfunc)
def emit(self, record):
s = self.format(record)
self.signaller.signal.emit(s, record)
#
# This example uses QThreads, which means that the threads at the Python level
# are named something like "Dummy-1". The function below gets the Qt name of the
# current thread.
#
def ctname():
return QtCore.QThread.currentThread().objectName()
#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL)
#
# This worker class represents work that is done in a thread separate to the
# main thread. The way the thread is kicked off to do work is via a button press
# that connects to a slot in the worker.
#
# Because the default threadName value in the LogRecord isn't much use, we add
# a qThreadName which contains the QThread name as computed above, and pass that
# value in an "extra" dictionary which is used to update the LogRecord with the
# QThread name.
#
# This example worker just outputs messages sequentially, interspersed with
# random delays of the order of a few seconds.
#
class Worker(QtCore.QObject):
@Slot()
def start(self):
extra = {'qThreadName': ctname() }
logger.debug('Started work', extra=extra)
i = 1
# Let the thread run until interrupted. This allows reasonably clean
# thread termination.
while not QtCore.QThread.currentThread().isInterruptionRequested():
delay = 0.5 + random.random() * 2
time.sleep(delay)
try:
if random.random() < 0.1:
raise ValueError('Exception raised: %d' % i)
else:
level = random.choice(LEVELS)
logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
except ValueError as e:
logger.exception('Failed: %s', e, extra=extra)
i += 1
#
# Implement a simple UI for this cookbook example. This contains:
#
# * A read-only text edit window which holds formatted log messages
# * A button to start work and log stuff in a separate thread
# * A button to log something from the main thread
# * A button to clear the log window
#
class Window(QtWidgets.QWidget):
COLORS = {
logging.DEBUG: 'black',
logging.INFO: 'blue',
logging.WARNING: 'orange',
logging.ERROR: 'red',
logging.CRITICAL: 'purple',
}
def __init__(self, app):
super().__init__()
self.app = app
self.textedit = te = QtWidgets.QPlainTextEdit(self)
# Set whatever the default monospace font is for the platform
f = QtGui.QFont('nosuchfont')
if hasattr(f, 'Monospace'):
f.setStyleHint(f.Monospace)
else:
f.setStyleHint(f.StyleHint.Monospace) # for Qt6
te.setFont(f)
te.setReadOnly(True)
PB = QtWidgets.QPushButton
self.work_button = PB('Start background work', self)
self.log_button = PB('Log a message at a random level', self)
self.clear_button = PB('Clear log window', self)
self.handler = h = QtHandler(self.update_status)
# Remember to use qThreadName rather than threadName in the format string.
fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
formatter = logging.Formatter(fs)
h.setFormatter(formatter)
logger.addHandler(h)
# Set up to terminate the QThread when we exit
app.aboutToQuit.connect(self.force_quit)
# Lay out all the widgets
layout = QtWidgets.QVBoxLayout(self)
layout.addWidget(te)
layout.addWidget(self.work_button)
layout.addWidget(self.log_button)
layout.addWidget(self.clear_button)
self.setFixedSize(900, 400)
# Connect the non-worker slots and signals
self.log_button.clicked.connect(self.manual_update)
self.clear_button.clicked.connect(self.clear_display)
# Start a new worker thread and connect the slots for the worker
self.start_thread()
self.work_button.clicked.connect(self.worker.start)
# Once started, the button should be disabled
self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))
def start_thread(self):
self.worker = Worker()
self.worker_thread = QtCore.QThread()
self.worker.setObjectName('Worker')
self.worker_thread.setObjectName('WorkerThread') # for qThreadName
self.worker.moveToThread(self.worker_thread)
# This will start an event loop in the worker thread
self.worker_thread.start()
def kill_thread(self):
# Just tell the worker to stop, then tell it to quit and wait for that
# to happen
self.worker_thread.requestInterruption()
if self.worker_thread.isRunning():
self.worker_thread.quit()
self.worker_thread.wait()
else:
print('worker has already exited.')
def force_quit(self):
# For use when the window is closed
if self.worker_thread.isRunning():
self.kill_thread()
# The functions below update the UI and run in the main thread because
# that's where the slots are set up
@Slot(str, logging.LogRecord)
def update_status(self, status, record):
color = self.COLORS.get(record.levelno, 'black')
s = '<pre><font color="%s">%s</font></pre>' % (color, status)
self.textedit.appendHtml(s)
@Slot()
def manual_update(self):
# This function uses the formatted message passed in, but also uses
# information from the record to format the message in an appropriate
# color according to its severity (level).
level = random.choice(LEVELS)
extra = {'qThreadName': ctname() }
logger.log(level, 'Manually logged!', extra=extra)
@Slot()
def clear_display(self):
self.textedit.clear()
def main():
QtCore.QThread.currentThread().setObjectName('MainThread')
logging.getLogger().setLevel(logging.DEBUG)
app = QtWidgets.QApplication(sys.argv)
example = Window(app)
example.show()
if hasattr(app, 'exec'):
rc = app.exec()
else:
rc = app.exec_()
sys.exit(rc)
if __name__=='__main__':
main()
使用 RFC5424 支援記錄到 syslog¶
儘管 RFC 5424 的日期可以追溯到 2009 年,但大多數 syslog 伺服器預設配置為使用較舊的 RFC 3164,後者可以追溯到 2001 年。當 logging
於 2003 年新增到 Python 時,它支援當時較早的(也是唯一的)協議。自從 RFC5424 釋出以來,由於它在 syslog 伺服器中沒有得到廣泛部署,因此 SysLogHandler
功能尚未更新。
RFC 5424 包含一些有用的功能,例如對結構化資料的支援,如果您需要能夠記錄到支援它的 syslog 伺服器,則可以使用類似於以下內容的子類處理程式來實現
import datetime
import logging.handlers
import re
import socket
import time
class SysLogHandler5424(logging.handlers.SysLogHandler):
tz_offset = re.compile(r'([+-]\d{2})(\d{2})$')
escaped = re.compile(r'([\]"\\])')
def __init__(self, *args, **kwargs):
self.msgid = kwargs.pop('msgid', None)
self.appname = kwargs.pop('appname', None)
super().__init__(*args, **kwargs)
def format(self, record):
version = 1
asctime = datetime.datetime.fromtimestamp(record.created).isoformat()
m = self.tz_offset.match(time.strftime('%z'))
has_offset = False
if m and time.timezone:
hrs, mins = m.groups()
if int(hrs) or int(mins):
has_offset = True
if not has_offset:
asctime += 'Z'
else:
asctime += f'{hrs}:{mins}'
try:
hostname = socket.gethostname()
except Exception:
hostname = '-'
appname = self.appname or '-'
procid = record.process
msgid = '-'
msg = super().format(record)
sdata = '-'
if hasattr(record, 'structured_data'):
sd = record.structured_data
# This should be a dict where the keys are SD-ID and the value is a
# dict mapping PARAM-NAME to PARAM-VALUE (refer to the RFC for what these
# mean)
# There's no error checking here - it's purely for illustration, and you
# can adapt this code for use in production environments
parts = []
def replacer(m):
g = m.groups()
return '\\' + g[0]
for sdid, dv in sd.items():
part = f'[{sdid}'
for k, v in dv.items():
s = str(v)
s = self.escaped.sub(replacer, s)
part += f' {k}="{s}"'
part += ']'
parts.append(part)
sdata = ''.join(parts)
return f'{version} {asctime} {hostname} {appname} {procid} {msgid} {sdata} {msg}'
您需要熟悉 RFC 5424 才能完全理解以上程式碼,並且您可能有一些略有不同的需求(例如,如何將結構化資料傳遞給日誌)。但是,以上內容應該可以根據您的具體需求進行調整。使用上述處理程式,您可以使用類似以下方式傳遞結構化資料
sd = {
'foo@12345': {'bar': 'baz', 'baz': 'bozz', 'fizz': r'buzz'},
'foo@54321': {'rab': 'baz', 'zab': 'bozz', 'zzif': r'buzz'}
}
extra = {'structured_data': sd}
i = 1
logger.debug('Message %d', i, extra=extra)
如何將記錄器視為輸出流¶
有時,您需要與期望寫入類似檔案的物件的第三方 API 介面,但您希望將 API 的輸出定向到記錄器。您可以使用一個類來包裝具有類似檔案 API 的記錄器來實現此目的。這是一個演示此類類的簡短指令碼
import logging
class LoggerWriter:
def __init__(self, logger, level):
self.logger = logger
self.level = level
def write(self, message):
if message != '\n': # avoid printing bare newlines, if you like
self.logger.log(self.level, message)
def flush(self):
# doesn't actually do anything, but might be expected of a file-like
# object - so optional depending on your situation
pass
def close(self):
# doesn't actually do anything, but might be expected of a file-like
# object - so optional depending on your situation. You might want
# to set a flag so that later calls to write raise an exception
pass
def main():
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('demo')
info_fp = LoggerWriter(logger, logging.INFO)
debug_fp = LoggerWriter(logger, logging.DEBUG)
print('An INFO message', file=info_fp)
print('A DEBUG message', file=debug_fp)
if __name__ == "__main__":
main()
執行此指令碼時,它會列印
INFO:demo:An INFO message
DEBUG:demo:A DEBUG message
您還可以使用 LoggerWriter
透過執行類似以下的操作來重定向 sys.stdout
和 sys.stderr
import sys
sys.stdout = LoggerWriter(logger, logging.INFO)
sys.stderr = LoggerWriter(logger, logging.WARNING)
您應該在為您的需求配置日誌記錄之後執行此操作。在上面的示例中,basicConfig()
呼叫執行此操作(使用 sys.stderr
值,在它被 LoggerWriter
例項覆蓋之前)。然後,您將獲得這樣的結果
>>> print('Foo')
INFO:demo:Foo
>>> print('Bar', file=sys.stderr)
WARNING:demo:Bar
>>>
當然,上面的示例顯示了根據 basicConfig()
使用的格式的輸出,但是您可以在配置日誌記錄時使用不同的格式化程式。
請注意,使用上述方案,您在某種程度上會受到緩衝和您正在攔截的寫入呼叫序列的影響。例如,對於上述 LoggerWriter
的定義,如果您有以下程式碼片段
sys.stderr = LoggerWriter(logger, logging.WARNING)
1 / 0
然後執行該指令碼會導致
WARNING:demo:Traceback (most recent call last):
WARNING:demo: File "/home/runner/cookbook-loggerwriter/test.py", line 53, in <module>
WARNING:demo:
WARNING:demo:main()
WARNING:demo: File "/home/runner/cookbook-loggerwriter/test.py", line 49, in main
WARNING:demo:
WARNING:demo:1 / 0
WARNING:demo:ZeroDivisionError
WARNING:demo::
WARNING:demo:division by zero
如您所見,此輸出並不理想。這是因為寫入 sys.stderr
的底層程式碼會進行多次寫入,每次寫入都會導致單獨的記錄行(例如,上面的最後三行)。要解決此問題,您需要緩衝資料,並且僅在新行出現時輸出日誌行。讓我們使用一個稍微好一點的 LoggerWriter
實現
class BufferingLoggerWriter(LoggerWriter):
def __init__(self, logger, level):
super().__init__(logger, level)
self.buffer = ''
def write(self, message):
if '\n' not in message:
self.buffer += message
else:
parts = message.split('\n')
if self.buffer:
s = self.buffer + parts.pop(0)
self.logger.log(self.level, s)
self.buffer = parts.pop()
for part in parts:
self.logger.log(self.level, part)
這只是緩衝資料直到看到新行,然後記錄完整的行。使用這種方法,您將獲得更好的輸出
WARNING:demo:Traceback (most recent call last):
WARNING:demo: File "/home/runner/cookbook-loggerwriter/main.py", line 55, in <module>
WARNING:demo: main()
WARNING:demo: File "/home/runner/cookbook-loggerwriter/main.py", line 52, in main
WARNING:demo: 1/0
WARNING:demo:ZeroDivisionError: division by zero
要避免的模式¶
儘管前面的章節描述了您可能需要執行或處理的方法,但值得一提的是一些無益的使用模式,因此在大多數情況下應避免這些模式。以下各節沒有特定的順序。
多次開啟同一個日誌檔案¶
在 Windows 上,您通常無法多次開啟同一個檔案,因為這會導致“檔案正在被另一個程序使用”的錯誤。但是,在 POSIX 平臺上,如果您多次開啟同一個檔案,則不會收到任何錯誤。這可能是意外發生的,例如透過
多次新增引用同一個檔案的檔案處理程式(例如,透過複製/貼上/忘記更改錯誤)。
開啟兩個看起來不同的檔案,因為它們具有不同的名稱,但是它們是相同的,因為一個是另一個的符號連結。
派生一個程序,之後父程序和子程序都具有對同一個檔案的引用。例如,這可能是透過使用
multiprocessing
模組來實現的。
多次開啟一個檔案可能在大多數情況下看起來可以工作,但實際上會導致許多問題
日誌輸出可能會被搞亂,因為多個執行緒或程序嘗試寫入同一個檔案。儘管日誌記錄可以防止多個執行緒同時使用同一個處理程式例項,但是如果兩個不同的執行緒使用兩個不同的處理程式例項(恰好指向同一個檔案)嘗試同時寫入,則不會有這種保護。
刪除檔案(例如,在檔案輪換期間)的嘗試會靜默失敗,因為還有另一個引用指向它。這會導致混亂和浪費除錯時間 - 日誌條目最終出現在意外的位置,或者完全丟失。或者,本應移動的檔案仍保留在原位,並且儘管應該進行基於大小的輪換,但其大小卻出乎意料地增長。
使用 從多個程序記錄到單個檔案 中概述的技術來規避此類問題。
在類中將記錄器用作屬性或將它們作為引數傳遞¶
儘管在某些不尋常的情況下您可能需要這樣做,但通常沒有必要這樣做,因為記錄器是單例。程式碼始終可以使用 logging.getLogger(name)
按名稱訪問給定的記錄器例項,因此傳遞例項並將它們保留為例項屬性毫無意義。請注意,在其他語言(例如 Java 和 C#)中,記錄器通常是靜態類屬性。但是,這種模式在 Python 中沒有意義,因為模組(而不是類)是軟體分解的單位。
在庫中將 NullHandler
之外的處理程式新增到記錄器¶
透過新增處理程式、格式化程式和篩選器來配置日誌記錄是應用程式開發人員的責任,而不是庫開發人員的責任。如果您正在維護庫,請確保除了 NullHandler
例項之外,不要將處理程式新增到任何記錄器。
建立大量記錄器¶
記錄器是指令碼執行期間永遠不會釋放的單例,因此建立大量記錄器會佔用無法釋放的記憶體。與其為每個已處理的檔案或建立的網路連線建立一個記錄器,不如使用用於將上下文資訊傳遞到日誌中的現有機制,並將建立的記錄器限制為那些描述應用程式內區域的記錄器(通常是模組,但有時比模組更精細)。
其他資源¶
另請參閱
- 模組
logging
日誌記錄模組的 API 參考。
- 模組
logging.config
日誌記錄模組的配置 API。
- 模組
logging.handlers
日誌記錄模組中包含的有用處理程式。