0

Hello, I am working on crawling web pages. The crawler combines two approaches: plain HTTP requests via the `requests` library, and browser automation via a Selenium WebDriver. In the function site_distinction(), only the requests-based parts were extracted and run in parallel using threads, but as the number of dynamic web pages increases, the processing speed slows down considerably. Is there a way to parallelize the Selenium-controlled crawling with threads as well?

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup as bs4
from threading import Thread
import os
import requests
import json
import time

class itemCrawling:
    def __init__(self, comparison_url : str, select_option : str = None, price_standard : str = "할인가격"):
        """
        Crawl a price-comparison (catalog) page and the seller channels it links to.

        Parameters
        ----------
        comparison_url : str
            Price-comparison URL (** catalog page **).
        select_option : str, optional
            Product option to filter by, passed as a string if present (** default None **).
        price_standard : str
            Price basis to compare on: "정상가격" (regular) or "할인가격" (discounted).
            Defaults to "할인가격"; when an item has no discounted price, its regular
            price is used as the basis instead.
        """
        self.comparison_url = comparison_url # price-comparison (catalog) URL
        self.select_option = select_option # option value, or None
        self.price_standard = price_standard # price basis to compare on (정상가격 / 할인가격)
        # Desktop-Chrome User-Agent used for requests-based crawling (e.g. Coupang).
        self.headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'}
        self.start = time.time()  # wall-clock start, used by the progress log lines
        self.thread_list = []  # worker threads for the requests-based channel crawlers
        self.item_list=[]  # per-channel result dicts, appended from the worker threads
def set_driver(self):
    """Build a Chrome WebDriver with crawling-friendly options and open the catalog URL."""
    chrome_options = Options()
    chrome_options.add_experimental_option('detach',True)
    chrome_options.add_experimental_option('excludeSwitches',['enable-logging'])
    chrome_options.add_argument("disable-gpu")   # no GPU acceleration
    chrome_options.add_argument("lang=ko_KR")    # Korean locale
    chrome_options.add_argument('user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36')  # spoofed user-agent string
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    # chrome_options.add_argument('--headless')

    chrome_service = Service(ChromeDriverManager().install())
    self.driver = webdriver.Chrome(service=chrome_service,options=chrome_options)
    self.driver.get(self.comparison_url)
def option(self):
    """Click the filter checkbox on the catalog page that matches self.select_option."""
    print(f"[Get_option] ----- 본 페이지의 선택 옵션은 [{self.select_option}] 입니다.")
    filter_groups = self.driver.find_elements(By.CSS_SELECTOR,'div.filter_condition_group__h8Gss')
    for entry in filter_groups[-1].find_elements(By.CSS_SELECTOR,'ul > li'):
        if self.select_option == entry.find_element(By.CSS_SELECTOR,'span').text:
            checkbox = entry.find_element(By.CSS_SELECTOR,'label.filter_label__3GLbR')
            # Click via JS to avoid overlay/visibility issues, then wait for the reload.
            self.driver.execute_script('arguments[0].click();',checkbox)
            time.sleep(3)
            break
# ---------------------------------------------------------------------------------------------------------------------------------------------------------
## [내장] 각 채널 크롤링 ##
### requests thread(1)
def smartstore(self,url,item : dict):
    """Crawl a SmartStore product page (requests-based) and append the result to self.item_list.

    Parameters
    ----------
    url : str
        Redirect URL pointing at the SmartStore product document.
    item : dict
        Base data from the catalog page; must contain "판매처".
    """
    # Retry until the embedded product JSON is fetched and parsed successfully.
    # NOTE(review): this retries forever on a permanently broken page — consider a cap.
    while True:
        try:
            response = requests.get(url)
            if response.status_code != 200:
                # Original fell through and hit an unbound/stale `scripts` here; retry cleanly.
                continue
            soup = bs4(response.text, 'html.parser')
            scripts = soup.find('body').find_all('script')
            # The first <script> holds the page state; strip the 27-char JS assignment prefix.
            script_data = json.loads(scripts[0].text.strip()[27:])
            break
        except Exception as e:
            print(str(e))
    product = script_data["product"]["A"]
    item_code = product["id"]
    channel = "스마트스토어"
    item_url = product["productUrl"] # type str
    name = product["name"] # type str
    ## price variants
    original_price = str(product["salePrice"]) # [1] regular price
    pc_discount_value = str(product["benefitsView"]["discountedSalePrice"]) # [2] PC discounted price
    mb_discount_value = str(product["benefitsView"]["mobileDiscountedSalePrice"]) # [3] mobile discounted price
    # A channel counts as discounted only when its price differs from the regular price;
    # otherwise record '0'. (The original five-branch if/elif ladder — whose last branch
    # was unreachable — reduces exactly to this rule.)
    pc_discount_price = pc_discount_value if pc_discount_value != original_price else '0'
    mb_discount_price = mb_discount_value if mb_discount_value != original_price else '0'
    new_item = {"판매채널":channel,"판매처":item['판매처'],"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":item_url}
    self.item_list.append(new_item)
def gmarket(self,url,item : dict):
    """Crawl a Gmarket product page (requests-based) and append the result to self.item_list."""
    # (1) keep only the query string up to the "&NaPm" tracking token
    start_idx = url.find('?')
    end_idx = url.find('&NaPm')
    url_pram = url[start_idx:end_idx]
    # (2) rebuild a short product URL on the Gmarket item domain
    short_url = f'http://item.gmarket.co.kr/Item{url_pram}'
    gmarket_res = requests.get(short_url)
    gmarket_soup = bs4(gmarket_res.text,'html.parser')
    item_code = gmarket_soup.find('span',class_='text__item-number').text.replace("상품번호 : ","")
    mall = gmarket_soup.find('span',class_='text__seller').find('a',class_='link__seller').text
    name = gmarket_soup.find('h1',class_='itemtit').text # product name
    try:
        # Both a regular and a discounted price are displayed.
        original_price = gmarket_soup.find('span',class_='text__price').text.replace(",","")
        pc_discount_price = gmarket_soup.find('strong',class_='price_real').text.replace(",","").replace("원","")
    except AttributeError:
        # find() returned None — no separate regular price; the single price is the regular one.
        original_price = gmarket_soup.find('strong',class_='price_real').text.replace(",","").replace("원","")
        pc_discount_price = '0'
    mb_discount_price = '0'
    new_item = {"판매채널":item["판매채널"],"판매처":mall,"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":short_url}
    self.item_list.append(new_item)
def auction(self,url,item : dict):
    """Crawl an Auction product page (requests-based) and append the result to self.item_list."""
    # (1) keep everything before the "&NaPm" tracking parameter
    index = url.find('&NaPm')
    # (2) shortened product URL
    short_url = url[:index]
    res = requests.get(short_url)
    soup = bs4(res.text,'html.parser')
    # (3) crawl the redirected page; last 10 chars before "&pc" form the product code
    item_code = url[:url.find('&pc')][-10:]
    mall = soup.find('span',class_='text__seller').find('a',class_='link__seller').text
    name = soup.find('h1',class_='itemtit').text
    try:
        original_price = soup.find('span',class_='price_original').text.replace(",","").replace("원","")
        pc_discount_price = soup.find('strong',class_='price_real').text.replace(",","").replace("원","")
    except AttributeError:
        # find() returned None — no separate regular price; the single price is the regular one.
        original_price = soup.find('strong',class_='price_real').text.replace(",","").replace("원","")
        pc_discount_price = '0'
    mb_discount_price = '0'
    new_item = {"판매채널":item["판매채널"],"판매처":mall,"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":short_url}
    self.item_list.append(new_item)
def coupang(self,url,item : dict):
    """Crawl a Coupang product page (requests-based) and append the result to self.item_list."""
    coup_res =requests.get(url, headers=self.headers)  # browser-like headers for the request
    coup_soup=bs4(coup_res.text,'html.parser')
    # The 5th <script> in <head> embeds the product state as a JS assignment;
    # slice from just before "wishList" to the terminating semicolon.
    script=coup_soup.find('head').find_all('script')[4].text
    start_idx=script.find('"wishList"')-1
    end_idx=script.find(';')
    data=script[start_idx:end_idx]
    json_data = json.loads(data)
    # last 10 chars before "&vendorItemId" form the product code
    item_code = url[:url.find('&vendorItemId')][-10:]
    mall = json_data["vendor"]["name"]
    name = json_data["title"]
    short_url = f'https://www.coupang.com{json_data["vendor"]["link"]}'
    if json_data["quantityBase"][0]["price"]["originPrice"] is None:
        # No regular price listed: the sale price stands in as the regular price.
        original_price = json_data["quantityBase"][0]["price"]["salePrice"].replace(",","")
        pc_discount_price = '0'
    else:
        original_price = json_data["quantityBase"][0]["price"]["originPrice"].replace(",","")
        pc_discount_price = json_data["quantityBase"][0]["price"]["salePrice"].replace(",","")
    mb_discount_price = '0'
    new_item = {"판매채널":item["판매채널"],"판매처":mall,"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":short_url}
    self.item_list.append(new_item)
### selenium thread(2)
def st11(self,url,item : dict):
    """Crawl an 11st product page with the shared selenium driver and append the result."""
    idx = url.find('&NaPm')
    short_url = url[:idx]  # drop the NaPm tracking parameter
    self.driver.get(short_url)
    # last 10 chars before "&tid" form the product code
    item_code=short_url[:short_url.find('&tid')][-10:]

    try:
        mall=self.driver.find_element(By.CSS_SELECTOR,'h1.c_product_store_title').text
    except Exception:
        mall = 'Error'
    try:
        name=self.driver.find_element(By.CSS_SELECTOR,'div.c_product_info_title > h1.title').text
    except Exception:
        name = 'Error'
    price_wrap = self.driver.find_element(By.CSS_SELECTOR,'ul.price_wrap')
    try:
        # Both a regular and a discounted price block are present.
        original_price = price_wrap.find_element(By.CSS_SELECTOR,'dl.price_regular').find_element(By.CSS_SELECTOR,'span.value').text.replace(",","")
        pc_discount_price = price_wrap.find_element(By.CSS_SELECTOR,'dl.price').find_element(By.CSS_SELECTOR,'span.value').text.replace(",","")
    except Exception:
        # No separate regular price block: the single price is the regular one.
        original_price = price_wrap.find_element(By.CSS_SELECTOR,'dl.price').find_element(By.CSS_SELECTOR,'span.value').text.replace(",","")
        pc_discount_price = '0'
    mb_discount_price = '0'
    new_item = {"판매채널":item["판매채널"],"판매처":mall,"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":short_url}
    self.item_list.append(new_item)
def interpark(self,url,item : dict):
    """Crawl an Interpark product page with the shared selenium driver and append the result."""
    short_url = url
    self.driver.get(url)
    # last 10 chars before "&biz_cd" form the product code
    item_code=short_url[:short_url.find('&biz_cd')][-10:]
    try:
        mall = self.driver.find_element(By.CSS_SELECTOR,'div.sellerName > a').text
    except Exception:
        mall = "Error"
    try:
        name = self.driver.find_element(By.CSS_SELECTOR,'span.subject').text
    except Exception:
        name = "Error"
    try:
        original_price = self.driver.find_element(By.CSS_SELECTOR,'span.originPrice > em').text.replace(",","")
        pc_discount_price = self.driver.find_element(By.CSS_SELECTOR,'div.salePriceWrap > span').find_element(By.CSS_SELECTOR,'em').text.replace(",","")
    except Exception:
        # No separate regular price element: the sale price stands in as the regular price.
        original_price = self.driver.find_element(By.CSS_SELECTOR,'div.salePriceWrap > span').find_element(By.CSS_SELECTOR,'em').text.replace(",","")
        pc_discount_price = '0'
    mb_discount_price = '0'
    new_item = {"판매채널":item["판매채널"],"판매처":mall,"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":short_url}
    self.item_list.append(new_item)
def lotteon(self,url,item : dict):
    """Crawl a LotteON product page with the shared selenium driver and append the result."""
    short_url = url
    self.driver.get(url)
    # last 12 chars before "?sitmNo" form the product code
    item_code=short_url[:short_url.find('?sitmNo')][-12:]
    try:
        mall = self.driver.find_element(By.CSS_SELECTOR,'div.priceShow').find_elements(By.CSS_SELECTOR,'p')[1].text.replace("판매자 : ","")
    except Exception:
        mall = 'Error'
    try:
        name = self.driver.find_element(By.CSS_SELECTOR,'div.productName').text
    except Exception:
        name = 'Error'
    try:
        original_price = self.driver.find_element(By.CSS_SELECTOR,'div.priceInfo').text.replace(",","").replace("할인가","").replace("원","").replace("\n","")
        pc_discount_price = self.driver.find_element(By.CSS_SELECTOR,'div.discountInfo > div.price').find_element(By.CSS_SELECTOR,'span').text.replace(",","").replace("\n","")
    except Exception:
        # No discount block present: only the regular price is recorded.
        original_price = self.driver.find_element(By.CSS_SELECTOR,'div.priceInfo').text.replace(",","").replace("할인가","").replace("원","").replace("\n","")
        pc_discount_price = '0'
    mb_discount_price = '0'
    new_item = {"판매채널":item["판매채널"],"판매처":mall,"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":short_url}
    self.item_list.append(new_item)
# ---------------------------------------------------------------------------------------------------------------------------------------------------------
## [내장] 웹 사이트 구별 ##
def site_distinction(self,url,item):
    """Dispatch a resolved product URL to the matching channel crawler.

    requests-based channels (Gmarket / Auction / Coupang) are queued as threads
    in self.thread_list for later parallel execution. Selenium-based channels
    (11st / Interpark / LotteON) share the single self.driver and therefore run
    serially, immediately. Any other URL is treated as a first-party mall and
    recorded from the catalog data already in `item`.
    """
    if url.startswith("http://www.gmarket.co.kr"):
        self.thread_list.append(Thread(target=self.gmarket,args=(url,item)))
    elif url.startswith("http://pd.auction.co.kr"):
        self.thread_list.append(Thread(target=self.auction,args=(url,item)))
    elif url.startswith("https://link.coupang.com"):
        self.thread_list.append(Thread(target=self.coupang,args=(url,item)))
    elif url.startswith("https://www.11st.co.kr"):
        self.st11(url,item)
    elif url.startswith("http://www.interpark.com"):
        self.interpark(url,item)
    elif url.startswith("https://www.lotteon.com"):
        self.lotteon(url,item)
    else:
        short_url = url
        mb_discount_price = '0'
        item.update(redirect_url=short_url)
        # Catalog items whose seller came from a <span> carry "판매처"; logo-based
        # ones carry "판매채널" instead (membership test replaces the try/KeyError probe).
        if "판매처" in item:
            self.item_list.append({"판매채널":"자사몰","판매처":item["판매처"],"상품코드":item["상품코드"],"상품명":item["상품명"],"정상가격":item["정상가격"],"PC할인가격":item["PC할인가격"],"MOBILE할인가격":mb_discount_price,"url":item["redirect_url"]})
        else:
            self.item_list.append({"판매채널":item["판매채널"],"판매처":"오류로 인하여 판매처 미상","상품코드":item["상품코드"],"상품명":item["상품명"],"정상가격":item["정상가격"],"PC할인가격":item["PC할인가격"],"MOBILE할인가격":mb_discount_price,"url":item["redirect_url"]})
# ---------------------------------------------------------------------------------------------------------------------------------------------------------
## [실행] 파라미터 사이트 크롤링 ##
def basic_data(self):
    """Open the catalog page and collect the base seller list.

    External (first-party) malls may appear, so each <li>'s name and price are
    crawled here as a fallback; channel crawlers later supply authoritative data.
    Returns a list of per-seller dicts, each including a "redirect_url" key.
    """
    self.set_driver()

    if self.select_option != None:
        self.option()

    def crawling(items):
        """Extract one page of seller <li> elements into dicts."""
        data_list = []
        for item in items:
            ## product name
            name = item.find_element(By.CSS_SELECTOR,"a.productList_title__R1qZP").text
            ## provisional item code for external malls; real codes overwrite it later
            item_code = item.find_element(By.CSS_SELECTOR,"a.productList_title__R1qZP").get_attribute('data-i')
            ## listed price
            price = item.find_element(By.CSS_SELECTOR,"a.productList_value__B_IxM").find_element(By.CSS_SELECTOR,"em").text.replace(",","")
            ## seller: known channels show a logo <img>; first-party malls show a <span>
            try:
                img = item.find_element(By.CSS_SELECTOR,"a.productList_mall_link__TrYxC").find_element(By.CSS_SELECTOR,"img")
                mall_name = img.get_attribute('alt')
                data_dic = {"판매채널":mall_name,"상품명":name,"정상가격":price,"PC할인가격":'0',"상품코드":item_code}
            except Exception:
                mall_name = item.find_element(By.CSS_SELECTOR,"a.productList_mall_link__TrYxC").find_element(By.CSS_SELECTOR,"span").text
                data_dic = {"판매처":mall_name,"상품명":name,"정상가격":price,"PC할인가격":'0',"상품코드":item_code}
            ## redirect_url
            direct_url = item.find_element(By.CSS_SELECTOR,"a.productList_title__R1qZP").get_attribute('href')
            data_dic.update(redirect_url=direct_url)
            data_list.append(data_dic)
        return data_list

    def scrape_current_page():
        """Crawl the seller <li> items currently displayed."""
        items = self.driver.find_element(By.CSS_SELECTOR,"div.productList_seller_wrap__FZtUS").find_elements(By.CSS_SELECTOR,"ul.productList_list_seller__XGhCk > li")
        return crawling(items)

    # Wait for the pagination block to render.
    # NOTE(review): retries forever if the selector never appears — consider a timeout.
    while True:
        try:
            time.sleep(1)
            btns = self.driver.find_element(By.CSS_SELECTOR,"div.productList_seller_wrap__FZtUS").find_elements(By.CSS_SELECTOR,"div.pagination_pagination__JW7zT > a")
            break
        except Exception as e:
            print(str(e))
            continue

    basic_list = []
    if btns:
        for i,btn in enumerate(btns):
            if i != 0:
                # Page 1 is already shown; click through the remaining page buttons.
                self.driver.execute_script('arguments[0].click();',btn)
                time.sleep(2)
            basic_list.extend(scrape_current_page())
    else:
        basic_list.extend(scrape_current_page())
    print(f"[2] Basic_Complete ----------------- {time.time()-self.start}")
    return basic_list
## [실행] 각 채널 정보 크롤링 ##
def set_item_list(self,data_list : list):
    """Resolve every catalog redirect URL and crawl its channel page.

    requests-based channels are crawled on worker threads accumulated in
    self.thread_list; all threads are started and joined here. Afterwards
    every browser window opened during selenium crawling is closed.
    """
    for data in data_list:
        redirect_url = data['redirect_url']
        res = requests.get(redirect_url)
        soup = bs4(res.text, 'html.parser')
        script = soup.find_all("script")
        if len(script) == 1:
            # Intermediate redirect document: the real URL sits in the lone <script>.
            script = soup.find("script").text
            real_url = script.split('"')[3]
            self.site_distinction(real_url,data)
        else:
            # Full document returned directly: treat it as a SmartStore product page.
            self.thread_list.append(Thread(target=self.smartstore,args=(redirect_url,data)))
    for t in self.thread_list:
        t.start()
    for t in self.thread_list:
        t.join()
    # Thread objects can only be started once; drop them so a repeat call cannot crash.
    self.thread_list = []
    print(f"[3] Thread_Complete ----------------- {time.time()-self.start}")
    main = self.driver.window_handles
    for i in main:
        self.driver.switch_to.window(i)
        self.driver.close()
    
## [실행] 기준가격 기준 정렬 ##
def sort_item_list(self):
    """Rank self.item_list ascending by the configured price basis and return it.

    Every item gains a "기준가격" (basis price) key and a "순위" (rank) key.
    """
    keyed_items = []
    discount_mode = self.price_standard == "할인가격"
    ## attach the basis-price key derived from price_standard
    for entry in self.item_list:
        if discount_mode and int(entry["PC할인가격"]) > 0:
            basis_price = entry["PC할인가격"]
        else:
            basis_price = entry["정상가격"]

        print(entry["판매채널"],entry["판매처"],{'기준가격':basis_price})
        keyed_items.append({'기준가격':basis_price, **entry})
    ## sort ascending on the basis price, then prepend a 1-based rank
    ranked = [
        {"순위":rank, **record}
        for rank,record in enumerate(sorted(keyed_items,key=lambda x:int(x["기준가격"])),start=1)
    ]

    self.item_list = ranked
    print(f"[4] Sort_Done ----------------- {time.time()-self.start}")
    return self.item_list
## [메인] 실행 ##
def main(self):
    """Run the full pipeline: collect the catalog, crawl each channel, rank the results."""
    self.set_item_list(self.basic_data())
    return self.sort_item_list()
if __name__ == "__main__":
    # Load the list of catalog jobs stored next to this script.
    file_path = f"{os.path.dirname(os.path.realpath(__file__))}/item_list.json"
    with open(file_path,"r",encoding="utf-8-sig") as f:
        items = json.load(f)["item_list"]

    # Crawl each catalog job and print its ranked item list.
    for item in items:
        c = itemCrawling(
                        item["comparison_url"],
                        item["select_option"],
                        item["price_standard"]
                        )
        item_list = c.main()
        for i in item_list:
            print(i)
        print(len(item_list))
    
Michael Butscher
  • 10,028
  • 4
  • 24
  • 25
hanbit
  • 37
  • 4
  • 2
    Your code is quite long and unclear to me. Please provide a simplified version, including comments in ___english___ – kaliiiiiiiii Jan 20 '23 at 07:13
  • With threading you can open multiple independent windows (instances) of the browser and run simultaneously code in each window. If this is what you need look here https://stackoverflow.com/a/72574108/8157304 – sound wave Jan 20 '23 at 11:08

0 Answers