1

I have a Spark Scala Job on EMR that runs smoothly up until the last job, then in that last job I see no progress for 2hrs. All executors show that tasks are evenly spread out so I dont think it's a data skew issue. The job was completing but I found a bug that was overwriting the map rather than updating, since fixing the bug I've seen this slowness and am not sure how to fix it. This part of the job reads/writes to Redis and utlizes two scala mutable maps, I'm not sure what could be causing this slowness so any help would be appreciated. This data set is repartitioned by col("id1")

val result: Dataset[Row] = rowsToCompute.mapPartitions(iterator => {
  case class countDateMap(count: Long, dateTime: ZonedDateTime)
  val id1CountDateMap = scala.collection.mutable.Map[String, scala.collection.mutable.Map[Long, Seq[countDateMap]]]()

  var rowResult: ListBuffer[Row] = ListBuffer()
  RedisClient.writeHost = writeHost.value
  RedisClient.readHost = readHost.value
  RedisClient.port = port.value
  RedisClient.keyTTL = keyTTL.value
  RedisClient.writePoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.writePoolConfig.setMaxTotal(writeMaxTotal.value)
  RedisClient.writePoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.readPoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxTotal(readMaxTotal.value)

  iterator.foreach(row => {
    val id1 = row.getAs("id1").asInstanceOf[String]
    val availableRowsForId1 = row.getAs("rows").asInstanceOf[Seq[Row]]
    var rowsMap: scala.collection.mutable.Map[Long, Seq[countDateMap]] = scala.collection.mutable.Map[Long, Seq[countDateMap]]()
    if (!id1CountDateMap.isDefinedAt(id1)) {
      id1CountDateMap.put(id1, rowsMap)
    }
    rowsMap = id1CountDateMap(id1)
    
    availableRowsForId1.foreach(o => {
        val id2 = o.getAs("id2").asInstanceOf[Long]
        val currentRowCountDateTime = id1CountDateMap(id1).get(id2)
        var restriction1 = false
        var restriction2 = false
        var localCount: Long = 1
        var dateTime = o.getAs("dateTime").asInstanceOf[TimeStamp]
        if (currentRowCountDateTime.isDefined) {
          //business logic that checks restriction1 & restriction2 based on currentRowCountDateTime
        } 

        if (!restriction1 && !restriction2) {
          val set = RedisClient.setCount(id2.toString, (o.getAs("redisCount").asInstanceOf[String] + 1).toString)
          val newCountDate= countDateMap(localCount, dateTime)
          var newCountDateSeq = Seq(newCountDate)
          if (currentRowCountDateTime.nonEmpty) {
            newCountDateSeq = currentRowCountDateTime.get.union(Seq(newCountDate))
          }
          rowsMap.update(id2, newCountDateSeq)
          rowResult += RowFactory.create(id1, id2.toString)
        }
      })
    })
  })
  rowResult.toList.iterator
})(encoder)
//OLD THAT RAN QUICK BUT HAD A BUG B/C rowsMap GOT OVERWRITTEN NOT UPDATED
val result: Dataset[Row] = rowsToCompute.mapPartitions(iterator => {
  case class countDateMap(count: Long, dateTime: ZonedDateTime)
  val id1CountDateMap = new ConcurrentMap[String, Map[Long, Seq[countDateMap]]]()

  var rowResult: ListBuffer[Row] = ListBuffer()
  RedisClient.writeHost = writeHost.value
  RedisClient.readHost = readHost.value
  RedisClient.port = port.value
  RedisClient.keyTTL = keyTTL.value
  RedisClient.writePoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.writePoolConfig.setMaxTotal(writeMaxTotal.value)
  RedisClient.writePoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.readPoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxTotal(readMaxTotal.value)

  iterator.foreach(row => {
    val id1 = row.getAs("id1").asInstanceOf[String]
    val availableRowsForId1 = row.getAs("rows").asInstanceOf[Seq[Row]]
    var rowsMap= scala.collection.mutable.Map[Long, Seq[countDateMap]]()
    id1CountDateMap.putifAbsent(id1, rowsMap)
    
    availableRowsForId1.foreach(o => {
        val id2 = o.getAs("id2").asInstanceOf[Long]
        val currentRowCountDateTime = id1CountDateMap(id1).get(id2)
        var restriction1 = false
        var restriction2 = false
        var localCount: Long = 1
        var dateTime = o.getAs("dateTime").asInstanceOf[TimeStamp]
        if (currentRowCountDateTime.isDefined) {
          //business logic that checks restriction1 & restriction2 based on currentRowCountDateTime
        } 

        if (!restriction1 && !restriction2) {
          val set = RedisClient.setCount(id2.toString, (o.getAs("redisCount").asInstanceOf[String] + 1).toString)
          val newCountDate = countDateMap(localCount, dateTime)
          var newCountDateSeq = Seq(newCountDate)
          if (currentRowCountDateTime.nonEmpty) {
            newCountDate Seq = currentRowCountDateTime.get.union(Seq(newCountDate ))
          }
          rowsMap.update(id2, newCountDate Seq)
          id1CountDateMap.replace(id1, rowsMap)
          rowResult += RowFactory.create(id1, id2.toString)
        }
      })
    })
  })
  rowResult.toList.iterator
})(encoder)

Summary of Task Metrics

enter image description here

sgallagher
  • 137
  • 10

0 Answers0