This is an extension of my question here.

I have 3 classes.

My main:

import java.io.*;

public class ConnectionManager {
    public static void main(String argv[]) {
    
        try {
            PipedOutputStream pout = new PipedOutputStream();
            PipedInputStream pin = new PipedInputStream(pout);

            Sender s = new Sender(pout,true);
            Receiver r = new Receiver(pin,true);
            System.out.println("Starting threads");
            s.start();
            r.start();
        } catch (Exception e) {System.out.println(e);}
    }
}

My Sender/Producer class:

import java.io.*;

public class Sender extends Thread {
    ObjectOutputStream oos;
    boolean primitive;

    public Sender(OutputStream os, boolean primitive) {
        try {
            oos = new ObjectOutputStream(os);
            this.primitive = primitive;
        } catch (Exception e) {System.out.println(e);}
    }

    public void run() {
        try {
            System.out.println("Sending a message");
            Thread.sleep(1000);
            oos.writeInt(99);
            oos.flush();
            System.out.println("Message sent, terminating");
            oos.close();
        } catch (Exception e) {System.out.println("Sender: " + e);}
    }
}

My Receiver/Consumer class:

import java.io.*;

public class Receiver extends Thread {
    ObjectInputStream ois;
    boolean primitive;

    public Receiver(InputStream is, boolean primitive) {
        try {
            ois = new ObjectInputStream(is);
            this.primitive = primitive;
        } catch (Exception e) {System.out.println(e);}
    }

    public void run() {
        try {
            System.out.println("waiting for a message");
            int x = ois.readInt();
            System.out.println("message received: " + x);
            ois.close();
        } catch (Exception e) {System.out.println("Receiver: " + e);}

    }
}

Produces this output:

Starting threads
Sending a message
waiting for a message
Receiver: java.io.IOException: Write end dead
Sender: java.io.IOException: Read end dead

I read on this page that I'm getting these exceptions because I wasn't closing the pipes, but I still get them even when I do close them. How can I fix this?

Edit: the reason I'm casting the stream objects from, say, PipedInputStream to InputStream and then constructing a new ObjectInputStream from that InputStream is that I want to be able to send and receive data of various types, not just ints or bytes.

Manuel

2 Answers


These errors don't come from the object streams; check the stack trace. They come from the piped streams, and they occur because the thread concerned has exited or hasn't been started yet. That in turn happens because you construct the object streams in the thread constructors instead of in the run() methods. Both object stream constructors perform I/O, and since the constructors run before you start the threads, that I/O happens on the main thread.
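To see that both constructors really do perform I/O, here is a minimal sketch that uses in-memory byte array streams instead of pipes. The class and method names (HeaderDemo, headerBytesWrittenByConstructor, constructorReadsFromEmptySource) are made up for illustration and are not part of the original code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical demo class, not part of the question's code.
class HeaderDemo {

    // Returns how many bytes the ObjectOutputStream constructor alone
    // has written to the underlying stream (the serialization header).
    static int headerBytesWrittenByConstructor() throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(sink);
        int size = sink.size(); // measured before any writeXxx() call
        oos.close();
        return size;
    }

    // Returns true if the ObjectInputStream constructor tried to read
    // the header from an empty source and hit end of stream.
    static boolean constructorReadsFromEmptySource() throws IOException {
        try {
            new ObjectInputStream(new ByteArrayInputStream(new byte[0]));
            return false;
        } catch (EOFException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("bytes written by constructor: "
                + headerBytesWrittenByConstructor());
        System.out.println("constructor read from source: "
                + constructorReadsFromEmptySource());
    }
}
```

With a pipe underneath, that same header write and header read are attributed to whichever thread runs the constructor, which in the question's code is the main thread.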

You don't need the sleeps.

Don't use pipes for this. Use a queue.
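As a rough sketch of what the queue approach could look like, assuming a java.util.concurrent.ArrayBlockingQueue shared by the two threads: the class name QueueDemo, the helper exchange, and the capacity of 16 are illustrative choices, not something taken from the question.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the question's code.
class QueueDemo {

    // Sends one message from a sender thread to a receiver thread
    // through a shared queue and returns what the receiver took.
    static Object exchange(Object message) throws InterruptedException {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(16); // capacity is arbitrary
        Object[] received = new Object[1];

        Thread sender = new Thread(() -> {
            try {
                queue.put(message); // blocks only if the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread receiver = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks until a message arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        receiver.start();
        sender.start();
        sender.join();
        receiver.join(); // join() makes received[0] visible to this thread
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("message received: " + exchange(99));
        System.out.println("message received: " + exchange("hello"));
    }
}
```

Because the queue carries object references directly, values of different types can be passed without any serialization or stream headers, and neither side depends on which thread touched the channel first.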

NB Re your comment, you don't need to cast PipedInputStream to InputStream. It already is. And indeed you aren't.

user207421
  • @GhostCat According to what material? During the creation process both pipes are being asked to do I/O without another running thread being attached to the other pipe. It can't possibly work. Try it with the object streams being created in the `run()` method and see for yourself. I did. – user207421 Apr 27 '17 at 06:32
  • Hi, I was reading your recommendation to use a queue instead of piped streams. Could you tell me why? –  Jul 16 '19 at 02:45

I stand corrected. Following the advice from EJP, here is a working solution.

import java.io.*;
public class ConnectionManager {
    public static void main(String argv[]) throws Exception {
      PipedOutputStream pout = new PipedOutputStream();
      PipedInputStream pin = new PipedInputStream(pout);
      Sender s = new Sender(pout);
      Receiver r = new Receiver(pin);
      System.out.println("Starting threads");
      s.start();
      r.start();
    }
}

class Sender extends Thread {
  private final OutputStream os;
  Sender(OutputStream os) { this.os = os; }
  public void run() {
    try(ObjectOutputStream oos = new ObjectOutputStream(os)) {
      oos.writeInt(99);
      System.out.println("Message sent, terminating");
    } catch (Exception e) {
      System.out.println("Sender: " + e);
      e.printStackTrace();
    }
  }
}

class Receiver extends Thread {
  private final InputStream is;
  Receiver(InputStream is) {this.is = is; }
  public void run() {
    try(ObjectInputStream ois = new ObjectInputStream(is)) {
      System.out.println("waiting for a message");
      int x = ois.readInt();
      System.out.println("message received: " + x);
    } catch (Exception e) {
      System.out.println("Receiver: " + e);
      e.printStackTrace();
    }
  }
}

Should print (the two middle lines may appear in either order, depending on thread scheduling):

Starting threads
Message sent, terminating
waiting for a message
message received: 99

Notes: the core point is to create the object streams within the run() methods. Beyond that, I removed what wasn't necessary (the boolean primitive field) and added try-with-resources and stack trace printing.

GhostCat
  • He doesn't need to use `connect()`. He is creating a [connected `PipedInputStream`](https://docs.oracle.com/javase/8/docs/api/java/io/PipedInputStream.html#PipedInputStream-java.io.PipedOutputStream-). – user207421 Apr 27 '17 at 04:05
  • You're wrong again. It *does* work, and I've proven it. You just have to *do it right.* – user207421 Apr 27 '17 at 08:07
  • The core point is to create the object streams within the `run()` methods, which is what you have actually done. Creating them within overridden `start()` methods wouldn't work any better than the OP's original code. – user207421 Apr 27 '17 at 09:11
  • Arr, that was a typo. – GhostCat Apr 27 '17 at 09:12
  • Thank you so much for this. Also many thanks for all the feedback you had been providing up until this. – Manuel Apr 27 '17 at 21:36