The Python Oracle

multiprocessing.Process doesn't terminate after putting requests response.content to queue

Music by Eric Matyas
https://www.soundimage.org
Track title: Over a Mysterious Island

--

Full question
https://stackoverflow.com/questions/5048...

--

Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...

--

Tags
#python #python3x #pythonrequests #pythonmultiprocessing



ANSWER 1

Score 6


As noted in the Pipes and Queues section of the multiprocessing documentation:

if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed.

...

Note that a queue created using a manager does not have this issue.
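The behaviour described in the quoted documentation can be reproduced without any network access. The minimal sketch below assumes a hypothetical 1 MiB bytes payload standing in for response.content, chosen to be larger than the OS pipe buffer (typically 64 KiB on Linux); producer() and demo() are illustrative names, not taken from the question.

```python
import multiprocessing as mp

# Hypothetical payload size, assumed to exceed the OS pipe buffer
# (typically 64 KiB on Linux), as a large response.content would.
PAYLOAD_SIZE = 1024 * 1024


def producer(q):
    # put() only appends to an in-process buffer; a background feeder
    # thread then writes the pickled bytes into the queue's pipe.
    q.put(b"x" * PAYLOAD_SIZE)


def demo():
    q = mp.Queue()
    p = mp.Process(target=producer, args=(q,))
    p.start()
    p.join(timeout=2)      # nobody has read from q yet
    stuck = p.is_alive()   # the child is still flushing its buffer at exit
    data = q.get()         # consuming the item lets the flush complete
    p.join()               # now the child can actually exit
    return stuck, len(data), p.exitcode


if __name__ == "__main__":
    print(demo())  # (True, 1048576, 0)
```

The child only finishes once the parent reads the item, which is exactly the deadlock you get if you join() before consuming.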

If you switch over to a manager queue, then the process terminates successfully:

import multiprocessing as mp
import queue
import requests
import time


class ChildProcess(mp.Process):
    def __init__(self, qin, qout):
        super().__init__()
        self.qin = qin          # JoinableQueue of URLs to fetch
        self.qout = qout        # queue that receives the response bodies
        self.daemon = True

    def run(self):
        while True:
            try:
                url = self.qin.get(block=False)
                r = requests.get(url, verify=False)
                self.qout.put(r.content)
                self.qin.task_done()
            except queue.Empty:
                break
            except requests.exceptions.RequestException as e:
                print(self.name, e)
                self.qin.task_done()
        print("Infinite loop terminates")


if __name__ == '__main__':
    manager = mp.Manager()
    qin = mp.JoinableQueue()
    qout = manager.Queue()      # manager queue: the child's put() leaves nothing buffered locally
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    time.sleep(1)
    print(w.name, w.is_alive())
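The manager-queue fix can be checked offline with a sketch that swaps the requests call for the same hypothetical 1 MiB payload (an assumption, as are the names producer() and demo()). The child exits even though nobody has read the item, because the item lives in the manager process rather than in a buffer owned by the child.

```python
import multiprocessing as mp

# Hypothetical payload size, assumed to exceed the OS pipe buffer.
PAYLOAD_SIZE = 1024 * 1024


def producer(q):
    # On a manager queue, put() is a request to the manager process, which
    # stores the item itself; nothing stays buffered in this child.
    q.put(b"x" * PAYLOAD_SIZE)


def demo():
    with mp.Manager() as manager:
        q = manager.Queue()
        p = mp.Process(target=producer, args=(q,))
        p.start()
        p.join(timeout=10)      # returns as soon as the child exits
        exited = not p.is_alive()
        pending = q.qsize()     # the item is still waiting, unread
        return exited, pending, p.exitcode


if __name__ == "__main__":
    print(demo())  # (True, 1, 0)
```

The trade-off is that every put() and get() becomes a round trip to the manager process.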



ANSWER 2

Score 2


It's a bit hard to figure this out from the Queue documentation; I struggled with the same problem.

The key concept here is that before a producer process exits, it joins the background (feeder) thread of every queue it has put data into. That join blocks until the feeder thread has flushed all buffered items into the queue's underlying pipe, and once the pipe is full, that can only happen as someone reads from the other end. So basically, before your ChildProcess can exit, someone has to consume the stuff it put into the queue!

The documentation also describes Queue.cancel_join_thread(), which is supposed to circumvent this problem, but I couldn't get it to have any effect; maybe I'm not using it correctly.

Here's an example modification you can make that should fix the issue:

# Assumes the imports (multiprocessing as mp, queue) and the ChildProcess class from the question.
if __name__ == '__main__':
    qin = mp.JoinableQueue()
    qout = mp.Queue()
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    while True:
        try:
            # Throw away remaining stuff in qout (or process it or whatever),
            # just get it out of the queue so the queue's background thread
            # can finish, so your ChildProcess can terminate.
            qout.get(True, 0.1)
        except queue.Empty:
            break
    w.join()                # Wait for your ChildProcess to finish up.
    # time.sleep(1)         # Not necessary since we've joined the ChildProcess
    print(w.name, w.is_alive())
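Regarding the Queue.cancel_join_thread() remark above: according to the documentation it stops the background thread from being joined automatically when the calling process exits, so this sketch calls it inside the child, right after put(). The payload size and function names are hypothetical. The documentation also warns that this is likely to lose enqueued data (it suggests allow_exit_without_flush() as a better name), so it only fits when the producer may drop unread results.

```python
import multiprocessing as mp

# Hypothetical payload size, assumed to exceed the OS pipe buffer.
PAYLOAD_SIZE = 1024 * 1024


def producer(q):
    q.put(b"x" * PAYLOAD_SIZE)
    # Called in the process that did the put(): at exit, this process no
    # longer waits for the queue's feeder thread, so any data not yet
    # flushed to the pipe is discarded.
    q.cancel_join_thread()


def demo():
    q = mp.Queue()
    p = mp.Process(target=producer, args=(q,))
    p.start()
    p.join(timeout=10)          # returns even though nobody read from q
    return not p.is_alive(), p.exitcode


if __name__ == "__main__":
    print(demo())  # (True, 0)
```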



ACCEPTED ANSWER

Score 0


Add a call to w.terminate() before the print() call.


As for why the process doesn't terminate by itself: your function code is an infinite loop, so it never returns. Calling terminate() sends the process a termination signal (SIGTERM on POSIX; on Windows, TerminateProcess() is used).
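A minimal sketch of the terminate() approach, using a hypothetical worker() that loops forever in place of the question's ChildProcess. Following terminate() with join() reaps the child so that is_alive() and exitcode reflect its final state.

```python
import multiprocessing as mp
import time


def worker():
    # Hypothetical stand-in for a child whose run() never returns
    while True:
        time.sleep(0.1)


def demo():
    p = mp.Process(target=worker, daemon=True)
    p.start()
    p.terminate()   # SIGTERM on POSIX, TerminateProcess() on Windows
    p.join()        # reap the child so is_alive() and exitcode are final
    return p.is_alive(), p.exitcode


if __name__ == "__main__":
    print(demo())  # on POSIX: (False, -15), i.e. minus SIGTERM
```

The multiprocessing documentation warns that terminate() skips exit handlers and finally clauses, and that terminating a process while it is using a pipe or queue can leave that pipe or queue corrupted.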