1

New to spark here and I'm trying to read a pipe delimited file in spark. My file looks like this:

user1|acct01|A|Fairfax|VA
user1|acct02|B|Gettysburg|PA
user1|acct03|C|York|PA
user2|acct21|A|Reston|VA
user2|acct42|C|Fairfax|VA
user3|acct66|A|Reston|VA

and I do the following in scala:

scala> case class Accounts (usr: String, acct: String, prodCd: String, city: String, state: String)
defined class Accounts

scala> val accts = sc.textFile("accts.csv").map(_.split("|")).map(
     | a => (a(0), Accounts(a(0), a(1), a(2), a(3), a(4)))
     | )

I then try to group the key value pair by the key, and this is not sure if I'm doing this right...is this how I do it?

scala> accts.groupByKey(2)
res0: org.apache.spark.rdd.RDD[(String, Iterable[Accounts])] = ShuffledRDD[4] at groupByKey at <console>:26

I thought the (2) is to give me the first two results back but I don't seem to get anything back at the console...

If I run a distinct...I get this too..

scala> accts.distinct(1).collect(1)
<console>:26: error: type mismatch;
 found   : Int(1)
 required: PartialFunction[(String, Accounts),?]
              accts.distinct(1).collect(1)

EDIT: Essentially I'm trying to get to a key value pair nested mapping. For example, user1 would looke like this:

user1 | {'acct01': {prdCd: 'A', city: 'Fairfax', state: 'VA'}, 'acct02': {prdCd: 'B', city: 'Gettysburg', state: 'PA'}, 'acct03': {prdCd: 'C', city: 'York', state: 'PA'}}

trying to learn this step by step so thought I'd break it down into chunks to understand...

lightweight
  • 3,227
  • 14
  • 79
  • 142

2 Answers2

0

I think you might have better luck if you put your data into a DataFrame if you've already gone through the process of defining a schema. First off, you need to modify the split comment to use single quotes. (See this question). Also, you can get rid of the a(0) in the beginning. Then, converting to a DataFrame is trivial. (Note that DataFrames are available on spark 1.3+.)

val accts = sc.textFile("/tmp/accts.csv").map(_.split('|')).map(a => Accounts(a(0), a(1), a(2), a(3), a(4)))
val df = accts.toDF()

Now df.show produces:

+-----+------+------+----------+-----+
|  usr|  acct|prodCd|      city|state|
+-----+------+------+----------+-----+
|user1|acct01|     A|   Fairfax|   VA|
|user1|acct02|     B|Gettysburg|   PA|
|user1|acct03|     C|      York|   PA|
|user2|acct21|     A|    Reston|   VA|
|user2|acct42|     C|   Fairfax|   VA|
|user3|acct66|     A|    Reston|   VA|
+-----+------+------+----------+-----+

It should be easier for you to work with the data. For example, to get a list of the unique users:

df.select("usr").distinct.collect()

produces

res42: Array[org.apache.spark.sql.Row] = Array([user1], [user2], [user3])

For more details, check out the docs.

Community
  • 1
  • 1
santon
  • 4,395
  • 1
  • 24
  • 43
  • so I went through those docs but how do I get groupBy to work? I've tried df.groupBy("usr").collect(), df.select(),groupby().collect, and various other ways but can't get it to work... – lightweight Aug 13 '15 at 12:14
  • What is it that you're trying to do? I think there's maybe some confusion over what the `groupBy` operator does. You need to apply some kind of aggregation so that Spark knows how to aggregate all of the records corresponding to a given key. For instance, `df.groupBy("usr").count().collect()` will give you the number of records corresponding to each distinct user. You can look at the [API docs](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedData) to see what kinds of functions you can use to aggregate records. – santon Aug 13 '15 at 16:44
  • just made an edit to show the end goal...essentially trying to take the csv file and convert it to a key value pair mapping... – lightweight Aug 14 '15 at 14:39
0

3 observations that may help you understand the problem:

1) groupByKey(2) does not return first 2 results, the parameter 2 is used as number of partitions for the resulting RDD. See docs.

2) collect does not take Int parameter. See docs.

3) split takes 2 types of parameters, Char or String. String version uses Regex so "|" needs escaping if intended as literal.

Shyamendra Solanki
  • 8,751
  • 2
  • 31
  • 25
  • I'm think I'm still confused by this...I see groupByKey(K, V) and groupByKey(Partition). How do I just simply pull the results back into a list. I think I have to use groupByKey, right? But do I use partition or K, V? – lightweight Aug 13 '15 at 12:01