
I am trying to make a dict from CSV data in Python. I do not want to use the traditional split(',') and then rename the rows to the headings I want, because I will be receiving different CSV files with different amounts of information, and I will not be able to consistently target the rows I want with that method.

THE HEADER NAMES WILL BE CONSISTENT; there may just be more headers in one file than in another.

Instead, I have been trying to build a list from the CSV file, then zip the first row with each of the remaining rows to create a dictionary, so that I can extract the exact contents I want.
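The zip-the-header idea can be sketched in plain Python, assuming the whole CSV file is available as one string (`csv_text_to_dicts` is a hypothetical helper name, not part of any library). Using `csv.reader` instead of `split(',')` also handles quoted fields.

```python
import csv
import io


def csv_text_to_dicts(csv_text):
    """Turn CSV text (header row first) into a list of dicts keyed by header."""
    # csv.reader copes with quoting and embedded commas, unlike split(',').
    rows = list(csv.reader(io.StringIO(csv_text)))
    if not rows:
        return []
    header, body = rows[0], rows[1:]
    # Pair each header name with the value in the same column of every row.
    return [dict(zip(header, row)) for row in body]


sample = (
    "FIRST_NAME,last_name,birthdate,voter_id,phone_number\n"
    "hector,ABAD,6/15/1970,11*******,7*********\n"
    "m,ABAL,6/16/1949,12********,\n"
)
for record in csv_text_to_dicts(sample):
    print(record["FIRST_NAME"], record["last_name"])
```

Because the keys come from each file's own header row, a file with extra columns simply yields dicts with extra keys, and a lookup such as `record["last_name"]` works regardless of column order.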

I can create a list of lists, either by using csv.reader or with:

import apache_beam as beam

class Split(beam.DoFn):
    def process(self, element):
        rows = element.splitlines()
        data = []
        for row in rows:
            # Each row is wrapped as-is; the string is not split into fields.
            data.append([row])
        return data

This returns:

[u'FIRST_NAME,last_name,birthdate,voter_id,phone_number']
[u'hector,ABAD,6/15/1970,11*******,7*********']
[u'm,ABAL,6/16/1949,12********,']
[u'jorge,ABDALA,6/15/1962,21********,3********']
[u'karen,ABELLA,6/18/1988,33********,']

However, when I try to access the first row via:

class Split(beam.DoFn):
    def process(self, element):
        rows = element.splitlines()
        data = []
        for row in rows:
            # f = pattern.findall(row)
            data.append([row])
        return data[0]

It returns:

FIRST_NAME,last_name,birthdate,voter_id,phone_number
hector,ABAD,6/15/1970,11*******,7*********
m,ABAL,6/16/1949,109055849,
jorge,ABDALA,6/15/1962,21********,3********
karen,ABELLA,6/18/1988,33********,
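One likely explanation for these two outputs, assuming the usual behaviour of `ReadFromText` (it emits one element per line) and of `DoFn.process` (Beam iterates over whatever it returns and emits each item): every call to `process` sees a single line, so `splitlines()` yields just one row and the header never shares an element with the data. The plain-Python simulation below mimics that; `emit_all`, `process_all`, and `process_first` are hypothetical names and no Beam code is involved.

```python
# Plain-Python stand-in for Beam's behaviour (hypothetical helper, no Beam needed):
# ReadFromText hands process() one line at a time, and Beam emits every item
# of the iterable that process() returns as a separate output element.
def emit_all(process, lines):
    outputs = []
    for line in lines:              # one call per line, as with ReadFromText
        outputs.extend(process(line))
    return outputs


def process_all(element):
    rows = element.splitlines()     # only one row, since element is one line
    return [[row] for row in rows]  # each output is a one-item list


def process_first(element):
    rows = element.splitlines()
    return [[row] for row in rows][0]  # a list holding one string


lines = ["FIRST_NAME,last_name", "hector,ABAD"]
print(emit_all(process_all, lines))    # [['FIRST_NAME,last_name'], ['hector,ABAD']]
print(emit_all(process_first, lines))  # ['FIRST_NAME,last_name', 'hector,ABAD']
```

In both cases the header line is just another element, which is why `data[0]` prints every line rather than only the header.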

I have also tried the beam_utils CSV reader, but after I fix the fileio bug it says that there is no module named 'sources'.

If someone knows a better way or can point out what I'm doing wrong, that would be great. This is my pipeline:

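import apache_beam as beam
from apache_beam.io import ReadFromText

# pipeline_options and known_args are assumed to come from argument parsing (not shown)
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read' >> ReadFromText(known_args.input)
     | 'Split Values' >> beam.ParDo(Split())
     | 'WriteToText' >> beam.io.WriteToText(known_args.output))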
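If the goal is one dict per row keyed by each file's own header, one option is to read each file as a single element instead of line by line. A sketch, assuming a Beam release that ships `apache_beam.io.fileio.MatchFiles` and `ReadMatches` (neither is used in the pipeline above); `file_to_json_lines` and `build_pipeline` are hypothetical names. Each output line is a JSON object, close to the desired output shown below (values are written as JSON strings).

```python
import csv
import io
import json


def file_to_json_lines(file_text):
    """Yield one JSON string per data row, keyed by the file's own header row."""
    for record in csv.DictReader(io.StringIO(file_text)):
        yield json.dumps(record)


def build_pipeline(p, input_pattern, output_prefix):
    """Wire the parser into a Beam pipeline (Beam is imported only when called)."""
    import apache_beam as beam
    from apache_beam.io import fileio

    return (
        p
        | "Match" >> fileio.MatchFiles(input_pattern)
        | "ReadWhole" >> fileio.ReadMatches()
        # Each element is a ReadableFile; reading it whole keeps the header
        # row and its data rows together in one element.
        | "Parse" >> beam.FlatMap(lambda f: file_to_json_lines(f.read_utf8()))
        | "Write" >> beam.io.WriteToText(output_prefix)
    )
```

With the question's setup, the call would presumably be `build_pipeline(p, known_args.input, known_args.output)` inside `with beam.Pipeline(options=pipeline_options) as p:`. The trade-off is that each file is held in memory by a single worker.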

I am only reading from my Google Cloud Storage bucket for now, but in the future the data will come from Pub/Sub.

I would like the content to look like:

{"FIRST_NAME": "hector", "last_name": "ABAD", "birthdate": "6/15/1970", "voter_id": 11*******, "phone_number": 7*********}
etc.
etc.
etc.
Mark Tolonen
jmoore255

2 Answers


Processing the header row of CSV files doesn't seem to be well supported by the Python Beam SDK (other than discarding it). Fortunately, someone has created this repo for exactly this use case: https://github.com/pabloem/beam_utils

It contains a CsvFileSource class that extends FileBasedSource (Beam's abstract class for creating custom file sources) to create your dicts from files with variable headers.

Install:

pip install beam_utils

Then import it:

from beam_utils.sources import CsvFileSource

It can be used like:

p | 'ReadCsvFile' >> beam.io.Read(CsvFileSource(known_args.input))

This should produce the output you're looking for.

Edit: To make the package available to Dataflow workers, create a tarball of it and pass it to the job with the --extra_package flag, as described in https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#local-or-nonpypi

Ryan M
  • Thanks Ryan, I'm running this right now, although I have tried to use this previously: the first error was that fileio was not found, so I switched fileio to filesystem, but then it produced the error `ImportError: No module named beam_utils.sources`, so I launched the application with a requirements.txt file, and it returned the first error again. – jmoore255 Jun 27 '18 at 02:24
  • Try this: "pip install git+https://github.com/pabloem/beam_utils --upgrade" The master branch contains a fix for that issue which isn't in the pip package. Working for me now. – Ryan M Jun 27 '18 at 03:09
  • I'm still getting the `No module named beam_utils.sources` error. I had the same problem with the phonenumbers module, and that was fixed through a requirements.txt file, so I'm targeting the GitHub module in my requirements.txt file and hopefully it works. Also, when doing the pip install I used `pip install git+https://github.com/pabloem/beam_utils --upgrade`, but that shouldn't make a difference, right? The way you wrote it gave me `parse error at "'+github.'"` – jmoore255 Jun 27 '18 at 03:47
  • Have a look at this: https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#local-or-nonpypi Although beam-utils is on PyPI, it's an old version, so you will need to follow the steps under "Local or non-PyPI Dependencies" to make the correct version available to the Dataflow workers. – Ryan M Jun 27 '18 at 04:25

Check out csv.DictReader from the Python standard library: https://docs.python.org/2/library/csv.html#csv.DictReader

Here is the example from the documentation, copied for quick reference:

>>> import csv
>>> with open('names.csv') as csvfile:
...     reader = csv.DictReader(csvfile)
...     for row in reader:
...         print(row['first_name'], row['last_name'])
Priyeshj
  • Thanks @Priyeshj, I've already checked that out. The thing is, I'm reading from an element within the function, so I tried: ```with element as csvfile: reader = csv.DictReader(csvfile) etc``` But it just returned `AttributeError: __exit__ [while running 'Split Values']` – jmoore255 Jun 27 '18 at 02:11
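The `AttributeError` most likely arises because `element` is a plain string, and a string is not a context manager, so `with element as csvfile` fails. `csv.DictReader` only needs an iterable of lines, so a string element can be wrapped in `io.StringIO` and passed in directly, with no `with` block. A minimal sketch with a hypothetical `parse_element` helper; note that with `ReadFromText` each element is a single line, so this only yields records when the element holds the header plus data rows.

```python
import csv
import io


def parse_element(element):
    """Parse a string holding CSV text (header first) into a list of dicts."""
    # No `with` needed: StringIO gives DictReader a file-like view of the str.
    reader = csv.DictReader(io.StringIO(element))
    return [dict(row) for row in reader]


print(parse_element("first_name,last_name\nkaren,ABELLA\n"))
# [{'first_name': 'karen', 'last_name': 'ABELLA'}]
```

Given a single line such as `"hector,ABAD"`, the same function returns an empty list, because DictReader treats that line as the header.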