I am running the following simple example: submit 20 jobs that take 2 seconds each to a single worker started with a concurrency of 10:

celery -A celery_test worker --concurrency 10 -l INFO

With 10 processes, the 20 jobs run in two batches of 2 seconds each, so this should take 2 * 2 = 4 seconds. That holds for the worker, which processes everything in 4 seconds. However, fetching the results adds a further delay of about 6 seconds.
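
To spell out the arithmetic behind the 4-second expectation, here is a minimal sketch. The helper name `expected_wall_time` is made up for illustration and is not part of Celery. It assumes every job takes the same time and ignores broker and scheduling overhead.

```python
import math

def expected_wall_time(n_jobs, concurrency, seconds_per_job):
    """Ideal wall-clock time when equal-length jobs run in batches.

    Hypothetical helper: assumes no queueing or transport overhead.
    """
    batches = math.ceil(n_jobs / concurrency)  # 20 jobs / 10 processes -> 2 batches
    return batches * seconds_per_job

print(expected_wall_time(20, 10, 2))  # 4
```

Anything well above this figure on the client side points at how the results are collected rather than at the worker itself.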

Any ideas how to get rid of this delay?

For scripts and outputs see below:

celery_call.py:

from celery_test import add
import time
results = []
for i in range(20):
    results.append(add.delay(i))
for result in results:
    timeStart = time.time()
    resultValue = result.get(timeout=10)
    timePassed = time.time() - timeStart
    print(timePassed, resultValue)

celery_test.py:

from celery import Celery
app = Celery('celery_test', backend='redis://localhost', broker='redis://localhost')

@app.task
def add(x):
    import time
    time.sleep(2)
    return x

Output of celery_call.py (total execution takes about 10 s):

1.9161145687103271 0
0.0035011768341064453 1
0.016004323959350586 2
0.017235994338989258 3
0.01010441780090332 4
0.0038263797760009766 5
0.005273342132568359 6
0.004664897918701172 7
0.012930631637573242 8
0.003242015838623047 9
1.9315376281738281 10
0.0010662078857421875 11
0.013183832168579102 12
0.11239218711853027 13
1.001314640045166 14
1.0015337467193604 15
1.002277135848999 16
1.0016703605651855 17
1.0015861988067627 18
1.0017943382263184 19
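
As a rough check on where the time goes, the per-call waits printed above can be summed. This is only a sketch: the values are copied from the output and rounded to four decimals, and the 0.9 to 1.1 second window used to flag "slow" calls is an arbitrary choice made for illustration.

```python
# Per-call wait times from the output above, rounded to 4 decimals.
waits = [
    1.9161, 0.0035, 0.0160, 0.0172, 0.0101, 0.0038, 0.0053, 0.0047, 0.0129, 0.0032,
    1.9315, 0.0011, 0.0132, 0.1124, 1.0013, 1.0015, 1.0023, 1.0017, 1.0016, 1.0018,
]

total = sum(waits)

# Indices of calls that blocked for roughly one second (window chosen for illustration).
slow = [i for i, w in enumerate(waits) if 0.9 < w < 1.1]

print(round(total, 2))  # about 10.06
print(slow)             # [14, 15, 16, 17, 18, 19]
```

The total comes to roughly 10 seconds, and the last six calls each block for about one second, which accounts for the extra 6 seconds on top of the expected 4.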

Worker log output (as expected, processing the data takes 4 s):

[2017-05-30 14:54:21,475: INFO/MainProcess] Received task: celery_test.add[8a4a00cc-29a1-4a2f-a659-0ea7eb3aabb1]  
[2017-05-30 14:54:21,479: INFO/MainProcess] Received task: celery_test.add[498a19df-0dfa-49f2-b4d8-c9eaa0b8782c]  
[2017-05-30 14:54:21,483: INFO/MainProcess] Received task: celery_test.add[7bc232ca-e85c-4ae7-90bf-1d65c919fa4e]  
[2017-05-30 14:54:21,500: INFO/MainProcess] Received task: celery_test.add[12cdb039-00d2-4471-8ce7-4da256dc83ef]  
[2017-05-30 14:54:21,502: INFO/MainProcess] Received task: celery_test.add[931e1d19-640b-4f30-9b04-b65a165a1bc2]  
[2017-05-30 14:54:21,515: INFO/MainProcess] Received task: celery_test.add[dd78de2e-b9a8-465e-a902-6f9eab1386e9]  
[2017-05-30 14:54:21,518: INFO/MainProcess] Received task: celery_test.add[fb27c545-ad48-4d84-a5a2-154c1290aba3]  
[2017-05-30 14:54:21,523: INFO/MainProcess] Received task: celery_test.add[ce079e0a-6fdf-4ee2-a6bf-ea349a435c4f]  
[2017-05-30 14:54:21,534: INFO/MainProcess] Received task: celery_test.add[1222d9e2-9496-4b83-8cba-ad0b34c4df3d]  
[2017-05-30 14:54:21,542: INFO/MainProcess] Received task: celery_test.add[67c2bf84-b39e-40bb-b1f5-a78b902d92a8]  
[2017-05-30 14:54:21,551: INFO/MainProcess] Received task: celery_test.add[8aee72dd-2230-4d0a-8e4e-e7a3d5ca245c]  
[2017-05-30 14:54:21,558: INFO/MainProcess] Received task: celery_test.add[e636f1ab-54cb-47a1-b1da-19744050566a]  
[2017-05-30 14:54:21,561: INFO/MainProcess] Received task: celery_test.add[67a45660-2383-4d00-aaea-30e027a37a7d]  
[2017-05-30 14:54:21,563: INFO/MainProcess] Received task: celery_test.add[4aa3227b-2ea4-4406-b205-d118c31c43bc]  
[2017-05-30 14:54:21,565: INFO/MainProcess] Received task: celery_test.add[de317340-1012-4c9e-9bf1-4fa7248a91fc]  
[2017-05-30 14:54:21,566: INFO/MainProcess] Received task: celery_test.add[791cf66e-2bff-4571-8209-a068451d1cb5]  
[2017-05-30 14:54:21,569: INFO/MainProcess] Received task: celery_test.add[23701df3-138b-4248-a529-fba6789c2c0d]  
[2017-05-30 14:54:21,569: INFO/MainProcess] Received task: celery_test.add[e3154044-39bd-481f-aadf-21e61d95f99e]  
[2017-05-30 14:54:21,570: INFO/MainProcess] Received task: celery_test.add[0770e885-901e-45c0-a269-42c86aba7d05]  
[2017-05-30 14:54:21,571: INFO/MainProcess] Received task: celery_test.add[a377fe5c-eb4e-44a7-9284-e83a67743096]  
[2017-05-30 14:54:23,480: INFO/PoolWorker-7] Task celery_test.add[8a4a00cc-29a1-4a2f-a659-0ea7eb3aabb1] succeeded in 2.003492763997201s: 0
[2017-05-30 14:54:23,483: INFO/PoolWorker-9] Task celery_test.add[498a19df-0dfa-49f2-b4d8-c9eaa0b8782c] succeeded in 2.00371297500169s: 1
[2017-05-30 14:54:23,500: INFO/PoolWorker-1] Task celery_test.add[7bc232ca-e85c-4ae7-90bf-1d65c919fa4e] succeeded in 2.002869830997952s: 2
[2017-05-30 14:54:23,536: INFO/PoolWorker-8] Task celery_test.add[fb27c545-ad48-4d84-a5a2-154c1290aba3] succeeded in 2.016123138000694s: 6
[2017-05-30 14:54:23,536: INFO/PoolWorker-3] Task celery_test.add[12cdb039-00d2-4471-8ce7-4da256dc83ef] succeeded in 2.032121352000104s: 3
[2017-05-30 14:54:23,562: INFO/PoolWorker-10] Task celery_test.add[67c2bf84-b39e-40bb-b1f5-a78b902d92a8] succeeded in 2.005405851999967s: 9
[2017-05-30 14:54:23,562: INFO/PoolWorker-5] Task celery_test.add[1222d9e2-9496-4b83-8cba-ad0b34c4df3d] succeeded in 2.0252396640025836s: 8
[2017-05-30 14:54:23,562: INFO/PoolWorker-4] Task celery_test.add[931e1d19-640b-4f30-9b04-b65a165a1bc2] succeeded in 2.0579610860004323s: 4
[2017-05-30 14:54:23,563: INFO/PoolWorker-2] Task celery_test.add[ce079e0a-6fdf-4ee2-a6bf-ea349a435c4f] succeeded in 2.026003548002336s: 7
[2017-05-30 14:54:23,574: INFO/PoolWorker-6] Task celery_test.add[dd78de2e-b9a8-465e-a902-6f9eab1386e9] succeeded in 2.0539962090006156s: 5
[2017-05-30 14:54:25,492: INFO/PoolWorker-9] Task celery_test.add[e636f1ab-54cb-47a1-b1da-19744050566a] succeeded in 2.005732863002777s: 11
[2017-05-30 14:54:25,493: INFO/PoolWorker-7] Task celery_test.add[8aee72dd-2230-4d0a-8e4e-e7a3d5ca245c] succeeded in 2.0076579160013353s: 10
[2017-05-30 14:54:25,509: INFO/PoolWorker-1] Task celery_test.add[67a45660-2383-4d00-aaea-30e027a37a7d] succeeded in 2.007014112001343s: 12
[2017-05-30 14:54:25,588: INFO/PoolWorker-10] Task celery_test.add[a377fe5c-eb4e-44a7-9284-e83a67743096] succeeded in 2.0102590669994242s: 19
[2017-05-30 14:54:25,588: INFO/PoolWorker-6] Task celery_test.add[e3154044-39bd-481f-aadf-21e61d95f99e] succeeded in 2.0111475869998685s: 17
[2017-05-30 14:54:25,589: INFO/PoolWorker-3] Task celery_test.add[de317340-1012-4c9e-9bf1-4fa7248a91fc] succeeded in 2.0130576739975368s: 14
[2017-05-30 14:54:25,589: INFO/PoolWorker-8] Task celery_test.add[0770e885-901e-45c0-a269-42c86aba7d05] succeeded in 2.0113905420002993s: 18
[2017-05-30 14:54:25,589: INFO/PoolWorker-5] Task celery_test.add[23701df3-138b-4248-a529-fba6789c2c0d] succeeded in 2.012135950000811s: 16
[2017-05-30 14:54:25,617: INFO/PoolWorker-4] Task celery_test.add[791cf66e-2bff-4571-8209-a068451d1cb5] succeeded in 2.04044298000008s: 15
[2017-05-30 14:54:25,619: INFO/PoolWorker-2] Task celery_test.add[4aa3227b-2ea4-4406-b205-d118c31c43bc] succeeded in 2.043387800000346s: 13
gizzmole

1 Answer

It's because you wait for each job's result in turn inside the loop. You lose some of the benefit of concurrency because the results don't arrive in the same order in which you request them. The example below adds a cumulative timer to show the total elapsed time:

from celery_test import add
import time
results = []
for i in range(20):
    results.append(add.delay(i))

allTimeStart = time.time()

for result in results:
    timeStart = time.time()
    resultValue = result.get(timeout=10)
    timePassed = time.time() - timeStart
    allTimePassed = time.time() - allTimeStart
    print(allTimePassed, timePassed, resultValue)

This gives:

(1.9835469722747803, 1.9835450649261475, 0)
(1.9858801364898682, 0.0022699832916259766, 1)
(1.988955020904541, 0.003039121627807617, 2)
(1.9928300380706787, 0.003849029541015625, 3)
(1.9935901165008545, 0.0007331371307373047, 4)
(1.9967319965362549, 0.0031011104583740234, 5)
(1.9973289966583252, 0.0005509853363037109, 6)
(2.0004770755767822, 0.003117084503173828, 7)
(2.0007641315460205, 0.00026702880859375, 8)
(3.00203800201416, 1.001255989074707, 9)
(3.9891350269317627, 0.9870359897613525, 10)
(3.9914891719818115, 0.0023059844970703125, 11)
(3.99283504486084, 0.001302957534790039, 12)
(3.99426007270813, 0.0013878345489501953, 13)
(3.997709035873413, 0.003403902053833008, 14)
(3.9984171390533447, 0.0006809234619140625, 15)
(4.000844955444336, 0.0024080276489257812, 16)
(4.004598140716553, 0.003731966018676758, 17)
(4.0053839683532715, 0.0007598400115966797, 18)
(5.006708145141602, 1.0012950897216797, 19)

But if you look at the order of the task results in the Celery log, you can see that the results don't arrive in the order in which you request them:

[2017-05-31 01:06:39,067: INFO/PoolWorker-2] Task celery_test.add[01fe4581-7982-40f3-92d3-9f352d0b8eca] succeeded in 2.00315466001s: 0
[2017-05-31 01:06:39,069: INFO/PoolWorker-8] Task celery_test.add[f468849c-76d9-4479-b7e2-850aab640437] succeeded in 2.003014307s: 1
[2017-05-31 01:06:39,072: INFO/PoolWorker-3] Task celery_test.add[db6a0064-0a83-49dc-a731-54264651a32f] succeeded in 2.002590772s: 3
[2017-05-31 01:06:39,072: INFO/PoolWorker-4] Task celery_test.add[421b1213-e1b7-4c73-8477-1554c53c4b14] succeeded in 2.002614007s: 2
[2017-05-31 01:06:39,076: INFO/PoolWorker-7] Task celery_test.add[90bdde7f-9740-4d18-820d-dc4c66090b2b] succeeded in 2.00297982999s: 4
[2017-05-31 01:06:39,077: INFO/PoolWorker-5] Task celery_test.add[661cba10-326a-4351-9fec-56d029847939] succeeded in 2.003134354s: 5
[2017-05-31 01:06:39,080: INFO/PoolWorker-10] Task celery_test.add[31903dfe-4b35-49b8-bc66-8c8807a1ee53] succeeded in 2.00229301301s: 6
[2017-05-31 01:06:39,080: INFO/PoolWorker-9] Task celery_test.add[60049a1b-009b-4d7b-ad4e-284f0d2e7147] succeeded in 2.00245238301s: 7
[2017-05-31 01:06:39,084: INFO/PoolWorker-1] Task celery_test.add[4e673409-af0e-4a59-8a42-38f0179b495a] succeeded in 2.00299428699s: 8
[2017-05-31 01:06:39,084: INFO/PoolWorker-6] Task celery_test.add[818bcea5-5654-4ec6-8706-1b6ca58f8735] succeeded in 2.002899974s: 9
[2017-05-31 01:06:41,072: INFO/PoolWorker-2] Task celery_test.add[4ab62e6d-ada3-4e0d-82e2-356eb054631f] succeeded in 2.00349172599s: 10
[2017-05-31 01:06:41,074: INFO/PoolWorker-8] Task celery_test.add[649c83db-a065-4cdd-9f5e-32ae1e5047f4] succeeded in 2.003091722s: 11
[2017-05-31 01:06:41,076: INFO/PoolWorker-4] Task celery_test.add[f6a6e067-7f60-4c1f-b8f4-dce40a6094c0] succeeded in 2.00157168499s: 12
[2017-05-31 01:06:41,077: INFO/PoolWorker-3] Task celery_test.add[ee7b0e01-2fa7-4bd0-b2f2-f5636155209b] succeeded in 2.00259804301s: 13
[2017-05-31 01:06:41,081: INFO/PoolWorker-7] Task celery_test.add[521f7903-3594-4aab-b4df-3a4e723347cd] succeeded in 2.002994123s: 14
[2017-05-31 01:06:41,081: INFO/PoolWorker-5] Task celery_test.add[26a3627c-7934-4613-b3c1-618784bbce26] succeeded in 2.003302467s: 15
[2017-05-31 01:06:41,084: INFO/PoolWorker-9] Task celery_test.add[8e796394-b05f-439b-b695-6d3ff3230844] succeeded in 2.00281064s: 17
[2017-05-31 01:06:41,084: INFO/PoolWorker-10] Task celery_test.add[13b40cd8-b0e4-4788-a3bb-4d050c1b6ad0] succeeded in 2.00298337401s: 16
[2017-05-31 01:06:41,088: INFO/PoolWorker-6] Task celery_test.add[cb8f1303-4d05-4eae-9b40-b2d221f20140] succeeded in 2.00274520101s: 19
[2017-05-31 01:06:41,088: INFO/PoolWorker-1] Task celery_test.add[0900bb54-8e2a-472c-99a8-ee18a8f4857c] succeeded in 2.003100015s: 18

One solution: use a group to collect all the results at once:

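from celery import group
from celery_test import add
import time

# Build one signature per job; nothing is sent to the broker yet.
jobs = [add.s(i) for i in range(20)]

# Dispatch all 20 tasks together as a single group.
result = group(jobs).apply_async()

timeStart = time.time()
# join() blocks until every task has finished and returns the values in submission order.
print(result.join())
timePassed = time.time() - timeStart
print(timePassed)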

Returns

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
4.00328302383
jmbarbier