Transforming non-existing datasets


There’s an old trick that never quite gets old: you run a high-velocity exercise that generates a massive amount of traffic through some sort of multi-part system, where some of those parts are (spectacularly) getting killed and periodically recovered.

TL;DR: a simple demonstration that does exactly that (see the detailed comments inside the scripts):

Script                   Action
cp-rmnode-rebalance      taking a random node to maintenance when there’s no data redundancy
cp-rmnode-ec             (erasure coded content) + (immediate loss of a node)
cp-rmdisk                (3-way replication) + (immediate loss of a random drive)

The scripts are self-contained and will run with any aistore instance that has at least 5 nodes, each with 3+ disks.

But when the traffic is running and the parts are getting periodically killed and recovered in a variety of realistic ways, you would probably want to watch it via Prometheus or Graphite/Grafana. Or, at the very least, via ais show performance – the poor man’s choice that’s always available.

for details, run: ais show performance --help

Observability notwithstanding, the idea is always the same – to see whether the combined throughput dips at any point (it does), and by how much and for how long (it depends).

There’s one (and only one) problem though: vanilla copying may sound dull and mundane. Frankly, it is totally unexciting, even when it coincides with all the rebalancing/rebuilding runtime drama behind the scenes.

Copy

And so, to make it marginally more interesting – but also to increase usability – we go ahead and copy a non-existing dataset. Something like:

$ ais ls s3
No "s3://" matching buckets in the cluster. Use '--all' option to list _all_ buckets.

$ ais storage summary s3://src --all
NAME             OBJECTS (cached, remote)
s3://src                  0       1430

$ ais ls gs
No "gs://" matching buckets in the cluster. Use '--all' option to list _all_ buckets.

$ ais cp s3://src gs://dst --progress --refresh 3 --all

Copied objects:              277/1430 [===========>--------------------------------------------------] 19 %
Copied size:    277.00 KiB / 1.40 MiB [===========>--------------------------------------------------] 19 %

The first three commands briefly establish non-existence – the fact that there are no Amazon or Google buckets in the cluster right now.

The ais storage summary command (and its close relative, ais ls --summary) will also report whether the source is visible/accessible and will conveniently compute object numbers and sizes (not shown).

But because “existence” may come with all sorts of connotations, the term we use is: presence. We say “present” or “not present” in reference to remote buckets and/or the data in those buckets, where the latter may be currently present in part, in whole, or not at all.

In this case, both the source and the destination (s3://src and gs://dst, respectively) were ostensibly not present, and we just went ahead and ran the copy with a progress bar and a variety of list/range/prefix selections and options not shown here (see --help for details).

Transform

From here on, the immediate and fully expected question is: transformation. Namely, whether it’d be possible to transform datasets – not just copy, but also apply a user-defined transformation to a source that may be (currently) stored in the AIS cluster, or may not be, or not entirely.

Something like:

$ ais etl init spec --name=my-custom-transform --from-file=my-custom-transform.yaml

followed by:

$ ais etl bucket my-custom-transform s3://src gs://dst --progress --refresh 3 --all

The first step deploys user containers on each clustered node. More precisely, the init-spec API call is broadcast to all target nodes; in response, each node calls the K8s API to pull the corresponding image and run it locally and in parallel – but only if the container in question has not already been deployed.

(And yes, ETL is the only aistore feature that does require Kubernetes.)

Another flavor of ais etl init command is ais etl init code – see --help for details.

That was the first step – the second is virtually identical to copying (see the previous section). It’ll read the remote dataset from Amazon S3, transform it, and place the result into another (e.g., Google) cloud.

As a quick aside, anything that aistore reads or writes remotely, it also stores. Storing is always done in full accordance with the configured redundancy and other applicable bucket policies; and, secondly, all subsequent access to the same (previously remote) content gets terminated inside the cluster.

Despite node and drive failures

The scripts above periodically fail and recover nodes and disks. But we could also go ahead and replace the ais cp command with its ais etl counterpart – that is, replace dataset replication with dataset (offline) transformation, while leaving everything else intact.

We could do even more – select any startable job:

$ ais start <TAB-TAB>
prefetch           dsort              etl                cleanup            mirror             warm-up-metadata   move-bck
download           lru                rebalance          resilver           ec-encode          copy-bck

and run it while simultaneously taking out nodes and disks. It’ll run and, given enough redundancy in the system, it’ll recover and keep going.

NOTE:

The ability to recover is much more fundamental than any specific job kind that’s already supported today or will be added in the future.

Not every job is startable. In fact, the majority of the supported jobs have their own dedicated API and CLI, and there are still other jobs that run only on demand.

The Upshot

The beauty of copying is in the eye of the beholder. But personally, a big part of it is that there’s no need to have a client. Not that clients are bad, I’m not saying that (in fact, the opposite may be true). But there’s a certain elegance and power in running self-contained jobs that are autonomously driven by the cluster and execute at (N * disk-bandwidth) aggregated throughput, where N is the total number of clustered disks.

At the core of it all, there’s a process whereby all nodes, in parallel, run reading and writing threads on a per (local) disk basis, with each reading thread traversing the local, or soon-to-be local, part of the source dataset. Whether it’s vanilla copying or a user-defined offline transformation on steroids, the underlying iterative picture is always the same (sketched right after the list below):

  1. read the next object using a built-in (local or remote) or etl container-provided reader
  2. write it using built-in (local or remote), or container-provided writer
  3. repeat
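To make the shape of that loop concrete, here’s a minimal, hypothetical Go sketch – the interfaces and names (Reader, Writer, copyOrTransform) are illustrative and not aistore’s actual types:

// Hypothetical per-disk worker: not aistore's code, just the shape of the loop.
package worker

import "io"

// Reader yields the next object from a local disk or a remote (cloud) source,
// possibly via an ETL container; Writer persists or forwards the result.
type Reader interface {
    NextObject() (name string, body io.ReadCloser, err error)
}

type Writer interface {
    WriteObject(name string, body io.Reader) error
}

// copyOrTransform iterates this worker's share of the source dataset:
// read the next object, optionally transform it, write it out, repeat.
func copyOrTransform(src Reader, dst Writer, transform func(io.Reader) io.Reader) error {
    for {
        name, body, err := src.NextObject()
        if err == io.EOF {
            return nil // done with this worker's share
        }
        if err != nil {
            return err
        }
        r := io.Reader(body)
        if transform != nil {
            r = transform(r) // user-defined offline transformation (ETL)
        }
        werr := dst.WriteObject(name, r)
        body.Close()
        if werr != nil {
            return werr
        }
    }
}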

Parallelism and autonomy always go hand in hand. In aistore, location rules are cluster-wide and universal. Given the identical (versioned, protected, and replicated) cluster map and its own disposition of local disks, each node independently decides what to read and where to write it. There’s no stepping over, no duplication, and no conflicts.
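For illustration, here’s a minimal sketch of rendezvous (HRW-style) hashing – the kind of deterministic, cluster-map-driven mapping that lets every node compute object placement independently; aistore’s actual implementation differs in the details:

package location

import (
    "crypto/sha256"
    "encoding/binary"
)

// ownerOf returns the target that "owns" the object: the node with the highest
// hash of (object name, node ID). Every node computes the same answer from the
// same cluster map -- no coordination, no duplication, no conflicts.
func ownerOf(objName string, nodeIDs []string) string {
    var (
        best      string
        bestScore uint64
    )
    for _, id := range nodeIDs {
        sum := sha256.Sum256([]byte(objName + "|" + id))
        score := binary.BigEndian.Uint64(sum[:8])
        if best == "" || score > bestScore {
            best, bestScore = id, score
        }
    }
    return best
}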

Question to maybe take offline: how to do the “nexting” when the source is remote (i.e., not present)? How to iterate a remote source without loss of parallelism?

And so, even though it ultimately boils down to iteratively calling read and write primitives, the core process appears to be infinitely flexible in its applications.

And that’s the upshot.

References

High performance I/O for large scale deep learning

Abstract – Training deep learning (DL) models on petascale datasets is essential for achieving competitive and state-of-the-art performance in applications such as speech, video analytics, and object recognition. However, existing distributed filesystems were not developed for the access patterns and usability requirements of DL jobs. In this paper, we describe AIStore, a highly scalable, easy-to-deploy […]

Four decades of tangled concerns

Numbers don’t lie. Take any storage stack – local or distributed, eventually consistent or ACID-transactional, highly available or otherwise. Ask an innocent question: how does it perform? The benchmarks – if they are current, valid, and most importantly, published – will tell only a part of the story.

In reality, an infinitesimally small part. Consider the following, very modest, example with comments below:

tab1

(*) To get an idea of scope and diversity of the performance tunables, let’s see some popular examples:

In all cases, the number of tunables fluctuates anywhere between 20 and 40. On top of any/all of the above there’d often be a storage transport, with its own 10 or 20 client- and server-side knobs.

(**) The knobs themselves will frequently have continuous ranges. The most popular methods to enumerate a continuous range include divide-by-a-constant and divide-by-a-power-of-two. If these two don’t help, we could just go ahead and apply Latin Hypercube Sampling – brutal, but still better than citing a single default and accompanying it with a stern warning to change it at your own risk, etc.
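A minimal sketch of Latin Hypercube Sampling over d knobs normalized to [0, 1) – illustrative only, with hypothetical names:

package lhs

import "math/rand"

// Sample returns n points in [0,1)^d such that, in every dimension,
// each of the n equal-width strata contains exactly one point.
func Sample(n, d int, rng *rand.Rand) [][]float64 {
    points := make([][]float64, n)
    for i := range points {
        points[i] = make([]float64, d)
    }
    for j := 0; j < d; j++ {
        perm := rng.Perm(n) // random assignment of strata to points
        for i := 0; i < n; i++ {
            points[i][j] = (float64(perm[i]) + rng.Float64()) / float64(n)
        }
    }
    return points
}

Each sampled point can then be mapped from [0,1) back to the actual knob ranges.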

(***) As for the workloads, on the most basic level they are defined by: synchronous/asynchronous and random/sequential permutations as well as read/write ratios and (application-defined) transfer sizes. They also include specified numbers of worker threads, protocol-specific containers and objects per container, and depth of the container hierarchy (if applicable).

Using those primitives as a starter, and taking into account that read/write ratios are often applied at 25% increments, that sequential write is different from sequential rewrite, and that O_DSYNC is different from NFS fsync – we combine all of this and come up with estimates. Unsurprisingly, they will all be bigger than the 32 from the table above, by at least a couple of orders of magnitude.

However: this presumably corrected workload number (whatever it is) would still be a far, far cry from full workload characterization – because the latter includes I/O burstiness, spatial and temporal localities, size of the working set, compress-ability and deduplication-ability.

Moreover, each of the resulting workloads must be cross-tested across a massive variety of influential environmental factors: on-disk layouts of allocated blocks/chunks/shards, the presence of snapshots and clones and their numbers, the depth of the metadata hierarchy and its distribution, the raw bit error rate as well as its post-ECC BER component. Many of these factors accumulate over time, thus adding to the condition called (quite literally) – aging.

But there is more.

(****) Constant traffic creates a new reality. If you have lived long enough and seen your share of performance charts, you might have noticed that a 10-minute interval may look strikingly different – before and after a couple of hours of continuous workload. This nagging (unconfirmed) observation has ample evidence – the horror stories posted on the web by unsuspecting users, multi-hour testing recommendations from the vendor community, and extensive multi-year studies:

(*****) “One would expect that repeated, carefully controlled experiments might yield nearly identical performance results but we found otherwise,” – write the authors of the FAST’17 paper, correctly identifying the widespread, albeit naive, trust in technological determinism.

But even though every single ‘if’ and ‘for’ statement is ostensibly quite deterministic, there is often no determinism at all when it comes to massively complex systems. Tiny variations in the order of execution, the environment, or the workload produce dramatically different performance results. Anecdotal evidence abounds with examples where, for instance, creating small files in a slightly different order registers a 15–175x slowdown, etc.

The noise, the variance, the non-reproducibility of the common benchmarks drive the only available inference: the process of measuring storage performance is genuinely stochastic. As such, it must be accompanied by first and second moments along with confidence intervals.

It is difficult, however, to have at least 95% confidence when the sample size is below 100. It is, in fact, fairly impossible. Which means that the very last number in the table above – the 10 runs – must be considered totally inadequate, much like all the previously discussed numbers.
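For a rough sense of why small samples undermine confidence – a standard normal-approximation sketch, not tied to any particular benchmark:

\text{95\% CI half-width} \;\approx\; 1.96\,\frac{\sigma}{\sqrt{n}}
\qquad\Rightarrow\qquad
n = 10:\ \approx 0.62\,\sigma, \qquad n = 100:\ \approx 0.20\,\sigma

In other words, the interval shrinks only as 1/\sqrt{n}; a 10-run sample leaves it wider than half a standard deviation.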

(As a corollary, a single run that comes in below expectations is a fluke and an outlier. Always a silver lining.)

Clustered CDF

Different sources cite different numbers. For instance, the already mentioned FAST’17 study compares three popular local filesystems. According to this research, the total benchmark time ranges anywhere between 10¹⁵ and 10³³ years (per filesystem). Which, incidentally, exceeds the age of the universe by at least 4 orders of magnitude.

(The good news, though, is that, given enough hardware, the time to evaluate the storage stack performance can be mitigated.)

Scale is part of the problem. Suppose we have a server that 99% of the time handles requests with latency <= A. Now compare the two latency CDFs, for a single server (blue) and for 100 identical servers (red):

fig1

In a 100-node cluster, the odds of observing a greater-than-A latency skyrocket to (1 – 0.99¹⁰⁰) ≈ 63.4%. For industry-grade five nines and a 1000-node cluster, the same exercise gives ≈ 0.995%. Generally, the so-called tail latency becomes a real issue at scale, even when none of the specific standalone tails is fat, long, heavy, or otherwise out of whack. Thus corroborating the old adage that anything that can possibly go wrong, does – with ever-growing chances.
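Spelled out, assuming independent, identically behaving nodes:

P(\text{at least one of 100 nodes exceeds } A) \;=\; 1 - 0.99^{100} \approx 63.4\%,
\qquad
1 - 0.99999^{1000} \approx 0.995\%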

Inherence

In light of the above, it should be no wonder that the performance-related discussions typically sound too open-ended at best, ambiguous or even hostile, at worst. Personally, I believe that the only way to cope with the associated unease is to state, and accept, the following:

The performance of a qualified storage stack cannot be known. (By qualified, I mean any stack that stores at least one petabyte in production – which seems like a reasonable threshold today – and that is used for/by mission-critical applications requiring low latency.) The stack’s performance is inherently unknowable.

The word “inherence”, by the way, originates from the Empedocles’ idea that the qualities of matter come from the relative proportions of each of the four elements: earth, water, air, and fire. This idea, as we know today, does not describe matter correctly, much like the still prevalent view that a storage system consists of four components: a controller attached to its memory and a target attached to its disk…

COPs

The scale of the cluster, the size of the working set, the number of concurrently-active tiers – all these factors exponentialize the complexity of the software/hardware constructions. Freeze all of the above – and what will remain is (only!) a parameter space of all possible workloads and all valid configurations.

As shown above, the parameter space in question is enormous – infinite, for all intents and purposes. Which is unfortunate, but maybe not the end of the world – if we could devise an analytical model or framework, to compute/estimate the stuff that we can never test. This model would, potentially, include a DAG for each IO request type, with edges reflecting causal and/or precedence relationships between the request’s parent and children (and their children) – at various stages of the IO execution.

It would also include inter-DAG causal and precedence relationships between the concurrent IOs within a context of a single transaction which, depending on the semantic model of the storage protocol, may or may not possess some or all ACID properties. (As well as inter-transactional relationships, etc.)

Further, any given IO DAG will be spanning local (virtual and physical) memory hierarchies, local (virtual and physical) disks, and – in the distributed-storage case – remote servers with their own layers of volatile and persistent caches, memories, journals, and disks.

As such, this DAG would be connecting multiple cross-over points (COPs) where the IO parent and its children belong to different domains: CPU caches vs. RAM, user vs. kernel, virtual vs. physical, fast memory (or disk) vs. slow memory (or disk), etc. In a simplified model/framework, every single COP becomes a queue with consumers and producers having different resources and executing at vastly different rates – from nanoseconds (CPU caches, RAM) to milliseconds (TCP, HDD).

While bottlenecks and SPOFs are often in-your-face obvious and even documented, much of the performance trouble is subtle and elusive – sinister if you will. Much of it lies in and around those COPs – and here are some of the (maybe) less obvious reasons:

  • the number of simultaneously existing COPs is proportional to the (extreme) heterogeneity of the volatile and persistent tiers “multiplied” by the scale/velocity/volume of the concurrent IOs;
  • without designed-in deterministic mechanisms – for instance, resource reservations in the data path – it is extremely difficult to keep in-check utilizations on both sides of each logical COP;
  • none of the popular storage protocols utilize resource reservations in the data path (yet).

In addition, there are the usual overheads: queuing overhead, interrupt-handling overhead, polling overhead, data copying overhead, context switching overhead, locking-of-the-shared-resources overhead, etc. All of these overheads “consolidate” in and around the edges of each and every COP.

To conclude this line, I’ll illustrate the importance of keeping utilization in check. There are many ways to do that. Consider, for example, a queue that “connects” a Markovian producer with a single server – the Pollaczek–Khinchine formula:

fig2
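For reference, one common form of the Pollaczek–Khinchine mean-value result for an M/G/1 queue (presumably what the figure shows; L is the mean number in the system):

L \;=\; \rho + \frac{\rho^{2}\,(1 + C_s^{2})}{2\,(1 - \rho)},
\qquad
\rho = \lambda\,\mathbb{E}[S], \quad
C_s^{2} = \frac{\operatorname{Var}(S)}{\mathbb{E}[S]^{2}}

Here \lambda is the arrival rate and S the service time; as the utilization \rho approaches 1, L blows up.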

Expectedly, at high utilizations the queue length L and, therefore, the waiting time approach infinity. The formula works for an M/G/1 queue – and not for an M/G/k queue (let alone a G/G/k queue). It is also only a single queue connected to a single “server” – and not an entire hypothetical super-multi-queued DAG where the arrivals and service times are non-deterministic and non-Markovian.

Combinatorial Explosion

fig3

The only way known to humanity to deal with exponential complexity is to decompose things into fairly isolated modules/components, and design/implement – or, better, reuse – them one by one, one at a time. Modular programming, SEDA, multi-tier architectures, workflow systems, normalized systems, microservices architecture – all that.

“Let me try to explain to you” – wrote Dijkstra in the essay called On the role of scientific thought “what to my taste is characteristic for all intelligent thinking. It is, that one is willing to study in depth an aspect of one’s subject matter in isolation for the sake of its own consistency, all the time knowing that one is occupying oneself only with one of the aspects <snip> It is what I sometimes have called the separation of concerns, which, even if not perfectly possible, is yet the only available technique for effective ordering of one’s thoughts”

Today, 43 years later, a logical question to ask would be: what’s modular or pluggable about the existing storage stacks, and how do the best of designs address the combinatorial effects of (environment, workload, configuration) changes multiplied by the passing of time (and therefore, changing = aging)?

Not shockingly, the answer will depend on who you ask. If you ask google, for instance, search results may appear to be limited, in a sense.

And so, here’s my final point. It may sound controversial, at first glance. Outrageous, at the second. But here it is:

Is SoC itself – a good thing? After all, when we separate IO performance from other vital concerns, such as:

  • data consistency;
  • fault tolerance;
  • data protection and security;
  • usability;
  • maintain-ability and upgrade-ability;
  • features A, B, and C;
  • protocols D, E, and F;
  • APIs X, Y, and Z

when we do all that (separation), don’t we also, inadvertently, de-prioritize some concerns over the others?

And once de-prioritized, doesn’t the concern sort of vanish?

Well, think about it. Perhaps there will be answers, in due time. Until then, what remains for the prospective users (aka prospects) is – walking up and down the marketplace, being slightly dazzled by the variety, and wondering what kind of a package deal they’ll end up having…

fig4

Parallel optimization via multiple neural networks

When training a neural network, it is not uncommon to have to run through millions of samples, with each training sample (Xi, Yi) obtained by a separate evaluation of a system function F that maps ℜⁿ ⇒ ℜ¹ and that, when given an input Xi, produces an output Yi.

Therein lies the problem: evaluations are costly. Or slow, which also means costly, and includes a variety of times: the time to evaluate the function Y = F(X), the time to train the network, the time spent not using the trained network while the system is running, etc.

Let’s say, there’s a system that must be learned and that is already running. Our goal would be to start optimizing early, without waiting for a fully developed, trained-and-trusted model. Would that even be possible?

Furthermore, what if the system is high-dimensional, stateful, non-linear (as far as its multi-dimensional input), and noisy (as far as its observed and measured output)? The goal would be to optimize the system’s runtime behavior via controlled actions after having observed only a few, or a few hundred, (Xi, Yi) pairs. The fewer, the better.

The Idea

In short, it’s a gradient ascent via multiple neural networks. The steps are:

  • First, diversify the networks so that each one ends up with its own unique training “trajectory”.
  • Second, train the networks, using (and reusing) the same (X, Y) dataset of training samples.
  • In parallel, exploit each of the networks to execute gradient ascents from the current local maximums of its network “siblings”.

As the term suggests, gradient ascent utilizes gradient vectors to ascend – all the way up to the function’s maximum. Since maximizing F(X) is the same as minimizing -F(X), the only thing that matters is the subject of optimization: the system’s own output versus, for instance, a distance from the corresponding function F, which is a function in its own right, often called a cost or, interchangeably, a loss.

In our case, we’d want both – concurrently. But first and foremost, we want the global max F.

The Logic

The essential logic goes as follows:

The statements 1 through 3 construct the specified number of neural networks (NNs) and their respective “runners” – the entities that asynchronously and concurrently execute NN training cycles. Network “diversity” (at 2) is achieved via hyperparameters and network architectures. In the benchmark (next section) I’ve also used a variety of optimization algorithms (namely Rprop, ADAM, RMSprop), and random zeroing-out of the weight matrices as per the specified sparsity.

At 4, we pretrain the networks on a first portion of the common training pool. At 6, we execute the main loop that consists of 3 nested loops, with the innermost 6.1.1.1 and 6.1.1.2 generating new training samples, incrementing the numevals counter, and expanding the pool.
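Since the original listing is not reproduced above, here is a rough, hypothetical Go-flavored sketch of the flow just described; the names (runner, Train, AscendFrom, randomInput) are illustrative, and the real implementation runs the runners concurrently:

// Hypothetical sketch of the pretrain + main-loop structure; not the original listing.
package sketch

type sample struct{ X, Y []float64 }

type runner interface {
    Train(pool []sample)                  // (re)train this network on the shared pool
    LocalMax() []float64                  // current best X found by this network
    AscendFrom(start []float64) []float64 // gradient ascent starting from a sibling's maximum
}

func optimize(F func(x []float64) float64, runners []runner, budget int) {
    var (
        pool     []sample
        numevals int
    )
    // 4: pretrain on a first portion of the common training pool
    for ; numevals < budget/4; numevals++ {
        x := randomInput()
        pool = append(pool, sample{X: x, Y: []float64{F(x)}})
    }
    for _, r := range runners {
        r.Train(pool)
    }
    // 6: main loop -- each runner ascends from a sibling's local maximum;
    // every ascent costs one new evaluation (6.1.1.1, 6.1.1.2) and expands the pool
    for numevals < budget {
        for i, r := range runners {
            start := runners[(i+1)%len(runners)].LocalMax()
            x := r.AscendFrom(start)
            pool = append(pool, sample{X: x, Y: []float64{F(x)}})
            numevals++
            r.Train(pool)
        }
    }
}

func randomInput() []float64 { return nil } // placeholder for sampling new inputs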

That’s about it. There are risks though, and pitfalls. Since partially trained networks generate gradients that can only be partially “trusted,” the risks involve mispartitioning of the evaluation budget between the pretraining phase at 4 and the main loop at 6. This can be dealt with by gradually increasing (multiplicatively decreasing) the number of gradient “steps,” and monitoring the success.

There’s also a general lack of convexity of the underlying system, manifesting itself in runners getting stuck in their respective local maximums. Imagine a system with 3 local maximums and 30 properly randomized neural networks – what would be the chances of all 30 getting stuck on the left and/or the right sides of the picture?

What would be the chances for k >> m, where k is the number of random networks and m is the number of local maximums?
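A back-of-the-envelope estimate, under the simplifying assumption that each network independently ends up in one of the m basins with equal probability:

P(\text{all } k \text{ networks miss the global maximum}) = \Bigl(\frac{m-1}{m}\Bigr)^{k},
\qquad
m = 3,\ k = 30:\ \Bigl(\tfrac{2}{3}\Bigr)^{30} \approx 5 \times 10^{-6}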

The Benchmark

A simplified and reduced Golang implementation of the above can be found on my github. Here, all NN runners execute their respective goroutines using their own fixed-size training “windows” into a common stream of training samples. The (simplified) division of responsibilities is as follows: runners operate on their respective windows, centralized logic rotates the windows counterclockwise when the time is right. In effect, each runner “sees,” and takes advantage of, every single evaluation, in parallel.

A number of synthetic benchmarks are available and widely used to compare global optimization methods. This includes Hartmann-6, featuring multiple local optima in a 6-dimensional unit hypercube. For the number of concurrent networks varying between 1 and 30, the resulting picture looks as follows:

The vertical axis on this 3-dimensional chart represents the distance from the global maximum (smaller is better), the horizontal axis – Hartmann-6 calls (from 300 to 2300), the “depth” axis – number of neural networks. The worst result is for the {k = 1} configuration consisting of a single network.
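For reference, Hartmann-6 has the standard form below, where α, A, and P are the usual published constants (A and P are 4×6 matrices, omitted here):

f(x) = -\sum_{i=1}^{4} \alpha_i \exp\Bigl(-\sum_{j=1}^{6} A_{ij}\,(x_j - P_{ij})^{2}\Bigr),
\qquad x \in [0,1]^{6}, \quad \alpha = (1.0,\ 1.2,\ 3.0,\ 3.2)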

Evolution

There’s an alternative to SGD-based optimization – the so-called Natural Evolution Strategies (NES) family of algorithms. This one comes with important benefits that allow one to, for instance, fork the most “successful” or “promising” network, train it separately for a while, and then merge it back with its parent – the merger producing a better-trained result.

The motivation remains the same: parallel training combined with global collaboration. One reason to collaborate globally boils down to finding a global maximum (as above), or running an effective and fast multimodal (as in: good enough) optimization. In the NES case, though, collaboration gets an extra “dimension”: reusing another network’s weights, hyperparameters, and architecture.

To be continued.

FTL, translated

The seismic shift

Fact is, NAND media is inherently not capable of executing in-place updates. Instead, the NAND device (conventionally, SSD) emulates updates via write + remap or copy + write + remap operations, and similar. The presence of such emulation (performed by SSD’s flash translation layer, or FTL) remains somewhat concealed from an external observer.

Other hardware and software modules, components, and layers of storage stacks (with the notable exception of SMR) are generally not restricted by a similar intervention of their inherent nature. They could do in-place updates if they wanted to.

Nonetheless, if there’s any universal trend in storage of the last decade and a half – that’d be avoiding in-place updates altogether. Circumventing them in the majority of recent solutions. Treating them as a necessary evil at best.


Docker, NFS, ZFS, and extended attributes

It may be difficult to develop an emotional connection to all of the features of filesystems and filers. Take deduplication for instance. Dedup is cool. Rabin-Karp rolling hash, sliding-window Content Defined Chunking (CDC) – those were cool 15 years ago and remain cool today. Improvements and products (and startups) keep pouring in.

But when it comes to extended file attributes (xattrs), emotions range from a blank stare to dismay. As in: wouldn’t touch with a ten-foot pole.

Come to think of it, part of the problem is – NFS. And part of the NFS problem is that both v3 and v4 do not support xattrs. There is no support whatsoever: none, nada, zilch. And how can there be, with no interoperable standard?


Choosy Initiator

In a fully distributed storage cluster (FDSC), one possible design choice for a new storage protocol is resource reservation (RR). There are certain distinct challenges, though, when working out an optimal RR schema. On the storage initiator side, one critical question that arises is whether to accept an RR response (a.k.a. a bid) from a given target for a given I/O request. Or, alternatively, try to renegotiate, in hopes of getting a better reservation.

The absolutely best reservation is, of course, the one that allows I/O in question to start immediately and execute at full throttle.

Hence the choosy initiator in the title: the storage initiator that grapples with the dilemma – go ahead and accept a possibly suboptimal arrangement, or renegotiate? The latter inevitably comes with the risk of receiving future reservations that are even more delayed.

To bet or not to bet, that is the question. To find out, I’ve used SURGE: a discrete-event, open source simulation framework written in Go (https://golang.org/doc/). The source includes a bunch of concrete protocol models (that run) and the usual README to get started.

OK, let’s do some definitions.

FDSC and RR

The three key aspects of a FDSC are self-containment, full connectivity, and hashed distribution:

  • Self-containment: each clustered node is a sole owner of its local resources: disks, CPU, memory, and link bandwidth;
  • Any-to-any: all nodes are interconnected through non-blocking, no-drop network core;
  • Hashed memoryless distribution: a given I/O request is mapped or hashed to a number of storage targets. For writes, the number of targets must be equal to or greater than the required redundancy. This I/O mapping/hashing operation does not depend on the history or on any of the previously executed I/O requests (hence, memoryless).

Note separately that the letter ‘S’ in FDSC also reads as “symmetric”. In symmetric clusters, each node performs a dual role of storage initiator and storage target simultaneously.

For now let’s just say that large fully distributed clusters counting hundreds or thousands of initiators and targets require a different intra-cluster transport. That’s #1, the point that maybe does not sound very controversial. To add controversy (just for the heck of it), here’s point #2: for large and super-large FDSC it makes a lot of sense to use RR-based transport protocols. One of the key proof points boils down to convergence.

Convergence time is the time it takes for existing link-sharing (path-sharing) flows to converge on their respective optimal rates when a new flow is being added or an existing flow removed. All things being equal, each of the N flows eventually must take 1/Nth of the total bandwidth.

Figure 12 in the research paper at http://goo.gl/JUlijH shows DCTCP convergence times.

The red flow (below) is added after the blue flow has reached its steady, close-to-optimal state. The research estimates DCTCP convergence times on a 10GE network ranging anywhere from 229*d to 770*d, where d is the propagation delay. This is a considerable time, corresponding to several fully transferred 128K payloads even for a very favorable (as far as speedy convergence) value of d on the order of a few microseconds.
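A quick check of that claim, assuming 128 KiB payloads on a 10 Gbps link and, say, d = 3 µs:

t_{128K} = \frac{128 \times 1024 \times 8\ \text{bits}}{10^{10}\ \text{bits/s}} \approx 105\ \mu s,
\qquad
229\,d \approx 687\ \mu s \approx 6.5\ \text{payloads}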

choosy-fig1
Figure 1. DCTCP convergence (courtesy DCTCP research)

In the presence of multiple flows the picture gets substantially messier, but the bigger point is: irrespective of the particular link-sharing transport, during the time N flows are converging from (N-1) or (N+1), they all execute at sub-optimal rates, resulting in overall sub-optimal network utilization. Hence the appeal of the RR approach: eliminate resource sharing – utilization time slots are simply negotiated. There will be no unexpected start-IO, complete-IO, add-flow, or delete-flow events in the middle.

If there’s anything similar in real life, it could be timeshare ownership. On the networking side, examples include RSVP in its unicast and multicast realizations (RFC 2205).

In all cases (including timeshare) and independently of the concrete resource reservation protocol, each successful RR negotiation comes with a time window in combination with a full set of the requested target’s resources – for exclusive use (during that time window) by a given pending request (emphasis on exclusive). For each I/O request, there is a natural progression:

I/O request =>
   potential targets =>
      RR negotiation(s) =>
          data flows to/from ...

This results in network flows executing as prescribed by non-overlapping time windows granted in turn by the respective selected targets.

To bet or not to bet

The figure below shows one possible “resource reservation, renegotiated” (RRR) scenario:

choosy-fig2
Figure 2. RRR (example)

There are two storage initiators: purple A and green B. The initiators are trying to establish data flows with the same target X at approximately the same time.

First, initiator A tentatively reserves target X for a window starting 1.1ms later (from now), which roughly corresponds to the time target X needs to complete a (presumably) already queued 1MB transaction (not shown here).

Next, initiator B tentatively reserves X for the adjacent time window at 2.3ms from now. The red X on the picture indicates the decision point.

In this example, instead of accepting whatever is presented to it at the moment, B decides that 2.3ms is too much of a delay and cancels. Simultaneously, A cancels as well, possibly because it has obtained more attractive bids from other storage targets that are not shown here. This further results in B getting the optimal reservation and utilizing it right away.

Simulation

The idea that at least sometimes it does make sense to take a risk and renegotiate requires proof. Fortunately, with SURGE we can run massive simulations to find out.

Replicast is one FDSC-friendly, RR-based protocol that was previously described – see, for instance, the SNIA presentation Beyond Consistent Hashing and TCP.

Replicast-H (where H stands for hybrid) entails a multicast control plane and unicast data. Recall that RR negotiation is part of the reservations-based protocol’s control plane. When running the hybrid version of Replicast, we negotiate up to (typically) 3 successive time windows to perform a write. We then often see the following picture:

choosy-fig3

Figure 3. Reservations without renegotiation

The above is a 200-point random sample detailing the latencies of 1MB chunks written to their hashed target destinations. The green trendline in the middle shows a 10-point moving average. This is a “plain” Replicast-H benchmark where each target’s bid is taken at face value and never renegotiated.

The cluster in this case combines 90 targets and 90 storage initiators, with targets operating at 1000MB/s disk throughput. Finally, there is a non-blocking, no-drop 10Gbps network that connects each target with each initiator.

In the simulation above, 10% of the bandwidth is statically allocated for control traffic. With 1% accounting for network headers and miscellaneous overhead, that leaves 8.9Gbps for (pure) data.

Under the idealistic conditions where a given initiator does not have any competition from others, the times to execute a 1MB write to a single target would be, precisely:

  • Network: 941.482µs
  • Disk: 1ms

Given 3 copies, the request could (ideally) be fully executed in slightly more than 4 milliseconds, including control handshakes and acknowledgements.
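As a sanity check of the network figure, assuming a 1 MiB chunk over the 8.9 Gbps data share (the small difference is presumably down to exact header accounting):

t_{net} \approx \frac{8{,}388{,}608\ \text{bits}}{8.9 \times 10^{9}\ \text{bits/s}} \approx 943\ \mu s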

There is, however, inevitably competition, resulting in reservation conflicts and delays, and further manifesting itself in occasional end-to-end latency spikes up to 7ms and beyond – see the picture above.

In-depth analysis of the traces (which I will skip here) indicates that, indeed, one possible reason for the spikes is that initiators accept reservations too early. For instance, the initiator that executed sample #5 (Fig. 3) could likely have done better.

Unicast delivery with renegotiation

This “with renegotiation” flavor of the protocol adds a new configuration variable that currently defaults to 3 times RTT on the control plane:

choosy-fig-code1

Note: to go directly to the github source, click this and other similar code fragments. At runtime, this model will double the wait time and renegotiate reservations if and when the actual waiting time (denoted as ‘idletime’ below) exceeds the current value:

choosy-fig-code4

In addition, since 3 times RTT is just a crude initial guess, each modeled gateway (a.k.a. storage initiator) recomputes its value (‘r.maxbidwait’ in the code) for the next write request, averaging the previous value and the current one:

choosy-fig-code3

What happens after the first few transactions is that each initiator independently computes an objective average delay and, from there on, renegotiates only when the newly collected reservations collectively result in a substantially bigger delay – in other words, only when it “thinks” it could get a better deal.
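A minimal sketch of that adaptive-threshold logic – field and method names here are illustrative, not the actual SURGE model:

package gateway

import "time"

// bid is a reservation offer from a target: the earliest window it grants.
type bid struct {
    start time.Time
}

// initiator keeps an adaptive renegotiation threshold, initially 3 x control-plane RTT.
type initiator struct {
    maxBidWait time.Duration
}

func newInitiator(rtt time.Duration) *initiator {
    return &initiator{maxBidWait: 3 * rtt}
}

// accept decides whether to take the collected bids or renegotiate.
// If the implied idle time exceeds the current threshold, the initiator doubles
// the threshold (so it does not renegotiate forever) and retries; otherwise it
// accepts and averages the threshold with the observed wait for the next request.
func (in *initiator) accept(now time.Time, bids []bid) bool {
    idletime := worstStart(bids).Sub(now)
    if idletime > in.maxBidWait {
        in.maxBidWait *= 2
        return false // renegotiate
    }
    in.maxBidWait = (in.maxBidWait + idletime) / 2
    return true
}

func worstStart(bids []bid) time.Time {
    worst := bids[0].start
    for _, b := range bids[1:] {
        if b.start.After(worst) {
            worst = b.start
        }
    }
    return worst
}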

Here’s the SURGE-simulated result for an apples-to-apples identical configuration that runs 90 storage initiators and 90 storage targets, etc.:

choosy-fig4
Figure 4. Reservations with renegotiation

Notice that there are no 7ms spikes anymore. They are simply gone.

Conclusions, briefly

For differently sized chunks and disk/backend throughputs, the average improvement ranges from 8% to 20%:

choosy-fig5
Figure 5. 128K chunks, 400MB/s disk (left) and 1000MB/s disk (right)

Legend-wise, red columns depict the “with renegotiation” version.

choosy-fig6
Figure 6. 1MB chunks, 400MB/s disk (left) and 1000MB/s disk (right)

Thus, taking a risk does seem to pay off: an RR-based protocol where storage initiators are “choosy” and at least sometimes renegotiate their reservations outperforms its prior version.