
I need to write to an external HDFS cluster for which I have authentication details for both simple and Kerberos authentication. For the sake of simplicity, let's assume we are dealing with simple authentication.

This is what I have:

  • External HDFS cluster connection details (host, port)
  • Authentication details (user for simple auth)
  • HDFS location where files need to be written (hdfs://host:port/loc)
  • Also, other details like format, etc.

Please note that the Spark user is not the same as the user specified for HDFS authentication.

Now, using the Spark programming API, this is what I am trying to do:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}

val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", fileSystemPath)
hadoopConf.set("hadoop.job.ugi", userName)
val jConf = new JobConf(hadoopConf)
jConf.setUser(user)
jConf.set("user.name", user)
jConf.setOutputKeyClass(classOf[NullWritable])
jConf.setOutputValueClass(classOf[Text])
jConf.setOutputFormat(classOf[TextOutputFormat[NullWritable, Text]])

outputDStream.foreachRDD(r => {
    // Convert every record to a (NullWritable, Text) pair for TextOutputFormat
    val rdd = r.mapPartitions { iter =>
        val text = new Text()
        iter.map { x =>
            text.set(x.toString)
            println(x.toString)
            (NullWritable.get(), text)
        }
    }

    // Skip empty batches
    val rddCount = rdd.count()
    if (rddCount > 0) {
        rdd.saveAsHadoopFile(config.outputPath, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], jConf)
    }
})

Here, I was assuming that if we pass a JobConf with the correct details, it would be used for authentication and the write would be performed as the user specified in the JobConf.

However, the write still happens as the Spark user ("root"), irrespective of the auth details present in the JobConf ("hdfs" as user). Below is the exception that I get:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=root, access=WRITE, inode="/spark-deploy/out/_temporary/0":hdfs:supergroup:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1698)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1682)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1665)
at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:71)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3900)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:978)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:622)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy40.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:558)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy41.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:3000)
... 45 more

Please let me know if there are any suggestions.

A.G.
  • Is root the user that ran spark-submit? You're not editing the jobConf of the SparkSession, you've made a new one – OneCricketeer Apr 29 '18 at 06:49
  • Yes, spark-submit is run by the root user – A.G. Apr 29 '18 at 06:50
  • Can you `export HADOOP_USER_NAME=hdfs`, then run Spark submit? – OneCricketeer Apr 29 '18 at 06:51
  • I am refraining from doing that because, I suppose, it impacts the SparkContext globally, and I just want to write something to HDFS as the "hdfs" user. Isn't there a way to pass the authentication details in the Hadoop conf to the Spark API so that it takes care of writing to HDFS as that user? Another reason not to do that is that I might want to write to multiple HDFS clusters simultaneously, each with a different host/port/auth. – A.G. Apr 29 '18 at 06:56
  • Hadoop isn't secure by default without Kerberos. Just exporting the variable will pass along the username it writes as. And I've never used Spark against more than a single NameNode at once, so can't help you there – OneCricketeer Apr 29 '18 at 07:03
  • No problem. Exporting the variable means Spark will use that user for all calls, which might not be correct in this case. There should be a way, as this does not seem to be such an unusual case. I am just struggling to figure it out. – A.G. Apr 29 '18 at 07:07
  • IMO, you shouldn't be running code as root. You would have Unix service accounts mapped to HDFS users/groups, then you don't need to modify permissions at runtime – OneCricketeer Apr 29 '18 at 07:11
  • Similar solution here https://stackoverflow.com/a/45794709/2308683 – OneCricketeer Apr 29 '18 at 07:14
  • By default, the Hadoop `UserGroupInformation` is **static** and assumes a single "client" identity for your session, even when accessing multiple clusters. And that totally makes sense with Kerberos authentication. BTW look at the comment from Steve Loughran in https://stackoverflow.com/questions/42650562/access-a-secured-hive-when-running-spark-in-an-unsecured-yarn-cluster – Samson Scharfrichter Apr 29 '18 at 20:26
  • @SamsonScharfrichter Does that mean we cannot write to hdfs even when we have all the authentication details for the external cluster using Spark Programming API? SparkContext can deal only with single name-node? What is the use of JobConf that we pass along in the API methods? – A.G. Apr 30 '18 at 05:09
  • You would have to check the Spark source code -- and know a lot about Hadoop auth internals -- to be sure. But honestly, how many people want to do that? My 2 cents: write down all your results in the **local** filesystem, then use pure Java code to create multiple `UserGroupInformation` & HDFS `FileSystem` instances, then upload each local file to its target cluster. Good luck; that stuff is poorly documented. – Samson Scharfrichter Apr 30 '18 at 07:48
  • Thanks @SamsonScharfrichter for the inputs. Let's see how it goes! – A.G. Apr 30 '18 at 07:54
  • If you have root access on the machine where you run spark-submit, you can do `sudo -u hdfs /usr/bin/spark2-submit ...` to change the user that runs the submit, if you don't want to use `export HADOOP_USER_NAME` – Michail N May 17 '18 at 09:58
  • Unfortunately, we do not have root access to the machine, and the HDFS cluster is being accessed programmatically – A.G. May 17 '18 at 11:17
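
The approach suggested in the comments above (write results locally, then upload to each cluster with its own `FileSystem` instance) might be sketched roughly as follows. This is only a sketch under assumptions: simple (non-Kerberos) authentication, and a hypothetical `Target` case class and `upload` helper that are not part of the original post. It relies on the `FileSystem.get(URI, Configuration, String)` overload, which opens the file system as the given user.

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object MultiClusterUpload {
  // Hypothetical descriptor for one target cluster (simple auth only).
  final case class Target(uri: String, user: String, remoteDir: String)

  // Copies one local file into remoteDir on the target cluster,
  // acting as target.user rather than the user running the JVM.
  def upload(localFile: String, target: Target): Unit = {
    val conf = new Configuration()
    // FileSystem.get(URI, Configuration, String) opens the file system as the named user
    val fs = FileSystem.get(new URI(target.uri), conf, target.user)
    try fs.copyFromLocalFile(new Path(localFile), new Path(target.remoteDir))
    finally fs.close()
  }
}
```

Calling `upload` once per `Target` would give each cluster its own host, port, and user without touching `HADOOP_USER_NAME` for the whole process. Whether this fits a streaming job depends on how the local files are produced, which this sketch does not cover.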

1 Answer

This is probably more a comment than an answer but as it is too long I put it here. I haven't tried this because I have no environment to test it. Please try and let me know if this works (and if it doesn't I'll remove this answer).

Looking a bit into the code it looks like DFSClient creates a proxy using createProxyWithClientProtocol that uses UserGroupInformation.getCurrentUser() (I haven't traced the createHAProxy branch down but I suspect the same logic there). Then this info is sent to the server for authentication.

It means that you need to change what UserGroupInformation.getCurrentUser() returns in the context of your particular call. This is what UserGroupInformation.doAs is supposed to do so you just need to get a proper UserGroupInformation instance. And in the case of simple authentication UserGroupInformation.createRemoteUser might actually work.
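
To illustrate the point about `doAs`, here is a minimal sketch, assuming simple authentication and the Hadoop client libraries on the classpath. The object and method names (`DoAsSketch`, `userSeenInside`) are made up for illustration. It only shows which user `UserGroupInformation.getCurrentUser` reports inside and outside a `doAs` block; it does not contact any cluster.

```scala
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object DoAsSketch {
  // Returns the short user name that Hadoop client code sees inside doAs.
  def userSeenInside(remoteUser: String): String = {
    val ugi = UserGroupInformation.createRemoteUser(remoteUser)
    ugi.doAs(new PrivilegedExceptionAction[String] {
      override def run(): String =
        UserGroupInformation.getCurrentUser.getShortUserName
    })
  }

  def main(args: Array[String]): Unit = {
    // Outside doAs this is the login user (for example, whoever ran spark-submit)
    println(s"outside doAs: ${UserGroupInformation.getCurrentUser.getShortUserName}")
    println(s"inside doAs:  ${userSeenInside("hdfs")}")
  }
}
```

If the second line prints the remote user name, then HDFS client calls made inside the same `doAs` block should be issued under that identity.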

So I suggest trying something like this:

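import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation
...
val rddCount = rdd.count()
if (rddCount > 0) {
    val remoteUgi = UserGroupInformation.createRemoteUser("hdfsUserName")
    // doAs is overloaded (PrivilegedAction / PrivilegedExceptionAction), so pass an explicit instance rather than a lambda
    remoteUgi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit =
            rdd.saveAsHadoopFile(config.outputPath, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], jConf)
    })
}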
SergGr