3.10.1 Oracle Database

Striim supports Oracle 11g 11.2.0.4, 12c 12.1.0.2, 12.2.0.1.0, 18c, and 19c.

  • RAC is supported in all versions.

  • LogMiner is supported in all versions.

  • XStream Out is supported only for 11g on Linux and requires Oracle patches 6880880 & 21352635.

You may choose either LogMiner or XStream Out as the source for change data.

Features supported by LogMiner but not XStream Out:

Features supported by XStream Out but not LogMiner:

  • continuous reception of change data events (LogMiner receives events in batches as defined by OracleReader's FetchSize property)

  • reading from tables containing unsupported types (XStream Out will read the columns of supported types; LogMiner will not read the table at all)

Striim provides templates for creating applications that read from Oracle and write to various targets. See Creating a new application using a template for details.

Oracle configuration

Using OracleReader requires the following configuration changes in Oracle:

  • enable archivelog, if not already enabled

  • enable supplemental log data, if not already enabled

  • set up either LogMiner or XStream Out

  • create a user account for OracleReader and grant it the privileges necessary to use LogMiner or XStream Out

Basic Oracle configuration tasks

The following tasks must be performed regardless of which Oracle version or variation you are using.

Enabling archivelog

The following steps check whether archivelog is enabled and, if not, enable it.

  1. Log in to SQL*Plus as the sys user.

  2. Enter the following command:

    select log_mode from v$database;

    If the command returns ARCHIVELOG, it is enabled. Skip ahead to Enabling supplemental log data.

  3. If the command returns NOARCHIVELOG, enter: shutdown immediate

  4. Wait for the message ORACLE instance shut down, then enter: startup mount

  5. Wait for the message Database mounted, then enter:

    alter database archivelog;
    alter database open;
  6. To verify that archivelog has been enabled, enter select log_mode from v$database; again. This time it should return ARCHIVELOG.

Enabling supplemental log data

The following steps check whether supplemental log data is enabled and, if not, enable it.

  1. Enter the following command: 

    select supplemental_log_data_min, supplemental_log_data_pk from v$database;

    If the command returns YES or IMPLICIT, supplemental log data is already enabled. For example, 

    SUPPLEME SUP
    -------- ---
    YES      NO

    indicates that supplemental log data is enabled, but primary key logging is not. If it returns anything else, enter:

    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
  2. To enable primary key logging for all tables in the database enter: 

    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;

    Alternatively, to enable primary key logging only for selected tables (do not use this approach if you plan to use wildcards in the OracleReader Tables property to capture change data from new tables):

    ALTER TABLE <schema name>.<table name> ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
  3. If replicating Oracle data to Redshift, or to BigQuery or Snowflake with Optimized Merge disabled, enable supplemental logging on all columns for all tables in the database:

    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

    Alternatively, to enable only for selected tables:

    ALTER TABLE <schema>.<table name> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
  4. To activate your changes, enter:

    alter system switch logfile;

If using Amazon RDS for Oracle, use the following commands instead:

exec rdsadmin.rdsadmin_util.alter_supplemental_logging(p_action => 'ADD');
exec rdsadmin.rdsadmin_util.alter_supplemental_logging(p_action => 'ADD', p_type => 'PRIMARY KEY');
exec rdsadmin.rdsadmin_util.switch_logfile;
select supplemental_log_data_min, supplemental_log_data_pk from v$database;
Creating an Oracle user with LogMiner privileges

You may use LogMiner with any supported Oracle version.

Log in as sysdba and enter the following commands to create a role with the privileges required by the Striim OracleReader adapter and create a user with that role. You may give the role and user any names you like. Replace ******** with a strong password.

If using Oracle 11g, or 12c, 18c, or 19c without CDB, enter the following commands:

create role striim_privs;
grant create session,
  execute_catalog_role,
  select any transaction,
  select any dictionary
  to striim_privs;
grant select on SYSTEM.LOGMNR_COL$ to striim_privs;
grant select on SYSTEM.LOGMNR_OBJ$ to striim_privs;
grant select on SYSTEM.LOGMNR_USER$ to striim_privs;
grant select on SYSTEM.LOGMNR_UID$ to striim_privs;
create user striim identified by ******** default tablespace users;
grant striim_privs to striim;
alter user striim quota unlimited on users;

If using Database Vault, omit execute_catalog_role, and also enter the following commands:

grant execute on SYS.DBMS_LOGMNR to striim_privs;
grant execute on SYS.DBMS_LOGMNR_D to striim_privs;
grant execute on SYS.DBMS_LOGMNR_LOGREP_DICT to striim_privs;
grant execute on SYS.DBMS_LOGMNR_SESSION to striim_privs;

For Oracle 12c only, also enter the following command.

grant LOGMINING to striim_privs;

If using Oracle 12c, 18c, or 19c with PDB, enter the following commands. Replace <PDB name> with the name of your PDB.

create role c##striim_privs;
grant create session,
execute_catalog_role,
select any transaction,
select any dictionary,
logmining
to c##striim_privs;
grant select on SYSTEM.LOGMNR_COL$ to c##striim_privs;
grant select on SYSTEM.LOGMNR_OBJ$ to c##striim_privs;
grant select on SYSTEM.LOGMNR_USER$ to c##striim_privs;
grant select on SYSTEM.LOGMNR_UID$ to c##striim_privs;
create user c##striim identified by ******* container=all;
grant c##striim_privs to c##striim container=all;
alter user c##striim set container_data = (cdb$root, <PDB name>) container=current;

If using Database Vault, omit execute_catalog_role, and also enter the following commands:

grant execute on SYS.DBMS_LOGMNR to c##striim_privs;
grant execute on SYS.DBMS_LOGMNR_D to c##striim_privs;
grant execute on SYS.DBMS_LOGMNR_LOGREP_DICT to c##striim_privs;
grant execute on SYS.DBMS_LOGMNR_SESSION to c##striim_privs;
Creating an Oracle user with XStream Out privileges

XStream Out is supported only for Oracle 11.2.0.4 with patches 6880880 and 21352635.

Log in as sysdba and enter the following commands to create a user with the privileges required by the Striim OracleReader adapter and a tablespace to store objects and messages related to XStream administration. The path to the oradata directory in the DATAFILE property may vary depending on your installation. Replace ******** with a strong password.

CREATE TABLESPACE striim_xstream_tbs
 DATAFILE '/home/oracle/app/oracle/oradata/orcl/striim_xstream_tbs.dbf'
 SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER striim IDENTIFIED BY ********
 DEFAULT TABLESPACE striim_xstream_tbs
 QUOTA UNLIMITED ON striim_xstream_tbs;
exec dbms_xstream_auth.grant_admin_privilege('striim', 'CAPTURE', true)
GRANT CREATE SESSION TO striim;
GRANT SELECT ANY TABLE TO striim;

To verify that the user has been created correctly, enter the following command:

select * from dba_streams_administrator where USERNAME='STRIIM';

You should see the following:

USERNAME          LOC ACC
----------------- --- ---
STRIIM            YES YES

When you use XStream Out in a Striim application, Striim will create the specified stream in Oracle automatically.

Creating the quiescemarker table

To quiesce (see Console commands) an application that uses OracleReader, you must create a quiescemarker table in Oracle. The DDL for creating the table is: 

CREATE TABLE QUIESCEMARKER (source varchar2(100), 
  status varchar2(100),
  sequence NUMBER(10),
  inittime timestamp, 
  updatetime timestamp default sysdate, 
  approvedtime timestamp, 
  reason varchar2(100), 
  CONSTRAINT quiesce_marker_pk PRIMARY KEY (source, sequence));

The Oracle user specified in OracleReader's Username property must have SELECT, INSERT, and UPDATE privileges on this table.

Note

QUIESCE is not supported with XStream Out, only with LogMiner.
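
The following is a minimal sketch (the namespace, application name, schema, and connection details are placeholders) of an OracleReader source that points at a quiesce marker table in a specific schema via the Quiesce Marker Table property:

CREATE SOURCE OracleCDCIn USING OracleReader (
  Username:'striim',
  Password:'********',
  ConnectionURL:'203.0.113.49:1521:orcl',
  Tables:'myschema.%',
  QuiesceMarkerTable:'STRIIM.QUIESCEMARKER'
)
OUTPUT TO OracleCDCStream;

With the application running, it could then be quiesced by entering QUIESCE <namespace>.<application name>; in the console.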

Oracle Reader properties

Before you can use this adapter, Oracle must be configured as described in the parts of Oracle configuration that are relevant to your environment.

Note

The Oracle JDBC driver must be installed on each Striim server or Forwarding Agent that will run Oracle Reader with LogMiner. See Installing the Oracle JDBC driver.

Note

The Oracle Instant Client must be installed on each Striim server or Forwarding Agent that will run Oracle Reader using XStream Out. See Installing the Oracle Instant Client. XStream Out is supported only when Striim is running on Linux.

Striim provides templates for creating applications that read from Oracle and write to various targets. See Creating a new application using a template for details.

The adapter properties are:

property

type

default value

notes

Bidirectional Marker Table

String

When performing bidirectional replication, the fully qualified name of the marker table (see Bidirectional replication). This setting is case-sensitive.

Committed Transactions

Boolean

True

LogMiner only: by default, only committed transactions are read. Set to False to read both committed and uncommitted transactions.

Compression

Boolean

False

If set to True, update operations for tables that have primary keys include only the primary key and modified columns, and delete operations include only the primary key. With the default value of False, all columns are included. See Oracle Reader example output for examples.

Set to True when the output of an OracleReader source is the input of a DatabaseWriter writing to a Cassandra target (see Cassandra Writer).

Connection Retry Policy

String

timeOut=30, retryInterval=30, maxRetries=3

With the default setting:

  • Striim will wait for the database to respond to a connection request for 30 seconds (timeOut=30).

  • If the request times out, Striim will try again in 30 seconds (retryInterval=30).

  • If the request times out on the third retry (maxRetries=3), a ConnectionException will be logged and the application will stop.

Negative values are not supported.

Connection URL

String

<hostname>:<port>:<SID> or <hostname>:<port>/<service name>

If using Oracle 12c with PDB, use the SID for the CDB service. (Note that with DatabaseReader and DatabaseWriter, you must use the SID for the PDB service instead.)

If using Amazon RDS for Oracle, the connection URL is <endpoint>:<port>:<DB name>. The required values are displayed at Instance Actions > see details.
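
For example (the host, port, SID, and service name below are placeholders), the value might be written in either of the following forms:

ConnectionURL:'203.0.113.49:1521:orcl'
ConnectionURL:'203.0.113.49:1521/pdborcl.example.com'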

Dictionary Mode

String

OnlineCatalog

Change default only when Including DDL operations in Oracle Reader output.

DDL Capture Mode

String

All

With the default value of All, OracleReader captures all supported DDL operations (see Including DDL operations in Oracle Reader output). Set to Minimal to capture only DDL operations for tables.

Exclude Users

String

Optionally, specify one or more Oracle user names, separated by semicolons, whose transactions will be omitted from OracleReader output. Possible uses include:

  • omitting transactions that would cause an endless loop when data previously read by OracleReader is eventually written back to the same table by DatabaseWriter, for example, in the context of high-availability "active/active" replication

  • omitting transactions involving multiple gigabytes of data, thus reducing Striim's memory requirements

  • omitting long-running transactions, ensuring that OracleReader will restart from a recent SCN after Striim is restarted
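
For instance (the user names are hypothetical), to omit transactions made by the Striim replication account and a batch-load account:

ExcludeUsers:'STRIIM;BATCHLOADER'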

Excluded Tables

String

If a wildcard is specified for Tables, any tables specified here will be excluded from the query. Specify the value as for Tables.
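
A brief sketch (schema and table names are hypothetical): to read every table in the HR schema except its staging tables, you might specify:

Tables:'HR.%',
ExcludedTables:'HR.STAGING%'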

Fetch Size

Integer

1000

LogMiner only: the number of records the JDBC driver will return at a time. For example, if Oracle Reader queries LogMiner and there are 2300 records available, the JDBC driver will return two batches of 1000 records and one batch of 300.

Filter Transaction Boundaries

Boolean

True

With the default value of True, BEGIN and COMMIT operations are filtered out. Set to False to include BEGIN and COMMIT operations.

Outbound Server Process Name

String

WebAction XStream

XStream only: Specifies the name Oracle will use for the internal process it creates to send change data to Striim. Each source or cache using XStream on the same Oracle database must have a unique value for this property. If you have only one source or cache using XStream, there is no need to change the default value. See "The Outbound Server" in Oracle's XStream documentation for more information.

Password

encrypted password

the password specified for the username (see Encrypted passwords)

Queue Size

Integer

2048

Quiesce Marker Table

String

QUIESCEMARKER

See Creating the quiescemarker table. Modify the default value if the quiesce marker table is not in the schema associated with the user specified in the Username property. Three-part CDB / PDB names are not supported in this release.

Reader Type

String

LogMiner

specify LogMiner or XStream

Send Before Image

Boolean

True

set to False to omit before data from output

SSL Config

String

If using SSL with the Oracle JDBC driver, specify the required properties. Examples:

If using SSL for encryption only:

oracle.net.ssl_cipher_suites=
  (SSL_DH_anon_WITH_3DES_EDE_CBC_SHA,
    SSL_DH_anon_WITH_RC4_128_MD5, 
    SSL_DH_anon_WITH_DES_CBC_SHA)

If using SSL for encryption and server authentication:

javax.net.ssl.trustStore=
/etc/oracle/wallets/ewallet.p12;
javax.net.ssl.trustStoreType=PKCS12;
javax.net.ssl.trustStorePassword=********

If using SSL for encryption and both server and client authentication:

javax.net.ssl.trustStore=
/etc/oracle/wallets/ewallet.p12;
javax.net.ssl.trustStoreType=PKCS12;
javax.net.ssl.trustStorePassword=********;
javax.net.ssl.keyStore=/opt/Striim/certs;
javax.net.ssl.keyStoreType=JKS;
javax.net.ssl.keyStorePassword=********

Start SCN

String

Optionally specify an SCN from which to start reading (See Replicating Oracle data to another Oracle database for an example).

Start Timestamp

String

null

With the default value of null, only new (based on current system time) transactions are read. If a timestamp is specified, transactions that began after that time are also read. The format is DD-MON-YYYY HH:MI:SS. For example, to start at 5:00 pm on July 15, 2017, specify 15-JUL-2017 17:00:00.

Support PDB

Boolean

False

Set to True if reading from CDB or PDB.

Tables

String

The table or materialized view to be read (supplemental logging must be enabled as described in Oracle configuration) in the format <schema>.<table>. (If using Oracle 12c with PDB, use three-part names: <pdb>.<schema>.<table>.) Names are case-sensitive.

You may specify multiple tables and materialized views as a list separated by semicolons or with the following wildcards:

  • %: any series of characters

  • _: any single character

For example, HR.% would read all tables in the HR schema. Wildcards may not be used with XStream Out when Compression is set to True.

For a literal underscore, use \_. For example, HR\_% would include HR_EMPLOYEES and HR_DEPTS but not HRMASTER. (Note that when reading Oracle tables with DatabaseReader the opposite is the case: \_ is the wildcard and _ is a literal underscore.)

See also Specifying key columns for tables without a primary key.
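
For example (the PDB, schema, and table prefix below are hypothetical), the first value reads all tables in the HR schema of the ORCLPDB1 PDB, and the second reads only tables whose names begin with the literal prefix EMP_:

Tables:'ORCLPDB1.HR.%'
Tables:'HR.EMP\_%'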

Transaction Buffer Disk Location

String

striim/LargeBuffer

See Transaction Buffer Type.

Transaction Buffer Spillover Size

String

1MB

When Transaction Buffer Type is Memory, this setting has no effect.

When Transaction Buffer Type is Disk, the amount of memory that Striim will use to hold each in-process transaction before buffering it to disk. You may specify the size in MB or GB.

Transaction Buffer Type

String

Memory

With the default setting, if Striim must hold more events than fit in the available Java heap space, the application will crash, typically with an error message such as "increase the block size of large buffer" or "exceeded heap usage threshold." This usually happens when a transaction includes millions of INSERT, UPDATE, or DELETE events before a single COMMIT.

To work around that problem, set to Disk. Striim will buffer large transactions to disk at the location specified by the Transaction Buffer Disk Location property, then process them when memory is available.

When the setting is Disk and recovery is enabled (see Recovering applications), after the application crashes or is stopped the buffer will be reset, and during recovery any previously buffered transactions will restart from the beginning.
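
A minimal sketch of that workaround (property names are written without spaces, as they would appear in TQL, and the values shown are illustrative):

TransactionBufferType:'Disk',
TransactionBufferDiskLocation:'striim/LargeBuffer',
TransactionBufferSpilloverSize:'100MB'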

Username

String

the username created as described in Oracle configuration; if using Oracle 12c with PDB, specify the CDB user (c##striim) 

XStream Timeout

String

600

number of seconds OracleReader will wait for the outbound server to start before failing

OracleReader WAEvent fields

The output data type for OracleReader is WAEvent.

metadata: for DML operations, the most commonly used elements are:

  • OperationName: COMMIT, BEGIN, INSERT, DELETE, UPDATE, or (when using LogMiner only) ROLLBACK

  • TxnID: transaction ID

  • TimeStamp: timestamp from the CDC log

  • TableName (returned only for INSERT, DELETE, and UPDATE operations): fully qualified name of the table

  • ROWID (returned only for INSERT, DELETE, and UPDATE operations): the Oracle ID for the inserted, deleted, or updated row

To retrieve the values for these elements, use the META function. See Parsing the fields of WAEvent for CDC readers.

data: for DML operations, an array of fields, numbered from 0, containing:

  • for an INSERT or DELETE operation, the values that were inserted or deleted

  • for an UPDATE, the values after the operation was completed

To retrieve the values for these fields, use SELECT ... (DATA[]). See Parsing the fields of WAEvent for CDC readers.

before (for UPDATE operations only): the same format as data, but containing the values as they were prior to the UPDATE operation

dataPresenceBitMap, beforePresenceBitMap, and typeUUID are reserved and should be ignored.
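
The following is a brief sketch of a CQ that extracts a few of these values from an OracleReader output stream (the stream names and field positions are hypothetical, and AuthDetailsStream is assumed to be defined with a matching type or created implicitly):

CREATE CQ ExtractAuthDetails
  INSERT INTO AuthDetailsStream
  SELECT META(x, "TableName").toString(),
    META(x, "OperationName").toString(),
    data[0],
    data[1]
  FROM OracleCDCStream x;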

The following is a complete list of fields that may appear in metadata. The actual fields will vary depending on the operation type and other factors.

metadata property

present when using LogMiner

present when using XStream Out

comments

AuditSessionID

x

Audit session ID associated with the user session making the change

BytesProcessed

x

CatalogObjectName

x

see Oracle Reader WAEvent fields for DDL operations

CatalogObjectType

x

see Oracle Reader WAEvent fields for DDL operations

ColumnIsKey

x

see Oracle Reader WAEvent fields for DDL operations

ColumnLength

x

see Oracle Reader WAEvent fields for DDL operations

ColumnIsNullable

x

see Oracle Reader WAEvent fields for DDL operations

ColumnMetadata

x

see Oracle Reader WAEvent fields for DDL operations

ColumnName

x

see Oracle Reader WAEvent fields for DDL operations

ColumnPrecision

x

see Oracle Reader WAEvent fields for DDL operations

COMMIT_TIMESTAMP

x

x

the UNIX epoch time the transaction was committed, based on the Striim server's time zone

COMMITSCN

x

x

system change number (SCN) when the transaction committed

DBCommitTimestamp

x

the UNIX epoch time the transaction was committed, based on the Oracle server's time zone

DBTimestamp

x

the UNIX epoch time of the operation, based on the Oracle server's time zone

ObjectName

x

see Oracle Reader WAEvent fields for DDL operations

ObjectType

x

see Oracle Reader WAEvent fields for DDL operations

OperationName

x

x

user-level SQL operation that made the change (INSERT, UPDATE, ...)

OperationSubType

x

see Oracle Reader WAEvent fields for DDL operations

OperationType

x

x

the Oracle operation type (COMMIT, DDL, DELETE, INSERT, INTERNAL, LOB_ERASE, LOB_TRIM, LOB_WRITE, MISSING_SCN, ROLLBACK, SELECT_FOR_UPDATE, SELECT_LOB_LOCATOR, START, UNSUPPORTED, or UPDATE)

OwnerName

x

see Oracle Reader WAEvent fields for DDL operations

ParentTxnID

x

raw representation of the parent transaction identifier

PK_UPDATE

x

true if an UPDATE operation changed the primary key, otherwise false

RbaBlk

x

RBA block number within the log file

RbaSqn

x

sequence# associated with the Redo Block Address (RBA) of the redo record associated with the change

RecordSetID

x

Uniquely identifies the redo record that generated the row. The tuple (RS_ID, SSN) together uniquely identifies a logical row change.

RollBack

x

1 if the record was generated because of a partial or a full rollback of the associated transaction, otherwise 0

ROWID

x

x

Row ID of the row modified by the change (only meaningful if the change pertains to a DML). This will be NULL if the redo record is not associated with a DML.

SCN

x

x

system change number (SCN) when the database change was made

SegmentName

x

name of the modified data segment

SegmentType

x

type of the modified data segment (INDEX, TABLE, ...)

Serial

x

serial number of the session that made the change

Session

x

session number of the session that made the change

SessionInfo

x

Information about the database session that executed the transaction. Contains process information, machine name from which the user logged in, client info, and so on.

SQLRedoLength

x

length of reconstructed SQL statement that is equivalent to the original SQL statement that made the change

TableMetadata

x

see Oracle Reader WAEvent fields for DDL operations

TableName

x

x

name of the modified table (in case the redo pertains to a table modification)

TableSpace

x

name of the tablespace containing the modified data segment.

ThreadID

x

ID of the thread that made the change to the database

TimeStamp

x

x

the UNIX epoch time of the operation, based on the Striim server's time zone

TransactionName

x

x

name of the transaction that made the change (only meaningful if the transaction is a named transaction)

TxnID

x

x

raw representation of the transaction identifier

TxnUserID

x

x

OracleReader simple application

The following application will write change data for all tables in myschema to SysOut. Replace the Username and Password values with the credentials for the account you created for Striim for use with LogMiner (see Configuring Oracle LogMiner) and myschema with the name of the schema containing the tables to be read.

CREATE APPLICATION OracleLMTest;
CREATE SOURCE OracleCDCIn USING OracleReader (
  Username:'striim',
  Password:'passwd',
  ConnectionURL:'203.0.113.49:1521:orcl',
  Tables:'myschema.%',
  FetchSize:1
) 
OUTPUT TO OracleCDCStream;

CREATE TARGET OracleCDCOut
  USING SysOut(name:OracleCDCLM)
  INPUT FROM OracleCDCStream;
END APPLICATION OracleLMTest;

Alternatively, you may specify a single table, such as myschema.mytable. See the discussion of Tables in Oracle Reader properties for additional examples of using wildcards to select a set of tables.

If you are using XStream Out, specify the ReaderType, and use the credentials for the account you created for Striim for use with XStream (see Configuring Oracle XStream Out):

CREATE SOURCE OracleCDCIn USING OracleReader (
  Username:'striim',
  Password:'passwd',
  ConnectionURL:'203.0.113.49:1523:orcl',
  Tables:'myschema.%',
  ReaderType:XStream
) 
OUTPUT TO OracleCDCStream;

The FetchSize property is for LogMiner only.

When troubleshooting problems, you can get the current LogMiner SCN and timestamp by entering mon <namespace>.<OracleReader source name>; in the Striim console.

Oracle Reader example output

OracleReader's output type is WAEvent. See WAEvent contents for change data for general information.

The following are examples of WAEvents emitted by OracleReader for various operation types. Note that many of the metadata values (see OracleReader WAEvent fields) are dependent on the Oracle environment and thus will vary from the examples below.

The examples all use the following table:

CREATE TABLE POSAUTHORIZATIONS (
  BUSINESS_NAME varchar2(30),
  MERCHANT_ID varchar2(100),
  PRIMARY_ACCOUNT NUMBER,
  POS NUMBER,CODE varchar2(20),
  EXP char(4),
  CURRENCY_CODE char(3),
  AUTH_AMOUNT number(10,3),
  TERMINAL_ID NUMBER,
  ZIP number,
  CITY varchar2(20),
  PRIMARY KEY (MERCHANT_ID));
COMMIT;
INSERT

If you performed the following INSERT on the table:

INSERT INTO POSAUTHORIZATIONS VALUES(
  'COMPANY 1',
  'D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu',
  6705362103919221351,
  0,
  '20130309113025',
  '0916',
  'USD',
  2.20,
  5150279519809946,
  41363,
  'Quicksand');
COMMIT;

Using LogMiner, the WAEvent for that INSERT would be similar to:

data: ["COMPANY 1","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu","6705362103919221351","0","20130309113025",
"0916","USD","2.2","5150279519809946","41363","Quicksand"]
metadata: {"RbaSqn":"21","AuditSessionId":"4294967295","TableSpace":"USERS","CURRENTSCN":"726174",
"SQLRedoLength":"325","BytesProcessed":"782","ParentTxnID":"8.16.463","SessionInfo":"UNKNOWN",
"RecordSetID":" 0x000015.00000310.0010 ","DBCommitTimestamp":"1553126439000","COMMITSCN":726175,
"SEQUENCE":"1","Rollback":"0","STARTSCN":"726174","SegmentName":"POSAUTHORIZATIONS",
"OperationName":"INSERT","TimeStamp":1553151639000,"TxnUserID":"SYS","RbaBlk":"784",
"SegmentType":"TABLE","TableName":"SCOTT.POSAUTHORIZATIONS","TxnID":"8.16.463","Serial":"201",
"ThreadID":"1","COMMIT_TIMESTAMP":1553151639000,"OperationType":"DML","ROWID":"AAAE9mAAEAAAAHrAAB",
"DBTimeStamp":"1553126439000","TransactionName":"","SCN":"72617400000059109745623040160001",
"Session":"105"}
before: null
UPDATE

If you performed the following UPDATE on the table:

UPDATE POSAUTHORIZATIONS SET BUSINESS_NAME = 'COMPANY 5A' where pos=0;
COMMIT;

Using LogMiner with the default setting Compression: false, the WAEvent for that UPDATE for the row created by the INSERT above would be similar to:

data: ["COMPANY 5A","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null,
  null,null,null,null,null,null]
metadata: {"RbaSqn":"21","AuditSessionId":"4294967295","TableSpace":"USERS","CURRENTSCN":"726177",
"SQLRedoLength":"164","BytesProcessed":"729","ParentTxnID":"2.5.451","SessionInfo":"UNKNOWN",
"RecordSetID":" 0x000015.00000313.0010 ","DBCommitTimestamp":"1553126439000","COMMITSCN":726178,
"SEQUENCE":"1","Rollback":"0","STARTSCN":"726177","SegmentName":"POSAUTHORIZATIONS",
"OperationName":"UPDATE","TimeStamp":1553151639000,"TxnUserID":"SYS","RbaBlk":"787",
"SegmentType":"TABLE","TableName":"SCOTT.POSAUTHORIZATIONS","TxnID":"2.5.451","Serial":"201",
"ThreadID":"1","COMMIT_TIMESTAMP":1553151639000,"OperationType":"DML","ROWID":"AAAE9mAAEAAAAHrAAB",
"DBTimeStamp":"1553126439000","TransactionName":"","SCN":"72617700000059109745625006240000",
"Session":"105"}
before: ["COMPANY 1","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null,null,null,null,null,
null,null]

Note that when using LogMiner the before section contains a value only for the modified column. You may use the IS_PRESENT() function to check whether a particular field has a value (see Parsing the fields of WAEvent for CDC readers). With Compression: true, only the primary key is included in the before array:

before: [null,"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null,
  null,null,null,null,null,null]

In all cases, if OracleReader's SendBeforeImage property is set to False, the before value will be null.

DELETE

If you performed the following DELETE on the table:

DELETE from POSAUTHORIZATIONS where pos=0;
COMMIT;

Using LogMiner with the default setting Compression: false, the WAEvent for a DELETE for the row affected by the UPDATE above would be:

data: ["COMPANY 5A","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu","6705362103919221351","0","20130309113025",
"0916","USD","2.2","5150279519809946","41363","Quicksand"]
metadata: {"RbaSqn":"21","AuditSessionId":"4294967295","TableSpace":"USERS","CURRENTSCN":"726180",
"SQLRedoLength":"384","BytesProcessed":"803","ParentTxnID":"3.29.501","SessionInfo":"UNKNOWN",
"RecordSetID":" 0x000015.00000315.0010 ","DBCommitTimestamp":"1553126439000","COMMITSCN":726181,
"SEQUENCE":"1","Rollback":"0","STARTSCN":"726180","SegmentName":"POSAUTHORIZATIONS",
"OperationName":"DELETE","TimeStamp":1553151639000,"TxnUserID":"SYS","RbaBlk":"789",
"SegmentType":"TABLE","TableName":"SCOTT.POSAUTHORIZATIONS","TxnID":"3.29.501","Serial":"201",
"ThreadID":"1","COMMIT_TIMESTAMP":1553151639000,"OperationType":"DML","ROWID":"AAAE9mAAEAAAAHrAAB",
"DBTimeStamp":"1553126439000","TransactionName":"","SCN":"72618000000059109745626316960000",
"Session":"105"}
before: null

With Compression: true, the data array would be:

data: [null,"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null,null,null,null,null,null,null]

Note that the contents of data and before are reversed from what you might expect for a DELETE operation. This simplifies programming since you can get data for INSERT, UPDATE, and DELETE operations using only the data field.

Including DDL operations in Oracle Reader output

Reading DDL transactions requires Oracle 18c or earlier and LogMiner. It is enabled by setting OracleReader's DictionaryMode property to OfflineCatalog.

DDL events are output only for those tables, functions, indexes, packages, procedures, and views specified in OracleReader's Tables property value. Wildcards may be used, in which case DDL for new objects with names that match the wildcard pattern will be included in the output. See Handling DDL events when replicating Oracle data for a list of supported targets. Bidirectional replication of DDL is not supported.

Reading from tables containing XMLType columns is not supported when DictionaryMode is OfflineCatalog.
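
The following is a hedged sketch (connection details and schema name are placeholders) of an OracleReader source configured to include DDL operations in its output:

CREATE SOURCE OracleDDLIn USING OracleReader (
  Username:'striim',
  Password:'********',
  ConnectionURL:'203.0.113.49:1521:orcl',
  Tables:'myschema.%',
  DictionaryMode:'OfflineCatalog'
)
OUTPUT TO OracleDDLStream;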

Supported DDL operations

The following DDL operations will be read for those tables, functions, indexes, packages, procedures, and views specified in OracleReader's Tables property value:

operation

function

index

package

procedure

table

view

ALTER ... ADD <column> 

x

ALTER ... ADD PRIMARY KEY

x

ALTER ... DROP <column>

x

ALTER ... MODIFY <column> <type>

x

ALTER ... RENAME

x

x

x

ALTER ... RENAME <column>

x

CREATE

x

x

x

x

x

x

CREATE OR REPLACE

x

x

x

x

DROP

x

x

x

x

x

x

TRUNCATE

x

In addition, the following operations are passed through to the target:

  • ANALYZE TABLE <table> VALIDATE STRUCTURE

  • GRANT <privilege(s)> ON <table> ...

  • REVOKE <privilege(s)> ON <table> ...

ALTER TABLE ... DROP / ENABLE / DISABLE <constraint name> and operations on materialized views are not supported. If OracleReader encounters unsupported DDL (such as CREATE, ALTER, or DROP on a user, role, or trigger), it will write a warning to the log and continue.

Oracle Reader WAEvent fields for DDL operations

For DML operations, see OracleReader WAEvent fields.

The output data type for OracleReader is WAEvent. For DDL operations, the fields are:

data: contains the command string, for example, CREATE TABLE EMP (EMPID INT PRIMARY KEY, ENAME VARCHAR2(10)) or DROP TABLE EMP.

metadata: includes the following fields:

  • CatalogName: reserved

  • CatalogObjectType: TABLE / VIEW / INDEX / etc.

  • ColumnMetadata: a sub-field of TableMetadata containing descriptions of the columns

  • COMMITSCN: the SCN of the commit for the transaction

  • CURRENTSCN: the SCN of the current statement

  • ObjectName: the name of the table / view / index / etc. being acted on or, in the case of a RENAME, the new name

  • OperationName: the first keyword in the DDL command

  • OperationSubName: any additional keywords in the DDL command besides the first

  • OperationType: value for DDL operations is always DDL

  • OwnerName: owner of the object being acted on

  • SchemaName: schema containing the object being acted on

  • SCN: reserved

  • STARTSCN: the SCN of the start of the transaction

  • TableMetadata: description of the table, including the ColumnMetadata sub-field

  • TableName: the name of the table (omitted in metadata for non-table operations)

  • TimeStamp: timestamp from the CDC log

  • TxnID: transaction ID

Example WAEvent values for supported DDL commands

DDL command

example WAEvent values

CREATE TABLE ...

data:

CREATE TABLE TEST (
  EMPID INT PRIMARY KEY,
  ENAME VARCHAR2(10))

metadata:

{
  "OperationSubName": "CREATE_TABLE",
  "TxnID": "8.15.261430",
  "TimeStamp": "2017-02-10T17:46:19.000-08:00",
  "COMMITSCN": "403795721",
  "CatalogName": null,
  "CURRENTSCN": "403795718",
  "STARTSCN": "403795694",
  "ObjectName": "TEST",
  "OperationName": "CREATE",
  "SCN": "403795718",
  "OperationType": "DDL",
  "TableMetadata": {
    "ColumnMetadata": [
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": true,
        "ColumnLength": 0,
        "ColumnType": "NUMBER",
        "ColumnScale": 0,
        "ColumnName": "EMPID",
        "ColumnIsNullable": true,
        "ColumnIndex": 0
      },
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": false,
        "ColumnLength": 10,
        "ColumnType": "VARCHAR2",
        "ColumnScale": 0,
        "ColumnName": "ENAME",
        "ColumnIsNullable": true,
        "ColumnIndex": 1
      }
    ],
    "TableName": "TEST",
    "OwnerName": "STRIIM"
  },
  "SchemaName": "STRIIM",
  "CatalogObjectType": "TABLE"
}
ALTER TABLE ... 
  ADD (<name> <data type>)

data:

ALTER TABLE TEST ADD (
  SALARY NUMBER)

metadata:

{
  "OperationSubName": "ALTER_TABLE_ADD_COLUMN",
  "TxnID": "4.12.411190",
  "TimeStamp": "2017-02-10T17:46:30.000-08:00",
  "COMMITSCN": "403795759",
  "CatalogName": null,
  "CURRENTSCN": "403795755",
  "STARTSCN": "403795749",
  "ObjectName": "TEST",
  "OperationName": "ALTER",
  "SCN": "403795755",
  "OperationType": "DDL",
  "TableMetadata": {
    "ColumnMetadata": [
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": true,
        "ColumnLength": 0,
        "ColumnType": "NUMBER",
        "ColumnScale": 0,
        "ColumnName": "EMPID",
        "ColumnIsNullable": true,
        "ColumnIndex": 0
      },
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": false,
        "ColumnLength": 10,
        "ColumnType": "VARCHAR2",
        "ColumnScale": 0,
        "ColumnName": "ENAME",
        "ColumnIsNullable": true,
        "ColumnIndex": 1
      },
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": false,
        "ColumnLength": 0,
        "ColumnType": "NUMBER",
        "ColumnScale": 0,
        "ColumnName": "SALARY",
        "ColumnIsNullable": true,
        "ColumnIndex": 2
      }
    ],
    "TableName": "TEST",
    "OwnerName": "STRIIM"
  },
  "SchemaName": "STRIIM",
  "CatalogObjectType": "TABLE"
}
ALTER TABLE ...
  DROP COLUMN ...

data:

ALTER TABLE TEST
  DROP (SALARY)

metadata:

  "OperationSubName": "ALTER_TABLE_DROP_COLUMN",
  "TxnID": "10.1.258799",
  "TimeStamp": "2017-02-11T09:59:51.000-08:00",
  "COMMITSCN": "403888603",
  "CatalogName": null,
  "CURRENTSCN": "403888599",
  "STARTSCN": "403888593",
  "ObjectName": "TEST",
  "OperationName": "ALTER",
  "SCN": "403888599",
  "OperationType": "DDL",
  "TableMetadata": {
    "ColumnMetadata": [
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": false,
        "ColumnLength": 0,
        "ColumnType": "NUMBER",
        "ColumnScale": 0,
        "ColumnName": "EMPID",
        "ColumnIsNullable": true,
        "ColumnIndex": 0
      },
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": false,
        "ColumnLength": 10,
        "ColumnType": "VARCHAR2",
        "ColumnScale": 0,
        "ColumnName": "ENAME",
        "ColumnIsNullable": true,
        "ColumnIndex": 1
      }
    ],
    "TableName": "TEST",
    "OwnerName": "STRIIM"
  },
  "SchemaName": "STRIIM",
  "CatalogObjectType": "TABLE"
}
ALTER TABLE ...
  MODIFY (<column> ...

data:

ALTER TABLE TEST MODIFY
  SALARY INT

metadata:

{
  "OperationSubName": "ALTER_TABLE_MODIFY_COLUMN",
  "TxnID": "2.12.258663",
  "TimeStamp": "2017-02-11T10:02:20.000-08:00",
  "COMMITSCN": "403896354",
  "CatalogName": null,
  "CURRENTSCN": "403896349",
  "STARTSCN": "403896343",
  "ObjectName": "TEST",
  "OperationName": "ALTER",
  "SCN": "403896349",
  "OperationType": "DDL",
  "TableMetadata": {
    "ColumnMetadata": [
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": true,
        "ColumnLength": 0,
        "ColumnType": "NUMBER",
        "ColumnScale": 0,
        "ColumnName": "EMPID",
        "ColumnIsNullable": true,
        "ColumnIndex": 0
      },
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": false,
        "ColumnLength": 10,
        "ColumnType": "VARCHAR2",
        "ColumnScale": 0,
        "ColumnName": "ENAME",
        "ColumnIsNullable": true,
        "ColumnIndex": 1
      },
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": false,
        "ColumnLength": 0,
        "ColumnType": "NUMBER",
        "ColumnScale": 0,
        "ColumnName": "SALARY",
        "ColumnIsNullable": true,
        "ColumnIndex": 2
      }
    ],
    "TableName": "TEST",
    "OwnerName": "STRIIM"
  },
  "SchemaName": "STRIIM",
  "CatalogObjectType": "TABLE"
}
ALTER ... RENAME ...

data:

ALTER TABLE TEST
  RENAME TO CUSTOMER

metadata:

{
  "OperationSubName": "ALTER_TABLE_RENAME",
  "TxnID": "10.3.258647",
  "TimeStamp": "2017-02-10T17:53:31.000-08:00",
  "COMMITSCN": "403796279",
  "CatalogName": null,
  "CURRENTSCN": "403796276",
  "STARTSCN": "403796267",
  "ObjectName": "CUSTOMER",
  "OperationName": "ALTER",
  "SCN": "403796276",
  "OperationType": "DDL",
  "TableMetadata": {
    "ColumnMetadata": [
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": false,
        "ColumnLength": 0,
        "ColumnType": "NUMBER",
        "ColumnScale": 0,
        "ColumnName": "EMPID",
        "ColumnIsNullable": true,
        "ColumnIndex": 0
      },
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": false,
        "ColumnLength": 10,
        "ColumnType": "VARCHAR2",
        "ColumnScale": 0,
        "ColumnName": "ENAME",
        "ColumnIsNullable": true,
        "ColumnIndex": 1
      },
      {
        "ColumnPrecision": 0,
        "ColumnIsKey": false,
        "ColumnLength": 0,
        "ColumnType": "NUMBER",
        "ColumnScale": 0,
        "ColumnName": "SALARY",
        "ColumnIsNullable": true,
        "ColumnIndex": 2
      }
    ],
    "TableName": "CUSTOMER",
    "OldTblName": "STRIIM.TEST",
    "OwnerName": "STRIIM"
  },
  "SchemaName": "STRIIM",
  "CatalogObjectType": "TABLE"
}
DROP ...

data:

DROP TABLE TEST

metadata:

{
  "OperationSubName": "DROP_TABLE",
  "TxnID": "8.18.261385",
  "TimeStamp": "2017-02-10T17:48:29.000-08:00",
  "COMMITSCN": "403795874",
  "CatalogName": null,
  "CURRENTSCN": "403795870",
  "STARTSCN": "403795837",
  "ObjectName": "TEST",
  "OperationName": "DROP",
  "SCN": "403795870",
  "OperationType": "DDL",
  "SchemaName": "STRIIM",
  "CatalogObjectType": "TABLE"
}
CREATE UNIQUE INDEX ...

data:

CREATE UNIQUE INDEX TESTINDEX
  ON TEST (EMPID, ENAME)

metadata:

{
  "OperationSubName": "CREATE_INDEX",
  "TxnID": "5.21.258597",
  "TimeStamp": "2017-02-10T17:55:57.000-08:00",
  "COMMITSCN": "403796469",
  "CatalogName": null,
  "CURRENTSCN": "403796466",
  "STARTSCN": "403796460",
  "ObjectName": "TESTINDEX",
  "OperationName": "CREATE",
  "SCN": "403796466",
  "OperationType": "DDL",
  "SchemaName": "STRIIM",
  "CatalogObjectType": "INDEX"
}
ANALYZE TABLE ...
  VALIDATE STRUCTURE

data:

ANALYZE TABLE TEST VALIDATE STRUCTURE

metadata:

{
  "OperationSubName": "ANALYZE_TABLE",
  "TxnID": "6.13.240947",
  "TimeStamp": "2017-02-10T17:56:45.000-08:00",
  "COMMITSCN": "403796529",
  "CatalogName": null,
  "CURRENTSCN": "403796527",
  "STARTSCN": "403796520",
  "ObjectName": "TEST",
  "OperationName": "ANALYZE",
  "SCN": "403796527",
  "OperationType": "DDL",
  "SchemaName": "STRIIM",
  "CatalogObjectType": "TABLE"
}
CREATE VIEW ...

data:

CREATE VIEW MYVIEW AS
  SELECT EMPID,ENAME
  FROM TEST
  WHERE SALARY>43679

metadata:

{
  "OperationSubName": "CREATE_VIEW",
  "TxnID": "6.9.241087",
  "TimeStamp": "2017-02-10T17:59:07.000-08:00",
  "COMMITSCN": "403796658",
  "CatalogName": null,
  "CURRENTSCN": "403796653",
  "STARTSCN": "403796649",
  "ObjectName": "MYVIEW",
  "OperationName": "CREATE",
  "SCN": "403796653",
  "OperationType": "DDL",
  "SchemaName": "STRIIM",
  "CatalogObjectType": "VIEW"
}
CREATE OR REPLACE VIEW ...

(updating SELECT statement)

data:

CREATE OR REPLACE VIEW MYVIEW AS
  SELECT EMPID,ENAME
  FROM TEST 
  WHERE SALARY>100000

metadata:

{
  "OperationSubName": "CREATE_VIEW",
  "TxnID": "2.30.258751",
  "TimeStamp": "2017-02-10T18:00:51.000-08:00",
  "COMMITSCN": "403796921",
  "CatalogName": null,
  "CURRENTSCN": "403796917",
  "STARTSCN": "403796913",
  "ObjectName": "MYVIEW",
  "OperationName": "CREATE",
  "SCN": "403796917",
  "OperationType": "DDL",
  "SchemaName": "STRIIM",
  "CatalogObjectType": "VIEW"
}
CREATE OR
  REPLACE PACKAGE ...

data:

CREATE OR REPLACE PACKAGE emp_mgmt AS FUNCTION hire (
last_name VARCHAR2, job_id VARCHAR2, manager_id NUMBER, 
salary NUMBER, commission_pct NUMBER, department_id NUMBER) 
RETURN NUMBER;\nFUNCTION create_dept( department_id NUMBER, 
location_id NUMBER) RETURN NUMBER;
\nPROCEDURE remove_emp(employee_id NUMBER);
\nPROCEDURE remove_dept(department_id NUMBER);
\nPROCEDURE increase_sal(employee_id NUMBER, salary_incr NUMBER);
\nPROCEDURE increase_comm(employee_id NUMBER, comm_incr NUMBER);
\nno_comm EXCEPTION;
\nno_sal EXCEPTION;
\nEND emp_mgmt;

metadata:

{
  "OperationSubName": "CREATE_PACKAGE",
  "TxnID": "2.0.258774",
  "TimeStamp": "2017-02-11T17:05:55.000-08:00",
  "COMMITSCN": "404005562",
  "CatalogName": null,
  "CURRENTSCN": "404005557",
  "STARTSCN": "404005553",
  "ObjectName": "EMP_MGMT",
  "OperationName": "CREATE",
  "SCN": "404005557",
  "OperationType": "DDL",
  "SchemaName": "ROBERT",
  "CatalogObjectType": "PACKAGE"
} 
GRANT ...

data:

GRANT SELECT ON TEST TO STRIIM

metadata:

{
  "OperationSubName": "GRANT",
  "TxnID": "6.9.241213",
  "TimeStamp": "2017-02-11T10:07:21.000-08:00",
  "COMMITSCN": "403898105",
  "CatalogName": null,
  "CURRENTSCN": "403898102",
  "STARTSCN": "403898098",
  "ObjectName": null,
  "OperationName": "GRANT",
  "SCN": "403898102",
  "OperationType": "DDL",
  "SchemaName": "STRIIM",
  "CatalogObjectType": "UNKNOWN"
}
REVOKE ...

data:

REVOKE SELECT ON TEST FROM STRIIM

metadata:

{ 
 "OperationSubName": "REVOKE",
  "TxnID": "5.28.258661",
  "TimeStamp": "2017-02-11T10:07:31.000-08:00",
  "COMMITSCN": "403898124",
  "CatalogName": null,
  "CURRENTSCN": "403898121",
  "STARTSCN": "403898116",
  "ObjectName": null,
  "OperationName": "REVOKE",
  "SCN": "403898121",
  "OperationType": "DDL",
  "SchemaName": "STRIIM",
  "CatalogObjectType": "UNKNOWN"
}
Handling DDL events when replicating Oracle data

Reading DDL transactions requires Oracle 18c or earlier and LogMiner. It is enabled by setting OracleReader's DictionaryMode property to OfflineCatalog.

DDL events may be replayed in another Oracle instance, subject to the limitations discussed in Replicating Oracle data to another Oracle database. DDL cannot be replayed in non-Oracle databases.

KafkaWriter can use DDL events to track evolution of Oracle tables. See Using the Confluent or Hortonworks schema registry.

AzureBlobWriter, FileWriter, GCSWriter, HDFSWriter, and S3Writer automatically roll over to a new file when a DDL event is received. This default behavior may be disabled using the RolloverOnDDL property.
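
A minimal sketch (the file name is hypothetical, and the property is written without spaces as it would appear in TQL) of a FileWriter target that suppresses that rollover:

CREATE TARGET OracleCDCFileOut USING FileWriter (
  filename:'oracle_changes.json',
  RolloverOnDDL:'false'
)
FORMAT USING JSONFormatter ()
INPUT FROM OracleCDCStream;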

Oracle Reader data type support and correspondence

If a table contains a column of a type not supported by LogMiner, LogMiner will not capture any data for the table. For more information, see "Unsupported Data Types and Table Storage Attributes" in the LogMiner documentation for your version of Oracle.

Oracle type

TQL type

ADT

not supported by LogMiner

ARRAY

not supported by Oracle Reader

BFILE

not supported by LogMiner

BINARY_DOUBLE

double

BINARY_FLOAT

float

BFILE

not supported by Oracle Reader

BLOB

string (a primary or unique key must exist on the table)

An insert or update containing a column of this type generates two CDC log entries: an insert or update in which the value for this column is null, followed by an update including the value.

CHAR

string

CLOB

string (a primary or unique key must exist on the table)

An insert or update containing a column of this type generates two CDC log entries: an insert or update in which the value for this column is null, followed by an update including the value.

DATE

DateTime

FLOAT

string

INTERVALDAYTOSECOND

string (when using LogMiner, string always has a sign; when using XStream, string begins with minus sign if value is negative, but the plus sign is omitted for positive values)

INTERVALYEARTOMONTH

string (when using LogMiner, string always has a sign; when using XStream, string begins with minus sign if value is negative, but the plus sign is omitted for positive values)

LONG

Results may be inconsistent. Oracle recommends using CLOB instead.

LONG RAW

Results may be inconsistent. Oracle recommends using BLOB instead.

NCHAR

string

NCLOB

string (a primary or unique key must exist on the table)

NESTED TABLE

not supported by Oracle Reader

NUMBER

string

NVARCHAR2

string

RAW

string

REF

not supported by Oracle Reader

ROWID

not supported by Oracle Reader

TIMESTAMP

DateTime

TIMESTAMP WITH LOCAL TIME ZONE

DateTime (supported in LogMiner only)

TIMESTAMP WITH TIME ZONE

DateTime (supported in LogMiner only)

UDT

not supported by Oracle Reader

UROWID

not supported by Oracle Reader

VARCHAR2

string

VARRAY

not supported by LogMiner

XMLTYPE

Supported only for Oracle 12c and later. When DictionaryMode is OnlineCatalog, values in any XMLType columns will be set to null. When DictionaryMode is OfflineCatalog, reading from tables containing XMLType columns is not supported.

Runtime considerations when using Oracle Reader

Starting an Oracle Reader source automatically opens an Oracle session for the user specified in the Username property.

  • When using LogMiner, the session is closed when the source is stopped.

  • When using XStream Out, the session is closed when the source is undeployed.

  • If a running Oracle Reader source fails with an error, the session will be closed.

  • Closing a PDB source while Oracle Reader is running will cause the application to crash.

Specifying key columns for tables without a primary key

If a primary key is not defined for a table, the values for all columns are included in UPDATE and DELETE records, which can significantly reduce performance. You can work around this by setting the Compression property to True and including the KeyColumns option in the Tables property value. The syntax is:

Tables:'<table name> KeyColumns(<COLUMN 1 NAME>,<COLUMN 2 NAME>,...)'

The column names must be uppercase. Specify as many columns as necessary to define a unique key for each row.

If the table has a primary key, or the Compression property is set to False, KeyColumns will be ignored.
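
For example (the schema, table, and column names are hypothetical), to read a table that has no primary key using the combination of ORDER_ID and LINE_NO as a unique key:

Compression:'true',
Tables:'MYSCHEMA.ORDERLINES KeyColumns(ORDER_ID,LINE_NO)'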

Using the DATA() and BEFORE() functions

The DATA() and BEFORE() functions return the WAEvent data and before arrays. The following example shows how you could use these functions to write change data event details to a JSON file with the associated Oracle column names. (You could do the same thing with AVROFormatter.) This is supported only for 11g using LogMiner.

CREATE SOURCE OracleCDCIn USING OracleReader (
  Username:'walm',
  Password:'passwd',
  ConnectionURL:'192.168.1.49:1521:orcl',
  Tables:'myschema.%',
  FetchSize:1
) 
OUTPUT TO OracleRawStream;CREATE TYPE OpTableDataType(
  OperationName String,
  TableName String,
  data java.util.HashMap,
  before java.util.HashMap
);

CREATE STREAM OracleTypedStream OF OpTableDataType;
CREATE CQ ParseOracleRawStream
  INSERT INTO OracleTypedStream
  SELECT META(OracleRawStream, "OperationName").toString(),
    META(OracleRawStream, "TableName").toString(),
    DATA(OracleRawStream),
    BEFORE(OracleRawStream)
  FROM OracleRawStream;
 
CREATE TARGET OracleCDCFFileOut USING FileWriter(
  filename:'Oracle2JSON_withFFW.json'
)
FORMAT USING JSONFormatter ()
INPUT FROM OracleTypedStream;

The CQ will be easier to read if you use an alias for the stream name. For example:

CREATE CQ ParseOracleRawStream
  INSERT INTO OracleTypedStream
  SELECT META(x, "OperationName").toString(),
    META(x, "TableName").toString(),
    DATA(x),
    BEFORE(x)
  FROM OracleRawStream x;

Using this application, the output for the UPDATE operation described in Oracle Reader example output would look like this:

 {
  "OperationName":"UPDATE",
  "TableName":"ROBERT.POSAUTHORIZATIONS",
  "data":{"AUTH_AMOUNT":"2.2", "BUSINESS_NAME":"COMPANY 5A", "ZIP":"41363", "EXP":"0916", 
"POS":"0", "CITY":"Quicksand", "CURRENCY_CODE":"USD", "PRIMARY_ACCOUNT":"6705362103919221351", 
"MERCHANT_ID":"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu", "TERMINAL_ID":"5150279519809946", 
"CODE":"20130309113025"},
  "before":{"AUTH_AMOUNT":"2.2", "BUSINESS_NAME":"COMPANY 1", "ZIP":"41363", "EXP":"0916", 
"POS":"0", "CITY":"Quicksand", "CURRENCY_CODE":"USD", "PRIMARY_ACCOUNT":"6705362103919221351", 
"MERCHANT_ID":"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu", "TERMINAL_ID":"5150279519809946", 
"CODE":"20130309113025"}
 }

Viewing open transactions

SHOW <namespace>.<OracleReader source name> OPENTRANSACTIONS
  [ -LIMIT <count> ]
  [ -TRANSACTIONID '<transaction ID>,...']
  [ DUMP | -DUMP '<path>/<file name>' ];

This console command returns information about currently open Oracle transactions. The namespace may be omitted when the console is using the source's namespace.

With no optional parameters, SHOW <source> OPENTRANSACTIONS; will display summary information for up to ten open transactions (the default LIMIT count is 10).

╒══════════════════╤════════════╤════════════╤══════════════════╤════════════╤════════════╤═══════════════════════════════════════╕
│ Transaction ID   │ # of Ops   │ Sequence # │ StartSCN         │ Rba block  │ Thread #   │ TimeStamp                             │
├──────────────────┼────────────┼────────────┼──────────────────┼────────────┼────────────┼───────────────────────────────────────┤
│ 3.5.222991       │ 5          │ 1          │ 588206203        │ 5189       │ 1          │ 2019-04-05T21:28:51.000-07:00         │
│ 5.26.224745      │ 1          │ 1          │ 588206395        │ 5189       │ 1          │ 2019-04-05T21:30:24.000-07:00         │
│ 8.20.223786      │ 16981      │ 1          │ 588213879        │ 5191       │ 1          │ 2019-04-05T21:31:17.000-07:00         │
└──────────────────┴────────────┴────────────┴──────────────────┴────────────┴────────────┴───────────────────────────────────────┘
  • To show all open transactions, add -LIMIT ALL.

  • Add -TRANSACTIONID with a comma-separated list of transaction IDs (for example, -TRANSACTIONID '3.4.222991, 5.26.224745') to return summary information about specific transactions in the console and write the details to OpenTransactions_<timestamp> in the current directory.

  • Add DUMP to show summary information in the console and write the details to OpenTransactions_<timestamp> in the current directory.

  • Add -DUMP '<path>/<file name>' to show summary information in the console and write the details to the specified file.
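
For instance (the namespace, source name, and file path are hypothetical):

SHOW myns.OracleCDCIn OPENTRANSACTIONS -LIMIT ALL -DUMP '/tmp/OracleCDCIn_opentxns.txt';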

File lineage in Oracle

See Console commands for instructions on using SHOW LINEAGE. The file lineage report is also available in the Flow Designer and via the getOracleFLM endpoint in the Striim application management REST API (see Using the Striim Application Management REST API).

By default, this feature is disabled. See Enabling file lineage.

For OracleReader, the file lineage data includes:

File Name

the archive log file name, for example, /oracle_archivelogs/1_4009_890952357.dbf (the NAME value taken from V$ARCHIVED_LOG based on log sequence number and thread number) or ONLINE REDO when OracleReader is reading from redo logs

Status

PROCESSING when OracleReader is reading the file, COMPLETED when it has finished

Directory Name

the directory in which the file is located

File Creation Time

the time Striim created the file lineage entry for the file

Number Of Events

the number of events OracleReader has read from the file; if the application has been restarted, OracleReader may not start reading from the first event, so the number will be less than the total number of events in the file

First Event Timestamp

FIRST_TIME value taken from V$ARCHIVED_LOG based on log sequence number and thread number

Last Event Timestamp

NEXT_TIME for the file taken from V$ARCHIVED_LOG based on log sequence number and thread number

Wrap Number

the number of times OracleReader resumed reading the file after the application was restarted

SequenceNumber

the unique sequence number from the file name, for example, for /oracle_archivelogs/1_4009_890952357.dbf it would be 4009

ThreadID

the thread number associated with the log sequence number and file name

FirstChangeNumber

FIRST_CHANGE# for the file taken from V$ARCHIVED_LOG based on log sequence number and thread number

LastChangeNumber

NEXT_CHANGE# for the file taken from V$ARCHIVED_LOG based on log sequence number and thread number

When OracleReader is reading from redo logs, file-related values are NOT APPLICABLE or N/A.

For more information, see LOG_ARCHIVE_FORMAT in Oracle's Database Reference.

Oracle to Kafka Quick Guide

This document provides detailed instructions for creating a Striim application to capture change data from one or more tables in Oracle and write it to one or more partitions of a Kafka topic. This reference application uses the most common options:

  • reads Oracle change data using LogMiner (Striim also supports XStream Out)

  • runs KafkaWriter in sync mode with recovery enabled to ensure no lost or duplicate events after recovery from a cluster failure ("exactly-once processing"; see Setting KafkaWriter's mode property: sync versus async for other supported options)

  • writes the Oracle data to Kafka in Avro format (Striim also supports DSV, JSON, and XML)

This document assumes the following:

  • You have sysdba privileges in Oracle to perform the necessary configuration, and can access its host via ssh and scp with the privileges necessary to install and start the Forwarding Agent.

  • You have a Kafka instance and the privileges necessary to perform all the tasks discussed below.

To create the Striim application, you will perform the following tasks, as detailed in this document. It is important that you perform the steps in the order in which they are described here. We strongly recommend that you read the entire document before starting to follow the instructions.

  1. enable archivelog and supplemental log data in Oracle

  2. create an Oracle role and user for use by Striim

  3. if your Kafka instance uses SASL authentication or SSL encryption, configure Striim accordingly

  4. create an application using Striim's Oracle CDC to Kafka template

  5. modify the application to enable recovery

  6. optionally, modify the application to:

    • write to multiple Kafka partitions

    • increase throughput by using multiple instances of OracleReader or KafkaWriter

    • deploy the source on the Oracle host using the Forwarding Agent

We strongly recommend you read this entire Quick Guide before starting to follow any of the detailed instructions.

Reading from Oracle with Striim: quick overview

Striim supports Oracle 11g 11.2.0.4, 12c 12.1.0.2, 12.2.0.1.0, 18c, and 19c.

For additional details such as RAC support and the differences between LogMiner and XStream Out, see Oracle Database.

Enabling archivelog

The following steps check whether archivelog is enabled and, if not, enable it.

  1. Log in to SQL*Plus as the sys user.

  2. Enter the following command:

    select log_mode from v$database;

    If the command returns ARCHIVELOG, it is enabled. Skip ahead to Enabling supplemental log data.

  3. If the command returns NOARCHIVELOG, enter: shutdown immediate

  4. Wait for the message ORACLE instance shut down, then enter: startup mount

  5. Wait for the message Database mounted, then enter:

    alter database archivelog;
    alter database open;
  6. To verify that archivelog has been enabled, enter select log_mode from v$database; again. This time it should return ARCHIVELOG.

Enabling supplemental log data

The following steps check whether supplemental log data is enabled and, if not, enable it.

  1. Enter the following command: 

    select supplemental_log_data_min, supplemental_log_data_pk from v$database;

    If the command returns YES or IMPLICIT, supplemental log data is already enabled. For example, 

    SUPPLEME SUP
    -------- ---
    YES      NO

    indicates that supplemental log data is enabled, but primary key logging is not. If it returns anything else, enter:

    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
  2. To enable primary key logging for all tables in the database enter: 

    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;

    Alternatively, to enable primary key logging only for selected tables (do not use this approach if you plan to use wildcards in the OracleReader Tables property to capture change data from new tables):

    ALTER TABLE <schema name>.<table name> ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
  3. If replicating Oracle data to Redshift, enable supplemental logging on all columns for all tables in the database:

    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

    Alternatively, to enable only for selected tables:

    ALTER TABLE <schema>.<table name> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
  4. To activate your changes, enter:

    alter system switch logfile;

If using Amazon RDS for Oracle, use the following commands instead:

exec rdsadmin.rdsadmin_util.alter_supplemental_logging(p_action => 'ADD');
exec rdsadmin.rdsadmin_util.alter_supplemental_logging(p_action => 'ADD', p_type => 'PRIMARY KEY');
exec rdsadmin.rdsadmin_util.switch_logfile;
select supplemental_log_data_min, supplemental_log_data_pk from v$database;
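
If you also need supplemental logging on all columns on RDS (for example, when replicating to Redshift as described above), the same procedure accepts an ALL type. This is a sketch; confirm the parameter values against the RDS documentation for your Oracle engine version:

exec rdsadmin.rdsadmin_util.alter_supplemental_logging(p_action => 'ADD', p_type => 'ALL');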
Creating an Oracle user with LogMiner privileges

You may use LogMiner with any supported Oracle version.

Log in as sysdba and enter the following commands to create a role with the privileges required by the Striim OracleReader adapter and a user granted that role. You may give the role and user any names you like. Replace ******** with a strong password.

If using Oracle 11g, or 12c, 18c, or 19c without CDB, enter the following commands:

create role striim_privs;
grant create session,
  execute_catalog_role,
  select any transaction,
  select any dictionary
  to striim_privs;
grant select on SYSTEM.LOGMNR_COL$ to striim_privs;
grant select on SYSTEM.LOGMNR_OBJ$ to striim_privs;
grant select on SYSTEM.LOGMNR_USER$ to striim_privs;
grant select on SYSTEM.LOGMNR_UID$ to striim_privs;
create user striim identified by ******** default tablespace users;
grant striim_privs to striim;
alter user striim quota unlimited on users;

If using Database Vault, omit execute_catalog_role, and also enter the following commands:

grant execute on SYS.DBMS_LOGMNR to striim_privs;
grant execute on SYS.DBMS_LOGMNR_D to striim_privs;
grant execute on SYS.DBMS_LOGMNR_LOGREP_DICT to striim_privs;
grant execute on SYS.DBMS_LOGMNR_SESSION to striim_privs;

For Oracle 12c, 18c, or 19c, also enter the following command:

grant LOGMINING to striim_privs;

If using Oracle 12c, 18c, or 19c with PDB, enter the following commands. Replace <PDB name> with the name of your PDB.

create role c##striim_privs;
grant create session,
execute_catalog_role,
select any transaction,
select any dictionary,
logmining
to c##striim_privs;
grant select on SYSTEM.LOGMNR_COL$ to c##striim_privs;
grant select on SYSTEM.LOGMNR_OBJ$ to c##striim_privs;
grant select on SYSTEM.LOGMNR_USER$ to c##striim_privs;
grant select on SYSTEM.LOGMNR_UID$ to c##striim_privs;
create user c##striim identified by ******** container=all;
grant c##striim_privs to c##striim container=all;
alter user c##striim set container_data = (cdb$root, <PDB name>) container=current;

If using Database Vault, omit execute_catalog_role, and also enter the following commands:

grant execute on SYS.DBMS_LOGMNR to c##striim_privs;
grant execute on SYS.DBMS_LOGMNR_D to c##striim_privs;
grant execute on SYS.DBMS_LOGMNR_LOGREP_DICT to c##striim_privs;
grant execute on SYS.DBMS_LOGMNR_SESSION to c##striim_privs;
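
To verify the configuration, you can query the data dictionary as sysdba. This is a minimal sketch that assumes the role and user names used above (for the non-CDB case, substitute STRIIM and STRIIM_PRIVS):

-- confirm the role was granted to the user
select granted_role from dba_role_privs where grantee = 'C##STRIIM';
-- confirm the system privileges granted to the role
select privilege from dba_sys_privs where grantee = 'C##STRIIM_PRIVS';
-- for CDB/PDB configurations, confirm the container_data setting for the common user
select * from cdb_container_data where username = 'C##STRIIM';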
Writing Oracle data to Kafka with Striim: quick overview

Striim 3.10.1 can write to Kafka 0.8, 0.9, 0.10, or 0.11.

If your Kafka instance uses SASL authentication or SSL encryption, see Configuring Kafka.

Using the Oracle to Kafka template

Before following these instructions:

  • Striim must be running.

  • The Forwarding Agent on the Oracle host must be connected to Striim.

You will need the following information to complete the wizard:

  • Striim

    • VM user name

    • VM user password

    • Striim cluster name

    • Striim admin password

    • DNS name (displayed on the Essentials tab for the Striim VM)

  • Oracle (source)

    • connection URL in the format <IP address>:<port>:<SID>, for example, 198.51.100.0:1521:orcl

    • login name and password

    • source table names

  • Kafka (target)

    • topic name

    • broker address

    • optionally, any Kafka producer properties required by your environment (see "KafkaWriter" in the "Adapters reference" section of the Striim Programmer's Guide)

  1. Log into the Striim web UI at <DNS name>:9080 using admin as the user name and the Striim admin password.

  2. Click Oracle CDC to Kafka 0.8, Oracle CDC to Kafka 0.9, or Oracle CDC to Kafka 0.10. If using 0.10 or 0.11, see the known issues in Configuring an app template target.

  3. Enter a name for your application (for example, Oracle2Kafka) and a new namespace (do not create applications in the admin namespace), then click Save.

  4. Enter the name for the Oracle source component in the Striim application (for example, OracleSource), the connection URL, user name, and password.

  5. Select LogMiner as the log reader.

  6. Optionally, specify a wildcard string to select the Oracle tables to be read (see the discussion of the Tables property in Oracle Reader properties).

  7. Turn on Deploy source on Agent (this property appears only when the Forwarding Agent is connected to Striim) and click Next.

  8. If Striim's checks show that all properties are valid (this may take a few minutes), click Next.

  9. If you specified a wildcard in the Oracle properties, click Next. Otherwise, select the tables to be read and click Next.

  10. Click Next to skip mapping and filtering (or see Using Advanced Setup to map tables to streams for instructions on using this advanced feature).

  11. Enter the name for the Kafka target component in the Striim application (for example, KafkaTarget).

  12. For Input From, select the only choice. (This is OracleReader's output stream, and its name is <application name>_ChangeDataStream.)

  13. Enter the topic name and the broker address.

  14. Optionally, click Show optional properties and specify any Kafka producer properties required by your environment. Leave Mode set to sync.

  15. Select AvroFormatter and specify its schema file name. This file will be created when the application is deployed (see AVRO Formatter).

  16. Click Save, then click Next. (Click Create Target only if you specified maps or filters and want to create more than one target.)

  17. Striim will create your application and open it in the Flow Designer. It should look something like this:

    (screenshot: the generated application in the Flow Designer)
  18. Select Configuration > App settings, set the recovery interval to 5 seconds, and click Save.

  19. Select Configuration > Export to generate a TQL file. It should contain something like this (the password is encrypted):

    CREATE APPLICATION Oracle2Kafka RECOVERY 5 SECOND INTERVAL;
    
    CREATE SOURCE OracleSource USING OracleReader ( 
      FetchSize: 1,
      Compression: false,
      Username: 'myname',
      Password: '7ip2lhUSP0o=',
      ConnectionURL: '198.51.100.15:1521:orcl',
      DictionaryMode: 'OnlineCatalog',
      ReaderType: 'LogMiner',
      Tables: 'MYSCHEMA.%'
     ) 
    OUTPUT TO OracleSource_ChangeDataStream;
    
    CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.8.0' ( 
      Mode: 'Sync',
      Topic: 'MyTopic',
      brokerAddress: '198.51.100.55:9092'
    ) 
    FORMAT USING AvroFormatter ( schemaFileName: 'MySchema.avro' ) 
    INPUT FROM OracleSource_ChangeDataStream;
    
    END APPLICATION Oracle2Kafka;

Note that FetchSize: 1 is appropriate for development, but should be increased in a production environment. See Oracle Reader properties for more information.

Avro-formatted data in Kafka

Oracle CDC events formatted as Avro and written to Kafka will be similar to the following examples. For more information, see OracleReader WAEvent fields.

SQL*Plus command insert into qatest.vtesttable values(1,'insert1');

    {
        "metadata": {
                "TxnID": "9.5.256818",
                "RbaSqn": "65036",
                "TableSpace": "USERS",
                "CURRENTSCN": "402873649",
                "OperationName": "INSERT",
                "ParentTxnID": "9.5.256818",
                "SegmentType": "TABLE",
                "SessionInfo": "UNKNOWN",
                "TxnUserID": "UNKNOWN",
                "Session": "72",
                "BytesProcessed": "593",
                "TransactionName": "",
                "STARTSCN": "402873649",
                "SegmentName": "VTESTTABLE",
                "SEQUENCE": "1",
                "COMMITSCN": "402873650",
                "RbaBlk": "20376",
                "ThreadID": "1",
                "SCN": "40287364900183060065866895851680000",
                "AuditSessionId": "2064718",
                "ROWID": "AAJb5rAAEAAATL8AAA",
                "TimeStamp": "2017-02-02T04:48:18.000-08:00",
                "Serial": "36570",
                "RecordSetID": " 0x00fe0c.00004f98.0010 ",
                "TableName": "QATEST.VTESTTABLE",
                "OperationType": "DML",
                "SQLRedoLength": "69",
                "Rollback": "0"
        },
        "before": null,
        "data": {
                "ID": "1",
                "NAME": "insert1"
        }
}

SQL*Plus command UPDATE qatest.vtesttable SET NAME='Vino' WHERE ID=10;

    {
        "metadata": {
                "TxnID": "6.18.239971",
                "RbaSqn": "65036",
                "TableSpace": "USERS",
                "CURRENTSCN": "402878537",
                "OperationName": "UPDATE",
                "ParentTxnID": "6.18.239971",
                "SegmentType": "TABLE",
                "SessionInfo": "UNKNOWN",
                "TxnUserID": "UNKNOWN",
                "Session": "72",
                "BytesProcessed": "635",
                "TransactionName": "",
                "STARTSCN": "402878537",
                "SegmentName": "VTESTTABLE",
                "SEQUENCE": "1",
                "COMMITSCN": "402878538",
                "RbaBlk": "38226",
                "ThreadID": "1",
                "SCN": "40287853700183060065878594027680000",
                "AuditSessionId": "2064718",
                "ROWID": "AAJb5rAAEAAATL8AAJ",
                "TimeStamp": "2017-02-02T05:20:54.000-08:00",
                "Serial": "36570",
                "RecordSetID": " 0x00fe0c.00009552.0010 ",
                "TableName": "QATEST.VTESTTABLE",
                "OperationType": "DML",
                "SQLRedoLength": "90",
                "Rollback": "0"
        },
        "before": {
                "ID": "10",
                "NAME": "insert10"
        },
        "data": {
                "ID": "10",
                "NAME": "Vino"
        }
}
Enhancing your Oracle to Kafka application

Once you have your basic Oracle to Kafka application, you may modify it in many ways. The following are a few of the most common:

Including DDL in the output

DDL may be included by changing OracleReader's DictionaryMode property to OfflineCatalog. See Including DDL operations in Oracle Reader output and Handling DDL events when replicating Oracle data.
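
For example, the source from the exported TQL above could be adjusted as follows (a sketch; all other property values are unchanged from the earlier example, only DictionaryMode differs):

CREATE SOURCE OracleSource USING OracleReader ( 
  FetchSize: 1,
  Compression: false,
  Username: 'myname',
  Password: '7ip2lhUSP0o=',
  ConnectionURL: '198.51.100.15:1521:orcl',
  DictionaryMode: 'OfflineCatalog',
  ReaderType: 'LogMiner',
  Tables: 'MYSCHEMA.%'
 ) 
OUTPUT TO OracleSource_ChangeDataStream;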

Using the Forwarding Agent

The Striim Forwarding Agent is a stripped-down version of the Striim server that can be used to run sources and queries locally on the Oracle host. See Striim Forwarding Agent installation and configuration for more information.

To make your application deployable to an agent, split it into two flows:

CREATE APPLICATION Oracle2Kafka RECOVERY 5 SECOND INTERVAL;

CREATE FLOW AgentFlow;
CREATE SOURCE OracleSource USING OracleReader ( 
  FetchSize: 1,
  CommittedTransactions: true,
  Compression: false,
  Username: 'myname',
  Password: '7ip2lhUSP0o=',
  ConnectionURL: '198.51.100.15:1521:orcl',
  DictionaryMode: 'OnlineCatalog',
  ReaderType: 'LogMiner',
  Tables: 'MYSCHEMA.%'
 ) 
OUTPUT TO OracleSource_ChangeDataStream;
END FLOW AgentFlow;

CREATE FLOW ServerFlow;
CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.8.0' ( 
  Mode: 'Sync',
  Topic: 'MyTopic',
  brokerAddress: '198.51.100.55:9092'
) 
FORMAT USING AvroFormatter ( schemaFileName: 'MySchema.avro' ) 
INPUT FROM OracleSource_ChangeDataStream;
END FLOW ServerFlow;

END APPLICATION Oracle2Kafka;

With the default deployment group names, you would deploy this using the command:

DEPLOY APPLICATION Oracle2Kafka with AgentFlow in agent, ServerFlow in default;

See Using the Striim Forwarding Agent and Managing deployment groups for more information.

Scaling up OracleReader

If Oracle writes to its CDC log faster than your template-generated application can read it, you can increase throughput by using multiple instances of OracleReader. Even if deployed on the same server this can increase performance, since each can be run on a different core.

Use the Tables property to distribute tables among the OracleReaders:

  • Assign each table to only one OracleReader.

  • When tables are related (by primary or foreign key) or to ensure transaction integrity among a set of tables, assign them all to the same OracleReader.

  • When dividing tables among OracleReaders, distribute them according to how busy they are rather than simply by the number of tables. For example, if one table generates 50% of the entries in the CDC log, you might assign it and any related tables to one OracleReader and all the other tables to another.

The following is a simple example of how you could modify the basic application example above to use two OracleReaders, using one to read a very busy table and the other to read the rest of the tables in the same schema:

CREATE SOURCE OracleSource1 USING OracleReader ( 
  FetchSize: 1,
  Compression: false,
  Username: 'myname',
  Password: '7ip2lhUSP0o=',
  ConnectionURL: '198.51.100.15:1521:orcl',
  ReaderType: 'LogMiner',
  Tables: 'MYSCHEMA.VERYBUSYTABLE'
) 
OUTPUT TO OracleSource_ChangeDataStream;

CREATE SOURCE OracleSource2 USING OracleReader ( 
  FetchSize: 1,
  CommittedTransactions: true,
  Compression: false,
  Username: 'myname',
  Password: '7ip2lhUSP0o=',
  ConnectionURL: '198.51.100.15:1521:orcl',
  ReaderType: 'LogMiner',
  Tables: 'MYSCHEMA.%',
  ExcludedTables: 'MYSCHEMA.VERYBUSYTABLE'
) 
OUTPUT TO OracleSource_ChangeDataStream;
Writing to multiple Kafka topic partitions

Use KafkaWriter's PartitionKey property to choose a field from the input stream, the METADATA map (see OracleReader WAEvent fields), or the USERDATA map (see Adding user-defined data to WAEvent streams) to be used to distribute events among multiple partitions. For example, assuming the sixth element (counting from zero) in the WAEvent data array is a city name:

CREATE CQ AddUserData
INSERT INTO OracleSourceWithPartitionKey
SELECT putUserData(x, 'city', data[5])
FROM OracleSource_ChangeDataStream x;

CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.8.0' (
  Mode: 'Sync',
  Topic: 'MyTopic',
  brokerAddress: '198.51.100.55:9092',
  PartitionKey: '@userdata(city)'
) INPUT FROM OracleSourceWithPartitionKey;
Scaling up KafkaWriter

If OracleReader sends CDC events faster than KafkaWriter can write them, you can increase throughput by using the ParallelThreads property to create multiple instances of KafkaWriter. Even if deployed on the same server this can increase performance, since each can be run on a different core.

Warning

Use ParallelThreads only when the target is not able to keep up with incoming events (that is, when its input stream is backpressured). Otherwise, the overhead imposed by additional threads could reduce the application's performance.

The following is a simple example of how you could modify the basic application example above to use four KafkaWriters:

CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.8.0' (
  Mode: 'Sync',
  Topic: 'MyTopic',
  brokerAddress: '198.51.100.55:9092',
  ParallelThreads: '4'
) INPUT FROM OracleSource_ChangeDataStream;

All four KafkaWriters will use the same properties. If the application is deployed ON ALL to a deployment group with multiple servers, each server will have four KafkaWriters.

Troubleshooting your Oracle to Kafka application

The following are some issues you might encounter when running your application.

OracleReader does not appear to be capturing data
  1. Verify that the StartSCN value (if specified) is correct.

  2. Verify that the Tables property is correct.

  3. OracleReader will send no data until it has accumulated the number of records specified by the FetchSize value. For testing purposes, it can be helpful to set this to 1 to send each record immediately.

  4. Verify that the transactions are committed. By default, OracleReader sends only committed transactions (since the default for the CommittedTransactions property is True).

  5. In the Striim console, enter mon <OracleReader name>; and look at the Last observed LCR-SCN and Last observed LCR-Timestamp values. Enter the command again: if the values have increased, that means OracleReader is buffering events in its transaction cache, and will write them when the transactions are complete and the FetchSize value has been reached. If most of the events are part of long-running transactions, it will take longer for OracleReader to emit the next set.

KafkaWriter seems slow

To see whether KafkaWriter is keeping up with the output of OracleReader, in the Striim console, enter mon <KafkaWriter name>; and look at the Read Lag value (see Understanding Read Lag values for more information). Wait two minutes and enter the command again. If the value has increased, KafkaWriter is not keeping up with OracleReader. Subtract the first value from the second and make a note of that number.

Create multiple instances of KafkaWriter (see Creating multiple writer instances), then run mon <KafkaWriter name>;, wait two minutes, and run it again. If the value did not increase, KafkaWriter is now keeping up with OracleReader. If the value increased, but by significantly less than it did with a single KafkaWriter, the additional instances are helping; try a higher ParallelThreads value.
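
For example (hypothetical numbers): if Read Lag is 12,000 at the first check and 30,000 two minutes later, the backlog grew by 18,000 events with a single KafkaWriter. If, after creating multiple writer instances, the same two checks show growth of only 2,000, throughput has improved but KafkaWriter is still not quite keeping up, so a higher ParallelThreads value may help.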

If increasing ParallelThreads does not improve performance, investigate whether the issue might be in Kafka rather than in Striim, as detailed in Testing KafkaWriter performance.
