By Mario Garcia <@marioggar>
1. Intro
The objective of this workshop is to learn how to create actors using GPars.
1.1. What is GPars?
The GPars framework offers Java developers intuitive and safe ways to handle Java or Groovy tasks concurrently. Leveraging the enormous flexibility of the Groovy programming language and building on proven Java technologies, we aim to make concurrent programming for multi-core hardware intuitive, robust and enjoyable.
1.2. What is an actor?
There are two common ways to coordinate threads: one is the dark and full of terrors way of explicit synchronization, and the other is message passing. The latter is the one the actor model embraces.
The actor model in computer science is a mathematical model of concurrent computation that treats "actors" as the universal primitives of concurrent digital computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.
An actor provides isolated mutability and ensures that:
- At most one thread processes the actor’s body at any time
- The actor’s state can be safely modified by code in the body without any extra effort
To make sure those conditions remain true, the programmer must make sure that the actor’s code is never invoked directly from outside the actor’s body.
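A minimal sketch of what isolated mutability buys you. It assumes GPars is on the classpath (the @Grab line and its version are an assumption; drop it if your GroovyConsole already bundles GPars), and the names counter, senders and perSender are made up for illustration. Several threads send messages concurrently, yet the count variable is only ever touched from inside the actor’s body, so no locks are needed:

```groovy
// Assumption: GPars 1.2.1 is resolvable; remove the @Grab if GPars is already on the classpath
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.actor.Actors
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

int senders = 4          // hypothetical number of client threads
int perSender = 250      // messages sent by each thread
def processed = new CountDownLatch(senders * perSender)

def counter = Actors.actor {
    int count = 0        // state owned by the actor, never exposed directly
    loop {
        react { msg ->
            if (msg == 'get') {
                reply count
            } else {
                count++                  // only the actor's body modifies the state
                processed.countDown()
            }
        }
    }
}

// Plain threads send messages to the actor concurrently
def threads = (1..senders).collect {
    Thread.start { perSender.times { counter << 'inc' } }
}
threads*.join()
processed.await(10, TimeUnit.SECONDS)

int total = counter.sendAndWait('get')
println "Total increments: $total"
```

The clients never read or write count themselves; they can only ask the actor for it with a message.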
1.3. What does an actor do?
An actor has four main related actions:
- Receiving messages
- Sending messages
- Sending replies
- Creating new actors
1.4. When to use actors?
Whenever you need to access a given resource from multiple threads.
1.5. Why not use threads?
Well, historically, synchronizing threads has not been the strongest skill of Java programmers, so to speak. The actor model makes coordinating concurrent access much easier.
Normally there is no good reason to do low-level concurrent programming. It has always been error-prone, and it requires a really good understanding not only of the JDK’s concurrency API but of the Java memory model as well.
Scalability is the other main reason. While the JVM can only run a limited number of threads, an actor model implementation normally works with a pool of threads. If an actor is not doing anything, it doesn’t consume a thread.
Actors share a pool of threads, which are dynamically assigned to actors when the actors need to react to messages sent to them. The threads are returned to the pool once a message has been processed and the actor is idle waiting for more messages to arrive. Actors become detached from the underlying threads, and so a relatively small thread pool can serve a potentially unlimited number of actors. Virtually unlimited scalability in the number of actors is the main advantage of event-based actors, which are detached from the underlying physical threads.
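To make the shared pool tangible, here is a small sketch, assuming that DefaultPGroup(2) gives the actors created through it a pool of two threads (the @Grab version, actorCount and the other names are illustrative assumptions). One hundred actors are created and each of them answers a message, even though far fewer threads are available:

```groovy
// Assumption: GPars 1.2.1 is resolvable; remove the @Grab if GPars is already on the classpath
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.group.DefaultPGroup
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

int actorCount = 100                        // hypothetical number of actors
def group = new DefaultPGroup(2)            // actor group backed by a small thread pool
def answered = new CountDownLatch(actorCount)
def threadNames = Collections.synchronizedSet(new HashSet<String>())

def actors = (1..actorCount).collect {
    group.actor {
        loop {
            react { msg ->
                // Record which pool thread happened to serve this actor
                threadNames << Thread.currentThread().name
                answered.countDown()
            }
        }
    }
}

actors.each { it << 'ping' }
boolean allAnswered = answered.await(10, TimeUnit.SECONDS)

println "All $actorCount actors answered: $allAnswered"
println "Distinct threads used: ${threadNames.size()}"
group.shutdown()
```

An idle actor here is just an object waiting in memory; a thread is borrowed only while a message is being handled.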
2. Before Getting Started
What do we need to run the sample code?
- Java™ 1.7+
- Groovy 2.1.x
- Gradle 1.9+
- Git
3. Sending
Actors are all about passing messages so the first thing we are going to do is to send messages to an actor.
For this first example we are using the GroovyConsole, so open it up and create your first actor.
3.1. Our first actor
(1)
import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor
DefaultActor receiver = Actors.actor { (2)
loop { (3)
react { String msg -> (4)
println "Hey thanks for telling me $msg"
}
}
}
(5)
receiver.send 'Message 1'
receiver << 'Message 2'
receiver 'Message 3'
Just a few lines of code and already many concepts. Let’s review all of them.
1 | To work with actors in GPars you should import the groovyx.gpars.actor package. |
2 | The actor DSL is created with the Actors.actor method. |
3 | This actor will never end without an explicit termination |
4 | When reacting to a received message, the expected type can be declared as the parameter of the react closure |
5 | You can pass messages to an actor by using the actor’s send method. This method can be invoked in the three ways shown in the example. |
3.2. Type of actors
GPars distinguishes between stateful actors and stateless actors.
We’ve just built a groovyx.gpars.actor.DefaultActor. A DefaultActor belongs to the group of stateful actors. Stateful actors allow the programmer to handle implicit state directly.
Among the stateless actors GPars has the DynamicDispatchActor and ReactiveActor classes.
Depending on your requirements you can choose one or the other, but stateless actors will generally perform better than stateful ones.
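As a quick taste of the stateless side, the following sketch builds a ReactiveActor with Actors.reactor. The doubler name and the @Grab version are assumptions made for illustration. The closure is applied to every message and its result is sent back as the reply, so there is no loop or react to write:

```groovy
// Assumption: GPars 1.2.1 is resolvable; remove the @Grab if GPars is already on the classpath
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.actor.Actors

// A ReactiveActor keeps no state: each reply is computed from the message alone
def doubler = Actors.reactor { Integer n -> n * 2 }

def answer = doubler.sendAndWait(21)
println "21 doubled is $answer"
doubler.stop()
```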
3.3. Send and forget
In real applications we sometimes just want to send some data without blocking the application and forget about the result. A typical scenario would be logging user behavior throughout the application, right?
import groovy.transform.Immutable
import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor
@Immutable (1)
class UserItem {
String userName
Long itemId
}
final DefaultActor loggingServiceActor = Actors.actor {
loop {
react { UserItem userItem -> (2)
println "user ${userItem.userName} added item ${userItem.itemId} at ${new Date()}"
}
}
}
(3)
loggingServiceActor.send(new UserItem(userName:'Mario', itemId: 10L))
Thread.sleep(1000) // Adding a delay to see it working
loggingServiceActor << ([userName: 'Rober', itemId: 20L] as UserItem)
Here we are sending a message to the actor from outside code that holds the actor’s reference.
1 | Immutability is always the right choice when dealing with concurrency and parallelism. Use it as much as you can. |
2 | The react closure can receive a parameter of any type. Of course, make sure the type you expect is the one you are actually receiving, otherwise you’ll get a cast exception. |
3 | Here it is not another actor sending messages to our actor but a different thread (the main thread). |
An actor’s message handler (react) can only expect 0 or 1 argument.
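To see why @Immutable messages are safe to hand over to another thread, here is a small sketch in plain Groovy (no GPars needed). It reuses the UserItem class from the example; the variable names are made up. Once created, the message cannot be changed by the sender or by the receiver, so "modifying" it means building a new instance:

```groovy
import groovy.transform.Immutable

@Immutable
class UserItem {
    String userName
    Long itemId
}

def item = new UserItem(userName: 'Mario', itemId: 10L)

def failure = null
try {
    item.userName = 'Rober'          // any attempt to mutate the message is rejected
} catch (ReadOnlyPropertyException e) {
    failure = e
}
println "Mutation rejected: ${failure != null}"

// A changed message is a new object; the original stays untouched
def changed = new UserItem(userName: 'Rober', itemId: item.itemId)
println "Original user: ${item.userName}, new user: ${changed.userName}"
```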
Before starting the exercise, take a look at how the logging actor would look without the DSL. That will be really important when going through the exercises.
import groovy.transform.Immutable
import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor
@Immutable
class UserItem {
String userName
Long itemId
}
class LoggingActorService extends DefaultActor { (1)
void act() { (2)
loop {
react { UserItem userItem ->
println """
user ${userItem.userName} added item ${userItem.itemId}
at ${new Date()}
"""
}
}
}
}
class UserService {
def loggingService (3)
void trackAddedItem(final UserItem userItem) {
loggingService << userItem
}
}
def loggingService = new LoggingActorService()
def userService = new UserService(loggingService: loggingService)
loggingService.start() (4)
userService.trackAddedItem(new UserItem(userName: 'John', itemId: 43L))
Thread.sleep(1000)
userService.trackAddedItem(new UserItem(userName: 'Ronnie', itemId: 23L))
1 | Now we are explicitly declaring a DefaultActor class |
2 | Now the actor’s body is contained within the body of the act() method. |
3 | In order to be able to send messages to the actor we need its reference, so we pass it when building our service instance |
4 | Before sending messages to our actor, the actor must already be started, otherwise you will get an exception |
3.4. Exercise
Create an actor that receives 4 numbers and prints all of them to the console. You can use the GroovyConsole to do that. Please try to do it both with the GPars DSL and with plain classes.
3.5. Send And Block
Logging is really important but sometimes we just want a response as a consequence of sending data. Now we will be sending a message to a given actor and expecting a response to that data.
Let’s say we want to have a word translated: we send the word to an actor instance and get the translation back.
import static groovyx.gpars.actor.Actors.actor
final def english2SpanishService = actor {
loop {
react { String word ->
if (word == 'hello') {
reply 'hola' (1)
} else if (word == 'goodbye') {
reply 'adios'
} else {
reply 'no idea :P'
}
}
}
}
String helloTranslation = english2SpanishService.sendAndWait('hello') (2)
String goodbyeTranslation = english2SpanishService.sendAndWait('goodbye')
String seeyoulaterTranslation = english2SpanishService.sendAndWait('see you later')
assert helloTranslation == 'hola'
assert goodbyeTranslation == 'adios'
assert seeyoulaterTranslation == 'no idea :P'
1 | When the message comes from a non-actor sender, the actor can only use the reply method if the client sent the message with sendAndWait or, as in the next example, sendAndPromise |
2 | The sendAndWait method blocks the current thread until a response has been received |
import static groovyx.gpars.actor.Actors.actor
import groovyx.gpars.dataflow.Promise
def reallySlowService = actor {
loop {
react {
Thread.sleep(2000)
reply 43
}
}
}
def reallyFastService = actor {
loop {
react {
reply 'No idea'
}
}
}
Promise answer1 =
reallySlowService.
sendAndPromise('Tell me the meaning of life the universe and everything')
String answer2 =
reallyFastService.
sendAndWait('Tellme how is gonna end 24 this season')
assert answer1.get() == 43
assert answer2 == 'No idea'
3.6. Exercise
Now that we know how to send a message and receive a simple response from a given actor, let’s modify the actor you built for the first exercise so that, instead of printing out the result, it sends the result back to you.
You have to send three different lists of numbers, get back the results and sum them in the client once you have received the partial results from the actor.
The objective of this exercise is to use promises or blocking calls to get results from a given actor.
4. Receiving
So far we have focused our efforts on how to send a message to a given actor. In this chapter we’ll look at things from the other side, focusing on topics such as:
- How does an actor receive messages?
- How can an actor respond to a given message?
- What types of objects can an actor receive?
- Is an actor capable of behaving differently depending on the object received?
4.1. A specific type of message
So far we’ve seen how to send one specific type of message to our actors.
import static groovyx.gpars.actor.Actors.actor
import groovyx.gpars.actor.DefaultActor
final DefaultActor echoActor = actor {
loop {
react { String message ->
reply "Nice to hear: $message"
}
}
}
String response = echoActor.sendAndWait('Hi there')
assert response == "Nice to hear: Hi there"
If you send a message of a type the actor’s closure does not expect, it will throw an exception.
4.2. Different types of messages
That’s cool, but sometimes you may want to send different types of messages to a given actor and expect it to behave differently depending on the type received.
That kind of behavior is accomplished with DynamicDispatchActor. Whether you use the DSL or the class-based flavor, you will be able to define different processing for different types of messages using the same actor.
import static groovyx.gpars.actor.Actors.messageHandler
import groovyx.gpars.actor.DynamicDispatchActor
DynamicDispatchActor handler = messageHandler {
when { String name ->
println "Nice to meet you $name"
}
when { Integer age ->
println "You look younger than $age"
}
}
handler << "Mario"
handler << 37
import groovyx.gpars.actor.DynamicDispatchActor
class Handler extends DynamicDispatchActor {
void onMessage(String name) {
println "Nice to meet you $name"
}
void onMessage(Integer age) {
println "You look younger than $age"
}
}
final Handler handler = new Handler()
handler.start()
handler << "Mario"
handler << 37
If you send a value of a type not expected by any of the when clauses, it will throw an exception.
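One way to avoid that situation is a fallback clause. The sketch below assumes that a when clause with an untyped parameter acts as a catch-all for anything the typed clauses don’t match, and that the most specific clause wins for known types. The tolerantHandler name, the reply texts and the @Grab version are made up for illustration:

```groovy
// Assumption: GPars 1.2.1 is resolvable; remove the @Grab if GPars is already on the classpath
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.actor.Actors

def tolerantHandler = Actors.messageHandler {
    when { String name -> reply "Nice to meet you $name" }
    when { Integer age -> reply "You look younger than $age" }
    // Untyped parameter: intended as a fallback for every other type
    when { other -> reply "Sorry, I can't handle a ${other.getClass().simpleName}" }
}

def greeting = tolerantHandler.sendAndWait('Mario').toString()
def fallback = tolerantHandler.sendAndWait(3.5).toString()   // 3.5 is a BigDecimal in Groovy
println greeting
println fallback
tolerantHandler.stop()
```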
4.3. Exercise
You have to create an actor that processes national and international orders. All orders have a field called content where they carry the message.
For national orders no reply is needed because all national orders are supposed to reach their destination. But for international orders you need the actor to reply with an MD5 signature of the message (I’ll give you the code to get the MD5).
import static java.security.MessageDigest.getInstance
String md5(String st) {
return getInstance("MD5").digest(st.getBytes("UTF-8")).encodeHex().toString()
}
5. Make Actors socialize
So far we have been sending messages to an actor, receiving messages as an actor, and even replying as an actor, but there’s one important part missing here… I want actors to talk to each other. Let’s see how to do it.
5.1. Joining actors
When you want callers to wait for an actor to terminate, use the actor’s join() method. The caller may be a regular client of the actor or another actor sending messages to it.
A variant of join() accepting a timeout is also available.
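A minimal sketch of the timeout variant, assuming an actor whose body handles a single message and then finishes (the oneShot and received names and the @Grab version are illustrative assumptions). Because there is no loop, the actor terminates after its first message and join returns instead of blocking forever:

```groovy
// Assumption: GPars 1.2.1 is resolvable; remove the @Grab if GPars is already on the classpath
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.actor.Actors
import java.util.concurrent.TimeUnit

def received = Collections.synchronizedList([])

// No loop here: the actor handles a single message and then terminates
def oneShot = Actors.actor {
    react { msg -> received << msg }
}

oneShot << 'ping'
oneShot.join(5, TimeUnit.SECONDS)   // wait at most 5 seconds for the actor to terminate
println "Received before terminating: $received"
```

With an actor that loops forever, the same call would simply give up after the timeout and let the caller continue.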
import static groovyx.gpars.actor.Actors.actor
final receiver = actor {
loop {
react { msg ->
reply "Replying actor received: '$msg'"
}
}
}
final sender = actor {
receiver << "Sent Ping"
react { msg ->
println msg
}
}
[receiver, sender]*.join()
println "Never reached"
import static groovyx.gpars.actor.Actors.actor
import groovyx.gpars.actor.DefaultActor
class SendingActor extends DefaultActor {
final DefaultActor destination
SendingActor(DefaultActor destination) {
this.destination = destination
}
void act() {
println destination.sendAndWait("Sent Ping")
}
}
class ReplyingActor extends DefaultActor {
void act() {
loop {
react { String msg ->
reply "ReplyingActor received: '$msg'"
}
}
}
}
final receiver = new ReplyingActor().start()
final sender = new SendingActor(receiver).start()
[receiver, sender]*.join()
println "Never reached"
5.2. Forwarding
Suppose you want to send a message to a given actor and have the response sent to another actor. The good news is that you can set where the response should be sent.
import groovyx.gpars.actor.DefaultActor
class WithId extends DefaultActor {
String id
String toString() { return this.id }
}
class Bank extends WithId {
void act() {
loop {
react { Integer amount ->
reply "Amount $amount has been stored in -- ${sender} --"
}
}
}
}
class BankBranch extends WithId {
void act() {
loop {
react { String message ->
println message
}
}
}
}
final branch1 = new BankBranch(id: '0001').start()
final branch2 = new BankBranch(id: '0002').start()
final bank = new Bank(id: 'SuperRichBank').start()
bank.send(1000, branch1)
bank.send(2000, branch2)
println bank.sendAndWait(3000)
5.3. Exercise
You can forward messages if you know the destination, but most of the time you just deliver messages to a proxy, which then forwards each message to the proper destination.
In this exercise your boss needs you to do some optimization on some files of the corporate site.
- All text files should include his name
- All images should be resized to a certain dimension (e.g. 300x300)
You have to create:
- 1 actor acting as a proxy, so every time a file is received it is sent to the corresponding actor to be processed.
- 1 text processing actor
- 1 image processing actor
Every time you send a file to the proxy actor it should forward that file to the corresponding processing actor (text files to the text actor and images to the image actor).
You can solve this exercise just with DefaultActor instances.
Although not necessary, you can use the Java 7 WatchService to look for changes in a given directory instead of sending files manually to your actors.
Below is a snippet to resize an image with the imgscalr library:
@Grab('org.imgscalr:imgscalr-lib:4.2')
import java.awt.image.BufferedImage
import javax.imageio.ImageIO
import org.imgscalr.Scalr
BufferedImage src = ImageIO.read(new File('//source'))
BufferedImage img = Scalr.resize(src, 300, 200)
ImageIO.write(img, 'jpg', new File('//destination'))
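For the optional WatchService variant of the exercise, the following sketch shows the JDK side only. It watches a temporary directory created just for the demo, and the detected list stands in for the point where you would send each file to your proxy actor; both are assumptions for illustration, not part of the exercise statement:

```groovy
import java.nio.file.FileSystems
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardWatchEventKinds
import java.nio.file.WatchKey
import java.util.concurrent.TimeUnit

// Hypothetical directory to watch; in the exercise this would be your files folder
Path dir = Files.createTempDirectory('watched_')
def watcher = FileSystems.getDefault().newWatchService()
dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE)

// Simulate somebody dropping a new file into the directory
Files.createFile(dir.resolve('report.txt'))

List<File> detected = []
// Wait for the next batch of events (some platforms poll, so allow some time)
WatchKey key = watcher.poll(30, TimeUnit.SECONDS)
if (key != null) {
    key.pollEvents().each { event ->
        Path created = dir.resolve((Path) event.context())
        detected << created.toFile()     // here you would do: proxyActor << created.toFile()
    }
    key.reset()                          // re-arm the key to receive further events
}
watcher.close()

println "Detected files: ${detected*.name}"
```

In a real solution the poll would run in a loop (watcher.take() blocks until something happens) so the proxy actor keeps receiving files as they appear.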
6. Shit happens
If something can go wrong, it will go wrong. It’s a universal rule, isn’t it?
6.1. Actor throws an exception
An actor terminates automatically when an exception that is not handled inside the actor’s body is thrown.
So what should we do if an actor throws an exception? Should we try-catch the possible exception and deal with it? Well, the general recommendation is let-it-crash. If we don’t add try-catch code to our actors, they will remain cleaner and easier to maintain.
During the lifecycle of our actor there are some callback methods useful to use when undesirable situations such as exceptions, interruptions, or timeouts happen.
/* called when the actor's thread gets interrupted. Thread interruption will result
* in stopping the actor in any case */
onInterrupt(InterruptedException ie)
/* called when no messages are sent to the actor within the timeout specified for
* the currently blocking react method */
onTimeout()
/* called when an exception occurs in the actor's event handler.
* Actor will stop after return from this method. */
onException(Throwable th)
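Of these situations, the timeout is the easiest one to try in isolation. The sketch below assumes that react accepts a timeout plus a TimeUnit and that, when the timeout expires, the handler receives the special Actor.TIMEOUT message; treat both as assumptions to verify against your GPars version. The impatient and outcome names and the @Grab version are made up:

```groovy
// Assumption: GPars 1.2.1 is resolvable; remove the @Grab if GPars is already on the classpath
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.Actors
import groovyx.gpars.dataflow.DataflowVariable
import java.util.concurrent.TimeUnit

def outcome = new DataflowVariable()   // used only to hand the result back to the main thread

def impatient = Actors.actor {
    // Wait for a message for at most half a second
    react(500, TimeUnit.MILLISECONDS) { msg ->
        outcome << (msg == Actor.TIMEOUT ? 'timed out' : 'got a message')
    }
}

// Nobody sends anything, so the timeout is expected to fire
def result = outcome.getVal(5, TimeUnit.SECONDS)
println "Outcome: $result"
```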
If the worst-case scenario happens, we would at least like to terminate the actor in an orderly and controlled way.
In order to do that, all actors have the onException(Throwable) method. This method is a nice place to put the code we want to execute when an unhandled exception is thrown inside the actor. That way we keep logic code away from guard code.
Remember: By default an actor stops when an exception is thrown within the body of the actor.
import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor
DefaultActor buggyActor = Actors.actor {
delegate.metaClass.onException = {
println "Close database connection"
println "Release other resources"
println "Bye"
}
loop {
react { String message ->
throw new Exception("Shit happens :P")
}
}
}
buggyActor << "Something nice"
import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor
class BuggyActor extends DefaultActor {
void onException(Throwable th) {
println "Close database connection"
println "Release other resources"
println "Bye"
}
void act() {
loop {
react { String message ->
throw new Exception("Shit happens :P")
}
}
}
}
new BuggyActor().start() << "Something nice"
Well, it’s nice that we have a mechanism to make the actor stop nicely… but I don’t want the actor to stop receiving messages… I want it to continue… how do I do that?
6.2. Supervising
Supervising is a really useful feature because you can:
- Get notified of changes in other actors
- Take control over them (stopping, restarting, etc.)
Although GPars doesn’t have that functionality out of the box yet, I’ll try to implement a very simple example of what supervisors may look like using GPars.
The first thing is to implement the skeleton of what a supervised actor may look like. A Supervised object is an actor that can inform its linked Supervisor before it stops, giving information about the reasons that made it stop.
import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DefaultActor
class SupervisedExceptionMessage { (1)
String message
}
class Supervised extends DefaultActor {
Supervisor supervisor (2)
void onException(Throwable th) {
supervisor << new SupervisedExceptionMessage(message: th.message) (3)
}
}
1 | When a supervised actor crashes it will create an exception message to send to its supervisor |
2 | We need to hold a reference to the supervisor in order to be able to inform it when the time comes |
3 | Every time an actor throws an unhandled exception, onException(Throwable) is invoked. Once the onException() method ends the actor stops. Before that we want to inform the supervisor in case it wants to do something about it. |
import groovyx.gpars.actor.DynamicDispatchActor
class Supervisor extends DynamicDispatchActor { (1)
Supervised supervised (2)
Class<? extends Supervised> supervisedClass (3)
void link(Supervised supervised) { (4)
this.supervised = supervised
this.supervised.supervisor = this
this.supervisedClass = supervised.getClass()
}
}
A Supervisor is an actor that supervises another actor and makes decisions whenever the supervised actor stops because of a failure (an unhandled exception most of the time).
The Supervisor instance can be linked to the supervised actor thanks to the supervisor’s link method. This method makes the actors available to each other, meaning that the Supervisor instance can receive the supervised actor’s SupervisedExceptionMessage, among other things.
1 | A Supervisor extends DynamicDispatchActor. It could receive many types of messages other than SupervisedExceptionMessage. |
2 | A reference to the supervised actor |
3 | The reference to the supervised actor’s class. This is useful in case the supervisor wants to create a new instance of a supervised actor that stopped because of an unhandled exception. |
4 | In order to link (I took this term from Akka) the supervised instance to this supervisor we invoke the supervisor’s link method, passing the supervised actor as a parameter. |
Let’s see how to use it.
class SimpleSupervisedActor extends Supervised {
void act() {
loop {
react { String message ->
if (message == 'gift') {
throw new Exception(message)
}
println "NORMAL: $message"
}
}
}
}
def supervised = new SimpleSupervisedActor().start()
def supervisor = new Supervisor() {
void onMessage(String query) {
supervised << query
}
void onMessage(SupervisedExceptionMessage ex) {
println "SUPERVISED ACTOR DIES WITH MESSAGE: ${ex.message}"
// link() also registers this supervisor on the replacement instance
link(supervisedClass.newInstance().start()) (1)
}
}.start()
supervisor.link(supervised)
supervisor << "something"
Thread.sleep(1000)
supervisor << "something"
Thread.sleep(1000)
supervisor << "gift"
Thread.sleep(1000)
supervisor << "Message processed by a new supervised instance 1"
Thread.sleep(1000)
supervisor << "Message processed by a new supervised instance 2"
This time we have two actors. We’ve got a Supervised actor receiving messages through its Supervisor. This way, if the supervised actor stops, its Supervisor can still create another Supervised actor of the same type and keep processing messages transparently to the client.
1 | The interesting part. The Supervisor receives a SupervisedExceptionMessage from the Supervised actor when it’s about to stop. The Supervisor then chooses to create another Supervised instance to keep processing messages the same way. |
Needless to say, the more stateless the Supervised actor is, the easier it will be to restart it.
What does "restart" mean? In this example, and generally in other JVM implementations, restarting an actor ultimately means creating a new instance of the same type as the stopped actor. That’s why it’s so important to keep your actor instances stateless. If you had some state, you would have to serialize the state of the actor in order to recreate it later on.
6.2.1. OneToAll Strategy
(TODO)
6.2.2. OneToOne Strategy
(TODO)
7. Stateless Actors
8. Remoting
Remoting has been part of GPars since version 1.3-SNAPSHOT. That’s great news! Until now all actors had to run in the same JVM in order to be able to communicate with each other. Now you can run an actor on one machine and have it send messages to another actor on another machine.
You can find more information about remoting in the GPars AsciiDoc documentation and on the project’s Codehaus Confluence page.
8.1. Dataflow
Although this documentation is all about actors, I’ve added some extra examples about remoting because I think remoting is going to be one of the most important features of GPars in the near future.
When working with remote dataflows the most important classes to keep in mind are:
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.remote.RemoteDataflows
The following example just runs a server instance which publishes a dataflow variable and a client asking for the value of the dataflow variable published at the server.
First of all let’s see how to build the server:
RemoteDataflows startServer() {
def dataflows = RemoteDataflows.create() (1)
dataflows.startServer('localhost',9000) (2)
def variable = new DataflowVariable() (3)
variable << 42
dataflows.publish(variable,"meaning-of-life") (4)
return dataflows
}
1 | Create an instance of RemoteDataflows |
2 | Establish the address and port of the server instance and start the server. Notice that what startServer returns is not an instance of RemoteDataflows but an instance of LocalHost. |
3 | Declaring a dataflow variable and setting a value |
4 | Publishing the dataflow variable |
And then it’s time to create the client:
void 'reading a simple dataflow variable'() {
setup: 'starting server exposing dataflow variable'
def serverDataflows = startServer() (1)
when: 'listening to that server'
def clientDataflows = RemoteDataflows.create() (2)
def remoteVariablePromise = (3)
clientDataflows.getVariable(
"localhost",
9000,
"meaning-of-life"
)
and: 'asking for a given dataflow variable'
def result = remoteVariablePromise.get() (4)
then: 'you should get what you expected'
result.value == 42
cleanup: 'stopping server instance'
serverDataflows.stopServer() (5)
}
1 | Now it’s time to build and run the server |
2 | Create an instance of RemoteDataflows |
3 | Getting a pointer to the dataflow variable published at the server |
4 | Once we get the reference we can start asking for the variable’s value |
5 | Stopping the server |
8.2. Actors
The way actors are published is very similar to the way dataflows were published. Instead of using a RemoteDataflows instance we will be using a RemoteActors instance.
RemoteActors has 2 responsibilities:
- To create a remote actor server: actor instances will be published to that server using an id (a simple string).
- To enable clients to get a reference to remote actors: that is possible by providing the address, the port and the actor’s identifier at the server.
import static groovyx.gpars.actor.Actors.actor
import groovyx.gpars.actor.remote.RemoteActors
void 'simple remote actors test'() {
given: 'a remote actor server'
(1)
def remoteActorServer = RemoteActors.create()
remoteActorServer.startServer('localhost',9000)
and: 'a simple DefaultActor instance'
(2)
def simpleActor = actor {
loop {
react { String msg ->
reply "Thanks for saying: $msg"
}
}
}
when: 'publishing the actor remotely'
(3)
remoteActorServer.publish(simpleActor, 'greetingsActor')
and: 'getting the actor reference'
(4)
def remoteActorsClient = RemoteActors.create()
def actorPromise =
remoteActorsClient
.get('localhost', 9000, 'greetingsActor')
(5)
def remoteActor = actorPromise.get()
and: 'sending a message to actor'
(6)
def result = remoteActor.sendAndWait('hello')
then: 'the reply should be the expected'
result == 'Thanks for saying: hello'
cleanup: 'shutting down the actor server'
remoteActorServer.stopServer()
}
1 | Creating the RemoteActors instance. This instance will create the remote server where actors will be published. |
2 | This is a regular groovyx.gpars.actor.DefaultActor instance. |
3 | Publishing the actor’s instance to the server |
4 | Creating another RemoteActors instance to simulate a client that cannot access the server’s RemoteActors instance. Once we have that instance we ask for the actor published with the id greetingsActor. |
5 | The RemoteActors instance returns a promise containing the requested actor. |
6 | Then we can send (serialized) messages to the remote actor as if the actor were working locally. |
Appendix A: Exercises solutions
Solution 3.4
import static groovyx.gpars.actor.Actors.actor
def printer = actor {
loop {
react { List<Integer> numbers ->
println numbers.sum()
}
}
}
printer << ((1..4) as List)
import groovyx.gpars.actor.DefaultActor
class Printer extends DefaultActor {
void act() {
loop {
react { numbers ->
println numbers.sum()
}
}
}
}
def printer = new Printer()
printer.start()
printer << (1..4)
Solution 3.6
import static groovyx.gpars.actor.Actors.actor
import groovyx.gpars.dataflow.Promise
def printer = actor {
loop {
react { List<Integer> numbers ->
reply numbers.sum()
}
}
}
Promise first = printer.sendAndPromise((1..4))
Integer second = printer.sendAndWait((4..8))
Integer third = printer.sendAndWait((8..12))
assert (first.get() + second + third) == 90
import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.dataflow.Promise
class Printer extends DefaultActor {
void act() {
loop {
react { numbers ->
reply numbers.sum()
}
}
}
}
def printer = new Printer()
printer.start()
Promise first = printer.sendAndPromise((1..4))
Integer second = printer.sendAndWait((4..8))
Integer third = printer.sendAndWait((8..12))
assert (first.get() + second + third) == 90
Solution 4.3
import static groovyx.gpars.actor.Actors.messageHandler
import static java.security.MessageDigest.getInstance
import groovyx.gpars.actor.DynamicDispatchActor
import groovyx.gpars.dataflow.Promise
class Order { String content }
class NationalOrder extends Order {}
class InternationalOrder extends Order {}
final Closure<String> md5 = { String st ->
return getInstance("MD5").digest(st.getBytes("UTF-8")).encodeHex().toString()
}
final DynamicDispatchActor orderProcessor = messageHandler {
when { InternationalOrder international ->
reply md5(international.content)
}
when { NationalOrder national ->
println "I'm sure the order: ${national.content} will be delivered"
}
}
Promise md5IdPromise =
orderProcessor.sendAndPromise(new InternationalOrder(content: 'supersecret'))
orderProcessor << new NationalOrder(content: 'something really silly')
orderProcessor << new NationalOrder(content: 'something as silly as the previous one')
println "I need to hold the id: ${md5IdPromise.get()}"
import static groovyx.gpars.actor.Actors.messageHandler
import static java.security.MessageDigest.getInstance
import groovyx.gpars.actor.DynamicDispatchActor
import groovyx.gpars.dataflow.Promise
class Order { String content }
class NationalOrder extends Order {}
class InternationalOrder extends Order {}
class OrderProcessor extends DynamicDispatchActor {
String md5(String st) {
return getInstance("MD5").digest(st.getBytes("UTF-8")).encodeHex().toString()
}
void onMessage(InternationalOrder international) {
reply md5(international.content)
}
void onMessage(NationalOrder national) {
println "I'm sure the order: ${national.content} will be delivered"
}
}
final OrderProcessor orderProcessor = new OrderProcessor().start()
final Promise md5IdPromise =
orderProcessor.sendAndPromise(new InternationalOrder(content: 'supersecret'))
orderProcessor << new NationalOrder(content: 'something really silly')
orderProcessor << new NationalOrder(content: 'something as silly as the previous one')
println "I need to hold the id: ${md5IdPromise.get()}"
Solution 5.3
@Grab('org.imgscalr:imgscalr-lib:4.2')
import static javax.imageio.ImageIO.read
import static javax.imageio.ImageIO.write
import static org.imgscalr.Scalr.resize
import static groovyx.gpars.actor.Actors.actor
import static java.io.File.createTempFile
import groovyx.gpars.actor.DefaultActor
final DefaultActor imageActor = actor {
loop {
react { File image ->
def src = read(image)
def img = resize(src, 400, 300)
write(img, 'jpg', createTempFile('actors_', image.name))
println "IMAGE: ${image.name} processed"
}
}
}
final DefaultActor textActor = actor {
loop {
react { File textFile ->
def dst = createTempFile('actors_', textFile.name)
// Don't do this at home :P
dst << "Author: MYBOSS"
dst << textFile.text
println "TEXT: ${textFile.name} processed"
}
}
}
final DefaultActor dispatchingActor = actor {
loop {
react { File file ->
if (file.name.endsWith('jpg')) {
imageActor << file
} else if (file.name.endsWith('txt')) {
textActor << file
}
}
}
}
new File('/somedirwithfiles/').
listFiles().each { dispatchingActor << it }
Because the non-DSL version has its own Spock specification, it has been organized into several classes.
package exercises.forwarding.images
import groovyx.gpars.actor.DefaultActor
class FileProcessorActor extends DefaultActor {
DefaultActor imageActor
DefaultActor textActor
void act() {
loop {
react { File file2Process ->
println "Start processing ${file2Process.name}"
if (isImageFile(file2Process)) {
sendToImageActor(file2Process)
}
if (isTextFile(file2Process)) {
sendToTextActor(file2Process)
}
}
}
}
Boolean isImageFile(File file2Process) {
return doesItEndWith(file2Process, 'jpg')
}
Boolean isTextFile(File file2Process) {
return doesItEndWith(file2Process, 'txt')
}
Boolean doesItEndWith(File file, String ending) {
return file.name.endsWith(ending)
}
void sendToImageActor(File file) {
imageActor << file
}
void sendToTextActor(File file) {
textActor << file
}
}
package exercises.forwarding.images
import static javax.imageio.ImageIO.read
import static javax.imageio.ImageIO.write
import static org.imgscalr.Scalr.resize
import static java.io.File.createTempFile
import exercises.test.NotificationActor
import groovyx.gpars.actor.DefaultActor
class ImageProcessorActor extends NotificationActor {
ImageProcessorActor(DefaultActor monitor) {
super(monitor)
}
void act() {
loop {
react { File image ->
def src = read(image)
def img = resize(src, 300, 200)
def tmp = new File(System.getProperty('java.io.tmpdir'))
write(img, 'jpg', new File(tmp, "processed_${image.name}"))
notifyIfMonitor(image.name)
}
}
}
}
package exercises.forwarding.images
import static java.io.File.createTempFile
import exercises.test.NotificationActor
import groovyx.gpars.actor.DefaultActor
class TextProcessorActor extends NotificationActor {
TextProcessorActor(DefaultActor monitor) {
super(monitor)
}
void act() {
loop {
react { File textFile ->
def tmpfile = new File(System.getProperty('java.io.tmpdir'))
def newFile = new File(tmpfile, "processed_${textFile.name}")
newFile << 'Author: me'
newFile << textFile.text
notifyIfMonitor(textFile.name)
}
}
}
}
package exercises.forwarding.images
import static groovyx.gpars.actor.Actors.actor
import exercises.test.NotifiedActor
import spock.lang.Specification
class ImagesProcessingSpecification extends Specification {
final File RESOURCES_DIR = 'src/test/resources/exercises/forwarding/images/' as File
final File TEMPORARY_DIR = new File(System.getProperty('java.io.tmpdir'))
def 'Processing successfully images'() {
setup: 'Creating needed directories references'
def monitorActor = new NotifiedActor(3).start()
def imageActor = new ImageProcessorActor(monitorActor).start()
def textActor = new TextProcessorActor(monitorActor).start()
def proxyActor = new FileProcessorActor(imageActor: imageActor, textActor: textActor).start()
when: 'Sending a couple of image files'
proxyActor << new File(RESOURCES_DIR, 'image1.jpg')
proxyActor << new File(RESOURCES_DIR, 'image2.jpg')
proxyActor << new File(RESOURCES_DIR, 'text.txt')
and: 'Expecting process to finish'
monitorActor.join()
and: 'Looking for processed files'
def file1 = new File(TEMPORARY_DIR, 'processed_text.txt')
def file2 = new File(TEMPORARY_DIR, 'processed_image1.jpg')
def file3 = new File(TEMPORARY_DIR, 'processed_image2.jpg')
then: 'We should make sure they exist'
[file1, file2, file3].every { it.exists() }
cleanup: 'Deleting processed files'
[file1, file2, file3]*.delete()
}
}
I’ve used the NotificationActor and NotifiedActor to make tests wait until everything has been processed.