
I have a pyspark dataframe like this:

+--------------------+--------------------+
|               label|           sentences|
+--------------------+--------------------+
|[things, we, eati...|<p>I am construct...|
|[elephants, nordi...|<p><strong>Edited...|
|[bee, cross-entro...|<p>I have a data ...|
|[milking, markers...|<p>There is an Ma...|
|[elephants, tease...|<p>I have Score d...|
|[references, gene...|<p>I'm looking fo...|
|[machines, exitin...|<p>I applied SVM ...|
+--------------------+--------------------+

And a top_ten list like this:

['bee', 'references', 'milking', 'expert', 'bombardier', 'borscht', 'distributions', 'wires', 'keyboard', 'correlation']

And I need to create a new_label column indicating 1.0 if at least one of the label values exists in the top_ten list (for each row, of course).

While the logic makes sense, my inexperience with the syntax is showing. Surely there's a short-ish answer to this problem?
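For reference, here's a minimal sketch of a toy frame with the same shape (hypothetical sample values, not the real data):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# array<string> labels alongside raw HTML sentences, mirroring the schema above
train_df = spark.createDataFrame(
    [(['bee', 'cross-entropy'], '<p>I have a data frame...'),
     (['machines', 'exiting'], '<p>I applied SVM...')],
    ['label', 'sentences']
)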

I've tried:

temp = train_df.withColumn('label', F.when(lambda x: x.isin(top_ten), 1.0).otherwise(0.0))

and this:

def matching_top_ten(top_ten, labels):
    for label in labels:
        if label.isin(top_ten):
            return 1.0
        else:
            return 0.0

I found out after this last attempt that these functions can't be mapped to a dataframe. So I guess I could convert the column to an RDD, map it, and then .join() it back, but that sounds unnecessarily tedious.

**Update:** Tried the above function as a UDF with no luck as well...

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
matching_udf = udf(matching_top_ten, FloatType())
temp = train_df.select('label', matching_udf(top_ten, 'label').alias('new_labels'))
----
TypeError: Invalid argument, not a string or column: [...top_ten list values...] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
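From the error message, it looks like every argument in a udf call must be a column name or a Column, so the list itself can't be passed in. One workaround sketch (untested) would be to capture top_ten in a closure instead of passing it as an argument:

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# top_ten comes from the enclosing scope rather than being passed as a column
matching_udf = udf(lambda labels: 1.0 if any(l in top_ten for l in labels) else 0.0, FloatType())
temp = train_df.withColumn('new_label', matching_udf('label'))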

There are other similar questions I've found on SO; however, none of them involve verifying a list against another list (at best, a single value against a list).

alofgran

3 Answers


You don't need to use a udf and you can avoid the expense of explode + agg.

Spark Version 2.4+

You can use pyspark.sql.functions.arrays_overlap:

import pyspark.sql.functions as F

top_ten_array = F.array(*[F.lit(val) for val in top_ten])

temp = train_df.withColumn(
    'new_label', 
    F.when(F.arrays_overlap('label', top_ten_array), 1.0).otherwise(0.0)
)

Alternatively, you should be able to use pyspark.sql.functions.array_intersect().

temp = train_df.withColumn(
    'new_label', 
    F.when(
        F.size(F.array_intersect('label', top_ten_array)) > 0, 1.0
    ).otherwise(0.0)
)

Both of these check whether label and top_ten share at least one element: arrays_overlap returns true directly when they do, while the array_intersect version checks that the intersection is non-empty.
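For a quick sanity check on a toy frame (hypothetical values; assumes an active spark session and the F and top_ten_array definitions above):

demo = spark.createDataFrame([(['bee', 'markers'],), (['apples', 'pears'],)], ['label'])
demo.withColumn(
    'new_label',
    F.when(F.arrays_overlap('label', top_ten_array), 1.0).otherwise(0.0)
).show()
# expected output, roughly:
# +---------------+---------+
# |          label|new_label|
# +---------------+---------+
# | [bee, markers]|      1.0|
# |[apples, pears]|      0.0|
# +---------------+---------+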


For Spark 1.5 through 2.3, you can use array_contains in a loop over top_ten:

from operator import or_
from functools import reduce

temp = train_df.withColumn(
    'new_label',
    F.when(
        reduce(or_, [F.array_contains('label', val) for val in top_ten]),
        1.0
    ).otherwise(0.0)
)

This tests whether label contains each of the values in top_ten, and reduces the results with |, which acts as a logical OR on Columns. The result is True only if at least one of the values in top_ten is contained in label.
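Concretely, for a hypothetical three-element list, the reduce builds the same Column expression you would get by chaining the comparisons by hand:

# reduce(or_, [F.array_contains('label', v) for v in ['a', 'b', 'c']])
# builds the same Column expression as:
F.array_contains('label', 'a') | F.array_contains('label', 'b') | F.array_contains('label', 'c')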

pault
  • I like the idea of the `array_intersect` and `arrays_overlap`...makes sense to me, however, I get this error: `AnalysisException: "cannot resolve '`bee`' given input columns: [label, sentences];` Maybe my problem is that the list containing 'bee' is a proper python list and not an array or dataframe column as the method expects... – alofgran Nov 19 '19 at 15:39
  • Oh sorry, you are going to have to use `lit` because `array` takes in a list of columns. I made an update. @alofgran **Edit**: I made a typo in the first edit but it should be fixed now. – pault Nov 19 '19 at 15:42
  • As far as I can tell, that did it @pault! You win the prize for avoiding UDFs and `.explode()`. – alofgran Nov 19 '19 at 15:47

You can create a new column holding the top-ten list as an array, split the sentence column into an array of words, and then apply a udf in the following way:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

top_ten_list = ['bee', 'references', 'milking', 'expert', 'bombardier', 'borscht', 'distributions', 'wires', 'keyboard', 'correlation']

# add the list as a literal array column
df = df.withColumn("top_ten_list", F.array([F.lit(x) for x in top_ten_list]))

def matching_top_ten(words, top_ten_ls):
    # 1 if the two arrays share at least one element, else 0
    if len(set(words).intersection(set(top_ten_ls))) > 0:
        return 1
    return 0

matching_top_ten_udf = F.udf(matching_top_ten, IntegerType())

# flag based on the label array
df = df.withColumn("label_flag", matching_top_ten_udf(F.col("label"), F.col("top_ten_list")))

# or flag based on the words of the sentence column
df = df.withColumn("split_sentence", F.split("sentence", " ")) \
       .withColumn("label_flag", matching_top_ten_udf(F.col("split_sentence"), F.col("top_ten_list")))

You can skip the sentence-splitting step, since your label column already holds an array of words to compare against top_ten_list.

Sample output using the df I used (does not have the same schema as yours):

  customer  Month  year  spend        ls1                    sentence                      sentence_split  label
0        a     11  2018   -800  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
1        a     12  2018   -800  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
2        a      1  2019    300  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
3        a      2  2019    150  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
4        a      3  2019    300  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
5        a      4  2019   -500  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
6        a      5  2019   -800  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
7        a      6  2019    600  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
8        a      7  2019   -400  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
9        a      8  2019   -800  [new, me]  This is a new thing for me  [This, is, a, new, thing, for, me]      1
pissall

You can explode the label column and join the dataframe with a dataframe created from your list, to avoid using UDFs, which have low efficiency:

from pyspark.sql.functions import (
    monotonically_increasing_id, explode, col, when, collect_list, first, sum
)

# creating an id to group the exploded rows back together later
train_df = train_df.withColumn("id", monotonically_increasing_id())

# Exploding column
train_df = train_df.withColumn("label", explode("label"))

# Creation of dataframe with the top ten list (one row per word)
top_df = sqlContext.createDataFrame(
    [(word,) for word in ['bee', 'references', 'milking', 'expert', 'bombardier',
                          'borscht', 'distributions', 'wires', 'keyboard', 'correlation']],
    ['top']
)

# Join to keep elements
train_df = train_df.join(top_df, col("label") == col("top"), "left")

# Replace nulls with 0s or 1s
train_df = train_df.withColumn("top", when(col("top").isNull(),0).otherwise(1))

# Group results back together by id
train_df = train_df.groupby("id").agg(
    collect_list("label").alias("label"),
    first("sentences").alias("sentences"),
    sum("top").alias("new_label")
)

# drop the id and transform the new_label column to be 1 or 0
train_df = train_df.withColumn("new_label", when(col("new_label")>0,1).otherwise(0))
train_df = train_df.drop("id")
Shadowtrooper
  • This solution using `explode` and `join` is computationally very expensive and blows up the in-memory size of the dataframe. Also, the `label` column is not going to be the same as the `top_ten_list`. You want to check if any of the words in `label` exist in the `top_ten_list`. – pissall Nov 19 '19 at 09:25
  • Maybe, but it's a different approach that avoids UDFs, which are not recommended. – Shadowtrooper Nov 19 '19 at 09:26
  • @alofgran Please try a count of rows before and after the explode; also, hope you don't run OOM – pissall Nov 19 '19 at 09:32
  • @pissall Understandable - thanks for the tip. And to clarify, I just want to check if **any** of the words in the label row exist in the top_ten list, not all. Your point still stands though. Thx – alofgran Nov 19 '19 at 15:36