Parallel optimization via multiple neural networks

When training a neural network, it is not uncommon to have to run through millions of samples, with each training sample (Xi, Yi) obtained by a separate evaluation of a system function F that maps ℜⁿ ⇒ ℜ¹ and that, given an input Xi, produces an output Yi.

Therein lies the problem: evaluations are costly. Or slow, which also means costly and covers a variety of times: the time to evaluate the function Y=F(X), the time to train the network, the time spent not using the trained network while the system keeps running, and so on.

Let’s say, there’s a system that must be learned and that is already running. Our goal would be to start optimizing early, without waiting for a fully developed, trained-and-trusted model. Would that even be possible?

Furthermore, what if the system is high-dimensional, stateful, non-linear (in its multi-dimensional input), and noisy (in its observed and measured output)? The goal would be to optimize the system’s runtime behavior via controlled actions after having observed only a few, or a few hundred, (Xi, Yi) pairs. The fewer, the better.

The Idea

In short, it’s a gradient ascent via multiple neural networks. The steps are:

  • First, diversify the networks so that each one ends up with its own unique training “trajectory”.
  • Second, train the networks, using (and reusing) the same (X, Y) dataset of training samples.
  • In parallel, exploit each of the networks to execute gradient ascents from the current local maximums of its network “siblings”.

As the term suggests, gradient ascent uses gradient vectors to ascend – all the way up to the function’s maximum. And since maximizing F(X) is the same as minimizing -F(X), the only real question is what exactly we optimize: the system’s own output or, for instance, the distance between the network’s prediction and the corresponding F(X) – a distance that is a function in its own right, often called a cost or, interchangeably, a loss.

In our case, we’d want both – concurrently. But first and foremost, we want the global max of F.
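
To make the ascent itself concrete, here’s a self-contained toy sketch in Go: a hand-made surrogate f stands in for a trained network’s prediction, and the gradient is estimated by central finite differences purely for illustration (a real network would differentiate with respect to its input instead).

package main

import "fmt"

// ascend takes gradient-ascent steps on a surrogate f, starting from x0.
// The gradient here is estimated by central finite differences purely for
// illustration; a real network would compute df/dx directly.
func ascend(f func([]float64) float64, x0 []float64, steps int, lr, eps float64) []float64 {
    x := append([]float64(nil), x0...)
    for s := 0; s < steps; s++ {
        grad := make([]float64, len(x))
        for i := range x {
            x[i] += eps
            fp := f(x)
            x[i] -= 2 * eps
            fm := f(x)
            x[i] += eps
            grad[i] = (fp - fm) / (2 * eps)
        }
        for i := range x {
            x[i] += lr * grad[i] // step uphill, towards a (local) maximum
        }
    }
    return x
}

func main() {
    // a toy surrogate with its (only) maximum at (1, -2)
    f := func(x []float64) float64 {
        return -(x[0]-1)*(x[0]-1) - (x[1]+2)*(x[1]+2)
    }
    fmt.Println(ascend(f, []float64{0, 0}, 100, 0.1, 1e-6))
}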

The Logic

The essential logic goes as follows:

The statements 1 through 3 construct the specified number of neural networks (NNs) and their respective “runners” – the entities that asynchronously and concurrently execute NN training cycles. Network “diversity” (at 2) is achieved via hyperparameters and network architectures. In the benchmark (next section) I’ve also used a variety of optimization algorithms (namely Rprop, ADAM, RMSprop), and random zeroing-out of the weight matrices as per the specified sparsity.
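
To illustrate, here’s a hypothetical sketch of what such diversification could look like – the config fields and value ranges below are made up and are not the actual configuration:

package main

import (
    "fmt"
    "math/rand"
)

// config is a made-up stand-in for per-network settings;
// the fields and value ranges are illustrative only.
type config struct {
    hidden    []int   // hidden layer sizes (the "architecture")
    optimizer string  // Rprop, ADAM, or RMSprop
    learnRate float64
    sparsity  float64 // fraction of weights randomly zeroed out
}

// diversify hands each of the n networks its own randomized configuration.
func diversify(n int) []config {
    architectures := [][]int{{64, 64}, {128, 32}, {32, 32, 32}}
    optimizers := []string{"Rprop", "ADAM", "RMSprop"}
    configs := make([]config, n)
    for i := range configs {
        configs[i] = config{
            hidden:    architectures[rand.Intn(len(architectures))],
            optimizer: optimizers[rand.Intn(len(optimizers))],
            learnRate: 0.001 * float64(1+rand.Intn(10)),
            sparsity:  0.1 * float64(rand.Intn(5)),
        }
    }
    return configs
}

func main() {
    for i, c := range diversify(4) {
        fmt.Printf("network %d: %+v\n", i, c)
    }
}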

At 4, we pretrain the networks on the first portion of the common training pool. At 6, we execute the main loop, which consists of 3 nested loops, with the innermost 6.1.1.1 and 6.1.1.2 generating new training samples, incrementing the numevals counter, and expanding the pool.

That’s about it. There are risks, though, and pitfalls. Since partially trained networks generate gradients that can only be partially “trusted,” the risks include mispartitioning the evaluation budget between the pretraining phase at 4 and the main loop at 6. This can be dealt with by adjusting the number of gradient “steps” – gradually increasing it, multiplicatively decreasing it – while monitoring the success.

There’s also the general lack of convexity of the underlying system, manifesting itself in runners getting stuck in their respective local maximums. Imagine a system with 3 local maximums and 30 properly randomized neural networks – what would be the chances of all 30 getting stuck on the left and/or the right sides of the picture?

What would be the chances for k >> m, where k is the number of random networks and m is the number of local maximums?

The Benchmark

A simplified and reduced Golang implementation of the above can be found on my GitHub. Here, all NN runners execute their respective goroutines using their own fixed-size training “windows” into a common stream of training samples. The (simplified) division of responsibilities is as follows: runners operate on their respective windows; centralized logic rotates the windows counterclockwise when the time is right. In effect, each runner “sees,” and takes advantage of, every single evaluation, in parallel.
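
For a feel of the arrangement, here’s a toy, self-contained sketch (not the repository code): a few runners, each owning a fixed-size window into a common sample stream, with a print statement standing in for the actual training and a rotation step in between.

package main

import (
    "fmt"
    "sync"
)

// sample is a single (X, Y) evaluation in the common stream.
type sample struct{ x, y float64 }

func main() {
    const (
        numRunners = 3
        windowSize = 4
    )
    // a common stream of samples, shared by all runners
    stream := make([]sample, numRunners*windowSize)
    for i := range stream {
        stream[i] = sample{float64(i), float64(i * i)}
    }

    for rotation := 0; rotation < numRunners; rotation++ {
        var wg sync.WaitGroup
        for r := 0; r < numRunners; r++ {
            wg.Add(1)
            go func(r int) { // each runner "trains" on its current window concurrently
                defer wg.Done()
                start := ((r + rotation) * windowSize) % len(stream)
                window := stream[start : start+windowSize]
                fmt.Printf("runner %d trains on window starting at %d (%d samples)\n",
                    r, start, len(window))
            }(r)
        }
        wg.Wait() // centralized logic rotates the windows when the time is right
    }
}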

A number of synthetic benchmarks are available and widely used to compare global optimization methods. One of them is Hartmann-6, featuring multiple local optima in a 6-dimensional unit hypercube. For the number of concurrent networks varying between 1 and 30, the resulting picture looks as follows:

The vertical axis of this 3-dimensional chart represents the distance from the global maximum (smaller is better), the horizontal axis – the number of Hartmann-6 calls (from 300 to 2300), the “depth” axis – the number of neural networks. The worst result is for the {k = 1} configuration consisting of a single network.
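
For reference, here’s the Hartmann-6 function itself in Go, with the constants as commonly published. The benchmark is usually stated as a minimization problem with the global minimum near -3.32237; a gradient-ascent setup would simply negate it.

package main

import (
    "fmt"
    "math"
)

// hartmann6 is the standard 6-dimensional Hartmann function (constants as
// commonly published), defined on the unit hypercube [0,1]^6, with multiple
// local minima and a global minimum of approximately -3.32237.
func hartmann6(x [6]float64) float64 {
    alpha := [4]float64{1.0, 1.2, 3.0, 3.2}
    A := [4][6]float64{
        {10, 3, 17, 3.5, 1.7, 8},
        {0.05, 10, 17, 0.1, 8, 14},
        {3, 3.5, 1.7, 10, 17, 8},
        {17, 8, 0.05, 10, 0.1, 14},
    }
    P := [4][6]float64{
        {0.1312, 0.1696, 0.5569, 0.0124, 0.8283, 0.5886},
        {0.2329, 0.4135, 0.8307, 0.3736, 0.1004, 0.9991},
        {0.2348, 0.1451, 0.3522, 0.2883, 0.3047, 0.6650},
        {0.4047, 0.8828, 0.8732, 0.5743, 0.1091, 0.0381},
    }
    sum := 0.0
    for i := 0; i < 4; i++ {
        inner := 0.0
        for j := 0; j < 6; j++ {
            d := x[j] - P[i][j]
            inner += A[i][j] * d * d
        }
        sum += alpha[i] * math.Exp(-inner)
    }
    return -sum
}

func main() {
    // the (approximate) global minimizer
    x := [6]float64{0.20169, 0.150011, 0.476874, 0.275332, 0.311652, 0.6573}
    fmt.Println(hartmann6(x)) // ≈ -3.32237
}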

Evolution

There’s an alternative to SGD-based optimization – the so-called Natural Evolution Strategies (NES) family of algorithms. This one comes with important benefits that allow one to, for instance, fork the most “successful” or “promising” network, train it separately for a while, then merge it back with its parent – the merger producing a better-trained result.

The motivation remains the same: parallel training combined with global collaboration. One reason to collaborate globally boils down to finding a global maximum (as above), or running an effective and fast multimodal (as in: good enough) optimization. In the NES case, though, collaboration gets an extra “dimension”: reusing another network’s weights, hyperparameters, and architecture.
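
As a toy illustration of the fork-and-merge idea only (this is not an NES algorithm), one naive way to merge a fork back into a parent of identical architecture is to blend the flattened weight vectors:

package main

import "fmt"

// merge blends the flattened weight vectors of a parent network and its fork;
// alpha is the weight given to the fork. Illustration only.
func merge(parent, fork []float64, alpha float64) []float64 {
    merged := make([]float64, len(parent))
    for i := range parent {
        merged[i] = (1-alpha)*parent[i] + alpha*fork[i]
    }
    return merged
}

func main() {
    parent := []float64{0.1, -0.2, 0.3}
    fork := []float64{0.2, -0.1, 0.5} // the fork, trained separately for a while
    fmt.Println(merge(parent, fork, 0.5))
}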

To be continued.

SURGE: performance modeling for distributed systems

Foreword

As far as distributed system and storage software goes, finding out how it will perform at scale is hard.

Expensive and time-consuming as well, and often impossible. When the first bits are ready to run, there’s maybe one or two hypervisors at best (more likely one, though). That’s a start.

Early stages have their own exhilarating freshness. Survival is what matters then, what matters always. Questions in re hypothetical performance at 10x scales sound far-fetched, almost superficial. Answers are readily produced – in flashy powerpoints. The risk, however, remains, carved deep into the growing codebase, deeper inside the birthmarks of the original conception. The risk that the stuff we’ll spend the next two, three years stabilizing will not perform.

The Goal

The goal is modeling the performance of a distributed system of any size (emphasis on modeling, performance, and any size). Which means – uncovering the behavioral patterns (periodic spike-downs and, generally, any type of pseudo-regular irregularity), charting throughputs and latencies and their respective distributions concealed behind performance averages. And the tails of those distributions – the ones down in the single-digit percentile ranges.

Average throughput, average IOPS, average utilization, average-anything is not enough – we need to see what is really going on. For any scale, any configuration, any ratios of: clients and clustered nodes, network bandwidth and disk throughput, chunk/block sizes, you name it.

Enter SURGE, a discrete event simulation framework written in Go and posted on GitHub. SURGE translates (admittedly, with a certain effort) as Simulator for Unsolicited and Reservation Group based and Edge-driven distributed systems. Take it or leave it (I just like the name).

Go aka golang, on the other hand, is a programming language 1 2.

Go

Go is an open source programming language introduced in 2007 by Rob Pike et al. (Google). It is a compiled, statically typed language in the tradition of C with garbage collection, runtime reflection and CSP-style concurrent programming.

CSP stands for Communicating Sequential Processes: a formal language or, more exactly, a notation for formally specifying interactions between concurrent processes. CSP has a history of influencing the designs of programming languages.

Runtime reflection is the capability to examine and modify the program’s own structure and behavior at runtime.

Go’s reflection appears to be very handy when it comes to supporting IO pipeline abstractions, for example. But more about that later. As far as concurrency goes, Rob Pike’s presentation is brief and to the point, imho. To demonstrate the powers (and get a taste), let’s look at a couple lines of code:

play.golang.org/p/B7U_ua0bzk

In this case, notation 'go function-name' causes the named function to run in a separate goroutine – a lightweight thread and, simultaneously, a built-in language primitive.

The Go runtime scheduler multiplexes potentially hundreds of thousands of goroutines onto underlying OS threads.

The example above creates a bidirectional channel called messages (think of it as a typed Unix pipe) and spawns two concurrent goroutines: send() and recv(). These run, possibly on different processor cores, and use the messages channel to communicate. The sender sends random ASCII codes on the channel, the receiver prints them upon reception. When 10 seconds are up, the main goroutine (the one that runs main()) closes the channel and exits, thus terminating the child goroutines as well.
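
The playground snippet itself is not reproduced here, but a minimal sketch along the same lines could look as follows. One liberty taken for safety: rather than closing the channel from main() (which could race with a concurrent send), main simply returns after 10 seconds – the program exit terminates the child goroutines all the same.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// send emits a random printable ASCII code on the channel, ten times a second.
func send(messages chan<- rune) {
    for {
        messages <- rune(rand.Intn(94) + 33)
        time.Sleep(100 * time.Millisecond)
    }
}

// recv prints every code it receives.
func recv(messages <-chan rune) {
    for m := range messages {
        fmt.Printf("%c ", m)
    }
}

func main() {
    messages := make(chan rune) // think of it as a typed Unix pipe
    go send(messages)
    go recv(messages)

    // after 10 seconds main returns; the program exits,
    // terminating both child goroutines
    time.Sleep(10 * time.Second)
    fmt.Println()
}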

Although minimal and simplified, this example suggests that Go can be used to build an event-distributing, event-driven system with an arbitrary number of any-to-any interconnected and concurrently communicating players (aka actors) – a system where each autonomous player runs its own compartmentalized piece of event-handling logic.

Hold on to this visual. In the next section: the meaning of Time.

Time

In SURGE, every node of a modeled cluster runs in a separate goroutine. When things run in parallel there is generally a need to go to extra lengths to synchronize and sequence them. In the physical world the sequencing, at least in part, is done for us by the laws of physics. Node A sending a message to remote node B can rest assured that it will not see the response from node B before the sent message is actually delivered, received, and processed, and the response is created and, in turn, delivered back to A.

The corresponding interval of time depends on the network bandwidth, rate of the A ⇔ B flow at the time, size of the aforementioned message, and a few other utterly material factors.

That’s in the physical world. Simulated clusters and distributed models cannot rely on natural sequencing of events. With no sequencing there is no progression of time. With no progression there is no Time at all – unless…

Unless we model it. For starters let’s recall an age-old wisdom: time is not continuous (nor, reportedly, is space). Time, rather, is a sequence of discrete NOWs: one indivisible tick (of time) at a time. Per quantum physics the smallest time unit could be the Planck time ≈5.39×10⁻⁴⁴s, although nobody knows for sure. In modeling, however, one can reasonably ascertain that there is a total uneventful void – literally nothing – between NOW and NOW + 1.

In SURGE, the smallest indivisible tick of time is 1 nanosecond, a configurable default.

In a running physical cluster, each NOW instant is filled with events: messages ready to be sent, messages ready to be received, events being handled right NOW. There are also en route messages and events sitting in disk and network queues, waiting for their respective future NOWs.

Let’s for instance imagine that node A is precisely NOW ready to transmit a 1KB packet to node B:


Given a full 10Gbps of unimpeded bandwidth between A and B and a trip time of, say, 1µs, we can then predict, with a high level of accuracy, that B will receive this packet (819ns + 1µs) later, that is, at NOW+1.819µs, as per the following:

play.golang.org/p/RZpYY_hNnQ

In this snippet of modifiable-and-runnable code, the local variable sizebits holds the number of bits to send or receive, while bwbitss is the link bandwidth, in bits per second.
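
The linked snippet is likewise not reproduced here; a minimal reconstruction of that back-of-the-envelope computation might look as follows (sizebits and bwbitss are the names from the description above, the rest is assumed):

package main

import (
    "fmt"
    "time"
)

func main() {
    // a 1KB payload over a full 10Gbps link, plus a 1µs trip time
    sizebits := int64(1024 * 8)               // number of bits to send or receive
    bwbitss := int64(10 * 1000 * 1000 * 1000) // link bandwidth, bits per second

    transmit := time.Duration(sizebits * int64(time.Second) / bwbitss)
    trip := time.Microsecond

    fmt.Println("B receives the packet at NOW +", transmit+trip) // 1.819µs
}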

Time as Categorical Imperative

Here’s a statement of correctness that, on the face of it, may sound trivial: at any point in time, all past events in a given model are either already fully handled and done, or are being processed right now.

A past event is, of course, an event scheduled to trigger (to happen) in the past: at (NOW-1) or earlier. The statement above, in a roundabout way, defines the ticking, living time:

At any instant of time all past events did already trigger.

And its corollary:

A simulated distributed system transitions from (NOW-1) to NOW if and only if absolutely all past events in the system did happen.

Notice that so far this is all about the past – the modeled before. The after is easier to grasp:

For each instant of time, all future events in the model are not yet handled – they are effectively invisible as far as their designated future handlers are concerned.

In other words, everything that happens in a modeled world is a result of prior events, and the result of everything-that-happens is: new events. Event timings define the progression of Time itself. The Time in turn is a categorical imperative – a binding constraint (as per the true statements above) on all events in the model at all times, and therefore on all event-producing, event-handling active players – the players that execute their own code autonomously and in parallel.
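
To make the imperative concrete, here’s a tiny, self-contained sketch – emphatically not the actual SURGE scheduler – in which the model transitions from NOW to NOW+1 only after every event scheduled at or before NOW has been handled:

package main

import (
    "fmt"
    "sort"
    "time"
)

// event is a hypothetical timed event: a trigger time plus a name.
type event struct {
    trigger time.Duration // offset from the model's start
    name    string
}

func main() {
    tick := time.Nanosecond // the smallest indivisible tick, as in SURGE
    now := time.Duration(0)

    // a few pre-scheduled events; a real model generates these continuously
    pending := []event{
        {3 * tick, "B receives"},
        {1 * tick, "A transmits"},
        {5 * tick, "B acks"},
    }
    sort.Slice(pending, func(i, j int) bool { return pending[i].trigger < pending[j].trigger })

    for len(pending) > 0 {
        // handle absolutely all events scheduled at or before NOW...
        for len(pending) > 0 && pending[0].trigger <= now {
            fmt.Printf("%v\t%s\n", now, pending[0].name)
            pending = pending[1:]
        }
        // ...and only then may the model transition from NOW to NOW+1
        now += tick
    }
}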

Timed Event

To recap: a distributed cluster is a bunch of interconnected nodes (it always is). Each node is an active Go player in the sense that it runs its own typed logic in its own personal goroutine. Nodes continuously generate events, assign them their respective computed times-to-trigger, and fan them out onto the respective Go channels. Nodes also continuously handle events when the time is right.

By way of a sneak peek at timed events and event-driven times, here’s the life of a chunk (a block of an object’s data or metadata) in one of the SURGE models:
[log excerpt: the timed events of a single chunk]
The time above runs on the left, event names are abbreviated and capitalized (e.g. MCPRE). With hundreds and thousands of very chatty nodes in the model, logs like this one become really crowded really fast.

In the SURGE framework each and every event is timed, and each timed event implements the following abstract interface:

type EventInterface interface {
    GetSource() RunnerInterface
    GetCreationTime() time.Time
    GetTriggerTime() time.Time
    GetTarget() RunnerInterface
    GetTio() *Tio
    GetGroup() GroupInterface
    GetSize() int
    IsMcast() bool
    GetTioStage() string
    String() string
}

This reads as follows: there is always an event source, an event creation time, and an event trigger time. Some events have a single remote target; others target a group (of targets). An event’s source and target(s) are in turn clustered nodes themselves, implementing RunnerInterface.

All events are delivered to their respective targets at the prescribed time, as per the event’s GetTriggerTime() accessor. The Time-defining imperative (above) is enforced with each and every tick of time.

In the next installment of SURGE series: ping-pong model, rate bucket abstraction, IO pipeline and more.

Go Anagram

The history is well known. Go started as a pragmatic effort by Google to answer its own software needs: managing hundreds of thousands (some say, tens of millions) of servers in Google’s data centers. If there’s scale anywhere, Google has it. Quoting the early introduction (which I strongly suggest reading), Go at Google: Language Design in the Service of Software Engineering:

The Go programming language was conceived in late 2007 as an answer to some of the problems we were seeing developing software infrastructure at Google. The computing landscape today is almost unrelated to the environment in which the languages being used, mostly C++, Java, and Python, had been created. The problems introduced by multicore processors, networked systems, massive computation clusters, and the web programming model were being worked around rather than addressed head-on.
<snip>
Go was designed and developed to make working in this environment more productive. Besides its better-known aspects such as built-in concurrency and garbage collection, Go’s design considerations include rigorous dependency management, the adaptability of software architecture as systems grow, and robustness across the boundaries between components.

When I first started looking at Go aka golang 1, it was strictly in connection with Docker, coreos/rkt, LXD and various other containers – all of which happen to be coded in Go.

Your First Go

“Like many programmers I like to try out new languages” – a quote from Adam Leventhal’s blog on his first Rust program: an anagrammer. My contention as well 2. Not sure about Rust, though, but my today’s anagrammer, in Go, follows below – likely far from the most elegant:

package main

import (
        "bufio"
        "fmt"
        "log"
        "os"
        "sort"
        "strings"
)

// normalize lowercases the word and sorts its letters, producing
// a canonical key shared by all anagrams of that word.
func normalize(word string) string {
        lcword := strings.ToLower(word)
        letters := []string{}

        for _, cp := range lcword {
                letters = append(letters, string(cp))
        }
        sort.Strings(letters)
        return strings.Join(letters, "")
}

// do_wordmap reads the system dictionary and maps each normalized
// key to all the dictionary words that share it.
func do_wordmap() *map[string][]string {
        file, err := os.Open("/usr/share/dict/words")
        if err != nil {
                log.Fatal(err)
        }
        defer file.Close()

        var allwords []string
        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
                t := scanner.Text()
                allwords = append(allwords, t)
        }

        wordmap := make(map[string][]string)
        for _, w := range allwords {
                nw := normalize(w)
                wordmap[nw] = append(wordmap[nw], w)
        }
        return &wordmap
}

func main() {
        wordmap := do_wordmap()

        syn := bufio.NewReader(os.Stdin)
        for {
                fmt.Print("Enter a word: ")
                myword, _ := syn.ReadString('\n')
                myword = strings.TrimSuffix(myword, "\n")
                normal_w := normalize(myword)

                // every dictionary word with the same normalized key
                // is an anagram of the input
                fmt.Println((*wordmap)[normal_w])
        }
}

It runs like this:

Enter a word: spare
[pares parse pears rapes reaps spare spear]

Distributed, Concurrent and Parallel

To facilitate distributed, concurrent, and parallel processing on a massive scale, the language must include developer-friendly primitives. To that end, Go includes, for instance:

  • goroutine, for multitasking
  • channel, to communicate between the tasks

The latter was adopted from Communicating Sequential Processes (CSP), first described in a 1978 paper by C. A. R. Hoare.

Here’s a snippet of code that stores a chunk on the clustered servers concurrently and waits until 3 replicas are confirmed:

replicas_count := make(chan bool, len(servers))

for _, s := range servers {
    go func(s *StorageServer) {
        s.Put(chunk)
        replicas_count <- true
    }(s)
}

for n := 0; n < 3; n++ {
    <-replicas_count
}

That’s the power of Go.


  1. Use “golang” to disambiguate your google searches 
  2. Writing code focuses your mind and untroubles your soul (c)