Python Forum
Terminating Subprocesses and Threads while they're calculating
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Terminating Subprocesses and Threads while they're calculating
#1
I am working on a larger project, where I have 2 Threads (same process) and one separate process. One of the threads is the gui, the other thread is a sentinel thread, observing the subprocess, and the subprocess is doing some heavy lifting with neural networks. The architecture looks somewhat like this:

[Image: 8uI4KBJ.png]

I need to be able to cancel the process of the neural network and respectively end the sentinel thread. I have created a small example which shows the architecture generally and what I approach to do.

from multiprocessing import Process, Queue from threading import Thread from time import sleep class Worker(Process): # The worker resembles the neural network. It does some calculations and shares # the information via the queue. def __init__(self, queue: Queue): Process.__init__(self) self.queue = queue def run(self): i = 0 while True: self.queue.put(i) i += 1 def stop(self): # I used the stop function for trying out some things, like using a joinable # queue and block execution as long as the queue is not empty, which is not # working self.queue.put(None) self.terminate() class Listener(Thread): # This class resembles the sentinel thread. It checks in an infinite loop for # messages. In the real application I send signals via the signals and slots # design pattern to the gui and display the sent information. def __init__(self): Thread.__init__(self) self.queue = Queue() self.worker = Worker(self.queue) def run(self): self.worker.start() while True: data = self.queue.get() if data is not None: print(data) else: break print("broken") def stop(self): self.worker.stop() class System: # This class resembles the gui def __init__(self): self.listener = Listener() def start(self): self.listener.start() def stop(self): self.listener.stop() if __name__ == "__main__": system = System() system.start() sleep(0.1) system.stop()
What is the problem
As long as a process reads or writes to the queue, and/or the queue is not emptied properly, one or both of the processes become zombie processes, which basically is a deadlock in some sense. Therefore I need to find a way to properly handle the queue when terminating the process, thus the processes terminate without errors.

What I have tried so far:
  1. Using a Joinable Queue and join() for each task_done()
  2. Rewriting the SIGTERM signalhandler to wait the queue to be emptied
  3. Using a Joinable Queue and only join() within the SIGTERM signalhandler

The results:
  1. The speed of the processing collapsed greatly, but termination worked properly
  2. termination does not work the way I implemented it
  3. Sometimes it worked, sometimes it did not. So no reliable output and knowledge from this method

An attempt for (3) is the following:

class Worker(Process): def __init__(self, queue: Queue): Process.__init__(self) self.queue = queue self.abort = False self.lock = Lock() signal(SIGTERM, self.stop) def run(self): i = 0 while True: self.lock.acquire() if self.abort: break else: self.queue.put(i) i += 1 self.lock.release() exit(0) def stop(self, sig, frame): self.abort = True self.queue.put(None) self.queue.join() exit(0)
Reply
#2
As a workaround, I suggest to use Queue.get() with a timeout, say half a second in the sentinel thread instead of a blocking Queue.get() and check if the worker process is still running when the timeout occurs. This gives the sentinel thread an option to exit instead of waiting for queue events when the worker thread is already dead.
lvlanson likes this post
Reply
#3
(Oct-16-2020, 07:08 PM)Gribouillis Wrote: As a workaround, I suggest to use Queue.get() with a timeout, say half a second in the sentinel thread instead of a blocking Queue.get() and check if the worker process is still running when the timeout occurs. This gives the sentinel thread an option to exit instead of waiting for queue events when the worker thread is already dead.

I guess this is a last resort option. There still would data remain in the multiprocessing queue. My instinct tells me that somehow interrupt handlers/ asynchronous event handlers should do the trick somehow.
Reply
#4
There are still things to try. You could call queue.close() and queue.join_thread() in the worker process when it receives the instruction to stop. This would wait until every data previously sent to the queue has been written in the underlying pipe. Also it would raise an exception if the run() function still tries to put() anything else in the queue.
lvlanson likes this post
Reply
#5
(Oct-17-2020, 06:04 AM)Gribouillis Wrote: There are still things to try. You could call queue.close() and queue.join_thread() in the worker process when it receives the instruction to stop. This would wait until every data previously sent to the queue has been written in the underlying pipe. Also it would raise an exception if the run() function still tries to put() anything else in the queue.

This solution works
from multiprocessing import Process, Lock from threading import Thread from time import sleep from sys import exit from multiprocessing import Queue class Worker(Process): def __init__(self, queue: Queue): Process.__init__(self) self.queue = queue def run(self): i = 0 try: while True: self.queue.put(i) i += 1 except ValueError: pass def stop(self): self.queue.put("END") self.queue.close() self.queue.join_thread() self.terminate() class Listener(Thread): def __init__(self): Thread.__init__(self) self.queue = Queue() self.worker = Worker(self.queue) self.is_running = True def run(self): self.worker.start() while self.is_running: try: data = self.queue.get() if data != "END": pass else: self.is_running = False except TypeError: self.is_running = False def stop(self): self.worker.stop() self.is_running = False class System: def __init__(self): self.listener = Listener() def start(self): self.listener.start() def stop(self): self.listener.stop() if __name__ == "__main__": system = System() system.start() sleep(0.1) system.stop() sleep(1) print(f"Process Alive: {system.listener.worker.is_alive()}") print(f"Thread Alive: {system.listener.is_alive()}")
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  [Solved]Help terminating arduino properly classified 1 988 Dec-28-2024, 10:40 PM
Last Post: classified
  Subprocesses not opening File Select Dialog teut 2 4,044 Feb-22-2021, 08:07 PM
Last Post: teut
Question Terminating threads Gilush 1 3,052 Jun-09-2020, 09:57 AM
Last Post: Gribouillis
  Using Terminating Signal to Terminate Long Threads crcali 1 3,779 Apr-06-2018, 01:26 AM
Last Post: woooee

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020
This forum uses Lukasz Tkacz MyBB addons.
Forum use Krzysztof "Supryk" Supryczynski addons.