I know how to synchronize writes to a single file, but the problem is that when there are multiple files, we need to synchronize per file. I wrote the following code to illustrate what I mean:

import java.io._
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main {

  val lock: ConcurrentMap[String, Boolean] = new ConcurrentHashMap[String, Boolean]()

  def writeToFile(path: String): Unit = {
    while (lock.get(path)) {}
    lock.put(path, true)
    println("write to file " + path)
    Thread.sleep(2000)
    lock.remove(path)
  }

  def main(args: Array[String]): Unit = {
    Future {
      writeToFile("1")
    }
    Future {
      writeToFile("1")
    }
    Future {
      writeToFile("1")
    }
    while (true) {}
  }
}

The result will be:

write to file 1 [sleep 2 seconds]
write to file 1
write to file 1

The last two lines are printed at the same time. If there is another solution, please tell me :) Thanks!

1 Answer

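Your lock operations are neither properly synchronized nor atomic: checking the lock (lock.get) and taking it (lock.put) are two separate steps. Right after one thread has removed the lock from your map, two other threads may exit their respective while loops simultaneously, and both then go on to write.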

Also, empty while loops that spin until a condition changes (busy waiting) put an unnecessary strain on your resources.
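For comparison, here is a minimal sketch of a blocking per-file lock that avoids both problems. It is not the approach used in the rest of this answer, and `PerPathLock` and `withPathLock` are hypothetical names. It assumes Scala 2.12 or later, so that a Scala lambda can be passed where a Java functional interface is expected. `computeIfAbsent` atomically hands every caller the same monitor object for a given path, and `synchronized` makes waiting threads block instead of spinning.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: one monitor object per path.
object PerPathLock {
  // Entries are never removed in this sketch, so the map grows with the number of distinct paths.
  private val locks = new ConcurrentHashMap[String, AnyRef]()

  def withPathLock[A](path: String)(body: => A): A = {
    // computeIfAbsent is atomic: all threads asking for the same path get the same monitor.
    val monitor = locks.computeIfAbsent(path, (_: String) => new Object)
    // Only one thread at a time can run body for this path; other paths are unaffected.
    monitor.synchronized(body)
  }
}
```

With this helper, the body of writeToFile would be wrapped as `PerPathLock.withPathLock(path) { ... }`. The trade-off is that each waiting writer occupies a thread while it is blocked.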

I replaced your lock map with a map that holds a Future for each path. The map is only ever updated through its atomic compute method. If the path has never been written to, compute passes us null and we create the first write future. If the path has been written to before, we simply chain the new write operation onto the existing future. I chained with .map, which leaves room for error handling if needed (note that .map itself only runs when the previous write succeeded).

See SO: future chaining for more info on .map vs .andThen
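To illustrate the difference between the two combinators, here is a small sketch (`MapVsAndThen` and `demo` are hypothetical names, not part of the code in this answer). It starts from an already failed future: the function passed to `.map` is skipped and the failure is propagated, while the callback passed to `.andThen` still runs and the original result is passed through unchanged.

```scala
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object MapVsAndThen {
  // Returns (result of .map, result of .andThen, whether the andThen callback ran).
  def demo(): (Try[String], Try[Int], Boolean) = {
    val failed: Future[Int] = Future.failed(new RuntimeException("previous write failed"))
    val callbackRan = new AtomicBoolean(false)

    // map: the function is not applied to a failed future; the failure is carried over.
    val mapped: Future[String] = failed.map(_ => "next write")

    // andThen: the side effect runs for success and failure alike; the result stays the same.
    val chained: Future[Int] = failed.andThen { case _ => callbackRan.set(true) }

    Await.ready(mapped, 5.seconds)
    Await.ready(chained, 5.seconds)
    (mapped.value.get, chained.value.get, callbackRan.get)
  }
}
```

For a chain of writes this means that, with `.map`, one failed write causes every later write chained onto it to be skipped as well.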

import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object concurrent {

  val writeOperations: ConcurrentMap[String, Future[_]] = new ConcurrentHashMap[String, Future[_]]()

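  def writeToFile(path: String): Unit = {
    // compute runs atomically per key, so two callers cannot chain onto the same future at once
    writeOperations.compute(path, (_, future) => {
      if (future == null) {
        // first write to this path: start it right away
        Future {
          println("write to file " + path)
          Thread.sleep(2000)
        }
      } else {
        // later writes: start only after the previous write has completed
        future.map(_ => { // the argument is the previous result; map is skipped if that write failed
          println("write to file " + path)
          Thread.sleep(2000)
        })
      }
    })
  }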

  def main(args: Array[String]): Unit = {
    val futures = Seq(
      Future {
        writeToFile("1")
      }, Future {
        writeToFile("1")
      }, Future {
        writeToFile("1")
      }
    )

    futures.foreach(Await.ready(_, Duration.Inf))
    writeOperations.values().forEach( x => Await.ready(x, Duration.Inf))
  }
}
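As a possible variation on the code above, the chaining can be pulled into a reusable helper so that the write logic is not duplicated in both branches. This is only a sketch under assumptions: `PerPathQueue` and `enqueue` are hypothetical names, and it swaps `.map` for `transformWith` (available since Scala 2.12) so that a failed write does not prevent later writes to the same path from running.

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper: a per-path chain of write operations.
object PerPathQueue {
  // Holds the most recently queued write for each path.
  private val tails = new ConcurrentHashMap[String, Future[Unit]]()

  def enqueue(path: String)(write: => Unit)(implicit ec: ExecutionContext): Future[Unit] =
    // compute is atomic per key, so each new write is appended behind exactly one predecessor.
    tails.compute(path, (_, previous) => {
      if (previous == null) Future(write)
      // transformWith runs the next write whether the previous one succeeded or failed.
      else previous.transformWith(_ => Future(write))
    })
}
```

A call then looks like `PerPathQueue.enqueue("1") { println("write to file 1") }`, and the returned future can be awaited or composed by the caller.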
rad i