I have a JdbcCatalog initialized against an H2 database in my local Java code, and it is able to create Iceberg tables with the proper schema and partition spec. When I then create a Spark session in the same class, it is unable to use the JdbcCatalog already created through the Iceberg API: it either sets up its own catalog or errors out while initializing the JdbcCatalog, saying ICEBERG_TABLES already exists (which is expected, since the Iceberg JDBC catalog would already have created its meta tables).
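
For context, the standalone catalog is initialized roughly like this (a simplified sketch, not my exact code; the JDBC URL, warehouse path, and credentials below are placeholders):

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.iceberg.CatalogProperties;
  import org.apache.iceberg.jdbc.JdbcCatalog;

  Map<String, String> properties = new HashMap<>();
  // Same H2 JDBC URL the Spark session below is meant to reuse (placeholder value)
  properties.put(CatalogProperties.URI, "jdbc:h2:file:/tmp/iceberg/catalog_db");
  properties.put(CatalogProperties.WAREHOUSE_LOCATION, "/tmp/iceberg/warehouse");
  // "jdbc."-prefixed properties are passed through to the JDBC driver
  properties.put(JdbcCatalog.PROPERTY_PREFIX + "user", "myuser");
  properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "mypassword");

  JdbcCatalog catalog = new JdbcCatalog();
  catalog.setConf(new org.apache.hadoop.conf.Configuration());
  catalog.initialize("mycatalog", properties);
  // initialize() creates the ICEBERG_TABLES meta table in H2 on first run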

I have set the following configuration properties on the Spark session:

SparkSession spark = SparkSession.builder()
  .appName("Iceberg Catalog Example")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.myIcebergCatalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.myIcebergCatalog.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog")
  .config("spark.sql.catalog.myIcebergCatalog.url", "MY JDBC H2 URI that is used wuth iceberg JDBCATALOG")
  .config("spark.sql.catalog.myIcebergCatalog.user", "myuser")
  .config("spark.sql.catalog.myIcebergCatalog.password", "mypassword")
  .config("spark.sql.catalog.myIcebergCatalog.catalog-name", "mycatalog")
  .config("spark.sql.catalog.myIcebergCatalog.default-database", "mydatabase")
  .getOrCreate();
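
The intent is that, once the session is up, the table already created through the Iceberg API should be reachable under the catalog name, along these lines (the namespace and table names are placeholders):

  // Read back the table created via the standalone JdbcCatalog (names are placeholders)
  spark.sql("SELECT * FROM myIcebergCatalog.mydatabase.mytable").show();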

Current Error (in text):

com.example.spark.common.CommonRuntimeException: Issue while inserting csv data into the table 

    at com.example.spark.iceberg.extract.ETLCSVExtract.extractData(ETLCSVExtract.java:107)
    at com.example.spark.iceberg.service.OverwriteByKeyTest2.testTableInsert(OverwriteByKeyTest2.java:276)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
    at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
    at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: org.apache.iceberg.jdbc.UncheckedSQLException: Cannot initialize JDBC catalog
    at org.apache.iceberg.jdbc.JdbcCatalog.initialize(JdbcCatalog.java:112)
    at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:237)
    at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:282)
    at org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:129)
    at org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:519)
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:53)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:53)
    at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:122)
    at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:93)
    at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:109)
    at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:106)
    at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:295)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:295)
    at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:277)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
    at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:126)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
    at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:112)
    at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:65)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:62)
    at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:210)
    at scala.Option.orElse(Option.scala:447)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:207)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:411)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:537)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:443)
    at com.example.spark.iceberg.extract.ETLCSVExtract.extractData(ETLCSVExtract.java:76)
    ... 70 more
Caused by: org.h2.jdbc.JdbcSQLSyntaxErrorException: Table "ICEBERG_TABLES" already exists; SQL statement:
CREATE TABLE iceberg_tables(catalog_name VARCHAR(255) NOT NULL,table_namespace VARCHAR(255) NOT NULL,table_name VARCHAR(255) NOT NULL,metadata_location VARCHAR(5500),previous_metadata_location VARCHAR(5500),PRIMARY KEY (catalog_name, table_namespace, table_name)) [42101-210]
    at org.h2.message.DbException.getJdbcSQLException(DbException.java:521)
    at org.h2.message.DbException.getJdbcSQLException(DbException.java:496)
    at org.h2.message.DbException.get(DbException.java:227)
    at org.h2.message.DbException.get(DbException.java:203)
    at org.h2.command.ddl.CreateTable.update(CreateTable.java:88)
    at org.h2.command.CommandContainer.update(CommandContainer.java:174)
    at org.h2.command.Command.executeUpdate(Command.java:252)
    at org.h2.jdbc.JdbcPreparedStatement.execute(JdbcPreparedStatement.java:254)
    at org.apache.iceberg.jdbc.JdbcCatalog.lambda$initializeCatalogTables$0(JdbcCatalog.java:135)
    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
    at org.apache.iceberg.jdbc.JdbcCatalog.initializeCatalogTables(JdbcCatalog.java:121)
    at org.apache.iceberg.jdbc.JdbcCatalog.initialize(JdbcCatalog.java:106)
    ... 136 more

What property am I missing for the Spark session to accurately point it at the already-created JDBC catalog?

Thanks in advance.

Ishan Das
  • What results are you getting currently? – Farkhod Abdukodirov Jun 04 '23 at 01:29
  • The error is that it cannot initialize the JDBC catalog: ICEBERG_TABLES already exists. I've added the error to the post. The standalone JdbcCatalog, without the Spark session configuration pointing to the same H2 database, gets created fine. My idea is to create the JdbcCatalog first, then an Iceberg table with it, and then have the Spark session point to the same catalog to read and write DataFrames. – Ishan Das Jun 04 '23 at 09:49
  • Please update the error message; it should be in plain-text format, not an image. – Farkhod Abdukodirov Jun 04 '23 at 12:18
  • The error is now included as text. – Ishan Das Jun 05 '23 at 13:49
  • In the following part of the error message: `Caused by: org.h2.jdbc.JdbcSQLSyntaxErrorException: Table "ICEBERG_TABLES" already exists; SQL statement:` can you double-check the table `ICEBERG_TABLES`, because it's saying that the table already exists? – Farkhod Abdukodirov Jun 06 '23 at 09:44
  • Yes, that is probably because: 1. the JdbcCatalog from org.apache.iceberg has already created it upon initialization and Iceberg table creation; 2. the Spark session is trying to re-initialize its own JdbcCatalog within the same H2 database and finds the table already created. I want the Spark session to point to the JdbcCatalog created in step 1. – Ishan Das Jun 07 '23 at 10:49
  • How do I force the Spark session to use the already-created JdbcCatalog? – Ishan Das Jun 07 '23 at 10:56

0 Answers