
I have an input file which will contain a long list of URLs. Let's assume they are in mylines.txt:

https://yahoo.com
https://google.com
https://facebook.com
https://twitter.com

What I need to do is:

  1. Read a line from the input file mylines.txt

  2. Execute the myFunc function, which will perform some tasks and produce a single line of output. It is more complex in my real code, but this is the concept.

  3. Write the output to the results.txt file

Since I have a large input, I need to leverage Python multithreading. I looked at a good post on this here, but unfortunately it assumes the input is a simple list and does not cover writing the function's output to a file.

I need to ensure that each input's output is written on a single line (i.e., the danger is that multiple threads write to the same line and I get corrupted data).
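
Just to illustrate what I mean (this is not my real code): I understand one way to avoid interleaved writes would be to guard the shared file handle with a threading.Lock, roughly like the sketch below, but I am not sure this is the right way to combine it with a thread pool.

import threading

write_lock = threading.Lock()  # shared lock, purely for illustration
results = open("myresults", "a")

def safe_write(line):
    # only one thread at a time can enter this block,
    # so each result ends up on its own complete line
    with write_lock:
        results.write(line + "\n")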

I tried to mess around with it, but with no success. I have not used Python's multithreading before, but it is time to learn, as it is inevitable in my case: I have a very long list which cannot finish in a reasonable time without multithreading. My real function will not do this simple task, but more operations that are not necessary to the concept.

Here is my attempt. Please correct me (in the code itself):

import threading
import requests
from multiprocessing.dummy import Pool as ThreadPool
import Queue

def myFunc(url):
        response = requests.get(url, verify=False ,timeout=(2, 5))
        results = open("myresults","a") # "a" to append results
        results.write("url is:",url, ", response is:", response.url)
        results.close()

worker_data = open("mylines.txt","r") # open my input file.

#load up a queue with your data, this will handle locking
q = Queue.Queue()

for url in worker_data:
    q.put(url)

# make the Pool of workers
pool = ThreadPool(4)
results = pool.map(myFunc, q)

# close the pool and wait for the work to finish
pool.close()
pool.join()

Q: How can I fix the above code (please be concise and help me in the code itself) so that it reads a line from the input file, executes the function, and writes the result associated with that input on its own line, using Python multithreading to execute the requests concurrently so that I can finish my list in a reasonable time?

UPDATE:

Based on the answer, the code becomes:

import threading
import requests
from multiprocessing.dummy import Pool as ThreadPool
import queue
from multiprocessing import Queue

def myFunc(url):
    response = requests.get(url, verify=False ,timeout=(2, 5))
    return "url is:" + url + ", response is:" + response.url

worker_data = open("mylines.txt","r") # open my input file.

#load up a queue with your data, this will handle locking
q = queue.Queue(4)
with open("mylines.txt","r") as f: # open my input file.
    for url in f:
        q.put(url)

# make the Pool of workers
pool = ThreadPool(4)
results = pool.map(myFunc, q)

with open("myresults","w") as f:
    for line in results:
        f.write(line + '\n')

The mylines.txt contains:

https://yahoo.com
https://www.google.com
https://facebook.com
https://twitter.com

Note that at first I was using:

import Queue

And: q = Queue.Queue(4)

But got an error saying:

Traceback (most recent call last):
  File "test3.py", line 4, in <module>
    import Queue
ModuleNotFoundError: No module named 'Queue'

Based on some searching, I changed it to:

import queue

And the relevant line to: q = queue.Queue(4)

I also added:

from multiprocessing import Queue

But nothing works. Can any expert in Python multithreading help?


2 Answers


You should change your function to return a string:

def myFunc(url):
    response = requests.get(url, verify=False ,timeout=(2, 5))
    return "url is:" + url + ", response is:" + response.url

and write these strings to the file later:

results = pool.map(myFunc, q)

with open("myresults","w") as f:
    for line in results:
        f.write(line + '\n')

This keeps the multithreading working for the requests.get calls, but serialises writing the results to the output file.

Update:

And you should also make use of with for reading the input file:

#load up a queue with your data, this will handle locking
q = Queue.Queue()

with open("mylines.txt","r") as f: # open my input file.
    for url in f:
        q.put(url)
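
Putting the pieces together, a minimal self-contained version might look like this (a sketch, not tested against your real data). Note that it passes a plain list of URLs to pool.map instead of the Queue: pool.map needs an ordinary iterable, so the queue is not actually required for this approach.

import requests
from multiprocessing.dummy import Pool as ThreadPool

def myFunc(url):
    response = requests.get(url, verify=False, timeout=(2, 5))
    return "url is:" + url + ", response is:" + response.url

# read the URLs into a plain list; pool.map can consume this directly
with open("mylines.txt", "r") as f:
    urls = [line.strip() for line in f]

# fetch the URLs concurrently with 4 worker threads
pool = ThreadPool(4)
results = pool.map(myFunc, urls)
pool.close()
pool.join()

# only the main thread writes the results, one line per input URL
with open("myresults", "w") as f:
    for line in results:
        f.write(line + '\n')
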
quamrana
  • Thanks. Will this work with millions of lines in the file? – user9371654 Mar 01 '19 at 10:41
  • That's a different question. Perhaps you could gather all this code up, make sure it works for small input files and then try it for your much larger file. If there is a problem, then you could post your symptoms as a new question. – quamrana Mar 01 '19 at 10:43
  • I made the suggested changes. I get `ModuleNotFoundError: No module named 'Queue'`, and it is not clear why. – user9371654 Mar 01 '19 at 10:52
  • My original code was not working. I updated my code based on your suggestions and tried some solutions to solve the Queue issue. I end up with no errors, but it hangs and there is no output at all. I use Python 3.6, and some posts say the module name is lowercase (queue) in 3.6. – user9371654 Mar 01 '19 at 11:07

Rather than having the worker pool threads print the results out, which is not guaranteed to buffer the output correctly, create one more thread which reads results from a second Queue and prints them.

I've modified your solution so it builds its own pool of worker threads. There's little point in giving the queue infinite length: if it is bounded, the main thread simply blocks when the queue reaches its maximum size, so you only need it to be long enough to make sure there is always work to be processed by the worker threads - the main thread will block and unblock as the queue size increases and decreases.

It also identifies the thread responsible for each item on the output queue, which should give you some confidence that the multithreading approach is working, and prints the response code from the server. I found I had to strip the newlines from the URLs.

Since now only one thread is writing to the file, writes are always perfectly in sync and there is no chance of them interfering with each other.

import threading
import requests
import queue
POOL_SIZE = 4

def myFunc(inq, outq):  # worker thread deals only with queues
    while True:
        url = inq.get()  # Blocks until something available
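        # a None value is the sentinel that tells this worker thread to exit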
        if url is None:
            break
        response = requests.get(url.strip(), timeout=(2, 5))
        outq.put((url, response, threading.currentThread().name))


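# Writer runs as a single dedicated thread and is the only code that touches the output file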
class Writer(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self.results = open("myresults","a") # "a" to append results
        self.queue = q
    def run(self):
        while True:
            url, response, threadname = self.queue.get()
            if response is None:
                self.results.close()
                break
            print("****url is:",url, ", response is:", response.status_code, response.url, "thread", threadname, file=self.results)

#load up a queue with your data, this will handle locking
inq = queue.Queue()  # could usefully limit queue size here
outq = queue.Queue()

# start the Writer
writer = Writer(outq)
writer.start()

# make the Pool of workers
threads = []
for i in range(POOL_SIZE):
    thread = threading.Thread(target=myFunc, name=f"worker{i}", args=(inq, outq))
    thread.start()
    threads.append(thread)

# push the work onto the queues
with open("mylines.txt","r") as worker_data: # open my input file.
    for url in worker_data:
        inq.put(url.strip())
for thread in threads:
    inq.put(None)

# close the pool and wait for the workers to finish
for thread in threads:
    thread.join()

# Terminate the writer
outq.put((None, None, None))
writer.join()

Using the data given in mylines.txt I see the following output:

****url is: https://www.google.com , response is: 200 https://www.google.com/ thread worker1
****url is: https://twitter.com , response is: 200 https://twitter.com/ thread worker2
****url is: https://facebook.com , response is: 200 https://www.facebook.com/ thread worker0
****url is: https://www.censys.io , response is: 200 https://censys.io/ thread worker1
****url is: https://yahoo.com , response is: 200 https://uk.yahoo.com/?p=us thread worker3
holdenweb
  • Have you run it on your side? It hangs forever for me; the cursor just keeps blinking. – user9371654 Mar 01 '19 at 12:01
  • After clicking CTRL+C to quit, I got this: `^CException ignored in: Traceback (most recent call last): File "/usr/lib/python3.6/threading.py", line 1294, in _shutdown t.join() File "/usr/lib/python3.6/threading.py", line 1056, in join self._wait_for_tstate_lock() File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock elif lock.acquire(block, timeout): KeyboardInterrupt` – user9371654 Mar 01 '19 at 12:04
  • I am testing it with a 5-line input as in the question. I use the `python3` command and my system is Ubuntu 18.04. I see the output file created but nothing written to it. The Python program never ends; even without an indicator, the cursor should stop, but it just keeps blinking. – user9371654 Mar 01 '19 at 12:13
  • It seems to work. Will make sure. My problem was that I copied it incorrectly. – user9371654 Mar 01 '19 at 12:32
  • It runs, but incorrectly. The output should not be repeated. It should only perform the requests.get once per URL read from the file. Why is it repeating? – user9371654 Mar 01 '19 at 12:36
  • Some final mods to stop using the global `url` and pass it through in the output queue instead. Glad you got there! – holdenweb Mar 01 '19 at 12:36
  • The `****url is:` part should show the URL read from the file, but in your code it repeats the same first line. – user9371654 Mar 01 '19 at 12:37
  • In that version of the code, `url` was a global, and so it used the value from the main thread. Since the main thread had by then processed all the urls, it showed the last one. The final modification overcomes that issue by passing the URL value through the output queue. – holdenweb Mar 01 '19 at 12:39
  • Thanks. What is the maximum queue size? Can it take 1M lines? Why do you add `f` before worker? `name=f"worker{i}"` – user9371654 Mar 01 '19 at 12:43
  • [This link](https://docs.python.org/3/library/queue.html#queue.Queue) explains the maximum queue size is "infinite," but you can set a maximum size when you create it. The `f"worker{i}"` notation was new in Python 3.6 - the expressions inside the braces are evaluated and formatted. – holdenweb Mar 01 '19 at 12:50
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/189264/discussion-between-holdenweb-and-user9371654). – holdenweb Mar 01 '19 at 16:07