
I'm trying to multithread an import job, but I'm running into a problem where it's causing duplicate data. I need to keep my map outside of the loop so all my threads can update and read from it, but I can't do that without declaring it final, and with it being final I can't update the map. Currently I need to put my Map object inside the run method, but the problem comes when the values are not initially in the database and each thread creates a new one. This results in duplicate data in the database. Does anybody know how to do some sort of callback to update my map outside?

ExecutorService executorService = Executors.newFixedThreadPool(10);

final Map<Integer, Object> map = new HashMap<>();
map.putAll(populate from database);

for (int i = 0; i < 10; i++) {

    executorService.execute(new Runnable() {
        public void run() {

            while ((line = br.readLine()) != null) {
                if (map.containsKey(123)) {
                    //read map object
                    session.update(object);
                } else {
                    map.put(123, someObject);
                    session.save(object);
                }

                if (rowCount % 250 == 0)
                    tx.commit();
            }
        }
    });
}

executorService.shutdown();
Code Junkie
    What do you mean by `with it being final I can't update the map`? You can call any methods on the instance, you just can't assign the variable `map` to anything else if it's final, which should work fine for you. – vanza Oct 28 '13 at 17:44
    If your import is as simple as what's shown, and you're writing everything to the same table periodically, without guaranteeing that you're writing to different sections of the same table, then this appears like it's not a very good case for multi-threading. – user1676075 Oct 28 '13 at 17:50
  • The import is much more complicated than what I'm showing; I log into hundreds of FTP accounts to pick up CSV files to be parsed. I'm trying to multithread this process to get it done quicker. Anyhow, all of the jobs need to share the map in my example above with both read/write access. In order to use it inside the runnable interface, it needs to be declared final. The problem I'm facing is that once declared final, I can no longer update the map from within the runnable interface. – Code Junkie Oct 28 '13 at 17:54
    @George: your understanding is wrong, that is not what `final` means. You can update the map (i.e. add / remove elements from it). So either you haven't explained what do you mean by "update", or your problem lies somewhere else. – vanza Oct 28 '13 at 18:49
  • So the real issue here was my misunderstanding of final. I thought once something was marked final it could no longer be modified, therefore I would be unable to update the map. Sorry about the confusion. – Code Junkie Oct 28 '13 at 19:00

4 Answers


You need to use some synchronization technique.

The problematic part is when different threads try to put data into the map.

Example:

Thread 1 checks whether there is an object with key 123 in the map. Before thread 1 adds the new object, thread 2 runs and performs the same check. Both threads then add an object under key 123 and save it. This causes the duplicates.
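One way to see (and fix) the race described above is to make the check and the insert a single atomic step. A minimal, self-contained sketch, assuming the duplicate check maps onto `ConcurrentHashMap.putIfAbsent`; the `session.save(...)` call from the question is replaced here by a counter so the sketch is runnable:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class RaceDemo {
    // Returns how many of the 10 threads "won" the insert for key 123.
    static int race() {
        final Map<Integer, String> map = new ConcurrentHashMap<>();
        final AtomicInteger creations = new AtomicInteger();
        int threads = 10;
        final CountDownLatch start = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        start.await(); // line all threads up for maximum contention
                        // putIfAbsent is atomic: exactly one thread sees null,
                        // and only that thread would call session.save(...).
                        if (map.putIfAbsent(123, "someObject") == null) {
                            creations.incrementAndGet();
                        }
                    } catch (InterruptedException ignored) {
                    } finally {
                        done.countDown();
                    }
                }
            }).start();
        }
        start.countDown();
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return creations.get();
    }

    public static void main(String[] args) {
        System.out.println("creations=" + race()); // always prints creations=1
    }
}
```

With a plain `containsKey` followed by `put`, two or more threads can both see the key missing and both "create" the object; `putIfAbsent` guarantees exactly one winner.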

You can read more about synchronization here: http://docs.oracle.com/javase/tutorial/essential/concurrency/sync.html

Dario

Based on your problem description, it appears that you want a map whose data is consistent, where you always see the latest, up-to-date data without missing any updates.

In this case, wrap your map with Collections.synchronizedMap(). This ensures that all reads and writes to the map are synchronized, so you are guaranteed to find a key using the latest data in the map and to write to the map exclusively.

Refer to this SO discussion for a difference between the concurrency techniques used with maps.
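One caveat worth adding: `Collections.synchronizedMap` makes each *individual* call atomic, but a compound check-then-act sequence (like the question's `containsKey` followed by `put`) still needs an explicit `synchronized` block on the map itself. A minimal sketch:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SyncMapDemo {
    static Map<Integer, String> build() {
        // Every individual call (put/get/containsKey) on this map is synchronized.
        Map<Integer, String> map =
                Collections.synchronizedMap(new HashMap<Integer, String>());
        map.put(123, "first");

        // But a check-then-act sequence is NOT atomic on its own: another
        // thread could insert key 456 between containsKey and put. Compound
        // operations must be wrapped in a synchronized block on the map.
        synchronized (map) {
            if (!map.containsKey(456)) {
                map.put(456, "second");
            }
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(build().size()); // prints 2
    }
}
```

Synchronizing on the map object works because `synchronizedMap` uses the wrapper itself as the mutex for its internal locking.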

Also, one more thing - defining a Map as final does not mean you cannot modify the map - you can definitely add and remove elements. What you cannot do is change the variable to point to another map. This is illustrated by the simple code snippet below:

    final Map<Integer, String> testMap = Collections.synchronizedMap(new HashMap<Integer, String>());
    testMap.put(1, "Tom"); // OK - mutating the map is allowed
    testMap.remove(1);     // OK
    testMap = new HashMap<Integer, String>(); // ERROR! Cannot reassign a variable with the final modifier
Prahalad Deshpande
  • As far as my final issue goes, I think the reason I was struggling with it (and something I didn't put in my original question) was the fact that my actual importTask.parse job is being done in a service, so I'm actually passing my map into a service method called parse(map). The method has its arguments as follows: parse(final Map map), and from there I'm unable to update the original map outside of the service. Also, does Collections.synchronizedMap make things thread safe so I don't end up with 2 objects being persisted to the database? – Code Junkie Oct 28 '13 at 18:27
  • @George - yes that would make the things thread safe (although at a loss of performance) – Prahalad Deshpande Oct 28 '13 at 18:47

I would suggest the following solution:

  • Use a ConcurrentHashMap.
  • Don't update and commit inside your crawling threads.
  • Trigger save and commit in a separate thread when your map reaches a critical size.

Pseudocode sample:

final Object lock = new Object();

...

executorService.execute(new Runnable() {
    public void run() {
        ...
        synchronized (lock) {
            if (concurrentMap.size() > 250) {
                saveInASeparateThread(new ArrayList<>(concurrentMap.values()));
                concurrentMap.clear();
            }
        }
    }
});
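The pseudocode above can be made concrete. A runnable sketch of the buffer-and-drain idea, where `saveBatch` is a hypothetical stand-in for the real save-and-commit step (in production it would hand the batch to a separate thread, as `saveInASeparateThread` suggests):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class BatchDrainDemo {
    static final int BATCH_SIZE = 250;

    // Hypothetical stand-in for saving a batch; here it just counts batches.
    static int batchesSaved = 0;
    static void saveBatch(List<String> batch) {
        batchesSaved++;
    }

    // Feeds totalRows rows into the buffer, draining every BATCH_SIZE rows.
    static int run(int totalRows) {
        ConcurrentHashMap<Integer, String> buffer = new ConcurrentHashMap<>();
        final Object lock = new Object();
        batchesSaved = 0;
        for (int id = 0; id < totalRows; id++) {
            buffer.put(id, "row-" + id);
            synchronized (lock) {
                if (buffer.size() >= BATCH_SIZE) {
                    // Snapshot the values, then clear, so crawling threads
                    // can keep filling the buffer while the batch is saved.
                    saveBatch(new ArrayList<>(buffer.values()));
                    buffer.clear();
                }
            }
        }
        return batchesSaved;
    }

    public static void main(String[] args) {
        System.out.println("batches=" + run(1000)); // prints batches=4
    }
}
```

Snapshotting into a new list before clearing is the key detail: it decouples the crawling threads from the (potentially slow) database write.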
Andrey Chaschev
  • I ended up creating another post about the concurrency issues since it really isn't related to the original question. If you could take a look at http://stackoverflow.com/questions/19642756/how-to-sync-multi-thread-map-update I provided a working example and output. Thanks. – Code Junkie Oct 28 '13 at 18:56

The following logic resolved my issue. The code below isn't tested.

ExecutorService executorService = Executors.newFixedThreadPool(10);

final ConcurrentMap<Integer, Object> map = new ConcurrentHashMap<>();
map.putAll(myObjectList);

List<Future<String>> futures = new ArrayList<>();

for (int i = 0; i < 10; i++) {
    final int thread = i;

    Future<String> future = executorService.submit(new Callable<String>() {
        public String call() throws Exception {

            List<MyObject> list;

            CSVReader reader = new CSVReader(new InputStreamReader(csvFile.getStream()));

            list = bean.parse(strategy, reader);

            int listSize = list.size();
            int rowCount = 0;

            for (MyObject myObject : list) {

                rowCount++;

                Integer key = myObject.getId();

                if (map.putIfAbsent(key, myObject) == null) {
                    session.save(myObject);
                } else {
                    myObject = map.get(key);
                    //Do something
                    session.update(myObject);
                }

                if (rowCount % 250 == 0 || rowCount == listSize) {
                    session.flush();  // flush and clear the session, not the transaction
                    session.clear();
                }
            }

            tx.commit();

            return "Thread " + thread + " completed.";
        }
    });

    futures.add(future);
}

for (Future<String> future : futures) {
    System.out.println(future.get());
}

executorService.shutdown();
Code Junkie