gpubsub
note
Authentication happens over the GCP autentication
gpubsub_consumer
This connector allows consuming a Google PubSub queue.
Configuration
| option | required? | description | 
|---|---|---|
subscription_id | yes | ID of the subscription to use in the form projects/my_project/subscriptions/test-subscription-a | 
connect_timeout | no | Connection timeout in nanoseconds, defaults to 1 second | 
ack_deadline | no | ACK deadline in nanoseconds, defaults to 10 seconds. PubSub will resend the message if it's not acked within this time | 
max_outstanding_messages | no | The maximum number of messages to keep outstanding at any given time, defaults to 128 | 
max_outstanding_bytes | no | The maximum number of bytes to keep outstanding at any given time, defaults to 10MB | 
token | yes | The authentication token see GCP autentication | 
url | no | The endpoint for the PubSub API | 
config.troy
define connector gsub from gpubsub_consumer
with
    codec = "string",
    config = {
        "subscription_id": "projects/my_project/subscriptions/test-subscription-a",
        "token": "env", # required  - The GCP token to use for authentication, see [GCP authentication](./index.md#GCP)
    }
end;
Metadata
The connector will set the $pubsub_connector metadata variable, which is a dictionary of the messages metadata.
| field | type | description | 
|---|---|---|
message_id | string | The ID of the message, as provided by PubSub | 
ordering_key | string | The ordering key of the message | 
publish_time | integer | The time when the message was published (as nanoseconds since 1st January 1970 00:00:00 UTC | 
attributes | record with string values | The attributes attached to the message | 
Payload structure
The raw payload will be passed as is to the codec
gpubsub_producer
This connector allows producing to a Google PubSub queue.
Configuration
| option | required? | description | 
|---|---|---|
topic | yes | The identifier of the topic, in the format of projects/PROJECT_NAME/topics/TOPIC_NAME | 
connect_timeout | no | Connection timeout in nanoseconds | 
request_timeout | no | Request timeout in nanoseconds | 
url | no | The endpoint for the PubSub API | 
token | yes | The authentication token see GCP autentication | 
config.troy
define flow gbqtest
flow
    use std::time::nanos;
    
    define pipeline passthrough
    pipeline
        select event from in into out;
    end;
    define connector metro from metronome
    with
        config = {"interval": nanos::from_seconds(1) }
    end;
    define connector gpub from gpubsub_producer
    with
        codec = "json",
        config = {
            "topic": "projects/xxx/topics/test-topic-a", # required - the identifier of the topic
            "connect_timeout": nanos::from_seconds(1), # optional - connection timeout (nanoseconds) - defaults to 10s
            "request_timeout": nanos::from_seconds(10), # optional - timeout for each request (nanoseconds) - defaults to 1s
            "token": "env", # required  - The GCP token to use for authentication, see [GCP authentication](./index.md#GCP)
            "url":  "https://us-east1-pubsub.googleapis.com" # optional - the endpoint for the PubSub API, defaults to https://pubsub.googleapis.com
        }
    end;
    create connector gpub;
    create connector metro;
    create pipeline passthrough;
    connect /connector/metro/out to /pipeline/passthrough;
    connect /pipeline/passthrough to /connector/gpub/in;
end;
deploy flow gbqtest;
Metadata
The connector will use the $gpubsub_producer metadata variable, which can be used to set the ordering_key.
| field | type | description | 
|---|---|---|
ordering_key | string | The ordering key of the message | 
Payload structure
The raw payload will be passed as is to the codec