(Almost) ranging over multiple Go channels simultaneously

e · l · n
Oct 5, 2017

In my experiments (flowbase, scipipe) in using Flow-based programming (FBP) principles in pure Go, there is a common pattern occuring all the time: The need to synchronously read a set of values from multiple channels at the same time. That is, if I have three in-bound channels, chA, chB, chC for an FBP process (in this context basically a struct wrapper around a go-routine) I typically want to receive one set of values a, b, c at a time from these channels, do something, and then potentially send output on any number of outbound channels, or write results to stdout or a file.

I have tried various ways to do this, but most of them result in clunky, verbose and error prone code because of the need to keep track of which channels have closed and not, etc. Thus not something you'd like to do every time creating a new flowbase component, for example.

Thus, optimally, one would want to use Go's handy range keyword for looping over multiple channels, since range takes care of closing the for-loop at the right time (when the inbound channel is closed). So something like this (N.B: non-working code!):

for a, b, c := range chA, chB, chC {
    doSomething(a, b, c)
}

Unfortunately this is not possible, and probably for good reason (how would it know whether to close the loop when the first, or all of the channels are closed? etc).

Anyways, I started a twitter thread to discuss this. Then, I read this post by Dave Cheney which explains what the range keyword does under the hood when looping over channels. He pointed out that this:

for a := range chA {
    doSomething(a)
}

... is actually the same as this more detailed code:

for a, ok := <-chA; ok; a, ok = <-chA  {
    doSomething(a, b, c)
}

... which is using one variant of Go's for-syntax, which is:

for init variables; condition for continuing; what to do after each iteration  {
    doSomething(a, b, c)
}

See the relevant section in Effective Go for more info.

Together with a tweet by Inanc Gumus in the mentioned thread, I realized one can do something like this with a bit of helper functions:

package main

import "fmt"

func main() {
	bufSize := 1 // Try changing this to 0, and see what happens!
	ch1, ch2, ch3 := make(chan int, bufSize), make(chan int, bufSize), make(chan int, bufSize)
	go func() {
		defer close(ch1); defer close(ch2); defer close(ch3)
		ch1 <- 1; ch2 <- 2; ch3 <- 3
		ch3 <- 6; ch2 <- 5; ch1 <- 4 // Notice the reversed order of sends(!)
	}()

	// THIS IS WHERE TO LOOK:
	for rs, oks := readOnce(ch1, ch2, ch3); allTrue(oks); rs, oks = readOnce(ch1, ch2, ch3) {
		fmt.Println(rs)
	}
}

// -------------------------------------------------------------------------------
// Helper functions
// -------------------------------------------------------------------------------

func readOnce(chs ...chan int) ([]int, []bool) {
	rs := []int{}
	oks := []bool{}
	for _, ch := range chs {
		r, ok := <-ch
		rs = append(rs, r)
		oks = append(oks, ok)
	}
	return rs, oks
}

func allTrue(vs []bool) bool {
	for _, v := range vs {
		if !v {
			return false
		}
	}
	return true
}

If we zoom in again on the interesting part:

for rs, oks := readOnce(ch1, ch2, ch3); allTrue(oks); rs, oks = readOnce(ch1, ch2, ch3) {
	fmt.Println(rs)
}

... we can see that this is pretty close to the more detailed version of the range-loop that Dave Cheney showed above.

We can also note that this will work for any number of channels provided to readOnce(), as long as it is channels of type int, and that you're fine with getting one set of (int) results back, one array at a time, in the loop.

One must keep in mind that the close condition in the loop has to be thought-through carefully. In this example I have created the function allTrue() that just checks that all booleans in the input array are true, and otherwise returns false. This will close the for loop as soon as the first channel is closed. This might be OK if we are sure that all inbound channels will contain the same input items. If not, one will need to adapt the close condition accordingly.

Anyways, I think this is at least some progress towards something as generic as possible for reading from multiple channels simultanously and synchronosly.

Also, if you have further ideas on how to make this more succinct and generic, please let me know!

Wrap the range-like code up in a helper function

Edit: This section was added on Oct 6, 2017, 17:33 CEST

Naturally, we can now also create a helper function that wraps the somewhat messy for-loop above, so instead we would have something like this (see the syncRead() function):

package main

import "fmt"

func main() {
	bufSize := 1 // Try changing this to 0, and see what happens!
	ch1, ch2, ch3 := make(chan int, bufSize), make(chan int, bufSize), make(chan int, bufSize)
	go func() {
		defer close(ch1)
		defer close(ch2)
		defer close(ch3)
		ch1 <- 1
		ch2 <- 2
		ch3 <- 3
		ch3 <- 6
		ch2 <- 5
		ch1 <- 4 // Notice the reversed order of sends(!)
	}()

	// THIS GOT A BIT NICER NOW, RIGHT?:
	for rs := range syncRead(ch1, ch2, ch3) {
		fmt.Println(rs)
	}
}

// -- Helper funcs --------------------------------------------------------------------------------

func syncRead(chs ...chan int) chan []int {
	outChan := make(chan []int, 16)
	go func() {
		defer close(outChan)
		for rs, oks := recvOneEach(chs...); allTrue(oks); rs, oks = recvOneEach(chs...) {
			outChan <- rs
		}
	}()
	return outChan
}

func recvOneEach(chs ...chan int) ([]int, []bool) {
	rs := []int{}
	oks := []bool{}
	for _, ch := range chs {
		r, ok := <-ch
		rs = append(rs, r)
		oks = append(oks, ok)
	}
	return rs, oks
}

func allTrue(vs []bool) bool {
	for _, v := range vs {
		if !v {
			return false
		}
	}
	return true
}

Even better: Avoid the allTrue method

Edit: This section was added on Dec 1, 2017, 21:56 CEST

Thanks to the suggestion by ar3s3ru in the comments below, we can now also remove the allTrue helper method, and so make the code even more succinct:

package main

import "fmt"

func main() {
	bufSize := 1 // Try changing this to 0, and see what happens!
	ch1, ch2, ch3 := make(chan int, bufSize), make(chan int, bufSize), make(chan int, bufSize)
	go func() {
		defer close(ch1)
		defer close(ch2)
		defer close(ch3)
		ch1 <- 1
		ch2 <- 2
		ch3 <- 3
		ch3 <- 6
		ch2 <- 5
		ch1 <- 4 // Notice the reversed order of sends(!)
	}()

	// THIS GOT A BIT NICER NOW, RIGHT?:
	for rs := range syncRead(ch1, ch2, ch3) {
		fmt.Println(rs)
	}
}

// -- Helper funcs --------------------------------------------------------------------------------

func syncRead(chs ...chan int) chan []int {
	outChan := make(chan []int, 16)
	go func() {
		defer close(outChan)
		for rs, ok := recvOneEach(chs...); ok; rs, ok = recvOneEach(chs...) {
			outChan <- rs
		}
	}()
	return outChan
}

func recvOneEach(chs ...chan int) (rs []int, ok bool) {
	ok = true
	for _, ch := range chs {
		r, ok2 := <-ch
		rs, ok = append(rs, r), ok && ok2
	}
	return rs, ok
}

The only limitation to this is of course that the the helper functions work only for channels of type int (and also all of them need to be of the same type), so you would need to create separate implementations for each type, or of course, fall back to use the empty interface (aka the G package) :o)

Keywords: Go, Golang, Pipelines, Patterns