
I got an exception while executing the code snippet below. The dataset I was working with is "stocks.csv", which has the columns date, symbol, volume, open, close, high, low and adjclose.

    val stock = sc.textFile("C:/Users/kondr/Desktop/stocks/stocks.csv")
    val splits = stock.map(record => record.split(","))
    val symvol = splits.map(arr => (arr(1),arr(2).toInt))
    val maxvol = symvol.reduceByKey((vol1,vol2) => Math.max(vol1,vol2),1)
    maxvol.collect().foreach(println)

Error Message

21/05/05 14:09:31 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.NumberFormatException: For input string: "volume"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

  • Can you share your sample data? It looks like you have a header in the CSV file. Why not read it directly as a dataframe, e.g. `val df = spark.read.csv("path")`? – koiralo May 05 '21 at 10:05
  • Hi @koiralo, thanks a ton for your response. Here is the sample data: – Data Real World May 05 '21 at 10:17
  • date symbol volume open close high low adjclose
    18-04-2019 A 2874100 75.73000336 76.16999817 76.54000092 75.30999756 76.16999817
    17-04-2019 A 4472000 78.15000153 75.43000031 78.31999969 74.45999908 75.43000031
    16-04-2019 A 3441500 80.81999969 77.55000305 80.95999908 77.19000244 77.55000305
    15-04-2019 A 1627300 81 80.40000153 81.12999725 79.91000366 80.40000153
    12-04-2019 A 1249300 81.43000031 80.98000336 82.05999756 80.90000153 80.98000336
    – Data Real World May 05 '21 at 10:18
  • Hi @koiralo, I tried with the .txt file format as well. Here is the sample code, where I used "\t" for splitting. – Data Real World May 05 '21 at 10:27
  • In the first row you have the string `date symbol volume open close high low adjclose`, which cannot be cast to an integer (see the defensive-parsing sketch after these comments) – koiralo May 05 '21 at 10:28
  • `val stock = sc.textFile("C:/Users/kondr/Desktop/stocks.txt") val splits = stock.map(record => record.split("\t")) val symvol = splits.map(arr => (arr(1),arr(2).toInt)) val maxvol = symvol.reduceByKey((vol1,vol2) => Math.max(vol1,vol2),1) maxvol.collect().foreach(println)` – Data Real World May 05 '21 at 10:28
  • Okay, how do I fix this @koiralo? – Data Real World May 05 '21 at 10:29
  • Does this answer your question? [What is a NumberFormatException and how can I fix it?](https://stackoverflow.com/questions/39849984/what-is-a-numberformatexception-and-how-can-i-fix-it) – xenteros Jul 14 '21 at 07:23
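As the linked duplicate explains, a NumberFormatException means a non-numeric string reached a numeric parser; here the header value "volume" hit `toInt`. One hedged way to guard the pipeline is to parse defensively and drop rows that fail, as in this sketch (`stock` is the RDD from the question; `scala.util.Try` keeps it working on any Scala version Spark ships with):

    import scala.util.Try

    // Sketch: keep only rows whose volume column parses as an Int;
    // the header row fails the parse ("volume" is not a number) and is dropped.
    val symvol = stock
      .map(_.split(","))
      .flatMap(arr => Try(arr(2).toInt).toOption.map(vol => (arr(1), vol)))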

1 Answer


Here is how you can skip the first row:

stock.zipWithIndex().filter(_._2 != 0)   // keep lines whose index is not 0 (the header)
  .map(_._1)                             // drop the index, keep the line
  .map(record => record.split(" "))      // the pasted sample is space-separated
  .map(arr => (arr(1), arr(2).toInt))    // (symbol, volume)
  .reduceByKey((vol1, vol2) => Math.max(vol1, vol2), 1)
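
An alternative header-skipping idiom is `mapPartitionsWithIndex`, which drops the first line of the first partition only and avoids indexing every record. A sketch, assuming the header is the first line of partition 0 (true when reading a single text file):

val noHeader = stock.mapPartitionsWithIndex { (idx, iter) =>
  // the header is the first line of partition 0 for a single text file
  if (idx == 0) iter.drop(1) else iter
}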

Or you can read it directly into a DataFrame as below:

val csvDF = spark.read
  .option("header", true)
  .option("delimiter", " ")
  .csv("stock.txt")

csvDF.show(false)

Output:

+----------+------+-------+-----------+-----------+-----------+-----------+-----------+
|date      |symbol|volume |open       |close      |high       |low        |adjclose   |
+----------+------+-------+-----------+-----------+-----------+-----------+-----------+
|18-04-2019|A     |2874100|75.73000336|76.16999817|76.54000092|75.30999756|76.16999817|
|17-04-2019|A     |4472000|78.15000153|75.43000031|78.31999969|74.45999908|75.43000031|
|16-04-2019|A     |3441500|80.81999969|77.55000305|80.95999908|77.19000244|77.55000305|
|15-04-2019|A     |1627300|81         |80.40000153|81.12999725|79.91000366|80.40000153|
|12-04-2019|A     |1249300|81.43000031|80.98000336|82.05999756|80.90000153|80.98000336|
+----------+------+-------+-----------+-----------+-----------+-----------+-----------+
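
Staying with the DataFrame route, the same max-volume-per-symbol aggregation can be expressed with `groupBy` and `max`. A sketch, assuming the column names from the header above; `volume` is cast first because `spark.read.csv` reads every column as a string unless a schema is given:

import org.apache.spark.sql.functions.{col, max}

// Sketch: max volume per symbol; csv columns arrive as strings,
// so cast volume to int before aggregating.
val maxVolDF = csvDF
  .withColumn("volume", col("volume").cast("int"))
  .groupBy("symbol")
  .agg(max("volume").alias("maxvolume"))

maxVolDF.show(false)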
  • Hi @koiralo, the above solution works fine. But I'm new to this and learning Spark currently, and I really want to see the DAG and how Spark works internally when we apply transformations like map, etc. Hence I started with sc.textFile() and then transformed the RDD with map twice before invoking the collect action. Is there a way to work on it using Spark Core instead of DataFrames? – Data Real World May 05 '21 at 12:14
  • Hi @koiralo, I've tried the following code by modifying the code you suggested. It works fine, but it returns only one symbol along with its maximum volume. Could you please be so kind as to help? `val stock = sc.textFile("C:/Users/kondr/Desktop/stocks.csv") .zipWithIndex().filter(_._2 == 1).map(_._1) val splits = stock.map(record => record.split(",")) val symvol = splits.map(arr => (arr(1),arr(2).toInt)) val maxvol = symvol.reduceByKey((vol1,vol2) => Math.max(vol1,vol2)) maxvol.collect().foreach(println)` – Data Real World May 05 '21 at 12:43
  • It should be like this `val stock = spark.sparkContext.textFile("C:/Users/kondr/Desktop/stocks.csv") .zipWithIndex().filter(_._2 == 1).map(_._1) val splits = stock.map(record => record.split(",")) val symvol = splits.map(arr => (arr(1),arr(2).toInt)) val maxvol = symvol.reduceByKey((vol1,vol2) => Math.max(vol1,vol2)) maxvol.collect().foreach(println)` – koiralo May 05 '21 at 12:47
  • If the answer helped, can you mark it as an answer? And if you have any other question, please ask another question – koiralo May 05 '21 at 12:48
  • Hi @koiralo, I created a new question for the different issue. Hope that can help get the issue fixed. Thanks – Data Real World May 05 '21 at 13:17
  • @DataRealWorld take a look, I have updated the answer; to skip headers it should be `stock.zipWithIndex().filter(_._2 != 0)` (the full corrected snippet is sketched after these comments) – koiralo May 05 '21 at 13:55
  • Awesome @koiralo, it's working perfectly alright. Kudos! `scala> maxvol.collect().foreach(println)` `(EPP,5369900) (FDEV,6200) (MGU,67000) (ICVT,271900) (VNRX,394600) (MPVD,2590200) (GIM,717000) (BFS,576300) (IRT,1078800) (SCHP,1576200) (KONA,3411900) (BBF,89300)` – Data Real World May 05 '21 at 14:01
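
Putting the thread's corrections together (comma delimiter, the `!= 0` header filter, and no forced single partition), a complete RDD version might look like this sketch:

val stock = sc.textFile("C:/Users/kondr/Desktop/stocks/stocks.csv")

val maxvol = stock
  .zipWithIndex().filter(_._2 != 0).map(_._1)        // drop the header line
  .map(_.split(","))                                 // comma-separated fields
  .map(arr => (arr(1), arr(2).toInt))                // (symbol, volume)
  .reduceByKey((vol1, vol2) => Math.max(vol1, vol2)) // max volume per symbol

maxvol.collect().foreach(println)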