3

I am trying to use Dataflow to read a Pub/Sub message and write it to BigQuery. The Google team gave me alpha access and I have the provided examples working, but now I need to apply this to my own scenario.

Pub/Sub payload:

Message {
    data: {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
    attributes: {}
}

BigQuery schema:

schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP',

My goal is simply to read the message payload and insert it into BigQuery. I am struggling to get my head around the transformations and how I should map the keys and values to the BigQuery schema.

I am very new to this so any help is appreciated.

Current code: https://codeshare.io/ayqX8w

Thanks!

Andrew Mo
glux

3 Answers

5

I was able to parse the Pub/Sub string by defining a function that loads it into a JSON object (see parse_pubsub()). One odd issue I hit was that I could not import json at the global scope: doing so produced "NameError: global name 'json' is not defined" errors, so I had to import json inside the function.

See my working code below:

from __future__ import absolute_import

import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window

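# Lines look like this (json.loads needs the actual message to use double quotes):
#   {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
def parse_pubsub(line):
    """Normalize a Pub/Sub string into a (mac, status, datetime) tuple."""
    # json is imported here because a global import raised NameError on the workers.
    import json
    record = json.loads(line)
    return record['mac'], record['status'], record['datetime']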

def run(argv=None):
  """Build and run the pipeline."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_topic', required=True,
      help='Input PubSub topic of the form "projects/<PROJECT>/topics/<TOPIC>".')
  parser.add_argument(
      '--output_table', required=True,
      help=
      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
       'or DATASET.TABLE.'))
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:
    # Read the pubsub topic into a PCollection.
    lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
                | beam.Map(parse_pubsub)
                # Tuple-unpacking lambda parameters are Python 2 only, so index the tuple instead.
                | beam.Map(lambda rec: {'mac': rec[0], 'status': rec[1], 'datetime': rec[2]})
                | beam.io.WriteToBigQuery(
                    known_args.output_table,
                    schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP',
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
            )

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
glux
  • If you set save_main_session to true you should be able to import modules globally - example: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/pubsub/streaming-analytics/PubSubToGCS.py – Michael Weber Dec 02 '19 at 14:45
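
The save_main_session suggestion in the comment above could be sketched roughly as follows. The helper name build_pipeline_options is hypothetical, and the Beam import sits inside the function only so that the sketch can be loaded without Beam installed; in a real pipeline module it would normally go at the top of the file.

```python
def build_pipeline_options(pipeline_args):
    """Return pipeline options with save_main_session enabled.

    build_pipeline_options is a hypothetical helper name used for illustration.
    """
    # Imported lazily so this sketch can be defined without Beam installed.
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    options = PipelineOptions(pipeline_args)
    # Pickle the main session so module-level imports (such as json)
    # are available to functions running on the workers.
    options.view_as(SetupOptions).save_main_session = True
    return options
```

Under this assumption the pipeline would be created with beam.Pipeline(options=build_pipeline_options(pipeline_args)) instead of beam.Pipeline(argv=pipeline_args), and json could then be imported once at the top of the module.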
0

Data written to the Python SDK's BigQuery sink should be a dictionary in which each key names a field of the BigQuery table and the corresponding value is the value to write to that field. For a BigQuery RECORD type, the value should itself be a dictionary of the corresponding key/value pairs.
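
As a minimal sketch of that row shape for the schema in the question, assuming the message body arrives as valid double-quoted JSON (to_bq_row is a hypothetical helper name, not part of Beam):

```python
import json


def to_bq_row(payload):
    """Convert a JSON payload string into a dict keyed by BigQuery field name."""
    record = json.loads(payload)
    return {
        'mac': str(record['mac']),        # mac:STRING
        'status': int(record['status']),  # status:INTEGER
        'datetime': record['datetime'],   # datetime:TIMESTAMP, kept as the original string
    }


# Sample payload taken from the question, rewritten with double quotes.
row = to_bq_row(
    '{"datetime": "2017-07-13T21:15:02Z", "mac": "FC:FC:48:AE:F6:94", "status": 1}')
```

A function like this could be applied with beam.Map(to_bq_row) directly before the BigQuery write step, so that each element reaching the sink is already a dictionary.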

I filed a JIRA to improve the documentation of the corresponding Python module in Beam: https://issues.apache.org/jira/browse/BEAM-3090

chamikara
  • Thanks for the feedback. After some more experimentation it appears the incoming pub/sub message is coming in as a string (obviously). I have to apply a transformation that converts the lines object into a dictionary. An error message I encountered in dataflow is: **Input type hint violation at Group: expected Tuple[TypeVariable[K], TypeVariable[V]], got ** – glux Oct 23 '17 at 20:33
0

I have a similar use case (reading rows as strings from Pub/Sub, converting them to dicts, and then processing them).

I am using ast.literal_eval(), which seems to work for me. It evaluates the string, but more safely than eval() (see here). It should return a dict whose keys are strings and whose values are evaluated to the most likely type (int, str, float...). You may still want to check that the values have the correct types.

This would give you a pipeline like this:

import ast
lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
            | "JSON row to dict" >> beam.Map(ast.literal_eval)
            | beam.io.WriteToBigQuery( ... )
        )

I have not used BigQuery (yet), so I cannot help you on the last line, but what you wrote seems correct at first glance.
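
To check that the values take the correct type, a small validation step could follow the parse. This is only a sketch: EXPECTED_TYPES and parse_row are hypothetical names, and the type mapping is an assumption based on the schema in the question. Note that ast.literal_eval() accepts the single-quoted payload shown in the question, which json.loads() would reject.

```python
import ast

# Assumed mapping from field name to Python type, based on the question's schema.
EXPECTED_TYPES = {'mac': str, 'status': int, 'datetime': str}


def parse_row(line):
    """Parse a Python-literal dict string and check the type of each field."""
    row = ast.literal_eval(line)
    if not isinstance(row, dict):
        raise ValueError('expected a dict literal, got %r' % type(row))
    for field, expected in EXPECTED_TYPES.items():
        if field not in row:
            raise ValueError('missing field %r' % field)
        if not isinstance(row[field], expected):
            raise TypeError('field %r should be %s' % (field, expected.__name__))
    return row


parsed = parse_row(
    "{'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}")
```

In the pipeline above, beam.Map(parse_row) could take the place of the plain ast.literal_eval step.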

Pascal Delange