
I have the following code (Python 2.7):

from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count
from pandas import DataFrame

# bro, engine, sql_table_stage and sql_schema are defined elsewhere in the script
NUM_WORKERS = cpu_count()
c = 0
while True:
    results = []
    pages = range(c, c + NUM_WORKERS)
    with ThreadPoolExecutor(NUM_WORKERS) as executor:
        futures = [executor.submit(bro.get_content, page) for page in pages]
    # note: leaving the with block calls executor.shutdown(wait=True),
    # so every future is already finished by the time this loop runs
    for future in as_completed(futures):
        results.extend(future.result())
    if len(results) < 1:
        break
    print("Get batch {0} with {1} results".format(c, len(results)))
    df = DataFrame(results)
    df.to_sql(sql_table_stage, engine, sql_schema, if_exists='append', index=False)
    print("Pages {0} to {1} was insert".format(c, c + NUM_WORKERS))
    c += NUM_WORKERS

The code runs and does what it should (though incredibly slowly!). The thing is that when I look over my logs I see:

[2018-08-21 01:06:54,513] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,513] {bash_operator.py:70} INFO - Tmp dir root location: 
[2018-08-21 01:06:54,513] {base_task_runner.py:98} INFO - Subtask:  /tmp
[2018-08-21 01:06:54,514] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,514] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpGyRCX2//tmp/airflowtmpGyRCX2/importwQaRgB
[2018-08-21 01:06:54,514] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,514] {bash_operator.py:88} INFO - Running command: python /home/ubuntu/airflow/scripts/import.py 
[2018-08-21 01:06:54,519] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,518] {bash_operator.py:97} INFO - Output:
[2018-08-21 05:45:48,758] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Get batch 0 with 20000 results
[2018-08-21 05:45:48,759] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Pages 0 to 4 was insert
[2018-08-21 05:45:48,759] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Get batch 4 with 19996 results
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Pages 4 to 8 was insert
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Get batch 8 with 20000 results
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 8 to 12 was insert
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 12 with 20000 results
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 12 to 16 was insert
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 16 with 20000 results
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 16 to 20 was insert
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 20 with 20000 results
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 20 to 24 was insert
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 24 with 20000 results
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 24 to 28 was insert
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 28 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 28 to 32 was insert
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 32 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 32 to 36 was insert
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 36 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 36 to 40 was insert
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 40 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 40 to 44 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 44 with 20000 results
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 44 to 48 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 48 with 19997 results
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 48 to 52 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 52 with 20000 results
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 52 to 56 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 56 with 20000 results
[2018-08-21 05:45:48,764] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 56 to 60 was insert
  1. I'm wondering why all the threads print at the exact same time. Note that the job began at about 1:06 AM, yet the first batch of prints only appears at 5:45 AM. Shouldn't I see prints at different times, since each thread may finish sooner or later than the others?
  2. I'm wondering what the best number of workers is. Currently it starts from cpu_count(), which is 4.

This is code I inherited; I'm trying to make sense of it before converting the script to Python 3.
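A quick way to check question 1 is to timestamp each future as it completes. A minimal sketch, reusing `bro`, `NUM_WORKERS` and one `pages` batch from the code above; here `as_completed` runs inside the `with` block, so each completion prints as it arrives:

from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(NUM_WORKERS) as executor:
    futures = {executor.submit(bro.get_content, page): page for page in pages}
    for future in as_completed(futures):
        # if the threads overlap, each completion should carry its own timestamp
        print("{0}: page {1} returned {2} rows".format(
            datetime.now(), futures[future], len(future.result())))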

Programmer120
  • For starters, [`concurrent.futures.ThreadPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor) uses threads and not processes so setting a pool size of `multiprocessing.cpu_count` makes little sense - due to the [dreaded GIL](https://realpython.com/python-gil/) your workers will never execute in parallel (_'only one thread allowed at the time'_) so if your workers are not waiting for I/O (the rare exception to the GIL _reign_) your code will end up executing slower than if you just did it directly in a loop. – zwer Aug 21 '18 at 08:08
  • @zwer this code imports records from an API, so all it does is get the records from the API (I/O) and then merge and insert them into my table. However, I want all the threads to work simultaneously. – Programmer120 Aug 21 '18 at 08:12
  • If you want to stay with the high-level interfaces of `concurrent.futures`, use [`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) to launch processes instead of threads. That's the only way to make your code execute in parallel (provided you have free CPU cores). Otherwise, just use the `multiprocessing` interface directly, you can adapt [this test](https://stackoverflow.com/questions/44521931/what-is-some-example-code-for-demonstrating-multicore-speedup-in-python-on-windo/44525554#44525554) to do any work you want. – zwer Aug 21 '18 at 08:17
  • @zwer correct me if I'm wrong, but can I just replace ThreadPoolExecutor with ProcessPoolExecutor and see if it works better? It seems like no other changes are needed? – Programmer120 Aug 21 '18 at 08:25
  • That's the idea behind high-level interfaces like the `concurrent.futures` (or `multiprocessing.dummy` for the lower-level interface to multiprocessing). Beware that the data you're sending to your processes as well as the data returning from them has to be [_pickleable_](https://docs.python.org/3/library/pickle.html) because that's the default mode for Python to share data between processes. Also, to save you some manual work, check [`concurrent.futures.Executor.map()`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map) which does the split for you. – zwer Aug 21 '18 at 08:32
  • So I should change the `ThreadPoolExecutor(NUM_WORKERS)` to `ProcessPoolExecutor(NUM_WORKERS)` and `executor.submit(bro.get_content, page)` to `executor.map(bro.get_content, page)` – Programmer120 Aug 21 '18 at 08:43
  • Start with switching to `ProcessPoolExecutor`, then make other needed changes - i.e. to use `map()` you'll have to forgo your own chopping and just use `executor.map(bro.get_content, c)` as it will split your `c` iterable over the underlying pool of workers. You won't need to do the whole `while True:...` routine, or use `as_completed` as the mapping does all that for you. Do not switch to `map()` if you want to process the results immediately as they show up, tho - `map()` will wait for the processes to execute in order so to get to the second result, the first one has to be available etc. – zwer Aug 21 '18 at 08:54
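Putting those comments together, a sketch of the suggested switch. It assumes `bro.get_content` is defined at module level and that its arguments and results are picklable, and it keeps the batch loop so the per-batch `to_sql` insert stays unchanged:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

NUM_WORKERS = cpu_count()
c = 0
while True:
    pages = range(c, c + NUM_WORKERS)
    results = []
    with ProcessPoolExecutor(NUM_WORKERS) as executor:
        # map() splits pages over the worker processes and yields the
        # results in submission order, so no manual as_completed loop
        for page_results in executor.map(bro.get_content, pages):
            results.extend(page_results)
    if not results:
        break
    # ... build the DataFrame and call to_sql exactly as in the question ...
    c += NUM_WORKERS

Creating the pool once, outside the while loop, would avoid re-spawning the worker processes on every batch.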

1 Answer


When you create a thread pool in Python, the threads are user-level threads, and because of Python's Global Interpreter Lock (GIL) only one thread can control the Python interpreter at a time. So with (Python) threads you don't get any real parallelism in CPU-intensive tasks.

How do we solve this? Easy: spawn multiple Python processes (each with its own interpreter) that can run on different processors. This is where the multiprocessing (mp) module comes in; it is used to spawn multiple processes from the parent Python process in which it is called.

You can verify this by running htop (on Linux or macOS) and looking at the number of Python processes. With the mp module, they will all have the same name as the parent script in which the pool.map function is called.
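A minimal sketch of that pattern; `fetch_page` here is an illustrative placeholder for the real per-page work:

from multiprocessing import Pool, cpu_count

def fetch_page(page):
    # placeholder for the real work, e.g. one API call per page
    return [page] * 10

if __name__ == '__main__':
    pool = Pool(cpu_count())  # one worker process per core
    # pool.map fans the pages out to the workers and gathers
    # the per-page results back in order
    batches = pool.map(fetch_page, range(8))
    pool.close()
    pool.join()
    print(sum(len(batch) for batch in batches))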

Deepak Saini
  • Multiprocessing is a good idea when the processes do not need to share data. Here all the threads are merging results in order to insert the data into a SQL table. – Programmer120 Aug 21 '18 at 08:16
  • You can use ``mp.Queue`` to pass messages between processes through a memory pipe. – Deepak Saini Aug 21 '18 at 08:19
  • Python (CPython) `threading` (and the underlying `_thread`) use real, proper system threads, so they may actually end up executing on different CPUs if that's what the underlying OS's thread scheduler decides. What they cannot do, due to the GIL, is execute in parallel. Concurrent execution is just fine. See [this question](https://stackoverflow.com/questions/1050222/what-is-the-difference-between-concurrency-and-parallelism) for the difference between concurrency and parallelism. – zwer Aug 21 '18 at 08:24
  • @zwer cpu_count() gives me 4, which means 4 cores. Wouldn't it run 4 threads on the 4 cores? Or is only 1 core used at any given time? – Programmer120 Aug 21 '18 at 08:29
  • @Programmer120 - Which CPU is used for Python (CPython that is) to execute your threads is not under your control - the underlying OS schedules that. However, except in special circumstances (i.e. heavy system load with manually set affinities), your threads will execute on the same CPU as the process that started them because otherwise the system would have to do L1-L2 synchronization just to get the process memory in order whenever it switches contexts and no decent thread scheduler will prefer that if there are other options. – zwer Aug 21 '18 at 08:36
  • @zwer So basically my code runs like iterative code :( – Programmer120 Aug 21 '18 at 08:41
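For completeness, the `mp.Queue` pattern mentioned in the comments, sketched with a placeholder worker:

from multiprocessing import Process, Queue

def worker(page, out_queue):
    # placeholder for the real fetch; results travel back over the queue
    out_queue.put([page] * 10)

if __name__ == '__main__':
    q = Queue()
    procs = [Process(target=worker, args=(p, q)) for p in range(4)]
    for p in procs:
        p.start()
    results = []
    for _ in procs:
        results.extend(q.get())  # drain before join to avoid blocking on a full pipe
    for p in procs:
        p.join()
    print(len(results))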