2

I have a List and has to create Map from this for further use, I am using RDD, but with use of collect(), job is failing in cluster. Any help is appreciated.

Please help. Below is the sample code from List to rdd.collect. I have to use this Map data further but how to use without collect?

This code creates a Map from RDD (List) Data. List Format->(asdfg/1234/wert,asdf)

 //List Data to create Map
 val listData = methodToGetListData(ListData).toList
//Creating RDD from above List  

  val rdd = sparkContext.makeRDD(listData)

      implicit val formats = Serialization.formats(NoTypeHints)
      val res = rdd
        .map(map => (getRPath(map._1), getAttribute(map._1), map._2))
        .groupBy(_._1)
        .map(tuple => {
          Map(
            "P_Id" -> "1234",
            "R_Time" -> "27-04-2020",
            "S_Time" -> "27-04-2020",
            "r_path" -> tuple._1,
            "S_Tag" -> "12345,
            tuple._1 -> (tuple._2.map(a => (a._2, a._3)).toMap)
          )
        })

      res.collect()
    }
Anu S
  • 21
  • 6
  • Can you please add some sample input & output ? – Srinivas Apr 27 '20 at 11:18
  • 3
    How is it failing? Is that an out of memory error? And how are you using the collected data? – ernest_k Apr 27 '20 at 11:28
  • List : Input List : List((Start~>HInfo~>Monitor~>VSData,XYZVN), (Start~>HInfo~>Cables~>Cables-1~>Name,LC), (Start~>HInfo~>Disk~>Disk-1~>Partition~>Partition-1~>Name,Not Used)) – Anu S Apr 27 '20 at 11:48
  • Example Output Map : Map(Item_Id -> -0909, Parent_Id -> 1234, object_class_instance -> Cables-3, Received_Time -> 23-12-2020, Cables -> Map(Index -> 2, Status -> Installed, HInfoID -> ABCD1234, Name -> WLAN), object_class -> Cables, ServiceTag -> ASDF123, Scan_Time -> 12345, relation_tree -> Start~>HInfo~>Cables~>Cables-3) – Anu S Apr 27 '20 at 11:49
  • Map(Item_Id -> -0909, Parent_Id -> 1234, object_class_instance -> Cables-1, Received_Time -> 23-12-2020, Cables -> Map(Name -> LC, Status -> Installed, HInfoID -> ABCD1234, Index -> 0), object_class -> Cables, ServiceTag -> ASDF123, Scan_Time -> 12345, relation_tree -> Start~>HInfo~>Cables~>Cables-1) – Anu S Apr 27 '20 at 11:50
  • Error is task is bigger Size, recommended 100 MB and not returning output after hours also, I am using res.collect() at last to return the Map Data. – Anu S Apr 27 '20 at 11:51
  • Hi @ernest_k, after collecting the Map Data, I have to further Convert into Json and collect all the json file in a single Array. – Anu S Apr 27 '20 at 11:55
  • @Srinivas, Can you please have a look to your mail, shared sample code to get Input/Output. – Anu S Apr 27 '20 at 12:00
  • Are you sure you have to use an RDD? And if so, why can't you convert to json in the RDD? Also the error you mention sounds like an informational message, or a warning at best. – Jasper-M Apr 27 '20 at 12:02
  • ok, i will check that. – Srinivas Apr 27 '20 at 12:07
  • @Jasper, If you don't mind, can you please share your mail ID, I can share the actual Code, and see if you can help with some modifications.? – Anu S Apr 27 '20 at 12:13
  • check the answer. – Ram Ghadiyaram Apr 29 '20 at 04:42
  • if you are okay please care to accept [the answer as owner](https://meta.stackexchange.com/a/5235/369717) and [vote-up](https://meta.stackexchange.com/a/173400/369717) – Ram Ghadiyaram Apr 29 '20 at 04:45

1 Answers1

1

Q: how to use without collect?


Answer : collect will hit.. it will move the data to driver node. if data is huge. Never do that.


I dont exactly know what is the use case to prepare a map but it can be achievable using built in spark API i.e.collectionAccumulator ... in detail,

collectionAccumulator[scala.collection.mutable.Map[String, String]]


Lets suppose, this is your sample dataframe and you want to make a map.

+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|Item_Id|Parent_Id|object_class_instance|Received_Time|CablesName|CablesStatus|CablesHInfoID|CablesIndex|object_class|ServiceTag|Scan_Time|relation_tree                  |
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|-0909  |1234     |Cables-1             |23-12-2020   |LC        |Installed   |ABCD1234     |0          |Cables      |ASDF123   |12345    |Start~>HInfo->Cables->Cables-1 |
|-09091 |1234111  |Cables-11            |23-12-2022   |LC1       |Installed1  |ABCD12341    |0          |Cables1     |ASDF1231  |123451   |Start~>HInfo->Cables->Cables-11|
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+

From this you want to make a map (nested map I prefixed with nestedmap key name in your example) then...

Below is the full example have a look and modify accordingly.

package examples

import org.apache.log4j.Level

object GrabMapbetweenClosure extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)


  import org.apache.spark.sql.SparkSession

  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()

  import spark.implicits._

  var mutableMapAcc = spark.sparkContext.collectionAccumulator[scala.collection.mutable.Map[String, String]]("mutableMap")

  val df = Seq(
    ("-0909", "1234", "Cables-1", "23-12-2020", "LC", "Installed", "ABCD1234"
      , "0", "Cables", "ASDF123", "12345", "Start~>HInfo->Cables->Cables-1")
    , ("-09091", "1234111", "Cables-11", "23-12-2022", "LC1", "Installed1", "ABCD12341"
      , "0", "Cables1", "ASDF1231", "123451", "Start~>HInfo->Cables->Cables-11")

  ).toDF("Item_Id", "Parent_Id", "object_class_instance", "Received_Time", "CablesName", "CablesStatus", "CablesHInfoID",
    "CablesIndex", "object_class", "ServiceTag", "Scan_Time", "relation_tree"
  )

  df.show(false)
  df.foreachPartition { partition => // for performance sake I used foreachPartition
    partition.foreach {
      record => {
        mutableMapAcc.add(scala.collection.mutable.Map(
          "Item_Id" -> record.getAs[String]("Item_Id")
          , "CablesStatus" -> record.getAs[String]("CablesStatus")
          , "CablesHInfoID" -> record.getAs[String]("CablesHInfoID")
          , "Parent_Id" -> record.getAs[String]("Parent_Id")
          , "CablesIndex" -> record.getAs[String]("CablesIndex")
          , "object_class_instance" -> record.getAs[String]("object_class_instance")
          , "Received_Time" -> record.getAs[String]("Received_Time")
          , "object_class" -> record.getAs[String]("object_class")
          , "CablesName" -> record.getAs[String]("CablesName")
          , "ServiceTag" -> record.getAs[String]("ServiceTag")
          , "Scan_Time" -> record.getAs[String]("Scan_Time")
          , "relation_tree" -> record.getAs[String]("relation_tree")

        )
        )
      }
    }
  }
  println("FinalMap : " + mutableMapAcc.value.toString)

}


Result :

+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|Item_Id|Parent_Id|object_class_instance|Received_Time|CablesName|CablesStatus|CablesHInfoID|CablesIndex|object_class|ServiceTag|Scan_Time|relation_tree                  |
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|-0909  |1234     |Cables-1             |23-12-2020   |LC        |Installed   |ABCD1234     |0          |Cables      |ASDF123   |12345    |Start~>HInfo->Cables->Cables-1 |
|-09091 |1234111  |Cables-11            |23-12-2022   |LC1       |Installed1  |ABCD12341    |0          |Cables1     |ASDF1231  |123451   |Start~>HInfo->Cables->Cables-11|
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+

FinalMap : [Map(Scan_Time -> 123451, ServiceTag -> ASDF1231, Received_Time -> 23-12-2022, object_class_instance -> Cables-11, CablesHInfoID -> ABCD12341, Parent_Id -> 1234111, Item_Id -> -09091, CablesIndex -> 0, object_class -> Cables1, relation_tree -> Start~>HInfo->Cables->Cables-11, CablesName -> LC1, CablesStatus -> Installed1), Map(Scan_Time -> 12345, ServiceTag -> ASDF123, Received_Time -> 23-12-2020, object_class_instance -> Cables-1, CablesHInfoID -> ABCD1234, Parent_Id -> 1234, Item_Id -> -0909, CablesIndex -> 0, object_class -> Cables, relation_tree -> Start~>HInfo->Cables->Cables-1, CablesName -> LC, CablesStatus -> Installed)]

Similar problem was solved here.

Ram Ghadiyaram
  • 28,239
  • 13
  • 95
  • 121
  • Hi @Ram Ghadiyaram, This is working fine, but the real code is unning fine with rdd, while I'm tryng to convert List to DF(), I'm getting NULL Pointer Exception. val listData = (relationData).toList val rawDF = listData.toDF("rln_tr", "attr_val") rawDF.show(false) //RawDF has NULL Pointer Exception val rdd = sparkContext.makeRDD(listData)// This is working fine – Anu S Apr 29 '20 at 05:22
  • But you have to identify where you are doing operation on null i.m not sure with your data. – Ram Ghadiyaram Apr 29 '20 at 05:56
  • I say BIG NO to collect – Ram Ghadiyaram Apr 29 '20 at 05:57