
I have a file in hdfs which is distributed across the nodes in the cluster.

I'm trying to get a random sample of 10 lines from this file.

In the pyspark shell, I read the file into an RDD using:

>>> textFile = sc.textFile("/user/data/myfiles/*")

Then I simply want to take a sample. The nice thing about Spark is that there are commands like takeSample. Unfortunately, I think I'm doing something wrong, because the following takes a really long time:

>>> textFile.takeSample(False, 10, 12345)

So I tried creating a partition on each node and then instructing each node to sample its own partition, using the following command:

>>> textFile.partitionBy(4).mapPartitions(lambda blockOfLines: blockOfLines.takeSample(False, 10, 1234)).first()

But this gives the error ValueError: too many values to unpack:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/serializers.py", line 117, in dump_stream
    for obj in iterator:
  File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/rdd.py", line 821, in add_shuffle_key
    for (k, v) in iterator:
ValueError: too many values to unpack

How can I sample 10 lines from a large distributed data set using Spark or PySpark?

maasg
mgoldwasser
  • I don't think this is an issue with spark, see http://stackoverflow.com/questions/7053551/python-valueerror-too-many-values-to-unpack – aaronman Jul 17 '14 at 14:26
  • @aaronman you're correct in the sense that the "too many values" error is definitely a python error. I'll add more details about the error message. My hunch is that there's something wrong with my pyspark code though - are you able to run this code successfully on your spark setup? – mgoldwasser Jul 17 '14 at 14:37
  • I only really use the scala spark API, I think the functional style of scala fits really well with Mapreduce in general – aaronman Jul 17 '14 at 14:58
  • @aaronman I'm open to a scala solution! – mgoldwasser Jul 17 '14 at 14:59
  • Sorry it took so long, I left another answer explaining why takeSample is so slow – aaronman Jul 17 '14 at 19:22
  • @aaronman makes a good point about using the Scala API :) Also his answer is spot on, you need to use `sample` and *specify a fraction*. – samthebest Jul 18 '14 at 10:41
  • @samthebest - I don't know if I'm missing something here, both python and scala are functional languages and spark has both a Python and Scala API. Is this anything more than a matter of preference? – mgoldwasser Jul 18 '14 at 14:14
  • @mgoldwasser I've added an answer to a question about the two APIs, hope it helps: http://stackoverflow.com/questions/17236936/api-compatibility-between-scala-and-python/24838915#24838915 – samthebest Jul 19 '14 at 09:41
  • Well, if you just want a look at the data (not a random sample) and the file is stored in HDFS, then you could run hdfs dfs -cat | head -n. I know this is not what you asked for, but it is a nice little trick. – iec2011007 Jan 27 '16 at 17:19

3 Answers


Try using textFile.sample(False, fraction, seed) instead (the first argument is false in the Scala API). takeSample will generally be very slow because it calls count() on the RDD. It needs the count because otherwise it could not take evenly from each partition: it uses the count together with the sample size you asked for to compute a fraction, and then calls sample internally. sample is fast because it just uses a random boolean generator that returns true with probability fraction, and thus doesn't need to call count.
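To make the "random boolean generator" idea concrete, here is a minimal plain-Python sketch. It is only an illustration of the principle, not Spark's actual implementation, and the name bernoulli_sample is made up for this example:

```python
import random

def bernoulli_sample(lines, fraction, seed):
    """Keep each element independently with probability `fraction`.

    Illustration only (hypothetical helper, not Spark code): one pass
    over the data, with no need to know the total number of elements.
    """
    rng = random.Random(seed)
    for line in lines:
        # rng.random() is uniform in [0, 1), so this is true with
        # probability `fraction`
        if rng.random() < fraction:
            yield line

# 100,000 fake lines, sampled at the fraction used elsewhere in this thread
lines = ("line %d" % i for i in range(100000))
picked = list(bernoulli_sample(lines, 0.0001, 12345))
print(len(picked))  # close to 10 on average, but not guaranteed to be 10
```

The size of the result varies from run to run (with different seeds), which is why sample cannot promise exactly 10 lines.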

In addition, although I don't think this is happening to you, if the sample returned is not big enough, takeSample calls sample again, which can obviously slow it down. Since you should have some idea of the size of your data, I would recommend calling sample and then cutting the sample down to size yourself, since you know more about your data than Spark does.
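A rough sketch of "sample, then cut down to size yourself" could look like the following. The helper name sample_then_trim is hypothetical; it only assumes the object passed in has the sample(withReplacement, fraction, seed) and collect() methods of a PySpark RDD:

```python
import random

def sample_then_trim(rdd, fraction, n, seed):
    """Sample roughly `fraction` of the RDD, then trim to n rows locally.

    Hypothetical helper: `rdd` is assumed to behave like a PySpark RDD,
    i.e. to offer sample(withReplacement, fraction, seed) and collect().
    """
    # One pass over the data; the result should be small enough to
    # bring back to the driver if `fraction` was chosen sensibly.
    candidates = rdd.sample(False, fraction, seed).collect()
    if len(candidates) <= n:
        # Fraction was too small: return what we have and let the
        # caller decide whether to retry with a larger fraction.
        return candidates
    # Trim uniformly at random on the driver.
    return random.Random(seed).sample(candidates, n)
```

In the pyspark shell this would be called as sample_then_trim(textFile, 0.0001, 10, 12345). The trimming is done with a local random choice rather than take(n), because take(n) returns the first n sampled rows in partition order and so leans toward the start of the data.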

aaronman
  • This is a bit strange. Counting is not a slow operation - It's ~2 orders of magnitude faster than takeSample, suggesting this is not the core issue. – nbubis Aug 02 '16 at 05:38

Using sample instead of takeSample appears to make things reasonably fast:

textFile.sample(False, .0001, 12345)

The problem with this is that it's hard to know the right fraction to choose unless you have a rough idea of the number of rows in your data set.
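If a rough row count is available (for example, file size divided by a typical line length), one way to pick the fraction is to aim for an expected sample size somewhat larger than the number of rows wanted. The sketch below is a heuristic under a normal approximation; the function name fraction_for and the default margin z=5.0 are assumptions of this example, not anything Spark provides:

```python
import math

def fraction_for(n, approx_rows, z=5.0):
    """Heuristic fraction so a sample of ~approx_rows rows very likely has >= n rows.

    Hypothetical helper. It picks an expected sample size m such that
    m - z * sqrt(m) == n, i.e. n sits z standard deviations below the
    mean (treating the sample size as roughly Poisson with mean m).
    """
    if approx_rows <= 0:
        raise ValueError("approx_rows must be positive")
    # Solve m - z*sqrt(m) - n = 0 for sqrt(m), then square.
    sqrt_m = (z + math.sqrt(z * z + 4.0 * n)) / 2.0
    expected_size = sqrt_m * sqrt_m
    # A fraction can never exceed 1.0 when sampling without replacement.
    return min(1.0, expected_size / approx_rows)

# About a million rows, 10 rows wanted
print(fraction_for(10, 1000000))
```

The result is then passed as the second argument to sample, and the oversized sample is trimmed to the exact number of rows afterwards. If the row estimate is badly off, the sample can still come back too small, so checking its length remains necessary.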

gsamaras
mgoldwasser

Different Types of Sample in PySpark

Randomly sample % of the data with and without replacement

import pyspark.sql.functions as F
#Randomly sample 50% of the data without replacement
sample1 = df.sample(False, 0.5, seed=0)

#Randomly sample 50% of the data with replacement
#(stored under a separate name so sample1 above is not overwritten)
sample1_with_replacement = df.sample(True, 0.5, seed=0)

#Take another sample excluding records from previous sample using Anti Join
sample2 = df.join(sample1, on='ID', how='left_anti').sample(False, 0.5, seed=0)

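#Take another sample excluding records from previous sample using Where
#(collect() pulls the sampled IDs to the driver, so this only suits small samples)
sample1_ids = [row['ID'] for row in sample1.select('ID').collect()]
sample2 = df.where(~F.col('ID').isin(sample1_ids)).sample(False, 0.5, seed=0)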

#Generate a stratified sample of the data across column(s)
#Sampling is probabilistic and thus cannot guarantee an exact number of rows
fractions = {
    'NJ': 0.5, #Take about 50% of records where state = NJ
    'NY': 0.25, #Take about 25% of records where state = NY
    'VA': 0.1, #Take about 10% of records where state = VA
}
#The column is passed by name, which works across PySpark versions
stratified_sample = df.sampleBy('state', fractions, seed=0)
Yash M