Empty messages in caplog when logs emmited in a different process
--------------------------------------------------
Rise to the top 3% as a developer or hire one of them at Toptal: https://topt.al/25cXVn
--------------------------------------------------
Music by Eric Matyas
https://www.soundimage.org
Track title: Lost Civilization
--
Chapters
00:00 Empty Messages In Caplog When Logs Emmited In A Different Process
01:06 Accepted Answer Score 4
01:53 Thank you
--
Full question
https://stackoverflow.com/questions/6305...
--
Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...
--
Tags
#python #pytest #fixtures
#avk47
Rise to the top 3% as a developer or hire one of them at Toptal: https://topt.al/25cXVn
--------------------------------------------------
Music by Eric Matyas
https://www.soundimage.org
Track title: Lost Civilization
--
Chapters
00:00 Empty Messages In Caplog When Logs Emmited In A Different Process
01:06 Accepted Answer Score 4
01:53 Thank you
--
Full question
https://stackoverflow.com/questions/6305...
--
Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...
--
Tags
#python #pytest #fixtures
#avk47
ACCEPTED ANSWER
Score 4
Inspired by @hoefling and still the will to use caplog, there is a solution. The idea is to create a fixture, which takes the queue from the QueueHandler handler and reemits the logs in the main process, which is capturable by the caplog
import logging
import sys
from contextlib import contextmanager
from logging import handlers
from multiprocessing import Process, Queue
import pytest
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
logger = logging.getLogger(__name__)
class ActorContext:
def __init__(self):
self.log = logger
def run(self):
self.log.debug("Some msg")
current_actor_context = ActorContext()
@pytest.fixture()
def caplog_workaround():
@contextmanager
def ctx():
logger_queue = Queue()
logger = logging.getLogger()
logger.addHandler(handlers.QueueHandler(logger_queue))
yield
while not logger_queue.empty():
log_record: logging.LogRecord = logger_queue.get()
logger._log(
level=log_record.levelno,
msg=log_record.message,
args=log_record.args,
exc_info=log_record.exc_info,
)
return ctx
def test_caplog_already_not_fails(caplog, caplog_workaround):
with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
with caplog_workaround():
p = Process(target=current_actor_context.run)
p.start()
p.join()
assert "Some msg" in caplog.text
def test_caplog_passes(caplog, capsys):
with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
current_actor_context.run()
assert "Some msg" in caplog.text