
I am iterating through files, collecting information about the values in their columns and rows into a dictionary. I have the following code, which works locally:

from collections import defaultdict

def search_nulls(file_name):
    separator = ','
    fp = open(file_name, 'r')
    null_cols = {}
    lines = fp.readlines()

    for n, line in enumerate(lines):
        line = line.split(separator)
        for m, data in enumerate(line):
            data = data.strip('\n').strip('\r')
            if str(m) not in null_cols:
                null_cols[str(m)] = defaultdict(lambda: 0)
            # count occurrences of short values (length <= 4) per column index
            if len(data) <= 4:
                null_cols[str(m)][str(data)] = null_cols[str(m)][str(data)] + 1

    return null_cols


files_to_process = ['tempfile.csv']
results = map(lambda file: search_nulls(file), files_to_process)

The above code works fine without Spark. I comment out the last two lines above and try with Spark, since this is a prototype of something that will eventually need to run distributed:

import os
from pyspark import SparkConf, SparkContext

os.environ['SPARK_HOME'] = <path_to_spark_folder>
conf = SparkConf().setAppName("search_files").setMaster('local')

sc = SparkContext(conf=conf)

objects = sc.parallelize(files_to_process)
resulting_object = \
    objects.map(lambda file_object: search_nulls(file_object))

result = resulting_object.collect()

When using Spark, though, this results in the following error:

File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 267, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
    return pickle.dumps(obj, protocol)
TypeError: expected string or Unicode object, NoneType found

I've been unable to find any obvious reason why this would fail, since it runs perfectly locally, and I am not sharing any files across worker nodes. In fact, I'm only running this on my local machine anyway.

Does anyone know of a good reason why this might be failing?

makansij

1 Answer


The source of your problem is the following line:

null_cols[str(m)] = defaultdict(lambda: 0)

As you can read in the What can be pickled and unpickled? section of the pickle module documentation:

The following types can be pickled:

  • ...
  • functions defined at the top level of a module (using def, not lambda)
  • built-in functions defined at the top level of a module
  • ...

It should be clear that lambda: 0 doesn't meet the above criteria. To make it work you can, for example, replace the lambda expression with int:

null_cols[str(m)] = defaultdict(int)
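
You can reproduce the failure with plain pickle, without Spark at all. A minimal sketch:

import pickle
from collections import defaultdict

pickle.dumps(defaultdict(int))             # fine: int is a picklable built-in
try:
    pickle.dumps(defaultdict(lambda: 0))   # default_factory is a lambda
except Exception as e:
    print(type(e).__name__, e)             # pickling fails, same root cause as above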

How is it possible that we can pass lambda expressions to higher-order functions in PySpark? The devil is in the detail. PySpark uses different serializers depending on the context. To serialize closures, including lambda expressions, it uses a custom cloudpickle, which supports lambda expressions and nested functions. To handle data it uses the default Python pickling tools.
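
For comparison, the standalone cloudpickle package handles the same object without complaint (PySpark bundles its own copy). A rough illustration, assuming cloudpickle is installed:

import pickle
import cloudpickle

f = lambda: 0
g = cloudpickle.loads(cloudpickle.dumps(f))   # closures round-trip fine
print(g())                                    # 0
# pickle.dumps(f) would raise a pickling error here, as shown above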


A few side notes:

  • I wouldn't use Python file objects to read data. It is not portable and won't work beyond the local file system. You can use SparkContext.wholeTextFiles instead (a rough sketch follows these notes).
  • If you do, make sure you close the files. Using a with statement is usually the best approach.
  • You can safely strip newline characters before you split the line.
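
A rough sketch of the first point, driving the same counting through SparkContext.wholeTextFiles; the helper name count_short_values and the path are illustrative, not part of the original code:

from collections import defaultdict

def count_short_values(content, separator=','):
    null_cols = {}
    for line in content.splitlines():                   # handles \n and \r\n
        for m, data in enumerate(line.split(separator)):
            counts = null_cols.setdefault(str(m), defaultdict(int))
            if len(data) <= 4:
                counts[data] += 1
    return null_cols

pairs = sc.wholeTextFiles('tempfile.csv')               # RDD of (filename, content) pairs
result = pairs.mapValues(count_short_values).collect()
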
zero323
  • So, just to clarify, generally speaking, a `lambda` function that can be serialized locally should be able to be serialized by `pyspark`? It would be useful to know this for the purpose of testing things locally. Thanks for your persistence on this question. – makansij Nov 08 '15 at 07:53
  • Most of the time yes. You have to think about when and where things happen, and generally speaking I wouldn't overuse lambdas. Pretty much all common operations can be performed using built-in functions; without static typing they are error prone, inherently not testable, and surprisingly verbose. – zero323 Nov 08 '15 at 08:09