Golang Fan-In Fan-Out Concurrency Pattern

Rahul Kapoor
4 min readDec 30, 2022

--

Image source https://commons.wikimedia.org/wiki/File:Go_gopher_app_engine_color.jpg

Firstly what is CONCURRENCY?

Concurrency is an ability of a program to do multiple things at the same time. Concurrency is very important in modern software, due to the need to execute independent pieces of code as fast as possible without disturbing the overall flow of the program.

It is the ability for functions to run independently of each other. Parallelism is a run-time property where two or more tasks are being executed simultaneously. Through concurrency, you want to define a proper structure for your program.

Image source https://blog.knoldus.com/achieving-concurrency-in-go

To understand the FanIn FanOut pattern and make good sense out of it you should be familiar with goroutines, waitgroups, channels etc.

If you already know about them that’s awesome, else you can read more about them in my other article: read here

Fan Out

Fan out is used when multiple functions read from the same channel. The reading will stop only when the channel is closed. This characteristic is often used to distribute work amongst a group of workers to parallelize the CPU and I /O.

package main

import (
"fmt"
"sync"
)

func generator(nums ...int) <-chan int {
myChannel := make(chan int) //declare a channel

go func() {
//iterate the nums data and sends it to channel
for _, val := range nums {
myChannel <- val
}
close(myChannel)
}()

return myChannel
}

func main() {
data1 := []int{1, 2, 3, 4, 5}
data2 := []int{10, 20, 30, 40, 50}
var wg sync.WaitGroup

//it receives a "receive-only" directional channel
ch1 := generator(data1...)
ch2 := generator(data2...)
wg.Add(2)

//we will loop through both the channels till all data is sent and marked as close
go func() {
for val := range ch1 {
fmt.Printf("Channel1 data: %v\n", val)
}
wg.Done()
}()

go func() {
for val := range ch2 {
fmt.Printf("Channel2 data: %v\n", val)
}
wg.Done()
}()

wg.Wait() //will wait till the above goroutines are marked as done
}

Try running this example live at go playground: try now!

For the above code snippet, we have multiple goroutines which are reading from the same channel which is returned by our generator func.

Generator func declares a channel, extracts data from nums and sends it to the channel. After all the extracted data is sent, channel is marked as closed and returned.

Fan In

Fan In is used when a single function reads from multiple inputs and proceeds until all are closed. This is made possible by multiplexing the input into a single channel.

For the below example, I have created 2 files at the same location named - text1.txt and text2.txt

package main

import (
"bufio"
"fmt"
"log"
"os"
"sync"
)

func readData(file string) <-chan string {
f, err := os.Open(file) //opens the file for reading
if err != nil {
log.Fatal(err)
}

out := make(chan string) //channel declared

//returns a scanner to read from f
fileScanner := bufio.NewScanner(f)
fileScanner.Split(bufio.ScanLines) //scanning it line-by-line token

//loop through the fileScanner based on our token split
go func() {
for fileScanner.Scan() {
val := fileScanner.Text() //returns the recent token
out <- val //passed the token value to our channel
}

close(out) //closed the channel when all content of file is read

//closed the file
err := f.Close()
if err != nil {
fmt.Printf("Unable to close an opened file: %v\n", err.Error())
return
}
}()

return out
}

func fanInMergeData(ch1, ch2 <-chan string) chan string {
chRes := make(chan string)
var wg sync.WaitGroup
wg.Add(2)

//reads from 1st channel
go func() {
for val := range ch1 {
chRes <- val
}
wg.Done()
}()

//reads from 2nd channel
go func() {
for val := range ch2 {
chRes <- val
}
wg.Done()
}()

go func() {
wg.Wait() //waits till the goroutines are completed and wg marked Done
close(chRes) //close the result channel
}()

return chRes
}

func main() {
ch1 := readData("text1.txt")
ch2 := readData("text2.txt")

//receive data from multiple channels and place it on result channel - FanIn
chRes := fanInMergeData(ch1, ch2)

//some logic with the result channel
for val := range chRes {
fmt.Println(val)
}
}

In the above code snippet, we are reading data from multiple files as inputs and then sending all the data we read to an out channel. We are using os.Open to open the file and bufio package to read data from the file. NewScanner() returns *Scanner which further has a Scan(): Scan advances the Scanner to the next token and will return false when it reached EOF or error.

Data we receive on looping with Scan() we keep sending to our output channel from all our input files with the common readData func. until the channel is closed.

Then we pick all the “receive” directional channels which have our data from multiple inputs and merge them together to a common out channel.

To read more about this visit Go blog and give Fan-out, fan-in section a read.

Another amazing article is: GO: A BETTER FAN-OUT, FAN-IN EXAMPLE

--

--