Skip to main content

· 4 min read

Welcome, Tremor Enthusiasts! This is the first post in a series intended inform and entertain Tremor Technologists with recent changes in the tremor project. We'll mostly focus these posts on Pull Requests and other notable developments. With these posts, you can stay informed and learn more about the project without having to read pull requests, or wait for release notes.

This posts' theme is open and improved communication.

Release Candidates

Where's the next release of Tremor?!

Tremor's release schedule is becoming a bit more regular. We're releasing about twice a year, every 6 to eight months. The next release of Tremor should include an exciting new way to write plugins with the plugin development kit, along with other goodies for faster, well understood tremor scripts and plenty of other performance and bug fixes.

Most notable as an update for us all is that Tremor has a release candidate live. If we're pleased with it, then a new version of tremor is soon to be released!

In this article, Let's dive into three topics: CI, PDK, and performance!

CI

One of my favorite parts of working with tremor, or any project, is using automation. Running a bunch of stuff on my machine can be quicker, but less reliable than a well established automation platform. Tremor makes an effort to improve Continuous integration regularly; and this month is no exception.

One of our active members of the Tremor Community, Pimmy has taken it upon himself to dramatically improve the CI and relase process for tremor-runtime.

The new process will automatically publish a release when we're ready. Using this automatino, we can bump versions appropriately with specially named branches and some fancy GitHub Actions. This lets the Tremor team take full advantage of the seamless GitHub experience from merge -> release, complete with logs and links directly from the Pull Request. We can even extract release notes and bump versions automatically. Once again, truly great work from our community here!

PDK (Plugin Development Kit)

I won't spend too much time on this point, since the lovely marioortizmanero has already taken the time to write out a full blog post or two on the topic. We'll give a full breakdown another time. For now: know that Tremor is currently in the works of creating an easier way for developers to plug their own binaries into the system to run connectors and pipelines.

Performance

Tremor cares a lot about performance. As you may already know from TremorCon 2021's fabulous talks; it was originally created and adopted for performance gains in Wayfair's event processing infrastructure. There are plenty of performance gains to make in a project as large as Tremor, both inside the project and through dependencies.

One such dependency we depend on to parse gigabytes of JSON per second is simd_json. simd_json is a port of the simdjson c++ library into rust. It can not only parse from JSON into Rust, but aid the serde library that can easily serialize and deserialize Rust data structures. That's pretty handy for an event processing project that will have to marshal plenty of events from disparate sources of all data forms! As you might imagine, a project like simd_json comes with a lot of configuration options, and optimizations behind those options.

Further than just the configuration that you might give serde, or simd_json, is the options you may give to the rust compiler. The compiler, by default, compiles for the largest compatability. For best performance on a specific plaform, we make use of the target-feature flag. This will allow us to target specific features available on different platforms and CPUs.

It should be no surprise that we continue this effort within the Tremor team. Know that we also depend on our community, such as a pull request from scarabsha. Sasha idenfitifed additional target-features for the target x86_64-unknown-linux-gnu that speed up the performance of simd-json. There's not a benchmark to show exactly how much faster this made json processing, but we appreciate the fact that it will undoubtedly increase performance for some clients of Tremor.

Thank You

The Tremor project as strong as the community around it. Reading this article, making contributions, and generally being involved in the project makes us more successful. Thank you for reading and contributing! See you next time.

  • Gary, and the Tremor team.

· 4 min read

Like all good projects in the open source community, great collaborations start with ad hoc interactions.

Redpanda Rocket

A recent set of discussions between the Tremor maintainers and our community brought Redpanda to our attention.

Some of our community would like to replace their Kafka deployments with Redpanda, a Kafka API-compatible streaming data platform, to realize gains in performance and reduce their total cost of operations. . As we already have Kafka connectivity, enabling this shouldn’t be too complex, right? Let’s find out.

Context

We are always looking out for interesting technology, and recently we read a very interesting article on the Redpanda blog about using WASM as server-side filters for subscription. Being as excitable as we are, we obviously had to give Redpanda a shot.

To our joy this turned out to be painless, Redpanda reuses the Kafka API so our existing connectors for Kafka work out of the box — and as a bonus we could get rid of a whole bunch of docker-compose YAML dancing that we needed to set up Zookeeper.

Alex Gallego, founder of Redpanda, reached out to us and we started experimenting with Tremor and Redpanda in this repo: github.com/tremor-rs/tremor-redpanda

Setting up Tremor and Redpanda

So, let’s get our hands dirty and actually connect Redpanda and tremor in a real-world project.

We have prepared a fully equipped workshop for this occasion. Give it a shot here if you are impatient.

Tremor can flexibly act as a Redpanda/Kafka consumer or producer, make use of auto-commit for offset management or manually commit when events are completely handled by the tremor pipeline. Here we are configuring Tremor only committing offsets when events have been successfully handled.

onramp:
- id: redpanda-in
type: kafka
codec: json
config:
brokers:
- redpanda:9092
topics:
- tremor
group_id: redpanda_es_correlation
retry_failed_events: false
rdkafka_options:
enable.auto.commit: false

This tremor application is reporting success or failure of ingesting the received events into elasticsearch via another Redpanda topic. Configuring this is also straightforward, here we have a Redpanda consumer ready for copy-pasting:

offramp:
- id: redpanda-out
type: kafka
codec: json
config:
group_id: tremor-in
brokers:
- redpanda:9092
topic: tremor

Here you go. A fully working setup for orchestrating document ingestion with Redpanda delivering the documents and receiving acknowledgements. For more details check out this Redpanda recipe on our website.

Tremor is designed to be robust when faced with high volumetric data streams. It comes with batteries included for traffic shaping, QoS and data distribution. With those tremor can guarantee at-least-once message delivery. We try to reduce CPU and memory footprint for a given workload and at the same time provide a “just works” experience for operators. And we think we found a soulmate project in Redpanda.

And most importantly, it is working like a charm. In fact we just dropped in Redpanda and expected some hours of troubleshooting, but this hope was cut short by a smooth transition:

104_redpanda_elastic_correlation-tremor_out-1     | 2021-12-10T15:17:01.828694200+00:00 INFO tremor_runtime::system - Binding onramp tremor://localhost/onramp/redpanda-in/01/out
104_redpanda_elastic_correlation-tremor_out-1 | 2021-12-10T15:17:01.830056300+00:00 INFO tremor_runtime::source::kafka - [Source::tremor://localhost/onramp/redpanda-in/01/out] Starting kafka onramp
104_redpanda_elastic_correlation-tremor_out-1 | 2021-12-10T15:17:01.831848100+00:00 INFO tremor_runtime::source::kafka - [Source::tremor://localhost/onramp/redpanda-in/01/out] Subscribing to: ["tremor"]
104_redpanda_elastic_correlation-tremor_out-1 | 2021-12-10T15:17:01.832735600+00:00 INFO tremor_runtime::source::kafka - [Source::tremor://localhost/onramp/redpanda-in/01/out] Subscription initiated...
104_redpanda_elastic_correlation-tremor_out-1 | 2021-12-10T15:17:01.833342+00:00 INFO tremor_runtime::onramp - Onramp tremor://localhost/onramp/redpanda-in/01/out started.
104_redpanda_elastic_correlation-tremor_out-1 | 2021-12-10T15:17:01.828694200+00:00 INFO tremor_runtime::system - Binding onramp tremor://localhost/onramp/redpanda-in/01/out
104_redpanda_elastic_correlation-tremor_out-1 | 2021-12-10T15:17:01.830056300+00:00 INFO tremor_runtime::source::kafka - [Source::tremor://localhost/onramp/redpanda-in/01/out] Starting kafka onramp
104_redpanda_elastic_correlation-tremor_out-1 | 2021-12-10T15:17:01.831848100+00:00 INFO tremor_runtime::source::kafka - [Source::tremor://localhost/onramp/redpanda-in/01/out] Subscribing to: ["tremor"]
104_redpanda_elastic_correlation-tremor_out-1 | 2021-12-10T15:17:01.832735600+00:00 INFO tremor_runtime::source::kafka - [Source::tremor://localhost/onramp/redpanda-in/01/out] Subscription initiated...
104_redpanda_elastic_correlation-tremor_out-1 | 2021-12-10T15:17:01.833342+00:00 INFO tremor_runtime::onramp - Onramp tremor://localhost/onramp/redpanda-in/01/out started.
...

Both in terms of operator friendliness and performance we root for Redpanda.

We started using it in our integration test suite, so users can be 100% sure Redpanda connectivity just works.

· 4 min read

Introduction

Hi folks, I'm Daksh, a senior year CS student at Indian Institute of Technology, Jammu. This blog talks about my project and experience contributing to Tremor as part of LFX Mentorship Program Fall 2021.

Learning about Tremor

I came across rust in early 2020, and I absolutely loved its design, the syntax and how approachable it was to a beginner. I discovered Tremor while looking for open source projects written in rust. Tremor is an event processing system (think kafka) for unstructured data with rich support for structural pattern-matching, filtering and transformation. Over the summers, I did a few minor PR's. Going through the examples and the docs I could set up Tremor and start hacking on!!

My Project

It is very common in event processing to stream data to a persistent storage engine for later processing or archival purposes. My job was to add connectors to stream data to AWS S3. You may find more information in the github issue.

So what is a connector? A connector is the component of an Event Processing System that provides the functionality of communicating with the outside world. This would enable current, and future users of Tremor to now connect and stream events to any endpoint which supports the S3 API.

AWS S3 Connectors

I would explain the sink via an example. To connect to S3, one would require the s3 credentials. Due to lack of support from the sdk only public-secret key credentials are supported (to be extended once the sdk supports other means for credentials). Tremor would read the key names specified in the config from the environment.

s3demo.troy
define flow s3demo
flow
define connector s3conn from s3 with
codec="json",
config={
"aws_access_token": "AWS_ACCESS_KEY_ID",
"aws_secret_access_key": "AWS_SECRET_ACCESS_KEY",
"aws_region": "AWS_REGION",
"bucket": "tremordemo",
"min_part_size": 5242880,
}
end;

define connector files3 from file with
code="json",
config={
"mode": "read",
"path": "sample.json",
},
preprocessors=["lines"]
end;

define pipeline s3pipe
pipeline
define script s3Event
script
let e = event;
let $s3 = {
"key": e.key
};
let payload = e.payload;
payload
end;

create script s3Event;

select event from in into s3Event;
select event from s3Event into out;

end;

create connector s3conn;
create connector files3;
create pipeline s3pipe;

connect /connector/files3 to /pipeline/s3pipe;
connect /pipeline/s3pipe to /connector/s3conn;

end;

deploy flow s3demo;
sample.json
{"key": "key1", "payload": {"event1": "hello1", "key2":[1,2,3,4,5]} }
{"key": "key2", "payload": {"event3": {"nested Obj": ["vec1", "vec2", "vec3"]}} }
{"key": "key3", "payload": {"event3": null}}

This configuration reads the file sample.json delimited by lines for events. The s3pipe pipeline destructures the line contents to set the data for the object to upload and its key as meta-data. The s3-sink would then upload the data to AWS S3 with key set to $key inside the bucket tremordemo or anything that is given in the config

Sample Working

The sink also has the min_part_size configuration parameter. S3 support uploading larger objects in multiple parts. One can send multiple events with the same key consecutively, and Tremor would append the content of all those events, and whenever the content size gets larger than the min_part_size, a part is uploaded to s3. Whenever the key changes or Tremor stops, the upload for the previous key is completed.

Ending Thoughts

I had a very productive and fun time with the Tremor Community. The Tremor principle of "never worry about it" has helped me to deal with clueless moments during this mentorship. I would like to express my regards and gratitude to Matthias, Heinz, and Darach for giving me this wonderful opportunity and helping me develop as an open-source contributor and as a joyful person. A special thanks to Matthias for being there to clarify my doubts and fix my mistakes and for being really helpful. I would continue to be a part of the Tremor Community and hope to engage with more newcomers to open-source. I would wish to be part of future CNCF events. You may see me around at the Tremor Discord.

· 2 min read

Identified Need

We have a humongous PHP application - around 20 million lines of code. It’s believed that there are certain parts of the codebase that are no longer used, but it’s hard to reliably find out which ones. Due to the dynamic nature of PHP, attempts at static analysis have failed. We decided that dynamic analysis was needed, and created a PHP extension that logs all the calls to all functions and methods. That’s a lot of data, and we used tremor to aggregate it.

Required Outcome

We need to be able to aggregate a lot of data (the extension sends one message for each HTTP request, which can be at the order of hundreds per second for some servers), counting the number of calls to each function or method in the codebase and send it to our service which does further aggregation and presents the data. High Level Architecture

Characteristics

We used Unix Sockets (which were added as a source to tremor during this project) and then a simple script that aggregates the data. We’re hoping to use custom aggregate functions in the future (there’s an open RFC, waiting for the official voting) to further simplify the pipeline.

Conclusion

We are currently testing the solution with smaller applications, seeing minimal performance impact and no impact on stability. We were able to delete unused code from those applications without affecting their operation.

· 6 min read

As Wayfair's technology organization modernizes its infrastructure services to meet new volumetric peaks they continually adapt, adopt and atrophy out systems and services.

This is particularly difficult with shared infrastructure and services that are ever-present but seldom seen by many of the engineers in the organization.

The rise of OpenTelemetry provides an opportunity for developers to consolidate on SDKs that are consistent across programming languages and frameworks. For our SRE's and operators it offers ease of integration and ease of migration of services.

Tremor is a core enabling component of Wayfair's migration strategy. Tremor supports but is endpoint, protocol and service agnostic. This allows our operational teams to switch from on-premise to cloud native services with minimal coordination with others.

This also allows switching from in-house to managed or out of the box services supported by cloud providers.

Moving 1000s of developers to a new technology stack when you are operating continuously with no downtime and at a large scale is hard.

Identified Need

Tremor emerged from production needs in existing systems, specifically in the logging and metrics or observability domains in a large hypergrowth and heavily speciated and specialized production environment that operates 24x7x365.

As this infrastructure is migrated from a largely on-premise data-centre based environment to a cloud native environment and the wider technology organization it operates within doubles in size, change is inevitable. You can bet your roadmap on it.

One great emerging de facto standard is the observability sub-domain that cloud-native computing is propelling forward with standards such as the CNCF’s OpenTelemetry.

Tremor added initial support for OpenTelemetry in February 2021.

For decades, observability - whether logging, metrics or distributed tracing has been common. But, lacking in a unified approach, lacking in a core set of interoperable and interworking standards.

OpenTelemetry has good support for capturing and distributing logs, metrics and trace spans. It has sufficient out of the box to suit the most common needs, most of the time. It affords developers an opportunity to rationalize libraries and frameworks around shared concepts, shared understanding and shared effort. This is especially valuable in large, complex production operations such as Wayfair’s eCommerce production estate.

Languages and platforms change over time. System and component infrastructure are continuously evolving. As SaaS environments continue to evolve and innovate new forms of observability; or production operations, system reliability engineers or network operations teams who often need to move faster than the pace of application or infrastructure developers can develop new software and systems - this is a hard problem.

By targeting OpenTelemetry - developers can depend on a consistent set of SDKs for most common observability needs. Production focused teams can depend on a consistent wire-format and protocol allowing them to move from data-centre to the cloud, and rewire the infrastructure and services just in time.

Through OpenTelemetry, we can normalize the concepts, the code and the inner workings of our cloud-based systems and services - whilst introducing far more flexibility than ever before as observability services and software vendors provide a standards-based and interoperable path.

It is a win-win for all concerned.

Required Outcome

High level architecture - current system

In production at Wayfair, logging and metrics data distribution pipelines are all handled by tremor. With OpenTelemetry, distributed tracing can now be standardized across our production estate.

Solution

Standardize an Tremor and OpenTelemetry together.

Characteristics

The introduction of OpenTelemetry allows developers to standardize on Observability in applications, services and code for logging, metrics and distributed tracing. As OpenTelemetry is natively supported in tremor, it means only minor configuration changes to our existing logging and metrics services.

The new OpenTelemetry normal

With OpenTelemetry, distributed tracing can benefit from the same traffic shaping and adaptive rate limiting as the rest of our observability stack. The unification of the source, collector and distribution tiers via kafka provides scalable and recoverable telemetry shipping and distribution.

Tremor adds adaptive load shedding and traffic shaping that are tunable in production. Tremor also allows legacy observability frameworks to co-exist with OpenTelemetry for a gradual migration across the programming languages and frameworks in production use. Finally, tremor enables multiple downstream services to participate in the overall solution.

As a heterogeneous ecosystem of interconnected services - the unified observability platform based on tremor has no preferred upstream or downstream endpoint. It is system and service agnostic. It is bendable.

This allows teams that prefer the GCP ecosystem to normalize to those native services for visualization, debugging and troubleshooting. For our ElasticSearch population, Elastic’s APM may be a better alternative. For other teams, and our operational staff and folk with more ad hoc needs - DataDog may be preferable.

It’s a cloud native decentralized rock’n’roll observability world out there. Getting 5000 and growing engineers to choose a single observability path is impossible. So, as the population cannot bend, the unified observability leans on tremor for this purpose.

As improved services, frameworks and methods are onboarded our tremor-based systems can be incrementally adjusted to meet changing demands.

Insights

This application of tremor does not introduce new features or capabilities per se.

However, it is the first unified tremor-based system that spans the entire observability spectrum. It centralizes common capabilities and facilities for greater operational freedom, whilst decentralizing the point-to-point endpoint connectivity for the widest applicability across our production estate.

As tremor exposes OpenTelemetry as a client, and as an embedded server - it is effectively used to disintermediate, interpose and intework with legacy environments and to standardise on OpenTelemetry.

It does this as an incremental update. Existing users have time to migrate their systems to the new OpenTelemetry-based best-practice. Existing processes, practices and battle-tested systems are maintained.

Operators have better tools to manage the production estate and to tune capacity, performance and cost.

This is also the first tremor-based system where tremor is a key architectural primitive allowing our observability community to bend to the changing needs of our development and operational community with minimal effort, and at short notice.

Conclusion

As tremor expands to new domains such as search, service orchestration and supply-chain and logistics to name but a few - our early adopters in the logging domain have evolved from using tremor as a point solution for traffic shaping - to building our entire observability infrastructure based on tremor.

New domains will extend tremor’s capabilities in multi-participant transaction processing and distributed orchestration. The now unified observability domain will further expand and extract capabilities that enhance modularity and flexibility of tremor to build large distributed systems with a relatively simple and easy to program and growing set of languages designed for large-scale event-based processing.

· 9 min read

The support for multi-participant transaction orchestration in tremor originates with this use case from Wayfair's Search platform services team.

Identified Need

One of the frequent requests made of the tremor team by peers in the Infrastructure organization at Wayfair has come from the search domain. Search is a critical service at Wayfair and it is the powerplant behind many other services - ranging from recommendation engines through to auditing of data streams that are continually being ingested and indexed into multiple searchable databases.

At a very high level - streams of documents need to be elementized and broken down into one or many indexable items of interest - these items then need to be indexed ( successfully ) into one or many search engines.

Many of the use cases that are battle tested with tremor are relevant in this domain:

  • Cleansing, normalization and enrichment of documents and indexable elementization and tracking documents and elementized items
  • Rate limiting, capacity-based load-shedding, with domain classification similar to the traffic shaping use cases where tremor started
  • Sourcing, transformation and distribution of documents and the synthetic events in real-time at low or very low latencies

But, for the use case at hand, there are additional needs:

  • All documents must be processed transactionally, without loss and with proper reporting of processing outcome to upstream services and the documents must be processed in arrival order. The guaranteed delivery and circuit-breaker mechanisms in tremor now need to be multi-pipeline.
  • All indexable elements of all documents must be indexed in multiple downstream engines successfully ( or operator errors produced for exceptions ) while all possible error cases need to be caught and reported upstream in order to issue retries or let operators intervene. This is a reasonably orchestration mapping processed elements and tracing back to the documents the elements were produced from before publication down stream.
  • There is significant variability, variant on a case by case basis, to the exact semantics required for different document types to be processed to a varying number of downstream indexing systems and technologies. The solution needs to be modular

Gathering and aggregating multiple parallel processing outcomes and subsuming them under a common transaction is outside of the baseline scope of message-based and message-like systems as they typically only support point-to-point transactions. Correlation across multiple event streams usually needs to be solved on the application level. Where systems support orchestrated transactions - these are typically constrained by transport/protocol or other factors beyond the application authors control, and therefore inflexible under variant ( and often fast-changing ) production needs.

Required Outcome

Expand on tremor’s QoS facilities so that multi-participant transaction orchestration is possible, easily composable and user programmable.

Characteristics

The original use cases for tremor were relatively straightforward and data distribution applications with a requirement for traffic shaping and rate limiting for data streams when downstream systems were prone to being overwhelmed at peak traffic conditions.

These occurrences were rare - but their impact was high when they happened. And in an infrastructure with higher high peaks year on year this is an ever-present hazard of doing business.

More recently, delivery guarantees are expanding as new domains adopt tremor in production. In these domains data loss, even user defined and strictly capacity managed traffic shaping, is not tolerable.

Like with many real-time systems - the percentage of the overall in-flight volumetric that requires transactional delivery is typically a small subset of the overall firehose. Take financial trading systems for example - orders and trades are transactional and they need to be processed, each and every one, correctly - as there are fiscal and regulatory conditions that need to be strictly met.

But pricing - the ability to buy or sell and equity, or the current currency rate is often naturally continuously changing due to supply and demand, and naturally redundant - as you can buy or sell the same stock on many different venues.

The search case stretches the QoS mechanisms in tremor and the internal mechanisms used to track events as they are processed from a single set of flows and a small set of participants - to larger and more complex user defined flows that orchestrate transactions of arbitrary complexity.

This is compounded by tremor-based applications today being large, increasingly sophisticated and modular. So the QoS mechanisms that were originally constrained to the boundary of a single pipeline - now need to be preserved and propagated across an entire deployment.

Solution

Tremor’s core processing element - pipelines - are executable directed-acyclic graphs.

A tremor user designs a workflow or pipeline using the tremor query language.

Tremor converts this to a directed graph and makes sure that it is acyclic.

Tremor transforms and optimizes the user defined graph to an executable form that is well-structured for easily supporting easy to understand and easy to define qualities of service.

If we imagine the pipeline graph as a single larger directed-acyclic graph we have what tremor actually uses for event distribution internally. Tremor can distribute and process user defined events - these are business or data events that originate from connectors or user defined logic.

Tremor can inject control events - these are runtime events that tremor uses for quality of service and they are not ordinarily user visible. Tremor can also inject events originating at outputs ( or that propagate from downstream systems ) backwards to inputs ( or for propagation to upstream systems ). But, we can do so without introducing cycles.

The user-defined graph is acyclic. But the tremor runtime has, in effect, the ability to coordinate acknowledgements for user-defined events and an ability to signal upstream breaks in connectivity to downstream systems, or downstream breaks to upstream systems. These runtime control events - we call them signal-flow and contra-flow - are transparent to users.

Our wal ( write-ahead-log ) operator produces and consumes signal-flow and contra-flow events.

So, given a simple tremor application that has no defined QoS ( it does not use guaranteed delivery )

/etc/tremor/config/lossy.trickle
select patch event of
insert hostname = system::hostname()
end
from in into out;

We can configure the wal operator

/etc/tremor/config/mostly_guaranteed.trickle

use tremor::system;

define qos::wal operator in_memory_wal

with
read_count = 20,
max_elements = 1000, # Capacity limit of 1000 stored events
max_bytes = 10485760 # Capacity limit of 1MB of events
end;

create operator in_memory_wal;
select patch event of
insert hostname = system::hostname()
end
from in into in_memory_wal;

select event from in_memory_wal into out;

This is a logically equivalent application - but we can tolerate a lag of up to 1000 events or 1 megabyte of data before losing data. Under the hood of course - events are now tracked and traced. If connectors are QoS aware then we now have a more robust application.

So if we are consuming from a kafka cluster upstream and distributing to another kafka cluster downstream ( such as in another data center ) - those systems can go offline briefly or be disconnected. The connectors themselves handle lossless delivery ( that’s handled by kafka in both cases in this example ). Connectors with less strong guarantees can still be ( mostly ) lossless - so if our downstream system is HTTP-based ( like elasticsearch ) - we can tolerate transient service loss and fully recover.

What if tremor or the host it is deployed on is rebooted?

/etc/tremor/config/mostly_guaranteed.trickle
use tremor::system;
define qos::wal operator in_memory_wal
with
dir = ”./recovery”, # Persistent file-based recovery file
read_count = 20,
max_elements = 1000, # Capacity limit of 1000 stored events
max_bytes = 10485760 # Capacity limit of 1MB of events
end;

create operator in_memory_wal;
select patch event of
insert hostname = system::hostname()
end

from in into in_memory_wal;

select event from in_memory_wal into out;

The tremor developer doesn’t need to be too concerned with the internal mechanisms, or their implementation. And for simple applications with a single primary data flow, its as easy as the examples above to selectively introduce grades of guaranteed delivery with a spectrum of robustness that derives from choice of connectivity ( kafka vs http ) or how the qos operators are chosen, placed in a flow, and configured.

Orchestration however, is different. In an orchestrated transaction the user defined logic provided by the tremor developer also needs to do some tracking. This is achieved through using tremor’s state mechanism alongside the qos capabilities and operators that tremor provides to compose a solution.

So, in our search case - let us say we have two downstream search engines - and both need to index a different set of items of interest elementized from a single document - we use the state mechanism to track progress of the items for each participant - and when all participants have indexes up to date - we issue a synthetic event ( that can be recorded in a wal ) that publishes the document processing status downstream.

search logic

So our document source is kafka, our indexing engines for elementized items and our destination for successfully elementized documents ( which may now be enriched with elementization metadata and index metadata ) can now be published ( let’s assume kafka again for simplicity ) to an audited topic.

The state mechanism in tremor is a readable/writable value - so persistent and recoverable state is a relatively simple composition:

define script remember
script
let state = event;
event
end;

define qos::wal operator forget_me_not
with
dir = "./brain",
read_count = 1,
max_elements = 1000, # Capacity limit of 1000 stored events
max_bytes = 10485760 # Capacity limit of 1MB of events
end;

create script remember;
create operator forget_me_not;
select event from in into remember;
select event from remember into forget_me_not;
select event from forget_me_not into out;

Please don’t run out of disk space!

Conclusion

Most of the changes required to evolve tremor form supporting great qos for simple single pipeline applications to complex multi-pipeline and multi-participant stateful orchestrations did not expose new features to the tremor developer or user.

It has been a significant change to tremor internals, however and the work reaches a stable point with our 0.12 release - the ability to pause and resume connectors, and the ability for the tremor runtime itself to detect and act on quiescence will mean that tremor is flexible enough for the demands and use cases that originated in the search domain.

· 5 min read

Identified Need

The adoption growth of tremor inside Wayfair is such that we have production customers with 3.5 years of experience operating tremor in production with very large, complex tremor-based services - and fresh demands from new domains such as Search - whose requirements for multi-participant transaction orchestration are driving enhancements to qualify of service, guaranteed delivery and circuit breaker capabilities within tremor’s core processing engine.

Some usage patterns are becoming common enough that sharing tremor application logic across teams, across organizations, and across domains is now common. For example - tremor has good support for HTTP based interactions and already interfaces with a number of HTTP-based and REST APIs, and it can expose HTTP-based interfaces to participants and act as a REST-based or HTTP-based service. Many of these services share the same needs around handling HTTP errors and routing errors and alerts.

At the event flow or query level - similar levels of sophistication are emerging in the installed user base.

Required Outcome

Expand tremor’s domain specific languages to support modularity, starting with the scripting language but extending into the query language to maximise reuse of user-defined processing logic.

Characteristics

Provide standard mechanism and syntax to discover, reference and consume functional units of logic that can be shared by all tremor domain specific languages.

A new lexical preprocessing phase has been introduced allowing source to be packaged within a modular set of paths through a TREMOR_PATH environment variable. Since V0.8 of tremor the scripting and query languages both support modules, and share the same module syntax and semantics.

The tremor API was upgraded to supported preprocessed or non-modular source so that no API changes were required to extend the deployment mechanisms to support modularity.

Modular scripting

In a nutshell - this added support for user defined constants and user defined functions.

my_mod.tremor
# Tail recursive implementation of fibonacci
#
fn fib_(a, b, n) of
case (a, b, n) when n > 0 => recur(b, a + b, n - 1)
default => a
end;

fn fib(n) with
fib_(0, 1, n)
end;

The module can be loaded via the module path and used in other script or query sources:

fib.trickle
# A streaming fibonacci service*

define script fibber
script
use my_mod;
my_mod::fib(event)
end;

create script fib from fibber;

select event from in into fib;

select event from fib into out;

Modules can be file-based, or defined inline in the scripting or query language. A standard set of system modules is provided by tremor out of the box.

Tremor Modules

Modularity in the query language

In the query language windows, scripts and pluggable operators may be defined and shared across queries.

There is an RFC for modular sub-query to allow query sub-graphs to be defined and shared which is being delivered as part of a tremor LFX mentorship.

Modularity in the deployment language

Modular deployments through replacing the YAML configuration syntax with a deployment language will embed the modular query language, which in turn embeds the modular scripting language.

This work is under development and will span multiple releases - but it is being designed in such a way that as clustering capabilities are added to tremor, that clustered or distributed deployments will be modularly composable by end users using the same tooling as the other DSLs within tremor.

Solution

The support for modularity is thematic. The scripting language now supports a functional programming paradigm and file-based or nested inline modules. The query language can embed modular scripts using the same module mechanism and definitions in the query language are also modular.

Modular sub-query and the introduction of the deployment language are planned and in progress and will extend modularity to the administration of running tremor nodes.

And, in the fullness of time, clustering will further extend modularity in tremor to maximize operator and tremor developer productivity through efficient reuse and sharing of common tasks.

The first set of connectivity to benefit from modularity is the support for CNCF OpenTelemetry. This is the first set of tremor connectors to be delivered that ships with its own set of modules designed to make designing OpenTelemetry based services in tremor easier.

Conclusion

As tremor grows into new domains, and the algorithm and solution complexity of traditional production domains for tremor increase in sophistication, size, complexity, the subject of modularity has evolved and new demands continue to emerge.

We expect the modularity theme to be long-lived, but its origins derive from production needs. When tremor was developed the user defined logic was small, relatively simple and applications built with tremor were fairly monolithic.

Today, multi-pipeline and medium to large tremor-based applications are common. And adoption of the modular scripting and query language primitives is now driving larger and more sophisticated use cases.

Another dimension of modularity is the ongoing expansion of connectivity in tremor. Modularity here reflects a different production-driven need ( and a few maintainer conveniences ). A plugin development kit is being developed that will allow connectors and other plugins to be developed in separately managed and maintained projects. This in turn allows the core of tremor to be managed independently of connectivity.

· 3 min read

Happened Before

The simplified architecture of our systems is currently:

old pipeline

Identified Need

Wayfair’s technology organization is continuously expanding, evolving and growing. When work on tremor began we were running almost entirely in our own data centres, on our own hardware.

Today, we are running largely in the cloud. Whether cloud-native or bare-metal the sidecar pattern has proven to be a significantly popular deployment pattern with tremor-based systems in production at Wayfair.

Tremor is equally happy with clustered ( just a bunch of event processors ) centralized deployments or with sidecar deployments; on containerized virtual machines, or alongside our growing Kubernetes estate.

With the rise of Kubernetes and cloud-native at Wayfair - we had an interesting challenge as an infrastructure organization serving the 1000’s of developers in our wider technology group. How do we continue to deliver a single pane of glass to our developers?

Although the over-simplified architecture diagram above looks simple - its far more complex in practice. We have multiple independent search and visualization clusters. And we have many more infrastructure services and clusters sitting behind these frontends.

For our application developers - this looks like a vanilla installation of ElasticSearch and Kibana. In reality its an always-changing and forever-speciating set of services running 24x7x365 offering a single-pane-of-glass for our developers convenience.

By this state of use case evolution and adoption of Tremor at Wayfair - tremor is at the centre of a lot of the internal data distribution and processing - whilst offering no protocols, APIs or transports of its own - entirely invisible to our target developer community.

Our Kubernetes team developed helm charts for tremor to preserve the convenience of this illusion; whilst accelerating the operationalisation of our emergent cloud-native infrastructure services.

Required Outcome

Boldly go cloud-native, in such a way that no-one truly notices.

Characteristics

No new features or capabilities were developed for this use case by the tremor team.

Solution

Adopt helm-based deployments of tremor for our cloud-native and kubernetes-enabled applications and services. The helm charts have since been open sourced in collaboration with our Kubernetes Team as a part of tremor, and have been extended to support OpenShift by the tremor community ( waves to Anton Whalley of Rust Dublin fame! ).

Tremor Helm Charts

Conclusion

Shortly before tremor was open-sourced and donated to the Linux Foundation it became cloud-native itself through the combined efforts of the Infrastructure group at Wayfair - especially our Platform Engineering, SRE’s and our Kubernetes teams' hard work.

We hope to productize and open source other solution packs that enable the wider tremor community to quickly bootstrap production environments based on the tremor-based systems we already have in productioning in our Logging, Metrics, Observability, Kubernetes and Search teams.

In a large way - the early successes of tremor and our adoption of cloud-native and cloud-based computing aligned the goals of the tremor project with those of the Linux Foundation and the Cloud Native Compute Foundation where tremor is now an early stage sandbox project.

If you’re reading this - and the work and philosophy resonates with your organization, your people and you believe similar benefits could elevate your production environments - then please reach out to us in the tremor community and get involved, join us and help us shape tremor’s future.

· 6 min read

Happened Before

The simplified architecture of our systems is currently:

old pipeline

Identified Need

The need to route and process data from multiple sources to multiple destinations with one or many potentially overlapping streams of data processing is already evident in the first 2 phases of Tremor’s emergence as a mission critical piece of infrastructure at Wayfair.

Growing adoption and the increased sophistication of processing needs now results in the emergence of event/data flow or pipeline processing by our production users. We summarise multiple phases of evolution here and conflate them into a single case study.

In reality the use case captured in this synopsis composes multiple smaller projects related by the same identified need spanning multiple releases into a single case study.

Required Outcome

Our systems specialists are writing increasingly complex, layered and rich business logic within tremor’s domain specific language tremor-script and are inhibited by our pipeline model which uses the YAML format for describing data flow graphs.

YAML is hard to write, and hard to debug, and the embedded scripting language is an odd fit into the pipeline configuration which is described in YAML.

Preserving the internal pipeline processing and execution mechanisms - replace the verbose YAML syntax with a statement oriented query language that extends the expression oriented scripting language.

This allows hygienic and helpful error reporting with suggested workarounds and fixes, better IDE integration and a more intuitive means to express data flow operations on multiple data streams that the pipeline mechanisms expose to tremor systems application developers.

A structured query-like language suitably captures the directed-acyclic graph nature of data flows but we need to support rich nested JSON-like data structures with leverage of the processing capabilities of the scripting language - whilst opening up new capabilities to extend the processing capabilities through custom operators, embeddable scripting logic and extensions and enhancements to QoS and non-functional primitives.

For the metrics domain - the ability to aggregate tumbling windows of events and to provide multiple aggregation dimensions with flexible grouping policies is a new requirement.

Characteristics

Replace the YAML-based pipeline configuration with a structured query-like language that embeds the scripting language, providing top grade support for processing rich data structures, extensibility through custom operators, and windowing semantics to support multi-resolution event aggregation.

The initial solution was the introduction of the tremor query language.

Solution

The query language not only replaces the YAML-based event flow syntax for describing data flow pipelines and processing - which is useful for every domain using tremor in production at Wayfair - but, it has special considerations for the metrics or other analytic domains that require rich means of grouping and aggregate processing of in-flight streaming data. The embedding of scripting logic allows the traditional domain of ETL and data processing and transformation to be natively supported with a more intuitive syntax.

A rate limiting or traffic shaping example ( common to multiple usage domains ):

# File traffic.trickle
# Very basic traffic shaping algorithm

# Define a bucket grouping operator
define grouper::bucket operator kfc;

# Logic for categorizing events

define script categorize
script
let $rate = 1;
let $class = event.`group`;
{ "event": event, "rate": $rate, "class": $class };
end;

# An instance of the grouper
create operator kfc from kfc;

# An instance of the categorizer
create script categorize;

# Stream ingested events into the categorizer
select event from in into categorize;

# Stream categorized events into the grouper
select event from categorize into kfc;

# Stream categorized and group batched events downstream
select event from kfc into out;

The relatively simple structured query-like language allows script and window definitions to be reused. The define statements do not create operator instances in the data flow graph; they create named reusable definitions that encompass the desired semantics. The create statements create the nodes of the directed acyclic graph. The select statement is a builtin operator that is used for linking nodes in the graph together to form a data flow or event processing graph.

Although the sample logic in the example is a simplified version of what our logging and metrics service teams actually develop and maintain for our production systems - it replaces 1000s of lines of cryptic YAML with hundreds of lines of easy to debug and reason about query code.

An aggregation example from the metrics or analytics domain. This example is a complete but simplified example of a tremor event processing application:

aggregator.trickle
# Aggregate events using a high dynamic range histogram with 10 second, 1 minute, 10 minute
# and 1 hour aggregate summaries.*

select {
"measurement": event.measurement,
"tags": patch
event.tags of insert "window" => window
end,
# Windowed histogram - for median and a selection of quartiles
"stats": aggr::stats::hdr(event.fields[group[2]], [ "0.5","0.9", "0.99", "0.999" ]),
"class": group[2],
"timestamp": aggr::win::first(event.timestamp),
}
# The higher resolution aggregates are merged into lower resolution aggregates to conserve memory
from in[`10secs`, `1min`, `10min`, `1h`]
where event.measurement == "udp_lb"
or event.measurement == "kafka-proxy.endpoints"
or event.measurement == "burrow_group"
or event.measurement == "burrow_partition"
or event.measurement == "burrow_topic"
# The operator maintains a user defined composited set of group partitions.
# Each group maintains its own set of aggregation windows
group by set(event.measurement, event.tags,
each(record::keys(event.fields)))
into normalize
# Discard computed events with a small sample set
having event.stats.count \> 100;

In most real world tremor-based systems - the synthetic events computed in processing pipelines are tailored to conform to the schemas expected by multiple downstream systems:

normalize.trickle
# Prepare computed histogram summaries for downstream systems ( eg: telegraf/influx )
select {
"measurement": event.measurement,
"tags": event.tags,
"fields": {
"count_#{event.class}": event.stats.count,
"min_#{event.class}": event.stats.min,
"max_#{event.class}": event.stats.max,
"mean_#{event.class}": event.stats.mean,
"stdev_#{event.class}": event.stats.stdev,
"var_#{event.class}": event.stats.var,
"p42_#{event.class}": event.stats.percentiles["0.42"],
"p50_#{event.class}": event.stats.percentiles["0.5"],
"p90_#{event.class}": event.stats.percentiles["0.9"],
"p99_#{event.class}": event.stats.percentiles["0.99"],
"p99.9_#{event.class}": event.stats.percentiles["0.999"]
}
}
from normalize
into out;

Compared to the disparate services and software elements that tremor replaces - the query language affords an intuitive and easy to reason about high level domain specific language to configure rich event processing and data enrichment and transformation pipelines - with a minimally terse yet easy to read form.

Conclusion

Tremor’s core mission and mandate includes the efficient declaration of arbitrarily complex directed-acyclic pipeline processing graphs that are memory and compute efficient under the hood whilst preserving transparency or remaining hidden to most of the developers in our organization by continuing to conform to external transports, protocols and service interfaces in the surrounding production infrastructure estate.

· 6 min read

Happened Before

After the initial production success of tremor as a point solution for capacity management during our peak trading cycles at Wayfair, the focus shifted a little to the next key set of issues with our traditional observability platforms.

The simplified architecture of our logging systems is currently:

old pipeline

The traffic shaping use case was a huge win for Wayfair's SRE and operational teams who manage our production estate.

Identified Need

But we could do a lot better. The development lifecycle to capture, normalize, enrich, enhance and standardize logs from 1000's of engineers, 1000's of applications built in a diversity of programming languages was a have task for our observability engineers.

Their environment still looks pretty grim:

new pipeline

Displace and replace a plethora of in-house, open-source and commercial off the shelf log capture, metrics capture and data distribution technologies with a single cost-effective at hyperscale dynamically load-adaptive solution.

The decision was taken to expand the processing capabilities of tremor beyond traffic shaping, classification and rate limiting rules to general purpose data processing of unstructured hierarchic JSON-like data streams. The shape of data most common within the observability domain we were focused upon.

Required Outcome

Our systems specialists and service reliability engineers can consolidate knowledge and tune to a unified operating experience and center of excellence in our production estate based on a single well known and cost efficient easy to operate alternative.

Characteristics

It is understood that data will be captured from and distributed to a diverse and ever-changing set of production systems - built in many programming languages and environments and distributed to an unknowable set of destination systems.

These systems include, but are not limited to: Graylog Extended Log Format, Syslog,
Kafka, TCP, UDP, Telegraf, Influx Line Protocol, ElasticSearch and many more...

Connectivity must be flexible enough to capture data from and distributed data to synchronous, asynchronous protocols, transports and systems without developers needing to learn or integrate with a new technology. The solution must be invisible to developers.

The unit economics of the existing system is hard to measure, hard to plan and hard to remediate; especially given the rising year on year firehose volumetrics, with rising velocity and rising peak loads.

99% of the data either originates as JSON, or is transformed to JSON just in time.

The target systems for displacement and replacement have very well understood and extract, load, transform, enrichment and normalization algorithms that are coded and configured differently in each system. This is difficult to maintain, enhance and evolve in an environment where infrastructure and services are continuously evolving.

A rich set of ETL and data manipulation and mutation primitives are required so that all existing methods used for data processing across the target systems can be replaced by a single easy to understand, easy to program and optimized alternative.

Solution

Tremor has already established itself in the traffic shaping domain in the logging and metrics domains - adding just-in-time traffic shaping capabilities when needed; and preserving the original system’s operational semantics and behaviours during normal operating conditions when the system is operating within planned capacity constraints.

new pipeline

The mandate now expands to replacing many of our event capture systems deployed on on-premise and cloud-native operating environments across multiple data-centres globally.

Instead of our operations and service reliability teams needing to learn and manage the service deployment and administrative lifecycle of multiple components in this evolution we displace and replace those systems - typically operating as sidecars on source systems where our applications run on virtual machines, bare metal or containerized environments with tremor.

The relatively primitive domain specific language developed as part of the initial solution for traffic shaping is replaced with a new domain specific language tremor-script designed for expressive data filtration, extraction, enrichment and transformation to support:

  • Classification, Categorization and rate limiting for Traffic Shaping
  • Enrichment, Normalization and micro-format parsing
  • Structural pattern matching over hierarchical nested JSON-like data
  • The ability to patch, merge and iterate over JSON-like data
  • A familiar expression language like in regular programming languages with arithmetic, multiplicative and other common operations on numeric and string data
  • Testing string encoded data for regular expression and other micro-formats such as the logstash grok, dissect, kv or stringified embedded json

The resulting language and interpreter for tremor-script forms the basis of the solution and is largely designed around the needs of the logging domain - but delivered in a more generally useful incarnation that is more widely applicable to other domains - such as in flight data analytics, in flight ETL or processing metrics to name a few.

Most of the in-flight events are JSON or JSON-like. Inspired by a tweet the tremor team happens upon the work of Daniel Lemire et al and SIMD-accelerated JSON processing. Heinz ports this to rust and tremor’s sister project and organization simd-lite is born. The tremor team also maintains the rust port of simd-json with other maintainers interested in fast and efficient data parallel parsing of JSON.

Finally, tremor undergoes a steady pace of innovation with benchmark driven performance optimizations. The combined effect of an efficient interpreter, the ability to easily benchmark synthetic analogs of real world use cases in our benchmark system, and the adoption of more workload appropriate and efficient low level memory allocators such as jemalloc, then mimalloc, then snmalloc by Microsoft Research results an order of magnitude level of infrastructure cost savings. The production estate shrinks by 10x in terms of the number of deployed systems, the compute capacity required, and the committed memory required when compared to the displaced and replaced systems.

Further - tremor uses far less memory and resources on the source systems making it a much improved sidecar on our source hosts.

Conclusion

Tremor’s core mission and mandate now extends from high quality of service data distribution with load-adaptive traffic management for the 1% ( and typically less ) of time our systems are under-resourced; and, the other 99% of the time is 10x more cost efficient.

Tremor preserves its facility to remain invisible to developers during this phase of expansion whilst drastically improving the lives of our operational engineers and service reliability engineers. The measured and validated cost-efficiencies offer compounding value that further drives the mandate of the project and pushes it into its next phase of evolution.