Empty messages in caplog when logs emmited in a different process
Become part of the top 3% of the developers by applying to Toptal https://topt.al/25cXVn
--
Track title: CC H Dvoks String Quartet No 12 Ame
--
Chapters
00:00 Question
01:33 Accepted answer (Score 3)
02:48 Thank you
--
Full question
https://stackoverflow.com/questions/6305...
Question links:
[Pytest capture not working - caplog and capsys are empty]: https://stackoverflow.com/questions/6170...
--
Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...
--
Tags
#python #pytest #fixtures
#avk47
--
Track title: CC H Dvoks String Quartet No 12 Ame
--
Chapters
00:00 Question
01:33 Accepted answer (Score 3)
02:48 Thank you
--
Full question
https://stackoverflow.com/questions/6305...
Question links:
[Pytest capture not working - caplog and capsys are empty]: https://stackoverflow.com/questions/6170...
--
Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...
--
Tags
#python #pytest #fixtures
#avk47
ACCEPTED ANSWER
Score 4
Inspired by @hoefling and still the will to use caplog, there is a solution. The idea is to create a fixture, which takes the queue from the QueueHandler handler and reemits the logs in the main process, which is capturable by the caplog
import logging
import sys
from contextlib import contextmanager
from logging import handlers
from multiprocessing import Process, Queue
import pytest
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
logger = logging.getLogger(__name__)
class ActorContext:
def __init__(self):
self.log = logger
def run(self):
self.log.debug("Some msg")
current_actor_context = ActorContext()
@pytest.fixture()
def caplog_workaround():
@contextmanager
def ctx():
logger_queue = Queue()
logger = logging.getLogger()
logger.addHandler(handlers.QueueHandler(logger_queue))
yield
while not logger_queue.empty():
log_record: logging.LogRecord = logger_queue.get()
logger._log(
level=log_record.levelno,
msg=log_record.message,
args=log_record.args,
exc_info=log_record.exc_info,
)
return ctx
def test_caplog_already_not_fails(caplog, caplog_workaround):
with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
with caplog_workaround():
p = Process(target=current_actor_context.run)
p.start()
p.join()
assert "Some msg" in caplog.text
def test_caplog_passes(caplog, capsys):
with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
current_actor_context.run()
assert "Some msg" in caplog.text