Server-Side WebSocket Support

WebSocket is a protocol that provides a bi-directional channel between browser and webserver usually run over an upgraded HTTP(S) connection. Data is exchanged in messages whereby a message can either be binary data or Unicode text.

Akka HTTP provides a stream-based implementation of the WebSocket protocol that hides the low-level details of the underlying binary framing wire-protocol and provides a simple API to implement services using WebSocket.

Model

The basic unit of data exchange in the WebSocket protocol is a message. A message can either be binary message, i.e. a sequence of octets or a text message, i.e. a sequence of Unicode code points.

Akka HTTP provides a straight-forward model for this abstraction:

/**
 * The ADT for WebSocket messages. A message can either be a binary or a text message.
 */
sealed trait Message extends akka.http.javadsl.model.ws.Message

/**
 * Represents a WebSocket text message. A text message can either be a [[TextMessage.Strict]] in which case
 * the complete data is already available or it can be [[TextMessage.Streamed]] in which case `textStream`
 * will return a Source streaming the data as it comes in.
 */
sealed trait TextMessage extends akka.http.javadsl.model.ws.TextMessage with Message {
  /**
   * The contents of this message as a stream.
   */
  def textStream: Source[String, _]

  /** Java API */
  override def getStreamedText: javadsl.Source[String, _] = textStream.asJava
  override def asScala: TextMessage = this
}
sealed trait BinaryMessage extends akka.http.javadsl.model.ws.BinaryMessage with Message {
  /**
   * The contents of this message as a stream.
   */
  def dataStream: Source[ByteString, _]

  /** Java API */
  override def getStreamedData: javadsl.Source[ByteString, _] = dataStream.asJava
  override def asScala: BinaryMessage = this
}

The data of a message is provided as a stream because WebSocket messages do not have a predefined size and could (in theory) be infinitely long. However, only one message can be open per direction of the WebSocket connection, so that many application level protocols will want to make use of the delineation into (small) messages to transport single application-level data units like “one event” or “one chat message”.

Many messages are small enough to be sent or received in one go. As an opportunity for optimization, the model provides a Strict subclass for each kind of message which contains data as a strict, i.e. non-streamed, ByteString or String.

When receiving data from the network connection the WebSocket implementation tries to create a Strict message whenever possible, i.e. when the complete data was received in one chunk. However, the actual chunking of messages over a network connection and through the various streaming abstraction layers is not deterministic from the perspective of the application. Therefore, application code must be able to handle both streamed and strict messages and not expect certain messages to be strict. (Particularly, note that tests against localhost will behave differently than tests against remote peers where data is received over a physical network connection.)

For sending data, use TextMessage.apply(text: String) to create a Strict message which is often the natural choice when the complete message has already been assembled. Otherwise, use TextMessage.apply(textStream: Source[String, Any]) to create a streamed message from an Akka Stream source.

Server API

The entrypoint for the WebSocket API is the synthetic UpgradeToWebSocket header which is added to a request if Akka HTTP encounters a WebSocket upgrade request.

The WebSocket specification mandates that details of the WebSocket connection are negotiated by placing special-purpose HTTP-headers into request and response of the HTTP upgrade. In Akka HTTP these HTTP-level details of the WebSocket handshake are hidden from the application and don’t need to be managed manually.

Instead, the synthetic UpgradeToWebSocket represents a valid WebSocket upgrade request. An application can detect a WebSocket upgrade request by looking for the UpgradeToWebSocket header. It can choose to accept the upgrade and start a WebSocket connection by responding to that request with an HttpResponse generated by one of the UpgradeToWebSocket.handleMessagesWith methods. In its most general form this method expects two arguments: first, a handler Flow[Message, Message, Any] that will be used to handle WebSocket messages on this connection. Second, the application can optionally choose one of the proposed application-level sub-protocols by inspecting the values of UpgradeToWebSocket.requestedProtocols and pass the chosen protocol value to handleMessages.

Handling Messages

A message handler is expected to be implemented as a Flow[Message, Message, Any]. For typical request-response scenarios this fits very well and such a Flow can be constructed from a simple function by using Flow[Message].map or Flow[Message].mapAsync.

There are other use-cases, e.g. in a server-push model, where a server message is sent spontaneously, or in a true bi-directional scenario where input and output aren’t logically connected. Providing the handler as a Flow in these cases may not fit. Another method, UpgradeToWebSocket.handleMessagesWithSinkSource, is provided which allows to pass an output-generating Source[Message, Any] and an input-receiving Sink[Message, Any] independently.

Note that a handler is required to consume the data stream of each message to make place for new messages. Otherwise, subsequent messages may be stuck and message traffic in this direction will stall.

Example

Let’s look at an example.

WebSocket requests come in like any other requests. In the example, requests to /greeter are expected to be WebSocket requests:

val requestHandler: HttpRequest => HttpResponse = {
  case req @ HttpRequest(GET, Uri.Path("/greeter"), _, _, _) =>
    req.header[UpgradeToWebSocket] match {
      case Some(upgrade) => upgrade.handleMessages(greeterWebSocketService)
      case None          => HttpResponse(400, entity = "Not a valid websocket request!")
    }
  case r: HttpRequest =>
    r.discardEntityBytes() // important to drain incoming HTTP Entity stream
    HttpResponse(404, entity = "Unknown resource!")
}

It uses pattern matching on the path and then inspects the request to query for the UpgradeToWebSocket header. If such a header is found, it is used to generate a response by passing a handler for WebSocket messages to the handleMessages method. If no such header is found a “400 Bad Request” response is generated.

The passed handler expects text messages where each message is expected to contain (a person’s) name and then responds with another text message that contains a greeting:

// The Greeter WebSocket Service expects a "name" per message and
// returns a greeting message for that name
val greeterWebSocketService =
  Flow[Message]
    .mapConcat {
      // we match but don't actually consume the text message here,
      // rather we simply stream it back as the tail of the response
      // this means we might start sending the response even before the
      // end of the incoming message has been received
      case tm: TextMessage => TextMessage(Source.single("Hello ") ++ tm.textStream) :: Nil
      case bm: BinaryMessage =>
        // ignore binary messages but drain content to avoid the stream being clogged
        bm.dataStream.runWith(Sink.ignore)
        Nil
    }
Note

Inactive WebSocket connections will be dropped according to the idle-timeout settings. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.

Routing support

The routing DSL provides the handleWebSocketMessages directive to install a WebSocket handler if the request was a WebSocket request. Otherwise, the directive rejects the request.

Here’s the above simple request handler rewritten as a route:

def greeter: Flow[Message, Message, Any] =
  Flow[Message].mapConcat {
    case tm: TextMessage =>
      TextMessage(Source.single("Hello ") ++ tm.textStream ++ Source.single("!")) :: Nil
    case bm: BinaryMessage =>
      // ignore binary messages but drain content to avoid the stream being clogged
      bm.dataStream.runWith(Sink.ignore)
      Nil
  }
val websocketRoute =
  path("greeter") {
    handleWebSocketMessages(greeter)
  }

// tests:
// create a testing probe representing the client-side
val wsClient = WSProbe()

// WS creates a WebSocket request for testing
WS("/greeter", wsClient.flow) ~> websocketRoute ~>
  check {
    // check response for WS Upgrade headers
    isWebSocketUpgrade shouldEqual true

    // manually run a WS conversation
    wsClient.sendMessage("Peter")
    wsClient.expectMessage("Hello Peter!")

    wsClient.sendMessage(BinaryMessage(ByteString("abcdef")))
    wsClient.expectNoMessage(100.millis)

    wsClient.sendMessage("John")
    wsClient.expectMessage("Hello John!")

    wsClient.sendCompletion()
    wsClient.expectCompletion()
  }

The example also includes code demonstrating the testkit support for WebSocket services. It allows to create WebSocket requests to run against a route using WS which can be used to provide a mock WebSocket probe that allows manual testing of the WebSocket handler’s behavior if the request was accepted.

The source code for this page can be found here.