I am developing a simple distributed ledger. I want to be able to start nodes on different ports that can communicate with each other. Each programme would then have a file to which it writes newly discovered nodes.
At first, only the most reliable nodes are hardcoded into that file. Here is what happens, step by step:
1) I start a new node, which starts an HTTP server (I use com.sun.net.httpserver.HttpServer). The server has a GetAddress handler, which listens for requests to the specified URI. It takes the requester's IP and port (the port is passed as a URI query parameter), acquires a semaphore for the known_nodes.txt file, writes the newly received peer address to that file if it's not already there, and sends the contents of the updated file back to the requester as a JSON list.
2) Within my NetworkNode class (which, as mentioned earlier, starts an HttpServer on a separate thread), I create a ScheduledExecutorService and give it a Runnable to run every few seconds. Its job is to connect to each URL in the known_nodes.txt file and ask that node for its known nodes. If we receive nodes that were not already in our known_nodes file, we overwrite our file.
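Step 1 can be sketched with the JDK's built-in com.sun.net.httpserver package. This is a minimal sketch under stated assumptions, not the actual handler from this post: an in-memory LinkedHashSet of "ip:port" strings stands in for known_nodes.txt, JSON is built by hand instead of with Gson, and the path /getaddress and the class GetAddressSketch are hypothetical. What it illustrates is that the semaphore is released in a finally block and the exchange is always closed, so a requesting node is never left waiting on a handler that failed halfway.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;

// Hypothetical sketch of the step-1 handler. Assumptions: an in-memory Set of
// "ip:port" strings stands in for known_nodes.txt, hand-built JSON stands in
// for Gson, and "/getaddress" stands in for Constants.GET_ADDRESS.
final class GetAddressSketch {
    private static final Semaphore FILE_LOCK = new Semaphore(1);
    private static final Set<String> KNOWN_NODES = new LinkedHashSet<>();

    static void handle(HttpExchange exchange) throws IOException {
        try {
            String ip = exchange.getRemoteAddress().getAddress().getHostAddress();
            String port = queryParam(exchange.getRequestURI().getRawQuery(), "myport");
            String json;
            FILE_LOCK.acquireUninterruptibly();
            try {
                if (port != null) {
                    KNOWN_NODES.add(ip + ":" + port); // a Set ignores duplicates
                }
                json = KNOWN_NODES.stream()
                        .map(node -> "\"" + node + "\"")
                        .collect(Collectors.joining(",", "[", "]"));
            } finally {
                FILE_LOCK.release(); // released even if the code above throws
            }
            byte[] body = json.getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        } finally {
            // Ends the exchange (or drops the connection if no response was sent),
            // so the requesting node is never left waiting for a reply.
            exchange.close();
        }
    }

    // Returns the value of the named query parameter, or null if absent.
    static String queryParam(String query, String name) {
        if (query == null) {
            return null;
        }
        for (String pair : query.split("&")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2 && kv[0].equals(name)) {
                return kv[1];
            }
        }
        return null;
    }

    // Starts the server on loopback (port 0 picks any free port) for the demo.
    static HttpServer start(int port) {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
            server.createContext("/getaddress", GetAddressSketch::handle);
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Plain GET with connect and read timeouts; returns the response body.
    static String get(String url) {
        try {
            HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
            conn.setConnectTimeout(5000);
            conn.setReadTimeout(5000);
            try (InputStream in = conn.getInputStream()) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the server started via GetAddressSketch.start(0), a GET to http://127.0.0.1:<port>/getaddress?myport=8002 answers ["127.0.0.1:8002"], and repeating the same request does not add a duplicate.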
Now, if I start a single node and send it a request from the browser, everything goes as planned: the node receives the request and writes the address to its file, and then the Runnable tries to connect to the address specified in the request. If it catches a SocketTimeoutException, it removes the address from its known_nodes.txt file.
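Two details of HttpURLConnection matter for the timeout-based removal. First, setConnectTimeout only bounds the TCP connect; the read timeout is a separate setting that defaults to 0, which the URLConnection Javadoc defines as an infinite timeout, so a peer that accepts the connection but never answers can make getInputStream() wait indefinitely. Second, in the run() method as written, conn.getInputStream() is opened in the resource list of the inner try, so a timeout thrown there is caught and logged by the inner catch (Exception e) and does not reach the outer catch (SocketTimeoutException e). The sketch below (class and method names are hypothetical) sets both timeouts and simulates a silent peer with a ServerSocket that never calls accept().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.charset.StandardCharsets;

final class PeerFetchSketch {
    // Reads a peer's response with BOTH timeouts set. setConnectTimeout bounds the
    // TCP connect; setReadTimeout bounds each wait for data (default 0 = forever).
    // Either timeout surfaces as SocketTimeoutException.
    static String fetch(String urlString, int timeoutMillis) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(urlString).toURL().openConnection();
        conn.setConnectTimeout(timeoutMillis);
        conn.setReadTimeout(timeoutMillis);
        conn.setRequestMethod("GET");
        StringBuilder result = new StringBuilder();
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                result.append(line).append('\n');
            }
        } finally {
            conn.disconnect();
        }
        return result.toString();
    }

    // Simulates a peer that accepts the TCP connection but never answers: the OS
    // completes the handshake into the backlog even though accept() is never called.
    static boolean readTimeoutFires(int timeoutMillis) {
        try (ServerSocket silentPeer = new ServerSocket(0)) {
            String url = "http://127.0.0.1:" + silentPeer.getLocalPort() + "/getaddress?myport=8001";
            try {
                fetch(url, timeoutMillis);
                return false;
            } catch (SocketTimeoutException expected) {
                return true; // without setReadTimeout this call would hang instead
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```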
The problem arises when I start two nodes, say on ports 8001 and 8002. Note that each node has its own known_nodes file. What happens is that one of the nodes stops running the DiscoverAddresses task while the other keeps running it. So, effectively, one of the nodes stops receiving requests.
NB! The node whose scheduled task stops will STILL send at least ONE discovery request, and then it dies or blocks (I'm not sure which).
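One way to tell "dies" from "blocks" is to keep the ScheduledFuture that scheduleAtFixedRate returns (the constructor below currently discards it) and inspect it. Per the ScheduledExecutorService Javadoc, if any execution of the task encounters an exception, subsequent executions are suppressed, and get() then throws an ExecutionException carrying that exception. If a run is stuck instead, get() times out and no later run starts, because executions of a fixed-rate task never overlap, even in a pool of 4 threads. A minimal sketch, with a made-up task standing in for DiscoverAddressesTask (DieOrBlockSketch and diagnose are hypothetical names):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class DieOrBlockSketch {
    // Schedules a made-up task that either throws or blocks on its second run,
    // then asks the ScheduledFuture which of the two happened.
    static String diagnose(boolean blockOnSecondRun) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = ses.scheduleAtFixedRate(() -> {
            if (runs.incrementAndGet() == 2) {
                if (blockOnSecondRun) {
                    try {
                        Thread.sleep(Long.MAX_VALUE); // stands in for a read that never returns
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    throw new IllegalStateException("failure on run 2");
                }
            }
        }, 0, 50, TimeUnit.MILLISECONDS);
        try {
            // For a periodic task, get() only returns or throws once the task has stopped.
            future.get(1, TimeUnit.SECONDS);
            return "unexpected normal completion";
        } catch (ExecutionException e) {
            return "died: " + e.getCause();
        } catch (TimeoutException e) {
            return "still scheduled or blocked after " + runs.get() + " run(s)";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            ses.shutdownNow(); // also interrupts a blocked run
        }
    }
}
```

In a real node, the same check could be done by storing the returned future in a field and logging future.isDone() periodically, or by taking a thread dump of the stuck process to see where the pool thread is waiting.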
Here is the code for the runnable task:
@Override
public void run() {
    log.info("still running");
    PeerAddressesHolder inactiveNodes = new PeerAddressesHolder();
    ApplicationConfiguration appConf = ApplicationConfiguration.getInstance();
    for (PeerAddress peerAddress : knownNodes.getAddresses()) {
        if (isSameNode(peerAddress)) {
            continue; // don't query ourselves
        }
        String urlString = String.format("http://%s:%s%s?myport=%d", peerAddress.getIP(), peerAddress.getPort(), Constants.GET_ADDRESS, myPort);
        try {
            // Ask the peer for its known nodes (JSON response)
            StringBuilder result = new StringBuilder();
            URL url = new URL(urlString);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setConnectTimeout(5000);
            conn.setRequestMethod("GET");
            try (InputStream connInputStream = conn.getInputStream();
                 InputStreamReader ir = new InputStreamReader(connInputStream);
                 BufferedReader br = new BufferedReader(ir)) {
                String line;
                while ((line = br.readLine()) != null) {
                    result.append(line).append('\n');
                }
            } catch (Exception e) {
                log.warn("Couldn't read from connection input stream", e);
            }
            PeerAddressesHolder peerAddressesHolder = gson.fromJson(result.toString(), PeerAddressesHolder.class);
            // Merge any addresses we haven't seen yet
            boolean fetchedNew = false;
            for (PeerAddress fetchedAddress : peerAddressesHolder.getAddresses()) {
                if (!isValidAddress(fetchedAddress)) { // check the fetched entry, not the peer we asked
                    log.warn("Peer has sent us a null-address. It will be ignored.");
                    continue; // skip this entry only; 'return' would abort the whole run
                }
                if (!knownNodes.contains(fetchedAddress)) {
                    knownNodes.addAddress(fetchedAddress);
                    fetchedNew = true;
                }
            }
            if (fetchedNew) {
                FileUtils.writeToFile(appConf.getKnownNodesFilePath(), gson.toJson(knownNodes), false);
            }
        } catch (SocketTimeoutException e) {
            // Timed-out peer: keep the hardcoded reliable nodes, mark the rest for removal
            if (appConf.getMostReliableNodes().contains(peerAddress)) {
                log.warn("Most reliable node not available: " + peerAddress);
            } else {
                inactiveNodes.addAddress(peerAddress);
                log.warn("Connection timeout from " + peerAddress + ". It will be removed.");
            }
        } catch (Exception e) {
            log.warn("Couldn't discover new addresses.", e);
        }
    }
    // Drop dead nodes and persist the updated list
    try {
        knownNodes.removeAll(inactiveNodes.getAddresses());
        FileUtils.writeToFile(appConf.getKnownNodesFilePath(), gson.toJson(knownNodes), false);
    } catch (IOException ioe) {
        log.warn("Couldn't write to file after deleting dead node", ioe);
    }
}
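Some statements in run() sit outside any catch that would stop a RuntimeException: the for-loop header's knownNodes.getAddresses() (knownNodes is presumably null if the constructor's file parsing failed, since that catch only logs) and knownNodes.removeAll(...), whose try only catches IOException. The ScheduledExecutorService Javadoc states that if any execution of the task encounters an exception, subsequent executions are suppressed, and nothing is logged by default. A minimal sketch of a guard against that, assuming a hypothetical helper named neverThrow and using java.util.logging in place of the post's `log`:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

final class SafeScheduling {
    private static final Logger LOG = Logger.getLogger(SafeScheduling.class.getName());

    // Hypothetical helper: wraps a periodic task so no RuntimeException escapes run().
    // (An Error would still escape and stop later runs.)
    static Runnable neverThrow(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                LOG.log(Level.WARNING, "Periodic task failed; it will run again next period", e);
            }
        };
    }

    // Schedules a task whose first run throws and counts the runs seen in waitMillis.
    static int countRuns(boolean wrapped, long waitMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable flaky = () -> {
            if (runs.incrementAndGet() == 1) {
                throw new IllegalStateException("first run fails");
            }
        };
        ses.scheduleAtFixedRate(wrapped ? neverThrow(flaky) : flaky, 0, 20, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ses.shutdownNow();
        }
        return runs.get();
    }
}
```

Unwrapped, the flaky task runs exactly once; wrapped, it keeps running every period. In the constructor this would look like scheduleAtFixedRate(SafeScheduling.neverThrow(new DiscoverAddressesTask(knownNodes, this.port)), 3, 4, TimeUnit.SECONDS).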
And here is how I start it when a node is created:
public NetworkNode(int port) {
    this.appConf = ApplicationConfiguration.getInstance();
    this.port = port;
    log.info("Starting a new node on port " + port);
    try {
        this.knownNodes = FileUtils.createPeerAddressesList(appConf.getKnownNodesFilePath());
    } catch (Exception e) {
        log.error("Error while trying to construct a list of peer addresses from file content on path: " + appConf.getKnownNodesFilePath(), e);
    }
    scheduledExecutorService = Executors.newScheduledThreadPool(4);
    scheduledExecutorService.scheduleAtFixedRate(new DiscoverAddressesTask(knownNodes, this.port), 3, 4, TimeUnit.SECONDS);
}
All methods that read or write files use the try-with-resources construct, so my initial idea that the Runnable stops because of some unclosed streams is probably not valid.