
I have a function process_line that maps from the input format to the output format.

Some rows are corrupted and need to be ignored.

I am successfully running this code as a python streaming job:

import sys

for input_line in sys.stdin:
    try:
        output_line = process_line(input_line.strip())
        print(output_line)
    except Exception:
        sys.stderr.write('Error with line: {l}\n'.format(l=input_line))
        continue

How can I run the equivalent code in PySpark? This is what I tried:

lines = sc.textFile(input_dir, 1)
output = lines.map(process_line)
output.saveAsTextFile(output_dir)

How can I keep track of the corrupted lines and get statistics on their count?

Uri Goren
  • What is the problem with your pyspark code? Did you get any errors? Can you elaborate some more? – Avihoo Mamka Nov 22 '15 at 09:09
  • It seems to be taking longer than the Hadoop streaming job for some reason, and how do I account for the corrupted lines? – Uri Goren Nov 22 '15 at 09:15
  • I see that you read the text file into only 1 partition. This may cause the slowness. Try to remove it and let Spark decide the number of partitions on its own, and then check it. – Avihoo Mamka Nov 22 '15 at 09:16
  • See: [What is the equivalent to scala.util.Try in pyspark?](http://stackoverflow.com/q/33383275/1560062) and [Spark Programming Guide - Accumulators](http://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameaccumlinka) – zero323 Nov 22 '15 at 12:25

1 Answer


You're reading the text file into only one partition, which can make the job run slowly because you basically give up the parallelism.

Try this instead:

lines = sc.textFile(input_dir)
output = lines.map(process_line)
output.saveAsTextFile(output_dir)

As for the corrupted lines, you can wrap the work in a try-except inside (or around) your process_line function, write the problematic lines to a log file, or apply whatever other handling you need.
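If you also want statistics on how many lines were skipped, one option is a Spark accumulator. Here is a minimal sketch, assuming sc is the SparkContext you already have, and that process_line, input_dir and output_dir are the ones from your question; safe_process_line is just a hypothetical wrapper name:

# assuming sc is the SparkContext already used in your job
corrupted_count = sc.accumulator(0)

def safe_process_line(line):
    # delegate to your existing process_line; count and drop lines that raise
    try:
        return process_line(line)
    except Exception:
        corrupted_count.add(1)
        return None

lines = sc.textFile(input_dir)
output = lines.map(safe_process_line).filter(lambda x: x is not None)
output.saveAsTextFile(output_dir)

# accumulator values are only reliable on the driver after an action has run
print("Corrupted lines: {c}".format(c=corrupted_count.value))

Note that the accumulator is updated on the executors, so only read its value after an action (saveAsTextFile here) has actually run.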

Avihoo Mamka