I have a DataFrame with a column that needs some cleaning. I am looking for a regex pattern that can be applied in a Spark UDF in Java/Scala to extract valid content from a String.

Sample input row of column userId as shown in the DataFrame below:

[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]

Expected Transformation of column named "userId":

A string which looks like:

105286112|115090439|29818926

I need the logic/approach to transform the userId column so that I can build a UDF from it. Can it be done with regex or some other approach?

The input DataFrame looks like this:

+--------------------+--------------------+
|    dt_geo_cat_brand|        userId      |
+--------------------+--------------------+
|2017-10-30_17-18 ...|[[133207500,2017-...|
|2017-10-19_21-22 ...|[[194112773,2017-...|
|2017-10-29_17-18 ...|[[274188233,2017-...|
|2017-10-29_14-16 ...|[[86281353,2017-1...|
|2017-10-01_09-10 ...|[[92478766,2017-1...|
|2017-10-09_17-18 ...|[[156663365,2017-...|
|2017-10-06_17-18 ...|[[111869972,2017-...|
|2017-10-13_09-10 ...|[[64404465,2017-1...|
|2017-10-13_07-08 ...|[[146355663,2017-...|
|2017-10-22_21-22 ...|[[54096488,2017-1...|
+--------------------+--------------------+

Schema:

root
 |-- dt_geo_cat_brand: string (nullable = true)
 |-- userId: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)

Desired Output:

+--------------------+----------------------------------------+
|    dt_geo_cat_brand|                                  userId|
+--------------------+----------------------------------------+
|2017-10-30_17-18 ...|                    133207500,1993333444|
|2017-10-19_21-22 ...|                    122122212,3432323333|
|2017-10-29_17-18 ...|                    274188233,8869696966|
|2017-10-29_14-16 ...|         862813534,444344444,43444343434|
|2017-10-01_09-10 ...|92478766,880342342,4243244432,5554335535|
+--------------------+----------------------------------------+

and so on...

CodeReaper
  • But why are you trying to extract data from your dataframe with a regular expression? – Elliott Frisch Sep 05 '18 at 01:20
  • I need to use the values extracted from that column (numerical values) so that I can generate bitmaps over them at a later point in the processing model. The reason why you are seeing the data like that is that I used Cassandra to group the data by key and club the values together based on the key. – CodeReaper Sep 05 '18 at 01:30

2 Answers


Write a UDF using the regex below. It will extract what is needed.

val reg = "\\[\\[(\\d*).*\\],\\s*\\[(\\d*).*\\],\\s*\\[(\\d*).*" // regex that captures the three numeric ids

val input = "[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]"   // input string

val mat = reg.r.findAllIn(input)  // iterator over the matches

while (mat.hasNext) {
    mat.next()
    println(mat.group(1) + "|" + mat.group(2) + "|" + mat.group(3)) // the three capture groups hold the extracted ids
}

Output :

105286112|115090439|29818926

With a UDF:

import ss.implicits._
import org.apache.spark.sql.functions.udf

val reg = "\\[\\[(\\d*).*\\],\\s*\\[(\\d*).*\\],\\s*\\[(\\d*).*"

// extracts the three ids from one input line and joins them with "|"
def reg_func = { (s: String) =>
    val mat = reg.r.findAllIn(s)
    var out = ""
    while (mat.hasNext) {
        mat.next()
        out = mat.group(1) + "|" + mat.group(2) + "|" + mat.group(3)
    }
    out
}

val reg_udf = udf(reg_func)

val df = ss.read.text(path)
    .withColumn("Extracted_fields", reg_udf($"value"))
df.show(false)

Input (a second sample record was created):

[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]
[[105286113,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090440,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818927,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]

Output:

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|value                                                                                                                                                                                       |Extracted_fields            |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]|105286112|115090439|29818926|
|[[105286113,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090440,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818927,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]|105286113|115090440|29818927|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
Praveen L
  • Hi, there can be more than 3 fields to extract. Currently you have used mat.group(1) + "|" + mat.group(2) + "|" + mat.group(3); can we make it dynamic instead of hardcoded? – CodeReaper Sep 05 '18 at 06:14
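
Following up on the comment above, a minimal sketch (an illustration, not part of the original answer) that handles any number of entries by matching each "[<digits>," prefix instead of hardcoding three capture groups:

val entryReg = "\\[\\s*(\\d+),".r

// collect the leading number of every "[<digits>,..." entry and join them with "|"
def extractIds(s: String): String =
    entryReg.findAllMatchIn(s).map(_.group(1)).mkString("|")

// extractIds(input) yields "105286112|115090439|29818926" for the sample row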

You do not need a regex to solve this. The data is formatted as an array of structs and, looking at the schema, what you want is the _1 string of each struct. This can be solved with a UDF that extracts the value and then converts everything into a string with mkString("|") to get the expected output:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// take the first struct field (_1) of each element and join the values with "|"
val extract_id = udf((arr: Seq[Row]) => { 
  arr.map(_.getAs[String](0)).mkString("|")
})

df.withColumn("userId", extract_id($"userId"))
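
As a side note, a UDF-free sketch (an illustration, not part of the original answer) should also work here: selecting the nested field userId._1 returns the array of id strings, which concat_ws can join directly:

import org.apache.spark.sql.functions.concat_ws

// userId._1 pulls the first struct field out of every array element;
// concat_ws("|", ...) then joins the resulting array<string> into one string
df.withColumn("userId", concat_ws("|", $"userId._1"))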

Addition as per comment #1:

If you want to save the result partitioned on dt_geo_cat_brand as csv files (each value on its own row), you can do it as follows. First, return a list from the UDF instead of a string, and use explode:

val extract_id = udf((arr: Seq[Row]) => { 
  arr.map(_.getAs[String](0))
})

val df2 = df.withColumn("userId", explode(extract_id($"userId")))

Then use partitionBy("dt_geo_cat_brand") when saving. This will create a folder structure based on the value in the dt_geo_cat_brand column. Depending on the partitioning, the number of csv files in each folder can differ, but they will all contain values for a single dt_geo_cat_brand value (use repartition(1) before saving if you want a single file per folder and have enough memory).

df2.write.partitionBy("dt_geo_cat_brand").csv(baseOutputBucketPath)
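
For illustration (directory and file names below are only indicative), the write produces one folder per dt_geo_cat_brand value under the output path:

baseOutputBucketPath/
  dt_geo_cat_brand=2017-10-30_17-18 .../
    part-00000-....csv
  dt_geo_cat_brand=2017-10-19_21-22 .../
    part-00000-....csv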

Addition as per comment #2:

To avoid using partitionBy while saving separate files, you could do as follows (the partitionBy approach above is recommended). First, find all distinct values in dt_geo_cat_brand:

val vals = df.select("dt_geo_cat_brand").distinct().as[String].collect()

For each of the values, filter the dataframe and save it (use the exploded df2 dataframe from addition #1 here):

vals.foreach { v =>
  df2.filter($"dt_geo_cat_brand" === v)
    .select("userId")
    .write
    .csv(s"$baseOutputBucketPath=$v/")
}

Alternatively, do not use the exploded dataframe but split on "|" if that UDF is used:

vals.foreach { v =>
  df.filter($"dt_geo_cat_brand" === v)
    // split the "|"-joined string and explode so each id ends up on its own row
    // (the csv writer cannot write an array column directly)
    .select(explode(split($"userId", "\\|")).as("userId"))
    .write
    .csv(s"$baseOutputBucketPath=$v/")
}
Shaido
  • Hi @Shaido, Help! The UDF generates - ABCD, 2323|4343434|644646|5454545|4756456 EFGH, 456464564|432444|4244554|525454. I want to store the ids based on the first col (ABCD/EFGH) - the ABCD partition should have a csv file with ids separated by \n - 222323 323233 323232. I tried - import sparkSession.implicits._ dataFrame.collect.foreach(t => { val dt_geo_cat_brand = t.dt_geo_cat_brand; val mbid = t.mbid.split("\\|").toList.toDF("mbid"); mbid.repartition(1).write.csv(s"$baseOutputBucketPath=$dt_geo_cat_brand/") }). It fails due to memory issues. How can I do this in parallel? – CodeReaper Sep 06 '18 at 00:50
  • Hi @CodeReaper, since it was a bit too long to answer in a comment I made an addition to the answer above. Hope it helps :) – Shaido Sep 06 '18 at 01:21
  • If I use partitionBy, will it lead to a lot of shuffling? The reason I originally had the IDs concatenated on every row was so that I could avoid partitionBy on the first column. Originally I loaded the entire data, partitioned on the first column, and it was taking very long to write the partitions. So I used Cassandra to get me the DF where the ids come in a single row. I want to avoid data shuffling. – CodeReaper Sep 06 '18 at 01:23
  • @CodeReaper: `partitionBy` should not result in any data shuffle as long as you save the data on a distributed filesystem (e.g. HDFS) that the nodes are part of (however, `repartition` will, and `collect` will put all data on the driver node). This answer explains a bit more: https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby – Shaido Sep 06 '18 at 01:35
  • If we use explode, then technically each row of data will get broken down into as many numeric values. So ABCD, 4445,7778,8899 will be broken into 3 rows? And then when we use partitionBy it will have to get the values corresponding to each key from all the executors? Also, I have already processed the data as per your last UDF and I am seeking a solution after that stage to write the data out in parallel. – CodeReaper Sep 06 '18 at 01:44
  • Will explode guarantee that for the partition key all numeric values will be present on same node so that partitionBy works fine? – CodeReaper Sep 06 '18 at 01:46
  • @CodeReaper: No. However, `partitionBy` does not work that way. When using `partitionBy`, each node will write its own separate file to HDFS with the relevant data depending on the partition-by column's value. So you will get multiple csv files for each value. – Shaido Sep 06 '18 at 01:49
  • I did as you suggested and this is what it prints after that explode clause in the logs - 18/09/06 04:35:04 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to x.x.x.x:40598 18/09/06 04:35:04 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 407 bytes. It has been stuck there for 30 minutes, as I believe it is trying to find the right numeric values per partition after the explode. Is there a way we can parallelize the code snippet I posted in the comments? That way we don't have to use the partitionBy clause. – CodeReaper Sep 06 '18 at 04:38
  • @CodeReaper: If you really want to you could get all distinct `dt_geo_cat_brand` values and loop over them. In each loop filter the exploded dataframe to only keep a single value of `dt_geo_cat_brand` and save it. The code you used builds on using `collect` which should be avoided if possible (since it's not distributed/parallelizable). – Shaido Sep 06 '18 at 04:58
  • That dataframe already has distinct values of dt_geo_cat_brand. Can you share a snippet of how I can loop over that dataframe and save the rows one by one after exploding the numeric column? – CodeReaper Sep 06 '18 at 05:10
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/179515/discussion-between-codereaper-and-shaido). – CodeReaper Sep 06 '18 at 06:32