process array column using udf and return another array

Below is my input:

docID  Shingles
D1     [23, 25, 39, 59]
D2     [34, 45, 65]

I want to generate a new column called hashes by processing the shingles array column. For example, I want to extract the min and max (this is just an example to show that I want a fixed-length array column; I don't actually want to find the min or max):

docID  Shingles          Hashes
D1     [23, 25, 39, 59]  [23, 59]
D2     [34, 45, 65]      [34, 65]

I created a udf as below:

def generate_minhash_signatures(shingles, coeffA, coeffB):
    signature = []
    minHashCode = nextPrime + 1
    maxHashCode = 0
    for shingleID in shingles:
        if shingleID < minHashCode:
            minHashCode = shingleID
        if shingleID > maxHashCode:
            maxHashCode = shingleID
    return [minHashCode, maxHashCode]

minhash_udf = udf(generate_minhash_signatures, ArrayType(IntegerType()))
df_with_minhash = df.withColumn('min_max_hash', minhash_udf("shingles", coeffA, coeffB))
df_with_minhash.show()

But it gives the following error:

TypeError: Invalid argument, not a string or column: [2856022824, 2966132496, 947839218, 1658426276, 1862779421, 3729685802, 1710806966, 2696513050, 3630333076, 2555745391] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Actual udf:

def generate_minhash_signatures(shingles, coeffA, coeffB, numHashes):
    signature = []
    for i in range(0, numHashes):
        minHashCode = nextPrime + 1
        for shingleID in shingles:
            hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime

            if hashCode < minHashCode:
                minHashCode = hashCode

        signature.append(minHashCode)
    return signature
pooja kosala

2 Answers


Your udf expects all three arguments to be columns. coeffA and coeffB are most likely plain Python values rather than columns, so you need to convert them to column objects. If they are single numeric values, use lit:

import pyspark.sql.functions as f
df.withColumn('min_max_hash', minhash_udf(f.col("shingles"), f.lit(coeffA), f.lit(coeffB)))

If coeffA and coeffB are lists, use f.array to create the literals as follows:

df.withColumn('min_max_hash',
  minhash_udf(f.col("shingles"),
    f.array(*map(f.lit, coeffA)),
    f.array(*map(f.lit, coeffB))
  )
)

Or separate the column arguments from the non-column arguments by building the udf inside an outer function, as follows:

def generate_minhash_signatures(coeffA, coeffB, numHashes):
    def generate_minhash_signatures_inner(shingles):
        signature = []
        for i in range(0, numHashes):
            minHashCode = nextPrime + 1
            for shingleID in shingles:
                hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime

                if hashCode < minHashCode:
                    minHashCode = hashCode

            signature.append(minHashCode)
        return signature
    return f.udf(generate_minhash_signatures_inner, ArrayType(IntegerType()))

And then you can call the function as:

df.withColumn('min_max_hash', generate_minhash_signatures(coeffA, coeffB, numHashes)("shingles"))
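Because the closure keeps the hashing logic in an ordinary Python function, that logic can be checked locally before it is wrapped in a udf. The following is a minimal sketch under stated assumptions: the names make_minhash and as_spark_udf, the sample coefficients, and the small prime 101 are hypothetical and do not come from the question. LongType is also an assumption, chosen because coefficients like those in the error message are larger than 2^31 - 1 and so would not fit in a 32-bit IntegerType.

```python
def make_minhash(coeff_a, coeff_b, next_prime):
    """Return a plain function that computes a MinHash signature for one shingle list."""
    num_hashes = len(coeff_a)

    def minhash(shingles):
        signature = []
        for i in range(num_hashes):
            # Smallest hash code over all shingles for the i-th hash function.
            # The default mirrors the answer's starting value of nextPrime + 1.
            signature.append(
                min(
                    ((coeff_a[i] * s + coeff_b[i]) % next_prime for s in shingles),
                    default=next_prime + 1,
                )
            )
        return signature

    return minhash


def as_spark_udf(fn):
    """Wrap fn as a Spark udf. pyspark is imported lazily so the logic above runs without it."""
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, LongType  # LongType is an assumption

    return udf(fn, ArrayType(LongType()))


# Hypothetical sample values, small enough to verify by hand.
minhash = make_minhash(coeff_a=[3, 7], coeff_b=[1, 2], next_prime=101)
print(minhash([23, 25, 39, 59]))  # [17, 11]
```

With a Spark session available, as_spark_udf(minhash)("shingles") could be passed to withColumn in the same way as the call above.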
Psidom

My problem is not exactly the same, but it is similar: I had to send three array-type columns as input and get an array-type column (of strings) as output.

I was returning a list and tried many other approaches, but none of them succeeded.

def func_req(oldlist, newlist, pvector):
    deleted_stores = list(set(oldlist) - set(newlist))
    new_stores = list(set(newlist) - set(oldlist))
    old_map = dict(zip(list(oldlist), list(pvector)))
    for key in deleted_stores:
        old_map.pop(key)
    for key in newlist:
        if key not in old_map.keys():
            old_map[key] = 'PTest'
    pvec=list(old_map.values())
    return pvec

I called it as in this statement:

df_diff = df3.withColumn(
    'updatedp',
    func_req(f.col('oldlist'), f.col('presentlist'), f.col('pvec'))
)

It gave me an error:

AssertionError: col should be Column

Solution

Then I came across this post and wrapped the function in a udf:

func_req_wrapper = f.udf(func_req, ArrayType(StringType()))

and called it like this:

df_diff = df3.withColumn(
    'updatedp', 
    func_req_wrapper('oldlist',  'presentlist', 'pvec')
)
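The function itself is plain Python, so its behavior on a single row can be checked without Spark. This is a minimal sketch rather than the exact code above: it names the third parameter pvector to match the name used in the function body, and the sample store and value names are hypothetical.

```python
def func_req(oldlist, newlist, pvector):
    # Stores present in the old list but missing from the new one.
    deleted_stores = set(oldlist) - set(newlist)
    # Pair each old store with its existing value.
    old_map = dict(zip(oldlist, pvector))
    for key in deleted_stores:
        old_map.pop(key)
    # Stores that are new get a placeholder value.
    for key in newlist:
        if key not in old_map:
            old_map[key] = 'PTest'
    return list(old_map.values())


# Hypothetical single-row input: s1 was removed, s4 was added.
result = func_req(['s1', 's2', 's3'], ['s2', 's3', 's4'], ['p1', 'p2', 'p3'])
print(result)  # ['p2', 'p3', 'PTest']
```

The order of the returned values relies on dictionaries preserving insertion order, which Python guarantees from version 3.7 onward.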
Vincent Doba