I'm trying to interact with Iceberg tables stored on S3 via a deployed Hive metastore service. The goal is to push and pull large amounts of data kept as an Iceberg data lake on S3. After a couple of days of documentation, Google and Stack Overflow, it's still not coming together.
From Iceberg's documentation the only dependency seemed to be iceberg-spark-runtime, with no guidance from a PySpark perspective, but this is basically how far I got:
- iceberg-spark-runtime with the metastore URI set allowed me to make metadata calls like listing databases etc. (metadata DB functionality, backed by Postgres).
- Trial-and-error jar additions to get through the majority of the Java ClassNotFoundExceptions (a packages-based sketch of the same dependencies follows this list):
- After adding iceberg-hive-runtime-1.2.0.jar
Next exc > java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
- After adding iceberg-hive-runtime-1.2.0.jar,hadoop-aws-3.3.5.jar
Next exc > java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
- After adding iceberg-hive-runtime-1.2.0.jar,hadoop-aws-3.3.5.jar,aws-java-sdk-bundle-1.12.316.jar
Next exc > java.lang.NoClassDefFoundError: org/apache/hadoop/fs/impl/WeakRefMetricsSource
- After adding iceberg-hive-runtime-1.2.0.jar,hadoop-aws-3.3.5.jar,aws-java-sdk-bundle-1.12.316.jar,hadoop-common-3.3.5.jar
Next exc > org.apache.iceberg.exceptions.RuntimeIOException: Failed to open input stream for file Caused by: java.nio.file.AccessDeniedException
- From a Jupyter pod on k8s the S3 service account was added, and I verified via boto3 that interaction with the bucket works (that check is sketched after this list). From pyspark, however, table reads still raised
s3.model.AmazonS3Exception: Forbidden
until I found the right Spark config params to set (using the S3 session token mounted into the pod from the service account). The next exception was
java.lang.NoSuchMethodError: 'java.lang.Object org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding
even though that method is explicitly present in hadoop-common.jar when I view the class source.
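For reference, this is roughly how I understand the same jar list could be expressed as Maven coordinates via spark.jars.packages, so that transitive dependencies (e.g. the AWS SDK bundle) resolve automatically; the Spark 3.3 / Scala 2.12 suffix and the hadoop-aws version are assumptions and would need to match the actual PySpark build:
from pyspark import SparkConf

conf = SparkConf()
# Assumed coordinates: iceberg-spark-runtime for Spark 3.3 / Scala 2.12, and a
# hadoop-aws version matching whichever Hadoop the Spark build ships with.
conf.set(
    "spark.jars.packages",
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.0,"
    "org.apache.hadoop:hadoop-aws:3.3.5",
)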
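And the boto3 check from the same pod was roughly the following (bucket and prefix are placeholders); boto3 picks the web identity token up from the environment variables mounted by the service account:
import boto3

# Uses AWS_WEB_IDENTITY_TOKEN_FILE / AWS_ROLE_ARN from the pod environment
s3 = boto3.client("s3")
resp = s3.list_objects_v2(
    Bucket="my-datalake-bucket",            # placeholder bucket
    Prefix="warehouse/db_name/table_name/", # placeholder prefix
    MaxKeys=10,
)
for obj in resp.get("Contents", []):
    print(obj["Key"])  # listing works here, confirming the service account credentials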
This is where I decided to throw in the towel and ask whether I'm going down completely the wrong rabbit hole, or what's actually going on. This is my current code, with some of the example tests:
token: str = "-- jwt s3 session token --"  # placeholder: web identity token mounted into the pod

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
# Jars collected through the trial-and-error steps above
conf.set("spark.jars", "iceberg-hive-runtime-1.2.0.jar,hadoop-aws-3.3.5.jar,aws-java-sdk-bundle-1.12.316.jar,hadoop-common-3.3.5.jar")
conf.set("hive.metastore.uris", "thrift://hivemetastore-hive-metastore:9083")
conf.set("fs.s3a.assumed.role.arn", "-- aws iam role --")  # placeholder: IAM role of the service account
conf.set("spark.hadoop.fs.s3a.session.token", token)
conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")

sc = SparkContext(conf=conf)
spark = SparkSession.builder.appName("py sql").enableHiveSupport() \
    .getOrCreate()
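For comparison, Iceberg's Spark documentation configures a named catalog against a Hive metastore roughly as below (the catalog name my_catalog is a placeholder); I'm not sure whether this is needed on top of enableHiveSupport(), but including it for completeness:
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.my_catalog.type", "hive")
conf.set("spark.sql.catalog.my_catalog.uri", "thrift://hivemetastore-hive-metastore:9083")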
- Checking the Hive version on the Hive metastore pod (it reports 3.1.3):
$ hive --version
# response
WARNING: log4j.properties is not found. HADOOP_CONF_DIR may be incomplete.
WARNING: log4j.properties is not found. HADOOP_CONF_DIR may be incomplete.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hive/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive 3.1.3
Git git://MacBook-Pro.fios-router.home/Users/ngangam/commit/hive -r 4df4d75bf1e16fe0af75aad0b4179c34c07fc975
Compiled by ngangam on Sun Apr 3 16:58:16 EDT 2022
From source with checksum 5da234766db5dfbe3e92926c9bbab2af
From this I'm able to:
# list databases -> works
spark.catalog.listDatabases()

# print a table schema via SQL -> works
spark_df = spark.sql("select * from db_name.table_name")
spark_df.printSchema()

# show tables from a database: works via SQL, but not via the pyspark catalog function
# -> works
spark.sql("show tables from db_name").show()
# -> does not work
spark.catalog.listTables('db_name')

# not able to interact with / read data from the actual external S3 table
spark.read.format("iceberg")
spark.catalog.setCurrentDatabase('db_name')
spark.read.table("table_name")
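If the named Iceberg catalog from the sketch above were configured, I assume the same read would be addressed through it with a fully qualified name:
# my_catalog is the placeholder catalog name from the earlier config sketch
spark.sql("select * from my_catalog.db_name.table_name").show()
spark.read.table("my_catalog.db_name.table_name")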
The exception log from the failing spark.read.table call above (the NoSuchMethodError mentioned earlier):
Py4JJavaError: An error occurred while calling o44.table.
: java.lang.NoSuchMethodError: 'java.lang.Object org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(org.apache.hadoop.fs.statistics.DurationTracker, org.apache.hadoop.util.functional.CallableRaisingIOE)'
at org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration(Invoker.java:147)
at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:282)
at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:435)
at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:284)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122)
at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:408)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)
at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:404)
at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:282)
at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:326)
at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:427)
at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:545)
at java.base/java.io.DataInputStream.read(DataInputStream.java:151)
at org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:123)
at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1685)
at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1084)
at org.apache.iceberg.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3714)
at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:273)
at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:266)
at org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$0(BaseMetastoreTableOperations.java:189)
at org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$1(BaseMetastoreTableOperations.java:208)
at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
at org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:208)
at org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:185)
at org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:180)
at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:176)
at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:47)
at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:124)
at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:111)
at org.apache.iceberg.mr.hive.HiveIcebergSerDe.initialize(HiveIcebergSerDe.java:84)
at org.apache.hadoop.hive.serde2.AbstractSerDe.initialize(AbstractSerDe.java:54)
at org.apache.hadoop.hive.serde2.SerDeUtils.initializeSerDe(SerDeUtils.java:533)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:453)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:440)
at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:281)
at org.apache.hadoop.hive.ql.metadata.Table.getDeserializer(Table.java:263)
at org.apache.hadoop.hive.ql.metadata.Table.getColsInternal(Table.java:641)
at org.apache.hadoop.hive.ql.metadata.Table.getCols(Table.java:624)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree2$1(HiveClientImpl.scala:448)
at org.apache.spark.sql.hive.client.HiveClientImpl.org$apache$spark$sql$hive$client$HiveClientImpl$$convertHiveTableToCatalogTable(HiveClientImpl.scala:447)
at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getTableOption$3(HiveClientImpl.scala:434)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getTableOption$1(HiveClientImpl.scala:434)
at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:298)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:229)
at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:228)
at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:278)
at org.apache.spark.sql.hive.client.HiveClientImpl.getTableOption(HiveClientImpl.scala:432)
at org.apache.spark.sql.hive.client.HiveClient.getTable(HiveClient.scala:95)
at org.apache.spark.sql.hive.client.HiveClient.getTable$(HiveClient.scala:94)
at org.apache.spark.sql.hive.client.HiveClientImpl.getTable(HiveClientImpl.scala:92)
at org.apache.spark.sql.hive.HiveExternalCatalog.getRawTable(HiveExternalCatalog.scala:122)
at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$getTable$1(HiveExternalCatalog.scala:729)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)
at org.apache.spark.sql.hive.HiveExternalCatalog.getTable(HiveExternalCatalog.scala:729)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.getTable(ExternalCatalogWithListener.scala:138)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableRawMetadata(SessionCatalog.scala:515)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:500)
at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.loadTable(V2SessionCatalog.scala:66)
at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:311)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$3(Analyzer.scala:1202)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$1(Analyzer.scala:1201)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupRelation(Analyzer.scala:1193)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$13.applyOrElse(Analyzer.scala:1064)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$13.applyOrElse(Analyzer.scala:1028)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1028)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:987)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:231)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:227)
at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:227)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:212)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:211)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at org.apache.spark.sql.DataFrameReader.table(DataFrameReader.scala:607)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:833)