
I have a dataframe 'regexDf' like the one below:

id,regex
1,(.*)text1(.*)text2(.*)text3(.*)text4(.*)|(.*)text2(.*)text5(.*)text6(.*)
2,(.*)text1(.*)text5(.*)text6(.*)|(.*)text2(.*)

If the length of a regex exceeds some maximum length, for example 50, I want to remove the last text token from each of the '|'-separated parts of that id's regex string. In the dataframe above, the regex for id 1 is longer than 50, so the last tokens 'text4(.*)' and 'text6(.*)' should be removed from its two parts. After that removal the regex for id 1 is still longer than 50, so the last tokens 'text3(.*)' and 'text5(.*)' should be removed as well. The final dataframe will be:

id,regex
1,(.*)text1(.*)text2(.*)|(.*)text2(.*)
2,(.*)text1(.*)text5(.*)text6(.*)|(.*)text2(.*)

I am able to trim the last tokens using the following code

  val reducedStr = regex.split("|").foldLeft(List[String]()) {
    (regexStr,eachRegex) => {
      regexStr :+ eachRegex.replaceAll("\\(\\.\\*\\)\\w+\\(\\.\\*\\)$", "\\(\\.\\*\\)")
    }
  }.mkString("|")

I tried using a while loop to check the length and trim the text tokens on each iteration, but it is not working. I would also like to avoid using var and a while loop. Is it possible to achieve this without a while loop?

         val optimizeRegexString = udf((regex: String) => {
              if(regex.length >= 50) {
                var len = regex.length;
                var resultStr: String = ""
                while(len >= maxLength) {
                  val reducedStr = regex.split("|").foldLeft(List[String]()) {
                    (regexStr,eachRegex) => {
                      regexStr :+ eachRegex.replaceAll("\\(\\.\\*\\)\\w+\\(\\.\\*\\)$", "\\(\\.\\*\\)")
                    }
                  }.mkString("|")
                  len = reducedStr.length
                  resultStr = reducedStr
                }
                resultStr
              } else {
                regex
              }
            })
            regexDf.withColumn("optimizedRegex", optimizeRegexString(col("regex"))) 

Following the suggestions from SathiyanS and Pasha, I changed the recursive method into a function.

      def optimizeRegex(regexDf: DataFrame): DataFrame = {
        val shrinkString= (s: String) =>   {
          if (s.length > 50) {
            val extractedString: String = shrinkString(s.split("\\|")
              .map(s => s.substring(0, s.lastIndexOf("text"))).mkString("|"))
            extractedString
          }
          else s
        }
        def shrinkUdf = udf((regex: String) => shrinkString(regex))
        regexDf.withColumn("regexString", shrinkUdf(col("regex")))
      }

Now I am getting the compile error "recursive value shrinkString needs type":

    Error:(145, 39) recursive value shrinkString needs type
            val extractedString: String = shrinkString(s.split("\\|")
.map(s => s.substring(0, s.lastIndexOf("text"))).mkString("|"));
Mohan

3 Answers


This is how I would do it.

First, a function for removing the last token from a regex:

def deleteLastToken(s: String): String =
  s.replaceFirst("""[^)]+\(\.\*\)$""", "")

Then, a function that shortens the entire regex string by deleting the last token from all the |-separated fields:

def shorten(r: String) = {
  val items = r.split("[|]").toSeq
  val shortenedItems = items.map(deleteLastToken)
  shortenedItems.mkString("|")
}
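
A side note on the split("[|]") call in shorten: String.split takes a regular expression, so a bare "|" is read as an alternation between two empty patterns and cuts the string between characters instead of at the pipes, which is what the split("|") in the question does. Here is a minimal sketch of the difference; the object name SplitDemo and the sample string are made up for illustration:

```scala
object SplitDemo {
  // Hypothetical sample input: two fields separated by a literal pipe.
  val sample = "(.*)a(.*)|(.*)b(.*)"

  // Unescaped "|" matches the empty string, so the input is cut between characters.
  val unescaped: Seq[String] = sample.split("|").toSeq

  // A character class (or the escaped form "\\|") makes the pipe literal.
  val escaped: Seq[String] = sample.split("[|]").toSeq
}
```

Either "[|]" or "\\|" works; the character class just avoids the double backslash.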

Then, for a given input regex string, create the stream of all the shortened strings you get by applying the shorten function repeatedly. This is an infinite stream, but it is lazily evaluated, so only as many elements as are needed will actually be computed:

val regex = "(.*)text1(.*)text2(.*)text3(.*)text4(.*)|(.*)text2(.*)text5(.*)text6(.*)"

val allShortened = Stream.iterate(regex)(shorten)

Finally, you can treat allShortened as any other sequence. For solving our problem, you can drop all elements while they don't satisfy the length requirement, and then keep only the first one of the remaining ones:

val result = allShortened.dropWhile(_.length > 50).head

You can see all the intermediate values by printing some elements of allShortened:

allShortened.take(10).foreach(println)

// Prints:
// (.*)text1(.*)text2(.*)text3(.*)text4(.*)|(.*)text2(.*)text5(.*)text6(.*)
// (.*)text1(.*)text2(.*)text3(.*)|(.*)text2(.*)text5(.*)
// (.*)text1(.*)text2(.*)|(.*)text2(.*)
// (.*)text1(.*)|(.*)
// (.*)|(.*)
// (.*)|(.*)
// (.*)|(.*)
// (.*)|(.*)
// (.*)|(.*)
// (.*)|(.*)
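
As the printed values show, shorten eventually reaches a fixed point, (.*)|(.*) in this example. If that fixed point were still longer than the limit (for instance with many |-separated parts), dropWhile(_.length > 50).head would never return. The following is only a sketch of one way to guard against that, not part of the approach above: it uses Iterator.iterate instead of Stream and stops either when the string fits or when shortening no longer changes it. The names ShortenDemo, shortenToFit and maxLength are hypothetical.

```scala
object ShortenDemo {
  // Same token-removal rule as deleteLastToken above.
  def deleteLastToken(s: String): String =
    s.replaceFirst("""[^)]+\(\.\*\)$""", "")

  // Remove the last token from every |-separated field.
  def shorten(r: String): String =
    r.split("[|]").map(deleteLastToken).mkString("|")

  // maxLength is a hypothetical parameter; the question uses 50.
  def shortenToFit(regex: String, maxLength: Int = 50): String =
    Iterator.iterate(regex)(shorten)
      .sliding(2) // pairs of (current, next) candidates, computed lazily
      .collectFirst {
        // Stop when the current string fits, or when shortening no longer changes it.
        case Seq(cur, next) if cur.length <= maxLength || cur == next => cur
      }
      .get
}
```

Calling ShortenDemo.shortenToFit(regex) on the example regex should give the same string as the third line of the printed output.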
Roberto Bonvallet

Recursion:

def shrink(s: String): String = {
  if (s.length > 50)
    shrink(s.split("\\|").map(s => s.substring(0, s.lastIndexOf("text"))).mkString("|"))
  else s
}

It looks like there are issues with how the function is called, so here is some additional info. It can be called as a static function:

object ShrinkContainer  {
  def shrink(s: String): String = {
    if (s.length > 50)
      shrink(s.split("\\|").map(s => s.substring(0, s.lastIndexOf("text"))).mkString("|"))
    else s
  }
}

Link with dataframe:

def shrinkUdf = udf((regex: String) => ShrinkContainer.shrink(regex))
df.withColumn("regex", shrinkUdf(col("regex"))).show(truncate = false)

Drawbacks: only a basic example of the approach is provided. Some edge cases (the regex does not contain "text"; too many parts separated by "|", for example 100; etc.) have to be handled by the author of the question, to avoid failures such as an exception from substring or infinite recursion.
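
One possible way to cover those edge cases, as a rough sketch rather than a definitive fix: leave a part unchanged when it no longer contains "text" (lastIndexOf returns -1 there, and substring(0, -1) would throw), and stop recursing as soon as a pass changes nothing. The names SafeShrink, cutField and maxLength are made up for this illustration.

```scala
import scala.annotation.tailrec

object SafeShrink {
  // Cut a single part at its last "text" token; keep it as-is when no token is left.
  private def cutField(field: String): String = {
    val idx = field.lastIndexOf("text")
    if (idx < 0) field else field.substring(0, idx)
  }

  // maxLength is a hypothetical parameter; the question uses 50.
  @tailrec
  def shrink(s: String, maxLength: Int = 50): String =
    if (s.length <= maxLength) s
    else {
      val next = s.split("\\|").map(cutField).mkString("|")
      // Stop when nothing changed, otherwise the recursion would never end.
      if (next == s) s else shrink(next, maxLength)
    }
}
```

With this guard, a string that cannot be shortened below the limit is returned as it is, so the caller still has to decide what to do with such rows.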

pasha701
  • This worked, but I am not sure why making the function as object is giving me the "recursive value needs type" error. – Mohan Sep 03 '18 at 09:11

Just to add to @pasha701's answer, here is a solution that works in Spark.

val df = sc.parallelize(Seq((1,"(.*)text1(.*)text2(.*)text3(.*)text4(.*)|(.*)text2(.*)text5(.*)text6(.*)"),(2,"(.*)text1(.*)text5(.*)text6(.*)|(.*)text2(.*)"))).toDF("ID", "regex")

df.show()
//prints
+---+------------------------------------------------------------------------+
|ID |regex                                                                   |
+---+------------------------------------------------------------------------+
|1  |(.*)text1(.*)text2(.*)text3(.*)text4(.*)|(.*)text2(.*)text5(.*)text6(.*)|
|2  |(.*)text1(.*)text5(.*)text6(.*)|(.*)text2(.*)                           |
+---+------------------------------------------------------------------------+

Now you can use @pasha701's shrink function in a udf:

val shrink: String => String = (s: String) => if (s.length > 50) shrink(s.split("\\|").map(s => s.substring(0,s.lastIndexOf("text"))).mkString("|")) else s

def shrinkUdf = udf((regex: String) => shrink(regex))

df.withColumn("regex", shrinkUdf(col("regex"))).show(truncate = false)

//prints
+---+---------------------------------------------+
|ID |regex                                        |
+---+---------------------------------------------+
|1  |(.*)text1(.*)text2(.*)|(.*)text2(.*)         |
|2  |(.*)text1(.*)text5(.*)text6(.*)|(.*)text2(.*)|
+---+---------------------------------------------+
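
The explicit String => String annotation on shrink is what avoids the "recursive value ... needs type" error from the question: Scala cannot infer the type of a val whose right-hand side refers to the val itself. Below is a Spark-free sketch of just that point. The object name RecursiveValDemo is hypothetical, and the val is declared as a member of an object; as far as I know, a local val inside a method body may additionally need to be a lazy val or a def to avoid a forward-reference error.

```scala
object RecursiveValDemo {
  // Without the ": String => String" annotation this does not compile:
  // "recursive value shrink needs type".
  val shrink: String => String = (s: String) =>
    if (s.length > 50)
      shrink(s.split("\\|").map(f => f.substring(0, f.lastIndexOf("text"))).mkString("|"))
    else s
}
```

The function value can then be wrapped in udf exactly as in the code above.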
Sharath
  • I am getting "Task not serializable" error with the above logic. – Mohan Aug 31 '18 at 08:09
  • It's working for me though. Are you running Spark in local mode or cluster mode? – Sharath Aug 31 '18 at 09:30
  • Spark is running in cluster mode – Mohan Aug 31 '18 at 10:26
  • @Mohan methods are not serializable. Try this: https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou – Sathiyan S Aug 31 '18 at 12:01
  • After reading @SathiyanS's post, I understood that in this case one should use functions and not methods, so I have edited the answer to use a function instead of a method. Let me know if you still face the issue. – Sharath Aug 31 '18 at 20:36