I am going to divide this post into two parts: in this first part I will explain the basic building blocks of pipelines, and in the second post I will try to build a general-purpose library around this design.
I was recently reading Concurrency in Go and came across the pipeline processing pattern. The idea is that you can break a piece of logical functionality into stages. Each stage does its own processing and passes its output to the next stage to be processed. You can modify stages independently of one another, rate limit individual stages, and so on.
Consider these two functions:
func Multiply(value, multiplier int) int {
	return value * multiplier
}

func Add(value, additive int) int {
	return value + additive
}
What these functions do is very simple: they each perform an arithmetic operation on a number with a constant and return the result. You can think of them as the “stages” of a pipeline. To complete the pipeline, we can simply combine both stages.
ints := []int{1, 2, 3, 4}

for _, v := range ints {
	// ((v * 2) + 1) * 2
	fmt.Println(Multiply(Add(Multiply(v, 2), 1), 2)) // prints 6, 10, 14, 18
}
We can observe here that each stage consumes an input, processes it, and returns a value of the same type for the next stage to process.
This simplistic model can now be extended to use Go's channels and goroutines to process the stages concurrently. Before we can do that, our pipeline needs the following entities:
Generator, which is responsible for producing the input for the pipeline. You can think of this as the first stage of the pipeline.
Stages, where the actual processing is performed.
Canceller, a mechanism to signal cancellation or the end of processing by the pipeline.
Let's start with the generator:
func generator(done <-chan interface{}, integers ...int) <-chan int {
	intStream := make(chan int)
	go func() {
		defer close(intStream)
		for _, i := range integers {
			select {
			case <-done:
				return
			case intStream <- i:
			}
		}
	}()
	return intStream
}
This function spawns a goroutine that produces values on a channel, and the function returns that channel. The values generated on this channel serve as the input for the later stages. We also pass a done channel into the function so we can exit the generation gracefully; closing done to signal shutdown is similar in spirit to the Poison Pill pattern.
Extending the same idea, we can rewrite the multiply and add functions to do their processing concurrently.
func multiply(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
	multipliedStream := make(chan int)
	go func() {
		defer close(multipliedStream)
		for i := range intStream {
			select {
			case <-done:
				return
			case multipliedStream <- i * multiplier:
			}
		}
	}()
	return multipliedStream
}

func add(done <-chan interface{}, intStream <-chan int, adder int) <-chan int {
	addedStream := make(chan int)
	go func() {
		defer close(addedStream)
		for i := range intStream {
			select {
			case <-done:
				return
			case addedStream <- i + adder:
			}
		}
	}()
	return addedStream
}
Sidenote: perhaps we could rewrite these as a single stage that accepts an interface or a function, instead of writing two functions that share most of their code, but I am trying to keep things simple here.
To build the pipeline, we combine these stages as follows:
done := make(chan interface{})
intStream := generator(done, 1, 2, 3, 4)

pipeline := add(done, multiply(done, intStream, 2), 5)

for i := range pipeline {
	fmt.Println(i)
}
This code runs the stages concurrently in separate goroutines, each passing its results to the next stage via channels. We read the final results from the last stage's channel.
OK, but how about something more practical? We can implement a toy Shopify scraper using this design. The aim is to crawl the Shopify app directory pages and scrape useful information from them.
Here's the basic algorithm we can follow:
Go to an app directory page, e.g. https://apps.shopify.com/browse/all?page=1
From each app directory page, scrape the URLs of the individual apps.
Visit each app URL, scrape the app information, and populate our struct.
Walk through the pagination to reach the next app directory page and repeat.
In pipeline terms, we need a chain of stages: a generator producing directory page URLs, a directory scraper extracting app URLs, and an app scraper extracting app details.
Let's define the App struct:
type App struct {
	Rating   float64
	Review   int
	Name     string
	Category []string
	AppLink  string
	PageLink string
}
I am not writing the actual scraping code, to keep things short and simple, but scraping logic can easily be plugged in without changing the code structure.
type ShopifyScraper struct{}

// Generation stage: generates app directory URLs.
func (s *ShopifyScraper) Generate(done <-chan bool) chan App {
	links := make(chan App)
	go func() {
		defer close(links)
		for i := 1; i <= 5; i++ {
			select {
			case <-done:
				return
			case links <- App{PageLink: fmt.Sprintf("https://apps.shopify.com/browse/all?page=%d", i)}:
			}
		}
	}()
	return links
}

// App directory scraper stage: visits a directory page and scrapes the
// individual app URLs from it for further processing.
func (s *ShopifyScraper) AppDirectoryScraper(done <-chan bool, input <-chan App) chan App {
	apps := make(chan App, 5)
	go func() {
		defer close(apps)
		for app := range input {

			// Extract app URLs by scraping app.PageLink

			select {
			case <-done:
				return
			case apps <- app:
			}
		}
	}()
	return apps
}

// App scraper stage: visits an app page and scrapes the app information.
func (s *ShopifyScraper) AppScraper(done <-chan bool, input <-chan App) chan App {
	apps := make(chan App)
	go func() {
		defer close(apps)
		for app := range input {

			// Extract app information by scraping app.AppLink

			select {
			case <-done:
				return
			case apps <- app:
			}
		}
	}()
	return apps
}
Finally, we link the stages together to form the pipeline.
func (s *ShopifyScraper) Do() []App {
	var result []App
	done := make(chan bool)
	defer close(done)

	for i := range s.AppScraper(done, s.AppDirectoryScraper(done, s.Generate(done))) {
		result = append(result, i)
	}
	return result
}
In the next post we will try to build pipelines as a library that takes care of (1) pipeline ordering, (2) concurrently executing the same stage in multiple goroutines, (3) cancellation, and (4) possibly error handling / retries.