I wrote a python script that imports market data into a MariaDB database. To speed up the import i decided to use the module threading. So at first a function populates a queue with urls from which data is downloaded and imported into my database. Unfortunately the import function seems to be processed by only one thread instead of many.
import queue
from threading import Thread
num_threads = 4
threads = []
urls = queue.Queue()
def create_url():
...
getlist of items
...
for row in item_list:
url = 'https://someurl=' + str(row[0])
urls.put(url)
return urls
def import_mo(urls):
station_id = 60003760
print(worker.getName())
try:
mariadb_connection = mariadb.connect(allthedbstuff)
cursor = mariadb_connection.cursor()
while (True):
url = urls.get()
print("%s processes %s queue# %s" % (worker.getName(), url, urls.qsize()))
if url == None:
break
s = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
s.mount('https://', HTTPAdapter(max_retries=retries))
jsonraw = s.get(url)
jsondata = ujson.loads(jsonraw.text)
for row in jsondata:
if (row['location_id'] == station_id):
cursor.execute(
'INSERT INTO tbl_mo_tmp (order_id) VALUES (%s)', (row['order_id'], ))
cursor.execute('SELECT order_id from tbl_mo WHERE order_id = %s',
(row['order_id'], ))
exists_mo = cursor.fetchall()
if len(exists_mo) != 0:
# print("updating order#", row['order_id'])
cursor.execute('UPDATE tbl_mo SET volume = %s, price = %s WHERE order_id = %s',
(row['volume_remain'], row['price'], row['order_id'], ))
mariadb_connection.commit()
else:
if (row['location_id'] == station_id):
# print("newly inserting order#", row['order_id'])
cursor.execute('INSERT INTO tbl_mo (type_id, order_id, ordertype,volume, price) VALUES (%s,%s,%s,%s,%s)',
(row['type_id'], row['order_id'], row['is_buy_order'], row['volume_remain'], row['price'], ))
mariadb_connection.commit()
urls.task_done()
except mariadb.Error as error:
mariadb_connection.rollback() # rollback if any exception occured
finally:
# closing database connection.
if mariadb_connection.is_connected():
cursor.close()
mariadb_connection.close()
def cleanup_mo():
...
do cleanup stuff
...
create_url()
for i in range(num_threads):
worker = Thread(target=import_mo, args=(urls,))
worker.setDaemon(True)
threads.append(worker)
worker.start()
for i in range(num_threads):
urls.put(None)
for worker in threads:
worker.join()
cleanup_mo()
The outputs states in the beginning:
Thread-1
Thread-2
Thread-3
Thread-4
which shows me that 4 individual workers are created, but entering the while loop make it seem only one worker actually processes the urls fetched.
Thread-1 processes https://someurl=2 queue# 32
Thread-1 processes https://someurl=3 queue# 31
Thread-1 processes https://someurl=4 queue# 30
Thread-1 processes https://someurl=5 queue# 29
Thread-1 processes https://someurl=6 queue# 28
Thread-1 processes https://someurl=7 queue# 27
Thread-1 processes https://someurl=8 queue# 26
Thread-1 processes https://someurl=9 queue# 25
Thread-1 processes https://someurl=10 queue# 24
Thread-1 processes https://someurl=11 queue# 23
Thread-1 processes https://someurl=12 queue# 22
Thread-1 processes https://someurl=13 queue# 21
Thread-1 processes https://someurl=14 queue# 20
Thread-1 processes https://someurl=15 queue# 19
Thread-1 processes https://someurl=16 queue# 18
Thread-1 processes https://someurl=17 queue# 17
Thread-1 processes https://someurl=18 queue# 16
I would expect the output look like (ideally):
Thread-1 processes https://someurl=2 queue# 32
Thread-2 processes https://someurl=3 queue# 31
Thread-3 processes https://someurl=4 queue# 30
Thread-4 processes https://someurl=5 queue# 29
What am i missing here?