Question Details

No question body available.

Tags

python python-3.x multithreading multiprocessing

Answers (2)

Accepted Answer Available
Accepted Answer
April 11, 2025 Score: 1 Rep: 23,173 Quality: High Completeness: 80%

You should use cloudpickle; you'll have to do the `cloudpickle.dumps` and `cloudpickle.loads` calls yourself.

from threading import Thread
from multiprocessing import Queue, Manager, Process
from dataclasses import dataclass
from typing import Optional
import logging
import inspect
import cloudpickle

@dataclass
class Service:
    """Describes a network service that can subscribe to filters."""
    id: Optional[int] = None
    name: str = ""
    port: int = 0

class Filter:
    """Wraps a predicate callable together with the services subscribed to it."""

    def __init__(self, filter_function: callable):
        self.filter_function: callable = filter_function
        self.subscribers: list[Service] = []

    def __call__(self, *args, **kwds):
        # Delegate directly so a Filter instance can be called like its function.
        return self.filter_function(*args, **kwds)

class FilterBroker(Thread):
    """Thread that serially executes cloudpickle-serialized method requests
    read from a shared queue.

    Each request is a tuple ``(response_queue, pickled_task)`` where the
    pickled payload decodes to ``(method, *args)``; a ``None`` item is the
    shutdown sentinel.
    """

    def __init__(self, queue: Queue) -> None:
        super().__init__()
        self.queue = queue  # request queue shared with client processes
        self.filters: dict[str, Filter] = {}

    def add_filter(self, name: str, filter: Filter):
        """Register *filter* under *name*; it must accept exactly two parameters."""
        if len(inspect.signature(filter).parameters) != 2:
            raise TypeError("Invalid Filter: must have exactly two parameters")
        self.filters[name] = filter

    def run(self):
        class_name = self.__class__.__name__
        logging.info(f"[{class_name}]: Process started")
        while True:
            try:
                task = self.queue.get()
                logging.debug(f"[{class_name}]: Task received: {task}")
                if task is None:
                    break
                response_queue, pickled_task = task
                # Payload was serialized with cloudpickle so that closures and
                # locally-defined functions survive the process boundary.
                method, *args = cloudpickle.loads(pickled_task)
                response = method(self, *args)
            except Exception:
                import traceback
                traceback.print_exc()
                response = None
            finally:
                # Only reply to real requests, not to the shutdown sentinel.
                if task is not None:
                    response_queue.put_nowait(response)

class FilterBrokerAsker:
    """Picklable stand-in for FilterBroker that carries only its request queue.

    The broker thread itself cannot be pickled to a spawned process, so the
    child receives one of these instead and talks to the broker through it.
    """

    def __init__(self, queue: Queue) -> None:
        super().__init__()
        self.queue = queue

    @staticmethod
    def ask(fb: 'FilterBrokerAsker', *task):
        """Serialize ``(method, *args)``, send it to the broker, and block
        until the broker's response arrives on a fresh managed queue."""
        pickled_task = cloudpickle.dumps(task)
        response_queue = Manager().Queue()
        fb.queue.put((response_queue, pickled_task))
        print("I put in queue")
        result = response_queue.get()
        print("I got result")
        return result

def task(broker):
    """Child-process worker: wrap a locally-defined function in a Filter and
    register it with the broker via the picklable asker."""
    def function(x):
        print("running local function!")
        return x > 0

    f = Filter(function)
    print(f(2))
    FilterBrokerAsker.ask(broker, FilterBroker.add_filter, 'test', f)
    logging.debug(f"Filter added")

if name == "main": manager = Manager() broker = FilterBroker(manager.Queue()) broker.start()

brokerdata = FilterBrokerAsker(broker.queue)

process = Process(target=task, args=(brokerdata,))

process.start() process.join()

print("Process finished") broker.queue.put(None)

running local function!
True
I put in queue
I got result
Process finished

the split to FilterBrokerAsker is to get it to work with spawn instead of fork, as threads are not picklable.

note that cloudpickle has problems with imports, and you may need to re-import things inside your functions, the whole concept is very fragile, and FWIW you should just use threads instead.

April 11, 2025 Score: 1 Rep: 46,061 Quality: Medium Completeness: 80%

A few points:

  1. Function function should be at global scope.
  2. There is no reason why method ask cannot be an instance method given the rest of the code.
  3. Trying to pickle a queue as is done in your current `ask` implementation by putting `response_queue` to a queue will not work. But given that the `FilterBroker.run` method is processing requests serially in a loop anyway, there is no reason why the response queue should not be pre-allocated by the `__init__` method. Then we just need to use a lock to ensure that `ask` calls are serialized so that the response always matches up with the request.
  4. We can use the multiprocessing.Queue class instead of a managed queue thereby avoiding having to start an additional process and making for more efficient get and put method calls.

Check the code thoroughly for other minor changes/additions that I have made:

from threading import Thread
from multiprocessing import Lock, Queue
from dataclasses import dataclass
from typing import Optional
import logging
import inspect

@dataclass
class Service:
    """Describes a network service that can subscribe to filters."""
    id: Optional[int] = None
    name: str = ""
    port: int = 0

class Filter:
    """Wraps a predicate callable together with the services subscribed to it."""

    def __init__(self, filter_function: callable):
        self.filter_function: callable = filter_function
        self.subscribers: list[Service] = []

    def __call__(self, *args, **kwds):
        # Delegate directly so a Filter instance can be called like its function.
        return self.filter_function(*args, **kwds)

class FilterBroker(Thread):
    """Thread that serially services ``(method, *args)`` requests.

    Requests arrive on a private request queue and every request — including
    the ``None`` shutdown sentinel — produces exactly one item on the private
    response queue, so a lock around ``ask`` keeps request/response pairs
    matched even with multiple client threads.
    """

    def __init__(self) -> None:
        super().__init__()
        self._request_queue = Queue()
        self._response_queue = Queue()
        self._lock = Lock()  # serializes ask() calls across client threads
        self.filters: dict[str, Filter] = {}

    def add_filter(self, name: str, filter: Filter):
        """Register *filter* under *name*; it must accept exactly two parameters."""
        if len(inspect.signature(filter).parameters) != 2:
            raise TypeError("Invalid Filter: must have exactly two parameters")
        self.filters[name] = filter

    def run(self):
        class_name = self.__class__.__name__
        logging.info(f"[{class_name}]: Process started")
        while True:
            try:
                task = self._request_queue.get()
                logging.debug(f"[{class_name}]: Task received: {task}")
                if task is None:
                    response = None  # We must set this due to the finally block
                    break
                if not isinstance(task, tuple) or not callable(task[0]):
                    raise ValueError('bad arguments')
                method, *args = task
                response = method(self, *args)
            except Exception as e:
                print(e)
                response = None
            finally:
                # Always answer — even for the sentinel — so the waiting
                # client (including shutdown_thread) never blocks forever.
                self._response_queue.put_nowait(response)

    def shutdown_thread(self):
        """Send the shutdown sentinel, consume its reply, and join the thread."""
        with self._lock:
            self._request_queue.put(None)
            self._response_queue.get()
        self.join()

    def ask(self, *task):
        """Submit ``(method, *args)`` and block for the matching response."""
        with self._lock:
            self._request_queue.put(task)
            print("I put in queue")
            result = self._response_queue.get()
            print("I got result", result)
        return result

def function(x):
    """Example filter predicate: True for strictly positive values.

    Defined at global scope so the standard pickle module can serialize it.
    """
    return x > 0

def task(broker):
    """Worker-thread body: wrap the global predicate in a Filter and
    register it with the broker."""
    f = Filter(function)
    print(f(2))
    broker.ask(FilterBroker.add_filter, 'test', f)
    logging.debug(f"Filter added")

if __name__ == '__main__':
    broker = FilterBroker()
    broker.start()

    thread = Thread(target=task, args=(broker,))
    thread.start()
    thread.join()

    print("Thread finished")

    # Kill broker:
    broker.shutdown_thread()

Update

Since you emphatically state that your filter functions cannot be global (I was clearly living in denial), then the cloudpickle package, as recommended by Ahmed, needs to be used. But let me suggest an alternate implementation that allows the FilterBroker.add_filter method to be multithreaded. I would suggest:

  1. Create a class FilterBrokerImpl as a managed object. Its add_filter method expects a single argument that is a tuple serialized with cloudpickle.dumps and which contains the filter's name and filter function. For good measure we will add an additional method, get_filters, which will return the dictionary of filters serialized with cloudpickle.
  2. Class FilterBroker is now a class that wraps a proxy to a FilterBrokerImpl instance and has methods add_filter and get_filter that have a more user-friendly interface that hides the need for serializing.
from dataclasses import dataclass
from typing import Optional
import logging
import inspect
from multiprocessing import Process
from multiprocessing.managers import BaseManager
import cloudpickle

@dataclass
class Service:
    """Describes a network service that can subscribe to filters."""
    id: Optional[int] = None
    name: str = ""
    port: int = 0

class Filter:
    """Wraps a predicate callable together with the services subscribed to it."""

    def __init__(self, filter_function: callable):
        self.filter_function: callable = filter_function
        self.subscribers: list[Service] = []

    def __call__(self, *args, **kwds):
        # Delegate directly so a Filter instance can be called like its function.
        return self.filter_function(*args, **kwds)

class FilterBrokerImpl:
    """Managed object holding the filter registry.

    Arguments and return values cross a process boundary via the manager, so
    they are serialized with cloudpickle, which (unlike the standard pickle
    module) can handle closures and locally-defined functions.
    """

    def __init__(self) -> None:
        self.filters: dict[str, Filter] = {}

    def add_filter(self, serialized_args: bytes):
        """Add a filter; *serialized_args* is ``cloudpickle.dumps((name, filter))``."""
        name, filter = cloudpickle.loads(serialized_args)
        if len(inspect.signature(filter).parameters) != 2:
            raise TypeError("Invalid Filter: must have exactly two parameters")
        print(f'add_filter name = {name}, filter = {filter}')
        self.filters[name] = filter

    def get_filters(self) -> bytes:
        """Return the registered filters, serialized with cloudpickle."""
        return cloudpickle.dumps(self.filters)

class FilterBroker:
    """A wrapper (adapter) that hides the intricacies of serializing
    arguments and return values to the various FilterBrokerImpl methods."""

    def __init__(self, broker: FilterBrokerImpl):
        self.broker = broker  # typically a manager proxy to a FilterBrokerImpl

    def add_filter(self, name: str, filter: Filter) -> None:
        """Serialize ``(name, filter)`` with cloudpickle and forward to the impl."""
        self.broker.add_filter(cloudpickle.dumps((name, filter)))

    def get_filters(self) -> dict[str, Filter]:
        """Fetch and deserialize the registered filters."""
        return cloudpickle.loads(self.broker.get_filters())

def task(broker, name, n):
    """Child-process worker: build a closure-based filter (``x > n``) and
    register it under *name* through the broker wrapper."""
    def gen_filter():
        def function(x):
            return x > n
        return function

    f = Filter(gen_filter())
    print(f'result of executing filter named {name} with argument 2: {f(2)}')
    broker.add_filter(name, f)
    logging.debug(f"Filter added")

class FilterBrokerManager(BaseManager):
    """Custom manager subclass; server-side classes are registered on it
    before the manager is started."""
    pass

def main():
    # Register the implementation class so the manager can serve proxies to it.
    FilterBrokerManager.register('FilterBroker', FilterBrokerImpl)

    with FilterBrokerManager() as manager:
        # Wrap the proxy so callers get a friendly, serialization-free API.
        broker = FilterBroker(manager.FilterBroker())

        p1 = Process(target=task, args=(broker, 'test0', 0))
        p2 = Process(target=task, args=(broker, 'test5', 5))

        p1.start()
        p2.start()

        p1.join()
        p2.join()

        print('filters:', broker.get_filters())

if __name__ == '__main__':
    main()

Prints:

result of executing filter named test5 with argument 2: False
result of executing filter named test0 with argument 2: True
add_filter name = test5, filter = <__main__.Filter object at 0x...>
add_filter name = test0, filter = <__main__.Filter object at 0x...>
filters: {'test5': <__main__.Filter object at 0x...>, 'test0': <__main__.Filter object at 0x...>}