When reading from a socket using a BufferedReader it states that the readLine()
method returns
A String containing the contents of the line, not including any line-termination characters, or null if the end of the stream has been reached
How does it know that it's reached the end of the stream? What sequence of characters does it use to determine this.
I want to simulate sending the same sequence of characters to properly close another connection that uses PipedStreams.
Edit: Here is the code in question. From the responses it looks like there is no such sequence and calling close() on the PipedOutput stream should unblock the readLine() on the output stream. It doesn't appear to be doing this at the moment which is why I was confused so I'm thinking it might be a bug somewhere else.
What's happening is the incomingEventIn.close()
line appears to be blocking when inputLine = incomingEventIn.readLine()
is blocking. If inputLine = incomingEventIn.readLine()
isn't being executed on the other thread then incomingEventIn.close()
executes fine. Why is this happening?
public class SocketManager {
private Socket socket = null;
private PrintWriter out = null;
private BufferedReader in = null;
private PipedOutputStream incomingEventOutStream = null;
private PrintWriter incomingEventOut = null;
private BufferedReader incomingEventIn = null;
private PipedOutputStream incomingResponsOutStream = null;
private PrintWriter incomingResponseOut = null;
private BufferedReader incomingResponseIn = null;
private ArrayList<AsteriskLiveComsEventListener> listeners = new ArrayList<AsteriskLiveComsEventListener>();
private final ExecutorService eventsDispatcherExecutor;
private String ip;
private int port;
private Object socketLock = new Object();
public SocketManager(String ip, int port) {
this.ip = ip;
this.port = port;
eventsDispatcherExecutor = Executors.newSingleThreadExecutor();
}
public void connect() throws UnableToConnectException, AlreadyConnectedException {
synchronized(socketLock) {
if (socket != null && !socket.isClosed()) {
throw (new AlreadyConnectedException());
}
try {
socket = new Socket(ip, port);
out = new PrintWriter(socket.getOutputStream(), true);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
incomingEventOutStream = new PipedOutputStream();
incomingEventIn = new BufferedReader(new InputStreamReader(new PipedInputStream(incomingEventOutStream)));
incomingEventOut = new PrintWriter(incomingEventOutStream);
incomingResponsOutStream = new PipedOutputStream();
incomingResponseIn = new BufferedReader(new InputStreamReader(new PipedInputStream(incomingResponsOutStream)));
incomingResponseOut = new PrintWriter(incomingResponsOutStream);
} catch (IOException e) {
throw (new UnableToConnectException());
}
new Thread(new IncomingEventThread()).start();
new Thread(new SocketThread()).start();
}
}
public void disconnect() throws NotConnectedException {
disconnect(false);
}
private void disconnect(boolean notRequested) throws NotConnectedException {
synchronized(socketLock) {
if (!isConnected()) {
throw (new NotConnectedException());
}
try {
incomingEventIn.close();
} catch (IOException e2) {}
// IT NEVER GETS TO HERE!
incomingEventOut.close();
try {
incomingResponseIn.close();
} catch (IOException e1) {}
System.out.println("disconnecting");
incomingResponseOut.close();
try {
socket.shutdownInput();
} catch (IOException e) {}
try {
socket.shutdownOutput();
} catch (IOException e) {}
try {
socket.close();
} catch (IOException e) {}
if (notRequested) {
System.out.println("disconnecting event");
dispatchEvent(new ConnectionLostEvent());
}
}
}
public boolean isConnected() {
synchronized(socketLock) {
return (socket != null && !socket.isClosed());
}
}
public void addEventListener(AsteriskLiveComsEventListener a) {
synchronized(listeners) {
listeners.add(a);
}
}
public void removeEventListener(AsteriskLiveComsEventListener a) {
synchronized(listeners) {
listeners.remove(a);
}
}
private void dispatchEvent(final AsteriskLiveComsEvent e) {
synchronized (listeners) {
synchronized (eventsDispatcherExecutor) {
eventsDispatcherExecutor.execute(new Runnable()
{
public void run()
{
for(int i=0; i<listeners.size(); i++) {
listeners.get(i).onAsteriskLiveComsEvent(e);
}
}
});
}
}
}
public JSONObject sendRequest(JSONObject request) throws JSONException, NotConnectedException {
synchronized(socketLock) {
System.out.println("sending request "+request.toString());
out.println(request.toString());
try {
return new JSONObject(incomingResponseIn.readLine());
} catch (IOException e) {
// lets close the connection
try {
disconnect(true);
} catch (NotConnectedException e1) {}
throw(new NotConnectedException());
}
}
}
private class SocketThread implements Runnable {
@Override
public void run() {
String inputLine = null;
try {
while((inputLine = in.readLine()) != null) {
// determine if this is a response or event and send to necessary location
JSONObject lineJSON = new JSONObject(inputLine);
if (lineJSON.getString("type").equals("response")) {
incomingResponseOut.println(inputLine);
incomingResponseOut.flush();
}
else if (lineJSON.getString("type").equals("event")) {
incomingEventOut.println(inputLine);
incomingEventOut.flush();
}
}
if (isConnected()) {
try {
disconnect(true);
} catch (NotConnectedException e) {}
}
} catch (IOException e) {
// try and disconnect (if not already disconnected) and end thread
if (isConnected()) {
try {
disconnect(true);
} catch (NotConnectedException e1) {}
}
}
}
}
private class IncomingEventThread implements Runnable {
@Override
public void run() {
String inputLine = null;
try {
while((inputLine = incomingEventIn.readLine()) != null) {
JSONObject lineJSON = new JSONObject(inputLine);
String eventType = lineJSON.getString("eventType");
// determine what type of event it is and then fire one that represents it
if (eventType.equals("channelAdded")) {
JSONObject a = lineJSON.getJSONObject("payload");
Hashtable<String,Object> data = new Hashtable<String,Object>();
Object[] keys = a.keySet().toArray();
for(int i=0; i<keys.length; i++) {
data.put((String) keys[i], a.get((String) keys[i]));
}
dispatchEvent(new ChannelAddedEvent(data));
}
else if (eventType.equals("channelRemoved")) {
dispatchEvent(new ChannelRemovedEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
}
else if (eventType.equals("channelsToRoom")) {
ArrayList<Integer> data = new ArrayList<Integer>();
JSONObject a = lineJSON.getJSONObject("payload");
JSONArray ids = a.getJSONArray("channelIds");
for(int i=0; i<ids.length(); i++) {
data.add(ids.getInt(i));
}
dispatchEvent(new ChannelsToRoomEvent(data));
}
else if (eventType.equals("channelToHolding")) {
dispatchEvent(new ChannelToHoldingEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
}
else if (eventType.equals("channelVerified")) {
dispatchEvent(new ChannelVerifiedEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
}
else if (eventType.equals("serverResetting")) {
dispatchEvent(new ServerResettingEvent());
}
}
} catch (IOException e) {}
System.out.println("here");
}
}
Edit 2:
I think it's a deadlock issue somewhere because if I put some breakpoints in before it in the debugger it runs fine and inputLine = incomingEventIn.readLine()
returns null. If I try and run it normally it locks up.
Edit 3: Solved thanks to Gray's answer. The input stream is being closed before the output which was causing the lock up. It needs to be the other way around. Closing the output stream first then informs the input stream that the stream is closed and unblocks the readLine()
method.