I have an application that reads a series of XML files containing logs of vehicle passages on a road. The application processes each record, transforms some of the fields to match the database columns, and inserts the result into a Cassandra database (a single node running on a remote server; it's on an internal network, so connectivity isn't really an issue). After inserting the data, the process for each file reads it back and produces information for summary tables, which leaves the data ready for a drill-down analysis done in an unrelated part of the application.
I'm using multiprocessing to process many XML files in parallel, and the trouble I'm having is with communicating with the Cassandra server. Schematically, each process goes as follows:
1. Read a record from the XML file
2. Process the record's data
3. Insert the processed data into the database (using `.execute_async(query)`)
4. Repeat 1 to 3 until the XML file is over
5. Wait for the responses of all the insert queries I made
6. Read data from the database
7. Process the read data
8. Insert the processed data into summary tables
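For concreteness, a minimal sketch of what each worker process does. The keyspace/table names and the helpers `parse_records`, `to_row` and `build_summaries` are hypothetical stand-ins for my actual parsing and aggregation code:

```python
from cassandra.cluster import Cluster

def process_file(path, equip, day_start, day_end):
    # Each process opens its own cluster/session (sessions can't be
    # shared across process boundaries).
    cluster = Cluster(['10.0.0.1'])
    session = cluster.connect('pnct')

    insert = session.prepare(
        "INSERT INTO passagens (equipamento, ts, faixa, velocidade) "
        "VALUES (?, ?, ?, ?)")

    # Steps 1-4: fire one async insert per record in the file.
    futures = [session.execute_async(insert, to_row(r))
               for r in parse_records(path)]

    # Step 5: block until every insert is acknowledged (or raises).
    for f in futures:
        f.result()

    # Step 6: the synchronous read that times out under write load.
    rows = session.execute(
        "SELECT * FROM passagens WHERE equipamento = %s "
        "AND ts >= %s AND ts < %s",
        (equip, day_start, day_end))

    # Steps 7-8: aggregate and write the summary tables.
    build_summaries(session, rows)
```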
Now, this runs smoothly in multiple parallel processes until one process reaches step 6. That request (made with `.execute(query)`, meaning I wait for the response) always hits a timeout. The error I receive is:
```
Process ProcessoImportacaoPNCT-1:
Traceback (most recent call last):
  File "C:\Users\Lucas\Miniconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\ImportacaoArquivosPNCT.py", line 231, in run
    core.CalculoIndicadoresPNCT.processa_equipamento(sessao_cassandra, equipamento, data, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 336, in processa_equipamento
    desvio_medias(sessao_cassandra, equipamento, data_referencia, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 206, in desvio_medias
    veiculos = sessao_cassandra.execute(sql_pronto)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 1594, in execute
    result = future.result(timeout)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 3296, in result
    raise self._final_exception
ReadTimeout: code=1200 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 1, 'consistency': 'ONE'}
```
I have raised the timeout on the server to absurd amounts (500,000,000 ms, for instance), and I have also tried setting the timeout limit on the client with `.execute(query, timeout=3000)`, but still no success.
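For reference, this is roughly what I tried on the client side; the server-side change was to the request timeouts in cassandra.yaml (e.g. `read_request_timeout_in_ms`). Note the driver-side timeout is in seconds, not milliseconds:

```python
# Client-side: raise the driver timeout, both session-wide and per query.
session.default_timeout = 3000                        # session default, in seconds
veiculos = session.execute(sql_pronto, timeout=3000)  # per-query override
```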
Now, when more processes hit the same problem and the intense writing from steps 1-3 stops across the board, the last processes to reach step 6 succeed. This makes me think Cassandra is giving priority to the tens of thousands of insert requests I'm issuing per second, and either ignoring my read request or pushing it to the back of the line.
One way to solve this, in my opinion, would be to ask Cassandra to give priority to my read request so that I can keep processing, even if that means slowing down the other processes.
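To make the question concrete: if prioritization isn't possible, the only mitigation I can think of short of running fewer processes is bounding how many inserts each process keeps in flight, so the coordinator isn't buried when the read arrives. A rough sketch (`MAX_IN_FLIGHT` is an assumed cap and `insert_throttled` a hypothetical helper, not driver API):

```python
import threading

MAX_IN_FLIGHT = 256                     # assumed cap; would need tuning
slots = threading.BoundedSemaphore(MAX_IN_FLIGHT)

def insert_throttled(session, statement, params):
    slots.acquire()                     # blocks once the cap is reached
    future = session.execute_async(statement, params)
    # Release the slot whether the insert succeeds or fails.
    future.add_callbacks(callback=lambda _: slots.release(),
                         errback=lambda _: slots.release())
    return future
```

But that trades insert throughput for headroom, which is exactly what I'd rather avoid; hence the question.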
Now, as a side note, you might think my process modelling is not optimal, and I'd love to hear opinions on that, but for the reality of this application this is, in our view, the best way to proceed. We have thought extensively about optimising the process, and (if the Cassandra server can handle it) this is optimal for our reality.
So, TL;DR: Is there a way to give priority to a query when executing tens of thousands of asynchronous queries? If not, is there a way to execute tens of thousands of insert queries and read queries per second without the requests timing out? Additionally, what would you suggest I do to solve the problem? Running fewer processes in parallel is obviously a solution, but one I'm trying to avoid. I'd love to hear everyone's thoughts.
Storing the data in memory while inserting, so I don't need to read it back for the summaries, is not a possibility because the XML files are huge and memory is an issue.