Skip to content

Python threading

Matthew Bernstein requested to merge bugfix/py_threading into integration

Trying out various approaches to multithreading in Python. When stepping through the existing HowTo_UseThreadPools.py in IPython, I found that the root cause of multiprocessing not working was that PyPulse.Engine, and therefore PulseEngine, is not pickleable. Here is my initial attempt at fixing that.

References: https://docs.python.org/3/library/pickle.html https://docs.python.org/3/howto/unicode.html https://stackoverflow.com/a/70816220 https://stackoverflow.com/q/53855880

Note the new __getstate__ and __setstate__ which wrap the existing string serializing functions, along with the new self.args. Are those together sufficient to clone a PulseEngine? The small size of serialize_to_string output makes me think probably not. Also, __init__ probably should not be called in __getstate__.

However, the main reason this may not be the way to go is that it doesn't work immediately after the class is initialized, since _is_ready is False. Either way, working around that, I get this confusing stack trace:

---------------------------------------------------------------------------
UnicodeDecodeError                        Traceback (most recent call last)
Input In [43], in <cell line: 1>()
      1 for _ in range(1000):
----> 2     ps.append(deepcopy(p1))

File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:172, in deepcopy(x, memo, _nil)
    170                 y = x
    171             else:
--> 172                 y = _reconstruct(x, memo, *rv)
    174 # If is its own copy, don't memoize.
    175 if y is not x:

File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:270, in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    268 if state is not None:
    269     if deep:
--> 270         state = deepcopy(state, memo)
    271     if hasattr(y, '__setstate__'):
    272         y.__setstate__(state)

File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:146, in deepcopy(x, memo, _nil)
    144 copier = _deepcopy_dispatch.get(cls)
    145 if copier is not None:
--> 146     y = copier(x, memo)
    147 else:
    148     if issubclass(cls, type):

File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:210, in _deepcopy_tuple(x, memo, deepcopy)
    209 def _deepcopy_tuple(x, memo, deepcopy=deepcopy):
--> 210     y = [deepcopy(a, memo) for a in x]
    211     # We're not going to put the tuple in the memo, but it's still important we
    212     # check for it, in case the tuple contains recursive mutable structures.
    213     try:

File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:210, in <listcomp>(.0)
    209 def _deepcopy_tuple(x, memo, deepcopy=deepcopy):
--> 210     y = [deepcopy(a, memo) for a in x]
    211     # We're not going to put the tuple in the memo, but it's still important we
    212     # check for it, in case the tuple contains recursive mutable structures.
    213     try:

File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:146, in deepcopy(x, memo, _nil)
    144 copier = _deepcopy_dispatch.get(cls)
    145 if copier is not None:
--> 146     y = copier(x, memo)
    147 else:
    148     if issubclass(cls, type):

File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:230, in _deepcopy_dict(x, memo, deepcopy)
    228 memo[id(x)] = y
    229 for key, value in x.items():
--> 230     y[deepcopy(key, memo)] = deepcopy(value, memo)
    231 return y

File ~/.local/conda/envs/pulse/lib/python3.9/copy.py:161, in deepcopy(x, memo, _nil)
    159 reductor = getattr(x, "__reduce_ex__", None)
    160 if reductor is not None:
--> 161     rv = reductor(4)
    162 else:
    163     reductor = getattr(x, "__reduce__", None)

File ~/pulse/engine/src/python/pulse/engine/PulseEngine.py:116, in PulseEngine.__getstate__(self)
    115 def __getstate__(self):
--> 116     return self._args, self.serialize_to_string(PyPulse.serialization_format.binary)

File ~/pulse/engine/src/python/pulse/engine/PulseEngine.py:112, in PulseEngine.serialize_to_string(self, format)
    110 def serialize_to_string(self, format: eSerializationFormat):
    111     if self._is_ready:
--> 112         return self.__pulse.serialize_to_string(format)
    113     return None

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xec in position 26: invalid continuation byte

Here is a MWE:

sys.path.append('/home/local/KHQ/matthew.bernstein/pulse/engine/src/python/pulse/howto/')
from HowTo_UseThreadPools import *

# Using `ub.Executor` as a frontend for concurrent.futures to easily swap between serial, multithreading, and multiprocessing.
import ubelt as ub
from copy import deepcopy
import pickle as pkl

# generate some patients
ps = []
for ix in range(100):
    # ps.append(deepcopy(p1))
    p = PoolPatient(ix)
    p.data_request_mgr = data_req_mgr
    p.patient_configuration = SEPatientConfiguration()
    p.patient_configuration.get_patient().set_name("p1")
    p.patient_configuration.get_patient().set_sex(eSex.Male)
    ps.append(p)

# initialize them as in HowTo_UseThreadPools

# max_workers does nothing in this mode
exc = ub.Executor(mode='serial', max_workers=100)
timer = ub.Timer('running jobs')
with timer: 
    jobs = [exc.submit(initialize_engine, p) for p in ps]
    print([job.result() for job in jobs][:10])
print('serial: ', timer.elapsed)

# same time elapsed as serial, so probably needs some pybind11 magic to bypass the GIL
exc = ub.Executor(mode='thread', max_workers=100)
timer = ub.Timer('running jobs')
with timer:
    jobs = [exc.submit(initialize_engine, p) for p in ps]
    print([job.result() for job in jobs][:10])
print('thread: ', timer.elapsed)

if 0:
    # the following operations all rely on pickling
    
    exc = ub.Executor(mode='process', max_workers=100)
    timer = ub.Timer('running jobs')
    with timer:
        jobs = [exc.submit(initialize_engine, p) for p in ps]
        print([job.result() for job in jobs][:10])
    print('process: ', timer.elapsed)
    
    string = pkl.dumps(p)  # p._is_ready=False, so state is None, but can bypass that by just setting it to True
    new_p = pkl.loads(string)  # if state is None, typechecking error here (as expected), else unicode error happens here (unexpected)
  
    new_p = deepcopy(p)

@aaron.bray any insight here or was anyone else working on something similar?

Edited by Matthew Bernstein

Merge request reports

Loading