`DummyExecutor` for Python's `futures`
Music by Eric Matyas
https://www.soundimage.org
Track title: Cosmic Puzzle
--
Chapters
00:00 `DummyExecutor` for Python's `futures`
00:27 Accepted Answer Score 25
01:04 Answer 2 Score 4
01:28 Answer 3 Score 0
02:28 Thank you
--
Full question
https://stackoverflow.com/questions/1043...
--
Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...
--
Tags
#python #debugging #concurrency #future #concurrentfutures
#avk47
ACCEPTED ANSWER
Score 25
Something like this should do it:
from concurrent.futures import Future, Executor
from threading import Lock


class DummyExecutor(Executor):

    def __init__(self):
        self._shutdown = False
        self._shutdownLock = Lock()

    def submit(self, fn, *args, **kwargs):
        with self._shutdownLock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            # Run the callable immediately and store its outcome in a regular
            # Future, so callers can use result()/exception() as usual.
            f = Future()
            try:
                result = fn(*args, **kwargs)
            except BaseException as e:
                f.set_exception(e)
            else:
                f.set_result(result)

            return f

    def shutdown(self, wait=True):
        with self._shutdownLock:
            self._shutdown = True


if __name__ == '__main__':

    def fnc(err):
        if err:
            raise Exception("test")
        else:
            return "ok"

    ex = DummyExecutor()

    print(ex.submit(fnc, True))   # future with the exception stored
    print(ex.submit(fnc, False))  # future with the result "ok"

    ex.shutdown()

    ex.submit(fnc, True)  # raises RuntimeError: executor is shut down
Locking is probably not needed in this case, but it can't hurt to have it.
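As a quick illustration (not part of the original answer, reusing the fnc helper from the example above), the futures this executor returns can be consumed exactly like futures from a real pool:

ex = DummyExecutor()

f_ok = ex.submit(fnc, False)
print(f_ok.done())        # True - the work already ran inside submit()
print(f_ok.result())      # "ok"

f_err = ex.submit(fnc, True)
print(f_err.exception())  # the Exception("test") captured via set_exception()
try:
    f_err.result()        # result() re-raises the stored exception
except Exception as e:
    print("re-raised:", e)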
ANSWER 2
Score 4
Use this to mock your ThreadPoolExecutor
class MockThreadPoolExecutor():

    def __init__(self, **kwargs):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def submit(self, fn, *args, **kwargs):
        # execute functions in series without creating threads
        # for easier unit testing
        result = fn(*args, **kwargs)
        return result

    def shutdown(self, wait=True):
        pass


if __name__ == "__main__":

    def sum(a, b):
        return a + b

    with MockThreadPoolExecutor(max_workers=3) as executor:
        future_result = list()
        for i in range(5):
            future_result.append(executor.submit(sum, i + 1, i + 2))
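One caveat: submit() here returns the raw result rather than a Future, so test code that calls .result() on the return value will fail. If you need drop-in compatibility with ThreadPoolExecutor call sites, a small variant (my own sketch, not part of the original answer) can wrap the outcome in a real Future:

from concurrent.futures import Future

class MockFutureThreadPoolExecutor(MockThreadPoolExecutor):
    # Hypothetical subclass: same serial, thread-free execution, but callers
    # can keep using future.result() / future.exception() unchanged.
    def submit(self, fn, *args, **kwargs):
        f = Future()
        try:
            f.set_result(fn(*args, **kwargs))
        except BaseException as e:
            f.set_exception(e)
        return f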
ANSWER 3
Score 0
Pools from the concurrent.futures package are eager (which you of course want, and which means they pick up calculations as soon as possible, at some point between the pool.submit() call and the return of the associated future.result() call).
From the perspective of synchronous code you have two choices: either compute the task's result at the pool.submit() call, or at future.result() retrieval.
I find the second approach more natural, since it better imitates the "non-blocking" nature of pool.map() from the perspective of the main thread: results can be obtained one after another as soon as they are done being calculated (rather than only once they are all ready).
Here is my code (an additional DummyFuture implementation is needed):
from concurrent.futures import Executor
from threading import Lock
from functools import partial


class DummyFuture():
    def __init__(self, calculation) -> None:
        self.calculation = calculation

    def result(self, timeout=None):
        return self.calculation()

    def cancel(self):
        pass


class DummyExecutor(Executor):
    def __init__(self):
        self._shutdown = False
        self._shutdown_lock = Lock()

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            return DummyFuture(partial(fn, *args, **kwargs))

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
(I have to give credit to mata's answer, since it was the starting point of my implementation.)
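For illustration only (the slow_add helper below is my own, not from the answer), here is a short usage sketch showing that with this executor the work is deferred until result() is called:

import time

def slow_add(a, b):
    time.sleep(0.1)  # stand-in for real work
    return a + b

ex = DummyExecutor()
futures = [ex.submit(slow_add, i, i) for i in range(3)]  # returns immediately, nothing computed yet
print([f.result() for f in futures])                     # work happens here, one task at a time -> [0, 2, 4]
ex.shutdown()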