# Aggregation pipeline.
#
# Data flow:
#   in        -> aggregate : split each event into one event per numeric field
#   aggregate -> normalize : windowed histogram statistics per (measurement, tags, field)
#   normalize -> batch     : flatten the statistics into a `fields` record
#   in        -> batch     : unmodified input events are forwarded as well
#   batch     -> out       : batched output
#
# NOTE: every statement is kept on its own line(s). `#` starts a comment that
# runs to the end of the line, so the inline comment in the second select must
# not share a line with any code that follows it.

use std::type;
use std::record;

# Tumbling windows used by the aggregation select below.
# NOTE(review): the unit of `interval` is not visible in this file; going by the
# window names, 10000 / 60000 are presumably meant as 10 s / 1 min — confirm
# against the Tremor version in use.
define tumbling window `10secs`
with
  interval = 10000
end;

define tumbling window `1min`
with
  interval = 60000
end;

# Intermediate streams connecting the select stages.
create stream normalize;
create stream aggregate;

# Batching operator placed in front of `out`.
# NOTE(review): units/semantics of `count` and `timeout` are defined by
# generic::batch, not by this file — confirm.
define generic::batch operator batch
with
  count = 3000,
  timeout = 5
end;

create operator batch;

# Stage 1: fan out per field.
# Groups by measurement, tags and each key of `event.fields`, so `group[2]` is
# the field name of the current group. Only events whose resulting `value` is a
# number are passed on (see the `having` clause).
select {
  "measurement": event.measurement,
  "tags": event.tags,
  "field": group[2],
  "value": event.fields[group[2]],
  "timestamp": event.timestamp,
}
from in
group by set(event.measurement, event.tags, each(record::keys(event.fields)))
into aggregate
having type::is_number(event.value);

# Stage 2: windowed aggregation.
# For both windows, computes HDR-histogram statistics of `value` with the
# listed percentiles, grouped by measurement, tags and field. A "window" tag is
# inserted into the tags from `window`.
select {
  "measurement": event.measurement,
  "tags": patch event.tags of insert "window" => window end,
  "stats": aggr::stats::hdr(event.value, [ "0.5", "0.9", "0.99", "0.999" ]),
  "field": event.field,
  "timestamp": aggr::win::first(event.timestamp), # we can't use min since it's a float
}
from aggregate[`10secs`, `1min`, ]
group by set(event.measurement, event.tags, event.field)
into normalize;

# Stage 3: normalize.
# Flattens the statistics record into a `fields` record whose keys are the
# statistic name followed by the original field name (e.g. "p99_<field>").
select {
  "measurement": event.measurement,
  "tags": event.tags,
  "fields": {
    "count_#{event.field}": event.stats.count,
    "min_#{event.field}": event.stats.min,
    "max_#{event.field}": event.stats.max,
    "mean_#{event.field}": event.stats.mean,
    "stdev_#{event.field}": event.stats.stdev,
    "var_#{event.field}": event.stats.var,
    "p50_#{event.field}": event.stats.percentiles["0.5"],
    "p90_#{event.field}": event.stats.percentiles["0.9"],
    "p99_#{event.field}": event.stats.percentiles["0.99"],
    "p99.9_#{event.field}": event.stats.percentiles["0.999"]
  },
  "timestamp": event.timestamp,
}
from normalize
into batch;

# The unmodified input events are sent to the batch operator too, alongside
# the aggregated events produced above.
select event from in into batch;

# Emit whatever the batch operator produces.
select event from batch into out;