
I am trying to perform a lookup for messages arriving in a Kafka topic (say "datatopic"). The lookup source is another Kafka topic (say "lookuptopic"). To do this using faust, I created a Table and an agent that updates this table with new messages from lookuptopic. Within the same App, I created another agent to perform the lookup (based on a common ID attribute). With only a few thousand records in the lookup table, performance is a little slow: 8 ms per record, which works out to about 125 records per second. I just wanted to validate whether I am doing it the right way (apologies, I couldn't go through the entire faust documentation).
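The 125 records/s figure assumes the 8 ms is processing time per record and that records are handled strictly one after another:

$$
\text{throughput} = \frac{1000\ \text{ms/s}}{8\ \text{ms/record}} = 125\ \text{records/s}
$$

More generally (Little's law), throughput is the number of records in flight divided by the per-record latency, $\lambda = N / W$; the serial case above is $N = 1$. Note also that the 8 ms in the question's code is measured from the Kafka message timestamp, so it includes time spent before the record reached the agent and is not a direct measure of processing time.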

Here is the code:

import time

import faust
from faust import current_event

app = faust.App(
    'lookup_table_v1',
    broker='kafka://localhost:9092',
    topic_partitions=4,
    store='rocksdb://',
)

class lookupSchema(faust.Record):
    id: str
    uuid: str

class tableSchema(faust.Record):
    uuid: str

class dataSchema(faust.Record):
    id: str


lookup_topic = app.topic('lookuptopic', value_type=lookupSchema)
data_topic = app.topic('datatopic', value_type=dataSchema)

lookup_table = app.Table('lookup_table', value_type=tableSchema)


# Agent to Update Lookup Table
@app.agent(lookup_topic)
async def count_lookup_table(lookup_stream):
    async for record in lookup_stream:
        # Construct the Record directly; Record.loads() is meant for serialized input
        lookup_table[record.id] = tableSchema(uuid=record.uuid)


# Agent to Perform Lookups for Data arriving in Data Topic
@app.agent(data_topic)
async def perform_lookup(data_stream):
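    async for data in data_stream:
        event = current_event()
        uuid = lookup_table[data.id].to_representation()["uuid"]
        # Age of the record in ms, measured against its Kafka message timestamp
        latency_ms = round((time.time() - event.message.timestamp) * 1000)
        print(f'found:{uuid}--> ms:{latency_ms}')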
some_user
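As a framework-free sketch of what the two agents compute, assuming a plain Python dict stands in for the faust Table and using hypothetical helper names `apply_lookup_update` and `enrich`, the lookup is a keyed join on `id`. Using `.get()` instead of indexing means a data record whose `id` has not reached the table yet yields `None` rather than raising `KeyError`, which is what `lookup_table[data.id]` would likely do for a table created without a `default`.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple


@dataclass
class LookupRecord:
    """Mirrors lookupSchema from the question: maps an id to a uuid."""
    id: str
    uuid: str


@dataclass
class DataRecord:
    """Mirrors dataSchema from the question: carries only the join key."""
    id: str


def apply_lookup_update(table: Dict[str, str], record: LookupRecord) -> None:
    # Equivalent of the update agent: the latest uuid seen for an id wins.
    table[record.id] = record.uuid


def enrich(
    table: Dict[str, str], records: Iterable[DataRecord]
) -> List[Tuple[str, Optional[str]]]:
    # Equivalent of the lookup agent: one hash lookup per record.
    # .get() returns None for ids not (yet) present instead of raising KeyError.
    return [(rec.id, table.get(rec.id)) for rec in records]


if __name__ == "__main__":
    table: Dict[str, str] = {}
    updates = [
        LookupRecord("a", "uuid-1"),
        LookupRecord("b", "uuid-2"),
        LookupRecord("a", "uuid-3"),
    ]
    for rec in updates:
        apply_lookup_update(table, rec)
    print(enrich(table, [DataRecord("a"), DataRecord("c")]))
    # prints [('a', 'uuid-3'), ('c', None)]
```

In this model each lookup is a single hash-table access, so a table size in the thousands is unlikely to be the bottleneck by itself; a RocksDB-backed faust table adds (de)serialization and storage costs that this sketch does not capture.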
  • From the code, it's not clear what the source of `data_topic` is. – Ameida Jul 06 '21 at 20:53
  • Well, it's an old question, but it is a Kafka topic; see `data_topic = app.topic('datatopic', value_type=dataSchema)`. The cluster details are defined when creating the app. – some_user Jul 07 '21 at 04:51
  • I see that this is a little old, so perhaps you already have a solution or figured something else out, but the message timestamp isn't the best time to use, as it doesn't depend on when the message arrived at the place where you perform the lookup. It's better to take a timestamp when you pull the message from the event stream and compare that to the lookup retrieval time. Also, I recently had some issues getting accurate timestamps and found `time.perf_counter_ns()` to be the best, as it seems to give better than the default 15.625 ms resolution. – Fonty Aug 21 '23 at 05:04
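Following that suggestion, here is a minimal sketch that separates the two measurements: the time spent on the lookup itself, taken with `time.perf_counter_ns()` (Python 3.7+) when the record is picked up, and the end-to-end age of the record relative to its producer timestamp, which is what the question's `print` reports. A plain dict again stands in for the faust Table, and `measure_lookup` and its parameters are hypothetical names.

```python
import time
from typing import Dict, Optional, Tuple


def measure_lookup(
    table: Dict[str, str], key: str, produced_at_s: float
) -> Tuple[Optional[str], int, float]:
    """Return (uuid, lookup_ns, end_to_end_ms) for a single record.

    produced_at_s is the producer-side timestamp in seconds since the epoch,
    playing the role of event.message.timestamp in the question.
    """
    # Start a monotonic, high-resolution timer when the record is picked up.
    start_ns = time.perf_counter_ns()
    uuid = table.get(key)
    lookup_ns = time.perf_counter_ns() - start_ns

    # Wall-clock age of the record: includes producer, broker and consumer
    # delays, not just the lookup, so it is not a per-record processing cost.
    end_to_end_ms = (time.time() - produced_at_s) * 1000
    return uuid, lookup_ns, end_to_end_ms


if __name__ == "__main__":
    table = {"a": "uuid-1"}
    # Pretend the record was produced 50 ms ago.
    uuid, lookup_ns, e2e_ms = measure_lookup(table, "a", time.time() - 0.05)
    print(uuid, lookup_ns, round(e2e_ms))
```

Comparing `time.time()` against a timestamp set on another machine also folds in any clock skew between the hosts, which is another reason to time the lookup with a local monotonic counter.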

0 Answers