-1
package port_channel;
import java.util.*;
import java.util.concurrent.*; 
import java.io.*;
import java.net.*; 

public class ChannelPort implements Runnable {
    int portNum; 
    int nSize; 
    PrintWriter[] outs ; 
    Listner[] listners; 
    ConcurrentLinkedQueue<MessageType> que; 

    public ChannelPort(int portNum, int networkSize) {
        this.portNum = portNum; 
        this.nSize = networkSize; 
        outs = new PrintWriter[nSize]; 
        listners = new Listner[nSize]; 
        que = new ConcurrentLinkedQueue<MessageType>(); 

    } 

    public void initialize() {
        ServerSocket serverSocket = null; 
        try {
            serverSocket = new ServerSocket(portNum); 
        } catch (IOException ioe) {  } 
        for (int j = 0; j < nSize; j++) { 
            try {
                Socket clientSocket = serverSocket.accept();       
                // not part of communication 
                outs[j] = new PrintWriter(new OutputStreamWriter(clientSocket.getOutputStream())); 
                ObjectInputStream in = new ObjectInputStream(clientSocket.getInputStream()); 
                listners[j] = new Listner(j, in, this);  
            } catch (IOException ioe) { 
                System.err.println("Failed in connection for j=" + j);
                ioe.printStackTrace(); 
                System.exit(-1);
            } 
        } 
        System.out.println("Connections are all established."); 
    } 
        //thread
    public void run() { 
        initialize(); 
        for (int j = 0; j < nSize; j++) {
            listners[j].start(); 
        }
    }

    synchronized void gotMessage(MessageType message) {
        que.offer(message); 
        notifyAll(); 
    } 

    public synchronized MessageType receive() { 
        while (que.isEmpty()) { 
            try {
                wait(); 
            } catch (InterruptedException ire) {
                ire.printStackTrace();
            }
        } 
        MessageType msg = que.poll(); 
        System.out.println("receive: " + msg);
        return msg; 
    } 

    public synchronized void broadcast(String msgStr) { 
        for (int j = 0; j < outs.length; j++) { 
            outs[j].println(msgStr); 
            outs[j].flush();
        }
    }

    public int getPortNum() {
        return portNum;
    }

    public void setPortNum(int portNum) {
        this.portNum = portNum;
    }

    public int getnSize() {
        return nSize;
    }

    public ConcurrentLinkedQueue<MessageType> getQue() {
        return que;
    }

    public static void main(String[] args) throws IOException, InterruptedException   { 
        if (args.length != 2) 
            System.out.println("usage: java ChannelPort port-number number-of-nodes");
        int portNum = Integer.parseInt(args[0]); 
        int numNode = Integer.parseInt(args[1]); 
        ChannelPort cp = new ChannelPort(portNum, numNode); 
        new Thread(cp).start(); 
        Thread.sleep(60000); 
        System.out.println("Shutdown"); 
        Iterator<MessageType> ite = cp.getQue().iterator(); 
        while (ite.hasNext()) {
            System.out.println(ite.next()); 
        }
    }
}
//thread
class Listner extends Thread { 
    int pId; 
    ObjectInputStream in; 
    ChannelPort cPort; 
    boolean done = false; 
    final int ERR_THRESHOLD = 100; 

    public Listner(int id, ObjectInputStream in, ChannelPort cPort) { 
        this.pId = id; 
        this.in = in;  
        this.cPort = cPort; 
    } 
    public void run() { 
        MessageType msg; 
        int errCnt = 0; 
        while(in != null) { 
            try { 
                msg = (MessageType)in.readObject(); 
                System.out.println("process " + pId + ": " + msg); 
                cPort.gotMessage(msg); 
            } catch (ClassNotFoundException cnfe) { 
                cnfe.printStackTrace(); 
            } catch (SocketException se) { 
                System.err.println(se); 
                errCnt++; 
                if (errCnt > ERR_THRESHOLD)  System.exit(0); 
            } catch (IOException ioe) { 
                ioe.printStackTrace(); 
            }
        }
    }
}
pnuts
  • 58,317
  • 11
  • 87
  • 139

1 Answers1

2

Did you check the serverSocket created properly ?

You've suppressed the exception while creating the ServerSocket.

    catch (IOException ioe) {
            //print stack trace and see why exception occurs.
    }

serverSocket might be null at 2nd try block, so serverSocket.accept() throws NPE, fix this error

Saravana
  • 12,647
  • 2
  • 39
  • 57
  • java.net.BindException: Address already in use: JVM_Bind at java.net.DualStackPlainSocketImpl.bind0(Native Method) at java.net.DualStackPlainSocketImpl.socketBind(DualStackPlainSocketImpl.java:106) at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:376) – Bipin Maurya Nov 01 '15 at 17:04
  • 1
    @BipinMaurya Have you considered *looking it up?* – user207421 Nov 01 '15 at 23:33
  • That means the port you're trying to use is already in use by some other service, try with an unique port number that is not used by other services, refer my answer in this question to kill the process binding the port http://stackoverflow.com/questions/33448995/tomcat-java-net-connectexception-connection-refused/33449055#33449055 – Saravana Nov 02 '15 at 01:50