Go Pipelines: Difference between revisions
No edit summary |
|||
(38 intermediate revisions by the same user not shown) | |||
Line 4: | Line 4: | ||
A pipeline is a data processing pattern, aimed at processing [[#Streams|streams of data]]. A pipeline consists of a series of [[#Stage|stages]]. The output of a stage is connected to the input of the subsequent stage, unless they happen to be the first and last stage. The input of the first stage is the input of the pipeline. The output of the last stage is the output of the pipeline. The pipeline pattern is powerful because it offers separation of concerns: each stage can implement a different concern. Thus, stages can be modified independently of one another, the stages can be mixed and matched, arranged in fan-out and fan-out topologies, etc. | A pipeline is a data processing pattern, aimed at processing [[#Streams|streams of data]]. A pipeline consists of a series of [[#Stage|stages]]. The output of a stage is connected to the input of the subsequent stage, unless they happen to be the first and last stage. The input of the first stage is the input of the pipeline. The output of the last stage is the output of the pipeline. The pipeline pattern is powerful because it offers separation of concerns: each stage can implement a different concern. Thus, stages can be modified independently of one another, the stages can be mixed and matched, arranged in fan-out and fan-out topologies, etc. | ||
Generally, the limiting factor on the pipeline is either the [[#Generator|generator]] or the stages that are computationally intensive. | |||
=Stage= | =Stage= | ||
Line 12: | Line 14: | ||
Reification means that the language exposes a concept to the developers so they can work with it directly. Examples: functions, classes. | Reification means that the language exposes a concept to the developers so they can work with it directly. Examples: functions, classes. | ||
Stages can be combined at a higher level without modifying the stages themselves. | Stages can be combined at a higher level without modifying the stages themselves. The same stage can be reused multiple times in the same pipeline. | ||
Stages are capable of safely processing [[#element|stream elements]] concurrently. A stage reads stream data from a concurrent-safe input and sends it on a concurrent-safe output. This arrangement allows stages to execute independently of one another. | Stages are capable of safely processing [[#element|stream elements]] concurrently. A stage reads stream data from a concurrent-safe input and sends it on a concurrent-safe output. This arrangement allows stages to execute independently of one another. | ||
Line 30: | Line 32: | ||
The <code>Stage</code> <code>struct</code> looks like this: | The <code>Stage</code> <code>struct</code> looks like this: | ||
<syntaxhighlight lang='go'> | <syntaxhighlight lang='go'> | ||
// Stage is a structure representing a pipeline stage. | // Stage is a structure representing a pipeline stage. The stage creates and manages its own output stream, | ||
// which is an externally read-only channel that is used to send out the elements processed by the stage, and | |||
// a "thread pump" that pulls elements from the input channel, processes them by invoking the element processor | |||
// function on them, and then sends them on the output channel. The stage is preemptable via the "done" channel. | |||
type Stage struct { | type Stage struct { | ||
name | name string | ||
done | done <-chan interface{} | ||
// The read only input stream, it is external, and usually belongs to Stage preceding this one in the pipeline. | |||
in <-chan interface{} | |||
elementProcessor func(e | // The externally read only output stream, it is created by the Stage. | ||
out <-chan interface{} | |||
elementProcessor func(e interface{}) interface{} | |||
} | } | ||
</syntaxhighlight> | </syntaxhighlight> | ||
Note that we could use either a concrete type as <code><some_type></code> or <code>interface{}</code>. If a generically-written stage needs to work with a specific type, [[ | Note that we could use either a concrete type as <code><some_type></code> or <code>interface{}</code>. If a generically-written stage needs to work with a specific type, [[Go_Type_Assertion#Overview|type assertions]] can be used. Also, a stage that just performs the type assertion could be inserted in the pipeline. | ||
<span id='Preventing_Goroutine_Leak'></code>The <code>Stage</code> constructor defines the "thread pump" behavior, in the form of an anonymous function. Among other things, the function closes the out channel when it is not needed anymore. Also, the goroutine exists when something gets written on the <code>done</code>, as an embodiment of the pattern described in the [[Go_Language_Goroutines#Preventing_Goroutines_Leak|Preventing Goroutines Leak]] section. | <span id='Preventing_Goroutine_Leak'></code>The <code>Stage</code> constructor defines the "thread pump" behavior, in the form of an anonymous function. Among other things, the function closes the out channel when it is not needed anymore. Also, the goroutine exists when something gets written on the <code>done</code>, as an embodiment of the pattern described in the [[Go_Language_Goroutines#Preventing_Goroutines_Leak|Preventing Goroutines Leak]] section. | ||
<syntaxhighlight lang='go'> | <syntaxhighlight lang='go'> | ||
func NewStage(name string, done <-chan interface{}, precedingStage *Stage, elementProcessor func(e | func NewStage(name string, done <-chan interface{}, precedingStage *Stage, elementProcessor func(e interface{}) interface{}) *Stage { | ||
out := make(chan | out := make(chan interface{}) | ||
stage := &Stage{ | stage := &Stage{ | ||
name: name, | name: name, | ||
done: done, | done: done, | ||
in: precedingStage.out, | in: precedingStage.out, | ||
out: out, | out: out, | ||
elementProcessor: elementProcessor, | elementProcessor: elementProcessor, | ||
Line 79: | Line 83: | ||
<syntaxhighlight lang='go'> | <syntaxhighlight lang='go'> | ||
func NewGenerator(name string, done <-chan interface{}, input [] | func NewGenerator(name string, done <-chan interface{}, input []int) *Stage { | ||
outputStream := make(chan interface{}) | |||
stage := &Stage{ | stage := &Stage{ | ||
name: name, | name: name, | ||
done: done, | done: done, | ||
out: | out: outputStream, | ||
} | } | ||
// create and start the thread pump that reads from the input slice and writes them on the output stream | // create and start the thread pump that reads from the input slice and writes them on the output stream | ||
Line 95: | Line 99: | ||
case <-done: | case <-done: | ||
return // the stage may be forcibly closed by writing the "done" channel | return // the stage may be forcibly closed by writing the "done" channel | ||
case | case outputStream <- e: | ||
} | } | ||
} | } | ||
Line 107: | Line 111: | ||
==The Assembled Pipeline== | ==The Assembled Pipeline== | ||
<syntaxhighlight lang='go'> | <syntaxhighlight lang='go'> | ||
multiplicator := func(e int) | multiplicator := func(e interface{}) interface{} { | ||
return | i := e.(int) | ||
return i * 2 | |||
} | } | ||
additor := func(e int) | additor := func(e interface{}) interface{} { | ||
return | i := e.(int) | ||
return i + 1 | |||
} | } | ||
Line 126: | Line 132: | ||
} | } | ||
</syntaxhighlight> | </syntaxhighlight> | ||
=Fan-Out Pipeline Pattern= | |||
The fan-out pipeline pattern consists in starting multiple equivalent and concurrent stages, each own with its own goroutine, to handle the output of only one preceding stage. This pattern is useful when some of the pipeline stages are computationally expensive and can be replicated, allowing the pipeline to send its data stream over multiple, equivalent processing stages that use more CPU in parallel. | |||
A stage of a pipeline is suited for fan out if both of the following criteria apply: | |||
# It does not rely on the values that the stage has calculated before (this is important because there is no guarantee in what order the concurrent copies of the stage will run). | |||
# It takes a long time to run. | |||
The parallel stages are just regular <code>Stage</code> instances, and the function that creates them returns a slice of <code>*Stage</code>: | |||
<syntaxhighlight lang='go'> | |||
// NewFanOutStages takes the preceding stage and a count, and creates a 'count' number of concurrent fan-out Stage | |||
// instance which all read from the preceding stage output channel and concurrently process the stream elements | |||
// by sending them to the 'elementProcessor' function. | |||
func NewFanOutStages(done <-chan interface{}, precedingStage *Stage, count int, elementProcessor func(e interface{}) interface{}) []*Stage { | |||
var result = make([]*Stage, count) | |||
for i := 0; i < count; i++ { | |||
result[i] = NewStage("FanOut Stage "+strconv.Itoa(i), done, precedingStage, elementProcessor) | |||
} | |||
return result | |||
} | |||
</syntaxhighlight> | |||
An example of pipeline that fans out stream elements: | |||
<syntaxhighlight lang='go'> | |||
processor := func(e interface{}) interface{} { | |||
i := e.(int) + 10 | |||
fmt.Printf("%d\n", i) | |||
return i | |||
} | |||
done := make(chan interface{}) | |||
defer close(done) | |||
generatorStage := NewGenerator(...) | |||
NewFanOutStages(done, generatorStage, 10, processor) | |||
</syntaxhighlight> | |||
=Fan-In Pipeline Pattern= | |||
The fan-in pipeline pattern consists in reading multiple parallel stages and combining their output into a single stage that processes the outputs of the fan out stages. This is done by creating concurrent goroutines that read from the output channels of the fanned out stages and write the stream elements on the output channel of the fan-in stage. There is an additional goroutine that closes the output channel of the fan-in stage when the incoming channels have all been closed. | |||
<syntaxhighlight lang='go'> | |||
type FanInStage struct { | |||
name string | |||
done <-chan interface{} | |||
// The read only input streams, they are all external, belonging to the fanned out Stages preceding this one in the pipeline. | |||
in []<-chan interface{} | |||
// The externally read only output stream, it is created by the Stage. It will cary the stream elements read from the input streams. | |||
out <-chan interface{} | |||
} | |||
</syntaxhighlight> | |||
The constructor of fan-in stages: | |||
<syntaxhighlight lang='go'> | |||
func NewFanInStage(done <-chan interface{}, fannedOutStages []*Stage) *FanInStage { | |||
var out = make(chan interface{}) | |||
var in = make([]<-chan interface{}, len(fannedOutStages)) | |||
for i, c := range fannedOutStages { | |||
in[i] = c.out | |||
} | |||
result := &FanInStage{ | |||
name: "Fan In Stage", | |||
done: done, | |||
in: in, | |||
out: out, | |||
} | |||
var wg sync.WaitGroup | |||
wg.Add(len(in)) | |||
for _, c := range in { | |||
// Read from all input channels on concurrent goroutines and send the stream elements to the multiplexing output channel. | |||
go func(c <-chan interface{}) { | |||
defer wg.Done() | |||
for e := range c { | |||
select { | |||
case <-done: | |||
return | |||
case out <- e: | |||
} | |||
} | |||
}(c) | |||
} | |||
// Wait for all reads to complete and all channels to drain. | |||
go func() { | |||
wg.Wait() | |||
close(out) | |||
}() | |||
return result | |||
} | |||
</syntaxhighlight> | |||
An example of pipeline that fans out stream elements and then fans them in: | |||
<syntaxhighlight lang='go'> | |||
processor := func(e interface{}) interface{} { | |||
i := e.(int) + 10 | |||
return i | |||
} | |||
done := make(chan interface{}) | |||
defer close(done) | |||
generatorStage := NewGenerator(...) | |||
fannedOutStages := NewFanOutStages(done, generatorStage, 10, processor) | |||
fi := NewFanInStage(done, fannedOutStages) | |||
for i := range fi.out { | |||
... | |||
} | |||
</syntaxhighlight> | |||
=Stage Queueing= | |||
Queueing can be introduced in stages by adopting [[Go_Channels#Buffered_Channels|buffered channels]]. For a conversation on advantages and disadvantages of queueing, see: {{Internal|Queueing_Theory#Queueing_in_Pipelines|Queueing Theory | Queueing in Pipelines}} |
Latest revision as of 16:31, 14 August 2024
Internal
Overview
A pipeline is a data processing pattern, aimed at processing streams of data. A pipeline consists of a series of stages. The output of a stage is connected to the input of the subsequent stage, unless they happen to be the first and last stage. The input of the first stage is the input of the pipeline. The output of the last stage is the output of the pipeline. The pipeline pattern is powerful because it offers separation of concerns: each stage can implement a different concern. Thus, stages can be modified independently of one another, the stages can be mixed and matched, arranged in fan-out and fan-out topologies, etc.
Generally, the limiting factor on the pipeline is either the generator or the stages that are computationally intensive.
Stage
A stage has the following two properties:
- A stage consumes and returns the same type.
- A stage must be reified by the language so that it may be passed around.
Reification means that the language exposes a concept to the developers so they can work with it directly. Examples: functions, classes.
Stages can be combined at a higher level without modifying the stages themselves. The same stage can be reused multiple times in the same pipeline.
Stages are capable of safely processing stream elements concurrently. A stage reads stream data from a concurrent-safe input and sends it on a concurrent-safe output. This arrangement allows stages to execute independently of one another.
The first stage of a pipeline, which reads the input that consists of discrete value from a source and converts it into stream elements is called generator.
Stream
A stream consists of elements, which are processed one at a time by stages.
Go Pipeline
Go Pipeline Stage
Each stage consists of an (externally) read-only output channel and an internal thread pump that invokes an element-processing function. The stream elements to be processed by the stage are read from a read-only input channel, which usually belongs to the preceding stage in the pipeline. Each element is processed by the element-processing function and the result is written on the stage's output channel. The stage also reads from the done
channel, which makes stages to be interconnected in two ways: by the channels that are passed in the subsequent stages of the pipeline and by the done
channel. The role of the done
channel is to allow us to force the pipeline stages to terminate regardless of the state the pipeline stage is in - waiting on the incoming channel or waiting to send. Closing the done
channel will force the pipeline stage to terminate.
The Stage
struct
looks like this:
// Stage is a structure representing a pipeline stage. The stage creates and manages its own output stream,
// which is an externally read-only channel that is used to send out the elements processed by the stage, and
// a "thread pump" that pulls elements from the input channel, processes them by invoking the element processor
// function on them, and then sends them on the output channel. The stage is preemptable via the "done" channel.
type Stage struct {
name string
done <-chan interface{}
// The read only input stream, it is external, and usually belongs to Stage preceding this one in the pipeline.
in <-chan interface{}
// The externally read only output stream, it is created by the Stage.
out <-chan interface{}
elementProcessor func(e interface{}) interface{}
}
Note that we could use either a concrete type as <some_type>
or interface{}
. If a generically-written stage needs to work with a specific type, type assertions can be used. Also, a stage that just performs the type assertion could be inserted in the pipeline.
The Stage
constructor defines the "thread pump" behavior, in the form of an anonymous function. Among other things, the function closes the out channel when it is not needed anymore. Also, the goroutine exists when something gets written on the done
, as an embodiment of the pattern described in the Preventing Goroutines Leak section.
func NewStage(name string, done <-chan interface{}, precedingStage *Stage, elementProcessor func(e interface{}) interface{}) *Stage {
out := make(chan interface{})
stage := &Stage{
name: name,
done: done,
in: precedingStage.out,
out: out,
elementProcessor: elementProcessor,
}
// create and start the thread pump that reads from the input stream, processes elements with
// the element processor function and writes them on the output stream
go func() {
// close the stage when it's done processing all input elements
defer close(out)
// iterate over input elements
for e := range stage.in {
select {
case <-done:
return // the stage may be forcibly closed by writing the "done" channel
case out <- elementProcessor(e):
}
}
}()
return stage
}
Generators
The first stage of the pipeline must read elements from somewhere (a slice, a channel, etc.) and write them to its out channel. In general, the first stage of a pipeline usually reads the input that consists of discrete value from a source and converts it into stream elements. We call this kind of stage a generator. The generator is also a Stage
but with a nil
input stream.
func NewGenerator(name string, done <-chan interface{}, input []int) *Stage {
outputStream := make(chan interface{})
stage := &Stage{
name: name,
done: done,
out: outputStream,
}
// create and start the thread pump that reads from the input slice and writes them on the output stream
go func() {
// close the stage when it's done processing all input elements
defer close(outputStream)
// iterate over input elements
for _, e := range input {
select {
case <-done:
return // the stage may be forcibly closed by writing the "done" channel
case outputStream <- e:
}
}
}()
return stage
}
Generators can be written to take a function to generate elements.
The Assembled Pipeline
multiplicator := func(e interface{}) interface{} {
i := e.(int)
return i * 2
}
additor := func(e interface{}) interface{} {
i := e.(int)
return i + 1
}
done := make(chan interface{})
defer close(done)
gs := NewGenerator("generator", done, []int{1, 2, 3, 4})
ms := NewStage("multiplicator A", done, gs, multiplicator)
as := NewStage("additor", done, ms, additor)
ms2 := NewStage("multiplicator B", done, as, multiplicator)
for i := range ms2.out {
fmt.Printf("%d\n", i)
}
Fan-Out Pipeline Pattern
The fan-out pipeline pattern consists in starting multiple equivalent and concurrent stages, each own with its own goroutine, to handle the output of only one preceding stage. This pattern is useful when some of the pipeline stages are computationally expensive and can be replicated, allowing the pipeline to send its data stream over multiple, equivalent processing stages that use more CPU in parallel.
A stage of a pipeline is suited for fan out if both of the following criteria apply:
- It does not rely on the values that the stage has calculated before (this is important because there is no guarantee in what order the concurrent copies of the stage will run).
- It takes a long time to run.
The parallel stages are just regular Stage
instances, and the function that creates them returns a slice of *Stage
:
// NewFanOutStages takes the preceding stage and a count, and creates a 'count' number of concurrent fan-out Stage
// instance which all read from the preceding stage output channel and concurrently process the stream elements
// by sending them to the 'elementProcessor' function.
func NewFanOutStages(done <-chan interface{}, precedingStage *Stage, count int, elementProcessor func(e interface{}) interface{}) []*Stage {
var result = make([]*Stage, count)
for i := 0; i < count; i++ {
result[i] = NewStage("FanOut Stage "+strconv.Itoa(i), done, precedingStage, elementProcessor)
}
return result
}
An example of pipeline that fans out stream elements:
processor := func(e interface{}) interface{} {
i := e.(int) + 10
fmt.Printf("%d\n", i)
return i
}
done := make(chan interface{})
defer close(done)
generatorStage := NewGenerator(...)
NewFanOutStages(done, generatorStage, 10, processor)
Fan-In Pipeline Pattern
The fan-in pipeline pattern consists in reading multiple parallel stages and combining their output into a single stage that processes the outputs of the fan out stages. This is done by creating concurrent goroutines that read from the output channels of the fanned out stages and write the stream elements on the output channel of the fan-in stage. There is an additional goroutine that closes the output channel of the fan-in stage when the incoming channels have all been closed.
type FanInStage struct {
name string
done <-chan interface{}
// The read only input streams, they are all external, belonging to the fanned out Stages preceding this one in the pipeline.
in []<-chan interface{}
// The externally read only output stream, it is created by the Stage. It will cary the stream elements read from the input streams.
out <-chan interface{}
}
The constructor of fan-in stages:
func NewFanInStage(done <-chan interface{}, fannedOutStages []*Stage) *FanInStage {
var out = make(chan interface{})
var in = make([]<-chan interface{}, len(fannedOutStages))
for i, c := range fannedOutStages {
in[i] = c.out
}
result := &FanInStage{
name: "Fan In Stage",
done: done,
in: in,
out: out,
}
var wg sync.WaitGroup
wg.Add(len(in))
for _, c := range in {
// Read from all input channels on concurrent goroutines and send the stream elements to the multiplexing output channel.
go func(c <-chan interface{}) {
defer wg.Done()
for e := range c {
select {
case <-done:
return
case out <- e:
}
}
}(c)
}
// Wait for all reads to complete and all channels to drain.
go func() {
wg.Wait()
close(out)
}()
return result
}
An example of pipeline that fans out stream elements and then fans them in:
processor := func(e interface{}) interface{} {
i := e.(int) + 10
return i
}
done := make(chan interface{})
defer close(done)
generatorStage := NewGenerator(...)
fannedOutStages := NewFanOutStages(done, generatorStage, 10, processor)
fi := NewFanInStage(done, fannedOutStages)
for i := range fi.out {
...
}
Stage Queueing
Queueing can be introduced in stages by adopting buffered channels. For a conversation on advantages and disadvantages of queueing, see: