3

For impatient readers: this is a work in progress, where I ask for help, during the process. Please do not judge the tools by my temporary data, as they can change while I try to get better results.

We are in the middle of the decision process on the architecture for a tool to analyse the output from co-simulations.

As part of that process I was asked to write a benchmark tool, and get data on the speeds of several distributed processing frameworks.

The frameworks I tested are: Apache Spark, Apache Flink, Hazelcast Jet. And as a comparison baseline plain Java.

The test case I used was a simple "here is a list of Pojos, with one field in the pojo a double value. Find the smallest(min) value".

Simple, straightforward and hopefully highly comparable.

Three out of four tests use a simple comparator, the fourth (flink) uses a reducer that is basically identical to the comparators. The analysing functions look like this:

Java: double min = logs.stream().min(new LogPojo.Comp()).get().getValue();

Spark: JavaRDD<LogPojo> logData = sc.parallelize(logs, num_partitions);
double min = logData.min(new LogPojo.Comp()).getValue();

Hazel: IStreamList<LogPojo> iLogs = jet.getList("logs");
iLogs.addAll(logs);
double min = iLogs.stream().min(new LogPojo.Comp()).get().getValue();

Flink: DataSet<LogPojo> logSet = env.fromCollection(logs);
double min = logSet.reduce(new LogReducer()).collect().get(0).getValue();

I tested this extensively, varying the size of the test list as well as the allocated ressources. And the results blew my mind. The BEST results can be seen below (all numbers in ms, 1 mio pojos, 10 tests each):

  • instances: how long it took to to declare and initiate the instance of the frameworks
  • list: how long it took to parse/transfer the List to the frameworks "list"
  • process: how long it took to process the data to retrieve the min
  • overall: from start to end of each test

Outcome:

java:
Instances: 
List: 
Process: 37, 24, 16, 17, 16, 16, 16, 16, 16, 16, 
Overall: 111, 24, 16, 17, 16, 16, 16, 16, 16, 16, 

spark:
Instances: 2065, 89, 62, 69, 58, 49, 56, 47, 41, 52, 
List: 166, 5, 1, 1, 2, 1, 0, 0, 0, 0, 
Process: 2668, 2768, 1936, 2016, 1950, 1936, 2105, 2674, 1913, 1882, 
Overall: 4943, 2871, 2011, 2094, 2020, 1998, 2172, 2728, 1961, 1943, 

hazel:
Instances: 6347, 2891, 2817, 3106, 2636, 2936, 3018, 2969, 2622, 2799, 
List: 1984, 1656, 1470, 1505, 1524, 1429, 1512, 1445, 1394, 1427, 
Process: 4348, 3809, 3655, 3751, 3927, 3887, 3592, 3810, 3673, 3769, 
Overall: 12850, 8373, 7959, 8384, 8110, 8265, 8133, 8239, 7701, 8007

flink:
Instances: 45, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
List: 92, 35, 16, 13, 17, 15, 19, 11, 19, 24, 
Process: 34292, 20822, 20870, 19268, 17780, 17390, 17124, 19628, 17487, 18586, 
Overall: 34435, 20857, 20886, 19281, 17797, 17405, 17143, 19639, 17506, 18610, 

The most interesting parts are:

  • the best results ALL come from purely local tests (one instance)
  • any tests that made use of distributed mechanics (additional nodes and such), were up to an order of magnitude slower still (spark for example 2.5 slower if distributed).

Now don't get me wrong, it's basic logic that distributed processing has to be slower per core than mono-threaded processing.

But 2 orders of magnitude EVEN if used on a mono-thread? And 3 orders of magnitude if distributed? Can someone see the mistake I apparently made in all 3 distributed processes? I expected some factor < 10, so killing it with more hardware would be an option.

So is there some way to reduce the overhead of those frameworks to, hmm maybe x9 instead of x999?

I know I know, the test data I use is much to small, but even if scaling it up, I haven't seen any reduction in overhead vs. performance. And it's roughly the size of the batches of data we need to analyse (0.1M - 1M objects/s per simulation). So your help to find my error is welcome. :D

UPDATE Spark:

After some more thorough testing on Spark, I'm still not impressed. The setup was as follows:

java client on one machine in a 64 core, 480 GB RAM job master and 7 slaves on a separate rack, 32 cors, 20 GB each

    1 mio objects, 256 tasks, 64 cpus local[*]
    java:
      Instances: 
      List: 
      Process: 622, 448, 68, 45, 22, 32, 15, 27, 22, 29, 
    spark:
      Instances: 4865, 186, 160, 133, 121, 112, 106, 78, 121, 106, 
      List: 310, 2, 2, 1, 2, 4, 2, 1, 2, 1, 
      Process: 8190, 4433, 4200, 4073, 4201, 4092, 3822, 3852, 3921, 4051, 

    10 mio objects, 256 tasks, 64 cpus local[*]
    java:
      Instances: 
      List: 
      Process: 2329, 144, 50, 65, 75, 70, 69, 66, 66, 66, 
    spark:
      Instances: 20345, 
      List: 258, 2, 1, 1, 1, 4, 1, 1, 1, 1, 
      Process: 55671, 49629, 48612, 48090, 47897, 47857, 48319, 48274, 48199, 47516

    1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 1+1 Spark machines (different rack)
    java:
      Instances: 
      List: 
      Process: 748, 376, 70, 31, 69, 64, 46, 17, 50, 53, 
    spark:
      Instances: 4631, 
      List: 249, 1, 2, 2, 3, 3, 1, 1, 2, 1, 
      Process: 12273, 7471, 6314, 6083, 6228, 6158, 5990, 5953, 5981, 5972

    1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
    java:
      Instances: 
      List: 
      Process: 820, 494, 66, 29, 5, 30, 29, 43, 45, 21, 
    spark:
      Instances: 4513, 
      List: 254, 2, 2, 2, 2, 4, 2, 2, 1, 1, 
      Process: 17007, 6545, 7174, 7040, 6356, 6502, 6482, 6348, 7067, 6335

    10 mio objects, 52k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
    java Process: 3037, 78, 48, 45, 53, 73, 72, 73, 74, 64, 
    spark:
      Instances: 20181, 
      List: 264, 3, 2, 2, 1, 4, 2, 2, 1, 1, 
      Process: 77830, 67563, 65389, 63321, 61416, 63007, 64760, 63341, 63440, 65320

    1 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i =0 to 100
    java Process: 722, 631, 62, 26, 25, 42, 26, 11, 12, 29, 40, 16, 14, 23, 29, 18, 14, 11, 71, 76, 37, 52, 32, 15, 51, 54, 19, 74, 62, 54, 7, 60, 37, 54, 42, 3, 7, 60, 33, 44, 50, 50, 39, 34, 34, 13, 47, 63, 46, 4, 52, 20, 19, 24, 6, 53, 4, 3, 68, 10, 59, 52, 48, 3, 48, 37, 5, 38, 10, 47, 4, 53, 36, 41, 31, 57, 7, 64, 45, 33, 14, 53, 5, 41, 40, 48, 4, 60, 49, 37, 20, 34, 53, 4, 58, 36, 12, 35, 35, 4, 
    spark:
      Instances: 4612, 
      List: 279, 3, 2, 1, 2, 5, 3, 1, 1, 1, 2, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 
      Process: 16300, 6577, 5802, 6136, 5389, 5912, 5885, 6157, 5440, 6199, 5902, 6299, 5919, 6066, 5803, 6612, 6120, 6775, 6585, 6146, 6860, 6955, 6661, 6819, 6868, 6700, 7140, 7532, 7077, 7180, 7360, 7526, 7770, 7877, 8048, 7678, 8260, 8131, 7837, 7526, 8261, 8404, 8431, 8340, 9000, 8825, 8624, 9340, 9418, 8677, 8480, 8678, 9003, 9036, 8912, 9235, 9401, 9577, 9808, 9485, 9955, 10029, 9506, 9387, 9794, 9998, 9580, 9963, 9273, 9411, 10113, 10004, 10369, 9880, 10532, 10815, 11039, 10717, 11251, 11475, 10854, 11468, 11530, 11488, 11077, 11245, 10936, 11274, 11233, 11409, 11527, 11897, 11743, 11786, 11086, 11782, 12001, 11795, 12075, 12422

    2 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 0 to 30
    java Process: 1759, 82, 31, 18, 30, 41, 47, 28, 27, 13, 28, 46, 5, 72, 50, 81, 66, 44, 36, 72, 44, 11, 65, 67, 58, 47, 54, 60, 46, 34, 
    spark:
      Instances: 6316, 
      List: 265, 3, 3, 2, 2, 6, 1, 2, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 1, 1, 5, 1, 1, 1, 1, 2, 1, 1, 1, 
      Process: 24084, 13041, 11451, 11274, 10919, 10972, 10677, 11048, 10659, 10984, 10820, 11057, 11355, 10874, 10896, 11725, 11580, 11149, 11823, 11799, 12414, 11265, 11617, 11762, 11561, 12443, 12448, 11809, 11928, 12095

    10 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 5 to 30
    java Process: 1753, 91, 57, 71, 86, 86, 151, 80, 85, 72, 61, 78, 80, 87, 93, 89, 70, 83, 166, 84, 87, 94, 90, 88, 92, 89, 196, 96, 97, 89, 
    spark:
      Instances: 21192, 
      List: 282, 3, 2, 2, 3, 4, 2, 2, 1, 0, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 2, 1, 1, 1, 
      Process: 60552, 53960, 53166, 54971, 52827, 54196, 51153, 52626, 54138, 51134, 52427, 53618, 50815, 50807, 52398, 54315, 54411, 51176, 53843, 54736, 55313, 56267, 50837, 54996, 52230, 52845

Results: no matter how much hardware was thrown on it, and how the tasks where clustered, it took 5-6 seconds per million pojos in the list using spark.

Java on the other hand dealed with the same amount taking 5-30 ms. So basically a factor of 200-1,000.

Does anyone have a suggestion how to "speed up" Spark for such a simple job?

UPDATE Hazel:

Now I'm starting to get impressed. While I'm still fighting with some weird problems, at least Hazelcast Jet seems to understand that local data can be processed locally if possible. With only 100% (factor x2) overhead, which is completely acceptable.

10 mio objects

java:
   Instances: 
   List: 68987, 
   Process: 2288, 99, 54, 52, 54, 64, 89, 83, 79, 88, 
hazel:
  Instances: 6136, 
  List: 97225, 
  Process: 1112, 375, 131, 123, 148, 131, 137, 119, 176, 140

UPDATE Flink:

Dropped it from the benchmarking for now, as it's causing too much trouble without giving great results.

EDIT: The whole benchmark can be found under: https://github.com/anderschbe/clusterbench

The cluster setup for spark uses spark-2.1.0-bin-hadoop2.7 as it comes out of the box. With one minor change in spark_env.sh : SPARK_NO_DAEMONIZE=true

the only change necessary to make it work on the cluster is replacing "localhost" in SparcProc line 25 with "spark://I_cant_give_you_my_cluster_IP.doo"

Anders Bernard
  • 541
  • 1
  • 6
  • 19
  • Is there a way you can share your code ? For example on a Github private repo. – Neil Stevenson Mar 13 '17 at 09:02
  • I'll attach the most important parts here. Honestly it's real just basic stuff, right out of the frameworks tuts/manuals. – Anders Bernard Mar 13 '17 at 10:10
  • @AndersBernard Added important edit in my answer - you're running `local`, which means only one worker thread – T. Gawęda Mar 13 '17 at 10:19
  • *nod* As said, that's the fastest. Of course I tested with non-local as well... and this is even worse. I'm not publishing our intern cluster data though. ;) – Anders Bernard Mar 13 '17 at 10:27
  • Maybe it wasn't enough clear ;) Please alert me when you'll have cluster setup ;) – T. Gawęda Mar 13 '17 at 10:38
  • OK, uploaded the benchmark to github after cleaning it from all confidential data. Sadly this includes all data on the cluster ... but it does indeed work on several cores on the cluster. – Anders Bernard Mar 13 '17 at 11:07
  • Hazelcast List isn't a distributed source - you should be using a Map instead. I would be curious to know how this affects your results. – Can Gencer Mar 13 '17 at 13:08
  • 1
    If your data originates on a single node, the first consideration should be the cost of distributing it over the network vs. the computation cost. Since your test uses a simple _min_ function, this is several orders of magnitude cheaper than network latency. If it turns out that your actual computation is still cheaper than the network overhead, you will not benefit from a distributed computation engine. – Marko Topolnik Mar 15 '17 at 06:47

1 Answers1

5

When you are calculating something in cluster framework, like Spark or Flink, framework:

  • serializes your code
  • send resource request
  • send your code via network
  • schedule execution
  • wait for result

As you can see, there are many steps peformed - not only your calculation! Distributed computing make sense if you:

  • can split your calculation to small tasks, which can be done in parallel
  • have too much data to be processed on one machine or processing on one machine can be too slow - disk I/O, some other specific factors in project OR calculations are very specific and requires many CPUs, more than one machine usually have - but then calculation of one part of the data must be very long

Try to calculate maks occurence count of words in 10 GB text file - then Spark and Flink will beat one-node Java

Sometimes user code may cause slowness of distributed computing. Typical mistakes:

  • user writes lambdas in clasess with many references - all other classes are serialized, serialization takes much time
  • tasks are not really parallel - they must wait for each other or must proceed on large part of data
  • data skewness - objects may have inproper hashCode implementation and HashPartitioner causes that all data wents to one partition = one node
  • incorrect number of partition - you can add 1000 more machines, but if you still have 4 partitions, then you can archive at most 4 parallel tasks in one time
  • too much network communication - in you case it's not a problem, but sometimes user are doing a lot of join and reduce

EDIT After Question edit: In your example, Spark runs on local - which means 1 thread only! Use at least local[*] or other cluster manager. You've got overheads listed in this answer and only one thread

T. Gawęda
  • 15,706
  • 4
  • 46
  • 61
  • *nod* I realize all that. I'm just perplexed by how extreme the overhead is. As said, we might have to handle data that might be too much for one machine... but we can't pay for that fact by having to increase our hardware requirements with a factor of 1000. A factor of 10 would be acceptable. – Anders Bernard Mar 13 '17 at 09:41
  • @AndersBernard Distributed computing is not for every use case. In a minute I will post details why Spark can be slower, wait a second ;) – T. Gawęda Mar 13 '17 at 09:42
  • *nod* understood as well. Though while I'm testing with a single package atm, it's only the preparation for a continuous stream of such packages, which have to be analyzed. Dozens of them per second. – Anders Bernard Mar 13 '17 at 09:48
  • Dozens? Millions - ok, thousands - ok, but dozens sounds quite low amount ;) Run tests with much bigger amount of data. If it will not give better results, then post your configuration, maybe you've got some error in your configuration – T. Gawęda Mar 13 '17 at 09:51
  • Well, we are talking about 1-15 packages per simulation (10-100 simulations) per second. Each package consists of 10-200k objects, each object has roughly 120byte. So the low count would be in total 12 Mbyte to 36 Gbyte per second. – Anders Bernard Mar 13 '17 at 09:59
  • @AndersBernard Please post your configuration and code - maybe there is some specific error – T. Gawęda Mar 13 '17 at 10:00