-1

I am stuck on a problem where I have to produce a resulting dataset by joining a source dataset against a bunch of transformation rules. Now for a given entity, a number of rules can match but only the first rule that matches needs be selected for this match. Here's an example of my source dataset

Id,Group,Person,Item,Count
1000000,Finance,Scott,Fan,100
1000000,HR,Taylor,Light,200
1000000,Finance,Troy,Table,100
1000000,Legal,Willian,Chair,100

The other data set to join has a bunch of transformations that look like the following,

Field,Source,Targets
Person,Scott,[Taylor, William]
Item,Light,[Table, Chair]
GroupHR,Finance;[Legal]

The following is the processing order for which I'm stuck in building the SQL Join query

Row1 -> Matches tansform1, Scott -100, Taylor +50, William +50
Row2 -> Matches tansform2,tansform3. Pick tansform2 only, Light -200, Table +100, Chair +100
Row3 -> Doesn't match any policy.
Row4 -> Doesn't match any policy.

Any ideas on how this can be achieved in Spark SQL ? As a C# primary developer I could do this with a foreach on the row but is that the ideal way to do this computation ?

Hedrack
  • 694
  • 1
  • 6
  • 19
kernelman
  • 41
  • 2
  • 15
  • Why doesn't `row4` match `transform3`? Did you make a mistake in writing it and it should be `Group,HR,[Finance, Legal]` ? – Hedrack Feb 18 '20 at 21:59
  • 1
    What is the expected output? What do you mean by "*only the first rule that matches needs be selected for this match*"? Selected or applied? What is the size of the transformations DF? The question is quite unclear – blackbishop Feb 19 '20 at 18:03
  • 1
    your question is not clear, try to follow the instructions provided [here](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples) – abiratsis Feb 21 '20 at 09:11
  • Can you add an example in LINQ (as you are used to C#) of what you are trying to do? – Danny Varod Feb 23 '20 at 16:20

1 Answers1

1

Ok, here you are

val jdata = """[
{"id": 1, "group": "finance", "person": "scott", "item": "fan", "count": 100},
{"id": 2, "group": "hr", "person": "taylor", "item": "light", "count": 200},
{"id": 3, "group": "finance", "person": "troy", "item": "table", "count": 300},
{"id": 4, "group": "legal", "person": "willian", "item": "chair", "count": 400}
]"""

val data = spark.read.json(Seq(jdata).toDS)
data.registerTempTable("data")

val jtrans = """[
{"field": "person", "source": "scott", "targets": "taylor,willian"},
{"field": "item", "source": "light", "targets": "table,chair"},
{"field": "group", "source": "hr", "targets": "legal"}
]"""

val trans = spark.read.json(Seq(jtrans).toDS)
trans.registerTempTable("trans")

Let's check

scala> spark.sql("SELECT * FROM data").show
+-----+-------+---+-----+-------+
|count|  group| id| item| person|
+-----+-------+---+-----+-------+
|  100|finance|  1|  fan|  scott|
|  200|     hr|  2|light| taylor|
|  300|finance|  3|table|   troy|
|  400|  legal|  4|chair|willian|
+-----+-------+---+-----+-------+


scala> spark.sql("SELECT * FROM trans").show
+------+------+--------------+
| field|source|       targets|
+------+------+--------------+
|person| scott|taylor,willian|
|  item| light|   table,chair|
| group|    hr|         legal|
+------+------+--------------+

And the SQL

spark.sql("""
WITH
datac AS (
SELECT CONCAT('item_', item) itemc, CONCAT('person_', person) personc, count
FROM data
),
trans_person AS (
SELECT count, personc, itemc, source as source_person, targets as targets_person 
FROM datac LEFT JOIN trans ON CONCAT(field, '_', source) = personc
),
trans_item AS (
SELECT count, personc, itemc, source_person, targets_person, source as source_item, targets as targets_item 
FROM trans_person LEFT JOIN trans t2 ON CONCAT(t2.field, '_', t2.source) = itemc
),
trans_concat AS (
SELECT CASE WHEN source_person IS NOT NULL THEN 'person' WHEN source_item IS NOT NULL THEN 'item' END AS field,
CONCAT(COALESCE(source_person, ''), COALESCE(source_item, '')) as source,  
CONCAT(COALESCE(targets_person, ''), COALESCE(targets_item, '')) AS targets,
count
FROM trans_item
),
trans_source AS (
SELECT field, source, count as cnt
FROM trans_concat
WHERE field IS NOT NULL
),
trans_target AS (
SELECT field, EXPLODE(SPLIT(targets, ',')) as target, count / SIZE(SPLIT(targets, ',')) as cnt
FROM trans_concat
WHERE field IS NOT NULL
)
SELECT count + COALESCE(t1.cnt, 0) + COALESCE(t2.cnt, 0) - COALESCE(t3.cnt, 0) - COALESCE(t4.cnt, 0) AS count, 
group, id, item, person
FROM data 
LEFT JOIN trans_target t1 ON  CONCAT('person_', person) = CONCAT(t1.field, '_', t1.target)
LEFT JOIN trans_target t2 ON  CONCAT('item_', item) = CONCAT(t2.field, '_', t2.target)
LEFT JOIN trans_source t3 ON  CONCAT('person_', person) = CONCAT(t3.field, '_', t3.source)
LEFT JOIN trans_source t4 ON  CONCAT('item_', item) = CONCAT(t4.field, '_', t4.source)
""").show()

The result

+-----+-------+---+-----+-------+
|count|  group| id| item| person|
+-----+-------+---+-----+-------+
| 50.0|     hr|  2|light| taylor|
|550.0|  legal|  4|chair|willian|
|400.0|finance|  3|table|   troy|
|  0.0|finance|  1|  fan|  scott|
+-----+-------+---+-----+-------+

For simplicity I did it only for 'item' and 'person', but you can easily extend it for 'id' and 'group'.

Also if you want to apply only the first rule you have to use window function which I also omitted for simplicity.

The answer is tested in spark Scala shell, but the main code is in plain SQL and it should work in pyspark.

Oleg Pavliv
  • 20,462
  • 7
  • 59
  • 75