
I have an RDD of logged events and I want to take a few samples from each category.

The data looks like this:

|xxx|xxxx|xxxx|type1|xxxx|xxxx
|xxx|xxxx|xxxx|type2|xxxx|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type3|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type3|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type4|xxxx|xxxx|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type1|xxxx|xxxx
|xxx|xxxx|xxxx|type6|xxxx

My attempt:

eventlist = ['type1', 'type2'....]
orginalRDD = sc.textFile("/path/to/file/*.gz").map(lambda x: x.split("|"))

samplelist = []
for event in eventlist:
    # take(5) already returns a local list, so no collect() is needed
    eventsample = orginalRDD.filter(lambda x: x[3] == event).take(5)
    samplelist.extend(eventsample)

print(samplelist)

I have two questions on this:
1. Is there a better/more efficient way to collect samples based on a specific condition?
2. Is it possible to collect the unsplit lines instead of the split lines?

Python or Scala suggestions are welcome!

WoodChopper

1 Answer


If the sample doesn't have to be random, something like this should work just fine:

n = ...  # Number of elements you want to sample per key
pairs = orginalRDD.map(lambda x: (x[3], x))

pairs.aggregateByKey(
    [],  # zero value: an empty list per key
    lambda acc, x: (acc + [x])[:n],  # add a new value and trim to n elements
    lambda acc1, acc2: (acc1 + acc2)[:n])  # combine two accumulators and trim
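To pull the samples back to the driver you can simply collect the aggregated pairs, and if you want the unsplit lines (your second question) you can key the raw line instead of the split row. A minimal sketch, assuming the same n, path, and field index as above:

# Hedged sketch: materialize up to n sampled rows per event type as a dict
samples_by_type = pairs.aggregateByKey(
    [],
    lambda acc, x: (acc + [x])[:n],
    lambda acc1, acc2: (acc1 + acc2)[:n]
).collectAsMap()  # {event type: [up to n rows]}

# To keep the original, unsplit lines, pair each raw line with its type field
raw_pairs = sc.textFile("/path/to/file/*.gz") \
    .map(lambda line: (line.split("|")[3], line))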

Getting a random sample is a little bit harder. One possible approach is to add a random value and sort before aggregation:

import os
import random

def add_random(records):
    # Seed each partition independently so partitions don't all produce the same sequence
    seed = int(os.urandom(4).encode('hex'), 16)
    rs = random.Random(seed)
    for x in records:
        yield (rs.random(), x)

(pairs
    .mapPartitions(add_random)
    .sortByKey()
    .values()
    .aggregateByKey(
        [],
        lambda acc, x: (acc + [x])[:n],
        lambda acc1, acc2: (acc1 + acc2)[:n]))
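The result is again a pair RDD keyed by event type, so it can be collected the same way as before. A short, hedged usage sketch, where random_samples is a hypothetical name bound to the pipeline above:

# random_samples: a hypothetical binding for the mapPartitions/sortByKey/aggregateByKey pipeline above
random_sample_dict = random_samples.collectAsMap()  # {event type: [up to n randomly chosen rows]}
for event_type, rows in random_sample_dict.items():
    print(event_type, len(rows))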

For a DataFrame-specific solution, see Choosing random items from a Spark GroupedData Object.

zero323
  • Great, let me try this, thanks. Does it aggregate on x[3] across the complete RDD? Because the RDD is created from 1 TB of S3 gzip-compressed files. – WoodChopper Dec 09 '15 at 17:19