Frege and parallel computation

While attending last Lambda World one of the Haskell talks mentioned multithreading programming with Haskell. I immediately checked out what Frege had implemented upfront. This is what I’ve found so far.

Creating a Thread

The easiest way of execute a given IO computation parallel to the main execution thread is creating a new thread using the forkOS function. First thing we have to keep in mind is that a new thread computation can have side effects, and therefore it has to be declared as an IO computation.

forkOS
forkOS :: IO () -> IO Thread

So, what forkOS does ?: it first receives a computation IO () as parameter, then creates a new thread and executes the computation in the new thread and finally, returns the new thread: IO Thread Lets see and example.

New Thread
executeInNewThread :: IO () -> IO ()
executeInNewThread action = do
  thread <- forkOS action (1)
  name   <- thread.getName (2)
  Thread.sleep 2000
  println ("Thread name: " ++ name)

doSomething :: IO ()
doSomething = do
  println "Launching a new thread"
  executeInNewThread ioAction
  println "Main process continues"
  where ioAction = (println . sum) [1..200]
1 Using forkOS to launch a given IO () action in a new thread
2 Because forkOS returns the launched thread, we can do something like getting some information about the thread

The output of this project should be something like:

output
Launching a new thread
Main process continues
Thread name: xxx

Using an ExecutorService

When creating threads directly, you would be wasting a lot of system resources. A better way of creating new threads is creating threads through an ExecutorService. The ExecutorService keeps a thread pool and therefore threads could be reused. The Frege API provides forkIO function to execute a given computation in a new thread coming from an ExecutorService. The signature is:

Using an ExecutorService
forkIO :: IO () -> IO ()

Here is an example of executing the previous computation using an executor service:

Using an ExecutorService
executeFromThreadPool :: IO () -> IO ()
executeFromThreadPool action = do
  forkIO action (1)
  println "No information of thread is provided"

Unfortunately, as it is mentioned in the documentation, this is not suitable for not-ending processes, and the executor service may manage a fixed small number of concurrent threads only.

Also it would be very hard to coordinate results from several threads, so I would only recommend it for cases when you would like to launch unrelated tasks using a thread pool. If you wanted to coordinate computations, then I would use async or make use of MVar

Using MVar

Although neither forkIO nor forkOS return anything, the primitive MVar exposes functions to store and retrieve values in a multithreaded environment. I’m not going to list all of them here, for further details just go to frege.control.Concurrent.

My example is just launching to computations and combine them to give a result. First computation calculates a number and sleeps and the second one just return a value. Eventually both values will be added up.

MVar
useMVar = do
   mvar1 <- newEmptyMVar       (1)
   mvar2 <- newEmptyMVar       (2)
   forkIO do                   (3)
     mvar1.put $ sum [2,3]     (4)
     Thread.sleep 5000
     println "end computation1"
   forkIO do                   (5)
     mvar2.put 10              (6)
     println "end computation2"
   res1 <- mvar1.take          (7)
   res2 <- mvar2.take          (8)
   return $ res1 + res2        (9)
1 main: Creating mvar1 to handle values from first computation
2 main: Creating mvar2 to handle values from second computation
3 thr1: Launching first computation
4 thr1: setting mvar1 with computation result
5 thr2: Launching second computation
6 thr2: setting mvar2 with computation result
7 main: blocking until getting a value from mvar1
8 main: blocking until getting a value from mvar2
9 main: return result from
Please notice that when executing mvarX.take it blocks only until a value has been provided, then it will continue. That means that when the first computation begins to sleep meanwhile res1 + res2 is being resolved.

You can find some good examples of using MVar with forkOS and forkIO here.

Make it easier with async

So far it seemed we were getting too low level in order to launch a computation in a new thread. Something like coordinating several computations in different threads seemed a daunting task and very difficult to reason about. The function async enables the execution of an IO a computation and returns a pointer to the on-going computation. Once you get the pointer you can choose whether to block until getting the result, or start new computations in parallel. This is the signature of async

async
async :: IO a -> IO (MVar (Exception | a))

In the following example we are launching two computations, the first one sleeps 2 seconds and then continues whereas the quick operation just return a given value.

Operations
slowerOp :: IO Int
slowerOp = do
  Thread.sleep(2000)
  println "Returning first op"
  return 20

quickOp :: IO Int
quickOp = do
  println "Returning second op"
  return 40

We would like to launch both operations in parallel , eventually extract each computation’s value, and finally add them up.

Execute operations asynchronously
divideAndConquer = do
  slowerOpRef <- async slowerOp (1)
  quickOpRef  <- async quickOp (2)
  xs          <- sequence $ map extractValue [slowerOpRef, quickOpRef] (3)
  return $ sum xs (4)
1 Launch slow operation
2 Launch quick operation
3 Block until getting each computation’s values
4 Sum results

In order to block and get the value returned by the operation I’m using the take function from the frege.control.Concurrent module. Because the result of calling take returns an MVar (Exception | a), meaning an MVar of an Either Exception a, I need to use a case expression.

Extract
extractValue :: MVar (Exception | Int) -> IO Int
extractValue var = do
  value <- var.take (1)
  case value of
    Left  _ -> return 0
    Right x -> return x
1 Use take to extract the value wrapped in MVar

I’ve written a couple of alternatives to create a reusable extractValue function.

Extract alternatives
extractValue2 :: MVar (Exception | a) -> a -> IO a
extractValue2 mv defaultValue = do
  value <- mv.take
  return $ either (\_ -> defaultValue) id value

extractValue3 :: MVar (Exception | a) -> a -> IO a
extractValue3 mv defaultValue = do
  value <- mv.take
  case value of
    Left  _ -> return defaultValue
    Right x -> return x