1

I have this PySpark code, to which I pass the column indexes as a list. I want to select the columns of a CSV file that correspond to these indexes:

import sys
from pyspark import SparkContext

def ml_test(input_col_index):

    sc = SparkContext(master='local', appName='test')

    # zipWithIndex() yields (line, rownum) pairs; index into the pair, since tuple-unpacking lambdas only work in Python 2
    inputData = sc.textFile('hdfs://localhost:/dir1').zipWithIndex().filter(lambda pair: pair[1] >= 0).map(lambda pair: pair[0])

if __name__ == '__main__':

    input_col_index = sys.argv[1] # For example - ['1','2','3','4']

    ml_test(input_col_index)

If I had a static, hardcoded set of columns to select from the CSV file above, I could do that, but here the indexes of the desired columns are passed in as a parameter. I also have to calculate the distinct length of each selected column. I know that for a single column this can be done with column_1 = inputData.map(lambda x: x[0]).distinct().collect(), but how do I do it for a set of columns that is not known in advance and is determined by the index list passed at runtime?

NOTE: I have to calculate the distinct length of the columns because I have to pass that length as a parameter to PySpark's RandomForest algorithm.

Jason Donnald

2 Answers

1

You can use list comprehensions.

# given a list of indices...
indices = [int(i) for i in input_col_index]

# select only those columns from each row (each row must already be split into a list of fields)
rdd = rdd.map(lambda x: [x[idx] for idx in indices])

# for each column, keep the longest value seen across all rows
longest_per_column = rdd.reduce(
    lambda x, y: [max(a, b, key=len) for a, b in zip(x, y)])

# get the length of the longest value in each column
print([len(x) for x in longest_per_column])

The reducing function takes two lists, loops over each of their values simultaneously, and creates a new list by selecting (for each column) whichever one was longer.
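
To see what the reducer does without a cluster, here is a minimal sketch that applies the same function with functools.reduce to a small in-memory list. The sample rows are made up for illustration; rdd.reduce combines rows pairwise in the same way.

```python
from functools import reduce

def longest_per_column(x, y):
    """For each column position, keep whichever of the two values is longer."""
    return [max(a, b, key=len) for a, b in zip(x, y)]

# Hypothetical rows, already split into fields and reduced to the selected columns.
rows = [
    ["a", "bbb", "cc"],
    ["dddd", "e", "ff"],
    ["g", "hh", "iiiii"],
]

# Fold the rows pairwise, as rdd.reduce would do across partitions.
longest = reduce(longest_per_column, rows)
lengths = [len(value) for value in longest]
print(longest, lengths)
```

When two values have the same length, max keeps the first one it sees, which does not affect the resulting lengths.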

UPDATE: To pass the lengths into RandomForest.trainRegressor, you can do something like this:

column_lengths = [len(x) for x in longest_per_column]

model = RandomForest.trainRegressor(
    categoricalFeaturesInfo=dict(enumerate(column_lengths)),
    maxBins=max(column_lengths),
    # ...
)
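
If "distinct length" means the number of distinct values in each column (categoricalFeaturesInfo maps a feature index to its number of categories), the counts can be collected with one set per column. The following is only a sketch in plain Python under that assumption: the sample rows and the names add_row, seen and distinct_counts are hypothetical, and the fold stands in for what would be an RDD aggregation in Spark.

```python
from functools import reduce

# Hypothetical sample rows, already split into fields (not the asker's data).
rows = [
    ["r1", "red", "small"],
    ["r2", "blue", "small"],
    ["r3", "red", "large"],
    ["r4", "green", "small"],
]
indices = [1, 2]  # column indexes chosen at runtime

# Keep only the requested columns from each row.
selected = [[row[i] for i in indices] for row in rows]

def add_row(seen, row):
    """Add each value of a row to the set of values seen for its column."""
    for column_values, value in zip(seen, row):
        column_values.add(value)
    return seen

# One set per selected column, filled in a single pass over the rows.
seen = reduce(add_row, selected, [set() for _ in indices])
distinct_counts = [len(s) for s in seen]

# Feature index -> number of categories, and a bin count large enough for the largest one.
categorical_features_info = dict(enumerate(distinct_counts))
max_bins = max(distinct_counts)
print(categorical_features_info, max_bins)
```

On an RDD, the closest equivalent to the question's own approach is rdd.map(lambda row: row[i]).distinct().count() for each selected position i, which runs one Spark job per column.
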
Galen Long
  • How can I pass the distinct length as you have shown to `RandomForest`? The standard way to pass the lengths to `RandomForest` algorithm in `Pyspark` is - `model = RandomForest.trainRegressor(label_points,categoricalFeaturesInfo={0:len(column1),1:len(column2),2:len(column3), ...}, numTrees=20, featureSubsetStrategy="auto", maxDepth=10, maxBins=max([len(column1),len(column2),len(column3),...]))`. For such a structure how can I pass the distinct lengths of each column to the model in a way as shown above? – Jason Donnald Apr 25 '16 at 04:42
  • @JasonDonnald I updated my answer. Does that make sense? – Galen Long Apr 25 '16 at 04:57
0

I would recommend this simple solution.

Assume we have the following CSV file structure [1]:

"TRIP_ID","CALL_TYPE","ORIGIN_CALL","ORIGIN_STAND","TAXI_ID","TIMESTAMP","DAY_TYPE","MISSING_DATA","POLYLINE"
"1372636858620000589","C","","","20000589","1372636858","A","False","[[-8.618643,41.141412],[-8.618499,41.141376]]"

Say you want to select only the columns CALL_TYPE, TIMESTAMP and POLYLINE. First you need to format your data, then just split it and select the columns you need. It's simple:

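# assumes an existing SparkContext named sc (e.g. in the pyspark shell)
raw_data = sc.textFile("data.csv")
# mark empty fields as NA, turn the "," separators into newlines, then strip the remaining quotes
callType_days = raw_data.map(lambda x: x.replace('""','"NA"').replace('","', '\n').replace('"','')) \
    .map(lambda x: x.split('\n')) \
    .map(lambda x: (x[1],x[5],x[8]))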

callType_days.take(2)

Results will be:

[(u'CALL_TYPE', u'TIMESTAMP', u'POLYLINE'),
 (u'C',
  u'1372636858',
  u'[[-8.618643,41.141412],[-8.618499,41.141376]]')]

Afterwards, it's really easy to work with structured data like this.
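
When the column indexes are only known at runtime, the same selection can be written against an index list, and Python's csv module can take care of the quoting. This is a minimal sketch rather than a drop-in replacement: select_columns is a hypothetical helper, and the two lines are the sample rows shown above.

```python
import csv

def select_columns(line, indices):
    """Parse one CSV line and return the fields at the given positions."""
    fields = next(csv.reader([line]))
    return tuple(fields[i] for i in indices)

header = ('"TRIP_ID","CALL_TYPE","ORIGIN_CALL","ORIGIN_STAND","TAXI_ID",'
          '"TIMESTAMP","DAY_TYPE","MISSING_DATA","POLYLINE"')
row = ('"1372636858620000589","C","","","20000589","1372636858","A","False",'
       '"[[-8.618643,41.141412],[-8.618499,41.141376]]"')

indices = [1, 5, 8]  # would come from the command line in practice

selected_header = select_columns(header, indices)
selected_row = select_columns(row, indices)
print(selected_header)
print(selected_row)
```

With Spark, the helper could be applied per line, for example raw_data.map(lambda line: select_columns(line, indices)). Empty fields come back as empty strings here instead of NA, and commas inside quoted fields such as POLYLINE are left intact.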

[1]: Taxi Service Trajectory - Prediction Challenge, ECML PKDD 2015 Data Set

Matus Cimerman
  • This doesn't really answer the question. The asker wanted to know how to select columns by index when the indices *weren't* hardcoded, and how to get the maximum lengths for each column. Also, it's best practice to use [csv.reader instead of split](http://stackoverflow.com/a/36408724/6157047) to parse csv data. – Galen Long Apr 30 '16 at 17:36