
I'm reading a multiline-record file using SparkContext.newAPIHadoopFile with a custom delimiter. I have already prepared and reduced my data. Now I want to add the key back to every line (entry) and then write the result to an Apache Parquet file stored in HDFS.

This figure should explain my problem. What I'm looking for is the red arrow, i.e. the last transformation before writing the file. Any ideas? I tried flatMap, but then the timestamp and the float value ended up in different records.

[Figure: PySpark chain]

The Python script can be downloaded here and the sample text file here. I'm using the Python code within a Jupyter Notebook.
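Roughly, one reduced record looks like this (the key is from the sample file; the timestamps and values are only illustrative), and I want to repeat the key for every (timestamp, value) pair before writing:

from datetime import datetime

# Illustrative shape of one reduced record: (key, list of (timestamp, value) pairs)
record = ("852-YF-008", [(datetime(2016, 5, 10, 0, 0), 0.0),
                         (datetime(2016, 5, 9, 23, 59), 0.0)])

# Desired rows, one per value, with the key repeated:
# ('852-YF-008', datetime.datetime(2016, 5, 10, 0, 0), 0.0)
# ('852-YF-008', datetime.datetime(2016, 5, 9, 23, 59), 0.0)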

Matthias
  • This diagram doesn't reflect your code. You flatten to pairs `(key, tuple(...))` and then concatenate tuples, which is unlikely to be the desired outcome. In other words, the `reduceByKey` part is wrong. – zero323 Jul 01 '16 at 11:34
  • Might be. I just need to make sure that all values are shuffled to the same machine, since a variable can occur in many files that are read in on different machines in the cluster. – Matthias Jul 01 '16 at 11:46
  • True, you are right. My reduceByKey is wrong. But how can I concatenate the lists into one list in the shuffle/reduce step? Initially I followed [this post](http://stackoverflow.com/questions/27002161/reduce-a-key-value-pair-into-a-key-list-pair-with-apache-spark). – Matthias Jul 01 '16 at 11:56
  • I don't really see why you need this in the first place (see edit). And the linked answer is not good. – zero323 Jul 01 '16 at 12:05
  • Yeah, thanks a lot. I will try out your version, though, since it has better performance. I found this solution that works as well: sheet.flatMap(process_and_extract).groupByKey().mapValues(lambda x: list(x)).flatMap(flatten).take(50) – Matthias Jul 01 '16 at 12:06

1 Answer


A simple list comprehension should be more than enough:

from datetime import datetime


def flatten(kvs):
    """
    >>> kvs = ("852-YF-008", [
    ... (datetime(2016, 5, 10, 0, 0), 0.0),
    ... (datetime(2016, 5, 9, 23, 59), 0.0)])
    >>> flat = flatten(kvs)
    >>> len(flat)
    2
    >>> flat[0]
    ('852-YF-008', datetime.datetime(2016, 5, 10, 0, 0), 0.0)
    """
    k, vs = kvs
    return [(k, v1, v2) for v1, v2 in vs]
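Applied with flatMap to the reduced pair RDD (the name reduced below is just a placeholder for whatever your RDD is called), each (key, list) record expands into one row per value:

flat = reduced.flatMap(flatten)
flat.take(2)
# [('852-YF-008', datetime.datetime(2016, 5, 10, 0, 0), 0.0),
#  ('852-YF-008', datetime.datetime(2016, 5, 9, 23, 59), 0.0)]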

In Python 2.7 you could also use a lambda expression with tuple argument unpacking, but this is not portable and generally discouraged:

lambda (k, vs): [(k, v1, v2) for v1, v2 in vs]

A version-independent alternative:

lambda kvs: [(kvs[0], v1, v2) for v1, v2 in kvs[1]]

Edit:

If all you need is to write partitioned data, convert to a DataFrame and write Parquet directly, skipping reduceByKey:

(sheet
    .flatMap(process)
    .map(lambda x: (x[0], ) + x[1])
    .toDF(["key", "datetime", "value"])
    .write
    .partitionBy("key")
    .parquet(output_path))
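Since the data is written partitioned by key, filtering on key when reading it back should only touch the matching partition directories. A minimal sketch, assuming the column names from the toDF call above and a sqlContext in scope:

from pyspark.sql.functions import col

df = sqlContext.read.parquet(output_path)
df.where(col("key") == "852-YF-008").show()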
zero323
  • Check `DataFrameWriter.mode`. It will fail by default, but there are other options. See also http://stackoverflow.com/a/28844328/1560062 – zero323 Jul 01 '16 at 12:36
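For illustration, a minimal sketch of setting the save mode on the write above (df stands for the DataFrame built from the toDF call; "error" is the default mode and fails if the path already exists):

df.write.mode("append").partitionBy("key").parquet(output_path)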