I have a function process_line that maps a line from the input format to the output format.
Some rows are corrupted and need to be ignored.
I am currently running this code successfully as a Python streaming job:
import sys

for input_line in sys.stdin:
    try:
        output_line = process_line(input_line.strip())
        print(output_line)
    except Exception:
        # log the corrupted line and skip it
        sys.stderr.write('Error with line: {l}\n'.format(l=input_line))
        continue
How can I run the equivalent code in PySpark? This is what I tried:
lines = sc.textFile(input_dir, 1)
output = lines.map(process_line)
output.saveAsTextFile(output_dir)
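To at least skip the corrupted rows, I was thinking of wrapping process_line in a helper that swallows exceptions and using flatMap instead of map, something like this (just a sketch; safe_process_line is my own name):

def safe_process_line(input_line):
    # return a one-element list on success and an empty list on failure,
    # so flatMap silently drops the corrupted lines
    try:
        return [process_line(input_line.strip())]
    except Exception:
        return []

lines = sc.textFile(input_dir, 1)
output = lines.flatMap(safe_process_line)
output.saveAsTextFile(output_dir)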
How can I keep track of the corrupted lines and get statistics on their count?
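I came across accumulators in the docs; would extending the sketch above like this be the right way to count the bad lines (again only a sketch, bad_lines is my own name)?

bad_lines = sc.accumulator(0)

def safe_process_line(input_line):
    try:
        return [process_line(input_line.strip())]
    except Exception:
        bad_lines.add(1)  # count the corrupted line
        return []

output = sc.textFile(input_dir, 1).flatMap(safe_process_line)
output.saveAsTextFile(output_dir)
print('Corrupted lines: {n}'.format(n=bad_lines.value))

Is it correct that the accumulator value is only reliable after an action such as saveAsTextFile has actually run?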