ConcurrentHashMap<Long, CopyOnWriteArrayList<Observation>
mostRecentObservationDB = new ConcurrentHashMap<Long,
CopyOnWriteArrayList<Observation>(524288, 0.75f, 32);
This is my map. I am trying to read and write with multiple threads at the same time, but somehow it always creates multiple keys.
long timeInMilliseconds = System.currentTimeMillis();
if (/* the last key older than 10 seconds comparing to the new key*/) {
CopyOnWriteArrayList<Observation> initializingObservation = new CopyOnWriteArrayList<>();
initializingObservation.add(obs);
mostRecentObservationDB.putIfAbsent(timeInMilliseconds, initializingObservation);
} else {
// Update
}
Seperate Thread, which filters this hash map by deleting the keys which are older than 10 seconds.
while (true) {
try {
Thread.sleep(4000);
if(/*Time (key) older than 10 seconds*/) {
mostRecentObservationDB.remove(key);
}
} catch (Exception e) {
}
}
The problem is that after removing the key, it creates multiple keys while initializing. This are my logs.
key -> 1501779153776, value
key -> 1501779153826, value
key -> 1501779153876, value
key -> 1501779153896, value
I want them to be stored as one key while removing operation. This is how it should be stored.
key -> 1501779153776, value
However, when I read from it and then remove all entries by remove()
method, I want that no other thread writes to the map while I’m in the process of reading the map contents and then removing them.
This is the code which is behaving odd:
public static void main(String[] args) {
ConcurrentHashMap<Long, String> tenSecondBucket =
new ConcurrentHashMap<Long, String>();
Thread writingThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1);
if(tenSecondBucket.size() > 0) {
// getting last key
long lastKey = 0;
for (long keyValue : tenSecondBucket.keySet()) {
lastKey = keyValue;
}
if(System.currentTimeMillis() - lastKey > 10000) {
tenSecondBucket.put(System.currentTimeMillis(), "secondEntry");
} else {
tenSecondBucket.put(lastKey, "updatedEntry");
}
} else {
tenSecondBucket.put(System.currentTimeMillis(), "newEntry");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
writingThread.start();
Thread removingThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(4000);
if(tenSecondBucket.size() > 0) {
tenSecondBucket.keySet().stream().forEach(key -> {
if(System.currentTimeMillis() - key > 10000) {
tenSecondBucket.remove(key);
}
});
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
removingThread.start();
Thread readingThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(4000);
if(tenSecondBucket.size() > 0) {
tenSecondBucket.keySet().stream().forEach(key -> {
System.out.println("testing key which is timestamp " + key);
});
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
readingThread.start();
}