0

The exception I get. All I did that I increased pool count

Code

 def parse(url):
  r = request.get(url)
POOL_COUNT = 75
with Pool(POOL_COUNT) as p:
    result = p.map(parse, links)



File "/usr/lib64/python3.5/multiprocessing/pool.py", line 130, in worker
    put((job, i, (False, wrapped)))
  File "/usr/lib64/python3.5/multiprocessing/queues.py", line 355, in put
    self._writer.send_bytes(obj)
  File "/usr/lib64/python3.5/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib64/python3.5/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib64/python3.5/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
Process ForkPoolWorker-26:
Traceback (most recent call last):
  File "/usr/lib64/python3.5/multiprocessing/pool.py", line 125, in worker
    put((job, i, result))
  File "/usr/lib64/python3.5/multiprocessing/queues.py", line 355, in put
    self._writer.send_bytes(obj)
  File "/usr/lib64/python3.5/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib64/python3.5/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib64/python3.5/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
Volatil3
  • 14,253
  • 38
  • 134
  • 263
  • can you post enough code for me to try and replicate? – Xingzhou Liu Jul 21 '17 at 06:45
  • Changing the count of an instanciated pool? explain! With `map_async` you can use pool without counting the proc you use: [example](https://stackoverflow.com/questions/4413821/multiprocessing-pool-example#4415314) – mquantin Jul 21 '17 at 06:45
  • @XingzhouLiu The question is updated – Volatil3 Jul 21 '17 at 06:52
  • @mquantin the Qs is updated – Volatil3 Jul 21 '17 at 06:52
  • @mquantin `map` is internally using `map_async.get()`. By changing count I mean increasing Pool count from 50 to 75 – Volatil3 Jul 21 '17 at 06:57
  • Can you give us some more info on what system you are running this on, whether this is a repeatable error, and how many links you are trying to process? – Andrew Guy Jul 21 '17 at 07:12
  • 1
    Are you running your activity or looking at resources? i was able to get that error by dumping pressure on memory and io by reading a 700mb file over and over again into memory. – Xingzhou Liu Jul 21 '17 at 07:18
  • @XingzhouLiu am running Amazon's 2GB instances and spawning 120 instances in a go – Volatil3 Jul 21 '17 at 09:32
  • Having the same problem for some time, the problem is faced due to memory utilization, especially when large amount of data is being processed. – Jawad Ahmad Khan Jul 24 '20 at 19:43

3 Answers3

6

I was seeing Broken Pipe exception too. But mine is more complicated.

One reason that increasing the pool size alone will lead to exception would be you're getting too many things in request module so it could leads to not enough memory. Then it will seg-fault especially you have a small swap.

Edit1: I believe it's caused by memory usage. Too many pool connections used up too many memory and it finally get broken. It's very hard to debug and I myself limited my pool size to 4 since I have a small RAM and big packages.

user2189731
  • 558
  • 8
  • 15
1

I tried to reproduce on a AWS t2.small instance (2GB RAM as you described) with the following script (note that you missed a s in requests.get(), assuming you are using the requests library, and also the return was missing):

from multiprocessing import Pool
import requests
def parse(url):
  a = requests.get(url)
  if a.status_code != 200:
    print(a)
  return a.text
POOL_COUNT = 120
links = ['http://example.org/' for i in range(1000)]
with Pool(POOL_COUNT) as p:
  result = p.map(parse, links)
print(result)

Sadly, I didn't run into the same issue as you did.

From the stack trace you posted it seems that the problem is in launching the parse function, not in the requests module itself. It looks like the main process cannot send data to one of the launched processes.

Anyway: This operation is not CPU bound, the bottleneck is the network (most probably the remote servers max connections, or also probably), you are much better off using multithreading. This is most probably also faster, because multiprocessing.map needs to communicate between the processes, that means that the return of parse needs to be pickled and then sent to the main process.

To try with threads instead of processes, simply do from multiprocessing.pool import ThreadPool and replace Pool with ThreadPool in your code.

hansaplast
  • 11,007
  • 2
  • 61
  • 75
0

This simple version of you code works perfect here with any number of POOL_COUNT

from multiprocessing import Pool
def parse(url):
  r = url
  print(r)

POOL_COUNT = 90
with Pool(processes=POOL_COUNT) as p:
    links = [str(i) for i in range(POOL_COUNT)]
    result = p.map(parse, links)

Doesn't it? So the problem should be in request part, maybe needs a sleep?

mquantin
  • 1,085
  • 8
  • 23
  • Already 20+seconds sleep and yea, the code involves Db interactions as well.. do you think the pipe breaking has nothing to do with pool count? – Volatil3 Jul 21 '17 at 07:11