
I'm new to this, so please don't judge too harshly. I'm struggling with a problem when I try to load a table from a source database into a target database. I'll describe what I'm doing and add details for each step.

The task is to extract a table from one database (Postgres), compute an MD5 hash of each concatenated row, add this hash as a new column and load the resulting table into another database (Postgres) using Python, pandas and psycopg2.

My approach may not be the best or the most efficient, so I would gladly accept any suggestions. The steps I've taken are as follows:

1. To fetch the data I use the pandas function read_sql_query and load the result into a DataFrame. Since the table I work with may have up to 10-20 million rows, I fetch the data in chunks:
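```python
import pandas as pd

# kwargs and chunk_size come from the surrounding function.
copy_to_target_stmt = f"COPY {kwargs['target_schema']}.{kwargs['target_table']} \
 ({', '.join(kwargs['target_columns'])}) \
 FROM STDIN WITH (FORMAT csv, DELIMITER E'\x1f', NULL 'None')"

# read_sql_query needs the SELECT statement as its first argument;
# 'src_query' is a placeholder key, the original snippet left the query out.
for src_chunk in pd.read_sql_query(kwargs['src_query'], con=kwargs['src_conn'], chunksize=chunk_size):
    # add the MD5 column, then stream the chunk into the target table via COPY
    data_iterator = get_data_iterator(chunk_with_md5(src_chunk))
    kwargs['target_cursor'].copy_expert(copy_to_target_stmt, data_iterator)
```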

where get_data_iterator is

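```python
def get_data_iterator(data):
    # Turn each DataFrame row into a plain tuple of values (index excluded)
    data = [tuple(row) for row in data.itertuples(index=False)]
    # One line per row: every value converted with str() and joined by \x1f
    return StringIteratorIO((
            "\x1f".join(map(str, row)) + '\r\n'
            for row in data
        ))
```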

and for StringIteratorIO I took inspiration from StackOverflow here:

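```python
import io


class StringIteratorIO(io.TextIOBase):
    """Read-only, file-like wrapper around an iterator of strings.

    Lets copy_expert() pull the generated COPY data lazily instead of
    building one big string in memory.
    """

    def __init__(self, iterable):
        self._iter = iter(iterable)
        self._left = ''  # unread remainder of the current string

    def readable(self):
        return True

    def _read1(self, n=None):
        # Fetch the next non-empty string once the buffer is exhausted
        while not self._left:
            try:
                self._left = next(self._iter)
            except StopIteration:
                break
        # Hand out at most n characters from the buffer
        ret = self._left[:n]
        self._left = self._left[len(ret):]
        return ret

    def read(self, n=None):
        l = []
        if n is None or n < 0:
            # Read everything that is left
            while True:
                m = self._read1()
                if not m:
                    break
                l.append(m)
        else:
            # Read up to n characters, possibly spanning several strings
            while n > 0:
                m = self._read1(n)
                if not m:
                    break
                n -= len(m)
                l.append(m)
        return ''.join(l)

    def readline(self):
        l = []
        while True:
            i = self._left.find('\n')
            if i == -1:
                # No newline in the buffer: keep it and pull the next string
                l.append(self._left)
                try:
                    self._left = next(self._iter)
                except StopIteration:
                    self._left = ''
                    break
            else:
                # Return everything up to and including the newline
                l.append(self._left[:i+1])
                self._left = self._left[i+1:]
                break
        return ''.join(l)
```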

chunk_with_md5 is a function that computes the MD5 hash (as mentioned earlier) using numpy.

2. Loading into the target can be seen in the code above (I use copy_expert from psycopg2).
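chunk_with_md5 itself isn't shown, and the question says it uses numpy. For comparison, here is a hypothetical stdlib-only sketch of the hashing step based on the definition given in the comments (all fields concatenated with no separator, e.g. "John39London"). It works on plain row tuples such as those produced by `DataFrame.itertuples(index=False)` and appends the hex digest as an extra last field; `row_md5` and `rows_with_md5` are illustrative names, and treating `None` as an empty string is an assumption.

```python
import hashlib
from typing import Any, Iterable, Iterator, Sequence, Tuple


def row_md5(row: Sequence[Any], sep: str = "") -> str:
    """Hypothetical helper: MD5 hex digest of the row's values joined into one string.

    With sep="" this matches the "John39London" example from the comments.
    None is treated as an empty string (an assumption, not from the question).
    """
    text = sep.join("" if value is None else str(value) for value in row)
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def rows_with_md5(rows: Iterable[Sequence[Any]], sep: str = "") -> Iterator[Tuple[Any, ...]]:
    """Yield every row with its MD5 digest appended as the last field."""
    for row in rows:
        yield (*row, row_md5(row, sep))


# Example: a row shaped like one from DataFrame.itertuples(index=False)
print(next(rows_with_md5([("John", 39, "London")])))
```

With `sep=""`, different rows can map to the same string (`("ab", "c")` and `("a", "bc")` both give `"abc"`), so a separator such as `"\x1f"` may be safer. Also, Python's `str()` output is not guaranteed to match PostgreSQL's text representation, so a hash computed this way need not equal one computed with `md5(...)` in SQL.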

The problem is that a text column may contain a single unterminated double quote, and then I get this error:

```
kwargs['target_cursor'].copy_expert(copy_to_target_stmt, data_iterator)
psycopg2.errors.BadCopyFileFormat: unterminated CSV quoted field
CONTEXT:  COPY student_classification, line 100001: "348734You could have used "Silhouette method here ..."
```

(Edit: it also doesn't concatenate the row properly in the COPY statement; the columns should be delimited by `\x1f`.)
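The fields in the CONTEXT line may only look glued together: `\x1f` is the ASCII unit separator, a non-printing control character, so a terminal typically shows nothing where it sits. A quick check with `repr()` makes the separator visible:

```python
fields = ["348734", 'You could have used "Silhouette method here']
line = "\x1f".join(fields)

print(line)        # the separator is usually invisible in a terminal
print(repr(line))  # repr() shows it as the escape sequence \x1f
assert line.split("\x1f") == fields  # the delimiter really is in the string
```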

So the double quote before Silhouette is never closed, and this triggers the error. I think I've tried everything, including the text function from SQLAlchemy and replacing `"` with `\"`, but nothing seems to work. Removing those double quotes from the data is not a good solution in my case.
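The error is consistent with how COPY's CSV format works: get_data_iterator writes each value with `str()` and applies no CSV quoting, so PostgreSQL reads the bare `"` as the start of a quoted section and keeps looking for the closing quote. One way around this, sketched here under the assumption that rows arrive as plain tuples (e.g. from `DataFrame.itertuples(index=False)`), is to let Python's standard `csv` module do the quoting. With its default `QUOTE_MINIMAL`, fields containing the delimiter, a quote or a line break are wrapped in quotes and embedded quotes are doubled, which matches PostgreSQL's default `QUOTE '"'` and `ESCAPE '"'`. `csv_lines` is a hypothetical helper name.

```python
import csv
import io
from typing import Any, Iterable, Iterator, Sequence


def csv_lines(rows: Iterable[Sequence[Any]], delimiter: str = "\x1f") -> Iterator[str]:
    """Yield one CSV-formatted line per row (hypothetical helper).

    csv.writer quotes a field only if it contains the delimiter, a double
    quote or a line break, and doubles any embedded double quote.
    """
    buf = io.StringIO()
    # The default lineterminator is "\r\n", the same ending the original code used.
    writer = csv.writer(buf, delimiter=delimiter)
    for row in rows:
        writer.writerow(row)   # None is written as an empty field
        yield buf.getvalue()   # the single formatted line
        buf.seek(0)            # reset the buffer for the next row
        buf.truncate(0)


for line in csv_lines([(348734, 'You could have used "Silhouette method here', None)]):
    print(repr(line))
```

The generator could replace the `"\x1f".join(map(str, row)) + '\r\n'` expression passed to StringIteratorIO. Because `csv.writer` writes `None` as an empty field, which in a multi-column row stays unquoted and is read as NULL by COPY's CSV format by default, the COPY statement would then need to drop `NULL 'None'`. pandas `NaN`/`NaT` values would still come out as `nan`/`NaT` unless they are converted to `None` first.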

Is there a way to ignore the characters that break the string?
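Another option that avoids CSV quoting altogether is COPY's `text` format, in which `"` has no special meaning. There, only backslash, newline, carriage return and the delimiter itself have to be preceded by a backslash, and NULL is written as `\N` by default. A minimal sketch, assuming the statement is changed to `... FROM STDIN WITH (FORMAT text, DELIMITER E'\x1f')`; `escape_text_field` and `text_lines` are hypothetical names:

```python
from typing import Any, Iterable, Iterator, Sequence

DELIMITER = "\x1f"


def escape_text_field(value: Any, delimiter: str = DELIMITER) -> str:
    """Format one value for COPY ... (FORMAT text) (hypothetical helper).

    None becomes \\N, the text format's default NULL marker. A double quote
    needs no escaping in this format.
    """
    if value is None:
        return "\\N"
    s = str(value)
    s = s.replace("\\", "\\\\")  # escape backslashes first so later escapes stay intact
    s = s.replace("\n", "\\n").replace("\r", "\\r")
    return s.replace(delimiter, "\\" + delimiter)  # a backslashed delimiter is data


def text_lines(rows: Iterable[Sequence[Any]], delimiter: str = DELIMITER) -> Iterator[str]:
    """Yield one COPY text-format line per row."""
    for row in rows:
        yield delimiter.join(escape_text_field(v, delimiter) for v in row) + "\n"


print(repr(next(text_lines([(348734, 'You could have used "Silhouette method here', None)]))))
```

The lone quote passes through unchanged, so the data does not have to be modified.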

Thank you very much in advance!

Kiki
  • 1) Why use Pandas? Just use `psycopg2` directly, it will save you a lot of overhead. 2) If you use `psycopg2`, use its built-in [COPY](https://www.psycopg.org/docs/usage.html#using-copy-to-and-copy-from) support. 3) **DO NOT** use `f` strings; use the `psycopg2` [sql](https://www.psycopg.org/docs/sql.html) module to build dynamic SQL. – Adrian Klaver Mar 09 '23 at 21:57
  • 1) Can you define further what *count md5 hash of concatenated row* means? In other words, provide example data. 2) In the meantime, would `select md5((tbl.*::text)) from tbl` work? 3) If so, just `COPY` the table to the other table as is, add the column and then `update the col = md5((tbl.*::text))`. – Adrian Klaver Mar 09 '23 at 23:58
  • @AdrianKlaver By concatenated row I mean that I merge every field of a row into one string and then calculate the MD5 hash of that string. For example, with name|age|city: John|39|London, the concatenated string is "John39London". – Kiki Mar 10 '23 at 09:32
  • @AdrianKlaver I'm working on my semester project, where I need to build an ETL pipeline, so I use pandas to transform the data. – Kiki Mar 10 '23 at 09:35
  • If anyone encounters the same problem with escaping a single double quote in a similar case, I've come up with a crutch: `string.replace('"', '""""').replace('"', '\"')`. – Kiki Mar 10 '23 at 15:28
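The workaround in the last comment appears to work because of how PostgreSQL parses CSV input: a `"` anywhere in a field opens a quoted section, and inside it `""` stands for one literal quote, so `""""` opens a section, emits one `"` and closes it again. The second `.replace('"', '\"')` is a no-op in Python, since `'\"'` in a regular string literal is just `'"'`, as this small check shows:

```python
value = 'You could have used "Silhouette method here'

# In a regular (non-raw) string literal, \" is an escape for a plain double quote.
assert '\"' == '"'

step1 = value.replace('"', '""""')  # each quote becomes four quotes
step2 = step1.replace('"', '\"')    # replaces '"' with '"': nothing changes
assert step2 == step1
print(step2)
```

Unlike proper CSV quoting, this still breaks on values that contain the `\x1f` delimiter or a line break.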
