
I wrote a Python script that imports market data into a MariaDB database. To speed up the import, I decided to use the `threading` module. First, a function populates a queue with URLs from which data is downloaded and imported into my database. Unfortunately, the import function seems to be processed by only one thread instead of many.

import queue
from threading import Thread

# imports implied by the code below
import mariadb
import requests
import ujson
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

num_threads = 4
threads = []
urls = queue.Queue()

def create_url():
    # ...
    # get list of items
    # ...

    for row in item_list:
        url = 'https://someurl=' + str(row[0])
        urls.put(url)

    return urls


def import_mo(urls):
    station_id = 60003760

    print(worker.getName())

    try:
        mariadb_connection = mariadb.connect(allthedbstuff)
        cursor = mariadb_connection.cursor()

        while (True):
            url = urls.get()
            print("%s processes %s queue# %s" % (worker.getName(), url, urls.qsize()))
            if url == None:
                break
            s = requests.Session()
            retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
            s.mount('https://', HTTPAdapter(max_retries=retries))
            jsonraw = s.get(url)
            jsondata = ujson.loads(jsonraw.text)

            for row in jsondata:
                if (row['location_id'] == station_id):
                    cursor.execute(
                        'INSERT INTO tbl_mo_tmp (order_id) VALUES (%s)', (row['order_id'], ))

                cursor.execute('SELECT order_id from tbl_mo WHERE order_id = %s',
                               (row['order_id'], ))
                exists_mo = cursor.fetchall()

                if len(exists_mo) != 0:
                    # print("updating order#", row['order_id'])
                    cursor.execute('UPDATE tbl_mo SET volume = %s, price = %s WHERE order_id = %s',
                                   (row['volume_remain'], row['price'], row['order_id'], ))
                    mariadb_connection.commit()
                else:
                    if (row['location_id'] == station_id):
                        # print("newly inserting order#", row['order_id'])
                        cursor.execute('INSERT INTO tbl_mo (type_id, order_id, ordertype,volume, price) VALUES (%s,%s,%s,%s,%s)',
                                       (row['type_id'], row['order_id'], row['is_buy_order'], row['volume_remain'], row['price'], ))
                    mariadb_connection.commit()
            urls.task_done()

    except mariadb.Error as error:
        mariadb_connection.rollback()  # rollback if any exception occured

    finally:
        # closing database connection.
        if mariadb_connection.is_connected():
            cursor.close()
            mariadb_connection.close()

def cleanup_mo():
    # ...
    # do cleanup stuff
    # ...

create_url()

for i in range(num_threads):
    worker = Thread(target=import_mo, args=(urls,))
    worker.setDaemon(True)
    threads.append(worker)
    worker.start()


for i in range(num_threads):
    urls.put(None)

for worker in threads:
    worker.join()

cleanup_mo()

The output at the beginning states:

Thread-1
Thread-2
Thread-3
Thread-4

which shows me that 4 individual workers are created, but once the while loop is entered, it seems that only one worker actually processes the URLs:

Thread-1 processes https://someurl=2 queue# 32
Thread-1 processes https://someurl=3 queue# 31
Thread-1 processes https://someurl=4 queue# 30
Thread-1 processes https://someurl=5 queue# 29
Thread-1 processes https://someurl=6 queue# 28
Thread-1 processes https://someurl=7 queue# 27
Thread-1 processes https://someurl=8 queue# 26
Thread-1 processes https://someurl=9 queue# 25
Thread-1 processes https://someurl=10 queue# 24
Thread-1 processes https://someurl=11 queue# 23
Thread-1 processes https://someurl=12 queue# 22
Thread-1 processes https://someurl=13 queue# 21
Thread-1 processes https://someurl=14 queue# 20
Thread-1 processes https://someurl=15 queue# 19
Thread-1 processes https://someurl=16 queue# 18
Thread-1 processes https://someurl=17 queue# 17
Thread-1 processes https://someurl=18 queue# 16

I would expect the output to look like this (ideally):

Thread-1 processes https://someurl=2 queue# 32
Thread-2 processes https://someurl=3 queue# 31
Thread-3 processes https://someurl=4 queue# 30
Thread-4 processes https://someurl=5 queue# 29

What am I missing here?

Greg
  • Have you tried breaking up how it calls the URL list? Right now it's feeding all the URLs to each thread at once. Here is an example showing what I mean: https://stackoverflow.com/questions/16181121/a-very-simple-multithreading-parallel-url-fetching-without-queue – D.Sanders Apr 10 '19 at 12:11
  • @D.Sanders I don't quite understand what you mean. From my understanding, that's what a queue is for. Each thread should be able to take its input from it for processing. – Greg Apr 10 '19 at 12:26
  • I think that your program is indeed multithreaded; it's just that printing `worker.getName()` prints the same name. – quamrana Apr 10 '19 at 12:36

1 Answer


To print a different name for each worker, pass an index into the function:

def import_mo(i, urls):
    station_id = 60003760

    print('Worker', i)
    # etc
    # later:
        print("Worker %s processes %s queue# %s" % (i, url, urls.qsize()))

and creating threads:

for i in range(num_threads):
    worker = Thread(target=import_mo, args=(i,urls,))
    worker.setDaemon(True)
    threads.append(worker)
    worker.start()
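Alternatively, no index needs to be passed at all: `threading.current_thread()` always refers to the thread executing the function, whereas the module-level `worker` variable does not. A minimal sketch of the question's worker-pool pattern (thread names `worker-0`..`worker-3` and the URL contents are illustrative):

```python
import queue
import threading

urls = queue.Queue()
for n in range(8):
    urls.put(f"https://someurl={n}")

seen = []  # names of the threads that actually ran
seen_lock = threading.Lock()

def import_mo(urls):
    # current_thread() refers to the thread executing this code,
    # so each worker reports its own name
    name = threading.current_thread().name
    with seen_lock:
        seen.append(name)
    while True:
        url = urls.get()
        if url is None:
            break
        # ... download and import would happen here ...
        urls.task_done()

threads = []
for i in range(4):
    w = threading.Thread(target=import_mo, args=(urls,),
                         name=f"worker-{i}", daemon=True)
    threads.append(w)
    w.start()

for _ in range(4):
    urls.put(None)  # one sentinel per worker
for w in threads:
    w.join()

print(sorted(seen))  # ['worker-0', 'worker-1', 'worker-2', 'worker-3']
```

Each worker records its own name exactly once before entering the loop, so all four names appear regardless of how the URLs happen to be distributed.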
quamrana
  • That's correct. My program is indeed multithreaded. It was just the function `worker.getName()` that always returned 'Thread-1'. Implementing an actual thread counter did the trick. Now I am enlightened. Thank you! – Greg Apr 10 '19 at 12:59
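For completeness, the underlying pitfall can be reproduced in isolation: `worker` is a module-level variable that the creation loop rebinds on every iteration, so code inside the thread function that reads `worker` sees whichever thread was assigned to it last, not "its own" thread. A minimal sketch (thread names `t0`..`t3` are illustrative; an `Event` is used to make the timing deterministic):

```python
import threading

go = threading.Event()
reported = []
lock = threading.Lock()

def work():
    go.wait()  # block until every thread has been created
    with lock:
        # reads the module-level 'worker', not the current thread
        reported.append(worker.name)

threads = []
for i in range(4):
    worker = threading.Thread(target=work, name=f"t{i}")
    threads.append(worker)
    worker.start()

go.set()  # by now 'worker' is the last thread created
for t in threads:
    t.join()

print(reported)  # all four entries name the last thread: ['t3', 't3', 't3', 't3']
```

In the question's script the threads started printing while the loop was still rebinding `worker`, which is why a single name (there, `Thread-1`) showed up repeatedly even though all four threads were running.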