What is the proper way to write a TIMESTAMP into PostgreSQL using Apache Beam (Direct Runner)? I couldn't find this documented anywhere. I tried formatting the date as an RFC 3339 string, as shown below, and writing it with the Python SDK's apache_beam.io.jdbc.WriteToJdbc, to no avail. My pipeline fails with the following error:

Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO beam_direct_load VALUES('Product_0993', 'Whse_J', 'Category_028', '2012-07-27T00:00:00', 100) was aborted: ERROR: column "date" is of type timestamp without time zone but expression is of type character varying

The table is defined as follows:

CREATE TABLE IF NOT EXISTS public.beam_direct_load(
    product_code VARCHAR(255),  
    warehouse VARCHAR(255),
    product_category VARCHAR(255),
    date TIMESTAMP,
    order_demand INTEGER
);

I have registered a coder for ProductDemand as follows:

import typing

from apache_beam import coders

class ProductDemand(typing.NamedTuple):
    product_code: str
    warehouse: str
    product_category: str
    date: str
    order_demand: int

coders.registry.register_coder(ProductDemand, coders.RowCoder)

My pipeline is defined as follows:

(
    pipeline
    | 'ExtractFromText' >> ReadFromText(input_file, skip_header_lines=1)
    | 'Split' >> Map(lambda x: [element.strip() for element in x.split(',')])
    | 'DropNA' >> Filter(lambda x: x[3] != 'NA' )
    | 'FormatData' >> Map(lambda x: 
                                [
                                    x[0], 
                                    x[1], 
                                    x[2], 
                                    datetime.strftime(datetime.strptime(x[3], '%Y/%m/%d'), '%Y-%m-%dT%H:%M:%S'), 
                                    int(x[4].replace('(', '').replace(')', ''))
                                ]
                            )
    | 'MapToDBRow' >> Map(lambda x: ProductDemand(product_code=x[0], warehouse=x[1], product_category=x[2], date=x[3], order_demand=x[4])).with_output_types(ProductDemand)
    | 'LoadToPostgres' >> WriteToJdbc(
        table_name='beam_direct_load',
        driver_class_name='org.postgresql.Driver',
        jdbc_url='jdbc:postgresql://localhost:5432/{}'.format(pg_db),
        username=pg_username,
        password=pg_password,
    )
)
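
For reference, the Split, DropNA and FormatData steps can be exercised outside Beam on a single line. This is a minimal sketch, not part of the pipeline itself: the helper name `format_row` and the sample input line are hypothetical, with the values borrowed from the error message above.

```python
from datetime import datetime
from typing import Optional


def format_row(line: str) -> Optional[list]:
    """Mirror the Split, DropNA and FormatData steps for one CSV line."""
    # Split: break the line on commas and trim whitespace around each field.
    fields = [element.strip() for element in line.split(',')]
    # DropNA: rows whose date field is 'NA' are discarded.
    if fields[3] == 'NA':
        return None
    # FormatData: reformat the date and strip parentheses from the demand.
    parsed = datetime.strptime(fields[3], '%Y/%m/%d')
    return [
        fields[0],
        fields[1],
        fields[2],
        parsed.strftime('%Y-%m-%dT%H:%M:%S'),
        int(fields[4].replace('(', '').replace(')', '')),
    ]


# Hypothetical input line, shaped like the row in the error message.
sample = 'Product_0993, Whse_J, Category_028, 2012/07/27, 100'
print(format_row(sample))
```

The date field coming out of this step is still a Python `str`, which matches the "character varying" that the error reports.
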
LordBertson
  • 2012-07-27T00:00:00 is a valid timestamp, works fine in PostgreSQL, at least with my settings. What about date: str ? Doesn't Java have a better data type? (I'm not a Java programmer) – Frank Heikens Dec 17 '21 at 14:24
  • @FrankHeikens , thanks for looking into my issue. Yes, the very SQL insert from the error also works on my PostgreSQL when I execute it on database itself. It seems that Beam is using some abstraction on top of the SQL driver to insert the data. Nonetheless, I believe you are right, that `date: str` is probably the problem, issue is I can't find what is the proper way to do this different from that. – LordBertson Dec 17 '21 at 14:57
  • What you need is to add the time zone. I am not completely sure if this works, but I just looked at the `datetime` library that you are using, and you can also add the `timezone` library to do this approach. `datetime.strftime(datetime.strptime('2012/07/27','%Y/%m/%d').replace(tzinfo=timezone.utc).astimezone(tz=None),'%Y-%m-%dT%H:%M:%S')` Another approach is to hardcode the timezone such as: `datetime.strftime(datetime.strptime('2012/07/27','%Y/%m/%d'),'%Y-%m-%dT%H:%M:%S'+'-08')` here `-08` is PST time zone. – Jose Gutierrez Paliza Dec 17 '21 at 23:06
  • @JoseGutierrezPaliza thank you for suggestion, I've tried to include the timezone in the string timestamp as you suggest and the error remains unchanged. I suppose I will just fall-back to Python's `psycopg2` based write – LordBertson Dec 20 '21 at 06:39

1 Answer

I tried adding connection_properties to the JDBC write, and this worked for me, with the timestamp formatted using datetime.astimezone(pytz.utc).strftime('%Y-%m-%d %H:%M:%S.%f'). I don't know whether it has any other side effects, though.

| 'write' >> WriteToJdbc(
    driver_class_name='org.postgresql.Driver',
    jdbc_url='<source>',
    username='<username>',
    password='<password>',
    table_name='<table>',
    connection_properties='stringtype=unspecified',
)
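
A minimal sketch of the string formatting mentioned above, for reference. It uses the standard library's `timezone.utc` in place of `pytz.utc`, the helper name `to_pg_timestamp` is hypothetical, and it assumes the input dates are meant as UTC. As far as I understand the PostgreSQL JDBC driver documentation, `stringtype=unspecified` makes the driver send string parameters untyped so that the server infers the type, which would explain why the string is then accepted for a TIMESTAMP column.

```python
from datetime import datetime, timezone


def to_pg_timestamp(raw: str) -> str:
    """Format a 'YYYY/MM/DD' date as 'YYYY-MM-DD HH:MM:SS.ffffff' in UTC."""
    # Assumption: the naive input date is meant as UTC, so tzinfo is attached
    # with replace() instead of being converted from the machine's local zone.
    parsed = datetime.strptime(raw, '%Y/%m/%d').replace(tzinfo=timezone.utc)
    # astimezone() is a no-op here, kept to mirror the call in the answer.
    return parsed.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')


print(to_pg_timestamp('2012/07/27'))
```

The setting is given for the whole connection, so it presumably changes how every string column is bound, not only the timestamp.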

kelv