users_grpd = pairs.groupByKey()

users_grpd_flattened = users_grpd.map(
    lambda (k, vals): "{0} {1}".format(k, ' '.join(str(x) for x in vals)))

The first column is the userid and the rest of the columns are product ids. I would now like to sort the product ids per user; the number of products per user is not fixed and will vary. Is there a way to efficiently sort the product ids per user? Here is what users_grpd_flattened looks like:

userid   product ids

30095212 208518 10519 208520 120821
3072220 20506 205037
209212 208518 10519 208520 120821
100222 20506 205037 10519 208520 120821 20116  124574 102575
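For reference, here is a minimal self-contained version of the pipeline above (a rough sketch with toy data; it assumes an existing SparkContext `sc`):

pairs = sc.parallelize([
    (30095212, 208518), (30095212, 10519), (30095212, 208520), (30095212, 120821),
    (3072220, 20506), (3072220, 205037),
])

users_grpd = pairs.groupByKey()

users_grpd_flattened = users_grpd.map(
    lambda kv: "{0} {1}".format(kv[0], ' '.join(str(x) for x in kv[1])))

for line in users_grpd_flattened.collect():
    print(line)  # one "userid product_id product_id ..." line per user, values not sorted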

1 Answer


You can use `mapValues` with `sorted`.

users_grpd.mapValues(sorted)

When you use `mapValues`, the input partitioning is preserved, so there is no shuffling involved; the most expensive and potentially dangerous operation is the preceding `groupByKey`.
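A quick way to see this (a rough sketch; it assumes the `pairs` RDD defined in the example below, and the variable names and partition count are illustrative):

grouped = pairs.groupByKey(numPartitions=4)   # the only shuffle in the pipeline
grouped_sorted = grouped.mapValues(sorted)    # plain per-record map, no shuffle

# the partitioner set by groupByKey should still be attached afterwards
print(grouped_sorted.partitioner is not None)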

Check that everything works as expected (`is_sorted` is taken from @WaiYipTung's answer):

def is_sorted(l):
    # True when every element is <= the next one (Python 2: xrange)
    return all(l[i] <= l[i+1] for i in xrange(len(l)-1))

pairs = sc.parallelize([
    (30095212, 208518), (30095212, 10519), (30095212, 208520), 
    (30095212, 120821), (3072220, 20506), (3072220, 205037),
    (209212, 208518), (209212, 10519), (209212, 208520), (209212, 120821),
    (100222, 20506), (100222, 205037), (100222, 10519), (100222, 208520),
    (100222, 120821), (100222, 20116), (100222, 124574), (100222, 102575),
    (87620, 12012851), (87620, 12022661), (87620, 12033827), (87620, 1205376)
])

users_grpd_with_sorted_vals = pairs.groupByKey().mapValues(sorted)

Some checks

>>> all(users_grpd_with_sorted_vals.values().map(is_sorted).collect())
True
>>> users_grpd_with_sorted_vals.lookup(87620)
[[1205376, 12012851, 12022661, 12033827]]
  • Thanks. Is there any recommended way of avoiding `groupByKey`? Basically I need to group: `users_projected = sqlContext.sql("SELECT user, prod_ids FROM infotable")`, `pairs = distinct_users_projected.map(lambda x: (x.user, x.prod_ids))`, `users_grpd = pairs.groupByKey()` – user3803714 Jul 19 '15 at 13:43
  • If `groupByKey` is followed by some kind of reduce operation that reduces the size of the data, you should consider using `reduceByKey`, `combineByKey` or `aggregateByKey` (you can read more [here](http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html)), but here there is nothing to gain by that. If the output specification is not fixed, an alternative is to simply partition by key, sort, and output (`userid`, `product_id`) pairs; a rough sketch of that alternative follows these comments. – zero323 Jul 19 '15 at 13:53
  • Unfortunately, `users_grpd.mapValues(sorted)` is not working. For example, here is a row after that operation: `100876220 12012850 12022660 12033837 1205366` – user3803714 Jul 19 '15 at 14:17
  • What does "not working" mean? If there is some issue, please provide an example input and the expected output. – zero323 Jul 19 '15 at 14:19
  • If you look at the output that I just sent: user id `87620`, product ids `12012851 12022661 12033827 1205376`. The product ids should be sorted, but are not. – user3803714 Jul 19 '15 at 14:19
  • Looks OK to me. Please take a look at the example I've added to the answer. – zero323 Jul 19 '15 at 16:19
  • I am glad to hear that. If it solved your problem please accept the answer :) – zero323 Jul 19 '15 at 17:17
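
For completeness, here is a rough sketch of the "partition by key, sort, and output pairs" alternative mentioned in the comments. It assumes the `pairs` RDD from the answer; the name `sorted_pairs` and the partition count are illustrative choices, not part of the original answer:

from pyspark.rdd import portable_hash

sorted_pairs = (pairs
    # use the full (userid, product_id) tuple as the sort key
    .map(lambda kv: ((kv[0], kv[1]), None))
    # hash only the userid so all products of a user land in one partition,
    # then sort each partition by (userid, product_id)
    .repartitionAndSortWithinPartitions(
        numPartitions=4,
        partitionFunc=lambda key: portable_hash(key[0]))
    # back to plain (userid, product_id) pairs, now grouped and ordered per user
    .map(lambda kv: kv[0]))

Each partition then contains contiguous, sorted (userid, product_id) pairs, so they can be written out directly without materialising a full product list per user in memory.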