Striim supports Oracle 11g 11.2.0.4, 12c 12.1.0.2, 12.2.0.1.0, 18c, and 19c.
RAC is supported in all versions.
LogMiner is supported in all versions.
XStream Out is supported only for 11g on Linux and requires Oracle patches 6880880 & 21352635.
You may choose either LogMiner or XStream Out as the source for change data.
Features supported by LogMiner but not XStream Out:
reading DDL, ROLLBACK, and uncommitted transactions
using QUIESCE (see Console commands)
Features supported by XStream Out but not LogMiner:
continuous reception of change data events (LogMiner receives events in batches as defined by OracleReader's FetchSize property)
reading from tables containing unsupported types (XStream Out will read the columns of supported types; LogMiner will not read the table at all)
Striim provides templates for creating applications that read from Oracle and write to various targets. See Creating a new application using a template for details.
Using OracleReader requires the following configuration changes in Oracle:
enable archivelog, if not already enabled
enable supplemental log data, if not already enabled
set up either LogMiner or XStream Out
create a user account for OracleReader and grant it the privileges necessary to use LogMiner or XStream Out
The following tasks must be performed regardless of which Oracle version or variation you are using.
The following steps check whether archivelog is enabled and, if not, enable it.
- Log in to SQL*Plus as the sys user.
- Enter the following command:
  select log_mode from v$database;
  If the command returns ARCHIVELOG, archivelog is enabled. Skip ahead to Enabling supplemental log data. If the command returns NOARCHIVELOG, enter:
  shutdown immediate
- Wait for the message ORACLE instance shut down, then enter:
  startup mount
- Wait for the message Database mounted, then enter:
  alter database archivelog;
  alter database open;
- To verify that archivelog has been enabled, enter select log_mode from v$database; again. This time it should return ARCHIVELOG.
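For reference, here is the complete sequence from the steps above when archivelog is not yet enabled (the same commands, consolidated; run in SQL*Plus as the sys user):
select log_mode from v$database;
-- returns NOARCHIVELOG
shutdown immediate
startup mount
alter database archivelog;
alter database open;
select log_mode from v$database;
-- should now return ARCHIVELOG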
The following steps check whether supplemental log data is enabled and, if not, enable it.
- Enter the following command:
  select supplemental_log_data_min, supplemental_log_data_pk from v$database;
  If the command returns YES or IMPLICIT, supplemental log data is already enabled. For example,
  SUPPLEME SUP
  -------- ---
  YES      NO
  indicates that supplemental log data is enabled, but primary key logging is not. If it returns anything else, enter:
  ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
- To enable primary key logging for all tables in the database, enter:
  ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
  Alternatively, to enable primary key logging only for selected tables (do not use this approach if you plan to use wildcards in the OracleReader Tables property to capture change data from new tables):
  ALTER TABLE <schema name>.<table name> ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
- If replicating Oracle data to Redshift, or to BigQuery or Snowflake with Optimized Merge disabled, enable supplemental logging on all columns for all tables in the database:
  ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
  Alternatively, to enable only for selected tables:
  ALTER TABLE <schema>.<table name> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
- To activate your changes, enter:
  alter system switch logfile;
- If using Amazon RDS for Oracle, use the following commands instead:
  exec rdsadmin.rdsadmin_util.alter_supplemental_logging(p_action => 'ADD');
  exec rdsadmin.rdsadmin_util.alter_supplemental_logging(p_action => 'ADD', p_type => 'PRIMARY KEY');
  exec rdsadmin.rdsadmin_util.switch_logfile;
  select supplemental_log_data_min, supplemental_log_data_pk from v$database;
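Whether you used the standard commands or the Amazon RDS procedures, you can confirm the result by re-running the check from the first step; supplemental_log_data_min should now return YES, and supplemental_log_data_pk should return YES if you enabled primary key logging at the database level:
select supplemental_log_data_min, supplemental_log_data_pk from v$database;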
You may use LogMiner with any supported Oracle version.
Log in as sysdba and enter the following commands to create a role with the privileges required by the Striim OracleReader adapter and create a user with that privilege. You may give the role and user any names you like. Replace ********
with a strong password.
If using Oracle 11g, or 12c, 18c, or 19c without CDB, enter the following commands:
create role striim_privs;
grant create session, execute_catalog_role, select any transaction, select any dictionary to striim_privs;
grant select on SYSTEM.LOGMNR_COL$ to striim_privs;
grant select on SYSTEM.LOGMNR_OBJ$ to striim_privs;
grant select on SYSTEM.LOGMNR_USER$ to striim_privs;
grant select on SYSTEM.LOGMNR_UID$ to striim_privs;
create user striim identified by ******** default tablespace users;
grant striim_privs to striim;
alter user striim quota unlimited on users;
If using Database Vault, omit execute_catalog_role,
and also enter the following commands:
grant execute on SYS.DBMS_LOGMNR to striim_privs;
grant execute on SYS.DBMS_LOGMNR_D to striim_privs;
grant execute on SYS.DBMS_LOGMNR_LOGREP_DICT to striim_privs;
grant execute on SYS.DBMS_LOGMNR_SESSION to striim_privs;
For Oracle 12c only, also enter the following command.
grant LOGMINING to striim_privs;
If using Oracle 12c, 18c, or 19c with PDB, enter the following commands. Replace <PDB name>
with the name of your PDB.
create role c##striim_privs;
grant create session, execute_catalog_role, select any transaction, select any dictionary, logmining to c##striim_privs;
grant select on SYSTEM.LOGMNR_COL$ to c##striim_privs;
grant select on SYSTEM.LOGMNR_OBJ$ to c##striim_privs;
grant select on SYSTEM.LOGMNR_USER$ to c##striim_privs;
grant select on SYSTEM.LOGMNR_UID$ to c##striim_privs;
create user c##striim identified by ******* container=all;
grant c##striim_privs to c##striim container=all;
alter user c##striim set container_data = (cdb$root, <PDB name>) container=current;
If using Database Vault, omit execute_catalog_role,
and also enter the following commands:
grant execute on SYS.DBMS_LOGMNR to c##striim_privs;
grant execute on SYS.DBMS_LOGMNR_D to c##striim_privs;
grant execute on SYS.DBMS_LOGMNR_LOGREP_DICT to c##striim_privs;
grant execute on SYS.DBMS_LOGMNR_SESSION to c##striim_privs;
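Optionally, as a quick sanity check (a sketch assuming the non-CDB user name striim created above; the select any dictionary privilege makes v$database readable), confirm that the new account can log in and query the data dictionary:
connect striim/********
select name, log_mode from v$database;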
XStream Out is supported only for Oracle 11.2.0.4 with patches 6880880 and 21352635.
Log in as sysdba and enter the following commands to create a user with the privileges required by the Striim OracleReader adapter and a tablespace to store objects and messages related to XStream administration. The path to the oradata directory in the DATAFILE property may vary depending on your installation. Replace ******** with a strong password.
CREATE TABLESPACE striim_xstream_tbs DATAFILE '/home/oracle/app/oracle/oradata/orcl/striim_xstream_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER striim IDENTIFIED BY ******** DEFAULT TABLESPACE striim_xstream_tbs
  QUOTA UNLIMITED ON striim_xstream_tbs;
exec dbms_xstream_auth.grant_admin_privilege('striim', 'CAPTURE', true);
GRANT CREATE SESSION TO striim;
GRANT SELECT ANY TABLE TO striim;
To verify that the user has been created correctly, enter the following command:
select * from dba_streams_administrator where USERNAME='STRIIM';
You should see the following:
USERNAME          LOC ACC
----------------- --- ---
STRIIM            YES YES
When you use XStream Out in a Striim application, Striim will create the specified stream in Oracle automatically.
To allow Striim to quiesce (see Console commands) an application that uses OracleReader, you must create a quiescemarker table in Oracle. (This is not necessary when Reading from a standby.)
The DDL for creating the table is:
CREATE TABLE QUIESCEMARKER (
  source varchar2(100),
  status varchar2(100),
  sequence NUMBER(10),
  inittime timestamp,
  updatetime timestamp default sysdate,
  approvedtime timestamp,
  reason varchar2(100),
  CONSTRAINT quiesce_marker_pk PRIMARY KEY (source, sequence));
The Oracle user specified in OracleReader's Username property must have SELECT, INSERT, and UPDATE privileges on this table.
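For example, if the quiescemarker table was created in a schema other than the OracleReader user's, you could grant those privileges as follows (a sketch assuming the table owner is QUIESCEOWNER and the OracleReader user is striim; substitute your own names):
GRANT SELECT, INSERT, UPDATE ON QUIESCEOWNER.QUIESCEMARKER TO striim;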
Note
QUIESCE is not supported with XStream Out, only with LogMiner.
Oracle Reader can read from a standby rather than a primary database.
Requirements
Create an Active Data Guard standby. (Striim cannot read from a regular Data Guard standby.)
Open the standby in read-only mode.
On the primary, perform all steps in Basic Oracle configuration tasks and Creating an Oracle user with LogMiner privileges. No quiescemarker table is required when reading from a standby.
Limitations
Bidirectional replication is not supported.
Oracle Reader will reject QUIESCE if there are any open transactions.
Create the dictionary file
- On the primary, use SQL*Plus or another client to create a dictionary file.
  For Oracle 11g or 12.1.0.2, enter the following commands, replacing <path> in the second command with the path returned by the first command. If the first command does not return a path, you must set UTL_FILE_DIR.
  show parameter utl_file_dir;
  execute dbms_logmnr_d.build('dict.ora', '<path>');
  For Oracle 12.2.0.1.0 or later, enter the following commands:
  CREATE DIRECTORY "dictionary_directory" AS '/opt/oracle/dictionary';
  EXECUTE dbms_logmnr_d.build(dictionary_location=>'dictionary_directory', dictionary_filename=>'dict.ora', options => dbms_logmnr_d.store_in_flat_file);
- Copy dict.ora to a directory on the standby.
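For example, you might copy the file with scp (a sketch assuming the 12.2+ dictionary location created above and a standby host reachable as oracle@standby; any file transfer method will do):
scp /opt/oracle/dictionary/dict.ora oracle@standby:/home/oracle/dict.ora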
Configure Oracle Reader properties in your application
- Set Dictionary Mode to ExternalDictionaryFileCatalog.
- Set Database Role to PHYSICAL_STANDBY.
- Set External Dictionary File to the fully qualified name of the dictionary file you copied to the standby, for example, /home/oracle/dict.ora.
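Putting these together, the source portion of a TQL application reading from a standby might look like the following sketch (the TQL property names without spaces are assumed from the UI labels above; the credentials, connection URL, and table wildcard are placeholders):
CREATE SOURCE OracleStandbyIn USING OracleReader (
  Username: 'striim',
  Password: '********',
  ConnectionURL: '<standby host>:1521:<SID>',
  Tables: 'MYSCHEMA.%',
  DictionaryMode: 'ExternalDictionaryFileCatalog',
  DatabaseRole: 'PHYSICAL_STANDBY',
  ExternalDictionaryFile: '/home/oracle/dict.ora'
) OUTPUT TO OracleStandbyStream;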
Handling DDL changes
When DDL changes must be made to the tables being read by Oracle Reader, do the following:
On the primary, stop DML activity and make sure there are no open transactions.
On the primary, force a log switch.
In Striim, quiesce the application (see Console commands). If Oracle Reader refuses the quiesce, wait a few minutes and try again.
On the primary, perform the DDL changes.
Repeat the procedure in "Create the dictionary file," above, replacing the old file on the standby with the new one.
Start the Striim application.
Before you can use this adapter, Oracle must be configured as described in the parts of Oracle configuration that are relevant to your environment.
Note
The Oracle JDBC driver must be installed on each Forwarding Agent that will run Oracle Reader with LogMiner. See Installing the Oracle JDBC driver.
Note
The Oracle Instant Client must be installed on each Striim server or Forwarding Agent that will run Oracle Reader using XStream Out. See Installing the Oracle Instant Client. XStream Out is supported only when Striim is running on Linux.
Striim provides templates for creating applications that read from Oracle and write to various targets. See Creating a new application using a template for details.
The adapter properties are:
property |
type |
default value |
notes |
---|---|---|---|
Bidirectional Marker Table |
String |
When performing bidirectional replication, the fully qualified name of the marker table (see Bidirectional replication). This setting is case-sensitive. |
|
Committed Transactions |
Boolean |
True |
LogMiner only: by default, only committed transactions are read. Set to False to read both committed and uncommitted transactions. |
Compression |
Boolean |
False |
If set to True, update operations for tables that have primary keys include only the primary key and modified columns, and delete operations include only the primary key. With the default value of False, all columns are included. See Oracle Reader example output for examples. Set to True when the output of an OracleReader source is the input of a DatabaseWriter writing to a Cassandra target (see Cassandra Writer). |
Connection Retry Policy |
String |
timeOut=30, retryInterval=30, maxRetries=3 |
With the default setting, the connection timeout is 30 seconds (timeOut), Striim waits 30 seconds between connection attempts (retryInterval), and makes no more than three retry attempts (maxRetries).
Negative values are not supported. |
Connection URL |
String |
If using Oracle 12c with PDB, use the SID for the CDB service. (Note that with DatabaseReader and DatabaseWriter, you must use the SID for the PDB service instead.) If using Amazon RDS for Oracle, the connection URL is |
|
Database Role |
String |
PRIMARY |
Leave set to the default value of PRIMARY except when reading from a standby (see Reading from a standby). |
DDL Capture Mode |
String |
All |
With the default value of All, OracleReader captures all supported DDL operations (see Including DDL operations in Oracle Reader output). Set to Minimal to capture only DDL operations for tables. |
Dictionary Mode |
String |
OnlineCatalog |
Leave set to the default of OnlineCatalog except when Including DDL operations in Oracle Reader output or when reading from a standby (see Reading from a standby). |
Excluded Tables |
String |
If a wildcard is specified for Tables, any tables specified here will be excluded. |
|
Exclude Users |
String |
Optionally, specify one or more Oracle user names, separated by semicolons, whose transactions will be omitted from OracleReader output. Possible uses include:
|
|
External Dictionary File |
String |
Leave blank except when reading from a standby (see Reading from a standby). |
|
Fetch Size |
Integer |
1000 |
LogMiner only: the number of records the JDBC driver will return at a time. For example, if Oracle Reader queries LogMiner and there are 2300 records available, the JDBC driver will return two batches of 1000 records and one batch of 300. |
Filter Transaction Boundaries |
Boolean |
True |
With the default value of True, BEGIN and COMMIT operations are filtered out. Set to False to include BEGIN and COMMIT operations. |
Outbound Server Process Name |
String |
WebAction XStream |
XStream only: Specifies the name Oracle will use for the internal process it creates to send change data to Striim. Each source or cache using XStream on the same Oracle database must have a unique value for this property. If you have only one source or cache using XStream, there is no need to change the default value. See "The Outbound Server" in Oracle's XStream documentation for more information. |
Password |
encrypted password |
the password specified for the username (see Encrypted passwords) |
|
Queue Size |
Integer |
|
|
Quiesce Marker Table |
String |
QUIESCEMARKER |
See Creating the quiescemarker table. Modify the default value if the quiesce marker table is not in the schema associated with the user specified in the Username property. Three-part CDB / PDB names are not supported in this release. |
Reader Type |
String |
LogMiner |
specify LogMiner or XStream |
Send Before Image |
Boolean |
True |
Set to False to omit before images from the output (the before values will be null). |
SSL Config |
String |
If using SSL with the Oracle JDBC driver, specify the required properties. Examples: If using SSL for encryption only: oracle.net.ssl_cipher_suites= (SSL_DH_anon_WITH_3DES_EDE_CBC_SHA, SSL_DH_anon_WITH_RC4_128_MD5, SSL_DH_anon_WITH_DES_CBC_SHA) If using SSL for encryption and server authentication: javax.net.ssl.trustStore= /etc/oracle/wallets/ewallet.p12; javax.net.ssl.trustStoreType=PKCS12; javax.net.ssl.trustStorePassword=******** If using SSL for encryption and both server and client authentication: javax.net.ssl.trustStore= /etc/oracle/wallets/ewallet.p12; javax.net.ssl.trustStoreType=PKCS12; javax.net.ssl.trustStorePassword=********; javax.net.ssl.keyStore=/opt/Striim/certs; javax.net.ssl.keyStoreType=JKS; javax.net.ssl.keyStorePassword=******** |
|
Start SCN |
String |
Optionally specify an SCN from which to start reading (See Replicating Oracle data to another Oracle database for an example). See also Switching from initial load to continuous replication. |
|
Start Timestamp |
String |
null |
With the default value of null, only new (based on current system time) transactions are read. If a timestamp is specified, transactions that began after that time are also read. The format is DD-MON-YYYY HH:MI:SS. For example, to start at 5:00 pm on July 15, 2017, specify 15-JUL-2017 17:00:00. |
Support PDB and CDB |
Boolean |
False |
Set to True if reading from CDB or PDB. |
Tables |
String |
The table or materialized view to be read (supplemental logging must be enabled as described in Oracle configuration) in the format <schema>.<table>. (If using Oracle 12c with PDB, use three-part names: <pdb>.<schema>.<table>.) Names are case-sensitive. You may specify multiple tables and materialized views as a list separated by semicolons or with the following wildcards:
For example, HR.% would read all tables in the HR schema. Wildcards may not be used with XStream Out when Compression is set to True. For a literal underscore, use See also Specifying key columns for tables without a primary key. |
|
Transaction Buffer Disk Location |
String |
striim/LargeBuffer |
See Transaction Buffer Type. |
Transaction Buffer Spillover Size |
String |
1MB |
When Transaction Buffer Type is Memory, this setting has no effect. When Transaction Buffer Type is Disk, the amount of memory that Striim will use to hold each in-process transaction before buffering it to disk. You may specify the size in MB or GB. |
Transaction Buffer Type |
String |
Memory |
With the default setting, when Striim is processing more events than it can store in the available Java heap space, the application will crash. Typically this will happen when a transaction includes millions of INSERT, UPDATE, or DELETE events with a single COMMIT, at which point the application will crash with an error message such as "increase the block size of large buffer" or "exceeded heap usage threshold." To work around that problem, set to Disk. Striim will buffer large transactions to disk at the location specified by the Transaction Buffer Disk Location property, then process them when memory is available. When the setting is Disk and recovery is enabled (see Recovering applications), after the application crashes or is stopped the buffer will be reset, and during recovery any previously buffered transactions will restart from the beginning. |
Username |
String |
the username created as described in Oracle configuration; if using Oracle 12c with PDB, specify the CDB user (c##striim) |
|
XStream Timeout |
String |
600 |
number of seconds OracleReader will wait for the outbound server to start before failing |
The output data type for OracleReader is WAEvent.
metadata: for DML operations, the most commonly used elements are:
OperationName: COMMIT, BEGIN, INSERT, DELETE, UPDATE, or (when using LogMiner only) ROLLBACK
TxnID: transaction ID
TimeStamp: timestamp from the CDC log
TableName (returned only for INSERT, DELETE, and UPDATE operations): fully qualified name of the table
ROWID (returned only for INSERT, DELETE, and UPDATE operations): the Oracle ID for the inserted, deleted, or updated row
To retrieve the values for these elements, use the META
function. See Parsing the fields of WAEvent for CDC readers.
data: for DML operations, an array of fields, numbered from 0, containing:
for an INSERT or DELETE operation, the values that were inserted or deleted
for an UPDATE, the values after the operation was completed
To retrieve the values for these fields, use SELECT ... (DATA[])
. See Parsing the fields of WAEvent for CDC readers.
before (for UPDATE operations only): the same format as data, but containing the values as they were prior to the UPDATE operation
dataPresenceBitMap, beforePresenceBitMap, and typeUUID are reserved and should be ignored.
The following is a complete list of fields that may appear in metadata. The actual fields will vary depending on the operation type and other factors.
metadata property |
present when using LogMiner |
present when using XStream Out |
comments |
---|---|---|---|
AuditSessionID |
x |
Audit session ID associated with the user session making the change |
|
BytesProcessed |
x |
||
CatalogObjectName |
x |
||
CatalogObjectType |
x |
||
ColumnIsKey |
x |
||
ColumnLength |
x |
||
ColumnIsNullable |
x |
||
ColumnMetadata |
x |
||
ColumnName |
x |
||
ColumnPrecision |
x |
||
COMMIT_TIMESTAMP |
x |
x |
the UNIX epoch time the transaction was committed, based on the Striim server's time zone |
COMMITSCN |
x |
x |
system change number (SCN) when the transaction committed |
DBCommitTimestamp |
x |
the UNIX epoch time the transaction was committed, based on the Oracle server's time zone |
|
DBTimestamp |
x |
the UNIX epoch time of the operation, based on the Oracle server's time zone |
|
ObjectName |
x |
||
ObjectType |
x |
||
OperationName |
x |
x |
user-level SQL operation that made the change (INSERT, UPDATE, ...) |
OperationSubType |
x |
||
OperationType |
x |
x |
the Oracle operation type (COMMIT, DDL, DELETE, INSERT, INTERNAL, LOB_ERASE, LOB_TRIM, LOB_WRITE, MISSING_SCN, ROLLBACK, SELECT_FOR_UPDATE, SELECT_LOB_LOCATOR, START, UNSUPPORTED, or UPDATE) |
OwnerName |
x |
||
ParentTxnID |
x |
raw representation of the parent transaction identifier |
|
PK_UPDATE |
x |
true if an UPDATE operation changed the primary key, otherwise false |
|
RbaBlk |
x |
RBA block number within the log file |
|
RbaSqn |
x |
sequence# associated with the Redo Block Address (RBA) of the redo record associated with the change |
|
RecordSetID |
x |
Uniquely identifies the redo record that generated the row. The tuple (RS_ID, SSN) together uniquely identifies a logical row change. |
|
RollBack |
x |
1 if the record was generated because of a partial or a full rollback of the associated transaction, otherwise 0 |
|
ROWID |
x |
x |
Row ID of the row modified by the change (only meaningful if the change pertains to a DML). This will be NULL if the redo record is not associated with a DML. |
SCN |
x |
x |
system change number (SCN) when the database change was made |
SegmentName |
x |
name of the modified data segment |
|
SegmentType |
x |
type of the modified data segment (INDEX, TABLE, ...) |
|
Serial |
x |
serial number of the session that made the change |
|
Session |
x |
session number of the session that made the change |
|
SessionInfo |
x |
Information about the database session that executed the transaction. Contains process information, machine name from which the user logged in, client info, and so on. |
|
SQLRedoLength |
x |
length of reconstructed SQL statement that is equivalent to the original SQL statement that made the change |
|
TableMetadata |
x |
||
TableName |
x |
x |
name of the modified table (in case the redo pertains to a table modification) |
TableSpace |
x |
name of the tablespace containing the modified data segment. |
|
ThreadID |
x |
ID of the thread that made the change to the database |
|
TimeStamp |
x |
x |
the UNIX epoch time of the operation, based on the Striim server's time zone |
TransactionName |
x |
x |
name of the transaction that made the change (only meaningful if the transaction is a named transaction) |
TxnID |
x |
x |
raw representation of the transaction identifier |
TxnUserID |
x |
x |
The following application will write change data for all tables in myschema
to SysOut. Replace the Username and Password values with the credentials for the account you created for Striim for use with LogMiner (see Configuring Oracle LogMiner) and myschema with the name of the schema containing the tables to be read.
CREATE APPLICATION OracleLMTest;
CREATE SOURCE OracleCDCIn USING OracleReader (
  Username:'striim',
  Password:'passwd',
  ConnectionURL:'203.0.113.49:1521:orcl',
  Tables:'myschema.%',
  FetchSize:1
) OUTPUT TO OracleCDCStream;
CREATE TARGET OracleCDCOut USING SysOut(name:OracleCDCLM) INPUT FROM OracleCDCStream;
END APPLICATION OracleLMTest;
Alternatively, you may specify a single table, such as myschema.mytable
. See the discussion of Tables in Oracle Reader properties for additional examples of using wildcards to select a set of tables.
If you are using XStream Out, specify the ReaderType, and use the credentials for the account you created for Striim for use with XStream (see Configuring Oracle XStream Out):
CREATE SOURCE OracleCDCIn USING OracleReader (
  Username:'striim',
  Password:'passwd',
  ConnectionURL:'203.0.113.49:1523:orcl',
  Tables:'myschema.%',
  ReaderType:XStream
) OUTPUT TO OracleCDCStream;
The FetchSize
property is for LogMiner only.
When troubleshooting problems, you can get the current LogMiner SCN and timestamp by entering mon <namespace>.<OracleReader source name>;
in the Striim console.
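For example, for the OracleCDCIn source above deployed in a namespace named myns (a placeholder name), you would enter:
mon myns.OracleCDCIn;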
OracleReader's output type is WAEvent. See WAEvent contents for change data for general information.
The following are examples of WAEvents emitted by OracleReader for various operation types. Note that many of the metadata values (see OracleReader WAEvent fields) are dependent on the Oracle environment and thus will vary from the examples below.
The examples all use the following table:
CREATE TABLE POSAUTHORIZATIONS (
  BUSINESS_NAME varchar2(30),
  MERCHANT_ID varchar2(100),
  PRIMARY_ACCOUNT NUMBER,
  POS NUMBER,
  CODE varchar2(20),
  EXP char(4),
  CURRENCY_CODE char(3),
  AUTH_AMOUNT number(10,3),
  TERMINAL_ID NUMBER,
  ZIP number,
  CITY varchar2(20),
  PRIMARY KEY (MERCHANT_ID));
COMMIT;
If you performed the following INSERT on the table:
INSERT INTO POSAUTHORIZATIONS VALUES( 'COMPANY 1', 'D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu', 6705362103919221351, 0, '20130309113025', '0916', 'USD', 2.20, 5150279519809946, 41363, 'Quicksand'); COMMIT;
Using LogMiner, the WAEvent for that INSERT would be similar to:
data: ["COMPANY 1","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu","6705362103919221351","0","20130309113025", "0916","USD","2.2","5150279519809946","41363","Quicksand"] metadata: "RbaSqn":"21","AuditSessionId":"4294967295","TableSpace":"USERS","CURRENTSCN":"726174", "SQLRedoLength":"325","BytesProcessed":"782","ParentTxnID":"8.16.463","SessionInfo":"UNKNOWN", "RecordSetID":" 0x000015.00000310.0010 ","DBCommitTimestamp":"1553126439000","COMMITSCN":726175, "SEQUENCE":"1","Rollback":"0","STARTSCN":"726174","SegmentName":"POSAUTHORIZATIONS", "OperationName":"INSERT","TimeStamp":1553151639000,"TxnUserID":"SYS","RbaBlk":"784", "SegmentType":"TABLE","TableName":"SCOTT.POSAUTHORIZATIONS","TxnID":"8.16.463","Serial":"201", "ThreadID":"1","COMMIT_TIMESTAMP":1553151639000,"OperationType":"DML","ROWID":"AAAE9mAAEAAAAHrAAB", "DBTimeStamp":"1553126439000","TransactionName":"","SCN":"72617400000059109745623040160001", "Session":"105"} before: null
If you performed the following UPDATE on the table:
UPDATE POSAUTHORIZATIONS SET BUSINESS_NAME = 'COMPANY 5A' where pos=0; COMMIT;
Using LogMiner with the default setting Compression: false
, the WAEvent for that UPDATE for the row created by the INSERT above would be similar to:
data: ["COMPANY 5A","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null, null,null,null,null,null,null] metadata: "RbaSqn":"21","AuditSessionId":"4294967295","TableSpace":"USERS","CURRENTSCN":"726177"," SQLRedoLength":"164","BytesProcessed":"729","ParentTxnID":"2.5.451","SessionInfo":"UNKNOWN", "RecordSetID":" 0x000015.00000313.0010 ","DBCommitTimestamp":"1553126439000","COMMITSCN":726178, "SEQUENCE":"1","Rollback":"0","STARTSCN":"726177","SegmentName":"POSAUTHORIZATIONS", "OperationName":"UPDATE","TimeStamp":1553151639000,"TxnUserID":"SYS","RbaBlk":"787", "SegmentType":"TABLE","TableName":"SCOTT.POSAUTHORIZATIONS","TxnID":"2.5.451","Serial":"201", "ThreadID":"1","COMMIT_TIMESTAMP":1553151639000,"OperationType":"DML","ROWID":"AAAE9mAAEAAAAHrAAB", "DBTimeStamp":"1553126439000","TransactionName":"","SCN":"72617700000059109745625006240000", "Session":"105"} before: ["COMPANY 1","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null,null,null,null,null, null,null]
Note that when using LogMiner the before
section contains a value only for the modified column. You may use the IS_PRESENT() function to check whether a particular field value has a value (see Parsing the fields of WAEvent for CDC readers). With Compression: true
, only the primary key is included in the before
array:
before: [null,"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null, null,null,null,null,null,null]
In all cases, if OracleReader's SendBeforeImage
property is set to False
, the before
value will be null
.
If you performed the following DELETE on the table:
DELETE from POSAUTHORIZATIONS where pos=0; COMMIT;
Using LogMiner with the default setting Compression: false
, the WAEvent for a DELETE for the row affected by the UPDATE above would be:
data: ["COMPANY 5A","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu","6705362103919221351","0","20130309113025", "0916","USD","2.2","5150279519809946","41363","Quicksand"] metadata: "RbaSqn":"21","AuditSessionId":"4294967295","TableSpace":"USERS","CURRENTSCN":"726180", "SQLRedoLength":"384","BytesProcessed":"803","ParentTxnID":"3.29.501","SessionInfo":"UNKNOWN", "RecordSetID":" 0x000015.00000315.0010 ","DBCommitTimestamp":"1553126439000","COMMITSCN":726181, "SEQUENCE":"1","Rollback":"0","STARTSCN":"726180","SegmentName":"POSAUTHORIZATIONS", "OperationName":"DELETE","TimeStamp":1553151639000,"TxnUserID":"SYS","RbaBlk":"789", "SegmentType":"TABLE","TableName":"SCOTT.POSAUTHORIZATIONS","TxnID":"3.29.501","Serial":"201", "ThreadID":"1","COMMIT_TIMESTAMP":1553151639000,"OperationType":"DML","ROWID":"AAAE9mAAEAAAAHrAAB", "DBTimeStamp":"1553126439000","TransactionName":"","SCN":"72618000000059109745626316960000", "Session":"105"} before: null
With Compression: true
, the data
array would be:
data: [null,"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null,null,null,null,null,null,null]
Note that the contents of data
and before
are reversed from what you might expect for a DELETE operation. This simplifies programming since you can get data for INSERT, UPDATE, and DELETE operations using only the data
field.
Reading DDL transactions requires Oracle 18c or earlier and LogMiner. It is enabled by setting OracleReader's DictionaryMode property to OfflineCatalog.
DDL events are output only for those tables, functions, indexes, packages, procedures, and views specified in OracleReader's Tables property value. Wildcards may be used, in which case DDL for new objects with names that match the wildcard pattern will be included in the output. See Handling DDL events when replicating Oracle data for a list of supported targets. Bidirectional replication of DDL is not supported.
Reading from tables containing XMLType columns is not supported when DictionaryMode is OfflineCatalog.
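For example, the only change from the earlier LogMiner example needed to capture DDL is the DictionaryMode setting (a sketch; the credentials, connection URL, and table wildcard are placeholders):
CREATE SOURCE OracleDDLIn USING OracleReader (
  Username: 'striim',
  Password: 'passwd',
  ConnectionURL: '203.0.113.49:1521:orcl',
  Tables: 'myschema.%',
  DictionaryMode: 'OfflineCatalog'
) OUTPUT TO OracleDDLStream;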
Supported DDL operations
The following DDL operations will be read for those tables, functions, indexes, packages, procedures, and views specified in OracleReader's Tables property value:
operation |
function |
index |
package |
procedure |
table |
view |
---|---|---|---|---|---|---|
ALTER ... ADD <column> |
x |
|||||
ALTER ... ADD PRIMARY KEY |
x |
|||||
ALTER ... DROP <column> |
x |
|||||
ALTER ... MODIFY <column> <type> |
x |
|||||
ALTER ... RENAME |
x |
x |
x |
|||
ALTER ... RENAME <column> |
x |
|||||
CREATE |
x |
x |
x |
x |
x |
x |
CREATE OR REPLACE |
x |
x |
x |
x |
||
DROP |
x |
x |
x |
x |
x |
x |
TRUNCATE |
x |
In addition, the following operations are passed through to the target:
ANALYZE TABLE <table> VALIDATE STRUCTURE
GRANT <privilege(s)> ON <table> ...
REVOKE <privilege(s)> ON <table> ...
ALTER TABLE ... DROP / ENABLE / DISABLE <constraint name> and operations on materialized views are not supported. If OracleReader encounters unsupported DDL (such as CREATE, ALTER, or DROP on a user, role, or trigger), it will write a warning to the log and continue.
For DML operations, see OracleReader WAEvent fields.
The output data type for OracleReader is WAEvent. For DDL operations, the fields are:
data: contains the command string, for example, CREATE TABLE EMP (EMPID INT PRIMARY KEY, ENAME VARCHAR2(10))
or DROP TABLE EMP
.
metadata: includes the following fields:
CatalogName: reserved
CatalogObjectType: TABLE / VIEW / INDEX / etc.
ColumnMetadata: a sub-field of TableMetadata containing descriptions of the columns
COMMITSCN: the SCN of the commit for the transaction
CURRENTSCN: the SCN of the current statement
ObjectName: the name of the table / view / index / etc. being acted on or, in the case of a RENAME, the new name
OperationName: the first keyword in the DDL command
OperationSubName: any additional keywords in the DDL command besides the first
OperationType: value for DDL operations is always
DDL
OwnerName: owner of the object being acted on
SchemaName: schema containing the object being acted on
SCN: reserved
STARTSCN: the SCN of the start of the transaction
TableMetadata: description of the table, including sub-field
TableName: the name of the table (omitted in metadata for non-table operations)
TimeStamp: timestamp from the CDC log
TxnID: transaction ID
Example WAEvent values for supported DDL commands
DDL command |
example WAEvent values |
---|---|
CREATE TABLE ... |
data: CREATE TABLE TEST ( EMPID INT PRIMARY KEY, ENAME VARCHAR2(10)) metadata: { "OperationSubName": "CREATE_TABLE", "TxnID": "8.15.261430", "TimeStamp": "2017-02-10T17:46:19.000-08:00", "COMMITSCN": "403795721", "CatalogName": null, "CURRENTSCN": "403795718", "STARTSCN": "403795694", "ObjectName": "TEST", "OperationName": "CREATE", "SCN": "403795718", "OperationType": "DDL", "TableMetadata": { "ColumnMetadata": [ { "ColumnPrecision": 0, "ColumnIsKey": true, "ColumnLength": 0, "ColumnType": "NUMBER", "ColumnScale": 0, "ColumnName": "EMPID", "ColumnIsNullable": true, "ColumnIndex": 0 }, { "ColumnPrecision": 0, "ColumnIsKey": false, "ColumnLength": 10, "ColumnType": "VARCHAR2", "ColumnScale": 0, "ColumnName": "ENAME", "ColumnIsNullable": true, "ColumnIndex": 1 } ], "TableName": "TEST", "OwnerName": "STRIIM" }, "SchemaName": "STRIIM", "CatalogObjectType": "TABLE" } |
ALTER TABLE ... ADD (<name> <data type>) |
data: ALTER TABLE TEST ADD ( SALARY NUMBER) metadata: { "OperationSubName": "ALTER_TABLE_ADD_COLUMN", "TxnID": "4.12.411190", "TimeStamp": "2017-02-10T17:46:30.000-08:00", "COMMITSCN": "403795759", "CatalogName": null, "CURRENTSCN": "403795755", "STARTSCN": "403795749", "ObjectName": "TEST", "OperationName": "ALTER", "SCN": "403795755", "OperationType": "DDL", "TableMetadata": { "ColumnMetadata": [ { "ColumnPrecision": 0, "ColumnIsKey": true, "ColumnLength": 0, "ColumnType": "NUMBER", "ColumnScale": 0, "ColumnName": "EMPID", "ColumnIsNullable": true, "ColumnIndex": 0 }, { "ColumnPrecision": 0, "ColumnIsKey": false, "ColumnLength": 10, "ColumnType": "VARCHAR2", "ColumnScale": 0, "ColumnName": "ENAME", "ColumnIsNullable": true, "ColumnIndex": 1 }, { "ColumnPrecision": 0, "ColumnIsKey": false, "ColumnLength": 0, "ColumnType": "NUMBER", "ColumnScale": 0, "ColumnName": "SALARY", "ColumnIsNullable": true, "ColumnIndex": 2 } ], "TableName": "TEST", "OwnerName": "STRIIM" }, "SchemaName": "STRIIM", "CatalogObjectType": "TABLE" } |
ALTER TABLE ... DROP COLUMN ... |
data: ALTER TABLE TEST DROP (SALARY) metadata: "OperationSubName": "ALTER_TABLE_DROP_COLUMN", "TxnID": "10.1.258799", "TimeStamp": "2017-02-11T09:59:51.000-08:00", "COMMITSCN": "403888603", "CatalogName": null, "CURRENTSCN": "403888599", "STARTSCN": "403888593", "ObjectName": "TEST", "OperationName": "ALTER", "SCN": "403888599", "OperationType": "DDL", "TableMetadata": { "ColumnMetadata": [ { "ColumnPrecision": 0, "ColumnIsKey": false, "ColumnLength": 0, "ColumnType": "NUMBER", "ColumnScale": 0, "ColumnName": "EMPID", "ColumnIsNullable": true, "ColumnIndex": 0 }, { "ColumnPrecision": 0, "ColumnIsKey": false, "ColumnLength": 10, "ColumnType": "VARCHAR2", "ColumnScale": 0, "ColumnName": "ENAME", "ColumnIsNullable": true, "ColumnIndex": 1 } ], "TableName": "TEST", "OwnerName": "STRIIM" }, "SchemaName": "STRIIM", "CatalogObjectType": "TABLE" } |
ALTER TABLE ... MODIFY (<column> ... |
data: ALTER TABLE TEST MODIFY SALARY INT metadata: { "OperationSubName": "ALTER_TABLE_MODIFY_COLUMN", "TxnID": "2.12.258663", "TimeStamp": "2017-02-11T10:02:20.000-08:00", "COMMITSCN": "403896354", "CatalogName": null, "CURRENTSCN": "403896349", "STARTSCN": "403896343", "ObjectName": "TEST", "OperationName": "ALTER", "SCN": "403896349", "OperationType": "DDL", "TableMetadata": { "ColumnMetadata": [ { "ColumnPrecision": 0, "ColumnIsKey": true, "ColumnLength": 0, "ColumnType": "NUMBER", "ColumnScale": 0, "ColumnName": "EMPID", "ColumnIsNullable": true, "ColumnIndex": 0 }, { "ColumnPrecision": 0, "ColumnIsKey": false, "ColumnLength": 10, "ColumnType": "VARCHAR2", "ColumnScale": 0, "ColumnName": "ENAME", "ColumnIsNullable": true, "ColumnIndex": 1 }, { "ColumnPrecision": 0, "ColumnIsKey": false, "ColumnLength": 0, "ColumnType": "NUMBER", "ColumnScale": 0, "ColumnName": "SALARY", "ColumnIsNullable": true, "ColumnIndex": 2 } ], "TableName": "TEST", "OwnerName": "STRIIM" }, "SchemaName": "STRIIM", "CatalogObjectType": "TABLE" } |
ALTER ... RENAME ... |
data: ALTER TABLE TEST RENAME TO CUSTOMER metadata: { "OperationSubName": "ALTER_TABLE_RENAME", "TxnID": "10.3.258647", "TimeStamp": "2017-02-10T17:53:31.000-08:00", "COMMITSCN": "403796279", "CatalogName": null, "CURRENTSCN": "403796276", "STARTSCN": "403796267", "ObjectName": "CUSTOMER", "OperationName": "ALTER", "SCN": "403796276", "OperationType": "DDL", "TableMetadata": { "ColumnMetadata": [ { "ColumnPrecision": 0, "ColumnIsKey": false, "ColumnLength": 0, "ColumnType": "NUMBER", "ColumnScale": 0, "ColumnName": "EMPID", "ColumnIsNullable": true, "ColumnIndex": 0 }, { "ColumnPrecision": 0, "ColumnIsKey": false, "ColumnLength": 10, "ColumnType": "VARCHAR2", "ColumnScale": 0, "ColumnName": "ENAME", "ColumnIsNullable": true, "ColumnIndex": 1 }, { "ColumnPrecision": 0, "ColumnIsKey": false, "ColumnLength": 0, "ColumnType": "NUMBER", "ColumnScale": 0, "ColumnName": "SALARY", "ColumnIsNullable": true, "ColumnIndex": 2 } ], "TableName": "CUSTOMER", "OldTblName": "STRIIM.TEST", "OwnerName": "STRIIM" }, "SchemaName": "STRIIM", "CatalogObjectType": "TABLE" } |
DROP ... |
data: DROP TABLE TEST metadata: { "OperationSubName": "DROP_TABLE", "TxnID": "8.18.261385", "TimeStamp": "2017-02-10T17:48:29.000-08:00", "COMMITSCN": "403795874", "CatalogName": null, "CURRENTSCN": "403795870", "STARTSCN": "403795837", "ObjectName": "TEST", "OperationName": "DROP", "SCN": "403795870", "OperationType": "DDL", "SchemaName": "STRIIM", "CatalogObjectType": "TABLE" } |
CREATE UNIQUE INDEX ... |
data: CREATE UNIQUE INDEX TESTINDEX ON TEST (EMPID, ENAME) metadata: { "OperationSubName": "CREATE_INDEX", "TxnID": "5.21.258597", "TimeStamp": "2017-02-10T17:55:57.000-08:00", "COMMITSCN": "403796469", "CatalogName": null, "CURRENTSCN": "403796466", "STARTSCN": "403796460", "ObjectName": "TESTINDEX", "OperationName": "CREATE", "SCN": "403796466", "OperationType": "DDL", "SchemaName": "STRIIM", "CatalogObjectType": "INDEX" } |
ANALYZE TABLE ... VALIDATE STRUCTURE |
data: ANALYZE TABLE TEST VALIDATE STRUCTURE metadata: { "OperationSubName": "ANALYZE_TABLE", "TxnID": "6.13.240947", "TimeStamp": "2017-02-10T17:56:45.000-08:00", "COMMITSCN": "403796529", "CatalogName": null, "CURRENTSCN": "403796527", "STARTSCN": "403796520", "ObjectName": "TEST", "OperationName": "ANALYZE", "SCN": "403796527", "OperationType": "DDL", "SchemaName": "STRIIM", "CatalogObjectType": "TABLE" } |
CREATE VIEW ... |
data: CREATE VIEW MYVIEW AS SELECT EMPID,ENAME FROM TEST WHERE SALARY>43679 metadata: { "OperationSubName": "CREATE_VIEW", "TxnID": "6.9.241087", "TimeStamp": "2017-02-10T17:59:07.000-08:00", "COMMITSCN": "403796658", "CatalogName": null, "CURRENTSCN": "403796653", "STARTSCN": "403796649", "ObjectName": "MYVIEW", "OperationName": "CREATE", "SCN": "403796653", "OperationType": "DDL", "SchemaName": "STRIIM", "CatalogObjectType": "VIEW" } |
CREATE OR REPLACE VIEW ... (updating SELECT statement) |
data: CREATE OR REPLACE VIEW MYVIEW AS SELECT EMPID,ENAME FROM TEST WHERE SALARY>100000 metadata: { "OperationSubName": "CREATE_VIEW", "TxnID": "2.30.258751", "TimeStamp": "2017-02-10T18:00:51.000-08:00", "COMMITSCN": "403796921", "CatalogName": null, "CURRENTSCN": "403796917", "STARTSCN": "403796913", "ObjectName": "MYVIEW", "OperationName": "CREATE", "SCN": "403796917", "OperationType": "DDL", "SchemaName": "STRIIM", "CatalogObjectType": "VIEW" } |
CREATE OR REPLACE PACKAGE ... |
data: CREATE OR REPLACE PACKAGE emp_mgmt AS FUNCTION hire ( last_name VARCHAR2, job_id VARCHAR2, manager_id NUMBER, salary NUMBER, commission_pct NUMBER, department_id NUMBER) RETURN NUMBER;\nFUNCTION create_dept( department_id NUMBER, location_id NUMBER) RETURN NUMBER; \nPROCEDURE remove_emp(employee_id NUMBER); \nPROCEDURE remove_dept(department_id NUMBER); \nPROCEDURE increase_sal(employee_id NUMBER, salary_incr NUMBER); \nPROCEDURE increase_comm(employee_id NUMBER, comm_incr NUMBER); \nno_comm EXCEPTION; \nno_sal EXCEPTION; \nEND emp_mgmt; metadata: { "OperationSubName": "CREATE_PACKAGE", "TxnID": "2.0.258774", "TimeStamp": "2017-02-11T17:05:55.000-08:00", "COMMITSCN": "404005562", "CatalogName": null, "CURRENTSCN": "404005557", "STARTSCN": "404005553", "ObjectName": "EMP_MGMT", "OperationName": "CREATE", "SCN": "404005557", "OperationType": "DDL", "SchemaName": "ROBERT", "CatalogObjectType": "PACKAGE" } |
GRANT ... |
data: GRANT SELECT ON TEST TO STRIIM metadata: { "OperationSubName": "GRANT", "TxnID": "6.9.241213", "TimeStamp": "2017-02-11T10:07:21.000-08:00", "COMMITSCN": "403898105", "CatalogName": null, "CURRENTSCN": "403898102", "STARTSCN": "403898098", "ObjectName": null, "OperationName": "GRANT", "SCN": "403898102", "OperationType": "DDL", "SchemaName": "STRIIM", "CatalogObjectType": "UNKNOWN" } |
REVOKE ... |
data: REVOKE SELECT ON TEST FROM STRIIM metadata: { "OperationSubName": "REVOKE", "TxnID": "5.28.258661", "TimeStamp": "2017-02-11T10:07:31.000-08:00", "COMMITSCN": "403898124", "CatalogName": null, "CURRENTSCN": "403898121", "STARTSCN": "403898116", "ObjectName": null, "OperationName": "REVOKE", "SCN": "403898121", "OperationType": "DDL", "SchemaName": "STRIIM", "CatalogObjectType": "UNKNOWN" } |
Reading DDL transactions requires Oracle 18c or earlier and LogMiner. It is enabled by setting OracleReader's DictionaryMode property to OfflineCatalog.
DDL events may be replayed in another Oracle instance, subject to the limitations discussed in Replicating Oracle data to another Oracle database. DDL cannot be replayed in non-Oracle databases.
KafkaWriter can use DDL events to track evolution of Oracle tables. See Using the Confluent or Hortonworks schema registry.
AzureBlobWriter, FileWriter, GCSWriter, HDFSWriter, and S3Writer automatically roll over to a new file when a DDL event is received. This default behavior may be disabled using the RolloverOnDDL property.
Oracle type |
TQL type |
---|---|
ADT |
not supported |
ARRAY |
not supported |
BFILE |
BFILE values will be set to null |
BINARY_DOUBLE |
double |
BINARY_FLOAT |
float |
BLOB |
string (a primary or unique key must exist on the table) An insert or update containing a column of this type generates two CDC log entries: an insert or update in which the value for this column is null, followed by an update including the value. |
CHAR |
string |
CLOB |
string (a primary or unique key must exist on the table) An insert or update containing a column of this type generates two CDC log entries: an insert or update in which the value for this column is null, followed by an update including the value. |
DATE |
DateTime |
FLOAT |
string |
INTERVALDAYTOSECOND |
string (when using LogMiner, string always has a sign; when using XStream, string begins with minus sign if value is negative, but the plus sign is omitted for positive values) |
INTERVALYEARTOMONTH |
string (when using LogMiner, string always has a sign; when using XStream, string begins with minus sign if value is negative, but the plus sign is omitted for positive values) |
LONG |
Results may be inconsistent. Oracle recommends using CLOB instead. |
LONG RAW |
Results may be inconsistent. Oracle recommends using CLOB instead. |
NCHAR |
string |
NCLOB |
string (a primary or unique key must exist on the table) |
NESTED TABLE |
not supported |
NUMBER |
string |
NVARCHAR2 |
string |
RAW |
string |
REF |
not supported |
ROWID |
not supported |
SDO_GEOMETRY |
SDO_GEOMETRY values will be set to null |
TIMESTAMP |
DateTime |
TIMESTAMP WITH LOCAL TIME ZONE |
DateTime (supported in LogMiner only) |
TIMESTAMP WITH TIME ZONE |
DateTime (supported in LogMiner only) |
UDT |
not supported |
UROWID |
not supported |
VARCHAR2 |
string |
VARRAY |
Supported by LogMiner only in Oracle 12c and later. Required Oracle Reader settings:
Limitations:
When the output of an Oracle Reader source is the input of a target using XML Formatter, the formatter's Format Column Value As property must be set to |
XMLTYPE |
Supported only for Oracle 12c and later. When DictionaryMode is OnlineCatalog, values in any XMLType columns will be set to null. When DictionaryMode is OfflineCatalog, reading from tables containing XMLType columns is not supported. |
Starting an Oracle Reader source automatically opens an Oracle session for the user specified in the Username property.
When using LogMiner, the session is closed when the source is stopped.
When using XStream Out, the session is closed when the source is undeployed.
If a running Oracle Reader source fails with an error, the session will be closed.
Closing a PDB source while Oracle Reader is running will cause the application to crash.
If a primary key is not defined for a table, the values for all columns are included in UPDATE and DELETE records, which can significantly reduce performance. You can work around this by setting the Compression property to True and including the KeyColumns option in the Tables property value. The syntax is:
Tables:'<table name> KeyColumns(<COLUMN 1 NAME>,<COLUMN 2 NAME>,...)'
The column names must be uppercase. Specify as many columns as necessary to define a unique key for each row.
If the table has a primary key, or the Compression property is set to False, KeyColumns will be ignored.
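For example, the following source fragment treats ORDER_ID and ORDER_DATE as the key for a hypothetical MYSCHEMA.ORDERS table that has no primary key (the table and column names are placeholders, and Compression must be True for KeyColumns to take effect):
CREATE SOURCE OracleOrdersIn USING OracleReader (
  Username: 'striim',
  Password: '********',
  ConnectionURL: '203.0.113.49:1521:orcl',
  Compression: true,
  Tables: 'MYSCHEMA.ORDERS KeyColumns(ORDER_ID,ORDER_DATE)'
) OUTPUT TO OracleOrdersStream;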
The DATA()
and BEFORE()
functions return the WAEvent data
and before
arrays. The following example shows how you could use these functions to write change data event details to a JSON file with the associated Oracle column names. (You could do the same thing with AVROFormatter.) This is supported only for 11g using LogMiner.
CREATE SOURCE OracleCDCIn USING OracleReader (
  Username:'walm',
  Password:'passwd',
  ConnectionURL:'192.168.1.49:1521:orcl',
  Tables:'myschema.%',
  FetchSize:1
) OUTPUT TO OracleRawStream;
CREATE TYPE OpTableDataType(
  OperationName String,
  TableName String,
  data java.util.HashMap,
  before java.util.HashMap
);
CREATE STREAM OracleTypedStream OF OpTableDataType;
CREATE CQ ParseOracleRawStream
INSERT INTO OracleTypedStream
SELECT META(OracleRawStream, "OperationName").toString(),
  META(OracleRawStream, "TableName").toString(),
  DATA(OracleRawStream),
  BEFORE(OracleRawStream)
FROM OracleRawStream;
CREATE TARGET OracleCDCFFileOut USING FileWriter(
  filename:'Oracle2JSON_withFFW.json'
) FORMAT USING JSONFormatter ()
INPUT FROM OracleTypedStream;
The CQ will be easier to read if you use an alias for the stream name. For example:
CREATE CQ ParseOracleRawStream INSERT INTO OracleTypedStream SELECT META(x, "OperationName").toString(), META(x, "TableName").toString(), DATA(x), BEFORE(x) FROM OracleRawStream x;
Using this application, the output for the UPDATE operation described in OracleReader example output would look like this:
{ "OperationName":"UPDATE", "TableName":"ROBERT.POSAUTHORIZATIONS", "data":{"AUTH_AMOUNT":"2.2", "BUSINESS_NAME":"COMPANY 5A", "ZIP":"41363", "EXP":"0916", "POS":"0", "CITY":"Quicksand", "CURRENCY_CODE":"USD", "PRIMARY_ACCOUNT":"6705362103919221351", "MERCHANT_ID":"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu", "TERMINAL_ID":"5150279519809946", "CODE":"20130309113025"}, "before":{"AUTH_AMOUNT":"2.2", "BUSINESS_NAME":"COMPANY 1", "ZIP":"41363", "EXP":"0916", "POS":"0", "CITY":"Quicksand", "CURRENCY_CODE":"USD", "PRIMARY_ACCOUNT":"6705362103919221351", "MERCHANT_ID":"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu", "TERMINAL_ID":"5150279519809946", "CODE":"20130309113025"} }
SHOW <namespace>.<OracleReader source name> OPENTRANSACTIONS [ -LIMIT <count> ] [ -TRANSACTIONID '<transaction ID>,...'] [ DUMP | -DUMP '<path>/<file name>' ];
This console command returns information about currently open Oracle transactions. The namespace may be omitted when the console is using the source's namespace.
With no optional parameters, SHOW <source> OPENTRANSACTIONS;
will display summary information for up to ten open transactions (the default LIMIT count is 10).
Transaction ID | # of Ops | Sequence # | StartSCN  | Rba block | Thread # | TimeStamp
3.5.222991     | 5        | 1          | 588206203 | 5189      | 1        | 2019-04-05T21:28:51.000-07:00
5.26.224745    | 1        | 1          | 588206395 | 5189      | 1        | 2019-04-05T21:30:24.000-07:00
8.20.223786    | 16981    | 1          | 588213879 | 5191      | 1        | 2019-04-05T21:31:17.000-07:00
- To show all open transactions, add -LIMIT ALL.
- Add -TRANSACTIONID with a comma-separated list of transaction IDs (for example, -TRANSACTIONID '3.4.222991, 5.26.224745') to return summary information about specific transactions in the console and write the details to OpenTransactions_<timestamp> in the current directory.
- Add DUMP to show summary information in the console and write the details to OpenTransactions_<timestamp> in the current directory.
- Add -DUMP '<path>/<file name>' to show summary information in the console and write the details to the specified file.
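For example, to list all open transactions for the OracleCDCIn source and also write the details to a file (the namespace and file path here are placeholders):
SHOW myns.OracleCDCIn OPENTRANSACTIONS -LIMIT ALL -DUMP '/tmp/OracleCDCIn_opentxns.txt';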
See Console commands for instructions on using SHOW LINEAGE. The file lineage report is also available in the Flow Designer and via the getOracleFLM endpoint in the Striim application management REST API (see Using the Striim Application Management REST API).
By default, this feature is disabled. See Enabling file lineage.
For OracleReader, the file lineage data includes:
File Name |
the archive log file name, for example, |
Status |
PROCESSING when OracleReader is reading the file, COMPLETED when it has finished |
Directory Name |
the directory in which the file is located |
File Creation Time |
the time Striim created the file lineage entry for the file |
Number Of Events |
the number of events OracleReader has read from the file; if the application has been restarted, OracleReader may not start reading from the first event, so the number will be less than the total number of events in the file |
First Event Timestamp |
|
Last Event Timestamp |
|
Wrap Number |
the number of times OracleReader resumed reading the file after the application was restarted |
SequenceNumber |
the unique sequence number from the file name, for example, for |
ThreadID |
the thread number associated with the log sequence number and file name |
FirstChangeNumber |
|
LastChangeNumber |
|
When OracleReader is reading from redo logs, file-related values are NOT APPLICABLE or N/A.
For more information, see LOG_ARCHIVE_FORMAT in Oracle's Database Reference.
This document provides detailed instructions for creating a Striim application to capture change data from one or more tables in Oracle and write it to one or more partitions of a Kafka topic. This reference application uses the most common options:
reads Oracle change data using LogMiner (Striim also supports XStream Out)
runs KafkaWriter in sync mode with recovery enabled to ensure no lost or duplicate events after recovery from a cluster failure ("exactly-once processing"; see Setting KafkaWriter's mode property: sync versus async for other supported options)
writes the Oracle data to Kafka in Avro format (Striim also supports DSV, JSON, and XML)
This document assumes the following:
You have sysdba privileges in Oracle to perform the necessary configuration, and can access its host via ssh and scp with the privileges necessary to install and start the Forwarding Agent.
You have a Kafka instance and the privileges necessary to perform all the tasks discussed below.
To create the Striim application, you will perform the following tasks, as detailed in this document. It is important that you perform the steps in the order in which they are described here. We strongly recommend that you read the entire document before starting to follow the instructions.
enable archivelog and supplemental log data in Oracle
create an Oracle role and user for use by Striim
if your Kafka instance uses SASL authentication or SSL encryption, configure Striim accordingly
create an application using Striim's Oracle CDC to Kafka template
modify the application to enable recovery
optionally, modify the application to:
write to multiple Kafka partitions
increase throughput by using multiple instances of OracleReader or KafkaWriter
deploy the source on the Oracle host using the Forwarding Agent
We strongly recommend you read this entire Quick Guide before starting to follow any of the detailed instructions.
Striim supports Oracle 11g 11.2.0.4, 12c 12.1.0.2, 12.2.0.1.0, 18c, and 19c.
For additional details such as RAC support and the differences between LogMiner and XStream Out, see Oracle Database.
The following steps check whether archivelog is enabled and, if not, enable it.
- Log in to SQL*Plus as the sys user.
- Enter the following command:
  select log_mode from v$database;
  If the command returns ARCHIVELOG, archivelog is enabled. Skip ahead to Enabling supplemental log data. If the command returns NOARCHIVELOG, enter:
  shutdown immediate
- Wait for the message ORACLE instance shut down, then enter:
  startup mount
- Wait for the message Database mounted, then enter:
  alter database archivelog;
  alter database open;
- To verify that archivelog has been enabled, enter select log_mode from v$database; again. This time it should return ARCHIVELOG.
The following steps check whether supplemental log data is enabled and, if not, enable it.
- Enter the following command:
  select supplemental_log_data_min, supplemental_log_data_pk from v$database;
  If the command returns YES or IMPLICIT, supplemental log data is already enabled. For example,
  SUPPLEME SUP
  -------- ---
  YES      NO
  indicates that supplemental log data is enabled, but primary key logging is not. If it returns anything else, enter:
  ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
- To enable primary key logging for all tables in the database, enter:
  ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
  Alternatively, to enable primary key logging only for selected tables (do not use this approach if you plan to use wildcards in the OracleReader Tables property to capture change data from new tables):
  ALTER TABLE <schema name>.<table name> ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
- If replicating Oracle data to Redshift, enable supplemental logging on all columns for all tables in the database:
  ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
  Alternatively, to enable only for selected tables:
  ALTER TABLE <schema>.<table name> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
- To activate your changes, enter:
  alter system switch logfile;
- If using Amazon RDS for Oracle, use the following commands instead:
  exec rdsadmin.rdsadmin_util.alter_supplemental_logging(p_action => 'ADD');
  exec rdsadmin.rdsadmin_util.alter_supplemental_logging(p_action => 'ADD', p_type => 'PRIMARY KEY');
  exec rdsadmin.rdsadmin_util.switch_logfile;
  select supplemental_log_data_min, supplemental_log_data_pk from v$database;
You may use LogMiner with any supported Oracle version.
Log in as sysdba and enter the following commands to create a role with the privileges required by the Striim OracleReader adapter and create a user with that privilege. You may give the role and user any names you like. Replace ********
with a strong password.
If using Oracle 11g, or 12c, 18c, or 19c without CDB, enter the following commands:
create role striim_privs;
grant create session, execute_catalog_role, select any transaction, select any dictionary to striim_privs;
grant select on SYSTEM.LOGMNR_COL$ to striim_privs;
grant select on SYSTEM.LOGMNR_OBJ$ to striim_privs;
grant select on SYSTEM.LOGMNR_USER$ to striim_privs;
grant select on SYSTEM.LOGMNR_UID$ to striim_privs;
create user striim identified by ******** default tablespace users;
grant striim_privs to striim;
alter user striim quota unlimited on users;
If using Database Vault, omit execute_catalog_role,
and also enter the following commands:
grant execute on SYS.DBMS_LOGMNR to striim_privs;
grant execute on SYS.DBMS_LOGMNR_D to striim_privs;
grant execute on SYS.DBMS_LOGMNR_LOGREP_DICT to striim_privs;
grant execute on SYS.DBMS_LOGMNR_SESSION to striim_privs;
For Oracle 12c only, also enter the following command.
grant LOGMINING to striim_privs;
If using Oracle 12c, 18c, or 19c with PDB, enter the following commands. Replace <PDB name>
with the name of your PDB.
create role c##striim_privs;
grant create session, execute_catalog_role, select any transaction, select any dictionary, logmining to c##striim_privs;
grant select on SYSTEM.LOGMNR_COL$ to c##striim_privs;
grant select on SYSTEM.LOGMNR_OBJ$ to c##striim_privs;
grant select on SYSTEM.LOGMNR_USER$ to c##striim_privs;
grant select on SYSTEM.LOGMNR_UID$ to c##striim_privs;
create user c##striim identified by ******* container=all;
grant c##striim_privs to c##striim container=all;
alter user c##striim set container_data = (cdb$root, <PDB name>) container=current;
If using Database Vault, omit execute_catalog_role, and also enter the following commands:
grant execute on SYS.DBMS_LOGMNR to c##striim_privs;
grant execute on SYS.DBMS_LOGMNR_D to c##striim_privs;
grant execute on SYS.DBMS_LOGMNR_LOGREP_DICT to c##striim_privs;
grant execute on SYS.DBMS_LOGMNR_SESSION to c##striim_privs;
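As an optional sanity check (not part of Striim's documented setup), you can connect as the new user and confirm that the grants are in place. The user name and password below are the placeholders from the commands above; for a non-CDB database, use the striim user instead:
-- connect as the user created for OracleReader
connect c##striim/********
-- requires the select any dictionary privilege
select name, log_mode from v$database;
-- requires the explicit grant on SYSTEM.LOGMNR_OBJ$
select count(*) from SYSTEM.LOGMNR_OBJ$;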
Striim 3.10.3 can write to Kafka 0.8, 0.9, 0.10, or 0.11.
If your Kafka instance uses SASL authentication or SSL encryption, see Configuring Kafka.
Before following these instructions:
Striim must be running.
The Forwarding Agent on the Oracle host must be connected to Striim.
You will need the following information to complete the wizard:
-
Striim
VM user name
VM user password
Striim cluster name
Striim admin password
DNS name (displayed on the Essentials tab for the Striim VM)
-
Oracle (source)
connection URL in the format <IP address>:<port>:<SID>, for example, 198.51.100.0:1521:orcl
login name and password
source table names
-
Kafka (target)
topic name
broker address
optionally, any Kafka producer properties required by your environment (see "KafkaWriter" in the "Adapters reference" section of the Striim Programmer's Guide)
Log in to the Striim web UI at <DNS name>:9080 using admin as the user name and the Striim admin password.
Click Oracle CDC to Kafka 0.8, Oracle CDC to Kafka 0.9, or Oracle CDC to Kafka 0.10. If using Kafka 0.10 or 0.11, see the known issues in Configuring an app template target.
Enter names for your application (for example, Oracle2Kafka) and new namespace (do not create applications in the admin namespace) and click Save.
Enter the name for the Oracle source component in the Striim application (for example, OracleSource), the connection URL, user name, and password.
Select LogMiner as the log reader.
Optionally, specify a wildcard string to select the Oracle tables to be read (see the discussion of the Tables property in Oracle Reader properties).
Set Deploy source on Agent to On (if the Forwarding Agent is not connected to Striim, this property does not appear) and click Next.
If Striim's checks show that all properties are valid (this may take a few minutes), click Next.
If you specified a wildcard in the Oracle properties, click Next. Otherwise, select the tables to be read and click Next.
Click Next to skip mapping and filtering (or see Using Advanced Setup to map tables to streams for instructions on using this advanced feature).
Enter the name for the Kafka target component in the Striim application (for example, KafkaTarget).
For Input From, select the only choice. (This is OracleReader's output stream, and its name is <application name>_ChangeDataStream.)
Enter the topic name and the broker address.
Optionally, click Show optional properties and specify any Kafka producer properties required by your environment. Leave Mode set to sync.
Select AvroFormatter and specify its schema file name. This file will be created when the application is deployed (see AVRO Formatter).
Click Save, then click Next. (Click Create Target only if you specified maps or filters and want to create more than one target.)
-
Striim will create your application and open it in the Flow Designer.
-
Select Configuration > App settings, set the recovery interval to 5 seconds, and click Save.
-
Select Configuration > Export to generate a TQL file. It should contain something like this (the password is encrypted):
CREATE APPLICATION Oracle2Kafka RECOVERY 5 SECOND INTERVAL;

CREATE SOURCE OracleSource USING OracleReader (
  FetchSize: 1,
  Compression: false,
  Username: 'myname',
  Password: '7ip2lhUSP0o=',
  ConnectionURL: '198.51.100.15:1521:orcl',
  DictionaryMode: 'OnlineCatalog',
  ReaderType: 'LogMiner',
  Tables: 'MYSCHEMA.%'
)
OUTPUT TO OracleSource_ChangeDataStream;

CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.8.0' (
  Mode: 'Sync',
  Topic: 'MyTopic',
  brokerAddress: '198.51.100.55:9092'
)
FORMAT USING AvroFormatter (
  schemaFileName: 'MySchema.avro'
)
INPUT FROM OracleSource_ChangeDataStream;

END APPLICATION Oracle2Kafka;
Note that FetchSize: 1 is appropriate for development, but should be increased in a production environment. See Oracle Reader properties for more information.
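For example, a production version of the generated source might use a larger batch. The FetchSize value below is purely illustrative, not a Striim recommendation; tune it for your workload as described in Oracle Reader properties:
CREATE SOURCE OracleSource USING OracleReader (
  FetchSize: 10000,  -- illustrative production value; tune for your workload
  Compression: false,
  Username: 'myname',
  Password: '7ip2lhUSP0o=',
  ConnectionURL: '198.51.100.15:1521:orcl',
  DictionaryMode: 'OnlineCatalog',
  ReaderType: 'LogMiner',
  Tables: 'MYSCHEMA.%'
)
OUTPUT TO OracleSource_ChangeDataStream;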
Oracle CDC events formatted as Avro and written to Kafka will be similar to the following examples. For more information, see OracleReader WAEvent fields.
SQL*Plus command insert into qatest.vtesttable values(1,'insert1');
{ "metadata": { "TxnID": "9.5.256818", "RbaSqn": "65036", "TableSpace": "USERS", "CURRENTSCN": "402873649", "OperationName": "INSERT", "ParentTxnID": "9.5.256818", "SegmentType": "TABLE", "SessionInfo": "UNKNOWN", "TxnUserID": "UNKNOWN", "Session": "72", "BytesProcessed": "593", "TransactionName": "", "STARTSCN": "402873649", "SegmentName": "VTESTTABLE", "SEQUENCE": "1", "COMMITSCN": "402873650", "RbaBlk": "20376", "ThreadID": "1", "SCN": "40287364900183060065866895851680000", "AuditSessionId": "2064718", "ROWID": "AAJb5rAAEAAATL8AAA", "TimeStamp": "2017-02-02T04:48:18.000-08:00", "Serial": "36570", "RecordSetID": " 0x00fe0c.00004f98.0010 ", "TableName": "QATEST.VTESTTABLE", "OperationType": "DML", "SQLRedoLength": "69", "Rollback": "0" }, "before": null, "data": { "ID": "1", "NAME": "insert1" } }
SQL*Plus command UPDATE qatest.vtesttable SET NAME='Vino' WHERE ID=10;
{ "metadata": { "TxnID": "6.18.239971", "RbaSqn": "65036", "TableSpace": "USERS", "CURRENTSCN": "402878537", "OperationName": "UPDATE", "ParentTxnID": "6.18.239971", "SegmentType": "TABLE", "SessionInfo": "UNKNOWN", "TxnUserID": "UNKNOWN", "Session": "72", "BytesProcessed": "635", "TransactionName": "", "STARTSCN": "402878537", "SegmentName": "VTESTTABLE", "SEQUENCE": "1", "COMMITSCN": "402878538", "RbaBlk": "38226", "ThreadID": "1", "SCN": "40287853700183060065878594027680000", "AuditSessionId": "2064718", "ROWID": "AAJb5rAAEAAATL8AAJ", "TimeStamp": "2017-02-02T05:20:54.000-08:00", "Serial": "36570", "RecordSetID": " 0x00fe0c.00009552.0010 ", "TableName": "QATEST.VTESTTABLE", "OperationType": "DML", "SQLRedoLength": "90", "Rollback": "0" }, "before": { "ID": "10", "NAME": "insert10" }, "data": { "ID": "10", "NAME": "Vino" } }
Once you have your basic Oracle to Kafka application, you may modify it in many ways. The following are a few of the most common:
Including DDL in the output
DDL may be included by changing OracleReader's DictionaryMode property from its default of OnlineCatalog; see Including DDL operations in Oracle Reader output and Handling DDL events when replicating Oracle data for the required setting and related configuration.
Using the Forwarding Agent
The Striim Forwarding Agent is a stripped-down version of the Striim server that can be used to run sources and queries locally on the Oracle host. See Striim Forwarding Agent installation and configuration for more information.
To make your application deployable to an agent, split it into two flows:
CREATE APPLICATION Oracle2Kafka RECOVERY 5 SECOND INTERVAL;

CREATE FLOW AgentFlow;
CREATE SOURCE OracleSource USING OracleReader (
  FetchSize: 1,
  CommittedTransactions: true,
  Compression: false,
  Username: 'myname',
  Password: '7ip2lhUSP0o=',
  ConnectionURL: '198.51.100.15:1521:orcl',
  DictionaryMode: 'OnlineCatalog',
  ReaderType: 'LogMiner',
  Tables: 'MYSCHEMA.%'
)
OUTPUT TO OracleSource_ChangeDataStream;
END FLOW AgentFlow;

CREATE FLOW ServerFlow;
CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.8.0' (
  Mode: 'Sync',
  Topic: 'MyTopic',
  brokerAddress: '198.51.100.55:9092'
)
FORMAT USING AvroFormatter (
  schemaFileName: 'MySchema.avro'
)
INPUT FROM OracleSource_ChangeDataStream;
END FLOW ServerFlow;

END APPLICATION Oracle2Kafka;
With the default deployment group names, you would deploy this using the command:
DEPLOY APPLICATION Oracle2Kafka with AgentFlow in agent, ServerFlow in default;
See Using the Striim Forwarding Agent and Managing deployment groups for more information.
Scaling up OracleReader
If Oracle writes to its CDC log faster than your template-generated application can read it, you can increase throughput by using multiple instances of OracleReader. Even if deployed on the same server this can increase performance, since each can be run on a different core.
Use the Tables property to distribute tables among the OracleReaders:
Assign each table to only one OracleReader.
When tables are related (by primary or foreign key) or to ensure transaction integrity among a set of tables, assign them all to the same OracleReader.
When dividing tables among OracleReaders, distribute them according to how busy they are rather than simply by the number of tables. For example, if one table generates 50% of the entries in the CDC log, you might assign it and any related tables to one OracleReader and all the other tables to another.
The following is a simple example of how you could modify the basic application example above to use two OracleReaders, using one to read a very busy table and the other to read the rest of the tables in the same schema:
CREATE SOURCE OracleSource1 USING OracleReader (
  FetchSize: 1,
  Compression: false,
  Username: 'myname',
  Password: '7ip2lhUSP0o=',
  ConnectionURL: '198.51.100.15:1521:orcl',
  ReaderType: 'LogMiner',
  Tables: 'MYSCHEMA.VERYBUSYTABLE'
)
OUTPUT TO OracleSource_ChangeDataStream;

CREATE SOURCE OracleSource2 USING OracleReader (
  FetchSize: 1,
  CommittedTransactions: true,
  Compression: false,
  Username: 'myname',
  Password: '7ip2lhUSP0o=',
  ConnectionURL: '198.51.100.15:1521:orcl',
  ReaderType: 'LogMiner',
  Tables: 'MYSCHEMA.%',
  ExcludedTables: 'MYSCHEMA.VERYBUSYTABLE'
)
OUTPUT TO OracleSource_ChangeDataStream;
Writing to multiple Kafka topic partitions
Use KafkaWriter's PartitionKey property to choose a field from the input stream, METADATA map (see OracleReader WAEvent fields), or USERDATA map (see Adding user-defined data to WAEvent streams) to be used to distribute events among multiple partitions. For example, assuming the sixth element of the WAEvent data array (data[5], counting from zero) is a city name:
CREATE CQ AddUserData
INSERT INTO OracleSourceWithPartitionKey
SELECT putUserData(x, 'city', data[5])
FROM OracleSource_ChangeDataStream x;

CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.8.0' (
  Mode: 'Sync',
  Topic: 'MyTopic',
  brokerAddress: '198.51.100.55:9092',
  PartitionKey: '@userdata(city)'
)
INPUT FROM OracleSourceWithPartitionKey;
Scaling up KafkaWriter
If OracleReader sends CDC events faster than KafkaWriter can write them, you can increase throughput by using the ParallelThreads property to create multiple instances of KafkaWriter. Even if deployed on the same server this can increase performance, since each can be run on a different core.
Warning
Use ParallelThreads only when the target is not able to keep up with incoming events (that is, when its input stream is backpressured). Otherwise, the overhead imposed by additional threads could reduce the application's performance.
The following is a simple example of how you could modify the basic application example above to use four KafkaWriters:
CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.8.0' (
  Mode: 'Sync',
  Topic: 'MyTopic',
  brokerAddress: '198.51.100.55:9092',
  ParallelThreads: '4'
)
INPUT FROM OracleSource_ChangeDataStream;
All four KafkaWriters will use the same properties. If the application is deployed ON ALL to a deployment group with multiple servers, each server will have four KafkaWriters.
The following are some issues you might encounter when running your application.
OracleReader does not appear to be capturing data
Verify that the StartSCN value (if specified) is correct.
Verify that the Tables property is correct.
OracleReader will send no data until it has accumulated the number of records specified by the FetchSize value. For testing purposes, it can be helpful to set this to 1 to send each record immediately.
Verify that the transactions are committed. By default, OracleReader sends only committed transactions (since the default for the CommittedTransactions property is True); a test-only configuration that emits uncommitted operations is sketched after this list.
In the Striim console, enter
mon <OracleReader name>;
and look at the Last observed LCR-SCN and Last observed LCR-Timestamp values. Enter the command again: if the values have increased, that means OracleReader is buffering events in its transaction cache, and will write them when the transactions are complete and the FetchSize value has been reached. If most of the events are part of long-running transactions, it will take longer for OracleReader to emit the next set.
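For testing only, you might combine the two suggestions above in a throwaway copy of the source so that every operation is emitted immediately. This is a minimal sketch based on the generated source definition; the source name, credentials, and connection details are placeholders:
CREATE SOURCE OracleSourceTest USING OracleReader (
  FetchSize: 1,                  -- emit each record immediately (testing only)
  CommittedTransactions: false,  -- also emit uncommitted operations (testing only)
  Compression: false,
  Username: 'myname',
  Password: '********',
  ConnectionURL: '198.51.100.15:1521:orcl',
  ReaderType: 'LogMiner',
  Tables: 'MYSCHEMA.%'
)
OUTPUT TO OracleSourceTest_ChangeDataStream;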
KafkaWriter seems slow
To see whether KafkaWriter is keeping up with the output of OracleReader, in the Striim console, enter mon <KafkaWriter name>;
and look at the Read Lag value (see Understanding Read Lag values for more information). Wait two minutes and enter the command again. If the value has increased, KafkaWriter is not keeping up with OracleReader. Subtract the first value from the second and make a note of that number.
Create multiple instances of KafkaWriter (see Creating multiple writer instances), then run mon <KafkaWriter name>;, wait two minutes, and run it again. If the value did not increase, KafkaWriter is keeping up with OracleReader. If the value increased, but the difference between the two readings is significantly less than it was with a single KafkaWriter, try using a higher parallelism-factor value.
If the PARALLELIZE option does not improve performance, investigate whether the issue might be in Kafka rather than in Striim, as detailed in Testing KafkaWriter performance.