Hello, I am working on crawling a web page. The crawler uses two approaches: plain HTTP requests via the requests library, and browser automation via a Selenium webdriver. In site_distinction(), I extracted only the requests-based parts and ran them in parallel with threads, but as the number of dynamic web pages grew, processing slowed down considerably. Is there a way to parallelize the Selenium work with threads as well?
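From what I understand, a single WebDriver instance is not thread-safe, so each thread would presumably need its own driver. My full code is below; I have marked the places where I think this applies with [Sketch] comments, and a minimal standalone example of the driver-per-thread pattern I have in mind follows the code.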
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import NoSuchElementException
from bs4 import BeautifulSoup as bs4
from threading import Thread
import os
import requests
import json
import time
class itemCrawling:
    def __init__(self, comparison_url : str, select_option : str = None, price_standard : str = "할인가격"):
        """
        - Parameters
        comparison_url ==> [str] price-comparison URL (** catalog page **)
        select_option ==> [str] option name as a string, if the product has one (** defaults to None **)
        price_standard ==> [str] which price to compare on: "정상가격" (regular) or "할인가격" (discounted). Defaults to "할인가격"; if the product has no discounted price, the regular price is used.
        """
        self.comparison_url = comparison_url # price-comparison URL
        self.select_option = select_option # option value
        self.price_standard = price_standard # which price to compare on (regular / discounted)
self.headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'}
self.start = time.time()
self.thread_list = []
self.item_list=[]
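        # NOTE: self.item_list is appended to from multiple threads. CPython's GIL makes
        # list.append atomic in practice, but a threading.Lock would make that explicit.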
def set_driver(self):
service = Service(ChromeDriverManager().install())
options = Options()
options.add_experimental_option('detach',True)
options.add_experimental_option('excludeSwitches',['enable-logging'])
options.add_argument("disable-gpu") # 가속 사용 x
options.add_argument("lang=ko_KR") # 가짜 플러그인 탑재
options.add_argument('user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36') # user-agent 이름 설정
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
# options.add_argument('--headless')
self.driver = webdriver.Chrome(service=service,options=options)
self.driver.get(self.comparison_url)
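    ### [Sketch] A per-thread driver factory I am considering: unlike set_driver(), it
    ### returns a fresh driver instead of storing one on self, so each worker thread
    ### could own its own browser. Headless mode is assumed here, since running many
    ### visible Chrome windows at once is impractical.
    def new_driver(self):
        service = Service(ChromeDriverManager().install())
        options = Options()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        return webdriver.Chrome(service=service,options=options)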
def option(self):
print(f"[Get_option] ----- 본 페이지의 선택 옵션은 [{self.select_option}] 입니다.")
lis = self.driver.find_elements(By.CSS_SELECTOR,'div.filter_condition_group__h8Gss')[-1].find_elements(By.CSS_SELECTOR,'ul > li')
for li in lis:
option = li.find_element(By.CSS_SELECTOR,'span').text
if self.select_option == option:
label = li.find_element(By.CSS_SELECTOR,'label.filter_label__3GLbR')
self.driver.execute_script('arguments[0].click();',label)
time.sleep(3)
break
# ---------------------------------------------------------------------------------------------------------------------------------------------------------
    ## [Internal] Per-channel crawlers ##
### requests thread(1)
def smartstore(self,url,item : dict):
while True:
try:
response = requests.get(url)
if response.status_code == 200:
soup = bs4(response.text, 'html.parser')
scripts = soup.find('body').find_all('script')
                    data = scripts[0].text.strip()[27:] # drop the leading JS assignment (first 27 chars) to leave the raw JSON payload
script_data = json.loads(data)
break
            except Exception as e:
                print(str(e))
                time.sleep(1) # brief back-off so a failed request does not hot-loop
                continue
product = script_data["product"]["A"]
item_code = product["id"]
channel = "스마트스토어"
item_url = product["productUrl"] # type str
name = product["name"] # type str
        ## assign the price variables
        original_price = str(product["salePrice"]) # type str - [1] regular price
        pc_discount_value = str(product["benefitsView"]["discountedSalePrice"]) # type str - [2] PC discounted price
        mb_discount_value = str(product["benefitsView"]["mobileDiscountedSalePrice"]) # type str - [3] MOBILE discounted price
        ### [No discount] prices [1], [2], [3] are all equal
        if original_price == pc_discount_value and original_price == mb_discount_value:
            pc_discount_price = '0' # type str
            mb_discount_price = '0' # type str
        ### [MOBILE-only discount] only [3] differs
        elif original_price == pc_discount_value and original_price != mb_discount_value:
            pc_discount_price = '0' # type str
            mb_discount_price = mb_discount_value
        ### [PC-only discount] only [2] differs
        elif original_price != pc_discount_value and original_price == mb_discount_value:
            pc_discount_price = pc_discount_value
            mb_discount_price = '0' # type str
        ### [Both discounted] [2] and [3] both differ from [1] (this also covers an extra
        ### MOBILE discount on top of the PC discount)
        else:
            pc_discount_price = pc_discount_value
            mb_discount_price = mb_discount_value
new_item = {"판매채널":channel,"판매처":item['판매처'],"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":item_url}
self.item_list.append(new_item)
    def gmarket(self,url,item : dict):
        # (1) extract only the query parameters from the url
        start_idx = url.find('?')
        end_idx = url.find('&NaPm')
        url_pram = url[start_idx:end_idx]
        # (2) join the Gmarket item domain with the parameters extracted in (1)
        short_url = f'http://item.gmarket.co.kr/Item{url_pram}'
gmarket_res = requests.get(short_url)
gmarket_soup = bs4(gmarket_res.text,'html.parser')
item_code = gmarket_soup.find('span',class_='text__item-number').text.replace("상품번호 : ","")
mall = gmarket_soup.find('span',class_='text__seller').find('a',class_='link__seller').text
        name = gmarket_soup.find('h1',class_='itemtit').text # product name
        try:
            original_price = gmarket_soup.find('span',class_='text__price').text.replace(",","")
            pc_discount_price = gmarket_soup.find('strong',class_='price_real').text.replace(",","").replace("원","")
        except AttributeError: # find() returned None: no separate discount price
            original_price = gmarket_soup.find('strong',class_='price_real').text.replace(",","").replace("원","")
            pc_discount_price = '0'
mb_discount_price = '0'
new_item = {"판매채널":item["판매채널"],"판매처":mall,"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":short_url}
self.item_list.append(new_item)
def auction(self,url,item : dict):
        # (1) extract the url up to "&NaPm"
        index = url.find('&NaPm')
        # (2) define the shortened url
        short_url = url[:index]
        res = requests.get(short_url)
        soup = bs4(res.text,'html.parser')
        # (3) crawl the redirected page
item_code = url[:url.find('&pc')][-10:]
mall = soup.find('span',class_='text__seller').find('a',class_='link__seller').text
name = soup.find('h1',class_='itemtit').text
        try:
            original_price = soup.find('span',class_='price_original').text.replace(",","").replace("원","")
            pc_discount_price = soup.find('strong',class_='price_real').text.replace(",","").replace("원","")
        except AttributeError: # find() returned None: no separate discount price
            original_price = soup.find('strong',class_='price_real').text.replace(",","").replace("원","")
            pc_discount_price = '0'
mb_discount_price = '0'
new_item = {"판매채널":item["판매채널"],"판매처":mall,"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":short_url}
self.item_list.append(new_item)
    def coupang(self,url,item : dict):
        coup_res = requests.get(url, headers=self.headers)
        coup_soup = bs4(coup_res.text,'html.parser')
        script = coup_soup.find('head').find_all('script')[4].text
        # carve the inline JSON object out of the page script: from just before "wishList" to the first ';'
        start_idx = script.find('"wishList"')-1
        end_idx = script.find(';')
        data = script[start_idx:end_idx]
        json_data = json.loads(data)
item_code = url[:url.find('&vendorItemId')][-10:]
mall = json_data["vendor"]["name"]
name = json_data["title"]
short_url = f'https://www.coupang.com{json_data["vendor"]["link"]}'
if json_data["quantityBase"][0]["price"]["originPrice"] == None:
original_price = json_data["quantityBase"][0]["price"]["salePrice"].replace(",","")
pc_discount_price = '0'
else:
original_price = json_data["quantityBase"][0]["price"]["originPrice"].replace(",","")
pc_discount_price = json_data["quantityBase"][0]["price"]["salePrice"].replace(",","")
mb_discount_price = '0'
new_item = {"판매채널":item["판매채널"],"판매처":mall,"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":short_url}
self.item_list.append(new_item)
### selenium thread(2)
def st11(self,url,item : dict):
idx = url.find('&NaPm')
short_url = url[:idx]
self.driver.get(short_url)
item_code=short_url[:short_url.find('&tid')][-10:]
        try:
            mall=self.driver.find_element(By.CSS_SELECTOR,'h1.c_product_store_title').text
        except NoSuchElementException:
            mall = 'Error'
        try:
            name=self.driver.find_element(By.CSS_SELECTOR,'div.c_product_info_title > h1.title').text
        except NoSuchElementException:
            name = 'Error'
        price_wrap = self.driver.find_element(By.CSS_SELECTOR,'ul.price_wrap')
        try:
            original_price = price_wrap.find_element(By.CSS_SELECTOR,'dl.price_regular').find_element(By.CSS_SELECTOR,'span.value').text.replace(",","")
            pc_discount_price = price_wrap.find_element(By.CSS_SELECTOR,'dl.price').find_element(By.CSS_SELECTOR,'span.value').text.replace(",","")
        except NoSuchElementException: # no regular-price element: the listed price is the only price
            original_price = price_wrap.find_element(By.CSS_SELECTOR,'dl.price').find_element(By.CSS_SELECTOR,'span.value').text.replace(",","")
            pc_discount_price = '0'
mb_discount_price = '0'
new_item = {"판매채널":item["판매채널"],"판매처":mall,"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":short_url}
self.item_list.append(new_item)
def interpark(self,url,item : dict):
short_url = url
self.driver.get(url)
item_code=short_url[:short_url.find('&biz_cd')][-10:]
        try:
            mall = self.driver.find_element(By.CSS_SELECTOR,'div.sellerName > a').text
        except NoSuchElementException:
            mall = "Error"
        try:
            name = self.driver.find_element(By.CSS_SELECTOR,'span.subject').text
        except NoSuchElementException:
            name = "Error"
        try:
            original_price = self.driver.find_element(By.CSS_SELECTOR,'span.originPrice > em').text.replace(",","")
            pc_discount_price = self.driver.find_element(By.CSS_SELECTOR,'div.salePriceWrap > span').find_element(By.CSS_SELECTOR,'em').text.replace(",","")
        except NoSuchElementException: # no original-price element: the sale price is the only price
            original_price = self.driver.find_element(By.CSS_SELECTOR,'div.salePriceWrap > span').find_element(By.CSS_SELECTOR,'em').text.replace(",","")
            pc_discount_price = '0'
mb_discount_price = '0'
new_item = {"판매채널":item["판매채널"],"판매처":mall,"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":short_url}
self.item_list.append(new_item)
def lotteon(self,url,item : dict):
short_url = url
self.driver.get(url)
item_code=short_url[:short_url.find('?sitmNo')][-12:]
        try:
            mall = self.driver.find_element(By.CSS_SELECTOR,'div.priceShow').find_elements(By.CSS_SELECTOR,'p')[1].text.replace("판매자 : ","")
        except (NoSuchElementException, IndexError):
            mall = 'Error'
        try:
            name = self.driver.find_element(By.CSS_SELECTOR,'div.productName').text
        except NoSuchElementException:
            name = 'Error'
        try:
            original_price = self.driver.find_element(By.CSS_SELECTOR,'div.priceInfo').text.replace(",","").replace("할인가","").replace("원","").replace("\n","")
            pc_discount_price = self.driver.find_element(By.CSS_SELECTOR,'div.discountInfo > div.price').find_element(By.CSS_SELECTOR,'span').text.replace(",","").replace("\n","")
        except NoSuchElementException: # no discount block: the shown price is the only price
            original_price = self.driver.find_element(By.CSS_SELECTOR,'div.priceInfo').text.replace(",","").replace("할인가","").replace("원","").replace("\n","")
            pc_discount_price = '0'
mb_discount_price = '0'
new_item = {"판매채널":item["판매채널"],"판매처":mall,"상품코드":item_code,"상품명":name,"정상가격":original_price,"PC할인가격":pc_discount_price,"MOBILE할인가격":mb_discount_price,"url":short_url}
self.item_list.append(new_item)
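    ### [Sketch] st11/interpark/lotteon above all share self.driver, which is why they
    ### cannot run in threads as-is: a WebDriver instance is not thread-safe. If they
    ### were refactored to take a driver argument instead of using self.driver, a
    ### wrapper like this could give each thread its own browser:
    def run_with_own_driver(self, crawl_func, url, item):
        driver = self.new_driver() # per-thread driver from the sketch factory above
        try:
            crawl_func(driver, url, item) # hypothetical signature: crawl_func(driver, url, item)
        finally:
            driver.quit() # always release the browser, even on failure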
# ---------------------------------------------------------------------------------------------------------------------------------------------------------
    ## [Internal] Site identification ##
    def site_distinction(self,url,item):
        if url.startswith("http://www.gmarket.co.kr"):
            self.thread_list.append(Thread(target=self.gmarket,args=(url,item)))
        elif url.startswith("http://pd.auction.co.kr"):
            self.thread_list.append(Thread(target=self.auction,args=(url,item)))
        elif url.startswith("https://link.coupang.com"):
            self.thread_list.append(Thread(target=self.coupang,args=(url,item)))
        elif url.startswith("https://www.11st.co.kr"):
            self.st11(url,item)
        elif url.startswith("http://www.interpark.com"):
            self.interpark(url,item)
        elif url.startswith("https://www.lotteon.com"):
            self.lotteon(url,item)
else:
short_url = url
mb_discount_price = '0'
item.update(redirect_url=short_url)
            if "판매처" in item: # crawling() stored either "판매처" or "판매채널" depending on how the seller was rendered
                self.item_list.append({"판매채널":"자사몰","판매처":item["판매처"],"상품코드":item["상품코드"],"상품명":item["상품명"],"정상가격":item["정상가격"],"PC할인가격":item["PC할인가격"],"MOBILE할인가격":mb_discount_price,"url":item["redirect_url"]})
            else:
                self.item_list.append({"판매채널":item["판매채널"],"판매처":"오류로 인하여 판매처 미상","상품코드":item["상품코드"],"상품명":item["상품명"],"정상가격":item["정상가격"],"PC할인가격":item["PC할인가격"],"MOBILE할인가격":mb_discount_price,"url":item["redirect_url"]})
# ---------------------------------------------------------------------------------------------------------------------------------------------------------
    ## [Run] Crawl the comparison-URL page ##
def basic_data(self):
"""외부몰 (자사몰)이 있을 수 있기 때문에 li 태그의 상품명 가격도 함께 크롤링 진행한다."""
self.set_driver()
if self.select_option != None:
self.option()
def crawling(items):
data_list = []
for item in items:
                ## product name
                name = item.find_element(By.CSS_SELECTOR,"a.productList_title__R1qZP").text
                ## product code (** placeholder for external malls // replaced in each channel function when a real code exists **)
                item_code = item.find_element(By.CSS_SELECTOR,"a.productList_title__R1qZP").get_attribute('data-i')
                ## sale price
                price = item.find_element(By.CSS_SELECTOR,"a.productList_value__B_IxM").find_element(By.CSS_SELECTOR,"em").text.replace(",","")
                ## seller
                try:
                    img = item.find_element(By.CSS_SELECTOR,"a.productList_mall_link__TrYxC").find_element(By.CSS_SELECTOR,"img")
                    mall_name = img.get_attribute('alt')
                    data_dic = {"판매채널":mall_name,"상품명":name,"정상가격":price,"PC할인가격":'0',"상품코드":item_code}
                except NoSuchElementException: # no logo image: the seller name is plain text
                    mall_name = item.find_element(By.CSS_SELECTOR,"a.productList_mall_link__TrYxC").find_element(By.CSS_SELECTOR,"span").text
                    data_dic = {"판매처":mall_name,"상품명":name,"정상가격":price,"PC할인가격":'0',"상품코드":item_code}
                ## redirect_url
                direct_url = item.find_element(By.CSS_SELECTOR,"a.productList_title__R1qZP").get_attribute('href')
data_dic.update(redirect_url=direct_url)
data_list.append(data_dic)
return data_list
basic_list = []
while True:
try:
time.sleep(1)
btns = self.driver.find_element(By.CSS_SELECTOR,"div.productList_seller_wrap__FZtUS").find_elements(By.CSS_SELECTOR,"div.pagination_pagination__JW7zT > a")
break
except Exception as e:
print(str(e))
continue
if btns:
for i,btn in enumerate(btns):
if i != 0:
self.driver.execute_script('arguments[0].click();',btn)
time.sleep(2)
items = self.driver.find_element(By.CSS_SELECTOR,"div.productList_seller_wrap__FZtUS").find_elements(By.CSS_SELECTOR,"ul.productList_list_seller__XGhCk > li")
data_list = crawling(items)
basic_list.extend(data_list)
else:
items = self.driver.find_element(By.CSS_SELECTOR,"div.productList_seller_wrap__FZtUS").find_elements(By.CSS_SELECTOR,"ul.productList_list_seller__XGhCk > li")
data_list = crawling(items)
basic_list.extend(data_list)
print(f"[2] Basic_Complete ----------------- {time.time()-self.start}")
return basic_list
    ## [Run] Crawl each channel's details ##
def set_item_list(self,data_list : list):
for data in data_list:
redirect_url = data['redirect_url']
res = requests.get(redirect_url)
soup = bs4(res.text, 'html.parser')
script = soup.find_all("script")
if len(script) == 1:
script = soup.find("script").text
real_url = script.split('"')[3]
self.site_distinction(real_url,data)
else:
self.thread_list.append(Thread(target=self.smartstore,args=(redirect_url,data)))
for t in self.thread_list:
t.start()
for t in self.thread_list:
t.join()
print(f"[3] Thread_Complete ----------------- {time.time()-self.start}")
main = self.driver.window_handles
for i in main:
self.driver.switch_to.window(i)
self.driver.close()
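    ### [Sketch] An alternative to hand-managed Thread objects: concurrent.futures gives a
    ### bounded pool, which would also cap how many Chrome instances run at once
    ### ("jobs" below is a placeholder for the (url, item) pairs):
    ###     from concurrent.futures import ThreadPoolExecutor
    ###     with ThreadPoolExecutor(max_workers=4) as pool:
    ###         for url, item in jobs:
    ###             pool.submit(self.run_with_own_driver, self.st11, url, item)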
    ## [Run] Sort by the reference price ##
def sort_item_list(self):
before_data = []
result = []
        ## add the reference-price ("기준가격") key/value according to the price_standard parameter
for item in self.item_list:
if self.price_standard == "할인가격":
if int(item["PC할인가격"]) > 0:
new_price = item["PC할인가격"]
else:
new_price = item["정상가격"]
else:
new_price = item["정상가격"]
new_dict = {'기준가격':new_price}
print(item["판매채널"],item["판매처"],new_dict)
for key, value in item.items():
new_dict[key]=value
before_data.append(new_dict)
        ## sort ascending by the reference price
after_data = sorted(before_data,key=lambda x:int(x["기준가격"]))
        ## add the rank
for i,data in enumerate(after_data):
result_dic = {"순위":i+1}
for k,v in data.items():
result_dic[k]=v
result.append(result_dic)
self.item_list = result
print(f"[4] Sort_Done ----------------- {time.time()-self.start}")
return self.item_list
    ## [Main] Run ##
def main(self):
basic_list = self.basic_data()
self.set_item_list(basic_list)
item_list = self.sort_item_list()
return item_list
if __name__ == "__main__":
file_path = f"{os.path.dirname(os.path.realpath(__file__))}/item_list.json"
with open(file_path,"r",encoding="utf-8-sig") as f:
items = json.load(f)["item_list"]
for item in items:
c = itemCrawling(
item["comparison_url"],
item["select_option"],
item["price_standard"]
)
item_list = c.main()
for i in item_list:
print(i)
print(len(item_list))
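For reference, here is the minimal driver-per-thread sketch mentioned above. It is a standalone toy, not my real crawler: make_driver, scrape_title, and the example URLs are placeholders, and it assumes one headless Chrome per worker so several browsers can run at once.
from concurrent.futures import ThreadPoolExecutor
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

def make_driver():
    # each worker thread builds and owns its own headless Chrome
    options = Options()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    return webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

def scrape_title(url):
    # placeholder task: open the page and return its <title>
    driver = make_driver()
    try:
        driver.get(url)
        return driver.title
    finally:
        driver.quit()  # always release the browser

urls = ["https://www.example.com"] * 4  # placeholder URLs
with ThreadPoolExecutor(max_workers=2) as pool:
    print(list(pool.map(scrape_title, urls)))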