Skip to main content

On Implementation of Distributed Protocols

· 69 min read

This post concludes the first phase of the state-of-the-art exploration in the scope of milestone M0.1 of the Replica_IO project, namely exploration of selected notable distributed protocol implementations. It shares the main conclusions drawn from exploring 14 different code bases and outlines the key areas of focus for the next steps developing the Replica_IO framework.

note

A companion video is available here.

Exploring Distributed Protocol Implementations

I believe that discovering neat, yet practical, solutions to complicated problems demands serious, deliberate preparation. Clearly, before being able to come up with such solutions, one first needs to acquire a deep understanding of the problem, identify the relevant aspects and requirements. It is also important to learn from prior attempts to deal with the problem. Otherwise, it would be naive to expect any significant advancement beyond the status quo.

Since Replica_IO aims at making a breakthrough in designing and implementing distributed protocols, I decided to start exploring the state of the art by selecting and looking into a number of notable distributed protocol implementations. Although I already had experience implementing such protocols myself, nevertheless, I decided to dive in and see for myself how others had approached this challenge. I wanted to learn from those projects, better understand the typical requirements and difficulties coming up in real-world use cases, and, perhaps, discover some interesting techniques or ideas along the way, as well as to identify the key areas of focus for the next steps.

So I onboarded myself into one code base after the other, as if I were to work on it. I was focused on the general structure of code, node-to-node communication mechanisms, the implementation details of the core protocols ensuring consistency between nodes, as well as mechanisms for monitoring and controlling execution of the protocol. I tried my best to understand how those protocols are structured and implemented. After having explored each of the code bases, I summarized and shared some of my findings. You can find those overviews on this wiki page.

Here is the full list of the code bases, written in different programming languages, that I explored1:

  1. Tendermint Core / CometBFT — a state machine replication engine (written in Go);
  2. etcd Raft — a library for maintaining replicated state machines (written in Go);
  3. AptosBFT — a consensus component supporting state machine replication in the Aptos blockchain (written in Rust);
  4. BFT-SMaRt — a library implementing BFT-SMaRt, a state machine replication system (written in Java);
  5. SmartBFT-Go — a library implementing state machine replication inspired by BFT-SMaRt (written in Go);
  6. Substrate — a framework for building application-specific blockchains (written in Rust);
  7. Lighthouse — an Ethereum consensus client (written in Rust);
  8. Algorand — a blockchain based on the Algorand consensus protocol (written in Go);
  9. Avalanche — a blockchain platform based on the Avalanche consensus protocol (written in Go);
  10. Internet Computer blockchain (ICP) — a general-purpose blockchain system developed by the DFINITY Foundation (written in Rust);
  11. Sui — a smart contract platform based on Narwhal and Bullshark protocols (written in Rust);
  12. Apache ZooKeeper — a distributed coordination, synchronization, and configuration service (written in Java);
  13. Apache Kafka — a distributed event streaming platform implementing a variant of the Raft consensus protocol (written in Java, integrated with Scala);
  14. Cardano — a blockchain platform based on the Ouroboros family of consensus protocols (written in Haskell).

In the subsequent sections, I will share with you some of the observations and conclusions I made while exploring those code bases. I decided to structure the discussion around the following aspects of implementing distributed protocols:

  • complexity: what makes distributed protocols hard to reason about and implement;
  • correctness: how to ensure that the implementation guarantees the requires properties;
  • resource utilization: how to prevent ineffective expenditure of limited computing resources;
  • maintainability: how to manage long-running distributed systems and diagnose issues;
  • flexibility: how to achieve high adaptability, reusability, and evolvability of code.

Complexity

Distributed, fault-tolerant protocols are notoriously hard to implement, and there are justifiable reasons for that. This is primarily because that kind of system consists of largely independent nodes communicating through potentially unstable, unreliable network; some of the nodes may fail in different ways. The protocol is required to tolerate, within a bound, such unfavorable conditions and keep working reliably. More than that, it is supposed to deliver decent performance using limited resources. All this adds a great deal of inherent, essential complexity, which we simply cannot remove without weakening our requirements.

However, when it comes to actually designing and implementing these protocols, there is also another kind of complexity: incidental, non-essential complexity. This kind of complexity, though being closely related, does not strictly belong to the problem. We incidentally introduce it because we are not aware of or fail to recognize a simpler way of solving the problem at hand.

Incidental complexity can start creeping in when trying to understand and interpret a protocol specification2, which is often too far from the realities of software engineering. Simply the way a protocol is specified can misguide the engineer trying to implement it and induce all sorts of difficulties. For example, pseudo-code in scientific papers is often defined in terms of global, unstructured variables and omits concurrency issues.

Implementing a distributed protocol in one of the conventional programming languages, chances are that the implementation will simply employ some general techniques commonly used in that language's ecosystem. Such general techniques may be very powerful and universal, but freely using this unconstrained power and flexibility, we can easily end up with a code base that is very hard to understand and maintain. For example, dealing with concurrency and synchronization using low-level primitives in the implementation of high-level protocol logic clutters the code and multiplies the complexity.

Haste is another great source of incidental complexity. There is always temptation to cut corners, especially when under time pressure. Imprudently copying approaches from elsewhere, adding temporary workarounds and ad hoc patches makes code entangled and poorly structured.

Using advanced features and sophisticated techniques can also add unnecessary complexity. Though this is ambivalent because it can actually help to express the implementation more conveniently and simplify the reasoning about it, but only when the advanced machinery is hidden behind a simple, clear, and easy to use interface.

It is pretty clear that introducing additional complexity is generally bad. But does it really matter? Couldn't we just implement the thing somehow, test it well, and simply tolerate the additional complexity? Well, surely, with rigorous testing, we can be sufficiently confident that our implementation is correct. However, in that case, making a small change, e.g. applying a simple fix to address a major issue discovered later can reportedly3 take months of work. So it would be very hard to further improve, adapt, or reuse such implementation.

We need a structured, yet flexible enough, approach guiding us away from incidental complexity if we wish to avoid wasted efforts and foster innovation in the field. Let's look into more details concerning complexity in implementation of distributed protocols.

Modularity

We can deal with a complex problem, such as implementing a distributed protocol, by dividing it into smaller, simpler problems, solving them individually, and then combining the solutions to finally address the original problem. This is, basically, what modularity is about. In this process, it is crucial how we divide the problem, what kind of pieces we get, and how we combine them back together.

Granularity

First of all, modularity comes in different levels of granularity. Implementing a large component, such as a state machine replication engine, we can define its external dependencies, then split the component into several chunks of functionality and stop there. That is certainly better than having to deal with a complete monolith, but this level of modularity would still be too coarse. Instead, we can continue decomposing the sub-components further until we end up with reasonably small and simple, yet non-trivial, components; this is fine modularity.

All of the explored code bases exhibit some level of modularity. It is quite common to separate concerns by delegating pieces of functionality to external components. This way, most of the code bases clearly separate implementation of the protocol logic from such functionality as communication between nodes, producing and verifying cryptographic signatures, persistent storage, executing transactions on the replicated state, etc. Most of the implementations also separate dispatching of events, such as inbound messages, from their handling; there is typically a component responsible for classifying events and a number of components responsible for handling specific event types. Quite often there are separate components implementing the protocol logic specific to different roles that a node can play, e.g. leader and follower, or modes of operation, e.g. synchronization and normal operation. It is also common to separate different logical stages of the protocol, e.g. creating a proposal, validating a proposal, finalizing the decision. Another common pattern is to have a separate class of component responsible for maintaining state for each of the remote peers the node communicates with.

Some implementations go further and introduce smaller components, e.g. encapsulating the state of each individual proposal or representing the logic of counting votes and determining if there is a sufficient quorum. Nevertheless, there still remain components that are too complicated and hard to follow, so this modularity cannot be considered fine. To combat complexity, we need to learn how to achieve fine modularity.

Decomposition

As one can cut bricks at different angles, so one can decompose components into sub-components in different ways. One way is to focus on the operational aspects, i.e. on how the pieces of implementation are going to be executed. With this approach, components would be primarily organized around actual data and control flow. This has a profound effect on the structure of the implementation.

Focusing on the operational aspects, protocol implementations will tend to be represented as stateful components, or as collections of stateful components, reacting to external events. This naturally induces applying the object-oriented approach to structuring the whole implementation, in which the protocol logic is mostly expressed as modifying pieces of component's state in response to individual inbound events and optionally producing new outbound events.

Although pieces of functionality tend to imply some state, individual events in a distributed system mostly happen as logical consequences of some other, causally related events in the scope of a larger distributed process. Thus structuring the implementation around event handling might not help to clearly express the overall protocol logic.

The way we approach decomposition also greatly impacts such properties of code as coupling and cohesion, i.e. the degree of interdependence between different components and the strength of relationship between the elements inside components, respectively. Loose coupling and high cohesion are generally desirable.

Failing to recognize the significance of implicit logical connections and properly express them often causes higher degree of coupling between components, i.e. entanglement. It is particularly important to distinguish essential and incidental complexity here. Sometimes complications, such as circular dependencies, may occur naturally and represent essential features of the protocol logic, e.g. recursiveness. For example, some internal events should often be treated, for the most part, the same way as equivalent external events. This can be achieved by looping those events back for handling in the protocol implementation.

Organizing components in a more structured way helps to manage dependencies between them, e.g. they can be arranged in layers or hierarchically, chained together in pipelines, etc. For example, in the Algorand implementation, the core logic of the consensus protocol is structured as a hierarchical state machine. The layered approach is well exemplified in the tower networking library, which is used by the Sui implementation. In the Apache ZooKeeper implementation, client requests are processed using a pipeline of request processing components chained together.

The amount of mutable internal state maintained by components also matters. Making components more static can often help to simplify the implementation. For example, in the Sui implementation, most of the consensus-specific components are static in terms of consensus configuration, i.e. instead of supporting reconfiguration directly in those components, they are simply recreated upon reconfiguration.

Many components require certain context or environment, i.e. they depend on some common piece of state or functionality like information about prior communication with the remote peer, access to persistent storage, diagnostic logging, etc. This is usually accomplished by capturing references to the environment inside the component or passing it explicitly. In functional programming, one can represent the environment with a monadic interface. Some programming languages provide special features for that purpose, e.g. contextual parameters in Scala. Interacting with the context from withing a component should be convenient but clearly constrained by the component's interface.

We would like to possibly avoid fragmentation of the core logic and facilitate local reasoning so that it is easier to reason about correctness, especially when introducing changes, without being too much concerned about larger scopes. We need to shift the focus more onto the functional and logical aspects, i.e. what the pieces of implementation achieve and how they ensure the desired outcome, so that the protocol implementation better reflects causal dependencies and logical connections.

Composability

Even a highly modular implementation is not necessarily highly composable, i.e. allowing to easily recombine and reuse its components. It is hard to reuse components that are not composable. Moreover, composability has huge transformative potential: unlocking true power of expressiveness and flexibility, we can push the limits and uncover a new dimension of possibilities for finding better solutions, whether to fix a flaw in an existing implementation or to design and implement something completely new.

Composability primarily emerges from the properties of individual components and the way they can be combined together. It demands components that are not only loosely coupled, but generic as well. It also requires unified means of abstraction and combination that satisfy certain properties, such as closure and associativity, while preserving principal properties of individual components in combination.

All of the explored code bases were meant to implement only a specific protocol or a close family of protocols, except Substrate, which is supposed to support a wide range of protocols. Most of the implementations define abstract interfaces for major components, employing various forms of polymorphism, and apply the dependency inversion principle. This makes components replacable and can help to reduce coupling. Being able to replace a component allows using alternative implementations of the component, e.g. for unit testing. However, most of those dependency inversion interfaces seem to only make sense within a very specific, predefined structure of the whole implementation, i.e. they are mostly about decomposing rather than recomposing. Truly composable components, on the other hand, are those that can be put together in an open-ended range of new, surprising combinations.

Though there are some examples of composability used in the explored code bases. The communication layer in the Sui implementation takes advantage of the layered design of  anemo, a peer-to-peer networking library based on tower: it processes RPC requests from other nodes through pipelines composed out of tower middleware layers provided by the anemo library. The state machine representing the core logic of the consensus protocol in the Algorand implementation consists of uniformly defined event handlers organized in a hierarchy of event routers dispatching events to the corresponding handler.

Asynchrony and concurrency make achieving composability particularly challenging. Components implemented using usual concurrent programming techniques based on lock-based synchronization primitives fall short of composability: a simple combination of individually absolutely correct components may easily fail to ensure consistency or cause a deadlock. Ensuring correctness in this model often requires breaking abstractions and dealing with synchronization directly in an awkward and error-prone way. Alternative concurrent programming techniques, such as software transactional memory (STM) used in the Cardano implementation, can help to overcome these issues without compromising on modularity and composability. More on asynchrony and concurrency in the next section.

Functional programming places a significant emphasis on composability. One of its core principles is to break down programs into smaller, reusable functions, avoiding side effects, that can be easily combined to create more complex functionality. This encourages a more declarative notation, which often results in code that is easier to reason about. The approaches and techniques employed in functional programming, such as immutability, lazy evaluation, monads and effect systems, etc., are therefore a great source of ideas for enhancing composability.

Composability is indispensable for future-proof software solutions. Though this property doesn't necessarily emerge together with modularity; conversely, achieving it may be challenging, especially in the inherently concurrent context of distributed systems. Therefore, we should approach this proactively and deliberately design for composability.

Concurrency

Concurrent programming is a way to structure code into multiple threads of control—concurrent tasks—that can execute concurrently. Observable effects caused by individual tasks can interleave in concurrent execution. Understanding and reasoning about code in concurrent programming requires a more complex mental model compared to sequential programming. Perhaps, nondeterminism is the main source of complexity in concurrent programming: concurrent programs can produce different results depending on the exact timing of external events and task execution.

Concurrent programming is known to be error prone. Concurrent tasks accessing shared resources generally require some form of coordination. Depending on the available mechanisms for interaction and communication between concurrent tasks, there may be different methods of coordinating them and controlling concurrency, e.g. lock-based synchronization primitives, message passing, and software transactional memory. However, properly applying those methods in a nontrivial system often becomes complicated and requires great deal of care. When concurrent tasks happen to interfere with each other in unanticipated ways, subtle issues, such as race conditionsdeadlocks, and resource starvation, may start manifesting themselves.

Concurrent programs with mutable memory shared between threads can suffer from data races. A data race is basically a situation where one thread accesses a memory location whereas another thread can simultaneously perform a conflicting write to that memory location. Preventing data races is not only important to avoid memory corruption; this can also significantly simplify the mental model.

Normally, we can assume sequential consistency in concurrent programs that are free of data races. In essence, a sequentially consistent execution of a concurrent program must be equivalent to some sequential execution, respecting the order and semantics of operations specified in the program. So such executions are linear schedules, each representing a possible concurrent interleaving of the program.

Execution schedules that only differ in the interleaving of operations local to threads of execution, i.e. operations not visible to other threads or externally, are effectively equivalent. Therefore, the number of possible distinct schedules depends on the number of non-local operations in the execution, i.e. operations used to communicate between threads or cause externally visible effects, and it grows exponentially.

Concurrent programming is an effective model of computation, but it is more complex and requires an appropriate approach in order to avoid subtle concurrency issues. Data face freedom is a particularly desired property since it simplifies the model providing sequential consistency. Under that model, reducing the number of non-local operations can greatly help to further simplify reasoning about the concurrent program.

Approaches to Concurrency

Programming languages support different approaches to concurrency; they provide different features and different concurrency mechanisms in their runtimes and ecosystems. The explored code bases are written in the following languages: Java, Go, Rust, and Haskell. Let's have a look at how those code bases approach concurrency, depending on the choice of programming language.

The code bases written in traditional mainstream languages like Java tend to achieve concurrency by explicitly spawning OS threads, which communicate through shared mutable memory and synchronize with lock-based primitives. Those implementations are normally structured into objects exposing thread-safe methods to interact with them concurrently. This approach is well known and established in the industry; it is therefore widely supported in the ecosystems built around those languages. Newer system programming languages like Rust usually provide support for this approach, as well.

This low-level approach gives the programmer a high level of control as it directly reflects how concurrency is actually achieved by the system. On the other hand, it requires a lot of care since properly using low-level synchronization primitives together is tricky and error prone. Moreover, OS threads are relatively expensive, and, therefore, building highly concurrent programs by frequently spawning short-living threads on demand is impractical. Instead, programs are often organized into a small number of long-running threads; though using thread pools can help to achieve more flexibility. Most importantly, as mentioned before, concurrent components synchronized with the lock-based primitives suffer from poor composability.

The Go language has built-in support for concurrency based on preemptive multitasking with lightweight user threads called goroutines. It encourages message-passing style of communication and synchronization between goroutines through blocking, optionally buffered FIFO channels; though it also supports traditional lock-based synchronization. The built-in select statement can be used to combine several channel operations in order to perform a single pseudo-randomly selected operation that is ready to proceed; unless there is a default case, the select statement blocks until at least one of the operations can proceed.

The select statement in Go allows composing multiple potentially blocking operations on channels into a single operation. For that reason, some of the explored code bases, e.g. SmartBFT-Go, occasionally use Go channels in place of traditional lock-based synchronization primitives in order to combine them with channel operations in a single select statement.

Go does not restrict access to shared mutable data by concurrent goroutines, so data races are still possible. Go provides quite limited control over the runtime managing execution of goroutines, thus making fine-tuning and controlling concurrent execution difficult.

The Rust language emphasizes safety without sacrificing performance. Thanks to the ownership model and strong type system, it can effectively ensure at compile time that the code is free of data races. Being a system programming language, Rust supports concurrent programming with OS threads and shared memory, which are useful to optimize performance and for implementing other styles of concurrency, such as message passing. Rust's ownership and type system features prevent accidental sharing of mutable state between threads.

The explored code bases written in Rust primarily rely on the asynchronous programming features of Rust to achieve concurrency. Async Rust can be seen as a form of cooperative multitasking where asynchronous, non-blocking computations are represented with the Future trait (interface). Rust futures are passive, i.e. they have to be actively driven by polling in order to make progress.

Ultimately, asynchronous code in Rust requires some executor function that can drive a future by polling it to completion. There is an open-ended choice of async runtimes in the Rust ecosystem, which provide executors. Tokio is one of the most widely used runtimes in the Rust ecosystem; all of the explored code bases written in Rust are based on it. One can create specialized runtimes, e.g. Sui has a simulator that provides an drop-in replacement for Tokio and supports deterministic, randomized execution.

The async/await syntax in Rust helps writing asynchronous fragments of code very close to normal, synchronous code. The async keyword introduces an async context by constructing a future from the corresponding piece of code; the await expression can be used within an async context to poll another future and yield control if that future is not yet ready to produce a value.

Apart from using the async/await syntax, Rust futures can be composed together using various combinators provided by the futures, tokio, and other crates. In particular, the select macro allows polling multiple futures simultaneously until one of them completes, similar to the select statement in Go; the join macro polls multiple futures to completion. There are also asynchronous channels for asynchronously producing a sequence of values and streams for communication between asynchronous tasks.

Asynchronous Rust is evolving rapidly; thus, it may still lack maturity, has limited documentation and less-established best practices. Many developers find programming in asynchronous Rust quite challenging and sometimes counter-intuitive, e.g. when dealing with cancellation, long-running or blocking operations, and due to the passive nature of futures. Although Rust prevents some concurrency problems like data races, concurrent code is still vulnerable to different types of concurrency bugs (e.g., deadlocks, logic errors, etc.) and requires deep understanding and careful design.

Finally, concurrency in Haskell is based on lightweight user threads. Haskell allows throwing asynchronous exceptions from one thread to another. Handling asynchronous exceptions safely requires great care in critical sections, i.e. when manipulating shared resources. Since Haskell is a purely functional programming language, it does not explicitly support shared mutable memory for communication between threads. One of the mechanisms for normal communication between Haskell threads is MVar, a synchronizing variable, which can act as a synchronized container for shared state or as a one-place channel. Concurrent Haskell prevents data races, but using MVars is susceptible to other concurrency bugs, such as race conditions, deadlocks, etc.

Another mechanism for concurrent communication widely used in the Haskell ecosystem is Software Transactional Memory (STM). STM is an optimistic concurrency mechanism that allows transactions over shared mutable variables (transactional variables or TVars) to be safely composed and atomically executed, without exposing the implementation details. STM transactions can block on an arbitrary condition; alternative STM transactions can be composed together using the orElse combinator. The Haskell type system ensure that STM transactions cannot have undesired side effects and thus are safe to roll back and retry.

Building various custom concurrency abstractions and combinators with STM is relatively easy and safe, thanks to high composability. For instance, in Cardano, concurrent components expose STM transactions for retrieving relevant pieces of their mutable current state; the components then interact by combining and atomically executing such STM queries from other components and atomically updating the corresponding pieces of their own mutable state.4

However, STM has some limitations and caveats. First of all, composable multi-way communication between threads cannot be expressed in STM. That is because STM transactions cannot produce a visible side effect while being blocked. This is closely related to another limitation: STM does not provide fairness for threads waiting in a blocked STM transaction. In contrast, MVars guarantee fair scheduling of threads blocked on the same MVar. STM also incurs some overhead in terms of memory and performance costs, which depends on the size of transactions. Though sometimes it can actually help building more efficient mechanisms. Long-running STM transactions can suffer from starvation, i.e. being repeatedly rolled-back and retried. Finally, Haskell, similar to Go, provides quite limited control over its runtime.

To summarize, the traditional mainstream approach to concurrency based on explicit, low-level synchronization primitives and communication directly through shared mutable memory is well known and established, but it is tricky, error prone, and suffers from poor composability. Restricting concurrent access to shared memory, e.g. with the ownership model as in Rust or immutability as in Haskell, can help preventing data races. Communication and synchronization through message passing primitives like channels and using combinators like select can improve composability. Spawning short-lived OS threads may be too expensive; thread pools and lightweight user thread runtimes can help to achieve more flexibility. Though relying on a concurrency runtime is an additional dependency that is not always replacable or adjustable. Another approach to concurrency with good flexibility and composability is asynchronous programming with cooperative multitasking and async/await syntax, as exemplified by Rust. Software transactional memory is a highly composable and flexible approach to concurrency, though it has some restrictions, additional overhead and does not guarantee fairness.

Evading Concurrency

Given all the challenges with concurrent programming, why not trying to avoid concurrency as much as possible? Some of the explored code bases go quite far this route and implement almost all of the core protocol logic as a completely sequential state machine, perhaps only offloading long-running operations (e.g., computationally intensive cryptography) to dedicated concurrent execution pools. Let's consider consequences of this approach.

First of all, distributed systems are inherently concurrent because they, by definition, consist of multiple nodes running largely independently. Thus, each node needs to handle events (e.g., messages received over the network or expired timeouts) originating from different sources asynchronously, i.e. independent of its main program flow and handling of events from other sources. Moreover, the protocol logic must also reflect the concurrent nature of the system.

So some parts of the protocol are fundamentally sequential, e.g. delivering totally ordered transactions, whereas some parts are fundamentally concurrent, e.g. handling of messages received over the network from different peer nodes. Some parts may be concurrent, but don't have to, e.g. validation of the subsequent messages while finishing processing of the current one.

Attempting to implement an essentially concurrent part of the protocol in a sequential manner, i.e. without using concurrent programming techniques, necessarily requires explicitly maintaining and switching contexts. Not only this adds some amount of boilerplate code and makes it entangled, more importantly, this causes fragmentation of the core protocol logic because such an artificially sequential component still has to multiplex handling of asynchronous events. Therefore, what in concurrent programming could have been naturally expressed as a blocking operation becomes an abrupt return of control, breaking out of the sequential component.

There is another problem with multiplexed handling of asynchronous events in sequential code, namely controlling the flow of events from concurrent sources. Consider a situation where a sequential component is given an event to handle that it cannot yet fully process because, in order to make a decision on how to react to the event, it first needs to handle some other events, e.g. it has to complete the current round of the protocol before participating in a new one. Since the sequential component cannot block waiting and has to return the control back, there are basically two options: dropping the event or putting it aside into some kind of buffer. In the first case, the event source cannot assume that all events it emits will be reliably handled and has to take this into account in its logic, e.g. emit an equivalent event under some conditions later. In the second case, there should be some way to enforce a reasonable bound on the amount of buffered pending events without compromising the protocol properties, e.g. emitting further events only after having received an acknowledgement from the destination. This can add a lot more complexity to the protocol implementation.

So concurrency cannot be easily evaded in distributed systems. Attempting to avoid using concurrent programming techniques complicates the implementation and causes fragmentation of the protocol logic in code. On the other hand, when done appropriately, designing for concurrency and using concurrent programming techniques can actually be advantageous. It boils down to recognizing inherently concurrent and sequential parts of the protocol and finding appropriate ways to express this distinction in code. Those parts of the protocol that are neither inherently concurrent nor sequential may nevertheless benefit from being implemented as concurrent: Designing for concurrency can guide towards better decoupling of components while concurrent execution can help to achieve higher responsiveness and performance.

Nondeterminism

As mentioned in the previous section, distributed systems are inherently concurrent and therefore nondeterministic. If we think of nondeterminism in terms of events happening in the system then it can manifest itself as unpredictable events or their order. For example, requests from external agents (clients, users, etc.), values produced by a random-number generator, or node failures are not known in advance; the same set of messages may arrive at different nodes in different order due to unpredictable delays in communication; timeouts may happen due to unexpectedly long delays. Inner workings of nodes can introduce additional, implementation-specific nondeterminism, e.g. unspecified order of iteration over unordered collections, scheduling of concurrent tasks, etc. To some extend, the purpose of distributed protocols can be seen as confining nondeterminism within certain constraints in order to maintain the required invariants.

Nondeterministic steps in protocol execution introduce alternative state transitions, thus expanding the state space. This complicates reasoning about distributed protocols, as well as implementing and verifying them, because it often requires considering a large number of possible executions. Nondeterministic execution also makes reproducing problems and debugging particularly challenging. Therefore, it is desirable to control nondeterminism or attempt to eliminate it.

Some of the explored code bases constrain nondeterminism by implementing parts of the protocol as deterministic state machines. Inherently nondeterministic aspects, such as time, randomness, and asynchronous operations, are abstracted out of those state machines. Randomness, as well as current time, can be supplied to the state machine through abstract interfaces provided as dependencies. Alternatively, the current timestamp can be supplied to the state machine at each step explicitly in the input. Time can also be represented in terms of an abstract logical clock maintained by the state machine, which is then advanced with special tick events periodically supplied to the state machine. Asynchronous operations can be requested by the state machine by emitting special output events; the result is then supplied back as special input events. This approach is very close to evading concurrency discussed before and therefore is associated with the same kind of disadvantages.

In Haskell, as a strictly typed purely functional programming language, ordinary functions are deterministic (referentially transparent) in the mathematical sense: given the same input, they must produce the same result. Nondeterministic computations are expressed using the monadic interface. Only IO actions, when executed in the IO monad, can cause side effects and produce nondeterministic results. This is enforced by the type system. Cardano takes advantage of this by making most of its code polymorphic in the main IO-like monad. This allows fully controlling nondeterminism by choosing the main monad implementation.

Being able to control nondeterminism is particularly useful for testing and debugging. This allows creating reproducible test environments, as well as discrete-event simulation for faster-than-real-time simulation of time delays. For example, Cardano uses a simulation environment for the IO monad that closely follows core Haskell packages; Sui has a simulator based on madsim that provides an API-compatible replacement for the Tokio runtime and intercepts various POSIX API calls in order to enforce determinism. Both allow running the same code in production as in the simulator for testing.

Nondeterminism is an important aspect of distributed systems, so it should be clearly expressed in the implementation. Type system features can help with that. Confining nondeterminism within natural boundaries of components can reduce complexity and simplify reasoning about the protocol implementation. Simulated execution of unmodified code with controlled nondeterminism is a very effective technique in testing and debugging.

Communication

Communication is at the core of distributed systems where individual nodes need to coordinate in order to act as a coherent system. Nodes in a distributed system interact with each other by exchanging peer-to-peer (P2P) messages. The communication happens over an unreliable network medium that only provides best-effort, unordered delivery of data packets, i.e. it may fail to deliver individual packets or deliver them out of order. Moreover, nodes can fail, and, in general, it may be impossible to determine precisely if a peer node has failed or its messages were simply dropped or delayed in the network. Nodes can also differ in processing power and experience different traffic load. Therefore, it is important to manage the rate of data transmission using flow control mechanisms, as well as to retransmit lost pieces of data. This can contribute significantly to the overall complexity of distributed protocols and their implementation.

Communication Layers

Most of the explored implementations use SSL/TLS over TCP/IP as a transport layer for P2P communication. Establishing a TCP connection takes a few packet round trips over the network. Moreover, operating systems impose limits on the number of open TCP connections per process because they consume system resources. For those reasons, communication layers based on TCP establish long-lived connections with remote peers and try to keeps the number of open connections low. This often means that the transport-level connections have to be multiplexed into multiple logical sub-streams.

Substrate and Lighthouse use libp2p as a networking stack for communication between nodes. The libp2p framework is a versatile modular peer-to-peer networking stack. It provides a collections of abstractions, mechanisms, and protocols for facilitating communication in P2P systems. In particular, libp2p supports multiple transport mechanisms (TCP, QUIC, WebSocket, WebTransport, etc.), encryption schemes (TLS and Noise), and stream multiplexing. Higher-level protocols in libp2p are implemented on top of reliable, ordered, bidirectional binary streams, which are transparently encrypted and multiplexed by the framework.

Communication layer in Sui is based on anemo, a peer-to-peer networking library built on top of QUIC. QUIC is a modern higher-level network transport protocol layered over UDP. It has built-in support for encryption and multiplexing. Similar to TCP connections, QUIC streams are reliable, ordered, bidirectional, providing flow control (backpressure), but they are cheap and almost instantaneous to open once an initial connection is established. The anemo library takes advantage of the efficient stream-multiplexing capability of QUIC; libp2p also uses the built-in capabilities of QUIC when it is used as a transport mechanism.

So there may be several levels of communication abstractions. There are low-level transport protocols like UDP or TCP, medium-level ones like QUIC, and comprehensive high-level networking stacks like libp2p. Higher-level mechanisms can be built on top of lower-level layers. Sometimes, it makes sense to fuse several layers, e.g. QUIC efficiently embeds security into the transport layer. In order to simplify implementation of higher-level layers, it is desirable to take advantage of those properties that are already guaranteed by lower-level layers, e.g. reliable, ordered delivery and flow control provided by commonly used transport layers such as TCP and QUIC.

Styles of Communication

There are different ways to organize communication between nodes. The most common styles of communication in the explored code bases are request-response and fire-and-forget message delivery. The request-response style follows the remote procedure call (RPC) pattern: the initiator node sends a message to the remote node, and the latter is expected to respond back. In the fire-and-forget style, the initiator node unidirectionally sends messages to the remote node without waiting for a response. Another style of communication, which is also often used in the explored implementations, is gossiping, where nodes publish and disseminate pieces of information among themselves in an indirect and random manner. Cardano uses a session-based style of communication, where peers establish continuous bidirectional communication channels and exchange messages according to some stateful communication protocol.

The fire-and-forget message delivery is a very simple style of communication. It does not mandate any acknowledgement from the remote node, so it can only provide best-effort delivery guarantee. Messages that cannot be handled for any reason are often simply dropped, e.g. when a message queue is full. Usually, there is also no guarantee about ordering of messages. Higher-level code needs to take care of such things as flow control, retransmission of lost messages, as well as determining and maintaining the context to handle messages in. On the other hand, this style can be expressed with a non-blocking interface. That allows sending a message to a group of remote nodes at once, which is a simple form of best-effort multi-/broadcasting. Some implementations provide a blocking or asynchronous variant of the interface giving more control over data flow within the local node. For example, in Substrate, the sender should wait until it acquires a free slot in the outgoing message buffer; the slot reservation is then consumed to enqueue a message.

The request-response style is a simple type of session-based communication: sending a request initiates a new session, which normally terminates with reception of the corresponding response. Sessions can terminate abnormally, e.g. upon a timeout. The request-response style demands blocking or asynchronous interface on the sender side since it should wait for and handle the eventual response or error. This provides a context for response messages linking them to the corresponding requests. However, the communication layer treats each individual session independently. More complex patterns of interaction have to be split into a number of one-shot request-response sessions. Multiple sessions may be initiated concurrently, and the communication layer needs to keep track of those one-shot sessions starting, running, and finishing concurrently.

The session-based style of communication is connection-oriented and supports stateful interaction between nodes. Communication sessions are established between individual nodes and represent reliable, ordered, bidirectional message streams. This provides a context for the messages being exchanged between nodes and implies blocking or asynchronous interface. Thanks to reliable and ordered delivery, the context establishes causal relationship between individual messages. Relying on those assumptions can greatly simplify the protocol implementation while taking advantage of the guarantees commonly provided by stream-based transport layers. This style of communication is quite generic and can express many different patterns of interaction. Combined with built-in flow control (backpressure), it is particularly suited for implementing consumer-driven communication. On the other hand, session-based communication cannot directly express multi-/broadcasting primitives and can induce additional latency in certain patterns of interaction. Though higher-level communication mechanisms built on top of session-based communication can implement multi-/broadcasting, whereas using pipelining techniques can help to hide latency and achieve good performance.

The gossip-style communication designates probabilistic broadcasting in a relay network of nodes. It resembles the best-effort broadcasting in the fire-and-forget message delivery style. The key difference is that data in the gossip-style communication can propagate from one node to another in multiple hops rather than being received directly from the source node. This makes it suitable for sparsely connected networks. Therefore, gossip communication can scale well in large networks. It can provide, with high probability, eventual delivery of bounded amount of data under normal network conditions. This style of communication implies a publish-subscribe interface. Similar to the fire-and-forget message delivery, the interface is largely stateless and can be non-blocking. Under the hood, it is often implemented using the advertise-request-response pattern of communication: nodes advertise available pieces of data to their neighbors and exchange with them the missing parts following the request-response pattern. Efficient gossip implementations require adaptive network topology and advanced data dissemination techniques, which can make them fairly complicated.

An interesting example of using the gossip-style communication is artifact pools in the Internet Computer blockchain. Artifact pools in ICP are structured collections of artifacts, generic pieces of data produced by the local replica or received from other nodes. The gossip layer is responsible for synchronizing artifact pools between nodes. Nodes communicate with each other through the artifact pools by adding/removing/moving artifacts to/from/between pool sections. Higher-level code is responsible for artifact validation; it also determines retention and prioritization policies.

It is easy to notice that some styles of communication can be implemented in terms of others. So the request-response style is a reduced from of the session-based communication, which is more generic and expressive. Both can be implemented relying on the fire-and-forget delivery and using some message retransmission and acknowledgement protocol. Or conversely, the fire-and-forget message delivery can be implemented on top of a reliable session-based communication using bounded lossy message queues. Similarly, gossip mechanisms can be implemented using any of the other styles of communication; though the implementations may differ in complexity.

Different styles of communication have different properties that can significantly influence the shape of code built around them. Some of them are strictly more expressive than others, but do not necessarily reduce to an equivalent, because less expressive mechanisms may have more efficient implementations. In order to avoid accidental complexity when implementing distributed protocols, it is important to have a range of communication mechanisms with aligned interfaces and clearly defined properties.

Internal Communication

Apart from interaction between nodes, there is also communication between concurrent tasks within the same node. This internal communication shares some similarity with communication between nodes. The main difference is in the communication medium: while different nodes communicate through unreliable and slow network, internal communication happens through fast and reliable shared memory. Some programming models and techniques make the similarity particularly prominent, e.g. the actor model, communicating sequential processes (CSP), remote procedure calls (RPC), etc.

Any piece of shared memory can act as a communication channel between internal components. Such a channel can be established by simply sharing a reference to the corresponding piece of memory. Internal messages do not need translation into/from a binary representation; they can be simply shared by reference. The request-response style of communication can be implemented as simple invocation of blocking or asynchronous procedures (functions); invoking non-blocking procedures (functions) without a return value corresponds to the fire-and-forget message delivery style. Obviously, such procedures need to be safe for concurrent invocation.

The session-based communication style can be implemented for internal communication using the constructs commonly known as channels (e.g. channels in Go, Tokio channels in Rust) or concurrent queues (e.g. LinkedBlockingQueue and other concurrent queues in Java). Those constructs belong to fundamental mechanisms of communication and coordination between concurrent components. Channels can be buffered or unbuffered (i.e. not buffered). Buffered channels and queues can hold items being sent through them without blocking the sender. In contrast, sending to or receiving from an unbuffered channel acts as a rendezvous point: it synchronizes the sender and the receiver at the point of communication.

Buffered channels and queues that can hold more than a singe item may return items to a receiver in different order. FIFO is the most commonly used ordering policy, in which items are returned in the same order as they were inserted. LIFO is another option, in which the most recently inserted item is the one that is returned first. One can think of many other options such as priority queues etc. The preferred ordering policy would depend on the purpose of communication.

Buffered channels and queues can be bounded or unbounded. The bounded version imposes a hard limit on the amount of items that they can hold. Unbounded channels and queues usually provide a simple non-blocking interface for inserting new items. However, they require some additional mechanism to prevent accumulating indefinite amount of items, e.g. blocking ingress of external events when internal buffers grow above certain threshold or relying on time-based assumptions such as throttling the data flow or imposing expiration time on the items. Such mechanisms can make reasoning about the protocol implementation more complicated.

Bounded channels and queues usually provide blocking or asynchronous interface. They can also support non-blocking insertion of new items, but then they must discard some items when there is no more capacity left. There may be different eviction policies. The simplest one is to discard the item being inserted. Otherwise, the new item is inserted, but some of the buffered items must be discarded, e.g. the least recently inserted one. Similarly to the ordering policy, there may be many other options, and the choice depends on the purpose of communication.

It is also worth mentioning buffered channels with a single-item buffer. They can be convenient for communicating a single item from one concurrent component to another, e.g. sending a response message back to the requester. The oneshot channel in Tokio is a good example of such channel type. CompletableFuture in Java can also be considered a kind of single-item buffered channel, as well as synchronizing variables MVar and TMVar in Haskell. Another interesting example of a single-item buffered channel is the watch channel in Tokio: it always keeps the last value sent to it. The watch channel is useful for watching for changes to a value from multiple concurrent components. Transactional variables (TVars) in Haskell are somewhat similar to watch channels since STM transactions can be suspended until one of the TVars that it has read from has been updated.

Channels and queues often serve as fundamental constructs to implement message passing between concurrent components. They can be used to implement various styles of internal communication and higher-level components. For example, implementations of components for communication between nodes often use channels and queues as internal message buffers.

The publish-subscribe design pattern resembles the gossip style of communication. It can be implemented for internal communication as an event bus or broadcast channel. Same as channels and queues, it can be buffered or unbuffered, bounded or unbounded. Unless messages can be dropped, unbuffered and bounded buffered implementations only support non-blocking publishing/broadcasting of messages if no subscriber blocks.

Similar to communication between nodes, different mechanisms and styles of internal communication have different properties that can significantly influence the shape of code. Therefore, it is equally important to have a range of internal communication mechanisms with aligned interfaces and clearly defined properties. The similarity between mechanisms for internal communication and communication between nodes provides an interesting perspective and can help to come up with better abstractions for communication.

Resilience

Fault-tolerant distributed systems are meant to tolerate (within limits) faults of individual nodes due to crashes, network partitioning, malfunctioning, or even malicious behavior. Crash fault tolerant (CFT) systems, e.g. Apache Kafka and Apache Zookeeper, are relatively simple since they can only withstand node crashes and network partitioning. Byzantine fault tolerant (BFT) systems, e.g. public blockchains, are designed to withstand arbitrary (including malicious) behavior of a fraction of nodes and thus are significantly more complicated. There are two sides of the issue: preventing faulty or malicious nodes from compromising the whole system and recovering failed nodes to rejoin the system.

Theoretically, fault-tolerant distributed protocols are designed so that they guarantee their safely and liveness properties despite the presence of faulty nodes in the system, provided that certain assumptions hold. In practice, those guarantees are only provided if the implementation ensures that the required assumptions actually hold. This is particularly challenging in BFT systems meant to operate in adversarial environments. Nodes in such systems can be subjects to various attacks, such as denial-of-service (DoS) attacks through resource exhaustion. Fairness between peers is another concern since it may also impact resilience.

To mitigate those risks, many implementations maintain reputation metrics for remote peers and apply rate-limiting or throttling techniques. Peer reputation is based on the observable behavior of the peer, such as protocol violations, timeouts, and performance. Nodes normally disconnect from remote peers whose reputation drops below a certain threshold, as well as reject inbound connections from those peers. Conversely, peers with higher reputation may be preferred for communication in sparsely connected systems.

In its threat-aware design approach, Cardano emphasizes detecting protocol violations as early as possible in the operational cycle where the data is available but the least resources have been expended to process the received data5. For instance, block and transaction relaying is interleaved with validation to avoid circulating invalid data in the system. This approach works well with stateful consumer-driven communication between nodes: Inbound messages must be well-formed syntactically and semantically valid in the context of information previously received from the peer node.

In order to allow failed nodes to efficiently restore and safely rejoin the system, some parts of the protocol state can be persisted in a stable storage. This is usually implemented as a write-ahead log (WAL), an append-only stable storage used for crash recovery. Certain events are first recorded in the log before the corresponding actions are taken, e.g. before sending messages to other nodes. This allows the node to restore and continue participating in the protocol from where it stopped, without violating the protocol. Persistence mechanisms are also required to support recovery from a massive system crash, i.e. to provide the durability property.

Early detection of protocol violations is advantageous, and the implementation structure should allow that. There should be a clear path for propagating information about detected protocol violations and other anomalies to adjust peer reputation metrics and take appropriate measures. Persistence mechanisms, such as write-ahead logging, are required for durability, as well as for safe and efficient node recovery.

Optimization

Practical distributed systems require not only reliability but also efficiency. Simplistic designs and implementations unfortunately tend to exhibit poor performance, whereas we would like that our systems scale well and provide decent throughput and latency. Improving those characteristics demands optimization at protocol and implementation levels. Great effort has been put into optimizing distributed protocols during decades of active research. This gave rise to a range of elaborate protocols attempting to achieve ever higher performance. On the implementation level, there also exists a variety of technical means for increasing efficiency. Optimizations, however, often add more complexity and make protocols harder to reason about and implement.

Protocol-level optimizations may involve using more complex communication patterns and topologies. Protocol phase pipelining, i.e. participating with a single message in multiple protocol phases at once, and speculative execution are common techniques to improve responsiveness. Batching, as well as advanced cryptography such as threshold signatures, helps to reduce communication overhead. State-of-the-art protocols are often based on advanced data structures, such as directed acyclic graphs (DAGs). At the implementation level, on-demand execution and caching are often used to avoid performing unnecessary or duplicate operations.

Communication contributes significantly to the overall overhead in distributed systems and, therefore, is a clear target for optimization. Point-to-point protocol pipelining, i.e. continuous sending of requests without waiting for the corresponding responses, can greatly increase performance by hiding high network latency. Widely used transport protocols, such as TCP, tend to perform best under steady data flow. Moreover, keeping multiple network connections consumes additional system resources. Therefore, implementations commonly multiplex multiple logical communication streams through a single network connection. Minimizing head-of-line blocking effects may require flow control mechanisms at the level of individual logical streams; large pieces of data should be transmitted through a multiplexed connection in chunks. Specific kinds of communication, e.g. state synchronization in blockchain systems, can benefit from dedicated, specialized communication mechanisms.

Interaction between concurrent components and across levels of abstraction is also subject to fine-tuning and optimization. Prioritization and flexible policies can help to maximize performance. For example, the system may perform better when certain concurrent tasks or communication paths have higher priority. Internal communication, as well as communication between nodes, may be optimized through prioritization and retention policies applied to individual messages or kinds of messages. This sort of optimization requires deep understanding of the protocol and its inner workings.

Expensive low-level operations, such as spawning threads, blocking on locks, and copying data, can become a hidden cause of suboptimal performance. There are well-known techniques that can help to avoid unnecessary low-level overhead. For example, execution pools avoid the overhead associated with creation and destruction of threads for executing short-lived concurrent tasks; non-blocking algorithms can improve performance by avoiding unnecessary suspension of thread execution; zero-copy techniques focus on eliminating excessive copying of data.

Improving performance characteristics of distributed systems may require nontrivial changes in the underlying protocols and their implementations. The structure of the code should be flexible enough to support such changes. Some optimizations can be confined within boundaries of abstract components, whereas some may require crossing the borders of modularity. Flexible and composable primitives and interfaces designed for optimization would help to fully realize the potential of distributed systems in practice.

Correctness

Correctness is absolutely essential for implementation of distributed fault-tolerant protocols since they are critical for ensuring reliability of the whole system. Formal verification methods allow confirming protocol correctness in terms of desired properties. Applying those methods requires that the protocol is described precisely with a formal specification. Though the way protocols are actually implemented in code tends to be significantly different from the notation used in protocol specifications. This discrepancy is clearly a potential source of errors. There are different methods that can help to acquire higher confidence in correctness of the protocol implementation.

Testing is an established practice to examine correctness of software. Comprehensive testing of complex systems happens at different levels, and modularity of the code supports more effective testing by isolating functionalities, enabling independent unit testing, simplifying integration testing, and promoting code reuse. Some code bases include dedicated interfaces and hooks to facilitate testing; fail points is a technique that allows injecting errors and other behavior at runtime for testing purposes, which is used in Aptos and Sui. In Algorand, each component of the hierarchical state machine implementing the consensus protocol can perform pre- and post-condition checks to validate if it conforms to its contract. Most code bases perform diagnostic logging or tracing that can also be useful for testing, e.g. to check invariants in property-based testing.

Deterministic discrete-event simulation is a powerful technique that can be used for performing randomized but reproducible testing. For example, Sui, Apache Kafka, and Cardano employ this technique. It works by running the code within a special runtime that supports deterministic, randomized execution of concurrent code, as well as faster-than-real-time simulation of time delays. This technique can be used to run an entire network in a single process, with simulated network latency and packet loss. To ensure deterministic execution, the simulation approach usually requires that the code is generic over the sources of local time and randomness; it can also rely on code instrumentation techniques. The key advantage of this approach is that it allows running precisely the same code in the simulator for testing as in production.

Certain correctness properties of code can be ensured statically, i.e. at compile time. Those checks rely on the programming language's type system. Software engineers can take advantage of type safety features to implement components in a way that makes them safe by construction. For example, Cardano uses the typed-protocols package, a generic framework for implementing application-level protocols, which is based on a simple form of session typing.6 Within this framework, protocols are described as state machines encoded into Haskell types. The allowed transitions between states correspond to messages exchanged between the peers, so the protocol state determines which messages are allowed to be sent or must be accepted when received, at type level. This simplifies protocol implementation, allows early detection of protocol violations, and makes the protocols themselves deadlock-free by construction. More advanced type-level programming techniques may allow achieving impressive levels of type safety; however, such code may be significantly harder to implement, understand, and maintain.

Ensuring correctness in distributed systems is a complex task. Protocols and their properties can be formally specified and verified. Expressing the protocol specification and its implementation using possibly similar notations could help to ensure equivalence between the two. Modular and generic structure of code, as well as using various testing support features within the code base, support more effective testing. Supporting deterministic discrete-event simulation is particularly powerful for reproducible randomized testing. Finally, type safety techniques like session types and typestates can eliminate certain kinds of programming errors at compile time.

Resource Utilization

Real computing systems are fundamentally bounded in the amount of available resources. Computers operate with limited computational power, memory, storage, and network bandwidth. Operating systems impose further limits on such system resources as threads, open network connections, file handles, etc. Practical systems are required to prevent resource leaks, as well as to ensure fair and efficient utilization of available resources.

Some resources, such as allocated memory, open file handles and network connections, spawned concurrent tasks and threads, may require explicit actions to release them properly when they are no longer needed. Failing to release resources promptly is known as a resource leak. It can cause resource starvation, slowdowns, and instability in the system. Relying on explicit releasing of acquired resources is known to be error-prone. Automatically releasing resources based on lifetimes and lexical scopes is a more robust form of resource management. Sometimes the encompassing lexical scope's lifetime is longer than the resource's natural life cycle, e.g. when managing concurrent tasks, so that strict lexical scoping becomes inappropriate. In such cases, resources may be managed more explicitly within the scope, but with a fallback mechanism to track resources and ensure that any remaining resource gets released when leaving the scope.7

Concurrency often makes resource management more challenging. First of all, concurrent tasks running in background is a kind of resource that needs to be released when no longer needed. Moreover, they can acquire other resources that should be released when the task is terminated, even in case of asynchronous cancellation. In simple cases, there is a limited number of long-running concurrent tasks, which are responsible for releasing the resources acquired by them, and their termination is synchronized with the main task; short-living jobs can run concurrently on execution pools that distribute those jobs among a number of long-running concurrent tasks. When more flexibility is desired, structured concurrency can help managing concurrent code in a more organized and predictable manner by organizing concurrent tasks into a structured hierarchy with well-defined scopes and lifetimes.8

Individual parts of a distributed system may operate at different pace. Moreover, for performance reasons, it is common to apply pipelining techniques when communicating with remote nodes, i.e. proceed without waiting for a response or acknowledgement form the remote node in order to hide network latency. Compensating for the delays and variability in throughput demands some kind of explicit or implicit buffering, e.g. buffered channels, send/receive queues, pending request trackers, out-of-context message buffers, etc. The amount of buffered state tends to grow under certain conditions, e.g. under heavy load or during network instability. Therefore, there should be some mechanisms to prevent unbounded growth of state without compromising liveness. That can be such mechanisms as backpressure, rate limiting, item expiration and eviction policies, etc.

In adversarial environments, potential DoS attacks through resource exhaustion is a major threat. An adversary may attempt to exhaust node's resources, such as network bandwidth, memory and computational capacity. In order to effectively mitigate such attacks, they should be prohibitively expensive for the attacker relative to the amount of resource consumed from honest participants. Early detection of protocol violations and consumer-driven data flow, as employed in Cardano, can reduce the amount of resources expended by the nodes under attack. It may also be useful to tack resource expenditure caused by processing messages from remote peers, as done in Avalanche, and apply fair throttling to communication channels.

Proper resource management is indispensable in long-running systems. It can be particularly challenging combined with concurrency. Reliable resource management approaches, e.g. based on lifetimes and lexical scopes, as well as structured concurrency, should be, when applicable, preferred to relying on explicit hand-coded releasing of acquired resources. Potential growth of state due to buffering requires mechanisms for ensuring bounded memory usage. Resource exhaustion attacks should be anticipated in adversarial environments and mitigated by minimizing their impact on honest nodes.

Maintainability

Maintenance of distributed systems is challenging. Those systems are usually long-running critical parts of infrastructure with high reliability requirements. They are complex systems consisting of multiple nodes, often operated independently by different entities. Publicly available deployments are also subject to malicious attacks. Thus effective maintenance of distributed systems demands comprehensive mechanisms and tools.

First of all, deploying distributed systems may require specific bootstrapping procedures in order to ensure a secure setup for the whole system. Different distributed protocols may have different requirements and rely on different assumptions for the setup phase.9 Protocol implementations should be clear about the requirements and assumptions for their setup phase. Long-running, highly available distributed systems should be capable of upgrading individual nodes with newer versions of the protocol implementation without disrupting the whole system. This requires designing for backward and forward compatibility. Similarly, failed nodes should be able to recover and rejoin the system safely and efficiently. Moreover, it is also desired that the system is able to safely recover from a massive crash, i.e. provide durability. Therefore, the protocols should be designed and implemented with a clear recovery procedure.

Distributed system administrators need mechanisms and tools for monitoring individual nodes in order to analyze the system and promptly detect anomalies. Developers also need effective mechanisms for analyzing, diagnosing issues, and identifying bugs in protocol implementations. Logging, tracing, and collecting metrics are common observability techniques to allow monitoring and obtaining diagnostic information from the system; most of the explored code bases use these techniques. OpenTelemetry and Prometheus are popular open-source monitoring solutions, which are used in many of the explored code bases.

Diagnostic logging typically refers to emitting and recording chronological textual messages that capture important events happening during the execution of software. Messages in diagnostic logs are traditionally assigned a severity level that can be used to disable logging of messages below a certain severity level, e.g. debug messages. Log messages can support addition of structured data along with a formatted text message, e.g. key-value context fields. Logging can be organized hierarchically, reflecting the structure of components within the system. Messages in hierarchical logging are usually automatically enriched with context from higher-level components.

Tracing is somewhat similar to logging, but it is focused on capturing a detailed view of the flow of execution in the system. Tracing records are primarily structured rather than textual and reflect causal relationships. In particular, distributed tracing is tracking of events caused by processing individual logical operations, such as user requests or transactions, across different components of a distributed system. A distributed trace is associated with a single logical operation and consists of spans linked with causal relationships where each span represents a particular activity within the operation. Spans normally contain structured data describing the corresponding activity and timing information.

Metrics represent numeric measurements that describe the system's behavior over time. Metrics are typically collected and aggregated at regular intervals. They can include various types of information such as CPU and memory utilization, latency, error rates, throughput, queue lengths, etc. There are different kinds or metrics; the most widely used are counter, gauge, and histogram. A counter is a cumulative metric monotonically increasing over time; a gauge expresses the current value of some measurement; a histogram records sampled observations in a statistical representation.

Observability is a cross-cutting concern. Most implementations define abstract interfaces for logging, tracing, and capturing metrics and require them as dependency across components; some use code instrumentation techniques. Cardano uses an interesting approach to implement observability features, called contravariant tracing, in which domain-specific values are provided to domain-agnostic processors. The contravariance property allows domain-agnostic tracers to be adapted and stand in where a domain-specific tracer is required. This discourages using textual encoding for diagnostic logging/tracing in favor of dedicated domain-specific event types. Contravariant tracing can also be used to collect metrics.

Detailed logging and tracing can add significant overhead. When logging a large amount of diagnostic data is expensive, logging can be sampled, producing only a subset of the total messages based on a predetermined sampling rate or criteria. The contravariant tracing incurs zero runtime cost if the program is compiled with tracing disabled; this is possible even when dealing with a tracer which ignores only certain types of events.

Fault-tolerant distributed protocols should be designed and implemented with clear bootstrapping, upgrading, and recovery procedures. Note that upgradability relies on backward and forward compatibility of the implementation. It is also worth considering the durability feature, i.e. the ability to safely recover the system from a massive crash. There should be seamless support for usual observability and diagnostic mechanisms.

Flexibility

Flexible software is able to adapt to changing requirements without having to undergo extensive restructuring. Flexibility is crucial for adoption, reuse, and evolution of code. Each explicit or implicit assumption or requirement imposed on how the code can be used is an additional constraint reducing flexibility. The explored code bases were primarily meant to implement particular protocols or serve specific purposes rather than to address fundamental needs for implementing distributed systems in general. This is a common approach to building software, but it tends to result in rather limited flexibility of the code.

In general, highly modular and composable code is also more flexible. Clear separation of concerns through abstract interfaces and dependency inversion contributes to flexibility by enabling interchangeable components, as well as facilitating easier code modifications and extensions. Flexibility can also be enhanced with generic and configurable components. Generic programming techniques, such as parametric polymorphism, encourage the development of more generic and adaptable components that can be used in different contexts without modification.

The ability to seamlessly integrate into larger systems is another aspect of flexibility required for adoption and reuse of protocol implementations. Since larger systems may opt for different programming languages and runtime environments, it is important to support interfacing with other languages and impose minimal runtime requirements. Rust is particularly suitable to implement robust software components for integrating into other languages and environments due to its rich language features, zero-cost abstractions, predictable performance, safe memory management without a garbage collector, and the ability to use custom concurrency runtimes.

Designing for flexibility promotes adoption, reuse, and evolution of code. Following this approach should be a deliberate choice from the beginning. Avoiding strong constraints, assumptions, and requirements, aiming at modularity and composability with generic and configurable components make for greater flexibility.

Conclusions

I learned a lot while exploring those 14 code bases. I have acquired a much deeper understanding of what is important for a practical distributed protocol implementation and what are the typical challenges there. I have seen different approaches in use and discovered some interesting ideas and techniques scattered around. Though I find the ways distributed protocols are implemented quite unsatisfactory. Even for an engineer experienced in implementing this kind of protocols, most of the the code bases were fairly hard to comprehend and follow. I can imagine how much effort it took and how painful was it to first make them work, as well as to improve them later.

Status Quo

Most of the time, it was rather hard to follow the main protocol, its causal dependencies and logical connections in the code that was presumably structured focusing on the operational aspects, fragmented, entangled, and cluttered. Structuring the protocol implementation directly around simplistic communication mechanisms foregoing reliable and ordered delivery guarantees provided by the transport layers, expressing concurrency and synchronization explicitly in terms of low-level mechanisms based on shared mutable memory and lock-based primitives or attempting to evade concurrency in favor of sequential state machines, all seem to cause fragmentation of the protocol logic across the code base, shift the focus towards operational technicalities, and incur cluttering of the code with boilerplate and hand-coded flow control, context switching, resource management, etc. Invasive ad hoc optimizations, patches, and cross-cutting concerns also contribute to muddling the code. Often insufficient modularity, unclear structure, excessive coupling, and abundance of mutable state complicate the matter further. Using sophisticated techniques and lack of inline documentation present additional obstacles for understanding.

It seems barely possible to fully convince oneself that the majority of the implementations actually correspond closely to the original protocol and guarantee the claimed properties under unfavorable conditions. The way the protocols are expressed in code does not appear anything like formal specification. The ever-present possibility of such subtle issues as race conditions, deadlocks, resource starvation, and, in some languages, data races manifesting themselves in such complicated code bases does not add more confidence. Unconstrained nondeterminism and abundance of non-local operations result in state space explosion, making rigorous test-based verification infeasible. Only a few of the implementations support reproducible testing with deterministic discrete-event simulation of unmodified code.

It also seems unclear how many of the implementations would behave under certain high load conditions, e.g. under a denial-of-service attack. Protocol violations are not always optimally detected at early stages of processing incoming data; many implementations lack mechanisms for propagating information about detected anomalies towards lower communication layers in order to restrict communication with offending nodes. The majority of the implementations employ the push style of communication and forgo flow control mechanisms of transport layers, so individual remote nodes can potentially send arbitrary amounts of data that the receiving node has to deal with in time. Uncomfortably often there are unbounded buffers and queues with unclear mechanisms that could control growth of state. Unreliable explicit hand-coded resource management could cause resource leaks, including concurrent tasks dangling in background.

In terms of observability, most of the protocol implementations rely on simple logging with context fields and collect various metrics. However, this may not provide enough details and context for effective debugging and analysis of the protocol execution.

The explored code bases are quite specific to particular protocols, execution environments, and use cases. Modularity there is rather coarse and most of the components are not meant to be reused or recombined; tight coupling is also not rare. This harms adaptability and reusability of the code, making it inflexible.

We Can Do Better

I think we can do much better. I think we should not waste our efforts reinventing the wheel over and over again and repeating mistakes. Builders better focus on implementing the functionality specific to their solutions without having to figure out how to approach implementing the tricky but critically important distributed protocols. There should be a framework that solves the problem of implementing distributed protocols once and for all, a framework reach with easy-to-use, reliable primitives and components that can be taken as is or mixed and matched as needed, a framework that guides towards robust and understandable code, a framework that supports analyzing, monitoring, testing, and debugging protocol implementations, a framework that is reasonably efficient and can be easily integrated into various environments.

The framework should guide away from incidental, non-essential complexity and allow expressing protocol implementations in clear and understandable code. Protocol implementations should be structured primarily focusing on functional and logical aspects with clear separation of concerns, operational technicalities and sophisticated techniques possibly hidden behind simple and clearly defined abstractions. Fine modularity of reasonably small and simple components expressed in more declarative notation with reduced number of non-local operations should facilitate local reasoning. Components should have minimal internal state, as well as clearly defined requirements, properties, and external dependencies.

Concurrency requires special attention since it is unavoidable, tricky, and can add a great deal of complexity, whereas designing for concurrency can be actually advantageous in terms of code structure and modularity. Working with concurrency should be possibly safe, easy, and efficient. Low-level concurrency mechanisms, such as OS threads, lock-based synchronization primitives, and shared mutable memory, should only be used for implementing the internals of higher-level, safer, and easier-to-use concurrency mechanisms, such as concurrency runtimes. Expressing concurrent parts of the protocol in code should feel as natural as expressing sequential ones. This can be achieved with syntactical means, abstractions, and a concurrency model that recognizes any causally independent operations as potentially concurrent.

Since communication interfaces can greatly affect the structure of code built around them, we need a range of communication mechanisms with aligned interfaces and clearly defined properties. There should be different levels of abstractions for communication. Higher levels should take advantage of the properties already guaranteed by lower-level layers, such as reliable, ordered communication channels with flow control provided by commonly used transport layers. Expressing communication in stateful sessions can help to express causal relationship between individual messages and greatly simplify protocol implementations. Similarity between internal and external communication can suggest better abstractions.

Flexibility is extremely important to make the framework applicable to an open-ended space of use cases. Therefore, we should by all means avoid strong constraints, assumptions, and requirements. The framework should support integration with different programming languages and runtime environments. Its components should be generic and configurable. It should also support backward and forward compatibility. Composability is critical for ensuring great adaptability, reusability, and evolvability. It requires unified means of abstraction and combination. Generic programming techniques, such as parametric polymorphism, can be used to make components generic; functional and asynchronous programming techniques can be great sources of ideas for enhancing composability, particularly with concurrency.

Correctness of distributed protocol implementations should be verifiable, in terms of both safety and liveness properties. Formal verification methods are able to provide rigorous assurance about correctness of protocols and their implementations. However, since formal verification involves exhaustively analyzing all possible states of a system, it may become infeasible for large and complex components. Fine modularity, components amenable to local reasoning, as well as reducing the number of non-local and nondeterministic operations, can help making formal verification more tractable. In order to maintain equivalence between a formally verified protocol specification and its implementation in code, the implementation should be expressed possibly close to the formal specification, preferably using an identical notation. Type safely techniques, such as ownership, typestates, session types, linear and uniqueness types, can greatly help to ensure correctness of the code by making it virtually safe by construction in terms of certain properties. Hybrid approaches combining formal verification with other testing methods can be used to achieve decently high assurance about correctness where purely formal methods become infeasible. Deterministic discrete-event simulation of unmodified code is a particularly powerful technique to complement other verification methods with randomized, reproducible testing. Confining nondeterministic aspects behind abstract interfaces in code and being able to control nondeterminism during simulation is the key for enabling reproducible testing.

Distributed protocols and their implementations should provide strong guarantees even under unfavorable conditions, especially those supposed to be deployed in adversarial environments. The framework should employ a reliable approach for resource management in concurrent code, e.g. based on lifetimes and lexical scopes, structured concurrency. There should be mechanisms for flow control preventing unlimited growth of state and ensuring bounded memory usage. The framework should emphasize threat-aware design. Potential impact of resource exhaustion (DoS) attacks should be minimized with early detection of protocol violations and propagating information about detected anomalies for maintaining peer reputation metrics and taking appropriate measures. For that reason, consumer-driven patterns of communication should be preferred. There should also be mechanisms for safe and efficient recovery of failed nodes from persistent storage. Supporting durability, i.e. safe recovery after a massive system crash, is also desirable.

Protocols and their implementations should be clear about bootstrapping and recovery requirements and procedures. Upgradability requires backward and forward compatibility. There should be seamless support for usual observability and diagnostic mechanisms, such as logging, tracing, and collecting metrics. It may also be useful to provide mechanisms for tracking resource expenditure caused by processing incoming data. In place of simple logging with context fields, it seems advantageous supporting structured distributed tracing using domain-specific trace event types. This kind of tracing could also be suitable for collecting metrics. It is important to minimize incurred overhead when tracing is disabled. Code instrumentation can help to avoid cluttering code with tracing boilerplate.

The framework should provide good performance and support various optimizations, such as speculative and on-demand execution, caching, flexible prioritization policies. To support protocol-level optimizations, the framework should allow expressing complex communication patterns. The communication layer should prevent such undesired effects as head-of-line blocking, optimize data flow and take advantage of the properties provided by the transport layers. Lightweight user threads and non-blocking algorithms allow achieving high concurrency without compromising efficiency. Zero-copy techniques can be used to eliminate unnecessary copying of data.

So we need a structured, yet flexible enough, approach guiding away from incidental complexity towards understandability, fine modularity, and composability. The framework's components should be generic and configurable, allowing local reasoning about the implementation. Expressing concurrency and communication abstractions should be safe and easy, structured and composable. We should be serious about correctness and resilience against unfavorable conditions. The framework should also cater for maintenance needs, provide great observability and diagnostic mechanisms. It should deliver decent performance and allow for various optimizations.

Next Steps

Having explored those implementations of distributed protocol, now it became more clear to me what is worth focusing on while developing the Replica_IO framework. I define the following key areas of focus:

  • simplicity: making protocol implementations well structured and understandable;
  • flexibility: keeping the framework adaptable, widely applicable, and evolvable;
  • reliability: ensuring that protocol correctness is verifiable and the implementation is resilient;
  • efficiency: allowing for various optimizations and delivering good performance;
  • maintainability: catering for maintenance needs and providing great diagnostic mechanisms.

Achieving all of that at once is obviously not realistic. Therefore, the primary focus will be initially put on simplicity, flexibility, and reliability, but without neglecting the remaining aspects. Of particular interest are the matters of structure and notation supporting composability in concurrency and communication mechanisms, as well as controlling nondeterminism.

Exploring distributed protocol implementations was the first phase of the initial state-of-the-art exploration. The next step is to select and examine some existing frameworks for developing distributed protocols in order to find out how they attempt to approach the problem and, perhaps, also discover some interesting techniques or ideas employed there. Then there are some potentially related concepts, approaches, and techniques worth looking into. The exploration tasks are tracked in the scope of this issue on GitHub.

Once the initial exploratory stage is over, it will be time to come up with key ideas concerning core principles that will guide the process of designing and implementing generic components within the framework (milestone M0.1). Then those ideas will be developed into clearly formulated concepts (milestone M0.2), their feasibility will be verified with code (milestone M0.3). After that, prototype, MVP, and production versions of the framework will be developed and released (milestones  M1,  M2, and  M3).

It does not mean at all that exploration, ideation, and prototyping will not take place at later stages; the milestones simply define the framework's general level of maturity. The framework will continuously evolve and expand and at some point become a de facto standard for implementing critical fault-tolerant systems providing a growing collection of easy-to-use reliable and efficient distributed replication mechanisms.

❤️Supporting
If you like the project and find it valuable, please support its further development! 🙏
tip

If you have any thought you would like to share or any question regarding this post, please add a comment here. You are also welcome to start a new discussion or chime in to our Discord server.

Footnotes

  1. If you know of some other implementation that I should have absolutely looked into for some reason, please let me know.

  2. Actually, incidental complexity can start creeping in even earlier, into the way we think about distributed systems, but let's not go into this here.

  3. Zarko Milosevic, CTO at Informal Systems, tells in his invited talk at ConsensusDays 23 how a small protocol change addressing a major issue resulted in months of implementation work on the Tendermint code base.

  4. In his talk "Using STM for Modular Concurrency", Duncan Coutts expands on the approach to concurrency employed by Cardano.

  5. More about the threat-aware design approach in "Introduction to the design of the Data Diffusion and Networking for Cardano Shelley".

  6. The typed-protocols framework was presented in the talk "Well-Typed Communication Protocols" by Duncan Coutts.

  7. ResourceRegistry used in Cardano is an example of a fallback mechanism based on lexical scoping for preventing resource leaks.

  8. Nathaniel J. Smith elaborates on structured concurrency in great detail in his blog post "Notes on structured concurrency, or: Go statement considered harmful".

  9. This post discusses the setup phase in distributed systems.