import apache_beam as beam
from beam_nuggets.io import relational_db

# Source database configuration
source_config = relational_db.SourceConfiguration(
    drivername=CONFIG['drivername'],
    host=CONFIG['host'],
    port=CONFIG['port'],
    database=CONFIG['database'],
    username=CONFIG['username'],
    password=CONFIG['password'],
)

# Target database table
customer_purchase_config = relational_db.TableConfiguration(
    name='customer_purchase',
    create_if_missing=False,
    primary_key_columns=['purchaseId'],
)
with beam.Pipeline(options=options) as p:
    res = (
        p
        | 'Read data from PubSub'
        >> beam.io.ReadFromPubSub(subscription=SUB).with_output_types(bytes)
        | 'Transformation' >> beam.ParDo(PubSubToDict())
    )

    customer_purchase = res | beam.ParDo(Customer_Purchase())

    customer_purchase | 'Writing to customer_purchase' >> relational_db.Write(
        source_config=source_config,
        table_config=customer_purchase_config,
    )
With this configuration I am able to insert and update data in PostgreSQL. However, when there is a large spike in input, I hit the database connection limit and the number of retries from the worker nodes keeps increasing. Is there a way to define a connection pool so that connections are reused instead of opened per write?
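For example, I was imagining something along the lines of the sketch below: a custom DoFn that builds a pooled SQLAlchemy engine once per worker in setup() and reuses it across bundles. The pool sizes, the upsert statement, and the column names are just placeholders, and I am not sure this is the right approach:

import apache_beam as beam
import sqlalchemy


class WriteCustomerPurchase(beam.DoFn):
    """Sketch of a DoFn that reuses a pooled SQLAlchemy engine per worker."""

    def __init__(self, db_url):
        self.db_url = db_url
        self.engine = None

    def setup(self):
        # setup() runs once per DoFn instance on a worker, so the pool of
        # connections created here is shared across bundles.
        self.engine = sqlalchemy.create_engine(
            self.db_url,
            pool_size=5,          # connections kept open per worker
            max_overflow=2,       # extra connections allowed under load
            pool_pre_ping=True,   # validate a connection before reusing it
        )

    def process(self, element):
        # Upsert a single record; table and column names are placeholders.
        with self.engine.begin() as conn:
            conn.execute(
                sqlalchemy.text(
                    'INSERT INTO customer_purchase ("purchaseId", payload) '
                    'VALUES (:purchaseId, :payload) '
                    'ON CONFLICT ("purchaseId") DO UPDATE '
                    'SET payload = EXCLUDED.payload'
                ),
                element,
            )

    def teardown(self):
        if self.engine is not None:
            self.engine.dispose()


# Usage, building the URL from the same CONFIG dict as above:
db_url = '{drivername}://{username}:{password}@{host}:{port}/{database}'.format(**CONFIG)
customer_purchase | 'Writing with pooled connections' >> beam.ParDo(WriteCustomerPurchase(db_url))

Is something like this the recommended way, or does relational_db.Write expose any settings for the underlying connection pool?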