
I'm having issues using most or all of the cores to process the files faster. It could mean reading multiple files at a time, or using multiple cores to read a single file.

I would prefer using multiple cores to read a single file before moving on to the next.

I tried the code below, but I can't seem to get all the cores used.

The following code basically retrieves the *.txt files in the directory, each of which contains HTML in JSON format.

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    import requests
    import json
    import urlparse
    import os
    from bs4 import BeautifulSoup
    from multiprocessing.dummy import Pool  # This is a thread-based Pool
    from multiprocessing import cpu_count

    def crawlTheHtml(htmlsource):
        htmlArray = json.loads(htmlsource)
        for eachHtml in htmlArray:
            soup = BeautifulSoup(eachHtml['result'], 'html.parser')
            if all(['another text to search' not in str(soup),
                   'text to search' not in str(soup)]):
                try:
                    gd_no = ''
                    try:
                        gd_no = soup.find('input', {'id': 'GD_NO'})['value']
                    except:
                        pass

                    r = requests.post('domain api address', data={
                        'gd_no': gd_no,
                        })
                except:
                    pass


    if __name__ == '__main__':
        pool = Pool(cpu_count() * 2)
        print(cpu_count())
        fileArray = []
        for filename in os.listdir(os.getcwd()):
            if filename.endswith('.txt'):
                fileArray.append(filename)
        for file in fileArray:
            with open(file, 'r') as myfile:
                htmlsource = myfile.read()
                results = pool.map(crawlTheHtml(htmlsource), f)

On top of that, I'm not sure what the `, f` represents.

Question 1:

What did I not do properly to fully utilize all the cores/threads?

Question 2:

Is there a better way to use try/except? Sometimes the value is not in the page, and that causes the script to stop. When dealing with multiple variables, I end up with a lot of try/except statements.

CodeGuru

3 Answers


Answer to question 1: your problem is this line:

    from multiprocessing.dummy import Pool  # This is a thread-based Pool

Answer taken from: multiprocessing.dummy in Python is not utilising 100% cpu

When you use multiprocessing.dummy, you're using threads, not processes:

multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.

That means you're restricted by the Global Interpreter Lock (GIL), and only one thread can actually execute CPU-bound operations at a time. That's going to keep you from fully utilizing your CPUs. If you want to get full parallelism across all available cores, you're going to need to address the pickling issue you're hitting with multiprocessing.Pool.
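
For illustration, here is a minimal sketch of restructuring the script around a process-based multiprocessing.Pool so that each worker process handles one file. The helper crawlFile is hypothetical, and crawlTheHtml is assumed to be the question's parsing function, defined at module level in the same script:

    from multiprocessing import Pool, cpu_count
    import os

    def crawlFile(filename):
        # Module-level function: workers receive it by name, and the
        # filename argument is a plain string, so pickling is trivial.
        with open(filename, 'r') as myfile:
            crawlTheHtml(myfile.read())
        return filename

    if __name__ == '__main__':
        txtFiles = [f for f in os.listdir(os.getcwd()) if f.endswith('.txt')]
        pool = Pool(cpu_count())  # one worker process per core
        # map/imap take the function itself (not a call) plus an iterable of
        # arguments; each file goes to whichever worker is free.
        for finished in pool.imap_unordered(crawlFile, txtFiles):
            print(finished)
        pool.close()
        pool.join()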

Yassine Faris
  • "address the pickling issue" Could you elaborate this ? :) – CodeGuru May 04 '18 at 16:12
  • Because processes do not share memory, they need a way to share information. They do this through a mechanism called serialization, using the `pickle` module: a Python object is serialized into a byte stream in one process, sent to the other process, and deserialized back into an object there. – Yassine Faris May 04 '18 at 16:17
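
As a rough standalone illustration of that round trip (not part of either the question's or the answer's code):

    import pickle

    payload = {'gd_no': '12345'}   # any picklable Python object
    data = pickle.dumps(payload)   # serialize to a byte stream in one process
    restored = pickle.loads(data)  # deserialize back into an object in the other
    assert restored == payload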

I had this problem; you need to do

    from multiprocessing import Pool
    from multiprocessing import freeze_support

and at the end you need to do

    if __name__ == '__main__':
        freeze_support()

and then you can continue your script.
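
Putting that together, a minimal sketch of the pattern this answer describes (the work function and its inputs are placeholders, not from the original answer):

    from multiprocessing import Pool, freeze_support

    def work(item):
        return item * 2  # placeholder for the real per-item job

    if __name__ == '__main__':
        freeze_support()  # needed when the script is frozen into a Windows executable
        pool = Pool()     # defaults to one worker process per core
        print(pool.map(work, [1, 2, 3]))
        pool.close()
        pool.join()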

Skiller Dz
    from multiprocessing import Pool, Queue
    from os import getpid
    from time import sleep
    from random import random

    MAX_WORKERS = 10

    class Testing_mp(object):
        def __init__(self):
            """
            Initiates a queue, a pool and a temporary buffer, used only
            when the queue is full.
            """
            self.q = Queue()
            self.pool = Pool(processes=MAX_WORKERS, initializer=self.worker_main)
            self.temp_buffer = []

        def add_to_queue(self, msg):
            """
            If the queue is full, put the message in a temporary buffer.
            If the queue is not full, add the message to the queue.
            If the buffer is not empty and the queue is not full,
            put messages back from the buffer into the queue.
            """
            if self.q.full():
                self.temp_buffer.append(msg)
            else:
                self.q.put(msg)
                if len(self.temp_buffer) > 0:
                    self.add_to_queue(self.temp_buffer.pop())

        def write_to_queue(self):
            """
            This function writes some messages to the queue.
            """
            for i in range(50):
                self.add_to_queue("First item for loop %d" % i)
                # Not really needed, just to show that some elements can be added
                # to the queue whenever you want!
                sleep(random() * 2)
                self.add_to_queue("Second item for loop %d" % i)
                # Not really needed, just to show that some elements can be added
                # to the queue whenever you want!
                sleep(random() * 2)

        def worker_main(self):
            """
            Waits indefinitely for an item to be written to the queue.
            Finishes when the parent process terminates.
            """
            print("Process {0} started".format(getpid()))
            while True:
                # If the queue is not empty, pop the next element and do the work.
                # If the queue is empty, wait indefinitely until an element gets
                # into the queue.
                item = self.q.get(block=True, timeout=None)
                print("{0} retrieved: {1}".format(getpid(), item))
                # simulate some random-length operations
                sleep(random())

    # Warning from the Python documentation:
    # Functionality within this package requires that the __main__ module be
    # importable by the children. This means that some examples, such as the
    # multiprocessing.Pool examples, will not work in the interactive interpreter.
    if __name__ == '__main__':
        mp_class = Testing_mp()
        mp_class.write_to_queue()
        # Wait a bit for the child processes to do some work, because when
        # the parent exits, the children are terminated.
        sleep(5)