9 posts tagged with "case-study"

· 2 min read

Identified Need

We have a humongous PHP application - around 20 million lines of code. It’s believed that there are certain parts of the codebase that are no longer used, but it’s hard to reliably find out which ones. Due to the dynamic nature of PHP, attempts at static analysis have failed. We decided that dynamic analysis was needed, and created a PHP extension that logs all the calls to all functions and methods. That’s a lot of data, and we used tremor to aggregate it.

Required Outcome

We need to be able to aggregate a lot of data (the extension sends one message for each HTTP request, which can be on the order of hundreds per second for some servers), counting the number of calls to each function or method in the codebase, and send the result to our service, which does further aggregation and presents the data.

High Level Architecture

Characteristics

We used Unix Sockets (which were added as a source to tremor during this project) and then a simple script that aggregates the data. We’re hoping to use custom aggregate functions in the future (there’s an open RFC, waiting for the official voting) to further simplify the pipeline.
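The aggregation step can be pictured roughly as follows. This is a minimal Python sketch, not the tremor script used in the project: the message shape (a JSON object with a "calls" list of function names) and the name aggregate_calls are assumptions made purely for illustration.

```python
import json
from collections import Counter


def aggregate_calls(messages):
    """Sum per-function call counts across per-request messages.

    Each message is assumed (hypothetically) to be a JSON string such as
    {"calls": ["Foo::bar", "baz"]}, with one message per HTTP request.
    """
    totals = Counter()
    for raw in messages:
        payload = json.loads(raw)
        totals.update(payload.get("calls", []))
    return totals


batch = [
    '{"calls": ["Cart::add", "render"]}',
    '{"calls": ["render"]}',
]
totals = aggregate_calls(batch)
```

Functions that never show up in the totals over a long enough observation period are the candidates for removal.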

Conclusion

We are currently testing the solution with smaller applications, seeing minimal performance impact and no impact on stability. We were able to delete unused code from those applications without affecting their operation.

· 6 min read

As Wayfair's technology organization modernizes its infrastructure services to meet new volumetric peaks, it continually adapts, adopts and phases out systems and services.

This is particularly difficult with shared infrastructure and services that are ever-present but seldom seen by many of the engineers in the organization.

The rise of OpenTelemetry provides an opportunity for developers to consolidate on SDKs that are consistent across programming languages and frameworks. For our SREs and operators it offers ease of integration and ease of migration of services.

Tremor is a core enabling component of Wayfair's migration strategy. Tremor supports OpenTelemetry, but is itself endpoint, protocol and service agnostic. This allows our operational teams to switch from on-premise to cloud native services with minimal coordination with others.

This also allows switching from in-house to managed or out of the box services supported by cloud providers.

Moving 1000s of developers to a new technology stack when you are operating continuously with no downtime and at a large scale is hard.

Identified Need

Tremor emerged from production needs in existing systems, specifically in the logging and metrics or observability domains in a large hypergrowth and heavily speciated and specialized production environment that operates 24x7x365.

As this infrastructure is migrated from a largely on-premise data-centre based environment to a cloud native environment and the wider technology organization it operates within doubles in size, change is inevitable. You can bet your roadmap on it.

One great emerging de facto standard is the observability sub-domain that cloud-native computing is propelling forward with standards such as the CNCF’s OpenTelemetry.

Tremor added initial support for OpenTelemetry in February 2021.

For decades, observability - whether logging, metrics or distributed tracing - has been common. But it has lacked a unified approach and a core set of interoperable and interworking standards.

OpenTelemetry has good support for capturing and distributing logs, metrics and trace spans. It has sufficient out of the box to suit the most common needs, most of the time. It affords developers an opportunity to rationalize libraries and frameworks around shared concepts, shared understanding and shared effort. This is especially valuable in large, complex production operations such as Wayfair’s eCommerce production estate.

Languages and platforms change over time. System and component infrastructure is continuously evolving. SaaS environments continue to evolve and innovate new forms of observability, and production operations, system reliability engineers and network operations teams often need to move faster than application or infrastructure developers can develop new software and systems - this is a hard problem.

By targeting OpenTelemetry - developers can depend on a consistent set of SDKs for most common observability needs. Production focused teams can depend on a consistent wire-format and protocol allowing them to move from data-centre to the cloud, and rewire the infrastructure and services just in time.

Through OpenTelemetry, we can normalize the concepts, the code and the inner workings of our cloud-based systems and services - whilst introducing far more flexibility than ever before as observability services and software vendors provide a standards-based and interoperable path.

It is a win-win for all concerned.

Required Outcome

High level architecture - current system

In production at Wayfair, logging and metrics data distribution pipelines are all handled by tremor. With OpenTelemetry, distributed tracing can now be standardized across our production estate.

Solution

Standardize on Tremor and OpenTelemetry together.

Characteristics

The introduction of OpenTelemetry allows developers to standardize on Observability in applications, services and code for logging, metrics and distributed tracing. As OpenTelemetry is natively supported in tremor, it means only minor configuration changes to our existing logging and metrics services.

The new OpenTelemetry normal

With OpenTelemetry, distributed tracing can benefit from the same traffic shaping and adaptive rate limiting as the rest of our observability stack. The unification of the source, collector and distribution tiers via kafka provides scalable and recoverable telemetry shipping and distribution.

Tremor adds adaptive load shedding and traffic shaping that are tunable in production. Tremor also allows legacy observability frameworks to co-exist with OpenTelemetry for a gradual migration across the programming languages and frameworks in production use. Finally, tremor enables multiple downstream services to participate in the overall solution.

As a heterogeneous ecosystem of interconnected services - the unified observability platform based on tremor has no preferred upstream or downstream endpoint. It is system and service agnostic. It is bendable.

This allows teams that prefer the GCP ecosystem to normalize to those native services for visualization, debugging and troubleshooting. For our ElasticSearch population, Elastic’s APM may be a better alternative. For other teams, and our operational staff and folk with more ad hoc needs - DataDog may be preferable.

It’s a cloud native decentralized rock’n’roll observability world out there. Getting 5000 and growing engineers to choose a single observability path is impossible. So, as the population cannot bend, the unified observability leans on tremor for this purpose.

As improved services, frameworks and methods are onboarded our tremor-based systems can be incrementally adjusted to meet changing demands.

Insights

This application of tremor does not introduce new features or capabilities per se.

However, it is the first unified tremor-based system that spans the entire observability spectrum. It centralizes common capabilities and facilities for greater operational freedom, whilst decentralizing the point-to-point endpoint connectivity for the widest applicability across our production estate.

As tremor exposes OpenTelemetry as a client, and as an embedded server - it is effectively used to disintermediate, interpose and interwork with legacy environments and to standardise on OpenTelemetry.

It does this as an incremental update. Existing users have time to migrate their systems to the new OpenTelemetry-based best-practice. Existing processes, practices and battle-tested systems are maintained.

Operators have better tools to manage the production estate and to tune capacity, performance and cost.

This is also the first tremor-based system where tremor is a key architectural primitive allowing our observability community to bend to the changing needs of our development and operational community with minimal effort, and at short notice.

Conclusion

As tremor expands to new domains such as search, service orchestration and supply-chain and logistics to name but a few - our early adopters in the logging domain have evolved from using tremor as a point solution for traffic shaping - to building our entire observability infrastructure based on tremor.

New domains will extend tremor’s capabilities in multi-participant transaction processing and distributed orchestration. The now unified observability domain will further expand and extract capabilities that enhance modularity and flexibility of tremor to build large distributed systems with a relatively simple and easy to program and growing set of languages designed for large-scale event-based processing.

· 9 min read

The support for multi-participant transaction orchestration in tremor originates with this use case from Wayfair's Search platform services team.

Identified Need

One of the frequent requests made of the tremor team by peers in the Infrastructure organization at Wayfair has come from the search domain. Search is a critical service at Wayfair and it is the powerplant behind many other services - ranging from recommendation engines through to auditing of data streams that are continually being ingested and indexed into multiple searchable databases.

At a very high level - streams of documents need to be elementized and broken down into one or many indexable items of interest - these items then need to be indexed ( successfully ) into one or many search engines.

Many of the use cases that are battle tested with tremor are relevant in this domain:

  • Cleansing, normalization and enrichment of documents and indexable elementization and tracking documents and elementized items
  • Rate limiting, capacity-based load-shedding, with domain classification similar to the traffic shaping use cases where tremor started
  • Sourcing, transformation and distribution of documents and the synthetic events in real-time at low or very low latencies

But, for the use case at hand, there are additional needs:

  • All documents must be processed transactionally, without loss and with proper reporting of processing outcome to upstream services and the documents must be processed in arrival order. The guaranteed delivery and circuit-breaker mechanisms in tremor now need to be multi-pipeline.
  • All indexable elements of all documents must be indexed in multiple downstream engines successfully ( or operator errors produced for exceptions ) while all possible error cases need to be caught and reported upstream in order to issue retries or let operators intervene. This is a reasonably complex orchestration, mapping processed elements and tracing them back to the documents they were produced from before publication downstream.
  • There is significant variability, on a case by case basis, in the exact semantics required for different document types to be processed to a varying number of downstream indexing systems and technologies. The solution needs to be modular.

Gathering and aggregating multiple parallel processing outcomes and subsuming them under a common transaction is outside of the baseline scope of message-based and message-like systems as they typically only support point-to-point transactions. Correlation across multiple event streams usually needs to be solved on the application level. Where systems support orchestrated transactions - these are typically constrained by transport/protocol or other factors beyond the application author’s control, and therefore inflexible under variant ( and often fast-changing ) production needs.

Required Outcome

Expand on tremor’s QoS facilities so that multi-participant transaction orchestration is possible, easily composable and user programmable.

Characteristics

The original use cases for tremor were relatively straightforward data distribution applications with a requirement for traffic shaping and rate limiting of data streams when downstream systems were prone to being overwhelmed at peak traffic conditions.

These occurrences were rare - but their impact was high when they happened. And in an infrastructure with higher high peaks year on year this is an ever-present hazard of doing business.

More recently, delivery guarantees are expanding as new domains adopt tremor in production. In these domains data loss, even user defined and strictly capacity managed traffic shaping, is not tolerable.

Like with many real-time systems - the percentage of the overall in-flight volumetric that requires transactional delivery is typically a small subset of the overall firehose. Take financial trading systems for example - orders and trades are transactional and they need to be processed, each and every one, correctly - as there are fiscal and regulatory conditions that need to be strictly met.

But pricing - the ability to buy or sell an equity, or the current currency rate - is often naturally continuously changing due to supply and demand, and naturally redundant - as you can buy or sell the same stock on many different venues.

The search case stretches the QoS mechanisms in tremor and the internal mechanisms used to track events as they are processed from a single set of flows and a small set of participants - to larger and more complex user defined flows that orchestrate transactions of arbitrary complexity.

This is compounded by tremor-based applications today being large, increasingly sophisticated and modular. So the QoS mechanisms that were originally constrained to the boundary of a single pipeline - now need to be preserved and propagated across an entire deployment.

Solution

Tremor’s core processing element - pipelines - are executable directed-acyclic graphs.

A tremor user designs a workflow or pipeline using the tremor query language.

Tremor converts this to a directed graph and makes sure that it is acyclic.
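One standard way to check that a directed graph is acyclic is Kahn's algorithm. The Python sketch below illustrates the idea only; it is not tremor's implementation, and the node names are borrowed from the examples in these posts for illustration.

```python
from collections import defaultdict, deque


def is_acyclic(edges):
    """Return True if the directed graph given as (src, dst) pairs has no cycle.

    Kahn's algorithm: repeatedly remove nodes that have no incoming edges.
    If every node can be removed this way, the graph is a DAG.
    """
    successors = defaultdict(list)
    indegree = defaultdict(int)
    nodes = set()
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1
        nodes.update((src, dst))

    ready = deque(n for n in nodes if indegree[n] == 0)
    removed = 0
    while ready:
        node = ready.popleft()
        removed += 1
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    return removed == len(nodes)


pipeline = [("in", "categorize"), ("categorize", "kfc"), ("kfc", "out")]
looped = pipeline + [("out", "categorize")]
```

Here is_acyclic(pipeline) holds, while the extra edge in looped closes a cycle and the check fails.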

Tremor transforms and optimizes the user defined graph into an executable form that is structured to support qualities of service that are easy to understand and easy to define.

If we imagine the pipeline graph as a single larger directed-acyclic graph we have what tremor actually uses for event distribution internally. Tremor can distribute and process user defined events - these are business or data events that originate from connectors or user defined logic.

Tremor can inject control events - these are runtime events that tremor uses for quality of service and they are not ordinarily user visible. Tremor can also inject events originating at outputs ( or that propagate from downstream systems ) backwards to inputs ( or for propagation to upstream systems ). But, we can do so without introducing cycles.

The user-defined graph is acyclic. But the tremor runtime has, in effect, the ability to coordinate acknowledgements for user-defined events and an ability to signal upstream breaks in connectivity to downstream systems, or downstream breaks to upstream systems. These runtime control events - we call them signal-flow and contra-flow - are transparent to users.

Our wal ( write-ahead-log ) operator produces and consumes signal-flow and contra-flow events.

So, given a simple tremor application that has no defined QoS ( it does not use guaranteed delivery )

# File: /etc/tremor/config/lossy.trickle
use tremor::system; # required for system::hostname()

select patch event of
  insert hostname = system::hostname()
end
from in into out;

We can configure the wal operator

# File: /etc/tremor/config/mostly_guaranteed.trickle

use tremor::system;

define qos::wal operator in_memory_wal
with
  read_count = 20,
  max_elements = 1000, # Capacity limit of 1000 stored events
  max_bytes = 10485760 # Capacity limit of 10MB ( 10 * 1024 * 1024 bytes ) of events
end;

create operator in_memory_wal;
select patch event of
insert hostname = system::hostname()
end
from in into in_memory_wal;

select event from in_memory_wal into out;

This is a logically equivalent application - but we can tolerate a lag of up to 1000 events or 10 megabytes of data ( the configured max_elements and max_bytes ) before losing data. Under the hood of course - events are now tracked and traced. If connectors are QoS aware then we now have a more robust application.
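The capacity behaviour described here can be modelled with a toy bounded log. This is a hedged Python sketch of the general idea, not the qos::wal operator: the class name BoundedWal, its methods, and the choice to reject writes when full are all assumptions for illustration, and how the real operator reacts at capacity is not modelled.

```python
from collections import OrderedDict


class BoundedWal:
    """Toy in-memory write-ahead log with element and byte limits.

    Events are retained until acknowledged. When either limit would be
    exceeded the write is rejected, which stands in for whatever the
    real system does at capacity.
    """

    def __init__(self, max_elements, max_bytes):
        self.max_elements = max_elements
        self.max_bytes = max_bytes
        self.pending = OrderedDict()  # event id -> payload bytes
        self.bytes_used = 0
        self.next_id = 0

    def append(self, payload: bytes):
        """Store an event; return its id, or None if the log is full."""
        if (len(self.pending) + 1 > self.max_elements
                or self.bytes_used + len(payload) > self.max_bytes):
            return None
        event_id = self.next_id
        self.next_id += 1
        self.pending[event_id] = payload
        self.bytes_used += len(payload)
        return event_id

    def ack(self, event_id):
        """Drop every event up to and including event_id (delivered downstream)."""
        for eid in list(self.pending):
            if eid > event_id:
                break
            self.bytes_used -= len(self.pending.pop(eid))

    def replay(self):
        """Return unacknowledged payloads in arrival order."""
        return list(self.pending.values())


wal = BoundedWal(max_elements=3, max_bytes=10)
first = wal.append(b"aaaa")
second = wal.append(b"bbbb")
rejected = wal.append(b"cccc")  # would exceed max_bytes, so it is refused
wal.ack(first)                  # downstream confirmed delivery of the first event
third = wal.append(b"cccc")     # space has been reclaimed
```

After a downstream outage, replay() yields exactly the events that were never acknowledged, in arrival order.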

So if we are consuming from a kafka cluster upstream and distributing to another kafka cluster downstream ( such as in another data center ) - those systems can go offline briefly or be disconnected. The connectors themselves handle lossless delivery ( that’s handled by kafka in both cases in this example ). Connectors with less strong guarantees can still be ( mostly ) lossless - so if our downstream system is HTTP-based ( like elasticsearch ) - we can tolerate transient service loss and fully recover.

What if tremor or the host it is deployed on is rebooted?

# File: /etc/tremor/config/mostly_guaranteed.trickle
use tremor::system;
define qos::wal operator in_memory_wal
with
  dir = "./recovery", # Persistent file-based recovery file
  read_count = 20,
  max_elements = 1000, # Capacity limit of 1000 stored events
  max_bytes = 10485760 # Capacity limit of 10MB ( 10 * 1024 * 1024 bytes ) of events
end;

create operator in_memory_wal;
select patch event of
insert hostname = system::hostname()
end

from in into in_memory_wal;

select event from in_memory_wal into out;

The tremor developer doesn’t need to be too concerned with the internal mechanisms, or their implementation. And for simple applications with a single primary data flow, it’s as easy as the examples above to selectively introduce grades of guaranteed delivery with a spectrum of robustness that derives from choice of connectivity ( kafka vs http ) or how the qos operators are chosen, placed in a flow, and configured.

Orchestration however, is different. In an orchestrated transaction the user defined logic provided by the tremor developer also needs to do some tracking. This is achieved through using tremor’s state mechanism alongside the qos capabilities and operators that tremor provides to compose a solution.

So, in our search case - let us say we have two downstream search engines - and both need to index a different set of items of interest elementized from a single document - we use the state mechanism to track progress of the items for each participant - and when all participants have indexes up to date - we issue a synthetic event ( that can be recorded in a wal ) that publishes the document processing status downstream.
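The tracking described above can be sketched in Python. This is an illustrative model under stated assumptions, not tremor script and not the Search team's logic: the class IndexingTracker, the participant names and the shape of the synthetic status event are all hypothetical.

```python
class IndexingTracker:
    """Track per-document indexing outcomes across several participants."""

    def __init__(self, participants):
        self.participants = frozenset(participants)
        self.state = {}  # doc_id -> {participant: set of outstanding item ids}

    def begin(self, doc_id, items_by_participant):
        """Record which items each participant must index for a document."""
        self.state[doc_id] = {
            p: set(items_by_participant.get(p, ())) for p in self.participants
        }

    def acknowledge(self, doc_id, participant, item_id):
        """Mark one item as indexed.

        Returns a synthetic status event once every participant has
        indexed all of its items for the document, otherwise None.
        """
        outstanding = self.state[doc_id]
        outstanding[participant].discard(item_id)
        if any(outstanding.values()):
            return None  # at least one participant still has work pending
        del self.state[doc_id]
        return {"doc_id": doc_id, "status": "indexed"}


tracker = IndexingTracker(["engine_a", "engine_b"])
tracker.begin("doc-1", {"engine_a": ["title", "body"], "engine_b": ["sku"]})
r1 = tracker.acknowledge("doc-1", "engine_a", "title")
r2 = tracker.acknowledge("doc-1", "engine_b", "sku")
r3 = tracker.acknowledge("doc-1", "engine_a", "body")
```

Only the final acknowledgement produces a status event. The earlier ones return None because at least one participant still has outstanding items.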

search logic

So our document source is kafka, our indexing engines receive the elementized items, and successfully elementized documents ( which may now be enriched with elementization metadata and index metadata ) can now be published ( let’s assume kafka again for simplicity ) to an audited topic.

The state mechanism in tremor is a readable/writable value - so persistent and recoverable state is a relatively simple composition:

define script remember
script
let state = event;
event
end;

define qos::wal operator forget_me_not
with
dir = "./brain",
read_count = 1,
max_elements = 1000, # Capacity limit of 1000 stored events
max_bytes = 10485760 # Capacity limit of 10MB ( 10 * 1024 * 1024 bytes ) of events
end;

create script remember;
create operator forget_me_not;
select event from in into remember;
select event from remember into forget_me_not;
select event from forget_me_not into out;

Please don’t run out of disk space!

Conclusion

Most of the changes required to evolve tremor from supporting great qos for simple single pipeline applications to complex multi-pipeline and multi-participant stateful orchestrations did not expose new features to the tremor developer or user.

It has been a significant change to tremor internals, however, and the work reaches a stable point with our 0.12 release. The ability to pause and resume connectors, and the ability for the tremor runtime itself to detect and act on quiescence, will mean that tremor is flexible enough for the demands and use cases that originated in the search domain.

· 5 min read

Identified Need

The adoption growth of tremor inside Wayfair is such that we have production customers with 3.5 years of experience operating tremor in production with very large, complex tremor-based services - and fresh demands from new domains such as Search - whose requirements for multi-participant transaction orchestration are driving enhancements to quality of service, guaranteed delivery and circuit breaker capabilities within tremor’s core processing engine.

Some usage patterns are becoming common enough that sharing tremor application logic across teams, across organizations, and across domains is now common. For example - tremor has good support for HTTP based interactions and already interfaces with a number of HTTP-based and REST APIs, and it can expose HTTP-based interfaces to participants and act as a REST-based or HTTP-based service. Many of these services share the same needs around handling HTTP errors and routing errors and alerts.

At the event flow or query level - similar levels of sophistication are emerging in the installed user base.

Required Outcome

Expand tremor’s domain specific languages to support modularity, starting with the scripting language but extending into the query language to maximise reuse of user-defined processing logic.

Characteristics

Provide standard mechanism and syntax to discover, reference and consume functional units of logic that can be shared by all tremor domain specific languages.

A new lexical preprocessing phase has been introduced allowing source to be packaged within a modular set of paths through a TREMOR_PATH environment variable. Since V0.8 of tremor the scripting and query languages both support modules, and share the same module syntax and semantics.
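Path-based module lookup of this kind generally works like the Python sketch below. It models the general technique rather than tremor's resolver: the separator (os.pathsep), the mapping of a module name such as a::b to a file a/b with a known extension, and the function name resolve_module are assumptions for illustration.

```python
import os
import tempfile
from pathlib import Path


def resolve_module(name, search_path, extensions=(".tremor", ".trickle")):
    """Return the first file matching a module name on a search path, or None.

    Assumptions for illustration: the search path is a string of directories
    joined by os.pathsep, and a module such as "a::b" lives at "a/b" plus one
    of the given extensions.
    """
    relative = Path(*name.split("::"))
    for root in filter(None, search_path.split(os.pathsep)):
        for ext in extensions:
            candidate = Path(root) / relative.with_suffix(ext)
            if candidate.is_file():
                return candidate
    return None


# Build a throwaway module directory to demonstrate the lookup.
with tempfile.TemporaryDirectory() as lib:
    (Path(lib) / "my_mod.tremor").write_text("fn fib(n) with n end;\n")
    search_path = os.pathsep.join(["/nonexistent", lib])
    found = resolve_module("my_mod", search_path)
    found_name = found.name if found else None
    missing = resolve_module("no_such_mod", search_path)
```

Directories are tried in order, so an earlier entry on the path shadows a later one that contains a module of the same name.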

The tremor API was upgraded to support preprocessed or non-modular source so that no API changes were required to extend the deployment mechanisms to support modularity.

Modular scripting

In a nutshell - this added support for user defined constants and user defined functions.

# File: my_mod.tremor

# Tail recursive implementation of fibonacci
#
fn fib_(a, b, n) of
case (a, b, n) when n > 0 => recur(b, a + b, n - 1)
default => a
end;

fn fib(n) with
fib_(0, 1, n)
end;
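To see which values this module produces, the same recurrence can be traced in Python. This is an equivalent sketch for checking the arithmetic, with the tail call ( recur ) written as a loop; it is not part of the tremor module.

```python
def fib_(a, b, n):
    """Iterative equivalent of the tail-recursive helper: shift (a, b) n times."""
    while n > 0:
        a, b, n = b, a + b, n - 1
    return a


def fib(n):
    """Return the n-th Fibonacci number, counting from fib(0) == 0."""
    return fib_(0, 1, n)


first_ten = [fib(n) for n in range(10)]
```

The first ten values are 0, 1, 1, 2, 3, 5, 8, 13, 21 and 34.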

The module can be loaded via the module path and used in other script or query sources:

# File: fib.trickle
# A streaming fibonacci service

define script fibber
script
use my_mod;
my_mod::fib(event)
end;

create script fib from fibber;

select event from in into fib;

select event from fib into out;

Modules can be file-based, or defined inline in the scripting or query language. A standard set of system modules is provided by tremor out of the box.

Tremor Modules

Modularity in the query language

In the query language windows, scripts and pluggable operators may be defined and shared across queries.

There is an RFC for modular sub-query to allow query sub-graphs to be defined and shared which is being delivered as part of a tremor LFX mentorship.

Modularity in the deployment language

Modular deployments through replacing the YAML configuration syntax with a deployment language will embed the modular query language, which in turn embeds the modular scripting language.

This work is under development and will span multiple releases - but it is being designed in such a way that, as clustering capabilities are added to tremor, clustered or distributed deployments will be modularly composable by end users using the same tooling as the other DSLs within tremor.

Solution

The support for modularity is thematic. The scripting language now supports a functional programming paradigm and file-based or nested inline modules. The query language can embed modular scripts using the same module mechanism and definitions in the query language are also modular.

Modular sub-query and the introduction of the deployment language are planned and in progress and will extend modularity to the administration of running tremor nodes.

And, in the fullness of time, clustering will further extend modularity in tremor to maximize operator and tremor developer productivity through efficient reuse and sharing of common tasks.

The first set of connectivity to benefit from modularity is the support for CNCF OpenTelemetry. This is the first set of tremor connectors to be delivered that ships with its own set of modules designed to make designing OpenTelemetry based services in tremor easier.

Conclusion

As tremor grows into new domains, and the algorithms and solutions in tremor’s traditional production domains increase in sophistication, size and complexity, the subject of modularity has evolved and new demands continue to emerge.

We expect the modularity theme to be long-lived, but its origins derive from production needs. When tremor was developed the user defined logic was small, relatively simple and applications built with tremor were fairly monolithic.

Today, multi-pipeline and medium to large tremor-based applications are common. And adoption of the modular scripting and query language primitives is now driving larger and more sophisticated use cases.

Another dimension of modularity is the ongoing expansion of connectivity in tremor. Modularity here reflects a different production-driven need ( and a few maintainer conveniences ). A plugin development kit is being developed that will allow connectors and other plugins to be developed in separately managed and maintained projects. This in turn allows the core of tremor to be managed independently of connectivity.

· 3 min read

Happened Before

The simplified architecture of our systems is currently:

old pipeline

Identified Need

Wayfair’s technology organization is continuously expanding, evolving and growing. When work on tremor began we were running almost entirely in our own data centres, on our own hardware.

Today, we are running largely in the cloud. Whether cloud-native or bare-metal the sidecar pattern has proven to be a significantly popular deployment pattern with tremor-based systems in production at Wayfair.

Tremor is equally happy with clustered ( just a bunch of event processors ) centralized deployments or with sidecar deployments; on containerized virtual machines, or alongside our growing Kubernetes estate.

With the rise of Kubernetes and cloud-native at Wayfair - we had an interesting challenge as an infrastructure organization serving the 1000s of developers in our wider technology group. How do we continue to deliver a single pane of glass to our developers?

Although the over-simplified architecture diagram above looks simple - it’s far more complex in practice. We have multiple independent search and visualization clusters. And we have many more infrastructure services and clusters sitting behind these frontends.

For our application developers - this looks like a vanilla installation of ElasticSearch and Kibana. In reality it’s an always-changing and forever-speciating set of services running 24x7x365 offering a single-pane-of-glass for our developers’ convenience.

By this state of use case evolution and adoption of Tremor at Wayfair - tremor is at the centre of a lot of the internal data distribution and processing - whilst offering no protocols, APIs or transports of its own - entirely invisible to our target developer community.

Our Kubernetes team developed helm charts for tremor to preserve the convenience of this illusion; whilst accelerating the operationalisation of our emergent cloud-native infrastructure services.

Required Outcome

Boldly go cloud-native, in such a way that no-one truly notices.

Characteristics

No new features or capabilities were developed for this use case by the tremor team.

Solution

Adopt helm-based deployments of tremor for our cloud-native and kubernetes-enabled applications and services. The helm charts have since been open sourced in collaboration with our Kubernetes Team as a part of tremor, and have been extended to support OpenShift by the tremor community ( waves to Anton Whalley of Rust Dublin fame! ).

Tremor Helm Charts

Conclusion

Shortly before tremor was open-sourced and donated to the Linux Foundation it became cloud-native itself through the combined efforts of the Infrastructure group at Wayfair - especially the hard work of our Platform Engineering, SRE and Kubernetes teams.

We hope to productize and open source other solution packs that enable the wider tremor community to quickly bootstrap production environments based on the tremor-based systems we already have in production in our Logging, Metrics, Observability, Kubernetes and Search teams.

In a large way - the early successes of tremor and our adoption of cloud-native and cloud-based computing aligned the goals of the tremor project with those of the Linux Foundation and the Cloud Native Computing Foundation where tremor is now an early stage sandbox project.

If you’re reading this - and the work and philosophy resonates with your organization, your people and you believe similar benefits could elevate your production environments - then please reach out to us in the tremor community and get involved, join us and help us shape tremor’s future.

· 6 min read

Happened Before

The simplified architecture of our systems is currently:

old pipeline

Identified Need

The need to route and process data from multiple sources to multiple destinations with one or many potentially overlapping streams of data processing is already evident in the first 2 phases of Tremor’s emergence as a mission critical piece of infrastructure at Wayfair.

Growing adoption and the increased sophistication of processing needs now results in the emergence of event/data flow or pipeline processing by our production users. We summarise multiple phases of evolution here and conflate them into a single case study.

In reality the use case captured in this synopsis composes multiple smaller projects related by the same identified need spanning multiple releases into a single case study.

Required Outcome

Our systems specialists are writing increasingly complex, layered and rich business logic within tremor’s domain specific language tremor-script and are inhibited by our pipeline model which uses the YAML format for describing data flow graphs.

YAML is hard to write, and hard to debug, and the embedded scripting language is an odd fit into the pipeline configuration which is described in YAML.

Preserving the internal pipeline processing and execution mechanisms - replace the verbose YAML syntax with a statement oriented query language that extends the expression oriented scripting language.

This allows hygienic and helpful error reporting with suggested workarounds and fixes, better IDE integration and a more intuitive means to express data flow operations on multiple data streams that the pipeline mechanisms expose to tremor systems application developers.

A structured query-like language suitably captures the directed-acyclic graph nature of data flows but we need to support rich nested JSON-like data structures with leverage of the processing capabilities of the scripting language - whilst opening up new capabilities to extend the processing capabilities through custom operators, embeddable scripting logic and extensions and enhancements to QoS and non-functional primitives.

For the metrics domain - the ability to aggregate tumbling windows of events and to provide multiple aggregation dimensions with flexible grouping policies is a new requirement.
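Tumbling-window aggregation with a grouping dimension can be illustrated with a short Python sketch. It processes a finished batch rather than a live stream, and the event fields ( ts, host, value ) and the function name tumbling_aggregate are assumptions for illustration, not tremor's window semantics.

```python
from collections import defaultdict


def tumbling_aggregate(events, window_seconds, group_key):
    """Group events into fixed, non-overlapping time windows and aggregate.

    Each event is assumed to be a dict with a numeric "ts" (seconds) and
    "value". Returns {(window_start, group): {"count": n, "sum": s}}.
    """
    buckets = defaultdict(lambda: {"count": 0, "sum": 0})
    for event in events:
        # Every timestamp falls into exactly one window of fixed width.
        window_start = (event["ts"] // window_seconds) * window_seconds
        bucket = buckets[(window_start, event[group_key])]
        bucket["count"] += 1
        bucket["sum"] += event["value"]
    return dict(buckets)


events = [
    {"ts": 1, "host": "a", "value": 10},
    {"ts": 9, "host": "a", "value": 5},
    {"ts": 12, "host": "a", "value": 7},
    {"ts": 3, "host": "b", "value": 1},
]
result = tumbling_aggregate(events, window_seconds=10, group_key="host")
```

Changing group_key gives a different aggregation dimension over the same events, and running the function with several window sizes gives multiple resolutions.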

Characteristics

Replace the YAML-based pipeline configuration with a structured query-like language that embeds the scripting language, providing top grade support for processing rich data structures, extensibility through custom operators, and windowing semantics to support multi-resolution event aggregation.

The initial solution was the introduction of the tremor query language.

Solution

The query language not only replaces the YAML-based event flow syntax for describing data flow pipelines and processing - which is useful for every domain using tremor in production at Wayfair - but, it has special considerations for the metrics or other analytic domains that require rich means of grouping and aggregate processing of in-flight streaming data. The embedding of scripting logic allows the traditional domain of ETL and data processing and transformation to be natively supported with a more intuitive syntax.

A rate limiting or traffic shaping example ( common to multiple usage domains ):

# File traffic.trickle
# Very basic traffic shaping algorithm

# Define a bucket grouping operator
define grouper::bucket operator kfc;

# Logic for categorizing events

define script categorize
script
let $rate = 1;
let $class = event.`group`;
{ "event": event, "rate": $rate, "class": $class };
end;

# An instance of the grouper
create operator kfc from kfc;

# An instance of the categorizer
create script categorize;

# Stream ingested events into the categorizer
select event from in into categorize;

# Stream categorized events into the grouper
select event from categorize into kfc;

# Stream categorized and group batched events downstream
select event from kfc into out;

The relatively simple structured query-like language allows script and window definitions to be reused. The define statements do not create operator instances in the data flow graph; they create named reusable definitions that encompass the desired semantics. The create statements create the nodes of the directed acyclic graph. The select statement is a builtin operator that is used for linking nodes in the graph together to form a data flow or event processing graph.
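For readers unfamiliar with bucket-style rate limiting, the following Python sketch shows the general idea behind the categorize-then-group flow above: each event carries a class and a rate, and events beyond the rate for their class are diverted. This is a simplified assumption-laden model, not the tremor operator: the class `BucketLimiter`, the fixed one-second window and the `"out"`/`"overflow"` labels are chosen for illustration, and the real operator may window and route differently.

```python
class BucketLimiter:
    """Allow at most `rate` events per class within each one-second window."""

    def __init__(self):
        # class name -> (window start in whole seconds, events passed so far)
        self._windows = {}

    def route(self, class_name, rate, now):
        """Return "out" if the event passes, "overflow" if it exceeds the rate."""
        second = int(now)
        start, passed = self._windows.get(class_name, (second, 0))
        if start != second:
            # A new window begins, so the counter starts again from zero.
            start, passed = second, 0
        if passed < rate:
            self._windows[class_name] = (start, passed + 1)
            return "out"
        self._windows[class_name] = (start, passed)
        return "overflow"


# Hypothetical usage: class "checkout" limited to one event per second.
limiter = BucketLimiter()
decisions = [limiter.route("checkout", 1, t) for t in (0.1, 0.5, 1.2)]
```

The second event arrives in the same second as the first and is diverted; the third falls into the next window and passes again.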

Although the sample logic in the example is a simplified version of what our logging and metrics service teams actually develop and maintain for our production systems - it replaces 1000s of lines of cryptic YAML with hundreds of lines of easy to debug and reason about query code.

An aggregation example from the metrics or analytics domain. This example is a complete but simplified example of a tremor event processing application:

# File: aggregator.trickle
# Aggregate events using a high dynamic range histogram with 10 second, 1 minute, 10 minute
# and 1 hour aggregate summaries.

select {
"measurement": event.measurement,
"tags": patch
event.tags of insert "window" => window
end,
# Windowed histogram - for the median and a selection of higher percentiles
"stats": aggr::stats::hdr(event.fields[group[2]], [ "0.5","0.9", "0.99", "0.999" ]),
"class": group[2],
"timestamp": aggr::win::first(event.timestamp),
}
# The higher resolution aggregates are merged into lower resolution aggregates to conserve memory
from in[`10secs`, `1min`, `10min`, `1h`]
where event.measurement == "udp_lb"
or event.measurement == "kafka-proxy.endpoints"
or event.measurement == "burrow_group"
or event.measurement == "burrow_partition"
or event.measurement == "burrow_topic"
# The operator maintains a user defined composited set of group partitions.
# Each group maintains its own set of aggregation windows
group by set(event.measurement, event.tags,
each(record::keys(event.fields)))
into normalize
# Discard computed events with a small sample set
having event.stats.count > 100;

In most real world tremor-based systems - the synthetic events computed in processing pipelines are tailored to conform to the schemas expected by multiple downstream systems:

# File: normalize.trickle
# Prepare computed histogram summaries for downstream systems ( eg: telegraf/influx )
select {
"measurement": event.measurement,
"tags": event.tags,
"fields": {
"count_#{event.class}": event.stats.count,
"min_#{event.class}": event.stats.min,
"max_#{event.class}": event.stats.max,
"mean_#{event.class}": event.stats.mean,
"stdev_#{event.class}": event.stats.stdev,
"var_#{event.class}": event.stats.var,
"p50_#{event.class}": event.stats.percentiles["0.5"],
"p90_#{event.class}": event.stats.percentiles["0.9"],
"p99_#{event.class}": event.stats.percentiles["0.99"],
"p99.9_#{event.class}": event.stats.percentiles["0.999"]
}
}
from normalize
into out;

Compared to the disparate services and software elements that tremor replaces - the query language affords an intuitive and easy to reason about high level domain specific language to configure rich event processing and data enrichment and transformation pipelines - with a minimally terse yet easy to read form.

Conclusion

Tremor’s core mission and mandate includes the efficient declaration of arbitrarily complex directed-acyclic pipeline processing graphs that are memory and compute efficient under the hood whilst preserving transparency or remaining hidden to most of the developers in our organization by continuing to conform to external transports, protocols and service interfaces in the surrounding production infrastructure estate.

· 6 min read

Happened Before

After the initial production success of tremor as a point solution for capacity management during our peak trading cycles at Wayfair, the focus shifted a little to the next key set of issues with our traditional observability platforms.

The simplified architecture of our logging systems is currently:

old pipeline

The traffic shaping use case was a huge win for Wayfair's SRE and operational teams who manage our production estate.

Identified Need

But we could do a lot better. The development lifecycle to capture, normalize, enrich, enhance and standardize logs from 1000s of engineers and 1000s of applications built in a diversity of programming languages was a heavy task for our observability engineers.

Their environment still looks pretty grim:

new pipeline

Displace and replace a plethora of in-house, open-source and commercial off-the-shelf log capture, metrics capture and data distribution technologies with a single solution that is cost-effective at hyperscale and dynamically load-adaptive.

The decision was taken to expand the processing capabilities of tremor beyond traffic shaping, classification and rate limiting rules to general purpose data processing of unstructured hierarchic JSON-like data streams - the shape of data most common within the observability domain we were focused upon.

Required Outcome

Our systems specialists and service reliability engineers can consolidate knowledge and tune to a unified operating experience and center of excellence in our production estate based on a single well known and cost efficient easy to operate alternative.

Characteristics

It is understood that data will be captured from and distributed to a diverse and ever-changing set of production systems - built in many programming languages and environments and distributed to an unknowable set of destination systems.

These systems include, but are not limited to: Graylog Extended Log Format, Syslog,
Kafka, TCP, UDP, Telegraf, Influx Line Protocol, ElasticSearch and many more...

Connectivity must be flexible enough to capture data from and distribute data to synchronous and asynchronous protocols, transports and systems without developers needing to learn or integrate with a new technology. The solution must be invisible to developers.

The unit economics of the existing system are hard to measure, hard to plan and hard to remediate; especially given the year on year rise in firehose volumetrics, velocity and peak loads.

99% of the data either originates as JSON, or is transformed to JSON just in time.

The target systems for displacement and replacement have very well understood extract, load, transform, enrichment and normalization algorithms that are coded and configured differently in each system. This is difficult to maintain, enhance and evolve in an environment where infrastructure and services are continuously evolving.

A rich set of ETL and data manipulation and mutation primitives are required so that all existing methods used for data processing across the target systems can be replaced by a single easy to understand, easy to program and optimized alternative.

Solution

Tremor has already established itself for traffic shaping in the logging and metrics domains - adding just-in-time traffic shaping capabilities when needed; and preserving the original system’s operational semantics and behaviours during normal operating conditions when the system is operating within planned capacity constraints.

new pipeline

The mandate now expands to replacing many of our event capture systems deployed on on-premise and cloud-native operating environments across multiple data-centres globally.

Instead of our operations and service reliability teams needing to learn and manage the service deployment and administrative lifecycle of multiple components, in this evolution we displace and replace those systems with tremor. The displaced systems typically operate as sidecars on source systems where our applications run on virtual machines, bare metal or containerized environments.

The relatively primitive domain specific language developed as part of the initial solution for traffic shaping is replaced with a new domain specific language tremor-script designed for expressive data filtration, extraction, enrichment and transformation to support:

  • Classification, Categorization and rate limiting for Traffic Shaping
  • Enrichment, Normalization and micro-format parsing
  • Structural pattern matching over hierarchical nested JSON-like data
  • The ability to patch, merge and iterate over JSON-like data
  • A familiar expression language like in regular programming languages with arithmetic, multiplicative and other common operations on numeric and string data
  • Testing string encoded data for regular expression and other micro-formats such as the logstash grok, dissect, kv or stringified embedded json
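As an illustration of what patching and merging nested JSON-like data involves, here is a small Python sketch in the style of JSON Merge Patch (RFC 7396). It is not tremor-script and makes no claim about how tremor-script implements these operations; the function name `merge` and the sample record are assumptions for the example.

```python
def merge(target, patch):
    """Recursively apply `patch` to `target` without mutating either.

    Following the JSON Merge Patch convention, a None value in the patch
    removes the key, nested dicts are merged, and anything else replaces.
    """
    if not isinstance(patch, dict):
        return patch
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)
        else:
            result[key] = merge(result.get(key), value)
    return result


# Hypothetical log record: drop one tag, add another, leave the rest alone.
record = {"tags": {"host": "a", "dc": "bos"}, "level": "info"}
patched = merge(record, {"tags": {"dc": None, "window": "1min"}})
```

Expressing such changes as a declarative difference, rather than as imperative field-by-field edits, is what keeps enrichment and normalization rules short and reviewable.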

The resulting language and interpreter for tremor-script forms the basis of the solution and is largely designed around the needs of the logging domain - but delivered in a more generally useful incarnation that is more widely applicable to other domains - such as in flight data analytics, in flight ETL or processing metrics to name a few.

Most of the in-flight events are JSON or JSON-like. Inspired by a tweet the tremor team happens upon the work of Daniel Lemire et al and SIMD-accelerated JSON processing. Heinz ports this to rust and tremor’s sister project and organization simd-lite is born. The tremor team also maintains the rust port of simd-json with other maintainers interested in fast and efficient data parallel parsing of JSON.

Finally, tremor undergoes a steady pace of innovation with benchmark driven performance optimizations. The combined effect of an efficient interpreter, the ability to easily benchmark synthetic analogs of real world use cases in our benchmark system, and the adoption of more workload appropriate and efficient low level memory allocators such as jemalloc, then mimalloc, then snmalloc by Microsoft Research results in an order of magnitude level of infrastructure cost savings. The production estate shrinks by 10x in terms of the number of deployed systems, the compute capacity required, and the committed memory required when compared to the displaced and replaced systems.

Further - tremor uses far less memory and resources on the source systems making it a much improved sidecar on our source hosts.

Conclusion

Tremor’s core mission and mandate now extends from high quality of service data distribution with load-adaptive traffic management for the 1% ( and typically less ) of time our systems are under-resourced, to being 10x more cost efficient for the other 99% of the time.

Tremor preserves its facility to remain invisible to developers during this phase of expansion whilst drastically improving the lives of our operational engineers and service reliability engineers. The measured and validated cost-efficiencies offer compounding value that further drives the mandate of the project and pushes it into its next phase of evolution.

· 5 min read

Happened Before

This is our origin story. The simplified high level architecture of our logging systems at the time of tremor’s birth is:

old pipeline

Identified Need

Enable load shedding and consistent capacity management at production scale of the logging and metrics firehoses within Wayfair’s production, staging and development environments; even when our live production working set exceeds planned capacity, but especially when catastrophic failure of infrastructure overwhelms our storage tier.

Required Outcome

Our network operations center, system reliability and service organization engineers should never be in a position where in-flight mission-critical events are unavailable or lagging ( by seconds or even minutes ) behind the live working set state of our production systems.

Stale operational events direct limited resources under time constraints and production service level objective pressures to curing or tending to the wrong problems, at the wrong times and lead to missed opportunities for early and cheap resolution.

Characteristics

It is understood that, if a critical system ( such as a production database ) fails - query failures won’t have a single point of origin in the logs - whether over time, or across application or business units. When under extreme load beyond capacity - only a subset of the firehose is required for in-flight diagnostics of our production estate.

In-flight categorization, classification & rate limiting enables just-in-time traffic shaping, load-shedding and bending critical infrastructure to available capacity even as capacity changes in planned or surprising ways.

It is understood that, regardless of the system state, some logs from a subset of systems, applications or services are relatively more important than others. This is even more true when our systems and services are under extraordinary strain due to unplanned failures and outages. Unplanned failures and outages are expected at Wayfair production scale.

The relative priority of classified and categorized events is known and the relative priority is subject to change over time, as are the event classifications and categorizations.

It is likely impossible to convince thousands of engineers in hundreds of teams across our technology organization to stop using JSON format in the short, medium and possibly even longer term.

Snot. It’s a JSON rock’n’roll world. I guess we’ll just have to deal with it

Solution

This works fine 99% of the time - but under extreme load it just does not work at our scale and the impact to our network operations and service reliability and technical communities is severe at the exact moments when we are at peak trading periods and impact to the business is highest - a perfect storm.

old pipeline

In our ELK ( ElasticSearch, Logstash, Kibana ) based logging and Influx ( Telegraf, Chronograf, InfluxDB ) based metrics environments there are limited means to dynamically tune the production system to accommodate highly variant working sets. As we are an online eCommerce system we often go through sustained periods of nominal activity followed by insane periods of extreme peak activity - often hitting peaks we’ve never before experienced.

We do however, have a very experienced infrastructure and platform engineering organization that are deeply attuned to our working set and changing business and operational needs.

The initial version of tremor simply preserved the overall pipeline:

  • Applications in a number of programming languages generate logs
  • A Logstash sidecar receives, normalizes and forwards logs to ElasticSearch
  • ElasticSearch indexes logs for display by SRE’s and developers in Kibana

The processing stages remained unchanged ( and largely serviced by Logstash ):

  • Enrich raw logs with business unit, organization, application and environment metadata
  • Normalize schemas - map logs to a common structure and well known field names
  • Known unknowns - isolate user defined fields and extensions to a well known slush field
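The three processing stages above can be sketched in a few lines of Python. This is a hedged illustration, not the Logstash or tremor configuration used in production: the alias table `FIELD_ALIASES`, the field names and the `slush` key are hypothetical.

```python
# Hypothetical mapping from source-specific field names to common names.
FIELD_ALIASES = {
    "msg": "message",
    "message": "message",
    "severity": "level",
    "level": "level",
    "ts": "timestamp",
    "timestamp": "timestamp",
}


def normalize(raw, metadata):
    """Map a raw log to a common structure, then enrich it with metadata."""
    record = {"slush": {}}
    for key, value in raw.items():
        target = FIELD_ALIASES.get(key)
        if target is None:
            # Known unknowns: user defined fields are isolated, not dropped.
            record["slush"][key] = value
        else:
            # Normalize: well known field names regardless of the source.
            record[target] = value
    # Enrich: attach business unit, application, environment and so on.
    record.update(metadata)
    return record


normalized = normalize(
    {"msg": "payment failed", "severity": "warn", "cart_id": 7},
    {"application": "checkout", "environment": "prod"},
)
```

Keeping unrecognised fields under one well known key means downstream indexes see a stable schema while nothing the application logged is lost.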

But added an extra processing stage, a tremor cluster that added back-pressure detection and load-adaptive rate-limiting based on in-flight analysis of the business events in the firehose based on real-time categorization and classification of those event streams.

Now, in the ( significantly less than 1% ) of time when our systems are over-capacity or experiencing catastrophic unplanned events - tremor can discard events based on rate limits and event classifications that can be tuned by our SRE, NOC and systems architects and developers.
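A rough Python sketch of this behaviour follows: everything is forwarded under normal conditions, and only while back-pressure is detected are lower priority classes discarded. How tremor actually detects back-pressure is not described here, so the latency threshold, the class `LoadShedder` and the priority table are all assumptions made for illustration.

```python
class LoadShedder:
    """Forward everything normally; shed low priority classes under pressure."""

    def __init__(self, latency_limit_ms, priorities, min_priority_under_load):
        self.latency_limit_ms = latency_limit_ms
        self.priorities = priorities  # event class -> relative priority
        self.min_priority_under_load = min_priority_under_load
        self.under_pressure = False

    def observe_downstream(self, latency_ms):
        # Assumption: slow downstream responses are treated as back-pressure.
        self.under_pressure = latency_ms > self.latency_limit_ms

    def should_forward(self, event_class):
        if not self.under_pressure:
            return True
        # Unknown classes get the lowest priority and are shed first.
        return self.priorities.get(event_class, 0) >= self.min_priority_under_load


# Hypothetical tuning: under load only "error" events are kept.
shedder = LoadShedder(500, {"error": 2, "info": 1, "debug": 0}, 2)
normal = shedder.should_forward("debug")
shedder.observe_downstream(900)
overloaded = (shedder.should_forward("debug"), shedder.should_forward("error"))
shedder.observe_downstream(100)
recovered = shedder.should_forward("debug")
```

Because the priorities and thresholds are plain data, they can be retuned as classifications change without touching the forwarding logic.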

Conclusion

Tremor’s core mission and mandate started out with, and continues to be the non-functional quality of service of mission critical at scale event based systems. Providing system operators with first class means to shape and tune capacity, load and production countermeasures of the production firehose without compromising the business with system downtime is the core requirement.

The most effective way to achieve this in a highly fluid and ever changing environment is what has yielded the tremor project as it came to be - a general purpose event processing system designed for at scale distributed mission critical systems.

When at or over capacity systems no longer bend to the business - tremor is how we inject a little elasticity into our production quality of service - enriching our systems and services with just-in-time load-adaptive bendability.

new pipeline

· 7 min read

wayfair

Development on Tremor started in the autumn of 2018 as a point solution for introducing a configurable traffic-shaping mechanism for our infrastructure platform engineering organization.

There were two use cases being considered at the time- the traffic-shaping use case, which had the highest priority; and a distributed data replication use case for our Kafka-based environments.

The Origin of Tremor

The Traffic Shaping use case required that log information streaming from various systems be categorized and classified in-flight, so that temporally bound rate limits could be applied when our production estate has a working set that is in excess of our current capacity. As a 24x7x365 eCommerce environment with year-on-year higher peaks, higher volumetrics and higher loads on our systems, this is a frequent occurrence. Tremor was designed to remove the burden of scaling these systems from our system reliability and network operations center engineers.

This use case saw the birth of Tremor, which was delivered in 6 weeks, just in time for Cyber 5 (the long bank-holiday weekend in the US centered around Thanksgiving, where US folk go peak shopping!). At the time, Tremor was inflexible; it only supported connectivity to Kafka for ingress and Elasticsearch downstream; and the classification and rate-limiting logic was more or less hardwired- but it worked flawlessly!

Although designed with our logging systems in mind, our metrics systems team saw that Tremor would work for their environment (InfluxDB downstream rather than Elasticsearch, but the logic was basically the same). Tremor was designed to detect backpressure on downstream systems, and to intelligently- based on user defined logic- react by adaptively tuning the data distribution to selectively discard or forward data based on a classification system and rate limits.

The Rise of Event Processing

The user-defined logic required by our logging and metrics teams very quickly became sophisticated. A fairly quick succession of releases saw the logic change from fairly simple filtering, classification (enrichment) and rate limiting (selective dropping due to backpressure) to richer needs to slice and dice nested JSON-like data, to normalize these datasets. This rising need for sophistication in processing along with the adoption of Tremor to replace Logstash and other log shipping frameworks in our source nodes, and to replace Telegraf in our metrics environments, gave birth to Tremor as an event processing engine in its own right.

With richer programming primitives, Tremor advanced from traffic shaping, and log and metrics shipping to log and metrics cleansing, normalisation, enrichment and transformation, to impose a common symbology and structure on logs and metrics generated from many different programming languages, platforms and tools across our production estate- from system logs originated via Syslog, to those via GELF, or Prometheus.

The YAML-based configuration of the query pipelines quickly became error-prone and cumbersome as the sophistication, complexity and size of application logic grew. In addition, the success of our scripting language, tremor-script, and its hygienic set of tooling and IDE integration encouraged the addition of a query language, thus enabling data flow processing with Tremor.

Cloud Native Migration

At the same time, our technology organisation was preparing for migration from on-premise data centres to cloud-based systems. Our legacy VM-based systems were now being containerised and deployed via Kubernetes in our on-premise environments. Simultaneously, our Cloud-native infrastructure, configuration as code, secrets management, developer scaling and many other infrastructure and development platform teams were building out the services that would allow our line of business service engineers to migrate to the cloud.

Tremor was already battle-hardened as a sidecar deployment, and this was extended to Cloud-native deployments by our kubernetes team who packaged Tremor for kubernetes as a set of helm charts.

As a relatively fast-paced technology organisation, our logging and metrics teams weren’t stalled- the scripting and query languages enabled rapid development. However, lack of native support in Tremor to modularise and reuse logic became a limiting factor. Thus, over the span of approximately a year, most of the modularity mechanisms, now standard in Tremor, were developed.

Tremor was now battle-tested, battle-hardened and widely adopted, having gone through many enhancements and replacing many disparate tools and frameworks with a single, easy-to-operate solution.

Tremor is the first major component of in-house critical infrastructure that was open-sourced by Wayfair and contributed to the Linux Foundation under the Cloud Native Computing Foundation.

The Tremor team then selected a panel of experts and friends, who had early access to Tremor, to assist with the open-sourcing process.

Tremor Today

Today, Tremor is planned and progresses in the open. Anyone can contribute, and with contribution comes maintainership. Tremor is now a CNCF community (sandbox) project, and maintainership has grown beyond Wayfair.

At Wayfair, Tremor has been adopted for our search environments. Our search domain has some complex use cases for audited document elementisation and indexing that are multi-participant and that need to occur transactionally.

Tremor’s QoS mechanisms were extended to support transaction orchestration to enable this and similar use cases. Our search teams are also leveraging Tremor for its traditional areas of strength in traffic shaping and adaptive rate limiting.

Our logging and metrics teams are now recalibrating our technology organisation's philosophy and core infrastructure that supports observability across our production estate. The rise of CNCF OpenTelemetry offers developers a consistent and stable set of primitives for logs, metrics and distributed tracing. It’s a lingua franca that offers developers consistency.

Even more important to large or extreme scale organisations is flexibility to make infrastructure and service decisions based on capacity, cost, or other concerns. Through CNCF OpenTelemetry as a core building block, now natively supported in Tremor, our observability teams can unify our production support, operations and services around Tremor-based OpenTelemetry- preserving the key values that Tremor adds, whilst opening up the new Cloud Native possibilities that OpenTelemetry and OpenTelemetry-based services offer.

Tremor Tomorrow

As the tremor community grows, we have seen great contributions adding support for many different types of systems - ranging from the addition of AMQP support by Nokia Communications through to our ongoing collaboration with Microsoft Research on the snmalloc allocator which tremor defaults to.

The latest production solution based on tremor at Wayfair is from our Developer Platforms technology organization - who have used an extension to PHP to identify dead code in our legacy PHP monolith. This solution is based on tremor and included contributions to tremor in the form of the unix_socket connectivity. Thank you Ramona!

The Tremor maintainers also work on the Rust port of simd-json- SIMD accelerated JSON, which is a sister project to Tremor. Mentorships via the Linux Foundation are also producing some wonderful new capabilities and enhancements to the system, ranging from better support for developing gRPC-based connectors in Tremor, to GCP connectivity, or work to add a plugin development kit to Tremor.

Wayfair is already benefiting from community interest and contributions to the project - and the Tremor team has hired its newest member as a result of open-sourcing the project. We are hoping to package and open-source more components of Wayfair’s Tremor-based systems through the Tremor Project, and through other open-source projects at Wayfair through our new Open Source Program Office.

If you are interested, hop on over to our community chat server and say hello!