Version: edge

gpubsub

note

Authentication happens via GCP authentication.

gpubsub_consumer

This connector consumes messages from a Google PubSub subscription.

Configuration

| option | required? | description |
|---|---|---|
| subscription_id | yes | ID of the subscription to use, in the form projects/my_project/subscriptions/test-subscription-a |
| connect_timeout | no | Connection timeout in nanoseconds, defaults to 1 second |
| ack_deadline | no | ACK deadline in nanoseconds, defaults to 10 seconds. PubSub will resend the message if it is not acked within this time |
| max_outstanding_messages | no | The maximum number of messages to keep outstanding at any given time, defaults to 128 |
| max_outstanding_bytes | no | The maximum number of bytes to keep outstanding at any given time, defaults to 10MB |
| token | yes | The authentication token, see GCP authentication |
| url | no | The endpoint for the PubSub API |
config.troy
define connector gsub from gpubsub_consumer
with
  codec = "string",
  config = {
    "subscription_id": "projects/my_project/subscriptions/test-subscription-a",
    "token": "env", # required - The GCP token to use for authentication, see [GCP authentication](./index.md#GCP)
  }
end;
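
The optional settings from the table above can be set alongside the required ones. The following is a minimal sketch rather than a verified configuration: the flow name `gpubsub_consume_example` is made up for illustration, the timeout values simply restate the documented defaults, and `max_outstanding_bytes` is left out because the table does not say how the 10MB default is expressed.

```troy
define flow gpubsub_consume_example # hypothetical flow name
flow
  use std::time::nanos;

  define connector gsub from gpubsub_consumer
  with
    codec = "string",
    config = {
      "subscription_id": "projects/my_project/subscriptions/test-subscription-a", # required
      "connect_timeout": nanos::from_seconds(1),  # optional - nanoseconds, restates the documented default
      "ack_deadline": nanos::from_seconds(10),    # optional - nanoseconds, restates the documented default
      "max_outstanding_messages": 128,            # optional - restates the documented default
      "token": "env"                              # required
    }
  end;

  create connector gsub;
end;
```

Using `nanos::from_seconds` keeps the nanosecond-based options readable. To do anything with the consumed messages, the connector still has to be connected to a pipeline, in the same way as in the producer example further down.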

Metadata

The connector sets the $pubsub_connector metadata variable, which is a dictionary of the message's metadata.

| field | type | description |
|---|---|---|
| message_id | string | The ID of the message, as provided by PubSub |
| ordering_key | string | The ordering key of the message |
| publish_time | integer | The time when the message was published (as nanoseconds since 1st January 1970 00:00:00 UTC) |
| attributes | record with string values | The attributes attached to the message |
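
As an illustration of how these fields might be used, the sketch below copies some of them from the metadata into the event itself. The pipeline name `with_pubsub_metadata` and the output field names are assumptions chosen for this example. Only `$pubsub_connector` and its fields come from the table above.

```troy
define pipeline with_pubsub_metadata # hypothetical pipeline name
pipeline
  # wrap the decoded payload together with selected PubSub metadata fields
  select {
    "payload": event,
    "message_id": $pubsub_connector.message_id,
    "ordering_key": $pubsub_connector.ordering_key,
    "publish_time": $pubsub_connector.publish_time # nanoseconds since the Unix epoch
  } from in into out;
end;
```

Copying the metadata into the event is one way to keep it available to downstream sinks that only serialize the event payload.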

Payload structure

The raw payload is passed as is to the codec.

gpubsub_producer

This connector publishes messages to a Google PubSub topic.

Configuration

| option | required? | description |
|---|---|---|
| topic | yes | The identifier of the topic, in the format of projects/PROJECT_NAME/topics/TOPIC_NAME |
| connect_timeout | no | Connection timeout in nanoseconds |
| request_timeout | no | Request timeout in nanoseconds |
| url | no | The endpoint for the PubSub API, defaults to https://pubsub.googleapis.com |
| token | yes | The authentication token, see GCP authentication |
config.troy
define flow gbqtest
flow
  use std::time::nanos;

  define pipeline passthrough
  pipeline
    select event from in into out;
  end;

  define connector metro from metronome
  with
    config = {"interval": nanos::from_seconds(1) }
  end;

  define connector gpub from gpubsub_producer
  with
    codec = "json",
    config = {
      "topic": "projects/xxx/topics/test-topic-a", # required - the identifier of the topic
      "connect_timeout": nanos::from_seconds(1), # optional - connection timeout (nanoseconds) - defaults to 1s
      "request_timeout": nanos::from_seconds(10), # optional - timeout for each request (nanoseconds) - defaults to 10s
      "token": "env", # required - The GCP token to use for authentication, see [GCP authentication](./index.md#GCP)
      "url": "https://us-east1-pubsub.googleapis.com" # optional - the endpoint for the PubSub API, defaults to https://pubsub.googleapis.com
    }
  end;

  create connector gpub;
  create connector metro;

  create pipeline passthrough;

  connect /connector/metro/out to /pipeline/passthrough;
  connect /pipeline/passthrough to /connector/gpub/in;
end;

deploy flow gbqtest;

Metadata

The connector reads the $gpubsub_producer metadata variable, which can be used to set the ordering_key of the published message.

| field | type | description |
|---|---|---|
| ordering_key | string | The ordering key of the message |
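
One possible way to set the ordering key is a small script inside the pipeline that feeds the producer. This is a sketch under assumptions: the pipeline name `add_ordering_key`, the script name `set_key` and the key value `"example-key"` are invented for illustration. In practice the key would more likely be derived from the event.

```troy
define pipeline add_ordering_key # hypothetical pipeline name
pipeline
  define script set_key
  script
    # attach the metadata the gpubsub_producer connector looks at
    let $gpubsub_producer = {"ordering_key": "example-key"}; # placeholder value
    # pass the event through unchanged
    event
  end;

  create script set_key;

  select event from in into set_key;
  select event from set_key into out;
end;
```

A pipeline like this would take the place of `passthrough` in the flow above, so that every event reaching the producer carries the metadata.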

Payload structure

The raw payload is passed as is to the codec.