Chapter 9. Robust, Scalable Concurrency with Actors

The Problems of Shared, Synchronized State

Concurrency isn’t easy. Getting a program to do more than one thing at a time has traditionally meant hassling with mutexes, race conditions, lock contention, and the rest of the unpleasant baggage that comes along with multithreading. Event-based concurrency models alleviate some of these concerns, but can turn large programs into a rat’s nest of callback functions. No wonder, then, that concurrent programming is a task most programmers dread, or avoid altogether by retreating to multiple independent processes that share data externally (for example, through a database or message queue).

A large part of the difficulty of concurrent programming comes down to state: how do you know what your multithreaded program is doing, and when? What value does a particular variable hold when you have two threads running, or five, or fifty? How can you guarantee that your program’s many tendrils aren’t clobbering one another in a race to take action? A thread-based concurrency paradigm poses more questions than it answers.

Thankfully, Scala offers a reasonable, flexible approach to concurrency that we’ll explore in this chapter.

Actors

Though you may have heard of Scala and Actors in the same breath, Actors aren’t a concept unique to Scala. Actors, originally intended for use in Artificial Intelligence research, were first put forth in 1973 (see [Hewitt1973] and [Agha1987]). Since then, variations on the idea of Actors have appeared in a number of programming languages, most notably in Erlang and Io. As an abstraction, Actors are general enough that they can be implemented as a library (as in Scala), or as the fundamental unit of a computational system.

Actors in Abstract

Fundamentally, an Actor is an object that receives messages and takes action on those messages. The order in which messages arrive is unimportant to an Actor, though some Actor implementations (such as Scala’s) queue messages in order. An Actor might handle a message internally, or it might send a message to another Actor, or it might create another Actor to take action based on the message. Actors are a very high-level abstraction.

Unlike traditional object systems (which, you might be thinking to yourself, have many of the same properties we’ve described), Actors don’t enforce a sequence or ordering to their actions. This inherent eschewing of sequentiality, coupled with independence from shared global state, allows Actors to do their work in parallel. As we’ll see later on, the judicious use of immutable data fits the Actor model ideally, and further aids in safe, comprehensible concurrent programming.
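To make the abstract description a little more concrete, here is a minimal sketch of the idea in plain Scala, without the scala.actors library. The object owns a private mailbox (a java.util.concurrent.LinkedBlockingQueue), handles one message at a time on its own thread, and shares no mutable state with the code that sends to it. The class name MiniActor and its methods are hypothetical, for illustration only; this is not how scala.actors is implemented.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical illustration of the Actor idea; not the scala.actors API.
class MiniActor(handler: Any => Unit) {
  // Private marker message used only to shut the worker down.
  private case object Stop

  private val mailbox = new LinkedBlockingQueue[Any]()

  // The worker thread drains the mailbox in arrival order, one message at a time.
  private val worker = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        mailbox.take() match {
          case Stop => running = false
          case msg  => handler(msg)
        }
      }
    }
  })
  worker.setDaemon(true) // don't keep the JVM alive if stopAndWait is never called
  worker.start()

  // Sending only enqueues the message; the sender never waits for processing.
  def send(msg: Any): Unit = mailbox.put(msg)

  // Finish the messages already queued, then wait for the worker to exit.
  def stopAndWait(): Unit = {
    mailbox.put(Stop)
    worker.join()
  }
}
```

Because only the worker thread ever runs the handler, the handler's own state needs no locks; the senders and the actor communicate solely through the queue.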

Enough theory. Let’s see Actors in action.

Actors in Scala

At their most basic, Actors in Scala are objects that inherit from scala.actors.Actor.

// code-examples/Concurrency/simple-actor-script.scala

import scala.actors.Actor

class Redford extends Actor {
  def act() {
    println("A lot of what acting is, is paying attention.")
  }
}

val robert = new Redford
robert.start

As we can see, an Actor defined in this way must be both instantiated and started, similar to how threads are handled in Java. It must also implement the abstract method act, which returns Unit. Once we’ve started this simple Actor, the following sage advice for thespians is printed to the console.

A lot of what acting is, is paying attention.

The Actor companion object in the scala.actors package provides a factory method for creating Actors that avoids much of the setup in the above example. We can import this method and other convenience methods from scala.actors.Actor._. Here is a factory-made Actor.

// code-examples/Concurrency/factory-actor-script.scala

import scala.actors.Actor
import scala.actors.Actor._

val paulNewman = actor {
  println("To be an actor, you have to be a child.")
}

While a class that extends Actor must define act in order to be concrete, the factory method has no such requirement: the block we pass to actor effectively becomes the act method from our first example. The factory also starts the new Actor for us, so there’s no need to call start. Predictably, this Actor also prints a message when run. Illuminating, but we still haven’t shown the essential piece of the Actors puzzle: sending messages.
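A factory like actor can accept a bare block because Scala methods can take by-name parameters. The sketch below shows the same mechanism with a plain java.lang.Thread instead of an Actor: the block is captured unevaluated, becomes the body of run, and the thread is started before the method returns, much as actor starts the Actor it creates. BlockFactory and runInNewThread are hypothetical names; this is an analogy, not the scala.actors implementation.

```scala
object BlockFactory {
  // `body` is a by-name parameter: it is not evaluated at the call site,
  // only later, when the new thread calls run().
  def runInNewThread(body: => Unit): Thread = {
    val t = new Thread(new Runnable {
      def run(): Unit = body
    })
    t.start() // started on the caller's behalf, like the actor factory
    t
  }
}
```

Calling BlockFactory.runInNewThread { println("hello") } runs the println on the new thread; the returned Thread can be joined if the caller needs to wait for it.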

Sending Messages to Actors

Actors can receive any sort of object as a message, from strings of text to numeric types to whatever classes you’ve cooked up in your programs. For this reason, Actors and pattern matching go hand in hand. An Actor should only act on messages of familiar types; a pattern match on the class and/or contents of a message is good defensive programming, and increases the readability of Actor code.

// code-examples/Concurrency/pattern-match-actor-script.scala

import scala.actors.Actor
import scala.actors.Actor._

val fussyActor = actor {
  loop {
    receive {
      case s: String => println("I got a String: " + s)
      case i: Int => println("I got an Int: " + i.toString)
      case _ => println("I have no idea what I just got.")
    }
  }
}

fussyActor ! "hi there"
fussyActor ! 23
fussyActor ! 3.33

This example prints the following when run.

I got a String: hi there
I got an Int: 23
I have no idea what I just got.

The body of fussyActor is a receive method wrapped in a loop. loop is essentially a nice shortcut for while(true); it does whatever is inside its block repeatedly. receive blocks until it gets a message of a type that will satisfy one of its internal pattern matching cases.

The final lines of this example demonstrate use of the ! (exclamation point, or bang) method to send messages to our Actor. The bang sends a message asynchronously: it returns immediately, without waiting for the Actor to process the message. If you’ve ever seen Actors in Erlang, you’ll find this syntax familiar. The Actor is always on the left-hand side of the bang, and the message being sent to said Actor is always on the right. If you need a mnemonic for this granule of syntactic sugar, imagine that you’re an irate director shouting commands at your Actors.
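There is nothing built into the language about the bang: in Scala, a ! msg is simply infix notation for the method call a.!(msg). The minimal sketch below uses a hypothetical Inbox class (it only records messages, it does not process them) to show a class defining its own ! method that enqueues a message and returns immediately.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical class; it only records messages, it does not act on them.
class Inbox {
  private val queue = new ConcurrentLinkedQueue[Any]()

  // `!` is an ordinary method name; `inbox ! msg` means `inbox.!(msg)`.
  def !(msg: Any): Unit = queue.add(msg)

  // Snapshot of the messages received so far, in arrival order.
  def pending: List[Any] = {
    val it = queue.iterator()
    val b = List.newBuilder[Any]
    while (it.hasNext) b += it.next()
    b.result()
  }
}
```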

The Mailbox

Every Actor has a mailbox in which messages sent to that Actor are queued. Let’s see an example where we inspect the size of an Actor’s mailbox.

// code-examples/Concurrency/actor-mailbox-script.scala

import scala.actors.Actor
import scala.actors.Actor._

val countActor = actor {
  loop {
    react {
      case "how many?" => {
        println("I've got " + mailboxSize.toString + " messages in my mailbox.")
      }
    }
  }
}

countActor ! 1
countActor ! 2
countActor ! 3
countActor ! "how many?"
countActor ! "how many?"
countActor ! 4
countActor ! "how many?"

This example produces the following output.

I've got 3 messages in my mailbox.
I've got 3 messages in my mailbox.
I've got 4 messages in my mailbox.

Note that the first and second lines of output are identical. Because our Actor was set up solely to process the string “how many?”, each of those messages was removed from the mailbox as it was handled. Only the messages our Actor had no case for (in this case, the Ints) remained in the mailbox, unprocessed.
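The behavior described above, where matching messages are removed while unmatched ones stay queued, is often called selective receive. The following single-threaded sketch models it with a hypothetical SelectiveMailbox class: receive removes and handles the first queued message the partial function accepts and leaves everything else in place. It is a simplification for illustration (no blocking, no threads), not the scala.actors implementation.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, single-threaded model of a mailbox with selective receive.
class SelectiveMailbox {
  private val messages = ListBuffer[Any]()

  def send(msg: Any): Unit = messages += msg

  def size: Int = messages.size

  // Remove and handle the first message `handler` is defined at.
  // Returns None, leaving the mailbox untouched, if nothing matches.
  def receive[A](handler: PartialFunction[Any, A]): Option[A] = {
    val index = messages.indexWhere(handler.isDefinedAt)
    if (index < 0) None
    else Some(handler(messages.remove(index)))
  }
}
```

Sending 1, 2, 3 and then "how many?" and receiving with { case "how many?" => box.size } yields Some(3): the string is taken out before the handler runs, while the three Ints remain queued.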

Tip

If you see an Actor’s mailbox size ballooning unexpectedly, you’re probably sending messages of a type that the Actor doesn’t know about. Include a catchall case (_) when pattern matching received messages to find out what’s harassing your Actors.

Actors in Depth

Now that we’ve got a basic sense of what Actors are and how they’re used in Scala, let’s put them to work. Specifically, let’s put them to work cutting hair. The sleeping barber problem ([SleepingBarberProblem]) is one of a popular set of computer science hypotheticals designed to demonstrate issues of concurrency and synchronization.

The problem is this: a hypothetical barber shop has just one barber with one barber chair, and three chairs in which customers may wait for a haircut. Without customers around, the barber sleeps. When a customer arrives, the barber wakes up to cut his hair. If the barber is busy cutting hair when a customer arrives, the customer sits down in an available chair. If a chair isn’t available, the customer leaves.

The sleeping barber problem is usually solved with semaphores and mutexes, but we’ve got better tools at our disposal. Straightaway, we see several things to model as Actors: the barber is clearly one, as are the customers. The barbershop itself could be modeled as an Actor, too; the Actors in a system need not correspond to real-world entities that talk to one another, even though they communicate by sending messages.


Let’s start with the sleeping barber’s customers, as they have the simplest responsibilities.

// code-examples/Concurrency/sleepingbarber/customer.scala

package sleepingbarber

import scala.actors.Actor
import scala.actors.Actor._

case object Haircut

class Customer(val id: Int) extends Actor {
  @volatile var shorn = false  // read later from the simulator's main thread

  def act() = {
    loop {
      react {
        case Haircut => {
          shorn = true
          println("[c] customer " + id + " got a haircut")
        }
      }
    }
  }
}

For the most part, this should look pretty familiar: we declare the package in which this code lives, we import code from the scala.actors package, and we define a class that extends Actor. There are a few details worth noting, however.

First of all, there’s our declaration of case object Haircut. A common pattern when working with Actors in Scala is to use a case object to represent a message without internal data. If we wanted to include, say, the time at which the haircut was completed, we’d use a case class instead. We declare Haircut here because it’s a message type that will be sent solely to customers.
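For instance, a timestamped variant of the message could be a case class sitting alongside the case object. The sketch below is plain Scala; TimedHaircut and Messages.describe are hypothetical names, not part of the example code, and show how pattern matching tells the two apart and extracts the case class field.

```scala
// A message without data, and a hypothetical one that carries data.
case object Haircut
case class TimedHaircut(completedAt: Long)

object Messages {
  // Pattern matching distinguishes the two and extracts the case class field.
  def describe(msg: Any): String = msg match {
    case Haircut                   => "haircut"
    case TimedHaircut(completedAt) => "haircut completed at " + completedAt
    case other                     => "unknown message: " + other
  }
}
```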

Note as well that we’re storing one bit of mutable state in each Customer: whether or not they’ve gotten a haircut. In its internal loop, each Customer waits for a Haircut message and, upon receipt of one, sets the shorn boolean to true. Customer uses the asynchronous react method to respond to incoming messages. If we needed to return the result of processing the message, we would use receive; because we don’t, react lets us save some memory and thread use under the hood, since a waiting react does not tie up a thread the way a blocked receive does.

Let’s move on to the barber himself. Because there’s only one barber, we could have used the actor factory method technique mentioned above to create him. For testing purposes, we’ve instead defined our own Barber class.

// code-examples/Concurrency/sleepingbarber/barber.scala

package sleepingbarber

import scala.actors.Actor
import scala.actors.Actor._
import scala.util.Random

class Barber extends Actor {
  private val random = new Random()

  def helpCustomer(customer: Customer) {
    if (self.mailboxSize >= 3) {
      println("[b] not enough seats, turning customer " + customer.id + " away")
    } else {
      println("[b] cutting hair of customer " + customer.id)
      Thread.sleep(100 + random.nextInt(400))
      customer ! Haircut
    }
  }

  def act() {
    loop {
      react {
        case customer: Customer => helpCustomer(customer)
      }
    }
  }
}

The core of the Barber class looks very much like the Customer. We loop around react, waiting for a particular type of object. To keep that loop tight and readable, we call a method, helpCustomer, when a new Customer is sent to the barber. Within that method we employ a check on the mailbox size to serve as our “chairs” that customers may occupy; we could have the Barber or Shop classes maintain an internal Queue, but why bother when each actor’s mailbox already is one?

If three or more customers are already waiting in the queue, we turn the new customer away; react has already removed that message from the barber’s mailbox, so the customer is simply dropped. Otherwise, we simulate a semi-random delay (always at least 100 milliseconds) for the time it takes to cut a customer’s hair, then send off a Haircut message to that customer. (Were we not trying to simulate a real-world scenario, we would of course remove the call to Thread.sleep() and allow our barber to run full tilt.)
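If you did want an explicit queue for the chairs, a bounded java.util.concurrent.ArrayBlockingQueue expresses the same rule directly: offer returns false instead of blocking when the queue is full. This is a hedged alternative sketch, not part of the simulation; WaitingRoom and its methods are hypothetical names, and customers are represented by their Int IDs.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical waiting room with a fixed number of chairs.
class WaitingRoom(chairs: Int) {
  // java.lang.Integer so that poll() can return null when the room is empty
  private val seats = new ArrayBlockingQueue[Integer](chairs)

  // offer() never blocks: it returns false when every chair is taken.
  def arrive(customerId: Int): Boolean = seats.offer(Integer.valueOf(customerId))

  // The barber calls the next waiting customer, if there is one.
  def nextCustomer(): Option[Int] = Option(seats.poll()).map(_.intValue)
}
```

With three chairs, five arrivals in a row seat the first three and turn the last two away; once the barber calls a customer, a chair frees up for the next arrival.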

Next up, we have a simple class to represent the barbershop itself.

// code-examples/Concurrency/sleepingbarber/shop.scala

package sleepingbarber

import scala.actors.Actor
import scala.actors.Actor._

class Shop extends Actor {
  val barber = new Barber()
  barber.start

  def act() {
    println("[s] the shop is open")

    loop {
      react {
        case customer: Customer => barber ! customer
      }
    }
  }
}

By now, this should all look very familiar. Each Shop creates and starts a new Barber, prints a message telling the world that the shop is open, and sits in a loop waiting for customers. When a Customer comes in, he’s sent to the barber. We now see an unexpected benefit of Actors: they allow us to describe concurrent business logic in easily understood terms. “Send the customer to the barber” makes perfect sense, much more so than “notify the barber, unlock the mutex around the customer seats, increment the number of free seats,” and so forth. Actors get us closer to our domain.

Finally, we have a driver for our simulation.

// code-examples/Concurrency/sleepingbarber/barbershop-simulator.scala

package sleepingbarber

import scala.actors.Actor._
import scala.collection.{immutable, mutable}
import scala.util.Random

object BarbershopSimulator {
  private val random = new Random()
  private val customers = new mutable.ArrayBuffer[Customer]()
  private val shop = new Shop()

  def generateCustomers {
    for (i <- 1 to 20) {
      val customer = new Customer(i)
      customer.start()
      customers += customer
    }

    println("[!] generated " + customers.size + " customers")
  }

  // customers arrive at random intervals
  def trickleCustomers {
    for (customer <- customers) {
      shop ! customer
      Thread.sleep(random.nextInt(450))
    }
  }

  def tallyCuts {
    // wait for any remaining concurrent actions to complete
    Thread.sleep(2000)

    val shornCount = customers.filter(c => c.shorn).size
    println("[!] " + shornCount + " customers got haircuts today")
  }

  def main(args: Array[String]) {
    println("[!] starting barbershop simulation")
    shop.start()

    generateCustomers
    trickleCustomers
    tallyCuts

    System.exit(0)
  }
}

After “opening the shop”, we generate a number of Customer objects, assigning a numeric ID to each and storing the lot in an ArrayBuffer. Next, we “trickle” the customers in by sending them as messages to the shop and sleeping for a semi-random amount of time between loops. At the end of our simulated day, we tally up the number of customers who got haircuts by keeping only the customers whose internal shorn boolean was set to true and asking for the size of the resulting sequence.


Compile and run the code within the sleepingbarber directory as follows:

fsc *.scala
scala -classpath . sleepingbarber.BarbershopSimulator

Throughout our code, we’ve prefixed console messages with abbreviations for the classes from which the messages were printed. When we look at an example run of our simulator, it’s easy to see where each message came from.

[!] starting barbershop simulation
[s] the shop is open
[!] generated 20 customers
[b] cutting hair of customer 1
[b] cutting hair of customer 2
[c] customer 1 got a haircut
[c] customer 2 got a haircut
[b] cutting hair of customer 3
[c] customer 3 got a haircut
[b] cutting hair of customer 4
[b] cutting hair of customer 5
[c] customer 4 got a haircut
[b] cutting hair of customer 6
[c] customer 5 got a haircut
[b] cutting hair of customer 7
[c] customer 6 got a haircut
[b] not enough seats, turning customer 8 away
[b] cutting hair of customer 9
[c] customer 7 got a haircut
[b] not enough seats, turning customer 10 away
[c] customer 9 got a haircut
[b] cutting hair of customer 11
[b] cutting hair of customer 12
[c] customer 11 got a haircut
[b] cutting hair of customer 13
[c] customer 12 got a haircut
[b] cutting hair of customer 14
[c] customer 13 got a haircut
[b] not enough seats, turning customer 15 away
[b] not enough seats, turning customer 16 away
[b] not enough seats, turning customer 17 away
[b] cutting hair of customer 18
[c] customer 14 got a haircut
[b] cutting hair of customer 19
[c] customer 18 got a haircut
[b] cutting hair of customer 20
[c] customer 19 got a haircut
[c] customer 20 got a haircut
[!] 15 customers got haircuts today

You’ll find that each run’s output is, predictably, slightly different. Whenever the barber spends longer on a haircut than it takes several new customers to arrive, the “chairs” (the barber’s mailbox queue) fill up, and new arrivals simply leave.

Of course, we have to include the standard caveats that come with simple examples. For one, our example may not be suitably random: Barber and BarbershopSimulator each create their own scala.util.Random, and on older JVMs two instances created within the same millisecond can receive the same default seed and produce identical sequences. This is a byproduct of the way the JVM seeds random number generators, and a good reminder to be careful about randomness in concurrent programs. You’d also want to replace the sleep inside tallyCuts with a clearer signal that the various actors in the system have finished their work, perhaps by making BarbershopSimulator an Actor and sending it messages that indicate completion.
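One way to replace the fixed Thread.sleep(2000) is a java.util.concurrent.CountDownLatch: each participant counts down once when it is finished, and the coordinator waits in await until the count reaches zero or a timeout passes. The sketch below is a stand-in, not a change to the simulation: plain threads play the customers, and LatchSketch and runAndWait are hypothetical names. Adapting it to the simulation would require exactly one countDown per customer whatever the outcome, whether they got a haircut or were turned away.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object LatchSketch {
  // Starts `workers` threads; each does its "work", then counts the latch down.
  // Returns (whether all workers finished before the timeout, how many finished).
  def runAndWait(workers: Int, timeoutMillis: Long): (Boolean, Int) = {
    val done = new CountDownLatch(workers)
    val finished = new AtomicInteger(0)

    for (i <- 1 to workers) {
      new Thread(new Runnable {
        def run(): Unit = {
          try {
            Thread.sleep(10L * i) // simulated haircut of varying length
            finished.incrementAndGet()
          } finally {
            done.countDown() // signal completion even if the work fails
          }
        }
      }).start()
    }

    // await returns true as soon as the count reaches zero,
    // or false if the timeout elapses first.
    val allDone = done.await(timeoutMillis, TimeUnit.MILLISECONDS)
    (allDone, finished.get)
  }
}
```

Unlike a fixed sleep, the coordinator continues the moment the last worker finishes, and the timeout still guards against a participant that never reports in.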

Try modifying the code to introduce more customers, additional message types, different delays, or to remove the randomness altogether. If you’re an experienced multithreaded programmer, you might try writing your own sleeping barber implementation just to compare and contrast. We’re willing to bet that an implementation in Scala with Actors will be terser and easier to maintain.

Effective Actors

In order to get the most out of Actors, there are a few things to remember. First off, note that there are several methods you can use to get different types of behavior out of your Actors. The table below should help clarify when to use each method.

Table 9.1. Actor Methods

Method          Returns                         Description
act             Unit                            Abstract, top-level method for an Actor. Typically contains one of the following methods inside it.
receive         Result of processing message    Blocks until a message of matched type is received.
receiveWithin   Result of processing message    Like receive but unblocks after specified number of milliseconds.
react           Nothing                         Requires less overhead (threads) than receive.
reactWithin     Nothing                         Like react but unblocks after specified number of milliseconds.

Typically, you’ll want to use react wherever possible. If you need the results of processing a message (that is, you need a synchronous response from sending a message to an Actor), use the receiveWithin variant to reduce your chances of blocking indefinitely on an Actor that’s gotten wedged.
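The idea behind receiveWithin, waiting for a message but giving up after a deadline, can be sketched with BlockingQueue.poll(timeout, unit), which returns null if nothing arrives in time. The helper below is hypothetical: it borrows the name receiveWithin only for readability, works on a plain LinkedBlockingQueue, and wraps the result in an Option rather than delivering the TIMEOUT message that scala.actors uses.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object TimedReceive {
  // Wait at most `millis` for the next message; None means we timed out
  // instead of blocking forever on a sender that never replies.
  // A <: AnyRef so that a null result can't be unboxed into a primitive.
  def receiveWithin[A <: AnyRef](mailbox: LinkedBlockingQueue[A], millis: Long): Option[A] =
    Option(mailbox.poll(millis, TimeUnit.MILLISECONDS))
}
```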

Another strategy to keep your Actor-based code asynchronous is the use of futures. A future is a placeholder object for a value that hasn’t yet been returned from an asynchronous process. You can send a message to an Actor with the !! method, which returns a future; a variant of this method allows you to pass along a partial function which is applied to the future value. As you can see from the example below, retrieving a value from a Future is as straightforward as invoking its apply method. Note that retrieving a value from a Future is a blocking operation.

// code-examples/Concurrency/future-script.scala
import scala.actors.Futures._

val eventually = future(5 * 42)
println(eventually())
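Later versions of Scala (2.10 and up) include a general-purpose scala.concurrent.Future in the standard library. As a hedged point of comparison, and assuming the global ExecutionContext is acceptable for your program, the same computation looks like this. Await.result blocks just as eventually() does, but gives up with a TimeoutException after the given duration. FutureSketch is a hypothetical name.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureSketch {
  def compute(): Int = {
    // The multiplication runs asynchronously on the global thread pool.
    val eventually: Future[Int] = Future(5 * 42)
    // Block for the result, but for no longer than one second.
    Await.result(eventually, 1.second)
  }
}
```

FutureSketch.compute() returns 210.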

Each Actor in your system should have clear responsibilities. Don’t use Actors for general-purpose, highly stateful tasks. Instead, think like a director: what are the distinct roles in the “script” of your application, and what’s the least amount of information each Actor needs to do its job? Give each Actor just a couple of responsibilities, and use messages (usually in the form of a case class or case object) to delegate those responsibilities to other Actors.

Don’t be hesitant to copy data when writing Actor-centric code. The more immutable your design, the less likely you are to end up with unexpected state. The more you communicate via messages, the less you have to worry about synchronization. All those messages and immutable variables might appear to be overly costly. But, with today’s plentiful hardware, trading memory overhead for clarity and predictability seems more than fair for most applications.
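Case classes make this kind of copying cheap to write: copy returns a new instance with some fields changed and leaves the original untouched, so a message that has already been sent can never change underneath its recipient. The Order type and the withItem helper below are hypothetical.

```scala
// Hypothetical immutable message; all fields are vals.
case class Order(customerId: Int, items: List[String])

object CopySketch {
  // Instead of mutating a shared order, build a new message from the old one.
  def withItem(order: Order, item: String): Order =
    order.copy(items = order.items :+ item)
}
```

After withItem(first, "shave"), the original first still lists only its original items, so any Actor already holding it sees a stable value.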

Lastly, know when Actors aren’t appropriate. Just because Actors are a great way to handle concurrency in Scala doesn’t mean they’re the only way, as we’ll see below. Traditional threading and locking may better suit write-heavy critical paths for which a messaging approach would incur too much overhead. In our experience, you can use a purely Actor-based design to prototype a concurrent solution, then use profiling tools to suss out parts of your application that might benefit from a different approach.


Traditional Concurrency in Scala: Threading and Events

While Actors are a great way to handle concurrent operations, they’re not the only way to do so in Scala. As Scala is interoperable with Java, the concurrency concepts that you may be familiar with on the JVM still apply.

One-Off Threads

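For starters, Scala’s anonymous class syntax makes it easy to run a block of code in a new thread. Note that the braces define an anonymous subclass of Thread: code placed directly inside them would run in the constructor, on the current thread, so the work belongs in an overridden run method, and the thread must be started explicitly.

// code-examples/Concurrency/threads/by-block-script.scala

new Thread {
  override def run(): Unit = {
    println("this will run in a new thread")
  }
}.start()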

The scala.concurrent.ops object offers a similar construct, spawn, which runs a block asynchronously.

// code-examples/Concurrency/threads/spawn-script.scala

import scala.concurrent.ops._

object SpawnExample {
  def main(args: Array[String]) {
    println("this will run synchronously")

    spawn {
      println("this will run asynchronously")
    }
  }
}

Using java.util.concurrent

If you’re familiar with the venerable java.util.concurrent package, you’ll find it just as easy to use from Scala (or hard to use, depending on your point of view). Let’s use Executors to create a pool of threads. We’ll use the pool to run a simple class that implements Java’s Runnable interface and reports which thread it’s running on.

// code-examples/Concurrency/threads/util-concurrent-script.scala

import java.util.concurrent._

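class ThreadIdentifier extends Runnable {
  def run {
    println("hello from Thread " + Thread.currentThread.getId)
  }
}

val pool = Executors.newFixedThreadPool(5)

for (i <- 1 to 10) {
  pool.execute(new ThreadIdentifier)
}

// accept no new tasks; the pool's threads exit once the queued tasks finish,
// which lets the script terminate
pool.shutdown()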

As is standard in Java concurrency, the run method is where a threaded class starts. Every time our pool executes a new ThreadIdentifier, its run method is invoked. A look at the output below tells us that we’re running on the five threads in the pool, with IDs ranging from 9 to 13 in this particular run (the exact IDs and ordering will vary).

hello from Thread 9
hello from Thread 10
hello from Thread 11
hello from Thread 12
hello from Thread 13
hello from Thread 9
hello from Thread 11
hello from Thread 10
hello from Thread 10
hello from Thread 13

This is, of course, just scratching the surface of what’s available in java.util.concurrent. You’ll find that your existing knowledge of Java’s approach to multithreading still applies in Scala. What’s more, you’ll be able to accomplish the same tasks using less code, which should contribute to maintainability and productivity.
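For example, when a task needs to return a value, ExecutorService.submit accepts a java.util.concurrent.Callable and hands back a java.util.concurrent.Future that you can block on with get. A minimal sketch; PoolSum and sumOfSquares are hypothetical names.

```scala
import java.util.concurrent.{Callable, Executors, Future => JFuture}

object PoolSum {
  // Square each number on a pool of threads, then add up the results.
  def sumOfSquares(numbers: Seq[Int]): Int = {
    val pool = Executors.newFixedThreadPool(4)
    try {
      val futures: Seq[JFuture[Int]] = numbers.map { n =>
        pool.submit(new Callable[Int] {
          def call(): Int = n * n
        })
      }
      // get() blocks until the corresponding task has finished
      futures.map(_.get()).sum
    } finally {
      pool.shutdown() // let the pool's threads exit once their work is done
    }
  }
}
```

PoolSum.sumOfSquares(1 to 4) returns 30.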

Events

Threading and Actors aren’t the only way to do concurrency. Event-based concurrency, a particular approach to asynchronous or non-blocking I/O (NIO), has become a favored way to write servers that need to scale to thousands of simultaneous clients. Eschewing the traditional one-to-one relationship of threads to clients, this model of concurrency exposes events that occur when particular conditions are met (for example, when data is received from a client over a network socket). Typically, the programmer will associate a callback method with each event that’s relevant to her program.
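The callback style can be sketched in a few lines of plain Scala: handlers are registered for the events they care about, and a dispatcher invokes them when an event occurs. EventDispatcher, Connected, and DataReceived are hypothetical names; in a real event-based server the dispatcher would be driven by I/O readiness notifications rather than by direct calls to fire.

```scala
import scala.collection.mutable

// Hypothetical events a network server might care about.
sealed trait Event
case class Connected(clientId: Int) extends Event
case class DataReceived(clientId: Int, data: String) extends Event

// Hypothetical single-threaded dispatcher: callbacks run in registration order.
class EventDispatcher {
  private val handlers = mutable.ListBuffer[PartialFunction[Event, Unit]]()

  // Register a callback for the events its cases match.
  def on(handler: PartialFunction[Event, Unit]): Unit = handlers += handler

  // Invoke every registered handler that is interested in this event.
  def fire(event: Event): Unit =
    handlers.foreach { h => if (h.isDefinedAt(event)) h(event) }
}
```

A handler registered with on { case DataReceived(id, data) => ... } is skipped for Connected events and called for DataReceived ones.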

While the java.nio package provides a variety of useful primitives for non-blocking I/O (buffers, channels, etc.), it’s still a fair bit of work to cobble together an event-based concurrent program from those simple parts. Enter Apache MINA, built atop Java NIO and described on its homepage as “a network application framework which helps users develop high performance and high scalability network applications easily” (see [MINA]).
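To get a feel for those primitives, the sketch below reads one line using only java.nio: a non-blocking channel is registered with a Selector, and the loop reacts to "readable" events. To keep it self-contained it uses a java.nio.channels.Pipe in place of a network socket; SelectorSketch and readLine are hypothetical names, and a real server would also register a ServerSocketChannel for accept events and juggle many clients in the same loop.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, SelectionKey, Selector}
import java.nio.charset.StandardCharsets

object SelectorSketch {
  // Reads one newline-terminated line from a Pipe, reacting only to
  // "readable" events reported by a Selector.
  def readLine(input: String): String = {
    require(input.endsWith("\n"), "input must end with a newline")
    val selector = Selector.open()
    val pipe = Pipe.open()
    val source = pipe.source()
    source.configureBlocking(false) // required before registering with a Selector
    source.register(selector, SelectionKey.OP_READ)

    // Stand-in for a client: write the bytes into the other end of the pipe.
    pipe.sink().write(ByteBuffer.wrap(input.getBytes(StandardCharsets.US_ASCII)))

    val received = new java.lang.StringBuilder
    val buffer = ByteBuffer.allocate(64)
    try {
      while (received.indexOf("\n") < 0) {
        selector.select() // block until at least one registered channel is ready
        val keys = selector.selectedKeys().iterator()
        while (keys.hasNext) {
          val key = keys.next()
          keys.remove() // the selector does not clear handled keys itself
          if (key.isReadable) {
            buffer.clear()
            val n = source.read(buffer)
            if (n > 0) received.append(new String(buffer.array(), 0, n, StandardCharsets.US_ASCII))
          }
        }
      }
    } finally {
      source.close()
      pipe.sink().close()
      selector.close()
    }
    received.substring(0, received.indexOf("\n"))
  }
}
```

Even this toy needs manual buffer management, key bookkeeping, and cleanup, which is the kind of plumbing a framework like MINA takes off your hands.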

While MINA may be easier to use than Java’s built-in NIO libraries, we’ve gotten used to some conveniences of Scala that just aren’t available in MINA. The open-source Naggati library (see [Naggati]) adds a Scala-friendly layer atop MINA that, according to its author, “makes it easy to build protocol filters [using a] sequential style”. Essentially, Naggati is a DSL for parsing network protocols, with MINA’s powerful NIO abilities under the hood.

Let’s use Naggati to write the foundations of an SMTP email server. To keep things simple, we’re only dealing with two SMTP commands: HELO and QUIT. The former command identifies a client, and the latter ends the client’s session.

We’ll keep ourselves honest with a test suite, facilitated by the Specs behavior-driven development library (see the section called “Specs”).

// code-examples/Concurrency/smtpd/src/test/scala/com/programmingscala/smtpd/SmtpDecoderSpec.scala

package com.programmingscala.smtpd

import java.nio.ByteOrder
import net.lag.naggati._
import org.apache.mina.core.buffer.IoBuffer
import org.apache.mina.core.filterchain.IoFilter
import org.apache.mina.core.session.{DummySession, IoSession}
import org.apache.mina.filter.codec._
import org.specs._
import scala.collection.{immutable, mutable}

object SmtpDecoderSpec extends Specification {
  private var fakeSession: IoSession = null
  private var fakeDecoderOutput: ProtocolDecoderOutput = null
  private var written = new mutable.ListBuffer[Request]

  def quickDecode(s: String): Unit = {
    Codec.decoder.decode(fakeSession, IoBuffer.wrap(s.getBytes), fakeDecoderOutput)
  }

  "SmtpRequestDecoder" should {
    doBefore {
      written.clear()
      fakeSession = new DummySession
      fakeDecoderOutput = new ProtocolDecoderOutput {
        override def flush(nextFilter: IoFilter.NextFilter, s: IoSession) = {}
        override def write(obj: AnyRef) = written += obj.asInstanceOf[Request]
      }
    }

    "parse HELO" in {
      quickDecode("HELO client.example.org\n")
      written.size mustEqual 1
      written(0).command mustEqual "HELO"
      written(0).data mustEqual "client.example.org"
    }

    "parse QUIT" in {
      quickDecode("QUIT\n")
      written.size mustEqual 1
      written(0).command mustEqual "QUIT"
      written(0).data mustEqual null
    }
  }
}

After setting up an environment for each test run, our suite exercises the two SMTP commands we’re interested in. The doBefore block runs before each test, guaranteeing that the mock session and output buffer start out in a clean state. In each test we’re passing a string of hypothetical client input to our as-yet-unimplemented Codec, then verifying that the resulting Request (a case class) contains the correct command and data fields. As the QUIT command doesn’t require any additional information from the client, we simply check that data is null.

With our tests in place, let’s implement a basic codec (an encoder and decoder) for SMTP.

// code-examples/Concurrency/smtpd/src/main/scala/com/programmingscala/smtpd/Codec.scala

package com.programmingscala.smtpd

import org.apache.mina.core.buffer.IoBuffer
import org.apache.mina.core.session.{IdleStatus, IoSession}
import org.apache.mina.filter.codec._
import net.lag.naggati._
import net.lag.naggati.Steps._

case class Request(command: String, data: String)
case class Response(data: IoBuffer)

object Codec {
  val encoder = new ProtocolEncoder {
    def encode(session: IoSession, message: AnyRef, out: ProtocolEncoderOutput) = {
      val buffer = message.asInstanceOf[Response].data
      out.write(buffer)
    }

    def dispose(session: IoSession): Unit = {
      // no-op, required by ProtocolEncoder trait
    }
  }

  val decoder = new Decoder(readLine(true, "ISO-8859-1") { line =>
    val words = line.split(' ')
    words.first match {
      // HELO must be followed by the client's name; a bare HELO is malformed
      case "HELO" if words.length > 1 => state.out.write(Request("HELO", words(1))); End
      case "QUIT" => state.out.write(Request("QUIT", null)); End
      case _ => throw new ProtocolError("Malformed request line: " + line)
    }
  })
}

We first define a Request case class in which to store request data as it arrives, along with a Response case class that wraps an outgoing IoBuffer. Then we specify the encoder portion of our codec, which exists simply to write data out. A dispose method is defined (but not fleshed out) to fulfill the contract of the ProtocolEncoder trait.

The decoder is what we’re really interested in. The readLine step reads a line, picks out the first word in that line, and pattern matches on it to find SMTP commands. In the case of a HELO command, we also grab the next word on that line. The results are placed in a Request object and written to state.out. As you might imagine, state stores our progress throughout the parsing process.
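Stripped of Naggati’s buffering and state, the decision the decoder makes is an ordinary function from a line of text to a Request. The sketch below mirrors that logic in plain Scala so it can be tried without MINA. SmtpLineParser and parseLine are hypothetical names; the function returns an Either instead of throwing ProtocolError, and it treats a HELO without an argument as malformed.

```scala
// Same shape as the Request case class in Codec.scala.
case class Request(command: String, data: String)

object SmtpLineParser {
  // Hypothetical pure-Scala version of the decoder's decision logic.
  def parseLine(line: String): Either[String, Request] =
    line.trim.split(' ').toList match {
      case "HELO" :: domain :: _ => Right(Request("HELO", domain))
      case "QUIT" :: _           => Right(Request("QUIT", null))
      case _                     => Left("Malformed request line: " + line)
    }
}
```

Because it has no MINA dependencies, a function like this can be unit-tested directly, leaving the Specs suite above to cover the Naggati wiring.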

Though trivial, the above example demonstrates just how easy it is to parse protocols with Naggati. Now that we’ve got a working codec, let’s combine Naggati and MINA with Actors to wire up a server.

First, a few lines of setup grunt work to get things going for our SMTP server.

// code-examples/Concurrency/smtpd/src/main/scala/com/programmingscala/smtpd/Main.scala

package com.programmingscala.smtpd

import net.lag.naggati.IoHandlerActorAdapter
import org.apache.mina.filter.codec.ProtocolCodecFilter
import org.apache.mina.transport.socket.SocketAcceptor
import org.apache.mina.transport.socket.nio.{NioProcessor, NioSocketAcceptor}
import java.net.InetSocketAddress
import java.util.concurrent.{Executors, ExecutorService}
import scala.actors.Actor._

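object Main {
  val listenAddress = "0.0.0.0"
  val listenPort = 2525

  // Cap the Actor library's thread pool at twice the number of cores.
  // Call this before the first Actor starts, since the scheduler reads
  // the property when it is created.
  def setMaxThreads(): Unit = {
    val maxThreads = Runtime.getRuntime.availableProcessors * 2
    System.setProperty("actors.maxPoolSize", maxThreads.toString)
  }

  def initializeAcceptor(): Unit = {
    // The acceptor and its NioProcessor share one cached thread pool.
    val acceptorExecutor = Executors.newCachedThreadPool()
    val acceptor =
      new NioSocketAcceptor(acceptorExecutor, new NioProcessor(acceptorExecutor))
    acceptor.setBacklog(1000)                     // queue up to 1000 pending connections
    acceptor.setReuseAddress(true)                // allow quick rebinding after a restart
    acceptor.getSessionConfig.setTcpNoDelay(true) // disable Nagle's algorithm
    // Decode incoming lines into Requests and encode our Responses.
    acceptor.getFilterChain.addLast("codec",
            new ProtocolCodecFilter(smtpd.Codec.encoder, smtpd.Codec.decoder))
    // Build a new SmtpHandler Actor for each client session.
    acceptor.setHandler(
            new IoHandlerActorAdapter(session => new SmtpHandler(session)))
    acceptor.bind(new InetSocketAddress(listenAddress, listenPort))
  }

  def main(args: Array[String]): Unit = {
    setMaxThreads()
    initializeAcceptor()
    println("smtpd: up and listening on " + listenAddress + ":" + listenPort)
  }
}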

To ensure that we’re getting the most out of the Actor instances in our server, we set the actors.maxPoolSize system property, which caps the size of the thread pool that runs our Actors, to twice the number of available processors on our machine. We then initialize an NioSocketAcceptor, a key piece of MINA machinery that accepts new connections from clients. We request a backlog of up to 1000 pending connections, allow the listening address to be reused, and disable Nagle’s algorithm (setTcpNoDelay) so that short replies go out immediately. The final three statements of this configuration are critical: they put our codec to work, tell the acceptor to handle requests with a special object, and start the server listening for new connections on port 2525 (real SMTP servers run on the privileged port 25).
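Each of these acceptor settings has a counterpart in the JDK’s blocking java.net API. The sketch below is not MINA code; it assumes a plain ServerSocket (blocking I/O rather than NIO), and PlainAcceptorSketch is a hypothetical name. It shows the same three options: a listen backlog, SO_REUSEADDR, and TCP_NODELAY on each accepted connection.

```scala
import java.net.{InetSocketAddress, ServerSocket, Socket}

// Hypothetical plain java.net counterpart of the MINA acceptor settings
// (blocking I/O, one accept at a time; not how MINA works internally).
object PlainAcceptorSketch {
  def openListener(host: String, port: Int, backlog: Int): ServerSocket = {
    val server = new ServerSocket()   // created unbound so options can be set first
    server.setReuseAddress(true)      // SO_REUSEADDR; behavior if set after bind is undefined
    server.bind(new InetSocketAddress(host, port), backlog) // backlog: pending-connection queue
    server
  }

  def acceptOne(server: ServerSocket): Socket = {
    val client = server.accept()      // blocks until a client connects
    client.setTcpNoDelay(true)        // TCP_NODELAY: disable Nagle's algorithm
    client
  }
}
```

Passing port 0 asks the operating system for any free port, which is handy when trying this out locally.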

The aforementioned special object is an IoHandlerActorAdapter, a bridging layer between Scala Actors and MINA that’s provided by Naggati. We hand it a function that builds a new SmtpHandler Actor for a given session, and MINA’s events for that session are then delivered to the Actor as messages. This is the piece of our server that talks back to the client. Now that we know what the client is saying, thanks to the decoder, we actually know what to say back!
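The factory-function design deserves a closer look: instead of one handler for the whole server, the adapter is given a way to build a handler per session. The following sketch imitates that idea without Actors or MINA. PerSessionDispatcher, the SessionEvent types, and RecordingHandler are hypothetical names for illustration; they do not reflect how Naggati’s adapter is actually implemented.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the session events MINA reports (not Naggati's MinaMessage).
sealed trait SessionEvent
case object Opened extends SessionEvent
final case class Received(line: String) extends SessionEvent
case object Closed extends SessionEvent

// Creates one handler per session id on first use, via the factory function,
// in the spirit of new IoHandlerActorAdapter(session => new SmtpHandler(session)).
final class PerSessionDispatcher[H](factory: Long => H, deliver: (H, SessionEvent) => Unit) {
  private val handlers = mutable.Map.empty[Long, H]

  def dispatch(sessionId: Long, event: SessionEvent): Unit = {
    // synchronized because events for different sessions may arrive on different threads
    val handler = synchronized { handlers.getOrElseUpdate(sessionId, factory(sessionId)) }
    deliver(handler, event)
    // Forget the handler once its session has closed.
    if (event == Closed) synchronized { handlers.remove(sessionId) }
  }

  def activeSessions: Int = synchronized { handlers.size }
}

// Example handler that simply records the events it was sent.
final class RecordingHandler(val sessionId: Long) {
  val seen = mutable.ArrayBuffer.empty[SessionEvent]
}
```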

// code-examples/Concurrency/smtpd/src/main/scala/com/programmingscala/smtpd/SmtpHandler.scala

package com.programmingscala.smtpd

import net.lag.naggati.{IoHandlerActorAdapter, MinaMessage, ProtocolError}
import org.apache.mina.core.buffer.IoBuffer
import org.apache.mina.core.session.{IdleStatus, IoSession}
import java.io.IOException
import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.{immutable, mutable}

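// One SmtpHandler Actor serves one client session; MINA events arrive
// in its mailbox as MinaMessage values.
class SmtpHandler(val session: IoSession) extends Actor {
  start

  def act = {
    loop {
      react {
        // A request that the decoder has already parsed.
        case MinaMessage.MessageReceived(msg) =>
            handle(msg.asInstanceOf[smtpd.Request])
        // The client disconnected: stop this Actor.
        case MinaMessage.SessionClosed => exit()
        // The connection went idle: close it.
        case MinaMessage.SessionIdle(status) => session.close
        // A client just connected: send the SMTP greeting.
        case MinaMessage.SessionOpened => reply("220 localhost Tapir SMTPd 0.1\n")

        // Decoding or I/O failed. The interesting exception is wrapped,
        // so we look at its cause, then drop the connection.
        case MinaMessage.ExceptionCaught(cause) => {
          cause.getCause match {
            case e: ProtocolError => reply("502 Error: " + e.getMessage + "\n")
            case i: IOException   => reply("502 Error: " + i.getMessage + "\n")
            case _                => reply("502 Error unknown\n")
          }
          session.close
        }
      }
    }
  }

  // Only HELO and QUIT get this far; the decoder rejects everything else.
  private def handle(request: smtpd.Request) = {
    request.command match {
      case "HELO" => reply("250 Hi there " + request.data + "\n")
      case "QUIT" => reply("221 Peace out girl scout\n"); session.close
    }
  }

  // Wrap the reply text in a Response for the encoder to write out.
  // (SMTP formally requires CRLF line endings; a bare \n suffices for telnet.)
  private def reply(s: String) = {
    session.write(new smtpd.Response(IoBuffer.wrap(s.getBytes)))
  }

}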

Straight away, we see the same pattern that we saw in the Actors examples earlier in the chapter: looping around a react block that pattern matches on a limited set of cases. In SmtpHandler, all of those cases are events provided by MINA. For example, MINA will send us MinaMessage.SessionOpened when a client connects and MinaMessage.SessionClosed when a client disconnects.
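To make the loop-and-react pattern concrete, here is a deliberately simplified, thread-based stand-in for an Actor: a private mailbox drained by a single thread that pattern matches on each message. MailboxLoop, Incoming, and Stop are hypothetical names. Unlike react, which lets the Actor library share a small pool of threads among many Actors, this version dedicates a thread to each instance, so it only illustrates the message-handling structure.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical message types for this sketch.
sealed trait LoopMsg
final case class Incoming(line: String) extends LoopMsg
case object Stop extends LoopMsg

// A thread-backed stand-in for loop { react { ... } }: one mailbox, one consumer,
// so messages are handled strictly in arrival order.
final class MailboxLoop(handle: String => String) {
  private val mailbox = new LinkedBlockingQueue[LoopMsg]()
  private val replies = new LinkedBlockingQueue[String]()

  private val worker = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        mailbox.take() match {          // blocks until a message arrives
          case Incoming(line) => replies.put(handle(line))
          case Stop           => running = false
        }
      }
    }
  })
  worker.start()

  // Asynchronous send: enqueue and return immediately, like sending to an Actor.
  def !(msg: LoopMsg): Unit = mailbox.put(msg)
  // Block until the worker has produced the next reply.
  def nextReply(): String = replies.take()
  def awaitTermination(): Unit = worker.join()
}
```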

The case we’re most interested in is MinaMessage.MessageReceived. We’re handed a familiar Request object with each newly received valid message, and we can pattern match on the command field to take appropriate action. When the client says HELO, we reply with an acknowledgement. When the client says QUIT, we say goodbye and close the connection.
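Because handle only inspects the command and writes a reply, its decision logic can be pulled out into a pure function and tested without a socket. In this sketch, Reply and SmtpReplies are hypothetical names; the catch-all case is our own addition, since in the real server the decoder rejects unknown commands before they reach the handler.

```scala
// Hypothetical side-effect-free version of SmtpHandler.handle: it returns the
// reply text and whether the session should be closed afterwards.
final case class Reply(text: String, closeAfter: Boolean)

object SmtpReplies {
  def respond(command: String, data: String): Reply = command match {
    case "HELO" => Reply("250 Hi there " + data + "\n", closeAfter = false)
    case "QUIT" => Reply("221 Peace out girl scout\n", closeAfter = true)
    // Our addition: treat anything else like the error path and hang up.
    case other  => Reply("502 Error: unsupported command " + other + "\n", closeAfter = true)
  }
}
```

The handler would then call reply(r.text) and close the session when r.closeAfter is true, keeping all I/O at the edges.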

Now that we’ve got all the pieces in place, let’s have a conversation with our server.

[al3x@jaya ~]$ telnet localhost 2525
Trying ::1...
Connected to localhost.
Escape character is '^]'.
220 localhost Tapir SMTPd 0.1
HELO jaya.local
250 Hi there jaya.local
QUIT
221 Peace out girl scout
Connection closed by foreign host.

A brief conversation, to be sure, but our server works! Now, what happens if we throw something unexpected at it?

[al3x@jaya ~]$ telnet localhost 2525
Trying ::1...
Connected to localhost.
Escape character is '^]'.
220 localhost Tapir SMTPd 0.1
HELO jaya.local
250 Hi there jaya.local
BAD COMMAND
502 Error: Malformed request line: BAD COMMAND
Connection closed by foreign host.

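Nicely handled. Good thing we took the time to dig out those exceptions when we received a MinaMessage.ExceptionCaught in our SmtpHandler Actor: the decoder’s ProtocolError reaches the handler wrapped inside another exception, which is why SmtpHandler matches on cause.getCause rather than on cause itself.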
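The unwrapping step is easy to check in isolation. In this sketch, SketchProtocolError is a hypothetical stand-in for Naggati’s ProtocolError, and a RuntimeException plays the role of the wrapping exception that the handler receives. Note that getCause can be null; the wildcard pattern still matches it, while the typed patterns do not.

```scala
import java.io.IOException

// Hypothetical stand-in for Naggati's ProtocolError, for this sketch only.
final class SketchProtocolError(msg: String) extends Exception(msg)

object ErrorReplies {
  // Mirrors SmtpHandler's ExceptionCaught case: inspect the wrapped cause.
  // A null cause falls through to the wildcard, since typed patterns never match null.
  def replyFor(caught: Throwable): String = caught.getCause match {
    case e: SketchProtocolError => "502 Error: " + e.getMessage + "\n"
    case i: IOException         => "502 Error: " + i.getMessage + "\n"
    case _                      => "502 Error unknown\n"
  }
}
```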

Of course, what we’ve built just handles the beginning and end of a complete SMTP conversation. As an exercise, try filling out the rest of the commands. Or, to skip ahead to something very much akin to what we’ve built here, check out the open source Mailslot project on GitHub (see [Mailslot]).

Recap and What’s Next

We learned how to use Scala’s Actor library to build scalable, robust concurrent applications that avoid the problems of traditional approaches based on synchronized access to shared, mutable state. We also demonstrated that Java’s powerful built-in threading model is easily accessible from Scala. Finally, we learned how to combine Actors with the MINA NIO framework and Naggati to develop event-driven, asynchronous network servers from the ground up in just a few lines of code.

The next chapter examines Scala’s built-in support for working with XML.
