
I wrote a daily batch job in Apache Beam (a Dataflow Python pipeline). It currently runs fine for yesterday's date, but I want a date-range runner, i.e. the ability to run it from a start date to an end date, and that is proving difficult. Can you suggest a method for that? Here is my code snippet for the yesterday run.

start_date = '20180101'
end_date = '20190101'

# `date` below holds yesterday's run date as a 'YYYYMMDD' string
p = beam.Pipeline(options=options)

read = (
    p
    | 'BQRead: ' >> BQReader(query=test_query.format(date=date))
)

transformed = (
        read
        | 'Transform 1 ' >> beam.ParDo(Transform1())
)

transformed | 'BQWrite' >> BQWriter(table + date, table_schema)

I tried the following, but it doesn't work:

from datetime import datetime
from dateutil import rrule

start_date = datetime.strptime('20190101', "%Y%m%d")
end_date = datetime.strptime('20190110', "%Y%m%d")
dates = list(rrule.rrule(rrule.DAILY, dtstart=start_date, until=end_date))

for date in dates:
    ds_nd = date.strftime('%Y%m%d')

    p = beam.Pipeline(options=options)

    read = (
        p
        | 'BQRead: ' >> BQReader(query=test_query.format(date=ds_nd))
    )

    transformed = (
        read
        | 'Transform 1 ' >> beam.ParDo(Transform1())
    )

    transformed | 'BQWrite' >> BQWriter(table + ds_nd, table_schema)
jabir
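One likely culprit in the loop above: `beam.Pipeline(options=options)` only builds a graph, and nothing in the loop ever executes it. Below is a minimal sketch of the same loop with each pipeline actually submitted, assuming `options`, `test_query`, `table`, `table_schema`, `BQReader`, `BQWriter` and `Transform1` are defined as in the snippets above. Note that this launches one Dataflow job per date.

from datetime import datetime
from dateutil import rrule
import apache_beam as beam

start_date = datetime.strptime('20190101', "%Y%m%d")
end_date = datetime.strptime('20190110', "%Y%m%d")

for date in rrule.rrule(rrule.DAILY, dtstart=start_date, until=end_date):
    ds_nd = date.strftime('%Y%m%d')

    p = beam.Pipeline(options=options)

    (p
     | 'BQRead: ' >> BQReader(query=test_query.format(date=ds_nd))
     | 'Transform 1 ' >> beam.ParDo(Transform1())
     | 'BQWrite' >> BQWriter(table + ds_nd, table_schema))

    # Without run(), the graph is built but never executed.
    p.run().wait_until_finish()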
  • If I understand your question correctly, could you have a for loop that creates end_date - start_date + 1 pipelines, each one executing for a date in [start_date, end_date]? – Rui Wang Jan 28 '19 at 07:12
  • @RuiWang I updated the question. I tried it like that, but it's not working. – jabir Jan 28 '19 at 10:28
  • Although I don't know whether Beam can have such a for loop, there is a type of system you can use to create multiple tasks like this: Apache Airflow/Cloud Composer (see the DAG sketch after these comments). I used Airflow to kick off hundreds of Spark batch jobs for different dates, just like your case. – Rui Wang Jan 28 '19 at 18:58
  • https://stackoverflow.com/questions/54154816/using-dataflow-vs-cloud-composer might also help you understand what Apache Airflow/Cloud Composer can give. – Rui Wang Jan 28 '19 at 18:59
  • I don't want to run multiple Dataflow jobs... I want to run the date range with a single Dataflow job (see the single-pipeline sketch after these comments). @RuiWang – jabir Feb 02 '19 at 09:54
  • Is each date's data independent, or do you actually want to flatten all those dates' data? – Rui Wang Feb 02 '19 at 20:18
  • Each date is independent @RuiWang – jabir Feb 04 '19 at 05:39
  • Each date is independent and will be saved to different date-sharded tables - basically, I just want to run my data from start date to end date @RuiWang – jabir Feb 04 '19 at 09:15
  • @RuiWang can you help me on this – jabir Feb 11 '19 at 09:36
  • Sorry, I don't have a better idea at this moment. – Rui Wang Feb 11 '19 at 18:58
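For the Airflow/Cloud Composer route Rui Wang suggests, here is a hedged sketch of a backfill DAG: Airflow's catchup fans out one Dataflow task per execution date between start_date and end_date. The import path matches Airflow 1.10-era contrib; the GCS path and the assumption that the Beam script accepts a --date pipeline option are hypothetical, not from the question.

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

# One DAG run (and thus one Dataflow job) per day in [start_date, end_date].
with DAG('beam_daily_backfill',
         start_date=datetime(2019, 1, 1),
         end_date=datetime(2019, 1, 10),
         schedule_interval='@daily',
         catchup=True) as dag:

    DataFlowPythonOperator(
        task_id='run_beam_batch',
        py_file='gs://my-bucket/pipeline.py',  # hypothetical location of the Beam script
        options={'date': '{{ ds_nodash }}'},   # execution date as YYYYMMDD, passed as --date
    )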
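And for the single-job approach jabir asks for: since each date is independent, the for loop can live inside the pipeline construction rather than around it, producing one graph with an independent branch per date. Beam requires labels to be unique within a pipeline, so the date is baked into each step name. This is an untested sketch under the same assumptions as the earlier one (options, test_query, table, table_schema, BQReader, BQWriter, Transform1 as defined in the question).

from datetime import datetime
from dateutil import rrule
import apache_beam as beam

start_date = datetime.strptime('20190101', "%Y%m%d")
end_date = datetime.strptime('20190110', "%Y%m%d")

p = beam.Pipeline(options=options)

# One independent read/transform/write branch per date, all in one graph.
for date in rrule.rrule(rrule.DAILY, dtstart=start_date, until=end_date):
    ds_nd = date.strftime('%Y%m%d')

    (p
     | 'BQRead {}'.format(ds_nd) >> BQReader(query=test_query.format(date=ds_nd))
     | 'Transform 1 {}'.format(ds_nd) >> beam.ParDo(Transform1())
     | 'BQWrite {}'.format(ds_nd) >> BQWriter(table + ds_nd, table_schema))

# A single Dataflow job executes every branch.
p.run().wait_until_finish()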

0 Answers