So let's say that I have a DataFrame that is in an event-based order: every time something happens, I get a new event saying that someone changed location or job. Here is what an example input could look like:

+--------+----+----------------+---------------+
|event_id|name|             job|       location|
+--------+----+----------------+---------------+
|      10| Bob|         Manager|               |
|       9| Joe|                |             HQ|
|       8| Tim|                |New York Office|
|       7| Joe|                |New York Office|
|       6| Joe| Head Programmer|               |
|       5| Bob|                |      LA Office|
|       4| Tim|         Manager|             HQ|
|       3| Bob|                |New York Office|
|       2| Bob|DB Administrator|             HQ|
|       1| Joe|      Programmer|             HQ|
+--------+----+----------------+---------------+

In this example, 10 is the newest event and 1 is the oldest. Now I want to get the newest information about each person, i.e. the most recent non-empty value of each field. Here is what I would want the output to be:

+----+---------------+---------------+
|name|            job|       location|
+----+---------------+---------------+
| Bob|        Manager|      LA Office|
| Joe|Head Programmer|             HQ|
| Tim|        Manager|New York Office|
+----+---------------+---------------+

The current way that I do this reorganization is by collecting the data and then looping through the events, from newest to oldest, to find the information about each person. The issue with this approach is that it is extremely slow for large DataFrames, and eventually the data won't all fit in the memory of one machine. What is the proper way to do this with Spark?
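For concreteness, the collect-and-loop approach described above can be sketched in plain Python (the `events` list stands in for the rows a `collect()` would return; this is the slow single-machine baseline being asked about, not a Spark solution):

```python
# Rows copied from the example input: (event_id, name, job, location).
events = [
    (10, "Bob", "Manager", ""),
    (9, "Joe", "", "HQ"),
    (8, "Tim", "", "New York Office"),
    (7, "Joe", "", "New York Office"),
    (6, "Joe", "Head Programmer", ""),
    (5, "Bob", "", "LA Office"),
    (4, "Tim", "Manager", "HQ"),
    (3, "Bob", "", "New York Office"),
    (2, "Bob", "DB Administrator", "HQ"),
    (1, "Joe", "Programmer", "HQ"),
]

latest = {}  # name -> {"job": ..., "location": ...}
# Walk from newest to oldest; keep the first non-empty value seen per field.
for _, name, job, location in sorted(events, reverse=True):
    rec = latest.setdefault(name, {"job": "", "location": ""})
    if job and not rec["job"]:
        rec["job"] = job
    if location and not rec["location"]:
        rec["location"] = location

print(latest)
```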

zero323
Bren077s

1 Answer

According to your question, I think this is what you want:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.max

    val spark =
      SparkSession.builder().master("local").appName("test").getOrCreate()

    import spark.implicits._

    val data = spark.sparkContext.parallelize(
      Seq(
        (10, "Bob", "Manager", ""),
        (9, "Joe", "", "HQ"),
        (8, "Tim", "", "New York Office"),
        (7, "Joe", "", "New York Office"),
        (6, "Joe", "Head Programmer", ""),
        (5, "Bob", "", "LA Office"),
        (4, "Tim", "Manager", "HQ"),
        (3, "Bob", "", "New York Office"),
        (2, "Bob", "DB Administrator", "HQ"),
        (1, "Joe", "Programmer", "HQ")
      )).toDF("event_id", "name", "job", "location")

    // Keep only the newest event_id per person, then join back to recover that row.
    val latest = data.groupBy("name").agg(max(data("event_id")).alias("event_id"))

    latest.join(data, "event_id").drop("event_id").show()

This is Scala code; hopefully you can convert it to Python.

koiralo
  • That does not solve the issue. That will get me the newest reference, but it does not get all of the updated fields. I want the newest job and location for each person, not the newest record. – Bren077s May 22 '17 at 15:31
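As the comment points out, joining back on the single newest `event_id` recovers only one row per person, not the newest non-empty value of each column. One way to fix that is to reduce each column independently: among the events where that column is non-empty, keep the value carried by the highest `event_id`. This reduction is associative, so it distributes well; in Spark it could be expressed, for example, with `last(col, ignoreNulls = true)` over a window partitioned by name and ordered by `event_id` (after converting empty strings to nulls), or as a `max` over a struct of `(event_id, value)` per name. A minimal plain-Python sketch of the per-column logic, using the example data from the question:

```python
# Per-column reduction: for each person and column, keep the value carried by
# the highest event_id among the events that actually set that column.
# Rows copied from the question: (event_id, name, job, location).
events = [
    (10, "Bob", "Manager", ""),
    (9, "Joe", "", "HQ"),
    (8, "Tim", "", "New York Office"),
    (7, "Joe", "", "New York Office"),
    (6, "Joe", "Head Programmer", ""),
    (5, "Bob", "", "LA Office"),
    (4, "Tim", "Manager", "HQ"),
    (3, "Bob", "", "New York Office"),
    (2, "Bob", "DB Administrator", "HQ"),
    (1, "Joe", "Programmer", "HQ"),
]

def newest_value(pairs):
    """Return the value paired with the largest event_id, ignoring empties."""
    non_empty = [(eid, value) for eid, value in pairs if value]
    return max(non_empty)[1] if non_empty else ""

names = sorted({name for _, name, _, _ in events})
result = {
    name: (
        newest_value((eid, job) for eid, n, job, _ in events if n == name),
        newest_value((eid, loc) for eid, n, _, loc in events if n == name),
    )
    for name in names
}
print(result)
```

Because each column is reduced on its own, Bob correctly gets the job from event 10 and the location from event 5, even though they come from different rows.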