Taming Awful Legacy Data Feeds Using Akka

Encountering your first legacy data feed, in an industry whose technology predates the web, can be a shock. It turns out that not every API on the internet is RESTful, some vendors have never heard of WebSockets (what is this, 2009?!), and SOAP XML is not the weirdest data format ever devised.

As a former colleague once sighed, "All feeds are awful, but they're all awful in their own unique ways". Well, he didn't exactly say awful, but this is a family-friendly blog.

Some of the ways they can be awful include (in no particular order):

  • Not asynchronous (they block with no timeout guarantees)
  • Bespoke data format (maybe binary)
  • Non-existent error handling
  • No access except via closed-source libraries for a very limited set of languages
  • Weird login procedures
  • Stateful (sometimes in subtle ways)
  • Test and production data all mixed together
  • Messages that don't conform to the vendor's spec
  • Missing, partial, or duplicated messages
  • etc.

I want to show how Scala, and specifically Akka, can make working with these feeds less painful than it is in other stacks. I'll use a toy example, gradually introducing gruesome features and showing how Akka eases the pain of each one.

The API

Our fictional API is line-oriented and uses a very compact text representation. The messages are pipe-delimited and a bit cryptic, for example QUO|442|8.44. The underlying technology (probably awful) has been wrapped in a Scala class by a helpful colleague. Its read/write interface looks something like this:

trait Connection {  
  def read: Option[String] // returns None if no message available on the server
  def write(data: String): Unit
}

By the way, when I use ... in my code examples it means either something uninteresting or some code you've already seen that I'm skipping for brevity.

Anyway, there are two things to notice about this interface. Firstly, it's blocking, so reading incoming messages in a modern asynchronous program means somehow polling the read method frequently on one thread and handing the results over to the thread that actually uses them (likewise, a call to write may block for an awkwardly long time). Secondly, read returns only one line per call, so if the server wants to send a large batch of messages you'd better be prepared to call read as fast as possible and hope you can keep up. If each call is slow to return you're going to be in trouble, but even if it's quick, how can you maintain a decent throughput without hogging all your system's resources?
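Since the real connection talks to a remote server, anything built on this trait is easier to test against an in-memory stand-in. Here's a minimal sketch of one. FakeConnection and its push and written members are hypothetical helpers invented for illustration, not part of any vendor API. It hands back queued lines one per call to read, returns None once the queue is empty, and records whatever is written. Unlike the real thing, it never blocks.

```scala
import scala.collection.mutable

// The vendor-facing trait, as above.
trait Connection {
  def read: Option[String] // returns None if no message available on the server
  def write(data: String): Unit
}

// Hypothetical in-memory stand-in for tests (not part of any vendor library).
// Same one-line-per-read contract as the real thing, but it never blocks.
class FakeConnection(initial: Seq[String]) extends Connection {
  private val pending = mutable.Queue.from(initial)
  val written: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

  // Hands back the next queued line, or None once the queue is drained.
  def read: Option[String] =
    if (pending.isEmpty) None else Some(pending.dequeue())

  // Records outgoing lines instead of sending them anywhere.
  def write(data: String): Unit = written += data

  // Lets a test simulate the server pushing more lines later on.
  def push(lines: String*): Unit = pending ++= lines
}

val conn = new FakeConnection(Seq("QUO|442|8.44"))
conn.write("SUB|442")
println(conn.read) // Some(QUO|442|8.44)
println(conn.read) // None
```

A stub like this lets you drive an actor built on Connection through a scripted conversation without a network or the vendor's library.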

Akka

We build most of our asynchronous services at Football Radar using Akka. It's hard to overstate how nice it is to work with, especially if you're used to coordinating multi-threaded programs by hand, chaining callbacks (or wrangling even lower-level constructs like mutexes).

The building block is the Actor, an object that processes its incoming messages one at a time, so its internal state is never touched by two threads at once and needs no locks. Its only way of communicating with other actors is to send messages to them and maybe hope for a reply at some point in the future. You can have many actors sharing the same thread pool without ever writing a mutex and, as long as none of them blocks for long, without worrying about thread starvation. If you are learning Scala and you haven't looked into Akka yet, now is the time. Go on - we'll wait.

Stop Blocking!

Welcome back! In case you've forgotten, we're trying to tame a legacy API that is awful in two ways - synchronous and based on cryptic message formats - by wrapping it in an actor. From the point of view of client code, ideally you would send Input(data) messages to the actor and somehow it writes them to the connection, and somehow messages would be read from the connection in a timely fashion and forwarded on to you ('you' being another actor). That's a lot of somehows. How should we go about trying to implement it?

First of all, let's rule out just calling read inside a while(true). It would just lock up the actor and make it stop processing incoming messages - preventing us from calling write. Not to mention all that wasted CPU thrashing in a tight loop when there's nothing to read.

OK, what if we just poll the read method once every N microseconds? Well, if N is too small then we end up using an awful lot of CPU when the server is not trying to send us messages, and if N is too big we can only consume the messages at a rather low rate, meaning they might get backed up on the server side. If N = 5000, for example, then you'd end up with a maximum throughput of 200 messages/second. Nope.
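The arithmetic behind that ceiling is one message per poll: a poll every N microseconds caps throughput at 1,000,000 / N messages per second. A throwaway helper (the name is made up for this sketch) makes the trade-off easy to tabulate:

```scala
// Ceiling on throughput when each poll reads at most one message:
// one poll every `intervalMicros` microseconds means at most
// 1,000,000 / intervalMicros messages per second.
def maxThroughputPerSecond(intervalMicros: Long): Double = {
  require(intervalMicros > 0, "polling interval must be positive")
  1000000.0 / intervalMicros
}

for (n <- Seq(100L, 1000L, 5000L))
  println(f"N = $n%4d µs: at most ${maxThroughputPerSecond(n)}%.0f messages/second")
```

Read the other way round, the same formula is the CPU bill: at N = 100 µs the actor would call read 10,000 times a second even when the feed is silent.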

Akka offers a simple solution to this through its flexible scheduling. Instead of polling at a fixed interval, you can schedule a single message to be sent to self after a delay. So, when you receive a ReadNow message, try to read from the connection. If it returns None, schedule another ReadNow for, say, 5 milliseconds in the future. That's a long time on a modern computer, so you won't use many resources while there's no message waiting on the server. If instead you get Some(string), process the message and send yourself another ReadNow immediately. As long as there are messages waiting you'll greedily consume them as fast as you can empty your mailbox (which is pretty fast). Input(data) messages are naturally interleaved with the ReadNows, and the longest you'll wait before noticing a new message is still a respectable 5 milliseconds. Here's what the code looks like, minus a little boilerplate such as a props method:

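import akka.actor.{Actor, ActorRef}
import scala.concurrent.duration._

class ConnectionActor(connection: Connection, receiver: ActorRef) extends Actor {

  import ConnectionActor._
  import context.dispatcher // ExecutionContext that scheduleOnce needs

  private case object ReadNow
  self ! ReadNow // kick off the polling loop

  override def receive: Receive = {
    case ReadNow =>
      connection.read match {
        case None =>
          // nothing waiting on the server: look again in 5 ms
          context.system.scheduler.scheduleOnce(5.millis, self, ReadNow)
        case Some(data) =>
          receiver ! Output(data)
          // there may be more waiting: read again straight away
          self ! ReadNow
      }
    case Input(data) =>
      connection.write(data)
    case x: Any =>
      println(s"Oh dear. Received unexpected message '$x'")
  }
}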

object ConnectionActor {  
  case class Input(data: String)
  case class Output(data: String)
  ...
}
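To see why this read-again-immediately schedule beats a fixed interval, here's a small Akka-free simulation, written purely for illustration (none of these names come from Akka). It measures the simulated time needed to drain a burst of messages already queued on the server, first with a fixed 5 ms poll and then with the adaptive strategy, treating each read as instantaneous.

```scala
import scala.collection.mutable

// Simulated milliseconds needed to drain `burst` messages that are already
// waiting on the server, reading at most one message per poll. Reads are
// treated as instantaneous, so only the waits between polls cost time.

// Fixed interval: every poll is followed by the same wait, hit or miss.
def drainFixed(burst: Int, intervalMs: Long): Long = {
  val server = mutable.Queue.from(1 to burst)
  var elapsed = 0L
  while (server.nonEmpty) {
    server.dequeue()      // one message per poll
    elapsed += intervalMs // then wait for the next tick
  }
  elapsed
}

// Adaptive: after a hit, read again immediately (like self ! ReadNow);
// only a miss schedules the next read after a delay (like scheduleOnce).
def drainAdaptive(burst: Int, backoffMs: Long): Long = {
  val server = mutable.Queue.from(1 to burst)
  var elapsed = 0L
  while (server.nonEmpty) {
    server.dequeue()      // Some(...): no wait before the next read
  }
  elapsed += backoffMs    // first None: back off once
  elapsed
}

println(s"fixed:    ${drainFixed(1000, 5)} ms")    // fixed:    5000 ms
println(s"adaptive: ${drainAdaptive(1000, 5)} ms") // adaptive: 5 ms
```

The simulation flatters the adaptive loop by making reads free; in reality it drains the backlog only as fast as read can return, but it never adds waits on top of that.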

Wrapping the blocking read and write calls in an Actor means that between polls the actor holds no thread at all, so Akka is free to use that thread for other work, which is nice and efficient. The calls themselves still block whichever thread is running them, though, so in practice it's worth giving an actor like this its own dispatcher, where a slow read can't starve unrelated actors. This is a good start, but there is plenty we could improve. For one thing, the input and output data are strings, but clearly it would be nicer if they were case classes. Line-oriented protocols are often efficient and flexible, but they can be extremely cryptic. Anyone who's dealt with, e.g., FIX messages will know that the last thing you want to be doing is poking around in the raw messages looking for the fields you need. So let's read the spec and find out what the messages represent.

Build A Data Model

Welcome to ACME Corp's widget price feed, providing you with real-time  
prices of a wide variety of widgets!

Once you are logged in, we will send you prices for widgets,  
identified by their ID in our catalogue.

To subscribe or unsubscribe from prices for a particular widget, send  
a SUB or UNS message with the ID:

> SUB|5
> UNS|5

Once subscribed, we will send QUO (quote) messages at regular intervals

< QUO|5|2.99  
< QUO|5|2.95

until we receive an UNS from you.  

OK, that sounds straightforward; let's try to model it. Scala makes modelling hierarchies of data types extremely easy and concise:

package acme  
sealed trait AcmeMessage  
sealed trait Incoming extends AcmeMessage // from Acme's point of view, i.e. we send these
sealed trait Outgoing extends AcmeMessage // from Acme's point of view, i.e. we receive these
case class WidgetId(value: Int)  
case class Price(value: Double)  
trait WidgetSpecific {  
  def id: WidgetId
}
case class Subscribe(id: WidgetId) extends Incoming with WidgetSpecific  
case class Unsubscribe(id: WidgetId) extends Incoming with WidgetSpecific  
case class Quote(id: WidgetId, price: Price) extends Outgoing with WidgetSpecific

object Serialiser {  
  def toAcmeMessage(s: String): Option[Outgoing] = { ... }
  def toString(input: Incoming): String = { ... }
}
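The serialiser bodies are elided above. Purely as an illustration, here's one way they might look for the three message kinds in the spec excerpt, assuming fields are separated by a single pipe, IDs are integers and prices parse as doubles. The data model is repeated so the block stands alone, and anything that doesn't match a known shape becomes None rather than throwing.

```scala
sealed trait AcmeMessage
sealed trait Incoming extends AcmeMessage // from Acme's point of view, i.e. we send these
sealed trait Outgoing extends AcmeMessage // from Acme's point of view, i.e. we receive these
case class WidgetId(value: Int)
case class Price(value: Double)
trait WidgetSpecific {
  def id: WidgetId
}
case class Subscribe(id: WidgetId) extends Incoming with WidgetSpecific
case class Unsubscribe(id: WidgetId) extends Incoming with WidgetSpecific
case class Quote(id: WidgetId, price: Price) extends Outgoing with WidgetSpecific

// A sketch of the elided serialiser, covering only SUB, UNS and QUO.
object Serialiser {
  // Parses a line from Acme; None for anything malformed or unknown.
  def toAcmeMessage(s: String): Option[Outgoing] =
    s.split('|').toList match {
      case List("QUO", id, price) =>
        for {
          i <- id.toIntOption
          p <- price.toDoubleOption
        } yield Quote(WidgetId(i), Price(p))
      case _ => None
    }

  // Renders a message we send to Acme as a pipe-delimited line.
  def toString(input: Incoming): String = input match {
    case Subscribe(WidgetId(id))   => s"SUB|$id"
    case Unsubscribe(WidgetId(id)) => s"UNS|$id"
  }
}
```

Returning Option for parsing fits the actor's Some/None match: a malformed line from the vendor becomes a log message rather than an exception that would trigger the actor's supervisor.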

So now we have the beginnings of a data model for Acme's messages and also - by the magic of blogging - some fully unit-tested code for serialising and deserialising it. Ahem. Let's plug it into the ConnectionActor:

...
import acme._ // the message types and Serialiser

class ConnectionActor(connection: Connection, receiver: ActorRef) extends Actor {

  import ConnectionActor._
  import context.dispatcher // ExecutionContext for the scheduler

  private case object ReadNow
  self ! ReadNow

  override def receive: Receive = {
    case ReadNow =>
      connection.read match {
        case None =>
          context.system.scheduler.scheduleOnce(5.millis, self, ReadNow)
        case Some(string) =>
          // turn the raw line into a typed message before passing it on
          Serialiser.toAcmeMessage(string) match {
            case Some(outgoingMessage) =>
              receiver ! outgoingMessage
            case None =>
              println(s"Oh dear. I couldn't deserialise '$string'")
          }
          self ! ReadNow
      }
    case incoming: Incoming =>
      connection.write(Serialiser.toString(incoming))
    case x: Any =>
      println(s"Oh dear. Received unexpected message '$x'")
  }
}

We no longer need the Input and Output case classes as wrappers round the strings, as we have a rich set of types to send instead.

Logging In: Implicit State And Other Horrors

We've come a long way already. Clients no longer have to deal with cryptic pipe-delimited strings like UNS and QUO; instead they send and receive a stream of Scala objects with rich type information. Let's move on to login/logout; how hard can it be? Down at the bottom of page 1 it says:

To log in, send a LIN message containing the username and password. We  
will reply with either a LIS or LIF (Login Success / Login Failure)  
message.  To log out, send a LOU message containing the username (we  
will not send a response).  For example:

> LIN|randall|BatteryHorseStaple
< LIS  
> LOU|randall

Well, that doesn't sound too bad. Let's add the necessary types:

package acme  
...
case class Login(username: String, password: String) extends Incoming  
case object LoginSuccess extends Outgoing  
case object LoginFailure extends Outgoing  
case class Logout(username: String) extends Incoming  
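As a rough sketch of the serialiser side, assuming the field order in the spec excerpt (username then password for LIN, username alone for LOU) and bare LIS/LIF replies, the new cases might look like this. LoginSerialiser, render and parse are names made up for illustration, and the types are repeated so the block stands alone.

```scala
sealed trait AcmeMessage
sealed trait Incoming extends AcmeMessage // sent by us, received by Acme
sealed trait Outgoing extends AcmeMessage // sent by Acme, received by us
case class Login(username: String, password: String) extends Incoming
case object LoginSuccess extends Outgoing
case object LoginFailure extends Outgoing
case class Logout(username: String) extends Incoming

// Hypothetical helpers covering only the login-related messages.
// Assumes neither field contains '|': the spec excerpt says nothing about escaping.
object LoginSerialiser {
  def render(msg: Incoming): String = msg match {
    case Login(user, password) => s"LIN|$user|$password"
    case Logout(user)          => s"LOU|$user"
  }

  def parse(line: String): Option[Outgoing] = line match {
    case "LIS" => Some(LoginSuccess)
    case "LIF" => Some(LoginFailure)
    case _     => None
  }
}
```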

Then we go and add some code to the Serialiser to deal with these types, add some unit tests, and we're done. Let's just check page 2 of the spec...

Warning:

If you send a SUB or UNS message when you are not logged in, we will  
terminate the connection immediately.

We verify logins by hand using a large paper ledger. If you do not  
receive a reply to your Login message within two minutes, please try  
again.

If you disconnect and fail to send a LOU (Logout) message before  
disconnection you will continue to be charged at the normal hourly  
rate. Please phone our Kuala Lumpur helpdesk for assistance.  

Wait. What?

OK, so now we see there are four more kinds of awfulness, bringing the total to six:

  • Synchronous (already solved)
  • Cryptic message format (already solved)
  • Our connection to Acme can implicitly be in three quite different states (Logged Out, Logged In, and Login Pending While Someone Looks In A Large Book).
  • Only some messages are valid in certain states.
  • The transition from Login Pending to Logged Out can be triggered either by a LoginFailure message or by a timeout.
  • If we're logged in, we have to send a 'LOU' message when the actor shuts down.

Luckily, Akka can help with all of the new ones too.

Finite State Machines To The Rescue

Actors that can be in different states, hold different data in each of those states, and respond differently to messages depending on their current state are supported by Akka's FSM (finite state machine) trait. It's still an Actor, but the receive partial function is replaced by a series of when clauses, one per state. Each handler ends with a decision to stay in the current state (perhaps with modified data) or goto another state. It's a surprisingly powerful abstraction for this kind of protocol; let's see how it works.

We can easily enumerate the states:

object ConnectionActor {  
  ...
  sealed trait State
  object State {
    case object LoggedOut extends State
    case object LoginPending extends State
    case object LoggedIn extends State
  }
  ...
}

What about the data we need to store in each State? When we're LoggedOut there's nothing to store, but once we've received a Login message we'll need to keep the username for use on shutdown:

object ConnectionActor {  
  ...
  sealed trait Data
  object Data {
    case object Empty extends Data
    case class Username(value: String) extends Data
  }
  ...
}

These new 'login state' requirements force us to change the interface of the ConnectionActor. We have to tell client code when it has done something illegal (sent a subscribe/unsubscribe message before we are logged in, or tried to log in twice), and when login has succeeded or failed due to a timeout, wrong password, etc. This means adding a few more message types:

object ConnectionActor {  
  sealed trait Error
  case object NotLoggedIn extends Error
  case object AlreadyLoggedIn extends Error
  case object LoginFailed extends Error
  case object LoginSucceeded
  ...
}

So now we need to think about how we can modify the actor to behave differently in each of the three states. The first thing to realise is that reading from the connection is independent of the state: if Acme want to send us messages we should immediately read and deserialise them, regardless of state. The FSM trait offers a handy catch-all clause called whenUnhandled for use when the state doesn't matter:

class ConnectionActor(connection: Connection, receiver: ActorRef)
extends FSM[ConnectionActor.State, ConnectionActor.Data] {
  import ConnectionActor._
  import context.dispatcher // ExecutionContext for the scheduler
  ...
  startWith(State.LoggedOut, Data.Empty)

  whenUnhandled {
    case Event(ReadNow, _) =>
      connection.read match {
        case None =>
          context.system.scheduler.scheduleOnce(5.millis, self, ReadNow)
        case Some(string) =>
          Serialiser.toAcmeMessage(string) match {
            case Some(outgoingMessage) =>
              self ! outgoingMessage
            case None =>
              println(s"Oh dear. I couldn't deserialise '$string'")
          }
          self ! ReadNow
      }
      stay()
    case Event(x: Any, _) =>
      println(s"Oh dear. Received unexpected message '$x' while in state '$stateName'")
      stay()
  }
  ...
  initialize() // must be the last call in the constructor, after every when(...) block
}

There are lots of things to see here. Firstly, the startWith statement in the constructor; no prizes for guessing what it does. Secondly, the stay at the end of each case, meaning "remain in the current state, whatever that may be." Every case has to return the next State, so the compiler won't let you forget it. Thirdly, notice that the messages to the actor have been wrapped in an Event object that also includes the current Data. In this case we don't need it, so we just call it _, but it can be very useful, as we shall see. Finally, rather than forwarding all Acme messages directly to the receiver, we are sending them to self instead. The reason for this will become clear in a minute.
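Conceptually, the FSM gives the current state's handler first refusal and falls back to whenUnhandled for anything it doesn't match, much like chaining Scala partial functions with orElse. Akka implements this with its own machinery, so the following is only a plain-Scala analogy. Strings stand in for real events, and the handler names are invented for the sketch.

```scala
// Illustrative only: messages and outcomes are plain strings here.
type Handler = PartialFunction[String, String]

// Stand-in for a state-specific when(...) block.
val whenLoggedIn: Handler = {
  case "Logout" => "write LOU, goto LoggedOut"
  case "Login"  => "reply AlreadyLoggedIn, stay"
}

// Stand-in for whenUnhandled: the state-independent fallback.
val whenUnhandled: Handler = {
  case "ReadNow" => "poll the connection, stay"
  case other     => s"log unexpected '$other', stay"
}

// The state's own handler is tried first; anything it doesn't
// match falls through to the catch-all.
val effective: Handler = whenLoggedIn orElse whenUnhandled

println(effective("Logout"))  // write LOU, goto LoggedOut
println(effective("ReadNow")) // poll the connection, stay
println(effective("Bogus"))   // log unexpected 'Bogus', stay
```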

State-specific Behaviour

OK, we've covered the catch-all case. Let's turn to the specific states. I'll go through them in order of increasing complexity. First, what should happen when we're already logged in?

  when(State.LoggedIn) {
    case Event(logout: Logout, _) =>
      connection.write(Serialiser.toString(logout))
      goto(State.LoggedOut) using Data.Empty
    case Event(Login(_, _), _) =>
      sender() ! AlreadyLoggedIn
      stay()
    case Event(incoming: Incoming, _) =>
      connection.write(Serialiser.toString(incoming))
      stay()
    case Event(outgoing: Outgoing, _) =>
      receiver ! outgoing
      stay()
  }

I think this little snippet of code shows how easy it is to clearly express intent in Akka; it almost reads like English. "When we are in the LoggedIn state and we receive a Logout message, write it to the connection and transition to the LoggedOut state, discarding any Username data. If we receive a duplicated Login, inform the sender of its mistake and stay in the LoggedIn state. If we receive any other kind of Incoming (i.e. Subscribe and Unsubscribe), write it to the connection and stay LoggedIn. If we receive any messages from Acme, forward them to the receiver."

Anything not handled here will drop into the whenUnhandled catch-all we saw earlier. Next, let's take a look at the partial function for the LoggedOut state:

  when(State.LoggedOut) {
    case Event(login: Login, _) =>
      connection.write(Serialiser.toString(login))
      goto(State.LoginPending) using Data.Username(login.username) forMax (2.minutes)
    case Event(incoming: Incoming, _) =>
      sender() ! NotLoggedIn
      stay()
    case Event(outgoing: Outgoing, _) =>
      receiver ! outgoing
      stay()
  }

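Most of this section is similar to the LoggedIn one. Login messages cause a state transition, other Incoming messages are treated as a client error, and Outgoing messages from Acme are forwarded to the receiver. But the state transition itself is more complex. Firstly, it times out after two minutes (the forMax clause), and secondly we store the username for use on shutdown (the using clause sets which of the Data subtypes the new state carries). One caveat with the timeout: Akka cancels a pending state timeout whenever the FSM receives any other message in that state, and our ReadNow loop delivers one at least every 5 milliseconds, so in practice this forMax timeout would be cancelled almost immediately and never fire. A named timer (setTimer with repeat = false, or startSingleTimer from Akka 2.6 onwards), cancelled with cancelTimer when we leave LoginPending, isn't affected by other messages and is the safer choice for a feed that polls like this one.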

Finally, let's look at the last partial function, which is the most complicated:

  when(State.LoginPending) {
    case Event(LoginSuccess, _) =>
      receiver ! LoginSucceeded
      goto(State.LoggedIn)
    case Event(LoginFailure, _) =>
      receiver ! LoginFailed
      goto(State.LoggedOut) using Data.Empty
    case Event(incoming: Incoming, _) =>
      sender() ! NotLoggedIn
      stay()
    case Event(outgoing: Outgoing, _) =>
      receiver ! outgoing
      stay()
    case Event(StateTimeout, _) =>
      receiver ! LoginFailed
      goto(State.LoggedOut) using Data.Empty
  }

There are two subtleties here:

  • We intercept the LoginSuccess or LoginFailure messages from Acme, which tell us what state we need to transition to.
  • We handle a special message called StateTimeout which is sent when the forMax timer expires.
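Because each handler boils down to "given this state and this message, what do we write, whom do we tell, and where do we go next?", the same rules can be restated as a pure function and unit-tested without an actor system. The sketch below is a hypothetical, simplified restatement, not Akka code: Step, Effect, Event and the rest are invented names, widget IDs are plain Ints, and the timeout arrives as an explicit LoginTimedOut event standing in for StateTimeout (or a named timer).

```scala
// Akka-free model of the rules in the three when(...) blocks.
sealed trait State
case object LoggedOut extends State
case object LoginPending extends State
case object LoggedIn extends State

sealed trait Event
sealed trait ClientRequest extends Event // Incoming, in Acme's terms
case class Login(username: String, password: String) extends ClientRequest
case class Logout(username: String) extends ClientRequest
case class Subscribe(widgetId: Int) extends ClientRequest
sealed trait AcmeReply extends Event     // Outgoing, in Acme's terms
case object LoginSuccess extends AcmeReply
case object LoginFailure extends AcmeReply
case object LoginTimedOut extends Event  // plays the role of StateTimeout

// What client code is told.
sealed trait Notice
case object NotLoggedIn extends Notice
case object AlreadyLoggedIn extends Notice
case object LoginSucceeded extends Notice
case object LoginFailed extends Notice

sealed trait Effect
case class WriteToAcme(request: ClientRequest) extends Effect
case class ReplyToSender(notice: Notice) extends Effect
case class TellReceiver(notice: Notice) extends Effect
case class Forward(reply: AcmeReply) extends Effect

// Outcome of one event: next state, stored username, side effects.
case class Step(next: State, username: Option[String], effects: List[Effect])

def handle(state: State, username: Option[String], event: Event): Step = {
  def stay(effects: Effect*) = Step(state, username, effects.toList)
  (state, event) match {
    case (LoggedOut, l: Login) =>
      Step(LoginPending, Some(l.username), List(WriteToAcme(l)))
    case (LoggedOut | LoginPending, _: ClientRequest) =>
      stay(ReplyToSender(NotLoggedIn))
    case (LoginPending, LoginSuccess) =>
      Step(LoggedIn, username, List(TellReceiver(LoginSucceeded)))
    case (LoginPending, LoginFailure | LoginTimedOut) =>
      Step(LoggedOut, None, List(TellReceiver(LoginFailed)))
    case (LoggedIn, _: Login) =>
      stay(ReplyToSender(AlreadyLoggedIn))
    case (LoggedIn, lo: Logout) =>
      Step(LoggedOut, None, List(WriteToAcme(lo)))
    case (LoggedIn, r: ClientRequest) =>
      stay(WriteToAcme(r))
    case (_, a: AcmeReply) =>
      stay(Forward(a)) // any other Acme message goes straight to the receiver
    case _ =>
      stay() // e.g. a stray timeout: whenUnhandled would just log it
  }
}
```

Keeping the decisions pure like this, with a thin actor that only performs the returned effects, is one way to get exhaustive tests of the protocol without timing-sensitive actor tests.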

...and last (but not least, if we are to avoid a phone call to Kuala Lumpur), we need to ensure that we log out on shutdown by adding a hook to the actor's postStop method:

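  override def postStop(): Unit = {
    (stateName, stateData) match {
      case (State.LoggedIn, Data.Username(username)) =>
        // tell Acme we're leaving, or keep paying the hourly rate
        connection.write(Serialiser.toString(Logout(username)))
      case _ => // not logged in: nothing to tidy up
    }
    super.postStop() // FSM's own postStop cancels timers and runs any onTermination handler
  }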
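The shutdown rule is small enough to check in isolation. Here's a hypothetical pure version of the decision postStop makes, using simplified copies of the State and Data types (logoutOnShutdown is a name invented for this sketch): given the state and data at shutdown, return the LOU line to write, if any.

```scala
sealed trait State
case object LoggedOut extends State
case object LoginPending extends State
case object LoggedIn extends State

sealed trait Data
case object Empty extends Data
case class Username(value: String) extends Data

// Returns the logout line to send on shutdown, if we are logged in.
// Mirrors the postStop match; the LOU format comes from the spec excerpt.
def logoutOnShutdown(state: State, data: Data): Option[String] =
  (state, data) match {
    case (LoggedIn, Username(user)) => Some(s"LOU|$user")
    case _                          => None
  }
```

Writing it down this way also makes one edge case visible: if we stop while still in LoginPending and Acme's ledger-checker later approves the login, no LOU is sent.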

So, there we have it. The feed turned out to be awful in six different ways, but we managed to hide all the awfulness inside an Akka Actor about 80 lines long. The result is performant and easy to work with, and I think it's also quite easy to read.