effective-go chapter13

Concurrency

Share by communicating

Concurrent programming is a large topic and there is space only for some Go-specific highlights here.

Concurrent programming in many environments is made difficult by the subtleties required to implement correct access to shared variables. Go encourages a different approach in which shared values are passed around on channels and, in fact, never actively shared by separate threads of execution. Only one goroutine has access to the value at any given time. Data races cannot occur, by design. To encourage this way of thinking we have reduced it to a slogan:

Do not communicate by sharing memory; instead, share memory by communicating.

This approach can be taken too far. Reference counts may be best done by putting a mutex around an integer variable, for instance. But as a high-level approach, using channels to control access makes it easier to write clear, correct programs.

One way to think about this model is to consider a typical single-threaded program running on one CPU. It has no need for synchronization primitives. Now run another such instance; it too needs no synchronization. Now let those two communicate; if the communication is the synchronizer, there’s still no need for other synchronization. Unix pipelines, for example, fit this model perfectly. Although Go’s approach to concurrency originates in Hoare’s Communicating Sequential Processes (CSP), it can also be seen as a type-safe generalization of Unix pipes.

Goroutines

They’re called_goroutines_because the existing terms—threads, coroutines, processes, and so on—convey inaccurate connotations. A goroutine has a simple model: it is a function executing concurrently with other goroutines in the same address space. It is lightweight, costing little more than the allocation of stack space. And the stacks start small, so they are cheap, and grow by allocating (and freeing) heap storage as required.

Goroutines are multiplexed onto multiple OS threads so if one should block, such as while waiting for I/O, others continue to run. Their design hides many of the complexities of thread creation and management.

Prefix a function or method call with the go keyword to run the call in a new goroutine. When the call completes, the goroutine exits, silently. (The effect is similar to the Unix shell’s & notation for running a command in the background.)

1
go list.Sort()  // run list.Sort concurrently; don't wait for it.

A function literal can be handy in a goroutine invocation.

1
2
3
4
5
6
func Announce(message string, delay time.Duration) {
go func() {
time.Sleep(delay)
fmt.Println(message)
}() // Note the parentheses - must call the function.
}

In Go, function literals are closures: the implementation makes sure the variables referred to by the function survive as long as they are active.

These examples aren’t too practical because the functions have no way of signaling completion. For that, we need channels.

Channels

Like maps, channels are allocated with make , and the resulting value acts as a reference to an underlying data structure. If an optional integer parameter is provided, it sets the buffer size for the channel. The default is zero, for an unbuffered or synchronous channel.

1
2
3
ci := make(chan int)            // unbuffered channel of integers
cj := make(chan int, 0) // unbuffered channel of integers
cs := make(chan *os.File, 100) // buffered channel of pointers to Files

Unbuffered channels combine communication—the exchange of a value—with synchronization—guaranteeing that two calculations (goroutines) are in a known state.

There are lots of nice idioms using channels. Here’s one to get us started. In the previous section we launched a sort in the background. A channel can allow the launching goroutine to wait for the sort to complete.

1
2
3
4
5
6
7
8
c := make(chan int)  // Allocate a channel.
// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
list.Sort()
c <- 1 // Send a signal; value does not matter.
}()
doSomethingForAWhile()
<-c // Wait for sort to finish; discard sent value.

Receivers always block until there is data to receive. If the channel is unbuffered, the sender blocks until the receiver has received the value. If the channel has a buffer, the sender blocks only until the value has been copied to the buffer; if the buffer is full, this means waiting until some receiver has retrieved a value.

A buffered channel can be used like a semaphore, for instance to limit throughput. In this example, incoming requests are passed to handle , which sends a value into the channel, processes the request, and then receives a value from the channel to ready the “semaphore” for the next consumer. The capacity of the channel buffer limits the number of simultaneous calls to process .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
sem <- 1 // Wait for active queue to drain.
process(r) // May take a long time.
<-sem // Done; enable next request to run.
}

func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req) // Don't wait for handle to finish.
}
}

Once MaxOutstanding handlers are executing process , any more will block trying to send into the filled channel buffer, until one of the existing handlers finishes and receives from the buffer.

This design has a problem, though: Serve creates a new goroutine for every incoming request, even though only MaxOutstanding of them can run at any moment. As a result, the program can consume unlimited resources if the requests come in too fast. We can address that deficiency by changing Serve to gate the creation of the goroutines. Here’s an obvious solution, but beware it has a bug we’ll fix subsequently:

1
2
3
4
5
6
7
8
9
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func() {
process(req) // Buggy; see explanation below.
<-sem
}()
}
}

The bug is that in a Go for loop, the loop variable is reused for each iteration, so the req variable is shared across all goroutines. That’s not what we want. We need to make sure that req is unique for each goroutine. Here’s one way to do that, passing the value of req as an argument to the closure in the goroutine:

1
2
3
4
5
6
7
8
9
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func(req *Request) {
process(req)
<-sem
}(req)
}
}

Compare this version with the previous to see the difference in how the closure is declared and run. Another solution is just to create a new variable with the same name, as in this example:

1
2
3
4
5
6
7
8
9
10
func Serve(queue chan *Request) {
for req := range queue {
req := req // Create new instance of req for the goroutine.
sem <- 1
go func() {
process(req)
<-sem
}()
}
}

It may seem odd to write

1
req := req

but it’s legal and idiomatic in Go to do this. You get a fresh version of the variable with the same name, deliberately shadowing the loop variable locally but unique to each goroutine.

Going back to the general problem of writing the server, another approach that manages resources well is to start a fixed number of handle goroutines all reading from the request channel. The number of goroutines limits the number of simultaneous calls to process . This Serve function also accepts a channel on which it will be told to exit; after launching the goroutines it blocks receiving from that channel.

1
2
3
4
5
6
7
8
9
10
11
12
13
func handle(queue chan *Request) {
for r := range queue {
process(r)
}
}

func Serve(clientRequests chan *Request, quit chan bool) {
// Start handlers
for i := 0; i < MaxOutstanding; i++ {
go handle(clientRequests)
}
<-quit // Wait to be told to exit.
}

Channels of channels

One of the most important properties of Go is that a channel is a first-class value that can be allocated and passed around like any other. A common use of this property is to implement safe, parallel demultiplexing.

In the example in the previous section, handle was an idealized handler for a request but we didn’t define the type it was handling. If that type includes a channel on which to reply, each client can provide its own path for the answer. Here’s a schematic definition of type Request .

1
2
3
4
5
type Request struct {
args []int
f func([]int) int
resultChan chan int
}

The client provides a function and its arguments, as well as a channel inside the request object on which to receive the answer.

1
2
3
4
5
6
7
8
9
10
11
12
func sum(a []int) (s int) {
for _, v := range a {
s += v
}
return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// Send request
clientRequests <- request
// Wait for response.
fmt.Printf("answer: %d\n", <-request.resultChan)

On the server side, the handler function is the only thing that changes.

1
2
3
4
5
func handle(queue chan *Request) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}

There’s clearly a lot more to do to make it realistic, but this code is a framework for a rate-limited, parallel, non-blocking RPC system, and there’s not a mutex in sight.

Parallelization

Another application of these ideas is to parallelize a calculation across multiple CPU cores. If the calculation can be broken into separate pieces that can execute independently, it can be parallelized, with a channel to signal when each piece completes.

Let’s say we have an expensive operation to perform on a vector of items, and that the value of the operation on each item is independent, as in this idealized example.

1
2
3
4
5
6
7
8
9
type Vector []float64

// Apply the operation to v[i], v[i+1] ... up to v[n-1].
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
for ; i < n; i++ {
v[i] += u.Op(v[i])
}
c <- 1 // signal that this piece is done
}

We launch the pieces independently in a loop, one per CPU. They can complete in any order but it doesn’t matter; we just count the completion signals by draining the channel after launching all the goroutines.

1
2
3
4
5
6
7
8
9
10
11
12
13
const numCPU = 4 // number of CPU cores

func (v Vector) DoAll(u Vector) {
c := make(chan int, numCPU) // Buffering optional but sensible.
for i := 0; i < numCPU; i++ {
go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
}
// Drain the channel.
for i := 0; i < numCPU; i++ {
<-c // wait for one task to complete
}
// All done.
}

Rather than create a constant value for numCPU, we can ask the runtime what value is appropriate. The function runtime.NumCPU returns the number of hardware CPU cores in the machine, so we could write

1
var numCPU = runtime.NumCPU()

There is also a function runtime.GOMAXPROCS , which reports (or sets) the user-specified number of cores that a Go program can have running simultaneously. It defaults to the value of runtime.NumCPU but can be overridden by setting the similarly named shell environment variable or by calling the function with a positive number. Calling it with zero just queries the value. Therefore if we want to honor the user’s resource request, we should write

1
var numCPU = runtime.GOMAXPROCS(0)

Be sure not to confuse the ideas of concurrency—structuring a program as independently executing components—and parallelism—executing calculations in parallel for efficiency on multiple CPUs. Although the concurrency features of Go can make some problems easy to structure as parallel computations, Go is a concurrent language, not a parallel one, and not all parallelization problems fit Go’s model. For a discussion of the distinction, see the talk cited inthis blog post.

A leaky buffer

The tools of concurrent programming can even make non-concurrent ideas easier to express. Here’s an example abstracted from an RPC package. The client goroutine loops receiving data from some source, perhaps a network. To avoid allocating and freeing buffers, it keeps a free list, and uses a buffered channel to represent it. If the channel is empty, a new buffer gets allocated. Once the message buffer is ready, it’s sent to the server on serverChan .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)

func client() {
for {
var b *Buffer
// Grab a buffer if available; allocate if not.
select {
case b = <-freeList:
// Got one; nothing more to do.
default:
// None free, so allocate a new one.
b = new(Buffer)
}
load(b) // Read next message from the net.
serverChan <- b // Send to server.
}
}

The server loop receives each message from the client, processes it, and returns the buffer to the free list.

1
2
3
4
5
6
7
8
9
10
11
12
13
func server() {
for {
b := <-serverChan // Wait for work.
process(b)
// Reuse buffer if there's room.
select {
case freeList <- b:
// Buffer on free list; nothing more to do.
default:
// Free list full, just carry on.
}
}
}

The client attempts to retrieve a buffer from freeList ; if none is available, it allocates a fresh one. The server’s send to freeList puts b back on the free list unless the list is full, in which case the buffer is dropped on the floor to be reclaimed by the garbage collector. (The default clauses in the select statements execute when no other case is ready, meaning that the selects never block.) This implementation builds a leaky bucket free list in just a few lines, relying on the buffered channel and the garbage collector for bookkeeping.


20200131220947.png

0%