I working on spark streaming job in which incoming stream join with existing hive table. I have created a singleton hiveContext. When hiveContext fetch the table data from hive, spark give warning and after few day warning converts into error.
18/03/10 15:55:28 INFO parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: ParquetInputSplit{part: hdfs://nameservice1/user/hive/warehouse/iot.db/iotdevice/part-r-00000-931d1d81-af03-41a4-b659-81a883131289.gz.parquet start: 0 end: 5695 length: 5695 hosts: []}
18/03/10 15:55:28 WARN security.UserGroupInformation: PriviledgedActionException as:svc-ra-iotloaddev (auth:SIMPLE) cause:org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt) 18/03/10 15:55:28 WARN kms.LoadBalancingKMSClientProvider: KMS provider at [https://iotserver9009.kd.iotserver.com:16000/kms/v1/] threw an IOException [org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]!!
It will stop the job after some day.
Here is code for creating hivecontext
@transient private var instance: HiveContext = _
def getHiveContext(sparkContext: SparkContext, propertiesBroadcast: Broadcast[Properties]): HiveContext = {
synchronized {
val configuration = new Configuration
configuration.addResource("/etc/hadoop/conf/hdfs-site.xml")
UserGroupInformation.setConfiguration(configuration)
UserGroupInformation.getCurrentUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS)
val secure = propertiesBroadcast.value.getProperty("kerberosSecurity").toBoolean
if (instance == null) {
UserGroupInformation.loginUserFromKeytabAndReturnUGI(
propertiesBroadcast.value.getProperty("hadoop.kerberos.principal"), sparkContext.getConf.get("spark.yarn.keytab"))
.doAs(new PrivilegedExceptionAction[HiveContext]() {
@Override
def run(): HiveContext = {
System.setProperty("hive.metastore.uris", propertiesBroadcast.value.getProperty("hive.metastore.uris"));
if (secure) {
System.setProperty("hive.metastore.sasl.enabled", "true")
System.setProperty("hive.metastore.kerberos.keytab.file", sparkContext.getConf.get("spark.yarn.keytab"))
System.setProperty("hive.security.authorization.enabled", "false")
System.setProperty("hive.metastore.kerberos.principal", propertiesBroadcast.value.getProperty("hive.metastore.kerberos.principal"))
System.setProperty("hive.metastore.execute.setugi", "true")
}
instance = new HiveContext(sparkContext)
instance.setConf("spark.sql.parquet.writeLegacyFormat", "true")
instance.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
instance.setConf("hive.exec.dynamic.partition", "true")
instance.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
instance
}
})
}
UserGroupInformation.loginUserFromKeytabAndReturnUGI(
propertiesBroadcast.value.getProperty("hadoop.kerberos.principal"), sparkContext.getConf.get("spark.yarn.keytab"))
.doAs(new PrivilegedExceptionAction[HiveContext]() {
@Override
def run(): HiveContext = {
instance
}
})
}
}