I want to persist a Spark DataFrame to Apache Ignite. While exploring, I came across ignite-spark, which is meant for exactly this, but ignite-spark currently supports only Spark 2.3, not Spark 2.4.
So I fell back to the plain JDBC approach:
df.write.format("jdbc")
My code now looks like this:
import org.apache.spark.sql.SaveMode

df.write
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://127.0.0.1:10800")
  .option("dbtable", "sample_table")
  .option("user", "ignite")
  .option("password", "ignite")
  .mode(SaveMode.Overwrite)
  .save()
The problem I'm facing is that my DataFrame has no primary key, and Ignite requires one for CREATE TABLE. How can I overcome this?
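From the stack trace below, the failure happens inside Spark's JdbcUtils.createTable: with SaveMode.Overwrite, Spark drops and recreates the target table, and the CREATE TABLE it generates contains no PRIMARY KEY clause, which Ignite rejects. As far as I can tell, it is equivalent to running something like this against the thin driver directly (id and name are placeholder columns, not my real schema):

import java.sql.DriverManager

val conn = DriverManager.getConnection(
  "jdbc:ignite:thin://127.0.0.1:10800", "ignite", "ignite")
val stmt = conn.createStatement()
// No PRIMARY KEY in the column list, so Ignite throws
// "No PRIMARY KEY defined for CREATE TABLE"
stmt.executeUpdate("CREATE TABLE sample_table (id BIGINT, name VARCHAR)")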
Error stack trace below:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" java.sql.SQLException: No PRIMARY KEY defined for CREATE TABLE
at org.apache.ignite.internal.jdbc.thin.JdbcThinConnection.sendRequest(JdbcThinConnection.java:750)
at org.apache.ignite.internal.jdbc.thin.JdbcThinStatement.execute0(JdbcThinStatement.java:212)
at org.apache.ignite.internal.jdbc.thin.JdbcThinStatement.executeUpdate(JdbcThinStatement.java:340)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:859)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at com.ev.spark.job.Ignite$.delayedEndpoint$com$ev$spark$job$Ignite$1(Ignite.scala:52)
at com.ev.spark.job.Ignite$delayedInit$body.apply(Ignite.scala:9)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.ev.spark.job.Ignite$.main(Ignite.scala:9)
at com.ev.spark.job.Ignite.main(Ignite.scala)
Edit:
I'm looking for a way to create the table on the fly, before persisting the DF. My DF already contains one or more fields that could serve as the primary key; what I'm missing is a way to tell Spark to use them when it generates the CREATE TABLE.
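For context, the manual workaround I'm trying to avoid is pre-creating the table over plain JDBC with the primary key defined, then writing with SaveMode.Append so Spark skips table creation entirely. A rough sketch, again assuming hypothetical columns id and name standing in for my real schema:

import java.sql.DriverManager
import org.apache.spark.sql.SaveMode

// Create the table myself, with the primary key Ignite requires
val conn = DriverManager.getConnection(
  "jdbc:ignite:thin://127.0.0.1:10800", "ignite", "ignite")
try {
  conn.createStatement().executeUpdate(
    "CREATE TABLE IF NOT EXISTS sample_table (id BIGINT PRIMARY KEY, name VARCHAR)")
} finally {
  conn.close()
}

// Append instead of Overwrite, so Spark does not issue CREATE TABLE
df.write
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://127.0.0.1:10800")
  .option("dbtable", "sample_table")
  .option("user", "ignite")
  .option("password", "ignite")
  .mode(SaveMode.Append)
  .save()

This works, but it hard-codes the DDL; I'd rather have the table derived from the DF schema plus one or more designated key columns.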