6

I have a ThreadLocal variable. I would like to use it like this:

ThreadLocal<AutoCloseable> threadLocal = new ThreadLocal<AutoCloseable>(); // pseudocode
ForkJoinPool fj = new ForkJoinPool(nThreads);
fj.submit(
    () -> myStream.parallel().forEach(e -> {
        /*I want to use the thread local autocloseable here, 
          but how do I close it when this parallel processing is done?*/
    })
);
pavel_orekhov
  • I'd more likely use something like `CompletableFuture` for those multi-threaded things where you want to do something after something else has been finished – UninformedUser Nov 30 '18 at 13:48
  • Since every thread will have its own instance, you just use it like in serialized code. There is no sharing here. Do you have problems with that? – Antoniossss Nov 30 '18 at 13:48
  • @flakes I use a Producer class to simulate multiple producers, this is a simulation, not a real world use case. I want to create such a producer per each thread. – pavel_orekhov Nov 30 '18 at 13:49
  • @Antoniossss serialized code? Explain what you mean, I can't close this autocloseable inside forEach. – pavel_orekhov Nov 30 '18 at 13:49
  • And why is that ? – Antoniossss Nov 30 '18 at 13:50
  • @AKSW I did use completable future, but my code reviewer told me to rewrite everything using parallel streams... – pavel_orekhov Nov 30 '18 at 13:50
  • @Antoniossss because forEach goes over each item, so, what you suggest is: go over the first item, close the closeable, and next time go over the second item and throw an exception when using the closeable, because it has been closed in the previous step. – pavel_orekhov Nov 30 '18 at 13:52
  • What I am saying is that if you use ThreadLocal, then EVERY THREAD in that stream will have a SEPARATE INSTANCE (or should have) that is independent of the others at this layer of code. – Antoniossss Nov 30 '18 at 13:53
  • @Antoniossss I know, and your point is? I still don't understand how this is relevant, sorry. – pavel_orekhov Nov 30 '18 at 13:55
  • @hey_you I'm in the same place: I have no idea what you are doing in the for loop, so that's my conclusion. I don't know if you open the resource in the loop or share it somehow. – Antoniossss Nov 30 '18 at 13:59
  • @Antoniossss, because opening and closing it in the same forEach block is too easy, I would have figured it out myself, it's not that hard, I think what I meant can be inferred from that. :D – pavel_orekhov Nov 30 '18 at 14:07
  • But you used `ThreadLocal` that would literally make what you want to do impossible - so no, it cannot be inferred if you provide misleading information. – Antoniossss Nov 30 '18 at 14:09
  • Yes it can, it just depends on how savvy you are. And, "But you used ThreadLocal that would literally make what you want to do impossible" -- prove it. – pavel_orekhov Nov 30 '18 at 14:14

2 Answers

8

ThreadLocal are closed after the thread using them dies. If you want control over this you need to use a map instead.

// do our own thread local resources which close when we want.
Map<Thread, Resource> threadLocalMap = new ConcurrentHashMap<>();

fj.submit(
    () -> myStream.parallel().forEach(e -> {
        // create the resource on first use by this worker thread, reuse it afterwards
        Resource r = threadLocalMap.computeIfAbsent(Thread.currentThread(), t -> new Resource());
        // use the thread local autocloseable here
    })
);

// later once all the tasks have finished.
// close all the thread local resources when the parallel processing is done
threadLocalMap.values().forEach(Utils::closeQuietly);

It's common to have a method which closes resources without throwing an exception. Chronicle has one but so do many other libraries.

public static void closeQuietly(Closeable c) {
    if (c != null) {
       try {
           c.close();
       } catch (IOException ioe) {
           // ignore or trace log it
       }
    }
}

Most likely you already have a method that does this in your project: https://www.google.co.uk/search?q=public+static+void+closequietly+Closeable
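Putting the pieces above together, here is a minimal sketch rather than a definitive implementation. `Resource` is a hypothetical stand-in for the real per-thread resource, an `IntStream` stands in for `myStream`, and the assumption is that blocking on `get()` is an acceptable way to know the tasks have finished. The `try`/`finally` ensures the resources are closed even if the stream operation throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class PerThreadResources {

    // Hypothetical resource; replace with the real AutoCloseable.
    static final class Resource implements AutoCloseable {
        volatile boolean closed;

        void process(int item) {
            // use the resource for one stream element
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Same idea as closeQuietly above, widened to AutoCloseable.
    static void closeQuietly(AutoCloseable c) {
        if (c != null) {
            try {
                c.close();
            } catch (Exception e) {
                // ignore or trace log it
            }
        }
    }

    // Runs the parallel stream inside its own pool and returns every resource that was created.
    static List<Resource> run(int nThreads, int nItems) throws Exception {
        Map<Thread, Resource> threadLocalMap = new ConcurrentHashMap<>();
        ForkJoinPool fj = new ForkJoinPool(nThreads);
        try {
            fj.submit(() -> IntStream.range(0, nItems).parallel().forEach(e ->
                    threadLocalMap
                            .computeIfAbsent(Thread.currentThread(), t -> new Resource())
                            .process(e)
            )).get(); // block until the whole stream has been processed
        } finally {
            // close every per-thread resource, whether or not the stream succeeded
            threadLocalMap.values().forEach(PerThreadResources::closeQuietly);
            fj.shutdown();
        }
        return new ArrayList<>(threadLocalMap.values());
    }

    public static void main(String[] args) throws Exception {
        List<Resource> resources = run(4, 200);
        System.out.println("resources created and closed: " + resources.size());
    }
}
```

The number of resources created depends on how many worker threads actually pick up work, so it can differ from run to run.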

Peter Lawrey
  • should wrap this in a try-finally – flakes Nov 30 '18 at 13:56
  • Sorry, I did not notice it. Then I have another question. What is Utils::closeQuietly? And I want proof of the fact that ThreadLocals are closed after the thread using them dies. – pavel_orekhov Nov 30 '18 at 13:59
  • It (by convention) closes without throwing exceptions (swallowing it) – Antoniossss Nov 30 '18 at 14:00
  • 1
    @hey_you I imagine its just a try-catch-ignore wrapper around `AutoCloseable::close` – flakes Nov 30 '18 at 14:03
  • @flakes it is obvious what that is from the name, but which library is this from? And where does it say that ThreadLocal autocloses everything? – pavel_orekhov Nov 30 '18 at 14:05
  • @hey_you but can't you write it? It's like 3 lines of code ;P Also, who said that ThreadLocal closes anything? – Antoniossss Nov 30 '18 at 14:05
  • Can't write what? And Peter Lawrey in his first sentence of this answer said that ThreadLocals are closed when the thread dies. – pavel_orekhov Nov 30 '18 at 14:11
  • @flakes you could write a class which wraps the Map as a Closeable resource but I don't know how the OP determines the tasks are complete. – Peter Lawrey Nov 30 '18 at 14:23
  • 3
    You mean `threadLocal.values().forEach(Utils::closeQuietly);` By the way, you can combine this approach with a `ThreadLocal` to reduce the number of `ConcurrentHashMap` lookups. Initialize with `Map<Thread, Resource> threadLocal = new ConcurrentHashMap<>(); ThreadLocal<Resource> resources = ThreadLocal.withInitial(() -> threadLocal.computeIfAbsent( Thread.currentThread(), t -> new Resource()));`. Then, within the stream operation, you can use a simple and efficient `Resource r = resources.get();`. The cleanup stays the same. – Holger Nov 30 '18 at 14:29
  • @Holger thanks. Am I right in saying that ThreadLocals are not closed automatically when the thread dies? – pavel_orekhov Nov 30 '18 at 14:32
  • 2
    The values of `ThreadLocals` may get garbage collected at some time. But that doesn’t imply that the resource gets closed, as that would require the particular resource class to implement a finalizer or something similar that does the cleanup (and we all know, relying on finalizers is strongly discouraged). There might be an unpredictably long time after the thread’s death until the garbage collector actually collects it. And well, even worse, parallel streams use a thread pool, so there’s not even a guarantee that the worker threads ever die after the operation. – Holger Nov 30 '18 at 14:36
  • 1
    @hey_you `ThreadLocal` instances have no dependency on the `Closeable` interface. You still have to manually close the stale entries. Further, it can take an arbitrarily long amount of time to dereference (and eventually GC) instances stored in `ThreadLocal` containers due to its internal map implementation. This is true even if the `ThreadLocal` itself is dereferenced. If you want to have close called on your resources you may opt to create a custom thread factory for your threadpool instances. [See here](https://stackoverflow.com/a/25107499/3280538) – flakes Nov 30 '18 at 14:48
  • @flakes I knew that, but I don't understand why Peter Lawrey wrote this in his answer: "ThreadLocal are closed after the thread using them dies". – pavel_orekhov Dec 04 '18 at 12:46
  • 1
    @hey_you `Closeables` which implement `finalize()` are cleaned up like any other object when they are no longer referenced. For ThreadLocal this is after the thread finishes and it is no longer referenced. – Peter Lawrey Dec 04 '18 at 21:06
  • @PeterLawrey, ok, but no one should rely on that or use finalize at all for that matter, it's a deprecated method. – pavel_orekhov Dec 05 '18 at 09:28
  • @hey_you In many cases, the finalize() already exists and is unlikely to be removed, or you need to have something and finalize is better than nothing for resources which have to be cleaned up this way. You can use the Cleaner (public from Java 9) or WeakReferences instead. – Peter Lawrey Dec 05 '18 at 11:10
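The variant suggested in the comments above, a `ThreadLocal` in front of the map so that each thread does the `ConcurrentHashMap` lookup only once, might look like the following sketch. `Resource` is again a hypothetical placeholder, `registry` is an illustrative name for the map, and an `IntStream` stands in for `myStream`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CachedPerThreadResources {

    // Hypothetical resource; replace with the real AutoCloseable.
    static final class Resource implements AutoCloseable {
        volatile boolean closed;

        void process(int item) {
            // use the resource for one stream element
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static Map<Thread, Resource> run(int nThreads, int nItems) throws Exception {
        // The map keeps a strong reference to every resource so it can be closed later.
        Map<Thread, Resource> registry = new ConcurrentHashMap<>();
        // The ThreadLocal only caches the map entry for the current thread.
        ThreadLocal<Resource> resources = ThreadLocal.withInitial(
                () -> registry.computeIfAbsent(Thread.currentThread(), t -> new Resource()));

        ForkJoinPool fj = new ForkJoinPool(nThreads);
        try {
            fj.submit(() -> IntStream.range(0, nItems).parallel()
                    .forEach(e -> resources.get().process(e)))
              .get(); // wait for the stream to finish
        } finally {
            // the cleanup stays the same: close everything recorded in the map
            registry.values().forEach(Resource::close);
            fj.shutdown();
        }
        return registry;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("resources closed: " + run(4, 200).size());
    }
}
```

The `ThreadLocal` never owns the resource here; the map does, which is why the cleanup does not depend on threads dying or on garbage collection.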
-1

It seems that you want to use some sort of shared resource. So either don't use ThreadLocal (as every thread will have its own instance, or null) and wait until every task is done:

ForkJoinTask<?> task = fj.submit(
    () -> myStream.parallel().forEach(e -> {
        // whatever
    })
);
task.get(); // wait; a timeout would be good here
resource.close(); // close that shared resource; wrapping all of that in try-with-resources would work as well

Or just use that resource as in serialized code: use it and close it inside forEach.
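The first option, written with try-with-resources and a timeout, could be sketched as follows. `SharedResource` is a hypothetical class that is assumed to be safe for concurrent use, the one-minute timeout is an arbitrary choice, and an `IntStream` stands in for `myStream`.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class SharedResourceExample {

    // Hypothetical shared resource; it must tolerate calls from several threads at once.
    static final class SharedResource implements AutoCloseable {
        final AtomicInteger processed = new AtomicInteger();
        volatile boolean closed;

        void process(int item) {
            processed.incrementAndGet();
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static SharedResource run(int nThreads, int nItems) throws Exception {
        ForkJoinPool fj = new ForkJoinPool(nThreads);
        SharedResource resource = new SharedResource();
        try (SharedResource r = resource) {
            ForkJoinTask<?> task = fj.submit(
                    () -> IntStream.range(0, nItems).parallel().forEach(r::process));
            // wait for completion; throws TimeoutException if it takes too long
            task.get(1, TimeUnit.MINUTES);
        } finally {
            fj.shutdown();
        }
        return resource; // already closed by try-with-resources at this point
    }

    public static void main(String[] args) throws Exception {
        SharedResource r = run(4, 200);
        System.out.println("processed=" + r.processed.get() + ", closed=" + r.closed);
    }
}
```

One caveat of this sketch: if `get` times out, the resource is closed while stream tasks may still be running, so real code would probably cancel the task or wait for the pool to terminate before closing.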

Antoniossss
  • 1
    No, sorry, the point is that I want to use multiple shared resources, not just one. I might have 200 items in my stream and I want 4 resources processing them in the parallel stream. – pavel_orekhov Nov 30 '18 at 14:01