Python threading
Trying out various approaches to multithreading in Python. When stepping through the existing `HowTo_UseThreadPools.py` in IPython, I found that the root cause of multiprocessing not working was that `PyPulse.Engine`, and therefore `PulseEngine`, is not pickleable. Here is my initial attempt at fixing that.
References: https://docs.python.org/3/library/pickle.html https://docs.python.org/3/howto/unicode.html https://stackoverflow.com/a/70816220 https://stackoverflow.com/q/53855880
Note the new `__getstate__` and `__setstate__`, which wrap the existing string-serializing functions, along with the new `self.args`. Are those together sufficient to clone a `PulseEngine`? The small size of the `serialize_to_string` output makes me think probably not. Also, `__init__` probably should not be called in `__getstate__`.
However, the main reason this may not be the way to go is that it doesn't work immediately after the class is initialized, since `_is_ready` is `False`. Either way, working around that, I get this confusing stack trace:
---------------------------------------------------------------------------
UnicodeDecodeError Traceback (most recent call last)
Input In [43], in <cell line: 1>()
1 for _ in range(1000):
----> 2 ps.append(deepcopy(p1))
File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:172, in deepcopy(x, memo, _nil)
170 y = x
171 else:
--> 172 y = _reconstruct(x, memo, *rv)
174 # If is its own copy, don't memoize.
175 if y is not x:
File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:270, in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
268 if state is not None:
269 if deep:
--> 270 state = deepcopy(state, memo)
271 if hasattr(y, '__setstate__'):
272 y.__setstate__(state)
File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:146, in deepcopy(x, memo, _nil)
144 copier = _deepcopy_dispatch.get(cls)
145 if copier is not None:
--> 146 y = copier(x, memo)
147 else:
148 if issubclass(cls, type):
File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:210, in _deepcopy_tuple(x, memo, deepcopy)
209 def _deepcopy_tuple(x, memo, deepcopy=deepcopy):
--> 210 y = [deepcopy(a, memo) for a in x]
211 # We're not going to put the tuple in the memo, but it's still important we
212 # check for it, in case the tuple contains recursive mutable structures.
213 try:
File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:210, in <listcomp>(.0)
209 def _deepcopy_tuple(x, memo, deepcopy=deepcopy):
--> 210 y = [deepcopy(a, memo) for a in x]
211 # We're not going to put the tuple in the memo, but it's still important we
212 # check for it, in case the tuple contains recursive mutable structures.
213 try:
File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:146, in deepcopy(x, memo, _nil)
144 copier = _deepcopy_dispatch.get(cls)
145 if copier is not None:
--> 146 y = copier(x, memo)
147 else:
148 if issubclass(cls, type):
File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:230, in _deepcopy_dict(x, memo, deepcopy)
228 memo[id(x)] = y
229 for key, value in x.items():
--> 230 y[deepcopy(key, memo)] = deepcopy(value, memo)
231 return y
File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:161, in deepcopy(x, memo, _nil)
159 reductor = getattr(x, "__reduce_ex__", None)
160 if reductor is not None:
--> 161 rv = reductor(4)
162 else:
163 reductor = getattr(x, "__reduce__", None)
File ~/pulse/engine/src/python/pulse/engine/PulseEngine.py:116, in PulseEngine.__getstate__(self)
115 def __getstate__(self):
--> 116 return self._args, self.serialize_to_string(PyPulse.serialization_format.binary)
File ~/pulse/engine/src/python/pulse/engine/PulseEngine.py:112, in PulseEngine.serialize_to_string(self, format)
110 def serialize_to_string(self, format: eSerializationFormat):
111 if self._is_ready:
--> 112 return self.__pulse.serialize_to_string(format)
113 return None
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xec in position 26: invalid continuation byte
Here is a minimal working example (MWE):
# MWE reproducing the PulseEngine pickling / deepcopy failure.
# NOTE(review): indentation was lost in the paste; reconstructed below.
# The `with timer:` blocks are assumed to wrap job submission + result
# collection, with the elapsed-time print afterwards -- confirm against
# the original script.
import sys

sys.path.append('/home/local/KHQ/matthew.bernstein/pulse/engine/src/python/pulse/howto/')
from HowTo_UseThreadPools import *

# Using `ub.Executor` as a frontend for concurrent.futures to easily swap
# between serial, multithreading, and multiprocessing.
import ubelt as ub
from copy import deepcopy
import pickle as pkl

# generate some patients
ps = []
for ix in range(100):
    # ps.append(deepcopy(p1))
    p = PoolPatient(ix)
    p.data_request_mgr = data_req_mgr
    p.patient_configuration = SEPatientConfiguration()
    p.patient_configuration.get_patient().set_name("p1")
    p.patient_configuration.get_patient().set_sex(eSex.Male)
    ps.append(p)

# initialize them as in HowTo_UseThreadPools
# max_workers does nothing in this mode
exc = ub.Executor(mode='serial', max_workers=100)
timer = ub.Timer('running jobs')
with timer:
    jobs = [exc.submit(initialize_engine, p) for p in ps]
    print([job.result() for job in jobs][:10])
print('serial: ', timer.elapsed)

# same time elapsed as serial, so probably needs some pybind11 magic to bypass the GIL
exc = ub.Executor(mode='thread', max_workers=100)
timer = ub.Timer('running jobs')
with timer:
    jobs = [exc.submit(initialize_engine, p) for p in ps]
    print([job.result() for job in jobs][:10])
print('thread: ', timer.elapsed)

if 0:
    # the following operations all rely on pickling
    exc = ub.Executor(mode='process', max_workers=100)
    timer = ub.Timer('running jobs')
    with timer:
        jobs = [exc.submit(initialize_engine, p) for p in ps]
        print([job.result() for job in jobs][:10])
    print('process: ', timer.elapsed)

    string = pkl.dumps(p)  # p._is_ready=False, so state is None, but can bypass that by just setting it to True
    new_p = pkl.loads(string)  # if state is None, typechecking error here (as expected), else unicode error happens here (unexpected)
    new_p = deepcopy(p)
@aaron.bray any insight here or was anyone else working on something similar?