I have a few .txt files containing JSON data that need to be loaded into a Google BigQuery table. Along with the columns in the text files, I need to insert the filename and the current timestamp for each row. This runs in GCP Dataflow with Python 3.7.

I got the FileMetadata (file path and size) for each file using GCSFileSystem.match and metadata_list.

I believe I need to run the pipeline code in a loop, pass each file path to ReadFromText, and call a FileNameReadFunction ParDo.

    (p
        | "read from file" >> ReadFromText(known_args.input)
        | "parse" >> beam.Map(json.loads)
        | "Add FileName" >> beam.ParDo(AddFilenamesFn(), GCSFilePath)
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            known_args.output,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

I followed the steps in "Dataflow/apache beam - how to access current filename when passing in pattern?" but I can't quite make it work.

Any help is appreciated.

rens

1 Answer

You can use textio.ReadFromTextWithFilename instead of ReadFromText. It produces a PCollection of (filename, line) tuples.

To include the filename and timestamp in your output JSON record, you could change your "parse" line to:

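| "parse" >> beam.MapTuple(lambda file, line: {  # MapTuple unpacks each (file, line) tuple
    **json.loads(line),  # top-level fields from the JSON line
    "filename": file,
    "timestamp": datetime.now()})  # needs: from datetime import datetime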
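
If the lambda gets hard to read, the same parse step could be written as a named function. The following is a minimal sketch, not a tested Dataflow job: the function name add_file_metadata is made up for illustration, and sending the timestamp as an ISO 8601 UTC string is an assumption about what the destination column accepts. The sample values are the ones quoted in the comments.

```python
import json
from datetime import datetime, timezone


def add_file_metadata(element):
    """Turn a (filename, line) pair into one flat dict for BigQuery."""
    filename, line = element       # unpack the tuple explicitly
    record = json.loads(line)      # the JSON text is the second item
    record["filename"] = filename  # added at the top level, not nested
    # Assumption: the load time is sent as an ISO 8601 UTC string.
    record["timestamp"] = datetime.now(timezone.utc).isoformat()
    return record


# Stand-in for one element produced by ReadFromTextWithFilename.
sample = ("gs://bucket-test/request.log",
          '{"ClientID": "66.155", "RuleID": 5073, "Country": "Italy"}')
row = add_file_metadata(sample)
print(row)
```

In the pipeline this would be used as | "parse" >> beam.Map(add_file_metadata) right after ReadFromTextWithFilename, since beam.Map passes each (filename, line) tuple to the function as a single argument.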
danielm
  • While it gets the file name, it throws an error in the JSON parsing step: | "read from file" >> ReadFromTextWithFilename(known_args.input) | "parse" >> beam.Map(json.loads). json.loads expects a string to parse, but with the file name the element is a tuple. I also need to add the current timestamp to each row, and later write an audit file listing all files processed. That is why I thought of looping through all the files. – rens Oct 07 '20 at 04:29
  • I've added some more detail to the answer – danielm Oct 07 '20 at 15:46
  • What could be wrong? I get TypeError: <lambda>() missing 1 required positional argument: 'line' [while running 'parse'] with | "read from file" >> ReadFromTextWithFilename(input_bucket) | "parse" >> beam.Map(lambda file, line: {"row": json.loads(line),"filename": file,"timestamp": datetime.datetime.now()}) – rens Oct 08 '20 at 03:09
  • Thanks @danielm. A lambda function doesn't automatically unpack a tuple into two arguments in Python. I fixed the parse step by using | "parse" >> beam.Map(lambda file: {"row": json.loads(file[1]),"filename": file[0]}), so it appends the filename to the row. Writing to BQ fails; the output of the parse step looks like {'row': {'ClientID': '66.155', 'RuleID': 5073, 'Country': 'Italy'}, 'filename': 'gs://bucket-test/request.log'}. Any guidance on this? – rens Oct 08 '20 at 16:46
  • What failure do you get? You might just need to delete your table first, since the output schema will be different now that your json structure is different – danielm Oct 08 '20 at 18:00
  • This is the error message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.' reason: 'invalid'> [while running 'WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/WaitForDestinationLoadJobs']. The schema is good: I can load the file into the same BQ table without the filename field (simply parsing with json.loads). It looks like the output format isn't compatible. – rens Oct 08 '20 at 19:15
  • I think you need to flatten the json, instead of having the nested "row" field. Updated answer – danielm Oct 08 '20 at 21:22
  • No luck. Instead of using a lambda, I called a custom function that appends the filename to the dict and dumps it as JSON, like this: filecontents = json.loads(file[1]); filecontents['filename'] = file[0]. With this the row format is clean, but loading to BQ fails with the same error. If I write the output to a file and then load that file into BQ, it succeeds. Any thoughts? – rens Oct 09 '20 at 21:56
  • Is there anything more in the error message? Dataflow is also doing the same thing of writing the output json to a file, then loading that file into BQ. – danielm Oct 09 '20 at 23:20
  • Fixed it, I had to do a json.loads before sending it to BQ. Thanks a lot. – rens Oct 10 '20 at 05:57
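
The nested "row" problem discussed in the comments can be reproduced without running a pipeline. The sketch below is only an illustration in plain Python: the helper names flatten_row and fields_not_in_schema are hypothetical, and the schema_fields list is an assumed table schema built from the sample row quoted above, not the asker's real one.

```python
def flatten_row(record, nested_key="row"):
    """Lift the fields under `nested_key` up to the top level."""
    flat = {k: v for k, v in record.items() if k != nested_key}
    # On a name clash, the nested value overwrites the top-level one.
    flat.update(record.get(nested_key, {}))
    return flat


def fields_not_in_schema(record, schema_fields):
    """Return the record keys that the table schema does not declare."""
    return sorted(set(record) - set(schema_fields))


# Shape produced by the lambda that wraps the parsed JSON in "row".
nested = {
    "row": {"ClientID": "66.155", "RuleID": 5073, "Country": "Italy"},
    "filename": "gs://bucket-test/request.log",
}
# Assumed schema: the three JSON fields plus the added filename column.
schema_fields = ["ClientID", "RuleID", "Country", "filename"]

print(fields_not_in_schema(nested, schema_fields))               # ['row']
print(fields_not_in_schema(flatten_row(nested), schema_fields))  # []
```

A check like this, run on a sample element before WriteToBigQuery, shows whether the dict keys line up with the table columns, which is what the suggestion to flatten the JSON is getting at.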