
I am getting the error below when downloading files with multiprocessing. I am downloading the Wikipedia page view dumps, which are published per hour, so the job can involve a large number of downloads.

Any recommendation as to why this error occurs and how to solve it? Thanks.

MaybeEncodingError: Error sending result: ''. Reason: 'TypeError("cannot serialize '_io.BufferedReader' object",)'

import fnmatch
import requests
import urllib.request
from bs4 import BeautifulSoup
import multiprocessing as mp

def download_it(download_file):
    global path_to_save_document
    filename = download_file[download_file.rfind("/")+1:]
    save_file_w_submission_path = path_to_save_document + filename
    request = urllib.request.Request(download_file)
    response = urllib.request.urlopen(request)
    data_content = response.read()
    with open(save_file_w_submission_path, 'wb') as wf:    
        wf.write(data_content)
    print(save_file_w_submission_path)  

pattern = r'*200801*'
url_to_download = r'https://dumps.wikimedia.org/other/pagecounts-raw/'
path_to_save_document = r'D:\Users\Jonathan\Desktop\Wikipedia\\'    

def main():
    global pattern
    global url_to_download
    r  = requests.get(url_to_download)
    data = r.text
    soup = BeautifulSoup(data,features="lxml")

    list_of_href_year = []
    for i in range(2):
        if i == 0:
            for link in soup.find_all('a'):
                lien = link.get('href')
                if len(lien) == 4:
                    list_of_href_year.append(url_to_download + lien + '/')
        elif i == 1:
            list_of_href_months = [] 
            list_of_href_pageviews = []        
            for loh in list_of_href_year: 
                r  = requests.get(loh)
                data = r.text
                soup = BeautifulSoup(data,features="lxml")   
                for link in soup.find_all('a'):
                    lien = link.get('href')
                    if len(lien) == 7:
                        list_of_href_months.append(loh + lien + '/')
                if not list_of_href_months:
                   continue
                for lohp in list_of_href_months: 
                    r  = requests.get(lohp)
                    data = r.text
                    soup = BeautifulSoup(data,features="lxml")              
                    for link in soup.find_all('a'):
                        lien = link.get('href')
                        if "pagecounts" in lien:
                            list_of_href_pageviews.append(lohp + lien)       

    matching_list_of_href = fnmatch.filter(list_of_href_pageviews, pattern)   
    matching_list_of_href.sort()
    with mp.Pool(mp.cpu_count()) as p:
        print(p.map(download_it, matching_list_of_href))

if __name__ == '__main__':
    main()
Jonathan Lam
  • Possible duplicate of [multiprocessing.Pool: urllib TypeError if not using dummy module](https://stackoverflow.com/questions/54736710/multiprocessing-pool-urllib-typeerror-if-not-using-dummy-module) – Darkonaut Mar 12 '19 at 23:07
  • Pickle didn't work. I get the same error about `TypeError: cannot serialize '_io.BufferedReader' object` – Jonathan Lam Mar 12 '19 at 23:25
  • Pickle is not the solution but the reason you get that error. Serializing means pickling in Python. My code in the linked answer just demonstrates this. You need to scroll the error message in the linked question to the right to see that it also is about `Reason: 'TypeError("cannot serialize '_io.BufferedReader' object")` – Darkonaut Mar 12 '19 at 23:29
  • I don't see any linked question other than mine – Jonathan Lam Mar 13 '19 at 00:36
  • The "possible duplicate" I linked in my first comment? – Darkonaut Mar 13 '19 at 00:39
  • I don't get it. How can your answer resolve the issue I am getting? – Jonathan Lam Mar 13 '19 at 01:41
  • You are trying to pass response objects within `matching_list_of_href` to child processes here: `p.map(download_it, matching_list_of_href)`. Pool needs to pickle everything it sends to its child processes. Your response objects contain `_io.BufferedReader` objects, and these cannot be pickled, hence you get that error [a short demonstration follows this comment thread]. – Darkonaut Mar 13 '19 at 01:58
  • Ok, so I use thread instead. Thx – Jonathan Lam Mar 13 '19 at 04:21
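To see the failure described in the comments above in isolation, here is a minimal sketch (not part of the original post) that pickles a binary file handle, which is the same `_io.BufferedReader` type an `urlopen` response wraps internally; `multiprocessing.Pool` performs exactly this serialization on everything it exchanges with its worker processes:

import pickle

# Hypothetical demonstration: pickling a buffered binary reader fails with the
# same TypeError reported in the question (exact wording varies by Python version).
with open(__file__, 'rb') as reader:
    try:
        pickle.dumps(reader)
    except TypeError as e:
        print(e)  # e.g. "cannot pickle '_io.BufferedReader' object"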

2 Answers


As Darkonaut proposed, I used multithreading instead.

Example:

# Imports used by the two functions below
import fnmatch
import os
import time
import urllib.request
import urllib.error

import requests
from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool as ThreadPool

'''This function downloads the files using multithreading'''
def multithread_download_files_func(self,download_file):
    try:
        filename = download_file[download_file.rfind("/")+1:]
        save_file_w_submission_path = self.ptsf + filename
        '''Check if the file has already been downloaded. If not, proceed; otherwise skip it'''
        if not os.path.exists(save_file_w_submission_path):
            data_content = None
            try:
                '''Let's download the file'''
                request = urllib.request.Request(download_file)
                response = urllib.request.urlopen(request)
                data_content = response.read()     
            except urllib.error.HTTPError:
                '''We will do a retry on the download if the server is temporarily unavailable'''
                retries = 1
                success = False
                while not success:
                    try:
                        '''Make another request if the previous one failed'''
                        response = urllib.request.urlopen(download_file)
                        data_content = response.read()                        
                        success = True
                    except Exception:
                        '''We will make the program wait a bit before sending another request to download the file'''
                        wait = retries * 5
                        time.sleep(wait)
                        retries += 1 
            except Exception as e:
                print(str(e))   
            '''If the response data is not empty, we will write it to a new file stored in the data lake folder'''
            if data_content:
                with open(save_file_w_submission_path, 'wb') as wf:    
                    wf.write(data_content)
                print(self.present_extract_RC_from_RS + filename)                   
    except Exception as e:
        print('funct multithread_download_files_func' + str(e))

'''This function is a wrapper that builds the download list and then uses multithreading to store the files in the Data Lake'''
def download_files(self,filter_files,url_to_download,path_to_save_file):
    try:
        self.ptsf = path_to_save_file = path_to_save_file + 'Step 1 - Data Lake\Wikipedia Pagecounts\\'
        filter_files_df = filter_files 
        self.filter_pattern = filter_files       
        self.present_extract_RC_from_RS = 'WK Downloaded->           ' 
        
        if filter_files_df == '*':
            '''We will create a string of all the years concatenated together for later use in this program'''
            reddit_years = [2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2018]
            filter_files_df = ''
            '''Go through the years from 2005 to 2018'''
            for idx, ry in enumerate(reddit_years):
                filter_files_df += '*' + str(ry) + '*'
                if (idx != len(reddit_years)-1):
                    filter_files_df += '&'   
                    
        download_filter = list([x.strip() for x in filter_files_df.split('&')])
        download_filter.sort()
        
        '''If folder doesn't exist, create one'''
        if not os.path.exists(os.path.dirname(self.ptsf)):
            os.makedirs(os.path.dirname(self.ptsf))       
        
        '''We will get the website HTML elements using beautifulsoup library'''
        r  = requests.get(url_to_download)
        data = r.text
        soup = BeautifulSoup(data,features="lxml")
        
        list_of_href_year = []
        for i in range(2):
            if i == 0:
                '''Let's get all hrefs available on this particular page. The first page is the year page'''
                for link0 in soup.find_all('a'):
                    lien0 = link0.get('href')
                    '''We will check if the length is 4 which corresponds to a year'''
                    if len(lien0) == 4:
                        list_of_href_year.append(url_to_download + lien0 + '/')
                        
            elif i == 1:
                list_of_href_months = [] 
                list_of_href_pageviews = []        
                for loh in list_of_href_year: 
                    r1  = requests.get(loh)
                    data1 = r1.text
                    '''Get the webpage HTML Tags'''
                    soup1 = BeautifulSoup(data1,features="lxml")   
                    for link1 in soup1.find_all('a'):
                        lien1 = link1.get('href')
                        '''We will check if the length is 7 which corresponds to the year and month'''
                        if len(lien1) == 7:
                            list_of_href_months.append(loh + lien1 + '/')                                            
                for lohm in list_of_href_months: 
                    r2  = requests.get(lohm)
                    data2 = r2.text
                    '''Get the webpage HTML Tags'''
                    soup2 = BeautifulSoup(data2,features="lxml")              
                    for link2 in soup2.find_all('a'): 
                        lien2 = link2.get('href')
                        '''We will now collect every href that contains "pagecounts" in its name. The files are hourly,
                        so there are 24 files per day and at least 24*365 = 8760 files per year'''
                        if "pagecounts" in lien2:
                            list_of_href_pageviews.append(lohm + lien2)      
     
        existing_file_list = []
        for file in os.listdir(self.ptsf):
             filename = os.fsdecode(file)     
             existing_file_list.append(filename)  
         
        '''Filter the links'''
        matching_fnmatch_list = []
        if filter_files != '':
            for dfilter in download_filter:
                fnmatch_list = fnmatch.filter(list_of_href_pageviews, dfilter) 
                i = 0
                for fnl in fnmatch_list:
                    '''Stop early, for demo purposes only'''
                    if self.limit_record != 0:
                        if (i == self.limit_record) and (i != 0):
                            break
                    i += 1
                    matching_fnmatch_list.append(fnl) 
        
        '''Collect any link whose file already exists locally, so it can be skipped'''
        to_remove = []
        for efl in existing_file_list:
            for mloh in matching_fnmatch_list:
                if efl in mloh:         
                    to_remove.append(mloh)
        
        '''Remove the already-downloaded files from the download list'''
        for tr in to_remove:
            matching_fnmatch_list.remove(tr)   
            
        matching_fnmatch_list.sort()    
          
        '''Thread pool with 200 worker threads'''
        p = ThreadPool(200)
        p.map(self.multithread_download_files_func, matching_fnmatch_list)
    except Exception as e:
        print('funct download_files' + str(e))
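Note that the two methods above reference attributes such as `self.ptsf`, `self.present_extract_RC_from_RS` and `self.limit_record` that are set on a surrounding class not shown in the answer. A hypothetical skeleton (attribute names inferred from the snippet, not part of the original answer) could look like this:

class WikipediaDownloader:
    '''Hypothetical wrapper class; the two methods above would be defined inside it.'''
    def __init__(self, limit_record=0):
        self.ptsf = ''                          # save path, filled in by download_files()
        self.filter_pattern = ''
        self.present_extract_RC_from_RS = ''
        self.limit_record = limit_record        # 0 disables the demo record limit

    # def multithread_download_files_func(self, download_file): ...
    # def download_files(self, filter_files, url_to_download, path_to_save_file): ...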
Jonathan Lam
  • Can you please elaborate on what you mean by "I used multithreading instead" for those who encounter similar errors? – theV0ID May 02 '20 at 17:47
  • It means using a multithreading pool instead of a multiprocessing pool. For more details, see: https://stackoverflow.com/questions/18114285/what-are-the-differences-between-the-threading-and-multiprocessing-modules – Jongwook Choi Nov 18 '22 at 04:34
  • However, this is **NOT** a valid answer. When using multithreading, data serialization (encoding Python objects into a pickle-like format) is not necessary, because threads in the same process share memory (the heap). When multiprocessing is used, the processes cannot share memory, so serialization and copying are needed (unless using some IPC or shared memory). Python's multithreading performance is poor because of the GIL, so this answer is not an OK solution to the problem [a sketch of keeping multiprocessing while returning only picklable values follows these comments]. – Jongwook Choi Nov 18 '22 at 04:36
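For readers who want to stay with processes, the fix implied by the comments on the question is to have the worker return only picklable values (strings, numbers, tuples) rather than response objects. A minimal sketch under that assumption, using a placeholder URL list:

import multiprocessing as mp
import urllib.request

def fetch_size(url):
    # Read the response inside the worker and return a plain tuple,
    # which pickles without trouble.
    with urllib.request.urlopen(url) as response:
        return url, len(response.read())

if __name__ == '__main__':
    urls = ['https://dumps.wikimedia.org/other/pagecounts-raw/']  # placeholder list
    with mp.Pool(mp.cpu_count()) as pool:
        print(pool.map(fetch_size, urls))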

From the accepted answer, I understood that it is simply a matter of replacing `from multiprocessing import Pool` with `from multiprocessing.dummy import Pool`.

This worked for me.
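Since threads share the parent process's memory, the same worker runs unchanged under the thread-backed pool; in this minimal sketch (not part of the original answer) the only difference from the process-based example above is the pool import:

from multiprocessing.dummy import Pool as ThreadPool  # thread-backed, same Pool API
import urllib.request

def fetch_size(url):
    with urllib.request.urlopen(url) as response:
        return url, len(response.read())

if __name__ == '__main__':
    urls = ['https://dumps.wikimedia.org/other/pagecounts-raw/']  # placeholder list
    with ThreadPool(4) as pool:  # nothing is pickled, so workers may return any object
        print(pool.map(fetch_size, urls))

For an I/O-bound job like downloading hourly dump files, the GIL is released while waiting on the network, so a thread pool is usually adequate despite the objection in the comments above.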

DomDev