First, my questions:

  • Is it possible to prevent Julia from copying variables each time in a parallel for loop?
  • If not, how can I implement a parallel reduce operation in Julia?

Now the details:

I have this program:

data = DataFrames.readtable("...") # a big baby (~100MB)
filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
filtered_data = @parallel vcat for fct in filter_functions
  fct(data)::DataFrame
end

It works fine functionally, but each parallel call to fct(data) on another worker copies the whole data frame, making everything painfully slow.

Ideally, I would like to load the data once on each worker and always reuse that pre-loaded copy. I came up with this code to do so:

@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
@everywhere for i in 1:length(filter_functions)
  if workers()[(i - 1) % nworkers() + 1] == myid() # function i is handled by exactly one worker
    fct = filter_functions[i]
    filtered_data_temp = fct(data)
  end
  # How to vcat all the filtered_data_temp ?
end

But now I have another problem: I cannot figure out how to vcat() all the filtered_data_temp results into a single variable on the process with myid()==1.

I would very much appreciate any insight.

Note: I am aware of the question "Operating in parallel on a large constant datastructure in Julia". However, I don't believe it applies to my problem, because all my filter_functions operate on the array as a whole.

Antoine Trouve

2 Answers

You might want to look into Distributed Arrays and load your data into one.

EDIT: Probably something like this:

data = DataFrames.readtable("...")
dfiltered_data = distribute(data) #distributes data among processes automagically
filter_functions = [ fct1, fct2, fct3 ... ] 
for fct in filter_functions
  dfiltered_data = fct(dfiltered_data)::DataFrame
end

You can also check the unit tests for more examples.

Felipe Lema
  • It looks nice. Let me check. – Antoine Trouve Jul 29 '15 at 06:02
  • You may also want to consider `SharedArray`s, if all your data start on one process and you don't want to pay a price for moving them to another process. – tholy Jul 29 '15 at 20:47
  • Nice, but (1) I think it uses shared memory, which might not work on distributed clusters, and (2) "distribute" does not seem to support DataFrames (only Arrays). Still, your example and the link to the unit tests were very enlightening. – Antoine Trouve Aug 10 '15 at 07:30

In the end, I found the solution to my question in another one: "Julia: How to copy data to another processor in Julia".

In particular, it introduces the following primitive to retrieve a variable from another process:

getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
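As a small, self-contained illustration of that primitive, here is a sketch that assumes a recent Julia with the `Distributed` standard library. The worker count and the global name `local_value` are made up for the example and are not part of my real program.

```julia
using Distributed

# Start two local workers if none are running (the count is an arbitrary choice).
nprocs() == 1 && addprocs(2)

# The primitive quoted above: fetch the global named `nm` from module `mod` on process `p`.
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))

# Hypothetical global: every process stores a value derived from its own id.
@everywhere local_value = myid() * 10

# Pull the value of `local_value` from each worker onto the calling process.
collected = [getfrom(p, :local_value) for p in workers()]
```

Only the module and the symbol travel to the remote process; the value is looked up there and sent back by `fetch`.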

Below is how I am using it:

@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
# Each process accumulates its own results in a global variable
@everywhere local_results = ... # some type
# Execute the filter functions, assigning function i to exactly one worker
@everywhere for i in 1:length(filter_functions)
  if workers()[(i - 1) % nworkers() + 1] == myid()
    fct = filter_functions[i]
    filtered_data_temp = fct(data)
    global local_results = vcat(local_results, filtered_data_temp)
  end
end
# Concatenate all the local results on the calling process
all_results = ... # some type
for wid in workers()
  worker_local_results = getfrom(wid, :local_results)
  global all_results = vcat(all_results, worker_local_results)
end
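The same idea can probably be written without the manual round-robin bookkeeping. The following is only a sketch under assumptions: it targets a recent Julia with the `Distributed` standard library, a plain vector stands in for the DataFrame, and the filter functions (`keep_even`, `keep_small`, `keep_large`) and the helper `apply_filter` are hypothetical names. The data is built once per process, only an integer index is sent to each worker, and the results are reduced with vcat on the calling process.

```julia
using Distributed

# Start two local workers if none are running (the count is an arbitrary choice).
nprocs() == 1 && addprocs(2)

# Stand-in for the large table: every process builds its own copy once.
@everywhere data = collect(1:20)

# Hypothetical filter functions, defined on every process.
@everywhere keep_even(x) = filter(iseven, x)
@everywhere keep_small(x) = filter(v -> v <= 5, x)
@everywhere keep_large(x) = filter(v -> v >= 18, x)
@everywhere filter_functions = [keep_even, keep_small, keep_large]

# Only the index `i` is sent to a worker; `data` and `filter_functions`
# are globals that already exist in that worker's Main module.
@everywhere apply_filter(i) = filter_functions[i](data)

# pmap keeps the input order; the reduction with vcat happens on the caller.
filtered_data = reduce(vcat, pmap(apply_filter, 1:length(filter_functions)))
```

Whether this is faster than the explicit loop depends on how large the filtered results are, since those still have to be transferred back to the calling process.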
Antoine Trouve