
I have a table with 3 columns: a date, an id, and a key. I'm hoping to find an efficient way to sum the pair-wise instances of keys within an id, then combine with totals across other ids. Basically building a list of temporal transitions. For example:

Input:

╔══════════╦════╦═════╗
║   Date   ║ ID ║ Key ║
╠══════════╬════╬═════╣
║ 1/1/2018 ║ A  ║ XY  ║
║ 1/2/2018 ║ A  ║ GT  ║
║ 1/6/2018 ║ A  ║ WE  ║
║ 1/9/2018 ║ A  ║ PO  ║
║ 1/2/2018 ║ B  ║ XY  ║
║ 1/4/2018 ║ B  ║ GT  ║
╚══════════╩════╩═════╝

Output:

╔══════════╦═══════════╦═══════╗
║ FirstKey ║ SecondKey ║ Count ║
╠══════════╬═══════════╬═══════╣
║    XY    ║    GT     ║   2   ║
║    GT    ║    WE     ║   1   ║
║    WE    ║    PO     ║   1   ║
╚══════════╩═══════════╩═══════╝

It would be trivially simple to order by ID, then Date, and then loop through building the counts, but I was hoping someone could help me structure it to be more parallelized/efficient.

Basically, since the rows are ordered by date, I am trying to capture the number of transitions in time between keys. So for ID=A, we have XY, then we have GT (so increment XY->GT by 1). Then we have GT followed by WE (so increment GT->WE by 1).
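
For reference, the "order, then loop" baseline described above can be sketched in plain Python. This is only a single-machine sketch for checking results against the sample data, not a parallel solution; the names `rows` and `count_transitions` are made up for this example, and the dates are assumed to be in M/D/YYYY form as in the table.

```python
from collections import Counter
from datetime import datetime
from itertools import groupby

# Sample rows from the question: (Date, ID, Key)
rows = [
    ("1/1/2018", "A", "XY"),
    ("1/2/2018", "A", "GT"),
    ("1/6/2018", "A", "WE"),
    ("1/9/2018", "A", "PO"),
    ("1/2/2018", "B", "XY"),
    ("1/4/2018", "B", "GT"),
]

def count_transitions(rows):
    """Count consecutive (FirstKey, SecondKey) pairs within each ID, ordered by date."""
    # Parse the dates so that sorting is chronological rather than lexicographic.
    parsed = sorted(
        (id_, datetime.strptime(date, "%m/%d/%Y"), key) for date, id_, key in rows
    )
    counts = Counter()
    # groupby works here because the rows are already sorted by ID.
    for _, group in groupby(parsed, key=lambda r: r[0]):
        keys = [key for _, _, key in group]
        # Pair each key with the one that follows it inside the same ID.
        counts.update(zip(keys, keys[1:]))
    return counts

transitions = count_transitions(rows)
for (first_key, second_key), n in transitions.items():
    print(first_key, second_key, n)
```

On the sample input this gives XY->GT twice and GT->WE and WE->PO once each, which matches the desired output table.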

I'm working in Spark with Scala/Python.

Fincher
  • why do you only need 3 of the 6 possible combinations for id=A? – Vamsi Prabhala Mar 13 '19 at 18:17
  • @VamsiPrabhala I'm not sure I understand? I probably could have phrased it better. Basically, since the rows are ordered by date, I am trying to capture the number of transitions in time between keys. So for ID=A, we have XY, then we have GT (so increment XY->GT by 1). Then we have GT followed by WE (so increment GT->WE by 1). Does that clarify? – Fincher Mar 13 '19 at 18:34

3 Answers


Here's a solution in Scala using lag(Key, 1) to pair up previous/current keys for the key-pair count:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = Seq(
  ("1/1/2018", "A", "XY"),
  ("1/2/2018", "A", "GT"),
  ("1/6/2018", "A", "WE"),
  ("1/9/2018", "A", "PO"),
  ("1/2/2018", "B", "XY"),
  ("1/4/2018", "B", "GT")
).toDF("Date", "ID", "Key")

val win = Window.partitionBy("ID").orderBy("Date", "Key")

df.
  withColumn("Date", to_date($"Date", "M/d/yyyy")).
  withColumn("FirstKey", lag($"Key", 1).over(win)).
  groupBy($"FirstKey", $"Key".as("SecondKey")).agg(count("*").as("Count")).
  where($"FirstKey".isNotNull).
  show
// +--------+---------+-----+
// |FirstKey|SecondKey|Count|
// +--------+---------+-----+
// |      WE|       PO|    1|
// |      GT|       WE|    1|
// |      XY|       GT|    2|
// +--------+---------+-----+

Note that the to_date conversion ensures proper chronological ordering; left as strings, dates would sort lexicographically (for example, 1/10/2018 would come before 1/2/2018).

Leo C

Here is a potential solution in pandas (single machine, not Spark) requiring just 3 lines:

import pandas as pd

df = pd.DataFrame({'Date': ['1/1/2018', '1/2/2018', '1/6/2018', '1/9/2018', '1/2/2018', '1/4/2018'], 'ID': ['A', 'A', 'A', 'A', 'B', 'B'], 'Key': ['XY', 'GT', 'WE', 'PO', 'XY', 'GT']})
print(df)


       Date ID Key
0  1/1/2018  A  XY
1  1/2/2018  A  GT
2  1/6/2018  A  WE
3  1/9/2018  A  PO
4  1/2/2018  B  XY
5  1/4/2018  B  GT
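# Shift within each ID so that a pair never spans two IDs (rows are assumed to be sorted by ID, then Date)
df['key_lag'] = df.groupby('ID').Key.shift(-1)
df['key_pairs'] = df.Key + ' ' + df.key_lag
print(df.groupby('key_pairs').size())


key_pairs
GT WE    1
WE PO    1
XY GT    2
dtype: int64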
Nathaniel
  • That loop appears to be exactly what I was thinking of as the "straightforward" solution. Unfortunately, I think it would not really be parallel in any meaningful sense, no? – Fincher Mar 13 '19 at 18:36
  • You're right. However, I just thought of a way to do it. I'll edit my answer momentarily. – Nathaniel Mar 13 '19 at 18:50

You can add a new column that shows the next Key for each ID ordered by Date using pyspark.sql.functions.lead. Then group by the FirstKey and SecondKey and count:

from pyspark.sql import Window
from pyspark.sql.functions import col, lead

df.withColumn("SecondKey", lead("Key").over(Window.partitionBy("ID").orderBy("Date")))\
    .where(col("SecondKey").isNotNull())\
    .groupBy(col("Key").alias("FirstKey"), "SecondKey")\
    .count()\
    .show()
#+--------+---------+-----+
#|FirstKey|SecondKey|count|
#+--------+---------+-----+
#|      WE|       PO|    1|
#|      GT|       WE|    1|
#|      XY|       GT|    2|
#+--------+---------+-----+

This assumes that the Date column is a DateType so that it can be ordered appropriately. If it is a string, you will have to convert it to a date first; otherwise the rows will be ordered lexicographically rather than chronologically.
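
To see why the conversion matters, here is a small plain-Python illustration (not Spark). The three sample dates are made up for this example and are not from the question's data; they are chosen so that string order and date order disagree.

```python
from datetime import datetime

# Hypothetical M/D/YYYY date strings, chosen so the two orderings differ.
dates = ["1/2/2018", "1/10/2018", "12/1/2017"]

# Sorting the raw strings compares them character by character.
as_strings = sorted(dates)

# Sorting on the parsed value gives the chronological order.
as_dates = sorted(dates, key=lambda d: datetime.strptime(d, "%m/%d/%Y"))

print(as_strings)  # ['1/10/2018', '1/2/2018', '12/1/2017']
print(as_dates)    # ['12/1/2017', '1/2/2018', '1/10/2018']
```

In PySpark, the equivalent step would be to convert the column before applying the window, for example with pyspark.sql.functions.to_date and the "M/d/yyyy" pattern used in the Scala answer.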

pault