2

I'm currently trying to use GPars to concurrently go through a directory system in a DFS manner. This is what I have so far:

// Paths already handed to dfs; shared by all worker threads.
// Declared as List: Collections.synchronizedList returns a wrapper that is not
// an ArrayList, so an ArrayList-typed variable cannot hold the wrapper as-is.
List<String> visited = Collections.synchronizedList(new ArrayList<String>())

// Entry point for the traversal: runs dfs(rootStartPath) inside a
// GParsPool.withPool(4) block, so the pool is set up once, outside dfs.
void scheduleDirectory(String rootStartPath) {
    GParsPool.withPool(4) {
        dfs(rootStartPath)
    }
}

// Visits one directory: records path in visited, lists its entries via
// dao.listDirectory and handles every entry with eachParallel. Directory
// entries whose path is not yet in visited are walked by calling dfs again.
void dfs(String path) {
    visited << path

    // This will retrieve what's in the path directory
    // Each object inside listResult shows the name, path, type (directory or file), etc.
    def listResult = dao.listDirectory(tenant, namespace, path)
    if(listResult) {
        // Per the stack trace below, this is the call that throws
        // MissingMethodException once dfs has been re-entered from inside
        // the closure, i.e. on a ForkJoinWorkerThread.
        listResult.eachParallel {
            def childPath = getPath(it)   // Get the path of the listResult
            if(it.type == "directory") {
                // Recurse only into directories that have not been recorded yet
                if(!visited.contains(childPath)) {
                    dfs(childPath)
                }
            }
            else if(it.type == "object") {
                // Do some other processing stuff
            }
        }
    }
}

But when I try to call eachParallel() with a closure, I get this on my stacktrace:

groovy.lang.MissingMethodException: No signature of method: java.util.ArrayList.eachParallel() is applicable for argument types: (com.abc.service.DFSService$_dfs_closure2) values: [com.abc.service.DFSService$_dfs_closure2@7d016471]
        at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.unwrap(ScriptBytecodeAdapter.java:72) ~[groovy-2.5.4.jar:2.5.4]
        at org.codehaus.groovy.runtime.callsite.PojoMetaClassSite.call(PojoMetaClassSite.java:48) ~[groovy-2.5.4.jar:2.5.4]
        at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:127) ~[groovy-2.5.4.jar:2.5.4]
        at com.abc.service.DFSService.dfs(DFSService.groovy:86) ~[main/:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
        at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:104) ~[groovy-2.5.4.jar:2.5.4]
        at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:326) ~[groovy-2.5.4.jar:2.5.4]
        at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:352) ~[groovy-2.5.4.jar:2.5.4]
        at org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.callCurrent(PogoMetaClassSite.java:68) ~[groovy-2.5.4.jar:2.5.4]
        at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCallCurrent(CallSiteArray.java:51) ~[groovy-2.5.4.jar:2.5.4]
        at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callCurrent(AbstractCallSite.java:156) ~[groovy-2.5.4.jar:2.5.4]
        at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callCurrent(AbstractCallSite.java:168) ~[groovy-2.5.4.jar:2.5.4]
        at com.abc.service.DFSService$_dfs_closure2.doCall(DFSService.groovy:97) ~[main/:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
        at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:104) ~[groovy-2.5.4.jar:2.5.4]
        at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:326) ~[groovy-2.5.4.jar:2.5.4]
        at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:264) ~[groovy-2.5.4.jar:2.5.4]
        at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1041) ~[groovy-2.5.4.jar:2.5.4]
        at groovy.lang.Closure.call(Closure.java:411) ~[groovy-2.5.4.jar:2.5.4]
        at groovy.lang.Closure.call(Closure.java:427) ~[groovy-2.5.4.jar:2.5.4]
        at groovyx.gpars.pa.CallClosure.call(CallClosure.java:47) ~[gpars-1.2.1.jar:1.2.1]
        at groovyx.gpars.pa.ClosureMapper.op(ClosureMapper.java:36) ~[gpars-1.2.1.jar:1.2.1]
        at groovyx.gpars.extra166y.AbstractParallelAnyArray$OOMPap.leafTransfer(AbstractParallelAnyArray.java:2255) ~[gpars-1.2.1.jar:1.2.1]
        at groovyx.gpars.extra166y.PAS$FJOMap.atLeaf(PAS.java:258) ~[gpars-1.2.1.jar:1.2.1]
        at groovyx.gpars.extra166y.PAS$FJBase.internalCompute(PAS.java:118) ~[gpars-1.2.1.jar:1.2.1]
        at groovyx.gpars.extra166y.PAS$FJBase.compute(PAS.java:106) ~[gpars-1.2.1.jar:1.2.1]
        at jsr166y.RecursiveAction.exec(RecursiveAction.java:148) ~[jsr166y-1.7.0.jar:1.7.0]
        at jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:305) ~[jsr166y-1.7.0.jar:1.7.0]
        at jsr166y.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:575) ~[jsr166y-1.7.0.jar:1.7.0]
        at jsr166y.ForkJoinPool.scan(ForkJoinPool.java:755) ~[jsr166y-1.7.0.jar:1.7.0]
        at jsr166y.ForkJoinPool.work(ForkJoinPool.java:617) ~[jsr166y-1.7.0.jar:1.7.0]
        at jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:369) ~[jsr166y-1.7.0.jar:1.7.0]

I've already seen this question, so I thought maybe the problem was in my build.gradle file, but that doesn't seem to be the case either. My build.gradle file is:

...
dependencies {
    compile 'org.codehaus.groovy:groovy-all:2.5.4',
            'org.codehaus.gpars:gpars:1.2.1'
            ...
    }
...
halfer
  • 19,824
  • 17
  • 99
  • 186
sparkhee93
  • 1,381
  • 3
  • 21
  • 30

1 Answer

0

It appears that placing GParsPool.withPool() outside of the dfs method causes the recursive calls to dfs to fall outside the scope of GPars, so the listResult in the nested calls does not have the eachParallel() method available.

Your issue is fixed if you move the GParsPool.withPool(4) closure inside the dfs method; the results will then run in parallel, limited to 4 threads as expected:

// Paths already handed to dfs; shared by all worker threads.
// Declared as List: Collections.synchronizedList returns a wrapper that is not
// an ArrayList, so an ArrayList-typed variable cannot hold the wrapper as-is.
List<String> visited = Collections.synchronizedList(new ArrayList<String>())

// Entry point for the traversal: hands rootStartPath straight to dfs.
void scheduleDirectory(String rootStartPath) {
    dfs(rootStartPath)
}

/**
 * Walks the directory tree rooted at path, handling the entries of each
 * directory in parallel.
 *
 * External callers pass only path. That outermost call creates a single
 * 4-thread pool and re-invokes dfs with it; every nested call re-enters the
 * same pool through GParsPool.withExistingPool rather than building and
 * tearing down a fresh 4-thread pool per directory, so the whole walk
 * shares the same 4 worker threads.
 *
 * @param path directory to visit
 * @param pool pool shared by the whole walk; leave unset when calling from
 *             outside, it is supplied by the recursion
 */
void dfs(String path, def pool = null) {
    if (pool == null) {
        // Outermost call: create the pool once; withPool passes it to the closure.
        GParsPool.withPool(4) { sharedPool ->
            dfs(path, sharedPool)
        }
        return
    }

    // Nested calls arrive on pool worker threads, where eachParallel is not
    // available until the pool has been entered again on that thread.
    GParsPool.withExistingPool(pool) {
        visited << path

        // This will retrieve what's in the path directory
        // Each object inside listResult shows the name, path, type (directory or file), etc.
        def listResult = dao.listDirectory(tenant, namespace, path)
        if(listResult) {
            listResult.eachParallel {
                def childPath = getPath(it)   // Get the path of the listResult
                if(it.type == "directory") {
                    // Recurse only into directories that have not been recorded yet
                    if(!visited.contains(childPath)) {
                        dfs(childPath, pool)
                    }
                }
                else if(it.type == "object") {
                    // Do some other processing stuff
                }
            }
        }
    }
}
pczeus
  • 7,709
  • 4
  • 36
  • 51