Q: Fastest parallel requests in Python
I cannot waste 1 millisecond
One can easily spend 5x more time doing the same amount of work, if a bad approach is selected. Check the [ Epilogue ] section below to see one such exemplified code ( an MCVE-example ), where both the Threads and the Processes were way slower than a pure [SERIAL]-form of the process-execution. So indeed due care will be necessary, here and in every real-world use-case.
- Async using asyncio: I do not want to rely on a single thread, for some reason it may get stuck.
- Threads: Is it really reliable on Python to use threads? Do I risk 1 thread making another get stuck?
- Multiprocessing: If I have one process controlling the others, would I lose too much time in inter-process communication?
The long story short:
HFT/Trading may benefit from an intentionally restricted-duration asyncio code, as demonstrated in detail below, so as to benefit from transport-latency masking ( interleaved progress of execution: while one request still waits for the delivery of its remote-processing result, the CPU can do some other useful work in the meantime, letting the I/O-related waiting tasks stay idle ). Compute-heavy tasks, or very tight request/response behavioural patterns, will not be able to use this, either right due to their computing-intensive nature ( there is no reason to go idle at all, so no beneficial CPU-releases will ever happen ) or due to a need to avoid any ( potentially deteriorating ) in-determinism inside a tight code-execution response time-window.
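As a minimal sketch of such an intentionally restricted-duration construct ( the helper name and the time-budget value are illustrative assumptions only, not part of the measured code below ), the whole salvo can be bound by a hard deadline via asyncio.wait_for(), so the caller is never blocked longer than the allowed time-budget:

import asyncio, aiohttp

async def fetch_all_with_deadline( urls, budget_s = 0.250 ):         # hypothetical helper, not the tested code
    async def fetch_one( session, url ):
        async with session.get( url ) as resp:                       # each I/O wait releases control to the loop
            return await resp.read()
    async with aiohttp.ClientSession() as session:
        tasks = [ fetch_one( session, url ) for url in urls ]
        try:                                                          # hard upper bound on the whole salvo
            return await asyncio.wait_for( asyncio.gather( *tasks ),
                                           timeout = budget_s )
        except asyncio.TimeoutError:
            return None                                               # the caller decides what a missed deadline means

# usage sketch:
# results = asyncio.get_event_loop().run_until_complete( fetch_all_with_deadline( [ "http://example.com" ] * 25 ) )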
Threads are an a priori lost game in the standard Python interpreter. The central GIL-lock stepping enforces a pure-[SERIAL] code execution, one-after-another ( round-robin scheduling ordered ), as explained here and interactively demonstrated ( here + code included ) - click + to zoom in, until you see a 1-tick-per-pixel resolution, and you will see how often the other cores try to acquire the GIL-lock and fail to get it; you will also never see more than one-and-only-one green field of CPU-execution in any column, so a pure-[SERIAL] code execution happens even in a crowd of Python threads ( real time goes to the right in the graphs ).
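A minimal sketch of that pure-[SERIAL] effect ( illustrative names and sizes, not the measurement code behind the graphs referred to above ): a CPU-bound workload split across several threading.Thread instances finishes in about the same ( often worse ) time as the plain single-threaded run, because the GIL admits only one thread into the interpreter at any instant:

import threading, time

def burn( n = 5_000_000 ):                       # CPU-bound work, never waits on I/O
    s = 0
    for i in range( n ):
        s += i * i
    return s

t0 = time.perf_counter()
burn(); burn(); burn(); burn()                   # pure-[SERIAL] baseline: 4 calls, one after another
serial_s = time.perf_counter() - t0

t0 = time.perf_counter()
threads = [ threading.Thread( target = burn ) for _ in range( 4 ) ]
for t in threads: t.start()
for t in threads: t.join()                       # 4 "concurrent" threads, still GIL-stepped one-at-a-time
threaded_s = time.perf_counter() - t0

print( "SERIAL {0:.3f} [s]  vs  4-THREADS {1:.3f} [s]".format( serial_s, threaded_s ) )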
Process-based multiprocessing is quite an expensive tool, yet it gives one a way to escape from the trap of the GIL-lock-internally-[SERIAL]-ised Python flow of processing. Inter-process communication is expensive if performed using the standard multiprocessing.Queue, but HFT/trading platforms may enjoy much faster / lower-latency tools for truly distributed, multi-host, performance-motivated designs. Details go beyond this format, yet they build on tens of years of shaving off microseconds for ultimate response robustness and latency minimisation in such a distributed-computing trading system.
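To put a number on that IPC cost, here is a minimal sketch ( illustrative only, not taken from any trading platform mentioned above ) that measures the round-trip cost of bouncing small messages through a pair of standard multiprocessing.Queue instances - typically tens to hundreds of microseconds per message on commodity hardware, which is exactly the kind of cost that microsecond-shaving designs replace with faster transports:

import multiprocessing as mp
import time

def echo( q_in, q_out, n ):
    # child process: bounce every received message straight back
    for _ in range( n ):
        q_out.put( q_in.get() )

if __name__ == "__main__":
    N     = 10_000
    q_in  = mp.Queue()
    q_out = mp.Queue()
    child = mp.Process( target = echo, args = ( q_in, q_out, N ) )
    child.start()

    t0 = time.perf_counter()
    for i in range( N ):                          # round-trip: parent -> child -> parent
        q_in.put( i )
        q_out.get()
    dt = time.perf_counter() - t0
    child.join()

    print( "multiprocessing.Queue round-trip ~ {0:.1f} [us] / message".format( 1e6 * dt / N ) )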
Computer Science has taught me a lot of lessons on doing this right.
From a pure Computer-Science point of view, the approach to the problem ( a solution not being parallel in its nature ) proposed here by @Felipe Faria made me post this answer.
I will now forget about all the HFT-trading tricks and just decompose the concept of latency masking ( firing 150+ API calls across the global internet for some data is by far not a true [PARALLEL] process-flow organisation ).
The example.com url-target, used in the simplified test code, shows from my test-site about ~ 104-116 [ms] of network transport-latency. So my side has about that amount of CPU-idle time once each request has been dispatched over the network ( and an answer will never arrive sooner than that ~ 100 ms ).
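A minimal way to check that figure on one's own connection ( a sketch using the requests package, which is not part of the tested code below; the numbers will of course differ per network ):

import requests, time

t0 = time.perf_counter()
requests.get( "http://example.com", timeout = 10 )          # one blocking GET, dominated by transport-latency
print( "one-shot transport latency ~ {0:.0f} [ms]".format( 1e3 * ( time.perf_counter() - t0 ) ) )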
Here, this time, the ( principally very loooooooooooong ) latency, can be hidden by letting the CPU handle more threads and fire another request, as the thread that has already sent one has to wait for it no matter what. This is called latency-masking and it may help reduce the end-to-end run-time, even inside GIL-stepped pythonic threads ( which otherwise had to be fully avoided for years in the true and hardcore HPC-grade parallel code ). For details, one may read about the GIL-release time, and one may also deduce, or observe in a test, the upper limit of such latency-masking: once there are way more requests in the salvo than the GIL-lock thread switching ( forced transfers of execution ) can interleave within one's actual network transport-latency, the benefit stops growing.
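A minimal sketch of that thread-based latency-masking ( using concurrent.futures and the requests package as illustrative stand-ins; the asyncio variant actually measured is shown further down ): while any one thread sits idle in a blocking socket wait, the GIL is released, so the other threads can dispatch their own requests in the meantime:

from concurrent.futures import ThreadPoolExecutor
import requests, time

URLS = [ "http://example.com" ] * 25                # same target as the simplified test below

def fetch( url ):
    return len( requests.get( url, timeout = 10 ).content )   # blocking network wait releases the GIL

t0 = time.perf_counter()
with ThreadPoolExecutor( max_workers = 25 ) as pool:
    sizes = list( pool.map( fetch, URLS ) )         # 25 in-flight requests, their latencies overlap
print( "25 latency-masked GETs took {0:.0f} [ms] in total".format( 1e3 * ( time.perf_counter() - t0 ) ) )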
So the latency masking tricks de-masked:
The simplified experiment has shown that the fired salvo of 25 test calls took ~ 273 [ms] in batch,
whereas each of the 25 latency-masked calls has taken ~ 232.6-266.9 [ms],
i.e. the responses were heavily latency-masked, being just loosely concurrently monitored, from "outside" of their respective context-managers, by the orchestrating tooling inside the event-loop async / await mechanics, for their respective async completion.
The power of the latency-masking can be seen from the fact that the very first call, launch_id:< 0>, to the API has finished as the last but one (!).
This was possible because the url-retrieve process takes so long without having anything to do with the local CPU-workload ( which stays IDLE until anything gets there-and-back to first start any processing on the fetched data ).
This is also the reason why latency-masking does not help "so impressively well" for processes where each [ns]-shaving matters, like the said HPC-processing or the HFT-trading engines.
>>> pass; anAsyncEventLOOP = asyncio.get_event_loop()
>>> aClk.start(); anAsyncEventLOOP.run_until_complete( mainAsyncLoopPAYLOAD_wrapper( anAsyncEventLOOP, 25 ) );aClk.stop()
Now finished urlGetCOROUTINE(launch_id:<11>) E2E execution took 246193 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<21>) E2E execution took 247013 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:< 2>) E2E execution took 237278 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<20>) E2E execution took 247111 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<23>) E2E execution took 252462 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<16>) E2E execution took 237591 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:< 1>) E2E execution took 243398 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:< 9>) E2E execution took 232643 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:< 6>) E2E execution took 247308 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<17>) E2E execution took 250773 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<24>) E2E execution took 245354 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<10>) E2E execution took 259812 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<13>) E2E execution took 241707 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:< 3>) E2E execution took 258745 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:< 4>) E2E execution took 243659 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<18>) E2E execution took 249252 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:< 8>) E2E execution took 245812 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<12>) E2E execution took 244684 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:< 5>) E2E execution took 257701 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<15>) E2E execution took 243001 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:< 7>) E2E execution took 256776 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<22>) E2E execution took 266979 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<14>) E2E execution took 252169 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:< 0>) E2E execution took 263190 [us](Safety anAsyncTIMEOUT was set 10 [s])
Now finished urlGetCOROUTINE(launch_id:<19>) E2E execution took 247591 [us](Safety anAsyncTIMEOUT was set 10 [s])
273829
pass; import aiohttp, asyncio, async_timeout
from   zmq import Stopwatch

async def urlGetCOROUTINE( aSESSION, anURL2GET, aCoroID = -1, anAsyncTIMEOUT = 10 ):
    aLocalCLK = Stopwatch()
    res       = ""
    ############################################# SECTION-UNDER-TEST
    aLocalCLK.start() ##############################################
    async with async_timeout.timeout( anAsyncTIMEOUT ):# async CM needed by newer async_timeout ### TIMEOUT-PROTECTED
        async with aSESSION.get( anURL2GET ) as aRESPONSE:
            while True:
                pass; aGottenCHUNK = await aRESPONSE.content.read( 1024 )
                if not aGottenCHUNK:
                    break
                res += str( aGottenCHUNK )
            await aRESPONSE.release()
    ############################################################################################### TIMEOUT-PROTECTED
    aTestRunTIME_us = aLocalCLK.stop() ########## SECTION-UNDER-TEST

    print( "Now finished urlGetCOROUTINE(launch_id:<{2: >2d}>) E2E execution took {0: >9d} [us](Safety anAsyncTIMEOUT was set {1: >2d} [s])".format( aTestRunTIME_us, anAsyncTIMEOUT, aCoroID ) )
    return ( aTestRunTIME_us, len( res ) )

async def mainAsyncLoopPAYLOAD_wrapper( anAsyncLOOP_to_USE, aNumOfTESTs = 10, anUrl2GoGET = "http://example.com" ):
    '''
    aListOfURLs2GET = [ "https://www.irs.gov/pub/irs-pdf/f1040.pdf",
                        "https://www.forexfactory.com/news",
                        ...
                        ]
    '''
    async with aiohttp.ClientSession( loop = anAsyncLOOP_to_USE ) as aSESSION:
        aBlockOfAsyncCOROUTINEs_to_EXECUTE = [ urlGetCOROUTINE( aSESSION, anUrl2GoGET, launchID ) for launchID in range( min( aNumOfTESTs, 1000 ) ) ]
        await asyncio.gather( *aBlockOfAsyncCOROUTINEs_to_EXECUTE )
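For completeness, a minimal driver sketch equivalent to the REPL lines shown at the top ( assuming the snippet above is defined in the same module; aClk is the pyzmq Stopwatch it already imports ):

from zmq import Stopwatch                     # pyzmq's microsecond-resolution Stopwatch, as used above
import asyncio

aClk             = Stopwatch()
anAsyncEventLOOP = asyncio.get_event_loop()

aClk.start()
anAsyncEventLOOP.run_until_complete( mainAsyncLoopPAYLOAD_wrapper( anAsyncEventLOOP, 25 ) )
print( aClk.stop() )                          # E2E batch run-time in [us]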
Epilogue: the same work may take 5x longer ...
All run-times are in [us].
Both the Process- and the Thread-based forms of a just-[CONCURRENT] processing have accumulated immense instantiation overheads plus results-collection and transfer overheads ( the threading flavour with an additional, indeterministic variability of run-time ), whereas the pure-[SERIAL] process-flow was by far the fastest and the most efficient way to get the job done. For larger f-s these overheads will grow beyond all limits and may soon introduce O/S swapping and other system-resource-deteriorating side-effects, so be careful.
Each >>> command in the listing below is followed by its two outputs: the len() of the resulting string ( 28504 digits ) and the measured run-time in [us], annotated with the setup used.
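For reproducibility only, a minimal sketch of the setup the REPL lines below assume ( these imports were not part of the original transcript ):

import numpy as np
from   joblib import Parallel, delayed        # the process-/thread-backends being compared
from   zmq    import Stopwatch                # microsecond-resolution timer

aClk = Stopwatch()                            # used as aClk.start(); ...; aClk.stop() below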
602283L _ _ _ _ _ _ _ _ _
>>> aClk.start(); len( str( Parallel( n_jobs = -1 )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   512459L           [PAR] QUAD-CORE .multiprocessing
>>> aClk.start(); len( str( Parallel( n_jobs = -1 )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   511655L
>>> aClk.start(); len( str( Parallel( n_jobs = -1 )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   506400L
>>> aClk.start(); len( str( Parallel( n_jobs = -1 )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   508031L
>>> aClk.start(); len( str( Parallel( n_jobs = -1 )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   514377L _ _ _ _ _ _ _ _ _
>>> aClk.start(); len( str( Parallel( n_jobs = 1 )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   123185L           [PAR] SINGLE-CORE
>>> aClk.start(); len( str( Parallel( n_jobs = 1 )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   122631L
>>> aClk.start(); len( str( Parallel( n_jobs = 1 )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   125139L
>>> aClk.start(); len( str( Parallel( n_jobs = 1 )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   124358L _ _ _ _ _ _ _ _ _
>>> aClk.start(); len( str( Parallel( n_jobs = -1, backend = 'threading' )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   213990L           [PAR] QUAD-CORE .threading
>>> aClk.start(); len( str( Parallel( n_jobs = -1, backend = 'threading' )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   201337L
>>> aClk.start(); len( str( Parallel( n_jobs = -1, backend = 'threading' )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   199485L
>>> aClk.start(); len( str( Parallel( n_jobs = -1, backend = 'threading' )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   198174L
>>> aClk.start(); len( str( Parallel( n_jobs = -1, backend = 'threading' )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   169204L
>>> aClk.start(); len( str( Parallel( n_jobs = -1, backend = 'threading' )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   168658L
>>> aClk.start(); len( str( Parallel( n_jobs = -1, backend = 'threading' )( delayed( np.math.factorial ) ( 2**f ) for f in range( 14 ) ) [-1] ) ); aClk.stop()
28504   171793L _ _ _ _ _ _ _ _ _
>>> aClk.start(); len( str( [ np.math.factorial( 2**f ) for f in range( 14 ) ] [-1] ) ); aClk.stop()
28504   121401L           [SEQ] SINGLE-CORE
126381L