0

When reading from a socket using a BufferedReader it states that the readLine() method returns

A String containing the contents of the line, not including any line-termination characters, or null if the end of the stream has been reached

How does it know that it's reached the end of the stream? What sequence of characters does it use to determine this.

I want to simulate sending the same sequence of characters to properly close another connection that uses PipedStreams.


Edit: Here is the code in question. From the responses it looks like there is no such sequence and calling close() on the PipedOutput stream should unblock the readLine() on the output stream. It doesn't appear to be doing this at the moment which is why I was confused so I'm thinking it might be a bug somewhere else.

What's happening is the incomingEventIn.close() line appears to be blocking when inputLine = incomingEventIn.readLine() is blocking. If inputLine = incomingEventIn.readLine() isn't being executed on the other thread then incomingEventIn.close() executes fine. Why is this happening?

public class SocketManager {

    private Socket socket = null;
    private PrintWriter out = null;
    private BufferedReader in = null;

    private PipedOutputStream incomingEventOutStream = null;
    private PrintWriter incomingEventOut = null;
    private BufferedReader incomingEventIn = null;
    private PipedOutputStream incomingResponsOutStream = null;
    private PrintWriter incomingResponseOut = null;
    private BufferedReader incomingResponseIn = null;

    private ArrayList<AsteriskLiveComsEventListener> listeners = new ArrayList<AsteriskLiveComsEventListener>();
    private final ExecutorService eventsDispatcherExecutor;

    private String ip;
    private int port;

    private Object socketLock = new Object();

    public SocketManager(String ip, int port) {
        this.ip = ip;
        this.port = port;
        eventsDispatcherExecutor = Executors.newSingleThreadExecutor();
    }

    public void connect() throws UnableToConnectException, AlreadyConnectedException {
        synchronized(socketLock) {
            if (socket != null && !socket.isClosed()) {
                throw (new AlreadyConnectedException());
            }
            try {
                socket = new Socket(ip, port);
                out = new PrintWriter(socket.getOutputStream(), true);
                in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

                incomingEventOutStream = new PipedOutputStream();
                incomingEventIn = new BufferedReader(new InputStreamReader(new PipedInputStream(incomingEventOutStream)));
                incomingEventOut = new PrintWriter(incomingEventOutStream);

                incomingResponsOutStream = new PipedOutputStream();
                incomingResponseIn = new BufferedReader(new InputStreamReader(new PipedInputStream(incomingResponsOutStream)));
                incomingResponseOut = new PrintWriter(incomingResponsOutStream);

            } catch (IOException e) {
                throw (new UnableToConnectException());
            }
            new Thread(new IncomingEventThread()).start();
            new Thread(new SocketThread()).start();
        }
    }

    public void disconnect() throws NotConnectedException {
        disconnect(false);
    }

    private void disconnect(boolean notRequested) throws NotConnectedException {
        synchronized(socketLock) {
            if (!isConnected()) {
                throw (new NotConnectedException());
            }

            try {
                incomingEventIn.close();
            } catch (IOException e2) {}
            // IT NEVER GETS TO HERE!
            incomingEventOut.close();
            try {
                incomingResponseIn.close();
            } catch (IOException e1) {}
            System.out.println("disconnecting");
            incomingResponseOut.close();
            try {
                socket.shutdownInput();
            } catch (IOException e) {}
            try {
                socket.shutdownOutput();
            } catch (IOException e) {}
            try {
                socket.close();
            } catch (IOException e) {}

            if (notRequested) {

                System.out.println("disconnecting event");
                dispatchEvent(new ConnectionLostEvent());
            }
        }
    }

    public boolean isConnected() {
        synchronized(socketLock) {
            return (socket != null && !socket.isClosed());
        }
    }

    public void addEventListener(AsteriskLiveComsEventListener a) {
        synchronized(listeners) {
            listeners.add(a);
        }
    }

    public void removeEventListener(AsteriskLiveComsEventListener a) {
        synchronized(listeners) {
            listeners.remove(a);
        }
    }

    private void dispatchEvent(final AsteriskLiveComsEvent e) {
        synchronized (listeners) {
            synchronized (eventsDispatcherExecutor) {
                eventsDispatcherExecutor.execute(new Runnable()
                {
                    public void run()
                    {
                        for(int i=0; i<listeners.size(); i++) {
                            listeners.get(i).onAsteriskLiveComsEvent(e);
                        }
                    }
                });
            }
        }
    }

    public JSONObject sendRequest(JSONObject request) throws JSONException, NotConnectedException {
        synchronized(socketLock) {
            System.out.println("sending request "+request.toString());
            out.println(request.toString());
            try {
                return new JSONObject(incomingResponseIn.readLine());
            } catch (IOException e) {
                // lets close the connection
                try {
                    disconnect(true);
                } catch (NotConnectedException e1) {}
                throw(new NotConnectedException());
            }
        }
    }

private class SocketThread implements Runnable {

    @Override
    public void run() {
        String inputLine = null;
        try {
            while((inputLine = in.readLine()) != null) {
                // determine if this is a response or event and send to necessary location
                JSONObject lineJSON = new JSONObject(inputLine);
                if (lineJSON.getString("type").equals("response")) {
                    incomingResponseOut.println(inputLine);
                    incomingResponseOut.flush();
                }
                else if (lineJSON.getString("type").equals("event")) {
                    incomingEventOut.println(inputLine);
                    incomingEventOut.flush();
                }
            }

            if (isConnected()) {
                try {
                    disconnect(true);
                } catch (NotConnectedException e) {}
            }
        } catch (IOException e) {
            // try and disconnect (if not already disconnected) and end thread
            if (isConnected()) {
                try {
                    disconnect(true);
                } catch (NotConnectedException e1) {}
            }
        }
    }

}

private class IncomingEventThread implements Runnable {

    @Override
    public void run() {
        String inputLine = null;
        try {
            while((inputLine = incomingEventIn.readLine()) != null) {
                JSONObject lineJSON = new JSONObject(inputLine);
                String eventType = lineJSON.getString("eventType");
                // determine what type of event it is and then fire one that represents it
                if (eventType.equals("channelAdded")) {
                    JSONObject a = lineJSON.getJSONObject("payload");
                    Hashtable<String,Object> data = new Hashtable<String,Object>();
                    Object[] keys = a.keySet().toArray();
                    for(int i=0; i<keys.length; i++) {
                        data.put((String) keys[i], a.get((String) keys[i]));
                    }
                    dispatchEvent(new ChannelAddedEvent(data));
                }
                else if (eventType.equals("channelRemoved")) {
                    dispatchEvent(new ChannelRemovedEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
                }
                else if (eventType.equals("channelsToRoom")) {
                    ArrayList<Integer> data = new ArrayList<Integer>();
                    JSONObject a = lineJSON.getJSONObject("payload");
                    JSONArray ids = a.getJSONArray("channelIds");
                    for(int i=0; i<ids.length(); i++) {
                        data.add(ids.getInt(i));
                    }
                    dispatchEvent(new ChannelsToRoomEvent(data));
                }
                else if (eventType.equals("channelToHolding")) {
                    dispatchEvent(new ChannelToHoldingEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
                }
                else if (eventType.equals("channelVerified")) {
                    dispatchEvent(new ChannelVerifiedEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
                }
                else if (eventType.equals("serverResetting")) {
                    dispatchEvent(new ServerResettingEvent());
                }
            }
        } catch (IOException e) {}
        System.out.println("here");
    }

}

Edit 2: I think it's a deadlock issue somewhere because if I put some breakpoints in before it in the debugger it runs fine and inputLine = incomingEventIn.readLine() returns null. If I try and run it normally it locks up.

Edit 3: Solved thanks to Gray's answer. The input stream is being closed before the output which was causing the lock up. It needs to be the other way around. Closing the output stream first then informs the input stream that the stream is closed and unblocks the readLine() method.

Community
  • 1
  • 1
Tom Jenkinson
  • 1,390
  • 3
  • 19
  • 43
  • *Never* ignore exceptions, and never substitute your own messages for the message in the exception. Please fix your code to log all exceptions and re-run it. You may get a surprise. NB why are you starting two threads and four pipes per socket? This seems incredibly over-elaborate. I've used Pipes exactly once in 16 years of Java, and I hastily removed them in favour of a queue. – user207421 Jul 22 '13 at 22:24
  • There are only 2 pipes aren't there? It was the only way I could think of splitting the input read from the socket and sending it 2 different ways depending "type" declared in the json. If there's a better way please let me know? I know ignoring exceptions is a bad idea but I don't really know what to do if there is one in those places as the aim is to get the socket closed anyway. – Tom Jenkinson Jul 22 '13 at 22:35
  • Two pipes, four streams, plus the socket input and output streams, total six per socket. Surely you don't need all that? I don't care how you write your code but if you present code *here* that ignores exceptions there is a strong presumption that you've just missed an exception that has been thrown. It is incumbent on you to eliminate that possibility. – user207421 Jul 22 '13 at 22:38

4 Answers4

3

How does it know that it's reached the end of the stream? What sequence of characters does it use to determine this.

The answer to this is OS dependent but the OS' I'm familiar with, no EOF characters are read. The OS returns to the underlying caller the return values that indicate that the stream (file-descriptor) has reached EOF. The JVM sees the return value and returns the appropriate return (null, -1, ...) to the InputStream or Reader caller depending on the method.

I want to simulate sending the same sequence of characters to properly close another connection that uses PipedStreams.

If you are reading from a PipedReader then you close the associated PipedWriter. The Reader or InputStream will then return the appropriate EOF value to the caller.

Edit:

Since your IncomingEventThread is reading from incomingEventIn, the disconnect() method should close the incomingEventOut first. The thread should close the in side itself. Then you should close the response out.

I would not have the thread call disconnect(...). It should only close it's reader and writer, not all of the streams.

Gray
  • 115,027
  • 24
  • 293
  • 354
  • At the moment I am calling close() on the PipedReader and readLine() still doesn't appear to be returning null. I guess I've made a mistake somewhere. Thanks. – Tom Jenkinson Jul 22 '13 at 21:58
  • You call close on the _writer_ I believe @TomJenkinson. – Gray Jul 22 '13 at 22:00
  • 1
    The OS doesn't 'close the input stream'. It tells the reader that EOS has occurred, via a return value. The application must close the stream. If the OS close the stream you wouldn't ever see 'too many open files'. – user207421 Jul 22 '13 at 22:01
  • In my code though when I call `incomingEventIn.close()`, `incomingEventIn.readLine()` doesn't seem to be returning null in the other thread and the `incomingEventIn.close()` appears to be blocking. I have no idea why. – Tom Jenkinson Jul 22 '13 at 22:17
  • I've updated my answer. The main thread should close 2 streams, the thread should close the other side of each of the streams @TomJenkinson. – Gray Jul 22 '13 at 22:26
  • @Gray Well done and I've upvoted but there *are* no 'EOF characters [to be] read'. If there was such a thing as an 'EOF character' you couldn't read and write all 256 byte values, and you can. – user207421 Jul 22 '13 at 22:28
  • I never thought of that but it makes sense now! This is the first project I've done with streams. That fixed it and I understand how the EOS is detected now. Thanks! – Tom Jenkinson Jul 22 '13 at 22:31
  • Thanks @EJP. I'm looking at my answer and it doesn't say there are 'EOF' characters. I say "typically no EOF characters are read" in response to his query about them. Am I missing something? Ah, maybe you are taking issue with my "typically". I've changed my answer. I was being conservative because I have no idea how Windows works. – Gray Jul 22 '13 at 22:33
  • It can't say that meaningfully unless there is such a thing, which there isn't. You're also implying that there can be an atypical case where there is such a thing, and again there isn't. – user207421 Jul 22 '13 at 22:34
  • Well "EOF character" was the way we used to refer to it back in C/Unix days @EJP. It was always a `getChar() != EOF` type of construct. It also was used as you mention on the terminal side to close the output side of the pipe. But in terms of Java you are correct that it does not exist. – Gray Jul 22 '13 at 22:39
  • Java has nothing to do with it. It doesn't exist at all. EOF in Unix meant -1, which is out of band for a byte, same as the -1 return value for Java's read(). – user207421 Jul 22 '13 at 22:42
  • Sigh. DOS had ^Z in files. Unix tapes had tape marks which indicated EOF/EOT. cpio and tar streams on tapes had EOF sequences which indicated when the archive was done. Java serialized streams have EOF patterns so they can throw `EOFException`. EOF characters and sequences do exist @EJP. But as I mention, you are correct in saying that they are not read/written here. – Gray Jul 22 '13 at 22:47
2

Check out this question: what is character for end of file of filestream?

Community
  • 1
  • 1
lcta0717
  • 402
  • 5
  • 13
2

From your point of view, just call close on PipedOutputStream that you use to connect to your test.

The actual close of the socket is performed by the TCP stack on client and server.

This should do (note that you cannot read/write piped streams on the same thread, hence the 2 methods and a thread creation):

void runTest ( final PipedInputStream sink ) throws Exception
{
    try( final PipedOutputStream stream = new PipedOutputStream( sink ) )
    {
        try ( final OutputStreamWriter swriter = 
              new OutputStreamWriter( stream, "UTF-8" )
        )
        {
            try ( final PrintWriter writer = new PrintWriter( swriter ) )
            {
                writer.println( "Hello" );

                writer.println( "World!" );
            }
        }
    }
}

void test ( final PipedInputStream sink ) throws InterruptedException
{
    final Thread outputThread =
        new Thread(
            new Runnable ( )
            {
                @Override
                public void run ( )
                {
                    try
                    {
                        runTest( sink );
                    }
                    catch ( final Exception ex )
                    {
                        throw new RuntimeException( ex );
                    }
                }

            }
        );

    outputThread.start( );

    outputThread.join( );
}
Alexander Pogrebnyak
  • 44,836
  • 10
  • 105
  • 121
2

There isn't one. The OS knows when the stream reaches its end via the file size, the TCP FIN bit, or other out-of-band mechanisms depending on the source. The only exception I'm aware of is that the terminal driver recognizes Ctrl/d or Ctrl/z as EOF when types by a keyboard, but again that's the OS, not the Java stream or reader.

user207421
  • 305,947
  • 44
  • 307
  • 483