ConcurrentHashMap<Long, CopyOnWriteArrayList<Observation>> mostRecentObservationDB =
    new ConcurrentHashMap<Long, CopyOnWriteArrayList<Observation>>(524288, 0.75f, 32);

This is my map. I am trying to read and write it from multiple threads at the same time, but somehow multiple keys always get created.

long timeInMilliseconds = System.currentTimeMillis();

if (/* the last key is more than 10 seconds older than the new key */) {
    CopyOnWriteArrayList<Observation> initializingObservation = new CopyOnWriteArrayList<>();
    initializingObservation.add(obs);

    mostRecentObservationDB.putIfAbsent(timeInMilliseconds, initializingObservation);
} else {
    // Update the existing list
}
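
For reference, this check followed by putIfAbsent could be collapsed into a single atomic call by keying the map on the start of each 10-second window instead of the raw arrival time. The sketch below is only an illustration of that idea; the bucket rounding is an assumption, not something my current code does.

// Sketch: key each observation by the start of its 10-second window
// and let compute() perform the insert-or-update atomically, so there
// is no check-then-act gap between the lookup and the write.
long bucketKey = (timeInMilliseconds / 10_000L) * 10_000L;

mostRecentObservationDB.compute(bucketKey, (key, list) -> {
    if (list == null) {
        list = new CopyOnWriteArrayList<>();
    }
    list.add(obs);
    return list;
});

Because compute() runs the whole remapping function as one atomic update for that key, the removal thread can only see the bucket before or after the add, never in between.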

A separate thread filters this map by deleting the keys that are older than 10 seconds:

while (true) {
    try {
        Thread.sleep(4000);
        if(/*Time (key) older than 10 seconds*/) {
            mostRecentObservationDB.remove(key);
        }

    } catch (Exception e) {

    }
}

The problem is that after the key is removed, multiple keys get created while the next bucket is initialized. These are my logs:

key -> 1501779153776, value
key -> 1501779153826, value
key -> 1501779153876, value
key -> 1501779153896, value

I want them to be stored under one key even while the removal operation is running. This is how it should be stored:

key -> 1501779153776, value

Also, when I read from the map and then remove all entries with the remove() method, I want no other thread to write to the map while I am in the process of reading its contents and removing them.
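
One pattern that would give this kind of exclusive read-and-clear is to hold the map in a java.util.concurrent.atomic.AtomicReference and swap in an empty map before reading, so writers immediately start filling the new map. This is only a sketch of the idea, reusing the Observation type and variables from above; it is not code I am running.

// Sketch: detach the whole map atomically before reading it.
AtomicReference<ConcurrentHashMap<Long, CopyOnWriteArrayList<Observation>>> current =
        new AtomicReference<>(new ConcurrentHashMap<>());

// Writer side: always write into whatever map is currently installed.
current.get()
       .computeIfAbsent(timeInMilliseconds, key -> new CopyOnWriteArrayList<>())
       .add(obs);

// Reader/remover side: swap in a fresh map, then process the detached one.
// New writes land in the fresh map; only a writer that fetched the old map
// just before the swap can still slip an entry into the snapshot.
ConcurrentHashMap<Long, CopyOnWriteArrayList<Observation>> snapshot =
        current.getAndSet(new ConcurrentHashMap<>());
snapshot.forEach((key, observations) ->
        System.out.println(key + " -> " + observations.size() + " observations"));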


This is the code that behaves oddly:

public static void main(String[] args) {
    ConcurrentHashMap<Long, String> tenSecondBucket =
        new ConcurrentHashMap<Long, String>();

    Thread writingThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(1);

                    if(tenSecondBucket.size() > 0) {
                        // getting last key
                        long lastKey = 0;
                        for (long keyValue : tenSecondBucket.keySet()) {
                            lastKey = keyValue;
                        }

                        if(System.currentTimeMillis() - lastKey > 10000) {
                            tenSecondBucket.put(System.currentTimeMillis(), "secondEntry");
                        } else {
                            tenSecondBucket.put(lastKey, "updatedEntry");
                        }
                    } else {
                        tenSecondBucket.put(System.currentTimeMillis(), "newEntry");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    });

    writingThread.start();

    Thread removingThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(4000);

                    if(tenSecondBucket.size() > 0) {
                        tenSecondBucket.keySet().stream().forEach(key -> {
                            if(System.currentTimeMillis() - key > 10000) {
                                tenSecondBucket.remove(key);
                            }
                        });
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    });

    removingThread.start();

    Thread readingThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(4000);

                    if(tenSecondBucket.size() > 0) {
                        tenSecondBucket.keySet().stream().forEach(key -> {
                            System.out.println("testing key which is timestamp " + key);
                        });
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    });

    readingThread.start();
}
  • You can use [the diamond operator](https://stackoverflow.com/questions/4166966/what-is-the-point-of-the-diamond-operator-in-java-7) to significantly shorten that declaration, just as you do for `initializingObservation`. – Michael Aug 03 '17 at 16:37
  • Can you provide a [mcve]? You have only shown the part of your code that will (supposedly) add a new entry every 10 seconds; based on that code alone one would not expect to see multiple keys in your map... – assylias Aug 03 '17 at 16:39
  • @assylias I hope this explanation makes it clear. The problem is that while the key older than 10 seconds is being removed, the other threads come in, see no entry, and try to create a new entry at the same time. So in the end I have multiple keys, with the values split across unnecessary keys instead of being stored under one key. My goal is to capture observations in 10-second windows by comparing their time values. – vidhya sagar Aug 03 '17 at 17:02
  • @assylias However, when I read from the map and then remove all entries with the remove() method, I want no other thread to write to the map while I am in the process of reading its contents and removing them. – vidhya sagar Aug 03 '17 at 17:25
  • @vidhyasagar Your problem is probably in `if (/*time older than 10 seconds comparing to the previous key*/) {`. Your output shows that multiple threads add to the map within a few milliseconds so your condition to check that the latest key is older than 10 seconds doesn't seem to work. Once again, the problem is in the details of your code but you have not shown the important parts: read the link I posted in my previous comment. – assylias Aug 03 '17 at 17:48
  • @assylias There I compare whether the last key is more than 10 seconds older than the new key. I am trying to build a 10-second bucket for UDP packets using the packet arrival time. – vidhya sagar Aug 03 '17 at 18:00
  • @vidhyasagar Please post an example of the odd behaviour that can be compiled and run on its own. – biziclop Aug 03 '17 at 18:01
  • @biziclop I have added the code at the end so it can be tested. I am always getting multiple keys. My goal is to create a 10-second bucket using the packet arrival time, with thread-safe writing, reading, and removing. – vidhya sagar Aug 03 '17 at 18:24
  • You have [a check then act situation](https://stackoverflow.com/questions/13051266/race-conditions-check-then-act-and-read-modify-write) and you encounter a data race. – assylias Aug 03 '17 at 19:21
  • @assylias Yes, I am already doing that, but if I want to create a key using the current time in milliseconds, then the "check then act" approach is of little use, because the key value changes so quickly. – vidhya sagar Aug 03 '17 at 19:59

1 Answer


The problem is with the way you are deriving lastKey. You need the highest time value present in the map, but you are assuming it will be the last entry returned by tenSecondBucket.keySet(). However, keySet() returns a Set, which by nature is not ordered (and in any case, ConcurrentHashMap does not keep its keys in any particular order).

So you need to replace this code:

long lastKey = 0;
for (long keyValue : tenSecondBucket.keySet()) {
    lastKey = keyValue;
}

with this code:

long lastKey = 0;
for (long keyValue : tenSecondBucket.keySet()) {
    lastKey = keyValue > lastKey ? keyValue : lastKey;
}

After that change, the code works as expected. Note, though, that it still has room for improvement and refactoring.
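
For example, the loop that looks for the highest key could be replaced by a stream that computes the maximum directly (just one possible cleanup, shown as a sketch):

// One possible cleanup: compute the highest key directly instead of
// tracking it in a loop.
long lastKey = tenSecondBucket.keySet()
                              .stream()
                              .mapToLong(Long::longValue)
                              .max()
                              .orElse(0L);

The size() check followed by the put in the writing thread is also still a check-then-act sequence, as pointed out in the comments above, so collapsing it into a single compute()/computeIfAbsent() call would close that remaining window.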

Ashutosh A