
I am trying to read a text file that contains product information, with fields separated by |. When I read the data as an RDD and then split each line on the | delimiter, the data gets corrupted. I am unable to understand why this is happening.

#### input data
productId|price|saleEvent|rivalName|fetchTS 
12345|78.73|Special|VistaCart.com|2017-05-11 15:39:30 
12345|45.52|Regular|ShopYourWay.com|2017-05-11 16:09:43 
12345|89.52|Sale|MarketPlace.com|2017-05-11 16:07:29 
678|1348.73|Regular|VistaCart.com|2017-05-11 15:58:06 
678|1348.73|Special|ShopYourWay.com|2017-05-11 15:44:22 
678|1232.29|Daily|MarketPlace.com|2017-05-11 15:53:03 
777|908.57|Daily|VistaCart.com|2017-05-11 15:39:01 
#### spark-shell code
import org.apache.spark.sql.Encoder
import spark.implicits._

case class Product(productId:Int, price:Double, saleEvent:String, rivalName:String, fetchTS:String)
val rdd = spark.sparkContext.textFile("/home/prabhat/Documents/Spark/sampledata/competitor_data_10.txt")
// remove the header line (first line of the first partition)
val x = rdd.mapPartitionsWithIndex{(idx,iter) => if(idx==0)iter.drop(1) else iter}
// why does each line of RDD x come back as single, comma-separated characters here?
x.map(x=>x.split("|")).take(10)
res74: Array[Array[String]] = Array(Array(1, 2, 3, 4, 5, |, 3, 9, 9, ., 7, 3, |, S, p, e, c, i, a, l, |, V, i, s, t, a, C, a, r, t, ., c, o, m, |, 2, 0, 1, 7, -, 0, 5, -, 1, 1, " ", 1, 5, :, 3, 9, :, 3, 0, " "), Array(1, 2, 3, 4, 5, |, 3, 8, 8, ., 5, 2, |, R, e, g, u, l, a, r, |, S, h, o, p, Y, o, u, r, W, a, y, ., c, o, m, |, 2, 0, 1, 7, -, 0, 5, -, 1, 1, " ", 1, 6, :, 0, 9, :, 4, 3, " "), Array(1, 2, 3, 4, 5, |, 3, 8, 8, ., 5, 2, |, S, a, l, e, |, M, a, r, k, e, t, P, l, a, c, e, ., c, o, m, |, 2, 0, 1, 7, -, 0, 5, -, 1, 1, " ", 1, 6, :, 0, 7, :, 2, 9, " "),  ...

x.map(x=>x.split("|")).map(y => Product(y(0).toInt,y(1).toDouble,y(2),y(3),y(4))).toDF.show
+---------+-----+---------+---------+-------+
|productId|price|saleEvent|rivalName|fetchTS|
+---------+-----+---------+---------+-------+
|        1|  2.0|        3|        4|      5|
|        1|  2.0|        3|        4|      5|
|        1|  2.0|        3|        4|      5|
|        4|  3.0|        1|        5|      7|
|        4|  3.0|        1|        5|      7|
|        4|  3.0|        1|        5|      7|
|        3|  6.0|        1|        3|      0|
|        3|  6.0|        1|        3|      0|
+---------+-----+---------+---------+-------+

Why does the output look like the above? It should have been similar to this:

+---------+-----+---------+-------------+--------------------+
|productId|price|saleEvent|    rivalName|             fetchTS|
+---------+-----+---------+-------------+--------------------+
|    12345|78.73|  Special|VistaCart.com|2017-05-11 15:39:30 |
+---------+-----+---------+-------------+--------------------+
Alper t. Turker
tryingSpark

1 Answer


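split takes a regular expression, and in a regex an unescaped | means alternation. The pattern "|" therefore matches the empty string between every pair of characters, which is why each line was split into single characters. Escape the pipe and use "\\|" instead of "|":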

x.map(x => x.split("\\|"))
  .map(y => Product(y(0).toInt, y(1).toDouble, y(2), y(3), y(4)))
  .toDF
  .show(false)

This should give you the correct result.
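
To see the difference without Spark, here is a minimal plain-Scala sketch. The object name `SplitDemo` and the literal `line` are assumptions made for illustration; the behaviour noted in the comments assumes Java 8 or later, where a zero-width match at the start of the input does not produce a leading empty string.

```scala
import java.util.regex.Pattern

object SplitDemo {
  // One record in the same shape as the sample data (hypothetical literal).
  val line = "12345|78.73|Special|VistaCart.com|2017-05-11 15:39:30"

  // "|" is a regex alternation of two empty patterns, so it matches the
  // empty string between characters and the line is cut into single characters.
  val broken: Array[String] = line.split("|")

  // Three ways to split on a literal pipe instead:
  val escaped: Array[String] = line.split("\\|")              // escape the metacharacter
  val quoted: Array[String] = line.split(Pattern.quote("|"))  // let the JDK quote it
  val byChar: Array[String] = line.split('|')                 // Char overload, no regex involved
}
```

The `Char` overload is often the least error-prone choice when the delimiter is a single character, since no regex escaping is needed.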

Also, if you want to end up with a DataFrame anyway, why not read the file directly with the CSV reader:

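import org.apache.spark.sql.Encoders  // needed for Encoders.product

spark.read
  .option("header", true)                    // first line holds the column names
  .option("delimiter", "|")                  // literal pipe, no regex escaping here
  .schema(Encoders.product[Product].schema)  // derive the schema from the case class
  .csv("testfile.txt")
  .as[Product]
  .show(false)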

Output:

+---------+-------+---------+---------------+--------------------+
|productId|price  |saleEvent|rivalName      |fetchTS             |
+---------+-------+---------+---------------+--------------------+
|12345    |78.73  |Special  |VistaCart.com  |2017-05-11 15:39:30 |
|12345    |45.52  |Regular  |ShopYourWay.com|2017-05-11 16:09:43 |
|12345    |89.52  |Sale     |MarketPlace.com|2017-05-11 16:07:29 |
|678      |1348.73|Regular  |VistaCart.com  |2017-05-11 15:58:06 |
|678      |1348.73|Special  |ShopYourWay.com|2017-05-11 15:44:22 |
|678      |1232.29|Daily    |MarketPlace.com|2017-05-11 15:53:03 |
|777      |908.57 |Daily    |VistaCart.com  |2017-05-11 15:39:01 |
+---------+-------+---------+---------------+--------------------+
koiralo
  • Thanks a lot for quick response and it worked. Awesome... – tryingSpark May 31 '18 at 07:39
  • Also, mapping the RDD to a Product object gives an error once fetchTS is declared as Timestamp. With `case class Product(productId:Int, price:Double, saleEvent:String, rivalName:String, fetchTS:Timestamp)`, running `x.map(x=>x.split("\\|")).map(y => Product(y(0).toInt,y(1).toDouble,y(2),y(3),y(4)))` fails with `:34: error: type mismatch; found : String required: java.sql.Timestamp`. I am searching on Google for how to convert to Timestamp – tryingSpark May 31 '18 at 07:45
  • You can't pass the String directly where a Timestamp is expected; you need to parse it as a date first and then convert – koiralo May 31 '18 at 07:48
  • Okay Thanks for your reply – tryingSpark May 31 '18 at 08:18
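
For the Timestamp question raised in the comments, one possible approach is sketched below in plain Scala. It assumes the field always has the form `yyyy-MM-dd HH:mm:ss`, which is what `java.sql.Timestamp.valueOf` accepts; the names `TimestampDemo` and `parseLine` are hypothetical and not part of the original thread.

```scala
import java.sql.Timestamp

object TimestampDemo {
  // Same case class as in the question, but with fetchTS typed as Timestamp.
  case class Product(productId: Int, price: Double, saleEvent: String,
                     rivalName: String, fetchTS: Timestamp)

  // Hypothetical helper: split one pipe-delimited line and convert each field.
  def parseLine(line: String): Product = {
    val f = line.split("\\|")
    // The sample data carries a trailing space after the timestamp, so trim
    // before parsing; valueOf expects "yyyy-[m]m-[d]d hh:mm:ss[.f...]".
    Product(f(0).toInt, f(1).toDouble, f(2), f(3), Timestamp.valueOf(f(4).trim))
  }
}
```

In spark-shell, the same conversion could replace the bare `y(4)` in the `map` call, for example `Timestamp.valueOf(y(4).trim)`. Lines whose timestamp does not match the expected format would throw an `IllegalArgumentException`, so real input may need extra handling.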