forkOS :: IO () -> IO Thread
Frege and parallel computation
While I was attending the last Lambda World, one of the Haskell talks covered multithreaded programming in Haskell. I immediately checked what Frege already provides in this area. This is what I’ve found so far.
Creating a Thread
The easiest way to execute a given IO computation in parallel with the
main execution thread is to create a new thread using the forkOS
function. The first thing to keep in mind is that a computation running
in a new thread can have side effects, and therefore it has to be
declared as an IO computation.
So, what does forkOS do? It first receives an IO () computation as a
parameter, then creates a new thread, executes the computation in that
thread, and finally returns the new thread as an IO Thread. Let's see
an example.
executeInNewThread :: IO () -> IO ()
executeInNewThread action = do
    thread <- forkOS action               -- (1)
    name   <- thread.getName              -- (2)
    Thread.sleep 2000
    println ("Thread name: " ++ name)

doSomething :: IO ()
doSomething = do
    println "Launching a new thread"
    executeInNewThread ioAction
    println "Main process continues"
  where ioAction = (println . sum) [1..200]

(1) Using forkOS to launch a given IO () action in a new thread
(2) Because forkOS returns the launched thread, we can do things like getting some information about the thread
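The output should be something like this (the sum is printed by the new thread, and because executeInNewThread sleeps in the main thread, the last message only appears once it returns):
Launching a new thread
20100
Thread name: xxx
Main process continues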
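As a small sketch that uses only forkOS and getName from the example above, the following launches two threads and prints their names. The module name and the messages are made up for illustration, and the order of the printed lines may vary between runs.

```frege
module examples.TwoThreads where   -- hypothetical module name

import frege.control.Concurrent

main :: IO ()
main = do
    -- each call creates and starts a brand new thread
    t1 <- forkOS (println "first computation")
    t2 <- forkOS (println "second computation")
    -- forkOS hands back the Thread, so we can ask for its name
    n1 <- t1.getName
    n2 <- t2.getName
    println ("Started " ++ n1 ++ " and " ++ n2)
```

Every call to forkOS here pays for a completely new thread, which is the cost the next section tries to avoid.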
Using an ExecutorService
Creating a brand new thread for every task wastes a lot of system
resources. A better way is to obtain threads from an ExecutorService.
The ExecutorService keeps a thread pool, so threads can be reused. The
Frege API provides the forkIO function to execute a given computation
in a thread coming from an ExecutorService. The signature is:
forkIO :: IO () -> IO ()
Here is an example of executing the previous computation using an executor service:
executeFromThreadPool :: IO () -> IO ()
executeFromThreadPool action = do
    forkIO action                                       -- (1)
    println "No information of thread is provided"
Unfortunately, as mentioned in the documentation, this is not suitable for never-ending processes, and the executor service may manage only a small, fixed number of concurrent threads.
It would also be very hard to coordinate results from several threads,
so I would only recommend it when you want to launch unrelated tasks
using a thread pool. If you want to coordinate computations, I would
use async or MVar.
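To illustrate the "unrelated tasks" case, here is a minimal sketch that submits a few short, finite tasks with forkIO and never looks at them again. The module name and the logTask helper are hypothetical and exist only for this example.

```frege
module examples.PoolTasks where   -- hypothetical module name

import frege.control.Concurrent

-- A short task that terminates; hypothetical helper for this sketch
logTask :: Int -> IO ()
logTask n = println ("task " ++ show n ++ " done")

main :: IO ()
main = do
    -- submit three independent tasks to the pool, fire and forget
    mapM_ (forkIO . logTask) [1, 2, 3]
    println "All tasks submitted"
```

Nothing comes back from forkIO, so the main thread has no way to know when, or in which order, the tasks finish.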
Using MVar
Although neither forkIO nor forkOS returns the result of the
computation, the MVar primitive exposes functions to store and retrieve
values in a multithreaded environment. I’m not going to list all of
them here; for further details just go to frege.control.Concurrent.
My example just launches two computations and combines them to give a result. The first computation calculates a number and then sleeps, and the second one just returns a value. Eventually both values are added up.
useMVar = do
    mvar1 <- newEmptyMVar             -- (1)
    mvar2 <- newEmptyMVar             -- (2)
    forkIO do                         -- (3)
        mvar1.put $ sum [2,3]         -- (4)
        Thread.sleep 5000
        println "end computation1"
    forkIO do                         -- (5)
        mvar2.put 10                  -- (6)
        println "end computation2"
    res1 <- mvar1.take                -- (7)
    res2 <- mvar2.take                -- (8)
    return $ res1 + res2              -- (9)

(1) main: creating mvar1 to handle values from the first computation
(2) main: creating mvar2 to handle values from the second computation
(3) thr1: launching the first computation
(4) thr1: setting mvar1 with the computation result
(5) thr2: launching the second computation
(6) thr2: setting mvar2 with the computation result
(7) main: blocking until getting a value from mvar1
(8) main: blocking until getting a value from mvar2
(9) main: returning the sum of both results
Please notice that mvarX.take blocks only until a value has been
provided, and then execution continues. Because the first computation
puts its value into mvar1 before it goes to sleep, the main thread can
resolve res1 + res2 while that computation is still sleeping.
You can find some good examples of using MVar with forkOS and forkIO
here.
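The same pattern scales to any number of computations: create one MVar per task, fork the task, and then take from every MVar. The following is a sketch under that assumption. The module name and the helpers sumChunk and parallelSum are hypothetical, and I use $! so that the sum is evaluated in the worker thread before it is stored instead of being handed over as an unevaluated value.

```frege
module examples.ParallelSum where   -- hypothetical module name

import frege.control.Concurrent

-- Start summing one chunk in a pooled thread and return
-- the box that will eventually hold the partial result.
sumChunk :: [Int] -> IO (MVar Int)
sumChunk chunk = do
    box <- newEmptyMVar
    forkIO (box.put $! sum chunk)   -- $! forces the sum before the put
    return box

-- Sum every chunk in its own task, then add up the partial results.
parallelSum :: [[Int]] -> IO Int
parallelSum chunks = do
    boxes    <- mapM sumChunk chunks    -- fan out: one task per chunk
    partials <- mapM MVar.take boxes    -- fan in: block on each box in turn
    return (sum partials)

main :: IO ()
main = parallelSum [[1..100], [101..200]] >>= println
```

With these two chunks the program should print 20100, the same value as sum [1..200].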
Make it easier with async
So far it seemed we were getting too low level just to launch a
computation in a new thread. Coordinating several computations in
different threads seemed a daunting task and very difficult to reason
about. The async function starts the execution of an IO a computation
and returns a reference to the ongoing computation. Once you have that
reference you can choose whether to block until the result is
available or to start new computations in parallel. This is the
signature of async:
async :: IO a -> IO (MVar (Exception | a))
In the following example we launch two computations: the slow operation sleeps for 2 seconds and then continues, whereas the quick operation just returns a given value.
slowerOp :: IO Int
slowerOp = do
    Thread.sleep 2000
    println "Returning first op"
    return 20

quickOp :: IO Int
quickOp = do
    println "Returning second op"
    return 40
We would like to launch both operations in parallel, eventually extract each computation’s value, and finally add them up.
divideAndConquer = do
    slowerOpRef <- async slowerOp                                  -- (1)
    quickOpRef  <- async quickOp                                   -- (2)
    xs <- sequence $ map extractValue [slowerOpRef, quickOpRef]    -- (3)
    return $ sum xs                                                -- (4)

(1) Launch the slow operation
(2) Launch the quick operation
(3) Block until getting each computation’s value
(4) Sum the results
To block and get the value returned by the operation I’m using the
take function from the frege.control.Concurrent module. Because async
returns an MVar (Exception | a), meaning an MVar holding an Either
Exception a, calling take yields either an exception or a value, so I
need to use a case expression.
extractValue :: MVar (Exception | Int) -> IO Int
extractValue var = do
    value <- var.take               -- (1)
    case value of
        Left _  -> return 0
        Right x -> return x

(1) Use take to extract the value wrapped in the MVar
I’ve written a couple of alternatives to create a reusable extractValue function.
extractValue2 :: MVar (Exception | a) -> a -> IO a
extractValue2 mv defaultValue = do
    value <- mv.take
    return $ either (\_ -> defaultValue) id value

extractValue3 :: MVar (Exception | a) -> a -> IO a
extractValue3 mv defaultValue = do
    value <- mv.take
    case value of
        Left _  -> return defaultValue
        Right x -> return x
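To show how a reusable extractor might be put to work, here is a sketch that generalizes the previous example to a list of actions. The module name and the sumAll function are hypothetical; extractValue2 is repeated from above so the snippet is self-contained, and 0 is used as the fallback value for any computation that ends with an exception.

```frege
module examples.AsyncSum where   -- hypothetical module name

import frege.control.Concurrent

-- Same definition as above, repeated to keep the sketch self-contained
extractValue2 :: MVar (Exception | a) -> a -> IO a
extractValue2 mv defaultValue = do
    value <- mv.take
    return $ either (\_ -> defaultValue) id value

-- Launch every action with async, then wait for each result.
-- A computation that failed contributes 0 to the total.
sumAll :: [IO Int] -> IO Int
sumAll actions = do
    refs <- mapM async actions                    -- start all computations first
    xs   <- mapM (\ref -> extractValue2 ref 0) refs   -- then block on each one
    return (sum xs)

main :: IO ()
main = sumAll [return 20, return 40] >>= println
```

With the two actions shown, the program should print 60. Starting all the computations before taking any result is what lets them overlap; calling extractValue2 right after each async would run them one after another.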