I have the following code (Python 2.7):
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count

from pandas import DataFrame

# bro, engine, sql_table_stage and sql_schema are defined elsewhere in the script.
NUM_WORKERS = cpu_count()
c = 0
while True:
    results = []
    pages = range(c, c + NUM_WORKERS)
    # Fetch one page per worker thread and collect the rows as each fetch finishes.
    with ThreadPoolExecutor(NUM_WORKERS) as executor:
        futures = [executor.submit(bro.get_content, page) for page in pages]
        for future in as_completed(futures):
            results.extend(future.result())
    # Stop once a whole batch of pages comes back empty.
    if len(results) < 1:
        break
    print("Get batch {0} with {1} results".format(c, len(results)))
    df = DataFrame(results)
    df.to_sql(sql_table_stage, engine, sql_schema, if_exists='append', index=False)
    print("Pages {0} to {1} was insert".format(c, c + NUM_WORKERS))
    c += NUM_WORKERS
The code runs and does what it is supposed to (though incredibly slowly!). The thing is that when I look over my logs I see:
[2018-08-21 01:06:54,513] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,513] {bash_operator.py:70} INFO - Tmp dir root location:
[2018-08-21 01:06:54,513] {base_task_runner.py:98} INFO - Subtask: /tmp
[2018-08-21 01:06:54,514] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,514] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpGyRCX2//tmp/airflowtmpGyRCX2/importwQaRgB
[2018-08-21 01:06:54,514] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,514] {bash_operator.py:88} INFO - Running command: python /home/ubuntu/airflow/scripts/import.py
[2018-08-21 01:06:54,519] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,518] {bash_operator.py:97} INFO - Output:
[2018-08-21 05:45:48,758] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Get batch 0 with 20000 results
[2018-08-21 05:45:48,759] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Pages 0 to 4 was insert
[2018-08-21 05:45:48,759] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Get batch 4 with 19996 results
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Pages 4 to 8 was insert
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Get batch 8 with 20000 results
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 8 to 12 was insert
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 12 with 20000 results
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 12 to 16 was insert
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 16 with 20000 results
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 16 to 20 was insert
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 20 with 20000 results
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 20 to 24 was insert
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 24 with 20000 results
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 24 to 28 was insert
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 28 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 28 to 32 was insert
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 32 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 32 to 36 was insert
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 36 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 36 to 40 was insert
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 40 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 40 to 44 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 44 with 20000 results
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 44 to 48 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 48 with 19997 results
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 48 to 52 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 52 with 20000 results
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 52 to 56 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 56 with 20000 results
[2018-08-21 05:45:48,764] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 56 to 60 was insert
- Why do all the batches print at the exact same time? Note that the task started at around 1:00 am and the first batch of prints appears at 5:45 am. Shouldn't I see prints at different times, since each thread may finish sooner or later?
- What is the best number of workers? Currently it is set to cpu_count(), which is 4.
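To help narrow down the first point, here is a minimal sketch of how the script could stamp each message with its own clock and flush stdout right away. The idea is that if the script's timestamps differ while the Airflow timestamps stay identical, the lines were probably held in an output buffer (Python buffers stdout in blocks when it is not attached to a terminal) rather than all produced at 05:45. Everything here is an assumption for illustration: fetch_page is a hypothetical stand-in for bro.get_content, and log and run_batch are names I made up.

```python
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def log(message):
    """Write a line stamped with the script's own clock and flush immediately."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    sys.stdout.write("[{0}] {1}\n".format(stamp, message))
    sys.stdout.flush()  # send the line now instead of waiting for the buffer to fill


def fetch_page(page):
    """Hypothetical stand-in for bro.get_content: waits briefly, returns fake rows."""
    time.sleep(0.01 * (page % 3))
    return [{"page": page, "row": i} for i in range(2)]


def run_batch(start, num_workers):
    """Fetch pages start .. start + num_workers - 1 in threads and return all rows."""
    results = []
    with ThreadPoolExecutor(num_workers) as executor:
        # Map each future back to its page so the log line can name it.
        futures = {
            executor.submit(fetch_page, page): page
            for page in range(start, start + num_workers)
        }
        for future in as_completed(futures):
            rows = future.result()
            log("page {0} finished with {1} rows".format(futures[future], len(rows)))
            results.extend(rows)
    return results


if __name__ == "__main__":
    rows = run_batch(0, 4)
    log("batch 0 done with {0} rows".format(len(rows)))
```

Running the interpreter unbuffered (python -u, or setting PYTHONUNBUFFERED=1) should have a similar effect without touching the code, though I have not verified that under Airflow.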
This is code I inherited, and I'm trying to make sense of it before converting the script to Python 3.