
I'm learning Spark and I came across a problem that I'm unable to overcome. What I would like to achieve is to count the number of positions at which two arrays hold the same value. I'm able to get what I want via a Python UDF, but I'm wondering if there is a way using only Spark functions.

df_bits = sqlContext.createDataFrame([[[0, 1, 1, 0, 0],
                                       [1, 1, 1, 0, 1]]],
                                     ['bits1', 'bits2'])
df_bits_with_result = df_bits.select('bits1', 'bits2', some_magic('bits1', 'bits2'))
df_bits_with_result.show()


+---------------+---------------+------------------------+
|bits1          |bits2          |some_magic(bits1, bits2)|
+---------------+---------------+------------------------+
|[0, 1, 1, 0, 0]|[1, 1, 1, 0, 1]|3                       |
+---------------+---------------+------------------------+

Why 3? Because bits1[1] == bits2[1], bits1[2] == bits2[2] and bits1[3] == bits2[3].
I tried to play with rdd.reduce, but with no luck.
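
For reference, the UDF I have in mind is along these lines (a minimal sketch; count_equal is just an illustrative name):

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Plain Python comparison per position, wrapped as a UDF
count_equal = F.udf(lambda a, b: sum(1 for x, y in zip(a, b) if x == y), IntegerType())

df_bits.select('bits1', 'bits2', count_equal('bits1', 'bits2').alias('same')).show()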

Andy
  • What version of spark? For a generalized solution, you can use [`arrays_zip`](https://stackoverflow.com/questions/54282706/how-to-zip-two-array-columns-in-spark-sql) in 2.4+. Also please [format your code properly](https://meta.stackexchange.com/questions/22186/how-do-i-format-my-code-blocks). – pault Jul 29 '20 at 15:29
  • 2
  • I will add that if your problem is as simplified as you've shown in this case, where a) the array contents are all non-null binary values (i.e. 1's and 0's) and b) the arrays have the same fixed, known length, you can also do some hacky regex or a loop (see the sketch after these comments). – pault Jul 29 '20 at 15:31
  • I forgot to mention which version of Spark I'm using but lucky for me I'm using 2.4+, so both current answers work for me. – Andy Jul 29 '20 at 22:43
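
For the fixed-length loop idea from the comments, a sketch (assuming every array has exactly 5 non-null elements, as in the example):

from pyspark.sql import functions as F

# Build one 0/1 indicator per position and sum them; only works for a known, fixed length
n = 5
same = sum((F.col('bits1')[i] == F.col('bits2')[i]).cast('int') for i in range(n))
df_bits.withColumn('same', same).show()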

2 Answers


Perhaps this is helpful-

spark>=2.4

Use aggregate and zip_with

    val df = spark.sql("select array(0, 1, 1, 0, 0, null) as bits1, array(1, 1, 1, 0, 1, null) as bits2")
    df.show(false)
    df.printSchema()

    /**
      * +----------------+----------------+
      * |bits1           |bits2           |
      * +----------------+----------------+
      * |[0, 1, 1, 0, 0,]|[1, 1, 1, 0, 1,]|
      * +----------------+----------------+
      *
      * root
      * |-- bits1: array (nullable = false)
      * |    |-- element: integer (containsNull = true)
      * |-- bits2: array (nullable = false)
      * |    |-- element: integer (containsNull = true)
      */

    df.withColumn("x", expr("aggregate(zip_with(bits1, bits2, (x, y) -> if(x=y, 1, 0)), 0, (acc, x) -> acc + x)"))
      .show(false)

    /**
      * +----------------+----------------+---+
      * |bits1           |bits2           |x  |
      * +----------------+----------------+---+
      * |[0, 1, 1, 0, 0,]|[1, 1, 1, 0, 1,]|3  |
      * +----------------+----------------+---+
      */
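
The same SQL expression can be used unchanged from PySpark via expr (a sketch, assuming the df_bits frame from the question):

from pyspark.sql import functions as F

# zip_with marks matching positions with 1, aggregate sums the marks
df_bits.withColumn(
    "x",
    F.expr("aggregate(zip_with(bits1, bits2, (x, y) -> if(x = y, 1, 0)), 0, (acc, x) -> acc + x)")
).show(truncate=False)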
Som

PySpark: use arrays_zip, as mentioned in the comments.

from pyspark.sql import functions as F

df_bits.withColumn("sum", \
              F.expr("""aggregate(arrays_zip(bits1,bits2),0,(acc,x)-> IF(x.bits1==x.bits2,1,0)+acc)""")).show()

#+---------------+---------------+---+
#|          bits1|          bits2|sum|
#+---------------+---------------+---+
#|[0, 1, 1, 0, 0]|[1, 1, 1, 0, 1]|  3|
#+---------------+---------------+---+
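
Not part of the original answer, but on Spark 3.1+ the same logic can be written with the Python API instead of a SQL string (a sketch):

from pyspark.sql import functions as F

# zip_with / aggregate are exposed in pyspark.sql.functions from Spark 3.1
matches = F.zip_with("bits1", "bits2", lambda x, y: (x == y).cast("int"))
df_bits.withColumn("sum", F.aggregate(matches, F.lit(0), lambda acc, x: acc + x)).show()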
murtihash