I'm looking for some best practices/advice on processing a CSV file and inserting its rows into a database, using a queue mechanism (Kafka).
Here is what I plan to do:
Create a new SQL table, Service Request, to store information about the user's request, such as:
RequestID, Status, Payload, Response
As you can see, there is a Status field to indicate whether the request succeeded or failed.
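For reference, this is roughly how I model that record in Go (the field names and status values are just my own choices, not anything fixed):

```go
// ServiceRequest mirrors the Service Request table.
type ServiceRequest struct {
	RequestID string // unique ID generated per upload
	Status    string // e.g. "PROCESSING", "SUCCESS", "FAILED" (illustrative values)
	Payload   string // URL of the uploaded CSV file in Google Cloud Storage
	Response  string // URL of the generated error-list CSV, if any
}
```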
Here is the flow when a user uploads a CSV file:
- The user submits a CSV file
- Validate the CSV file to make sure it follows the correct template
- Upload the CSV file to Google Cloud Storage, then create a new record in the Service Request table with a RequestID and the Payload set to the URL of the CSV file
- Read all records in the CSV file and publish a message for each one to a Kafka topic (with a JSON payload), roughly as sketched below
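Here is a minimal sketch of what I mean on the producer side with confluent-kafka-go (the broker address, topic name, JSON shape, and the import path are just assumptions for illustration):

```go
package main

import (
	"encoding/csv"
	"encoding/json"
	"log"
	"os"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// produceRows reads every row of the uploaded CSV and publishes one message per
// row, keyed by the RequestID so all rows of one upload share the same key.
func produceRows(requestID, csvPath string) error {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		return err
	}
	defer p.Close()

	f, err := os.Open(csvPath)
	if err != nil {
		return err
	}
	defer f.Close()

	rows, err := csv.NewReader(f).ReadAll()
	if err != nil {
		return err
	}

	topic := "csv-rows" // placeholder topic name
	for _, row := range rows {
		value, err := json.Marshal(map[string]interface{}{
			"request_id": requestID,
			"row":        row,
		})
		if err != nil {
			return err
		}
		if err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Key:            []byte(requestID),
			Value:          value,
		}, nil); err != nil {
			return err
		}
	}

	p.Flush(15 * 1000) // wait for outstanding deliveries before returning
	return nil
}

func main() {
	if err := produceRows("REQ-123", "upload.csv"); err != nil {
		log.Fatal(err)
	}
}
```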
On the consumer side:
- Listen for all incoming messages on the topic (consume the queue)
- Process each message (a rough consumer sketch follows this list)
- If there is an error, write a CSV file recording why that message failed
- Once every message for RequestID XXX is finished, update the Status and set the Response to the error-list CSV
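This is the consumer loop I have in mind, again only a sketch (the group ID and topic are placeholders, and processRow stands in for the real database insert):

```go
package main

import (
	"encoding/json"
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

type rowMessage struct {
	RequestID string   `json:"request_id"`
	Row       []string `json:"row"`
}

// processRow is a placeholder for the actual database insert; on failure the
// row would be appended to the per-request error CSV instead.
func processRow(rm rowMessage) error {
	return nil
}

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "csv-workers",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{"csv-rows"}, nil); err != nil {
		log.Fatal(err)
	}

	for {
		msg, err := c.ReadMessage(-1) // block until a message arrives
		if err != nil {
			log.Printf("consumer error: %v", err)
			continue
		}

		var rm rowMessage
		if err := json.Unmarshal(msg.Value, &rm); err != nil {
			log.Printf("bad payload: %v", err)
			continue
		}

		if err := processRow(rm); err != nil {
			log.Printf("row failed for request %s: %v", rm.RequestID, err)
		}
		// Open question: how do I know this was the last message for rm.RequestID
		// so I can update the Status?
	}
}
```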
So here is the question:
How do I know when all messages for RequestID XXX have been consumed, so that I can update the Status?
I am using: Go + the confluent-kafka-go library
Updates
After doing some research, I found that this is apparently done with Kafka Streams by using GroupByKey. Is it possible to do that in Go? I can't find a Kafka Streams API in confluent-kafka-go.
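To clarify what I mean by GroupByKey without Kafka Streams: conceptually I would have to keep a per-RequestID count on the consumer side, something like the sketch below, but I don't know where the expected total would come from or how this would behave with restarts and multiple consumer instances (all names here are hypothetical):

```go
package tracker

// Purely illustrative of the GroupByKey idea: count consumed rows per RequestID.
var (
	processed     = map[string]int{} // RequestID -> rows consumed so far
	expectedTotal = map[string]int{} // RequestID -> total rows for that request
)

// onRowConsumed would be called after each message is processed.
func onRowConsumed(requestID string) {
	processed[requestID]++
	if processed[requestID] == expectedTotal[requestID] {
		markRequestFinished(requestID)
	}
}

// markRequestFinished would update Status and Response in the Service Request table.
func markRequestFinished(requestID string) {
	// TODO: set Status and attach the error-list CSV URL as Response.
}
```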