3.10.3 SQL CDC replication examples

To replicate CDC data, the CDC reader's output must be the input stream of the target.

Optionally, you may insert a CQ between the source and the target, but that CQ must be limited to adding user-defined fields and modifying values.

Striim includes wizards for creating applications for SQL CDC sources and many targets (see Creating apps using templates).

These examples use OracleReader, but you can use them as a guide for replicating data with the other SQL CDC readers.

Replicating Oracle data to another Oracle database

The first step in Oracle-to-Oracle replication is the initial load.

  1. Use select min(start_scn) from gv$transaction to get the SCN number of the oldest open or pending transaction.

  2. Use select current_scn from V$DATABASE; to get the SCN of the export.

  3. Use Oracle's exp or expdp utility, providing the SCN from step 2, to export the appropriate tables and data from the source database to a data file (see the example after this list).

  4. Use Oracle's imp or impdp to import the exported data into the target database.
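
For example, with the Data Pump utilities the export and import might look like the following. This is a sketch only: the credentials, directory object (dp_dir), schema names, and dump file name are placeholders, and you would substitute the SCN from step 2 for the FLASHBACK_SCN value.

expdp system/<password>@orcl1 SCHEMAS=DB1 DIRECTORY=dp_dir DUMPFILE=db1.dmp FLASHBACK_SCN=<SCN from step 2>
impdp system/<password>@orcl2 SCHEMAS=DB1 DIRECTORY=dp_dir DUMPFILE=db1.dmp REMAP_SCHEMA=DB1:DB2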

Once initial load is complete, the following sample application would continuously replicate changes to the tables SOURCE1 and SOURCE2 in database DB1 to tables TARGET1 and TARGET2 in database DB2 using Database Writer. The StartSCN value is the SCN number from step 1. In the WHERE clause, replace ######### with the SCN from step 2. Start the application with recovery enabled (see Recovering applications) so that on restart it will resume from the latest transaction rather than the StartSCN point.

CREATE SOURCE OracleCDC USING OracleReader (
  Username:'striim',
  Password:'******', 
  ConnectionURL:'10.211.55.3:1521:orcl1',
  Tables:'DB1.SOURCE1;DB1.SOURCE2', 
  Compression:true,
  StartSCN:'...'
)
OUTPUT TO OracleCDCStream;

CREATE CQ FilterCDC
INSERT INTO FilteredCDCStream
SELECT x 
FROM OracleCDCStream x
WHERE TO_LONG(META(x,'COMMITSCN')) > #########;

CREATE TARGET WriteToOracle USING DatabaseWriter ( 
  ConnectionURL:'jdbc:oracle:thin:@10.211.55.3:1521:orcl1', 
  Username:'striim',
  Password:'******', 
  Tables:'DB1.SOURCE1,DB2.TARGET1;DB1.SOURCE2,DB2.TARGET2'
)
INPUT FROM FilteredCDCStream;

The FilterCDC CQ filters out all transactions that were replicated during initial load.

The following Oracle column types are supported:

  • BINARY DOUBLE

  • BINARY FLOAT

  • BLOB

  • CHAR

  • CLOB

  • DATE

  • FLOAT

  • INTERVAL DAY TO SECOND

  • INTERVAL YEAR TO MONTH

  • LONG

  • NCHAR

  • NUMBER

  • NVARCHAR

  • RAW

  • TIMESTAMP

  • TIMESTAMP WITH TIME ZONE

  • TIMESTAMP WITH LOCAL TIME ZONE

  • VARCHAR2

Limitations:

  • The primary key for a target table cannot be BLOB or CLOB.

  • TRUNCATE TABLE is not supported for tables containing BLOB or CLOB columns.

Understanding Oracle-to-Oracle DDL replication

Reading DDL transactions requires Oracle 18c or earlier and LogMiner. It is enabled by setting OracleReader's DictionaryMode property to OfflineCatalog.

When OracleReader output including DDL is written to another Oracle instance by DatabaseWriter, changes to tables and other objects in a source database are duplicated in the corresponding objects in a target database. For example, if the source contains a table created with this DDL:

CREATE TABLE TEST (EMPID INT PRIMARY KEY, ENAME VARCHAR2(10));

and the following DDL was entered to add a column:

ALTER TABLE TEST ADD (SALARY NUMBER);

in both the source and target database the table's new description would be:

Name                                      Null?    Type
----------------------------------------- -------- ----------------------------
EMPID                                     NOT NULL NUMBER(38)
ENAME                                              VARCHAR2(10)
SALARY                                             NUMBER

Future INSERT or UPDATE operations to the revised table will be handled correctly.

Oracle DDL cannot be replicated to other DatabaseWriter targets. If you wish to write data (DML) to a non-Oracle database using the same application, filter out the DDL with a CQ that uses the META() function to check for OperationType = 'DDL' (see Using the META() function), as in the sketch below.
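
For example, a CQ along these lines would pass only DML to the stream read by the non-Oracle target. This is a minimal sketch: the stream names are illustrative, and it assumes that DDL events carry an OperationType metadata value of 'DDL' and that the value can be compared as a string with TO_STRING(), following the pattern of the FilterCDC CQ shown earlier (which uses TO_LONG()).

CREATE CQ FilterOutDDL
INSERT INTO DMLOnlyStream
SELECT x
FROM OracleCDCStream x
WHERE TO_STRING(META(x,'OperationType')) != 'DDL';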

Mapping source objects to target objects

Before initiating DDL replication, you must create tables, indexes, views, and other objects in the target database with the same DDL properties as each object to be replicated from the source database. If necessary, the target objects may have different names (see the discussion of the Tables property in Database Writer for details), but using the same names simplifies development and maintenance.

Specify source objects and map them to target objects using the Tables properties of OracleReader and DatabaseWriter. To replicate all objects in a schema, use wildcards: for example, in OracleReader, Tables:'HR.%', and in DatabaseWriter, Tables:'HR.%,HR.%'.

If you prefer, you may limit replication to selected objects by specifying their fully-qualified names: for example, in OracleReader, Tables:'HR.EMPLOYEES, HR.DEPARTMENTS', and in DatabaseWriter, Tables:'HR.EMPLOYEES,HR.EMPLOYEES; HR.DEPARTMENTS,HR.DEPARTMENTS'.

Functions, indexes, packages, procedures, and views must be specified individually. For example, to replicate HR.EMP_DETAILS_VIEW and its underlying tables, you would specify Tables:'HR.EMPLOYEES, HR.DEPARTMENTS, HR.JOBS, HR.LOCATIONS, HR.COUNTRIES, HR.REGIONS, HR.EMP_DETAILS_VIEW' in OracleReader and Tables:'HR.EMPLOYEES,HR.EMPLOYEES; HR.DEPARTMENTS,HR.DEPARTMENTS; HR.JOBS,HR.JOBS; HR.LOCATIONS,HR.LOCATIONS; HR.COUNTRIES,HR.COUNTRIES; HR.REGIONS,HR.REGIONS; HR.EMP_DETAILS_VIEW,HR.EMP_DETAILS_VIEW' in DatabaseWriter.

If DatabaseWriter's Tables string includes a mapping for a view or index but not its underlying tables, DatabaseWriter will assume that the tables exist in the target schema. For instance, in the previous example, if you mapped only HR.EMP_DETAILS_VIEW, DatabaseWriter would assume that the six underlying tables exist in the target. However, rather than relying on this behavior, we strongly recommend explicitly mapping the tables. Similarly, DatabaseWriter will assume that any unmapped objects referenced in a mapped function, package, or procedure exist in the target, but we strongly recommend explicitly mapping all objects.

Objects may be replicated within a schema, in which case the target object's name must be different: for example, in OracleReader, Tables:'HR.EMPLOYEES', and in DatabaseWriter, Tables:'HR.EMPLOYEES,HR.EMPLOYEESCOPY'. When the source and target are in the same database instance, ANALYZE, GRANT, and REVOKE operations may result in an infinite loop, so you should filter them out with a CQ using functions such as META(OperationName = 'ANALYZE') (see Using the META() function).
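
For example, a CQ like the following would drop those operations before they reach DatabaseWriter. This is a sketch under the same assumptions as the DDL-filtering example above: the stream names are illustrative, and the OperationName metadata value is compared as a string with TO_STRING().

CREATE CQ FilterMaintenanceOps
INSERT INTO ReplicationStream
SELECT x
FROM OracleCDCStream x
WHERE TO_STRING(META(x,'OperationName')) != 'ANALYZE'
  AND TO_STRING(META(x,'OperationName')) != 'GRANT'
  AND TO_STRING(META(x,'OperationName')) != 'REVOKE';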

Sample DDL replication application

After initial load and before running the replication application, run the following on the source database to dump the DDL to the LogMiner dictionary:

EXECUTE DBMS_LOGMNR_D.BUILD(OPTIONS => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS);

The following application will replicate both data and changes to the DDL for all replicable objects in database DB1 in the source to database DB2 in the target:

CREATE SOURCE OracleCDC USING OracleReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'10.211.55.3:1521:orcl1',
  Tables:'DB1.%',
  DictionaryMode:'OfflineCatalog'
)
OUTPUT TO OracleCDCStream;

CREATE TARGET WriteToOracle USING DatabaseWriter (
  ConnectionURL:'jdbc:oracle:thin:@10.211.55.3:1521:orcl1',
  Username:'striim',
  Password:'******',
  Tables:'DB1.%,DB2.%'
)
INPUT FROM OracleCDCStream;

It may take several minutes for Striim to read the LogMiner data, during which time the application's status will be DEPLOYING. When this process is complete, Striim writes a message to striim.server.log of the format "Catalog object evolution is complete. OracleReader is positioned to read from" followed by an SCN number, and the application's status changes to RUNNING.

Warning

Once you have initiated replication, the target objects evolve to match DDL changes to the source objects. Do not modify target objects directly.

You may begin replication from a particular transaction (see the discussion of StartSCN in Replicating Oracle data to another Oracle database). In that case, be sure that the specified SCN is from a position after the last DDL change to the source objects, and that the target objects reflect the current DDL of their source objects. Any DDL changes after that point will be applied to the target.

If you already have an application that replicates data (DML), as described in Replicating Oracle data to another Oracle database, you can simply change DictionaryMode to OfflineCatalog to replicate DDL as well. Note that you may need to modify the DatabaseWriter Tables property to add additional objects such as indexes and views.

Recovery considerations for Oracle-to-Oracle DDL replication

If objects were added or renamed by DDL replication, and the Tables properties do not use wildcards, you must edit the application to add all new and renamed objects to both the OracleReader and DatabaseWriter Tables properties before recovering the application.

See Recovering applications for more information.

Replicating Oracle data to Amazon Redshift

Striim provides a template for creating applications that read from Oracle and write to Redshift. See Creating a new application using a template for details.

RedshiftWriter can continuously replicate one or many Oracle tables to an Amazon Redshift store. First, create a table in Redshift corresponding to each Oracle table to be replicated. Then load the existing data using DatabaseReader, for example:

CREATE SOURCE OracleJDBCSource USING DatabaseReader (
  Username:'Striim',
  Password:'****',
  ConnectionURL:'jdbc:oracle:thin:@192.168.123.14:1521/XE',
  Tables:'TPCH.H_CUSTOMER;TPCH.H_PART;TPCH.H_SUPPLIER'
)
OUTPUT TO DataStream;

CREATE TARGET TPCHInitialLoad USING RedshiftWriter (
  ConnectionURL: 'jdbc:redshift://mys3bucket.c1ffd5l3urjx.us-west-2.redshift.amazonaws.com:5439/dev',
  Username:'mys3user',
  Password:'******',
  bucketname:'mys3bucket',
/* for striimuser */
  accesskeyid:'********************',
  secretaccesskey:'****************************************',
  Tables:'TPCH.H_CUSTOMER,customer;TPCH.H_PART,part;TPCH.H_SUPPLIER,supplier'
)
INPUT FROM DataStream; 

The Tables property maps each specified Oracle table to a Redshift table, for example, TPCH.H_CUSTOMER to customer.

Once the initial load is complete, the following application will read new data using LogMiner and continuously replicate it to Redshift:

CREATE SOURCE OracleCDCSource USING OracleReader (
  Username:'miner',
  Password:'miner',
  ConnectionURL:'192.168.123.26:1521:XE',
  Tables:'TPCH.H_CUSTOMER;TPCH.H_PART;TPCH.H_SUPPLIER'
)
Output To LCRStream;

CREATE TARGET RedshiftTarget USING RedshiftWriter (
  ConnectionURL: 'jdbc:redshift://mys3bucket.c1ffd5l3urjx.us-west-2.redshift.amazonaws.com:5439/dev',
  Username:'mys3user',
  Password:'******',
  bucketname:'mys3bucket',
/* for striimuser */
  accesskeyid:'********************',
  secretaccesskey:'****************************************',
  Tables:'TPCH.H_CUSTOMER,customer;TPCH.H_PART,part;TPCH.H_SUPPLIER,supplier',
  Mode:'IncrementalLoad' )
INPUT FROM LCRStream; 

Note that Redshift does not enforce unique primary key constraints. Use OracleReader's StartSCN or StartTimestamp property to ensure that you do not have duplicate or missing events in Redshift.
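
For example, if you note the SCN at which the initial load was taken, you can add a StartSCN value to the OracleReader source above so that incremental replication begins exactly where the load ended. This is a sketch; the SCN value is a placeholder to replace with your own.

CREATE SOURCE OracleCDCSource USING OracleReader (
  Username:'miner',
  Password:'miner',
  ConnectionURL:'192.168.123.26:1521:XE',
  Tables:'TPCH.H_CUSTOMER;TPCH.H_PART;TPCH.H_SUPPLIER',
  StartSCN:'<SCN at which the initial load was taken>'
)
Output To LCRStream;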

For more information, see Redshift Writer.

Replicating Oracle data to Azure Cosmos DB

CosmosDBWriter can continuously replicate one or many Oracle tables to Cosmos DB collections.

You must create the target collections in Cosmos DB manually. Each partition key name must match one of the column names in the Oracle source table.

If you wish to run the following examples, adjust the Oracle Reader properties and Cosmos DB Writer properties to reflect your own environment.

  1. In Cosmos DB, create database MyDB containing the following collections (note that the collection and partition names are case-sensitive, so when replicating Oracle data they must be uppercase):

    • SUPPLIERS with partition key /LOCATION

    • CUSTOMERS with partition key /COUNTRY

  2. In Oracle, create tables and populate them as follows:

    CREATE TABLE SUPPLIERS(ID INT, NAME VARCHAR2(40), LOCATION VARCHAR2(200), PRIMARY KEY(ID));
    CREATE TABLE CUSTOMERS(ID INT, NAME VARCHAR2(40), EMAIL VARCHAR2(55), COUNTRY VARCHAR2(75),
      PRIMARY KEY(ID));
    COMMIT;
    INSERT INTO SUPPLIERS VALUES(100036492, 'Example Inc.', 'West Virginia');
    INSERT INTO CUSTOMERS VALUES(23004389, 'Manuel', 'manuel@example.com', 'Austria');
    INSERT INTO CUSTOMERS VALUES(23908876, 'Michelle', 'michelle@example.com', 'Austria');
    COMMIT;
  3. In Striim, run the following application to perform the initial load of the existing data using DatabaseReader:

    CREATE APPLICATION Oracle2CosmosInitialLoad;
     
    CREATE SOURCE OracleJDBCSource USING DatabaseReader (
      Username: '<Oracle user>',
      Password: '<Oracle user password>',
      ConnectionURL: '<Oracle connection URL>',
      Tables: 'MYSCHEMA.%'
    )
    OUTPUT TO OracleStream;
     
    CREATE  TARGET CosmosTarget USING CosmosDBWriter  (
      ServiceEndpoint: '<Cosmos DB connection string>',
      AccessKey: '<Cosmos DB account read-write key>',
      Collections: 'MYSCHEMA.%,MyDB.%',
      ConnectionPoolSize: 3
     )
    INPUT FROM OracleStream;

    After the application is finished, the Cosmos DB collections should contain documents similar to the following.

    MyDB.SUPPLIERS:

    {
        "LOCATION": "West Virginia",
        "ID": "100036492",
        "NAME": "Example Inc.",
        "id": "100036492",
        "_rid": "CBcfAKX3xWACAAAAAAAACA==",
        "_self": "dbs/CBcfAA==/colls/CBcfAKX3xWA=/docs/CBcfAKX3xWACAAAAAAAACA==/",
        "_etag": "\"00008000-0000-0000-0000-5bacc99b0000\"",
        "_attachments": "attachments/",
        "_ts": 1538050459
    }
    

    MyDB.CUSTOMERS:

    {
        "COUNTRY": "Austria",
        "ID": "23004389",
        "EMAIL": "manuel@example.com",
        "NAME": "Manuel",
        "id": "23004389",
        "_rid": "CBcfAJgI4eYEAAAAAAAACA==",
        "_self": "dbs/CBcfAA==/colls/CBcfAJgI4eY=/docs/CBcfAJgI4eYEAAAAAAAACA==/",
        "_etag": "\"d600b243-0000-0000-0000-5bacc99c0000\"",
        "_attachments": "attachments/",
        "_ts": 1538050460
    }
    {
        "COUNTRY": "Austria",
        "ID": "23908876",
        "EMAIL": "michelle@example.com",
        "NAME": "Michelle",
        "id": "23908876",
        "_rid": "CBcfAJgI4eYFAAAAAAAACA==",
        "_self": "dbs/CBcfAA==/colls/CBcfAJgI4eY=/docs/CBcfAJgI4eYFAAAAAAAACA==/",
        "_etag": "\"d600b443-0000-0000-0000-5bacc99c0000\"",
        "_attachments": "attachments/",
        "_ts": 1538050460
    }
    
  4. In Striim, run the following application to continuously replicate new data from Oracle to Cosmos DB using OracleReader:

    CREATE APPLICATION Oracle2CosmosIncremental;
    
    CREATE SOURCE OracleCDCSource USING OracleReader (
      Username: '<Oracle user>',
      Password: '<Oracle user password>',
      ConnectionURL: '<Oracle connection URL>',
      Tables: 'MYSCHEMA.SUPPLIERS;MYSCHEMA.CUSTOMERS'
    )
    OUTPUT TO OracleStream;
    
    CREATE  TARGET CosmosTarget USING CosmosDBWriter  (
      ServiceEndpoint: '<Cosmos DB connection string>',
      AccessKey: '<Cosmos DB account read-write key>',
      Collections: 'MYSCHEMA.%,MyDB.%',
      ConnectionPoolSize: 3
     )
    INPUT FROM OracleStream;
     
    END APPLICATION Oracle2CosmosIncremental;
  5. In Oracle, enter the following:

    INSERT INTO SUPPLIERS VALUES(100099786, 'Example LLC', 'Ecuador');
    UPDATE CUSTOMERS SET EMAIL='msanchez@example.com' WHERE ID='23004389';
    DELETE FROM CUSTOMERS WHERE ID='23908876';
    COMMIT;

    Within 30 seconds, those changes in Oracle should be replicated to the corresponding Cosmos DB collections with results similar to the following.

    MyDB.SUPPLIERS:

    {
        "LOCATION": "West Virginia",
        "ID": "100036492",
        "NAME": "Example Inc.",
        "id": "100036492",
        "_rid": "CBcfAKX3xWACAAAAAAAACA==",
        "_self": "dbs/CBcfAA==/colls/CBcfAKX3xWA=/docs/CBcfAKX3xWACAAAAAAAACA==/",
        "_etag": "\"00008000-0000-0000-0000-5bacc99b0000\"",
        "_attachments": "attachments/",
        "_ts": 1538050459
    }
    {
        "LOCATION": "Ecuador",
        "ID": "100099786",
        "NAME": "Example LLC",
        "id": "100099786",
        "_rid": "CBcfAKX3xWADAAAAAAAADA==",
        "_self": "dbs/CBcfAA==/colls/CBcfAKX3xWA=/docs/CBcfAKX3xWADAAAAAAAADA==/",
        "_etag": "\"0000e901-0000-0000-0000-5bacc99b0000\"",
        "_attachments": "attachments/",
        "_ts": 1538050559
    }
    

    MyDB.CUSTOMERS:

    {
        "COUNTRY": "Austria",
        "ID": "23004389",
        "EMAIL": "msanchez@example.com",
        "NAME": "Manuel",
        "id": "23004389"
        "_rid": "CBcfAJgI4eYEAAAAAAAACA==",
        "_self": "dbs/CBcfAA==/colls/CBcfAJgI4eY=/docs/CBcfAJgI4eYEAAAAAAAACA==/",
        "_etag": "\"d600b243-0000-0000-0000-5bacc99c0000\"",
        "_attachments": "attachments/",
        "_ts": 1538050460
    }
    

Striim provides a template for creating applications that read from Oracle and write to Cosmos DB. See Creating a new application using a template for details.

Replicating Oracle data to Cassandra

Cassandra Writer can continuously replicate one or many Oracle tables to a Cassandra or DataStax keyspace. First, create a table in Cassandra corresponding to each Oracle table to be replicated. Then load the existing data using DatabaseReader, for example:

CREATE SOURCE OracleJDBCSource USING DatabaseReader (
  Username:'Striim',
  Password:'****',
  ConnectionURL:'jdbc:oracle:thin:@192.168.123.14:1521/XE',
  Tables:'TPCH.H_CUSTOMER;TPCH.H_PART;TPCH.H_SUPPLIER'
)
OUTPUT TO DataStream;

CREATE TARGET TPCHInitialLoad USING CassandraWriter (
  ConnectionURL:'jdbc:cassandra://203.0.113.50:9042/mykeyspace',
  Username:'striim',
  Password:'******',
  Tables:'TPCH.H_CUSTOMER,customer;TPCH.H_PART,part;TPCH.H_SUPPLIER,supplier'
)
INPUT FROM DataStream; 

CassandraWriter's Tables property maps each specified Oracle table to a Cassandra table, for example, TPCH.H_CUSTOMER to customer. Oracle table names must be uppercase and Cassandra table names must be lowercase. Since columns in Cassandra tables are not created in the same order they are specified in the CREATE TABLE statement, the ColumnMap option is required (see Mapping columns) and wildcards are not supported. See Database Reader and Cassandra Writer for more information about the properties.
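
For example, if the Cassandra customer table was created with its columns in a different order than TPCH.H_CUSTOMER, the mapping for that table might follow the ColumnMap(targetColumn=sourceColumn, ...) pattern described in Mapping columns. This is a sketch; the column names shown are illustrative, not the actual TPC-H columns:

Tables:'TPCH.H_CUSTOMER,customer ColumnMap(custkey=C_CUSTKEY, name=C_NAME, address=C_ADDRESS)'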

Once the initial load is complete, the following application will read new data using LogMiner and continuously replicate it to Cassandra:

CREATE SOURCE OracleCDCSource USING OracleReader ( 
  Username: 'Striim',
  Password: '******',
  ConnectionURL: '203.0.113.49:1521:orcl',
  Compression:'True',
  Tables: 'TPCH.H_CUSTOMER;TPCH.H_PART;TPCH.H_SUPPLIER'
 ) 
OUTPUT TO DataStream;

CREATE TARGET CassandraTarget USING CassandraWriter(
  ConnectionURL:'jdbc:cassandra://203.0.113.50:9042/mykeyspace',
  Username:'striim',
  Password:'******',
  Tables: 'TPCH.H_CUSTOMER,customer;TPCH.H_PART,part;TPCH.H_SUPPLIER,supplier'
)
INPUT FROM DataStream;

OracleReader's Compression property must be True. Cassandra does not allow primary key updates. See Oracle Reader properties and Cassandra Writer for more information about the properties.

When the input stream of a Cassandra Writer target is the output of an Oracle source (DatabaseReader or OracleReader), the following types are supported:

Oracle type                        Cassandra CQL type
BINARY_DOUBLE                      double
BINARY_FLOAT                       float
BLOB                               blob
CHAR                               text, varchar
CHAR(1)                            bool
CLOB                               ascii, text
DATE                               timestamp
DECIMAL                            double, float
FLOAT                              float
INT                                int
INTEGER                            int
NCHAR                              text, varchar
NUMBER                             int
NUMBER(1,0)                        int
NUMBER(10)                         int
NUMBER(19,0)                       int
NUMERIC                            int
NVARCHAR2                          varchar
SMALLINT                           int
TIMESTAMP                          timestamp
TIMESTAMP WITH LOCAL TIME ZONE     timestamp
TIMESTAMP WITH TIME ZONE           timestamp
VARCHAR2                           varchar

Replicating Oracle data to Google BigQuery

Striim provides a template for creating applications that read from Oracle and write to BigQuery. See Creating a new application using a template for details.

The following application will replicate data from all tables in MYSCHEMA in Oracle to the corresponding tables in mydataset in BigQuery. The tables in BigQuery must exist when the application is started. All source and target tables must have a UUID column, and in the source tables the UUID values must be unique identifiers. See the notes for the Mode property in BigQuery Writer for additional details.

CREATE SOURCE OracleCDCSource USING OracleReader ( 
  CommittedTransactions: false,
  Username: 'myuser',
  Password: 'mypass',
  ConnectionURL: '192.168.33.10:1521/XE',
  Tables: 'MYSCHEMA.%'
 ) 
OUTPUT TO DataStream;

CREATE TARGET BigQueryTarget USING BigQueryWriter(
  ServiceAccountKey: '<path>/<configuration file>.json',
  ProjectId: '<project ID>',
  Mode: 'MERGE',
  Tables: "MYSCHEMA.%,mydataset.% keycolumns(UUID)"
INPUT FROM DataStream;

See BigQuery Writer for details of the property values.

Replicating Oracle data to Google Cloud Spanner

See Oracle to Cloud Spanner Migration Guide.

Replicating Oracle data to Google Cloud PostgreSQL

See Oracle to Google Cloud PostgreSQL Migration Guide.

Replicating Oracle data to a Hazelcast "hot cache"

Striim provides a template for creating applications that read from Oracle and write to Hazelcast. See Creating a new application using a template for details.

See Hazelcast Writer for information on the adapter properties.

To replicate Oracle data to Hazelcast:

  1. Write a Java class defining the Plain Old Java Objects (POJOs) corresponding to the Oracle table(s) to be replicated (see http://stackoverflow.com/questions/3527264/how-to-create-a-pojo for more information on POJOs), compile the Java class to a .jar file, copy it to the Striim/lib directory of each Striim server that will run the HazelcastWriter target, and restart the server.

  2. Write an XML file defining the object-relational mapping to be used to map Oracle table columns to Hazelcast maps (the "ORM file") and save it in a location accessible to the Striim cluster. Data types are converted as specified in the ORM file. Supported Java types on the Hazelcast side are:

    • binary (byte[])

    • Character, char

    • Double, double

    • Float, float

    • int, Integer

    • java.util.Date

    • Long, long

    • Short, short

    • String

    Odd mappings may throw invalid data errors, for example, when an Oracle VARCHAR2 column mapped to a long contains a value that is not a number. Oracle BLOB and CLOB types are not supported.

  3. Write a Striim application using DatabaseReader and HazelcastWriter to perform the initial load from Oracle to Hazelcast.

  4. Write a second Striim application using OracleReader and HazelcastWriter to perform continuous replication. 

This example assumes the following Oracle table definition: 

CREATE TABLE INV ( 
  SKU INT PRIMARY KEY NOT NULL,
  STOCK NUMBER(*,4),
  NAME varchar2(20),
  LAST_UPDATED date 
);

The following Java class defines a POJO corresponding to the table:

package com.customer.vo;
import java.io.Serializable;
import java.util.Date;
public class ProductInvObject  implements Serializable {

   public long sku = 0;
   public double stock = 0;
   public String name = null;
   public Date lastUpdated = null;

   public ProductInvObject ( ) {    }

   @Override
   public String toString() {
       return "sku : " + sku + ", STOCK:" + stock + ", NAME:" + name + ", LAST_UPDATED:" + lastUpdated  ;
   }
}

The following ORM file maps the Oracle table columns to Hazelcast maps:

<?xml version="1.0" encoding="UTF-8"?>
<entity-mappings xmlns="http://www.eclipse.org/eclipselink/xsds/persistence/orm" version="2.4">

    <entity name="prodcInv" class="com.customer.vo.ProductInvObject" >  
        <table name="MYSCHEMA.INV"/>
        <attributes>
            <id name ="sku" attribute-type="long"  >
                <column nullable="false" name="SKU" />
            </id>
            <basic name="stock" attribute-type="double" >
                <column nullable="false" name="STOCK"  />
            </basic>
            <basic name="name" attribute-type="String" >
                <column  name="NAME" />
            </basic>
            <basic name="lastUpdated"  attribute-type="java.util.Date" >
                <column name="LAST_UPDATED" />
            </basic>
        </attributes>
    </entity>

</entity-mappings>

 Assuming that the ORM file has been saved to Striim/Samples/Ora2HCast/invObject_orm.xml, the following Striim application will perform the initial load:

CREATE APPLICATION InitialLoadOra2HC;

CREATE SOURCE OracleJDBCSource USING DatabaseReader (
  Username:'striim',
  Password:'passwd',
  ConnectionURL:'203.0.113.49:1521:orcl',
  Tables:'MYSCHEMA.INV'
)
OUTPUT TO DataStream;

CREATE TARGET HazelOut USING HazelcastWriter (
  ConnectionURL: '203.0.113.50:5702',
  ormFile:"Samples/Ora2HCast/invObject_orm.xml",
  mode: "initialLoad",
  maps: 'MYSCHEMA.INV,invCache'
)
INPUT FROM DataStream;

END APPLICATION InitialLoadOra2HC;

Once InitialLoadOra2HC has copied all the data, the following application will perform continuous replication of new data:

CREATE APPLICATION ReplicateOra2HC;

CREATE SOURCE OracleCDCSource USING OracleReader (
  Username:'striim',
  Password:'passwd',
  ConnectionURL:'203.0.113.49:1521:orcl',
  Tables:'MYSCHEMA.INV'
)
OUTPUT TO DataStream;

CREATE TARGET HazelOut USING HazelcastWriter (
  ConnectionURL: '203.0.113.50:5702',
  ormFile:"Samples/Ora2HCast/invObject_orm.xml",
  mode: "incremental",
  maps: 'MYSCHEMA.INV,invCache'
)
INPUT FROM DataStream ;

END APPLICATION ReplicateOra2HC;

Note

If the Hazelcast cluster goes down, the data in the map will be lost. To restore it, stop the replication application, do the initial load again, then restart replication.

Replicating Oracle data to HBase

Striim provides a template for creating applications that read from Oracle and write to HBase. See Creating a new application using a template for details. DDL may be included by modifying the application generated by the template (see Including DDL operations in Oracle Reader output).

The following sample application will continuously replicate changes to MYSCHEMA.MYTABLE to the HBase Writer table mytable in the column family oracle_data:

CREATE SOURCE OracleCDCSource USING OracleReader ( 
  Username:'striim',
  Password:'passwd',
  ConnectionURL:'203.0.113.49:1521:orcl',
  Tables:'MYSCHEMA.MYTABLE',
  ReaderType: 'LogMiner',
  CommittedTransactions: false
)
OUTPUT TO DataStream;

CREATE TARGET HBaseTarget USING HBaseWriter(
  HBaseConfigurationPath:"/usr/local/HBase/conf/hbase-site.xml",
  Tables: "MYSCHEMA.MYTABLE,mytable.oracle_data"
INPUT FROM DataStream;

Notes:

  • INSERT, UPDATE, and DELETE are supported.

  • UPDATE does not support changing a row's primary key.

  • If the Oracle table has one primary key, the value of that column is treated as the HBase rowkey. If the Oracle table has multiple primary keys, their values are concatenated and treated as the HBase rowkey (see the example after these notes).

  • Inserting a row with the same primary key as an existing row is treated as an update.

  • The Tables property values are case-sensitive.
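
For example, given a hypothetical source table with a two-column primary key (not part of the sample application above), the rowkey is built from both key columns:

-- Hypothetical Oracle table used only to illustrate composite-key handling
CREATE TABLE MYSCHEMA.ORDERLINES (
  ORDER_ID NUMBER,
  LINE_NO  NUMBER,
  ITEM     VARCHAR2(40),
  PRIMARY KEY (ORDER_ID, LINE_NO)
);
-- For each replicated row, the ORDER_ID and LINE_NO values are concatenated
-- and the result is used as the HBase rowkey.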

The Tables value may map Oracle tables to HBase tables and column families in various ways:

  • one to one: Tables: "MYSCHEMA.MYTABLE,mytable.oracle_data"

  • many Oracle tables to one HBase table: "MYSCHEMA.MYTABLE1,mytable.oracle_data;MYSCHEMA.MYTABLE2,mytable.oracle_data"

  • many Oracle tables to one HBase table in different column families: "MYSCHEMA.MYTABLE1,mytable.family1;MYSCHEMA.MYTABLE2,mytable.family2"

  • many Oracle tables to many HBase tables: "MYSCHEMA.MYTABLE1,mytable1.oracle_data;MYSCHEMA.MYTABLE2,mytable2.oracle_data "

Writing raw CDC data to Hive

The following sample application uses data from OracleReader, but you can do the same thing with any of the other CDC readers.

Oracle table

In Oracle, create the following table:

CREATE TABLE POSAUTHORIZATIONS (
  BUSINESS_NAME varchar2(30),
  MERCHANT_ID varchar2(100),
  PRIMARY_ACCOUNT NUMBER,
  POS NUMBER,
  CODE varchar2(20),
  EXP char(4),
  CURRENCY_CODE char(3),
  AUTH_AMOUNT number(10,3),
  TERMINAL_ID NUMBER,
  ZIP number,
  CITY varchar2(20),
  PRIMARY KEY (MERCHANT_ID));
COMMIT;

TQL application

Create the following TQL application, substituting the appropriate connection URL and table name:

CREATE SOURCE OracleCDCSource USING OracleReader (
  StartTimestamp:'07-OCT-2015 18:37:55',
  Username:'qatest',
  Password:'qatest',
  ConnectionURL:'192.0.2.0:1521:orcl',
  Tables:'QATEST.POSAUTHORIZATIONS',
  OnlineCatalog:true,
  FetchSize:1,
  Compression:true
) 
OUTPUT TO DataStream;

CREATE TARGET HiveTarget USING HDFSWriter(
  filename:'ora_hive_pos.bin',
  hadoopurl:'hdfs://localhost:9000/output/'
)
FORMAT USING AvroFormatter (
  schemaFileName: 'ora_hive_pos.avsc'
)
INPUT FROM DataStream;

Avro schema file

See AVROFormatter for instructions on using the TQL application to generate ora_hive_pos.avsc, an Avro schema file based on WAEvent. The generated file's contents should be:

{
  "namespace": "waevent.avro",
  "type" : "record",
  "name": "WAEvent_Record",
  "fields": [
    {
    "name" : "data",
    "type" : { "type": "map","values":"string" }
     },
    {
    "name" : "before",
    "type" : ["null",{"type": "map","values":"string" }]
    },
    {
     "name" : "metadata",
     "type" : { "type": "map","values":"string" }
    }
  ]
}

WAEvent's data, before, and metadata fields are represented in Avro as Avro map types.

Hive table

Copy ora_hive_pos.avsc to HDFS. In Hive, create a table using the generated Avro schema file, modifying the TBLPROPERTIES string to point to the location where you copied it:

CREATE TABLE OracleHive
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.url'='hdfs://localhost:9000/avro/ora_hive_pos.avsc');

The new table should look like this:

hive> describe formatted oraclehive;
OK
# col_name            	data_type           	comment             
	 	 
data                	map<string,string>  	                    
before              	map<string,string>  	                    
metadata            	map<string,string>  	                    
	 	 
….                 
Time taken: 0.481 seconds, Fetched: 34 row(s)
hive> 

Load the generated Avro data into the table:

hive> LOAD DATA INPATH '/output/ora_hive_pos.bin' OVERWRITE INTO TABLE OracleHive;

Generate sample CDC data in Oracle

In Oracle, enter the following to generate CDC data:

INSERT INTO POSAUTHORIZATIONS VALUES('COMPANY 1',
'D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu',6705362103919221351,0,'20130309113025','0916',
'USD',2.20,5150279519809946,41363,'Quicksand');
INSERT INTO POSAUTHORIZATIONS VALUES('COMPANY 2',
'OFp6pKTMg26n1iiFY00M9uSqh9ZfMxMBRf1',4710011837121304048,4,'20130309113025','0815',
'USD',22.78,5985180438915120,16950,'Westfield');
INSERT INTO POSAUTHORIZATIONS VALUES('COMPANY 3',
'ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx',2553303262790204445,6,'20130309113025','0316',
'USD',218.57,0663011190577329,18224,'Freeland');
INSERT INTO POSAUTHORIZATIONS VALUES('COMPANY 4',
'FZXC0wg0LvaJ6atJJx2a9vnfSFj4QhlOgbU',2345502971501633006,3,'20130309113025','0813',
'USD',18.31,4959093407575064,55470,'Minneapolis');
INSERT INTO POSAUTHORIZATIONS VALUES('COMPANY 5',
'ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx',6388500771470313223,2,'20130309113025','0415',
'USD',314.94,7116826188355220,39194,'Yazoo City');
UPDATE POSAUTHORIZATIONS SET BUSINESS_NAME = 'COMPANY 1A' where pos= 0;
UPDATE POSAUTHORIZATIONS SET BUSINESS_NAME = 'COMPANY 5A' where pos= 2;
UPDATE POSAUTHORIZATIONS SET BUSINESS_NAME = 'COMPANY 4A' where pos= 3;
UPDATE POSAUTHORIZATIONS SET BUSINESS_NAME = 'COMPANY 2A' where pos= 4;
UPDATE POSAUTHORIZATIONS SET BUSINESS_NAME = 'COMPANY 3A' where pos= 6;
DELETE from POSAUTHORIZATIONS where pos=6;
COMMIT;

Query the Hive table

Query the Hive table to verify that the CDC data is being captured:

hive> select * from oraclehive;
OK
{"3":"0","2":"6705362103919221351","1":"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",
"10":"Quicksand","0":"COMPANY 1","6":"USD","5":"0916","4":"20130309113025","9":"41363",
"8":"5150279519809946"} NULL    {"TxnID":"10.23.1524","RbaSqn":"209",
"TableSpace":"USERS","CURRENTSCN":"1939875","OperationName":"INSERT",
"ParentTxnID":"10.23.1524","SegmentType":"TABLE","SessionInfo":"UNKNOWN",
"ParentTxn":"QATEST","Session":"143","BytesProcessed":"760",
"TransactionName":"","STARTSCN":"","SegmentName":"POSAUTHORIZATIONS","COMMITSCN":"",
"SEQUENCE":"1","RbaBlk":"57439","ThreadID":"1","SCN":"193987500000588282738968494240000",
"AuditSessionId":"73401","ROWID":"AAAXlEAAEAAAALGAAA",
"TimeStamp":"2015-10-08T14:58:55.000-07:00","Serial":"685",
"RecordSetID":" 0x0000d1.0000e05f.0010 ","TableName":"QATEST.POSAUTHORIZATIONS",
"SQLRedoLength":"325","Rollback":"0"}
{"3":"4","2":"4710011837121304048","1":"OFp6pKTMg26n1iiFY00M9uSqh9ZfMxMBRf1",
"10":"Westfield","0":"COMPANY 2","6":"USD","5":"0815","4":"20130309113025","9":"16950",
"8":"5985180438915120"} NULL    {"TxnID":"10.23.1524","RbaSqn":"209",
"TableSpace":"USERS","CURRENTSCN":"1939876","OperationName":"INSERT",
"ParentTxnID":"10.23.1524","SegmentType":"TABLE","SessionInfo":"UNKNOWN",
"ParentTxn":"QATEST","Session":"143","BytesProcessed":"762",
"TransactionName":"","STARTSCN":"","SegmentName":"POSAUTHORIZATIONS","COMMITSCN":"",
"SEQUENCE":"1","RbaBlk":"57441","ThreadID":"1","SCN":"193987600000588282738969804960001",
"AuditSessionId":"73401","ROWID":"AAAXlEAAEAAAALGAAB",
"TimeStamp":"2015-10-08T14:58:56.000-07:00","Serial":"685",
"RecordSetID":" 0x0000d1.0000e061.0010 ","TableName":"QATEST.POSAUTHORIZATIONS",
"SQLRedoLength":"327","Rollback":"0"}
...
Time taken: 0.238 seconds, Fetched: 11 row(s)

To select a subset of the data, use syntax similar to the following:

hive> select metadata["TimeStamp"], metadata["TxnID"], metadata["TableName"],
data from oraclehive where metadata["OperationName"]="UPDATE";
2015-10-08T14:58:56.000-07:00	5.26.1740	QATEST.POSAUTHORIZATIONS
	{"0":"COMPANY 1A","1":"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu"}
2015-10-08T14:58:56.000-07:00	5.26.1740	QATEST.POSAUTHORIZATIONS
	{"0":"COMPANY 2A","1":"OFp6pKTMg26n1iiFY00M9uSqh9ZfMxMBRf1"}
...
Time taken: 0.088 seconds, Fetched: 5 row(s)

Replicating Oracle data to Hive

See Hive Writer for information on storage types supported and limitations.

This example assumes the following Oracle source table:

create table employee (Employee_name varchar2(30), 
Employee_id number, 
CONSTRAINT employee_pk PRIMARY KEY (Employee_id));

and the following Hive target table:

CREATE TABLE employee (emp_name string, emp_id int)
CLUSTERED BY (emp_id) into 2 buckets 
STORED AS ORC TBLPROPERTIES ('transactional'='true');

The following application will load existing data from Oracle to Hive:

CREATE SOURCE OracleJDBCSource USING DatabaseReader (
  Username:'oracleuser',
  Password:'********',
  ConnectionURL:'192.0.2.75:1521:orcl',
  Tables:'DEMO.EMPLOYEE',
  FetchSize:1
)
OUTPUT TO DataStream;

CREATE TARGET HiveTarget USING HiveWriter (
  ConnectionURL:'jdbc:hive2://localhost:10000',
  Username:'hiveuser',
  Password:'********',
  hadoopurl:'hdfs://18.144.17.75:9000/',
  Mode:'initialload',
  Tables:'DEMO.EMPLOYEE,employee'
)
INPUT FROM DataStream;

Once initial load is complete, the following application will read new data and continuously replicate it to Hive:

CREATE SOURCE OracleCDCSource USING OracleReader (
  Username:'oracleuser',
  Password:'********',
  ConnectionURL:'192.0.2.75:1521:orcl',
  Tables:'DEMO.EMPLOYEE',
  FetchSize:1
)
OUTPUT TO DataStream;

CREATE TARGET HiveTarget USING HiveWriter (
  ConnectionURL:'jdbc:hive2://192.0.2.76:10000',
  Username:'hiveuser',
  Password:'********',
  hadoopurl:'hdfs://192.0.2.76:9000/',
  Mode:'incrementalload',
  Tables:'DEMO.EMPLOYEE,employee keycolumns(emp_id)'
)
INPUT FROM DataStream;

Replicating Oracle data to Kafka

See Oracle to Kafka Quick Guide.

Replicating Oracle data to SAP HANA

DatabaseWriter can continuously replicate one or many Oracle tables to a SAP HANA database. First, create a table in SAP HANA corresponding to each Oracle table to be replicated. Then load the existing data using DatabaseReader. For example, to replicate all tables in MYORADB to MYSAPDB:

CREATE SOURCE OracleJDBCSource USING DatabaseReader (
  Username:'Striim',
  Password:'****',
  ConnectionURL:'jdbc:oracle:thin:@203.0.113.49:1521:orcl',
  Tables:'MYORADB.%'
)
OUTPUT TO DataStream;

CREATE TARGET SAPHANAInitialLoad USING DatabaseWriter (
  ConnectionURL:'jdbc:sap://203.0.113.50:39013/?databaseName=MYSAPDB&currentSchema=striim',
  Username:'striim',
  Password:'******',
  Tables:'MYORADB.%,MYSAPDB.%'
)
INPUT FROM DataStream; 

See Database Reader and Database Writer for more information about the properties.

Once the initial load is complete, the following application will continuously replicate new data from Oracle to SAP HANA:

CREATE SOURCE OracleCDCSource USING OracleReader ( 
  Username: 'Striim',
  Password: '******',
  ConnectionURL: '203.0.113.49:1521:orcl',
  Compression:'True',
  Tables: 'MYORADB.%'
 ) 
OUTPUT TO DataStream;

CREATE TARGET SAPHANAContinuous USING DatabaseWriter(
  ConnectionURL:'jdbc:sap://203.0.113.50:39013/?databaseName=MYSAPDB&currentSchema=striim',
  Username:'striim',
  Password:'******',
  Tables: 'MYORADB.%,MYSAPDB.%'
)
INPUT FROM DataStream;

When the input stream of a SAP HANA DatabaseWriter target is the output of an Oracle source (Database Reader, Incremental Batch Reader, or Oracle Reader), the following types are supported:

Oracle type                        SAP HANA type
BINARY_DOUBLE                      DOUBLE
BINARY_FLOAT                       REAL
BLOB                               BLOB, VARBINARY
CHAR                               ALPHANUM, TEXT, VARCHAR
CHAR(1)                            BOOLEAN
CLOB                               CLOB, VARCHAR
DATE                               DATE
DECIMAL                            DECIMAL
FLOAT                              FLOAT
INT                                INTEGER
INTEGER                            INTEGER
NCHAR                              NVARCHAR
NUMBER                             INTEGER
NUMBER(1,0)                        INTEGER
NUMBER(10)                         INTEGER
NUMBER(19,0)                       INTEGER
NUMERIC                            BIGINTEGER, DECIMAL, DOUBLE, FLOAT, INTEGER
NVARCHAR2                          NVARCHAR
SMALLINT                           SMALLINT
TIMESTAMP                          TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE     TIMESTAMP
TIMESTAMP WITH TIME ZONE           TIMESTAMP
VARCHAR2                           VARCHAR

Replicating Oracle data to Snowflake

Striim provides a template for creating applications that read from Oracle and write to Snowflake. See Creating a new application using a template for details.

SnowflakeWriter can continuously replicate one or many Oracle tables to Snowflake. First, create a table in Snowflake corresponding to each Oracle table to be replicated. Then load the existing data using DatabaseReader, for example:

CREATE SOURCE OracleJDBCSource USING DatabaseReader (
  Username: 'striim',
  Password: '******',
  ConnectionURL: 'jdbc:oracle:thin:@//127.0.0.1:1521/xe',
  Tables: 'QATEST.%'
)
OUTPUT TO DataStream;

CREATE TARGET SnowflakeInitialLoad USING SnowflakeWriter (
  ConnectionURL: 'jdbc:snowflake://hx75070.snowflakecomputing.com/?db=DEMO_DB&schema=public',
  username: 'striim',
  password: '******',
  Tables: 'QATEST.%,DEMO_DB.PUBLIC.%',
  appendOnly: true
)
INPUT FROM DataStream;

Once the initial load is complete, the following application will read new data using LogMiner and continuously replicate it to Snowflake:

CREATE SOURCE OracleCDCSource USING OracleReader (
  Username: 'striim',
  Password: '******',
  ConnectionURL: 'jdbc:oracle:thin:@//127.0.0.1:1521/xe',
  Tables: 'QATEST.%'
)
OUTPUT TO DataStream;

CREATE TARGET SnowflakeCDC USING SnowflakeWriter (
  ConnectionURL: 'jdbc:snowflake://hx75070.snowflakecomputing.com/?db=DEMO_DB&schema=public',
  username: 'striim',
  password: '******',
  Tables: 'QATEST.%,DEMO_DB.PUBLIC.%'  
)
INPUT FROM DataStream;

For more information, see Snowflake Writer.
