
I have a generator object that loads quite a large amount of data and hogs the I/O of the system. The data is too big to fit into memory all at once, hence the use of a generator. And I have a consumer that uses all of the CPU to process the data yielded by the generator; it does not consume much of any other resource. Is it possible to interleave these tasks using threads?

For example, I'd guess it should be possible to run the simplified code below in about 11 seconds: the ten 1-second generation steps overlap with the ten 1-second consumption steps, and only the consumption of the last item adds an extra second after generation finishes.

import time, threading

lock = threading.Lock()

def gen():
    # I/O-bound producer: loading each item takes 1 second
    for x in range(10):
        time.sleep(1)
        yield x

def con(x):
    # CPU-bound consumer: processing each item takes 1 second,
    # serialized by the lock so only one item is processed at a time
    lock.acquire()
    time.sleep(1)
    lock.release()
    return x + 1

However, the simplest application of threads does not run in that time. It does speed up, but I assume that is because of parallelism between the dispatcher, which does the generation, and the worker, not because of parallelism between the workers themselves.

import joblib
%time joblib.Parallel(n_jobs=2,backend='threading',pre_dispatch=2)((joblib.delayed(con)(x) for x in gen()))
# CPU times: user 0 ns, sys: 0 ns, total: 0 ns
# Wall time: 16 s
  • If your question is `Is it possible ...?` Then the simple answer is yes. Threads in Python don't run in parallel - you would need multiple processes. If your data can be processed in chunks you could use threads for the I/O bound data acquisition and distribution, each thread feeding data to one or more processes as it *acquires* data. You would have to find a way to throttle that to conserve resources. Python has a lot of builtin tools: threading, multiprocessing, concurrent.futures, subprocesses, asyncio. .... – wwii Jun 13 '21 at 14:12
  • Without more details it would be hard to recommend a strategy. Recommending a strategy might be off topic for SO - but I'll let the community decide. There are a **lot** of Q&A's here that touch on your *question*, maybe continually refined searches will help you define your strategy. The module docstring for the [concurrent.futures source](https://github.com/python/cpython/tree/3.9/Lib/concurrent/futures/process.py) has a nice graphic of how they used threads to feed processes. – wwii Jun 13 '21 at 14:20
  • Well, it is possible to understand `Is it possible ...?` in the way you did. However, the title reads `How to ... ?` The question has a concrete example of a problem and a question about that concrete example. – Dimitry Jun 13 '21 at 14:22
  • [Here is an answer](https://stackoverflow.com/a/51074176/2823755) I wrote with some negative feedback regarding not addressing memory consumption - it feeds data to processes using concurrent.futures. [Here is another](https://stackoverflow.com/a/60802781/2823755). If your generator works, it seems you don't really need threads, just a way to feed data to multiple processes. Again, you didn't say but can we assume that the data can be processed in chunks? Maybe I am misreading your question - it is a bad habit I have. – wwii Jun 13 '21 at 14:33
  • @wwii The question is about threads. A single consumer uses all of the CPU to process a single chunk, mostly with the GIL released. The first paragraph was specifically written to focus the question on threads, not processes. – Dimitry Jun 13 '21 at 15:53
  • I edited my answer with a solution using threads. – wwii Jun 13 '21 at 16:15

2 Answers


Send your data to a pool of worker threads. I used concurrent.futures because I like the simple interface.

This runs in about 11 seconds on my computer.

from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
import threading
import time

lock = threading.Lock()

def gen():
    for x in range(10):
        time.sleep(1)
        yield x

def con(x):
    lock.acquire()
    time.sleep(1)
    lock.release()
    return f'{x+1}'

if __name__ == "__main__":

    futures = []
    with ThreadPoolExecutor() as executor:
        t0 = time.time()
        for x in gen():
            futures.append(executor.submit(con,x))
    results = []
    for future in concurrent.futures.as_completed(futures):
        results.append(future.result())
    print(time.time() - t0)
    print('\n'.join(results))

Using 100 generator iterations (`for x in range(100)` in `gen`), it took about 102 seconds.


Your process may need to keep track of how much data has been sent to tasks that haven't finished to prevent swamping memory resources.
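One way to do that (not part of the answer above, just a minimal sketch) is to cap the number of in-flight items with a threading.BoundedSemaphore: the submitting loop blocks once max_pending items have been handed out, and each completed task frees a slot. Here max_pending is a made-up tuning knob, and gen/con are the functions defined earlier.

import concurrent.futures
import threading

max_pending = 2                               # hypothetical bound on in-flight items
slots = threading.BoundedSemaphore(max_pending)

def run_bounded():
    futures = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for x in gen():                       # gen/con as defined above
            slots.acquire()                   # block while too many items are in flight
            future = executor.submit(con, x)
            future.add_done_callback(lambda f: slots.release())
            futures.append(future)
    return [f.result() for f in futures]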

Adding some diagnostic prints to con seems to show that there might be at least two chunks of data out there at a time.

def con(x):
    print(f'{x} received payload at t0 + {time.time()-t0:3.3f}')
    lock.acquire()
    time.sleep(1)
    lock.release()
    print(f'{x} released lock at t0 + {time.time()-t0:3.3f}')
    return f'{x+1}'

I've created this question to see if there was an idiomatic drop-in replacement for the for-loop pattern. While wwii's answer does solve the problem, it has a caveat: the generator may get ahead of the consumer threads and swamp memory if its output is sizeable. I also liked the joblib interface more.

The problem with the joblib code in the question is that gen is iterated in the main thread, so instead of dispatching jobs the main thread spends its time waiting on gen. I've given up on trying to make sense of joblib's scheduling, which gets quite weird when the input generator is slow. I did, however, manage to get it to behave properly after moving both the producer and the consumer inside the delayed function.

When the length of the iterable is known beforehand (e.g. a list of files to be processed one by one), the code is simple. The code below ensures that at any given time only one thread is generating data and only one thread is consuming it.

import threading
import joblib

# gen and con as defined in the question
sync_gen, sync_con = threading.Lock(), threading.Lock()

@joblib.delayed
def work(iterable):
    with sync_gen:
        x = next(iterable)   # only one thread advances the generator at a time
    with sync_con:
        return con(x)        # only one thread consumes at a time

N = 10
iterable = gen()
res1 = joblib.Parallel(2, 'threading')(work(iterable) for x in range(N))
#[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

If the generator length is not known beforehand, the thread workers are better off accumulating their results rather than each processing a single input.

sync_gen, sync_con = threading.Lock(), threading.Lock()

def thread_safe(gen):
    # Wrap the generator so that only one thread advances it at a time
    try:
        while True:
            with sync_gen:
                x = next(gen)
            yield x
    except StopIteration:
        pass

def work2(safe_iterable):
    # Each worker drains the shared generator and accumulates its own results
    res = []
    for x in safe_iterable:
        with sync_con:
            res.append(con(x))
    return res

iterable = gen()
de_work2 = joblib.delayed(work2)
res2 = joblib.Parallel(2, 'threading')(de_work2(thread_safe(iterable)) for x in range(2))
#[[1, 3, 5, 7, 9], [2, 4, 6, 8, 10]]

Or with ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
iterable = gen()
with ThreadPoolExecutor() as e:
    futures = [e.submit(work2,thread_safe(iterable)) for x in range(2)]
res = [future.result() for future in futures]
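
For reference, the same interleaving can also be written without joblib or an executor: a plain producer thread feeding a bounded queue.Queue, which keeps the generator from running more than a fixed number of items ahead of the consumer. This is only a sketch assuming the gen and con definitions from the question; maxsize is a made-up bound, not something from the answers above.

import queue
import threading

def pipeline(maxsize=1):
    # The bounded queue keeps the producer at most `maxsize` items ahead of the consumer
    q = queue.Queue(maxsize=maxsize)
    sentinel = object()

    def produce():
        for x in gen():          # gen as defined in the question
            q.put(x)             # blocks while the queue is full
        q.put(sentinel)          # signal end of input

    producer = threading.Thread(target=produce)
    producer.start()
    results = []
    while True:
        x = q.get()
        if x is sentinel:
            break
        results.append(con(x))   # con as defined in the question
    producer.join()
    return results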