I have two tables with the same column names, the same data, and the same number of rows, but the ordering of the rows may differ. Now I select column A from table_1 and column A from table_2 and compare the values. How can I achieve this using PySpark SQL? Can I compute a sha2/md5 checksum and compare?

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row
import pyspark.sql.functions as f

app_name="test"
table1="DB1.department"
table2="DB2.department"
conf = SparkConf().setAppName(app_name)
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

query1="select * from %s" %(table1)
df1 = sqlContext.sql(query1)
query2="select * from %s" %(table2)
df2 = sqlContext.sql(query2)
df3 = sqlContext.sql("""SELECT COALESCE(a.departmentid, b.departmentid) AS departmentid
                        FROM DB1.department a
                        FULL JOIN DB2.department b
                          ON a.departmentid = b.departmentid
                        WHERE a.departmentid IS NULL
                           OR b.departmentid IS NULL""")
df5 = sqlContext.sql("select md5(departmentid) from department1")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve 'md5(departmentid)' due to data type mismatch: argument 1 requires binary type, however, 'departmentid' is of bigint type.; line 1 pos 11"

When I tried the md5 checksum, it failed saying md5 expects a binary type, but departmentid is a bigint.
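Spark's `md5` function hashes string/binary input, so a bigint column has to be cast first, e.g. `md5(cast(departmentid as string))`. The following pure-Python sketch (using `hashlib`, since a Spark session isn't needed to show the point) illustrates what the per-value hash does:

```python
import hashlib

# Spark's md5() requires string/binary input, which is why
# md5(departmentid) fails on a bigint. Casting to string first,
# md5(cast(departmentid as string)), hashes the textual representation.
# The equivalent per-value computation in plain Python:
def md5_of_id(departmentid):
    # hashlib.md5 works on bytes, so encode the string representation
    return hashlib.md5(str(departmentid).encode("utf-8")).hexdigest()

print(md5_of_id(1))  # hash of the string "1"
```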

Table1:

departmentid  departmentname  departmentaddress
1             A               Newyork
2             B               Newjersey
3             C               SanJose
4             D               WashingtonDC
5             E               Mexico
6             F               Delhi
7             G               Pune
8             H               chennai

Table2:

departmentid  departmentname  departmentaddress
7             G               Pune
8             H               chennai
1             A               Newyork
2             B               Newjersey
3             C               SanJose
4             D               WashingtonDC
5             E               Mexico
6             F               Delhi

In Table2 only the order of the rows has changed; the data is the same, so technically the two tables are identical. Unless a new row is added or values are modified, the two tables remain identical. (These tables are just an example for explanation; in reality we deal with big data.)

1 Answer

The simplest solution is:

def is_identical(x, y):
    return (x.count() == y.count()) and (x.subtract(y).count() == 0)

Example data:

df1 = spark.createDataFrame(
    [(1, "A", "Newyork"), (2, "B", "Newjersey"),
    (3, "C", "SanJose"), (4, "D", "WashingtonDC"), (5, "E", "Mexico"), (6, "F", "Delhi"),
    (7, "G", "Pune"), (8, "H", "chennai")],
    ("departmentid", "departmentname", "departmentadd"))

df2 = spark.createDataFrame(
    [(7, "G", "Pune"), (8, "H", "chennai"), (1, "A", "Newyork"), (2, "B", "Newjersey"),
    (3, "C", "SanJose"), (4, "D", "WashingtonDC"), (5, "E", "Mexico"), (6, "F", "Delhi")],
    ("departmentid", "departmentname", "departmentadd"))

df3 = spark.createDataFrame(
    [(1, "A", "New York"), (2, "B", "New Jersey"),
    (3, "C", "SanJose"), (4, "D", "WashingtonDC"), (5, "E", "Mexico"), (6, "F", "Delhi"),
    (7, "G", "Pune"), (8, "H", "chennai")],
    ("departmentid", "departmentname", "departmentadd"))

df4 = spark.createDataFrame(
    [(3, "C", "SanJose"), (4, "D", "WashingtonDC"), (5, "E", "Mexico"), (6, "F", "Delhi")],
    ("departmentid", "departmentname", "departmentadd"))

Checks:

is_identical(df1, df2)
# True
is_identical(df1, df3)
# False
is_identical(df1, df4)
# False
is_identical(df4, df4)
# True
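One caveat worth knowing: `DataFrame.subtract` behaves like SQL `EXCEPT DISTINCT`, so the count-plus-subtract check can miss differences in duplicate-row multiplicities. A stricter multiset comparison, sketched here with `collections.Counter` on collected rows (a hypothetical helper, only practical when the data fits in memory):

```python
from collections import Counter

# subtract() is EXCEPT DISTINCT: [a, a, b] vs [a, b, b] have equal
# counts and empty subtractions in both directions, yet differ as
# multisets. Comparing collected rows as multisets catches this:
def is_identical_multiset(rows_x, rows_y):
    # Counter counts each row tuple, so duplicates are compared too
    return Counter(rows_x) == Counter(rows_y)

print(is_identical_multiset(
    [("a",), ("a",), ("b",)],
    [("a",), ("b",), ("b",)]))  # False: multiplicities differ
```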

With an outer join:

from pyspark.sql.functions import col, coalesce, lit

from functools import reduce
from operator import and_

def is_identical_(x, y, keys=("departmentid", )):
    def both_null(c):
        return (col("x.{}".format(c)).isNull() & 
                col("y.{}".format(c)).isNull())
    def both_equal(c):
        return coalesce((col("x.{}".format(c)) ==
                col("y.{}".format(c))), lit(False))

    p = reduce(and_, [both_null(c) | both_equal(c) for c in x.columns if c not in keys])

    return (x.alias("x").join(y.alias("y"), list(keys), "full_outer")
            .where(~p).count() == 0)

You'd get the same results:

is_identical_(df1, df2)
# True
is_identical_(df1, df3)
# False
is_identical_(df1, df4)
# False
is_identical_(df4, df4)
# True

md5 is of no use to you here because it is not an aggregation function. It computes a checksum for a single value, not for a whole column or table.
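That said, if you still want a checksum-style comparison, one possible order-insensitive approach (a sketch under my own assumptions, not something from the answer above) is to hash each row and combine the per-row hashes with an order-independent reduction such as a modular sum. Illustrated in plain Python:

```python
import hashlib

# Sketch of an order-insensitive table checksum: hash each row, then
# combine with an order-independent reduction (sum modulo 2**64 here).
# Unlike a full multiset comparison this can, in principle, collide.
def table_checksum(rows):
    total = 0
    for row in rows:
        # join column values with a separator before hashing the row
        row_bytes = "|".join(str(v) for v in row).encode("utf-8")
        digest = hashlib.md5(row_bytes).hexdigest()
        total = (total + int(digest, 16)) % (2 ** 64)
    return total

t1 = [(1, "A", "Newyork"), (2, "B", "Newjersey")]
t2 = [(2, "B", "Newjersey"), (1, "A", "Newyork")]  # same rows, reordered
print(table_checksum(t1) == table_checksum(t2))  # True
```

Because addition is commutative, row order does not affect the result, which matches the situation in the question where only row ordering differs between the two tables.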
