Michael L Perry's blog

Partial order

Historical Modeling is based on the concept of partial order. Partial order is necessary for making sense of events in the absence of a central authority.

In any centralized system, you can define a full and complete order in which all events occurred. Take any two events, like two different users each entering a phone number. You can clearly say that one happened before the other.


In a distributed system, order is less clear. Events occurring on different machines might have happened in any order. Each machine performed the user action without knowledge of what the other was doing.


Take any two events. You might be able to say definitively that one occurred before the other. Or you might not. The events are partially ordered.

Accidental and essential order

The order in which events occurred may be important to understanding the events. If Alice entered a phone number for a contact, and then Bob changed it, then Bob’s entry is the one we want to keep. Bob made his change with full knowledge of Alice’s entry. The order between these two events is essential.

In many cases, however, the order in which events occurred is not important to understanding their meaning. If Alice and Bob entered phone numbers for two different contacts, then we don’t care which one happened first. If both changes happened to occur on the same server, then we know the order in which they occurred. But this order is accidental.

There are several kinds of essential order between events:

  • Cause and effect
  • Request and response
  • Parent and child
  • Original and replacement

One event may cause another to occur. For example, an order shipment will cause a delivery receipt. An invoice will cause a payment. The cause occurs before the effect.

In other scenarios, one event may be a response to another. For example, a request for proposal will be answered with a bid. The request occurs before the response.

Many events occur in a parent/child relationship with one another. One event establishes a customer relationship, and subsequent events represent that customer’s orders. The parent occurs before the child.

Finally, one event may supersede another, replacing the value that the first one set. Bob can replace the phone number that Alice entered. The original event occurs before the replacement.

In a fully ordered system, there is inevitably some accidental order. Events are placed one before another just because they happened to occur that way. A historical model, however, aims to capture only the essential order. One event occurs before another only because it was the cause for its effect, the request for its response, the parent for its child, or the original for its replacement. In this way, a historical model defines a partial order among events.

Slides and code for Historical Modeling presentation

I presented Historical Modeling at Shreveport .NET User Group.

Download the source code for the presentation and examples from GitHub.

A historical order fulfillment system

Historical Modeling, like Event Sourcing, records the events that led up to the current state, not the current state itself. The difference is that Historical Modeling imposes a partial order on those events. I illustrate this property with an order fulfillment system.


The ordering example

Let’s build a historical model to fulfill orders placed on a web site. We start with a fact to represent the order. This represents the fact that the customer has decided to order some products. Contained within the fact, we have all of the details of the order. Now that the order has been submitted, it cannot be modified. It is a historical fact.

Before someone can submit an order, they have to establish a relationship with the company. They have to become a customer. Let’s define another fact that represents this relationship. The Customer fact is a predecessor of the Order fact. It must come before. And when you look at an Order, you have a reference to the Customer.

But the Customer fact has a predecessor of its own. Before you can become a Customer, there must be a Company.

Now we have a chain of facts from Company to Order. Whenever one machine exchanges facts with another machine, it has to exchange them in this order. Before the web site can tell you about the new Order, it needs to ensure that you know about the Customer.

But what about other orders placed by the same customer? Those orders don’t have a predecessor/successor relationship. Any order could be exchanged before any other order. And what about orders placed by other customers? There is no predecessor/successor relationship there either. No customer comes before any other customer. Order is only enforced when it is important.
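To make the exchange rule concrete, here is a minimal Python sketch. It is not taken from the presentation's source code. The `Fact` class and `transmission_order` function are hypothetical: each fact lists only its direct predecessors, and a depth-first walk emits every predecessor before the fact that refers to it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Fact:
    kind: str
    label: str
    predecessors: tuple = ()   # direct predecessors only


def transmission_order(fact):
    """List a fact and all of its ancestors so that every predecessor precedes its successors."""
    ordered, seen = [], set()

    def visit(f):
        if f in seen:
            return
        seen.add(f)
        for p in f.predecessors:   # send predecessors first
            visit(p)
        ordered.append(f)

    visit(fact)
    return ordered


company = Fact("Company", "Acme")
customer = Fact("Customer", "Alice", (company,))
order1 = Fact("Order", "order 1", (customer,))
order2 = Fact("Order", "order 2", (customer,))

print([f.label for f in transmission_order(order1)])  # ['Acme', 'Alice', 'order 1']
print(order2 in transmission_order(order1))            # False
```

Neither order appears in the other's transmission order, so a node is free to exchange them in either sequence. Only the chain from Company to Order is constrained.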

Shipping and billing

After a customer places an order, it goes to the warehouse. There, they will pick, pack and ship. They record a historical fact of the shipment. You can’t ship until the customer has placed an order, so the Order is a prerequisite of the shipment.

At the same time, the order goes to the billing department. They will generate an invoice to send to the customer. You might think that since you invoice the customer, you would need a reference to the Customer fact. But the important part of this relationship is that it enforces order. You can’t invoice a customer until they place an order. So the true predecessor of the Invoice is the Order.

But we don’t define a predecessor/successor relationship between Invoice and Shipment. We can ship an order before we invoice it, or vice-versa. Invoicing happens in one department. Shipping happens in another. These two departments have their own hardware. These processes can take place independently.

The next fact to consider is Delivery. The customer signs for the order when they take delivery. We record this as a new fact. What is the predecessor of Delivery? We can’t take delivery until we ship, so the predecessor of Delivery is the Shipment.

Then consider Payment. What is its predecessor? The customer can only pay for an order after they’ve been invoiced, so the predecessor of Payment is the Invoice.

With this diagram, we can now see which facts have to occur before which other facts. When order matters, the facts are related as predecessors or successors. When order doesn’t matter, they are unrelated.

Finally, let’s get customer support involved. After the order has been both paid and delivered, they need to follow up. Once they have, they record it as a new fact. What is the predecessor of the FollowUp fact? The follow up can only happen after the Delivery and the Payment.

The customer service department will have received the Delivery from the warehouse. Before the warehouse can send the Delivery, it must first send all of its predecessors, including the Order. Similarly, customer service will receive the Payment from accounts receivable. In the process, they will receive the Order from them as well. To make this all work, we need to ensure that customer service doesn’t see this as two separate orders. For that, we need to establish a few rules.

The rules of Historical Modeling

  1. Facts are immutable.
  2. A fact has a reference to all of its predecessors.
  3. A fact is identified by its type, fields, and predecessors.
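The three rules can be sketched with a frozen Python dataclass. This is an illustration under assumptions, not the framework's implementation. The `Fact` class and `fact` helper are hypothetical, and a fact's identity is taken to be exactly its type, its field values, and its predecessors. Under these rules, the Order that customer service receives from the warehouse and the copy it rebuilds from what accounts receivable sends compare equal, so storing both leaves a single fact.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)  # Rule 1: frozen, so fields cannot be reassigned after creation
class Fact:
    kind: str                                # the fact's type
    fields: Tuple[Tuple[str, object], ...]   # (name, value) pairs, sorted by name
    predecessors: Tuple["Fact", ...] = ()    # Rule 2: references to all predecessors
    # Rule 3: the generated __eq__ and __hash__ compare kind, fields, and predecessors,
    # so two facts built from the same values are the same fact.


def fact(kind, predecessors=(), **fields):
    """Hypothetical helper that builds a Fact with its fields in a canonical order."""
    return Fact(kind, tuple(sorted(fields.items())), tuple(predecessors))


# Customer service receives the Order from the warehouse...
company = fact("Company", name="Acme")
customer = fact("Customer", [company], number=42)
from_warehouse = fact("Order", [customer], items="3 widgets")

# ...and builds a second copy from what accounts receivable sends.
from_billing = fact(
    "Order",
    [fact("Customer", [fact("Company", name="Acme")], number=42)],
    items="3 widgets",
)

database = {from_warehouse, from_billing}
print(from_warehouse == from_billing, len(database))  # True 1
```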

Constructability of facts

Definitions

Let us first define the properties of historical facts. We’ll use the letters f, g, and h to denote facts. A fact can be the predecessor of another. We define the predicate P(f, g) to mean that g is a predecessor of f.

A set of facts M is closed under P if and only if:

∀fg(f∈M ∧ P(f, g) → g∈M)

That is to say, for any two facts, if the first is in the set and the second is a predecessor of the first, then the second is also in the set.

Axioms

With those definitions, we can now state the first axiom of facts. This axiom is universally true. It applies to all facts, and is the basis of the theorems that follow.

Axiom 1: A fact is constructible, meaning:

∀f∃M[∀g(P(f, g) → g∈M) ∧ f ∉M], where M is a finite set closed under P.

This axiom asserts that for any fact, I can find a closed set that does not contain the fact, but does contain all of its predecessors.

The reason that I call this property of facts constructability is that it is the minimum practical requirement for building a model. At each step along the way, I am going to have a finite set of facts — the portion of the model that I’ve already built. I want to construct one new fact that is not already in the set. In order to do so, I need to have all of this new fact’s predecessors.

Once I have constructed the new fact, I can define a new set. This will be the set I started with plus the new fact. As we’ll see, this new set will be both finite and closed. We haven’t proven that yet, but we chose the axiom in such a way that we can.
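The construction step can be sketched in Python. The `Model` class and its `construct` and `is_closed` methods are hypothetical names. The sketch only shows one thing: if you refuse any fact that is already present or whose predecessors are missing, every intermediate set stays finite and closed under P, as Axiom 1 requires.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Fact:
    name: str
    predecessors: tuple = ()


class Model:
    """A finite set of facts M that stays closed under the predecessor relation P."""

    def __init__(self):
        self.facts = set()

    def construct(self, fact):
        # Axiom 1: the new fact is not yet in M, but all of its predecessors are.
        if fact in self.facts:
            raise ValueError(f"{fact.name} is already in the model")
        missing = [p.name for p in fact.predecessors if p not in self.facts]
        if missing:
            raise ValueError(f"{fact.name} is missing predecessors: {missing}")
        self.facts.add(fact)  # M plus the new fact is still finite and closed

    def is_closed(self):
        return all(p in self.facts for f in self.facts for p in f.predecessors)


model = Model()
company = Fact("Company")
customer = Fact("Customer", (company,))
model.construct(company)
model.construct(customer)
print(model.is_closed())  # True

try:
    model.construct(Fact("Order", (Fact("Unknown customer"),)))
except ValueError as error:
    print(error)  # Order is missing predecessors: ['Unknown customer']
```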

Theorems

Theorem 1: A fact is not its own predecessor: ¬P(f, f)

Since f is constructible, I can find a closed set M such that

∀g(P(f, g) → g∈M) ∧ f ∉M

This is true for all g, so it must be true for f:

(P(f, f) → f∈M) ∧ f ∉M

(P(f, f) ∧ f∉M) → (f∈M ∧ f ∉M)

(P(f, f) ∧ f∉M) → False

¬P(f, f) ∨ f∈M

Since f∉M by constructability, ¬P(f, f). Q.E.D.

For the next theorem, we’ll define B as a relation between facts. B tells us whether there is a path from one fact to another by following predecessors:

B(f, h) = P(f, h) ∨ ∃g(P(f, g) ∧ B(g, h))

We can easily get from f to h if h is a predecessor of f. We can also get there if we can find a predecessor g and a path from g to h.
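Because B is defined recursively, it translates almost directly into code. The sketch below assumes the predecessor graph of the order fulfillment example from the earlier post, written as a hypothetical dictionary `P` that maps each fact type to its direct predecessors. The recursion terminates only because this graph has no cycles, which is what the following theorems establish for facts in general.

```python
# Direct predecessors of each fact type in the order fulfillment example.
P = {
    "Company": [],
    "Customer": ["Company"],
    "Order": ["Customer"],
    "Shipment": ["Order"],
    "Invoice": ["Order"],
    "Delivery": ["Shipment"],
    "Payment": ["Invoice"],
    "FollowUp": ["Delivery", "Payment"],
}


def B(f, h):
    """True if h can be reached from f by following predecessors.

    A direct translation of B(f, h) = P(f, h) ∨ ∃g(P(f, g) ∧ B(g, h)).
    """
    return h in P[f] or any(B(g, h) for g in P[f])


print(B("FollowUp", "Order"))                              # True
print(B("Shipment", "Invoice"), B("Invoice", "Shipment"))  # False False
```

Shipment and Invoice are unrelated in both directions, so B leaves them unordered, just as the two departments are free to work independently.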

Theorem 2: The relation is irreflexive: ¬B(f, f)

Let’s lay out the path from f to h (if one exists) as a sequence $f_n$, $0 \le n \le N$. Start at $f_0 = f$. Each fact is a predecessor of the prior fact. And h is a predecessor of the last fact, $f_N$.

$$f_0 = f, \quad P(f_n, f_{n+1}) \text{ for } 0 \le n < N, \quad P(f_N, h)$$

We don’t yet know whether such a sequence exists, but suppose that B(f, h). Expanding the definition at $f_0 = f$:

$$B(f_0, h) = P(f_0, h) \lor \exists g\,(P(f_0, g) \land B(g, h))$$

If the first condition is true ($P(f_0, h)$), then the sequence exists and $N = 0$. If not, then we know we can find a suitable predecessor g. We’ll choose that predecessor for the next fact in our sequence: $f_1$.

$$P(f_0, f_1) \land B(f_1, h)$$

Now that we know that $B(f_1, h)$, we can continue the sequence.

$$B(f_n, h) = P(f_n, h) \lor \exists f_{n+1}\,(P(f_n, f_{n+1}) \land B(f_{n+1}, h))$$

$$(0 \le n < N) \to \exists f_{n+1}\,(P(f_n, f_{n+1}) \land B(f_{n+1}, h))$$

$$(n = N) \to P(f_n, h)$$

So if B(f, h), then the sequence exists. And if the sequence exists, then it satisfies B.

Now that we know the sequence exists, let’s use it to prove our assertion. We know that f is constructible (it’s axiomatic), so we can find a closed finite set M:

∃M[∀g(P(f, g) → g∈M) ∧ f∉M] (f is constructible), and (i)

∀fg(f∈M ∧ P(f, g) → g∈M) (M is closed). (ii)

When $N \ge 1$, plug $g = f_1$ into (i) (recall that $f = f_0$), and we get:

$$P(f_0, f_1) \to f_1 \in M$$

Into (ii), plug in $f = f_n$, $g = f_{n+1}$, and we get:

$$f_n \in M \land P(f_n, f_{n+1}) \to f_{n+1} \in M$$

Starting from $f_1 \in M$, induction tells us that $f_n \in M$ for every $n$ with $1 \le n \le N$.

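Furthermore, into (ii), plug in $f = f_N$, $g = h$, and we see that:

$$f_N \in M \land P(f_N, h) \to h \in M$$

(When $N = 0$, $f_N$ is f itself, which is not in M. In that case, (i) gives $P(f_0, h) \to h \in M$ directly.)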

And so we have shown that if B(f, h), the sequence exists, and h∈M.

We know from constructability that f∉M. But if we plug in h = f, then we get:

B(f, f) → f∈M

This contradicts f∉M, so ¬B(f, f). Q.E.D.

Theorem 3: The relation is transitive: B(f, g) ∧ B(g, h) → B(f, h)

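B(f, g) implies that there exists a sequence $f_n$, $0 \le n \le N$, for which:

$$f_0 = f, \quad P(f_n, f_{n+1}) \text{ for } 0 \le n < N, \quad P(f_N, g)$$

And B(g, h) implies that there exists a sequence $g_m$, $0 \le m \le M$, for which:

$$g_0 = g, \quad P(g_m, g_{m+1}) \text{ for } 0 \le m < M, \quad P(g_M, h)$$

Define a sequence $k_p$, $0 \le p \le N+M+1$:

$$k_p = f_p \text{ for } p \le N, \qquad k_p = g_{p-N-1} \text{ for } p > N$$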

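The sequences join because $P(f_N, g)$ holds, and since $k_N = f_N$ and $k_{N+1} = g_0 = g$, this is $P(k_N, k_{N+1})$.

For this sequence:

$$k_0 = f, \quad P(k_p, k_{p+1}) \text{ for } 0 \le p < N+M+1, \quad P(k_{N+M+1}, h)$$

So this sequence satisfies $B(k_0, h)$, that is, B(f, h).

B(f, g) ∧ B(g, h) → B(f, h). Q.E.D.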

Theorem 4: The relation is asymmetric: B(f, g) → ¬B(g, f)

Assume B(f, g) ∧ B(g, f).

B(f, g) ∧ B(g, f) → B(f, f) (Theorem 3)

¬B(f, f) (Theorem 2)

This leads to a contradiction, so B(f, g) and B(g, f) cannot both be true. Therefore:

B(f, g) → ¬B(g, f). Q.E.D.

Since the relation B is irreflexive and transitive (and hence asymmetric), it defines a strict partial order among facts. Let’s henceforth use the symbol <, which we will pronounce “Before”:

g < f ≡ B(f, g).

Purge

The purpose of the purge keyword is to inform the client that a fact and all of its successors are no longer relevant. It allows a client to clean up its local database.

Logically speaking, facts are never deleted. However, there usually comes a time when the existence of a fact has no bearing on query results. One occurrence of this is when you use the Entity Delete pattern.

fact Entity {
   purge where this.deleted;

key:
   unique;
   Context context;

query:
   bool deleted {
      exists EntityDelete d : d.entity = this
   }
}

fact EntityDelete {
key:
   Entity entity;
}

The Entity has a query, deleted, that is true when an EntityDelete fact exists for that entity. Queries for this entity that include the condition “where not e.deleted” will not return deleted entities.

Unsafe purges

If a query might be affected by a purge, then the purge is unsafe. This could happen if the query does not include the purge condition. For example:

fact Context {
query:
   Entity* entities {
      Entity e : e.context = this
   }
}

The entities query does not include the deleted condition, so an Entity purge would affect its results. That makes the purge unsafe. The Factual compiler issues an error. To correct this error, add the deleted condition to the entities query.

A purge is also unsafe if its condition could become false. This could occur when using the Entity Restore pattern. The Factual compiler issues an error in this scenario. To correct this error, change the purge condition to one that cannot become false. For example, define a new fact that a service creates after a certain period of time to permanently remove a deleted entity from the recycle bin.

Client behavior

When the purge condition is true and the client has uploaded the fact and all its successors to all connected servers, then the client may remove the target fact and all successors from its database.

For example, the EntityDelete fact is initially created at the client. At that moment, the Entity cannot be purged, despite the fact that the purge condition has become true. The client must first send the EntityDelete fact to the server, since it is a successor of the Entity. Only after the server has received the EntityDelete can the client remove the facts from its database.
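The client's rule can be sketched as a small check. Everything here is hypothetical. `ClientStore` tracks, for each connected server, the ids of the facts that have been uploaded to it, and the strings `"entity"` and `"delete"` stand in for an Entity and its EntityDelete. A purge is allowed only when the condition holds and every server has received the target fact and all of its successors.

```python
from dataclasses import dataclass, field


@dataclass
class ClientStore:
    """Hypothetical client-side bookkeeping for purgeable facts."""
    successors: dict                              # fact id -> ids of its direct successors
    uploaded: dict = field(default_factory=dict)  # server name -> set of fact ids it has received

    def subtree(self, fact_id):
        """The fact plus all of its direct and indirect successors."""
        result, stack = set(), [fact_id]
        while stack:
            current = stack.pop()
            if current not in result:
                result.add(current)
                stack.extend(self.successors.get(current, []))
        return result

    def can_purge(self, fact_id, condition_true, servers):
        if not condition_true:
            return False
        needed = self.subtree(fact_id)
        return all(needed <= self.uploaded.get(s, set()) for s in servers)


store = ClientStore(successors={"entity": ["delete"]})
store.uploaded["server"] = {"entity"}                # EntityDelete created locally, not yet sent
print(store.can_purge("entity", True, ["server"]))   # False
store.uploaded["server"].add("delete")               # the server has now received EntityDelete
print(store.can_purge("entity", True, ["server"]))   # True
```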

Server behavior

Clients will respond to a purge long before a server does. A client is a producer and consumer of facts, but is not a distributor. No other system will be connecting to a client to receive facts. A server, however, must respond to requests from clients for historical facts. If these clients have the purged fact but not its successors, they may not know that the fact was purged. The server must send its successors to those clients upon request.

Servers may choose to purge facts after a configurable duration, based on the frequency at which clients tend to connect. A server that chooses this algorithm cannot respond to requests for successors after a fact has been purged. It must return an error.

Servers may also choose to keep a watermark of client requests. When a client’s watermark is between a fact and its purging successor, the server cannot perform the purge. Once all clients have received the purging successor, the server may safely remove those facts from its database. This is an expensive algorithm, and should only be used when the number of clients is limited.
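Here is a minimal sketch of the watermark check. It assumes that each client's watermark is the highest position in the server's log that the client has received. The function name `can_purge` and the log positions are made up for illustration. The server keeps the facts while any client remains below the purging successor, because such a client may still ask for the fact's successors.

```python
def can_purge(purge_position, watermarks):
    """Hypothetical server-side check for the watermark strategy.

    purge_position: position of the purging successor (for example, EntityDelete)
                    in the server's log.
    watermarks:     client name -> highest log position that client has received.
    """
    # A client still below the purging successor might be between the fact and
    # its purging successor, and may still request successors, so keep the facts.
    return all(mark >= purge_position for mark in watermarks.values())


watermarks = {"alice": 20, "bob": 12}
print(can_purge(17, watermarks))  # False: bob has not received the EntityDelete yet
watermarks["bob"] = 17
print(can_purge(17, watermarks))  # True: every client has it
```

Keeping one watermark per client is what makes this strategy expensive as the number of clients grows.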

The typical server behavior is to ignore the purge. Compared to the cost of keeping per-client information or returning an error to out-of-date clients, the cost of storing the full history of purged facts is small.

The CAP Theorem

Eric Brewer is an expert in distributed systems. In the Principles of Distributed Computing 2000 keynote address, he gave us the CAP Theorem. It states that a distributed system cannot simultaneously guarantee these three attributes:

  • Consistency
  • Availability
  • Partition Tolerance

It can guarantee at most two. Which two you choose should depend upon the system’s architectural requirements.

Consistency

Consistency in a distributed system is not strictly the same as ACID consistency. A distributed system is consistent if a read at any node returns data that is no older than that written by a previous write. The read and the write may occur at the same node or at different nodes. The nodes may use any algorithm they wish to keep each other up-to-date. But if I write version 2, a consistent system will never again read version 1.

There are many ways to guarantee consistency. One would be to block writes until all nodes have been notified. Another would be to block reads until all nodes are consulted. Yet another is to designate one node as the master of that particular piece of data, and route all messages to it. In practice, consistent distributed systems use a combination of these algorithms.

Availability

A distributed system is available if any non-failing node responds to a request in a reasonable amount of time. It doesn’t mean that nodes can’t fail. It just means that, whatever other guarantees the system offers, it will respond when you address one of the remaining nodes.

You can see the tension between consistency and availability. To guarantee both, we need redundancy. What if the data that we try to read was only stored on the node that was lost after the write? That data would not be available, and we could not guarantee consistency.

Partition Tolerance

A distributed system is partition tolerant if it can tolerate the loss of any number of messages. If enough messages are lost between islands of nodes, the network has been partitioned.

Network partitioning happens most often in wide area networks. A client disconnects from the internet. Or the connection between two data centers is severed. But network partitioning can also happen in a local area network. No network, no matter how expensive, can guarantee that all packets are delivered. It is up to the designer of the distributed system to decide whether the system will tolerate message loss.

Most distributed systems respond to momentary message loss by resending messages. But since the network cannot guarantee message delivery, even those retries might be lost. It’s how the system responds to sustained message loss that determines whether it can guarantee partition tolerance.

Proof

The proof of the CAP Theorem is pretty simple. Informally, we can ask: how can a system guarantee both consistency and partition tolerance? If all messages between two given nodes are lost, how can a write at one affect a read at the other? No matter what algorithm you come up with, the only way to guarantee both consistency and partition tolerance is to give up availability. When messages between two nodes are lost, you must fail the read as if the second node was down. There is no way to respond with consistent data.

Or you can ask how a system can guarantee both consistency and availability. Remember our three example algorithms. If we block writes until every available node is notified, and those notifications are lost, then we must fail the write. If we block reads until every available node is consulted, and those messages are lost, we must fail the read. And if we delegate to the master, and that message is lost, then we fail as well.

And finally, we can guarantee both availability and partition tolerance, but we have to relax our consistency guarantee. Nodes might be down and messages might be lost, but if we assume that those problems will eventually be solved, then we can say that we will eventually be consistent. It is possible to read stale data from such a system, but given enough time all nodes will be up-to-date.
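The trade-off can be illustrated with a toy register held on two replicas. `TwoNodeRegister` is a hypothetical class, and the partition is modeled as all-or-nothing. In `"CP"` mode the register refuses reads and writes while partitioned. In `"AP"` mode it answers with possibly stale data and reconciles when the partition heals. Reconciliation here simply copies the replica with the higher version, so a real system would still need a rule for concurrent writes on both sides.

```python
class Replica:
    def __init__(self):
        self.value = None
        self.version = 0


class TwoNodeRegister:
    """Hypothetical two-replica register illustrating the CAP trade-off."""

    def __init__(self, mode):
        if mode not in ("CP", "AP"):
            raise ValueError("mode must be 'CP' or 'AP'")
        self.mode = mode
        self.nodes = {"a": Replica(), "b": Replica()}
        self.partitioned = False  # True: every message between a and b is lost

    def write(self, node, value):
        if self.partitioned and self.mode == "CP":
            # Give up availability: the other replica cannot learn about this write.
            raise RuntimeError("unavailable: cannot reach the other replica")
        local = self.nodes[node]
        local.value, local.version = value, local.version + 1
        if not self.partitioned:
            other = self.nodes["b" if node == "a" else "a"]
            other.value, other.version = local.value, local.version

    def read(self, node):
        if self.partitioned and self.mode == "CP":
            # Give up availability: this replica cannot prove its value is current.
            raise RuntimeError("unavailable: cannot confirm the latest value")
        return self.nodes[node].value  # in AP mode this may be stale

    def heal(self):
        # Messages flow again; copy the newest version to both replicas.
        self.partitioned = False
        newest = max(self.nodes.values(), key=lambda r: r.version)
        for replica in self.nodes.values():
            replica.value, replica.version = newest.value, newest.version


ap = TwoNodeRegister("AP")
ap.write("a", 1)
ap.partitioned = True
ap.write("a", 2)
print(ap.read("b"))  # 1: available, but stale
ap.heal()
print(ap.read("b"))  # 2: eventually consistent

cp = TwoNodeRegister("CP")
cp.write("a", 1)
cp.partitioned = True
try:
    cp.read("b")
except RuntimeError as error:
    print(error)  # unavailable: cannot confirm the latest value
```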

When designing a distributed system, consider the guarantees that the problem demands. But also consider the guarantees that you will be unable to make, and decide how best to respond in those situations.

Eventual consistency

Most of the modern distributed systems frameworks have opted to relax the consistency guarantee. They instead promise “eventual consistency”, or that you will read the new value if you wait long enough. More formally, this guarantee states:

  • All writes are durable.
  • Once a version has been read, a later read will not return an earlier version.

All writes are durable. No data will be lost. Data might, however, be overwritten by a later write.

Once a write becomes visible at a particular node, that node will no longer return an earlier version. It will never go back in time to a state before that write completed.


Consider a timeline of writes. We collect that timeline fully ordered at the node where the writes occurred. Those writes produce a series of state changes. By observing the state at that node, we can detect whether a write took place.

Now, allow that stream of writes to move to another node. It causes a similar series of state changes there. If we observe the state at the second node, it might be earlier than the state at the first. Consistency is not guaranteed. But, once the second node catches up, it will not go back. It is eventually consistent.
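The behavior at the second node can be sketched as a replica that only moves forward. `Replica` and the version numbers are assumptions for illustration. The source node numbers its writes in order, and the target ignores any write that is not newer than what it already shows.

```python
class Replica:
    """Applies a source node's writes and never moves back to an older version."""

    def __init__(self):
        self.version = 0
        self.state = None

    def apply(self, version, state):
        # Ignore anything that is not newer than the current state.
        if version > self.version:
            self.version, self.state = version, state


source_log = [(1, "v1"), (2, "v2"), (3, "v3")]   # fully ordered at the writing node
target = Replica()

target.apply(*source_log[0])
print(target.state)            # v1: behind the source, so not yet consistent
for entry in source_log:       # catch up; the already-applied entry is ignored
    target.apply(*entry)
target.apply(*source_log[1])   # a late duplicate of an older write
print(target.state)            # v3: caught up, and it does not go back
```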

Historical Modeling

Like many other frameworks, Historical Modeling guarantees eventual consistency. It does so by transmitting historical facts from one node to another.

A historical fact is a record of a decision or state change that occurred at one node. All writes in a historical system are creations of new facts. Facts are never modified or destroyed.

If you observe the history of facts at a target node, you might find that the fact you just wrote at the source is not yet there. Consistency is not guaranteed. This allows the target node to remain available even if the network that it shares with the source is partitioned.


When the fact is eventually shared with the target node, the transmission includes its fields and predecessors. These fields and predecessors uniquely identify the fact and distinguish it from others. In this way, an observer can recognize the target fact as the same as the source.

Predecessor facts must be transmitted first, so predecessors will always be present. Successors, however, will arrive eventually. Once they do, they will never be deleted. An observer can query for successors to determine the current state of the system. He will find that the state will never go backwards to a time when the new facts did not exist.
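A receiving node can be sketched in a few lines. The `Fact` and `Node` classes are hypothetical. The node does three things:

- It rejects a fact whose predecessors have not arrived.
- It treats a redelivered fact as the one it already has.
- It answers state queries by looking for successors.

Because nothing is ever removed, the answer to such a query can only grow.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Fact:
    kind: str
    fields: tuple             # identifying field values
    predecessors: tuple = ()  # direct predecessors, transmitted before this fact


class Node:
    """Hypothetical target node. Facts are only ever added, never changed or removed."""

    def __init__(self):
        self.facts = set()

    def receive(self, fact):
        missing = [p for p in fact.predecessors if p not in self.facts]
        if missing:
            raise ValueError(f"{fact.kind} arrived before its predecessors")
        self.facts.add(fact)  # a duplicate has the same fields and predecessors, so the set ignores it

    def successors(self, fact, kind):
        """Query the current state: all facts of the given kind that follow this one."""
        return {f for f in self.facts if f.kind == kind and fact in f.predecessors}


order = Fact("Order", ("order 7",))
shipment = Fact("Shipment", ("UPS",), (order,))

target = Node()
target.receive(order)
print(len(target.successors(order, "Shipment")))  # 0: not shipped, as far as this node knows
target.receive(shipment)
target.receive(shipment)                           # redelivery is harmless
print(len(target.successors(order, "Shipment")))  # 1: and it will never drop back to 0
```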

Guarantees

A distributed system based on durable message queues relies upon three guarantees:

  • Messages will be delivered at least once.
  • Message duplication will have no ill effects.
  • Messages will be delivered in the order that they were sent.

As it turns out, these guarantees are difficult to ensure. Different distributed systems architectures have different strategies for upholding these guarantees.

Delivery

In a distributed system based on message queues, at-least-once delivery is the easiest guarantee. The queue itself is durable. Once a message is queued, the sender relinquishes responsibility. Assuming that the recipient will eventually read from the queue (which our operations team monitors), the message will get delivered.

Duplication

Duplication is a little trickier. It usually happens when the recipient has some trouble processing the message.

Queues don’t protect us from duplication. After the recipient processes a message, it removes it from the queue. But if it can’t finish its task, then it has to leave the message on the queue. Depending upon how much of its task was completed, some or all of the work may be duplicated the next time the recipient pulls the message.

One solution to duplication is idempotency. An idempotent message is one that will have the same outcome no matter how many times it is processed (assuming no other related messages intervene). Changing a customer’s phone number is idempotent. Charging their credit card is not. Some messages can be designed to be idempotent, but not all.

To protect against duplication of non-idempotent messages, we have two strategies: journaling and transactions. A journaling strategy involves keeping track of the steps that have already been completed. We check the journal to ensure that we don’t repeat those steps. A transaction-based strategy involves doing the work in the same transaction as the queue. A distributed transaction coordinator (DTC) ensures that both removing the message and completing the work happen as an atomic unit.
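A journaling strategy for a non-idempotent message might look like this minimal sketch. `PaymentHandler` and its in-memory journal are hypothetical. A real journal must be durable. A crash between charging the card and writing the journal entry could still repeat the charge, so that step needs care, or the transaction-based strategy instead.

```python
class PaymentHandler:
    """Hypothetical handler for a non-idempotent "charge the card" message."""

    def __init__(self):
        self.journal = set()  # ids of messages whose charge step already completed
        self.charges = []     # stand-in for the real side effect

    def handle(self, message_id, amount):
        if message_id in self.journal:
            return            # redelivered message: skip the step that already ran
        self.charges.append(amount)
        # A crash right here would repeat the charge on redelivery; a real journal
        # must be durable and written with care.
        self.journal.add(message_id)


handler = PaymentHandler()
handler.handle("message-1", 50)
handler.handle("message-1", 50)  # the queue redelivers the same message
print(handler.charges)           # [50]
```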

Order

Order of delivery is the most difficult guarantee to uphold. One reason is poison messages.

When a recipient fails to process a message, it must leave the message on the queue. Otherwise messages will get lost (see the delivery guarantee). Most failures are transient, meaning that they are caused by temporary conditions, and might work if tried again. Deadlocks and timeouts are examples of transient failures. But some failures are intrinsic to the message itself. These poison messages will not succeed if retried. If they are left at the top of the queue, they will prevent later messages from being processed.

To detect a poison message, a service typically retries a specific number of times. Once that threshold is exceeded, the message is considered poison. The typical strategy for dealing with poison messages is to move them to a different queue. System operators monitor the poison message queue (also known as a dead letter queue) and intervene when messages arrive. They take whatever actions are necessary to ensure that the messages succeed, and then put them back on the application queue.

While this strategy allows the system to continue functioning, it changes the order in which the messages are processed. If a later message depends upon a poison message, then it will be processed in the wrong order. In the best case, the system detects the dependency and treats the later message as poison as well. In the worst case, results are undefined.

Parallelism can also cause messages to be processed out of order. If multiple nodes pull work from a single queue, there is no guarantee that they will finish that work in the same order that they started. If one of the nodes experiences a transient failure, the problem is exacerbated. While it is working on one message, other nodes will pull messages from further down the queue. If the first message fails, the service will put it back on the top to be processed later.

The most flexible solution to the ordering problem is to ensure that order between messages does not matter. Like idempotency, this can be achieved with most messages, but not with all. When order matters, the system must be programmed to recognize when it is violated. It can then move the later message to the bottom of the queue, thus increasing the likelihood that its prerequisite will be processed first.
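The requeue strategy can be sketched with a `deque` from Python's standard library. The message format, a `(message_id, prerequisite_id)` pair, and the `max_requeues` limit are assumptions for illustration. A message whose prerequisite has not been handled goes to the bottom of the queue. A message that keeps failing is treated as poison rather than retried forever.

```python
from collections import deque


def process(messages, max_requeues=10):
    """Handle (message_id, prerequisite_id) pairs in queue order, requeueing out-of-order ones.

    Returns the ids in the order they were handled, plus any ids treated as poison.
    """
    queue = deque(messages)
    handled, order, poison, requeues = set(), [], [], {}
    while queue:
        message_id, prerequisite = queue.popleft()
        if prerequisite is not None and prerequisite not in handled:
            requeues[message_id] = requeues.get(message_id, 0) + 1
            if requeues[message_id] > max_requeues:
                poison.append(message_id)                 # give up: dead letter queue
            else:
                queue.append((message_id, prerequisite))  # move to the bottom of the queue
            continue
        handled.add(message_id)
        order.append(message_id)
    return order, poison


print(process([("payment-received", "order-submitted"), ("order-submitted", None)]))
# (['order-submitted', 'payment-received'], [])
```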

Historical Modeling provides the three guarantees in the following ways:

  • A fact is both data and message.
  • Identity is determined by state.
  • Predecessors define a partial order among facts.

Data and message

Most architectures keep messages separate from the data. RPC messages are just wire protocols. Service busses store messages in queues, not in the database. Brokers persist the state of workflows separately, distinct from the data that those workflows operate on.

Historical Modeling, on the other hand, stores both data and message in facts. Facts store data, and can be queried to find the current state of an entity. Facts also represent messages, and can be queried to find work. When the user performs an action, a fact is stored. This implicitly sends the message.

Some distributed systems architectures rely upon a DTC to ensure that a message is only processed once. If the message fails, then the DTC rolls back both the removal of the message and the database update. But when the repository is the queue, a DTC is not necessary. Handling the message adds the fact that implicitly removes the message from the queue.
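Here is a minimal sketch of a queue that is nothing more than a query over facts. The `Order` and `Shipment` classes and the `orders_to_ship` function are hypothetical. The warehouse's pending work is every Order without a Shipment successor, so recording the Shipment is what removes the order from the queue. No separate dequeue step or DTC is involved.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Order:
    number: int


@dataclass(frozen=True)
class Shipment:
    order: Order   # predecessor: the order this shipment fulfills


def orders_to_ship(facts):
    """The warehouse's work queue: every Order that has no Shipment successor yet."""
    shipped = {f.order for f in facts if isinstance(f, Shipment)}
    return [f for f in facts if isinstance(f, Order) and f not in shipped]


facts = [Order(1), Order(2)]
print(orders_to_ship(facts))       # [Order(number=1), Order(number=2)]
facts.append(Shipment(Order(1)))   # handling the message just records a new fact
print(orders_to_ship(facts))       # [Order(number=2)]
```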

Identity

A historical model takes advantage of immutability to protect against duplication. A historical fact cannot be modified. That immutable state identifies the fact. Any other fact with the same state is the same fact.

Suppose that a Stock fact has only one field: symbol. Any Stock where symbol=MSFT is the same fact. If we record a related fact (for example a Purchase by Account(12345) of 300 shares of Stock(MSFT) at 3:49pm on 1/28), then that related fact is also uniquely identified by its collection of fields. Another fact with exactly the same fields will be considered the same fact. It will not be duplicated.

Partial order

Queuing systems attempt to impose a full order among messages. The queue itself does not know when that order is important and when it is not. As a result, it tries to uphold the order guarantee equally in all cases. A full order among messages is over-constrained.

Instead of trying to achieve full order, a historical model defines a partial order among messages. Each message has a reference to its predecessors. These messages must be sent first. The infrastructure knows this, and preserves order when necessary. It does not leave detection up to the application.

On the flip side, the infrastructure also understands when messages are unrelated. In those cases, it can freely violate the order guarantee. The application will not be adversely affected when unrelated messages are processed out-of-order.

A fact only references its direct predecessors. It does not directly reference all facts that were a part of the conversation. Nevertheless, the identity of a fact is dependent upon the identities of its predecessors. And that relationship is transitive. To understand the identity of a fact, a node must receive all of its direct and indirect predecessors. In this way, the predecessor relationships among the facts place them in the correct partial order relative to one another.

The queuing guarantees that distributed systems rely upon are not easy to achieve. Each architecture has its own mechanism for upholding these guarantees. Historical Modeling is no exception. It determines which guarantees are truly important to the correct operation of the system, and upholds them only when necessary.

Service Bus

Message queues can improve the reliability and scalability of a distributed system when carefully applied. They solve the RPC problems of synchronous and unreliable messaging. However, they do not solve the problem of one-to-one coupling and configuration. The sender knows which queue to push messages to, and the recipient knows which queue to pull messages from. Service busses solve that problem.

A service bus is not a single monolithic component. Nor is it an infrastructure stack running on a cluster of machines (for example, BizTalk). Those are brokers. A service bus is a logical relationship among distinct services running on different machines, known as nodes. Often, those services use the same framework as one another, but that framework is installed individually at each node. Some popular service bus frameworks for .NET are:

Messages

The most important task in designing a distributed system is defining the right set of messages. A message is an immutable block of information. It contains all of the information needed for the recipient to do something meaningful. Messages typically represent one of two things:

  • Command – a request for the system to take action
  • Event – a notification that something has occurred related to the business domain

Commands generally flow from the user of the system. A command might be “Submit order”, or “Accept payment”. Events, on the other hand, flow from one part of the system to another. An event might be “Order submitted”, “Order shipped”, or “Payment received”. Events are named with a past tense verb, while commands have an imperative verb.

A conversation is a series of related messages. The messages are all about the same thing (perhaps an order or a patient visit). The messages have a cause-and-effect relationship among them. A command message comes from the user and kicks off the conversation. Then the recipient of that command makes some decision, takes some action, and publishes one or more event messages. Other nodes respond to those events and the conversation continues. Conversations may end quickly, or they may continue for very long periods of time.

Handlers

A service bus delegates message processing to handlers. A handler is a service running on a node that responds to a single kind of message.

There is typically only one handler for each type of command message. When the user issues a “submit order” command, one service is responsible for validating it and entering it into the database. Many nodes may be competing for that command, but only one will be selected to perform it.

On the other hand, there can be many handlers for event messages. Events in a problem domain have lots of side-effects. When an order is submitted, an invoice must be sent, items must be picked and shipped, and customer preferences must be adjusted. Each of these side-effects is performed by a separate handler for the “order submitted” event.
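The rule of one handler per command and many handlers per event can be sketched as an in-memory registry. `HandlerRegistry` and its methods are hypothetical. The sketch models only the relationship between message types and handlers. It does not model competing nodes or queues.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class HandlerRegistry:
    """Hypothetical registry: one handler per command type, many per event type."""

    def __init__(self) -> None:
        self._command_handlers: Dict[type, Callable] = {}
        self._event_handlers: Dict[type, List[Callable]] = defaultdict(list)

    def register_command_handler(self, command_type: type, handler: Callable) -> None:
        # A second handler for the same command is a configuration error.
        if command_type in self._command_handlers:
            raise ValueError(f"{command_type.__name__} already has a handler")
        self._command_handlers[command_type] = handler

    def register_event_handler(self, event_type: type, handler: Callable) -> None:
        # Any number of side-effects may react to the same event.
        self._event_handlers[event_type].append(handler)

    def send(self, command) -> None:
        # Exactly one handler performs a command.
        self._command_handlers[type(command)](command)

    def publish(self, event) -> None:
        # Every registered handler applies its own side-effect.
        for handler in self._event_handlers[type(event)]:
            handler(event)
```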

Handlers are not coupled to each other. They only know about the types of messages they consume and create. In fact, handlers are not even coupled to their queues. The service bus determines which queue a handler consumes messages from, and which other queues it posts messages to. This is all based on configuration.

Configuration

Like I said earlier, a service bus is not one monolithic thing. As such, there is no central configuration. Instead, each node is configured with the queue names and handlers that it needs to play its part in the system.

The user interface (typically a web server) is responsible for sending command messages to the proper handler. Therefore the UI node is configured with the name and location of the queue that each command handler pulls from.
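The UI node's configuration amounts to a lookup table from command type to queue. This minimal sketch uses hypothetical queue names. In-process `queue.Queue` objects stand in for the physical queues that a real deployment would address by name and location.

```python
import queue
from typing import Dict

# Hypothetical UI-node configuration: which queue each command type is sent to.
COMMAND_ROUTES: Dict[str, str] = {
    "SubmitOrder": "orders.commands",
    "AcceptPayment": "payments.commands",
}

# Stand-ins for the physical queues, keyed by their configured names.
QUEUES: Dict[str, queue.Queue] = {name: queue.Queue() for name in COMMAND_ROUTES.values()}


def send_command(command_type: str, body: dict) -> str:
    """Enqueue a command on its configured queue and return the queue name."""
    try:
        queue_name = COMMAND_ROUTES[command_type]
    except KeyError:
        raise LookupError(f"no queue configured for command {command_type!r}") from None
    QUEUES[queue_name].put((command_type, body))
    return queue_name
```

Every new command type means another entry here, and every moved queue means an update on every UI node. This is the operational overhead the following paragraphs address.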

The command handler’s node is configured to pull messages from that same queue. But it is not configured with the names and locations of downstream queues. Remember that the command handler is creating event messages, and events have multiple handlers. If each command handler were configured with every downstream event handler, the operational overhead would be unbearable.

Instead, event handlers subscribe to their messages. Command handlers publish messages. Subscribers tell the bus which messages they want to receive. The bus is configured with the names and locations of publishers of those messages. The bus registers with those publishers, so that the publishers know where to queue messages without the need for explicit configuration.
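The subscription handshake and the resulting multicast can be sketched in memory. `Publisher`, `SubscriberBus`, and `NETWORK` are hypothetical names. The shared `NETWORK` dictionary stands in for queues that would really live on different machines. The publisher is never configured with its subscribers. It learns them when their buses register, and then puts one copy of each event on each subscriber's queue.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List

# Stand-in for the network: queue name -> queue.
NETWORK: Dict[str, Deque] = defaultdict(deque)


class Publisher:
    """Hypothetical publishing node; it learns subscriber queues at runtime."""

    def __init__(self) -> None:
        self.subscribers: Dict[str, List[str]] = defaultdict(list)

    def register(self, event_type: str, queue_name: str) -> None:
        if queue_name not in self.subscribers[event_type]:
            self.subscribers[event_type].append(queue_name)

    def publish(self, event_type: str, body: dict) -> None:
        # Multicast on top of queues: one copy per subscriber queue.
        for queue_name in self.subscribers[event_type]:
            NETWORK[queue_name].append((event_type, dict(body)))


class SubscriberBus:
    """Hypothetical bus on a subscribing node; configured only with publisher locations."""

    def __init__(self, publisher_locations: Dict[str, Publisher]) -> None:
        self.publisher_locations = publisher_locations

    def subscribe(self, event_type: str, queue_name: str) -> None:
        # Registering with the publisher replaces explicit downstream configuration.
        self.publisher_locations[event_type].register(event_type, queue_name)
```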

To summarize, a service bus does two things:

  • Decouples handlers from queues
  • Adds multicast

Message queues are infrastructure components that must be explicitly provisioned. Without a service bus to route the messages, each service would have to know which queues to pull from and push to. Because a message is consumed as soon as it is processed, a single message queue does not support multicast. The service bus implements multicast patterns on top of queues.

Service bus in a historical system

In historical modeling, every fact is potentially a queue. This means that a queue is logical, not physical. No provisioning of infrastructure is required to set up a queue. While this doesn’t eliminate the need for a service bus, it does change the nature of that need.

In a historical model, a fact plays the part of a message. A fact is a historical record of a decision made either by a user or by the system. A fact can represent both a command and an event.

To begin a historical conversation, the user interface creates an initial fact. This fact acts as both a command and an event. This fact is typically named with a noun. The verb (“submit”, “process”, etc.) is implied. For example, the UI would create the “Order” fact.

The Order is published to the Company. Any node that subscribes to the company will receive the Orders.

fact Order {
    publish Company company;
    Customer customer;
    OrderLine* items;
}

A historical model acts as both message queue and database. As a result, it is not necessary to create a command handler to write the order to the database. It’s already there. Instead, we can focus on the event handlers. Each fact handler is responsible for one side-effect. For example, one handler will ship the order, and another will prepare an invoice. Each side-effect is itself a fact.

Instead of pulling facts from a physical queue, a handler runs a query. The query returns all facts to which the side-effect has not yet been applied. For example, the invoicing service processes all orders that have not yet been invoiced.

fact Company {
    unique;

    Order* ordersPendingInvoice {
        Order o : o.company = this
            where o.isPendingInvoice
    }
}

When the handler completes its task, it adds the Invoice fact. Adding this fact saves the information to the historical database. But it also publishes the event for any downstream handlers (accounts receivable or collections, for example). Furthermore, it effectively removes the Order from the “ordersPendingInvoice” query. This is accomplished through the “isPendingInvoice” predicate.

fact Order {
    publish Company company;
    Customer customer;
    OrderLine* items;

    bool isPendingInvoice {
        not exists Invoice i : i.order = this
    }
}
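For readers who want to run the idea, here is a rough Python translation of the `ordersPendingInvoice` query and the `isPendingInvoice` predicate. It assumes an in-memory, append-only `FactStore`. That class, the dataclass fields, and `invoicing_handler` are hypothetical and are not part of the factual language. Adding an Invoice is the only thing that takes an Order out of the query.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Company:
    name: str


@dataclass(frozen=True)
class Order:
    company: Company  # "publish Company company" in the factual model
    number: int


@dataclass(frozen=True)
class Invoice:
    order: Order


class FactStore:
    """Hypothetical append-only store that doubles as database and queue."""

    def __init__(self) -> None:
        self.facts: list = []

    def add(self, fact) -> None:
        if fact not in self.facts:  # facts are immutable; adding twice is a no-op
            self.facts.append(fact)

    def is_pending_invoice(self, order: Order) -> bool:
        # not exists Invoice i : i.order = this
        return not any(isinstance(f, Invoice) and f.order == order for f in self.facts)

    def orders_pending_invoice(self, company: Company) -> List[Order]:
        # Order o : o.company = this where o.isPendingInvoice
        return [f for f in self.facts
                if isinstance(f, Order) and f.company == company
                and self.is_pending_invoice(f)]


def invoicing_handler(store: FactStore, company: Company) -> int:
    """Apply the invoicing side-effect to every pending order; return how many."""
    pending = store.orders_pending_invoice(company)
    for order in pending:
        store.add(Invoice(order))  # the new fact removes the order from the query
    return len(pending)
```

Running the handler twice in a row does no extra work. The second run finds no pending orders because the Invoice facts already exist.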

A service bus in a historical system does not need to route messages to the correct queues. Instead, a historical service bus invokes handlers based on logical subscriptions. Each handler subscribes to a root fact. When subsequent facts are published to that root, the handler is notified. The handler then executes the appropriate query and processes the facts. The fact it adds in response removes the original fact from the query results.
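The notify-then-query loop can be sketched with plain dictionaries as facts. `HistoricalBus` and `invoice_handler` are hypothetical, and the `published_to` key stands in for the `publish` keyword. The bus never hands a specific fact to the handler. It only signals that something was published to the root, and the handler finds its work by running its own query.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class HistoricalBus:
    """Hypothetical sketch: handlers subscribe to a root fact instead of a queue."""

    def __init__(self) -> None:
        self.facts: List[dict] = []
        self.handlers: Dict[str, List[Callable]] = defaultdict(list)

    def subscribe(self, root: str, handler: Callable[["HistoricalBus", str], None]) -> None:
        self.handlers[root].append(handler)

    def add(self, fact: dict) -> None:
        self.facts.append(fact)
        root = fact.get("published_to")
        if root is not None:
            # Notify, don't deliver: each handler re-runs its own query.
            for handler in list(self.handlers[root]):
                handler(self, root)


def invoice_handler(bus: HistoricalBus, root: str) -> None:
    """Query for orders under `root` that have no invoice, then invoice them."""
    invoiced = {f["order"] for f in bus.facts if f["type"] == "Invoice"}
    pending = [f for f in bus.facts
               if f["type"] == "Order" and f["published_to"] == root
               and f["id"] not in invoiced]
    for order in pending:
        # The Invoice is published to its Order, so it reaches downstream subscribers.
        bus.add({"type": "Invoice", "id": "inv-" + order["id"],
                 "order": order["id"], "published_to": order["id"]})
```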

Message Queues

For a distributed system to work, it has to move information from machine to machine. No single machine is responsible for the system as a whole. Yet all information is somehow related to all other information. So it stands to reason that a major concern of distributed system infrastructure is moving data to the machines that need it. This also ends up being one of the most significant challenges.

Remote procedure calls

The simplest way to move information from one box to another is through a remote procedure call (RPC). An RPC models the way that code calls functions in a program. The caller passes a packet of information to the recipient as parameters. It then waits for the recipient to do whatever it needs to do, even if the recipient is going to call another procedure. Finally, the recipient returns another packet of information as its result. This model works fine for programs, but it has some drawbacks in distributed systems.

RPCs are synchronous. The calling machine allocates some resources, typically a thread, that stand waiting for the recipient to respond. Sometimes the request is a query, where the caller is waiting for the results before it can continue processing. Other times, it is a command, informing the recipient that it needs to take action. Even if the call is a command that returns “void”, the caller must wait for the response. If it doesn’t receive the response, it doesn’t know whether the intended recipient received the call. So it has to either fail the request or retry the RPC.

RPCs are also unreliable. A local method call in a program cannot typically fail to reach the recipient. But a remote procedure call can get dropped, it can time out, or it can be corrupted. This might not just happen to the request, but also the response. If the call fails, the caller has no way of knowing which was lost. If it was the request, then a retry would be safe. But if it was the response, then a retry might lead to duplication.
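The ambiguity between a lost request and a lost response can be simulated in a few lines. `Recipient`, `FlakyNetwork`, and `call_with_retry` are hypothetical. The simulated transport drops either the request or the response once. In both cases the caller sees the same timeout followed by a successful retry. Only the recipient knows that the second scenario charged twice.

```python
class Recipient:
    """Hypothetical remote service that records each charge it processes."""

    def __init__(self) -> None:
        self.charges = []

    def charge(self, amount: int) -> str:
        self.charges.append(amount)
        return "ok"


class FlakyNetwork:
    """Simulated transport that drops either the request or the response once."""

    def __init__(self, drop: str = "") -> None:
        self.drop = drop  # "", "request", or "response"

    def call(self, recipient: Recipient, amount: int) -> str:
        if self.drop == "request":
            self.drop = ""
            raise TimeoutError("no reply")  # the recipient never saw the call
        result = recipient.charge(amount)
        if self.drop == "response":
            self.drop = ""
            raise TimeoutError("no reply")  # the recipient DID process the call
        return result


def call_with_retry(network: FlakyNetwork, recipient: Recipient,
                    amount: int, attempts: int = 2) -> str:
    """Retry on timeout; the caller cannot tell which message was lost."""
    for attempt in range(attempts):
        try:
            return network.call(recipient, amount)
        except TimeoutError:
            if attempt == attempts - 1:
                raise
    raise ValueError("attempts must be at least 1")
```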

Advantages of message queues

Message queues address both of these problems. First, they are asynchronous. After the message has been queued, the caller doesn’t wait for it to be processed. It can free up that thread for handling additional work. If the caller expects a response, then the caller must pull response messages from a second queue. While this complicates matters, we have tools (for example sagas) to address this new complexity.

Message queues are also reliable. Once the sender is sure that the message is in the queue, it can be confident that it will be received once and only once. Message queues typically have a two-phase API for receipt: first the recipient gets the message, and then it commits. If a problem occurs before the commit phase, then the message is “put back on” the queue (in actual fact, it never truly left; it only looked like it did). This two-phase receipt ensures that a message will be processed once and only once. It is no longer the concern of the sender.
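Two-phase receipt can be sketched as an in-memory queue. `TwoPhaseQueue` is hypothetical and is not the API of any particular broker. Real products add durable storage, visibility timeouts, and transactions, all of which this sketch leaves out. A received message is hidden from other receivers. It is removed only on commit, and an abort makes it visible again.

```python
import itertools
from collections import deque
from typing import Deque, Dict, Optional, Tuple


class TwoPhaseQueue:
    """In-memory sketch of receive-then-commit; not a real broker API."""

    def __init__(self) -> None:
        self._ready: Deque[Tuple[int, str]] = deque()
        self._in_flight: Dict[int, str] = {}
        self._ids = itertools.count(1)

    def send(self, message: str) -> None:
        self._ready.append((next(self._ids), message))

    def receive(self) -> Optional[Tuple[int, str]]:
        """Phase one: hide the message from other receivers, but keep it."""
        if not self._ready:
            return None
        receipt, message = self._ready.popleft()
        self._in_flight[receipt] = message
        return receipt, message

    def commit(self, receipt: int) -> None:
        """Phase two: processing succeeded, so the message is finally removed."""
        del self._in_flight[receipt]

    def abort(self, receipt: int) -> None:
        """Processing failed: the message goes back to the front of the queue."""
        self._ready.appendleft((receipt, self._in_flight.pop(receipt)))
```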

Disadvantages of message queues

Message queues and RPCs do have one feature in common. They are both one-to-one in nature: they transmit information from one sender to one recipient. Like a caller of an RPC, the message sender has some idea about the system for which the message is intended. A queue is not a broadcast mechanism. When one recipient receives the message, it is no longer available for others to pick up. We have additional tools (for example dispatchers) to support one-to-many scenarios.

Message queues create additional operational complexity. Every queue must be created, configured, and monitored. Every sender and recipient must be configured with the location and name of each queue. We have created tools (for example service busses) to manage this complexity, but they do not eliminate it entirely.

Message queuing in historical modeling

Historical modeling puts the idea of the message queue into the model itself. Every fact is potentially a queue. A subsequent fact can be published to this predecessor.

Take, for example, a medical claims processing service.

fact Physician {
    unique;
}

fact Patient {
    unique;
}

fact Visit {
    Physician physician;
    Patient patient;
    date dateOfService;
}

fact Payer {
    unique;
}

fact Claim {
    publish Payer payer;
    Visit visit;
}

In this model, the Payer fact acts as a queue. A Claim is published to that queue. This is indicated in the factual code with the “publish” keyword, and in the diagram with a red arrow.

When the payer processes the claim, it responds with a remittance advice.

fact RemittanceAdvice {
    publish Claim claim;
    decimal amount;
}

The RemittanceAdvice fact is published to the Claim. The practice subscribes to the claim in order to receive the response.

To make the Payer act as a queue, it needs to query for all of the unprocessed claims:

fact Payer {
    unique;

    Claim* unprocessedClaims {
        Claim c : c.payer = this
            where not c.processed
    }
}

The query depends upon the “processed” predicate:

fact Claim {
    publish Payer payer;
    Visit visit;

    bool processed {
        exists RemittanceAdvice a : a.claim = this
    }
}

Adding a RemittanceAdvice causes the “processed” predicate to become true, thus removing the Claim from unprocessedClaims.
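The claims model shows that any fact can act as a queue. A rough Python equivalent uses a generic fact whose `publish_to` field names the predecessor it is published to. `Fact`, `History`, `published_to`, and `unprocessed_claims` are hypothetical names chosen for this sketch. Reading a queue is just a query over facts published to a given root. A new payer, or a per-claim response queue, is simply a new fact.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Fact:
    """Hypothetical generic fact; `publish_to` marks the predecessor acting as its queue."""
    kind: str
    key: str
    publish_to: Optional["Fact"] = None


class History:
    def __init__(self) -> None:
        self.facts: List[Fact] = []

    def add(self, fact: Fact) -> Fact:
        self.facts.append(fact)
        return fact

    def published_to(self, root: Fact, kind: str) -> List[Fact]:
        """Read the queue that `root` represents: facts of `kind` published to it."""
        return [f for f in self.facts if f.kind == kind and f.publish_to == root]

    def unprocessed_claims(self, payer: Fact) -> List[Fact]:
        # Claim c : c.payer = this where not c.processed
        # processed = exists RemittanceAdvice a : a.claim = this
        return [c for c in self.published_to(payer, "Claim")
                if not self.published_to(c, "RemittanceAdvice")]
```

The practice reads its response the same way the payer reads its work: `published_to(claim, "RemittanceAdvice")`. No queue has to be provisioned for either direction.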

Advantages of historical message queues

The historical modeling message queue pattern has some advantages over traditional message queues. Most significantly, the queue is no longer coupled to a physical location. The practice didn’t know the location or name of the payer’s queue. Neither did the payer know the location of the practice. As long as the sender and recipient share a common upstream server, the claims and remittance advice will flow to the interested parties.

We also have the advantage of creating queues on the fly with no operational overhead. We can add a new payer to the system as easily as creating a new object. Each payer’s service subscribes to its own queue, without the need for configuration. And consider the operational nightmare of configuring a new response queue per practice, let alone per claim as we’ve done in this model.

Disadvantages of historical message queues

On the other hand, historical modeling has one significant disadvantage as compared to message queues: it is impossible to ensure that only one recipient handles each message. The two-phase receipt of a traditional message queue lets one service lock a message. Other services pulling work from the same queue will not receive it unless the first service fails. This is an effective technique for load-balancing backend services.

The rules of historical modeling forbid locking. To balance the load among competing backend systems, you must bridge a historical model into a more traditional message queue. The bridge pushes a message onto the queue for each unprocessed fact, and then creates a new fact marking it as received. This bookkeeping fact is not intended for application use, and is typically not published. The historical database and the message queue must be compatible so that they can participate in the same transaction, thus ensuring reliability.
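The bridge can be sketched with Python's built-in `sqlite3` module. This assumes that the historical facts and the traditional work queue share one SQLite database, which is the simplest way to make them "participate in the same transaction". The table names and the `bridge` function are hypothetical. Competing back-end services would then pull from `work_queue` using two-phase receipt, which is not shown here.

```python
import sqlite3

# Hypothetical schema: facts and the traditional queue live in one database,
# so a single transaction covers both writes.
SCHEMA = """
CREATE TABLE claim (id TEXT PRIMARY KEY, payer TEXT NOT NULL);
CREATE TABLE claim_received (claim_id TEXT PRIMARY KEY REFERENCES claim(id));
CREATE TABLE work_queue (seq INTEGER PRIMARY KEY AUTOINCREMENT, claim_id TEXT NOT NULL);
"""


def bridge(conn: sqlite3.Connection) -> int:
    """Enqueue each claim not yet bridged and mark it received, atomically."""
    with conn:  # commits on success, rolls back if anything raises
        pending = conn.execute(
            "SELECT id FROM claim WHERE id NOT IN (SELECT claim_id FROM claim_received)"
            " ORDER BY id"
        ).fetchall()
        for (claim_id,) in pending:
            conn.execute("INSERT INTO work_queue (claim_id) VALUES (?)", (claim_id,))
            # Bookkeeping fact: not for application use and not published.
            conn.execute("INSERT INTO claim_received (claim_id) VALUES (?)", (claim_id,))
    return len(pending)
```

If the process fails partway through, the rollback discards both the queue entries and the bookkeeping facts. A later run therefore neither skips nor duplicates a claim.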

Conclusion

Message queues offer significant advantages over RPCs in distributed systems. Whereas RPCs are synchronous and unreliable, message queues are asynchronous and reliable. They add complexity both to application design and operations, but that complexity can be managed.

Historical modeling supports the concept of a message queue through the “publish” keyword, predicates, and queries. It can ease some of the operational complexity, since it decouples senders and recipients from queue location. And since any fact can be a queue, operational overhead is not incurred to add a new queue. However, historical modeling does not support locking, so additional work is required to implement a load-balancing scenario.

Distributed Systems

Distributed systems are enterprise solutions to large scale business problems running across multiple machines. They typically have several stakeholders, each from different divisions of an organization. Businesses run on distributed systems, and when the system fails, the business suffers.

The challenges in maintaining distributed systems are not just technical. They span several disciplines, including:

  • Project management
  • Business analysis
  • Development
  • Configuration management
  • Operations
  • Database administration
  • Business intelligence

Major business problems include:

  • Identifying the important metrics and indicators to make business decisions
  • Aligning workflows with business processes
  • Allocating ownership and funding for system development and maintenance

Major technical problems include:

  • Getting relevant data to the appropriate system
  • Reducing latency
  • Guaranteeing business continuity in the face of technical outages
  • Ensuring that no data or transaction is lost

Historical Modeling alone does not address all of these concerns. Instead, it works within a framework of thought developed by the brightest minds of the software industry, supported by past experience and ongoing research. This series of articles explores that framework, and the role that Historical Modeling plays within it.

Mainstream guidance and tools

There are large gaps between expert thinking and common practice regarding distributed systems. Mainstream vendors like Microsoft, Oracle, and Force.com provide tools and platforms for a large set of solutions. Those tools and platforms are not intended specifically for distributed systems. When they are inappropriately applied, they often fail. Common failure scenarios include:

  • Lost or duplicated transactions
  • Inability to scale
  • Fragility in configuration and operation
  • Slow or unreliable reporting
  • Misalignment of technical dependencies with business priorities

Mainstream guidance, inappropriately applied, is usually to blame for these failures.

For example, tools like Web Services and Windows Communication Foundation (WCF) lead us into building distributed systems exclusively on remote procedure calls (RPCs). RPCs are point-to-point: the caller knows about one specific recipient. RPCs are synchronous: the caller waits for the recipient to respond. RPCs are unreliable: if the call fails, the caller does not know whether the recipient has received the message. All of these factors conspire to cause lost transactions and fragile systems.

Additionally, tools like relational databases lead us to create a system of record (SOR). The SOR is the authority for all information pertaining to a specific topic. Having an SOR for each domain encourages us to ask the SOR every time we need information about that domain. This puts unnecessary load on the system, making it difficult to scale. Furthermore, when the SOR is unavailable, all downstream business is affected. When an important business system depends upon a less important system of record, technology is misaligned with business.

Finally, conventional enterprise development has taught us to create an enterprise data model (EDM). All of the data needed to run a business is stored in one place, normalized and indexed in one way, and completely interrelated. Updating an EDM in response to an application action sometimes imposes locks on several different tables to guarantee consistency. Reporting against an EDM requires that we join across many tables and run aggregate functions to get the necessary data for decision making. Taken together, this affects the scalability of our system, and the speed of our reports.

Distributed systems theory and practice

None of these mainstream recommendations is incorrect in its own right. They simply cannot be broadly applied, particularly within a distributed system. Industry experts have known about the problems that inappropriate application causes. They have offered several solutions.

Rather than building distributed systems exclusively with RPCs, experts advise us to use message queues where appropriate. A well-placed message queue breaks the point-to-point coupling between components, leading to less fragile systems and better business alignment. It also creates reliable, transactional intermediate storage so that no messages are lost and the system is generally more reliable.

Instead of directly querying the system of record, experts advise that we should separate queries from commands. Command query responsibility segregation (CQRS) is the practice of creating two data stores per domain, one optimized for reads and the other optimized for writes. A background process moves data from the write side to the read side according to a service level agreement (SLA). This allows the system to scale, and still provide quick and reliable reporting.
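CQRS can be sketched in miniature. `WriteStore`, `ReadStore`, and `sync` are hypothetical names. The write side simply records orders, while the read side keeps a precomputed report that a background process brings up to date. Between syncs, the report is stale by design. The SLA governs how stale it may become.

```python
from collections import Counter
from typing import List, Tuple


class WriteStore:
    """Hypothetical write side: optimized for recording commands (an append-only log)."""

    def __init__(self) -> None:
        self.orders: List[Tuple[str, str]] = []  # (order_id, customer_id)

    def submit_order(self, order_id: str, customer_id: str) -> None:
        self.orders.append((order_id, customer_id))


class ReadStore:
    """Hypothetical read side: a precomputed view, so reports never touch the write side."""

    def __init__(self) -> None:
        self.orders_per_customer: Counter = Counter()
        self.position = 0  # how far into the write log this view has caught up

    def sync(self, source: WriteStore) -> None:
        """The background process; in practice scheduled to meet the SLA."""
        for _, customer_id in source.orders[self.position:]:
            self.orders_per_customer[customer_id] += 1
        self.position = len(source.orders)
```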

Finally, rather than always updating state during a transaction, experts sometimes recommend event sourcing. This is the practice of recording an event stream, and using that stream as the source of knowledge. The event stream serves as an audit log, revealing every business operation that has occurred within the system. Furthermore, it serves as an authority. Any view of the system can be recreated by replaying the event stream. Correctly applied, event sourcing leads to more scalable and reliable systems that never lose or duplicate transactions.
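Replaying an event stream is a fold over the events. This minimal Python sketch uses hypothetical event kinds borrowed from the order examples earlier in the series. `Event`, `apply`, and `replay` are illustrative names. Any prefix of the stream reproduces the view as it stood at that point, which is what lets the stream act as both audit log and authority.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Dict, List


@dataclass(frozen=True)
class Event:
    kind: str      # past tense: "OrderSubmitted", "OrderShipped", "PaymentReceived"
    order_id: str


# Status each event kind leaves an order in; unknown kinds raise KeyError.
STATUS_AFTER = {
    "OrderSubmitted": "submitted",
    "OrderShipped": "shipped",
    "PaymentReceived": "paid",
}


def apply(view: Dict[str, str], event: Event) -> Dict[str, str]:
    """Fold one event into the view without mutating the previous view."""
    updated = dict(view)
    updated[event.order_id] = STATUS_AFTER[event.kind]
    return updated


def replay(stream: List[Event]) -> Dict[str, str]:
    """Rebuild the view from scratch: the stream, not the view, is the authority."""
    return reduce(apply, stream, {})
```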
