
The task here is to scrape an API of a site, starting from https://xxx.xxx.xxx/xxx/1.json up to https://xxx.xxx.xxx/xxx/1417749.json, and write each response exactly as-is to MongoDB. For that I have the following code:

import time
import json
import requests
import pymongo

client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
db = client["thread1"]
com = db["threadcol"]
start_time = time.time()
write_log = open("logging.log", "a")
min = 1
max = 1417749
for n in range(min, max):
    response = requests.get("https://xx.xxx.xxx/{}.json".format(str(n)))
    if response.status_code == 200:
        parsed = json.loads(response.text)
        inserted = com.insert_one(parsed)
        write_log.write(str(n) + "\t" + str(inserted) + "\n")
        print(str(n) + "\t" + str(inserted) + "\n")
write_log.close()

But it is taking a lot of time to complete the task. The question is: how can I speed up this process?

Tek Nath Acharya
  • Did you first try to benchmark how long it takes to process a single JSON? Assuming it takes 300 ms per record, you could process all of these records sequentially in about 5 days. – tuxdna Dec 21 '19 at 08:16

8 Answers


There are several things that you could do:

  1. Reuse the connection. According to the benchmark below, it is about 3 times faster (a short Session sketch follows the timings)
  2. Scrape in parallel, in multiple threads or processes

Parallel code from here

import sys
from threading import Thread
from queue import Queue  # "Queue" in Python 2 became "queue" in Python 3

concurrent = 50  # number of worker threads

def doWork():
    # each worker pulls URLs off the queue, processes them, and marks them done
    while True:
        url = q.get()
        # fetch the URL and insert the result into MongoDB here
        q.task_done()

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

Timings from this question for a reusable connection:

>>> timeit.timeit('_ = requests.get("https://www.wikipedia.org")', 'import requests', number=100)
Starting new HTTPS connection (1): www.wikipedia.org
Starting new HTTPS connection (1): www.wikipedia.org
Starting new HTTPS connection (1): www.wikipedia.org
...
Starting new HTTPS connection (1): www.wikipedia.org
Starting new HTTPS connection (1): www.wikipedia.org
Starting new HTTPS connection (1): www.wikipedia.org
52.74904417991638
>>> timeit.timeit('_ = session.get("https://www.wikipedia.org")', 'import requests; session = requests.Session()', number=100)
Starting new HTTPS connection (1): www.wikipedia.org
15.770191192626953
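
A minimal sketch of point 1, reusing one connection with requests.Session, applied to the code from the question (the placeholder hostname is kept as-is):

import json
import requests
import pymongo

client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
com = client["thread1"]["threadcol"]

session = requests.Session()  # keeps the underlying TCP/TLS connection open between requests

with open("logging.log", "a") as write_log:
    for n in range(1, 1417750):  # 1 .. 1417749 inclusive
        response = session.get("https://xx.xxx.xxx/{}.json".format(n))
        if response.status_code == 200:
            inserted = com.insert_one(json.loads(response.text))
            write_log.write("{}\t{}\n".format(n, inserted))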
keiv.fly

You can improve your code in two respects:

  • Use a Session, so that the connection is not re-established at every request and is kept open;

  • Use parallelism in your code with asyncio.

Have a look here: https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html

albestro

asyncio is also a solution if you don't want to use multithreading:

import time
import pymongo
import json
import asyncio
from aiohttp import ClientSession


async def get_url(url, session):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()


async def create_task(sem, url, session):
    async with sem:
        response = await get_url(url, session)
        if response:
            parsed = json.loads(response)
            n = url.rsplit('/', 1)[1]
            inserted = com.insert_one(parsed)  # note: pymongo calls are blocking and briefly block the event loop
            write_log.write(str(n) + "\t" + str(inserted) + "\n")
            print(str(n) + "\t" + str(inserted) + "\n")


async def run(minimum, maximum):
    url = 'https://xx.xxx.xxx/{}.json'
    tasks = []
    sem = asyncio.Semaphore(1000)   # cap concurrent requests at 1000 to stay below the maximum number of open sockets allowed
    async with ClientSession() as session:
        for n in range(minimum, maximum):
            task = asyncio.ensure_future(create_task(sem, url.format(n), session))
            tasks.append(task)
        await asyncio.gather(*tasks)


client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
db = client["thread1"]
com = db["threadcol"]
start_time = time.time()
write_log = open("logging.log", "a")
min_item = 1
max_item = 100

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(min_item, max_item))
loop.run_until_complete(future)
write_log.close()
Frans

What you are probably looking for is asynchronous scraping. I would recommend creating batches of URLs, e.g. 5 URLs each (try not to crash the website), and scraping them asynchronously (see the sketch below). If you don't know much about async, look up the library asyncio. I hope I could help you :)
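
A minimal sketch of that batching idea. The use of aiohttp is an assumption (the answer only mentions asyncio), and the batch size and URL pattern are placeholders taken from the question:

import asyncio
import json
import aiohttp

BATCH_SIZE = 5  # keep batches small so the site is not overwhelmed

async def fetch(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return json.loads(await response.text())

async def scrape_in_batches(urls):
    documents = []
    async with aiohttp.ClientSession() as session:
        for i in range(0, len(urls), BATCH_SIZE):
            batch = urls[i:i + BATCH_SIZE]
            # fetch one batch concurrently, then move on to the next
            documents.extend(await asyncio.gather(*(fetch(session, u) for u in batch)))
    return documents

urls = ["https://xx.xxx.xxx/{}.json".format(n) for n in range(1, 101)]
documents = asyncio.run(scrape_in_batches(urls))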

Theodor Peifer

Assuming that you won't get blocked by the API and that there are no rate limits, this code should make the process about 50 times faster (maybe more, because all requests now share the same session).

import time
import json
import requests
import pymongo
import threading

client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
db = client["thread1"]
com = db["threadcol"]
start_time = time.time()
logs = []

number_of_json_objects = 1417750
number_of_threads = 50

session = requests.Session()

def scrap_write_log(session,start,end):
    for n in range(start, end):
        response = session.get("https://xx.xxx.xxx/{}.json".format(n))
        if response.status_code == 200:
            try:
                inserted = com.insert_one(json.loads(response.text))
                logs.append(str(n) + "\t" + str(inserted) + "\n")
                print(str(n) + "\t" + str(inserted) + "\n")
            except Exception:
                logs.append(str(n) + "\t" + "Failed to insert" + "\n")
                print(str(n) + "\t" + "Failed to insert" + "\n")

thread_ranges=[[x,x+number_of_json_objects//number_of_threads] for x in range(0,number_of_json_objects,number_of_json_objects//number_of_threads)]

threads=[threading.Thread(target=scrap_write_log, args=(session,start_and_end[0],start_and_end[1])) for start_and_end in thread_ranges]

for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

with open("logging.log", "a") as f:
    for line in logs:
        f.write(line)
idar

Try to chunk the requests and use the MongoDB bulk write operation.

  • group the requests (100 requests per group)
  • iterate through the groups
  • use an asynchronous request model to fetch the data (the URLs in a group)
  • update the DB after completing a group (bulk write operation)

This might save lots of time in two ways:

  • MongoDB write latency
  • synchronous network call latency

But do not increase the parallel request count (chunk size) too much; it will increase the network load on the server, and the server might treat it as a DDoS attack. A minimal sketch of this idea follows the reference below.

  1. https://api.mongodb.com/python/current/examples/bulk.html
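
A minimal sketch of the chunk-and-bulk-write idea, reusing the collection from the question. The chunk size of 100 and the synchronous requests.Session fetch are illustrative assumptions; the asynchronous fetch from the answers above could be substituted:

import json
import requests
import pymongo

client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
com = client["thread1"]["threadcol"]
session = requests.Session()

CHUNK_SIZE = 100  # 100 requests per group, as suggested above

def fetch_chunk(start, end):
    # fetch one group of URLs and return the parsed documents
    docs = []
    for n in range(start, end):
        response = session.get("https://xx.xxx.xxx/{}.json".format(n))
        if response.status_code == 200:
            docs.append(json.loads(response.text))
    return docs

for start in range(1, 1417750, CHUNK_SIZE):
    docs = fetch_chunk(start, min(start + CHUNK_SIZE, 1417750))
    if docs:
        # one round trip to MongoDB per group instead of one per document
        com.insert_many(docs, ordered=False)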
thuva4

I happened to have the same question many years ago. I was never satisfied with the Python-based answers, which were either pretty slow or too complicated. After I switched to other, more mature tools, the speed was fast and I never went back.

Recently I have used the following steps to speed up the process:

  1. generate a bunch of URLs in a txt file (see the snippet below)
  2. use aria2c -x16 -d ~/Downloads -i /path/to/urls.txt to download these files
  3. parse them locally
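
A minimal sketch of step 1, assuming urls.txt as the file name and the placeholder URL pattern from the question:

# write one URL per line; aria2c then downloads them in parallel (step 2)
with open("urls.txt", "w") as f:
    for n in range(1, 1417750):
        f.write("https://xx.xxx.xxx/{}.json\n".format(n))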

This is the fastest process I have come up with so far.

In terms of scraping web pages, I even download the necessary *.html files instead of visiting the pages one at a time, which actually makes no difference: when you visit a page with Python tools like requests, scrapy or urllib, they still download the whole web content for you anyway.

anonymous

First create a list of all the links, since they are all the same except for the number that changes.

import threading

class Demo:
    def __init__(self, url):
        self.json_url = url

    def get_json(self):
        try:
            # your logic here (fetch self.json_url and insert into MongoDB)
            pass
        except Exception as e:
            print(e)

list_of_links = []
for i in range(1, 1417749):
    list_of_links.append("https://xx.xxx.xxx/{}.json".format(i))

t_no = 2  # number of links scraped per batch of threads
for i in range(0, len(list_of_links), t_no):
    all_t = []
    batch_links = list_of_links[i:i + t_no]
    for link in batch_links:
        obj_new = Demo(link)
        t = threading.Thread(target=obj_new.get_json)
        t.start()
        all_t.append(t)
    for t in all_t:
        t.join()

By simply increasing or decreasing t_no you can change the number of threads.

Mobin Al Hassan