
I've been on this site a while and have found many helpful solutions to the problems I've encountered while building my first Python program. I'm hopeful you can help me once again.

I am trying to launch a variable number of worker processes with the multiprocessing module, each one scanning a small slice of a list of urls. I have been tinkering with queues, but whenever I add one it puts a sizable amount of time onto my loop. I want to keep the speed while protecting Titles.txt from garbled output when several processes append to it at once. Let me show you my code.

import multiprocessing
import urllib.request
import bs4 as bs

l = ['url1', 'url2']   # ...the full list of 380 urls

def output(t):
    with open('Titles.txt', 'a') as f:   # append mode; file is closed automatically
        f.write(t + '\n')

def job(y, processload):
    calendar = ['Jan', 'Feb', 'Mar', 'Dec']    # the things I want to find
    for i in range(processload):               # scan processload urls
        source = urllib.request.urlopen(l[y]).read()    # read url #y
        soup = bs.BeautifulSoup(source, 'lxml')
        for t in soup.html.head.find_all('title'):
            title_text = t.get_text()
            if any(word in title_text for word in calendar):
                output(title_text)             # this is what I need to queue
        y += 1                                 # advance to the next url

if __name__ == '__main__':
    processload = 5                    # the number of urls scanned by each job
    while True:                        # keep recycling through the whole list
        y = 0                          # index of the next url to hand out
        for i in range(380 // processload):    # the list size / processload
            p = multiprocessing.Process(target=job, args=(y, processload))
            p.start()
            y += processload           # jump y ahead to the next chunk

The code above gives me maximum speed in my loop. I would like to preserve that speed while also protecting my output file. I have been searching through examples, but I haven't yet found one where a lock or queue is set up for use by the child processes. How would you recommend I proceed?
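For reference, the lock-based version I have been picturing looks roughly like this (an untested sketch; the lock has to be created in the parent and passed to every worker, and the fetch/parse part is elided):

import multiprocessing

def output(t, lock):
    with lock:                                # serialise appends so writes never interleave
        with open('Titles.txt', 'a') as f:
            f.write(t + '\n')

def job(y, processload, lock):
    for i in range(processload):
        # ...fetch and parse l[y + i] as above...
        output('some matching title', lock)   # placeholder for a real match

if __name__ == '__main__':
    lock = multiprocessing.Lock()             # created once in the parent
    processload = 5
    y = 0
    workers = []
    for i in range(4):                        # a handful of workers for the sketch
        p = multiprocessing.Process(target=job, args=(y, processload, lock))
        p.start()
        workers.append(p)
        y += processload
    for p in workers:
        p.join()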

Thank you very much.

suscat
  • I assume you need to create a single `Queue` and pass it in as another argument in the tuple. Then each process can push `t` onto the queue. Then, when the processes have quit (you will need a collection of them and join all of them) you can process the queue into 'Titles.txt' – quamrana Oct 25 '17 at 19:05
  • Since the processes will be endlessly recycling, would I need to run the write in the loop after `for i in range`? – suscat Oct 26 '17 at 17:42
  • I don't know what you mean about endlessly recycling, but either you wait for all the processes to finish (call `join()` on each `Process`) and then service the queue, or you also start up another `Process` to sink the `Queue` out to the file. – quamrana Oct 26 '17 at 19:18
  • So, in order to wait for the processes to finish, would I need to `o=output(t)`, `o.join()`? – suscat Oct 26 '17 at 19:46
  • No, see my answer. – quamrana Oct 26 '17 at 20:19
  • Well that's what I mean: would I call `output.join()` on each process, or do you mean I would call `p.join()` in my loop? – suscat Oct 26 '17 at 20:34
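
To make the first option from the comments concrete (every worker signals when its chunk is done and the parent services the queue itself, so the file is written from a single process), here is a rough sketch; the chunking and the `None` sentinels are assumptions, and the queue is drained before the final `join()` so a large backlog cannot block the workers from exiting:

import multiprocessing as mp

def job(chunk, q):
    for url in chunk:
        q.put('title found on ' + url)   # stand-in for the real fetch/parse
    q.put(None)                          # per-worker sentinel: this chunk is done

if __name__ == '__main__':
    chunks = [['url1', 'url2'], ['url3', 'url4'], ['url5', 'url6']]
    q = mp.Queue()
    workers = [mp.Process(target=job, args=(chunk, q)) for chunk in chunks]
    for p in workers:
        p.start()

    finished = 0
    with open('Titles.txt', 'a') as f:
        while finished < len(workers):   # stop once every worker has sent its sentinel
            item = q.get()
            if item is None:
                finished += 1
            else:
                f.write(item + '\n')

    for p in workers:
        p.join()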

1 Answer


This example code does what I think you want your program to do:

import multiprocessing as mp
import time
import random

# Slicing a list into sublists from SilentGhost
# https://stackoverflow.com/a/2231685/4834
def get_chunks(input_list, chunk_size):
    return [input_list[i:i+chunk_size] for i in range(0, len(input_list), chunk_size)]

def find_all(item):
    ''' Dummy generator to simulate fetching a page and returning interesting stuff '''
    secs = random.randint(1,5)
    time.sleep(secs)
    # Just one yield here, but could yield each item found
    yield item


def output(q):
    ''' Dummy sink which prints instead of writing to a file '''
    while True:
        item = q.get()
        if item is None:
            return
        print(item)

def job(chunk, q):
    for item in chunk:
        for t in find_all(item):
            q.put(t)
    print('Job done:', chunk)



if __name__ == '__main__':
    all_urls = ['url1', 'url2', 'url3', 'url4', 'url5', 'url6']

    chunks = get_chunks(all_urls, 2)
    q = mp.Queue()
    # Create processes, each taking a chunk and the queue
    processes = [mp.Process(target=job, args=(chunk,q)) for chunk in chunks]

    # Start them all
    for p in processes:
        p.start()

    # Create and start the sink
    sink = mp.Process(target=output, args=(q,))
    sink.start()

    # Wait for all the jobs to finish
    for p in processes:
        p.join()

    # Signal the end with None
    q.put(None)

    sink.join()

Example output (the order varies from run to run because each worker sleeps for a random time and the processes run concurrently):

url3
Job done: ['url3', 'url4']
url4
url5
url1
Job done: ['url5', 'url6']
url6
Job done: ['url1', 'url2']
url2
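
To adapt this to your original goal, swap the dummy sink for one that appends to Titles.txt; a sketch, assuming the workers put plain strings on the queue:

def output(q):
    ''' Sink that appends each queued title to Titles.txt '''
    with open('Titles.txt', 'a') as f:
        while True:
            item = q.get()
            if item is None:      # the None sentinel still marks the end
                return
            f.write(item + '\n')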
quamrana
  • Firstly, thank you so much. I visited the link you provided; what he suggests is exactly what I was trying to do, in a much more efficient way. Your method works extremely well for my case: I was able to adapt my code very easily, and my initial question is well answered. I wonder, however, whether it would be possible to pass a second value through the queue? I now `yield info`, where info is the url and a short description of something interesting, but I would like to add a second variable so I can decide which file to write to in `output()` with `if myvar ==` – suscat Oct 26 '17 at 21:21
  • Tuples are brilliant in Python; you just make them like this: `(a,b,c)`, and they can be passed around as a single argument until you need to unpack them like this: `a,b,c = arg`. You can also ask a separate question about this on Stack Overflow. – quamrana Oct 27 '17 at 08:19
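
A minimal sketch of that tuple idea applied to the example above, assuming each worker puts `(category, info)` pairs on the queue; the category names and the routing table of filenames are made up for illustration:

def job(chunk, q):
    for item in chunk:
        for info in find_all(item):
            q.put(('monthly', info))     # tag each result with a category

def output(q):
    files = {'monthly': 'Titles.txt', 'other': 'Other.txt'}   # hypothetical routing table
    while True:
        item = q.get()
        if item is None:
            return
        category, info = item            # unpack the (category, info) tuple
        with open(files[category], 'a') as f:
            f.write(info + '\n')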