
Suppose a DataFrame with two columns, C1 and C2:

+---+-----+
|C1 | C2  |
+---+-----+
|A  |  B  |
|C  |  D  |
|A  |  E  |
|E  |  F  |
+---+-----+

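My goal is to collect the intersecting values into arrays, where two values belong to the same array if they are linked directly or through a chain of rows: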

+--------------+
| intersections|
+--------------+
|[A, B, E, F]  |
|[C, D]        |
+--------------+

How can this be done efficiently when the DataFrame has a large number of rows (~1 billion)?
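To pin down the expected result, here is a minimal in-memory sketch in plain Scala, assuming the rows fit in memory as a `Seq[(String, String)]`. A union-find (disjoint-set) structure merges the two values of every row, and values that end up with the same root form one intersection group. The object name `Intersections` is hypothetical, and this does not address the billion-row case; it only defines what the output should be.

```scala
import scala.collection.mutable

// Hypothetical helper: groups values linked directly or transitively by
// (C1, C2) pairs using union-find. In-memory only, for small inputs.
object Intersections {

  def group(pairs: Seq[(String, String)]): Seq[Seq[String]] = {
    // parent pointers; a value that is its own parent is a root
    val parent = mutable.Map.empty[String, String]

    // find the root of x, compressing the path along the way
    def find(x: String): String = {
      val p = parent.getOrElseUpdate(x, x)
      if (p == x) x
      else {
        val root = find(p)
        parent(x) = root
        root
      }
    }

    // merge the sets that contain a and b
    def union(a: String, b: String): Unit = {
      val ra = find(a)
      val rb = find(b)
      if (ra != rb) parent(ra) = rb
    }

    pairs.foreach { case (a, b) => union(a, b) }

    // bucket every value by its root; sort for a deterministic result
    parent.keys.toList
      .groupBy(find)
      .values
      .map(_.sorted)
      .toList
      .sortBy(_.head)
  }
}
```

For the sample rows, `Intersections.group(Seq("A" -> "B", "C" -> "D", "A" -> "E", "E" -> "F"))` returns `List(List(A, B, E, F), List(C, D))`, matching the table above.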

Code_VM
  • This problem is really best solved with a network graph approach. Load the data into a graph where the distinct values from both columns are the nodes and the pairs between columns are the edges. First test whether the graph is connected: if it is, every value intersects with every other value and you don't have to go any further. If it is not, calculate the clusters (connected components); the nodes in each cluster are your intersections. – Ranvir Mohanlal Sep 16 '21 at 20:06
  • Please check [this question](https://stackoverflow.com/q/62450917/2129801). You could use a similar approach. – werner Sep 16 '21 at 20:08
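The graph approach from the first comment can be sketched in plain Scala for small inputs: build an undirected adjacency map from the pairs, run a breadth-first search from each unvisited value to collect one component at a time, and treat the graph as connected (every value intersects every other) when there is exactly one component. `GraphIntersections` and its methods are hypothetical names; at ~1 billion rows the same idea has to run on a distributed engine.

```scala
import scala.collection.mutable

// Hypothetical sketch of the comment's approach: values are nodes,
// each (C1, C2) pair is an undirected edge. In-memory only.
object GraphIntersections {

  // undirected adjacency map built from the pairs
  def adjacency(pairs: Seq[(String, String)]): Map[String, Set[String]] =
    pairs
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (node, links) => node -> links.map(_._2).toSet }

  // each BFS started from an unvisited node collects one connected component
  def components(pairs: Seq[(String, String)]): List[List[String]] = {
    val adj = adjacency(pairs)
    val visited = mutable.Set.empty[String]
    val result = mutable.ListBuffer.empty[List[String]]
    for (start <- adj.keys.toList.sorted if !visited(start)) {
      val queue = mutable.Queue(start)
      val members = mutable.ListBuffer.empty[String]
      visited += start
      while (queue.nonEmpty) {
        val node = queue.dequeue()
        members += node
        for (next <- adj(node) if !visited(next)) {
          visited += next
          queue.enqueue(next)
        }
      }
      result += members.toList.sorted
    }
    result.toList
  }

  // connected graph = a single component = one big intersection
  def isConnected(pairs: Seq[(String, String)]): Boolean =
    components(pairs).size == 1
}
```

Checking `isConnected` first, as the comment suggests, lets you skip the grouping step when everything collapses into one array; in this small sketch both come from the same traversal anyway.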

1 Answer


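The solution is the GraphFrames library (https://graphframes.github.io/graphframes/docs/_site/index.html): treat the distinct values as vertices and the rows as edges, then compute the connected components.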

DISCLAIMER: tested with Spark 2.4.4 and GraphFrames 0.7.0.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, sort_array}

import org.graphframes.GraphFrame

// Spark recommends a main() method rather than extending scala.App
object SparkApp {

  def main(args: Array[String]): Unit = {

    val appName = "appName"
    val master = "local[*]"

    val spark = SparkSession
      .builder
      .appName(appName)
      .master(master)
      .getOrCreate

    import spark.implicits._

    val dataTest: DataFrame =
      Seq(
        ("A", "B"),
        ("C", "D"),
        ("A", "E"),
        ("E", "F")
      ).toDF("C1", "C2")

    // connectedComponents (default algorithm) checkpoints intermediate
    // results, so a checkpoint directory is mandatory
    spark.sparkContext.setCheckpointDir("/some/path/hdfs/test_checkpoints")

    // vertices: every distinct value from either column, in a column named "id"
    val vertices = dataTest.select(col("C1").as("id"))
      .union(dataTest.select(col("C2").as("id")))
      .distinct

    // edges: each row links its C1 value to its C2 value
    val edges = dataTest
      .withColumnRenamed("C1", "src")
      .withColumnRenamed("C2", "dst")

    val graphTest: GraphFrame = GraphFrame(vertices, edges)

    // adds a "component" column: vertices in the same component share its value
    val graphComponentsTest = graphTest.connectedComponents.run()

    // one array per component; sort_array makes the element order deterministic,
    // and only the "intersections" column is kept
    val clustersResultTestDF =
      graphComponentsTest
        .groupBy("component")
        .agg(sort_array(collect_list("id")).as("intersections"))
        .select("intersections")

    clustersResultTestDF.show()

    spark.stop()
  }
}

The output is:

+--------------+
| intersections|
+--------------+
|[A, B, E, F]  |
|[C, D]        |
+--------------+
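For intuition about what `connectedComponents.run()` computes, here is a single-machine sketch of min-label propagation: each value starts labelled with itself, and in every round it adopts the smallest label among itself and its neighbours until no label changes. Values that share a final label form one component, which is what the `groupBy("component")` step relies on. This roughly mirrors the Pregel-based GraphX implementation (selectable with `setAlgorithm("graphx")`); GraphFrames' default algorithm is a different one aimed at large graphs, so treat this only as an illustration. The object name `MinLabelPropagation` is hypothetical.

```scala
// Hypothetical, in-memory illustration of min-label propagation.
object MinLabelPropagation {

  def labels(pairs: Seq[(String, String)]): Map[String, String] = {
    // undirected edges, stored in both directions
    val edges = pairs.flatMap { case (a, b) => Seq(a -> b, b -> a) }
    // every value starts labelled with itself
    var current: Map[String, String] = edges.map { case (a, _) => a -> a }.toMap
    var changed = true
    while (changed) {
      // smallest label sent to each vertex by its neighbours
      val offered: Map[String, String] =
        edges
          .map { case (src, dst) => dst -> current(src) }
          .groupBy(_._1)
          .map { case (v, msgs) => v -> msgs.map(_._2).min }
      // keep the smaller of the current label and the best offer
      val next = current.map { case (v, l) =>
        v -> Seq(l, offered.getOrElse(v, l)).min
      }
      changed = next != current
      current = next
    }
    current
  }

  // group values by final label, like groupBy("component")
  def components(pairs: Seq[(String, String)]): List[List[String]] =
    labels(pairs)
      .groupBy(_._2)
      .values
      .map(_.keys.toList.sorted)
      .toList
      .sortBy(_.head)
}
```

The number of rounds grows with the longest chain of linked values (the graph's diameter), since a label travels one hop per round.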
Code_VM