Ketan Singh

Collection of musings and ramblings

Pipeline Pattern in Go Part 1

Posted at — Jul 2, 2021 · 6 min read

I am going to divide this post into two parts. In this first part, I will explain the basic building blocks of pipelines, and in the second post I will try to build a general-purpose library around this design.

I was recently reading Concurrency in Go and came across something called the pipeline pattern. The idea is that you break a piece of logical functionality into stages. Each stage does its own processing and passes its output to the next stage. Because the stages are independent, you can modify one without touching the others, rate limit individual stages, and so on.

Pipeline basics

Consider these two functions:

```go
func multiply(value, multiplier int) int {
	return value * multiplier
}

func add(value, additive int) int {
	return value + additive
}
```

What these functions do is very simple: each one performs an arithmetic operation on a number with a constant and returns the result. You can think of them as “stages” of a pipeline. To complete the pipeline, we can simply chain the stages together:

```go
ints := []int{1, 2, 3, 4}

for _, v := range ints {
	fmt.Println(multiply(add(multiply(v, 2), 1), 2))
}
```

Notice that each stage consumes an input, processes it, and returns a value of the same type, so its output can be fed straight into the next stage.

Concurrent Pipelines

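This simple model can be extended with Go’s channels and goroutines so that the stages run concurrently. To do that, our pipeline needs two kinds of building blocks: a generator that turns a set of values into a stream on a channel, and stages that read values from one channel and write their results to another.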

Let’s start with the generator:

```go
// generator sends each of the given integers on the returned channel,
// stopping early if done is closed.
func generator(done <-chan interface{}, integers ...int) <-chan int {
	intStream := make(chan int)
	go func() {
		defer close(intStream)
		for _, i := range integers {
			select {
			case <-done:
				return
			case intStream <- i:
			}
		}
	}()
	return intStream
}
```

This function spawns a goroutine that sends each of the given integers on a channel, and it returns that channel immediately. The values sent on this channel serve as the input for the later stages. We also pass a done channel so that generation can be stopped gracefully: once done is closed, the select picks the done case and the goroutine returns instead of blocking forever on a send that nobody will receive. This is sometimes described as the Poison Pill pattern.

Extending the same idea, we can rewrite the multiply and add functions so that they do their processing concurrently:

```go
// multiply reads integers from intStream and sends each one, multiplied by
// multiplier, on the returned channel.
func multiply(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
	multipliedStream := make(chan int)
	go func() {
		defer close(multipliedStream)
		for i := range intStream {
			select {
			case <-done:
				return
			case multipliedStream <- i * multiplier:
			}
		}
	}()
	return multipliedStream
}

// add reads integers from intStream and sends each one, plus adder, on the
// returned channel.
func add(done <-chan interface{}, intStream <-chan int, adder int) <-chan int {
	addedStream := make(chan int)
	go func() {
		defer close(addedStream)
		for i := range intStream {
			select {
			case <-done:
				return
			case addedStream <- i + adder:
			}
		}
	}()
	return addedStream
}
```

Side note: these two functions share almost all of their code, so we could rewrite them as a single stage that accepts a function (or an interface) describing the operation. I am keeping them separate here for simplicity.

To build the pipeline, we compose these stages:

```go
done := make(chan interface{})
defer close(done) // stop any stage goroutines still running when we return

intStream := generator(done, 1, 2, 3, 4)
pipeline := add(done, multiply(done, intStream, 2), 5)

for i := range pipeline {
	fmt.Println(i) // prints 7, 9, 11, 13
}
```

Each stage runs in its own goroutine and passes its results to the next stage over a channel, so the stages work concurrently: while add is handling one value, multiply can already be working on the next. We get the final results by reading from the last stage’s channel.

More practical use case

OK, but how about something more practical? Using this design, we can implement a toy Shopify scraper. The aim is to crawl the Shopify app directory pages and scrape useful information about each app.

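The basic design is to generate the URLs of the app directory pages, visit each directory page to collect the URLs of the apps listed on it, and then visit each app page to scrape its details. In pipeline terms, that gives three stages chained together: Generate, then AppDirectoryScraper, then AppScraper.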

Scraper

Let’s define the App struct:

```go
type App struct {
	Rating   float64
	Review   int
	Name     string
	Category []string
	AppLink  string
	PageLink string
}
```

I am leaving out the actual scraping code to keep this post short, but the scraping logic can be plugged into these stages without changing the structure of the code.

```go
import "fmt"

// ShopifyScraper groups the stages of the scraping pipeline.
type ShopifyScraper struct{}

// Generate is the generation stage: it emits one App per app directory page,
// with only PageLink filled in.
func (s *ShopifyScraper) Generate(done <-chan bool) <-chan App {
	links := make(chan App)
	go func() {
		defer close(links)
		for i := 0; i < 5; i++ {
			select {
			case <-done:
				return
			case links <- App{PageLink: fmt.Sprintf("https://apps.shopify.com/browse/all?page=%d", i)}:
			}
		}
	}()
	return links
}

// AppDirectoryScraper is the app directory stage: it visits each directory
// page and scrapes the app URLs listed on it for further processing.
func (s *ShopifyScraper) AppDirectoryScraper(done <-chan bool, input <-chan App) <-chan App {
	apps := make(chan App, 5)
	go func() {
		defer close(apps)
		for app := range input {
			// Extract the app URLs by scraping app.PageLink (omitted here).
			// A real implementation would send one App per URL it finds.
			select {
			case <-done:
				return
			case apps <- app:
			}
		}
	}()
	return apps
}

// AppScraper is the app stage: it visits each app page and scrapes the app's
// information.
func (s *ShopifyScraper) AppScraper(done <-chan bool, input <-chan App) <-chan App {
	apps := make(chan App)
	go func() {
		defer close(apps)
		for app := range input {
			// Extract the app information by scraping app.AppLink (omitted here).
			select {
			case <-done:
				return
			case apps <- app:
			}
		}
	}()
	return apps
}
```

Finally, we link the stages together to form the pipeline:

```go
// Do runs the whole pipeline and collects every scraped App.
func (s *ShopifyScraper) Do() []App {
	var result []App
	done := make(chan bool)
	defer close(done) // release the stage goroutines if we return early

	for app := range s.AppScraper(done, s.AppDirectoryScraper(done, s.Generate(done))) {
		result = append(result, app)
	}
	return result
}
```

In the next post, we will try to turn pipelines into a library that takes care of (1) pipeline ordering, (2) running the same stage concurrently in multiple goroutines, (3) cancellation, and (4) possibly error handling and retries.