12

I would like to do some DBSCAN on Spark. I have currently found 2 implementations:

I have tested the first one with the sbt configuration given in its github but:

  • functions in the jar are not the same as those in the doc or in the source on github. For example, I cannot find the train function in the jar

  • I manage to run a test with the fit function (found in the jar) but a bad configuration of epsilon (a little to big) put the code in an infinite loop.

code :

val model = DBSCAN.fit(eps, minPoints, values, parallelism)

Has someone managed to do someting with the first library?

Has someone tested the second one?

Benjamin
  • 3,350
  • 4
  • 24
  • 49
  • 2
    Use ELKI with Cover tree. It's much faster, despite being single-node only. I tried one of the Spark versions and it went out of memory, but ELKI still worked fine and was fasr. – Has QUIT--Anony-Mousse Mar 18 '16 at 22:22
  • Another incomplete implementation of DBSCAN on Spark is https://github.com/mraad/dbscan-spark – Randall Whitman Mar 28 '18 at 18:59
  • Can you put your code for running the first package here? I test the one-line code, it does not work for me. I use intelliJ but the package cannot be automatically downloaded using sbt. So I manually downloaded the jar file from here: http://dl.bintray.com/irvingc/maven/com/irvingc/spark/dbscan_2.10/0.1.0/ – Yuhan Sun May 30 '18 at 01:19

4 Answers4

11

Please try ELKI. As this is Java, it should be easy to call from Scala.

ELKI is very well optimized, and with indexes it will scale to quite large data sets.

We tried to include one of these Spark implementations in our benchmarking study - but it ran out of memory (and it was the only implementation to run out of memory ... the k-means of Spark and Mahout were also among the slowest):

Hans-Peter Kriegel, Erich Schubert, and Arthur Zimek.
The (black) art of runtime evaluation: Are we comparing algorithms or implementations?
In: Knowledge and Information Systems (KAIS). 2016, 1–38

Professor Neukirchen benchmarked parallel implementations of DBSCAN in this technical report:

Helmut Neukirchen
Survey and Performance Evaluation of DBSCAN Spatial Clustering Implementations for Big Data and High-Performance Computing Paradigms

apparently he got some of the Spark implementations working, but noted that:

The result is devastating: none of the implementations for Apache Spark is anywhere near to the HPC implementations. In particular on bigger (but still rather small) data sets, most of them fail completely and do not even deliver correct results.

and earlier:

When running any of the "Spark DBSCAN" implementations while making use of all available cores of our cluster, we experienced out-of-memory exceptions.

(also, "Spark DBSCAN" took 2406 seconds on 928 cores, ELKI took 997 seconds on 1 core for the smaller benchmark - the other Spark implementation didn't fare too well either, in particular it did not return the correct result...)

"DBSCAN on Spark" did not crash, but returned completely wrong clusters.

While "DBSCAN on Spark" finishes faster, it delivered completely wrong clustering results. Due to the hopelessly long run-times of the DBSCAN implementations for Spark already with the maximum number of cores, we did not perform measurements with a lower number of cores.

You can wrap a double[][] array as ELKI database:

// Adapter to load data from an existing array.
DatabaseConnection dbc = new ArrayAdapterDatabaseConnection(data);
// Create a database (which may contain multiple relations!)
Database db = new StaticArrayDatabase(dbc, null);
// Load the data into the database (do NOT forget to initialize...)
db.initialize();

// Squared Euclidean is faster than Euclidean.
Clustering<Model> c = new DBSCAN<NumberVector>(
  SquaredEuclideanDistanceFunction.STATIC, eps*eps, minpts).run(db);

for(Cluster<KMeansModel> clu : c.getAllClusters()) {
  // Process clusters
}

See also: Java API example (in particular, how to map DBIDs back to row indexes). For better performance, pass an index factory (such as new CoverTree.Factory(...)) as second parameter to the StaticArrayDatabase constructor.

Erich Schubert
  • 8,575
  • 2
  • 26
  • 42
  • 1
    For this example, could you give how exactly the new CoverTree.Factory looks like? – Paul Z Wu Jan 16 '18 at 18:07
  • @PaulZWu You can find a full example with index in the regular ELKI tutorial sources: https://github.com/elki-project/elki/blob/master/addons/tutorial/src/main/java/tutorial/javaapi/GeoIndexing.java – Erich Schubert Feb 15 '19 at 16:12
  • Thank you. I figured it out about a year ago. Your work is awesome -- we are using this package for an interesting project and its performance is impressive (with the indexing). – Paul Z Wu Feb 18 '19 at 05:46
  • I think this example is not valid anymore. The run method requires a `Relation` not a `Database` object... – Niko Feb 02 '23 at 14:29
4

I successfully use the second library (https://github.com/alitouka/spark_dbscan) in my project.Actually,I can't use it as follows:

libraryDependencies += "org.alitouka" % "spark_dbscan_2.10" % "0.0.4"

resolvers += "Aliaksei Litouka's repository" at "http://alitouka-public.s3-website-us-east-1.amazonaws.com/"

Instead, I download the code and update it to spark 2.2.1 version.Besides,some of the libraries should be added.Finally, add the code to my project, it works!

Yilan Zhang
  • 49
  • 1
  • 3
  • Some libraries should update,for example, change "import org.apache.spark.Logging" to "import org.apache.spark.internal.Logging" – Yilan Zhang Mar 15 '18 at 13:31
2

I tested https://github.com/irvingc/dbscan-on-spark and can say that it consumes a lot of memory. For 400K dataset with smooth distribution i used -Xmx12084m and even in this case it works too long (>20 min). In addition, it is only fo 2D. I used project with maven, not sbt.

I tested also second implementation. This is still the best that I found. Unfortunately, the author does not support it since 2015. It really took some time to raise the version of the Spark and resolve the version conflicts. I needed it to deploy on aws.

Valeriy K.
  • 2,616
  • 1
  • 30
  • 53
1

You can also consider using smile which provides an implementation of DBSCAN. You would have to use groupBy combined with either mapGroups or flatMapGroups in the most direct way and you would run dbscan there. Here's an example:

  import smile.clustering._

  val dataset: Array[Array[Double]] = Array(
    Array(100, 100),
    Array(101, 100),
    Array(100, 101),
    Array(100, 100),
    Array(101, 100),
    Array(100, 101),

    Array(0, 0),
    Array(1, 0),
    Array(1, 2),
    Array(1, 1)
  )

  val dbscanResult = dbscan(dataset, minPts = 3, radius = 5)
  println(dbscanResult)

  // output
  DBSCAN clusters of 10 data points:
  0     6 (60.0%)
  1     4 (40.0%)
  Noise     0 ( 0.0%)

You can also write a User Defined Aggregate Function (UDAF) if you need to eek out more performance.

I use this approach at work to do clustering of time-series data, so grouping using Spark's time window function and then being able to execute DBSCAN within each window allows us to parallelize the implementation.

I was inspired by the following article to do this

Cal
  • 734
  • 1
  • 9
  • 25