
I have a dataframe df like the following

+--------+--------------------+--------+------+
|      id|                path|somestff| hash1|
+--------+--------------------+--------+------+
|       1|/file/dirA/fileA.txt|      58| 65161|
|       2|/file/dirB/fileB.txt|      52| 65913|
|       3|/file/dirC/fileC.txt|      99|131073|
|       4|/file/dirF/fileD.txt|      46|196233|
+--------+--------------------+--------+------+

One note: the /file/dir parts differ. Not all files are stored in the same directory; in fact there are hundreds of files in various directories.

What I want to accomplish is to read each file referenced in the path column, count the records in that file, and write the row count into a new column of the dataframe.

I tried the following function and udf:

def executeRowCount(fileCount: String): Long = {
  val rowCount = spark.read.format("csv").option("header", "false").load(fileCount).count
  rowCount
}

val execUdf = udf(executeRowCount _)

df.withColumn("row_count", execUdf(col("path"))).show()

This results in the following error

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string) => bigint)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:28)
        at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)
        ... 19 more

I tried iterating through the column after collecting it, like this:

val te = df.select("path").as[String].collect()
te.foreach(executeRowCount)

and here it works just fine, but I want to store the result within the df...

I've tried several solutions, but I'm facing a dead end here.

datanin

4 Answers


That does not work because data frames can only be created in the driver JVM, while the UDF code runs in the executor JVMs. What you can do instead is load the CSVs into a separate data frame and enrich it with a file name column:

import org.apache.spark.sql.functions.input_file_name

val csvs = spark
 .read
 .format("csv")
 .load("/file/dir/")
 .withColumn("filename", input_file_name())

and then join the original df on the filename column.
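
A minimal sketch of that join, assuming the values in df's path column match what input_file_name() returns (in practice input_file_name() yields a full URI such as hdfs://host/file/dirA/fileA.txt, so the filename column may need to be normalized first):

import org.apache.spark.sql.functions.{count, lit}

// Count the rows per source file, then attach the counts to the original rows.
// Assumption: "path" in df and "filename" from input_file_name() use the same
// format; otherwise strip the scheme/host prefix from "filename" before joining.
val counts = csvs
  .groupBy("filename")
  .agg(count(lit(1)).as("row_count"))

val result = df
  .join(counts, df("path") === counts("filename"), "left")
  .drop("filename")

result.show()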

ollik1
  • Hey ollik1, the /file/dir parts differ. Not all files are stored in the same directory; in fact there are hundreds of files in various directories. I cannot load everything from HDFS in one step. The files differ in size from a few hundred MB to a few GB. – datanin Apr 01 '19 at 19:07
  • @datanin It is possible to define multiple locations https://stackoverflow.com/questions/24029873/how-to-read-multiple-text-files-into-a-single-rdd (see the sketch after these comments). The file loading is lazy, so it seems at least worth trying whether it works well. – ollik1 Apr 02 '19 at 06:00
  • A dataframe is a distributed collection of data, so I don't understand why I cannot use the UDF on the df. Nevertheless, the workaround with a filename column is a good idea and I'll try that one. If it works, this will be my solution. Thanks – datanin Apr 04 '19 at 14:05
  • Hi ollik1, now I understand the problem with the driver/worker split. It's not the dataframe but the UDF that is not serializable. – datanin Apr 04 '19 at 15:44
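
Building on the comment above about multiple locations, a minimal sketch of reading all the files in one pass; it assumes the list of paths is small enough to collect on the driver and that spark is the active SparkSession:

import org.apache.spark.sql.functions.input_file_name
import spark.implicits._ // needed for .as[String]

// Collect the distinct file paths on the driver (one per row of df) and hand
// them all to a single reader call; load() accepts multiple locations.
val paths = df.select("path").as[String].distinct.collect()

val allCsvs = spark.read
  .format("csv")
  .option("header", "false")
  .load(paths: _*)
  .withColumn("filename", input_file_name())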

I fixed this issue in the following way:

val queue = df.select("path").as[String].collect()
val countResult = for (item <- queue) yield {
    val rowCount = (item, spark.read.format("csv").option("header", "false").load(item).count)
    rowCount
}

val df2 = spark.createDataFrame(countResult)

Afterwards I joined the df with df2...
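
For reference, a minimal sketch of that join step, using toDF to give the collected tuples explicit column names (the names "path" and "row_count" are my own choice):

import spark.implicits._ // assumes `spark` is the active SparkSession

// Name the (path, count) tuples so the join key is explicit.
val df2 = countResult.toSeq.toDF("path", "row_count")

// Join the counts back to the original dataframe on the shared path column.
val joined = df.join(df2, Seq("path"), "left")
joined.show()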

The problem here is, as @ollik1 mentioned, the driver/worker architecture of UDFs: the SparkSession needed for spark.read is not serializable and is not available inside a UDF running on the executors.

datanin

What about this?

def executeRowCount = udf((fileCount: String) => {
  spark.read.format("csv").option("header", "false").load(fileCount).count
})

df.withColumn("row_count", executeRowCount(col("path"))).show()
maximous

Maybe something like this?

  import org.apache.spark.sql.functions.{count, input_file_name}

  sqlContext
    .read
    .format("csv")
    .load("/tmp/input/")
    .withColumn("filename", input_file_name())
    .groupBy("filename")
    .agg(count("filename").as("record_count"))
    .show()