I'm currently trying to create a table and insert values in an Oracle SQL database.
I managed to make it work using df.to_sql(name=table_name, con=conn, if_exists='append', index=False)
but it took 1h30m to upload a DataFrame of only 10000 rows * 5 columns.
This made me look into Multiprocessing, so I tried following the answer Siddhi Kiran Bajracharya gave in this thread
Which turned out like this:
import pandas as pd
from sqlalchemy import create_engine
import config
LOCATION = r"C:\Oracle\instantclient_19_6"
os.environ["PATH"] = LOCATION + ";" + os.environ["PATH"]
conn = create_engine('oracle+cx_oracle://' + config.user + ':' + config.pw +
'@' + config.host + ':' + config.port + '/?service_name=' + config.db +'?charset=latin-1')
import math
from multiprocessing.dummy import Pool as ThreadPool
def insert_df(df, *args, **kwargs):
nworkers = 4 # number of workers that executes insert in parallel fashion
chunk = math.floor(df.shape[0] / nworkers) # number of chunks
chunks = [(chunk * i, (chunk * i) + chunk) for i in range(nworkers)]
chunks.append((chunk * nworkers, df.shape[0]))
pool = ThreadPool(nworkers)
def worker(chunk):
i, j = chunk
df.iloc[i:j, :].to_sql(*args, **kwargs)
pool.map(worker, chunks)
pool.close()
pool.join()
insert_df(df, f'{table_name}', conn, if_exists='append', index=False)
The problem is that this last code runs for 20mins, only inserts 9 rows into the Table, and then raises the following error DatabaseError: (cx_Oracle.DatabaseError) ORA-00955: name is already used by an existing object
Full Traceback:
---------------------------------------------------------------------------
DatabaseError Traceback (most recent call last)
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1248 self.dialect.do_execute(
-> 1249 cursor, statement, parameters, context
1250 )
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
579 def do_execute(self, cursor, statement, parameters, context=None):
--> 580 cursor.execute(statement, parameters)
581
DatabaseError: ORA-00955: name is already used by an existing object
The above exception was the direct cause of the following exception:
DatabaseError Traceback (most recent call last)
<ipython-input-73-b50275447767> in <module>
20
21
---> 22 insert_df(df, f'{table_name}', conn, if_exists='append', index=False)
<ipython-input-73-b50275447767> in insert_df(df, *args, **kwargs)
14 df.iloc[i:j, :].to_sql(*args, **kwargs)
15
---> 16 pool.map(worker, chunks)
17 pool.close()
18 pool.join()
C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py in map(self, func, iterable, chunksize)
266 in a list that is returned.
267 '''
--> 268 return self._map_async(func, iterable, mapstar, chunksize).get()
269
270 def starmap(self, func, iterable, chunksize=None):
C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py in get(self, timeout)
655 return self._value
656 else:
--> 657 raise self._value
658
659 def _set(self, i, obj):
C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py in worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
119 job, i, func, args, kwds = task
120 try:
--> 121 result = (True, func(*args, **kwds))
122 except Exception as e:
123 if wrap_exception and func is not _helper_reraises_exception:
C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py in mapstar(args)
42
43 def mapstar(args):
---> 44 return list(map(*args))
45
46 def starmapstar(args):
<ipython-input-73-b50275447767> in worker(chunk)
12 def worker(chunk):
13 i, j = chunk
---> 14 df.iloc[i:j, :].to_sql(*args, **kwargs)
15
16 pool.map(worker, chunks)
C:\ProgramData\Anaconda3\lib\site-packages\pandas\core\generic.py in to_sql(self, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
2710 chunksize=chunksize,
2711 dtype=dtype,
-> 2712 method=method,
2713 )
2714
C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\sql.py in to_sql(frame, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
516 chunksize=chunksize,
517 dtype=dtype,
--> 518 method=method,
519 )
520
C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\sql.py in to_sql(self, frame, name, if_exists, index, index_label, schema, chunksize, dtype, method)
1317 dtype=dtype,
1318 )
-> 1319 table.create()
1320 table.insert(chunksize, method=method)
1321 if not name.isdigit() and not name.islower():
C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\sql.py in create(self)
654 )
655 else:
--> 656 self._execute_create()
657
658 def _execute_insert(self, conn, keys, data_iter):
C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\sql.py in _execute_create(self)
636 # Inserting table into database, add to MetaData object
637 self.table = self.table.tometadata(self.pd_sql.meta)
--> 638 self.table.create()
639
640 def create(self):
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\sql\schema.py in create(self, bind, checkfirst)
868 if bind is None:
869 bind = _bind_or_error(self)
--> 870 bind._run_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
871
872 def drop(self, bind=None, checkfirst=False):
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _run_visitor(self, visitorcallable, element, connection, **kwargs)
2044 ):
2045 with self._optional_conn_ctx_manager(connection) as conn:
-> 2046 conn._run_visitor(visitorcallable, element, **kwargs)
2047
2048 class _trans_ctx(object):
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _run_visitor(self, visitorcallable, element, **kwargs)
1613
1614 def _run_visitor(self, visitorcallable, element, **kwargs):
-> 1615 visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
1616
1617
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\sql\visitors.py in traverse_single(self, obj, **kw)
136 meth = getattr(v, "visit_%s" % obj.__visit_name__, None)
137 if meth:
--> 138 return meth(obj, **kw)
139
140 def iterate(self, obj):
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\sql\ddl.py in visit_table(self, table, create_ok, include_foreign_key_constraints, _is_metadata_operation)
824 table,
825 include_foreign_key_constraints= # noqa
--> 826 include_foreign_key_constraints,
827 )
828 # fmt: on
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in execute(self, object_, *multiparams, **params)
986 raise exc.ObjectNotExecutableError(object_)
987 else:
--> 988 return meth(self, multiparams, params)
989
990 def _execute_function(self, func, multiparams, params):
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\sql\ddl.py in _execute_on_connection(self, connection, multiparams, params)
70
71 def _execute_on_connection(self, connection, multiparams, params):
---> 72 return connection._execute_ddl(self, multiparams, params)
73
74 def execute(self, bind=None, target=None):
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_ddl(self, ddl, multiparams, params)
1048 compiled,
1049 None,
-> 1050 compiled,
1051 )
1052 if self._has_events or self.engine._has_events:
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1251 except BaseException as e:
1252 self._handle_dbapi_exception(
-> 1253 e, statement, parameters, cursor, context
1254 )
1255
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
1471 util.raise_from_cause(newraise, exc_info)
1472 elif should_wrap:
-> 1473 util.raise_from_cause(sqlalchemy_exception, exc_info)
1474 else:
1475 util.reraise(*exc_info)
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\util\compat.py in raise_from_cause(exception, exc_info)
396 exc_type, exc_value, exc_tb = exc_info
397 cause = exc_value if exc_value is not exception else None
--> 398 reraise(type(exception), exception, tb=exc_tb, cause=cause)
399
400
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\util\compat.py in reraise(tp, value, tb, cause)
150 value.__cause__ = cause
151 if value.__traceback__ is not tb:
--> 152 raise value.with_traceback(tb)
153 raise value
154
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1247 if not evt_handled:
1248 self.dialect.do_execute(
-> 1249 cursor, statement, parameters, context
1250 )
1251 except BaseException as e:
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
578
579 def do_execute(self, cursor, statement, parameters, context=None):
--> 580 cursor.execute(statement, parameters)
581
582 def do_execute_no_params(self, cursor, statement, context=None):
DatabaseError: (cx_Oracle.DatabaseError) ORA-00955: name is already used by an existing object
[SQL:
CREATE TABLE "TEST_TABLE_DELETE" (
"id" CLOB,
"name" CLOB,
"var1" CLOB,
"var2" CLOB,
"var3" CLOB,
"var4" CLOB,
"var5" CLOB,
"var6" CLOB,
"var7" CLOB,
"var8" CLOB,
"var9" CLOB,
"var10" CLOB,
"var11" CLOB,
"var12" FLOAT,
"var13" CLOB,
"var14" CLOB
)
]
(Background on this error at: http://sqlalche.me/e/4xp6)
Any pointers to help me solve this issue would be greatly appreciated.
Thanks!
Luti