
timescaledb-event-streamer

timescaledb-event-streamer is a command line program that creates a stream of CDC (Change Data Capture) events from TimescaleDB™ Hypertables and Continuous Aggregates in a PostgreSQL installation running the TimescaleDB extension. In addition, it supports capturing events from vanilla PostgreSQL tables.

Change Data Capture is a technology where insert, update, delete, and similar operations inside the database generate a corresponding set of events, which are commonly distributed through a messaging system such as Kafka, NATS, or similar.

Attention: This is not an official Timescale™ project; it is developed by a person who used to work for Timescale.

Trademark information: Timescale (TIMESCALE) and TimescaleDB (TIMESCALEDB) are registered trademarks of Timescale, Inc.

Why not just Debezium?

  • Slightly outdated:
    While Debezium already supports PostgreSQL, the implementation doesn't really support the internals of TimescaleDB, most specifically the way data is chunked or partitioned. It is possible to use Debezium to capture change events of the actual chunks themselves, but without the catalog handling. This means that every chunk would emit changes on its own, with no reference to its parent hypertable. timescaledb-event-streamer changes this by handling the catalog updates and resolving the parent hypertable before emitting the events.
  • Current status:
    While Debezium now supports PostgreSQL and TimescaleDB, the implementation for TimescaleDB is very basic at this point and may never catch up to this tool, which is much more specific. It is possible to use Debezium to capture changes on chunks (the corresponding hypertable name is provided in the Kafka header), but changes still arrive on chunk-individual streams. That means it is still necessary to implement the handling / merging of chunk streams into their hypertable parent on the application side (behind Kafka). While there may be use cases for this, the more general use case is to "ignore" that the data actually comes from a hypertable, since, in most cases, this information won't mean anything to the target application.

Getting Started

Installing prebuilt Packages

To automatically download the latest version into the current directory, the repository provides a bash script. This script supports downloading on Linux, macOS, and FreeBSD.

On Windows, please download the zip file manually from the repository's release page at https://github.com/noctarius/timescaledb-event-streamer/releases/latest which automatically redirects to the latest available download.

curl -L https://raw.github.com/noctarius/timescaledb-event-streamer/master/get.sh | bash

Using Docker

To run timescaledb-event-streamer via Docker, access the latest version of the Docker image using ghcr.io/noctarius/timescaledb-event-streamer:latest.

The environment variable TIMESCALEDB_EVENT_STREAMER_CONFIG provides the location of the configuration file to the tool. This file must be mounted into the running container at the given location.

The following command shows an example of how to run it.

docker run \
  --name timescaledb-event-streamer \
  -v "$(pwd)/config.toml:/etc/config.toml" \
  -e TIMESCALEDB_EVENT_STREAMER_CONFIG='/etc/config.toml' \
  ghcr.io/noctarius/timescaledb-event-streamer:latest

Installation from Source

timescaledb-event-streamer requires the Go runtime (version 1.20+) to be installed. With this requirement satisfied, the installation can be kicked off using:

$ go install github.com/noctarius/timescaledb-event-streamer/cmd/timescaledb-event-streamer@latest

Before you start

Before using the program, a configuration file needs to be created. An example configuration can be found in the GitHub repository.

For a full reference of the existing configuration options, see the Configuration section.
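
For orientation, a minimal TOML configuration might look like the following sketch. Property names are taken from the Configuration section below; the connection string, publication name, and hypertable name are placeholders:

```toml
# Minimal example configuration (placeholder values)
postgresql.connection = 'postgres://repl_user@localhost:5432/metrics'
postgresql.password = '<<password>>'
postgresql.publication.name = 'streamer_publication'
postgresql.publication.create = true
timescaledb.hypertables.includes = [ 'public.metrics' ]
sink.type = 'stdout'
```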

Starting with TimescaleDB 2.12, the extension can use PostgreSQL 14+ logical replication messages to mark the start and end of decompressions (caused by insert, update, or delete operations in compressed chunks). If you are on PG14+ and use TimescaleDB 2.12+, you should enable those markers.

In your postgresql.conf, set the following line:

timescaledb.enable_decompression_logrep_markers=on

This property cannot be set at runtime! After changing this property you have to restart your PostgreSQL server instance.

Support for non-privileged users (non-superuser roles)

In addition to the program itself, a function has to be installed into the database from which change events are to be generated. The function is used to create the initial logical replication publication, since the internal catalog tables of TimescaleDB are owned by the postgres user, as required for a trusted extension.

The function needs to be created by the postgres user when added to the database. It runs in security definer mode, inheriting the permissions and ownership of the defining user, before voluntarily giving up the increased permissions.

To install the function, run the following snippet as the postgres user against your database. timescaledb-event-streamer will automatically use it when starting up. If the function is not available, the startup will fail!

The function can be found in the GitHub repository and installed as follows:

$ wget https://raw.githubusercontent.com/noctarius/timescaledb-event-streamer/main/create_timescaledb_catalog_publication.sql
$ psql "<connstring>" < create_timescaledb_catalog_publication.sql

Creating the Replication User

The user used for logical replication needs owner permission on the replicated entities, no matter if Hypertable, Continuous Aggregate (Materialized Hypertable), or vanilla PostgreSQL table.

That said, when creating a user for logical replication, the new user has to be granted the owning role, and needs the REPLICATION attribute set.

The following example assumes the new replication user is named repl_user and the entity's owner is named pg_user.

CREATE ROLE repl_user LOGIN REPLICATION ENCRYPTED PASSWORD '<<password>>';
GRANT pg_user TO repl_user;

In your configuration, you would set repl_user as the user for replication, as well as the chosen password.

postgresql.connection = 'postgres://repl_user@<<hostname>>:<<port>>/<<database>>'
postgresql.password = '<<password>>'

Using timescaledb-event-streamer

After creating a configuration file, timescaledb-event-streamer can be executed with the following command:

$ timescaledb-event-streamer -config=./config.toml

The tool will connect to your TimescaleDB database, and start replicating incoming events.

Supported PostgreSQL Data Types

timescaledb-event-streamer supports almost all default data types available in PostgreSQL, with some exceptions that shouldn't be seen in real-world scenarios.

The following list describes all available data types and their schema type mappings. For Array data types, the element type of the array children is also listed. Furthermore, starting with 0.4.0, timescaledb-event-streamer supports user defined Enum data types, as well as Composite types, and handles them correctly.

| Type Name | PostgreSQL Type | Schema Type | Schema Element Type |
|-----------|-----------------|-------------|---------------------|
| Bit | bit, bit(n) | STRING | |
| Bit Array | bit[], bit(n)[] | ARRAY | STRING |
| Bit Varying | varbit, varbit(n) | STRING | |
| Bit Varying Array | varbit[], varbit(n)[] | ARRAY | STRING |
| Boolean | boolean | BOOLEAN | |
| Boolean Array | boolean[] | ARRAY | BOOLEAN |
| Box | box | STRING | |
| Box Array | box[] | ARRAY | STRING |
| Byte Array (bytea) | bytea | STRING | |
| Byte Array (bytea) Array | bytea[] | ARRAY | STRING |
| CID | cid | INT64 | |
| CID Array | cid[] | ARRAY | INT64 |
| CIDR (IPv4) | cidr | STRING | |
| CIDR (IPv4) Array | cidr[] | ARRAY | STRING |
| CIDR (IPv6) | cidr | STRING | |
| CIDR (IPv6) Array | cidr[] | ARRAY | STRING |
| Circle | circle | STRING | |
| Circle Array | circle[] | ARRAY | STRING |
| Date | date | INT32 | |
| Date Array | date[] | ARRAY | INT32 |
| Date Range | daterange | STRING | |
| Date Range Array | daterange[] | ARRAY | STRING |
| Fixed Length Char | char(x) | STRING | |
| Fixed Length Char Array | char(x)[] | ARRAY | STRING |
| Float (32bit) | float4 | FLOAT32 | |
| Float (32bit) Array | float4[] | ARRAY | FLOAT32 |
| Float (64bit) | float8 | FLOAT64 | |
| Float (64bit) Array | float8[] | ARRAY | FLOAT64 |
| Geography (PostGIS) | geography | STRUCT | |
| Geography Array (PostGIS) | geography[] | ARRAY | STRUCT |
| Geometry (PostGIS) | geometry | STRUCT | |
| Geometry Array (PostGIS) | geometry[] | ARRAY | STRUCT |
| Hstore | hstore | MAP | |
| Hstore Array | hstore[] | ARRAY | MAP |
| Inet (IPv4) | inet | STRING | |
| Inet (IPv4) Array | inet[] | ARRAY | STRING |
| Inet (IPv6) | inet | STRING | |
| Inet (IPv6) Array | inet[] | ARRAY | STRING |
| Int (16bit) | int2 | INT16 | |
| Int (16bit) Array | int2[] | ARRAY | INT16 |
| Int (32bit) | int4 | INT32 | |
| Int (32bit) Array | int4[] | ARRAY | INT32 |
| Int (64bit) | int8 | INT64 | |
| Int (64bit) Array | int8[] | ARRAY | INT64 |
| Int4Range | int4range | STRING | |
| Int4Range Array | int4range[] | ARRAY | STRING |
| Int8Range | int8range | STRING | |
| Int8Range Array | int8range[] | ARRAY | STRING |
| Interval | interval | INT64 | |
| Interval Array | interval[] | ARRAY | INT64 |
| JSON | json | STRING | |
| JSON Array | json[] | ARRAY | STRING |
| JSONB | jsonb | STRING | |
| JSONB Array | jsonb[] | ARRAY | STRING |
| Line | line | STRING | |
| Line Array | line[] | ARRAY | STRING |
| Lseg | lseg | STRING | |
| Lseg Array | lseg[] | ARRAY | STRING |
| Ltree | ltree | STRING | |
| Ltree Array | ltree[] | ARRAY | STRING |
| MAC Address | macaddr | STRING | |
| MAC Address (EUI-64) | macaddr8 | STRING | |
| MAC Address (EUI-64) Array | macaddr8[] | ARRAY | STRING |
| MAC Address Array | macaddr[] | ARRAY | STRING |
| Name | name | STRING | |
| Name Array | name[] | ARRAY | STRING |
| Numeric | numeric | FLOAT64 | |
| Numeric Array | numeric[] | ARRAY | FLOAT64 |
| Numeric Range | numrange | STRING | |
| Numeric Range Array | numrange[] | ARRAY | STRING |
| OID | oid | INT64 | |
| OID Array | oid[] | ARRAY | INT64 |
| Path | path | STRING | |
| Path Array | path[] | ARRAY | STRING |
| Point | point | STRING | |
| Point Array | point[] | ARRAY | STRING |
| Polygon | polygon | STRING | |
| Polygon Array | polygon[] | ARRAY | STRING |
| Quoted Char | "char" | STRING | |
| Quoted Char Array | "char"[] | ARRAY | STRING |
| Text | text | STRING | |
| Text Array | text[] | ARRAY | STRING |
| Time With Timezone | timetz | STRING | |
| Time With Timezone Array | timetz[] | ARRAY | STRING |
| Time Without Timezone | time | STRING | |
| Time Without Timezone Array | time[] | ARRAY | STRING |
| Timestamp With Timezone | timestamptz | STRING | |
| Timestamp With Timezone Array | timestamptz[] | ARRAY | STRING |
| Timestamp With Timezone Range | tstzrange | STRING | |
| Timestamp With Timezone Range Array | tstzrange[] | ARRAY | STRING |
| Timestamp Without Timezone | timestamp | INT64 | |
| Timestamp Without Timezone Array | timestamp[] | ARRAY | INT64 |
| Timestamp Without Timezone Range | tsrange | STRING | |
| Timestamp Without Timezone Range Array | tsrange[] | ARRAY | STRING |
| User Defined Composite Type | compositetype | STRUCT | |
| User Defined Composite Type Array | compositetype[] | ARRAY | STRUCT |
| User Defined Enum | enumtype | STRING | |
| User Defined Enum Array | enumtype[] | ARRAY | STRING |
| UUID | uuid | STRING | |
| UUID Array | uuid[] | ARRAY | STRING |
| Varchar | varchar, varchar(n) | STRING | |
| Varchar Array | varchar[], varchar(n)[] | ARRAY | STRING |
| XID | xid | INT64 | |
| XID Array | xid[] | ARRAY | INT64 |
| XML | xml | STRING | |
| XML Array | xml[] | ARRAY | STRING |

Configuration

timescaledb-event-streamer uses TOML or YAML as its configuration file format due to their simplicity.

In addition to the configuration file, all values can be provided as environment variables. In this case, the property name uses underscore (_) characters instead of dot (.) characters and all characters are uppercased. That means postgresql.connection becomes POSTGRESQL_CONNECTION. In case the property name already contains an underscore, that underscore character is duplicated (test.some_value becomes TEST_SOME__VALUE).
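
The documented mapping (existing underscores doubled, dots turned into underscores, everything uppercased) can be sketched in a couple of lines. This is an illustrative helper only, not the tool's actual implementation:

```python
def property_to_env(name: str) -> str:
    """Translate a configuration property name to its environment variable name."""
    # Double pre-existing underscores first, then turn dots into single
    # underscores, and finally uppercase everything.
    return name.replace("_", "__").replace(".", "_").upper()

print(property_to_env("postgresql.connection"))  # POSTGRESQL_CONNECTION
print(property_to_env("test.some_value"))        # TEST_SOME__VALUE
```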

PostgreSQL Configuration

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| postgresql.connection | The connection string in one of the libpq-supported forms. | string | host=localhost user=repl_user sslmode=disable |
| postgresql.password | The password for the connecting user. | string | Environment variable: PGPASSWORD |
| postgresql.snapshot.batchsize | The number of rows requested in a single batch iteration when snapshotting tables. | int | 1000 |
| postgresql.snapshot.initial | The startup behavior for snapshotting. Valid values are always, never, initial_only. NOT YET IMPLEMENTED: always | string | never |
| postgresql.publication.name | The name of the publication inside PostgreSQL. | string | empty string |
| postgresql.publication.create | Defines if a non-existent publication of the defined name should be created automatically. | boolean | false |
| postgresql.publication.autodrop | Defines if a previously auto-created publication should be dropped when the program exits. | boolean | true |
| postgresql.replicationslot.name | The name of the replication slot inside PostgreSQL. If not configured, a random 20-character name is created. | string | random string (20) |
| postgresql.replicationslot.create | Defines if a non-existent replication slot of the defined name should be created automatically. | boolean | true |
| postgresql.replicationslot.autodrop | Defines if a previously auto-created replication slot should be dropped when the program exits. | boolean | true |
| postgresql.transaction.window.enabled | Defines if a transaction window should be opened. Transaction windows are used to try to collect all WAL entries of a transaction before replicating it out. | boolean | true |
| postgresql.transaction.window.timeout | The maximum time, in seconds, to wait for a transaction end (COMMIT) to be received. If the COMMIT isn't received inside the given time window, replication starts to prevent memory hogging. | int | 60 |
| postgresql.transaction.window.maxsize | The maximum number of cached entries to hold while waiting for a transaction end (COMMIT). If the limit is exceeded, replication starts to prevent memory hogging. | int | 10000 |
| postgresql.tables.includes | Defines which vanilla tables to include in event stream generation. The available patterns are explained in Includes and Excludes Patterns. Excludes have precedence over includes. | array of strings | empty array |
| postgresql.tables.excludes | Defines which vanilla tables to exclude from event stream generation. The available patterns are explained in Includes and Excludes Patterns. Excludes have precedence over includes. | array of strings | empty array |
| postgresql.events.read | Defines if read events for vanilla tables are generated. | boolean | true |
| postgresql.events.insert | Defines if insert events for vanilla tables are generated. | boolean | true |
| postgresql.events.update | Defines if update events for vanilla tables are generated. If old values should be captured, REPLICA IDENTITY FULL needs to be set on the table. | boolean | true |
| postgresql.events.delete | Defines if delete events for vanilla tables are generated. If old values should be captured, REPLICA IDENTITY FULL needs to be set on the table. | boolean | true |
| postgresql.events.truncate | Defines if truncate events for vanilla tables are generated. | boolean | true |
| postgresql.events.message | Defines if logical replication message events are generated. | boolean | false |

Topic Configuration

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| topic.namingstrategy.type | The naming strategy for topic names. At the moment, only the value debezium is supported. | string | debezium |
| topic.prefix | The prefix for all topic names. | string | timescaledb |

State Storage Configuration

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| statestorage.type | The strategy used to store internal state (such as restart points and snapshot information). Valid values are file and none. | string | none |
| statestorage.file.path | If the type is file, this property defines the file system path of the state storage file. | string | empty string |

TimescaleDB Configuration

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| timescaledb.hypertables.includes | Defines which hypertables to include in event stream generation. The available patterns are explained in Includes and Excludes Patterns. Excludes have precedence over includes. | array of strings | empty array |
| timescaledb.hypertables.excludes | Defines which hypertables to exclude from event stream generation. The available patterns are explained in Includes and Excludes Patterns. Excludes have precedence over includes. | array of strings | empty array |
| timescaledb.events.read | Defines if read events for hypertables are generated. | boolean | true |
| timescaledb.events.insert | Defines if insert events for hypertables are generated. | boolean | true |
| timescaledb.events.update | Defines if update events for hypertables are generated. If old values should be captured, REPLICA IDENTITY FULL needs to be set on the table. | boolean | true |
| timescaledb.events.delete | Defines if delete events for hypertables are generated. If old values should be captured, REPLICA IDENTITY FULL needs to be set on the table. | boolean | true |
| timescaledb.events.truncate | Defines if truncate events for hypertables are generated. | boolean | true |
| timescaledb.events.compression | Defines if compression events for hypertables are generated. | boolean | false |
| timescaledb.events.decompression | Defines if decompression events for hypertables are generated. | boolean | false |
| timescaledb.events.message | Defines if logical replication message events are generated. This property is deprecated; please see postgresql.events.message. | boolean | false |

Sink Configuration

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| sink.type | Defines which sink adapter is to be used. Valid values are stdout, nats, kafka, redis, kinesis, sqs, http. | string | stdout |
| sink.tombstone | Defines if delete events will be followed up with a tombstone event. | boolean | false |
| sink.filters.&lt;name&gt;.&lt;...&gt; | Defines filters to be executed against potentially replicated events. This property is a map with the filter name as its key and a Sink Filter definition as its value. | map of filter definitions | empty map |

Sink Filter configuration

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| sink.filters.&lt;name&gt;.condition | The filter expression to be executed. The expression language used is Expr. | string | empty string |
| sink.filters.&lt;name&gt;.default | The value to be returned if the filter expression evaluates to true. This makes it possible to create negative and positive filters. | boolean | true |
| sink.filters.&lt;name&gt;.tables.includes | Defines which tables the filter expression should be applied to. The available patterns are explained in Includes and Excludes Patterns. Excludes have precedence over includes. | array of strings | empty array |
| sink.filters.&lt;name&gt;.tables.excludes | Defines which tables the filter expression should not be applied to. The available patterns are explained in Includes and Excludes Patterns. Excludes have precedence over includes. | array of strings | empty array |

The include and exclude filters are optional. If neither is provided, the filter is expected to apply to all hypertables.

Events generated for excluded hypertables will still be replicated, as the filter isn't applied to them.
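
As a sketch, a filter configuration could look like the following. The filter name (positive_values) and the event field path used in the condition are hypothetical and need to be adapted to your event payloads:

```toml
# Hypothetical filter: only replicate events for public.metrics
# whose condition evaluates to true (the field path is illustrative).
sink.filters.positive_values.condition = 'value.after.value > 0'
sink.filters.positive_values.default = true
sink.filters.positive_values.tables.includes = [ 'public.metrics' ]
```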

NATS Sink Configuration

NATS specific configuration, which is only used if sink.type is set to nats.

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| sink.nats.address | The NATS connection address, according to the NATS connection string definition. | string | empty string |
| sink.nats.authorization | The NATS authorization type. Valid values are userinfo, credentials, jwt. | string | empty string |
| sink.nats.userinfo.username | The username for userinfo authorization. | string | empty string |
| sink.nats.userinfo.password | The password for userinfo authorization. | string | empty string |
| sink.nats.credentials.certificate | The path of the certificate file for credentials authorization. | string | empty string |
| sink.nats.credentials.seeds | The paths of seeding files for credentials authorization. | array of strings | empty array |

Kafka Sink Configuration

Kafka specific configuration, which is only used if sink.type is set to kafka.

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| sink.kafka.brokers | The Kafka broker URLs. | array of strings | empty array |
| sink.kafka.idempotent | Defines if message handling is idempotent. | boolean | false |
| sink.kafka.sasl.enabled | Defines if SASL authorization is enabled. | boolean | false |
| sink.kafka.sasl.user | The user to be used with SASL authorization. | string | empty string |
| sink.kafka.sasl.password | The password to be used with SASL authorization. | string | empty string |
| sink.kafka.sasl.mechanism | The mechanism to be used with SASL authorization. Valid values are PLAIN. | string | PLAIN |
| sink.kafka.tls.enabled | Defines if TLS is enabled. | boolean | false |
| sink.kafka.tls.skipverify | Defines if verification of TLS certificates is skipped. | boolean | false |
| sink.kafka.tls.clientauth | Defines the client auth value (as defined in Go). | int | 0 (NoClientCert) |

Redis Sink Configuration

Redis specific configuration, which is only used if sink.type is set to redis.

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| sink.redis.network | The network type of the Redis connection. Valid values are tcp, unix. | string | tcp |
| sink.redis.address | The connection address as host:port. | string | localhost:6379 |
| sink.redis.password | Optional password to connect to the Redis server. | string | empty string |
| sink.redis.database | Database to select after connecting to the server. | int | 0 |
| sink.redis.poolsize | Maximum number of socket connections. | int | 10 per CPU |
| sink.redis.retries.maxattempts | Maximum number of retries before giving up. | int | 0 |
| sink.redis.retries.backoff.min | Minimum backoff between each retry, in milliseconds. A value of -1 disables backoff. | int | 8 |
| sink.redis.retries.backoff.max | Maximum backoff between each retry, in milliseconds. A value of -1 disables backoff. | int | 512 |
| sink.redis.timeouts.dial | Dial timeout for establishing new connections, in seconds. | int | 5 |
| sink.redis.timeouts.read | Timeout for socket reads, in seconds. A value of -1 disables the timeout. | int | 3 |
| sink.redis.timeouts.write | Timeout for socket writes, in seconds. A value of -1 disables the timeout. | int | read timeout |
| sink.redis.timeouts.pool | Amount of time, in seconds, the client waits for a connection if all connections are busy before returning an error. | int | read timeout + 1s |
| sink.redis.timeouts.idle | Amount of time, in minutes, after which the client closes idle connections. | int | 5 |
| sink.redis.tls.enabled | Defines if TLS is enabled. | boolean | false |
| sink.redis.tls.skipverify | Defines if verification of TLS certificates is skipped. | boolean | false |
| sink.redis.tls.clientauth | Defines the client auth value (as defined in Go). | int | 0 (NoClientCert) |

AWS Kinesis Sink Configuration

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| sink.kinesis.stream.name | The name of the AWS Kinesis stream to send events to. Events are partitioned based on the topic name. | string | empty string |
| sink.kinesis.stream.create | Defines if the stream should be created at startup if non-existent. The properties below configure the created stream. | boolean | true |
| sink.kinesis.stream.shardcount | The number of shards to use when creating the stream. | int | 1 |
| sink.kinesis.stream.mode | The mode to use when creating the stream. Valid values are ON_DEMAND and PROVISIONED. More details in the AWS documentation. | string | empty string |
| sink.kinesis.aws.&lt;...&gt; | AWS specific content as defined in AWS Service Configuration. | struct | empty struct |

AWS SQS Sink Configuration

AWS SQS is supported for queues configured as FIFO queues. No content-based deduplication is required, since the sink creates a deduplication id based on the LSN, the transaction id (if available), and the content of the message.

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| sink.sqs.queue.url | The URL of the FIFO queue in SQS. | string | empty string |
| sink.sqs.aws.&lt;...&gt; | AWS specific content as defined in AWS Service Configuration. | struct | empty struct |

HTTP Sink Configuration

HTTP specific configuration, which is only used if sink.type is set to http. This sink is an HTTP client that POSTs the events as the payload. Usage of TLS is inferred automatically from the prefix of the URL: if the URL has the https:// prefix, the respective TLS settings are applied according to the properties defined in sink.http.tls.

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| sink.http.url | The URL the requests are sent to. The protocol scheme (http/https) must be included. | string | http://localhost:80 |
| sink.http.authentication.type | The type of authentication to use when making the request. Valid values are none, basic, header. | string | none |
| sink.http.authentication.basic.username | If the authentication type is set to basic, the username used when making the request. | string | empty string |
| sink.http.authentication.basic.password | If the authentication type is set to basic, the password used when making the request. | string | empty string |
| sink.http.authentication.header.name | If the authentication type is set to header, the name of the header sent with the request. | string | empty string |
| sink.http.authentication.header.value | If the authentication type is set to header, the value of the header sent with the request. | string | empty string |
| sink.http.tls.skipverify | Defines if verification of TLS certificates is skipped. | boolean | false |
| sink.http.tls.clientauth | Defines the client auth value (as defined in Go). | int | 0 (NoClientCert) |
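
For example, an HTTP sink posting to a TLS endpoint with header-based authentication could be configured as follows (the URL and token are placeholders):

```toml
sink.type = 'http'
# https:// prefix implies TLS, configured via sink.http.tls.*
sink.http.url = 'https://events.example.com/ingest'
sink.http.authentication.type = 'header'
sink.http.authentication.header.name = 'Authorization'
sink.http.authentication.header.value = 'Bearer <<token>>'
```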

AWS Service Configuration

This configuration is the basic configuration for AWS, including the region and credentials. If the latter aren't configured, the sink will try to use environment variables (similar to the official AWS clients) to provide connection details.

PropertyDescriptionData TypeDefault Value
<...>.aws.regionThe AWS region.stringempty string
<...>.aws.endpointThe AWS endpoint.stringempty string
<...>.aws.accesskeyidThe AWS Access Key Id.stringempty string
<...>.aws.secretaccesskeyThe AWS Secret Access Key.stringempty string
<...>.aws.sessiontokenThe AWS Session Token.stringempty string

Logging Configuration

This section describes the logging configuration. There is one standard logger. In addition to that, loggers are named (as seen in the log messages). Those names can be used to define a more specific logger configuration and override log levels or outputs.

Valid logging levels are panic, fatal, error, warn, notice, info, verbose, debug, and trace. Earlier levels include all following ones as well.

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| logging.level | The default logging level. If not defined, the default value is info. | string | info |
| logging.outputs.&lt;...&gt; | The outputs for the default logger. By default, console logging is enabled. | output configuration | empty struct |
| logging.loggers.&lt;name&gt;.&lt;...&gt; | Overrides the logging configuration for certain parts of the system. The &lt;name&gt; is the name the logger uses to identify itself in the log messages. | sub-logger configuration | empty struct |

Sub-Logger Configuration

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| logging.loggers.&lt;name&gt;.level | The logging level for this sub-logger. If not defined, the default value is info. | string | info |
| logging.loggers.&lt;name&gt;.outputs.&lt;...&gt; | The outputs for this sub-logger. By default, console logging is enabled. | output configuration | empty struct |

Logging Output Configuration

| Property | Description | Data Type | Default Value |
|----------|-------------|-----------|---------------|
| &lt;...&gt;.outputs.console.enabled | Enables the console logger. If this configuration is used for the default logger, this value is true by default, but false otherwise. | boolean | false |
| &lt;...&gt;.outputs.file.enabled | Enables the file logger. | boolean | false |
| &lt;...&gt;.outputs.file.path | The log file path for the logger. | string | empty string |
| &lt;...&gt;.outputs.file.rotate | Defines if the log file should be rotated. | boolean | true |
| &lt;...&gt;.outputs.file.maxduration | The maximum runtime (in seconds) of the log file before rotation. If both maxduration and maxsize are defined, maxduration has precedence. | int | 60 |
| &lt;...&gt;.outputs.file.maxsize | The maximum file size (in bytes) of the log file before rotation. If both maxduration and maxsize are defined, maxduration has precedence. | int | 5242880 (5MB) |
| &lt;...&gt;.outputs.file.compress | Defines if the rotated log file should be compressed. | boolean | false |
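
Putting this together, a configuration that keeps the default logger at info on the console, but turns on file-based debug logging for one named logger, might look like the following sketch. The logger name "replicator" and the file path are placeholders; use a name as it appears in your log messages:

```toml
logging.level = 'info'
logging.outputs.console.enabled = true
# "replicator" is a hypothetical logger name
logging.loggers.replicator.level = 'debug'
logging.loggers.replicator.outputs.file.enabled = true
logging.loggers.replicator.outputs.file.path = '/var/log/timescaledb-event-streamer.log'
logging.loggers.replicator.outputs.file.rotate = true
```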

Includes and Excludes Patterns

Includes and Excludes can be defined as fully canonical references to hypertables (equivalent to PostgreSQL regclass definition) or as patterns with wildcards.

For the examples, we assume the following hypertables exist.

| Schema | Hypertable | Canonical Name |
|--------|------------|----------------|
| public | metrics | public.metrics |
| public | status_messages | public.status_messages |
| invoicing | invoices | invoicing.invoices |
| alarming | alarms | alarming.alarms |

Note that excludes have precedence over includes: if both includes and excludes match a specific hypertable, the hypertable will be excluded from the event generation process.

Hypertables can be referred to by their canonical name (the dotted notation of schema and hypertable table name).

timescaledb.hypertables.includes = [ 'public.metrics', 'invoicing.invoices' ]

When referring to a hypertable by its canonical name, the matcher will only match that exact hypertable. That said, the above example will yield events for the hypertables public.metrics and invoicing.invoices, but none of the others.

Wildcards

Furthermore, includes and excludes can utilize wildcard characters to match a subset of tables based on the provided pattern.

timescaledb-event-streamer understands 3 types of wildcards:

| Wildcard | Description |
|----------|-------------|
| * | The asterisk (*) character matches zero or more characters |
| + | The plus (+) character matches one or more characters |
| ? | The question mark (?) character matches exactly one character |

Wildcards can be used in the schema or table names. It is also possible to have them in schema and table names at the same time.
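
As an illustration of these semantics, the three wildcards can be modeled by translating a pattern into a regular expression. This is a sketch for clarity, not the tool's actual matcher:

```python
import re

def pattern_to_regex(pattern: str) -> re.Pattern:
    """Translate an includes/excludes pattern into an anchored regex:
    '*' -> zero or more chars, '+' -> one or more chars, '?' -> exactly one."""
    parts = []
    for ch in pattern:
        if ch == "*":
            parts.append(".*")
        elif ch == "+":
            parts.append(".+")
        elif ch == "?":
            parts.append(".")
        else:
            parts.append(re.escape(ch))  # everything else matches literally
    return re.compile("^" + "".join(parts) + "$")

matcher = pattern_to_regex("public.status_?_day")
print(bool(matcher.match("public.status_1_day")))   # True
print(bool(matcher.match("public.status_14_day")))  # False
```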

Asterisk: Zero Or More Matching

| Schema | Hypertable | Canonical Name |
|--------|------------|----------------|
| public | metrics | public.metrics |
| public | status_messages | public.status_messages |

timescaledb.hypertables.includes = [ 'public.*' ] matches all hypertables in schema public.

| Schema | Hypertable | Canonical Name |
|--------|------------|----------------|
| public | status_1h | public.status_1h |
| public | status_12h | public.status_12h |
| public | status_messages | public.status_messages |

timescaledb.hypertables.includes = [ 'public.status_1*' ] matches public.status_1h and public.status_12h, but not public.status_messages.

| Schema | Hypertable | Canonical Name |
|--------|------------|----------------|
| customer1 | metrics | customer1.metrics |
| customer2 | metrics | customer2.metrics |

Accordingly, it is possible to match a specific hypertable in all customer schemata using a pattern such as timescaledb.hypertables.includes = [ 'customer*.metrics' ].

Plus: One or More Matching

| Schema | Hypertable | Canonical Name |
|--------|------------|----------------|
| public | status_1_day | public.status_1_day |
| public | status_1_month | public.status_1_month |
| public | status_12_month | public.status_12_month |

timescaledb.hypertables.includes = [ 'public.status_+_month' ] matches the hypertables public.status_1_month and public.status_12_month, but not public.status_1_day.

| Schema | Hypertable | Canonical Name |
|--------|------------|----------------|
| customer1 | metrics | customer1.metrics |
| customer2 | metrics | customer2.metrics |

Accordingly, it is possible to match a specific hypertable in all customer schemata using a pattern such as timescaledb.hypertables.includes = [ 'customer+.metrics' ].

Question Mark: Exactly One Matching

| Schema | Hypertable | Canonical Name |
|--------|------------|----------------|
| public | status_1_day | public.status_1_day |
| public | status_7_day | public.status_7_day |
| public | status_14_day | public.status_14_day |

timescaledb.hypertables.includes = [ 'public.status_?_day' ] matches the hypertables public.status_1_day and public.status_7_day, but not public.status_14_day.
