3

I've heard that Python multi-threading is a bit tricky, and I am not sure what is the best way to go about implementing what I need. Let's say I have a function called IO_intensive_function that does some API call which may take a while to get a response.

Say the process of queuing jobs can look something like this:

import thread
for job_args in jobs:
    thread.start_new_thread(IO_intense_function, (job_args))

Would the IO_intense_function now just execute its task in the background and allow me to queue in more jobs?

I also looked at this question, which seems like the approach is to just do the following:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(2)
results = pool.map(IO_intensive_function, jobs)

As I don't need those tasks to communicate with each other, the only goal is to send my API requests as fast as possible. Is this the most efficient way? Thanks.

Edit: The way I am making the API request is through a Thrift service.

user3666197
  • 1
  • 6
  • 50
  • 92
Baiqing
  • 1,223
  • 2
  • 9
  • 21
  • 1
    If the functions aren't cpu intensive only but call network APIs, use [asyncio](https://docs.python.org/3/library/asyncio.html). E.g. pass the 10 functs to `asyncio.gather` to execute them concurrently. Don't be scared by the terse official docs. Search online for better info and tutorials – Pynchia Dec 31 '20 at 06:56
  • 1
    What *specific* issues do you have with the ThreadPool solution in your case? – jfs Dec 31 '20 at 07:22
  • I tried using the threadpool solution, unfortunately it still seems a bit slow so I was wondering if this is the best I can do or is there some other option that's better. – Baiqing Dec 31 '20 at 07:45
  • How do you know the *"IO_intensiveness"* can be latency-masked by a concurrency? Are you sure the nature of the Input-Output intensity does not resolve to a SPoF-bottleneck anyway down the graph of their actual dependencies ( finally waiting for a spinning harddisk to serve any concurrent amount of request but in a pure-[SERIAL] fashion one after another... )? Using threads in GIL-governed ecosystem is in most cases helpful only for low jitter, across-network, off-platform, non-singular remote-resources use-cases, otherwise just wasting local time ( for increased GIL-governed thread-swiching ) – user3666197 Dec 31 '20 at 10:40

2 Answers2

1

For network API request you can use asyncio. Have a look at this article https://realpython.com/python-concurrency/#asyncio-version for an example how to implement it.

assli100
  • 555
  • 3
  • 12
  • 2
    Please read about [answer]. While the contents of this link might actually solve the problem, link-only answers are discouraged on this site. Links tend to break and then the answer loses its value. It is better to include the essential parts of the link as text in the answer itself and provide the link as a reference. Your plain-text answer looks like: `For network API request you can use asyncio. Have a look at this article for an example how to implement it.` which doesn't provide much information. If you think the link alone can help, post it as a comment (if you have 50 rep) instead. – Tomerikoo Dec 31 '20 at 08:25
1

I had to create code to do something similar recently. I've tried to make it generic below. Note I'm a novice coder, so please forgive the inelegance. What you may find valuable, however, is some of the error processing I found it necessary to embed to capture disconnects, etc.

I also found it valuable to perform the json processing in a threaded manner. You have the threads working for you, so why go "serial" again for a processing step when you can extract the info in parallel.

It is possible I will have mis-coded in making it generic. Please don't hesitate to ask follow-ups and I will clarify.

import requests
from multiprocessing.dummy import Pool as ThreadPool
from src_code.config import Config

        with open(Config.API_PATH + '/api_security_key.pem') as f:
            my_key = f.read().rstrip("\n")
            f.close()
        base_url = "https://api.my_api_destination.com/v1"
        headers = {"Authorization": "Bearer %s" % my_key}
        itm = list()
        itm.append(base_url)
        itm.append(headers)


        def call_API(call_var):
            base_url = call_var[0]
            headers = call_var[1]
            call_specific_tag = call_var[2]

            endpoint = f'/api_path/{call_specific_tag}'

            connection_tries = 0
            for i in range(3):
                try:
                    dat = requests.get((base_url + endpoint), headers=headers).json()
                except:
                    connection_tries += 1
                    print(f'Call for {api_specific_tag} failed after {i} attempt(s).  Pausing for 240 seconds.')
                    time.sleep(240)
                else:
                    break

            tag = list()
            vars_to_capture_01 = list()
            vars_to_capture_02 = list()

            connection_tries = 0

            try:
                if 'record_id' in dat:
                    vars_to_capture_01.append(dat['record_id'])
                    vars_to_capture_02.append(dat['second_item_of_interest'])
                else:
                    vars_to_capture_01.append(call_specific_tag)
                    print(f'Call specific tag {call_specific_tag} is unavailable.  Successful pull.')
                    vars_to_capture_02.append(-1)

            except:
                    print(f'{call_specific_tag} is unavailable.  Unsuccessful pull.')
                    vars_to_capture_01.append(call_specific_tag)
                    vars_to_capture_02.append(-1)
                    time.sleep(240)

            pack = list()
            pack.append(vars_to_capture_01)
            pack.append(vars_to_capture_02)

            return pack

        vars_to_capture_01 = list()
        vars_to_capture_02 = list()

        i = 0
        max_i = len(all_tags)
        while i < max_i:
            ind_rng = range(i, min((i + 10), (max_i)), 1)
            itm_lst = (itm.copy())
            call_var = [itm_lst + [all_tags[q]] for q in ind_rng]
            #packed = call_API(call_var[0]) # for testing of function without pooling
            pool = ThreadPool(len(call_var))
            packed = pool.map(call_API, call_var)
            pool.close()
            pool.join()
            for pack in packed:
                try:
                    vars_to_capture_01.append(pack[0][0])
                except:
                    print(f'Unpacking error for {all_tags[i]}.')
                vars_to_capture_02.append(pack[1][0])
C. Cooney
  • 471
  • 5
  • 19
  • I should add..I originally went down the asyncio route as well. I found it to be far more confusing and complex than advertised despite the "helpful" videos. No doubt I will return to it as my knowledge expands (and what is difficult today becomes simpler tomorrow). – C. Cooney Dec 31 '20 at 11:59