
I'm writing a simple script to run a bunch of tasks in parallel using the Shelly library, but I want to limit the max number of tasks running at any one time. The script takes a file with an input on each line and runs a task for that input. There are a few hundred inputs in the file and I want to limit it to around 16 processes at a time.

The current script actually limits to 1 (well, tries to), using a QSem with an initial count of 1. I seem to be missing something, though, because when I run it on a test file with 4 inputs I see this:

Starting
Starting
Starting
Starting
Done
Done
Done
Done

So the threads are not blocking on the QSem as I would expect; they're all running simultaneously. I've even gone so far as to implement my own semaphores, on both MVar and TVar, and neither worked the way I expected. I'm obviously missing something fundamental, but what? I've also tried compiling the code and running it as a binary.

#!/usr/bin/env runhaskell
{-# LANGUAGE TemplateHaskell, QuasiQuotes, DeriveDataTypeable, OverloadedStrings #-}

import Shelly
import Prelude hiding (FilePath)
import Text.Shakespeare.Text (lt)
import qualified Data.Text.Lazy as LT
import Control.Monad (forM)
import System.Environment (getArgs)

import qualified Control.Concurrent.QSem as QSem
import Control.Concurrent (forkIO, MVar, putMVar, newEmptyMVar, takeMVar)

-- Define max number of simultaneous processes
maxProcesses :: IO QSem.QSem
maxProcesses = QSem.newQSem 1

bkGrnd :: ShIO a -> ShIO (MVar a)
bkGrnd proc = do
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
    -- Block until there are free processes
    sem <- maxProcesses
    QSem.waitQSem sem
    putStrLn "Starting"
    -- Run the shell command
    result <- shelly $ silently proc
    liftIO $ putMVar mvar result
    putStrLn "Done"
    -- Signal that this process is done and another can run.
    QSem.signalQSem sem
  return mvar

main :: IO ()
main = shelly $ silently $ do
    [img, file] <- liftIO $ getArgs
    contents <- readfile $ fromText $ LT.pack file
    -- Run a backgrounded process for each line of input.
    results <- forM (LT.lines contents) $ \line -> bkGrnd $ do
      runStdin <command> <arguments>
    liftIO $ mapM_ takeMVar results
asm
    I don't know about Shelly, but from your code it seems like every application of `bkGrnd` has its own new semaphore initialized to 1. You should create one first and then pass that same one to every call. – Riccardo T. Apr 04 '12 at 16:16

3 Answers


As I said in my comment, each call to `bkGrnd` creates its own semaphore, allowing every thread to continue without waiting. I would try something like this instead, where the semaphore is created once in `main` and passed to each call of `bkGrnd`.

bkGrnd :: QSem.QSem -> ShIO a -> ShIO (MVar a)
bkGrnd sem proc = do
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
    -- Block until there are free processes
    QSem.waitQSem sem
    --
    -- code continues as before
    --

main :: IO ()
main = shelly $ silently $ do
    [img, file] <- liftIO $ getArgs
    contents <- readfile $ fromText $ LT.pack file
    sem <- liftIO maxProcesses
    -- Run a backgrounded process for each line of input.
    results <- forM (LT.lines contents) $ \line -> bkGrnd sem $ do
      runStdin <command> <arguments>
    liftIO $ mapM_ takeMVar results
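For reference, the same fix can be sketched as a self-contained plain-IO program, without Shelly, using only base. `runLimited` and the toy limit of 2 are my own names and choices, not from the question; the point is that the single `QSem` is created once and shared by every worker:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem
import Control.Monad (forM)

-- Run each action in its own thread, but allow at most n at a time.
runLimited :: Int -> [IO a] -> IO [a]
runLimited n acts = do
  sem <- newQSem n            -- ONE semaphore for the whole batch
  vars <- forM acts $ \act -> do
    mvar <- newEmptyMVar
    _ <- forkIO $ do
      waitQSem sem            -- blocks while n workers are already running
      r <- act
      putMVar mvar r
      signalQSem sem
    return mvar
  mapM takeMVar vars          -- collect results in submission order

main :: IO ()
main = do
  rs <- runLimited 2 [return (i * i) | i <- [1 .. 5 :: Int]]
  print rs  -- [1,4,9,16,25]
```

Because the results are read back from the MVars in the order the tasks were submitted, the output order is deterministic even though the workers run concurrently.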
Riccardo T.
  • Wow, I am an idiot :p. I've never tried to use global mutable data in Haskell before (not something I normally like, but it's a script) but the problem is obvious now that you've pointed it out. Thanks! – asm Apr 04 '12 at 17:33
  • 1
    @AndrewMyers: don't worry, even the easiest concurrency errors sometimes are freaking hard to spot :) By the way, `sem` is not global, I would rather say it is shared. It is declared within `main` and passed to the threads as a "reference" to the shared semaphore. – Riccardo T. Apr 04 '12 at 17:58
  • Yeah, I meant what I was intending to do. I was thinking of `maxProcesses` as a global semaphore, but it was a global `IO QSem` action that created a new semaphore every time. Your way is much cleaner and what I would normally do if I wasn't trying to script the way I do in zsh. So am I correct in thinking that it's actually impossible to have global mutable state without using `unsafePerformIO`? If so, that's pretty cool, but not something I had realized before. – asm Apr 04 '12 at 18:16
  • I see, I thought you were talking about my solution when you said "global". I'm relatively new to Haskell, however, so I still don't know anything about `unsafePerformIO`: I'm sorry I can't tell you if that is the only way (nor if it is indeed a way to handle global data :) ). – Riccardo T. Apr 04 '12 at 18:23

You have an answer, but I need to add: QSem and QSemN are not thread safe if killThread or asynchronous thread death is possible.

My bug report and patch are in GHC Trac ticket #3160. The fixed code is available as a new library called SafeSemaphore, with modules Control.Concurrent.MSem, MSemN, and MSampleVar, plus a bonus FairRWLock.
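Whichever semaphore you use, it is safest to pair the wait and signal with `bracket_`, so the slot is released even if the worker throws or is killed. A small sketch using the standard QSem; the `withSlot` helper is my own name, not part of any library:

```haskell
import Control.Concurrent.QSem
import Control.Exception (SomeException, bracket_, try)

-- Hold one slot of the semaphore for the duration of an action,
-- releasing it even if the action throws an exception.
withSlot :: QSem -> IO a -> IO a
withSlot sem = bracket_ (waitQSem sem) (signalQSem sem)

main :: IO ()
main = do
  sem <- newQSem 1
  -- The exception escapes withSlot, but the slot is still released...
  _ <- try (withSlot sem (ioError (userError "boom")))
         :: IO (Either SomeException ())
  -- ...so a second acquisition does not deadlock.
  withSlot sem (putStrLn "still usable")
```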

Chris Kuklewicz
  • I saw part of the discussion on the mailing list when the plan was to merge that into 7.0.1 as an update to QSem. I see from the Trac ticket that none of that happened so I'll check out the safe package. Thanks for the tip! – asm Apr 05 '12 at 15:08
  • This answer is no longer true: https://gitlab.haskell.org/ghc/ghc/-/issues/3160#note_141737 QSem and QSemN are exception safe. – Cigarette Smoking Man Jul 25 '22 at 08:55

Isn't it better to do

bkGrnd sem proc = do
  liftIO $ QSem.waitQSem sem
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
  ...

so you don't even `forkIO` until you get the semaphore?
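This ordering can work, with one subtlety: the semaphore must still be signalled from the forked thread once the work finishes, or slots would be released too early. A minimal plain-IO sketch of the idea; this `bkGrnd` is my own simplification of the question's, with a toy limit of 2:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem
import Control.Monad (forM)

-- Variant: acquire the semaphore in the *spawning* thread, so we do
-- not even fork until a slot is free.  The forked thread must still
-- be the one to signal, after its work is done.
bkGrnd :: QSem -> IO a -> IO (MVar a)
bkGrnd sem act = do
  waitQSem sem                 -- blocks the spawner, throttling forkIO itself
  mvar <- newEmptyMVar
  _ <- forkIO $ do
    r <- act
    putMVar mvar r
    signalQSem sem             -- release from the worker, not the spawner
  return mvar

main :: IO ()
main = do
  sem <- newQSem 2
  vars <- forM [1 .. 5 :: Int] $ \i -> bkGrnd sem (return (i * 10))
  mapM_ (\v -> takeMVar v >>= print) vars
```

One trade-off of this ordering: the spawning loop itself blocks while all slots are busy, so nothing after it runs until a slot frees up, whereas acquiring inside the forked thread lets all threads be created immediately.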

halacsy