I am getting this error when I try 2 relational_db.Write from the same Pcollection.
Its very strange that sometimes it works, sometimes doesnt.
From persons_raw to persons_stage I added 3 new columns to the dict: tra_personalEmail, tra_companyDomain, tra_linkedInProfile
I tried to change the order, but didn't work.
Does anyone can help me?
Thanks in advance
Code:
with beam.Pipeline(options=pipeline_options) as p:
persons_raw = (
p
|'Create pipeline' >> beam.Create(['data.csv'])
|'Read multiline CSV' >> beam.FlatMap(lambda filename: csv.reader(io.TextIOWrapper(beam.io.filesystems.FileSystems.open(input_path),encoding='utf-8')))
|'Remove header' >> beam.Filter(lambda x: x[0] != 'FIRST NAME')
|'Raw schema' >> beam.Map(schema_persons_raw)
)
persons_raw_to_postgres = (
persons_raw
|'Persons_raw to DB' >> relational_db.Write(source_config=source_config,table_config=table_config_persons_raw)
)
persons_stage = (
persons_raw
|'Personal E-mail' >> beam.Map(tra_personal_email)
|'Company domain' >> beam.Map(tra_company_domain)
|'Linkedin url' >> beam.Map(tra_linkedin_url)
)
persons_stage_to_postgres = (
persons_stage
|'Persons_stage to DB' >> relational_db.Write(source_config=source_config,table_config=table_config_persons_stage)
)
Error:
sqlalchemy.exc.CompileError: Unconsumed column names: tra_personalEmail, tra_companyDomain, tra_linkedInProfile [while running 'Persons_raw to DB/ParDo(_WriteToRelationalDBFn)']