Skip to main content
Version: edge

The AWS s3 Connector

The Amazon Web Services s3 provides integration with the AWS Simple Storage Service or drop-in compatible replacements such as MinIO.

Reader

config.troy
define connector s3_reader from s3_reader
with
codec = "json",
preprocessors = ["lines"],
postprocessors = ["lines"],
config = {
# The name of the AWS region to connect to
"aws_region": "eu-central-1",

# The S3 or compatible URL endpoint
"endpoint": "",

# The name of the S3 bucket for this connector
"bucket": "",

# optional - The chunk size for multi-part related uploads/downloads
# Defaults to 8MB
# multipart_chunksize: 16 * 1024 * 1024,

# optional - The size threshold over which multipart-related uploads/downloads kicks in
multipart_threshold: 16 * 1024 * 1024,

# optional - The maximum number of in-flight concurrent connections
# Defaults to 10 connections
max_connections: 1,
},
end;

Writer

config.troy
  define connector s3_writer from s3-writer
with
codec = "json",
preprocessors = ["lines"],
postprocessors = ["lines"],
config = {
# The name of the AWS region to connect to
"aws_region": "<region-name>",

# The S3 or compatible URL endpoint
"endpoint": "localhost:9000",

# The name of the S3 bucket for this connector
"bucket": "snot",

# optional - The minimum size for multipart-related
# Defaults to 5MB ( must be no less than )
# min_part_size : 10 * 1024 * 1024,
},
end;

Example

The following examples used a MinIO docker image which can be launched as follows:

$ docker run \
-p 9000:9000 \
-p 9001:9001 \
-e "MINIO_REGION=eu-central-1" \
-e "MINIO_ROOT_USER=SNOTIOSFODNN7EXAMPLE" \
-e "MINIO_ROOT_PASSWORD=wJBadger/K7MDENG/bPxRfiCYEXAMPLEKEY" \
quay.io/minio/minio server /data --console-address ":9001"

A MinIO storage service

In this example a JSON line-delimited file's content are replayed to a Minio Service mimicking AWS S3.

config.troy
define flow main
flow
use tremor::connectors;
use integration;

define connector out_s3 from s3-writer
with
codec = "json",
preprocessors = ["separate"],
postprocessors = ["separate"],
config = {
# The name of the AWS region to connect to
"aws_region": "eu-central-1",

# The S3 or compatible URL endpoint
"endpoint": "http://127.0.0.1:9000/",

# The name of the S3 bucket for this connector - should already exist as WILL NOT be created
"bucket": "snot",
},
end;

define connector in_file from file
with
codec = "json",
preprocessors = ["separate"],
config = {
"path": "in.json",
"mode": "read"
},
end;

# define connector out_s3 from s3-reader
# end;

define connector out_file from file
with
codec = "json",
preprocessors = ["separate"],
config = {
"path": "out.json",
"mode": "truncate"
},
end;

define pipeline s3_store
from in
into out, exit
pipeline
define script set_s3_key
script
use tremor;
# derive a unique s3 key for each `event`
let $ = { "s3": { "key": "snot_#{tremor::system::ingest_ns()}" }};
event
end;

create script set_s3_key from set_s3_key;

select event from in where event != "exit" into set_s3_key;
select event from set_s3_key into out;
select {"exit": 0, "delay": 1 } from in where event == "exit" into exit;
end;

create connector in_file;
create connector out_s3 from out_s3;
create connector exit from connectors::exit;
create pipeline main from s3_store;

connect /connector/in_file to /pipeline/main;
connect /pipeline/main to /connector/out_s3;
connect /pipeline/main/exit to /connector/exit;

end;
deploy flow main;

NOTES

Environment variables should be set according to the AWS environment variable - however, the access key, shared access key and region MAY differ on Minio from AWS.

A bucket called snot will need to be available.

The region in Minio settings should be the same as configured in the server startup.