
I am trying to run my Cloud Dataflow code from Cloud Shell. I am using the following code:

from __future__ import absolute_import
import argparse
import csv
import logging
import re
import sys
from io import StringIO

import apache_beam as beam
import psycopg2
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

# Module-level connection, created when the script is loaded.
conn = psycopg2.connect("dbname='db_bio' user='postgres' host='*****' port='5432' password='poui19956'")
# Cursor used by scrip_val.process below (not defined in the original post; assumed).
cur = conn.cursor()

class scrip_val(beam.DoFn):
    def process(self, element):
        f = StringIO(element)
        print(type(f))
        reader = csv.reader(f, delimiter=',')
        for row in reader:
            cur.execute("insert into bio values(%s,%s,%s,%s,%s)",row)
            conn.commit()
        return [len(element)]
def run():
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://pydataflow',
                        help='Input file to process.')
   
    known_args, pipeline_args = parser.parse_known_args()
    
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        lines = p | 'read' >> ReadFromText(known_args.input)
        (lines
        | 'words'>> beam.ParDo(scrip_val())
        )
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

But when I try to run this code from Cloud Shell using this command:

python -m bulksumlog --input [Bucket_location] --runner DataflowRunner --project [Project_id] --temp_location [temp_Bucket_location] --save_main_session True

I get the following error:

 return dill.load_session(file_path)
  File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 402, in load_session
    module = unpickler.load()
  File "/usr/lib/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1096, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 465, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1130, in find_class
    __import__(module)
ImportError: No module named psycopg2.extensions

I have installed all the Python libraries related to psycopg2.

Dharman

1 Answer


Run with psycopg2-binary. In my case, psycopg2-binary was already installed (using pip) and I still got this error. I found help in Fernando Munoz's answer here. The steps that worked for me were:

a) pip uninstall psycopg2
b) pip uninstall psycopg2-binary
c) sudo apt-get build-dep python-psycopg2 (prerequisites on Debian)
d) pip install psycopg2-binary
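
After reinstalling, a quick check like the one below can confirm whether the module named in the traceback is importable. This is only a minimal sketch: the helper name module_available is made up for illustration, and it tests only the interpreter it runs in (for example the one in Cloud Shell), not the Dataflow workers.

```python
import importlib


def module_available(name):
    """Return True if `name` can be imported by the current interpreter."""
    try:
        importlib.import_module(name)
    except ImportError:
        # Also covers ModuleNotFoundError, which subclasses ImportError.
        return False
    return True


if __name__ == "__main__":
    # psycopg2.extensions is the module the traceback reports as missing.
    for name in ("psycopg2", "psycopg2.extensions"):
        status = "OK" if module_available(name) else "MISSING"
        print("%s: %s" % (name, status))
```

If either line reports MISSING, the install did not land in the Python environment that is running the script, so it is worth checking that pip and python point to the same interpreter.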

mirek