I have a program that would benefit greatly from using parallelStream(): it processes a large data set with a somewhat involved mapping and filtering scheme that does not depend on external variables or synchronization. The result, however, must be collected as a set.

I'm somewhat new to parallel streams (this is my first experience with them). I tried the code below, only to discover that it led to concurrent modification of a non-concurrent backend and a deadlock condition behind the scenes.

This mapping gets the size of an unmounted disk using the native Linux command `sudo blockdev --getsize64 unmounted_device_here`. (I have no idea whether Java can get the full size of an unmounted disk on Linux, so I'm using the native command, since this will only ever be deployed on Linux systems anyway.)

Mapping Method (which deadlocks):

var mountPath = Paths.get("/dev");
// Do NVMe drives first
var list = new ArrayList<Path>(10);
// Looks like nvme1n1
// For reasons beyond my understanding, replacing [0-9] with \\d does not work here
try (var directoryStream = Files.newDirectoryStream(mountPath, "nvme[0-9]n[0-9]")) {
    for (Path path : directoryStream) {
        list.add(path);
    }
}
// Map to DrivePacket (path, long); note that blockdev returns bytes, converted here to GB
var nvmePackets = list.parallelStream().map((drive) -> new DrivePacket(drive,
        (Long.parseLong(runCommand("sudo", "blockdev", "--getsize64", drive.toAbsolutePath().toString())) / (1024 * 1024 * 1024))))
        .collect(Collectors.toSet());

IOUtils is from the Apache utility classes:

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>

runCommand (executes the native calls):

public static String runCommand(String... command) {
        try {
            if (DEBUG_MODE) {
                systemMessage("Running Command: " + Arrays.asList(command).stream().collect(Collectors.joining(" ")));
            }
            var builder = new ProcessBuilder(command);
            var result = IOUtils.toString(builder.start().getInputStream(), StandardCharsets.UTF_8).replaceAll("\n", "");
            if (DEBUG_MODE) {
                System.out.println("Result: " + result);
            }
            return result;
        } catch (IOException ex) {
            throw new IllegalStateException(ex);

        }
    }

The DrivePacket Class:

   /**
     * A record of the relevant information for a drive
     *
     * Path is the fully qualified /dev/DRIVE path
     */
    public record DrivePacket(Path drivePath, long driveSize) {}

Since the operation benefits from concurrency, is there any way to do this using parallelStream? Or do I have to use another technique?

It always hangs at the point where this line of code is executed; in the debugger, it waits forever in ForkJoinTask.java at externalAwaitDone().

Unfortunately, there isn't a Set analog of toConcurrentMap() that I could find.

How can I avoid this deadlock while still getting both the parallelism granted for the computation and having the end result be a set?

System: JDK 16

EDIT 0: Updated the code for reproducibility

Given that the mapping code calls subroutines that don't share data, I'm not sure why this would cause a deadlock condition.

Sarah Szabo
  • Does the deadlock happen somewhere behind `complicated_mapping_here`? – ernest_k Jun 03 '21 at 06:03
  • Please provide a [MRE]. [With a quick example (Ideone)](https://ideone.com/GXFEwh), I was not able to reproduce the problem. – Turing85 Jun 03 '21 at 06:11
  • To say it explicitly, all collectors are supposed to work with parallel streams and a “concurrent modification of a non-concurrent backend” should never happen. See [this answer](https://stackoverflow.com/a/41045442/2711488) for the difference between non-concurrent and concurrent collectors. Bugs in JDK 16 are not impossible, but still, I’d analyze “complicated_mapping_here” first, before searching for the cause in the JDK code. – Holger Jun 03 '21 at 08:38
  • @Turing85 I decided against doing that because I thought that, since the mapping doesn't share data with anything else (aside from the boolean in `runCommand`, which never changes its value), it wouldn't be useful :) It's been updated now. – Sarah Szabo Jun 03 '21 at 17:53
  • @Holger Hmm, so from what I understand of your prior post (from the link), you say that the set collector isn't an issue since each set is merged in a serial fashion after work is done? I don't know what the issue would be then. Great post BTW, I didn't know that and had to re-read it a few times. – Sarah Szabo Jun 03 '21 at 17:59
  • It’s striking that you use `IOUtils.toString(builder.start().getInputStream(), …)`, which requires the process to terminate before the full output can be read; this can deadlock if the process uses one of the other channels. You should use something like `Process p = builder.redirectError(Redirect.INHERIT).start(); p.getOutputStream().close(); var result = IOUtils.toString(p.getInputStream(), …` to ensure that errors are printed and that read attempts of the subprocess end immediately. – Holger Jun 04 '21 at 10:00
  • Side note: the reason `\\d` doesn’t work is that the pattern is not a regex but the glob pattern described in [`getPathMatcher`](https://docs.oracle.com/en/java/javase/16/docs/api/java.base/java/nio/file/FileSystem.html#getPathMatcher(java.lang.String)). When you use `Files.newDirectoryStream(…)`, the `glob:` prefix is added automatically, so you can’t switch to regex. – Holger Jun 04 '21 at 15:47
  • Yeah, I realized that later and made a few filters. I'm somewhat surprised that I couldn't add the "regex:" syntax to make it work. – Sarah Szabo Jun 04 '21 at 15:58
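
Holger's suggestion about the other process channels can be sketched roughly as follows. This is a minimal sketch, not a confirmed fix for the hang in the question. The class name `CommandRunner` is hypothetical, `InputStream.readAllBytes()` (Java 9+) stands in for `IOUtils.toString` so the example has no external dependency, and the exit-code check is an addition that is not part of the original `runCommand`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.lang.ProcessBuilder.Redirect;
import java.nio.charset.StandardCharsets;

// Hypothetical holder class for the sketch; the question does not name one.
class CommandRunner {

    /** Runs a command and returns its standard output with newlines removed. */
    static String runCommand(String... command) {
        try {
            // Send the child's stderr to our own stderr so that an unread
            // error pipe cannot fill up and block the child.
            Process process = new ProcessBuilder(command)
                    .redirectError(Redirect.INHERIT)
                    .start();
            // Close the child's stdin so any read attempt on it ends immediately.
            process.getOutputStream().close();

            String result;
            try (InputStream stdout = process.getInputStream()) {
                // readAllBytes replaces IOUtils.toString in this sketch.
                result = new String(stdout.readAllBytes(), StandardCharsets.UTF_8)
                        .replace("\n", "");
            }
            // Assumption: a non-zero exit code should be treated as a failure.
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                throw new IllegalStateException("Command exited with code " + exitCode);
            }
            return result;
        } catch (IOException ex) {
            throw new IllegalStateException(ex);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag before propagating the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }
}
```

On a Linux system, `CommandRunner.runCommand("echo", "hello")` should return the string `hello`, since the trailing newline is stripped.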

1 Answer

Maybe you could collect it to a ConcurrentMap and then get the keySet (assuming that equals and hashCode are defined for the mapped object):

list.parallelStream()
    .map(x -> complicated_mapping_here)
    .collect(Collectors.toConcurrentMap(Function.identity(),
                                        x -> Boolean.TRUE /*dummy value*/ ))
    .keySet();
Gautham M
  • Perhaps, but according to @Holger's comment on the original post, this shouldn't matter, since the conversion to a set takes place in a serial fashion (albeit a bit slower than with a true concurrent collection). Something more subtle is at play :) – Sarah Szabo Jun 03 '21 at 18:58
  • @SarahSzabo the `java.lang.ProcessImpl` (which would be invoked from `builder.start()`) uses some shared resources, but in your code it doesn't seem that those objects are accessed. Could you please try a different, simple command instead of `sudo blockdev`? Although I am not sure about it, please also check whether there are any restrictions on concurrently fetching the disk size with the `blockdev --getsize64` command. Because you are using `parallelStream` on a list, could it contain duplicate entries (though from the code it doesn't seem that duplicate drive names would be there)? You may try a Set. – Gautham M Jun 04 '21 at 04:58
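
The point made in the comments, that `Collectors.toSet()` is safe with a parallel stream, can be checked with a small sketch. The class and method names here are hypothetical and the mapping is a trivial stand-in for the real one. Each partial result is accumulated into its own container and the containers are merged afterwards, so no container is written by two threads at once.

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class; not part of the code in the question.
class ParallelSetDemo {

    /** Maps 0..n-1 in parallel and collects the distinct remainders into a Set. */
    static Set<Integer> parallelRemainders(int n, int modulus) {
        return IntStream.range(0, n)
                .parallel()
                .map(i -> i % modulus)        // stand-in for the real mapping
                .boxed()
                .collect(Collectors.toSet()); // non-concurrent collector, still safe here
    }
}
```

Calling `ParallelSetDemo.parallelRemainders(10_000, 7)` yields a set containing exactly the values 0 through 6, the same result a sequential stream would produce.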