
I have this code that reads a file and processes it. The file is quite big (12 million lines), so currently I split it manually into 1,000-line files and start a process for each one sequentially from a bash script.

Is there a way to use Twisted to load the file and process it 1,000 items at a time from that one file (a progress bar would be nice), without me having to split it manually?

scanner.py

import argparse

from tqdm import tqdm
from sys import argv
from pprint import pformat

from twisted.internet.task import react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers

import lxml.html

from geoip import geolite2
import pycountry

from tld import get_tld
import json
import socket
import re  # needed for the email regex in cbBody

poweredby = ""
server = ""
ip = ""


def cbRequest(response, url):
    global poweredby, server, ip
    # print 'Response version:', response.version
    # print 'Response code:', response.code
    # print 'Response phrase:', response.phrase
    # print 'Response headers:'
    # print pformat(list(response.headers.getAllRawHeaders()))
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0]
    server = response.headers.getRawHeaders("Server")[0]

    #print poweredby
    #print server

    d = readBody(response)
    d.addCallback(cbBody, url)
    return d


def cbBody(body, ourl):
    global poweredby, server,ip

    #print body
    html_element = lxml.html.fromstring(body)
    generator = html_element.xpath("//meta[@name='generator']/@content")

    ip = socket.gethostbyname(ourl)

    try:
        match = geolite2.lookup(ip)
        if match is not None:
            country = match.country
            try:

                c = pycountry.countries.lookup(country)
                country = c.name
            except:
                country = ""

    except:
        country = ""
    try:
        res = get_tld("http://www" + ourl, as_object=True)
        tld = res.suffix
    except:
        tld = ""

    try:
        match = re.search(r'[\w\.-]+@[\w\.-]+', body)
        email = match.group(0)
    except:
        email = ""

    permalink=ourl.rstrip().replace(".","-")

    try:
        item = generator[0]
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
                str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
                    email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }"
    except:
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
                str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
                    email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }"


    print val

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Scanner v0.99')
    parser.add_argument(
        '-i', '--input', help='Input list of domains', required=True)
    args = parser.parse_args()
    input = args.input

with open(input) as f:
    urls = f.read().splitlines()


def mainjob(reactor, urls=urls):
    for url in tqdm(urls):
        agent = Agent(reactor)
        d = agent.request(
            'GET', "http://" + url,
            Headers({'User-Agent': ['bot']}),
            None)
        d.addCallback(cbRequest, url)
        d.addErrback(lambda x: None)  # ignore errors
    return d


react(mainjob, argv[3:])

Update 1:

Now I execute it like this:

file.txt - 12,000,000 lines

chunk01.txt - file with 1000 lines
...

I run the script once for each chunk file:

python scanner.py chunk01.txt
python scanner.py chunk02.txt
.
.
.

I want to execute the script only once:

python scanner.py file.txt

The problem is that I need to pass the URLs as an argument to react(). If I read the whole 12,000,000-line file into memory (via f.read()), it is too big. That is why I split the file and run the script on each small chunk.

Hope it is now clearer...
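
For comparison, here is a minimal sketch (not my actual script; the file name is just a placeholder) of handing the lines over lazily instead of reading everything with f.read():

def iter_urls(path):
    # yields one stripped line at a time instead of holding all
    # 12,000,000 lines in memory like f.read().splitlines() does
    with open(path) as f:
        for line in f:
            url = line.strip()
            if url:
                yield url

urls = iter_urls("file.txt")  # placeholder path; nothing is read until iteration starts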

Update 2:

Based on @Jean-Paul Calderone's answer, I put together this code.

It seems to work; however, I am stumped by the following:

After roughly 180,000 iterations (I would assume 180,000 domains, one per line of the input file), the script has only printed/outputted about 35,707 lines (entries). I would expect the number to be close to 180,000; I know some domains will time out. When I ran it the "old" way, it was more consistent: the number of input domains was close to the number of lines in the output file.

Could something be wrong with the code? Any ideas?

python scanner.py > out.txt

181668it [1:47:36,  4.82it/s]

and counting the lines:

wc -l out.txt
36840 out.txt

scanner.py

import argparse

from tqdm import tqdm
from sys import argv
from pprint import pformat

from twisted.internet.task import react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers
from twisted.internet.task import cooperate
from twisted.internet.defer import gatherResults

import lxml.html

from geoip import geolite2
import pycountry

from tld import get_tld
import json
import socket
import re  # needed for the email regex in cbBody

poweredby = ""
server = ""
ip = ""


def cbRequest(response, url):
    global poweredby, server, ip
    # print 'Response version:', response.version
    # print 'Response code:', response.code
    # print 'Response phrase:', response.phrase
    # print 'Response headers:'
    # print pformat(list(response.headers.getAllRawHeaders()))
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0]
    server = response.headers.getRawHeaders("Server")[0]

    #print poweredby
    #print server

    d = readBody(response)
    d.addCallback(cbBody, url)
    return d


def cbBody(body, ourl):
    global poweredby, server,ip

    #print body
    html_element = lxml.html.fromstring(body)
    generator = html_element.xpath("//meta[@name='generator']/@content")

    ip = socket.gethostbyname(ourl)

    try:
        match = geolite2.lookup(ip)
        if match is not None:
            country = match.country
            try:

                c = pycountry.countries.lookup(country)
                country = c.name
            except:
                country = ""

    except:
        country = ""
    try:
        res = get_tld("http://www" + ourl, as_object=True)
        tld = res.suffix
    except:
        tld = ""

    try:
        match = re.search(r'[\w\.-]+@[\w\.-]+', body)
        email = match.group(0)
    except:
        email = ""

    permalink=ourl.rstrip().replace(".","-")

    try:
        item = generator[0]
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
                str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
                    email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }"
    except:
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
                str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
                    email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }"


    print val


def main(reactor, url_path):
    urls = open(url_path)
    return mainjob(reactor, (url.strip() for url in urls))

def mainjob(reactor, urls=argv[2:]):
    #for url in urls:
    #  print url
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls))
    tasks = list(cooperate(work) for i in range(100))
    return gatherResults(list(task.whenDone() for task in tasks))



def process(agent, url):
    d = agent.request(
        'GET', "http://" + url,
        Headers({'User-Agent': ['bot']}),
        None)
    d.addCallback(cbRequest, url)
    d.addErrback(lambda x: None)  # ignore errors
    return d

react(main, ["./domains.txt"])
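
To narrow down where the missing output goes, one idea (just a sketch on top of the process() above, with made-up counter bookkeeping; not code I have run) is to count successes and failures so their sum can be compared with the number of input domains:

from collections import Counter

counts = Counter()  # hypothetical bookkeeping, not part of the script above

def process(agent, url):
    d = agent.request(
        'GET', "http://" + url,
        Headers({'User-Agent': ['bot']}),
        None)
    d.addCallback(cbRequest, url)
    d.addCallback(lambda ignored: counts.update(["ok"]))
    # record the failure type instead of silently dropping it;
    # returning None here still lets gatherResults keep running
    d.addErrback(lambda failure: counts.update(["failed: " + failure.type.__name__]))
    return d

Printing counts once gatherResults fires would show whether "ok" plus the failures adds up to the number of lines in domains.txt.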

Update 3:

I updated the code to write errors to errors.txt:

import argparse

from tqdm import tqdm
from sys import argv
from pprint import pformat

from twisted.internet.task import react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers
from twisted.internet.task import cooperate
from twisted.internet.defer import gatherResults

import lxml.html

from geoip import geolite2
import pycountry

from tld import get_tld
import json
import socket
import re  # needed for the email regex in cbBody

poweredby = ""
server = ""
ip = ""

f = open("errors.txt", "w")


def error(failure, url):
    # the errback receives a twisted.python.failure.Failure, not a response
    f.write("Error: " + url + "\n")


def cbRequest(response, url):
    global poweredby, server, ip
    # print 'Response version:', response.version
    # print 'Response code:', response.code
    # print 'Response phrase:', response.phrase
    # print 'Response headers:'
    # print pformat(list(response.headers.getAllRawHeaders()))
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0]
    server = response.headers.getRawHeaders("Server")[0]

    #print poweredby
    #print server

    d = readBody(response)
    d.addCallback(cbBody, url)
    return d


def cbBody(body, ourl):
    global poweredby, server,ip

    #print body
    html_element = lxml.html.fromstring(body)
    generator = html_element.xpath("//meta[@name='generator']/@content")

    ip = socket.gethostbyname(ourl)

    try:
        match = geolite2.lookup(ip)
        if match is not None:
            country = match.country
            try:

                c = pycountry.countries.lookup(country)
                country = c.name
            except:
                country = ""

    except:
        country = ""
    try:
        res = get_tld("http://www" + ourl, as_object=True)
        tld = res.suffix
    except:
        tld = ""

    try:
        match = re.search(r'[\w\.-]+@[\w\.-]+', body)
        email = match.group(0)
    except:
        email = ""

    permalink=ourl.rstrip().replace(".","-")

    try:
        item = generator[0]
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
                str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
                    email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }"
    except:
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
                str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
                    email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }"


    print val


def main(reactor, url_path):
    urls = open(url_path)
    return mainjob(reactor, (url.strip() for url in urls))

def mainjob(reactor, urls=argv[2:]):
    #for url in urls:
    #  print url
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls))
    tasks = list(cooperate(work) for i in range(100))
    return gatherResults(list(task.whenDone() for task in tasks))



def process(agent, url):
    d = agent.request(
        'GET', "http://" + url,
        Headers({'User-Agent': ['crawler']}),
        None)
    d.addCallback(cbRequest, url)
    d.addErrback(error, url) 
    return d

react(main, ["./domains.txt"])

f.close()
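
A possible next step (a sketch only; this is not the code that produced the errors.txt shown below) would be to also record what each failure actually was, since the errback's first argument is a twisted.python.failure.Failure:

def error(failure, url):
    # write the exception type and message, not just the URL
    f.write("Error: " + url + " -> " + failure.type.__name__ +
            ": " + failure.getErrorMessage() + "\n")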

Update 4:

I captured the traffic with Wireshark for just 2 domains, both of which errored previously:

user@laptop:~/crawler$ python scanner.py 
2it [00:00, 840.71it/s]
user@laptop:~/crawler$ cat errors.txt 
Error: google.al
Error: fau.edu.al

As you can see, they were reported as errors, yet in Wireshark I can see the response on the wire:

[Wireshark capture screenshot: the request and response for both domains are visible on the wire]

dev
  • You should process your files like this: "for line in f". f.read() will read the whole file into memory. Look here: http://stackoverflow.com/a/8010133/1904113 – MKesper Feb 22 '17 at 13:34
  • Thanks! I am aware of this, but I need to pass it as a parameter (url) to Twisted's react(). Reading line by line on its own will not advance the program to that point; it will stop at reading the file. I'm new to react() and Python. – dev Feb 22 '17 at 14:22
  • I don't quite understand what you want the new code to do. You lost me at "process it by 1000 items from one file". Can you try to restate this goal? – Jean-Paul Calderone Feb 22 '17 at 16:34
  • Thanks @Jean-PaulCalderone. I updated the question with an "Update 1" section. Hopefully it explains things better. – dev Feb 22 '17 at 18:02

1 Answer


You need to add a limit to the amount of concurrency your program creates. Currently, you process all URLs given at the same time - or try to, at least:

def mainjob(reactor, urls=urls):
    for url in tqdm(urls):
        agent = Agent(reactor)
        d = agent.request(
            'GET', "http://" + url,
            Headers({'User-Agent': ['bot']}),
            None)
        d.addCallback(cbRequest, url)
        d.addErrback(lambda x: None)  # ignore errors
    return d

This issues a request for each URL without waiting for any of them to complete. Instead, use twisted.internet.task.cooperate to run a limited number at a time. This runs one request at a time:

def mainjob(reactor, urls):
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls))
    task = cooperate(work)
    return task.whenDone()

def process(agent, url):
    d = agent.request(
        'GET', "http://" + url,
        Headers({'User-Agent': ['bot']}),
        None)
    d.addCallback(cbRequest, url)
    d.addErrback(lambda x: None)  # ignore errors
    return d

You probably want more than that. So, call cooperate() a few more times:

def mainjob(reactor, urls=urls):
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls))
    tasks = list(cooperate(work) for i in range(100))
    return gatherResults(list(task.whenDone() for task in tasks))

This runs up to 100 requests at a time. Each task pulls the next element from work and waits on it. gatherResults waits for all 100 tasks to finish.

Now just avoid loading the complete input into memory all at once:

def main(reactor, url_path):
    urls = open(url_path)
    return mainjob(reactor, (url.strip() for url in urls))

react(main, ["path-to-urls.txt"])

This opens the url file but only reads lines from it as they're needed.
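
One caveat about the progress bar mentioned in the question: with a plain generator, tqdm only shows a running count because it does not know the total. If a percentage and ETA are wanted, a variant of the functions above (a sketch, at the cost of one extra pass over the file to count the lines) could pass the total explicitly:

def main(reactor, url_path):
    total = sum(1 for _ in open(url_path))  # extra pass just to count lines
    urls = (url.strip() for url in open(url_path))
    return mainjob(reactor, urls, total)

def mainjob(reactor, urls, total=None):
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls, total=total))
    tasks = list(cooperate(work) for i in range(100))
    return gatherResults(list(task.whenDone() for task in tasks))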

Jean-Paul Calderone
  • Hmmm... I ran code based on your answer for a while, but it behaves strangely. I would assume the amount of input would be close to the amount of output (for the domains going in, the output file should have roughly the same number of lines; some will time out, etc.). I described it in Update 2. @Jean-Paul Calderone, do you have any idea why? – dev Feb 25 '17 at 13:07
  • A good step would be to make it so you can count failures, too. The code just throws them away right now. If you count failures you can see if the total of success + failure is correct and then proceed to looking at the failures to see if there's something to improve. If the total is still small, you can look at the driver to see where/how it's dropping jobs. – Jean-Paul Calderone Feb 25 '17 at 13:45
  • Thanks!!! I added an addErrback to print to a file. After 1523 iterations, 1050 were errors and 427 were OK. The errored domains, like google.al, were online when tested. Any ideas? Should I open a new question about the "driver troubleshooting"? Can I contact you out of band? My email is in my profile description, so you can contact me via email if you wish. – dev Feb 25 '17 at 16:46
  • I tried it on my laptop (DSL) and also on a server (different network) and it behaves similarly – dev Feb 25 '17 at 18:33
  • I did some captures with Wireshark; see Update 4. It seems the request and response are on the wire, but Twisted (the driver) does not see them? – dev Feb 25 '17 at 19:10
  • 1
  • What are the errors? And, yeah, a long discussion in comments is not a great way to proceed. A new, focused question may be better (not "here's the latest iteration of my program, what's wrong?" but more like "here's a program, I expect this behavior, I observe this other behavior, I investigated this and found that, and I don't see what the next step is"). I'm not really into providing help one-on-one except on a compensated basis. – Jean-Paul Calderone Feb 25 '17 at 19:21
  • Thanks! I opened the first question... first, to see the error message :) http://stackoverflow.com/questions/42460779/python-twisted-how-to-print-more-detailed-error-message – dev Feb 25 '17 at 19:45