-1

I have dataflow pipeline, it's in Python and this is what it is doing:

  1. Read Message from PubSub. Messages are zipped protocol buffer. One Message receive on a PubSub contain multiple type of messages. See the protocol parent's message specification below:

    message BatchEntryPoint {
    
     /**
     * EntryPoint
     * 
     * Description: Encapsulation message
     */
    message EntryPoint {
    // Proto Message
    google.protobuf.Any proto = 1;
    
    // Timestamp
    google.protobuf.Timestamp timestamp = 4;
    }
    
    // Array of EntryPoint messages
    repeated EntryPoint entrypoints = 1;
    }
    

So, to explain a bit better, I have several protobuf messages. Each message must be packed in the proto field of the EntryPoint message, we are sending several messages at once because of MQTT limitations, that's why we then use a repeated field pointing to EntryPoint message on BatchEntryPoint.

  1. Parsing the received messages.

Nothing fancy here, just unzipping and unserializing the message we just read from the PubSub. to get 'humain readable' data.

  1. For Loop on BatchEntryPoint to evaluate each EntryPoint messages.

As Each messages on BatchEntryPoint can have different type, we need to process them differently

  1. Parsed message data

Doing different process to get all information I need and format it to a BigQuery readable format

  1. Write data to bigQuery

This is where my 'trouble' begin, so my code work but it is very dirty in my opinion and hardly maintainable. There is two things to be aware of.
Each message's type can be send to 3 different datasets, a r&d dataset, a dev dataset and a production dataset. let's say I have a message named System. It could go to:

  • my-project:rd_dataset.system
  • my-project:dev_dataset.system
  • my-project:prod_dataset.system

So this is what I am doing now:

console_records | 'Write to Console BQ' >> beam.io.WriteToBigQuery(
    lambda e: 'my-project:rd_dataset.table1' if dataset_is_rd_table1(e) else (
        'my-project:dev_dataset.table1' if dataset_is_dev_table1(e) else (
        'my-project:prod_dataset.table1' if dataset_is_prod_table1(e) else (
        'my-project:rd_dataset.table2' if dataset_is_rd_table2(e) else (
        'my-project:dev_dataset.table2' if dataset_is_dev_table2(e) else (
        ...) else 0

I have more than 30 different type of messages, making more of 90 lines for inserting data to big query.

Here is what a dataset_is_..._tableX method looks like:

def dataset_is_rd_messagestype(element) -> bool:
""" check if env is rd for message's type message """
valid: bool = False
is_type = check_element_type(element, 'MessagesType')
if is_type:
    valid = dataset_is_rd(element)
return valid

check_element_type Check that the message has the right type (ex: System).
dataset_is_rd looks like this:

def dataset_is_rd(element) -> bool:
    """ Check if dataset should be RD from registry id """
    if element['device_registry_id'] == 'rd':
        del element['device_registry_id']
        del element['bq_type']
        return True
    return False

The element as a key indicating us on which dataset we must send the message.

SO this is working as expected, But I wish I could do cleaner code and maybe reduce the amount of code to change in case of adding or deleting a type of message.

Any ideas?

Jason Aller
  • 3,541
  • 28
  • 38
  • 38
Kimor
  • 532
  • 4
  • 17
  • 1
    IMO all you need is to give a name to your lambda according to [WriteToBigQuery's documentation](https://beam.apache.org/releases/pydoc/2.28.0/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery) - I mean, as in _define_ it as a _function_ something like `get_table_destination_from_element` – vdolez Apr 15 '21 at 15:50

3 Answers3

0

How about using TaggedOutput.

Tudor Plugaru
  • 347
  • 2
  • 10
  • 1
    Thing is, I can't use a Pcollection to set my table because we can extract data from it – Kimor May 03 '21 at 09:44
0

Can you write something like this instead:

def dataset_type(element) -> bool:
    """ Check if dataset should be RD from registry id """
    dev_registry = element['device_registry_id']
    del element['device_registry_id']
    del element['bq_type']
    table_type = get_element_type(element, 'MessagesType')
    return 'my-project:%s_dataset.table%d' % (dev_registry, table_type)

And use that as the table lambda that you pass to BQ?

Pablo
  • 10,425
  • 1
  • 44
  • 67
  • Is this a good practice to change the `element` in this function? The changes from here will be written to BigQuery from what I tested a while ago... – Tudor Plugaru Apr 20 '21 at 11:19
  • You pmust not modify the element. and even more a simple method like this cannot treat PCollection data – Kimor May 03 '21 at 09:43
0

So I manage to create code to insert data to dynamic table by crafting the table name dynamically.

This is not perfect because I have to modify the element I pass to the method, however I am still very happy with the result, it has clean up my code from hundreds of line. If I have a new table, adding it would take one line on an array compare to 6 line in the pipeline before.

Here is my solution:

def batch_pipeline(pipeline):
    console_message = (
            pipeline
            | 'Get console\'s message from pub/sub' >> beam.io.ReadFromPubSub(
        subscription='sub1',
        with_attributes=True)
    )
    common_message = (
            pipeline
            | 'Get common\'s message from pub/sub' >> beam.io.ReadFromPubSub(
        subscription='sub2',
        with_attributes=True)
    )
    jetson_message = (
            pipeline
            | 'Get jetson\'s message from pub/sub' >> beam.io.ReadFromPubSub(
        subscription='sub3',
        with_attributes=True)
    )

 

message = (console_message, common_message, jetson_message) | beam.Flatten()
clear_message = message | beam.ParDo(GetClearMessage())
console_bytes = clear_message | beam.ParDo(SetBytesData())
console_bytes | 'Write to big query back up table' >> beam.io.WriteToBigQuery(
    lambda e: write_to_backup(e)
)
records = clear_message | beam.ParDo(GetProtoData())
gps_records = clear_message | 'Get GPS Data' >> beam.ParDo(GetProtoData())
parsed_gps = gps_records | 'Parse GPS Data' >> beam.ParDo(ParseGps())
if parsed_gps:
    parsed_gps | 'Write to big query gps table' >> beam.io.WriteToBigQuery(
        lambda e: write_gps(e)
    )
records | 'Write to big query table' >> beam.io.WriteToBigQuery(
    lambda e: write_to_bq(e)
)

So the pipeline is reading from 3 different pub sub, extracting the data and writing to big query.

The structure of an element use by WriteToBigQuery looks like this:

  obj = {
        'data': data_to_write_on_bq,
        'registry_id': data_needed_to_craft_table_name,
        'gcloud_id': data_to_write_on_bq,
        'proto_type': data_needed_to_craft_table_name
  }

and then one of my method used on the lambda on WriteToBigQuery looks like this:

def write_to_bq(e):
    logging.info(e)
    element = copy(e)
    registry = element['registry_id']
    logging.info(registry)
    dataset = set_dataset(registry) # set dataset name, knowing the registry, this is to set the environment (dev/prod/rd/...)
    proto_type = element['proto_type']
    logging.info('Proto Type %s', proto_type)
    table_name = reduce(lambda x, y: x + ('_' if y.isupper() else '') + y, proto_type).lower()
    full_table_name = f'my_project:{dataset}.{table_name}'
    logging.info(full_table_name)
    del e['registry_id']
    del e['proto_type']

    return full_table_name

And that's it, after 3 days of trouble !!

Kimor
  • 532
  • 4
  • 17