I want to know the advantages of using BlockingQueue instead of piped streams (PipedOutputStream and PipedInputStream).
    import java.io.*;
    import java.util.concurrent.*;

    public class PipedStreamVsBlocking {

        public static void main(String... args) {
            BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
            ExecutorService executor = Executors.newFixedThreadPool(4);

            Runnable producerTask = () -> {
                try {
                    while (true) {
                        int value = ThreadLocalRandom.current().nextInt(0, 1000);
                        blockingQueue.put(value);
                        System.out.println("BlockingQueue.Produced " + value);
                        int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
                        Thread.sleep(timeSleeping);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };

            Runnable consumerTask = () -> {
                try {
                    while (true) {
                        int value = blockingQueue.take();
                        System.out.println("BlockingQueue.Consume " + value);
                        int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
                        Thread.sleep(timeSleeping);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };

            PipedOutputStream pipedSrc = new PipedOutputStream();
            PipedInputStream pipedSnk = new PipedInputStream();
            try {
                pipedSnk.connect(pipedSrc);
            } catch (IOException e) {
                e.printStackTrace();
            }

            Runnable runnablePut2 = () -> {
                try {
                    ObjectOutputStream oos = new ObjectOutputStream(pipedSrc);
                    while (true) {
                        int value = ThreadLocalRandom.current().nextInt(0, 1000);
                        oos.writeInt(value);
                        oos.flush();
                        System.out.println("PipedStream.Produced " + value);
                        int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
                        Thread.sleep(timeSleeping);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };

            Runnable runnableGet2 = () -> {
                try {
                    ObjectInputStream ois = new ObjectInputStream(pipedSnk);
                    while (true) {
                        int value = ois.readInt();
                        System.out.println("PipedStream.Consume " + value);
                        int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
                        Thread.sleep(timeSleeping);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };

            executor.execute(producerTask);
            executor.execute(consumerTask);
            executor.execute(runnablePut2);
            executor.execute(runnableGet2);
            executor.shutdown();
        }
    }
The output for this code is:
BlockingQueue.Consume 298
BlockingQueue.Produced 298
PipedStream.Produced 510
PipedStream.Consume 510
BlockingQueue.Produced 536
BlockingQueue.Consume 536
PipedStream.Produced 751
PipedStream.Consume 751
PipedStream.Produced 619
BlockingQueue.Produced 584
BlockingQueue.Consume 584
PipedStream.Consume 619
BlockingQueue.Produced 327
PipedStream.Produced 72
BlockingQueue.Consume 327
PipedStream.Consume 72
BlockingQueue.Produced 823
BlockingQueue.Consume 823
PipedStream.Produced 544
PipedStream.Consume 544
BlockingQueue.Produced 352
BlockingQueue.Consume 352
PipedStream.Produced 134
PipedStream.Consume 134
I think that using piped streams (PipedOutputStream and PipedInputStream) has an advantage: I know directly when the data is produced and processed.
Maybe I am wrong, and the recommendation is to use BlockingQueue instead of pipes.
But I cannot find that recommendation in the documentation, so I need to know what I missed.
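To make the comparison more concrete, here is a minimal sketch of two differences that can be observed directly. It is only an illustration under some assumptions: the class name QueueVsPipeSketch and its method names are made up for this example, it uses ArrayBlockingQueue rather than the LinkedBlockingDeque from the code above, and it touches the pipe from a single thread only to keep the example short (the real program uses separate threads). It shows that a BlockingQueue counts its capacity in elements and offers timed offer/poll, while a pipe only sees raw bytes.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical class, written only for this illustration.
public class QueueVsPipeSketch {

    // The queue capacity is counted in elements: once two elements are
    // queued, a timed offer gives up and returns false instead of blocking forever.
    public static boolean offerToFullQueue() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        queue.put(1);
        queue.put(2);
        return queue.offer(3, 100, TimeUnit.MILLISECONDS);
    }

    // A timed poll on an empty queue returns null after the timeout.
    public static Integer pollFromEmptyQueue() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        return queue.poll(100, TimeUnit.MILLISECONDS);
    }

    // A pipe carries bytes, not elements: one int written through a
    // DataOutputStream shows up as 4 buffered bytes on the input side.
    public static int bytesBufferedForOneInt() throws IOException {
        PipedOutputStream src = new PipedOutputStream();
        PipedInputStream snk = new PipedInputStream(src);
        DataOutputStream out = new DataOutputStream(src);
        out.writeInt(42);
        out.flush();
        return snk.available();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("offer to full queue:  " + offerToFullQueue());       // false
        System.out.println("poll on empty queue:  " + pollFromEmptyQueue());     // null
        System.out.println("bytes buffered (int): " + bytesBufferedForOneInt()); // 4
    }
}
```

The sketch does not settle which approach is preferable. It only shows that the queue works at the level of whole objects with bounded, timed operations, whereas the pipe works at the level of a byte buffer.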
Why should I use BlockingQueue instead of piped streams?