
I have a dataset and I wanted to test different classifiers in parallel using Spark with Python. For example, if I want to test a Decision Tree and a Random Forest, how could I run them in parallel?

I have tried a few approaches but I keep getting:

cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I was trying to do this (which had worked well using scikit-learn's classifiers instead of Spark's):

from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

def apply_classifier(clf, train_dataset, test_dataset):
    model = clf.fit(train_dataset)

    predictions = model.transform(test_dataset)

    evaluator = BinaryClassificationEvaluator()
    evaluator.evaluate(predictions)

    return [(model, predictions)]

...

dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)

rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

classifiers = [dt, rf]

sc.parallelize(classifiers).flatMap(lambda x: apply_classifier(x, train_dataset, test_dataset)).collect() 

Any suggestions on how I can manage to do this?

Thanks!

Larissa Leite
    Multi-model evaluator is under development right now – T. Gawęda Apr 04 '17 at 12:43
    thank you for your answer @T.Gawęda so there's no way around it at the moment? – Larissa Leite Apr 04 '17 at 12:48
    I don't think so. Even if you will paralellize submitting the jobs, it still will be queued in cluster manager. But let's wait, maybe someone will have some tested workaround - I just pointed that it's not currently supported out of the box, but will be supported in near future :) – T. Gawęda Apr 04 '17 at 12:51

1 Answer


@larissa-leite

To overcome this, I'm using [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) as explained in that thread.

This is the code from that thread:

from multiprocessing import Process

def func1():
    print('func1: starting')
    for i in range(10000000): pass
    print('func1: finishing')

def func2():
    print('func2: starting')
    for i in range(10000000): pass
    print('func2: finishing')

if __name__ == '__main__':
    p1 = Process(target=func1)
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    p1.join()
    p2.join()

To explain why I'm using this: I trained several text classifier models (more than 200) using OneVsRestClassifier, and I need to fan the text I receive out to every model.

The latency here is less than 200 ms to get all the predictions back (baseline human reaction time is somewhere between 100 ms and 420 ms), so this latency is not a big deal for me.
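Applied to the original question, the same idea can also be sketched with threads on the driver: the classifiers never get shipped through sc.parallelize (which is what triggers the pickling error), and each fit/transform is simply submitted as its own job from a separate thread. This is only a sketch: apply_classifier here is a hypothetical stand-in for the Spark version in the question, and with real Spark objects you would pass the actual estimators and DataFrames instead of strings.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the Spark apply_classifier in the question;
# with real Spark, each call would run clf.fit(train_dataset) and
# model.transform(test_dataset) on the driver.
def apply_classifier(clf_name):
    return (clf_name, "evaluated")

classifiers = ["decision_tree", "random_forest"]

# Each classifier runs in its own thread; Spark calls mostly block on the
# JVM, so threads are enough to keep both jobs in flight concurrently.
with ThreadPoolExecutor(max_workers=len(classifiers)) as executor:
    results = list(executor.map(apply_classifier, classifiers))

print(results)
```

The difference from the multiprocessing example above is that threads share the one SparkContext, which cannot be recreated safely in child processes.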

Flavio