I am trying to create a sampled dataset from my current dataset. I tried two different ways and they produce different results. Each sampled row should contain an integer and a string (e.g. [5,unprivate], [1,hiprivate]). The first way gives me two strings per row (e.g. [private,private], [unprivate,hiprivate]). The second way gives me the correct output.

Why are they producing two totally different datasets?

dataset

5,unprivate
1,private
2,hiprivate

ingest data

from pyspark import SparkContext

sc = SparkContext()
INPUT = "./dataset"

def parse_line(line):
    bits = line.split(",")
    return bits

df = sc.textFile(INPUT).map(parse_line)

1st way - outputs something like [[u'unprivate', u'unprivate'], [u'unprivate', u'unprivate']]

#1st way
columns = df.first()
new_df = None
for i in range(0, len(columns)):
    column = df.sample(withReplacement=True, fraction=1.0).map(lambda row: row[i]).zipWithIndex().map(lambda e: (e[1], [e[0]]))
    if new_df is None:
        new_df = column
    else:
        new_df = new_df.join(column)
        new_df = new_df.map(lambda e: (e[0], e[1][0] + e[1][1]))
new_df = new_df.map(lambda e: e[1])
print new_df.collect()

2nd way - outputs something like [(0, [u'5', u'unprivate']), (1, [u'1', u'unprivate']), (2, [u'2', u'private'])]

#2nd way
new_df = df.sample(withReplacement=True, fraction=1.0).map(lambda row: row[0]).zipWithIndex().map(lambda e: (e[1], [e[0]]))
new_df2 = df.sample(withReplacement=True, fraction=1.0).map(lambda row: row[1]).zipWithIndex().map(lambda e: (e[1], [e[0]]))

new_df = new_df.join(new_df2)
new_df = new_df.map(lambda e: (e[0], e[1][0] + e[1][1]))
print new_df.collect()

I am trying to figure out the unisample function on page 62 of http://info.mapr.com/rs/mapr/images/Getting_Started_With_Apache_Spark.pdf

collarblind
  • What is the exact reason you believe the 2 results should be the same? – desertnaut Jan 09 '18 at 11:39
  • Not exactly the same, but the first element should come from the first column and the second element from the second. The logic is to sample within each column. – collarblind Jan 09 '18 at 21:53

1 Answer


This has to do with how Spark executes code. Try putting this print statement in your code in the first example:

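for i in range(0, len(columns)):
    # Inspect what has been built so far before processing column i
    if new_df is not None:
        print(new_df.take(1))
    # ... rest of the loop body from the question, unchanged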

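Spark evaluates transformations lazily: each map() in the loop only records its lambda, and nothing runs until an action such as take() or collect() is called. The lambda in the loop, lambda row: row[i], refers to the variable i rather than to the value i had when the lambda was created. By the time the job actually runs, i has already moved on, so Spark effectively executes every iteration as if it were the last one. That is why, when you start the loop for the 2nd column, new_df should already show values taken from the 2nd column, and why both elements of each row end up coming from it.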

Use the approach from your second example, where each lambda uses a literal column index (row[0], row[1]) instead of the loop variable.
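
The same effect can be reproduced without Spark. The following is a minimal pure-Python sketch, not Spark code: the helper names build_late and build_bound are hypothetical, and the rows are simply the three lines of the question's dataset. It builds one lambda per column in a loop and calls the lambdas only after the loop has finished, which is roughly what happens when an action finally runs the recorded transformations.

```python
# Rows as produced by parse_line in the question (all values are strings).
rows = [["5", "unprivate"], ["1", "private"], ["2", "hiprivate"]]


def build_late(n_cols):
    """One getter per column; each lambda looks up `i` only when called."""
    getters = []
    for i in range(n_cols):
        getters.append(lambda row: row[i])
    return getters


def build_bound(n_cols):
    """Same loop, but the default argument freezes the current value of `i`."""
    getters = []
    for i in range(n_cols):
        getters.append(lambda row, i=i: row[i])
    return getters


# The getters are called only now, after both loops have finished,
# so every late-bound lambda sees the last value of `i` (here 1).
late_result = [[g(row) for g in build_late(2)] for row in rows]
bound_result = [[g(row) for g in build_bound(2)] for row in rows]

print(late_result)
print(bound_result)
```

Here late_result repeats the second column in both positions, while bound_result reproduces the original rows. Assuming the loop from the first example is otherwise kept as is, writing the lambda as lambda row, i=i: row[i] should bind the column index in the same way.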

OneCricketeer
brandomr