The kv Connector
The kv connector provides a key value storate facility for tremor-based
applications allowing shared state across pipelines. The facility is useful
when in memory state via the state mechanism is not useful as the state is
not shared across pipeline instances or flows.
The kv service allows state to be shared to multiple pipelines within a a
single flow definition or unit of deployment.
The kv storage is persistent. A storage directory needs to be provided.
If not found, the storage directory will be auto-created, or an error will be propagated.
Configuration
define my_kv from kv
with
"path" = "state",
end;
How do I get values from the kv store?
We use the kv get command for this purpose.
let $kv = { "get": "snot" };
let event = null;
This will result in either null when not found or the stored value
How do I put values into the kv store?
We use the kv put command for this purpose.
let $kv = { "put": "snot", };
let event = "badger";
This will result in either null when not found or the stored value
How do I swap values in the kv store?
We use the kv cas command for this purpose.
let $kv = {"cas": "heinz", "old": "gies"}
let event = "ketchup";
This will result in either null on success, or an event on the err stream upon failure
How do I scan values in the kv store?
We use the kv scan command for this purpose.
let $kv = {"scan": "", "end": "heinz"};
let event = 9;
This will return a value for each match in the specified scan range.
How do I delete values in the kv store?
We use the kv delete command for this purpose.
let $kv = { "delete": "heinz" };
let event = null;
Application
Assuming the $kv metadata and event are as above, the following query connected to
an instance of the kv connector should suffice:
select event from in into out;
Correlation
The $correlation metadata key can be set so that a request and response from the
kv facility can be traced:
let $correlation = "some-correlating-unique-data";
Conditioning
To avoid write errors when other streams are writing to the same kv store we provide the old value as a comparand so that the swap only occurs if the value hasn't changed independently in the intervening time since we last read from the store for this key.