
· 9 min read

The support for multi-participant transaction orchestration in tremor originates with this use case from Wayfair's Search platform services team.

Identified Need

One of the frequent requests made of the tremor team by peers in the Infrastructure organization at Wayfair has come from the search domain. Search is a critical service at Wayfair and it is the powerplant behind many other services - ranging from recommendation engines through to auditing of data streams that are continually being ingested and indexed into multiple searchable databases.

At a very high level - streams of documents need to be elementized and broken down into one or many indexable items of interest - these items then need to be indexed ( successfully ) into one or many search engines.

Many of the use cases that are battle tested with tremor are relevant in this domain:

  • Cleansing, normalization and enrichment of documents, their elementization into indexable items, and tracking of both documents and elementized items
  • Rate limiting, capacity-based load-shedding, with domain classification similar to the traffic shaping use cases where tremor started
  • Sourcing, transformation and distribution of documents and the synthetic events in real-time at low or very low latencies

But, for the use case at hand, there are additional needs:

  • All documents must be processed transactionally, without loss and with proper reporting of processing outcome to upstream services and the documents must be processed in arrival order. The guaranteed delivery and circuit-breaker mechanisms in tremor now need to be multi-pipeline.
  • All indexable elements of all documents must be indexed in multiple downstream engines successfully ( or operator errors produced for exceptions ) while all possible error cases need to be caught and reported upstream in order to issue retries or let operators intervene. This is a reasonably complex orchestration, mapping processed elements and tracing them back to the documents they were produced from before publication downstream.
  • The exact semantics required vary significantly, case by case, across document types and across the number of downstream indexing systems and technologies they feed. The solution needs to be modular.

Gathering and aggregating multiple parallel processing outcomes and subsuming them under a common transaction is outside of the baseline scope of message-based and message-like systems as they typically only support point-to-point transactions. Correlation across multiple event streams usually needs to be solved on the application level. Where systems support orchestrated transactions - these are typically constrained by transport/protocol or other factors beyond the application author's control, and are therefore inflexible under variant ( and often fast-changing ) production needs.

Required Outcome

Expand on tremor’s QoS facilities so that multi-participant transaction orchestration is possible, easily composable and user programmable.

Characteristics

The original use cases for tremor were relatively straightforward data distribution applications with a requirement for traffic shaping and rate limiting of data streams when downstream systems were prone to being overwhelmed at peak traffic conditions.

These occurrences were rare - but their impact was high when they happened. And in an infrastructure with higher high peaks year on year this is an ever-present hazard of doing business.

More recently, delivery guarantees are expanding as new domains adopt tremor in production. In these domains data loss, even through user-defined and strictly capacity-managed traffic shaping, is not tolerable.

Like with many real-time systems - the percentage of the overall in-flight volumetric that requires transactional delivery is typically a small subset of the overall firehose. Take financial trading systems for example - orders and trades are transactional and they need to be processed, each and every one, correctly - as there are fiscal and regulatory conditions that need to be strictly met.

But pricing - the ability to buy or sell an equity, or the current currency rate - is often naturally continuously changing due to supply and demand, and naturally redundant - as you can buy or sell the same stock on many different venues.

The search case stretches the QoS mechanisms in tremor and the internal mechanisms used to track events as they are processed from a single set of flows and a small set of participants - to larger and more complex user defined flows that orchestrate transactions of arbitrary complexity.

This is compounded by tremor-based applications today being large, increasingly sophisticated and modular. So the QoS mechanisms that were originally constrained to the boundary of a single pipeline - now need to be preserved and propagated across an entire deployment.

Solution

Tremor’s core processing element - pipelines - are executable directed-acyclic graphs.

A tremor user designs a workflow or pipeline using the tremor query language.

Tremor converts this to a directed graph and makes sure that it is acyclic.
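
As a rough illustration of what an acyclicity check involves, here is a minimal Python sketch using Kahn's algorithm. It is not tremor's implementation; the function name `is_acyclic` and the edge-list representation are assumptions made purely for this example.

```python
from collections import defaultdict, deque

def is_acyclic(edges):
    """Return True if the directed graph given as (src, dst) pairs has no cycle."""
    successors = defaultdict(list)
    in_degree = defaultdict(int)
    nodes = set()
    for src, dst in edges:
        successors[src].append(dst)
        in_degree[dst] += 1
        nodes.update((src, dst))
    # Start from nodes with no incoming edges and peel the graph layer by layer.
    ready = deque(n for n in nodes if in_degree[n] == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for nxt in successors[node]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)
    # Every node can be peeled off only when the graph contains no cycle.
    return visited == len(nodes)

# Hypothetical node names, chosen only for the example.
pipeline = [("in", "categorize"), ("categorize", "kfc"), ("kfc", "out")]
looped = pipeline + [("out", "categorize")]
pipeline_ok = is_acyclic(pipeline)  # True
looped_ok = is_acyclic(looped)      # False
```

A graph that passes this check can be executed in topological order, which is what makes forward event distribution straightforward to reason about.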

Tremor transforms and optimizes the user defined graph into an executable form that is structured so that qualities of service are easy to define and easy to understand.

If we imagine the pipeline graph as a single larger directed-acyclic graph we have what tremor actually uses for event distribution internally. Tremor can distribute and process user defined events - these are business or data events that originate from connectors or user defined logic.

Tremor can inject control events - these are runtime events that tremor uses for quality of service and they are not ordinarily user visible. Tremor can also inject events originating at outputs ( or that propagate from downstream systems ) backwards to inputs ( or for propagation to upstream systems ). But, we can do so without introducing cycles.

The user-defined graph is acyclic. But the tremor runtime has, in effect, the ability to coordinate acknowledgements for user-defined events and an ability to signal upstream breaks in connectivity to downstream systems, or downstream breaks to upstream systems. These runtime control events - we call them signal-flow and contra-flow - are transparent to users.

Our wal ( write-ahead-log ) operator produces and consumes signal-flow and contra-flow events.

So, given a simple tremor application that has no defined QoS ( it does not use guaranteed delivery )

/etc/tremor/config/lossy.trickle
use tremor::system;

select patch event of
  insert "hostname" => system::hostname()
end
from in into out;

We can configure the wal operator

/etc/tremor/config/mostly_guaranteed.trickle

use tremor::system;

define qos::wal operator in_memory_wal
with
  read_count = 20,
  max_elements = 1000, # Capacity limit of 1000 stored events
  max_bytes = 10485760 # Capacity limit of 10 MiB of events
end;

create operator in_memory_wal;

select patch event of
  insert "hostname" => system::hostname()
end
from in into in_memory_wal;

select event from in_memory_wal into out;

This is a logically equivalent application - but we can tolerate a lag of up to 1000 events or 10 MiB of data ( the configured max_bytes of 10485760 ) before losing data. Under the hood of course - events are now tracked and traced. If connectors are QoS aware then we now have a more robust application.
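
The retention behaviour of a bounded write-ahead log can be sketched in Python. This is an illustration of the general idea only, not the `qos::wal` operator itself: the class `BoundedWal`, its methods, and the choice to refuse events once a limit is reached are all assumptions made for the example.

```python
from collections import OrderedDict

class BoundedWal:
    """Keeps unacknowledged events until downstream acks them (illustrative only)."""

    def __init__(self, max_elements, max_bytes):
        self.max_elements = max_elements
        self.max_bytes = max_bytes
        self.pending = OrderedDict()  # event id -> payload, in arrival order
        self.bytes_used = 0
        self.next_id = 0

    def append(self, payload):
        """Store an event; return its id, or None if either limit would be exceeded."""
        if (len(self.pending) + 1 > self.max_elements
                or self.bytes_used + len(payload) > self.max_bytes):
            return None  # assumption: the caller applies back-pressure or accepts loss
        event_id = self.next_id
        self.next_id += 1
        self.pending[event_id] = payload
        self.bytes_used += len(payload)
        return event_id

    def ack(self, event_id):
        """Downstream confirmed everything up to and including event_id: free the space."""
        while self.pending and next(iter(self.pending)) <= event_id:
            _, payload = self.pending.popitem(last=False)
            self.bytes_used -= len(payload)

    def replay(self):
        """Events to resend after downstream reconnects, oldest first."""
        return list(self.pending.values())

wal = BoundedWal(max_elements=3, max_bytes=1024)
ids = [wal.append(b"doc-%d" % i) for i in range(4)]  # the fourth event does not fit
wal.ack(1)               # downstream confirmed events 0 and 1
replayed = wal.replay()  # only event 2 is still outstanding
```

Both limits apply together: whichever of the element count or the byte budget is reached first bounds how far downstream may lag.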

So if we are consuming from a kafka cluster upstream and distributing to another kafka cluster downstream ( such as in another data center ) - those systems can go offline briefly or be disconnected. The connectors themselves handle lossless delivery ( that’s handled by kafka in both cases in this example ). Connectors with less strong guarantees can still be ( mostly ) lossless - so if our downstream system is HTTP-based ( like elasticsearch ) - we can tolerate transient service loss and fully recover.

What if tremor or the host it is deployed on is rebooted?

/etc/tremor/config/mostly_guaranteed.trickle
use tremor::system;

define qos::wal operator in_memory_wal
with
  dir = "./recovery", # Persistent file-based recovery file
  read_count = 20,
  max_elements = 1000, # Capacity limit of 1000 stored events
  max_bytes = 10485760 # Capacity limit of 10 MiB of events
end;

create operator in_memory_wal;

select patch event of
  insert "hostname" => system::hostname()
end
from in into in_memory_wal;

select event from in_memory_wal into out;

The tremor developer doesn’t need to be too concerned with the internal mechanisms, or their implementation. And for simple applications with a single primary data flow, it’s as easy as the examples above to selectively introduce grades of guaranteed delivery with a spectrum of robustness that derives from choice of connectivity ( kafka vs http ) or how the qos operators are chosen, placed in a flow, and configured.

Orchestration however, is different. In an orchestrated transaction the user defined logic provided by the tremor developer also needs to do some tracking. This is achieved through using tremor’s state mechanism alongside the qos capabilities and operators that tremor provides to compose a solution.

So, in our search case - let us say we have two downstream search engines, and both need to index a different set of items of interest elementized from a single document. We use the state mechanism to track the progress of the items for each participant. When all participants have their indexes up to date, we issue a synthetic event ( that can be recorded in a wal ) that publishes the document processing status downstream.
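
The bookkeeping this implies can be sketched in Python. This is a language-neutral illustration and not tremor-script: the class `DocumentTracker`, the participant names and the shape of the status events are all hypothetical.

```python
class DocumentTracker:
    """Tracks which items each participant still has to index, per document."""

    def __init__(self, participants):
        self.participants = frozenset(participants)
        self.outstanding = {}  # doc id -> {participant: set of item ids not yet indexed}

    def begin(self, doc_id, items_by_participant):
        """Open a transaction for one document and its elementized items."""
        self.outstanding[doc_id] = {
            p: set(items_by_participant.get(p, ())) for p in self.participants
        }

    def on_indexed(self, doc_id, participant, item_id):
        """Record an ack; return a synthetic status event once everyone is done."""
        pending = self.outstanding[doc_id]
        pending[participant].discard(item_id)
        if any(pending.values()):
            return None  # at least one participant still has work outstanding
        del self.outstanding[doc_id]
        return {"document": doc_id, "status": "indexed"}

    def on_failed(self, doc_id, participant, item_id, reason):
        """A participant rejected an item: close the transaction and report upstream."""
        self.outstanding.pop(doc_id, None)
        return {"document": doc_id, "status": "failed",
                "participant": participant, "item": item_id, "reason": reason}

tracker = DocumentTracker(["engine_a", "engine_b"])
tracker.begin("doc-1", {"engine_a": ["title", "body"], "engine_b": ["sku"]})
first = tracker.on_indexed("doc-1", "engine_a", "title")  # None: work remains
second = tracker.on_indexed("doc-1", "engine_b", "sku")   # None: engine_a not done
done = tracker.on_indexed("doc-1", "engine_a", "body")    # status event is emitted
```

Only the final acknowledgement produces an event, so downstream consumers see exactly one outcome per document regardless of how many participants were involved.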

[Figure: search logic]

So our document source is kafka, our indexing engines receive the elementized items, and successfully elementized documents ( which may now be enriched with elementization metadata and index metadata ) can now be published to an audited topic ( let’s assume kafka again for simplicity ).

The state mechanism in tremor is a readable/writable value - so persistent and recoverable state is a relatively simple composition:

define script remember
script
  let state = event;
  event
end;

define qos::wal operator forget_me_not
with
  dir = "./brain",
  read_count = 1,
  max_elements = 1000, # Capacity limit of 1000 stored events
  max_bytes = 10485760 # Capacity limit of 10 MiB of events
end;

create script remember;
create operator forget_me_not;

select event from in into remember;
select event from remember into forget_me_not;
select event from forget_me_not into out;

Please don’t run out of disk space!

Conclusion

Most of the changes required to evolve tremor from supporting great qos for simple single pipeline applications to complex multi-pipeline and multi-participant stateful orchestrations did not expose new features to the tremor developer or user.

It has been a significant change to tremor internals, however, and the work reaches a stable point with our 0.12 release. The ability to pause and resume connectors, and the ability for the tremor runtime itself to detect and act on quiescence, will mean that tremor is flexible enough for the demands and use cases that originated in the search domain.

· 5 min read

Identified Need

The adoption growth of tremor inside Wayfair is such that we have production customers with 3.5 years of experience operating tremor in production with very large, complex tremor-based services - and fresh demands from new domains such as Search - whose requirements for multi-participant transaction orchestration are driving enhancements to quality of service, guaranteed delivery and circuit breaker capabilities within tremor’s core processing engine.

Some usage patterns are becoming common enough that tremor application logic is now shared across teams, across organizations, and across domains. For example - tremor has good support for HTTP based interactions and already interfaces with a number of HTTP-based and REST APIs, and it can expose HTTP-based interfaces to participants and act as a REST-based or HTTP-based service. Many of these services share the same needs around handling HTTP errors and routing errors and alerts.

At the event flow or query level - similar levels of sophistication are emerging in the installed user base.

Required Outcome

Expand tremor’s domain specific languages to support modularity, starting with the scripting language but extending into the query language to maximise reuse of user-defined processing logic.

Characteristics

Provide standard mechanism and syntax to discover, reference and consume functional units of logic that can be shared by all tremor domain specific languages.

A new lexical preprocessing phase has been introduced allowing source to be packaged within a modular set of paths through a TREMOR_PATH environment variable. Since V0.8 of tremor the scripting and query languages both support modules, and share the same module syntax and semantics.

The tremor API was upgraded to support preprocessed or non-modular source so that no API changes were required to extend the deployment mechanisms to support modularity.

Modular scripting

In a nutshell - this added support for user defined constants and user defined functions.

my_mod.tremor
# Tail recursive implementation of fibonacci
#
fn fib_(a, b, n) of
  case (a, b, n) when n > 0 => recur(b, a + b, n - 1)
  default => a
end;

fn fib(n) with
  fib_(0, 1, n)
end;
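
For readers less familiar with tremor-script, the same tail-recursive shape can be traced in Python. This is an illustrative translation rather than tremor code; the `recur` call is modelled as a loop because Python does not eliminate tail calls.

```python
def fib_(a, b, n):
    # Mirrors the two cases of the function above: keep stepping while n > 0,
    # otherwise return the accumulator a.
    while n > 0:
        a, b, n = b, a + b, n - 1
    return a

def fib(n):
    # Seed the accumulators with the first two fibonacci numbers.
    return fib_(0, 1, n)

first_ten = [fib(n) for n in range(10)]  # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
```

Carrying the two running values as arguments is what lets each step reuse the current frame instead of growing the call stack.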

The module can be loaded via the module path and used in other script or query sources:

fib.trickle
# A streaming fibonacci service

define script fibber
script
  use my_mod;
  my_mod::fib(event)
end;

create script fib from fibber;

select event from in into fib;

select event from fib into out;

Modules can be file-based, or defined inline in the scripting or query language. A standard set of system modules is provided by tremor out of the box.

Tremor Modules

Modularity in the query language

In the query language windows, scripts and pluggable operators may be defined and shared across queries.

There is an RFC for modular sub-query to allow query sub-graphs to be defined and shared which is being delivered as part of a tremor LFX mentorship.

Modularity in the deployment language

Modular deployments through replacing the YAML configuration syntax with a deployment language will embed the modular query language, which in turn embeds the modular scripting language.

This work is under development and will span multiple releases - but it is being designed in such a way that as clustering capabilities are added to tremor, that clustered or distributed deployments will be modularly composable by end users using the same tooling as the other DSLs within tremor.

Solution

The support for modularity is thematic. The scripting language now supports a functional programming paradigm and file-based or nested inline modules. The query language can embed modular scripts using the same module mechanism and definitions in the query language are also modular.

Modular sub-query and the introduction of the deployment language are planned and in progress and will extend modularity to the administration of running tremor nodes.

And, in the fullness of time, clustering will further extend modularity in tremor to maximize operator and tremor developer productivity through efficient reuse and sharing of common tasks.

The first set of connectivity to benefit from modularity is the support for CNCF OpenTelemetry. This is the first set of tremor connectors to be delivered that ships with its own set of modules designed to make designing OpenTelemetry based services in tremor easier.

Conclusion

As tremor grows into new domains, and as the algorithms and solutions in tremor’s traditional production domains grow in sophistication, size and complexity, the subject of modularity has evolved and new demands continue to emerge.

We expect the modularity theme to be long-lived, but its origins derive from production needs. When tremor was developed the user defined logic was small, relatively simple and applications built with tremor were fairly monolithic.

Today, multi-pipeline and medium to large tremor-based applications are common. And adoption of the modular scripting and query language primitives is now driving larger and more sophisticated use cases.

Another dimension of modularity is the ongoing expansion of connectivity in tremor. Modularity here reflects a different production-driven need ( and a few maintainer conveniences ). A plugin development kit is being developed that will allow connectors and other plugins to be developed in separately managed and maintained projects. This in turn allows the core of tremor to be managed independently of connectivity.

· 3 min read

Happened Before

The simplified architecture of our systems is currently:

[Figure: old pipeline]

Identified Need

Wayfair’s technology organization is continuously expanding, evolving and growing. When work on tremor began we were running almost entirely in our own data centres, on our own hardware.

Today, we are running largely in the cloud. Whether cloud-native or bare-metal the sidecar pattern has proven to be a significantly popular deployment pattern with tremor-based systems in production at Wayfair.

Tremor is equally happy with clustered ( just a bunch of event processors ) centralized deployments or with sidecar deployments; on containerized virtual machines, or alongside our growing Kubernetes estate.

With the rise of Kubernetes and cloud-native at Wayfair - we had an interesting challenge as an infrastructure organization serving the 1000’s of developers in our wider technology group. How do we continue to deliver a single pane of glass to our developers?

Although the over-simplified architecture diagram above looks simple - it’s far more complex in practice. We have multiple independent search and visualization clusters. And we have many more infrastructure services and clusters sitting behind these frontends.

For our application developers - this looks like a vanilla installation of ElasticSearch and Kibana. In reality it’s an always-changing and forever-speciating set of services running 24x7x365 offering a single-pane-of-glass for our developers’ convenience.

By this state of use case evolution and adoption of Tremor at Wayfair - tremor is at the centre of a lot of the internal data distribution and processing - whilst offering no protocols, APIs or transports of its own - entirely invisible to our target developer community.

Our Kubernetes team developed helm charts for tremor to preserve the convenience of this illusion; whilst accelerating the operationalisation of our emergent cloud-native infrastructure services.

Required Outcome

Boldly go cloud-native, in such a way that no-one truly notices.

Characteristics

No new features or capabilities were developed for this use case by the tremor team.

Solution

Adopt helm-based deployments of tremor for our cloud-native and kubernetes-enabled applications and services. The helm charts have since been open sourced in collaboration with our Kubernetes Team as a part of tremor, and have been extended to support OpenShift by the tremor community ( waves to Anton Whalley of Rust Dublin fame! ).

Tremor Helm Charts

Conclusion

Shortly before tremor was open-sourced and donated to the Linux Foundation it became cloud-native itself through the combined efforts of the Infrastructure group at Wayfair - especially the hard work of our Platform Engineering, SRE and Kubernetes teams.

We hope to productize and open source other solution packs that enable the wider tremor community to quickly bootstrap production environments based on the tremor-based systems we already have in production in our Logging, Metrics, Observability, Kubernetes and Search teams.

In a large way - the early successes of tremor and our adoption of cloud-native and cloud-based computing aligned the goals of the tremor project with those of the Linux Foundation and the Cloud Native Computing Foundation where tremor is now an early stage sandbox project.

If you’re reading this - and the work and philosophy resonates with your organization, your people and you believe similar benefits could elevate your production environments - then please reach out to us in the tremor community and get involved, join us and help us shape tremor’s future.

· 6 min read

Happened Before

The simplified architecture of our systems is currently:

[Figure: old pipeline]

Identified Need

The need to route and process data from multiple sources to multiple destinations with one or many potentially overlapping streams of data processing is already evident in the first 2 phases of Tremor’s emergence as a mission critical piece of infrastructure at Wayfair.

Growing adoption and the increased sophistication of processing needs now results in the emergence of event/data flow or pipeline processing by our production users. We summarise multiple phases of evolution here and conflate them into a single case study.

In reality the use case captured in this synopsis composes multiple smaller projects related by the same identified need spanning multiple releases into a single case study.

Required Outcome

Our systems specialists are writing increasingly complex, layered and rich business logic within tremor’s domain specific language tremor-script and are inhibited by our pipeline model which uses the YAML format for describing data flow graphs.

YAML is hard to write, and hard to debug, and the embedded scripting language is an odd fit into the pipeline configuration which is described in YAML.

Preserving the internal pipeline processing and execution mechanisms - replace the verbose YAML syntax with a statement oriented query language that extends the expression oriented scripting language.

This allows hygienic and helpful error reporting with suggested workarounds and fixes, better IDE integration and a more intuitive means to express data flow operations on multiple data streams that the pipeline mechanisms expose to tremor systems application developers.

A structured query-like language suitably captures the directed-acyclic graph nature of data flows but we need to support rich nested JSON-like data structures with leverage of the processing capabilities of the scripting language - whilst opening up new capabilities to extend the processing capabilities through custom operators, embeddable scripting logic and extensions and enhancements to QoS and non-functional primitives.

For the metrics domain - the ability to aggregate tumbling windows of events and to provide multiple aggregation dimensions with flexible grouping policies is a new requirement.

Characteristics

Replace the YAML-based pipeline configuration with a structured query-like language that embeds the scripting language, providing top grade support for processing rich data structures, extensibility through custom operators, and windowing semantics to support multi-resolution event aggregation.

The initial solution was the introduction of the tremor query language.

Solution

The query language not only replaces the YAML-based event flow syntax for describing data flow pipelines and processing - which is useful for every domain using tremor in production at Wayfair - but, it has special considerations for the metrics or other analytic domains that require rich means of grouping and aggregate processing of in-flight streaming data. The embedding of scripting logic allows the traditional domain of ETL and data processing and transformation to be natively supported with a more intuitive syntax.

A rate limiting or traffic shaping example ( common to multiple usage domains ):

# File traffic.trickle
# Very basic traffic shaping algorithm

# Define a bucket grouping operator
define grouper::bucket operator kfc;

# Logic for categorizing events

define script categorize
script
let $rate = 1;
let $class = event.`group`;
{ "event": event, "rate": $rate, "class": $class };
end;

# An instance of the grouper
create operator kfc from kfc;

# An instance of the categorizer
create script categorize;

# Stream ingested events into the categorizer
select event from in into categorize;

# Stream categorized events into the grouper
select event from categorize into kfc;

# Stream categorized and group batched events downstream
select event from kfc into out;
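
To make the rate limiting idea concrete, here is a generic sliding-window limiter keyed by class, sketched in Python. It is not the `grouper::bucket` operator, whose exact algorithm is not described here; `BucketLimiter`, `admit` and the one-second window are assumptions, with `rate` and `event_class` standing in for the `$rate` and `$class` values set by the categorize script.

```python
from collections import defaultdict, deque

class BucketLimiter:
    """Allow at most `rate` events per class within any one-second window."""

    def __init__(self):
        self.recent = defaultdict(deque)  # class -> timestamps of admitted events

    def admit(self, event_class, rate, now):
        window = self.recent[event_class]
        # Forget admitted events that are at least one second old.
        while window and now - window[0] >= 1.0:
            window.popleft()
        if len(window) >= rate:
            return False  # over budget for this class: shed the event
        window.append(now)
        return True

limiter = BucketLimiter()
# Five events of one class arriving at these times (in seconds), rate of 1 per second.
decisions = [limiter.admit("checkout", rate=1, now=t) for t in (0.0, 0.2, 0.9, 1.0, 1.1)]
# decisions == [True, False, False, True, False]
other_class = limiter.admit("search", rate=1, now=0.2)  # classes are independent
```

Keeping one window per class means a noisy class exhausts only its own budget and cannot starve the others.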

The relatively simple structured query-like language allows script and window definitions to be reused. The define statements do not create operator instances in the data flow graph; they create named reusable definitions that encompass the desired semantics. The create statements create the nodes of the directed acyclic graph. The select statement is a builtin operator that is used for linking nodes in the graph together to form a data flow or event processing graph.
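
The division of labour between the three statement kinds can be modelled in a few lines of Python. This is a toy model for intuition only and says nothing about how tremor represents queries internally; the `Query` class and its method names are hypothetical.

```python
class Query:
    """Toy model: define registers templates, create adds nodes, select adds edges."""

    def __init__(self):
        self.definitions = {}  # name -> kind; reusable templates, not graph nodes
        self.nodes = {"in": "stream", "out": "stream"}  # instantiated graph nodes
        self.edges = []        # (from_node, to_node) links

    def define(self, kind, name):
        self.definitions[name] = kind

    def create(self, name, definition=None):
        definition = definition or name
        if definition not in self.definitions:
            raise KeyError(f"no definition named {definition!r}")
        self.nodes[name] = self.definitions[definition]

    def select(self, source, target):
        for node in (source, target):
            if node not in self.nodes:
                raise KeyError(f"{node!r} was never created")
        self.edges.append((source, target))

# Mirrors the traffic shaping query above.
q = Query()
q.define("grouper::bucket", "kfc")
q.define("script", "categorize")
q.create("kfc")
q.create("categorize")
q.select("in", "categorize")
q.select("categorize", "kfc")
q.select("kfc", "out")
```

A definition that is never created adds nothing to the graph, which is why definitions can be shared freely without changing the shape of a flow.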

Although the sample logic in the example is a simplified version of what our logging and metrics service teams actually develop and maintain for our production systems - it replaces 1000s of lines of cryptic YAML with hundreds of lines of easy to debug and reason about query code.

An aggregation example from the metrics or analytics domain. This example is a complete but simplified example of a tremor event processing application:

aggregator.trickle
# Aggregate events using a high dynamic range histogram with 10 second, 1 minute, 10 minute
# and 1 hour aggregate summaries.

select {
  "measurement": event.measurement,
  "tags": patch event.tags of insert "window" => window end,
  # Windowed histogram - for the median and a selection of percentiles
  "stats": aggr::stats::hdr(event.fields[group[2]], [ "0.5", "0.9", "0.99", "0.999" ]),
  "class": group[2],
  "timestamp": aggr::win::first(event.timestamp),
}
# The higher resolution aggregates are merged into lower resolution aggregates to conserve memory
from in[`10secs`, `1min`, `10min`, `1h`]
where event.measurement == "udp_lb"
  or event.measurement == "kafka-proxy.endpoints"
  or event.measurement == "burrow_group"
  or event.measurement == "burrow_partition"
  or event.measurement == "burrow_topic"
# The operator maintains a user defined composited set of group partitions.
# Each group maintains its own set of aggregation windows
group by set(event.measurement, event.tags, each(record::keys(event.fields)))
into normalize
# Discard computed events with a small sample set
having event.stats.count > 100;
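
The grouping and windowing idea can be sketched in Python for a single window size. Several simplifications are assumed here: plain count, min, max and mean stand in for the HDR histogram, tags are left out of the group key, the window start is used as the timestamp, and `tumbling_aggregate` and `having_count_above` are hypothetical names.

```python
from collections import defaultdict

def tumbling_aggregate(events, window_secs, having_count_above=0):
    """Summarise events per (window, measurement, field) group."""
    groups = defaultdict(list)
    for ev in events:
        # Each event belongs to exactly one non-overlapping window.
        window_start = ev["timestamp"] - ev["timestamp"] % window_secs
        # One group per field name, like each(record::keys(event.fields)).
        for field, value in ev["fields"].items():
            groups[(window_start, ev["measurement"], field)].append(value)
    summaries = []
    for (window_start, measurement, field), values in sorted(groups.items()):
        if len(values) <= having_count_above:
            continue  # plays the role of the having clause
        summaries.append({
            "measurement": measurement,
            "class": field,
            "timestamp": window_start,
            "stats": {
                "count": len(values),
                "min": min(values),
                "max": max(values),
                "mean": sum(values) / len(values),
            },
        })
    return summaries

events = [
    {"timestamp": 0, "measurement": "udp_lb", "fields": {"latency": 10}},
    {"timestamp": 3, "measurement": "udp_lb", "fields": {"latency": 20}},
    {"timestamp": 9, "measurement": "udp_lb", "fields": {"latency": 30}},
    {"timestamp": 12, "measurement": "udp_lb", "fields": {"latency": 40}},
]
# The second window holds a single sample and is discarded by the threshold.
result = tumbling_aggregate(events, window_secs=10, having_count_above=1)
```

The query above runs the same logic at four resolutions at once, merging finer windows into coarser ones rather than re-reading the raw events.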

In most real world tremor-based systems - the synthetic events computed in processing pipelines are tailored to conform to the schemas expected by multiple downstream systems:

normalize.trickle
# Prepare computed histogram summaries for downstream systems ( eg: telegraf/influx )
select {
  "measurement": event.measurement,
  "tags": event.tags,
  "fields": {
    "count_#{event.class}": event.stats.count,
    "min_#{event.class}": event.stats.min,
    "max_#{event.class}": event.stats.max,
    "mean_#{event.class}": event.stats.mean,
    "stdev_#{event.class}": event.stats.stdev,
    "var_#{event.class}": event.stats.var,
    "p50_#{event.class}": event.stats.percentiles["0.5"],
    "p90_#{event.class}": event.stats.percentiles["0.9"],
    "p99_#{event.class}": event.stats.percentiles["0.99"],
    "p99.9_#{event.class}": event.stats.percentiles["0.999"]
  }
}
from normalize
into out;

Compared to the disparate services and software elements that tremor replaces - the query language affords an intuitive and easy to reason about high level domain specific language to configure rich event processing and data enrichment and transformation pipelines - with a minimally terse yet easy to read form.

Conclusion

Tremor’s core mission and mandate includes the efficient declaration of arbitrarily complex directed-acyclic pipeline processing graphs that are memory and compute efficient under the hood whilst preserving transparency or remaining hidden to most of the developers in our organization by continuing to conform to external transports, protocols and service interfaces in the surrounding production infrastructure estate.

· 6 min read

Happened Before

After the initial production success of tremor as a point solution for capacity management during our peak trading cycles at Wayfair, the focus shifted a little to the next key set of issues with our traditional observability platforms.

The simplified architecture of our logging systems is currently:

[Figure: old pipeline]

The traffic shaping use case was a huge win for Wayfair's SRE and operational teams who manage our production estate.

Identified Need

But we could do a lot better. The development lifecycle to capture, normalize, enrich, enhance and standardize logs from 1000's of engineers and 1000's of applications built in a diversity of programming languages was a heavy task for our observability engineers.

Their environment still looks pretty grim:

[Figure: new pipeline]

Displace and replace a plethora of in-house, open-source and commercial off the shelf log capture, metrics capture and data distribution technologies with a single cost-effective at hyperscale dynamically load-adaptive solution.

The decision was taken to expand the processing capabilities of tremor beyond traffic shaping, classification and rate limiting rules to general purpose data processing of unstructured hierarchic JSON-like data streams, the shape of data most common within the observability domain we were focused upon.

Required Outcome

Our systems specialists and service reliability engineers can consolidate knowledge and tune to a unified operating experience and center of excellence in our production estate based on a single well known and cost efficient easy to operate alternative.

Characteristics

It is understood that data will be captured from and distributed to a diverse and ever-changing set of production systems - built in many programming languages and environments and distributed to an unknowable set of destination systems.

These systems include, but are not limited to: Graylog Extended Log Format, Syslog, Kafka, TCP, UDP, Telegraf, Influx Line Protocol, ElasticSearch and many more...

Connectivity must be flexible enough to capture data from and distribute data to synchronous and asynchronous protocols, transports and systems without developers needing to learn or integrate with a new technology. The solution must be invisible to developers.

The unit economics of the existing system is hard to measure, hard to plan and hard to remediate; especially given the rising year on year firehose volumetrics, with rising velocity and rising peak loads.

99% of the data either originates as JSON, or is transformed to JSON just in time.

The target systems for displacement and replacement have very well understood extract, load, transform, enrichment and normalization algorithms that are coded and configured differently in each system. This is difficult to maintain, enhance and evolve in an environment where infrastructure and services are continuously evolving.

A rich set of ETL and data manipulation and mutation primitives are required so that all existing methods used for data processing across the target systems can be replaced by a single easy to understand, easy to program and optimized alternative.

Solution

Tremor has already established itself for traffic shaping in the logging and metrics domains - adding just-in-time traffic shaping capabilities when needed; and preserving the original system’s operational semantics and behaviours during normal operating conditions when the system is operating within planned capacity constraints.

[Figure: new pipeline]

The mandate now expands to replacing many of our event capture systems deployed on on-premise and cloud-native operating environments across multiple data-centres globally.

Instead of our operations and service reliability teams needing to learn and manage the service deployment and administrative lifecycle of multiple components, in this evolution we displace and replace those systems with tremor - typically operating as sidecars on source systems where our applications run on virtual machines, bare metal or containerized environments.

The relatively primitive domain specific language developed as part of the initial solution for traffic shaping is replaced with a new domain specific language tremor-script designed for expressive data filtration, extraction, enrichment and transformation to support:

  • Classification, Categorization and rate limiting for Traffic Shaping
  • Enrichment, Normalization and micro-format parsing
  • Structural pattern matching over hierarchical nested JSON-like data
  • The ability to patch, merge and iterate over JSON-like data
  • A familiar expression language like in regular programming languages with arithmetic, multiplicative and other common operations on numeric and string data
  • Testing string encoded data for regular expression and other micro-formats such as the logstash grok, dissect, kv or stringified embedded json
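To make two of the capabilities above concrete, here is a minimal sketch in Python (not tremor-script syntax) of merging a patch into nested JSON-like data and of structural matching against a nested pattern. The function names `merge` and `matches`, and the convention that a `None` value in the patch deletes a key, are assumptions chosen for illustration; they are not claimed to mirror tremor-script's exact semantics.

```python
from typing import Any


def merge(target: Any, patch: Any) -> Any:
    """Recursively merge `patch` into `target` without mutating either.

    Assumed convention for this sketch: a None value in the patch deletes the key.
    """
    if not isinstance(patch, dict):
        return patch
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)
        else:
            result[key] = merge(result.get(key), value)
    return result


def matches(event: Any, pattern: Any) -> bool:
    """Structural match: every key in `pattern` must be present in `event`
    with a matching (possibly nested) value. Extra keys in `event` are ignored."""
    if isinstance(pattern, dict):
        return isinstance(event, dict) and all(
            key in event and matches(event[key], value)
            for key, value in pattern.items()
        )
    return event == pattern


event = {"app": {"name": "checkout", "env": "prod"}, "level": "error", "tmp": 1}
patched = merge(event, {"app": {"env": "staging"}, "tmp": None})
is_prod_event = matches(event, {"app": {"env": "prod"}})
```

Here `patched` keeps `app.name` and `level`, overrides `app.env` and drops `tmp`, while the original `event` is left untouched.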

The resulting language and interpreter for tremor-script forms the basis of the solution and is largely designed around the needs of the logging domain - but delivered in a more generally useful incarnation that is more widely applicable to other domains - such as in flight data analytics, in flight ETL or processing metrics to name a few.

Most of the in-flight events are JSON or JSON-like. Inspired by a tweet the tremor team happens upon the work of Daniel Lemire et al and SIMD-accelerated JSON processing. Heinz ports this to rust and tremor’s sister project and organization simd-lite is born. The tremor team also maintains the rust port of simd-json with other maintainers interested in fast and efficient data parallel parsing of JSON.

Finally, tremor undergoes a steady pace of innovation with benchmark driven performance optimizations. The combined effect of an efficient interpreter, the ability to easily benchmark synthetic analogs of real world use cases in our benchmark system, and the adoption of more workload appropriate and efficient low level memory allocators such as jemalloc, then mimalloc, then snmalloc by Microsoft Research results in an order of magnitude level of infrastructure cost savings. The production estate shrinks by 10x in terms of the number of deployed systems, the compute capacity required, and the committed memory required when compared to the displaced and replaced systems.

Further - tremor uses far less memory and resources on the source systems making it a much improved sidecar on our source hosts.

Conclusion

Tremor’s core mission and mandate now extends beyond high quality of service data distribution with load-adaptive traffic management for the 1% ( and typically less ) of the time our systems are under-resourced; for the other 99% of the time, it is 10x more cost efficient.

Tremor preserves its facility to remain invisible to developers during this phase of expansion whilst drastically improving the lives of our operational engineers and service reliability engineers. The measured and validated cost-efficiencies offer compounding value that further drives the mandate of the project and pushes it into its next phase of evolution.

· 5 min read

Happened Before

This is our origin story. The simplified high level architecture of our logging systems at the time of tremor’s birth is:

old pipeline

Identified Need

Enable load shedding and consistent capacity management at production scale of the logging and metrics firehoses within Wayfair’s production, staging and development environments; even when our live production working set exceeds planned capacity, but especially when catastrophic failure of infrastructure overwhelms our storage tier.

Required Outcome

Our network operations center, system reliability and service organization engineers should never be in a position where in-flight mission-critical events are unavailable or lagging ( by seconds or even minutes ) behind the live working set state of our production systems.

Stale operational events direct limited resources under time constraints and production service level objective pressures to curing or tending to the wrong problems, at the wrong times and lead to missed opportunities for early and cheap resolution.

Characteristics

It is understood that, if a critical system ( such as a production database ) fails - query failures won’t have a single point of origin in the logs - whether over time, or across application or business units. When under extreme load beyond capacity - only a subset of the firehose is required for in-flight diagnostics of our production estate.

In-flight categorization, classification & rate limiting enables just-in-time traffic shaping, load-shedding and bending critical infrastructure to available capacity even as capacity changes in planned or surprising ways.

It is understood that, regardless of the system state, some logs from a subset of systems, applications or services are relatively more important than others. This is even more true when our systems and services are under extraordinary strain due to unplanned failures and outages. Unplanned failures and outages are expected at Wayfair production scale.

The relative priority of classified and categorized events is known and the relative priority is subject to change over time, as are the event classifications and categorizations.

It is likely impossible to convince thousands of engineers in hundreds of teams across our technology organization to stop using JSON format in the short, medium and possibly even longer term.

Snot. It’s a JSON rock’n’roll world. I guess we’ll just have to deal with it

Solution

This works fine 99% of the time - but under extreme load it just does not work at our scale and the impact to our network operations and service reliability and technical communities is severe at the exact moments when we are at peak trading periods and impact to the business is highest - a perfect storm.

old pipeline

In our ELK ( ElasticSearch, Logstash, Kibana ) based logging and Influx ( Telegraf, Chronograf, InfluxDB ) based metrics environments there are limited means to dynamically tune the production system to accommodate highly variant working sets. As we are an online eCommerce system we often go through sustained periods of nominal activity followed by insane periods of extreme peak activity - often hitting peaks we’ve never before experienced.

We do, however, have a very experienced infrastructure and platform engineering organization that is deeply attuned to our working set and changing business and operational needs.

The initial version of tremor simply preserved the overall pipeline:

  • Applications in a number of programming languages generate logs
  • A Logstash sidecar receives, normalizes and forwards logs to ElasticSearch
  • ElasticSearch indexes logs for display by SREs and developers in Kibana

The processing stages remained unchanged ( and largely serviced by Logstash ):

  • Enrich raw logs with business unit, organization, application and environment metadata
  • Normalize schemas - map logs to a common structure and well known field names
  • Known unknowns - isolate user defined fields and extensions to a well known slush field

But added an extra processing stage, a tremor cluster that added back-pressure detection and load-adaptive rate-limiting based on in-flight analysis of the business events in the firehose based on real-time categorization and classification of those event streams.

Now, in the ( significantly less than 1% of ) time when our systems are over-capacity or experiencing catastrophic unplanned events - tremor can discard events based on rate limits and event classifications that can be tuned by our SRE, NOC and systems architects and developers.
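The behaviour described above can be sketched in a few lines of Python. This is an illustrative sketch only, not tremor's implementation: the class name `ClassRateLimiter`, the `classify` rule, the fixed-window strategy and the specific limits are all assumptions made for the example. The point it shows is that events pass through untouched during normal operation, and are only dropped, per classification, once backpressure is detected.

```python
from collections import defaultdict


class ClassRateLimiter:
    """Fixed-window rate limiter with a separate budget per event class (hypothetical)."""

    def __init__(self, limits, default_limit, window_s=1.0):
        self.limits = limits                # events allowed per window, by class
        self.default_limit = default_limit  # budget for classes without an entry
        self.window_s = window_s
        self.window_start = defaultdict(float)
        self.counts = defaultdict(int)

    def allow(self, event_class, now):
        # Start a fresh window for this class once the previous one has elapsed.
        if now - self.window_start[event_class] >= self.window_s:
            self.window_start[event_class] = now
            self.counts[event_class] = 0
        limit = self.limits.get(event_class, self.default_limit)
        if self.counts[event_class] < limit:
            self.counts[event_class] += 1
            return True
        return False


def classify(event):
    """Assumed classification rule: error logs matter most when shedding load."""
    return "critical" if event.get("level") == "error" else "default"


def shape(events, limiter, backpressure, now):
    """Forward everything in normal operation; shed by class under backpressure."""
    if not backpressure:
        return list(events)
    return [e for e in events if limiter.allow(classify(e), now)]


limiter = ClassRateLimiter({"critical": 3}, default_limit=1)
burst = [{"level": "error"}] * 4 + [{"level": "info"}] * 4
normal = shape(burst, limiter, backpressure=False, now=100.0)
shed = shape(burst, limiter, backpressure=True, now=100.0)
```

With these assumed limits, `normal` contains all eight events, while `shed` keeps three error events and one info event for that one-second window. Tuning the `limits` mapping is the knob that operators would adjust in this sketch.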

Conclusion

Tremor’s core mission and mandate started out with, and continues to be, the non-functional quality of service of mission critical at scale event based systems. Providing system operators with first class means to shape and tune capacity, load and production countermeasures of the production firehose without compromising the business with system downtime is the core requirement.

The most effective way to achieve this in a highly fluid and ever changing environment is what has yielded the tremor project as it came to be - a general purpose event processing system designed for at scale distributed mission critical systems.

When at or over capacity systems no longer bend to the business - tremor is how we inject a little elasticity into our production quality of service - enriching our systems and services with just-in-time load-adaptive bendability.

new pipeline

· 7 min read

wayfair

Development on Tremor started in the autumn of 2018 as a point solution for introducing a configurable traffic-shaping mechanism for our infrastructure platform engineering organization.

There were two use cases being considered at the time- the traffic-shaping use case, which had the highest priority; and a distributed data replication use case for our Kafka-based environments.

The Origin of Tremor

The Traffic Shaping use case required that log information streaming from various systems be categorized and classified in-flight, so that temporally bound rate limits could be applied when our production estate has a working set that is in excess of our current capacity. As a 24x7x365 eCommerce environment with year-on-year higher peaks, higher volumetrics and higher loads on our systems, this is a frequent occurrence. Tremor was designed to remove the burden of scaling these systems from our system reliability and network operations center engineers.

This use case saw the birth of Tremor, which was delivered in 6 weeks, just in time for Cyber 5 (the long bank-holiday weekend in the US centered around Thanksgiving, where US folk go peak shopping!). At the time, Tremor was inflexible: it only supported connectivity to Kafka for ingress and Elasticsearch downstream, and the classification and rate-limiting logic was more or less hardwired- but it worked flawlessly!

Although designed with our logging systems in mind, our metrics systems team saw that Tremor would work for their environment (InfluxDB downstream rather than Elasticsearch, but the logic was basically the same). Tremor was designed to detect backpressure on downstream systems, and to intelligently- based on user defined logic- react by adaptively tuning the data distribution to selectively discard or forward data based on a classification system and rate limits.

The Rise of Event Processing

The user-defined logic required by our logging and metrics teams very quickly became sophisticated. A fairly quick succession of releases saw the logic change from fairly simple filtering, classification (enrichment) and rate limiting (selective dropping due to backpressure) to richer needs to slice and dice nested JSON-like data, to normalize these datasets. This rising need for sophistication in processing along with the adoption of Tremor to replace Logstash and other log shipping frameworks in our source nodes, and to replace Telegraf in our metrics environments, gave birth to Tremor as an event processing engine in its own right.

With richer programming primitives, Tremor advanced from traffic shaping, and log and metrics shipping to log and metrics cleansing, normalisation, enrichment and transformation, to impose a common symbology and structure on logs and metrics generated from many different programming languages, platforms and tools across our production estate- from system logs originated via Syslog, to those via GELF, or Prometheus.

The YAML-based configuration of the query pipelines quickly became error-prone and cumbersome as the sophistication, complexity and size of application logic grew. In addition, the success of our scripting language, tremor-script, and its hygienic set of tooling and IDE integration encouraged the addition of a query language, thus enabling data flow processing with Tremor.

Cloud Native Migration

At the same time, our technology organisation was preparing for migration from on-premise data centres to cloud-based systems. Our legacy VM-based systems were now being containerised and deployed via Kubernetes in our on-premise environments. Simultaneously, our Cloud-native infrastructure, configuration as code, secrets management, developer scaling and many other infrastructure and development platform teams were building out the services that would allow our line of business service engineers to migrate to the cloud.

Tremor was already battle-hardened as a sidecar deployment, and this was extended to Cloud-native deployments by our kubernetes team who packaged Tremor for kubernetes as a set of helm charts.

As a relatively fast-paced technology organisation, our logging and metrics teams weren’t stalled- the scripting and query languages enabled rapid development. However, lack of native support in Tremor to modularise and reuse logic became a limiting factor. Thus, over the span of approximately a year, most of the modularity mechanisms, now standard in Tremor, were developed.

Tremor was now battle-tested, battle-hardened and widely adopted, having gone through many enhancements and replacing many disparate tools and frameworks with a single, easy-to-operate solution.

Tremor is the first major component of in-house critical infrastructure that was open-sourced by Wayfair and contributed to the Linux Foundation under the Cloud Native Computing Foundation.

The Tremor team then selected a panel of experts and friends, who had early access to Tremor, to assist with the open-sourcing process.

Tremor Today

Today, Tremor is planned and progresses in the open. Anyone can contribute, and with contribution comes maintainership. Tremor is now a CNCF community (sandbox) project, and maintainership has grown beyond Wayfair.

At Wayfair, Tremor has been adopted for our search environments. Our search domain has some complex use cases for audited document elementisation and indexing that are multi-participant and that need to occur transactionally.

Tremor’s QoS mechanisms were extended to support transaction orchestration to enable this and similar use cases. Our search teams are also leveraging Tremor for its traditional areas of strength in traffic shaping and adaptive rate limiting.

Our logging and metrics teams are now recalibrating our technology organisation's philosophy and core infrastructure that supports observability across our production estate. The rise of CNCF OpenTelemetry offers developers a consistent and stable set of primitives for logs, metrics and distributed tracing. It’s a lingua franca that offers developers consistency.

Even more important to large or extreme scale organisations is flexibility to make infrastructure and service decisions based on capacity, cost, or other concerns. Through CNCF OpenTelemetry as a core building block, now natively supported in Tremor, our observability teams can now unify our production support, operations and services around Tremor-based OpenTelemetry- preserving the key values that Tremor adds, whilst opening up the new cloud-native possibilities that OpenTelemetry and OpenTelemetry-based services offer.

Tremor Tomorrow

As the tremor community grows, we have seen great contributions adding support for many different types of systems - ranging from the addition of AMQP support by Nokia Communications through to our ongoing collaboration with Microsoft Research on the snmalloc allocator which tremor defaults to.

The latest production solution based on tremor at Wayfair is from our Developer Platforms technology organization - who have used an extension to PHP to identify dead code in our legacy PHP monolith. This solution is based on tremor and included contributions to tremor in the form of the unix_socket connectivity. Thank you Ramona!

The Tremor maintainers also work on the Rust port of simd-json- SIMD accelerated JSON, which is a sister project to Tremor. Mentorships via the Linux Foundation are also producing some wonderful new capabilities and enhancements to the system, ranging from better support for developing gRPC-based connectors in Tremor, to GCP connectivity, or work to add a plugin development kit to Tremor.

Wayfair is already benefiting from community interest and contributions to the project - and the Tremor team has hired its newest member as a result of open-sourcing the project. We are hoping to package and open-source more components of Wayfair’s Tremor-based systems through the Tremor Project, and through other open-source projects at Wayfair through our new Open Source Program Office.

If you are interested, hop on over to our community chat server and say hello!

· 8 min read

Introduction

Hey, I am Rohit Dandamudi from India, about to complete my undergrad in CSE and will be working as a Software Engineer soon. I will be sharing my experience at Tremor :)

Main motivation for applying

My work involved writing "Property-based tests for tremor-script" and some of the reasons for applying are:

  • It involved a new type of testing I never heard of
  • Be part of a sandbox project where I can learn and grow with the community
  • The concept of learning Erlang + Rust was very interesting to me and frankly out of my comfort zone, as a person used to Python and web development in general.

New concepts I learned specific to my work

  • Erlang and Rust
    • My work mostly revolved around Erlang and a little Rust, and I was completely new to this ecosystem. It didn't help that I could not find many resources or an actively accessible community for Erlang.
    • I took this as a challenge and went through various resources to learn Erlang and functional programming in general, and I was able to see why this language was chosen for the task at hand. My mentor is very passionate about Erlang and shared his thought process and experience, which helped me broaden my knowledge and learn how to approach any concept while learning something completely new.

· 4 min read

Introduction

Hey folks, I am Nupur Agrawal, a third year student at Indian Institute of Technology Roorkee. This blog describes my experience of contributing to Tremor, a CNCF sandbox project, in the 2021 spring chapter of the LFX Mentorship Program, under the mentorship of Matthias Wahl, Anup Dhamala and Heinz Gies.

Project Abstract

Tremor is an event processing system originally designed for the needs of platform engineering and infrastructure. It is built for the users that have a high message volume to deal with and want to build pipelines to process, route, or limit this event stream.

At the beginning of the program, I was given a walkthrough of the project by Matthias, and he patiently explained to me the components and workings of tremor. Tremor is nicely documented and the docs are very useful as a reference.

My Project

My project's aim was to enable tremor to receive and send Syslog Protocol Messages, a standard protocol used to send system log or event messages. It was desired to support both the standard IETF format and the old BSD format via UDP and TCP/TLS. A more detailed description can be found here.
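As a rough illustration of the two formats, the sketch below (in Python, not the Rust code contributed to tremor) decodes the `<PRI>` prefix that both share and guesses the format from what follows it. It relies on two facts from the syslog specifications: the priority value is `facility * 8 + severity`, and the IETF format (RFC 5424) places a version number and a space directly after the priority, whereas the BSD format (RFC 3164) continues with a timestamp that starts with a month name. The function name `parse_header` and the returned dictionary layout are hypothetical.

```python
import re

# <PRI>, optionally followed by an RFC 5424 VERSION (1-3 digits, no leading zero) and a space.
PRI_RE = re.compile(r"^<(\d{1,3})>(?:([1-9]\d{0,2}) )?")


def parse_header(line):
    """Decode facility/severity and guess IETF vs BSD framing (illustrative only)."""
    m = PRI_RE.match(line)
    if m is None:
        raise ValueError("missing <PRI> header")
    pri = int(m.group(1))
    if pri > 191:  # highest valid value: facility 23, severity 7
        raise ValueError("priority out of range")
    facility, severity = divmod(pri, 8)
    return {
        "facility": facility,
        "severity": severity,
        "format": "ietf" if m.group(2) else "bsd",
        "rest": line[m.end():],  # remainder left unparsed in this sketch
    }


ietf = parse_header("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - 'su root' failed")
bsd = parse_header("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8")
```

Both sample lines carry priority 34, which decodes to facility 4 and severity 2. A full codec would go on to parse the timestamp, hostname, structured data and message body, which this sketch deliberately leaves in `rest`.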

· 13 min read

Introduction

Hello folks! I'm Jigyasa, a final-year computer science engineering student at Indira Gandhi Delhi Technical University for Women pursuing my bachelor's in Technology. This blog is about my experience contributing to Tremor as part of the LFX Mentorship program.

Learning about Tremor

Tremor is an event processing system for unstructured data with rich support for structural pattern matching, filtering, and transformation. It is built for users that have a high message volume to deal with and want to build pipelines to process, route, or limit this event stream. It has a scripting language called tremor-script and a query language as well called tremor-query or trickle.

I had never worked on an event processing system before this internship. In fact, my first major contribution to open-source was through this mentorship program. To get started with it, my mentor, Darach Ennis, suggested some documents that helped me learn more about it:

/docs/overview/ (deprecated)

/docs/course (deprecated)

Apart from that, learning more about the tremor-query, tremor-script, and going through the workshops in the docs can be really helpful.

The codebase of Tremor is in Rust, and since I had no prior experience with Rust, I started learning the language.