I am trying to perform lookups for messages arriving on a Kafka topic (say "datatopic"). The lookup source is another Kafka topic (say "lookuptopic"). To do this with Faust, I created a Table from lookuptopic and an agent that updates this table with new messages. Within the same app, I created a second agent that performs the lookup (based on a common id attribute). With only a few thousand records in the lookup table, performance is a little slow: about 8 ms per record, which works out to roughly 125 records per second. I just wanted to validate that I am doing this the right way (apologies, I haven't gone through the entire Faust documentation).
Here is the code:
import time

import faust
from faust import current_event

app = faust.App(
    'lookup_table_v1',
    broker='kafka://localhost:9092',
    topic_partitions=4,
    store='rocksdb://',
)
class lookupSchema(faust.Record):
    id: str
    uuid: str


class tableSchema(faust.Record):
    uuid: str


class dataSchema(faust.Record):
    id: str


lookup_topic = app.topic('lookuptopic', value_type=lookupSchema)
data_topic = app.topic('datatopic', value_type=dataSchema)
# Declaring value_type so values read back from the changelog/RocksDB
# come back as tableSchema instances rather than plain dicts.
lookup_table = app.Table('lookup_table', value_type=tableSchema)
# Agent to update the lookup table
@app.agent(lookup_topic)
async def count_lookup_table(lookup_stream):
    async for record in lookup_stream:
        # Construct the record directly; .loads() expects serialized
        # data, not a dict, and is unnecessary here.
        lookup_table[record.id] = tableSchema(uuid=record.uuid)
# Agent to perform lookups for data arriving on the data topic
@app.agent(data_topic)
async def perform_lookup(data_stream):
    async for data in data_stream:
        event = current_event()
        latency_ms = round((time.time() - event.message.timestamp) * 1000)
        print(f'found: {lookup_table[data.id].uuid} --> ms: {latency_ms}')
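In case it matters, here is a minimal sketch of how the two topics could be seeded with test data (the 5000-record count and the seed_test_data name are arbitrary choices for illustration; @app.task runs once each time the worker starts):

import uuid as uuidlib


@app.task
async def seed_test_data():
    # Publish lookup entries first, then matching data records,
    # so perform_lookup always finds a hit.
    for i in range(5000):
        await lookup_topic.send(
            key=str(i),
            value=lookupSchema(id=str(i), uuid=str(uuidlib.uuid4())),
        )
    for i in range(5000):
        await data_topic.send(key=str(i), value=dataSchema(id=str(i)))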
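And in case the per-record print is skewing the numbers, a sketch of a variant that iterates events via Stream.events() and reports an averaged latency once per 1000 records (the window size and the perform_lookup_avg name are arbitrary; it would replace perform_lookup rather than run alongside it):

# Drop-in replacement for perform_lookup: same table lookup per record,
# but latency is accumulated and printed once per 1000 records, keeping
# print() overhead out of the per-record path.
@app.agent(data_topic)
async def perform_lookup_avg(data_stream):
    count, total_ms = 0, 0.0
    async for event in data_stream.events():
        data = event.value
        hit = lookup_table[data.id]  # the lookup being measured
        total_ms += (time.time() - event.message.timestamp) * 1000
        count += 1
        if count % 1000 == 0:
            print(f'avg end-to-end latency: {total_ms / 1000:.2f} ms '
                  f'(last 1000 records, latest uuid: {hit.uuid})')
            total_ms = 0.0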