Skip to main content
Version: Next

Operators

Operators are part of the pipeline configuration.

Operators process events and signals in the context of a pipeline. An operator, upon receiving an event from an upstream operator or stream, MAY produce one or many events to one or many downstream directly connected operators. An operator MAY drop events which halts any further processing.

Operators allow the data processing capabilities of tremor to be extended or specialized without changes to runtime behavior, concurrency, event ordering or other aspects of a running tremor system.

Operators are created in the context of a pipeline and configured as part of tremor-query statements. An operator MUST have an identifier that is unique for its owning pipeline.

Configuration is of the general form:

define module::operator_name operator my_custom_operator
with
param1 = "foo",
param2 = [1, 2, 3]
end;

# create - full form
create operator my_custom_operator_instance from my_custom_operator
with
param1 = "bar",
param2 = [true, false, {}]
end;

# create - short form
create operator my_custom_operator;

script

An embedded tremor script is created with a special syntax that deviates from the operator creation. For a full reference see the section on tremor-query embedded-scripts.

The tremor script runtime allows to modify events or their metadata. To learn more about Tremor Script please see the related section.

The script operator allows to modify the events metadata (via $), and the script local state which persists across single events.

Outputs:

  • out (default output used with emit)
  • error - channel for runtime errors
  • <anything else> used when emit event => "<anything else>"

Examples:

# definition
define script rt
script
emit
end;
define script add
script
emit + 1
end;

create script add;

select event from in into add;
select event from add into out;

grouper::bucket

Bucket will perform a sliding window rate limiting based on event metadata. Limits are applied for every $class. In a $class each $dimensions is allowed to pass $rate messages per second.

This operator does not support configuration.

This operator preserves event metadata.

Metadata Variables:

  • $class - The class of an event. (String)
  • $rate - Allowed events per second per class/dimension (Number)
  • (Optional) $dimensions - The dimensions of the event. (Any)
  • (Optional)$cardinality - the maximum number of dimensions kept track of at the same time (Number, default: 1000)

Outputs:

  • out
  • error - Unprocessable events for example if $class or $rate are not set.
  • overflow - Events that exceed the rate defined for them

Example:

define grouper::bucket operator group;

Metrics:

The bucket operator generates additional metrics. For each class the following two statistics are generated (as an example):

{"measurement":"bucketing",
"tags":{
"action":"pass",
"class":"test",
"direction":"output",
"node":"bucketing",
"pipeline":"main",
"port":"out"
},
"fields":{"count":93},
"timestamp":1553012903452340000
}
{"measurement":"bucketing",
"tags":{
"action":"overflow",
"class":"test",
"direction":"output",
"node":"bucketing",
"pipeline":"main",
"port":"out"
},
"fields":{"count":127},
"timestamp":1553012903452340000
}

This tells us the following, up until this measurement was published in the class test:

  • (pass) Passed 93 events
  • (overflow) Marked 127 events as overflow due to not fitting in the limit

generic::backpressure

This operator is deprecated please use qos::backpressure instead.

qos::backpressure

The backpressure operator is used to introduce delays based on downstream systems load. Longer backpressure steps are introduced every time the latency of a downstream system reached timeout, or an error occurs. On a successful transmission within the timeout limit, the delay is reset.

The operator supports two modes:

  • discard - the standard, when backpressure is triggered it will discard new messages by sending them to the overflow output port. This is designed to fulfill the need of low transport latency at the cost of loss.
  • pause - it will trigger a circuit breaker and ask the sources that send it data to stop sending additional events. No Event is discarded by the backpressure operator. This is designed to deal with situations where losing events is not an option - but the garuantee of losslessness depends on the source and how it can handle circuit breaker events.

This operator preserves event metadata.

Configuration options:

  • timeout - Maximum allowed 'write' time in milliseconds.
  • steps - Array of values to delay when a we detect backpressure. (default: [50, 100, 250, 500, 1000, 5000, 10000])
  • method - Either discard or pause to define how backpressure is handled (default: discard)

Outputs:

  • out
  • overflow - Events that are not let past due to active backpressure

Example:

define qos::backpressure operator bp
with
timeout = 100,
method = "discard"
end;

qos::percentile

An alternative traffic shaping option to backpressure. Instead of all dropping events for a given time we drop a statistical subset with an increasing percentage of events dropped the longer we see errors / timeouts.

In general step_up should always be significantly smaller then step_down to ensure we gradually reapproach the ideal state.

This operator preserves event metadata.

Configuration options:

  • timeout - Maximum allowed 'write' time in milliseconds.
  • step_down - What additional percentile should be dropped in the case of a timeout (default 5%: 0.05)
  • step_up - What percentile should be recovered in case of a good event. (default: 0.1%: 0.001)

Outputs:

  • out
  • overflow - Events that are not let past due to active backpressure

Example:

define qos::percentile operator as perc
with
timeout = 100,
step_down = 0.1 # 10%
end;

qos::roundrobin

Evenly distributes events over it's outputs. If a CB trigger event is received from an output this output is skipped until the circuit breaker is restored. If all outputs are triggered the operator itself triggers a CB event.

This operator preserves event metadata.

Outputs:

  • * (any named output is possible)

Example:

define qos::roundrobin operator roundrobin
with
outputs = ["round", "robin", "outputs"]
end;

qos::wal

A Write Ahead Log that will persist data to disk and feed the following operators from this disk cache. It allows to run onramps that do not provide any support for delivery guarantees with offramps that do.

The wal operator will intercept and generate it's own circuit breaker events. You can think about it as a firewall that will protect all operators before itself from issues beyond it. On the other hand it will indiscriminately consume data from sources and operators before itself until it's own circuit breaking conditions are met.

At the same time will it interact with tremors guaranteed delivery system, events are only removed from disk once they're acknowledged. In case of delivery failure the WAL operator will replay the failed events. On the same way the WAL operator will acknowledge events that it persists to disk.

The WAL operator should be used with caution, since every event that passes through it will be written to the hard drive it has a significant performance impact.

This operator preserves event metadata.

Configuration options:

  • read_count - Maximum number of events that are read form the WAL at one time.
  • dir - Directory to store the WAL-file in (optional, if omitted the WAL will remain in memory and not be persisted to disk)
  • max_elements - Maximum number of elements the WAL will cache before triggering a CB event
  • max_bytes - Maximum space on disk the WAL should take (this is a soft limit!)

Only one of max_elements or max_bytes is required, setting both is possible.

Outputs:

  • out

Example:

define qos::wal operator wal with
dir = "./wal",
read_count = 20,
max_elements = 1000,
max_bytes = 10485760
end;

generic::batch

The batch operator is used to batch multiple events and send them in a bulk fashion. It also allows to set a timeout of how long the operator should wait for a batch to be filled.

This operator batches both the event payload and event metadata into a single bulk event. Downstream pipeline nodes or offramps will receive 1 such bulk event but will treat its context as multiple events and might act different e.g. when it comes to building a request payload in the offramp context or other use cases. Empty bulk events are usually considered as no event.

Supported configuration options are:

  • count - Elements per batch
  • timeout - Maximum delay between the first element of a batch and the last element of a batch.

Outputs:

  • out

Example:

define generic::batch operator batch with
count = 300
end;

generic::counter

Keeps track of the number of events as they come and emits the current count out alongside the event. The output is a record of the form {"count": n, "event": event}, where n is the current count and event is the original event.

The counter starts when the first event comes through and begins from 1.

This operator preserves event metadata.

Outputs:

  • out

Example:

define generic::counter operator counter;

debug::history

!!! note

This operator is for debugging purposes only, and should not be used in production deployments.

This operator generates a history entry in the event metadata underneath the field provided in the name config value. Data is pushed to the array as a Striong in the form: "event: <op>(<event_id>)".

This can be used as a tracepoint of events in a complex pipeline setup.

This operator manipulates a section of the event metadata.

Configuration options:

  • op - The operation name of this operator
  • name - The field to store the history on

Outputs:

  • out

Example:

define debug::history operator history with
op = "my-checkpoint",
name = "event_history"
end;