
When I try to crawl thesis information with multiple processes, I cannot get the processes to end after the information has been fetched:

[screenshot: error] When I comment out the code that fetches the information from the network, these processes can end normally. [screenshot: normal] This error has been troubling me and I don't have any idea what causes it. My network access goes through requests and I call response.close(), so can any handsome brother or beautiful lady help this confused person? Thanks.

This is the whole code (my Python is 3.7):


from multiprocessing import Process, Queue, Pool,Manager,Value
import time, random
import requests
import re
from bs4 import BeautifulSoup

headers = {
    'user-agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.129 Safari/537.36,Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
    'Connection': 'close'
}

## Just get the html text
def GetUrlInfo(url):
    response = requests.get(url=url, headers=headers)
    response.encoding = 'utf-8'
    response.close()
    SoupData = BeautifulSoup(response.text, 'lxml')
    return SoupData

def GetVolumeUrlfromUrl(url:str)->str:
    """input is Journal's url and output is a link and a text description to each issue of the journal"""
    url = re.sub('http:', 'https:', url)
    SoupDataTemp = GetUrlInfo(url+'index.html')
    SoupData = SoupDataTemp.find_all('li')
    UrlALL = []
    for i in SoupData:
        if i.find('a') != None:
            volumeUrlRule = '<a href=\"(.*?)\">(.*?)</a>'
            volumeUrlTemp = re.findall(volumeUrlRule,str(i),re.I)
            # u = i.find('a')['href']
            # # print(u)
            for u in volumeUrlTemp:
                if re.findall(url, u[0]):
                    # print(u)
                    UrlALL.append((u[0], u[1]), )
    # print(UrlALL)
    return UrlALL

def GetPaperBaseInfoFromUrlAll(url:str)->str:
    """The input is the url and the output is all the paper information obtained from the web page,
    including, doi, title, author, and the date about this volume """
    soup = GetUrlInfo(url)
    temp1 = soup.find_all('li',class_='entry article')
    temp2= soup.find_all('h2')
    temp2=re.sub('\\n',' ',temp2[1].text)
    # print(temp2)
    volumeYear = re.split(' ',temp2)[-1]
    paper = []
    for i in temp1:
        if i.find('div',class_='head').find('a')== None:
            paperDoi = ''
        else:
            paperDoi = i.find('div',class_='head').find('a')['href']
        title = i.find('cite').find('span',class_='title').text[:-2]
        paper.append([paperDoi,title])
    return paper,volumeYear


# test start
url = 'http://dblp.uni-trier.de/db/journals/talg/'
UrlALL = GetVolumeUrlfromUrl(url)
UrlLen = len(UrlALL)

# put the url into the query
def Write(query,value,num):
    for count in range(num):
        query.put(value[count][0],True)
        # time.sleep(random.random())
    print('write end')

# from the query get the url and get the paper info with this url
def Read(query,num,PaperInfo1,COUNT,i,paperNumber):
    while True:
        count = COUNT.get(True)
        # print("before enter" + str(i) + ' - ' + str(count)+' - '+str(num))
        COUNT.put(count, True)
        if not query.empty():
            value = query.get(True)
            count = COUNT.get(True)
            count = count + 1
            COUNT.put(count,True)
            paper, thisYear = GetPaperBaseInfoFromUrlAll(value) # just commented
            print("connected " + str(i) + ' - ' + str(count) + ' - ' + str(num))
            numb = paperNumber.get(True)
            numb = numb + len(paper)
            paperNumber.put(numb) # just commented
            # print(paper,thisYear)
            PaperInfo1.put((paper,thisYear),) # just commented
            print("the process "+str(i)+' - '+ str(count)+ ' : '+value)
        if not COUNT.empty():
            count = COUNT.get(True)
            # print("after enter" + str(i) + ' - ' + str(count) + ' - ' + str(num))
            COUNT.put(count,True)
            if int(count) == int(num):
                print("the process "+str(i)+" end ")
                break
    print('read end')

# print the paper info
def GetPaperInfo(PaperInfo1,paperNumber):
    for i in range(paperNumber.get(True)):
            value = PaperInfo1.get(True)
            print(value)

if __name__=='__main__':

    r_num = 10 # the read process number
    w_num = 1 # the write process number
    w_cnt = UrlLen # the write counter
    q = Queue(UrlLen) # the volume url queue
    paperNumber = Queue(1) # the total paper number
    COUNT = Queue(1) # the end tag
    COUNT.put(int(0)) # first is zero
    paperNumber.put(int(0)) # first is zero
    PaperInfo1 = Queue()
    r_list = [Process( target=Read, args=(q,w_cnt,PaperInfo1,COUNT,i,paperNumber) ) for i in range(r_num)]
    w_list = [Process( target=Write, args=(q,UrlALL,w_cnt) )]

    time_start = time.time()
    [task.start() for task in w_list]
    [task.start() for task in r_list]

    [task.join() for task in w_list]
    [task.join() for task in r_list]

    time_used = time.time() - time_start
    GetPaperInfo(PaperInfo1, paperNumber)
    print('time_used:{}s'.format(time_used))


I have no idea. Stepping through with the debugger, the process finally enters process.py -> line 297: `try: self.run()` and then line 300: `util._exit_function()`, and the debug output just shows a "connected", but I don't know why the network access can cause this error or how to solve it. That's all, thank you!

  • [Please do not upload images of code/data/errors.](//meta.stackoverflow.com/q/285551) – cafce25 Dec 03 '22 at 15:41
  • @cafce25 , I think the first image, at least, is fine. OP included the code. – wwii Dec 03 '22 at 15:48
  • Have you ruled out a problem with Queue.empty? From the docs - `... Because of multithreading/multiprocessing semantics, this is not reliable.` – wwii Dec 03 '22 at 15:50
  • Can you explain the unusual strategy of getting a value from the queue then immediately putting it back? – DarkKnight Dec 03 '22 at 15:59
  • Would adding a timeout when you join help? I might actually try to write this using asyncio, and I prefer using concurrent.futures and its *infrastructure* instead of trying to use multiprocessing directly and building all the interprocess communications. Any reason you used multiprocessing instead of threading? – wwii Dec 03 '22 at 16:11
  • @wwii's observation that multithreading is more appropriate is entirely correct – DarkKnight Dec 03 '22 at 16:17
  • @wwii Unfortunately I cannot see the image, it's blocked for me. Without it I don't understand the question, which would be fine if this is a UI related question but it doesn't seem to be one. – cafce25 Dec 03 '22 at 18:55
  • @cafce25 the first image is pointing out that the line `paper, thisYear = GetPaperBaseInfoFromUrlAll(value)` is preventing the process from ending, at least the OP believes that is the case. The second shows that line commented out with an annotation that the process will end when that line is commented. – wwii Dec 03 '22 at 19:23
  • @cafce25 I'm sorry to trouble you about this. Actually the images show the content as described by wwii, and the third image shows the state after the process has jumped out of the loop, after printing `connected`. – Jack August Dec 04 '22 at 05:42
  • @wwii I did not think of this; actually I do not think Queue.empty would be a problem. I'll try to add a check on qsize(), thank you! – Jack August Dec 04 '22 at 05:47
  • @Cobra I just want to reduce the impact on other processes; by getting the value out and putting it back immediately, other processes can access it smoothly, maybe? – Jack August Dec 04 '22 at 05:50
  • @wwii I have considered adding a `timeout`, but this code fetches pages over the network and I cannot be sure how long it takes to access a page because of network fluctuations and server response times, so I'd like it to stop when all the data has been fetched rather than on a timeout; otherwise I'm not sure whether any data would be missed. – Jack August Dec 04 '22 at 05:55
  • @wwii As for threads vs. multiprocessing: I simply found an example using multiprocessing and it really works. Next time I'll try threads to solve this problem. – Jack August Dec 04 '22 at 05:59
  • @wwii The `qsize()` check does not help. I think getting the process to end still requires solving the `network connection` problem; the check itself is correct and feasible, but the problem remains that the process cannot end after it jumps out of the loop, i.e. `the process cannot end when all the queue contents are taken out`. – Jack August Dec 04 '22 at 06:11
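
One common alternative to the `Queue.empty()`/`COUNT` pattern discussed in the comments above is a sentinel value: the writer puts one end marker per reader, and each reader exits when it receives it. Below is a minimal sketch of just that termination logic, meant as drop-in replacements for Write and Read in the question's script; it reuses GetPaperBaseInfoFromUrlAll from the question, is not tested, and does not address the rest of the program.

SENTINEL = None  # end marker telling a reader there is no more work

def Write(query, UrlALL, r_num):
    # Put every url, then one sentinel per reader so each reader can stop.
    for u, _descr in UrlALL:
        query.put(u, True)
    for _ in range(r_num):
        query.put(SENTINEL, True)
    print('write end')

def Read(query, PaperInfo1, i):
    # Block on get() until an item (or the sentinel) arrives; no empty()
    # polling and no shared COUNT queue are needed.
    while True:
        value = query.get(True)
        if value is SENTINEL:
            break
        paper, thisYear = GetPaperBaseInfoFromUrlAll(value)
        PaperInfo1.put((paper, thisYear))
    print('read end ' + str(i))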

2 Answers


Hi, this is me again. I tried a concurrent implementation with threads, and global variables shared between threads are much more comfortable than sharing data through process queues. With threads it does work, but now my main function can't be stopped. Previously, with processes, it was not possible to proceed to the next step while fetching concurrently; with threads the fetching of data works and the main function continues, but the main function can't be stopped anymore. How interesting!

I have designed three functions similar to the previous ones.

GetUrlintoQueue writes the fetched urls UrlALL into the queue UrlQueue; UrlLen is the number of urls.

import threading
import queue

count = 0 # Record the number of times a value is fetched from the queue
paperNumber = 0 # Record the number of papers

def GetUrlintoQueue(UrlQueue,UrlALL,UrlLen):
    for index in range(UrlLen):
        UrlQueue.put(UrlALL[index][0], True)
    print('Write End')
    UrlQueue.task_done()

The other is GetPaperInfofromUrl. It gets a url from UrlQueue and writes the information from the corresponding page to PaperInfo; index is the thread number.

def GetPaperInfofromUrl(UrlQueue,PaperInfo,index,UrlLen):
    global count,paperNumber
    while True:
        if not UrlQueue.empty():
            url = UrlQueue.get(True)
            count = count + 1
            paper, thisYear = GetPaperBaseInfoFromUrlAll(url)  # just commented
            print("connected " + str(index) + '-nd - ' + str(count) + ' - ' + str(UrlLen))
            print(paper,thisYear)
            paperNumber = paperNumber + len(paper)
            PaperInfo.put((paper, thisYear), True)
        if count == UrlLen:
            print("the process " + str(index) + " end ")
            break
    UrlQueue.task_done()
    PaperInfo.task_done()
    print('the process ' + str(index) +' get paper info end')

GetPaperInfo shows the results from PaperInfo, and it doesn't change.

def GetPaperInfo(PaperInfo,paperNumber):
    for i in range(paperNumber):
            value = PaperInfo.get(True)
            print(value)

The main function first sets up the corresponding variables, then writes the urls, then 10 threads crawl the paper information, and finally it shows the results; but after displaying the results it still cannot exit, and I cannot understand why.

if __name__ == '__main__':
    url = 'http://dblp.uni-trier.de/db/journals/talg/'
    UrlALL = GetVolumeUrlfromUrl(url)
    UrlLen = len(UrlALL)
    UrlQueue = queue.Queue(UrlLen)
    PaperInfo = queue.Queue(1000)
    WriteThread = 1
    ReadThread = 10

    # url write
    GetUrlThread = [threading.Thread(target=GetUrlintoQueue, args=(UrlQueue,UrlALL,UrlLen,))]
    time_start = time.time()
    [geturl.start() for geturl in GetUrlThread]
    [geturl.join() for geturl in GetUrlThread]
    time_used = time.time() - time_start
    print('time_used:{}s'.format(time_used))
    # url write end

    # paperinfo get
    PaperinfoGetThread = [threading.Thread(target=GetPaperInfofromUrl, args=(UrlQueue,PaperInfo,index,UrlLen,)) for index in range(ReadThread)]
    time_start = time.time()
    [getpaper.start() for getpaper in PaperinfoGetThread]
    [getpaper.join() for getpaper in PaperinfoGetThread]
    time_used = time.time() - time_start
    print('time_used:{}s'.format(time_used))
    # paperinfo get end
    
    GetPaperInfo(PaperInfo,paperNumber) # show the results
    import sys # it does not work 
    sys.exit()

The debugger shows: debug.gif (I don't have 10 reputation, so the picture is just a link.)

  • When it is time to exit, are any of the threads alive? You are using `task_done` incorrectly and you never join your queues; read the docs again - you don't have to join a queue, but it can be used to know when all the tasks are done. Are all the diagnostic prints at the end of the functions printing? – wwii Dec 04 '22 at 15:48
  • `UrlQueue`'s size is equal to the number of urls you want to fetch. It should fill up immediately, I don't think you need to do that in a thread, just put that for loop in `__main__`. – wwii Dec 04 '22 at 15:53
  • [Python - What is queue.task_done() used for?](https://stackoverflow.com/questions/49637086/python-what-is-queue-task-done-used-for) – wwii Dec 04 '22 at 19:00
  • `url = UrlQueue.get(True)` - are you blocking here without a timeout and preventing one of your threads from finishing? from the docs: `Similarly, if empty() returns False it doesn’t guarantee that a subsequent call to get() will not block.` Maybe .empty() is false then another thread gets the last task and **this** `.get` *hangs*. – wwii Dec 04 '22 at 19:03
  • @wwii I get it. I re-examined the variables in the code and found that whether `task_done` was used or not was irrelevant. The reason the process didn't stop at the end was that the last function `GetPaperInfo` was getting data from the queue `PaperInfo` for the wrong number of loops, `paperNumber`: I was actually putting in one tuple per url instead of one item per paper, which caused the loop not to stop after all the data had been taken from the queue. The logic was changed and the functions are basically done, thanks a lot! – Jack August Dec 05 '22 at 11:43
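
Following the resolution described in the last comment, a corrected GetPaperInfo could look roughly like the sketch below (not necessarily the asker's final code): one (paper_list, thisYear) tuple is queued per url, so the loop should run UrlLen times rather than paperNumber times.

def GetPaperInfo(PaperInfo, UrlLen):
    # One (paper_list, thisYear) tuple was put per url, so drain UrlLen
    # items rather than paperNumber (the total number of papers).
    for _ in range(UrlLen):
        paperList, year = PaperInfo.get(True)
        for paperDoi, title in paperList:
            print(paperDoi, title, year)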

Here is how your process might look using concurrent.futures to manage all the threads and data transport (not tested), adapting an example in the documentation.

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def GetPaperInfofromUrl(index,url):
    paper, thisYear = GetPaperBaseInfoFromUrlAll(url)
    return (index,url,paper,thisYear)

if __name__ == "__main__":
    url = 'http://dblp.uni-trier.de/db/journals/talg/'
    urls,descr = zip(*GetVolumeUrlfromUrl(url))
    results = []

    with ThreadPoolExecutor(max_workers=10) as executor:
        futs = [executor.submit(GetPaperInfofromUrl, index,url) for index,url in enumerate(urls)]
        for future in concurrent.futures.as_completed(futs):
            results.append(future.result())

GetPaperInfofromUrl seems superfluous, you could probably refactor GetPaperBaseInfoFromUrlAll and avoid a function call.
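
For instance, that refactor could look roughly like this (also untested), submitting GetPaperBaseInfoFromUrlAll directly and using the future-to-url dict pattern from the concurrent.futures documentation:

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=10) as executor:
    # Map each future back to its url instead of wrapping the call in a
    # helper that only echoes its arguments.
    future_to_url = {executor.submit(GetPaperBaseInfoFromUrlAll, u): u for u in urls}
    results = []
    for future in as_completed(future_to_url):
        paper, thisYear = future.result()
        results.append((future_to_url[future], paper, thisYear))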

wwii