Concurrency isn’t easy. Getting a program to do more than one thing at a time has traditionally meant hassling with mutexes, race conditions, lock contention, and the rest of the unpleasant baggage that comes along with multithreading. Event-based concurrency models alleviate some of these concerns, but can turn large programs into a rat’s nest of callback functions. No wonder, then, that concurrent programming is a task most programmers dread, or avoid altogether by retreating to multiple independent processes that share data externally (for example, through a database or message queue).
A large part of the difficulty of concurrent programming comes down to state: how do you know what your multithreaded program is doing, and when? What value does a particular variable hold when you have two threads running, or five, or fifty? How can you guarantee that your program’s many tendrils aren’t clobbering one another in a race to take action? A thread-based concurrency paradigm poses more questions than it answers.
Thankfully, Scala offers a reasonable, flexible approach to concurrency that we’ll explore in this chapter.
Though you may have heard of Scala and Actors in the same breath, Actors aren’t a concept unique to Scala. Actors, originally intended for use in Artificial Intelligence research, were first put forth in 1973 (see [Hewitt1973] and [Agha1987]). Since then, variations on the idea of Actors have appeared in a number of programming languages, most notably in Erlang and Io. As an abstraction, Actors are general enough that they can be implemented as a library (as in Scala), or as the fundamental unit of a computational system.
Fundamentally, an Actor is an object that receives messages and takes action on those messages. The order in which messages arrive is unimportant to an Actor, though some Actor implementations (such as Scala’s) queue messages in order. An Actor might handle a message internally, or it might send a message to another Actor, or it might create another Actor to take action based on the message. Actors are a very high-level abstraction.
Unlike traditional object systems (which, you might be thinking to yourself, have many of the same properties we’ve described), Actors don’t enforce a sequence or ordering to their actions. This inherent eschewing of sequentiality, coupled with independence from shared global state, allows Actors to do their work in parallel. As we’ll see later on, the judicious use of immutable data fits the Actor model ideally, and further aids in safe, comprehensible concurrent programming.
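Before turning to Scala’s library, the mailbox idea can be made concrete with a toy sketch built only from java.util.concurrent and a single thread. This is not how scala.actors is implemented, and ToyActor and its Stop message are hypothetical names. The point is that the handler runs on exactly one thread, one queued message at a time, so state that only the handler touches needs no locks.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Toy actor (hypothetical, for illustration only): a mailbox plus one worker
// thread that handles queued messages one at a time, in arrival order.
class ToyActor(handler: Any => Unit) {
  private val mailbox = new LinkedBlockingQueue[Any]()
  private case object Stop // poison pill that ends the worker loop

  private val worker = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        mailbox.take() match { // blocks until a message arrives
          case Stop => running = false
          case msg  => handler(msg)
        }
      }
    }
  })
  worker.start()

  // Asynchronous send: enqueue the message and return immediately.
  def !(msg: Any): Unit = mailbox.put(msg)

  // Queue the poison pill behind any pending messages, then wait for the worker.
  def stop(): Unit = {
    mailbox.put(Stop)
    worker.join()
  }
}
```

A send with `!` returns at once. The handler sees each message in arrival order on the worker thread, and `stop()` only returns after everything sent earlier has been handled.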
Enough theory. Let’s see Actors in action.
At their most basic, Actors in Scala are objects that inherit from scala.actors.Actor.
```scala
// code-examples/Concurrency/simple-actor-script.scala

import scala.actors.Actor

class Redford extends Actor {
  def act() {
    println("A lot of what acting is, is paying attention.")
  }
}

val robert = new Redford
robert.start
```
As we can see, an Actor defined in this way must be both instantiated and started, similar to how threads are handled in Java. It must also implement the abstract method act, which returns Unit. Once we’ve started this simple Actor, the following sage advice for thespians is printed to the console.

```
A lot of what acting is, is paying attention.
```
The scala.actors.Actor companion object contains a factory method, actor, for creating Actors that avoids much of the setup in the above example. We can import this method and other convenience methods from scala.actors.Actor._. Here is a factory-made Actor.
```scala
// code-examples/Concurrency/factory-actor-script.scala

import scala.actors.Actor
import scala.actors.Actor._

val paulNewman = actor {
  println("To be an actor, you have to be a child.")
}
```
While a subclass that extends the Actor class must define act in order to be concrete, a factory-produced Actor has no such limitation. In this shorter example, the block passed to actor is effectively promoted to the act method from our first example, and the factory starts the new Actor for us, so there’s no need to call start. Predictably, this Actor also prints a message when run. Illuminating, but we still haven’t shown the essential piece of the Actors puzzle: sending messages.
Actors can receive any sort of object as a message, from strings of text to numeric types to whatever classes you’ve cooked up in your programs. For this reason, Actors and pattern matching go hand in hand. An Actor should only act on messages of familiar types; a pattern match on the class and/or contents of a message is good defensive programming, and increases the readability of Actor code.
```scala
// code-examples/Concurrency/pattern-match-actor-script.scala

import scala.actors.Actor
import scala.actors.Actor._

val fussyActor = actor {
  loop {
    receive {
      case s: String => println("I got a String: " + s)
      case i: Int => println("I got an Int: " + i.toString)
      case _ => println("I have no idea what I just got.")
    }
  }
}

fussyActor ! "hi there"
fussyActor ! 23
fussyActor ! 3.33
```
This example prints the following when run.
```
I got a String: hi there
I got an Int: 23
I have no idea what I just got.
```
The body of fussyActor is a receive method wrapped in a loop. loop is essentially a nice shortcut for while(true); it does whatever is inside its block repeatedly. receive blocks until it gets a message of a type that will satisfy one of its internal pattern matching cases.
The final lines of this example demonstrate use of the ! (exclamation point, or bang) method to send messages to our Actor. If you’ve ever seen Actors in Erlang, you’ll find this syntax familiar. The Actor is always on the left-hand side of the bang, and the message being sent to said Actor is always on the right. If you need a mnemonic for this granule of syntactic sugar, imagine that you’re an irate director shouting commands at your Actors.
Every Actor has a mailbox in which messages sent to that Actor are queued. Let’s see an example where we inspect the size of an Actor’s mailbox.
```scala
// code-examples/Concurrency/actor-mailbox-script.scala

import scala.actors.Actor
import scala.actors.Actor._

val countActor = actor {
  loop {
    react {
      case "how many?" => {
        println("I've got " + mailboxSize.toString + " messages in my mailbox.")
      }
    }
  }
}

countActor ! 1
countActor ! 2
countActor ! 3
countActor ! "how many?"
countActor ! "how many?"
countActor ! 4
countActor ! "how many?"
```
This example produces the following output.
```
I've got 3 messages in my mailbox.
I've got 3 messages in my mailbox.
I've got 4 messages in my mailbox.
```
Note that the first and second lines of output are identical. Because our Actor was set up solely to process messages of the string “how many?”, those messages didn’t remain in its mailbox. Only the messages of types we didn’t know about (in this case, Int) remained unprocessed.
If you see an Actor’s mailbox size ballooning unexpectedly, you’re probably sending messages of a type that the Actor doesn’t know about. Include a catchall case (_) when pattern matching received messages to find out what’s harassing your Actors.
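The mailbox arithmetic above can be modeled without any threads. In this simplified sketch, a mailbox is an immutable List, and the hypothetical receiveOne helper behaves like a single receive or react: it removes the first message its partial function accepts and leaves everything else queued in order. It is a model for intuition, not the scala.actors implementation.

```scala
// Simplified model of selective receive (hypothetical helper, not the library's code).
object SelectiveReceive {
  // Returns the handler's result for the first matching message, plus the mailbox
  // without that message. Unmatched messages stay queued, in their original order.
  def receiveOne[R](mailbox: List[Any])(handler: PartialFunction[Any, R]): (Option[R], List[Any]) =
    mailbox.indexWhere(handler.isDefinedAt) match {
      case -1 => (None, mailbox)
      case i  => (Some(handler(mailbox(i))), mailbox.take(i) ++ mailbox.drop(i + 1))
    }
}
```

Without a catchall, the Ints never match and simply stay queued, which is how mailboxSize creeps upward. Adding a `case other => ...` clause lets each stray message be seen and removed.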
Now that we’ve got a basic sense of what Actors are and how they’re used in Scala, let’s put them to work. Specifically, let’s put them to work cutting hair. The sleeping barber problem ([SleepingBarberProblem]) is one of a popular set of computer science hypotheticals designed to demonstrate issues of concurrency and synchronization.
The problem is this: a hypothetical barber shop has just one barber with one barber chair, and three chairs in which customers may wait for a haircut. Without customers around, the barber sleeps. When a customer arrives, the barber wakes up to cut his hair. If the barber is busy cutting hair when a customer arrives, the customer sits down in an available chair. If a chair isn’t available, the customer leaves.
The sleeping barber problem is usually solved with semaphores and mutexes, but we’ve got better tools at our disposal. Straightaway, we see several things to model as Actors: the barber is clearly one, as are the customers. The barbershop itself could be modeled as an Actor, too; there need not be a real-world parallel to verbal communication in an Actor system, even though we’re sending messages.
Let’s start with the sleeping barber’s customers, as they have the simplest responsibilities.
```scala
// code-examples/Concurrency/sleepingbarber/customer.scala

package sleepingbarber

import scala.actors.Actor
import scala.actors.Actor._

case object Haircut

class Customer(val id: Int) extends Actor {
  var shorn = false

  def act() = {
    loop {
      react {
        case Haircut => {
          shorn = true
          println("[c] customer " + id + " got a haircut")
        }
      }
    }
  }
}
```
For the most part, this should look pretty familiar: we declare the package in which this code lives, we import code from the scala.actors package, and we define a class that extends Actor. There are a few details worth noting, however.
First of all, there’s our declaration of case object Haircut. A common pattern when working with Actors in Scala is to use a case object to represent a message without internal data. If we wanted to include, say, the time at which the haircut was completed, we’d use a case class instead. We declare Haircut here because it’s a message type that will be sent solely to customers.
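Here is a small sketch of the two message styles. Haircut mirrors the book’s case object. HaircutAt is a hypothetical case class showing how a message could carry data such as a completion time, and the describe function is likewise illustrative.

```scala
// Message with no data: a single shared instance is enough.
case object Haircut

// Hypothetical message carrying data (not part of the book's code).
case class HaircutAt(completedAtMillis: Long)

object Messages {
  // Pattern matching handles both styles; the catchall guards against surprises.
  def describe(msg: Any): String = msg match {
    case Haircut      => "haircut"
    case HaircutAt(t) => "haircut completed at " + t
    case other        => "unexpected message: " + other
  }
}
```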
Note as well that we’re storing one bit of mutable state in each Customer: whether or not it has gotten a haircut. In its internal loop, each Customer waits for a Haircut message and, upon receipt of one, sets its shorn boolean to true. Customer uses the asynchronous react method to respond to incoming messages. If we needed to return the result of processing the message, we would use receive, but we don’t, and in the process we save some memory and thread use under the hood.
Let’s move on to the barber himself. Because there’s only one barber, we could have used the actor factory method technique mentioned above to create him. For testing purposes, we’ve instead defined our own Barber class.
```scala
// code-examples/Concurrency/sleepingbarber/barber.scala

package sleepingbarber

import scala.actors.Actor
import scala.actors.Actor._
import scala.util.Random

class Barber extends Actor {
  private val random = new Random()

  def helpCustomer(customer: Customer) {
    if (self.mailboxSize >= 3) {
      println("[b] not enough seats, turning customer " + customer.id + " away")
    } else {
      println("[b] cutting hair of customer " + customer.id)
      Thread.sleep(100 + random.nextInt(400))
      customer ! Haircut
    }
  }

  def act() {
    loop {
      react {
        case customer: Customer => helpCustomer(customer)
      }
    }
  }
}
```
The core of the Barber class looks very much like the Customer. We loop around react, waiting for a particular type of object. To keep that loop tight and readable, we call a method, helpCustomer, when a new Customer is sent to the barber. Within that method we employ a check on the mailbox size to serve as our “chairs” that customers may occupy; we could have the Barber or Shop classes maintain an internal Queue, but why bother when each actor’s mailbox already is one?
If three or more customers are already waiting in the mailbox when the barber picks up the next one, we simply turn that customer away; the message has already been taken out of the barber’s mailbox, so it’s discarded. Otherwise, we simulate a semi-random delay (between 100 and 499 milliseconds) for the time it takes to cut a customer’s hair, then send off a Haircut message to that customer. (Were we not trying to simulate a real-world scenario, we would of course remove the call to Thread.sleep() and allow our barber to run full tilt.)
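For comparison, here is a sketch of the explicit-queue design the text decides against. In the hypothetical WaitingRoom object, a java.util.concurrent.ArrayBlockingQueue holds at most three waiting customers, and offer() returns false instead of blocking when every chair is taken. No barber drains the queue here, so the sketch only shows the admission rule.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ListBuffer

object WaitingRoom {
  // Seats arriving customer IDs in order; returns (seated, turnedAway).
  def seat(arrivals: Seq[Int], chairs: Int = 3): (List[Int], List[Int]) = {
    val room = new ArrayBlockingQueue[Int](chairs) // fixed capacity = number of chairs
    val seated = ListBuffer[Int]()
    val turnedAway = ListBuffer[Int]()
    for (id <- arrivals) {
      // offer() never blocks: false means every chair is already taken
      if (room.offer(id)) seated += id else turnedAway += id
    }
    (seated.toList, turnedAway.toList)
  }
}
```

An explicit queue makes the capacity visible in code, but it duplicates what the actor’s mailbox already provides. That duplication is why the Barber simply inspects its mailbox size.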
Next up, we have a simple class to represent the barbershop itself.
```scala
// code-examples/Concurrency/sleepingbarber/shop.scala

package sleepingbarber

import scala.actors.Actor
import scala.actors.Actor._

class Shop extends Actor {
  val barber = new Barber()
  barber.start

  def act() {
    println("[s] the shop is open")

    loop {
      react {
        case customer: Customer => barber ! customer
      }
    }
  }
}
```
By now, this should all look very familiar. Each Shop creates and starts a new Barber, prints a message telling the world that the shop is open, and sits in a loop waiting for customers. When a Customer comes in, he’s sent to the barber. We now see an unexpected benefit of Actors: they allow us to describe concurrent business logic in easily understood terms. “Send the customer to the barber” makes perfect sense, much more so than “notify the barber, unlock the mutex around the customer seats, increment the number of free seats,” and so forth. Actors get us closer to our domain.
Finally, we have a driver for our simulation.
```scala
// code-examples/Concurrency/sleepingbarber/barbershop-simulator.scala

package sleepingbarber

import scala.actors.Actor._
import scala.collection.{immutable, mutable}
import scala.util.Random

object BarbershopSimulator {
  private val random = new Random()
  private val customers = new mutable.ArrayBuffer[Customer]()
  private val shop = new Shop()

  def generateCustomers {
    for (i <- 1 to 20) {
      val customer = new Customer(i)
      customer.start()
      customers += customer
    }

    println("[!] generated " + customers.size + " customers")
  }

  // customers arrive at random intervals
  def trickleCustomers {
    for (customer <- customers) {
      shop ! customer
      Thread.sleep(random.nextInt(450))
    }
  }

  def tallyCuts {
    // wait for any remaining concurrent actions to complete
    Thread.sleep(2000)

    val shornCount = customers.filter(c => c.shorn).size
    println("[!] " + shornCount + " customers got haircuts today")
  }

  def main(args: Array[String]) {
    println("[!] starting barbershop simulation")
    shop.start()

    generateCustomers
    trickleCustomers
    tallyCuts

    System.exit(0)
  }
}
```
After “opening the shop”, we generate a number of Customer objects, assigning a numeric ID to each and storing the lot in an ArrayBuffer. Next, we “trickle” the customers in by sending them as messages to the shop and sleeping for a semi-random amount of time between loops. At the end of our simulated day, we tally up the number of customers who got haircuts by keeping only the customers whose internal shorn boolean was set to true and asking for the size of the resulting sequence.
Compile and run the code within the sleepingbarber directory as follows:

```
fsc *.scala
scala -classpath . sleepingbarber.BarbershopSimulator
```
Throughout our code, we’ve prefixed console messages with abbreviations for the classes from which the messages were printed. When we look at an example run of our simulator, it’s easy to see where each message came from.
```
[!] starting barbershop simulation
[s] the shop is open
[!] generated 20 customers
[b] cutting hair of customer 1
[b] cutting hair of customer 2
[c] customer 1 got a haircut
[c] customer 2 got a haircut
[b] cutting hair of customer 3
[c] customer 3 got a haircut
[b] cutting hair of customer 4
[b] cutting hair of customer 5
[c] customer 4 got a haircut
[b] cutting hair of customer 6
[c] customer 5 got a haircut
[b] cutting hair of customer 7
[c] customer 6 got a haircut
[b] not enough seats, turning customer 8 away
[b] cutting hair of customer 9
[c] customer 7 got a haircut
[b] not enough seats, turning customer 10 away
[c] customer 9 got a haircut
[b] cutting hair of customer 11
[b] cutting hair of customer 12
[c] customer 11 got a haircut
[b] cutting hair of customer 13
[c] customer 12 got a haircut
[b] cutting hair of customer 14
[c] customer 13 got a haircut
[b] not enough seats, turning customer 15 away
[b] not enough seats, turning customer 16 away
[b] not enough seats, turning customer 17 away
[b] cutting hair of customer 18
[c] customer 14 got a haircut
[b] cutting hair of customer 19
[c] customer 18 got a haircut
[b] cutting hair of customer 20
[c] customer 19 got a haircut
[c] customer 20 got a haircut
[!] 15 customers got haircuts today
```
You’ll find that each run’s output is, predictably, slightly different. Every time the barber takes a bit longer to cut hair than it does for several customers to enter, the “chairs” (the barber’s mailbox queue) fill up, and new customers simply leave.
Of course, we have to include the standard caveats that come with simple examples. For one, it’s possible that our example may not be suitably random, particularly if two Random instances (such as those in Barber and BarbershopSimulator) are created within a millisecond of one another on a JVM that seeds Random from the system clock. This is a byproduct of how the default generator is seeded, and a good reminder to be careful about randomness in concurrent programs. You’d also want to replace the sleep inside tallyCuts with a clearer signal that the various actors in the system are done doing their work, perhaps by making the BarbershopSimulator an Actor and sending it messages that indicate completion.
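As one alternative to the Thread.sleep(2000) in tallyCuts, here is a sketch that uses java.util.concurrent.CountDownLatch rather than the Actor-based signal suggested above. Every customer counts down exactly once, whether served or turned away, and the driver blocks until the count reaches zero, with a timeout as a safety net. LatchTally is a hypothetical name, and an ordinary thread pool stands in for the barber. The every-fourth-customer-is-turned-away rule is a placeholder that keeps the result deterministic. The AtomicInteger counters also avoid reading a plain mutable field such as shorn from another thread without synchronization.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object LatchTally {
  // Processes n hypothetical customers and returns (served, turnedAway)
  // only after every customer has reported an outcome.
  def tally(n: Int): (Int, Int) = {
    val done = new CountDownLatch(n) // one count per customer
    val served = new AtomicInteger(0)
    val turnedAway = new AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(4)
    for (i <- 1 to n) {
      pool.execute(new Runnable {
        def run(): Unit =
          try {
            // placeholder rule standing in for "no free chair"
            if (i % 4 == 0) turnedAway.incrementAndGet() else served.incrementAndGet()
          } finally {
            done.countDown() // always signal, even if the work above fails
          }
      })
    }
    val finished = done.await(5, TimeUnit.SECONDS) // false if we timed out
    pool.shutdown()
    require(finished, "timed out waiting for customers")
    (served.get, turnedAway.get)
  }
}
```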
Try modifying the code to introduce more customers, additional message types, different delays, or to remove the randomness altogether. If you’re an experienced multithreaded programmer, you might try writing your own sleeping barber implementation just to compare and contrast. We’re willing to bet that an implementation in Scala with Actors will be terser and easier to maintain.
In order to get the most out of Actors, there are a few things to remember. First off, note that there are several methods you can use to get different types of behavior out of your Actors. The table below should help clarify when to use each method.
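Table 9.1. Actor Methods

Method | Returns | Description |
---|---|---|
act() | Unit | Abstract, top-level method for an Actor. Typically contains one of the following methods inside it. |
receive() | Result of processing message | Blocks until a message of matched type is received. |
receiveWithin(timeout: Long) | Result of processing message | Like receive, but gives up after the given number of milliseconds. |
react() | Nothing | Requires less overhead (threads) than receive. |
reactWithin(timeout: Long) | Nothing | Like react, but gives up after the given number of milliseconds. |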
Typically, you’ll want to use react wherever possible. If you need the results of processing a message (that is, you need a synchronous response from sending a message to an Actor), use the receiveWithin variant to reduce your chances of blocking indefinitely on an Actor that’s gotten wedged.
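The idea behind receiveWithin (wait for a reply, but never forever) can be sketched with a plain java.util.concurrent queue. In the hypothetical TimedReceive helper, poll with a timeout returns null when nothing arrives in time, and the sketch turns that into None. Only the timeout semantics carry over: scala.actors itself reports a timeout by passing the TIMEOUT message to your handler.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object TimedReceive {
  // Waits at most `millis` for the next message; None means the wait timed out.
  def receiveWithin[A](mailbox: LinkedBlockingQueue[A], millis: Long): Option[A] =
    Option(mailbox.poll(millis, TimeUnit.MILLISECONDS)) // poll returns null on timeout
}
```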
Another strategy to keep your Actor-based code asynchronous is the use of futures. A future is a placeholder object for a value that hasn’t yet been returned from an asynchronous process. You can send a message to an Actor with the !! method, which returns a future right away instead of waiting for the reply; a variant of this method allows you to pass along a partial function which is applied to the future value. As you can see from the example below, retrieving a value from a Future is as straightforward as invoking its apply method. Note that retrieving a value from a Future is a blocking operation.
```scala
// code-examples/Concurrency/future-script.scala

import scala.actors.Futures._

val eventually = future(5 * 42)
println(eventually())
```
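scala.actors was deprecated in Scala 2.10 and later dropped from the standard distribution. The sketch below performs the same 5 * 42 computation with java.util.concurrent futures, which work in any Scala version. The hypothetical JavaFutures.within helper also deals with the blocking retrieval noted above: it bounds the wait with a timeout and treats a TimeoutException as “no answer”.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

object JavaFutures {
  // Runs `work` on a background thread and waits at most `millis` for it.
  // Returns None (and cancels the task) if the result isn't ready in time.
  def within[T](millis: Long)(work: => T): Option[T] = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      val eventually = pool.submit(new Callable[T] { def call(): T = work })
      try Some(eventually.get(millis, TimeUnit.MILLISECONDS)) // blocking, but bounded
      catch {
        case _: TimeoutException =>
          eventually.cancel(true) // interrupt the task; we no longer need it
          None
      }
    } finally {
      pool.shutdownNow()
    }
  }
}
```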
Each Actor in your system should have clear responsibilities. Don’t use Actors for general-purpose, highly stateful tasks. Instead, think like a director: what are the distinct roles in the “script” of your application, and what’s the least amount of information each Actor needs to do its job? Give each Actor just a couple of responsibilities, and use messages (usually in the form of a case class or case object) to delegate those responsibilities to other Actors.
Don’t be hesitant to copy data when writing Actor-centric code. The more immutable your design, the less likely you are to end up with unexpected state. The more you communicate via messages, the less you have to worry about synchronization. All those messages and immutable variables might appear to be overly costly. But, with today’s plentiful hardware, trading memory overhead for clarity and predictability seems more than fair for most applications.
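Case classes make copying cheap to write. In this sketch, Appointment is a hypothetical immutable message. Its copy method produces a modified version for the next Actor, while the original, possibly still held by the sender, stays untouched.

```scala
// Hypothetical immutable message: fields are vals and the list is immutable.
final case class Appointment(customerId: Int, services: List[String])

object Appointments {
  // Returns a new message with one more service; the input is never modified.
  def addService(appt: Appointment, service: String): Appointment =
    appt.copy(services = appt.services :+ service)
}
```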
Lastly, know when Actors aren’t appropriate. Just because Actors are a great way to handle concurrency in Scala doesn’t mean they’re the only way, as we’ll see below. Traditional threading and locking may better suit write-heavy critical paths for which a messaging approach would incur too much overhead. In our experience, you can use a purely Actor-based design to prototype a concurrent solution, then use profiling tools to suss out parts of your application that might benefit from a different approach.
While Actors are a great way to handle concurrent operations, they’re not the only way to do so in Scala. As Scala is interoperable with Java, the concurrency concepts that you may be familiar with on the JVM still apply.
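For starters, an anonymous subclass of Java’s Thread makes it easy to run a block of code in a new thread.

```scala
// code-examples/Concurrency/threads/by-block-script.scala

// The work goes in run(). Statements placed directly inside the braces would
// execute in the anonymous class's constructor, on the current thread, and the
// new thread would never be started.
new Thread {
  override def run() {
    println("this will run in a new thread")
  }
}.start()
```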
A similar construct is available in the scala.concurrent package: the spawn method on the ops object runs a block asynchronously.
```scala
// code-examples/Concurrency/threads/spawn-script.scala

import scala.concurrent.ops._

object SpawnExample {
  def main(args: Array[String]) {
    println("this will run synchronously")

    spawn {
      println("this will run asynchronously")
    }
  }
}
```
java.util.concurrent
If you’re familiar with the venerable java.util.concurrent package, you’ll find it just as easy to use from Scala (or hard to use, depending on your point of view). Let’s use Executors to create a pool of threads. We’ll use the thread pool to run a simple class, implementing Java’s Runnable interface for thread-friendly classes, that identifies which thread it’s running on.
```scala
// code-examples/Concurrency/threads/util-concurrent-script.scala

import java.util.concurrent._

class ThreadIdentifier extends Runnable {
  def run() {
    println("hello from Thread " + Thread.currentThread.getId)
  }
}

val pool = Executors.newFixedThreadPool(5)

for (i <- 1 to 10) {
  pool.execute(new ThreadIdentifier)
}

// let the queued tasks finish, then allow the JVM to exit
pool.shutdown()
```
As is standard in Java concurrency, the run method is where a threaded class starts. Every time our pool executes a new ThreadIdentifier, its run method is invoked. A look at the output below tells us that we’re running on the five threads in the pool, with IDs ranging from 9 to 13.
```
hello from Thread 9
hello from Thread 10
hello from Thread 11
hello from Thread 12
hello from Thread 13
hello from Thread 9
hello from Thread 11
hello from Thread 10
hello from Thread 10
hello from Thread 13
```
This is, of course, just scratching the surface of what’s available in java.util.concurrent. You’ll find that your existing knowledge of Java’s approach to multithreading still applies in Scala. What’s more, you’ll be able to accomplish the same tasks using less code, which should contribute to maintainability and productivity.
Threading and Actors aren’t the only way to do concurrency. Event-based concurrency, an approach built on asynchronous or non-blocking I/O (often called NIO on the JVM, after the java.nio package), has become a favored way to write servers that need to scale to thousands of simultaneous clients. Eschewing the traditional one-to-one relationship of threads to clients, this model of concurrency exposes events that occur when particular conditions are met (for example, when data is received from a client over a network socket). Typically, the programmer will associate a callback method with each event that’s relevant to her program.
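The callback style just described can be sketched in a few lines. EventDispatcher and the event names used with it are hypothetical. A real NIO framework such as MINA would fire these events from its own I/O threads rather than from a direct method call.

```scala
import scala.collection.mutable

// Hypothetical registry mapping event names to callbacks.
class EventDispatcher {
  private val handlers = mutable.Map[String, List[String => Unit]]()

  // Register a callback for an event; several callbacks per event are allowed.
  def on(event: String)(callback: String => Unit): Unit =
    handlers(event) = handlers.getOrElse(event, Nil) :+ callback

  // Invoke every callback registered for the event, in registration order.
  // Returns how many callbacks ran.
  def fire(event: String, payload: String): Int = {
    val callbacks = handlers.getOrElse(event, Nil)
    callbacks.foreach(cb => cb(payload))
    callbacks.size
  }
}
```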
While the java.nio package provides a variety of useful primitives for non-blocking I/O (buffers, channels, etc.), it’s still a fair bit of work to cobble together an event-based concurrent program from those simple parts. Enter Apache MINA, built atop Java NIO and described on its homepage as “a network application framework which helps users develop high performance and high scalability network applications easily” (see [MINA]).
While MINA may be easier to use than Java’s built-in NIO libraries, we’ve gotten used to some conveniences of Scala that just aren’t available in MINA. The open-source Naggati library (see [Naggati]) adds a Scala-friendly layer atop MINA that, according to its author, “makes it easy to build protocol filters [using a] sequential style”. Essentially, Naggati is a DSL for parsing network protocols, with MINA’s powerful NIO abilities under the hood.
Let’s use Naggati to write the foundations of an SMTP email server. To keep things simple, we’re only dealing with two SMTP commands: HELO and QUIT. The former command identifies a client, and the latter ends the client’s session.
We’ll keep ourselves honest with a test suite, facilitated by the Specs behavior-driven development library (see the section called “Specs”).
```scala
// code-examples/Concurrency/smtpd/src/test/scala/com/programmingscala/smtpd/SmtpDecoderSpec.scala

package com.programmingscala.smtpd

import java.nio.ByteOrder
import net.lag.naggati._
import org.apache.mina.core.buffer.IoBuffer
import org.apache.mina.core.filterchain.IoFilter
import org.apache.mina.core.session.{DummySession, IoSession}
import org.apache.mina.filter.codec._
import org.specs._
import scala.collection.{immutable, mutable}

object SmtpDecoderSpec extends Specification {
  private var fakeSession: IoSession = null
  private var fakeDecoderOutput: ProtocolDecoderOutput = null
  private var written = new mutable.ListBuffer[Request]

  def quickDecode(s: String): Unit = {
    Codec.decoder.decode(fakeSession, IoBuffer.wrap(s.getBytes), fakeDecoderOutput)
  }

  "SmtpRequestDecoder" should {
    doBefore {
      written.clear()
      fakeSession = new DummySession
      fakeDecoderOutput = new ProtocolDecoderOutput {
        override def flush(nextFilter: IoFilter.NextFilter, s: IoSession) = {}
        // capture each decoded Request so the tests can inspect it
        override def write(obj: AnyRef): Unit = { written += obj.asInstanceOf[Request] }
      }
    }

    "parse HELO" in {
      quickDecode("HELO client.example.org\n")
      written.size mustEqual 1
      written(0).command mustEqual "HELO"
      written(0).data mustEqual "client.example.org"
    }

    "parse QUIT" in {
      quickDecode("QUIT\n")
      written.size mustEqual 1
      written(0).command mustEqual "QUIT"
      written(0).data mustEqual null
    }
  }
}
```
After setting up an environment for each test run, our suite exercises the two SMTP commands we’re interested in. The doBefore block runs before each test, guaranteeing that mock session and output buffers are in a clean state. In each test we’re passing a string of hypothetical client input to our as-yet-unimplemented Codec, then verifying that the resulting Request (a case class) contains the correct command and data fields. As the QUIT command doesn’t require any additional information from the client, we simply check that data is null.
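Before bringing in Naggati, the rules these tests pin down can be sketched as a pure function. SmtpRequest and SmtpLineParser are hypothetical names. Unlike the book’s Request, the sketch uses Option rather than null for the missing QUIT data. It also returns a Left for malformed input (including a HELO with no hostname) instead of throwing.

```scala
// Hypothetical counterpart of the book's Request, with Option in place of null.
final case class SmtpRequest(command: String, data: Option[String])

object SmtpLineParser {
  // Applies the same HELO/QUIT rules the tests describe to one line of input.
  def parse(line: String): Either[String, SmtpRequest] =
    line.trim.split(' ').toList match {
      case "HELO" :: host :: _ => Right(SmtpRequest("HELO", Some(host)))
      case "QUIT" :: _         => Right(SmtpRequest("QUIT", None))
      case _                   => Left("Malformed request line: " + line.trim)
    }
}
```

Keeping the rules pure makes them easy to test in isolation. A decoder step would then only need to read a line and write the parsed result out.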
With our tests in place, let’s implement a basic codec (an encoder and decoder) for SMTP.
```scala
// code-examples/Concurrency/smtpd/src/main/scala/com/programmingscala/smtpd/Codec.scala

package com.programmingscala.smtpd

import org.apache.mina.core.buffer.IoBuffer
import org.apache.mina.core.session.{IdleStatus, IoSession}
import org.apache.mina.filter.codec._
import net.lag.naggati._
import net.lag.naggati.Steps._

case class Request(command: String, data: String)
case class Response(data: IoBuffer)

object Codec {
  val encoder = new ProtocolEncoder {
    def encode(session: IoSession, message: AnyRef, out: ProtocolEncoderOutput) = {
      val buffer = message.asInstanceOf[Response].data
      out.write(buffer)
    }

    def dispose(session: IoSession): Unit = {
      // no-op, required by ProtocolEncoder trait
    }
  }

  val decoder = new Decoder(readLine(true, "ISO-8859-1") { line =>
    line.split(' ').first match {
      case "HELO" => state.out.write(Request("HELO", line.split(' ')(1))); End
      case "QUIT" => state.out.write(Request("QUIT", null)); End
      case _ => throw new ProtocolError("Malformed request line: " + line)
    }
  })
}
```
We first define a Request case class in which to store request data as it arrives, along with a Response case class that wraps the bytes we send back. Then we specify the encoder portion of our codec, which exists simply to write data out. A dispose method is defined (but not fleshed out) to fulfill the contract of the ProtocolEncoder trait.
The decoder is what we’re really interested in. Naggati’s readLine step reads a line, picks out the first word in that line, and pattern matches on it to find SMTP commands. In the case of a HELO command, we also grab the subsequent string on that line. The results are placed in a Request object and written out to state; any other command raises a ProtocolError. As you might imagine, state stores our progress throughout the parsing process.
Though trivial, the above example demonstrates just how easy it is to parse protocols with Naggati. Now that we’ve got a working codec, let’s combine Naggati and MINA with Actors to wire up a server.
First, a few lines of setup grunt work to get things going for our SMTP server.
```scala
// code-examples/Concurrency/smtpd/src/main/scala/com/programmingscala/smtpd/Main.scala

package com.programmingscala.smtpd

import net.lag.naggati.IoHandlerActorAdapter
import org.apache.mina.filter.codec.ProtocolCodecFilter
import org.apache.mina.transport.socket.SocketAcceptor
import org.apache.mina.transport.socket.nio.{NioProcessor, NioSocketAcceptor}
import java.net.InetSocketAddress
import java.util.concurrent.{Executors, ExecutorService}
import scala.actors.Actor._

object Main {
  val listenAddress = "0.0.0.0"
  val listenPort = 2525

  def setMaxThreads = {
    val maxThreads = (Runtime.getRuntime.availableProcessors * 2)
    System.setProperty("actors.maxPoolSize", maxThreads.toString)
  }

  def initializeAcceptor = {
    val acceptorExecutor = Executors.newCachedThreadPool()
    val acceptor =
      new NioSocketAcceptor(acceptorExecutor, new NioProcessor(acceptorExecutor))
    acceptor.setBacklog(1000)
    acceptor.setReuseAddress(true)
    acceptor.getSessionConfig.setTcpNoDelay(true)
    acceptor.getFilterChain.addLast("codec",
      new ProtocolCodecFilter(Codec.encoder, Codec.decoder))
    acceptor.setHandler(
      new IoHandlerActorAdapter(session => new SmtpHandler(session)))
    acceptor.bind(new InetSocketAddress(listenAddress, listenPort))
  }

  def main(args: Array[String]) {
    setMaxThreads
    initializeAcceptor
    println("smtpd: up and listening on " + listenAddress + ":" + listenPort)
  }
}
```
To ensure that we’re getting the most out of the Actor instances in our server, we set the actors.maxPoolSize system property to twice the number of available processors on our machine. We then initialize an NioSocketAcceptor, a key piece of MINA machinery that accepts new connections from clients. The final three statements of this configuration are critical, as they put our codec to work, tell the acceptor to handle requests with a special object, and start the server listening for new connections on port 2525 (real SMTP servers run on the privileged port 25).
The aforementioned special object is an Actor wrapped in an IoHandlerActorAdapter, a bridging layer between Scala Actors and MINA that’s provided by Naggati. This is the piece of our server that talks back to the client. Now that we know what the client is saying, thanks to the decoder, we actually know what to say back!
```scala
// code-examples/Concurrency/smtpd/src/main/scala/com/programmingscala/smtpd/SmtpHandler.scala

package com.programmingscala.smtpd

import net.lag.naggati.{IoHandlerActorAdapter, MinaMessage, ProtocolError}
import org.apache.mina.core.buffer.IoBuffer
import org.apache.mina.core.session.{IdleStatus, IoSession}
import java.io.IOException
import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.{immutable, mutable}

class SmtpHandler(val session: IoSession) extends Actor {
  start

  def act = {
    loop {
      react {
        case MinaMessage.MessageReceived(msg) =>
          handle(msg.asInstanceOf[Request])
        case MinaMessage.SessionClosed => exit()
        case MinaMessage.SessionIdle(status) => session.close
        case MinaMessage.SessionOpened =>
          reply("220 localhost Tapir SMTPd 0.1\n")
        case MinaMessage.ExceptionCaught(cause) => {
          cause.getCause match {
            case e: ProtocolError => reply("502 Error: " + e.getMessage + "\n")
            case i: IOException => reply("502 Error: " + i.getMessage + "\n")
            case _ => reply("502 Error unknown\n")
          }
          session.close
        }
      }
    }
  }

  private def handle(request: Request) = {
    request.command match {
      case "HELO" => reply("250 Hi there " + request.data + "\n")
      case "QUIT" => reply("221 Peace out girl scout\n"); session.close
    }
  }

  private def reply(s: String) = {
    session.write(new Response(IoBuffer.wrap(s.getBytes)))
  }
}
```
Straight away, we see the same pattern that we saw in the Actors examples earlier in the chapter: looping around a react block that pattern matches on a limited set of cases. In SmtpHandler, all of those cases are events provided by MINA. For example, MINA will send us MinaMessage.SessionOpened when a client connects and MinaMessage.SessionClosed when a client disconnects.
The case we’re most interested in is MinaMessage.MessageReceived. We’re handed a familiar Request object with each newly received valid message, and we can pattern match on the command field to take appropriate action. When the client says HELO, we can reply with an acknowledgement. When the client says QUIT, we say goodbye and disconnect him.
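The reply side can be separated from I/O in the same way. In the hypothetical SmtpReplies sketch, replyFor returns the exact strings SmtpHandler sends, paired with a flag saying whether the session should be closed afterwards. The fallback for an unrecognized command reuses the handler’s “502 Error unknown” text and is an assumption, since the book’s handle method has no such case.

```scala
object SmtpReplies {
  // Maps a decoded command to (reply text, close the session afterwards?).
  def replyFor(command: String, data: String): (String, Boolean) = command match {
    case "HELO" => ("250 Hi there " + data + "\n", false)
    case "QUIT" => ("221 Peace out girl scout\n", true)
    case _      => ("502 Error unknown\n", true) // assumed fallback, not in the book's code
  }
}
```

With this split, the Actor’s job shrinks to writing the reply and closing the session when the flag says so, and the protocol wording can be unit tested without a socket.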
Now that we’ve got all the pieces in place, let’s have a conversation with our server.
```
[al3x@jaya ~]$ telnet localhost 2525
Trying ::1...
Connected to localhost.
Escape character is '^]'.
220 localhost Tapir SMTPd 0.1
HELO jaya.local
250 Hi there jaya.local
QUIT
221 Peace out girl scout
Connection closed by foreign host.
```
A brief conversation, to be sure, but our server works! Now, what happens if we throw something unexpected at it?
```
[al3x@jaya ~]$ telnet localhost 2525
Trying ::1...
Connected to localhost.
Escape character is '^]'.
220 localhost Tapir SMTPd 0.1
HELO jaya.local
250 Hi there jaya.local
BAD COMMAND
502 Error: Malformed request line: BAD COMMAND
Connection closed by foreign host.
```
Nicely handled. Good thing we took the time to dig out those exceptions when we received a MinaMessage.ExceptionCaught in our SmtpHandler Actor.
Of course, what we’ve built just handles the beginning and end of a complete SMTP conversation. As an exercise, try filling out the rest of the commands. Or, to skip ahead to something very much akin to what we’ve built here, check out the open source Mailslot project on GitHub (see [Mailslot]).
We learned how to build scalable, robust concurrent applications using Scala’s Actor library that avoid the problems of traditional approaches based on synchronized access to shared, mutable state. We also demonstrated that Java’s powerful built-in threading model is easily accessible from Scala. Finally, we learned how to combine Actors with the powerful MINA NIO framework and Naggati to develop event-driven, asynchronous network servers from the ground up in just a few lines of code.
The next chapter examines Scala’s built-in support for working with XML.