Wednesday, April 13, 2022

[SOLVED] Exception handling in python multiprocessing pool

Issue

I am trying to handle the KeyboardInterrupt exception in my program, but I can't figure out how to do it with a multiprocessing pool. Even though I put the pool operations in a try/except block and handle the exceptions, I still receive 4 KeyboardInterrupt tracebacks.

import time
import multiprocessing as mp

def calc(i):
    return i*i 

def main():
    try:
        with mp.Pool(4) as p:
            while True:
                print(p.map(calc, range(10)))
                time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down.")
    except Exception as e:
        print(e)


if __name__ == '__main__':
    main()

I understand that the processes are running inside an isolated environment, but I also want to handle the exceptions somehow.

Edit: The output I get when I run my code:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
^CProcess ForkPoolWorker-3:
Process ForkPoolWorker-5:
Process ForkPoolWorker-4:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
    with self._rlock:
  File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
KeyboardInterrupt
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 335, in get
    res = self._reader.recv_bytes()
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
    with self._rlock:
  File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Process ForkPoolWorker-6:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
    with self._rlock:
  File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt

Shutting down.

Solution

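As you appear to be running on a Linux-type platform (you really should tag your multiprocessing questions with the platform), pressing CTRL-C delivers SIGINT to every process in the terminal's foreground process group. Each of the 4 pool workers therefore raises its own KeyboardInterrupt, in addition to the one your main process catches. You need to ignore CTRL-C (SIGINT) in your pool processes so that only the main process handles it. The easiest way to do that is to use the initializer argument when you create the pool: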

import signal
import time
import multiprocessing as mp


def init_pool_processes():
    """
    Each pool process will execute this as part of its
    initialization: ignore SIGINT (CTRL-C) so that only the
    main process sees the KeyboardInterrupt.
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def calc(i):
    return i*i

def main():
    try:
        with mp.Pool(4, initializer=init_pool_processes) as p:
            while True:
                print(p.map(calc, range(10)))
                time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down.")
    except Exception as e:
        print(e)


if __name__ == '__main__':
    main()
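
One way to check that the initializer actually took effect is to ask each worker how it currently handles SIGINT. The following is only a sketch and not part of the original answer: sigint_is_ignored and check_workers are hypothetical helper names, and the "fork" start method is assumed because the traceback above shows ForkPoolWorker processes.

```python
import multiprocessing as mp
import signal


def init_pool_processes():
    # Runs once in every worker: ignore SIGINT (CTRL-C).
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def sigint_is_ignored(_):
    # Hypothetical helper: report whether this worker ignores SIGINT.
    return signal.getsignal(signal.SIGINT) == signal.SIG_IGN


def check_workers(n_workers=4):
    # Assumption: POSIX "fork" start method, as in the question's traceback.
    ctx = mp.get_context("fork")
    with ctx.Pool(n_workers, initializer=init_pool_processes) as p:
        # Every task should report True, whichever worker ran it.
        return p.map(sigint_is_ignored, range(n_workers * 2))


if __name__ == '__main__':
    print(all(check_workers()))
```

If any entry comes back False, the initializer did not run in that worker, and that worker would still print a traceback on CTRL-C.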

Update

To handle exceptions raised by the worker function, use the imap method instead of map. With map, the first exception raised by any task is re-raised in the main process and you get none of the results. imap returns an iterator: each call to next() either returns the next task's return value or re-raises the exception that task raised. That way you can catch exceptions for the individual tasks that were submitted. For example:

import multiprocessing as mp


def calc(i):
    if i == 3:
        raise ValueError(f'bad i value {i}')
    return i*i

def main():
    return_values = []
    with mp.Pool(4) as p:
        results = p.imap(calc, range(10))
        while True:
            try:
                return_value = next(results)
                return_values.append(return_value)
            except StopIteration:
                # No more results:
                break
            except Exception as e:
                # worker function raised an exception
                print('Got exception:', e)
                # Let's also append the exception as the return value:
                return_values.append(e)
        print(return_values)


if __name__ == '__main__':
    main()

Prints:

Got exception: bad i value 3
[0, 1, 4, ValueError('bad i value 3'), 16, 25, 36, 49, 64, 81]
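
The same per-task handling can also be sketched with apply_async, where each task gets its own AsyncResult and get() re-raises that task's exception. This is an alternative illustration rather than the method from the answer above: collect is a hypothetical helper name, and the "fork" start method is assumed to match the Linux setup in the question.

```python
import multiprocessing as mp


def calc(i):
    if i == 3:
        raise ValueError(f'bad i value {i}')
    return i * i


def collect(values):
    # Hypothetical helper: one AsyncResult per submitted task.
    ctx = mp.get_context("fork")  # assumption: POSIX "fork" start method
    return_values = []
    with ctx.Pool(4) as p:
        async_results = [p.apply_async(calc, (v,)) for v in values]
        for async_result in async_results:
            try:
                # get() returns the task's value or re-raises its exception.
                return_values.append(async_result.get())
            except ValueError as e:
                # Keep the exception in place of the missing value.
                return_values.append(e)
    return return_values


if __name__ == '__main__':
    print(collect(range(10)))
```

Results are collected in submission order, so the exception ends up at the same position in the list as the failing input.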


Answered By - Booboo
Answer Checked By - Gilberto Lyons (WPSolving Admin)