
I have been trying to get my code to work for many days and I am getting desperate. I've scoured the internet, but I still can't find a solution.

I have a 9 GB text file encoded in "latin-1" with 737,022,387 lines; each line contains a string.

I would like to read each line and send it in an HTTP PUT request that waits for the response, and return TRUE or FALSE depending on whether the response is 200 or 400. Each PUT request takes about 1 to 3 seconds, so to speed up the processing time I would like to use either threads or multiprocessing.
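
For example, the real request would be something like this hypothetical send_put function (just a sketch: the endpoint URL is a placeholder and I use the requests library):

import requests

ENDPOINT = "http://example.com/api"  # placeholder, not the real endpoint

def send_put(line):
    # PUT one line and report success: True for 200, False for anything else (e.g. 400)
    response = requests.put(ENDPOINT, data=line.encode("latin-1"), timeout=10)
    return response.status_code == 200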

To start, I simulate my PUT request with a sleep of 3 seconds, and even that I can't get to work.

This code splits my string into individual characters, and I don't know why...

from multiprocessing import Pool
from time import sleep


def process_line(line):
    sleep(3)
    print(line)
    return True

if __name__ == "__main__":
    pool = Pool(2)
    peon =  open(r'D:\txtFile',encoding="latin-1")
    for line in peon:
        res = pool.map(process_line,line )
        print(res)

This gives the error: TypeError: process_line() takes 1 positional argument but 17 were given

import multiprocessing
from multiprocessing import Pool
from time import sleep


def process_line(line):
    sleep(3)
    print(line)
    return True

if __name__ == "__main__":
    pool = Pool(2)
    with open(r"d:\txtFile",encoding="latin-1") as file:
        res = pool.apply(process_line,file.readline() )
        print(res)

And this crashes the computer:

from multiprocessing import Pool
from time import sleep


def process_line(line):
    sleep(3)
    print(line)
    return True

if __name__ == "__main__":
    pool = Pool(2)
    peon =  open(r'D:\txtFile',encoding="latin-1")
    for line in peon:
        res = pool.map(process_line,peon )
        print(res)
Eva exe
  • `map` works on an iterable of arguments. What you want is to remove the for loop and do: `res = pool.map(process_line, peon)` – flakes Apr 11 '22 at 06:06
  • That crashes my computer; my file is 9GB. Does map load the whole file into RAM? – Eva exe Apr 11 '22 at 06:10
  • Ah, you probably want `imap()` then. `for res in pool.imap(process_line, peon): ...` – flakes Apr 11 '22 at 06:17
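
In code, that suggestion would look roughly like this (a sketch only; the path and pool size are taken from the question, and the answer from Booboo below explains why plain imap can still exhaust memory with this much input):

from multiprocessing import Pool
from time import sleep

def process_line(line):
    sleep(3)  # stand-in for the real PUT request
    return True

if __name__ == "__main__":
    with Pool(2) as pool, open(r'D:\txtFile', encoding="latin-1") as peon:
        # imap consumes the file lazily, one line per task, and yields
        # results in order instead of building one huge list in memory
        for res in pool.imap(process_line, peon):
            print(res)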

2 Answers


The premise seems unrealistic, though: shooting 737,022,387 requests! Calculate how many months that would take from a single computer!

Still, a better way to do this task is to read the file line by line in a separate thread and insert the lines into a queue, and then process the queue with multiple processes.

Solution 1:

from multiprocessing import Queue, Process
from threading import Thread
from time import sleep

urls_queue = Queue()
max_process = 4

def read_urls():
    with open('urls_file.txt', 'r') as f:
        for url in f:
            urls_queue.put(url.strip())
            print('put url: {}'.format(url.strip()))

    # put DONE to tell send_request_processor to exit
    for i in range(max_process):
        urls_queue.put("DONE")


def send_request(url):
    print('send request: {}'.format(url))
    sleep(1)
    print('recv response: {}'.format(url))


def send_request_processor():
    print('start send request processor')
    while True:
        url = urls_queue.get()
        if url == "DONE":
            break
        else:
            send_request(url)


def main():
    file_reader_thread = Thread(target=read_urls)
    file_reader_thread.start()

    procs = []
    for i in range(max_process):
        p = Process(target=send_request_processor)
        procs.append(p)
        p.start()

    for p in procs:
        p.join()

    # the reader thread has put everything (including the DONE sentinels) by now
    file_reader_thread.join()
    print('all done')


if __name__ == '__main__':
    main()

Demo: https://onlinegdb.com/Elfo5bGFz

Solution 2:

You can use the tornado asynchronous networking library:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await gen.sleep(0.01)
        finally:
            q.task_done()

async def producer():
    with open('urls_file.txt', 'r') as f:
        for url in f:
            await q.put(url)
            print('Put %s' % url)

async def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    await producer()     # Wait for producer to put all tasks.
    await q.join()       # Wait for consumer to finish all tasks.
    print('Done')
    # producer and consumer can run in parallel

IOLoop.current().run_sync(main)
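
If each line is a URL, the consumer could issue the real PUT with tornado's AsyncHTTPClient instead of sleeping (a sketch, not part of the original snippet):

from tornado.httpclient import AsyncHTTPClient

http_client = AsyncHTTPClient()

async def consumer():
    async for item in q:
        try:
            # raise_error=False returns the response even for non-2xx status codes
            response = await http_client.fetch(item.strip(), method='PUT',
                                               body=b'', raise_error=False)
            print('%s -> %s' % (item.strip(), response.code == 200))
        finally:
            q.task_done()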
Nitesh Tosniwal

Using method multiprocessing.pool.imap is a step in the right direction, but the problem is that with so much input you will be feeding the input task queue faster than the processing pool can take tasks off the queue and return results. Consequently, the task queue will continue to grow and you will exhaust memory.

What is needed is a way to "throttle" imap so that it blocks once the task queue holds N tasks. I think a reasonable default for N is twice the pool size, which ensures that when a pool process completes work on a task there is no delay before it finds another task to work on. Hence we create classes BoundedQueueProcessPool (multiprocessing) and BoundedQueueThreadPool (multithreading):

import multiprocessing.pool
import multiprocessing
import threading


class ImapResult():
    def __init__(self, semaphore, result):
        self._semaphore = semaphore
        self.it = result.__iter__()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            elem = self.it.__next__()
            self._semaphore.release()
            return elem
        except StopIteration:
            raise
        except:
            self._semaphore.release()
            raise

class BoundedQueuePool:
    def __init__(self, limit, semaphore):
        self._limit = limit
        self._semaphore = semaphore

    def release(self, result, callback=None):
        self._semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        self._semaphore.acquire()
        callback_fn = self.release if callback is None else lambda result: self.release(result, callback=callback)
        error_callback_fn = self.release if error_callback is None else lambda result: self.release(result, callback=error_callback)
        return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)

    def imap(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self._semaphore.acquire()
                yield elem
        if chunksize > self._limit:
            raise ValueError(f'chunksize argument exceeds {self._limit}')
        result = super().imap(func, new_iterable(iterable), chunksize)
        return ImapResult(self._semaphore, result)

    def imap_unordered(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self._semaphore.acquire()
                yield elem
        if chunksize > self._limit:
            raise ValueError(f'chunksize argument exceeds {self._limit}')
        result = super().imap_unordered(func, new_iterable(iterable), chunksize)
        return ImapResult(self._semaphore, result)

class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        limit = self._processes + max_waiting_tasks
        BoundedQueuePool.__init__(self, limit, multiprocessing.BoundedSemaphore(limit))

class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        limit = self._processes + max_waiting_tasks
        BoundedQueuePool.__init__(self, limit, threading.BoundedSemaphore(limit))


#######################################################################

from time import sleep


def process_line(line):
    sleep(3)
    # the lines already have line end characters:
    print(line, end='')
    return True

if __name__ == "__main__":
    pool = BoundedQueueProcessPool(2)
    with open("test.txt") as file:
        for res in pool.imap(process_line, file):
            #print(res)
            pass
    pool.close()
    pool.join()
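
Since sending HTTP requests is I/O-bound rather than CPU-bound, the BoundedQueueThreadPool defined above is arguably the better fit and allows many more workers. Here is a sketch of the same loop doing an actual PUT; the requests library, the worker count, the hypothetical put_line helper, and treating each line as the target URL are assumptions, not part of this answer:

import requests

def put_line(line):
    # Sketch: PUT each line to its URL; True for HTTP 200, False otherwise.
    try:
        resp = requests.put(line.strip(), timeout=10)
        return resp.status_code == 200
    except requests.RequestException:
        return False

if __name__ == "__main__":
    pool = BoundedQueueThreadPool(50)  # threads are cheap for network-bound work
    with open(r"D:\txtFile", encoding="latin-1") as file:
        for ok in pool.imap(put_line, file):
            pass
    pool.close()
    pool.join()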
Booboo