0

I have a csv file with the first couple lines as ids and labels with the rest of the lines being the actual data. What would be the best way to share the first couple lines with the map function for the transformation of the subsequent lines with the actual data? Overall I am doing something similar to this question, but I don't just have labels up top, I also have an additional row of IDs.

The data look something like this:

-- ,id1 , id1 , id1 , id2 , id2 , id2
-- ,label,label,label,label,label,label
time1,data, data, data, data, data, data
time2,data, data, data, data, data, data

Then for each unique id i want to write an id/time/dataobject record to bigquery.

Basically I am assuming I need to have an intermediate pipeline step which turns the file into multiple pcollections where i could have the next step actually transform all of the file rows based on the values of the top rows. If this is the case, what is the best way to accomplish that? If not, what other was can I provide the values from the first couple rows to the map function for the other row items?

jumpdart
  • 1,702
  • 16
  • 35

1 Answers1

3

One possible solution would be to modify the custom source from the previous question. Otherwise, you can do an initial pass through the data to save the headers as a side input for the main processing step:

input = p | 'Read CSV file' >> ReadFromText("input.csv")
headers = input | 'Parse headers' >> beam.ParDo(ParseHeadersFn())
rows = input | 'Parse data rows' >> beam.ParDo(ParseRowsFn(), beam.pvalue.AsList(headers))

where ParseHeadersFn checks if the row starts with -- to qualify as header and discards that first field if true as it's not needed:

class ParseHeadersFn(beam.DoFn):
    """ParDo to output only the headers"""
    def process(self, element):
        if '--' in element.split(',')[0]:
          yield [','.join(element.split(',')[1:])]

Then, within ParseRowsFn, we can access the headers side input:

class ParseRowsFn(beam.DoFn):
    """ParDo to process data rows according to header metadata"""
    def process(self, element, headers):
      if 'time1' in element.split(',')[0]:
        for id in headers[0]:
          print 'ids: ' + id
        for label in headers[1]:
          print 'labels: ' + label

Note that I assume that the id row will come before the label one but that might not be true as Dataflow is a distributed system. It would be better to do some stronger checking.

If our input.csv is:

--,id1,id1,id1,id2,id2,id2
--,label1,label2,label3,label1,label2,label3
time1,data1,data2,data3,data4,data5,data6
time2,data7,data8,data9,data10,data11,data12

Example output:

ids: id1 , id1 , id1 , id2 , id2 , id2
labels: label1,label2,label3,label1,label2,label3

Code used: script.py in this gist

ParseRowsFn can be modified with dict(zip(...)) to get the desired output but I am not sure I understood the output structure. Do you need something like this?

id1,time1,data1,data2,data3
id1,time2,data7,data8,data9
id2,time1,data4,data5,data6
id2,time2,data10,data11,data12

If that's the case we can use the trick in this answer to determine where an ID changes and act accordingly:

class ParseRowsFn(beam.DoFn):
    """ParDo to process data rows according to header metadata"""
    def process(self, element, headers):
      # changing ids as per https://stackoverflow.com/a/28242076/6121516
      fields = element.split(',')

      if '--' not in fields[0]:
        ids = headers[0][0].split(',')
        labels = headers[1][0].split(',')
        id_changes = [i for i in range(1,len(ids)) if ids[i]!=ids[i-1]]
        id_changes.append(len(ids))

        for idx, change in enumerate(id_changes):
          row = {'timestamp': fields[0], 'id': ids[change - 1]}
          low = max(idx - 1, 0)
          row.update(dict(zip(labels[low:change], fields[low+1:change+1])))
          print row
          yield [row]

Example output:

{'timestamp': u'time1', u'label2': u'data2', u'label3': u'data3', 'id': u'id1', u'label1': u'data1'}
{'timestamp': u'time1', u'label2': u'data5', u'label3': u'data6', 'id': u'id2', u'label1': u'data4'}
{'timestamp': u'time2', u'label2': u'data8', u'label3': u'data9', 'id': u'id1', u'label1': u'data7'}
{'timestamp': u'time2', u'label2': u'data11', u'label3': u'data12', 'id': u'id2', u'label1': u'data10'}

Code used: output.py in the same gist

Guillem Xercavins
  • 6,938
  • 1
  • 16
  • 35
  • 1
    i might have sent you down the wrong path with the ids stuff... mainly just want the different header rows available to the map function of the data rows. This seems to do the trick. My only question would be if there would be any significant performance issues in doing the ParDo on each item just to get the first two? Just about to test it – jumpdart Dec 04 '19 at 19:56
  • No problem, that's why I shared a minimal sample that makes the metadata available and the extended one which shows a possible use case. Indeed, it feels a little bit redundant to read the data twice but other solutions that come to my mind would have other problems (such as ensuring that headers are parsed before reading the data). The most effective one would be to modify the custom source to use a csvreader that accepts multi-line headers – Guillem Xercavins Dec 05 '19 at 16:56