The solution is to use the GraphFrames library (https://graphframes.github.io/graphframes/docs/_site/index.html).
DISCLAIMER: this was tested with Spark 2.4.4 and GraphFrames 0.7.0.
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.storage.StorageLevel
import scala.collection._
import org.graphframes.GraphFrame
/**
 * Example application: groups values that are linked, directly or transitively,
 * through the pairs in a two-column DataFrame by computing the connected
 * components of the corresponding graph with GraphFrames.
 *
 * An explicit `main` is used instead of `extends App`: the Spark documentation
 * advises against subclassing `scala.App`, whose delayed initialization may not
 * work correctly in Spark applications.
 */
object SparkApp {
  val appName: String = "appName"
  val master: String = "local[*]"

  /**
   * Builds the sample graph, runs connected components and prints one row per
   * component containing the sorted list of its vertex ids.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName(appName)
      .master(master)
      .getOrCreate()
    import spark.implicits._

    try {
      // Each row is one link between two values.
      val dataTest =
        Seq(
          ("A", "B"),
          ("C", "D"),
          ("A", "E"),
          ("E", "F")
        ).toDF("C1", "C2")

      // The connected-components algorithm used below checkpoints its
      // intermediate results, so a checkpoint directory must be set first.
      spark.sparkContext.setCheckpointDir("/some/path/hdfs/test_checkpoints")

      // Vertices: every distinct value appearing in either column, as "id".
      val vertices =
        dataTest.select(col("C1").as("id"))
          .union(dataTest.select(col("C2").as("id")))
          .distinct()

      // Edges: the original pairs, renamed to the "src"/"dst" columns GraphFrame expects.
      val edges =
        dataTest
          .withColumnRenamed("C1", "src")
          .withColumnRenamed("C2", "dst")

      val graphTest: GraphFrame = GraphFrame(vertices, edges)

      // Adds a "component" column identifying the connected component of each vertex.
      val graphComponentsTest = graphTest.connectedComponents.run()

      // collect_list gives no ordering guarantee, so sort each list to make the
      // printed result reproducible; keep only the list column for display.
      val clustersResultTestDF =
        graphComponentsTest
          .groupBy("component")
          .agg(sort_array(collect_list("id")) as "intersections")
          .select("intersections")

      clustersResultTestDF.show()
    } finally {
      // Release the session's resources even if the job fails.
      spark.stop()
    }
  }
}
Output (the `intersections` column; row order may vary):
+--------------+
| intersections|
+--------------+
|[A, B, E, F] |
|[C, D] |
+--------------+