I have a table named "mytable" in Postgres with two columns, id (bigint) and value (varchar(255)).

id gets its value from a sequence using nextval('my_sequence').

A PySpark application takes a dataframe and uses the Postgres JDBC jar (postgresql-42.1.4.jar) to insert the dataframe into "mytable". I'm creating the id column using:

df.withColumn('id', lit("nextval('my_sequence')"))

Postgres then interprets that value as 'character varying' (the literal string, not the evaluated sequence call).

I can see that there are ways of calling Postgres functions when reading data (How to remotely execute a Postgres SQL function on Postgres using PySpark JDBC connector?), but I'm not sure how to call a Postgres function like nextval() when writing data to Postgres.
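
For reference, something like the following sketch works on the read side by passing a subquery as dbtable (here spark is my SparkSession and jdbc_url is the same URL used below):

# Sketch: calling a Postgres function on the read path via a subquery.
seq_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "(SELECT nextval('my_sequence') AS id) AS t") \
    .load()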

Here's how I am currently writing the data from PySpark to Postgres:

df.write.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", 'mytable') \
    .mode('append') \
    .save()

How can one write to a Postgres table using PySpark when one column needs a sequence number using nextval()?

1 Answer

TL;DR You cannot execute database code on insert unless you create your own JdbcDialect and override insert logic. I reckon it is not something you want to do for such a small feature.

Personally, I would use a trigger:

CREATE FUNCTION set_id() RETURNS trigger AS $set_id$
  BEGIN
    IF NEW.id IS NULL THEN
      NEW.id = nextval('my_sequence');
    END IF;
    RETURN NEW;
  END;
$set_id$ LANGUAGE plpgsql;

CREATE TRIGGER set_id BEFORE INSERT ON mytable
    FOR EACH ROW EXECUTE PROCEDURE set_id();

and leave the rest of the job to the database server.

from pyspark.sql.functions import col, lit

df.select(lit(None).cast("bigint").alias("id"), col("value")).write
    ...
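
For completeness, a minimal sketch of the full write with the trigger in place, reusing jdbc_url and the options from the question (any credentials are assumed to live in jdbc_url, as in the question):

from pyspark.sql.functions import col, lit

# Send NULL for id so the BEFORE INSERT trigger assigns it from my_sequence.
df.select(lit(None).cast("bigint").alias("id"), col("value")) \
    .write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "mytable") \
    .mode("append") \
    .save()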

You could also use monotonically_increasing_id (Primary keys with Apache Spark) and just shift values according to the largest id in the database, but it might be brittle.
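
A rough sketch of that alternative, assuming a SparkSession named spark and the jdbc_url from the question (the subquery alias and variable names are just illustrative):

from pyspark.sql.functions import lit, monotonically_increasing_id

# Illustrative: read the current largest id over JDBC; assumes nothing else
# is inserting into mytable while this job runs.
max_id = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "(SELECT COALESCE(MAX(id), 0) AS max_id FROM mytable) AS t") \
    .load() \
    .first()["max_id"]

# monotonically_increasing_id is unique but not consecutive, so the resulting
# ids will have large gaps; this is the brittle part mentioned above.
df_with_id = df.withColumn("id", monotonically_increasing_id() + lit(max_id + 1))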

  • Now I would need the particular id the database assigned to each of the rows. Is there a simple way to get it? – TechCrap Jan 31 '18 at 14:17
  • @SimeonKredatus Could you elaborate? – zero323 Jan 31 '18 at 20:51
  • Hey, thanks for the prompt reaction. Let's say I define a schema as follows: schema = StructType([StructField("id", IntegerType(), True), StructField("property1", IntegerType(), True)]), then I do some crunching and store it to the database as follows: frame.write.option('mergeSchema', 'true').jdbc(url=url, table=table, mode='append', properties=properties). I do have a trigger in the database as you proposed, meaning the DB assigns the ID. Now I would need to fetch that id for each row. Is there an easy way of getting that without running a select? – TechCrap Feb 01 '18 at 11:41
  • The other option for my use case is to compute a unique hash for each row and manage it outside the database, meaning Spark would always fill the column, yet I would still prefer to have Postgres in charge of the ID sequence. Once my Spark job persists the dataset, I would need it to emit a Kafka event with the assigned id. – TechCrap Feb 01 '18 at 11:43
  • @SimeonKredatus So you want to have your cake and eat it too. You could generate a unique ID in Spark itself (see for example the implementation of [MongoDB ObjectID](https://docs.mongodb.com/manual/reference/method/ObjectId/)). With a PostgreSQL-specific solution, you could listen to events there with `LISTEN` / `NOTIFY`, and propagate them to Kafka directly. – zero323 Feb 01 '18 at 18:32
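
A minimal sketch of the LISTEN/NOTIFY idea from the last comment, in the same style as the trigger above (the channel name mytable_ids and the function name notify_id are illustrative, not from the original discussion):

CREATE FUNCTION notify_id() RETURNS trigger AS $notify_id$
  BEGIN
    -- Publish the newly assigned id on an illustrative channel; a separate
    -- consumer would LISTEN on this channel and forward the payload to Kafka.
    PERFORM pg_notify('mytable_ids', NEW.id::text);
    RETURN NEW;
  END;
$notify_id$ LANGUAGE plpgsql;

CREATE TRIGGER notify_id AFTER INSERT ON mytable
    FOR EACH ROW EXECUTE PROCEDURE notify_id();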