For impatient readers: this is a work in progress, and I'm asking for help along the way. Please don't judge the tools by my preliminary numbers; they may change as I try to get better results.
We are in the middle of the decision process on the architecture for a tool to analyse the output from co-simulations.
As part of that process I was asked to write a benchmark tool and gather data on the speed of several distributed processing frameworks.
The frameworks I tested are Apache Spark, Apache Flink, and Hazelcast Jet, with plain Java as the comparison baseline.
The test case was simple: "here is a list of POJOs, each with one double field; find the smallest (min) value".
Simple, straightforward, and hopefully highly comparable.
Three of the four tests use a simple comparator; the fourth (Flink) uses a reducer that is essentially identical to the comparator. The analysing functions look like this:
Java: double min = logs.stream().min(new LogPojo.Comp()).get().getValue();
Spark: JavaRDD<LogPojo> logData = sc.parallelize(logs, num_partitions);
double min = logData.min(new LogPojo.Comp()).getValue();
Hazel: IStreamList<LogPojo> iLogs = jet.getList("logs");
iLogs.addAll(logs);
double min = iLogs.stream().min(new LogPojo.Comp()).get().getValue();
Flink: DataSet<LogPojo> logSet = env.fromCollection(logs);
double min = logSet.reduce(new LogReducer()).collect().get(0).getValue();
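For context, the actual LogPojo, its comparator, and the Flink reducer live in the repo linked at the end; roughly, they look like this (a sketch of their shape, not the verbatim classes):

    import java.io.Serializable;
    import java.util.Comparator;
    import org.apache.flink.api.common.functions.ReduceFunction;

    // Sketch of the POJO and the comparator used by the Java, Spark and Hazelcast tests.
    public class LogPojo implements Serializable {
        private double value;

        public LogPojo() { }                          // no-arg constructor for the frameworks
        public LogPojo(double value) { this.value = value; }

        public double getValue() { return value; }
        public void setValue(double value) { this.value = value; }

        public static class Comp implements Comparator<LogPojo>, Serializable {
            @Override
            public int compare(LogPojo a, LogPojo b) {
                return Double.compare(a.getValue(), b.getValue());
            }
        }
    }

    // Sketch of the reducer used by the Flink test - keeps the POJO with the smaller value.
    class LogReducer implements ReduceFunction<LogPojo> {
        @Override
        public LogPojo reduce(LogPojo a, LogPojo b) {
            return a.getValue() <= b.getValue() ? a : b;
        }
    }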
I tested this extensively, varying the size of the test list as well as the allocated resources, and the results blew my mind. The BEST results can be seen below (all numbers in ms, 1 mio POJOs, 10 tests each):
- instances: how long it took to declare and initialize the framework instance
- list: how long it took to parse/transfer the List into the framework's own list structure
- process: how long it took to process the data and retrieve the min
- overall: from start to end of each test (a simplified sketch of how these phases are timed follows this list)
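Each phase is measured with plain wall-clock timestamps; for the Spark case the harness boils down to something like this (a simplified sketch with illustrative names, the real code is in the repo linked at the end):

    import java.util.List;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SparkBenchSketch {
        // Runs one Spark measurement and prints the four timings in ms.
        static void benchOnce(SparkConf conf, List<LogPojo> logs, int numPartitions) {
            long t0 = System.currentTimeMillis();
            JavaSparkContext sc = new JavaSparkContext(conf);               // "instances"
            long t1 = System.currentTimeMillis();
            JavaRDD<LogPojo> logData = sc.parallelize(logs, numPartitions); // "list"
            long t2 = System.currentTimeMillis();
            double min = logData.min(new LogPojo.Comp()).getValue();        // "process"
            long t3 = System.currentTimeMillis();
            System.out.printf("min=%f instances=%d list=%d process=%d overall=%d%n",
                    min, t1 - t0, t2 - t1, t3 - t2, t3 - t0);
            sc.close();
        }
    }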
Outcome:
java:
Instances:
List:
Process: 37, 24, 16, 17, 16, 16, 16, 16, 16, 16,
Overall: 111, 24, 16, 17, 16, 16, 16, 16, 16, 16,
spark:
Instances: 2065, 89, 62, 69, 58, 49, 56, 47, 41, 52,
List: 166, 5, 1, 1, 2, 1, 0, 0, 0, 0,
Process: 2668, 2768, 1936, 2016, 1950, 1936, 2105, 2674, 1913, 1882,
Overall: 4943, 2871, 2011, 2094, 2020, 1998, 2172, 2728, 1961, 1943,
hazel:
Instances: 6347, 2891, 2817, 3106, 2636, 2936, 3018, 2969, 2622, 2799,
List: 1984, 1656, 1470, 1505, 1524, 1429, 1512, 1445, 1394, 1427,
Process: 4348, 3809, 3655, 3751, 3927, 3887, 3592, 3810, 3673, 3769,
Overall: 12850, 8373, 7959, 8384, 8110, 8265, 8133, 8239, 7701, 8007
flink:
Instances: 45, 0, 0, 0, 0, 0, 0, 0, 0, 0,
List: 92, 35, 16, 13, 17, 15, 19, 11, 19, 24,
Process: 34292, 20822, 20870, 19268, 17780, 17390, 17124, 19628, 17487, 18586,
Overall: 34435, 20857, 20886, 19281, 17797, 17405, 17143, 19639, 17506, 18610,
The most interesting parts are:
- the best results ALL come from purely local tests (one instance)
- any test that made use of distributed mechanics (additional nodes and such) was slower still, up to an order of magnitude (Spark, for example, was about 2.5x slower when distributed).
Now don't get me wrong: it's basic logic that distributed processing has to be slower per core than single-threaded processing.
But two orders of magnitude EVEN when run single-threaded? And three orders of magnitude when distributed? Can someone spot the mistake I apparently made in all three distributed setups? I expected some factor < 10, so that killing it with more hardware would remain an option.
So is there some way to reduce the overhead of those frameworks to, say, maybe x9 instead of x999?
I know, I know, the test data I use is much too small, but even when scaling it up I haven't seen any reduction in overhead versus performance. And it's roughly the size of the batches of data we need to analyse (0.1M - 1M objects/s per simulation). So your help in finding my error is welcome. :D
UPDATE Spark:
After some more thorough testing on Spark, I'm still not impressed. The setup was as follows:
Java client on one machine with 64 cores and 480 GB RAM; the job master and 7 slaves on a separate rack, 32 cores and 20 GB RAM each.
1 mio objects, 256 tasks, 64 cpus local[*]
java:
Instances:
List:
Process: 622, 448, 68, 45, 22, 32, 15, 27, 22, 29,
spark:
Instances: 4865, 186, 160, 133, 121, 112, 106, 78, 121, 106,
List: 310, 2, 2, 1, 2, 4, 2, 1, 2, 1,
Process: 8190, 4433, 4200, 4073, 4201, 4092, 3822, 3852, 3921, 4051,
10 mio objects, 256 tasks, 64 cpus local[*]
java:
Instances:
List:
Process: 2329, 144, 50, 65, 75, 70, 69, 66, 66, 66,
spark:
Instances: 20345,
List: 258, 2, 1, 1, 1, 4, 1, 1, 1, 1,
Process: 55671, 49629, 48612, 48090, 47897, 47857, 48319, 48274, 48199, 47516
1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 1+1 Spark machines (different rack)
java:
Instances:
List:
Process: 748, 376, 70, 31, 69, 64, 46, 17, 50, 53,
spark:
Instances: 4631,
List: 249, 1, 2, 2, 3, 3, 1, 1, 2, 1,
Process: 12273, 7471, 6314, 6083, 6228, 6158, 5990, 5953, 5981, 5972
1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
java:
Instances:
List:
Process: 820, 494, 66, 29, 5, 30, 29, 43, 45, 21,
spark:
Instances: 4513,
List: 254, 2, 2, 2, 2, 4, 2, 2, 1, 1,
Process: 17007, 6545, 7174, 7040, 6356, 6502, 6482, 6348, 7067, 6335
10 mio objects, 52k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
java Process: 3037, 78, 48, 45, 53, 73, 72, 73, 74, 64,
spark:
Instances: 20181,
List: 264, 3, 2, 2, 1, 4, 2, 2, 1, 1,
Process: 77830, 67563, 65389, 63321, 61416, 63007, 64760, 63341, 63440, 65320
1 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i =0 to 100
java Process: 722, 631, 62, 26, 25, 42, 26, 11, 12, 29, 40, 16, 14, 23, 29, 18, 14, 11, 71, 76, 37, 52, 32, 15, 51, 54, 19, 74, 62, 54, 7, 60, 37, 54, 42, 3, 7, 60, 33, 44, 50, 50, 39, 34, 34, 13, 47, 63, 46, 4, 52, 20, 19, 24, 6, 53, 4, 3, 68, 10, 59, 52, 48, 3, 48, 37, 5, 38, 10, 47, 4, 53, 36, 41, 31, 57, 7, 64, 45, 33, 14, 53, 5, 41, 40, 48, 4, 60, 49, 37, 20, 34, 53, 4, 58, 36, 12, 35, 35, 4,
spark:
Instances: 4612,
List: 279, 3, 2, 1, 2, 5, 3, 1, 1, 1, 2, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1,
Process: 16300, 6577, 5802, 6136, 5389, 5912, 5885, 6157, 5440, 6199, 5902, 6299, 5919, 6066, 5803, 6612, 6120, 6775, 6585, 6146, 6860, 6955, 6661, 6819, 6868, 6700, 7140, 7532, 7077, 7180, 7360, 7526, 7770, 7877, 8048, 7678, 8260, 8131, 7837, 7526, 8261, 8404, 8431, 8340, 9000, 8825, 8624, 9340, 9418, 8677, 8480, 8678, 9003, 9036, 8912, 9235, 9401, 9577, 9808, 9485, 9955, 10029, 9506, 9387, 9794, 9998, 9580, 9963, 9273, 9411, 10113, 10004, 10369, 9880, 10532, 10815, 11039, 10717, 11251, 11475, 10854, 11468, 11530, 11488, 11077, 11245, 10936, 11274, 11233, 11409, 11527, 11897, 11743, 11786, 11086, 11782, 12001, 11795, 12075, 12422
2 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 0 to 30
java Process: 1759, 82, 31, 18, 30, 41, 47, 28, 27, 13, 28, 46, 5, 72, 50, 81, 66, 44, 36, 72, 44, 11, 65, 67, 58, 47, 54, 60, 46, 34,
spark:
Instances: 6316,
List: 265, 3, 3, 2, 2, 6, 1, 2, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 1, 1, 5, 1, 1, 1, 1, 2, 1, 1, 1,
Process: 24084, 13041, 11451, 11274, 10919, 10972, 10677, 11048, 10659, 10984, 10820, 11057, 11355, 10874, 10896, 11725, 11580, 11149, 11823, 11799, 12414, 11265, 11617, 11762, 11561, 12443, 12448, 11809, 11928, 12095
10 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 5 to 30
java Process: 1753, 91, 57, 71, 86, 86, 151, 80, 85, 72, 61, 78, 80, 87, 93, 89, 70, 83, 166, 84, 87, 94, 90, 88, 92, 89, 196, 96, 97, 89,
spark:
Instances: 21192,
List: 282, 3, 2, 2, 3, 4, 2, 2, 1, 0, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 2, 1, 1, 1,
Process: 60552, 53960, 53166, 54971, 52827, 54196, 51153, 52626, 54138, 51134, 52427, 53618, 50815, 50807, 52398, 54315, 54411, 51176, 53843, 54736, 55313, 56267, 50837, 54996, 52230, 52845
Results: no matter how much hardware was thrown at it, or how the tasks were clustered, Spark took 5-6 seconds per million POJOs in the list.
Java, on the other hand, dealt with the same amount in 5-30 ms. So basically a factor of 200-1,000.
Does anyone have a suggestion on how to "speed up" Spark for such a simple job?
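To be clear about what I mean by "such a simple job": the whole thing could just as well be expressed over primitive doubles, e.g. like this untested variant (names as in the snippets above, purely illustrative), in case the Comparator over full POJOs turns out to be part of the problem:

    // Untested variant: map to the double field first, then reduce with Math::min.
    // Should give the same minimum as min(new LogPojo.Comp()).
    double min = logData
            .map(LogPojo::getValue)   // JavaRDD<Double>
            .reduce(Math::min);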
UPDATE Hazel:
Now I'm starting to get impressed. While I'm still fighting some weird problems, at least Hazelcast Jet seems to understand that local data can be processed locally where possible. With only 100% overhead (a factor of 2x), which is completely acceptable.
10 mio objects
java:
Instances:
List: 68987,
Process: 2288, 99, 54, 52, 54, 64, 89, 83, 79, 88,
hazel:
Instances: 6136,
List: 97225,
Process: 1112, 375, 131, 123, 148, 131, 137, 119, 176, 140
UPDATE Flink:
Dropped it from the benchmarking for now, as it's causing too much trouble without giving great results.
EDIT: The whole benchmark can be found at: https://github.com/anderschbe/clusterbench
The cluster setup for Spark uses spark-2.1.0-bin-hadoop2.7 as it comes out of the box, with one minor change in spark-env.sh: SPARK_NO_DAEMONIZE=true.
The only change necessary to make it work on the cluster is replacing "localhost" in SparcProc line 25 with "spark://I_cant_give_you_my_cluster_IP.doo".
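In SparkConf terms, that switch is roughly the following (the app name here is a placeholder; the actual line is in SparcProc in the repo):

    // Illustrative sketch of the master switch; only the master URL changes for the cluster runs.
    SparkConf conf = new SparkConf()
            .setAppName("clusterbench")                              // placeholder app name
            .setMaster("spark://I_cant_give_you_my_cluster_IP.doo"); // was "localhost" / local mode
    JavaSparkContext sc = new JavaSparkContext(conf);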