I want to process stream of events received via the MQTT. Library which I'm using uses a callback to provide the results. Processing I'm doing depends on the previous state not only the latest event. Also in the future events might be gathered from the other sources.
At the first I decided to compose it into the list which sounds as a good idea. I had the minor issue cause IO prevents lazy evaluation and waiting for infinite stream might be long, but I solved it with interleaving IO.
stream :: IO [Event]
allows me to do the nice stuff like foldl
, foldM
map
, mapM
, etc... Unfortunately with this approach I rather wont be able to combine two streams, cause there is no more locking feature there.
I was diging through many libs, and found STM with TQueue for example. Unfortunately it is not what I exactly want.
I decide to create custom type and make it Foldable
so I will be able to fold it. I failed due to IO.
import Control.Concurrent.STM
newtype Stream a = Stream (STM a)
runStream
:: ((a -> IO ()) -> IO i)
-> IO (Stream a)
runStream block = do
queue <- newTQueueIO
block (atomically . writeTQueue queue)
return $ Stream (readTQueue queue)
foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
n <- atomically read
m <- f n s
foldStream f m (Stream read)
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read
zipStream :: [Stream a] -> Stream a
zipStream = undefined
Whih can be used like main = foldStream (\x _ -> print x) () =<< events
Is it possible to implement base some of base classes to work with this stream as with regular List?