
I have a problem where I need to read a large text file and conditionally send data from it to two different database tables using Python. I would like to use a generator pipeline to avoid loading all the data into memory.

The logic of what I am trying to do is equivalent to turning a generator that yields integers into one generator of odd numbers and another of even numbers, then simultaneously writing them to separate files.

I can split it into two generators with itertools.tee as follows:

import itertools


def generate_numbers(limit):
    # My real-life function has this form.
    for i in range(limit):
        yield i

numbers1, numbers2 = itertools.tee(generate_numbers(10), 2)
evens = (num for num in numbers1 if num % 2 == 0)
odds = (num for num in numbers2 if num % 2 != 0)

This produces the expected result:

>>> list(evens)                                                                              
[0, 2, 4, 6, 8]
>>> list(odds)                                                                               
[1, 3, 5, 7, 9]

The tee documentation warns that a lot of temporary data will be stored if one iterator uses most or all of the data before the other starts. This is what happens when I write the data (from fresh iterators) to files as below.

def write_to_file(filename, numbers):
    # My real-life function takes an iterable as an argument.
    with open(filename, 'wt') as outfile:
        for i in numbers:
            outfile.write(f"{i}\n")

write_to_file('evens.txt', evens)
write_to_file('odds.txt', odds)

Is it possible to consume the generators simultaneously? The tee documentation also warns that it isn't thread safe. Can this be done with asyncio?

Alternatively, is there another approach? Would chunking the data help? My main constraints are that I don't want to hold all the data for a single table in memory and that my consuming functions expect an iterable of items, rather than an individual item.

Similar questions

This question, "Separate odd and even lines in a generator with python", is very similar to mine. The accepted answer suggests passing through the input file twice. I may end up doing this, but it would be nice to do it in a single pass.
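For reference, that two-pass approach applied to the toy example above would look roughly like this (each pass re-creates the source generator and keeps only the items it needs):

write_to_file('evens.txt', (num for num in generate_numbers(10) if num % 2 == 0))
write_to_file('odds.txt', (num for num in generate_numbers(10) if num % 2 != 0))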

There is another answer that opens both output files at once for writing and then processes each item one by one (sketched below). This isn't suitable for me, as my real-life consuming function expects to read all the items in an iterator.
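For illustration, that item-by-item approach on the toy example would be roughly:

with open('evens.txt', 'wt') as evens_file, open('odds.txt', 'wt') as odds_file:
    for num in generate_numbers(10):
        if num % 2 == 0:
            evens_file.write(f"{num}\n")
        else:
            odds_file.write(f"{num}\n")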

Edit: After posting, I found this answer from 2015: https://stackoverflow.com/a/28030261/3508733, which suggests that you can't do it with itertools.tee, because of the memory issue. Is there an alternative way?

  • Having the two iterators independent forces you to buffer data whenever one gets ahead of the other. You either need two distinct iterators (each ignoring the items not destined for it), or you tightly couple the two by having a single iterator return a tuple each time. I don't know if I am getting the situation right... – uben Jan 26 '23 at 23:21
  • Might be duplicate of https://stackoverflow.com/q/73900617/12671057 – Kelly Bundy Jan 27 '23 at 00:10

2 Answers


After reading more about itertools, I've found a way that works for me. I still create two generators with tee as before, but then I split the data into chunks with itertools.islice. That way I can alternate between generators without letting one get too far ahead of the other.

# Create two generators using tee
numbers1, numbers2 = itertools.tee(generate_numbers(10), 2)
evens = (num for num in numbers1 if num % 2 == 0)
odds = (num for num in numbers2 if num % 2 != 0)


def append_to_file(filename, numbers):
    # Append to file, instead of writing the whole file at once
    with open(filename, 'at') as f:
        for num in numbers:
            f.write(f"{num}\n")

            
# Use islice to move through both generators in chunks
chunksize = 2
while True:
    odds_chunk = list(itertools.islice(odds, chunksize))
    append_to_file('/tmp/odds.txt', odds_chunk)
    evens_chunk = list(itertools.islice(evens, chunksize))
    append_to_file('/tmp/evens.txt', evens_chunk)
    if odds_chunk == evens_chunk == []:
        break

In my real-life case, I expect that a chunk size of a few thousand will be a good balance between memory use and reducing round-trips to the database.
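For example, in my database case each chunk could be handed to a consumer that makes a single INSERT round-trip via executemany. A rough sketch with sqlite3 (the table name and single-column schema here are just placeholders):

import sqlite3


def insert_chunk(connection, table, numbers):
    # Hypothetical consumer: one INSERT round-trip per chunk via executemany.
    with connection:  # commit (or roll back) the whole chunk as one transaction
        connection.executemany(
            f"INSERT INTO {table} (value) VALUES (?)",
            ((num,) for num in numbers),
        )

# e.g. insert_chunk(sqlite3.connect('numbers.db'), 'odds', odds_chunk)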

  • but here you cast the iterators to `list`... is it really needed? – cards Jan 27 '23 at 00:01
  • That's *"without letting one get too far ahead"* only if the input is "balanced", like odd and even numbers alternating. If you have a million even numbers followed by one odd number, your first odd_chunk will buffer all million even numbers. – Kelly Bundy Jan 27 '23 at 00:04
  • The example is also not realistic since you can just open both files and iterate the input once, writing each element into the appropriate file. – Kelly Bundy Jan 27 '23 at 00:05
  • @cards - `itertools.islice` returns another iterator, so you have to convert it to a list to see if it is empty. – Dr John A Stevenson Jan 27 '23 at 00:08
  • @KellyBundy - Good point about the balance. I got too caught up in the example, and I can't guarantee that my real inputs will be balanced. – Dr John A Stevenson Jan 27 '23 at 00:12
  • use `next` with a default value, for ex `None`, to check if empty – cards Jan 27 '23 at 00:12
  • @cards How would you rewrite their code to do that? Where they currently do the check, the iterators are *definitely* already "empty". – Kelly Bundy Jan 27 '23 at 00:15
  • @cards - I didn't know that you could set a default on `next`. Thanks. However, when I tried it my `append_to_file` function threw an error as it is expecting something to iterate over. – Dr John A Stevenson Jan 27 '23 at 00:15
  • @Kelly I forgot about the function's call. I think I lost the focus of the question and got lost in small details – cards Jan 27 '23 at 00:18
  • @Dr John A Stevenson notice that you are opening *and* closing a file descriptor on each iteration. Maybe open them once at global level? `with open() as even, open() as odd: chunk-stuffs` – cards Jan 27 '23 at 01:11
  • A global open could help in this case, @cards, but in my real-life case the consuming function is different. I used the file writing as an example function that takes an iterable as input. – Dr John A Stevenson Jan 27 '23 at 10:10

Based on the chunking method in my earlier answer and comments from @KellyBundy about unbalanced inputs, I have modified the code. The following meets my requirements, even if it isn't technically generators all the way through.

  • ✅ Doesn't hold more than chunksize items in memory at once
  • ✅ Downstream function receives a generator
  • ✅ Single pass through the data
  • ❌ Data are temporarily materialized (but only in chunks)

import itertools


def generate_numbers(limit):
    # My real-life function has this form.
    for i in range(limit):
        yield i


def append_to_file(filename, numbers):
    # My real-life function takes an iterable as an argument.
    with open(filename, 'at') as f:
        for num in numbers:
            f.write(f"{num}\n")


numbers = generate_numbers(10)
chunksize = 2
while True:
    evens_chunk = []
    odds_chunk = []

    for num in itertools.islice(numbers, chunksize):
        # Could use `match-case` in Python >= 3.10
        if num % 2 == 0:
            evens_chunk.append(num)
        else:
            odds_chunk.append(num)

    if evens_chunk:
        evens_gen = (num for num in evens_chunk)
        append_to_file('evens.txt', evens_gen)
    if odds_chunk:
        odds_gen = (num for num in odds_chunk)
        append_to_file('odds.txt', odds_gen)

    if odds_chunk == evens_chunk == []:
        break

If the downstream functions don't require a generator as input, the loop can be simplified further:

while True:
    evens_chunk = []
    odds_chunk = []

    for num in itertools.islice(numbers, chunksize):
        if num % 2 == 0:
            evens_chunk.append(num)
        else:
            odds_chunk.append(num)

    append_to_file('evens.txt', evens_chunk)
    append_to_file('odds.txt', odds_chunk)

    if odds_chunk == evens_chunk == []:
        break
  • Are your real downstream functions like this, i.e., you can call them multiple times with partial data? And do they really need generator iterators, not just any iterators? – Kelly Bundy Jan 27 '23 at 10:26
  • In this case, yes. The downstream functions run a SQL INSERT statement via `executemany`. I can call them repeatedly, even with an empty list. With this question, I had hoped to find an answer to the general case that I could use with generator iterators in the future. – Dr John A Stevenson Jan 27 '23 at 15:43
  • For cases where you can't call them repeatedly, see the other question I linked to under your question. Still unclear why you focus on generators instead of allowing all iterators or even all iterables. Do you really have or know such picky downstream functions? That is rather unusual, especially the generator restriction. If they allow any iterators but not any iterables, you could just use `iter(odds_chunk)` instead of the generator expression. – Kelly Bundy Jan 27 '23 at 21:33