I'm new to this, so please bear with me. I'm struggling with a problem when I try to load a table from a source database into a target database. I'll try to describe what I'm doing and add extra detail to each step.
The task is to extract a table from one database (Postgres), compute an MD5 hash of each concatenated row, add this hash as a new column, and load the resulting table into another database (Postgres) using Python, pandas and psycopg2.
My approach may not be the best or most efficient (so I'd gladly accept any suggestions). The steps I've taken are as follows:
- To fetch the data I use the plain pandas function read_sql_query and load it into a dataframe. Since the table I work with may contain up to 10-20 million rows, I fetch the data in chunks:
copy_to_target_stmt = f"COPY {kwargs['target_schema']}.{kwargs['target_table']} \
    ({', '.join(kwargs['target_columns'])}) \
    FROM STDIN WITH (FORMAT csv, DELIMITER E'\x1f', NULL 'None')"

# src_query stands for the SELECT statement used to read the source table
for src_chunk in pd.read_sql_query(sql=src_query, con=kwargs['src_conn'], chunksize=chunk_size):
    data_iterator = get_data_iterator(chunk_with_md5(src_chunk))
    kwargs['target_cursor'].copy_expert(copy_to_target_stmt, data_iterator)
where get_data_iterator is
def get_data_iterator(data):
    data = [tuple(row) for row in data.itertuples(index=False)]
    return StringIteratorIO((
        "\x1f".join(map(str, row)) + '\r\n'
        for row in data
    ))
and for StringIteratorIO I took inspiration from a StackOverflow answer:
import io

class StringIteratorIO(io.TextIOBase):
    """File-like wrapper around a string iterator, so copy_expert can read from it."""

    def __init__(self, iter):
        self._iter = iter
        self._left = ''

    def readable(self):
        return True

    def _read1(self, n=None):
        # Refill the buffer from the iterator if it's empty,
        # then return at most n characters from it.
        while not self._left:
            try:
                self._left = next(self._iter)
            except StopIteration:
                break
        ret = self._left[:n]
        self._left = self._left[len(ret):]
        return ret

    def read(self, n=None):
        l = []
        if n is None or n < 0:
            while True:
                m = self._read1()
                if not m:
                    break
                l.append(m)
        else:
            while n > 0:
                m = self._read1(n)
                if not m:
                    break
                n -= len(m)
                l.append(m)
        return ''.join(l)

    def readline(self):
        l = []
        while True:
            i = self._left.find('\n')
            if i == -1:
                l.append(self._left)
                try:
                    self._left = next(self._iter)
                except StopIteration:
                    self._left = ''
                    break
            else:
                l.append(self._left[:i+1])
                self._left = self._left[i+1:]
                break
        return ''.join(l)
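Just to illustrate how the class behaves (a throwaway example, not part of the pipeline):

# The class turns an iterator of strings into a readable stream.
lines = ("1\x1ffoo\r\n", "2\x1fbar\r\n")
stream = StringIteratorIO(iter(lines))
print(repr(stream.read(5)))   # first five characters: '1\x1ffoo'
print(repr(stream.read()))    # the rest of the stream: '\r\n2\x1fbar\r\n'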
chunk_with_md5 is a function that computes the MD5 hash (as I mentioned earlier) using numpy.
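Conceptually it does something like this (the real version is vectorised with numpy; the hash column name row_md5 is just a placeholder for this sketch):

import hashlib
import numpy as np
import pandas as pd

def chunk_with_md5(chunk: pd.DataFrame) -> pd.DataFrame:
    # Concatenate all columns of each row as text, hash the result, and store
    # the hex digest in a new column. 'row_md5' is only a placeholder name here.
    concatenated = chunk.astype(str).apply(''.join, axis=1)
    chunk['row_md5'] = np.array([hashlib.md5(s.encode('utf-8')).hexdigest()
                                 for s in concatenated])
    return chunk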
- Loading into the target can be seen in the code above (I use copy_expert from psycopg2).
The problem is that a column with text values may contain a single unterminated double quote, and then I get this error:
kwargs['target_cursor'].copy_expert(copy_to_target_stmt, data_iterator)
psycopg2.errors.BadCopyFileFormat: unterminated CSV quoted field
CONTEXT: COPY student_classification, line 100001: "348734You could have used "Silhouette method here ..."
(Edit: the row also isn't assembled properly in the COPY statement; the columns should be delimited by \x1f.)
So the double quote before "Silhouette" is never closed, and this triggers the error. I think I've tried everything, including the text function from sqlalchemy and replacing " with \", but nothing seems to work. Changing the data by removing those double quotes is not an acceptable solution in my case.
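For reference, the quote-escaping attempt looked roughly like this (column handling is simplified for the sketch):

# Rough sketch of the escaping attempt: escape double quotes in every text column
# before the chunk is turned into the COPY stream.
def escape_quotes(chunk):
    for col in chunk.select_dtypes(include='object').columns:
        chunk[col] = chunk[col].str.replace('"', '\\"', regex=False)
    return chunk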
Are there ways to ignore the characters that break the string?
Thank you very much in advance!