
gcs_streamer

This connector provides the ability to stream events into Google Cloud Storage.

note

Authentication happens via GCP authentication.

For this connector, as long as the (bucket, name) metadata pair does not change, consecutive events are appended to the same gcs object.
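
As an illustration, consider the following hypothetical sequence of events (object names and buckets are made up):

```troy
# Hypothetical sequence of events and their $gcs_streamer metadata:
#
#   event 1: {"name": "logs.json", "bucket": "b1"}  -> starts the upload of b1/logs.json
#   event 2: {"name": "logs.json", "bucket": "b1"}  -> appended to b1/logs.json
#   event 3: {"name": "other.json", "bucket": "b1"} -> finishes b1/logs.json,
#                                                      starts the upload of b1/other.json
```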

Modes of operation

All our object storage connectors that do uploads (s3_streamer and gcs_streamer) operate with the same two modes.

yolo

The yolo mode reports every event as successfully delivered, regardless of whether uploading it to gcs worked or not. This mode is intended for "fire-and-forget" use cases, where it is not important if a single event in the middle of an upload failed or is missing. The upload continues nonetheless and is only finished when a new event with another bucket or name is received.

This mode is well-suited for e.g. aggregating time-series events into gcs objects by time unit (e.g. hour or day), where it is not too important that the resulting gcs object is a 1:1 mirror of all the events in the correct order without gaps. A minimal definition for this mode is sketched below.
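
The following is only a sketch; the connector and bucket names are hypothetical:

```troy
# A sketch of a yolo-mode streamer for fire-and-forget aggregation.
# The bucket here is hypothetical and can still be overridden per event
# via the $gcs_streamer.bucket metadata field.
define connector timeseries from gcs_streamer
with
  config = {
    "mode": "yolo",
    "bucket": "my-timeseries-bucket"
  },
  codec = "json"
end;
```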

consistent

consistent mode, on the other hand, is well-suited when uploading consistent and correct gcs objects is important. In this mode, the gcs_streamer will fail a complete upload whenever even a single event could not be successfully uploaded to gcs. This guarantees that only complete and consistent gcs objects show up in your buckets. Due to this logic, however, it is possible to lose events and whole uploads if the upstream connectors don't replay failed events. The wal connector and the kafka_consumer connector can replay events, so it makes sense to pair the gcs_streamer in consistent mode with one of those.

This mode guarantees that only complete gcs objects with all events in the right order are uploaded to gcs. Objects whose upload failed for some reason will be deleted, and the upstream is expected to retry. It is well-suited for uploading complete files, like images or documents.
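
A hedged sketch of such a pairing, assuming a wal connector instance named buffer and a pipeline named main have already been created (see the wal connector documentation for its exact configuration):

```troy
# Sketch: a replay-capable upstream feeding a consistent-mode streamer.
# `buffer` is assumed to be a previously created wal connector instance
# and `main` a previously created pipeline; the wal redelivers events
# whose upload failed, so incomplete objects can be retried.
define connector uploads from gcs_streamer
with
  config = { "mode": "consistent" },
  codec = "json"
end;

create connector uploads;

connect /connector/buffer to /pipeline/main;
connect /pipeline/main to /connector/uploads;
```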

Metadata

Two metadata fields are required for the connector to work: $gcs_streamer.name (used as the object name) and $gcs_streamer.bucket (the name of the bucket the object will be placed in).
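
For example, a script along these lines sets both fields before events reach the connector (the name pattern and bucket are hypothetical):

```troy
define script route_to_gcs
script
  # Both fields are required by gcs_streamer; the values here are made up.
  let $gcs_streamer = {
    "name": "events-#{event.id}.json", # used as the object name
    "bucket": "my-bucket"              # bucket the object is placed in
  };
  emit event
end;
```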

Configuration

All of the configuration options, aside from token, are optional.

| name | description | default |
|------|-------------|---------|
| url | The HTTP(s) endpoint to which the requests will be made | "https://storage.googleapis.com/upload/storage/v1" |
| bucket | The optional bucket to stream events into, if not overwritten by the event metadata $gcs_streamer.bucket | |
| mode | The mode of operation for this connector. See Modes of operation. | |
| connect_timeout | The timeout for the connection (in nanoseconds) | 10_000_000_000 (10 seconds) |
| buffer_size | The size of a single request body, in bytes (must be a multiple of 256KiB, as required by Google) | 8388608 (8MiB, the minimum recommended by Google) |
| max_retries | The number of retries to perform for a failed request to GCS | 3 |
| backoff_base_time | Base waiting time in nanoseconds for exponential backoff when retrying failed requests to GCS | 25_000_000 (25ms) |
| token | The authentication token; see GCP authentication | |
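
As an illustration, a definition overriding some of these defaults could look like the following sketch (the bucket name is hypothetical):

```troy
use std::{size, time::nanos};

# Sketch: overriding a few defaults. buffer_size must stay a multiple
# of 256KiB; size::kiB(512) = 524288 bytes satisfies that.
define connector gcs from gcs_streamer
with
  config = {
    "bucket": "my-bucket",
    "mode": "yolo",
    "connect_timeout": nanos::from_millis(5000), # 5 seconds
    "max_retries": 5,
    "buffer_size": size::kiB(512)
  },
  codec = "json"
end;
```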

Example

config.troy
```troy
define flow main
flow
  use std::{size, time::nanos};

  define connector metronome from metronome
  with
    config = {"interval": nanos::from_millis(10)}
  end;

  define connector output from gcs_streamer
  with
    config = {
      "mode": "consistent",
      "buffer_size": size::kiB(256), # must be a multiple of 256KiB
      "backoff_base_time": nanos::from_millis(200)
    },
    codec = "json"
  end;

  define pipeline main
  pipeline
    define script add_meta
    script
      # group every four consecutive events into the same object
      let file_id = event.id - (event.id % 4);

      let $gcs_streamer = {
        "name": "my_file_#{file_id}.txt",
        "bucket": "tremor-test-bucket"
      };

      emit {"a": "B"}
    end;

    create script add_meta from add_meta;

    select event from in into add_meta;
    select event from add_meta into out;
    select event from add_meta/err into err;
  end;

  define connector console from stdio
  with
    codec = "json"
  end;

  create connector s1 from metronome;
  create connector s2 from output;
  create connector errors from console;

  create pipeline main;

  connect /connector/s1 to /pipeline/main;
  connect /pipeline/main to /connector/s2;
  connect /pipeline/main/err to /connector/errors;
end;

deploy flow main;
```
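
In this example, the metronome emits an event every 10 milliseconds, and the add_meta script derives the object name from the event id so that every four consecutive events share the same (bucket, name) pair. Each gcs object (my_file_0.txt, my_file_4.txt, and so on) therefore receives four appended JSON payloads before the changed name finishes the upload and starts the next object. Events that hit the script's error port are routed to the console connector.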