Go Pipelines

From NovaOrdis Knowledge Base

Revision as of 03:32, 2 February 2024

Overview

A pipeline is a data processing pattern aimed at processing streams of data. A pipeline consists of a series of stages. The output of each stage is connected to the input of the next stage, except at the two ends: the input of the first stage is the input of the pipeline, and the output of the last stage is the output of the pipeline. The pipeline pattern is powerful because it offers separation of concerns: each stage can implement a different concern. Thus, stages can be modified independently of one another, mixed and matched, arranged in fan-in and fan-out topologies, etc.

Stage

A stage has the following two properties:

  1. A stage consumes and returns the same type.
  2. A stage must be reified by the language so that it may be passed around.

Reification means that the language exposes a concept to the developers so they can work with it directly. Examples: functions, classes.

Stages can be combined at a higher level without modifying the stages themselves.

Stages are capable of processing elements concurrently.

Stream

A stream consists of elements, which are processed one at a time by stages.

Go Pipeline

Go Pipeline Stage

Each stage consists of an (externally) read-only channel with an internal thread pump (a goroutine) that invokes an element-processing function. Each element read from the input stage is processed by the element-processing function and the result is written to the stage (the channel). The stage is created by a function that looks like this one:

// makeStage creates a "stage" - a read-only channel with a "thread pump" that pulls elements 
// from the input stage, processes them and sends them on the channel. The stage is preemptable 
// via the "done" channel.
func makeStage(done <-chan interface{}, inputStage <-chan <stage_type>, <some_other_stage_params>) <-chan <stage_type> {
	// make the stage's channel - which *is* the stage
	stage := make(chan <stage_type>)
	// create and start the thread pump that writes elements on the channel
	go func() {
		// close the stage when it's done processing all elements
		defer close(stage)
		// iterate over the input elements
		for e := range inputStage {
			select {
			case <-done:
				return // the stage may be forcibly closed via the "done" channel
			case stage <- func(e <stage_type>) <stage_type> {
				// this is the function that processes each
				// element and produces the stage's per-element
				// result, possibly by using <some_other_stage_params>
				...
				return result
			}(e):
			}
		}
	}()
	// return the stage
	return stage
}