Issue
I am trying to handle the KeyboardInterrupt exception in my program, but I can't work out how to do it with a multiprocessing pool. Even though I put the pool operations in a try/except block and handle the exceptions, I still get four KeyboardInterrupt tracebacks.
import time
import multiprocessing as mp

def calc(i):
    return i*i

def main():
    try:
        with mp.Pool(4) as p:
            while True:
                print(p.map(calc, range(10)))
                time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down.")
    except Exception as e:
        print(e)

if __name__ == '__main__':
    main()
I understand that the processes are running inside an isolated environment, but I also want to handle the exceptions somehow.
Edit: The output I get when I run my code:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
^CProcess ForkPoolWorker-3:
Process ForkPoolWorker-5:
Process ForkPoolWorker-4:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
with self._rlock:
File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
KeyboardInterrupt
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 335, in get
res = self._reader.recv_bytes()
File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
with self._rlock:
File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Process ForkPoolWorker-6:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
with self._rlock:
File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Shutting down.
Solution
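You appear to be running on a Linux-type platform (you really should tag your multiprocessing questions with the platform). There, pressing Ctrl-C in a terminal sends SIGINT to every process in the foreground process group, so the pool workers receive it as well as the main process, and each worker prints its own KeyboardInterrupt traceback. Your try/except only covers the main process. To suppress the worker tracebacks, you need to ignore Ctrl-C in the pool processes. The easiest way to do that is to use the initializer argument when you create the pool: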
import time
import multiprocessing as mp

def init_pool_processes():
    """
    Each pool process will execute this as part of its
    initialization.
    """
    import signal
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def calc(i):
    return i*i

def main():
    try:
        with mp.Pool(4, initializer=init_pool_processes) as p:
            while True:
                print(p.map(calc, range(10)))
                time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down.")
    except Exception as e:
        print(e)

if __name__ == '__main__':
    main()
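If you want to convince yourself that the initializer really runs in every worker, here is a minimal sketch that asks each worker for its SIGINT disposition. The helper names sigint_ignored and check_workers are made up for this illustration and are not part of multiprocessing; the sketch also assumes a POSIX platform like the one in the question.

```python
import multiprocessing as mp
import signal


def init_pool_processes():
    # Same idea as the initializer above: the worker ignores SIGINT.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def sigint_ignored(_):
    # Hypothetical helper: report whether the current process ignores SIGINT.
    return signal.getsignal(signal.SIGINT) == signal.SIG_IGN


def check_workers(n_workers=2):
    # Hypothetical helper: run the check once per submitted task.
    with mp.Pool(n_workers, initializer=init_pool_processes) as p:
        return p.map(sigint_ignored, range(n_workers))


if __name__ == '__main__':
    # Every entry should be True if the initializer ran in each worker.
    print(check_workers())
```

The main process keeps its normal handler, so Ctrl-C still raises KeyboardInterrupt there and your except clause is the only place that sees it.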
Update
To handle exceptions raised by the worker function, you can use the imap method, which returns an iterator. Each call to next on that iterator either returns the next task's return value or re-raises the exception that the corresponding task raised. That way you can catch exceptions for the individual tasks that were submitted. For example:
import multiprocessing as mp

def calc(i):
    if i == 3:
        raise ValueError(f'bad i value {i}')
    return i*i

def main():
    return_values = []
    with mp.Pool(4) as p:
        results = p.imap(calc, range(10))
        while True:
            try:
                return_value = next(results)
                return_values.append(return_value)
            except StopIteration:
                # No more results:
                break
            except Exception as e:
                # worker function raised an exception
                print('Got exception:', e)
                # Let's also append the exception as the return value:
                return_values.append(e)
    print(return_values)

if __name__ == '__main__':
    main()
Prints:
Got exception: bad i value 3
[0, 1, 4, ValueError('bad i value 3'), 16, 25, 36, 49, 64, 81]
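If you would rather keep using map, one possible alternative (a sketch, not the only way to do it) is to catch the exception inside the worker and return it as an ordinary value. The wrapper name safe_calc is hypothetical, introduced only for this example, and the approach assumes the exception object can be pickled, which holds for a plain ValueError.

```python
import multiprocessing as mp


def calc(i):
    if i == 3:
        raise ValueError(f'bad i value {i}')
    return i * i


def safe_calc(i):
    # Hypothetical wrapper: catch the exception in the worker and hand it
    # back to the main process as a normal return value.
    try:
        return calc(i)
    except Exception as e:
        return e


def main():
    with mp.Pool(4) as p:
        # map() does not raise here because safe_calc never raises.
        return p.map(safe_calc, range(10))


if __name__ == '__main__':
    for value in main():
        if isinstance(value, Exception):
            print('Got exception:', value)
        else:
            print(value)
```

The trade-off is that the caller has to check each result with isinstance, whereas the imap loop above keeps the normal try/except flow.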
Answered By - Booboo Answer Checked By - Gilberto Lyons (WPSolving Admin)