
I want to persist a Spark DataFrame to Ignite. While exploring options, I came across ignite-spark, which helps with this. But ignite-spark currently works only with Spark 2.3, not Spark 2.4.

So I fell back to the traditional approach of

df.write.format("jdbc")

Now, my code looks like below.

df.write
     .format("jdbc")
     .option("url", "jdbc:ignite:thin://127.0.0.1:10800")
     .option("dbtable", "sample_table")
     .option("user", "ignite")
     .option("password", "ignite")
     .mode(SaveMode.Overwrite)
     .save()

The problem I'm facing now is that my DataFrame has no primary key, which is mandatory for Ignite. Please suggest how to overcome this issue.

Error Stack Trace below:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" java.sql.SQLException: No PRIMARY KEY defined for CREATE TABLE
    at org.apache.ignite.internal.jdbc.thin.JdbcThinConnection.sendRequest(JdbcThinConnection.java:750)
    at org.apache.ignite.internal.jdbc.thin.JdbcThinStatement.execute0(JdbcThinStatement.java:212)
    at org.apache.ignite.internal.jdbc.thin.JdbcThinStatement.executeUpdate(JdbcThinStatement.java:340)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:859)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at com.ev.spark.job.Ignite$.delayedEndpoint$com$ev$spark$job$Ignite$1(Ignite.scala:52)
    at com.ev.spark.job.Ignite$delayedInit$body.apply(Ignite.scala:9)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.ev.spark.job.Ignite$.main(Ignite.scala:9)
    at com.ev.spark.job.Ignite.main(Ignite.scala)

Edit:

I'm looking for a way to create the table on the fly before persisting the DataFrame. In my case, the DataFrame already has one or more fields that I somehow need to tell Spark to use as the primary key when it creates the table.

Manoj - GT

3 Answers


Try creating the underlying Ignite table beforehand with Ignite DDL, defining a primary key such as id. Then use the Spark API to connect to Ignite and write to this dynamically created table. Generate the id values yourself and pass them in through the DataFrame API. For instance, this Ignite API can be used to generate unique IDs.
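
A minimal sketch of the pre-creation step, assuming the Ignite thin JDBC driver is already on the classpath (as in the question). The object IgniteDdl, its two methods, and the column names id and name are hypothetical and only for illustration; they are not part of Spark or Ignite, and the sketch has not been run against a live cluster.

```scala
import java.sql.DriverManager

object IgniteDdl {
  // Hypothetical helper: builds a CREATE TABLE statement with an explicit
  // PRIMARY KEY clause, which Ignite requires.
  def createTableDdl(table: String,
                     columns: Seq[(String, String)],
                     primaryKey: Seq[String]): String = {
    require(primaryKey.nonEmpty, "Ignite needs at least one primary key column")
    require(primaryKey.forall(k => columns.exists(_._1 == k)),
      "every primary key column must be declared in columns")
    val columnDefs = columns.map { case (name, sqlType) => s"$name $sqlType" }
    val keyClause = s"PRIMARY KEY (${primaryKey.mkString(", ")})"
    s"CREATE TABLE IF NOT EXISTS $table (${(columnDefs :+ keyClause).mkString(", ")})"
  }

  // Runs the DDL over the same thin JDBC URL that the Spark writer uses.
  def createTable(url: String, user: String, password: String, ddl: String): Unit = {
    val connection = DriverManager.getConnection(url, user, password)
    try {
      val statement = connection.createStatement()
      try statement.executeUpdate(ddl) finally statement.close()
    } finally connection.close()
  }
}
```

After the table exists, the write from the question can be reused with SaveMode.Append instead of SaveMode.Overwrite, so that Spark inserts into the pre-created table rather than dropping it and issuing its own CREATE TABLE without a key.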

As for the unsupported Spark 2.4 version, I've opened a ticket with the Ignite community. Hopefully, the ticket will make it into the 2.7.6 release scheduled for August.

dmagda
  • Yes, the only way I currently see is to upfront create the table with the required primary key. – Manoj - GT Aug 09 '19 at 06:55
  • Also, Thanks Denis for making the ticket as a blocker and helping to get ignite-spark to work with the latest spark version. Really appreciate it. – Manoj - GT Aug 10 '19 at 04:31
  • The chances are high that the ticket will be included in the August release. An Ignite committer is looking into it. Please feel free to follow up with us on the Ignite dev list. – dmagda Aug 10 '19 at 13:48
  • Sure will do that. Thanks. – Manoj - GT Aug 10 '19 at 16:33

If all it needs is a column with unique values (as a primary key), you can create one yourself, save the DataFrame, and then drop that column from Ignite.

Please refer to this link (you can skip straight to the "Directly with dataframe API" section): Primary keys with Apache Spark

Hope it helps !

Goldie
  • Sadly, this won't work for me :( This works only if I have the table created upfront with the Primary key. I'm looking for a solution to create the table on the fly before persisting the DF. In my case, I already have one or more fields in my DF which somehow I have to communicate with Spark to use as a primary key for table creation. – Manoj - GT Aug 07 '19 at 14:46
  • Got the issue. you might have already read this (as you said that you already explored ignite-spark) but thought of sharing it anyways - https://apacheignite-fs.readme.io/docs/ignite-data-frame#section-ignite-data-frame-options – Goldie Aug 07 '19 at 15:12
  • Yes, it works, but it does not support the latest Spark version :( That's why I took the JDBC route. – Manoj - GT Aug 07 '19 at 15:38

Spark has several SaveModes that determine what happens when the table you are writing to already exists:

* Overwrite - re-creates the table if it exists, or creates a new one, and loads the data into it using the IgniteDataStreamer implementation
* Append - does not re-create or create the table; it just loads the data into the existing table
* ErrorIfExists - throws an exception if the table you are writing to already exists
* Ignore - does nothing if the table you are writing to already exists: the contents of the DataFrame are not saved and the existing data is not changed

In your example, you are trying to store the data and re-create the cache, but you don't provide the Ignite table details. Please try adding the following options when you use the "Overwrite" save mode:

.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id")
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated")

https://apacheignite-fs.readme.io/docs/ignite-data-frame#section-saving-dataframes

Also, consider using Append mode so that the table is not re-created every time.

BR, Andrei

  • The option you have provided uses ignite-spark, not traditional Spark JDBC persistence. As I mentioned, ignite-spark currently supports Spark 2.3, not Spark 2.4. – Manoj - GT Aug 10 '19 at 04:23