The Python Oracle

`DummyExecutor` for Python's `futures`

Full question
https://stackoverflow.com/questions/1043...

--

Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...

--

Tags
#python #debugging #concurrency #future #concurrentfutures



ACCEPTED ANSWER

Score 25


Something like this should do it:

from concurrent.futures import Future, Executor
from threading import Lock


class DummyExecutor(Executor):

    def __init__(self):
        self._shutdown = False
        self._shutdownLock = Lock()

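    def submit(self, fn, *args, **kwargs):
        # Only the shutdown check needs the lock; running fn while holding it
        # would deadlock if fn itself submitted work to this executor.
        with self._shutdownLock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

        # Run the call synchronously and wrap the outcome in a finished Future.
        f = Future()
        try:
            result = fn(*args, **kwargs)
        except BaseException as e:
            f.set_exception(e)
        else:
            f.set_result(result)

        return f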

    def shutdown(self, wait=True):
        with self._shutdownLock:
            self._shutdown = True


if __name__ == '__main__':

    def fnc(err):
        if err:
            raise Exception("test")
        else:
            return "ok"

    ex = DummyExecutor()
    print(ex.submit(fnc, True))
    print(ex.submit(fnc, False))
    ex.shutdown()
    ex.submit(fnc, True)  # raises RuntimeError: the executor has been shut down

Locking is probably not needed in this case, but it can't hurt to have it.
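
As a rough illustration of how such an executor might be swapped in for a real pool, here is a minimal sketch. It assumes a hypothetical helper `square_all` that accepts any executor, and it uses a compact stand-in for `DummyExecutor` with the shutdown bookkeeping left out for brevity:

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor, as_completed


class DummyExecutor(Executor):
    """Compact synchronous executor (shutdown handling omitted for brevity)."""

    def submit(self, fn, *args, **kwargs):
        f = Future()
        try:
            result = fn(*args, **kwargs)
        except BaseException as e:
            f.set_exception(e)
        else:
            f.set_result(result)
        return f


def square_all(executor, values):
    # Hypothetical helper: it only relies on the Executor interface,
    # so it does not care whether threads are involved.
    futures = [executor.submit(pow, v, 2) for v in values]
    return sorted(f.result() for f in as_completed(futures))


with ThreadPoolExecutor(max_workers=2) as pool:
    threaded = square_all(pool, [1, 2, 3])

with DummyExecutor() as dummy:
    serial = square_all(dummy, [1, 2, 3])
    mapped = list(dummy.map(abs, [-1, -2]))  # map() is inherited from Executor

print(threaded, serial, mapped)
```

Because the stand-in returns real `Future` objects, helpers such as `as_completed` and the inherited `map()` should keep working without changes to the calling code.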




ANSWER 2

Score 4


Use this to mock your ThreadPoolExecutor:

class MockThreadPoolExecutor():
    def __init__(self, **kwargs):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def submit(self, fn, *args, **kwargs):
        # execute functions in series without creating threads
        # for easier unit testing; note that this returns the raw
        # result rather than a Future, so callers cannot call .result()
        result = fn(*args, **kwargs)
        return result

    def shutdown(self, wait=True):
        pass

if __name__ == "__main__":
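    def add(a, b):  # named add to avoid shadowing the built-in sum()
        return a + b

    with MockThreadPoolExecutor(max_workers=3) as executor:
        # submit() returns plain values here, not Future objects
        future_result = list()
        for i in range(5):
            future_result.append(executor.submit(add, i + 1, i + 2))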
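
One possible way to put such a mock in place during a test is `unittest.mock.patch.object`. The sketch below is only an illustration: `add_pairs` is a hypothetical function under test, and the patch works here because that function looks up `concurrent.futures.ThreadPoolExecutor` at call time instead of importing the name directly.

```python
import concurrent.futures
from unittest import mock


class MockThreadPoolExecutor:
    """Same idea as the mock above: run each call inline, return the raw result."""

    def __init__(self, **kwargs):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def submit(self, fn, *args, **kwargs):
        return fn(*args, **kwargs)


def add_pairs(pairs):
    # Hypothetical code under test; with the mock in place, submit()
    # hands back plain values instead of Future objects.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        return [executor.submit(lambda a, b: a + b, a, b) for a, b in pairs]


with mock.patch.object(concurrent.futures, "ThreadPoolExecutor",
                       MockThreadPoolExecutor):
    patched = add_pairs([(1, 2), (2, 3)])

print(patched)
```

If the code under test does `from concurrent.futures import ThreadPoolExecutor`, the name would need to be patched in that module's namespace instead.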



ANSWER 3

Score 0


Pools from the concurrent.futures package are eager, which is of course what you want: they pick up calculations as soon as possible, at some point between the pool.submit() call and the moment the associated future.result() call returns.

From the perspective of synchronous code, you have two choices: compute a task's result either during the pool.submit() call or when future.result() is retrieved.

I find the second approach more natural, since it better imitates the "nonblocking" nature of pool.map() from the perspective of the main thread: results can be obtained one after another as soon as they are done calculating (rather than when they are all ready).

Here is my code (an additional implementation of DummyFuture is needed):

from concurrent.futures import Executor
from threading import Lock
from functools import partial


class DummyFuture():
    def __init__(self, calculation) -> None:
        self.calculation = calculation
    def result(self, timeout=None):
        return self.calculation()
    def cancel(self):
        pass


class DummyExecutor(Executor):

    def __init__(self):
        self._shutdown = False
        self._shutdown_lock = Lock()

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            return DummyFuture(partial(fn, *args, **kwargs))

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True

(I have to give credit to mata's answer, since it was the starting point for my implementation.)
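
One consequence of computing in result() is that the DummyFuture above re-runs the calculation on every result() call. A minimal sketch of a caching variant follows; `CachingDummyFuture` and `record` are hypothetical names introduced only for this illustration, and exceptions are deliberately not cached:

```python
from functools import partial


class CachingDummyFuture:
    """Hypothetical variant of DummyFuture: runs the call once, on first result()."""

    _UNSET = object()  # sentinel, so that a legitimate None result is cached too

    def __init__(self, calculation):
        self._calculation = calculation
        self._value = self._UNSET

    def result(self, timeout=None):
        # timeout is accepted for interface compatibility but ignored
        if self._value is self._UNSET:
            self._value = self._calculation()
        return self._value


calls = []


def record(x):
    calls.append(x)
    return x * 2


future = CachingDummyFuture(partial(record, 21))
before = list(calls)       # nothing has run yet: evaluation is lazy
first = future.result()    # the calculation runs here
second = future.result()   # served from the cache, no second run
print(before, first, second, calls)
```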