
input.csv:

200,300,889,767,9908,7768,9090
300,400,223,4456,3214,6675,333
234,567,890
123,445,667,887

What I want: read the input file and compare each line with the set "123,200,300". If a match is found, output the matching values:

200,300 (from input line 1)
300 (from input line 2)
123 (from input line 4)

What I wrote:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.rdd.RDD

  object sparkApp {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("CountingSheep")
    val sc = new SparkContext(conf)

    def parseLine(invCol: String) : RDD[String]  = {
      println(s"INPUT, $invCol")
      val inv_rdd = sc.parallelize(Seq(invCol.toString))
      val bs_meta_rdd = sc.parallelize(Seq("123,200,300"))
      return inv_rdd.intersection(bs_meta_rdd)
    }

    def main(args: Array[String]) {
      val filePathName = "hdfs://xxx/tmp/input.csv"
      val rawData = sc.textFile(filePathName)
      val datad = rawData.map{r => parseLine(r)}
    }
  }

I get the following exception:

java.lang.NullPointerException

Please suggest where I went wrong.

  • How did you run your program? Have you compiled and packaged the jar already? – giaosudau Jun 16 '16 at 13:54
  • Yes, I compiled and packaged the jar: spark-submit --class "sparkApp" --master local --num-executors 2 --driver-memory 1g --executor-memory 1g --executor-cores 1 /home/spark_app/sparkApp/target/scala-2.10/sparkapp_2.10-1.0.jar – Manish Saraf Bhardwaj Jun 16 '16 at 14:08
  • When I ran the code, I got the exception – Manish Saraf Bhardwaj Jun 16 '16 at 14:10
  • Can you put your Maven or sbt file here? – giaosudau Jun 16 '16 at 14:13
  • simple.sbt: name := "SparkApp" version := "1.0" scalaVersion := "2.10.3" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.5.2", "org.apache.spark" %% "spark-sql" % "1.5.2", "org.apache.spark" %% "spark-hive" % "1.5.2", "mysql" % "mysql-connector-java" % "5.1.28" ) Or is anything else required? – Manish Saraf Bhardwaj Jun 16 '16 at 14:17
  • Seems your sbt file is not good, because I already compiled and ran the code on my machine (it ran well). I put a reference here: https://gist.github.com/giaosudau/e62ab2261de9f2b740f7d61bcba76314 – giaosudau Jun 16 '16 at 14:28
  • Error while compiling code [hdfs@app193 sparkApp]$ sbt compile /home/spark_app/sparkApp/simple.sbt:50: error: not found: value assemblyJarName assemblyJarName in assembly := "impression.jar" ^ [error] Type error in expression Project loading failed: (r)etry, (q)uit, (l)ast, or (i)gnore? – Manish Saraf Bhardwaj Jun 16 '16 at 14:45
  • You should use the sbt-assembly plugin: https://github.com/sbt/sbt-assembly – giaosudau Jun 16 '16 at 14:46
  • So do you get an exception, or a type error? You've said both now. If exception, please provide the stack trace – The Archetypal Paul Jun 16 '16 at 14:49
  • @TheArchetypalPaul the OP is trying to run actions on RDDs (SPARK-5063) I was watching the conversation to see where they were heading. – eliasah Jun 16 '16 at 14:52
  • Ah yes, looked at the code now! And yet @giaosudau says the code compiled and ran! – The Archetypal Paul Jun 16 '16 at 14:56
  • @TheArchetypalPaul that's why I didn't comment before. I'm sorry, I'm wicked sometimes :) – eliasah Jun 16 '16 at 14:58
  • So how can we achieve this? – Manish Saraf Bhardwaj Jun 16 '16 at 17:20
  • How do we achieve what? Your code doesn't make much sense. Would you care to explain what you are trying to do with your code? – eliasah Jun 16 '16 at 18:08
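
The assemblyJarName error pasted in the comments above is what sbt reports when an sbt-assembly setting is used without the plugin being added to the build. As a rough sketch, assuming sbt 0.13.x (the plugin version below is only an example and should match your sbt version):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

// simple.sbt – this setting only resolves once the plugin is on the build
assemblyJarName in assembly := "impression.jar"

After that, sbt assembly produces the fat jar that spark-submit can run.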

3 Answers


Problem is solved. This is very simple.

// assumes a SQLContext whose implicits are in scope (e.g. import sqlContext.implicits._),
// so that toDF() and the $"col" syntax further down work
val pfile = sc.textFile("/FileStore/tables/6mjxi2uz1492576337920/input.csv")
// each line is assumed to be "<id>\t<pName>", where pName holds the comma-separated values
case class pSchema(id: Int, pName: String)
val pDF = pfile.map(_.split("\t")).map(p => pSchema(p(0).toInt, p(1).trim())).toDF()
pDF.select("id","pName").show()


Define a UDF:

// assumes: import org.apache.spark.sql.functions.udf
val findP = udf((id: Int, pName: String) => {
  // collect every reference value that occurs as a substring of pName
  val ids = Array("123", "200", "300")
  var idsFound: String = ""
  for (id <- ids) {
    if (pName.contains(id)) {
      idsFound = idsFound + id + ","
    }
  }
  // drop the trailing comma, if any
  if (idsFound.length() > 0) {
    idsFound = idsFound.substring(0, idsFound.length - 1)
  }
  idsFound
})

Use the UDF in withColumn():

pDF.select("id","pName").withColumn("Found",findP($"id",$"pName")).show()



What you are trying to do can't be done the way you are doing it.

Spark does not support nested RDDs (see SPARK-5063).

Spark does not support nested RDDs or performing Spark actions inside of transformations; this usually leads to NullPointerExceptions (see SPARK-718 as one example). The confusing NPE is one of the most common sources of Spark questions on StackOverflow:

I think we can detect these errors by adding logic to RDD to check whether sc is null (e.g. turn sc into a getter function); we can use this to add a better error message.
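
As a rough sketch of the alternative, assuming the reference set "123,200,300" is small enough to live on the driver, it can be broadcast and applied inside a single transformation instead of creating RDDs inside parseLine (the file path is the one from the question):

// sketch only: broadcast the small reference set instead of turning it into an RDD
val matchSet = sc.broadcast(Set("123", "200", "300"))

val rawData = sc.textFile("hdfs://xxx/tmp/input.csv")

// keep, per line, only the values that appear in the reference set
val matched = rawData.map { line =>
  line.split(",").filter(matchSet.value.contains).mkString(",")
}

matched.collect().foreach(println)
// 200,300
// 300
//            (empty result for the line with no match)
// 123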

eliasah

For a simple answer, why are we making it so complex? In this case we don't need a UDF.

This is your input data:

200,300,889,767,9908,7768,9090|AAA
300,400,223,4456,3214,6675,333|BBB
234,567,890|CCC
123,445,667,887|DDD

and you have to match it with 123,200,300

val matchSet = "123,200,300".split(",").toSet
val rawrdd = sc.textFile("D:\\input.txt")
rawrdd.map(_.split("\\|"))   // the | must be escaped, since split takes a regex
      .map(arr => arr(0).split(",").toSet.intersect(matchSet).mkString(",") + "|" + arr(1))
      .foreach(println)

Your output:

300,200|AAA
300|BBB
|CCC
123|DDD
Souvik
  • You split the input line on "," – 200,300,889,767,9908,7768,9090. As per the requirement this is a single column. Don't split it. If you want to split, use \t – Manish Saraf Bhardwaj Apr 19 '17 at 09:25
  • I know that, @Manish, this is a single column. Please try to understand my code. I just made your code more concise in terms of performance, since, as you know, a user-defined function is expensive in Spark. Try to run this code. Hope it will resolve your issue. – Souvik Apr 19 '17 at 09:45
  • Hi @ManishSaraf, I have just updated the code. Please check and let me know if you have any issues. – Souvik Apr 19 '17 at 10:02
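
For reference, if the input really is a single comma-separated column per line with no | label, as in the original question, a minimal sketch of the same set-intersection idea would be (path reused from the question):

val matchSet = "123,200,300".split(",").toSet

sc.textFile("hdfs://xxx/tmp/input.csv")
  .map(line => line.split(",").toSet.intersect(matchSet).mkString(","))
  .collect()
  .foreach(println)
// 200,300   (order within a line may vary, since sets are unordered)
// 300
//           (empty result for the unmatched line)
// 123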