3.10.3 PostgreSQL

Striim supports PostgreSQL 9.4.x and later versions.

PostgreSQLReader uses PostgreSQL's logical replication (see PostgreSQL setup).

Striim provides templates for creating applications that read from PostgreSQL and write to various targets. See Creating a new application using a template for details.

PostgreSQL setup

Striim reads change data from PostgreSQL.

Note

PostgreSQLReader requires logical replication. Amazon RDS for PostgreSQL supports logical replication, but Azure Database for PostgreSQL and Google Cloud SQL for PostgreSQL currently do not.

Before Striim applications can use the PostgreSQLReader adapter, a PostgreSQL administrator with the necessary privileges must do the following. The following examples assume that the name of the PostgreSQL role created for use by PostgreSQLReader is striim and the database containing the tables to be read is mydb.

in Linux and Windows

This will require a reboot, so it should probably be performed during a maintenance window.

  1. In striim/native/wal2json, find the directory for the operating system of your PostgreSQL host, and copy the plugin file it contains to PostgreSQL's lib directory. If there is no directory for your operating system, contact Striim support.

    Warning

    Known issue (DEV-21129): these plugins do not support TOAST columns. To read TOAST columns with PostgreSQL Reader, you must compile a plugin from the 9b579240ec8735e762920bdeff8ca02e201bfa03 commit dated 24 Feb. 2020 (see https://github.com/eulerto/wal2json/commits/master) and copy it to PostgreSQL's lib directory.

    In Windows, to find the location of the lib directory, open a command prompt, navigate to PostgreSQL's bin directory, and enter the following command:

    pg_config --pkglibdir
  2. Edit postgresql.conf, set the following options, and save the file. The values for max_replication_slots and max_wal_senders may be higher, but there must be one of each available for each instance of PostgreSQL Reader. max_wal_senders cannot exceed the value of max_connections.

    wal_level = logical
    max_replication_slots = 1
    max_wal_senders = 1
  3. Edit pg_hba.conf and add the following record, replacing <IP address> with the Striim server's IP address. If you have a multi-node cluster, add a record for each server that will run PostgreSQLReader. Then save the file.

    host  replication  striim  <IP address>/0  trust
  4. Restart PostgreSQL.

  5. Enter the following command to create the replication slot:

    pg_recvlogical -d mydb --slot striim_slot --create-slot -P wal2json

    If you plan to use multiple instances of PostgreSQL Reader, create a separate slot for each.

  6. Create a role with the REPLICATION attribute for use by Striim and give it select permission on the schema(s) containing the tables to be read. Replace ****** with a strong password and myschema with the name of your schema.

    CREATE ROLE striim WITH LOGIN PASSWORD '******' REPLICATION;
    GRANT SELECT ON ALL TABLES IN SCHEMA myschema TO striim;
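
The sizing rule in step 2 (one replication slot and one WAL sender available per PostgreSQL Reader instance, with max_wal_senders no higher than max_connections) can be checked before editing the file. The following Python sketch is illustrative only: the function name check_replication_settings and the dictionary it takes are assumptions made for this example and are not part of Striim or PostgreSQL.

```python
def check_replication_settings(settings, reader_instances):
    """Return a list of problems found in the given postgresql.conf values.

    settings: dict holding wal_level, max_replication_slots, max_wal_senders,
              and max_connections (a hypothetical input format).
    reader_instances: number of PostgreSQL Reader instances that will connect.
    """
    problems = []
    if settings["wal_level"] != "logical":
        problems.append("wal_level must be logical")
    # These are minimums: other replication clients need their own slots and senders.
    if settings["max_replication_slots"] < reader_instances:
        problems.append("max_replication_slots is lower than the number of readers")
    if settings["max_wal_senders"] < reader_instances:
        problems.append("max_wal_senders is lower than the number of readers")
    if settings["max_wal_senders"] > settings["max_connections"]:
        problems.append("max_wal_senders exceeds max_connections")
    return problems


if __name__ == "__main__":
    example = {
        "wal_level": "logical",
        "max_replication_slots": 1,
        "max_wal_senders": 1,
        "max_connections": 100,
    }
    print(check_replication_settings(example, reader_instances=1))
```

An empty list means the values satisfy the rule for the given number of readers.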
    
in Amazon RDS for PostgreSQL

You must set up replication in the master instance. This will require a reboot, so it should probably be performed during a maintenance window.

Amazon RDS supports logical replication only for PostgreSQL version 9.4.9, higher versions of 9.4, and versions 9.5.4 and higher. Thus PostgreSQLReader cannot be used with PostgreSQL 9.4 - 9.4.8 or 9.5 - 9.5.3 on Amazon RDS.

  1. Go to your RDS dashboard, select Parameter groups > Create parameter group, enter postgres-logical-decoding as the Group name and Description, then click Create.

  2. Click postgres-logical-decoding.

  3. Enter logical_ in the Parameters field to filter the list, set rds.logical_replication to 1, and click Save changes.

  4. In the left column, click Databases, then click the name of your database, click Modify, scroll down to Database options, change DB parameter group to postgres-logical-decoding, then scroll down to the bottom and click Continue.

  5. Select Apply immediately > Modify DB instance.

  6. Reboot the instance.

  7. Restart PostgreSQL.

  8. In psql, enter the following command to create the replication slot:

    SELECT pg_create_logical_replication_slot('striim_slot', 'wal2json');
  9. Create a role with the REPLICATION attribute for use by PostgreSQLReader and give it select permission on the schema(s) containing the tables to be read. Replace ****** with a strong password and myschema with the name of your schema.

    CREATE ROLE striim WITH LOGIN PASSWORD '******';
    GRANT rds_replication TO striim;
    GRANT SELECT ON ALL TABLES IN SCHEMA myschema TO striim;
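
The Amazon RDS version rule stated at the start of this section (9.4.9 or later within 9.4, otherwise 9.5.4 or later) can be expressed as a small check. This Python sketch is illustrative only: rds_supports_logical_replication is a hypothetical helper, and it assumes the version is given in dotted numeric form such as 9.5.4 or 10.6.

```python
def rds_supports_logical_replication(version):
    """Apply the version rule from this section to a dotted version string."""
    parts = tuple(int(p) for p in version.split("."))
    # Pad short versions so that "10.6" compares as (10, 6, 0).
    parts = (parts + (0, 0, 0))[:3]
    if parts[:2] == (9, 4):
        return parts >= (9, 4, 9)
    return parts >= (9, 5, 4)


if __name__ == "__main__":
    for v in ("9.4.8", "9.4.9", "9.5.3", "9.5.4", "10.6"):
        print(v, rds_supports_logical_replication(v))
```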
    

PostgreSQL Reader properties

Starting with Striim 3.9.0, the PostgreSQL JDBC driver is included with the Striim server. When running a PostgreSQLReader source with the Forwarding Agent, the driver must be installed as described in Installing the PostgreSQL JDBC driver.

Bidirectional Marker Table
  type: String
  notes: When performing bidirectional replication, the fully qualified name of the marker table (see Bidirectional replication). This setting is case-sensitive.

Connection Retry Policy
  type: String
  default value: retryInterval=30, maxRetries=3
  notes: With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval). If the second attempt is unsuccessful, in 30 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Connection URL
  type: String
  notes: jdbc:postgresql:// followed by the server's IP address or network name, a colon, the port number, and a slash followed by the database name. If the database name is omitted, the Username value is used as the database name.

Excluded Tables
  type: String
  notes: Change data for any tables specified here will not be returned. For example, if Tables uses a wildcard, data from any tables specified here will be omitted. Multiple table names and wildcards may be used as for Tables.

Filter Transaction Boundaries
  type: Boolean
  default value: True
  notes: With the default value of True, begin and commit transactions are filtered out. Set to False to include begin and commit transactions.

Password
  type: encrypted password
  notes: The password specified for the username (see Encrypted passwords).

Replication Slot Name
  type: String
  default value: striim_slot
  notes: The name of the replication slot created as described in PostgreSQL setup. If you have multiple instances of PostgreSQLReader, each must have its own slot.

Start LSN
  type: String
  notes: By default, only new transactions are read. Optionally, specify a log sequence number to start reading from that point.

Tables
  type: String
  notes: The table(s) for which to return change data. Tables must have primary keys (required for logical replication). Tables without primary keys are not included in output.

  Names are case-sensitive. Specify source table names as <schema>.<table>. (The database is specified in the connection URL.)

  You may specify multiple tables as a list separated by semicolons or using the following wildcards in the schema and/or table names only (not in the database name):

    • %: any series of characters

    • _: any single character

  For example, %.% would include all tables in all schemas in the database specified in the connection URL.

  If any specified tables are missing, PostgreSQLReader will issue a warning. If none of the specified tables exists, start will fail with a "found no tables" error.

  If you have multiple instances of PostgreSQLReader, each should read a separate set of tables.

Username
  type: String
  notes: The login name for the user created as described in PostgreSQL setup.
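
The Connection Retry Policy behavior described above (with the defaults, three attempts in total, 30 seconds apart) can be sketched as follows. This Python example illustrates the documented behavior and is not Striim's implementation: parse_retry_policy, connect_with_retry, and the connect callable are hypothetical names, and the sketch assumes a failed attempt raises ConnectionError.

```python
import time


def parse_retry_policy(policy):
    """Parse a string such as 'retryInterval=30, maxRetries=3' into a dict of ints."""
    values = {}
    for item in policy.split(","):
        key, _, value = item.partition("=")
        values[key.strip()] = int(value)
    if any(v < 0 for v in values.values()):
        raise ValueError("negative values are not supported")
    return values


def connect_with_retry(connect, policy, sleep=time.sleep):
    """Call connect() until it succeeds or maxRetries attempts have failed."""
    settings = parse_retry_policy(policy)
    attempts = 0
    while True:
        attempts += 1
        try:
            return connect()
        except ConnectionError:
            if attempts >= settings["maxRetries"]:
                raise  # the last permitted attempt failed, so give up
            sleep(settings["retryInterval"])
```

The sleep parameter exists only so the wait can be replaced in a test; by default the function really waits retryInterval seconds between attempts.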

PostgreSQL Reader WAEvent fields

The output data type for PostgreSQLReader is WAEvent. The elements are:

metadata: a map including:

  • LSN: log sequence number of the transaction's commit

  • NEXT_LSN: next log sequence number (used for reconnecting to the replication slot after a non-fatal network interruption)

  • OperationName: INSERT, UPDATE, or DELETE

  • PK_UPDATE: included only when an UPDATE changes the primary key

  • Sequence: incremented for each operation within a transaction

  • TableName: the name of the table including its schema

  • TimeStamp: timestamp from the replication subscription

  • TxnID: transaction identifier

To retrieve the values for these fields, use the META() function. See Parsing the fields of WAEvent for CDC readers.

data: an array of fields, numbered from 0, containing:

  • for an INSERT operation, the values that were inserted

  • for an UPDATE, the values after the operation was completed

  • for a DELETE, the value of the primary key and nulls for the other fields

To retrieve the values for these fields, use SELECT ... (DATA[]). See Parsing the fields of WAEvent for CDC readers.

before: for UPDATE operations, contains the primary key value from before the update. When an update changes the primary key value, you may retrieve the previous value using the BEFORE() function.

dataPresenceBitMap, beforePresenceBitMap, and typeUUID are reserved and should be ignored.
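
The LSN and NEXT_LSN values are strings in PostgreSQL's textual log sequence number format: two hexadecimal numbers separated by a slash, holding the high and low 32 bits of a 64-bit position. Comparing them as plain strings can give the wrong order, so code that tracks positions may prefer to convert them to integers first. The following Python sketch shows one way to do that; parse_lsn is a hypothetical helper, not a Striim function.

```python
def parse_lsn(lsn):
    """Convert an LSN string such as '0/152CD58' to a 64-bit integer."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


if __name__ == "__main__":
    # As strings, "0/FFFFFFF" sorts after "0/10000000"; as integers it does not.
    print(parse_lsn("0/FFFFFFF") < parse_lsn("0/10000000"))
```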

PostgreSQL Reader simple application

The following application will write change data for all tables in all schemas in database mydb to SysOut. Replace striim and ****** with the user name and password for the PostgreSQL account you created for use by PostgreSQLReader (see PostgreSQL setup) and mydb and %.% with the names of the database and tables to be read. If the replication slot name is not striim_slot (the default), specify it using the ReplicationSlotName property.

CREATE APPLICATION PostgreSQLTest;

CREATE SOURCE PostgreSQLCDCIn USING PostgreSQLReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'jdbc:postgresql://192.0.2.10:5432/mydb',
  ReplicationSlotName: 'striim_slot',
  Tables:'%.%'
) 
OUTPUT TO PostgreSQLCDCStream;

CREATE TARGET PostgreSQLCDCOut
USING SysOut(name:PostgreSQLCDC)
INPUT FROM PostgreSQLCDCStream;

END APPLICATION PostgreSQLTest;
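
The application above sets Tables to %.%. To preview which tables a narrower Tables value, combined with Excluded Tables, would select, the wildcard rules (% for any series of characters, _ for any single character, semicolons between entries, case-sensitive names) can be approximated outside Striim. The Python sketch below is an approximation under stated assumptions: select_tables and pattern_to_regex are hypothetical helpers, and each schema.table name is matched as a single string, which may differ from how PostgreSQLReader evaluates patterns.

```python
import re


def pattern_to_regex(pattern):
    """Translate % and _ wildcards into an equivalent regular expression."""
    pieces = []
    for ch in pattern:
        if ch == "%":
            pieces.append(".*")   # any series of characters
        elif ch == "_":
            pieces.append(".")    # any single character
        else:
            pieces.append(re.escape(ch))
    return re.compile("".join(pieces))


def select_tables(available, tables, excluded=""):
    """Return the names in available matched by tables but not by excluded."""
    def matches(name, spec):
        patterns = [p.strip() for p in spec.split(";") if p.strip()]
        return any(pattern_to_regex(p).fullmatch(name) for p in patterns)

    return [t for t in available if matches(t, tables) and not matches(t, excluded)]


if __name__ == "__main__":
    names = ["public.posauthorizations", "public.orders", "sales.orders"]
    print(select_tables(names, "%.%", excluded="public.orders"))
```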

PostgreSQL Reader example output

PostgreSQLReader's output type is WAEvent. See WAEvent contents for change data and PostgreSQL Reader WAEvent fields for more information.

The following are examples of WAEvents emitted by PostgreSQLReader for various operation types. They all use the following table:

CREATE TABLE posauthorizations (
  business_name varchar(30),
  merchant_id character varying(35) PRIMARY KEY,
  primary_account bigint,
  pos bigint,
  code character varying(20),
  exp character(4),
  currency_code character(3),
  auth_amount numeric(10,3),
  terminal_id bigint,
  zip bigint,
  city character varying(20));

INSERT

If you performed the following INSERT on the table:

INSERT INTO posauthorizations VALUES(
  'COMPANY 1',
  'D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu',
  6705362103919221351,
  0,
  '20130309113025',
  '0916',
  'USD',
  2.20,
  5150279519809946,
  41363,
  'Quicksand');

The WAEvent for that INSERT would be similar to:

data: ["COMPANY 1","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",6705362103919221351,0,"20130309113025",
"0916","USD",2.200,5150279519809946,41363,"Quicksand"]
metadata: {"TableName":"public.posauthorizations","TxnID":556,"OperationName":"INSERT",
"LSN":"0/152CD58","NEXT_LSN":"0/152D1C8","Sequence":1,"Timestamp":"2019-01-11 16:29:54.628403-08"}

UPDATE

If you performed the following UPDATE on the table:

UPDATE posauthorizations SET BUSINESS_NAME = 'COMPANY 5A' where pos=0;

The WAEvent for that UPDATE would be similar to:

data: ["COMPANY 5A","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",6705362103919221351,0,"20130309113025",
"0916","USD",2.200,5150279519809946,41363,"Quicksand"]
metadata: {"TableName":"public.posauthorizations","TxnID":557,"OperationName":"UPDATE",
"LSN":"0/152D2E0","NEXT_LSN":"0/152D6F8","Sequence":1,"Timestamp":"2019-01-11 16:31:54.271525-08"}
before: [null,"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null,null,null,null,null,null,null]

When an UPDATE changes the primary key, you may retrieve the old primary key value from the before array.

DELETE

If you performed the following DELETE on the table:

DELETE from posauthorizations where pos=0;

The WAEvent for that DELETE would be similar to:

data: [null,"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null,null,null,null,null,null,null]
metadata: {"TableName":"public.posauthorizations","TxnID":558,"OperationName":"DELETE",
"LSN":"0/152D730","NEXT_LSN":"0/152D7C8","Sequence":1,"Timestamp":"2019-01-11 16:33:09.065951-08"}

Only the primary key value is included.
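
Taken together, the three examples show how a downstream copy of the table could be kept in step with the source: an INSERT adds a row, an UPDATE replaces the row identified by the key in the before array, and a DELETE removes the row identified by the key in data. The Python sketch below illustrates that logic only. It assumes each event is represented as a dict with data, metadata, and before keys that mirror the printed output, and that the primary key (merchant_id) is at index 1 of the array; apply_event and PK_INDEX are hypothetical names.

```python
PK_INDEX = 1  # position of merchant_id in the data and before arrays


def apply_event(rows, event):
    """Apply one change event to rows, a dict keyed by primary key value."""
    operation = event["metadata"]["OperationName"]
    data = event["data"]
    if operation == "INSERT":
        rows[data[PK_INDEX]] = list(data)
    elif operation == "UPDATE":
        # before holds the key as it was prior to the update, so a changed
        # primary key removes the old entry instead of leaving a stale row.
        rows.pop(event["before"][PK_INDEX], None)
        rows[data[PK_INDEX]] = list(data)
    elif operation == "DELETE":
        # Only the primary key is populated in data for a DELETE.
        rows.pop(data[PK_INDEX], None)
    return rows
```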

PostgreSQL Reader data type support and correspondence

PostgreSQL type              Striim type

bigint                       long
bigserial                    long
bit                          string
bit varying                  string
boolean                      short
bytea                        string
character                    string
character varying            string
cidr                         string
circle                       unsupported
composite type               string
date                         DateTime
daterange                    string
double precision             double
inet                         string
integer                      integer
int2                         short
int4                         integer
int4range                    string
int8                         long
int8range                    string
interval                     string
json                         string
jsonb                        string
line                         unsupported
lseg                         unsupported
macaddr                      string
macaddr8                     string
money                        string
name (system identifier)     string
numeric                      string
numrange                     string
path                         unsupported
pg_lsn                       string
point                        unsupported
polygon                      unsupported
real                         float
smallint                     short
smallserial                  short
serial                       integer
text                         string
time                         string
time with time zone          string
timestamp                    DateTime
tsrange                      string
timestamp with time zone     DateTime
tstzrange                    string
tsquery                      unsupported
tsvector                     unsupported
txid_snapshot                string
uuid                         string
xml                          string
