OK, here you go.
val jdata = """[
{"id": 1, "group": "finance", "person": "scott", "item": "fan", "count": 100},
{"id": 2, "group": "hr", "person": "taylor", "item": "light", "count": 200},
{"id": 3, "group": "finance", "person": "troy", "item": "table", "count": 300},
{"id": 4, "group": "legal", "person": "willian", "item": "chair", "count": 400}
]"""
val data = spark.read.json(Seq(jdata).toDS)   // toDS needs `import spark.implicits._` outside the shell
data.createOrReplaceTempView("data")
val jtrans = """[
{"field": "person", "source": "scott", "targets": "taylor,willian"},
{"field": "item", "source": "light", "targets": "table,chair"},
{"field": "group", "source": "hr", "targets": "legal"}
]"""
val trans = spark.read.json(Seq(jtrans).toDS)
trans.createOrReplaceTempView("trans")
Let's check
scala> spark.sql("SELECT * FROM data").show
+-----+-------+---+-----+-------+
|count| group| id| item| person|
+-----+-------+---+-----+-------+
| 100|finance| 1| fan| scott|
| 200| hr| 2|light| taylor|
| 300|finance| 3|table| troy|
| 400| legal| 4|chair|willian|
+-----+-------+---+-----+-------+
scala> spark.sql("SELECT * FROM trans").show
+------+------+--------------+
| field|source| targets|
+------+------+--------------+
|person| scott|taylor,willian|
| item| light| table,chair|
| group| hr| legal|
+------+------+--------------+
And the SQL. The idea: whenever a rule matches a row, that row gives its whole count away from the source value, and every target listed in the rule receives an equal share of it.
spark.sql("""
WITH
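-- prefix each value with its field name (e.g. 'person_scott') so it can be matched against trans rows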
datac AS (
SELECT CONCAT('item_', item) itemc, CONCAT('person_', person) personc, count
FROM data
),
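-- attach the rule (if any) whose field/source matches the row's person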
trans_person AS (
SELECT count, personc, itemc, source as source_person, targets as targets_person
FROM datac LEFT JOIN trans ON CONCAT(field, '_', source) = personc
),
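-- attach the rule (if any) whose field/source matches the row's item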
trans_item AS (
SELECT count, personc, itemc, source_person, targets_person, source as source_item, targets as targets_item
FROM trans_person LEFT JOIN trans t2 ON CONCAT(t2.field, '_', t2.source) = itemc
),
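-- collapse the two possible matches into a single field/source/targets (at most one is non-null per row here)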
trans_concat AS (
SELECT CASE WHEN source_person IS NOT NULL THEN 'person' WHEN source_item IS NOT NULL THEN 'item' END AS field,
CONCAT(COALESCE(source_person, ''), COALESCE(source_item, '')) as source,
CONCAT(COALESCE(targets_person, ''), COALESCE(targets_item, '')) AS targets,
count
FROM trans_item
),
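-- amount each matched source row gives away (its whole count)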
trans_source AS (
SELECT field, source, count as cnt
FROM trans_concat
WHERE field IS NOT NULL
),
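-- one row per target value, each receiving an equal share of the source row's count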
trans_target AS (
SELECT field, EXPLODE(SPLIT(targets, ',')) as target, count / SIZE(SPLIT(targets, ',')) as cnt
FROM trans_concat
WHERE field IS NOT NULL
)
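-- add what each row receives as a target, subtract what it gives away as a source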
SELECT count + COALESCE(t1.cnt, 0) + COALESCE(t2.cnt, 0) - COALESCE(t3.cnt, 0) - COALESCE(t4.cnt, 0) AS count,
group, id, item, person
FROM data
LEFT JOIN trans_target t1 ON CONCAT('person_', person) = CONCAT(t1.field, '_', t1.target)
LEFT JOIN trans_target t2 ON CONCAT('item_', item) = CONCAT(t2.field, '_', t2.target)
LEFT JOIN trans_source t3 ON CONCAT('person_', person) = CONCAT(t3.field, '_', t3.source)
LEFT JOIN trans_source t4 ON CONCAT('item_', item) = CONCAT(t4.field, '_', t4.source)
""").show()
The result
+-----+-------+---+-----+-------+
|count| group| id| item| person|
+-----+-------+---+-----+-------+
| 50.0| hr| 2|light| taylor|
|550.0| legal| 4|chair|willian|
|400.0|finance| 3|table| troy|
| 0.0|finance| 1| fan| scott|
+-----+-------+---+-----+-------+
For simplicity I did it only for 'item' and 'person', but you can extend it to 'id' and 'group' with the same pattern.
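For instance, here is a rough, untested sketch of the same give/receive idea applied to the 'group' field on its own (the matched/given/received names are purely illustrative):
spark.sql("""
WITH
matched AS (
-- data rows with their matching 'group' rule, if any
SELECT d.*, t.source, t.targets
FROM data d LEFT JOIN trans t ON t.field = 'group' AND t.source = d.group
),
given AS (
-- amount each source group gives away (its whole count)
SELECT group AS grp, count AS cnt FROM matched WHERE source IS NOT NULL
),
received AS (
-- equal share for every target group
SELECT EXPLODE(SPLIT(targets, ',')) AS grp, count / SIZE(SPLIT(targets, ',')) AS cnt
FROM matched WHERE source IS NOT NULL
)
SELECT d.count + COALESCE(r.cnt, 0) - COALESCE(g.cnt, 0) AS count,
d.group, d.id, d.item, d.person
FROM data d
LEFT JOIN received r ON d.group = r.grp
LEFT JOIN given g ON d.group = g.grp
""").show()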
Also, if you want to apply only the first matching rule per row, you need a window function, which I likewise omitted for simplicity.
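A rough, untested sketch of that idea (the matches/ranked names and the ORDER BY priority are placeholders): collect every rule that matches a row, rank the matches with ROW_NUMBER, and keep only the first; the surviving rules would then drive the source/target CTEs instead of trans_concat.
spark.sql("""
WITH matches AS (
-- every rule that matches a data row, by any of the handled fields
SELECT d.id, t.field, t.source, t.targets
FROM data d JOIN trans t
ON (t.field = 'person' AND t.source = d.person)
OR (t.field = 'item' AND t.source = d.item)
OR (t.field = 'group' AND t.source = d.group)
)
SELECT id, field, source, targets
FROM (
SELECT m.*, ROW_NUMBER() OVER (PARTITION BY m.id ORDER BY m.field) AS rn
FROM matches m
) ranked
WHERE rn = 1
""").show()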
The answer was tested in the Spark Scala shell, but the main code is plain SQL, so it should work in PySpark as well.