By Mario Garcia <@marioggar>

1. Intro

This workshop objective is to learn how to create actors using GPars.

1.1. What is GPars ?

The GPars framework offers Java developers intuitive and safe ways to handle Java or Groovy tasks concurrently. Leveraging the enormous flexibility of the Groovy programing language and building on proven Java technologies, we aim to make concurrent programming for multi-core hardware intuitive, robust and enjoyable.

— GPars site

1.2. What an actor is ?

There are two common ways of synchronization among threads, one is the dark and full of terrors synchronization way, and the second one is message passing. The latter is the one actor model embraces.

The actor model in computer science is a mathematical model of concurrent computation that treats "actors" as the universal primitives of concurrent digital computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received

— Wikipedia

An actor provides isolated mutability and ensures that:

  • At most one thread processes the actor’s body

  • Actor’s state can be safely modified by code in the body without any extra effort

To make sure those conditions will remain true the programmer should be sure that actor’s code is never invoked directly from outside the actors body.

1.3. What an actor does ?

An actor has three main related actions:

  • Receiving messages

  • Sending messages

  • Sending replies

  • Creating new actors

1.4. When to use actors ?

Whenever you need to access a given resource from multiple threads.

1.5. Why not using threads ?

Well, historically synchronizing threads has not been the best skill of Java programmers so far, so to speak. Actor model eases very much the way concurrent access could be cordinated.

Normally there is no good reason to do low level concurrent programming. It’s been always error prone and it requires a really good understanding not only about JDK’s concurrency api but the Java memory model as well.

Scalability is the other main reason. While the JVM has a certain thread limit, any actor model implementation normally has a pool of threads. If an actor is not doing anythin, it doesn’t consume threads.

Actors share a pool of threads, which are dynamically assigned to actors when the actors need to react to messages sent to them. The threads are returned back to the pool once a message has been processed and the actor is idle waiting for some more messages to arrive. Actors become detached from the underlying threads and so a relatively small thread pool can serve potentially unlimited number of actors. Virtually unlimited scalability in number of actors is the main advantage of event-based actors , which are detached from the underlying physical threads

— GPars documentation

2. Before Getting Started

What do we need to do sample code ?

  • Javatm 1.7+

  • Groovy 2.1.x

  • Gradle 1.9+

  • Git

3. Sending

Actors are all about passing messages so the first thing we are going to do is to send messages to an actor.

For this first example we are using the GroovyConsole. So open it up and create your first actor

3.1. Our first actor

(1)
import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor

DefaultActor receiver = Actors.actor { (2)
    loop { (3)
        react { String msg -> (4)
            println "Hey thanks for telling me $msg"
        }
    }
}

(5)
receiver.send 'Message 1'
receiver << 'Message 2'
receiver 'Message 3'

So few lines of code and many concepts already. Let’s review all of them.

1 To work with actors in GPars you should import groovyx.gpars.actor package.
2 Actor’s dsl is created by using Actors.actor method.
3 This actor never will end without an explicit termination
4 When reacting for a given received message, the type could be established as parameter to the react closure
5 You can pass messages to an actor by using the actor’s send method. This method can be invoked in the three ways exposed in the example.

3.2. Type of actors

GPars distinguises between stateful actors and stateless actors.

We’ve just built a groovyx.gpars.actor.DefaultActor. A DefaultActor belongs to the group of stateful actors. Stateful actors allow programmer to handle implicit state directly.

Inside stateless actors GPars has DynamicDispatchActor and ReactiveActor classes.

Depending on your requirements you can choose one or another, but it is clear stateless actors will perform better than the stateful ones.

3.3. Send and forget

In real applications sometimes we just want to send some data without blocking the application and forget about the result of the data sent. A typical scenario would be logging user behavior through the application right ?

Send and forget DSL
import groovy.transform.Immutable

import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor

@Immutable (1)
class UserItem {
    String userName
    Long itemId
}

final DefaultActor loggingServiceActor = Actors.actor {
    loop {
        react { UserItem userItem -> (2)
            println "user ${userItem.userName} added item ${userItem.itemId} at ${new Date()}"
        }
    }
}

(3)
loggingServiceActor.send(new UserItem(userName:'Mario', itemId: 10L))
Thread.sleep(1000) // Adding a delay to see it working
loggingServiceActor << ([userName: 'Rober', itemId: 20L] as UserItem)

Here we’are sending a message to the actor from an outer process holding the actor’s reference.

1 Immutability is always the right choice when dealing with concurrency and parallelism. Use it as much as you can.
2 The react method closure can receive as parameter any type. Of course make sure the type you expect is the one your are actually receiving, otherwise you’ll get a cast exception.
3 Here is not another actor sending messages to our actor but a different thread (the main thread).
An actor’s message handler (react) can only expect 0 or 1 argument

Before starting the exercise, take a look how it would look like without the DSL. That will be really important when going through the exercises.

Send and forget No-DSL
import groovy.transform.Immutable

import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor

@Immutable
class UserItem {
    String userName
    Long itemId
}

class LoggingActorService extends DefaultActor { (1)
    void act() { (2)
        loop {
            react { UserItem userItem ->
                println """
                    user ${userItem.userName} added item ${userItem.itemId}
                    at ${new Date()}
                """
            }
        }
    }
}

class UserService {
    def loggingService (3)

    void trackAddedItem(final UserItem userItem) {
        loggingService << userItem
    }
}

def loggingService = new LoggingActorService()
def userService = new UserService(loggingService: loggingService)

loggingService.start() (4)

userService.trackAddedItem(new UserItem(userName: 'John', itemId: 43L))
Thread.sleep(1000)
userService.trackAddedItem(new UserItem(userName: 'Ronnie', itemId: 23L))
1 Now we are explicitly declaring a DefaultActor class
2 Now the actor’s body is constraint within the body of the act() method.
3 In order to be able to send messages to the actor we need its reference, so we pass it when building our service instance
4 Before sending messages to our actor, the actor should be already listening otherwise you will get an exception

3.4. Exercise

Create an actor receiving 4 numbers an prints all of them to the console. You can use the GroovyConsole to do that. Please try to do it using GPars DSL and plain classes.

3.5. Send And Block

Logging is really important but sometimes we just want a response as a consequence of sending data. Now we will be sending a message to a given actor and expecting a response to that data.

Let’s say we want to have some word translated we want to send the word to an actor instance and get the translation back.

SendAndWait - Reply
import static groovyx.gpars.actor.Actors.actor

final def english2SpanishService = actor {
    loop {
        react { String word ->
            if (word == 'hello') {
                reply 'hola' (1)
            } else if (word == 'goodbye') {
                reply 'adios'
            } else {
                reply 'no idea :P'
            }
        }
    }
}

String helloTranslation = english2SpanishService.sendAndWait('hello') (2)
String goodbyeTranslation = english2SpanishService.sendAndWait('goodbye')
String seeyoulaterTranslation = english2SpanishService.sendAndWait('see you later')

assert helloTranslation == 'hola'
assert goodbyeTranslation == 'adios'
assert seeyoulaterTranslation == 'no idea :P'
1 The actor can only use the reply method from a non-actor-request when the client uses sendAndWait for sending the message
2 The sendAndWait method blocks the current thread until a response has been received
SendAndPromise- Reply
import static groovyx.gpars.actor.Actors.actor
import groovyx.gpars.dataflow.Promise

def reallySlowService = actor {
    loop {
        react {
            Thread.sleep(2000)
            reply 43
        }
    }
}

def reallyFastService = actor {
    loop {
        react {
            reply 'No idea'
        }
    }
}

Promise answer1 =
    reallySlowService.
        sendAndPromise('Tell me the meaning of life the universe and everything')
String answer2 =
    reallyFastService.
        sendAndWait('Tellme how is gonna end 24 this season')

assert answer1.get() == 43
assert answer2 == 'No idea'

3.6. Exercise

Now that we now how to send and receive a simple response from a given actor, lets modify the actor you built for the first exercise for instead of printing out the result, make it send back the result to you.

You have to send three different list of numbers, get back the results and sum them in the client once you have received the partial results from the actor.

The objective of this exercises is using promises or blocking calls to get results from a given actor.

4. Receiving

So far we have focused our efforts on how can we send a message to a given actor. Well in this chapter we’ll be seeing from the other side, we will be focusing on topics such as:

  • How an actor receive messages ?

  • How can an actor respond to a given message ?

  • What type of objects can an actor receive ?

  • Is an actor capable of behave differently depending on the object received ?

4.1. A specific type of messages

So far we’ve seen how to send one specific type of messages to our actors.

Receiving one specific type
import static groovyx.gpars.actor.Actors.actor
import groovyx.gpars.actor.DefaultActor

final DefaultActor echoActor = actor {
    loop {
        react { String message ->
            reply "Nice to hear: $message"
        }
    }
}

String response = echoActor.sendAndWait('Hi there')

assert response == "Nice to hear: Hi there"
If you try to send a message with another value not expected in the actor’s closure, it will throw an exception

4.2. Different types of messages

That’s cool, but sometimes you may want to send different types of messages to a given object and you may expect the actor to behave differently depending on the type received.

That kind of behavior is acomplished by DynamicDispatchActor. Whether you use the DSL or the Java flavor you will be able to create different processing for different type of messages using the same actor.

Dispatching DSL
import static groovyx.gpars.actor.Actors.messageHandler
import groovyx.gpars.actor.DynamicDispatchActor

DynamicDispatchActor handler = messageHandler {
    when { String name ->
       println "Nice to meet you $name"
    }
    when { Integer age ->
        println "You look younger than $age"
    }
}

handler << "Mario"
handler << 37
Dispatching NO-DSL
import groovyx.gpars.actor.DynamicDispatchActor

class Handler extends DynamicDispatchActor {
    void onMessage(String name) {
        println "Nice to meet you $name"
    }

    void onMessage(Integer age) {
        println "You look younger than $age"
    }

}

final Handler handler = new Handler()
handler.start()

handler << "Mario"
handler << 37
If you send a value of a type not expected by any of the when clauses, it will throw an exception.

4.3. Exercise

You have to create an actor that processes national and international orders. All orders have a field call content where they carry the message.

For national orders no reply is needed because all national orders are supposed to reach destination. But for international orders you need the actor to reply with a MD5 signature of the message (I’ll give you the code to get the MD5).

Getting MD5 for a given String
import static java.security.MessageDigest.getInstance

String md5(String st) {
    return getInstance("MD5").digest(st.getBytes("UTF-8")).encodeHex().toString()
}

5. Make Actors socialize

So far we have been sending messages to an actor, receiving messages as an actor, and even replying as an actor but there’s one important part missing here… I want actors to talk to each other. Lets see how to do it.

5.1. Joining actors

When you want callers to wait an actor to terminate you should be using actor’s join() method. Sometimes maybe a client calling to an actor, sometimes could be an actor sending messages to another actor.

Actors provide a join() method to allow callers to wait for the actor to terminate. A variant accepting a timeout is also available

— GPars official documentation
Joining DSL
import static groovyx.gpars.actor.Actors.actor

final receiver = actor {
    loop {
        react { msg ->
            reply "Replying actor received: '$msg'"
        }
    }
}

final sender = actor {
    receiver << "Sent Ping"
    react { msg ->
        println msg
    }
}

[receiver, sender]*.join()
println "Never reached"
Joining NO-DSL
import static groovyx.gpars.actor.Actors.actor
import groovyx.gpars.actor.DefaultActor

class SendingActor extends DefaultActor {
    final DefaultActor destination
    SendingActor(DefaultActor destination) {
       this.destination = destination
    }
    void act() {
        println destination.sendAndWait("Sent Ping")
    }
}

class ReplyingActor extends DefaultActor {
    void act() {
        loop {
            react { String msg ->
                reply "ReplyingActor received: $msg'"
            }
        }
    }
}

final receiver = new ReplyingActor().start()
final sender = new SendingActor(receiver).start()

[receiver, sender]*.join()
println "Never reached"

5.2. Forwarding

Suppose you want to send a message to a given actor and then get the response back to send it to another actor. Well good news is that you can set where the response should be sent.

Simple forwarding NO-DSL
import groovyx.gpars.actor.DefaultActor

class WithId extends DefaultActor {
    String id
    String toString() { return this.id }
}

class Bank extends WithId {
    void act() {
        loop {
            react { Integer amount ->
                reply "Amount $amount has been stored in -- ${sender} --"
            }
        }
    }
}
class BankBranch extends WithId {
    void act() {
        loop {
            react { String message ->
                println message
            }
        }
    }
}

final branch1 = new BankBranch(id: '0001').start()
final branch2 = new BankBranch(id: '0002').start()
final bank = new Bank(id: 'SuperRichBank').start()

bank.send(1000, branch1)
bank.send(2000, branch2)

println bank.sendAndWait(3000)

5.3. Exercise

You can forward messages if you know the destination but most of the time you just deliver messages to a proxy which then, forwards the message to the proper destination.

In this exercise your boss need you to do some optimization on some files of the corporate site.

  • All text files should include his name

  • All images should be resized to a certain dimension (ie 300x300)

You have to create

  • 1 actor acting as a proxy, so every time a file is received you must send it to the correspondent actor to be processed.

  • 1 Text processing actor

  • 1 Image processing actor

Every time you send a file to the proxy actor it should forward that file to the correspondent processing actor (text files to text actor and images to image actor).

You can solve this exercise just with DefaultActor instances
Although not necessary you can use the Java7 WatchService to look for changes in a given directory instead of sending files manually to your actors.
Follow up I leave a snippet to resize an image with the library imgscalr
@Grab('org.imgscalr:imgscalr-lib:4.2')
import java.awt.image.BufferedImage
import javax.imageio.ImageIO

import org.imgscalr.Scalr

BufferedImage src = ImageIO.read(new File('//source'))
BufferedImage img = Scalr.resize(src, 300, 200)

ImageIO.write(img, 'jpg', new File('//destination'))

6. Shit happens

If something may go wrong it will go wrong. It’s a universal rule isn’t it ?

6.1. Actor throws an exception

An actor terminates automatically when an exception that is not handled inside actor’s body is thrown.

Then… What happens if an actor throws an exception ? Should we try-catch the possible exception and deal with it ? Well, general recomendation about this is let-it-crash. If we don’t add try-catch code in our actors, they will remain cleaner and will be easier to maintain.

During the lifecycle of our actor there are some callback methods useful to use when undesirable situations such as exceptions, interruptions, or timeouts happen.


/* called when the actor's thread gets interrupted. Thread interruption will result
 * in the stopping the actor in any case  */
onInterrupt(InterruptedException ie)

/* called when no messages are sent to the actor within the timeout specified for
 * the currently blocking react method */
onTimeout()

/* called when an exception occurs in the actor's event handler.
 * Actor will stop after return from this method. */
onException(Throwable th)

If the worst scenario happens we would like at least to terminate the actor in an ordered and controlled way.

In order to to that all actors have the onException(Exception) method. This method is a nice place to put the code we want to execute when a not handled exception is thrown inside the actor. That way we keep logic code away from guard code.

Remember: By default an actor stops when an exception has been thrown within the body of the actor.
Handling an actor exception DSL
import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor

DefaultActor buggyActor = Actors.actor {
    delegate.metaClass.onException = {
        println "Close database connection"
        println "Release other resources"
        println "Bye"
    }
    loop {
        react { String message ->
            throw new Exception("Shit happens :P")
        }
    }
}

buggyActor << "Something nice"
Handling an actor exception NO-DSL
import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor

class BuggyActor extends DefaultActor {

    void onException(Throwable th) {
        println "Close database connection"
        println "Release other resources"
        println "Bye"
    }

    void act() {
        loop {
            react { String message ->
                throw new Exception("Shit happens :P")
            }
        }
    }
}

new BuggyActor().start() << "Something nice"

Well it’s nice we have a mecanism to make the actor stop nicely… but I don’t want the actor to stop receiving messages… I want it to continue… how do I do that ?

6.2. Supervising

Supervising is a really useful feature because you could:

  • Get notified of changes on other actors

  • Take control over them (stopping, restarting…etc)

Although GPars doesn’t have that functionality out-of-the-box yet, I’ll try to implement a very simple example on how supervisors may look like using GPars.

First thing is to implement the skeleton of what a supervised actor may look like. A Supervised object is an actor that can inform to its linked Supervisor before it stops, giving information about the reasons that made it stop.

Supervised Actor

import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor

class SupervisedExceptionMessage { (1)
   String message
}

class Supervised extends DefaultActor {
    Supervisor supervisor  (2)
    void onException(Throwable th) {
        supervisor << new SupervisedExceptionMessage(message: th.message) (3)
    }
}
1 When supervised actor crashes it will create an exception message to send it to its supervisor
2 We need to hold a supervisor’s reference in order to be able to inform it whenever may come
3 Everytime an actor throws an unhandled exception the onException(Throwable) is invoked. Once the onException() method ends the actor stops. Before that we want to inform the supervisor in case it wanted to do something about it.
Supervisor Actor
import groovyx.gpars.actor.DynamicDispatchActor

class Supervisor extends DynamicDispatchActor { (1)
    Supervised supervised (2)
    Class<? extends Supervised> supervisedClass (3)

    void link(Supervised supervised) { (4)
        this.supervised = supervised
        this.supervised.supervisor = this
        this.supervisedClass = supervised.getClass()
    }
}

A Supervisor is an actor that supervises other actor and makes decisions whenever a supervised actor stops because a failure (an unhandled exception most of the time).

The Supervisor instance can be linked to the supervised actor thanks to the supervisor’s link method. This method makes actors available to each other meaning that Supervisor's instance can receive supervised’s SupervisedExceptionMessage among other things.

1 A Supervisor extends DynamicDispatchActor. It could receive many other type of messages other than SupervisedExceptionMessage.
2 A reference to the supervised actor
3 The reference to the supervised actor’s class. This is useful in case the supervisor wanted to create a new instance of a supervised actor that stopped because of an unhandled exception.
4 In order to link (I took this term from Scala Akka) supervised instance to this supervisor we invoke supervisor’s link method passing the supervised actor as parameter.

Lets see how to use it.

Supervising Example
class SimpleSupervisedActor extends Supervised {
    void act() {
        loop {
            react { String message ->
                if (message == 'gift') {
                    throw new Exception(message)
                }
                println "NORMAL: $message"
            }
        }
    }
}

def supervised = new SimpleSupervisedActor().start()
def supervisor = new Supervisor() {
    void onMessage(String query) {
        supervised << query
    }
    void onMessage(SupervisedException ex) {
        println "SUPERVISED ACTOR DIES WITH MESSAGE: ${ex.message}"
        supervised = supervisedClass.newInstance().start() (1)
    }
}.start()

supervisor.link(supervised)

supervisor << "something"
Thread.sleep(1000)
supervisor << "something"
Thread.sleep(1000)
supervisor << "gift"
Thread.sleep(1000)
supervisor << "Message processed by a new supervised instance 1"
Thread.sleep(1000)
supervisor << "Message processed by a new supervised instance 2"

This time we have two actors. We’ve got a Supervised actor receiving messages through its Supervisor. This way in case the supervised actor stops, its Supervisor could still create another Supervised actor of the same type and keep processing messages transparently to the client.

1 The interesting part. Supervisor receives a Supervised’s SupervisedException message when it’s about to stop. Then the Supervisor instance has chosen to create another Supervised instance to keep processing messages the same way.

Enough to say, the more stateless is the Supervised actor the less difficult will be to restart the Supervised actor again

What "restart" means ? In this example, and in general, in other JVM’s implementations, to restart and actor is eventually creating a new instance of the same type of the stopped actor. That’s why it’s so important to keep your actor instances stateless. If you had some state you should be serializing the state of the actor in order to recreate the state later on.

6.2.1. OneToAll Strategy

(TODO) or

6.2.2. OneToOne Strategy

(TODO) and

7. Stateless Actors

8. Remoting

Remoting has become part of GPars since version 1.3-SNAPSHOT . That’s great news!!!! Until this moment all actors should run in the same JVM in order to be able to communicate each other. Now you can be running an actor in one machine sending messages to another in another machine.

You can find more information about remoting at GPars ascii documentation, and at this codehaus page Confluence

8.1. Dataflow

Although this documentation is all about actors, I’ve added some extra examples about remoting because I think remoting is going to be one of the most important features of GPars in the near future.

When working with remote dataflows the most important classes to keep in mind are:

imports
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.remote.RemoteDataflows

The following example just runs a server instance which publish a dataflow variable and a client asking for the value of the dataflow variable published at the server.

First of all lets see how to build the server:

Server
    RemoteDataflows startServer() {
        def dataflows = RemoteDataflows.create() (1)
        dataflows.startServer('localhost',9000) (2)
        def variable = new DataflowVariable() (3)
        variable << 42
        dataflows.publish(variable,"meaning-of-life") (4)

        return dataflows
    }
1 Create an instance of a RemoteDataflows
2 Stablishing the address and port of the server instance and start the server instance. Notice that what startServer returns is not an instance of RemoteDataflows but an instance of LocalHost.
3 Declaring a dataflow variable and setting a value
4 Publishing the dataflow variable

And then it’s time to create the client:

Client
    void 'reading a simple dataflow variable'() {
        setup: 'starting server exposing dataflow variable'
            def serverDataflows = startServer() (1)
        when: 'listening to that server'
            def clientDataflows = RemoteDataflows.create() (2)
            def remoteVariablePromise = (3)
                clientDataflows.getVariable(
                    "localhost",
                    9000,
                    "meaning-of-life"
                )
        and: 'asking for a given dataflow variable'
            def result = remoteVariablePromise.get() (4)
        then: 'you should get what you expected'
            result.value == 42
        cleanup: 'stopping server instance'
            serverDataflows.stopServer() (5)
    }
1 Now it’s time to build and run the server
2 Create an instance of RemoteDataflows
3 Getting a pointer to the dataflow variable published at the server
4 Once we get the reference we can start asking for the variable’s value
5 Stopping the server

8.2. Actors

The way actors are published is very similar than the way dataflows were published. Instead of using the RemoteDataflows we will be using the RemoteActors instance.

RemoteActors have 2 responsabilities:

  • To create a remote actor’s server: Actors' instances will be published to that server using an id (a simple string).

  • To enable clients to get a reference of remote actors: That will be possible by setting address, port and the actor’s identifier at the server.

Imports
import static groovyx.gpars.actor.Actors.actor
import groovyx.gpars.actor.remote.RemoteActors
Full example
    void 'simple remote actors test'() {
        given: 'a remote actor server'
            (1)
            def remoteActorServer = RemoteActors.create()
            remoteActorServer.startServer('localhost',9000)
        and: 'a simple DefaultActor instance'
            (2)
            def simpleActor = actor {
                loop {
                    react { String msg ->
                        reply "Thanks for saying: $msg"
                    }
                }
            }
        when: 'publishing the actor remotely'
            (3)
            remoteActorServer.publish(simpleActor, 'greetingsActor')
        and: 'getting the actor reference'
            (4)
            def remoteActorsClient = RemoteActors.create()
            def actorPromise =
                remoteActorsClient
                    .get('localhost', 9000, 'greetingsActor')
            (5)
            def remoteActor = actorPromise.get()
        and: 'sending a message to actor'
            (6)
            def result = remoteActor.sendAndWait('hello')
        then: 'the reply should be the expected'
             result == 'Thanks for saying: hello'
        cleanup: 'shutting down the actor server'
            remoteActorServer.stopServer()
    }
1 Creating the RemoteActors instance. This instance will create the remote server where actors will be published.
2 This is a regular groovyx.gpars.actor.DefaultActor instance.
3 Publishing the actor’s instance to the server
4 Creating another RemoteActors instance to simulate client won’t be able to access the server RemoteActors instance. Once we get the RemoteActors instance we ask for the actor instance published with the id greetingsActor.
5 The RemoteActors instance returned a promise containing the requested actor.
6 Then we can send messages (serialized) to the remote actor like if we had the actor working locally.

Appendix A: Exercises solutions

Solution 3.4

Solution DSL
import static groovyx.gpars.actor.Actors.actor

def printer = actor {
    loop {
        react { List<Integer> numbers ->
           println numbers.sum()
        }
    }
}

printer << (1..4) as List
Solution NO-DSL
import groovyx.gpars.actor.DefaultActor

class Printer extends DefaultActor {
    void act() {
        loop {
            react { numbers ->
                println numbers.sum()
            }
        }
    }
}

def printer = new Printer()
printer.start()
printer << (1..4)

Solution 3.6

Solution DSL
import static groovyx.gpars.actor.Actors.actor
import groovyx.gpars.dataflow.Promise

def printer = actor {
    loop {
        react { List<Integer> numbers ->
           reply numbers.sum()
        }
    }
}

Promise first = printer.sendAndPromise((1..4))
Integer second = printer.sendAndWait((4..8))
Integer third = printer.sendAndWait((8..12))

assert (first.get() + second + third) == 90
Solution NO-DSL
import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.dataflow.Promise

class Printer extends DefaultActor {
    void act() {
        loop {
            react { numbers ->
                reply numbers.sum()
            }
        }
    }
}

def printer = new Printer()
printer.start()

Promise first = printer.sendAndPromise((1..4))
Integer second = printer.sendAndWait((4..8))
Integer third = printer.sendAndWait((8..12))

assert (first.get() + second + third) == 90

Solution 4.3

Solution DSL

import static groovyx.gpars.actor.Actors.messageHandler
import static java.security.MessageDigest.getInstance

import groovyx.gpars.actor.DynamicDispatchActor
import groovyx.gpars.dataflow.Promise

class Order { String content }
class NationalOrder extends Order {}
class InternationalOrder extends Order {}

final Closure<String> md5 = { String st ->
    return getInstance("MD5").digest(st.getBytes("UTF-8")).encodeHex().toString()
}

final DynamicDispatchActor orderProcessor = messageHandler {
    when { InternationalOrder international ->
        reply md5(international.content)
    }
    when { NationalOrder national ->
        println "I'm sure the order: ${national.content} will be delivered"
    }
}

Promise md5IdPromise =
    orderProcessor.sendAndPromise(new InternationalOrder(content: 'supersecret'))

orderProcessor << new NationalOrder(content: 'something really silly')
orderProcessor << new NationalOrder(content: 'something as silly as the previous one')

println "I need to hold the id: ${md5IdPromise.get()}"
Solution NO-DSL
import static groovyx.gpars.actor.Actors.messageHandler
import static java.security.MessageDigest.getInstance

import groovyx.gpars.actor.DynamicDispatchActor
import groovyx.gpars.dataflow.Promise

class Order { String content }
class NationalOrder extends Order {}
class InternationalOrder extends Order {}

class OrderProcessor extends DynamicDispatchActor {
    String md5(String st) {
        return getInstance("MD5").digest(st.getBytes("UTF-8")).encodeHex().toString()
    }
    void onMessage(InternationalOrder international) {
        reply md5(international.content)
    }
    void onMessage(NationalOrder national) {
        println "I'm sure the order: ${national.content} will be delivered"
    }
}

final OrderProcessor orderProcessor = new OrderProcessor().start()
final Promise md5IdPromise =
    orderProcessor.sendAndPromise(new InternationalOrder(content: 'supersecret'))

orderProcessor << new NationalOrder(content: 'something really silly')
orderProcessor << new NationalOrder(content: 'something as silly as the previous one')

println "I need to hold the id: ${md5IdPromise.get()}"

Solution 5.3

DSL
@Grab('org.imgscalr:imgscalr-lib:4.2')
import static javax.imageio.ImageIO.read
import static javax.imageio.ImageIO.write
import static org.imgscalr.Scalr.resize
import static groovyx.gpars.actor.Actors.actor
import static java.io.File.createTempFile

import groovyx.gpars.actor.DefaultActor

final DefaultActor imageActor = actor {
    loop {
        react { File image ->
            def src = read(image)
            def img = resize(src, 400, 300)

            write(img, 'jpg', createTempFile('actors_', image.name))

            println "IMAGE: ${image.name} processed"
        }
    }
}

final DefaultActor textActor = actor {
    loop {
        react { File textFile ->
            def dst = createTempFile('actors_', textFile.name)

            // Don't do this at home :P
            dst << "Author: MYBOSS"
            dst << textFile.text

            println "TEXT: ${textFile.name} processed"
        }
    }

}

final DefaultActor dispatchingActor = actor {
    loop {
        react { File file ->
            if (file.name.endsWith('jpg')) {
                imageActor << file
            } else if (file.name.endsWith('txt')) {
                textActor << file
            }
        }
    }
}

new File('/somedirwithfiles/').
    listFiles().each { dispatchingActor << it }

Because the NO-DSL version has its own Spock Specification it has been organized in several classes.

Actor acting as a Proxy (NO-DSL)
package exercises.forwarding.images

import groovyx.gpars.actor.DefaultActor

class FileProcessorActor extends DefaultActor {

    DefaultActor imageActor
    DefaultActor textActor

    void act() {
        loop {
            react { File file2Process ->
                println "Start to processing ${file2Process.name}"
                if (isImageFile(file2Process)) {
                    sendToImageActor(file2Process)
                }
                if (isTextFile(file2Process)) {
                    sendToTextActor(file2Process)
                }
            }
        }
    }

    Boolean isImageFile(File file2Process) {
        return doesItEndWith(file2Process, 'jpg')
    }

    Boolean isTextFile(File file2Process) {
        return doesItEndWith(file2Process, 'txt')
    }

    Boolean doesItEndWith(File file, String ending) {
        return file.name.endsWith(ending)
    }

    void sendToImageActor(File file) {
        imageActor << file
    }

    void sendToTextActor(File file) {
        textActor << file
    }

}
Actor processing images (NO-DSL)
package exercises.forwarding.images

import static javax.imageio.ImageIO.read
import static javax.imageio.ImageIO.write
import static org.imgscalr.Scalr.resize
import static java.io.File.createTempFile

import exercises.test.NotificationActor
import groovyx.gpars.actor.DefaultActor

class ImageProcessorActor extends NotificationActor {

    ImageProcessorActor(DefaultActor monitor) {
        super(monitor)
    }

    void act() {
        loop {
            react { File image ->
                def src = read(image)
                def img = resize(src, 300, 200)
                def tmp = new File(System.getProperty('java.io.tmpdir'))

                write(img, 'jpg', new File(tmp, "processed_${image.name}"))

                notifyIfMonitor(image.name)
            }
        }
    }

}
Actor processing text files (NO-DSL)
package exercises.forwarding.images

import static java.io.File.createTempFile

import exercises.test.NotificationActor
import groovyx.gpars.actor.DefaultActor

class TextProcessorActor extends NotificationActor {

    TextProcessorActor(DefaultActor monitor) {
        super(monitor)
    }

    void act() {
        loop {
            react { File textFile ->
                def tmpfile = new File(System.getProperty('java.io.tmpdir'))
                def newFile = new File(tmpfile, "processed_${textFile.name}")

                newFile << 'Author: me'
                newFile << textFile.text

                notifyIfMonitor(textFile.name)
            }
        }
    }

}
Specification (NO-DSL)
package exercises.forwarding.images

import static groovyx.gpars.actor.Actors.actor

import exercises.test.NotifiedActor
import spock.lang.Specification

class ImagesProcessingSpecification extends Specification {

    final File RESOURCES_DIR = 'src/test/resources/exercises/forwarding/images/' as File
    final File TEMPORARY_DIR = new File(System.getProperty('java.io.tmpdir'))

    def 'Processing successfully images'() {
        setup: 'Creating needed directories references'
            def monitorActor = new NotifiedActor(3).start()
            def imageActor   = new ImageProcessorActor(monitorActor).start()
            def textActor    = new TextProcessorActor(monitorActor).start()
            def proxyActor   = new FileProcessorActor(imageActor: imageActor, textActor: textActor).start()
        when: 'Sending a couple of image files'
            proxyActor << new File(RESOURCES_DIR, 'image1.jpg')
            proxyActor << new File(RESOURCES_DIR, 'image2.jpg')
            proxyActor << new File(RESOURCES_DIR, 'text.txt')
        and: 'Expecting process to finish'
            monitorActor.join()
        and: 'Looking for processed files'
            def file1 = new File(TEMPORARY_DIR, 'processed_text.txt')
            def file2 = new File(TEMPORARY_DIR, 'processed_image1.jpg')
            def file3 = new File(TEMPORARY_DIR, 'processed_image2.jpg')
        then: 'We should make sure they exist'
            [file1, file2, file3].every { it.exists() }
        cleanup: 'Deleting processed files'
            [file1, file2, file3]*.delete()
    }
}

I’ve used the NotificationActor and NotifiedActor to make tests wait until everything has been processed.