Go Pipelines: Difference between revisions
Line 69: | Line 69: | ||
} | } | ||
</syntaxhighlight> | </syntaxhighlight> | ||
The first stage of the pipeline must read elements from somewhere (a slice, a channel, etc.) and write them to its out channel. We call this kind of stage a '''generator'''. The generator is also a <code>Stage</code> but with a <code>nil</code> input stream. | |||
<syntaxhighlight lang='go'> | <syntaxhighlight lang='go'> | ||
func NewGenerator(name string, done <-chan interface{}, input []<some_type>) *Stage { | |||
out := make(chan <some_type>) | |||
stage := &Stage{ | |||
func | name: name, | ||
done: done, | |||
out: out, | |||
// create and start the thread pump that writes | } | ||
// create and start the thread pump that reads from the input slice and writes them on the output stream | |||
go func() { | go func() { | ||
// close the stage when it's done processing all elements | // close the stage when it's done processing all input elements | ||
defer close( | defer close(outputStream) | ||
// iterate over | // iterate over input elements | ||
for e := range | for _, e := range input { | ||
select { | select { | ||
case <-done: | case <-done: | ||
return // the stage may be forcibly closed | return // the stage may be forcibly closed by writing the "done" channel | ||
case | case out <- e: | ||
} | } | ||
} | } | ||
}() | }() | ||
return stage | return stage | ||
} | } | ||
</syntaxhighlight> | </syntaxhighlight> |
Revision as of 00:41, 3 February 2024
Internal
Overview
A pipeline is a data processing pattern, aimed at processing streams of data. A pipeline consists of a series of stages. The output of a stage is connected to the input of the subsequent stage, unless they happen to be the first and last stage. The input of the first stage is the input of the pipeline. The output of the last stage is the output of the pipeline. The pipeline pattern is powerful because it offers separation of concerns: each stage can implement a different concern. Thus, stages can be modified independently of one another, the stages can be mixed and matched, arranged in fan-out and fan-out topologies, etc.
Stage
A stage has the following two properties:
- A stage consumes and returns the same type.
- A stage must be reified by the language so that it may be passed around.
Reification means that the language exposes a concept to the developers so they can work with it directly. Examples: functions, classes.
Stages can be combined at a higher level without modifying the stages themselves.
Stages are capable of processing elements concurrently.
Stream
A stream consists of elements, which are processed one at a time by stages.
Go Pipeline
Go Pipeline Stage
Each stage consists of an (externally) read-only output channel and an internal thread pump that invokes an element-processing function. The stream elements to be processed by the stage are read from a read-only input channel, which usually belongs to the preceding stage in the pipeline. Each element is processed by the element-processing function and the result is written on the stage's output channel.
The Stage
struct
looks like this:
// Stage is a structure representing a pipeline stage. It contains its own output stream, an externally read-only
// channel that is used to send the elements processed by the stage out, and a "thread pump" that pulls elements
// from the input channel, processes them by invoking the element processor function on them, and then sends them
// on the output channel. The stage is preemptable via the "done" channel.
type Stage struct {
name string
done <-chan interface{}
in <-chan int // the read only input stream, it is external, and usually belongs to the preceding Stage
out <-chan int // the externally read only output stream, it is created by the Stage
elementProcessor func(e int) int
}
The Stage
constructor:
func NewStage(name string, done <-chan interface{}, precedingStage *Stage, elementProcessor func(e <some_type>) <some_type>) *Stage {
out := make(chan <some_type>)
stage := &Stage{
name: name,
done: done,
in: precedingStage.out, // the preceding stage also exposes a chan <some_type>
out: out,
elementProcessor: elementProcessor,
}
// create and start the thread pump that reads from the input stream, processes elements with
// the element processor function and writes them on the output stream
go func() {
// close the stage when it's done processing all input elements
defer close(out)
// iterate over input elements
for e := range stage.in {
select {
case <-done:
return // the stage may be forcibly closed by writing the "done" channel
case out <- elementProcessor(e):
}
}
}()
return stage
}
The first stage of the pipeline must read elements from somewhere (a slice, a channel, etc.) and write them to its out channel. We call this kind of stage a generator. The generator is also a Stage
but with a nil
input stream.
func NewGenerator(name string, done <-chan interface{}, input []<some_type>) *Stage {
out := make(chan <some_type>)
stage := &Stage{
name: name,
done: done,
out: out,
}
// create and start the thread pump that reads from the input slice and writes them on the output stream
go func() {
// close the stage when it's done processing all input elements
defer close(outputStream)
// iterate over input elements
for _, e := range input {
select {
case <-done:
return // the stage may be forcibly closed by writing the "done" channel
case out <- e:
}
}
}()
return stage
}