Issue
I have a Python script that processes a large file and writes the results to a new txt file. I simplified it as Code example 1.
Code example 1:
from concurrent.futures import ProcessPoolExecutor
import os

def process(args):
    parm1, parm2, parm3, output_file = args
    scores = []
    # do something with scores...
    if parm1:
        scores = [parm2, sum(parm3)]
    else:
        scores = [parm2 / len(parm3), sum(parm3) / len(parm3)]
    # create result
    result = '\t'.join(map(str, scores)) + '\n'
    output_file.write(result)

def main(large_file_path, output_path, max_processes):
    # do something ...
    with open(large_file_path, 'r') as large_file, open(output_path, 'w') as output_file:
        arg_list = []
        parm1, parm2, parm3 = False, 0, []
        for line in large_file:
            if line.startswith('#'):
                continue
            else:
                # do something to update parm1, parm2, parm3...
                data = line.strip().split('\t')
                parm1 = float(data[0]) > 0
                parm2 = float(data[1])
                parm3 = [float(x) for x in data[2:]]
                # add to list
                arg_list.append((parm1, parm2, parm3, output_file))
        with ProcessPoolExecutor(max_processes) as executor:
            executor.map(process, arg_list, chunksize=int(max_processes / 2))

if __name__ == "__main__":
    large_path = "/path/to/large_file"
    output_path = "para_scores.txt"
    max_processes = int(os.cpu_count() / 2)  # set the maximum number of processes
    main(large_path, output_path, max_processes)
I realized that arg_list might become quite large if large_file is very large, and I am not sure there is enough free memory for it. So I tried using a yield generator instead of a plain Python list, as in Code example 2, which runs without errors but produces no output.
Code example 2:
from concurrent.futures import ProcessPoolExecutor
import os

def process(args):
    parm1, parm2, parm3, output_file = args
    scores = []
    # do something with scores...
    if parm1:
        scores = [parm2, sum(parm3)]
    else:
        scores = [parm2 / len(parm3), sum(parm3) / len(parm3)]
    # create result
    result = '\t'.join(map(str, scores)) + '\n'
    output_file.write(result)

def main(large_file_path, output_path, max_processes):
    # do something ...
    with open(large_file_path, 'r') as large_file, open(output_path, 'w') as output_file:
        def arg_generator(large_file, output_file):
            parm1, parm2, parm3 = False, 0, []
            for line in large_file:
                if line.startswith('#'):
                    continue
                else:
                    # do something to update parm1, parm2, parm3...
                    data = line.strip().split('\t')
                    parm1 = float(data[0]) > 0
                    parm2 = float(data[1])
                    parm3 = [float(x) for x in data[2:]]
                    # yield
                    yield (parm1, parm2, parm3, output_file)
        with ProcessPoolExecutor(max_processes) as executor:
            executor.map(process, arg_generator(large_file, output_file), chunksize=int(max_processes / 2))

if __name__ == "__main__":
    large_path = "/path/to/large_file"
    output_path = "para_scores.txt"
    max_processes = int(os.cpu_count() / 2)  # set the maximum number of processes
    main(large_path, output_path, max_processes)
I ran the code on an Ubuntu 20.04.6 LTS server with Python 3.9.18.
So, can ProcessPoolExecutor work with a yield generator in Python? Or is the use of executor.map problematic? What should I do to make it work?
Solution
Yes it can. Here's an example:
from concurrent.futures import ProcessPoolExecutor

def process(x):
    print(x * 2)

def gen():
    for i in range(3):
        yield i

def main():
    print('starting')
    with ProcessPoolExecutor() as executor:
        executor.map(process, gen())
    print('done')

if __name__ == '__main__':
    main()
Output:
starting
0
2
4
done
The processes don't know where their arguments came from; they receive individual items, not the entire generator. Using a generator makes no difference.
However, trying to pass an open file handle does break things, in a way that is easy to miss:
from concurrent.futures import ProcessPoolExecutor

def process(f):
    print('processing', f)

def main():
    print('starting')
    with open('path_to_file.txt') as f:
        with ProcessPoolExecutor() as executor:
            executor.map(process, [f])
    print('done')

if __name__ == '__main__':
    main()
The output is missing the 'processing' line. An open file object cannot be pickled to send to a worker process, and because the results of executor.map are never iterated here, the pickling error is silently swallowed:
starting
done
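You can make the hidden failure visible by consuming the results of executor.map, which re-raises the exception in the main process. A minimal sketch; the exact exception text in the comment is what CPython 3.9 produces and may vary by version:

from concurrent.futures import ProcessPoolExecutor

def process(f):
    print('processing', f)

def main():
    with open('path_to_file.txt') as f:
        with ProcessPoolExecutor() as executor:
            # list() forces iteration over the results, so the pickling
            # failure is re-raised here instead of being swallowed, e.g.:
            # TypeError: cannot pickle '_io.TextIOWrapper' object
            list(executor.map(process, [f]))

if __name__ == '__main__':
    main()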
Instead, pass the filename as a string to each process, as sketched below. Use a different filename for each process so that they don't overwrite each other.
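Here is a minimal sketch of that advice applied to the asker's process function: the worker receives an output path (a plain string, which pickles without trouble) and opens its own file. Deriving a per-process filename from os.getpid() is an assumption for illustration, not part of the original answer; any scheme that gives each process its own file would do.

import os

def process(args):
    parm1, parm2, parm3, output_path = args
    if parm1:
        scores = [parm2, sum(parm3)]
    else:
        scores = [parm2 / len(parm3), sum(parm3) / len(parm3)]
    # Each worker appends to its own file, named after its process id
    # (an assumed naming scheme), so no two processes share a handle.
    with open(f"{output_path}.{os.getpid()}", "a") as f:
        f.write('\t'.join(map(str, scores)) + '\n')

Once the pool has shut down, the per-process files can be concatenated into the final para_scores.txt.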
Answered By - Alex Hall