One possible solution would be to modify the custom source from your previous question. Alternatively, you can make an initial pass over the data to extract the headers and use them as a side input in the main processing step:
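```python
import apache_beam as beam
from apache_beam.io import ReadFromText
# p is the beam.Pipeline; "lines" avoids shadowing the built-in input()
lines = p | 'Read CSV file' >> ReadFromText('input.csv')
headers = lines | 'Parse headers' >> beam.ParDo(ParseHeadersFn())
rows = lines | 'Parse data rows' >> beam.ParDo(ParseRowsFn(), beam.pvalue.AsList(headers))
```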
where `ParseHeadersFn` checks whether the row starts with `--` to qualify it as a header and, if so, discards that first field since it's not needed:
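```python
import apache_beam as beam

class ParseHeadersFn(beam.DoFn):
    """ParDo to output only the headers"""
    def process(self, element):
        fields = element.split(',')
        # Header rows start with '--'; drop that marker field and keep the rest
        if fields[0].startswith('--'):
            yield [','.join(fields[1:])]
```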
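To see which lines the header filter keeps, the same check can be run in plain Python over a few of the sample rows, with no Beam involved. `parse_header` is a hypothetical stand-in for `ParseHeadersFn.process`, assuming the `startswith('--')` test:

```python
def parse_header(line):
    """Mirror of ParseHeadersFn.process: yield header values without the '--' marker."""
    fields = line.split(',')
    if fields[0].startswith('--'):
        # Same shape as the DoFn output: a one-element list holding the joined values
        yield [','.join(fields[1:])]


sample = [
    '--,id1,id1,id1,id2,id2,id2',
    '--,label1,label2,label3,label1,label2,label3',
    'time1,data1,data2,data3,data4,data5,data6',
]

for line in sample:
    print(list(parse_header(line)))
```

The two header lines each produce a single one-element list and the data line produces nothing, which is why the side input ends up as a list of one-element lists.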
Then, within `ParseRowsFn`, we can access the `headers` side input:
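```python
import apache_beam as beam

class ParseRowsFn(beam.DoFn):
    """ParDo to process data rows according to header metadata"""
    def process(self, element, headers):
        # headers is the AsList side input: one one-element list per header row
        if 'time1' in element.split(',')[0]:  # print only once, for one data row
            for ids in headers[0]:
                print('ids: ' + ids)
            for labels in headers[1]:
                print('labels: ' + labels)
```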
Note that I assume the ID row comes before the label row in the side input. Dataflow is a distributed system, so the order of elements in the `AsList` side input is not guaranteed, and it would be better to identify each header row explicitly.
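One way to make that check stronger is to identify each header row by its content rather than by its position in the side input. The sketch below is plain Python and uses a hypothetical rule (ID values start with `id`, as in the sample file); inside `ParseRowsFn.process` it could be called as `classify_headers([h[0] for h in headers])`, since each side-input element is a one-element list:

```python
def classify_headers(header_rows):
    """Label each header row as 'ids' or 'labels' without relying on arrival order.

    Hypothetical rule: a row whose first value starts with 'id' is the ID row.
    Adapt this test to whatever actually distinguishes the rows in the real file.
    """
    result = {}
    for row in header_rows:
        values = row.split(',')
        key = 'ids' if values[0].startswith('id') else 'labels'
        if key in result:
            raise ValueError('duplicate %s header row: %r' % (key, row))
        result[key] = values
    if set(result) != {'ids', 'labels'}:
        raise ValueError('expected one ids row and one labels row, got %r' % sorted(result))
    if len(result['ids']) != len(result['labels']):
        raise ValueError('ids and labels rows have different lengths')
    return result


# Side-input contents may arrive in either order; the result is the same
rows = ['label1,label2,label3,label1,label2,label3', 'id1,id1,id1,id2,id2,id2']
headers = classify_headers(rows)
print(headers['ids'])
```

The same rule could instead be applied in `ParseHeadersFn`, emitting `('ids', values)` / `('labels', values)` pairs and passing them with `beam.pvalue.AsDict(headers)`, so the lookup is by key rather than by position.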
If our `input.csv` is:
```
--,id1,id1,id1,id2,id2,id2
--,label1,label2,label3,label1,label2,label3
time1,data1,data2,data3,data4,data5,data6
time2,data7,data8,data9,data10,data11,data12
```
Example output:
```
ids: id1,id1,id1,id2,id2,id2
labels: label1,label2,label3,label1,label2,label3
```
Code used: script.py in this gist
`ParseRowsFn` can be modified with `dict(zip(...))` to get the desired output, but I am not sure I understood the output structure. Do you need something like this?
```
id1,time1,data1,data2,data3
id1,time2,data7,data8,data9
id2,time1,data4,data5,data6
id2,time2,data10,data11,data12
```
If that's the case, we can use the trick in this answer to detect where the ID changes and act accordingly:
```python
import apache_beam as beam

class ParseRowsFn(beam.DoFn):
    """ParDo to process data rows according to header metadata"""
    def process(self, element, headers):
        # changing ids as per https://stackoverflow.com/a/28242076/6121516
        fields = element.split(',')
        if not fields[0].startswith('--'):
            ids = headers[0][0].split(',')
            labels = headers[1][0].split(',')
            # positions where the ID differs from the previous one, plus the end of the row
            id_changes = [i for i in range(1, len(ids)) if ids[i] != ids[i - 1]]
            id_changes.append(len(ids))
            for idx, change in enumerate(id_changes):
                row = {'timestamp': fields[0], 'id': ids[change - 1]}
                # each group starts at the previous change point (0 for the first group)
                low = id_changes[idx - 1] if idx > 0 else 0
                # fields[0] is the timestamp, so data columns are offset by one
                row.update(dict(zip(labels[low:change], fields[low + 1:change + 1])))
                print(row)
                yield row
```
Example output:
```
{'timestamp': 'time1', 'id': 'id1', 'label1': 'data1', 'label2': 'data2', 'label3': 'data3'}
{'timestamp': 'time1', 'id': 'id2', 'label1': 'data4', 'label2': 'data5', 'label3': 'data6'}
{'timestamp': 'time2', 'id': 'id1', 'label1': 'data7', 'label2': 'data8', 'label3': 'data9'}
{'timestamp': 'time2', 'id': 'id2', 'label1': 'data10', 'label2': 'data11', 'label3': 'data12'}
```
Code used: output.py in the same gist
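For checking the regrouping on a small file before running it on Dataflow, the same logic can be written in plain Python without Beam. `regroup_csv` is a hypothetical helper, not part of the gist; like the answer, it assumes the ID header row comes before the label row, and it pairs labels with data fields via `zip` from the previous ID change point up to the next one:

```python
def regroup_csv(lines):
    """Local, Beam-free sketch of the header side input plus the ParseRowsFn regrouping.

    Assumes the ID header row precedes the label row.
    """
    headers = [line.split(',')[1:] for line in lines if line.startswith('--')]
    ids, labels = headers[0], headers[1]

    # Positions where the ID changes, plus the end of the row
    id_changes = [i for i in range(1, len(ids)) if ids[i] != ids[i - 1]]
    id_changes.append(len(ids))

    out = []
    for line in lines:
        fields = line.split(',')
        if fields[0].startswith('--'):
            continue  # header rows carry no data
        start = 0
        for change in id_changes:
            row = {'timestamp': fields[0], 'id': ids[change - 1]}
            # fields[0] is the timestamp, so data columns are offset by one
            row.update(zip(labels[start:change], fields[start + 1:change + 1]))
            out.append(row)
            start = change
    return out


sample = """--,id1,id1,id1,id2,id2,id2
--,label1,label2,label3,label1,label2,label3
time1,data1,data2,data3,data4,data5,data6
time2,data7,data8,data9,data10,data11,data12""".splitlines()

for row in regroup_csv(sample):
    print(row)
```

Locally the rows come out in file order; in the pipeline, the order across elements is not guaranteed.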