
I have a dataframe that matches my table exactly, except for the primary key, which is auto-generated.

Below is my table

    ID             | FirstName   | LastName    | CreOn     | CreBy
    auto-generated | Varchar(20) | Varchar(20) | timestamp | Varchar(20)

Below is my dataframe

    FirstName | LastName | CreOn     | CreBy
    String    | String   | timestamp | String

When I use spark.jdbc.write with mode 'append', I get the error below:

Permission denied for schema abc Position 14
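
For reference, the write is essentially the following sketch; the JDBC URL, table name and connection properties are placeholders, not the actual values:

    # Sketch of the append write described above; URL, table and
    # credentials are placeholders.
    df.write.mode("append").jdbc(
        "jdbc:postgresql://host:5432/mydb",   # placeholder JDBC URL
        "abc.mytable",                        # placeholder schema-qualified table
        properties={"driver": "org.postgresql.Driver",
                    "user": "myuser",
                    "password": "mypassword"}
    )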

How do we handle auto-generated fields?

Should I prefer using plain Python for the JDBC operation instead of PySpark, as I would have greater control over batch size and rollback?

Vinay b
  • Are all the values you are going to insert new and in need of a new id? Or are some new while others need to upsert? Also, do you have any kind of validation, like a unique constraint, on the 4 fields that you insert? – Alfilercio May 28 '20 at 11:45
  • I think this is not a Spark-related problem; please check that the user you use to write via Spark JDBC has authorization to write. Please ask your database administrator to provide access to the table using `GRANT USAGE ON SCHEMA abc TO username;` ref: https://tableplus.com/blog/2018/04/postgresql-how-to-grant-access-to-users.html – Som May 31 '20 at 01:12
  • I have observed that when the database table has 10 columns and the dataframe has 9, i.e. without the primary key, Spark tried to change the schema by trying to create the table – Vinay b May 31 '20 at 11:54

2 Answers


It depends on your data size: if the number of rows is huge, plain Python will most probably fail and you won't be able to load the data.

One possible way to do this is to use a staging table: load the data from Spark into the staging table, then run a query that moves the data from staging into your main table using a Python library, as in the sketch below.
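
This is only a sketch, assuming a PostgreSQL database, the psycopg2 driver and hypothetical table names abc.address (main) and abc.address_staging (staging); adapt names, credentials and SQL to your setup.

    # 1. Bulk-load the dataframe into the staging table via Spark JDBC.
    df.write.mode("append").jdbc(
        "jdbc:postgresql://host:5432/mydb",    # assumed JDBC URL
        "abc.address_staging",                 # hypothetical staging table
        properties={"driver": "org.postgresql.Driver",
                    "user": "myuser",
                    "password": "mypassword"}
    )

    # 2. Move the rows into the main table with plain Python; the database
    #    generates the primary key, and the transaction is rolled back
    #    automatically if the INSERT fails.
    import psycopg2

    conn = psycopg2.connect(host="host", dbname="mydb",
                            user="myuser", password="mypassword")
    try:
        with conn, conn.cursor() as cur:
            cur.execute("""
                INSERT INTO abc.address (FirstName, LastName, CreOn, CreBy)
                SELECT FirstName, LastName, CreOn, CreBy
                FROM abc.address_staging
            """)
    finally:
        conn.close()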

As far as batch size is concerned, you can configure it in the Spark JDBC write as well, for example:
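
A minimal sketch; batchsize is a standard option of the Spark JDBC writer (the default is 1000), while the URL, table and credentials below are placeholders.

    # Tune how many rows Spark sends to the database per JDBC batch.
    (df.write
       .format("jdbc")
       .option("driver", "org.postgresql.Driver")           # placeholder driver
       .option("url", "jdbc:postgresql://host:5432/mydb")   # placeholder URL
       .option("dbtable", "abc.address_staging")            # placeholder table
       .option("user", "myuser")
       .option("password", "mypassword")
       .option("batchsize", 10000)   # rows per JDBC batch, default 1000
       .mode("append")
       .save())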

For really huge data I would suggest using Sqoop to move the data, as Spark cannot form concurrent connections with the DB, while Sqoop can establish multiple mappers to write data to your DB.

Shubham Jain
  • Thanks, but spark.jdbc.write tends to change the table structure if the dataframe columns don't match the table columns. I am stuck with the auto-generated key issue: I cannot include the column in the dataframe, hence the write is failing now – Vinay b May 21 '20 at 07:33
  • For that, let Spark create the staging table and then move your data from staging to the main table. – Shubham Jain May 21 '20 at 09:07
  • Spark jobs will not be given permission to create any table. Is there no other way to handle auto-generated fields? – Vinay b May 21 '20 at 10:30

When a dataframe is saved into a SQL database, Spark writes only to those columns in the database that are present in the dataframe. So if the ID column is not part of the dataframe, Spark will ignore it during the insert operation.

The insert statement is created in JdbcUtils.getInsertStatement(), and only columns that are part of the underlying RDD are included in the insert statement:

val columns = 
      [...]
      rddSchema.fields.map { col =>
        val normalizedName = tableColumnNames.find(f => columnNameEquality(f, col.name)).getOrElse {
          throw new AnalysisException(s"""Column "${col.name}" not found in schema $tableSchema""")
        }
        dialect.quoteIdentifier(normalizedName)
      }.mkString(",")
      [...]
s"INSERT INTO $table ($columns) VALUES ($placeholders)"

For example, given the table definition

create table address (
    id serial, 
    FirstName varchar(20), 
    LastName varchar(20),
    CreOn timestamp,
    CreBy varchar(20),
    constraint pk primary key (id))

and the Python code

from datetime import datetime

df = spark.createDataFrame(
    [("John", "Doe", datetime(1970, 1, 2, 3, 46, 40), "py2")],
    ['FirstName', 'LastName', 'CreOn', 'CreBy']
)

df.write.mode("append").jdbc(<jdbc url>, "address",
                             properties={"driver": ..., "user": ..., "password": ...})

Spark creates the insert statement

INSERT INTO address ("firstname","lastname","creon","creby") VALUES (?,?,?,?)

and the insert operation is successful.

So auto-generated fields should simply not be part of the dataframe, and the error Permission denied is probably not related to the auto-generated field.

werner
  • Thanks, how can I do an update operation? If I use overwrite mode it truncates the whole table; I just want to update a few rows – Vinay b May 28 '20 at 15:00
  • `mode("append")` will insert all rows into the table that are contained in the dataframe. So you decide what gets inserted into the database by crafting the content of the dataframe. But this is an aspect that is not related to an autogenerated field. – werner May 28 '20 at 17:03
  • My question was more towards the update operation; is there a way to do an update in PySpark? – Vinay b May 28 '20 at 17:12
  • No, there is no update method in Spark. You would have to implement the logic yourself if you want more detailed control over what gets written into the database; a rough sketch of one such approach follows after these comments. You could check the answers to [this question](https://stackoverflow.com/q/34643200/2129801) for more details on that. – werner May 28 '20 at 17:22
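
A rough sketch of implementing the update logic yourself, assuming a PostgreSQL database, the psycopg2 driver (installed on the executors) and the hypothetical table abc.address; each Spark partition opens its own connection and issues plain UPDATE statements.

    # Hypothetical example: update CreBy for existing (FirstName, LastName) pairs.
    def update_partition(rows):
        import psycopg2  # imported on the executor; psycopg2 must be installed there

        conn = psycopg2.connect(host="host", dbname="mydb",
                                user="myuser", password="mypassword")
        try:
            with conn, conn.cursor() as cur:
                for row in rows:
                    cur.execute(
                        "UPDATE abc.address SET CreBy = %s "
                        "WHERE FirstName = %s AND LastName = %s",
                        (row["CreBy"], row["FirstName"], row["LastName"]))
        finally:
            conn.close()

    df.foreachPartition(update_partition)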