
I get the error below when I apply two relational_db.Write transforms to the same PCollection.

Strangely, it is intermittent: sometimes the pipeline works and sometimes it doesn't.

From persons_raw to persons_stage I added 3 new columns to the dict: tra_personalEmail, tra_companyDomain, tra_linkedInProfile

I tried changing the order of the two writes, but that didn't help.

Can anyone help me?

Thanks in advance

Code:

with beam.Pipeline(options=pipeline_options) as p:

    persons_raw = (
        p
        | 'Create pipeline' >> beam.Create(['data.csv'])
        | 'Read multiline CSV' >> beam.FlatMap(lambda filename: csv.reader(io.TextIOWrapper(beam.io.filesystems.FileSystems.open(input_path), encoding='utf-8')))
        | 'Remove header' >> beam.Filter(lambda x: x[0] != 'FIRST NAME')
        | 'Raw schema' >> beam.Map(schema_persons_raw)
    )

    persons_raw_to_postgres = (
        persons_raw
        | 'Persons_raw to DB' >> relational_db.Write(source_config=source_config, table_config=table_config_persons_raw)
    )

    persons_stage = (
        persons_raw
        | 'Personal E-mail' >> beam.Map(tra_personal_email)
        | 'Company domain' >> beam.Map(tra_company_domain)
        | 'Linkedin url' >> beam.Map(tra_linkedin_url)
    )

    persons_stage_to_postgres = (
        persons_stage
        | 'Persons_stage to DB' >> relational_db.Write(source_config=source_config, table_config=table_config_persons_stage)
    )

Error:

sqlalchemy.exc.CompileError: Unconsumed column names: tra_personalEmail, tra_companyDomain, tra_linkedInProfile [while running 'Persons_raw to DB/ParDo(_WriteToRelationalDBFn)']
Igor Gois

1 Answer

This looks similar to the question "CompileError when trying to run Insert Statement on SQLAlchemy".

The source of the confusion there is that when you create tables in Postgres, it automatically folds column names to lowercase unless you wrap them in double quotes.

ningk
  • Hi, thank you for the answer, but I don't think that's the case here: when I comment out one of the DB write transforms, the other one works. Also, sometimes both work in the same run. – Igor Gois Jun 14 '21 at 19:31
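
One possible explanation that fits the comment above: the error is raised in the 'Persons_raw to DB' step, yet it names the three tra_ columns that should only exist in persons_stage. That would happen if tra_personal_email and the other two functions add keys to the incoming dict in place, since both branches can receive the same Python object and Beam expects transforms not to modify their input elements. The function bodies are not shown in the question, so this is an assumption. Below is a minimal sketch in plain Python (no Beam needed) of the difference; the function names and the 'email' key are hypothetical stand-ins.

```python
# Hypothetical stand-ins for the question's transform functions.
# The real tra_personal_email is not shown, so its behavior is assumed here.

def tra_personal_email_in_place(row):
    # Mutates the input dict: any other consumer holding the same
    # object (e.g. the raw-table write) now sees the extra key too.
    row['tra_personalEmail'] = row['email'].lower()
    return row


def tra_personal_email_copy(row):
    # Builds a new dict and leaves the input element untouched.
    return {**row, 'tra_personalEmail': row['email'].lower()}


# Copying version: the raw row keeps its original columns.
raw = {'first_name': 'Ann', 'email': 'Ann@Example.com'}
staged = tra_personal_email_copy(raw)

# In-place version: the "raw" row silently gains the stage column.
shared = {'first_name': 'Bob', 'email': 'Bob@Example.com'}
tra_personal_email_in_place(shared)

print(sorted(raw))     # raw columns only
print(sorted(staged))  # raw columns plus tra_personalEmail
print(sorted(shared))  # raw row polluted with tra_personalEmail
```

If the functions do mutate their input, returning a copy from each beam.Map function as in tra_personal_email_copy should keep the raw rows clean regardless of the order in which the two branches run.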