
I have a Spark data frame which I want to divide into train, validation and test sets in the ratio 0.60, 0.20, 0.20.

I used the following code:

def data_split(x):
    # Convert the Row to a dict, look up the broadcast item map,
    # and tag the record with a randomly chosen split label.
    global data_map_var
    d_map = data_map_var.value
    data_row = x.asDict()
    import random
    rand = random.uniform(0.0, 1.0)
    if rand <= 0.6:
        return (data_row['TRANS'], d_map[data_row['ITEM']], data_row['Ratings'], 'train')
    elif rand <= 0.8:
        return (data_row['TRANS'], d_map[data_row['ITEM']], data_row['Ratings'], 'test')
    else:
        return (data_row['TRANS'], d_map[data_row['ITEM']], data_row['Ratings'], 'validation')


split_sdf = ratings_sdf.map(data_split)
train_sdf = split_sdf.filter(lambda x: x[-1] == 'train').map(lambda x: (x[0], x[1], x[2]))
test_sdf = split_sdf.filter(lambda x: x[-1] == 'test').map(lambda x: (x[0], x[1], x[2]))
validation_sdf = split_sdf.filter(lambda x: x[-1] == 'validation').map(lambda x: (x[0], x[1], x[2]))

print "Total Records in Original Ratings RDD is {}".format(split_sdf.count())
print "Total Records in training data RDD is {}".format(train_sdf.count())
print "Total Records in validation data RDD is {}".format(validation_sdf.count())
print "Total Records in test data RDD is {}".format(test_sdf.count())

#help(ratings_sdf)
Total Records in Original Ratings RDD is 300001
Total Records in training data RDD is 180321
Total Records in validation data RDD is 59763
Total Records in test data RDD is 59837

My original data frame is ratings_sdf, which I pass to a mapper function that does the splitting.

If you check, the counts of train, validation and test do not add up to the split (original ratings) count, and these numbers change on every run of the code.

Where are the remaining records going, and why don't the counts add up?

Baktaawar

1 Answer


TL;DR If you want to split a DataFrame, use the randomSplit method:

ratings_sdf.randomSplit([0.6, 0.2, 0.2])
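
For example, a minimal sketch (the seed and variable names here are illustrative, not part of the original question):

train_sdf, validation_sdf, test_sdf = ratings_sdf.randomSplit([0.6, 0.2, 0.2], seed=42)

The resulting splits are disjoint and together cover the input, so their counts should add up to the original count.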

Your code is just wrong on multiple levels, but there are two fundamental problems that make it broken beyond repair:

  • Spark transformations can be evaluated an arbitrary number of times, so the functions you use should be referentially transparent and side-effect free. Your code evaluates split_sdf multiple times, and data_split uses a stateful RNG, so each evaluation produces different results (see the sketch after this list).

    This results in the behavior you describe, where each child sees a different state of the parent RDD.

  • You don't properly initialize the RNG, and as a consequence the random values you get are not independent.
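
To see the first problem in isolation: a rough, illustrative workaround (not a recommendation over randomSplit) is to force a single evaluation by caching the mapped RDD before filtering, so all three children read the same materialized labels:

# Illustrative sketch only: cache pins down one evaluation of the random labels,
# but it is best-effort - evicted partitions can still be recomputed.
split_sdf = ratings_sdf.map(data_split).cache()
split_sdf.count()  # materialize before filtering

train_sdf = split_sdf.filter(lambda x: x[-1] == 'train')
test_sdf = split_sdf.filter(lambda x: x[-1] == 'test')
validation_sdf = split_sdf.filter(lambda x: x[-1] == 'validation')

With the labels fixed this way the three counts should sum to the total, but randomSplit remains the simpler and safer option.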

zero323
  • I have only 11 values and I split on [0.6, 0.3, 0.1], but it divides them as [6, 5, 0] or [8, 3, 0]. I don't want a zero, since 11 can still be divided as [6, 3, 2]. Is there any way to make sure no split ends up empty after dividing into train, test and validation? – vipin Jun 28 '18 at 13:50
  • 2
    @vipin Other than calling `count` on each item? Not really. But if you have 11 records, using Spark doesn't make much sense anyway, and if you want specific distribution you can always split local data and parallelize it later. On "real" size data it is normally not an issue - [given how `randomSplit` works](https://stackoverflow.com/q/32933143) it is very unlikely you'll get empty splits with relatively large and balanced fractions like these.. – zero323 Jun 29 '18 at 19:05
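
As a rough sketch of that last suggestion (assuming a SparkSession named spark and a dataset small enough to collect on the driver - both assumptions, not part of the thread):

import random

rows = ratings_sdf.collect()  # only reasonable because the data is tiny
random.shuffle(rows)
n = len(rows)
train_rows = rows[:int(0.6 * n)]
valid_rows = rows[int(0.6 * n):int(0.9 * n)]
test_rows = rows[int(0.9 * n):]
train_sdf = spark.createDataFrame(train_rows, ratings_sdf.schema)  # same for valid/test

For n = 11 this gives splits of size 6, 3 and 2, so no split is empty.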