
I'm working with a fairly big dataframe (around 100 thousand rows, with the intent of scaling to 10 million), and it has the following structure:

+------+--------------------+--------+--------------------+-------------------+
|LineId|             Content| EventId|       EventTemplate|          Timestamp|
+------+--------------------+--------+--------------------+-------------------+
|     1|Receiving block b...|ef6f4915|Receiving block <...|2009-11-08 20:35:18|
|     2|BLOCK* NameSystem...|9bc09482|BLOCK* NameSystem...|2009-11-08 20:35:18|
|     3|Receiving block b...|9ca53bce|Receiving block <...|2009-11-08 20:35:19|
+------+--------------------+--------+--------------------+-------------------+

I'd like to add a label column, and I'm using the following expression to build the condition for it:

from functools import reduce

# OR together one LIKE condition per token in `blocks`
label_condition = reduce(lambda a, b: a | b, (df['Content'].like('%' + pat + '%') for pat in blocks))

where `blocks` is a list containing the blocks (let's call them tokens) that define whether or not a row is anomalous. This checks whether the Content field contains any value from the `blocks` list.
The size of this list is around 17k elements, which is what I think is causing the problem.

When I try to add this to the dataframe, or simply evaluate this operation, I get the following error:

Py4JJavaError: An error occurred while calling o50581.toString.
: java.lang.StackOverflowError
    at org.apache.spark.sql.catalyst.util.package$$anonfun$usePrettyExpression$1.applyOrElse(package.scala:128)
    at org.apache.spark.sql.catalyst.util.package$$anonfun$usePrettyExpression$1.applyOrElse(package.scala:128)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
...

Looking online, I saw that this might be caused by Spark building an overly complex plan, and that checkpointing can help avoid this sort of thing, but I'm not sure how to go about it. I tried adding a checkpoint before evaluating this, and I also tried using a select to reduce the df to just the 'Content' column, but to no avail.
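For reference, the checkpoint attempt was roughly along these lines (a minimal sketch; the checkpoint directory is just a placeholder path):

# rough sketch of the checkpoint attempt; the directory below is only a placeholder
spark.sparkContext.setCheckpointDir('/tmp/spark-checkpoints')
df = df.select('Content').checkpoint()  # materializes the data and truncates the plan so far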

I found this solution in Scala to optimize the reduce function, but I don't know how to translate it to Python.
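For reference, a balanced reduce in Python (which is what I understand the Scala approach to be doing: combining the conditions as a shallow tree instead of one long left-deep chain) might look roughly like this untested sketch:

def balanced_or(conds):
    # sketch: OR the two halves recursively so the condition tree stays shallow
    if len(conds) == 1:
        return conds[0]
    mid = len(conds) // 2
    return balanced_or(conds[:mid]) | balanced_or(conds[mid:])

# label_condition = balanced_or([df['Content'].like('%' + pat + '%') for pat in blocks])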

Is there a way to optimize this, or at least to make it go step by step or iteratively, to avoid the stack overflow?

Thanks in advance.

  • Can you check what is going on if you test on a smaller scale, say with `blocks` having only two elements? – partlov Dec 30 '22 at 13:41
  • At a small scale it works perfectly. If you want an example, you can check out the answer to [my question](https://stackoverflow.com/a/74959993/13762577) from a couple of hours ago, in which the function is explained. – Voxeldoodle Dec 30 '22 at 13:43
  • how many elements are there in `blocks`? The error occurs when there is deep recursion, which in your case comes from the many OR conditions. – samkart Dec 30 '22 at 13:46
  • There are exactly 16838 elements in the `blocks` list – Voxeldoodle Dec 30 '22 at 13:47
  • try using the `rlike` method – samkart Dec 30 '22 at 13:53
  • Just tried. Same error unfortunately. @samkart – Voxeldoodle Dec 30 '22 at 13:55
  • `withColumn('label', func.col('log').rlike('|'.join(anomalous_blocks)))` -- tried this? `rlike` accepts regex. – samkart Dec 30 '22 at 13:56
  • this `withColumn('label', func.col('log').rlike('|'.join(anomalous_blocks)))` worked. Thanks!!!! If you write it as an answer I'll consider it solved @samkart – Voxeldoodle Dec 30 '22 at 13:59

1 Answer


You could try the `rlike` method, which accepts a regex: pass the pattern as `'element1|element2|...'`.

from pyspark.sql import functions as func

data_sdf. \
    withColumn('label', func.col('log').rlike('|'.join(anomalous_blocks))). \
    show()

# +---+---------------+-----+
# | id|            log|label|
# +---+---------------+-----+
# |  1|Test logX blk_A| true|
# |  2|Test logV blk_B|false|
# |  3|Test logF blk_D| true|
# |  4|Test logD blk_F|false|
# |  5|Test logB blk_K|false|
# |  6|Test logY blk_A| true|
# |  7|Test logE blk_C| true|
# +---+---------------+-----+
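
One thing to keep in mind: `rlike` interprets the joined string as a regex, so if the block tokens can contain regex metacharacters you may want to escape them before joining. A sketch using Python's `re.escape` (note that Spark applies Java regex semantics, so this is an approximation):

import re

# escape each token so characters like '.', '*', '+' are matched literally
pattern = '|'.join(re.escape(b) for b in anomalous_blocks)
data_sdf.withColumn('label', func.col('log').rlike(pattern))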
samkart