
I am new to Scala.

I have a csv file stored in HDFS. I am reading that file in Scala using

 val salesdata = sc.textFile("hdfs://localhost:9000/home/jayshree/sales.csv")

Here is a small sample of the "sales" data.

C_ID    T_ID    ITEM_ID ITEM_Price
5       199        1       500
33      235        1       500
20      249        3       749
35      36         4       757
19      201        4       757
17      94         5       763
39      146        5       763
42      162        5       763
49      41         6       824
3       70         6       824
24      161        6       824
48      216        6       824

I have to perform the following operations on it.

1. Apply a discount to each item's price d (the ITEM_Price column), say a 30% discount. The formula is d = d - 0.30*d.

2. Find the customer-wise minimum and maximum item value after applying the 30% discount to each item.

I tried to multiply the values of the ITEM_Price column by 30. The problem is that the value of d is read as a string, so when I multiply it by a number, the result is the value repeated that many times (e.g. 500*3 = 500500500).
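For example, this is what happens in the Scala REPL, because * on a String repeats it:

scala> "500" * 3   // String * Int repeats the string rather than multiplying
res0: String = 500500500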

I know I can convert it into a DataFrame and do it there. But I just want to know: without converting it into a DataFrame, can we do these operations on an RDD?

Jaishree Rout
  • I have tried to extract the column as an Array[Int]. But then if I do any arithmetic operation on each element of the array, it throws an error like "* is not a member of Array[Int]". I don't need the whole code for this; I just need a guide to get the result. What are the steps I need to follow? – Jaishree Rout Mar 22 '17 at 21:32

2 Answers


Discount

case class Transaction(cId: Int, tId: Int, itemId: Int, itemPrice: Double)
  1. Starting from salesdata: RDD[String], map the RDD: within the map, split each line by your separator and convert the resulting Array into the Transaction case class, calling arr(i).toInt (and arr(3).toDouble for the price) to cast the fields. In this step your target is to get an RDD[Transaction].
  2. Map the RDD again, copying each transaction with the discount applied (t => t.copy(itemPrice = 0.7 * t.itemPrice)).
  3. You will have a new RDD[Transaction] (see the sketch below).
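A minimal sketch of those two steps, assuming a comma separator and that the header line has already been removed from salesdata:

import org.apache.spark.rdd.RDD

// salesdata: RDD[String] as read in the question, header line removed
val transactions: RDD[Transaction] = salesdata.map { line =>
  val f = line.split(",")
  Transaction(f(0).toInt, f(1).toInt, f(2).toInt, f(3).toDouble)
}

// Copy each transaction with the 30% discount applied
val discounted: RDD[Transaction] = transactions.map(t => t.copy(itemPrice = 0.7 * t.itemPrice))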

Customer wise

  1. Take the last RDD and apply keyBy(_.cId) to get an RDD[(Int, Transaction)] where your key is the client.
  2. Reduce by key, adding the prices for each item. Goal => RDD[(Int, Double)], where you get the total for each client.
  3. Find your target clients! (A sketch of this part follows below.)
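A sketch of the customer-wise part, continuing from the discounted RDD above; reduceByKey with + gives each client's total, and swapping in math.min / math.max gives the customer-wise minimum and maximum the question asks for:

// Key each transaction by customer id: RDD[(Int, Transaction)]
val byCustomer = discounted.keyBy(_.cId)

// Keep only the discounted price, then combine per customer
val prices = byCustomer.mapValues(_.itemPrice)

val totalPerCustomer = prices.reduceByKey(_ + _)                     // RDD[(Int, Double)]: total per client
val minPerCustomer   = prices.reduceByKey((a, b) => math.min(a, b))  // customer-wise minimum
val maxPerCustomer   = prices.reduceByKey((a, b) => math.max(a, b))  // customer-wise maximum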
gasparms
  • I have used this code to convert the RDD: val discount = salesdata.map( str => str.split(",")).map( c => (c(0).toInt, c(1).toInt, c(2).toInt, c(3).toDouble)). When I use the case class instead, I am getting an error. – Jaishree Rout Mar 23 '17 at 21:38
  • salesdata.map( str => str.split(",")).map( c => Transaction(c(0).toInt, c(1).toInt, c(2).toInt, c(3).toDouble)) What is your error? – gasparms Mar 24 '17 at 08:22
  • Caused by: java.lang.NumberFormatException: For input string: "C_ID" – Jaishree Rout Mar 26 '17 at 17:03
  • Can you help me? When I run the code below I am not getting any error, but when I try to save the RDD as a text file I am getting an error. I don't understand where the code is wrong. 1. val salesdata = sc.textFile("hdfs://localhost:9000/home/jayshree/sales.csv") 2. val salesdata_arr = salesdata.map( str => str.split(",")).map( c => (c(0).toInt, c(1).toInt, c(2).toInt, c(3).toDouble)) 3. salesdata_arr.saveAsTextFile("hdfs://localhost:9000/home/jayshree/discount_output") – Jaishree Rout Mar 26 '17 at 17:18
  • The problem is that your csv has a header line, so you cannot convert the string "C_ID" to an integer. You can use the spark-csv library with Spark SQL, or remove the header: http://stackoverflow.com/questions/27854919/how-to-skip-header-from-csv-files-in-spark – gasparms Mar 27 '17 at 11:45
  • I am sorry for the late reply. Thank you, gasparms, for your time in making me understand this. You are right. I removed the column names, repeated the steps from the beginning, and it worked. – Jaishree Rout Apr 07 '17 at 14:07
  • You're welcome. Vote the answer if it helped you :). – gasparms Apr 09 '17 at 08:27

Since you want more of a guide, let's look at this outside of Spark for a second and think about things as typical Scala collections.

Your data would look like this:

val data = Array(
        (5, 199, 1, 500),
        (33, 235, 1, 500),
...
)

I think you will have no trouble mapping your salesdata RDD of strings to an RDD of Arrays or Tuple4s using split or a regular expression or something.
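For instance, a split-based mapping might look like this (a sketch, assuming a comma separator and that the header line has been filtered out; it produces the same shape as the Array above):

// salesdata: RDD[String] from the question
val data = salesdata.map { line =>
  val f = line.split(",")
  (f(0).toInt, f(1).toInt, f(2).toInt, f(3).toInt)   // (cId, tId, itemId, itemPrice)
}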

Let's go with a tuple. Then you can do this:

data.map {
  case (cId, tId, item, price) => (cId, tId, item, price * 0.7)
}

That maps the original RDD of tuples to another RDD of tuples where the last value of each tuple, the price, is reduced by 30%. So each element of the result is a Tuple4[Int, Int, Int, Double].

To be honest, I don't know what you mean by customer-wise min and max, but maybe it is something like this:

data.map {
  case (cId, tId, item, price) => (cId, tId, item, price * 0.7)
}.groupBy(_._1)
.mapValues { tuples =>
  val discountedPrices = tuples.map(_._4)
  (discountedPrices.min, discountedPrices.max)
}

First, I do a groupBy, which produces a Map from cId (the first value in the tuple, which explains the ._1) to a collection of the full tuples: a Map of cId to the collection of rows pertaining to that cId. In Spark, this would produce a PairRDD.

Map and PairRDD both have a mapValues function, which allows me to preserve the keys (the cIds) while transforming each collection of tuples. In this case, I simply map each collection to a collection of discounted prices by taking the fourth item of each tuple, the discounted price. Then I call min and max on that collection and return a tuple of those values.

So the result is a Map of customer ID to a tuple of the min and max of the discounted prices. The beauty of the RDD API is that it follows the conventional Scala collection API so closely, so it is basically the same thing.
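To make that parallel concrete, here is essentially the same pipeline written against the RDD of tuples parsed above (a sketch; on an RDD, groupBy likewise returns a pair RDD of cId to an Iterable of rows):

import org.apache.spark.rdd.RDD

// data: RDD[(Int, Int, Int, Int)] parsed from salesdata as sketched earlier
val minMaxPerCustomer: RDD[(Int, (Double, Double))] = data
  .map { case (cId, tId, item, price) => (cId, tId, item, price * 0.7) }
  .groupBy(_._1)                          // RDD[(Int, Iterable[(Int, Int, Int, Double)])]
  .mapValues { tuples =>
    val discountedPrices = tuples.map(_._4)
    (discountedPrices.min, discountedPrices.max)
  }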

Vidya