Actors and Immutability
jducoeur wrote in querki_project
[In response to my last architecture post, marphod asked a bunch of good questions. Conveniently, some of them are addressed here, in the next section of the Architecture documentation. As always, this page can also be found in the wiki.]

Querki is going to operate mainly in-memory for the first release -- we're going to have what amounts to a distributed in-memory object-oriented database for everything that is currently in use. The way we're going to make that work without melting down in race-condition chaos is through a very principled use of Actors and Immutability.

Actors
Querki is going to be built on the Scala stack, including a number of free and open-source packages. The most important of these is Akka, which looks to be the most advanced Actors library available for Scala.

If you aren't familiar with the Actor style of architecture, it's worth reading up on (here's the Wikipedia article on the subject). But the high concept is that an Actor is sort of like an object in a typical object-oriented system, *except*:
  • Actors communicate solely through message-passing -- they are not permitted to call each other's methods directly.

  • Each Actor operates in its own single-threaded bubble. The Actors collectively share a big thread pool, but any given Actor at any given time has at most one active thread. In its thread, an Actor takes messages from its incoming queue, processes them, and sends out responses. It never does anything aside from this message-processing loop.
Combine these, and you get a tried-and-true model for building very large scale, extremely high-reliability systems. It is a way of thinking about concurrency that avoids many of the common problems in multi-threaded programming. It is relatively easy to reason about -- vastly easier than traditional, lock-based threading.
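
In Akka terms, an Actor with those two properties looks roughly like this minimal sketch -- a toy counter, not Querki code, just an illustration of the two rules above:

```scala
import akka.actor.{Actor, ActorRef}

// Immutable messages: once sent, nobody can edit them.
case object Increment
case class GetCount(replyTo: ActorRef)
case class Count(value: Int)

// All mutable state (here, just a counter) is locked inside the Actor; the
// only way to read or change it is to send a message.
class Counter extends Actor {
  private var count = 0

  def receive = {
    case Increment         => count += 1
    case GetCount(replyTo) => replyTo ! Count(count) // reply with a snapshot
  }
}

// Typical usage, from outside:
//   val system  = akka.actor.ActorSystem("demo")
//   val counter = system.actorOf(akka.actor.Props[Counter], "counter")
//   counter ! Increment   // fire-and-forget; never a direct method call
```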

The Actor Model is the main reason why the Erlang language (which drives me a little batty in most respects) has become popular in recent years: for decades, Erlang and the Actor Model have been used to build some of the most reliable computer systems in the world.

One of the early decisions in the Scala community was to adapt the Actor Model to Scala. This was carried further by Jonas Bonér and associates with the Akka project, which did a very principled job of adapting all of the lessons from Erlang's experience to the Scala world -- not just Actors, but all the infrastructure required to make a truly high-availability system. The result is an increasingly popular mechanism for handling large scale programming in Scala, and on the JVM in general.

Immutability
Actors are only half the story, though. The other trick is how you *use* them. Actors make the system pretty safe for multi-threaded programming if you truly confine all mutable state to them. This is a hard rule for Querki: **all** mutable state **must** be locked inside a single Actor.

That is, Actors communicate via message-passing, and the crucial rule is that these messages are all immutable objects. You don't send a message in and have it edited by the receiver; the message is simply information being passed along. Any actual *variables* are contained within a single Actor. If another Actor needs access to that information, it has to send a request for it. Moreover, the "client" that receives that information is not permitted to read too much into it -- if it is potentially mutable information, then all the client knows is that the state *was* this particular value. It doesn't necessarily know what the value is now, and shouldn't make assumptions.
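
To make that concrete, here is a tiny sketch of what such messages might look like (illustrative names, not Querki's real types) -- Scala case classes are immutable, and a version stamp tells the receiver how stale its view might be:

```scala
// A snapshot handed out by the owning Actor. The receiver can read it freely,
// but "changing" it only builds a new local copy -- the real state back in the
// owning Actor is untouched.
case class ThingSnapshot(id: String, value: String, asOfVersion: Long)

// To actually change the Thing, the client must send a request back, saying
// which version its edit was based on:
case class EditThing(id: String, newValue: String, basedOnVersion: Long)
```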

How These Will be Used in Querki

This model actually works very well for Querki, but you have to keep in mind some key constraints. The most important is that we are trying for "eventual consistency" -- any given outside view of the information may be slightly out of date. This really isn't controversial any more -- it's downright common in the social-media world -- but it isn't the way most engineers think of data.

Remember that a user is always working in a "Space" -- essentially a small, usually personal, database for a particular application. There is precisely one copy of that Space loaded into memory, somewhere in the Querki Cluster. That is contained in an Actor, and absolutely *all* changes to that Space go through the Actor as requests. These requests are lined up in a queue. Each one in its turn gets validated (checked to make sure that the change isn't out of date due to edit contention); processed (changing the state of the Space); and stored (sending a request off to this Space's Storage Actor to write the change to the on-disk database).
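
A rough sketch of that loop, with illustrative names (SpaceState, ChangeRequest and so on are mine, not Querki's actual types, and a whole-Space version check stands in for the per-Thing timestamp check described below):

```scala
import akka.actor.{Actor, ActorRef}

case class SpaceState(version: Long, things: Map[String, String]) // immutable
case class ChangeRequest(basedOnVersion: Long, thingId: String, newValue: String)
case class ChangeAccepted(newVersion: Long)
case class ChangeRejected(currentVersion: Long)
case class Persist(state: SpaceState)
case object GetState

class SpaceActor(storage: ActorRef, initial: SpaceState) extends Actor {
  private var state = initial // the one mutable reference, confined here

  def receive = {
    // Validate: a request based on a stale version bounces back to the sender.
    case ChangeRequest(basedOn, _, _) if basedOn < state.version =>
      sender() ! ChangeRejected(state.version)

    // Process: build a new immutable state, then store via the Storage Actor.
    case ChangeRequest(_, id, value) =>
      state = state.copy(version = state.version + 1,
                         things  = state.things + (id -> value))
      storage ! Persist(state)
      sender() ! ChangeAccepted(state.version)

    // Anyone who wants to look at the Space gets a complete immutable snapshot.
    case GetState =>
      sender() ! state
  }
}
```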

When another Actor (such as a user session) wants to display something from the Space, it sends a request to the Space's Actor, asking for a copy of the Space. That request generally returns a *complete copy* of the Space. (This is one of the reasons why we are currently limiting Spaces to be fairly small -- most are expected to be well under 1Meg in size, and the majority will probably only be tens of K.) The requesting client runs its queries and displays over that. This lets us perform extremely sophisticated queries with ease, since the query processor has a complete copy of the Space right in memory to examine; performance in most cases is expected to be extremely high.
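
For instance, a session-side query might look like this sketch, using Akka's ask pattern and the illustrative SpaceState/GetState types from the sketch above -- once the snapshot is in hand, the "query" is just ordinary collection code:

```scala
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

// Ask the Space Actor for a snapshot, then run the query over the local copy.
def thingsContaining(space: ActorRef, substr: String)
                    (implicit ec: ExecutionContext): Future[Seq[String]] = {
  implicit val timeout = Timeout(5.seconds)
  (space ? GetState).mapTo[SpaceState].map { snapshot =>
    snapshot.things.collect { case (id, v) if v.contains(substr) => id }.toSeq
  }
}
```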

Local-Node Caching

The client session should generally drop its copy of the Space as soon as it is done running the request. To avoid excessive message-passing traffic, though, each Space will be proxied as needed. That is, if a client on node A is interacting with a Space on node B, we will construct a thin proxy on A. That will work with the actual Space Actor on B, and maintain a cached copy of the Space's State on A, updating it as seems best.

In practice, we don't expect these proxies to be the norm: most Spaces, most of the time, will only be in use by one user session, and the Space Actor will tend to wind up on the same node as the session itself. In those cases, having the Session ask for the current state of the Space each time will be so cheap that there is no reason not to do it. But we will use proxies as necessary, so that a Session can avoid keeping its own cache while still having quick and cheap access to a reasonably current state of the Space.
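
A thin proxy of that sort might be sketched like this -- again purely illustrative, reusing the SpaceState/GetState names from above:

```scala
import akka.actor.{Actor, ActorRef}

case class StateUpdated(state: SpaceState) // pushed by the real Space Actor

// Runs on node A; the real Space Actor lives on node B.
class SpaceProxy(realSpace: ActorRef) extends Actor {
  private var cached: Option[SpaceState] = None

  def receive = {
    case GetState => cached match {
      case Some(state) => sender() ! state           // serve from the local cache
      case None        => realSpace forward GetState // fall through to node B
    }
    case StateUpdated(newState) =>
      cached = Some(newState)                        // keep the cache current
  }
}
```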

Why This is Good

This approach has many advantages. Besides the speed with which we can process interesting queries, it also takes a healthy attitude towards the Space's state. Since we are always operating on a coherent copy of the Space, but *not* assuming that it is the on-disk copy, it will be extremely easy down the line to provide access to older snapshots of the Space. If the user wants to see what his Space looked like three months ago, the Space's Actor just loads up the change list, constructs a version that represents the state at that time, and hands that off to the client to process. In principle, this will provide an extremely easy variety of version control for end users to play with, giving them a clear view of the evolution of their state with no manual work involved.
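
In code terms, reconstructing an old snapshot is just a fold over the change list -- a sketch, with an illustrative Change type, assuming the list is ordered by version:

```scala
case class Change(atVersion: Long, thingId: String, newValue: String)

def applyChange(state: SpaceState, ch: Change): SpaceState =
  state.copy(version = ch.atVersion,
             things  = state.things + (ch.thingId -> ch.newValue))

// "What did my Space look like at version N?" -- replay the changes up to N.
def stateAsOf(initial: SpaceState, changes: Seq[Change], version: Long): SpaceState =
  changes.takeWhile(_.atVersion <= version).foldLeft(initial)(applyChange)
```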

It also provides a clear approach to dealing with contention. By the nature of Querki and its use cases, edit contention is not expected to be common -- I am usually only editing a single Thing at a time, so contention is mainly an issue if two people are trying to edit the same thing at the same time. To deal with this case, we will always remember which timestamp of the Space we are editing against. When I submit a change request, I will say what timestamp this change is being submitted against. If the Thing has changed since that timestamp, the Space's Actor will bounce the request, asking for a fresh edit against the current state of the Space.
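
Sketched in code, the client side of that exchange might look like this, building on the illustrative ChangeRequest/ChangeAccepted/ChangeRejected types from the Space Actor sketch above (how the UI re-prompts the user is elided):

```scala
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

def submitEdit(space: ActorRef, edit: ChangeRequest)
              (implicit ec: ExecutionContext): Future[Long] = {
  implicit val timeout = Timeout(5.seconds)
  (space ? edit).flatMap {
    case ChangeAccepted(newVersion) => Future.successful(newVersion)
    case ChangeRejected(current)    =>
      // Stale edit: the UI should fetch a fresh snapshot and let the user
      // re-make their change against the current state of the Space.
      Future.failed(new IllegalStateException(
        s"Edit was based on an old version; the Space is now at $current"))
    case other =>
      Future.failed(new RuntimeException(s"Unexpected reply: $other"))
  }
}
```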

This approach to contention is a bit crude, but terribly easy and common, and should do well enough for the time being. Later, we might get fancier about it if there is user demand -- if we get *terribly* ambitious, we could even implement a Google Wave style of operational transformations, to get co-editing on the same field at the same time.

Are you worrying about Byzantine agreement?

That is, you've just mentioned eventual consistency, message-passing, and possible failures (where a message bounces in case of contention). How are you thinking about the possibility that some messages might get lost?

[Disclosure: remember that I'm much more a practical engineer than a theoretician -- I had to go look up the terminology. Tell me if I'm missing your point.]

For much of it, I'm relying on the infrastructure. Keep in mind, I'm building this on top of pretty well-established libraries, not rolling it myself, so a chunk of that question is similar to "what happens if TCP breaks?"

Like it says above, Akka is modeled on Erlang, and Erlang is the language of choice for seven-nines reliability for good reasons. Used properly, the architecture is designed for fault-tolerance -- indeed, most of the complexity of using Akka or Erlang is in how you set up the "supervisor hierarchies" to properly recover from failures. I suspect that getting that right will be a learning process for me, and the system will take a while to harden properly, but the fact that this stuff *is* used for extremely high-availability systems, and has fault-tolerance built into its heart, gives me hope that this will be a healthy direction to take.

Also, keep in mind that this Actor cloud is fairly simple as these things go. So far, I'm only anticipating four main categories of Actors:

-- Space Actors, which own the data for a Space and arbitrate all changes to it.

-- User Session Actors, which manage the I/O to the browser and fire off most of the interesting processing.

-- DB I/O Actors, which are subsidiary slaves to the Space Actors (broken out mainly to prevent blocking on write).

-- QL Actors, which take a Space State and a QL (Querki Query Language) expression and do the computation on them; these are expected to be transient slaves to User Session Actors. (Again, mainly to prevent blocking on relatively long-running operations.)

That's a fairly simple setup, with relatively few lines of communication. So there aren't an awful lot of complex failure modes.

And everything should be working in terms of Promises and Trys -- essentially, the message-passing version of exception-handling blocks, with well-defined mechanisms for propagating failures -- which strongly biases the code towards actually managing exceptions. The Scala language and environment are generally biased to make it easier to deal with problems than to ignore them. You should prefer matching over if statements; Options over nulls; Trys over hand-rolled error propagation. The general attitude is that all possibilities should be in your face, and failure to handle a case results in a warning or compile error.
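
A quick illustration of that style, for anyone not steeped in Scala:

```scala
import scala.util.{Try, Success, Failure}

// Options instead of nulls: absence is a value you must handle.
val maybeValue: Option[String] = Map("a" -> "apple").get("b")
maybeValue match {
  case Some(v) => println(v)
  case None    => println("not found") // forget this case and the compiler warns
}

// Trys instead of hand-rolled error propagation: failure travels as a value.
def parsePort(s: String): Try[Int] = Try(s.toInt)
parsePort("80a0") match {
  case Success(port) => println(s"Listening on $port")
  case Failure(err)  => println(s"Bad port: ${err.getMessage}")
}
```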

All that said, there's the simple fact that nodes can and will crash -- indeed, most of the cloud-hosting environments I'm thinking of as initial hosts insist that you be prepared for frequent reboots. So the general rule will be that there should be appropriate timeouts and fallbacks at every level -- if you're waiting for a response, you should be prepared to cope if that times out. That's not a one-size-fits-all answer, and yes, there are possibilities of cascading failures if it's done clumsily; fortunately, the fact that the architecture just isn't that complicated ought to make that pretty unlikely.
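
Concretely, every ask carries a timeout, and the caller decides what to degrade to -- a sketch, again with the illustrative types from above:

```scala
import akka.actor.ActorRef
import akka.pattern.{ask, AskTimeoutException}
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

// If the Space Actor doesn't answer in time, fall back to the last snapshot we
// saw (a real version would also log this loudly) rather than hanging forever.
def freshOrStale(space: ActorRef, lastKnown: SpaceState)
                (implicit ec: ExecutionContext): Future[SpaceState] = {
  implicit val timeout = Timeout(10.seconds)
  (space ? GetState).mapTo[SpaceState].recover {
    case _: AskTimeoutException => lastKnown
  }
}
```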

(And now that I've said all of that, I should go update the Wiki page to state some of these rules more explicitly...)

If you're going to use timeouts and other fail-detection modes, you should probably include a mechanism for 'alive, still processing' ping backs. Which is something I have frequently seen done badly or only as an afterthought.

Back of the envelope, message request envelope could have a requested pingback frequency, maximum allowed time without a pingback, who to pingback to, and I'm sure there is some relevant meta-data. The pingback message could have estimated time till completion and the source of the next message/pingback. This framework would also allow the system to hand-off requests to peers, if a long-running query is swamping that node.

That's planned for places where it's appropriate -- in particular, loading a Space into memory could take non-trivial time, so I'm hoping to do once-a-second progress-bar updates that propagate all the way to the browser. Anything else that would normally be slow will get the same treatment.
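
In sketch form, using Akka's scheduler (SpaceLoader and the message names here are illustrative, not real Querki types):

```scala
import akka.actor.{Actor, ActorRef, Cancellable}
import scala.concurrent.duration._

case object ProgressTick
case class Progress(percentDone: Int)

// While a slow Space load runs, tick once a second and push progress to the
// requester, which can propagate it all the way out to the browser.
class SpaceLoader(requester: ActorRef) extends Actor {
  import context.dispatcher // ExecutionContext for the scheduler

  private var percentDone = 0
  private val ticker: Cancellable =
    context.system.scheduler.schedule(1.second, 1.second, self, ProgressTick)

  def receive = {
    case ProgressTick => requester ! Progress(percentDone)
    // ... the actual loading messages would update percentDone, and the final
    // one would cancel the ticker ...
  }

  override def postStop(): Unit = { ticker.cancel() }
}
```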

Most queries, though, *shouldn't* take more than, say, ten seconds. So I'm likely to instead go for an approach where timeouts there come back as hard errors *and* get logged as high-priority errors. In general, I'm planning on being intolerant of excessively long-running processes until and unless I find reason to believe they are necessary. In most cases, a timeout shouldn't happen unless the actor at the other end has hard-crashed.

The message design you're suggesting is reasonable, but remember that I'm not writing the infrastructure myself, I'm using Akka. I may layer some things like you describe on top of the existing infrastructure if it's not already there, but I specifically don't want to go too far down that road -- it tends to lead to lots of unintended consequences...

Huh. The Unix practice of each thing doing one thing, and communicating through pipes instead of shared memory, comes to mind. Although there has been a lot of movement away from that model in recent Linux developments. I'll have to read more into the actor model to see if my instinctual pattern-matching here holds up.

They're both instances of the broad concept of normalization: keeping your objects tightly focused, and reducing bleed-over. That's become ever-more-important in programming in recent years: much of the science of object-oriented programming has been devoted to reducing "coupling" (the amount of semantic cross-talk between classes and functions), and improving "cohesion" (the degree to which an object does exactly one thing well).

Mostly, though, the critical thing about Actors is that it's a reasonably good answer to The Scaling Problem. Ever since the industry realized (about five years ago) that we had hit a dead-end in how fast you can run a single-threaded program, all the attention has been on multi-threaded programming. And the problem is, reasoning correctly about multi-threading is *hard*. I'm pretty good at it, but I've been doing it for decades; most people make errors routinely, and those errors are often subtle and insidious.

The thing about Actors (*especially* in Akka) is that they provide an answer that is easy to reason about. You have to adhere to some hard-and-fast rules that can get *very* annoying if you're used to the traditional way of doing business, but doing so results in relatively predictable behaviour. And if you follow best practices, you wind up with programs that scale *extremely* well compared to traditional architectures.

Hadn't realized that Linux was moving away from that sort of decoupling. Windows has been moving in the other direction: PowerShell strongly emphasizes that sort of architecture...
