I have a problem where I need to read a large text file and conditionally send data from it to two different database tables using Python. I would like to use a generator pipeline to avoid loading all the data into memory.
The logic of what I am trying to do is equivalent to splitting a generator that yields integers into one generator of even numbers and one of odd numbers, and then writing them simultaneously to separate files.
I can split the stream into two generators with `itertools.tee` as follows:

```python
import itertools

def generate_numbers(limit):
    # My real-life function has this form.
    for i in range(limit):
        yield i

numbers1, numbers2 = itertools.tee(generate_numbers(10), 2)
evens = (num for num in numbers1 if num % 2 == 0)
odds = (num for num in numbers2 if num % 2 != 0)
```
This produces the expected result:

```python
>>> list(evens)
[0, 2, 4, 6, 8]
>>> list(odds)
[1, 3, 5, 7, 9]
```
The `tee` documentation warns that a lot of temporary data will be stored if one iterator uses most or all of the data before the other starts. That is exactly what happens when I write the data (from fresh iterators) to files, as below:
```python
def write_to_file(filename, numbers):
    # My real-life function takes an iterable as an argument.
    with open(filename, 'wt') as outfile:
        for i in numbers:
            outfile.write(f"{i}\n")

write_to_file('evens.txt', evens)
write_to_file('odds.txt', odds)
```
Is it possible to consume the generators simultaneously? The `tee` documentation also warns that it isn't thread-safe. Can this be done with `asyncio`?
Alternatively, is there another approach? Would chunking the data help? My main constraints are that I don't want to hold all the data for a single table in memory and that my consuming functions expect an iterable of items, rather than an individual item.
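For reference, this is roughly what I mean by chunking: hold only one chunk in memory at a time, split it, and hand each half to a consumer as a list (which is still an iterable). The helper names (`chunked`, `split_in_chunks`) and the chunk size are made up for illustration:

```python
import itertools

def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while chunk := list(itertools.islice(it, size)):
        yield chunk

def split_in_chunks(numbers, chunk_size=4):
    """Yield (evens, odds) list pairs, holding one chunk in memory."""
    for chunk in chunked(numbers, chunk_size):
        evens = [n for n in chunk if n % 2 == 0]
        odds = [n for n in chunk if n % 2 != 0]
        yield evens, odds
```

The catch is that each consuming function would then be invoked once per chunk (e.g. opening its output file in append mode), rather than once with the full stream, and I am not sure that fits my constraints.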
Similar questions
This question, "Separate odd and even lines in a generator with python", is very similar to mine. The accepted answer suggests passing through the input file twice. I may end up doing this, but it would be nice to do it in a single pass.
There is another answer that opens both output files at once for writing and processes each item one by one. This isn't suitable for me, as my real-life consuming function expects to be handed an iterable of all the items, rather than an individual item at a time.
Edit: After posting, I found this answer from 2015: https://stackoverflow.com/a/28030261/3508733, which suggests that you can't do it with `itertools.tee` because of the memory issue. Is there an alternative way?