I wrote some code in Python using Selenium and multiprocessing to parallelize data collection from YouTube. I have a method that launches a Chrome webdriver, and I use multiprocessing to collect data faster. The issue is that when the multiprocessing timeout is reached, the worker exits the function before the driver.quit() command can run. This leads to an accumulation of idle chromedrivers, which I cannot close from within Python since (to my knowledge) there is no longer any way to reference them. Is there any way to close all chromedrivers without explicitly using the driver objects?
I wrote the code in Python 3. The chromedriver matches Chrome version 72.
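The only workaround I can think of is reaping the leftover processes by name at the OS level. A minimal sketch of that idea, assuming the psutil package is available (the helper name is mine, and matching on the process names 'chromedriver'/'chrome' is an assumption that varies by platform):

import psutil

def kill_orphaned_drivers():
    # Blunt cleanup (sketch, untested): kill every process whose name looks like
    # chromedriver or chrome. Caution: this also kills any unrelated Chrome
    # windows the user has open.
    for proc in psutil.process_iter(['name']):
        try:
            name = (proc.info['name'] or '').lower()
            if name.startswith('chromedriver') or name.startswith('chrome'):
                proc.kill()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

The full script follows.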
# Web related modules
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.common.exceptions import WebDriverException
from bs4 import BeautifulSoup
from urllib.request import urlopen
import html2text
# YouTube download module
from pytube import YouTube
# Multiprocessing tools
from multiprocessing import Lock, Manager, Queue, Pool
import multiprocessing as mp
# Misc modules
import time, re, pickle, os, shutil, argparse, glob, unicodedata, datetime
from argparse import RawTextHelpFormatter
# Irrelevant to the problem
def save_vids(vid_ids,save_loc):
    print('Irrelevant Function')

# Function that generates the initial list of URLs to visit
def explore_home(chromedriver_path,chrome_options,caps):
    driver=webdriver.Chrome(executable_path=chromedriver_path,options=chrome_options,desired_capabilities=caps)
    driver.get('https://www.youtube.com')
    time.sleep(1)
    html_source = driver.page_source
    driver.quit()  # quit() rather than close(), so the chromedriver process itself exits
    parts=html_source.split('{"webCommandMetadata":{"url":"/watch_videos?')[1:]
    vids=[]
    for part in parts:
        part=part[part.find('video_ids=')+10:]
        if part.find('\\u')!=-1:
            if part.find('"')!=-1:
                end=min(part.find('\\u'),part.find('"'))
            else:
                end=part.find('\\u')
        elif part.find('"')!=-1:
            end=part.find('"')
        else:
            # Fallback so end is always defined
            print('No delimiter found')
            end=len(part)
        concat_list=part[:end]
        vids.extend(concat_list.split('%2C'))
    vids=[vid for vid in vids if len(re.findall(r'[0-9]|[a-z]|[A-Z]|_|-',vid))==11 and len(vid)==11]
    return vids

# The function that generates chromedrivers and fails to quit if a multiprocessing timeout occurs.
def explore_vid(chromedriver_path,chrome_options,caps,vid,ads,save_loc,l):
    driver=webdriver.Chrome(executable_path=chromedriver_path,options=chrome_options,desired_capabilities=caps)
    driver.get('https://www.youtube.com/watch?v='+vid)
    time.sleep(2)
    sec_html = driver.page_source
    soup=BeautifulSoup(sec_html,'lxml')
    mydivs = str(soup.findAll("div", {"class": "style-scope ytd-watch-next-secondary-results-renderer"}))
    inds=[m.start() for m in re.finditer('ytimg.com/vi/', mydivs)]
    # Keep bare 11-character video IDs (not full URLs) so they can be fed back into explore_vid at the next depth
    rec_vids=[mydivs[ind+13:ind+24] for ind in inds]
    browser_log = driver.get_log('performance')
    adInfo=find_ad(browser_log,vid)
    if adInfo:
        # Check if it is the first time this ad has been seen
        adID=adInfo[0]
        l.acquire()
        try:
            if adID in ads:
                ads[adID][0].append(adInfo[1])
            else:
                try:
                    element = WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, ".ytp-ad-button.ytp-ad-visit-advertiser-button.ytp-ad-button-link")))
                    element.click()
                    driver.switch_to.window(driver.window_handles[-1])
                    ad_website_URL=driver.current_url
                    ad_website_HTML=driver.page_source
                    clean_text=html2text.html2text(ad_website_HTML)
                    save_vids(adID,save_loc)
                    textName=os.path.join(save_loc,adID,'adwebsite.txt')
                    with open(textName,"w") as f:
                        f.write(ad_website_URL)
                        f.write('\n')
                        f.write(clean_text)
                    ads[adID]=[[adInfo[1]],ad_website_URL]
                except WebDriverException:
                    print('Button click failed: %s:%s' %(vid,adInfo[0]))
        finally:
            l.release()
    # The quit command for the chrome driver
    driver.quit()
    return rec_vids

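# Sketch of what I would normally use (my assumption, untested): a context manager
# that guarantees driver.quit() even when the body raises. It still cannot run
# when the pool abandons or kills the worker process on a timeout, which is the
# failure described above.
from contextlib import contextmanager

@contextmanager
def managed_driver(chromedriver_path,chrome_options,caps):
    driver=webdriver.Chrome(executable_path=chromedriver_path,options=chrome_options,desired_capabilities=caps)
    try:
        yield driver  # the body of explore_vid would run inside the with-block
    finally:
        driver.quit()  # runs on normal exit and on exceptions

# Usage would be:
#     with managed_driver(chromedriver_path,chrome_options,caps) as driver:
#         driver.get('https://www.youtube.com/watch?v='+vid)
#         ...
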
def find_ad(browser_log,vid):
    for k in range(len(browser_log)):
        if browser_log[k]['message'].find('adunit')!=-1 and browser_log[k]['message'].find(vid)!=-1:
            ind=browser_log[k]['message'].find('https://www.youtube.com/get_video_info?html5=1&video_id=')
            vid_id=browser_log[k]['message'][ind+56:ind+67]
            return (vid_id,time.localtime())
    return None

def positive_int(argument):
    num=int(argument)
    if num<1:
        msg="Maximum depth parameter must be a positive number. You entered: %s" %argument
        raise argparse.ArgumentTypeError(msg)
    return num

def valid_pickle(argument):
    file=str(argument)
    if not file.endswith('.pickle'):
        msg="ad_save_loc must end with .pickle. You entered: %s" %file
        raise argparse.ArgumentTypeError(msg)
    return file

def valid_dir(argument):
    directory=str(argument)
    if not os.path.isdir(directory):
        msg="vid_save_loc must be a valid directory. You entered: %s" %directory
        raise argparse.ArgumentTypeError(msg)
    return directory

if __name__ == '__main__':
    # Argument Parsing
    parser = argparse.ArgumentParser(description='Scrapes Youtube ads and advertising company websites. \nUse --restart to restart the scraping from scratch by deleting previous data\nExample Usage: python finalReader.py E:\\ads\\ads.pickle E:\\ads --ncpu 2', formatter_class=RawTextHelpFormatter)
    parser.add_argument('ad_save_loc',help='Save Location for Ad Main Dictionary', type=valid_pickle)
    parser.add_argument('vid_save_loc',help='Save Location for Ad Videos', type=valid_dir)
    parser.add_argument('chromedriver_path', help='Path of the chrome executable', type=str)
    parser.add_argument('--restart', help='Restart collection', action="store_true", default=False, dest='restartCollection')
    parser.add_argument('--ncpu', nargs='?', help='Number of cores for multiprocessing, 1 by default', default=1, type=int, dest='mpcpu')
    parser.add_argument('--timeout',nargs='?', help='For how long the data collection will take place (in seconds), infinite by default', default=float('inf'), type=float, dest='time_limit')
    parser.add_argument('--max_depth', nargs='?', help='Depth of Youtube exploration tree', default=1, type=positive_int, dest='search_depth')
    args = parser.parse_args()
    ad_save_loc=args.ad_save_loc
    vid_save_loc=args.vid_save_loc
    vid_save_loc=os.path.join(vid_save_loc,'ad_data')
    mpcpu=max(args.mpcpu,1)
    time_limit=args.time_limit
    chromedriver_path=args.chromedriver_path
    search_depth=args.search_depth

    if not os.path.isdir(vid_save_loc):
        os.mkdir(vid_save_loc)

    if args.restartCollection:
        for the_file in os.listdir(vid_save_loc):
            file_path = os.path.join(vid_save_loc, the_file)
            try:
                if os.path.isfile(file_path):
                    os.unlink(file_path)
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)
            except Exception as e:
                print(e)
        if os.path.isfile(ad_save_loc):
            os.remove(ad_save_loc)
        ads={}
    else:
        if os.path.isfile(ad_save_loc):
            with open(ad_save_loc,"rb") as pickle_in:
                ads = pickle.load(pickle_in)
        else:
            ads={}

    # Chrome Driver Options
    chrome_options=Options()
    chrome_options.add_argument('--mute-audio')
    caps = DesiredCapabilities.CHROME
    # 'loggingPrefs' works for Chrome/ChromeDriver 72; newer ChromeDriver (75+) expects 'goog:loggingPrefs'
    caps['loggingPrefs'] = {'performance': 'ALL'}

    startTime=time.time()
    currentTime=time.time()
    # Data Collection Loop - Multiprocessing
    while currentTime-startTime<time_limit:
        print('Time from start: %s' %str(datetime.timedelta(seconds=currentTime-startTime)))
        rec_vids=explore_home(chromedriver_path,chrome_options,caps)
        while not rec_vids:
            time.sleep(60)
            rec_vids=explore_home(chromedriver_path,chrome_options,caps)
        m = Manager()
        lock = m.Lock()
        pool = Pool(processes=mpcpu)
        for depth in range(search_depth):
            print('Depth %s' %depth)
            # Note: ads is a plain dict here, so updates made inside the worker
            # processes do not propagate back to the parent; a Manager().dict()
            # would be needed for that.
            multiple_results=[pool.apply_async(explore_vid, (chromedriver_path,chrome_options,caps,vid,ads,vid_save_loc,lock)) for vid in rec_vids]
            branching_vids=[]
            for res in multiple_results:
                try:
                    # extend (not append) so the next depth gets a flat list of video IDs
                    branching_vids.extend(res.get(timeout=30))
                    if time.time()-startTime>time_limit:  # break once the time limit is exceeded
                        break
                except mp.TimeoutError:
                    print('Timeout')
            rec_vids=branching_vids.copy()  # feed the newly found videos into the next depth
        with open(ad_save_loc,"wb") as pickle_out:
            pickle.dump(ads, pickle_out)
        currentTime=time.time()
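        # Sketch (my assumption, untested): tear the pool down each iteration so
        # abandoned workers do not accumulate, then reap whatever chromedriver /
        # chrome processes they left behind. kill_orphaned_drivers is the
        # hypothetical psutil helper sketched at the top of this post.
        pool.terminate()
        pool.join()
        kill_orphaned_drivers()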