
I am trying to parallelize an element-wise computation with a user-defined function over a Dask bag on my local machine, using my 8 cores. Specifically, the elements are the nodes of a graph, G, created with NetworkX (I have omitted that code for brevity, but I can always include it). I would like to use the bag to run a NetworkX graph calculation on each node, where the nodes are originally obtained as a list.

The bag was created as follows:

from dask.distributed import Client
import dask.bag as db
import networkx as nx

client = Client()

def my_func(x, G):
    ego_graph = nx.ego_graph(G, x, radius=2)
    ego_graph_ls = nx.to_edgelist(ego_graph)
    return ego_graph_ls

starting_list = ['node1', 'node2', 'node3', 'node4']
my_bag = db.from_sequence(starting_list)
result = my_bag.map(my_func, G).compute()  # G is a nx.Graph()

In other words, for every node in my_bag, I would like to compute in parallel the edge list of that node's neighborhood within the overall graph.

However, when I run this code, I receive the following error (truncated for space, but I can provide the full traceback if desired):

2019-05-09 11:00:58,566 - base_events.py - default_exception_handler - ERROR - Exception in callback BaseAsyncIOLoop._handle_events(64, 1)
handle: <Handle BaseAsyncIOLoop._handle_events(64, 1)>
Traceback (most recent call last):
  File "/Users/cj2001/.pyenv/versions/3.6.5/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/Users/cj2001/.pyenv/versions/3.6.5/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 122, in _handle_events
    handler_func(fileobj, events)
  File "/Users/cj2001/.pyenv/versions/3.6.5/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/Users/cj2001/.pyenv/versions/3.6.5/lib/python3.6/site-packages/tornado/netutil.py", line 249, in accept_handler
    connection, address = sock.accept()
  File "/Users/cj2001/.pyenv/versions/3.6.5/lib/python3.6/socket.py", line 205, in accept
    fd, addr = self._accept()
OSError: [Errno 24] Too many open files

Reading the API docs here suggests that this is possible. I have also read posts like this one suggesting that Dask Delayed is the preferred way to parallelize a for loop (which this could easily become by iterating over starting_list directly).
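For comparison, the for-loop version mentioned above can be sketched with dask.delayed. This is a minimal, self-contained sketch, not the question's actual setup: it assumes a small stand-in graph (nx.path_graph(5)) in place of G, and uses the threaded scheduler to keep the example light (scheduler="processes" would side-step the GIL for CPU-bound graph work, at the cost of serializing G to each worker):

```python
import dask
import networkx as nx

# Small stand-in graph for illustration; the question's G would be used instead
G = nx.path_graph(5)

def my_func(x, G):
    ego_graph = nx.ego_graph(G, x, radius=2)
    # Materialize to a plain list so the result serializes cleanly
    return list(nx.to_edgelist(ego_graph))

# One delayed task per node, then compute them all together
tasks = [dask.delayed(my_func)(node, G) for node in G.nodes()]
results = dask.compute(*tasks, scheduler="threads")
```

Each node becomes a single delayed task, so the structure of the loop is preserved while the per-node work is handed to the scheduler.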

So what is the cause of this OSError? Is it fixable in my code, or do I need to try a different approach?
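For reference, [Errno 24] refers to the per-process open-file limit, which the default Client() can exhaust through its many worker sockets (the macOS default soft limit is only 256). A common workaround is to raise that limit in the shell before launching Python; the value 4096 below is illustrative and cannot exceed the hard limit shown by ulimit -Hn:

```shell
# Inspect the current per-process open-file (soft) limit
ulimit -n

# Raise the soft limit for this shell session before starting Python
ulimit -n 4096
```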

CJ Sullivan

1 Answer


I re-ran the code above, replacing the last line with:

result = my_bag.map(my_func, G).compute(scheduler='processes')

and it ran without error. However, it was very slow, and the task stream on the dashboard's status page showed no parallelization.

CJ Sullivan