
We are developing a program which receives and forwards "messages", while keeping a temporary history of those messages, so that it can tell you the message history if requested. Messages are identified numerically, are typically around 1 kilobyte in size, and we need to keep hundreds of thousands of these messages.

We wish to optimize this program for latency: the time between sending and receiving a message must be below 10 milliseconds.

The program is written in Haskell and compiled with GHC. However, we have found that garbage collection pauses are far too long for our latency requirements: over 100 milliseconds in our real-world program.

The following program is a simplified version of our application. It uses a Data.Map.Strict to store messages. Messages are ByteStrings identified by an Int. 1,000,000 messages are inserted in increasing numeric order, and the oldest messages are continually removed to keep the history at a maximum of 200,000 messages.

module Main (main) where

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if 200000 < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

We compiled and ran this program using:

$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
   3,116,460,096 bytes allocated in the heap
     385,101,600 bytes copied during GC
     235,234,800 bytes maximum residency (14 sample(s))
     124,137,808 bytes maximum slop
             600 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      6558 colls,     0 par    0.238s   0.280s     0.0000s    0.0012s
  Gen  1        14 colls,     0 par    0.179s   0.250s     0.0179s    0.0515s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    0.652s  (  0.745s elapsed)
  GC      time    0.417s  (  0.530s elapsed)
  EXIT    time    0.010s  (  0.052s elapsed)
  Total   time    1.079s  (  1.326s elapsed)

  %GC     time      38.6%  (40.0% elapsed)

  Alloc rate    4,780,213,353 bytes per MUT second

  Productivity  61.4% of total user, 49.9% of total elapsed

The important metric here is the "max pause" of 0.0515s, or 51 milliseconds. We wish to reduce this by at least an order of magnitude.

Experimentation shows that the length of a GC pause is determined by the number of messages in the history. The relationship is roughly linear, or perhaps super-linear. The following table shows this relationship. (You can see our benchmarking tests here, and some charts here.)

msgs history length  max GC pause (ms)
===================  =================
12500                                3
25000                                6
50000                               13
100000                              30
200000                              56
400000                             104
800000                             199
1600000                            487
3200000                           1957
6400000                           5378

We have experimented with several other variables to find whether they can reduce this latency; none of them makes a big difference. Among these unimportant variables are: the optimization level (-O, -O2), RTS GC options (-G, -H, -A, -c), the number of cores (-N), different data structures (Data.Sequence), the size of messages, and the amount of short-lived garbage generated. The overwhelming determining factor is the number of messages in the history.
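For reference, those RTS knobs were exercised with invocations along the following lines (the specific flag values here are only illustrative):

$ ghc -O2 Main.hs
$ ./Main +RTS -s -A64m -H512m -c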

Our working theory is that the pauses are linear in the number of messages because each major GC cycle has to walk over all of the live, reachable memory and copy it, both of which are clearly linear operations. (The numbers above are consistent with this: copying roughly 235 MB of residency in a 51 ms pause works out to around 4.5 GB/s, a plausible copying rate.)

Questions:

  • Is this linear-time theory correct? Can the length of GC pauses be expressed in this simple way, or is the reality more complex?
  • If GC pause is linear in the working memory, is there any way to reduce the constant factors involved?
  • Are there any options for incremental GC, or anything like it? We can only see research papers. We are very willing to trade throughput for lower latency.
  • Are there any ways to "partition" memory for smaller GC cycles, other than splitting into multiple processes?
jameshfisher
  • I unfortunately cannot provide any help. I assume you already [read this related question](http://stackoverflow.com/q/12404031/510937). Anyway, a hard requirement that latency be at most 10 milliseconds sounds like a real-time constraint... to achieve that you probably want to tune the OS scheduler for real-time tasks too, because there's no point in heavily optimizing the Haskell code when the OS decides that you can wait 100 milliseconds... – Bakuriu Apr 21 '16 at 14:33
  • @Bakuriu: right, but 10 ms should be achievable with pretty much any modern OS without any tweaks. When I run simplistic C programs, even on my old Raspberry Pi, they easily achieve latencies in the range of 5 ms, or at least _reliably_ something like 15 ms. – leftaroundabout Apr 21 '16 at 14:39
  • Are you confident your test case is useful (i.e. you're not using `Control.Concurrent.Chan`, for instance? Mutable objects change the equation)? I would suggest starting by making sure you know what garbage you're generating and making as little of it as possible (e.g. make sure fusion happens, try `-funbox-strict-fields`). Maybe try a streaming lib (io-streams, pipes, conduit, streaming), and calling `performGC` directly at more frequent intervals. – jberryman Apr 21 '16 at 14:56
  • If what you're trying to accomplish can be done in constant space, then start by trying to make that happen (e.g. maybe a ring buffer from a `MutableByteArray`; GC won't be involved at all in that case) – jberryman Apr 21 '16 at 15:02
  • The GC spent a comparable amount of total time on gen0 and gen1 (0.280s vs. 0.250s), but did 6558 collections in gen0 and only 14 in gen1. Your long-lived data (i.e. the history) corresponds to gen1: the GC decides to look in there only occasionally, so when it does, it must do a lot of work at once. Not checking gen1 often is by design. The gen0 GC, by contrast, actually did *more* total work over the life of your program, but the time per cycle was much, much lower. Using `performGC` may really help here, as well as storing data in non-GC'd memory (good suggestions by @jberryman). – user2407038 Apr 21 '16 at 15:12
  • To those suggesting mutable structures and taking care to create minimal garbage: note that it is the **retained** size, not the amount of garbage collected, which appears to dictate the pause time. Forcing more frequent collections results in more pauses of about the same length. Edit: mutable off-heap structures may be interesting, but not nearly so much fun to work with in many cases! – mike Apr 21 '16 at 15:16
  • This description certainly suggests that GC time will be linear in the size of the heap for all generations, the important factors being the size of the retained objects (for copying) and the number of pointers existing to them (for scavenging): https://ghc.haskell.org/trac/ghc/wiki/Commentary/Rts/Storage/GC/Copying – mike Apr 21 '16 at 16:32
  • Couldn't the GHC FFI be used to manually manage the message history, using the `malloc*` functions in Foreign.Marshal.Alloc on the "low-level" heap (the same one the runtime system uses)? According to https://wiki.haskell.org/Foreign_Function_Interface, allocations on the low-level heap are not managed by the Haskell implementation and must be freed explicitly with `free`. Thus most of your GHC implementation would use the standard GC, but the GC-problematic message history part would use malloc and free. (See the sketch after these comments.) – George Co Oct 18 '16 at 22:57
  • @GeorgeColpitts yes, that's exactly what we experimented with after abandoning the approach described above. The FFI approach is viable, but we didn't take it, for a raft of other reasons. – jameshfisher Oct 19 '16 at 13:35
  • GHC 8.2.1 has been released (https://downloads.haskell.org/~ghc/master/users-guide/8.2.1-notes.html), check it out; probably you can go back to Haskell after your one-year trip to Go – securecurve Jun 07 '17 at 13:16
  • @securecurve we'll definitely check it out! But has 8.2.1 really been released? [GHC version page](https://www.haskell.org/ghc/) doesn't list it and [the 8.2.1 status page](https://ghc.haskell.org/trac/ghc/wiki/Status/GHC-8.2.1) talks about it in the future tense ... – jameshfisher Jun 12 '17 at 07:12
  • @jameshfisher, Now it is :) – securecurve Jul 25 '17 at 06:14
  • @securecurve thanks! We (Pusher) are going to experiment with compact regions and add a benchmark for it to https://gitlab.com/gasche/gc-latency-experiment – jameshfisher Jul 25 '17 at 08:53
  • @jameshfisher, Hi ... Have you considered Rust for such a use case, I think will do pretty well ... I didn't see it in the published benchmarks you have done ... Thanks! – securecurve Oct 14 '17 at 18:22
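To make the off-heap suggestion from these comments concrete, here is a minimal sketch (an illustration only; storeOffHeap and loadOffHeap are hypothetical names): message bytes are copied into malloc'd memory, which the GC never scans, and freeing them is entirely manual.

import qualified Data.ByteString as BS
import qualified Data.ByteString.Unsafe as BSU
import Data.Word (Word8)
import Foreign.Marshal.Alloc (mallocBytes, free)
import Foreign.Marshal.Utils (copyBytes)
import Foreign.Ptr (Ptr, castPtr)

-- Copy a ByteString into the C heap; the caller owns the pointer
-- and must eventually release it with 'free'.
storeOffHeap :: BS.ByteString -> IO (Ptr Word8, Int)
storeOffHeap bs = BSU.unsafeUseAsCStringLen bs $ \(src, len) -> do
    dst <- mallocBytes len
    copyBytes dst (castPtr src) len
    return (dst, len)

-- Read a stored message back as a fresh ByteString (copies again).
loadOffHeap :: (Ptr Word8, Int) -> IO BS.ByteString
loadOffHeap (p, len) = BS.packCStringLen (castPtr p, len)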

5 Answers


You're actually doing pretty well to have a 51 ms pause time with over 200 MB of live data. The system I work on has a larger max pause time with half that amount of live data.

Your assumption is correct: the major GC pause time is directly proportional to the amount of live data, and unfortunately there's no way around that with GHC as it stands. We experimented with incremental GC in the past, but it was a research project and didn't reach the level of maturity needed to fold it into the released GHC.

One thing that we're hoping will help with this in the future is compact regions: https://phabricator.haskell.org/D1264. It's a kind of manual memory management where you compact a structure in the heap, and the GC doesn't have to traverse it. It works best for long-lived data, but perhaps it will be good enough to use for individual messages in your setting. We're aiming to have it in GHC 8.2.0.
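As a rough sketch of how that might look (assuming the Data.Compact interface from the compact library; note that regular ByteStrings are backed by ForeignPtrs, which compact regions don't support, so this sketch uses ShortByteString instead):

import Data.ByteString.Short (ShortByteString)
import Data.Compact (Compact, compact, getCompact)
import qualified Data.Map.Strict as Map

type History = Map.Map Int ShortByteString

-- Compacting copies the whole map into one region, so you would do
-- this every N messages rather than on every push; afterwards the GC
-- sees a single object instead of hundreds of thousands.
recompact :: History -> IO (Compact History)
recompact = compact

-- Reading is cheap: getCompact is just a field access.
lookupMsg :: Compact History -> Int -> Maybe ShortByteString
lookupMsg c k = Map.lookup k (getCompact c)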

If you're in a distributed setting and have a load-balancer of some kind, there are tricks you can play to avoid taking the pause hit: you basically make sure that the load-balancer doesn't send requests to machines that are about to do a major GC, and of course make sure that the machine still completes the GC even though it isn't getting requests.
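A minimal sketch of that trick, assuming hypothetical load-balancer hooks markBusy and markAvailable (performMajorGC is in base's System.Mem):

import System.Mem (performMajorGC)

-- Take the node out of rotation, pay for the major collection while
-- no requests are in flight, then rejoin the pool.
gcWindow :: IO () -> IO () -> IO ()
gcWindow markBusy markAvailable = do
    markBusy
    performMajorGC
    markAvailable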

Simon Marlow
  • Hi Simon, thank you very much for your detailed reply! It's bad news, but good to have closure. We are currently moving towards a mutable implementation being the only suitable alternative. A few things we don't understand: (1) What are the tricks involved in the load balancing scheme - do they involve manual `performGC`? (2) Why does compacting with `-c` perform worse - we suppose because it doesn't find many things it can leave in-place? (3) Are there any more details about compacts? It sounds very interesting but unfortunately it's a bit too far in the future for us to consider. – jameshfisher Apr 22 '16 at 10:07
  • @mljrg you might be interested in http://www.well-typed.com/blog/2019/10/nonmoving-gc-merge/ – Alfredo Di Napoli Oct 29 '19 at 19:02

As mentioned in other answers, the garbage collector in GHC traverses live data, which means the more long-lived data you store in memory, the longer GC pauses will be.

GHC 8.2

To partially overcome this problem, a feature called compact regions was introduced in GHC 8.2. It is both a feature of the GHC runtime system and a library that exposes a convenient interface for working with it. Compact regions allow you to put data into a separate place in memory that the GC won't traverse during the collection phase. So if you have a large structure you want to keep in memory, consider using compact regions. However, a compact region doesn't have a mini garbage collector inside it, so it works better for append-only data structures, not for something like a HashMap where you also want to delete things. Though you can overcome this problem too. For details refer to the following blog post:
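A minimal sketch of the append-only usage (again assuming the Data.Compact interface; compactAdd copies new data into an existing region while sharing what is already there):

import Data.Compact (Compact, compact, compactAdd, getCompact)

-- Appending only copies the new cons cell and its payload into the
-- region; the existing tail is already in there and is not re-copied.
appendToRegion :: Compact [Int] -> Int -> IO (Compact [Int])
appendToRegion region x = compactAdd region (x : getCompact region)

-- e.g.: region0 <- compact ([] :: [Int])
--       region1 <- appendToRegion region0 42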

GHC 8.10

Moreover, since GHC 8.10 a new low-latency incremental garbage collector is available. It's an alternative GC algorithm which is not enabled by default, but you can opt in to it if you want. Switching to the newer GC gets you much of the benefit compact regions provide automatically, without manual wrapping and unwrapping. However, the new GC is not a silver bullet: it doesn't solve all problems automagically and it has its trade-offs. For benchmarks of the new GC refer to the following GitHub repository:
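Opting in is a matter of an RTS flag; a sketch of the invocation (the non-moving collector targets the oldest generation and, to my knowledge, wants the threaded runtime):

$ ghc -O2 -threaded Main.hs
$ ./Main +RTS --nonmoving-gc -s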

Shersh

I've tried your code snippet with a ring buffer approach, using IOVector as the underlying data structure. On my system (GHC 7.10.3, same compilation options) this resulted in a reduction of the max pause time (the metric you mentioned in your OP) by ~22%.

NB. I made two assumptions here:

  1. A mutable data structure is an okay fit for the problem (I guess message passing implies IO anyhow)
  2. Your messageIds are consecutive

With some additional Int parameter and arithmetic (for when messageIds are reset back to 0 or minBound), it should then be straightforward to determine whether a certain message is still in the history and retrieve it from the corresponding index in the ring buffer (a sketch of such a lookup follows the code below).

For your testing pleasure:

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

import qualified Data.Vector.Mutable as Vector

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

data Chan2 = Chan2
    { next          :: !Int
    , maxId         :: !Int
    , ringBuffer    :: !(Vector.IOVector ByteString.ByteString)
    }

chanSize :: Int
chanSize = 200000

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))


newChan2 :: IO Chan2
newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize

pushMsg2 :: Chan2 -> Msg -> IO Chan2
pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) =
    let ix' = if ix + 1 == chanSize then 0 else ix + 1  -- wrap around instead of writing one slot past the end
    in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store)

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if chanSize < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main, main1, main2 :: IO ()

main = main2

main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])
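
For example, such a lookup could look like the following sketch (assuming ids start at 1 and arrive consecutively, so message m lives at slot m `mod` chanSize):

getMsg2 :: Chan2 -> Int -> IO (Maybe ByteString.ByteString)
getMsg2 (Chan2 _ lastId store) msgId
    -- retrievable iff it has been sent and not yet overwritten
    | msgId > 0 && msgId <= lastId && msgId > lastId - chanSize =
        Just <$> Vector.unsafeRead store (msgId `mod` chanSize)
    | otherwise = return Nothing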
mgmeier
  • Hi! Nice answer. I suspect the reason this only gets a 22% speedup is because GC still has to walk the `IOVector` and the (immutable, GC'd) values at each index. We're currently investigating the options for re-implementing using mutable structures. It's likely to be similar to your ring buffer system. But we're moving it entirely outside Haskell memory space to do our own manual memory management. – jameshfisher Apr 22 '16 at 18:09
  • @jamesfisher: I was actually facing a similar problem, but decided to keep mem management on the Haskell side. The solution was indeed a ring buffer, which keeps a bytewise copy of the original data in a single, continuous block of memory, thus resulting in a single Haskell value. Have a look at it in this [RingBuffer.hs gist](https://gist.github.com/mgmeier/d0febcc79e79b25155ac18180057ea16). I tested it against your sample code, and had a speedup of around 90% of the critical metric. Feel free to use the code at your convenience. – mgmeier Apr 28 '16 at 15:13

I have to agree with the others: if you have hard real-time constraints, then using a GC'd language is not ideal.

However, you might consider experimenting with other available data structures rather than just Data.Map.

I rewrote it using Data.Sequence and got some promising improvements:

msgs history length  max GC pause (ms)
===================  =================
12500                              0.7
25000                              1.4
50000                              2.8
100000                             5.4
200000                            10.9
400000                            21.8
800000                            46
1600000                           87
3200000                          175
6400000                          350

Even though you're optimising for latency, I noticed other metrics improving too. In the 200000 case, execution time drops from 1.5 s to 0.2 s, and total memory usage drops from 600 MB to 27 MB.

I should note that I cheated by tweaking the design:

  • I removed the Int from the Msg, so it's not in two places.
  • Instead of using a Map from Ints to ByteStrings, I used a Sequence of ByteStrings; and instead of one Int per message, I think it can be done with one Int for the whole Sequence. Assuming messages can't be reordered, a single offset translates the message id you want into its position in the queue.

(I included an additional function getMsg to demonstrate that.)

{-# LANGUAGE BangPatterns #-}

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import Data.Sequence as S

newtype Msg = Msg ByteString.ByteString

data Chan = Chan Int (Seq ByteString.ByteString)

message :: Int -> Msg
message n = Msg (ByteString.replicate 1024 (fromIntegral n))

maxSize :: Int
maxSize = 200000

pushMsg :: Chan -> Msg -> IO Chan
pushMsg (Chan !offset sq) (Msg msgContent) =
    Exception.evaluate $
        let newSize = 1 + S.length sq
            newSq = sq |> msgContent
        in
        if newSize <= maxSize
            then Chan offset newSq
            else
                case S.viewl newSq of
                    (_ :< newSq') -> Chan (offset+1) newSq'
                    S.EmptyL -> error "Can't happen"

getMsg :: Chan -> Int -> Maybe Msg
getMsg (Chan offset sq) i_ = getMsg' (i_ - offset)
    where
    getMsg' i
        | i < 0            = Nothing
        | i >= S.length sq = Nothing
        | otherwise        = Just (Msg (S.index sq i))

main :: IO ()
main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize])
John H
  • Hi! Thanks for your answer. Your results definitely still show the linear slowdown, but it's pretty interesting that you got such a speedup from `Data.Sequence` - we tested that, and found it to actually be worse than Data.Map! I'm not sure what the difference was, so I'll have to investigate ... – jameshfisher Apr 22 '16 at 18:00

Well, you've found the limitation of languages with GC: they aren't fit for hardcore real-time systems.

You have two options:

1. Increase the heap size and use a two-level caching system: the oldest messages are sent to disk and you keep the newest messages in memory. You can do this using OS paging, though the problem with this solution is that paging can be expensive, depending on the read performance of the secondary storage.

2. Program that part of the solution in C and interface it with Haskell via the FFI. That way you can do your own memory management. This would be the best option, as you then control exactly the memory you need.

  • Hi Fernando. Thanks for this. Our system is only "soft" real-time, but in our case we've found GC to be too punishing even for soft real-time. We're definitely leaning towards your #2 solution. – jameshfisher Apr 22 '16 at 18:15