The Python Oracle

Keyboard Interrupts with python's multiprocessing Pool

--------------------------------------------------
Hire the world's top talent on demand or became one of them at Toptal: https://topt.al/25cXVn
and get $2,000 discount on your first invoice
--------------------------------------------------

Music by Eric Matyas
https://www.soundimage.org
Track title: Book End

--

Chapters
00:00 Keyboard Interrupts With Python'S Multiprocessing Pool
00:42 Accepted Answer Score 145
01:29 Answer 2 Score 34
02:19 Answer 3 Score 67
02:54 Answer 4 Score 18
03:18 Thank you

--

Full question
https://stackoverflow.com/questions/1408...

--

Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...

--

Tags
#python #multiprocessing #pool #keyboardinterrupt

#avk47



ACCEPTED ANSWER

Score 145


This is a Python bug. When waiting for a condition in threading.Condition.wait(), KeyboardInterrupt is never sent. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

The KeyboardInterrupt exception won't be delivered until wait() returns, and it never returns, so the interrupt never happens. KeyboardInterrupt should almost certainly interrupt a condition wait.

Note that this doesn't happen if a timeout is specified; cond.wait(1) will receive the interrupt immediately. So, a workaround is to specify a timeout. To do that, replace

    results = pool.map(slowly_square, range(40))

with

    results = pool.map_async(slowly_square, range(40)).get(9999999)

or similar.




ANSWER 2

Score 67


From what I have recently found, the best solution is to set up the worker processes to ignore SIGINT altogether, and confine all the cleanup code to the parent process. This fixes the problem for both idle and busy worker processes, and requires no error handling code in your child processes.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Explanation and full example code can be found at http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ and http://github.com/jreese/multiprocessing-keyboardinterrupt respectively.




ANSWER 3

Score 34


For some reasons, only exceptions inherited from the base Exception class are handled normally. As a workaround, you may re-raise your KeyboardInterrupt as an Exception instance:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normally you would get the following output:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

So if you hit ^C, you will get:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end



ANSWER 4

Score 18


The voted answer does not tackle the core issue but a similar side effect.

Jesse Noller, the author of the multiprocessing library, explains how to correctly deal with CTRL+C when using multiprocessing.Pool in a old blog post.

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()