Multiprocessing: How to use Pool.map on a function defined in a class?
Full question
https://stackoverflow.com/questions/3288...
--
Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...
--
Tags
#python #multiprocessing #pickle
ANSWER 1
Score 95
I could not use the code posted so far, because code that uses "multiprocessing.Pool" does not work with lambda expressions, and code that does not use "multiprocessing.Pool" spawns as many processes as there are work items.
I adapted the code so that it spawns a predefined number of workers and only iterates through the input list when an idle worker exists. I also enabled "daemon" mode for the workers so that Ctrl-C works as expected.
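import multiprocessing
def fun(f, q_in, q_out):
    # Worker loop: apply f to queued items until the (None, None) sentinel arrives.
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))
def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    # Passing a lambda as f relies on the "fork" start method;
    # under "spawn" the lambda cannot be pickled for the child process.
    q_in = multiprocessing.Queue(1)  # size 1: the next item is fed only when a worker takes one
    q_out = multiprocessing.Queue()
    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True  # daemon workers are terminated when the parent exits
        p.start()
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    for _ in range(nprocs):
        q_in.put((None, None))  # one stop sentinel per worker
    res = [q_out.get() for _ in range(len(sent))]
    for p in proc:
        p.join()
    # Results arrive in completion order; sort by index to restore input order.
    return [x for i, x in sorted(res)]
if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))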
ACCEPTED ANSWER
Score 74
I was also annoyed by the restrictions on what sort of functions pool.map could accept. I wrote the following to circumvent this. It appears to work, even for recursive use of parmap.
from multiprocessing import Process, Pipe
# Python 3: the built-in zip replaces itertools.izip.
def spawn(f):
    # Wrap f so the child sends f(x) back through its end of the pipe.
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun
def parmap(f, X):
    # One pipe and one process per element of X.
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in zip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]
if __name__ == '__main__':
    print(parmap(lambda x: x**x, range(1, 5)))
ANSWER 3
Score 42
There is currently no solution to your problem, as far as I know: the function that you give to map() must be accessible through an import of your module.  This is why robert's code works: the function f() can be obtained by importing the following code:
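from multiprocessing import Pool
def f(x):
    # Module-level function: child processes can find it by importing this module.
    return x*x
class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])
if __name__ == '__main__':
    cl = Calculate()
    print(cl.run())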
I actually added a "main" section, because this follows the recommendations for the Windows platform ("Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects").
I also added an uppercase letter in front of Calculate, so as to follow PEP 8. :)
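To make the "accessible through an import" requirement concrete, here is a minimal sketch that checks which callables survive pickling, since Pool.map pickles the function it sends to the workers. The names can_pickle and make_local are hypothetical helpers introduced only for this illustration.

```python
import pickle


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds (hypothetical helper)."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def f(x):
    # Module-level function: pickled by reference to its importable name.
    return x * x


def make_local():
    # Returns a nested function, which has no importable name.
    def g(x):
        return x * x
    return g


if __name__ == "__main__":
    print(can_pickle(f))                 # module-level function
    print(can_pickle(lambda x: x * x))   # lambda
    print(can_pickle(make_local()))      # nested function
```

Run as a script, this should report that only the module-level f can be pickled, which is the restriction the answers above work around.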
ANSWER 4
Score 18
The solution by mrule is correct but has a bug: if the child sends back a large amount of data, it can fill the pipe's buffer, so the child blocks in pipe.send() while the parent is waiting for the child to exit in the process's join(). The solution is to read the child's data before join()ing the child. Furthermore, the child should close the parent's end of the pipe to prevent a deadlock. The code below fixes that.
Also be aware that this parmap creates one process per element in X. A more advanced solution is to use multiprocessing.cpu_count() to divide X into a number of chunks, and then merge the results before returning. I leave that as an exercise to the reader so as not to spoil the conciseness of the nice answer by mrule. ;)
from multiprocessing import Process, Pipe
# Python 3: the built-in zip replaces itertools.izip.
def spawn(f):
    def fun(ppipe, cpipe, x):
        ppipe.close()  # the child does not use the parent's end of the pipe
        cpipe.send(f(x))
        cpipe.close()
    return fun
def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(p, c, x)) for x, (p, c) in zip(X, pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p, c) in pipe]  # read the results before joining
    [p.join() for p in proc]
    return ret
if __name__ == '__main__':
    print(parmap(lambda x: x**x, range(1, 5)))
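One possible take on the chunking exercise mentioned in this answer is sketched below. It is not the original author's code: parmap_chunked and _worker are hypothetical names, and the sketch assumes the "fork" start method is available (Linux and similar POSIX systems), because a lambda cannot be sent to a child under "spawn".

```python
import multiprocessing

# Assumption: "fork" is available; get_context raises ValueError otherwise.
_ctx = multiprocessing.get_context("fork")


def _worker(f, chunk, parent_conn, child_conn):
    # The child closes its inherited copy of the parent's end, then sends
    # the results for its whole chunk in a single message.
    parent_conn.close()
    child_conn.send([f(x) for x in chunk])
    child_conn.close()


def parmap_chunked(f, X, nprocs=None):
    """Map f over X with at most nprocs processes, preserving input order."""
    items = list(X)
    if not items:
        return []
    nprocs = min(nprocs or multiprocessing.cpu_count(), len(items))
    # Split into nprocs contiguous chunks whose sizes differ by at most one.
    size, extra = divmod(len(items), nprocs)
    chunks, start = [], 0
    for k in range(nprocs):
        end = start + size + (1 if k < extra else 0)
        chunks.append(items[start:end])
        start = end
    pipes = [_ctx.Pipe() for _ in chunks]
    procs = [_ctx.Process(target=_worker, args=(f, chunk, p, c))
             for chunk, (p, c) in zip(chunks, pipes)]
    for proc in procs:
        proc.start()
    # Read every result before joining, so a child blocked in send() can finish.
    results = [p.recv() for p, _ in pipes]
    for proc in procs:
        proc.join()
    # Flatten the per-chunk lists back into one list in input order.
    return [y for chunk_result in results for y in chunk_result]
```

Calling parmap_chunked(lambda x: x**x, range(1, 5)) would then start at most cpu_count() processes rather than one per element, at the cost of each chunk being processed sequentially inside its worker.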