
I wrote some Python code that uses Selenium and multiprocessing to parallelize data collection from YouTube. I have a function that starts a Chrome webdriver, and I use multiprocessing to run several of these workers at once so the data is collected faster. The issue is that when the multiprocessing timeout is reached, the worker leaves the function before the driver.quit() command can run. This leads to an accumulation of idle chromedrivers, which I cannot close from within Python since (to my knowledge) there is no longer any way to reference them. Is there any way to close all chromedrivers without explicitly using the driver objects?

The code is written in Python 3, and the chromedriver matches Chrome version 72.
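
For context, here is a minimal sketch of the failure mode (slow_worker is a toy stand-in for my explore_vid below, the sleep just simulates a hung page load, and the 'chromedriver' path is a placeholder):

from selenium import webdriver
from multiprocessing import Pool
import multiprocessing as mp
import time

def slow_worker(chromedriver_path):
    driver = webdriver.Chrome(executable_path=chromedriver_path)
    time.sleep(60)   # simulates a page that takes too long to process
    driver.quit()    # not reached before the parent gives up waiting

if __name__ == '__main__':
    pool = Pool(processes=1)
    res = pool.apply_async(slow_worker, ('chromedriver',))
    try:
        res.get(timeout=5)   # parent stops waiting after 5 seconds...
    except mp.TimeoutError:
        # ...but the worker and its chromedriver keep running, and the
        # parent holds no reference to that driver object.
        print('Timeout, chromedriver left behind')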

# Web related modules
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.common.exceptions import WebDriverException
from bs4 import BeautifulSoup
from urllib.request import urlopen
import html2text

# YouTube download module
from pytube import YouTube

# Multiprocessing tools
from multiprocessing import Lock, Manager, Queue, Pool
import multiprocessing as mp

# Misc modules
import time, re, pickle, os, shutil, argparse, glob, unicodedata, datetime
from argparse import RawTextHelpFormatter

# Irrelevant to the problem
def save_vids(vid_ids,save_loc):
    print('Irrelevant Function')

# Function that generates the initial list of urls to visit
def explore_home(chromedriver_path,chrome_options,caps):
    driver=webdriver.Chrome(executable_path=chromedriver_path,options=chrome_options,desired_capabilities=caps)
    driver.get('https://www.youtube.com')
    time.sleep(1)
    html_source = driver.page_source


    driver.close()
    parts=html_source.split('{"webCommandMetadata":{"url":"/watch_videos?')[1:]
    vids=[]
    for part in parts:
        part=part[part.find('video_ids=')+10:]

        if part.find('\\u')!=-1:
            if part.find('"')!=-1:
                end=min(part.find('\\u'),part.find('"'))
            else:
                end=part.find('\\u')
        elif part.find('"')!=-1:
            end=part.find('"')
        else:
            print('Could not find the end of the video id list')
        concat_list=part[:end]
        vids.extend(concat_list.split('%2C'))
    vids=[vid for vid in vids if len(re.findall(r'[0-9]|[a-z]|[A-Z]|_|-',vid))==11 and len(vid)==11]

    return vids

# The function that generates chromedrivers and fails to quit if a multiprocessing timeout occurs.
def explore_vid(chromedriver_path,chrome_options,caps,vid,ads,save_loc,l):
    driver=webdriver.Chrome(executable_path=chromedriver_path,options=chrome_options,desired_capabilities=caps)
    driver.get('https://www.youtube.com/watch?v='+vid)
    time.sleep(2)
    sec_html = driver.page_source
    soup=BeautifulSoup(sec_html,'lxml')
    mydivs = str(soup.findAll("div", {"class": "style-scope ytd-watch-next-secondary-results-renderer"}))
    inds=[m.start() for m in re.finditer('ytimg.com/vi/', mydivs)]
    # Return bare 11-character video ids, since explore_vid builds the URL from the id itself
    rec_vids=[mydivs[ind+13:ind+24] for ind in inds]
    browser_log = driver.get_log('performance') 

    adInfo=find_ad(browser_log,vid)

    if adInfo:
        #Check if it is the first time this ad has been seen
        adID=adInfo[0]

        l.acquire()
        try:
            if adID in ads:
                ads[adID][0].append(adInfo[1])
            else:
                try:
                    element = WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, ".ytp-ad-button.ytp-ad-visit-advertiser-button.ytp-ad-button-link")))
                    element.click()

                    driver.switch_to.window(driver.window_handles[-1])

                    ad_website_URL=driver.current_url
                    ad_website_HTML=driver.page_source
                    clean_text=html2text.html2text(ad_website_HTML)

                    save_vids(adID,save_loc)

                    textName=os.path.join(save_loc,adID,'adwebsite.txt')


                    file = open(textName,"w") 

                    file.write(ad_website_URL)
                    file.write('\n')
                    file.write(clean_text)

                    file.close() 

                    ads[adID]=[[adInfo[1]],ad_website_URL]

                except WebDriverException:
                    print('Button click failed: %s:%s' %(vid,adInfo[0]))

        finally:
            l.release()

    # The quit command for the chrome driver
    driver.quit()
    return rec_vids
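
# Sketch, not part of the original script: the quit can be made robust against
# exceptions inside the worker with a try/finally, e.g.
#
#     driver = webdriver.Chrome(...)
#     try:
#         ...  # page loads, parsing, ad handling as in explore_vid above
#     finally:
#         driver.quit()
#
# but that alone does not cover the timeout case, since res.get(timeout=...)
# in the main loop only stops the parent's wait; the worker process and its
# chromedriver keep running with no handle left in the parent.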


def find_ad(browser_log,vid):
    for k in range(len(browser_log)):
        if browser_log[k]['message'].find('adunit')!=-1 and browser_log[k]['message'].find(vid)!=-1:
            ind=browser_log[k]['message'].find('https://www.youtube.com/get_video_info?html5=1&video_id=')
            vid_id=browser_log[k]['message'][ind+56:ind+67]
            return (vid_id,time.localtime())
    return None

def positive_int(argument):
    num=int(argument)
    if num<1:
        msg="Maximum depth parameter must be a positive number. You entered: %s" %argument
        raise argparse.ArgumentTypeError(msg)
    return num

def valid_pickle(argument):
    file=str(argument)
    if not file.endswith('.pickle'):
        msg="ad_save_loc must end with .pickle You entered: %s" %file
        raise argparse.ArgumentTypeError(msg)
    return file

def valid_dir(argument):
    directory=str(argument)
    if not os.path.isdir(directory):
        msg="vid_save_loc must be a valid directory. You entered: %s" %directory
        raise argparse.ArgumentTypeError(msg)
    return directory

if __name__ == '__main__':
    # Argument Parsing
    parser = argparse.ArgumentParser(description='Scrapes Youtube ads and advertising company websites. \nUse --restart to restart the scraping from scratch by deleting previous data\nExample Usage: python finalReader.py E:\\ads\\ads.pickle E:\\ads --ncpu 2', formatter_class=RawTextHelpFormatter)
    parser.add_argument('ad_save_loc',help='Save Location for Ad Main Dictionary', type=valid_pickle)
    parser.add_argument('vid_save_loc',help='Save Location for Ad Videos', type=valid_dir)
    parser.add_argument('chromedriver_path', help='Path of the chrome executable', type=str)
    parser.add_argument('--restart', help='Restart collection', action="store_true", default=False, dest='restartCollection')
    parser.add_argument('--ncpu', nargs='?', help='Number of cores for multiprocessing, 1 by default', default=1, type=int, dest='mpcpu')
    parser.add_argument('--timeout',nargs='?', help='For how long the data collection will take place (in seconds), infinite by default', default=float('inf'), type=float, dest='time_limit')
    parser.add_argument('--max_depth', nargs='?', help='Depth of Youtube exploration tree', default=1, type=positive_int, dest='search_depth')

    args = parser.parse_args()

    ad_save_loc=args.ad_save_loc
    vid_save_loc=args.vid_save_loc
    vid_save_loc=os.path.join(vid_save_loc,'ad_data')
    mpcpu=max(args.mpcpu,1)
    time_limit=args.time_limit
    chromedriver_path=args.chromedriver_path
    search_depth=args.search_depth

    if not os.path.isdir(vid_save_loc):
        os.mkdir(vid_save_loc)

    if args.restartCollection:
        for the_file in os.listdir(vid_save_loc):
            file_path = os.path.join(vid_save_loc, the_file)
            try:
                if os.path.isfile(file_path):
                    os.unlink(file_path)
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)
            except Exception as e:
                print(e)

        if os.path.isfile(ad_save_loc):
            os.remove(ad_save_loc)
        ads={}
    else:
        if os.path.isfile(ad_save_loc):
            pickle_in = open(ad_save_loc,"rb")
            ads = pickle.load(pickle_in)
        else:
            ads={}

    # Chrome Driver Options
    chrome_options=Options()
    chrome_options.add_argument('--mute-audio')
    caps = DesiredCapabilities.CHROME
    caps['loggingPrefs'] = {'performance': 'ALL'}

    startTime=time.time()
    currentTime=time.time()

    # Data Collection Loop - Multiprocessing
    while currentTime-startTime<time_limit:
        print('Time from start: %s' %str(datetime.timedelta(seconds=currentTime-startTime)))
        rec_vids=explore_home(chromedriver_path,chrome_options,caps)
        while not rec_vids:
            time.sleep(60)
            rec_vids=explore_home(chromedriver_path,chrome_options,caps)

        m = Manager()
        lock = m.Lock()

        pool = Pool(processes=mpcpu)

        for depth in range(search_depth):
            print('Depth %s' %depth)
            multiple_results=[pool.apply_async(explore_vid, (chromedriver_path,chrome_options,caps,vid,ads,vid_save_loc,lock)) for vid in rec_vids]
            branching_vids=[]

            for res in multiple_results:        
                try:
                    branching_vids.append(res.get(timeout=30))
                    if time.time()-startTime<time_limit:
                        break
                except mp.TimeoutError:
                    print('Timeout')
            # Flatten the per-video result lists so the next depth visits the recommendations
            rec_vids=[v for sub in branching_vids for v in sub]

            pickle_out = open(ad_save_loc,"wb")
            pickle.dump(ads, pickle_out)
            pickle_out.close()

        currentTime=time.time()

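The only workaround I can think of is an OS-level sweep. This is a sketch, not something the script currently does: it assumes the third-party psutil package, and it would kill every chromedriver on the machine, not just the ones this script spawned:

import psutil

def kill_stray_chromedrivers():
    # Iterate over all processes and kill anything that looks like a chromedriver
    for proc in psutil.process_iter(['name']):
        try:
            if proc.info['name'] and 'chromedriver' in proc.info['name'].lower():
                proc.kill()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

A more targeted variant might record each worker's driver.service.process.pid in a shared Manager list right after the driver starts and kill only those PIDs. Is there a cleaner way to do this through Selenium itself?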