3.10.1 Writers

Writers are used by specifying them in targets.

Writers overview

The following is a summary of writer capabilities.

writer

input stream type(s)

supports replication

supports Database Reader auto-quiesce

output(s)

DDL support

parallel threads

recovery3

supports LEE

ADLS Gen1 Writer

user-defined

no

no

Avro, delimited text, JSON, XML

optional rollover on MySQL or Oracle DDL

no

A1P

yes

ADLS Gen2 Writer

user-defined

no

yes

Avro, delimited text, JSON, XML

optional rollover on MySQL or Oracle DDL

no

A1P

yes

Azure Blob Writer

user-defined

no

yes

Avro, delimited text, JSON, XML

-

no

A1P

yes

Azure Event Hub Writer

user-defined

no

yes

Avro, delimited text, JSON, XML

-

no

A1P (default) or E1P3 4

yes

Azure Synapse Writer

user-defined, WAEvent1

yes

yes

Azure Synapse table(s)

-

yes

A1P

yes

BigQuery Writer

user-defined, WAEvent1

yes

yes

BigQuery table(s)2

-

yes

A1P

yes

Cassandra Cosmos DB Writer

user-defined, WAEvent1

yes

yes

Cosmos DB Cassandra API tables

-

yes

E1P4 5

yes

Cloudera Hive Writer

user-defined, WAEvent1

insert only

no

Hive table(s)2

-

no

A1P

yes

Cosmos DB Writer

user-defined, JSONNodeEvent, WAEvent1

yes

yes

CosmosDB documents

-

yes

A1P

yes

Database Writer

user-defined, WAEvent1

yes

yes

table(s)2

replication Oracle to Oracle only

yes

E1P4 6

yes

File Writer

user-defined

no

yes

Avro, delimited text, JSON, XML

optional rollover on MySQL or Oracle DDL

no

A1P

no

GCS Writer

user-defined

no

yes

Avro, delimited text, JSON, XML

optional rollover on MySQL or Oracle DDL

yes

A1P

yes

Google PubSub Writer

user-defined

no

yes

Avro, delimited text, JSON, XML

-

no

A1P

yes

Hazelcast Writer

user-defined, WAEvent1

yes

no

Hazelcast map(s)2

-

no

A1P

yes

HBase Writer

user-defined, WAEvent1

yes

no

HBase table(s)2

-

yes

A1P

no

HDFS Writer

user-defined

no

no

Avro, delimited text, JSON, XML

optional rollover on MySQL or Oracle DDL

no

A1P

yes

Hive Writer

user-defined, WAEvent1

yes (insert-only in some cases)

yes

Hive table(s)2

-

yes

E1P (when using MERGE) or A1P

yes

Hortonworks Hive Writer

user-defined, WAEvent1

yes (insert-only in some cases)

no

Hive table(s)2

-

no

E1P (when using MERGE) or A1P

no

JMS Writer

user-defined

no

no

delimited text, JSON, XML

-

no

A1P

yes

Kafka Writer

user-defined, WAEvent1

requires schema registry, append only

yes

Avro, delimited text, JSON, XML

can track schema evolution using schema registry

yes

E1P4

yes

Kinesis Writer

user-defined

no

yes

Avro, delimited text, JSON, XML

-

no

E1P4

no

Kudu Writer

user-defined, WAEvent1

yes

yes

Kudu table(s)2

-

yes

A1P

yes

MapR DB Writer

user-defined

no

no

MapR DB table

-

yes

A1P

no

MapR FS Writer

user-defined

no

no

Avro, delimited text, JSON, XML

no

A1P

no

MapR Stream Writer

user-defined, WAEvent1

no

no

Avro, delimited text, JSON, XML

no

A1P

yes

MongoDB Writer

JSONNodeEvent, user-defined, WAEvent1

yes

yes

MongoDB documents

-

yes

A1P

yes

MQTT Writer

user-defined

no

no

Avro, delimited text, JSON, XML

no

A1P

no

Redshift Writer

user-defined, WAEvent1

yes

yes

Redshift table(s)2

yes

A1P

yes

S3 Writer

user-defined

no

yes

Avro, delimited text, JSON, XML

optional rollover on MySQL or Oracle DDL

yes

A1P

yes

Snowflake Writer

user-defined, WAEvent1

yes (append-only or merge)

yes

Snowflake table(s)2

-

yes

A1P

yes

Spanner Writer

user-defined, WAEvent1

yes

yes

Spanner table(s)2

-

yes

E1P4

yes

SysOut

any except Avro

n/a

no

log file or terminal

all input is written

no

A1P

yes

Footnotes

1. WAEvent must be the output of a Database Reader, Incremental Batch Reader, or SQL CDC source.

2. With an input stream of a user-defined type, output is to a single table or map. Output to multiple tables or maps requires source database metadata included in WAEvent.

3. A1P ("at-least once processing") means that after recovery there may be some duplicate events written to the target. E1P ("exactly once processing") means there will be no duplicate events.

4. If the source is WAEvent from Incremental Batch Reader, recovery is A1P.

5. Primary key updates to source rows cannot be replicated.

6. When writing to Cosmos DB, recovery is A1P.

For more information, see:

Setting output names and rollover / upload policies

ADLS Gen1 Writer, ADLS Gen2 Writer, Azure Blob Writer, File Writer, GCS Writer, HDFS Writer, and S3 Writer support the following options to define the paths and names for output files, when new files are created, and how many files are retained.

Dynamic output names

The blobname string in Azure Blob Writer, the bucketname and objectname strings in GCSWriter and S3Writer, the directory and filename strings in ADLS Gen1 Writer, ADLS Gen2 Writer, File Writer, and HDFSWriter, and the foldername string in AzureBlobWriter, GCS Writer, and S3Writer may include field-name tokens that will be replaced with values from the input stream. For example, if the input stream included yearString, monthString, and dayString fields, directory:'%yearString%/%monthString%/%dayString%' would create a new directory for each date, grouped by month and year subdirectories. If desirable, you may filter these fields from the output using the members property in DSVFormatter or JSONFormatter.

Known issues DEV-19661, DEV-19662, DEV-19666: If a bucketname in GCSWriter or S3Writer, directoryname in ADLS Gen1 Writer or ADLS Gen2 Writer, or foldername in S3Writer contains two field-name tokens, such as %field1%%field2%, the second creates a subfolder of the first.

When the target's input is the output of a CDC or DatabaseReader source, values from the WAEvent metadata or userdata map or JSONNodeEvent metadata map may be used in these names using the syntax %@metadata(<field name>)% or %@userdata(<field name>)%, for example, %@metadata(TableName)%. You may combine multiple metadata and/or userdata values, for example, %@metadata(name)%/%@userdata(TableName)%. You may also mix field, metadata, and userdata values.

For S3Writer bucket names, do not include punctuation between values. Hyphens, the only punctuation allowed, will be added automatically. For more information, see Amazon's Rules for Bucket Naming.
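
For example, the following sketch (the stream and directory names are hypothetical, and the input is assumed to be the output of a CDC source) uses a metadata token so that each source table's data is written to its own directory:

CREATE TARGET DynamicDirFileTarget USING FileWriter (
  directory: 'out/%@metadata(TableName)%',
  filename: 'tabledata.json'
)
FORMAT USING JSONFormatter ()
INPUT FROM OracleCDCStream;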

Rollover and upload policies

rolloverpolicy or uploadpolicy triggers output file or blob rollover in different ways depending on which parameters you specify:

parameter

rollover trigger

example

eventcount (or size)

the specified number of events has been accumulated (in Kinesis Writer, this is specified as a size in bytes rather than a number of events)

 eventcount:10000

filesize

specified file size has been reached (value must be specified in megabytes, the maximum is 2048M)

filesize:1024M

interval

the specified time has elapsed (use s for seconds, m for minutes, or h for hours)

interval:1h

You may specify both eventcount and interval, in which case rollover is triggered whenever one of the limits is reached. For example, eventcount:10000,interval:1h will start a new file after one hour has passed since the current file was created or after 10,000 events have been written to it, whichever happens first.

Caution

When the rollover policy includes an interval or eventcount, and the file or blob name includes the creation time, be sure that the writer will never receive events so quickly that it will create a second file with the same name, since that may result in lost data. You may work around this by using nanosecond precision in the creation time or by using a sequence number instead of the creation time.

If file lineage is enabled as described in Enabling file lineage, when the writer is stopped and restarted, the sequence will continue with the next file name in the series. If you drop and re-create the application and there are existing files in the output directory, start will fail with a "file ... already exists in the current directory" error. To retain the existing files in that directory, add a sequencestart parameter to the rollover policy, or add %<time format>%, %epochtime%, or %epochtimems% to the file name.

If file lineage is not enabled, when the writer is stopped and restarted, the sequence will restart from the beginning, overwriting existing files, so you might want to back up the output directory before restarting.

  filelimit

By default, there is no limit to how many files are retained (filelimit:-1). You may change that by including the filelimit parameter in the rollover policy:

rolloverpolicy:'interval:1h,filelimit:24'

  sequencestart, incrementsequenceby

If you prefer to start with a value greater than 0 or increment by a value greater than 1, add the sequencestart or incrementsequenceby parameters to rolloverpolicy or uploadpolicy, for example:

rolloverpolicy:'eventcount:10000,sequencestart:10,incrementsequenceby:10'

File and blob names

By default, output file or blob names are the base name (specified in filename in ADLS Gen1 Writer, ADLS Gen2 Writer, File Writer, and HDFS Writer, blobname in AzureBlobWriter, or objectname in GCS Writer and S3 Writer) with a counter between the filename and extension. The counter starts at 00 and is incremented by one for each new file. For example, if filename is myfile.txt, the output files will be myfile.00.txt, myfile.01.txt, and so on. If filelimit is set to a positive value, the counter will start over at 00 when it reaches that number, overwriting the existing files. If filelimit is -1, the counter will increment indefinitely (in which case you might want to create a cron job to delete old log files).

You may customize the output file names using the following tokens. They are allowed only in the file name, not in the extension.

%n...%

Use one n for each digit you want in the file name. The sequence number may be placed elsewhere than at the end of the file name by including the %<n...>% token in the writer's filename property value:

filename:'batch_%nnn%_data.csv'

With that setting, rolled-over files will be named batch_000_data.csv, batch_001_data.csv, and so on.

%<time format>%, %epochtime%, %epochtimems%

The formatted file creation time may be included in the file name by including the %<time format>% token in the writer's filename property value:

filename:'batch_%yyyy-MM-dd_HH_mm_ss%_data.csv'

With that setting:

  • a file rolled over at 1:00 pm on June 1, 2016, would be named batch_2016-06-01_13_00_00_data.csv

  • one rolled over five minutes later would be named batch_2016-06-01_13_05_00_data.csv

and so on. You may use any valid Joda DateTimeFormat pattern.

The unformatted epoch file creation time may be included in the file name by using the %epochtime%  token instead. For millisecond resolution, use %epochtimems%.
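
For instance, a minimal illustration (the file name is hypothetical) that includes the epoch creation time in milliseconds:

filename:'batch_%epochtimems%_data.csv'

With that setting, a file created at epoch time 1465776000000 would be named batch_1465776000000_data.csv.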

Examples combining multiple options

The following are a few examples of the many ways these options can be combined.

properties

behavior

  • rolloverpolicy: 'EventCount:5000'

  • directory: '%yearString%/%monthString%/%dayString%'

  • filename: 'output_%yyyy-MM-dd_HH_mm_ss%'

  • file is rolled over every 5,000 events

  • files are grouped by day in a year/month/day directory tree

  • file names include the timestamp

  • rolloverpolicy: 'EventCount:5000'

  • directory: '%yearString%/%monthString%/%dayString%'

  • filename: 'output_%nnnnnn%'

  • file is rolled over every 5,000 events

  • each day's files are grouped in a directory in a year/month/day directory tree

  • file names include the six-digit sequence number

  • directory: '%tableName%/%City%/'

  • filename: 'output_%nnnnnn%.csv'

  • files are grouped by city in a table_name/city_name directory tree

  • file names include the six-digit sequence number

See also Kafka Writer for discussion of how to write to specific partitions based on field values.

Creating multiple writer instances

Some writers have a Parallel Threads property, which in some circumstances may allow you to create multiple instances for better performance. For example:

CREATE TARGET KWSample USING KafkaWriter VERSION '0.9.0' (
  brokeraddress:'localhost:9092',
  topic:'test',
  ParallelThreads:'4',
  PartitionKey:'merchantId'
)
FORMAT USING DSVFormatter ()
INPUT FROM TypedCSVStream;

This would create four instances of KafkaWriter with identical settings. Each instance would run in its own thread, increasing throughput. If KWSample were deployed ON ALL to multiple servers, each server would run four instances, so the total number of KafkaWriter instances would be the ParallelThreads value times the number of servers.

Warning

Use ParallelThreads only when the target is not able to keep up with incoming events (that is, when its input stream is backpressured). Otherwise, the overhead imposed by additional threads could reduce the application's performance.

When you can use parallel threads to improve performance depends on the writer.

writer

event distribution

limitations

notes

  • CassandraCosmosDBWriter

  • CosmosDBWriter

  • DatabaseWriter

  • HBaseWriter

  • KuduWriter

  • MaprDBWriter

  • SpannerWriter

Events are evenly distributed among the writer instances in round-robin fashion. All instances may write to all target tables.

Enabling recovery for the application disables parallel threads.

Use only for initial load, not for continuous replication.

  • AzureSQLDWHWriter

  • BigQueryWriter

  • HiveWriter

  • RedShiftWriter

  • SnowflakeWriter

Events are distributed among the writer instances based on the target table name. Each target table will be written to by only one of the instances.

Enabling recovery for the application disables parallel threads.

Use only for initial load, not for continuous replication.

Creating more instances than there are target tables will not improve performance.

Database Reader reads tables sequentially rather than in parallel. Consequently, creating more target instances than there are Database Reader sources will not improve performance. You may be able to improve performance by creating multiple Database Reader sources that read from different tables and all output to the same stream, and using that stream as the input for a target with multiple instances.
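
The following sketch illustrates that pattern under stated assumptions (connection details, table, and dataset names are hypothetical): two Database Reader sources output to the same stream, which feeds a single BigQuery Writer target that uses two parallel threads:

CREATE SOURCE OrdersSource USING DatabaseReader (
  Username: 'striim',
  Password: '********',
  ConnectionURL: 'jdbc:oracle:thin:@192.0.2.10:1521:orcl',
  Tables: 'MYSCHEMA.ORDERS'
)
OUTPUT TO InitialLoadStream;

CREATE SOURCE ItemsSource USING DatabaseReader (
  Username: 'striim',
  Password: '********',
  ConnectionURL: 'jdbc:oracle:thin:@192.0.2.10:1521:orcl',
  Tables: 'MYSCHEMA.ORDER_ITEMS'
)
OUTPUT TO InitialLoadStream;

CREATE TARGET InitialLoadTarget USING BigQueryWriter (
  ServiceAccountKey: '/<path>/<file name>.json',
  projectId: 'myprojectid',
  Tables: 'MYSCHEMA.%,mydataset.%',
  ParallelThreads: '2'
)
INPUT FROM InitialLoadStream;

Because the two sources read different tables, each of the two writer instances can handle one target table, as described above.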

  • GCSWriter

  • S3Writer

Events are distributed among the writer instances based on the target bucket name, directory name, and file name. Each file will be written to by only one of the instances.

You must use dynamic names for target buckets, directories, or files. Otherwise, parallel threads will not improve performance.

Creating more instances than there are target files will not improve performance.

KafkaWriter

Events are distributed among the writer instances by the PartitionKey field value. Each target partition will be written to by only one of the instances.

Creating more instances than there are target partitions will not improve performance.

Mapping columns

By default, when a writer's Tables property maps source tables to target tables, the first column in the source table is written to the first column in the target table, the second column in the source table is written to the second column in the target table, and so on. This requires that the target table have at least the same number of columns as the source table and that the data types are compatible. For example:

Source table Emp:

column name

id

birthDate

name

salary

data type

int

date

text

double

Target table Employee:

column name

EMP_ID

EMP_DOB

EMP_NAME

SAL

DEPT

data type

number

datetime

varchar

number

varchar

In this case, assuming the DEPT column in the target table allows nulls, all you need is Tables:'Emp,Employee'.
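
For instance, a minimal Database Writer target using that default mapping might look like the following sketch (the connection details and input stream name are hypothetical):

CREATE TARGET EmployeeTarget USING DatabaseWriter (
  Username: 'striim',
  Password: '********',
  ConnectionURL: 'jdbc:oracle:thin:@192.0.2.20:1521:orcl',
  Tables: 'Emp,Employee'
)
INPUT FROM EmpStream;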

When the target table does not match the source table, use the ColumnMap function to map individual columns in the source and target tables. The syntax is:

'<source table>,<target table> ColumnMap(<target column>=<source column>,...),...'

Note

Tables are mapped source first, but columns are mapped target first.

Example 1: columns in different order and extra column in target

Source table Emp:

column name

ID

NAME

DOB

Target table Employee:

column name

EMP_ID

EMP_SSN

EMP_DOB

EMP_NAME

Since the target table's columns are in a different order, you would use ColumnMap as follows:

Tables:'Emp,Employee ColumnMap(EMP_NAME=NAME,EMP_ID=ID,EMP_DOB=DOB)'

The unmapped EMP_SSN column in the target table must allow nulls.

Example 2: some columns have the same name

If a column has the same name in both the source and target, there is no need to specify it in ColumnMap, so long as at least one other column is mapped. For example:

Source table Emp:

column name

ID

NAME

ADDRESS

DOB

Target table Employee:

column name

EMP_ID

DOB

NAME

ADDRESS

Tables:'Emp,Employee ColumnMap(EMP_ID=ID)'

The NAME, ADDRESS, and DOB columns will be mapped automatically.

Example 3: extra column in source

Any source columns that are not in ColumnMap and do not exist in the target will be omitted. For example:

Source table Emp:

column name

ID

NAME

DOB

ADDRESS

Target table Employee:

column name

EMP_ID

EMP_DOB

EMP_NAME

Tables:'Emp,Employee ColumnMap(EMP_NAME=NAME,EMP_ID=ID,EMP_DOB=DOB)'

Modifying output using ColumnMap

When using ColumnMap (see Mapping columns), in place of the source column name you may specify a field in the METADATA or USERDATA map, an environment variable, or a static string. You may use this to overwrite data from the input stream or to supply data for columns in the target table for which the input stream has no values.

See also Changing and masking field values using MODIFY and Masking functions for other ways to modify output.

Note

Field names are case-sensitive.

For example, to write the CDC log timestamp in the METADATA map to the target column CDCTIMESTAMP:

... ColumnMap(EMP_NAME=NAME,EMP_ID=ID,EMP_DOB=DOB,CDCTIMESTAMP=@METADATA(TimeStamp))'

To specify a field in the USERDATA map (see Adding user-defined data to WAEvent streams or Adding user-defined data to JSONNodeEvent streams), use @USERDATA(<field name>):

... ColumnMap(EMP_NAME=NAME,EMP_ID=ID,EMP_DOB=DOB,EMP_CITY=@USERDATA(city))'

To write the Striim server's $HOSTNAME environment variable to the target column STRIIMSERVER:

... ColumnMap(EMP_NAME=NAME,EMP_ID=ID,EMP_DOB=DOB,STRIIMSERVER=$HOSTNAME)'

To write a static string, the syntax is:

... ColumnMap(<field name>='<string>')'

You may modify multiple tables by using wildcards. For example:

SRC_SCHEMA.%,TGT_SCHEMA.% COLUMNMAP(CDCTIMESTAMP=@METADATA(TimeStamp))

Handling "table not found" errors

By default, when a writer's Tables property specifies a table that does not exist in the target database, the writer will crash with a TargetTableNotFoundException error when it receives an event for that table.

Azure Synapse Writer, BigQuery Writer, Hive Writer, and Snowflake Writer may be configured to ignore such errors and continue by setting the Ignorable Exception Code TABLE_NOT_FOUND. All dropped events will be written to the exception store (see CREATE EXCEPTIONSTORE) and the first 100 will appear in the Flow Designer's exception list. Even if the missing table is added, Striim will not write to it until the application is restarted.
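
For example, a sketch of an Azure Synapse Writer target with this setting (the other property values are illustrative, and the TQL property name is assumed to be IgnorableExceptionCode):

CREATE TARGET SynapseTarget USING AzureSQLDWHWriter (
  Username: 'striim',
  Password: '********',
  ConnectionURL: 'jdbc:sqlserver://myserver.database.windows.net:1433;database=mydb',
  Tables: 'MYSCHEMA.%,dbo.%',
  AccountName: 'mystorageaccount',
  AccountAccessKey: '********',
  IgnorableExceptionCode: 'TABLE_NOT_FOUND'
)
INPUT FROM OracleCDCStream;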

Setting encryption policies

Writers with an Encryption Policy property can encrypt files using AES, PGP, or RSA before sending them to the target. The property includes the following sub-properties:

sub-property

type

default value

notes

Algorithm

String

  • for AES, supported values are:

    • AES (no transformation)

    • AES/ECB/NoPadding (128)

    • AES/ECB/PKCS5Padding (128)

  • for PGP, set to PGP

  • for RSA, supported values are:

    • RSA (no transformation): required if key size is not 1024 or 2048

    • RSA/ECB/OAEPWithSHA-256AndMGF1Padding

    • RSA/ECB/PKCS1Padding

Compress

Boolean

False

If using PGP, optionally set to True to compress the data as .zip using org.bouncycastle.openpgp.PGPCompressedData.ZIP from the Bouncy Castle OpenPGP API before encrypting it.

Key File Name

String

Name of the key file (do not include the path). For PGP, this must be the public key. For RSA, you may use either the public key (default) or private key, in which case you must set Use Private Key to True.

If the key file is encrypted, specify its passphrase in the adapter's Data Encryption Key Passphrase property.

Key Location

String

path to the specified key file (must be readable by Striim)

Key Size

Long

If the Key Type is RSA, specify the key size, for example, 2048.

Key Type

String

 

supported values are AES, PGP, and RSA

Use Private Key

Boolean

False

For RSA only: With the default value of False, the file specified in Key File Name must be the public key. Set to True to use the private key file instead.

For example:

CREATE TARGET EncryptedFileOut using FileWriter(
  filename:'EncryptedOutput.txt',
  directory:'FileOut',
  encryptionpolicy:'
    KeyType=PGP, 
    KeyLocation=/opt/striim/keys, 
    KeyFileName=myPgp.pub, 
    Algorithm=PGP'
) ...

DDL support in writers

OracleReader output may optionally include events for DDL operations. See Including DDL operations in Oracle Reader output and Handling DDL events when replicating Oracle data.

ADLSGen1Writer, ADLSGen2Writer, AzureBlobWriter, FileWriter, GCSWriter, HDFSWriter, and S3Writer can roll over to a new file when a DDL event is received from MySQLReader or OracleReader.

KafkaWriter can track schema evolution using the Confluent or Hortonworks schema registry.

How update and delete operations are handled in writers

In some cases, when the input of a target is the output of a CDC source, Striim can match primary keys or other unique identifiers to replicate source update and delete operations in the target, keeping the source and target in sync.

In other cases, such matching is impossible, so updates and deletes are written as inserts. This is often acceptable when the target is a data warehouse.

With the following source-target combinations, update and delete operations in the source are handled as updates and deletes in the target:

source

target

  • GG Trail Reader

  • HPNonStop readers

  • MSSQL Reader

  • MySQL Reader

  • Oracle Reader

  • PostgreSQL Reader

  • Salesforce Reader

  • Azure Synapse Writer

  • BigQuery Writer (in MERGE mode)

  • CosmosDB Writer

  • Database Writer

  • Hazelcast Writer

  • HBase Writer

  • Kudu Writer

  • MapRDB Writer

  • MongoDB Writer

  • Redshift Writer

  • Snowflake Writer

  • Spanner Writer

  • MongoDB Reader

  • CosmosDB Writer

With the following source-target combinations, update and delete operations in the source are handled as inserts in the target:

source

target

  • GG Trail Reader

  • HPNonStop readers

  • MSSQL Reader

  • MySQL Reader

  • Oracle Reader

  • PostgreSQL Reader

  • Salesforce Reader

  • BigQueryWriter (in APPENDONLY mode)

  • Cloudera Hive Writer

  • Database Reader

  • Incremental Batch Reader

  • Azure Synapse Writer

  • BigQuery Writer

  • CosmosDB Writer

  • Database Writer

  • Hazelcast Writer

  • HBase Writer

  • Kudu Writer

  • MapRDB Writer

  • Redshift Writer

  • Snowflake Writer

  • Spanner Writer

When the target is HiveWriter or HortonworksHiveWriter, update and delete operations may be handled by the target as updates and deletes or as inserts depending on various factors (see Hive Writer for details).

ADLS Gen1 Writer

Writes to files in Azure Data Lake Storage Gen1. A common use case is to write data from on-premise sources to an ADLS staging area from which it can be consumed by Azure-based analytics tools.

property

type

default value

notes

Auth Token Endpoint

String

the token endpoint URL for your web application (see "Generating the Service Principal" under Using Client Keys)

Client ID

String

the application ID for your web application (see "Generating the Service Principal" under Using Client Keys)

Client Key

encrypted password

the key for your web application (see "Generating the Service Principal" under Using Client Keys)

Compression Type

String

Set to gzip when the input is in gzip format. Otherwise, leave blank.

Data Lake Store Name

String

the name of your Data Lake Storage Gen1 account, for example, mydlsname.azuredatalakestore.net (do not include adl://)

Directory

String

The full path to the directory in which to write the files. See Setting output names and rollover / upload policies for advanced options.

File Name

String 

The base name of the files to be written. See Setting output names and rollover / upload policies.

Rollover on DDL

Boolean

True

Has effect only when the input stream is the output stream of a CDC reader source. With the default value of True, rolls over to a new file when a DDL event is received. Set to False to keep writing to the same file.

Rollover Policy

String

eventcount:10000, interval:30s

See Setting output names and rollover / upload policies.

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

Data is written in 4 MB batches or whenever rollover occurs.

Sample application:

CREATE APPLICATION testADLSGen1;

CREATE SOURCE PosSource USING FileReader ( 
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) 
OUTPUT TO PosSource_Stream;

CREATE CQ PosSource_Stream_CQ 
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET testADLSGen1Target USING ADLSGen1Writer (
  directory:'mydir',
  filename:'myfile.json',
  datalakestorename:'mydlsname.azuredatalakestore.net',
  clientid:'********-****-****-****-************',
  authtokenendpoint:'https://login.microsoftonline.com/********-****-****-****-************/oauth2/token',
  clientkey:'********************************************'
)
FORMAT USING JSONFormatter ()
INPUT FROM PosSource_TransformedStream;

END APPLICATION testADLSGen1;

Since the test data set is less than 10,000 events, and ADLSGen1Writer is using the default rollover policy, the data will be uploaded after 30 seconds.

ADLS Gen2 Writer

Writes to files in an Azure Data Lake Storage Gen2 file system. A common use case is to write data from on-premise sources to an ADLS staging area from which it can be consumed by Azure-based analytics tools.

When you create the Gen2 storage account, set Storage account kind to StorageV2 and enable Hierarchical namespace.

property

type

default value

notes

Account Name

String

the storage account name

Compression Type

String

Set to gzip when the input is in gzip format. Otherwise, leave blank.

Directory

String

The full path to the directory in which to write the files. See Setting output names and rollover / upload policies for advanced options.

File Name

String 

The base name of the files to be written. See Setting output names and rollover / upload policies.

File System Name

String

the ADLS Gen2 file system where the files will be written

Rollover on DDL

Boolean

True

Has effect only when the input stream is the output stream of a MySQLReader or OracleReader source. With the default value of True, rolls over to a new file when a DDL event is received. Set to False to keep writing to the same file.

SAS Token

encrypted password

The SAS token for a shared access signature for the storage account. Allowed services must include Blob, allowed resource types must include Object, and allowed permissions must include Write and Create. Remove the ? from the beginning of the SAS token.

Note that SAS tokens have an expiration date. See Best practices when using SAS.

Upload Policy

String

eventcount:10000, interval:5m

See Setting output names and rollover / upload policies. Keep these settings low enough that individual uploads do not exceed the underlying Microsoft REST API's limit of 100 MB for a single operation.

For best performance, Microsoft recommends uploads between 4 and 16 MB. Setting UploadPolicy to filesize:16M will accomplish that. However, if there is a long gap between events, this will mean some events will not be written to ADLS for some time. For example, if Striim receives events only during working hours, the last events received at the end of the day on Friday would not be written until Monday morning.

When the app is stopped, any remaining data in the upload buffer is discarded.

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

Sample application:

CREATE APPLICATION ADLSGen2Test;

CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
  positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false )
OUTPUT TO PosSource_Stream;

CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET ADLSGen2Target USING ADLSGen2Writer (
  accountname:'mystorageaccount',
  sastoken:'********************************************',
  filesystemname:'myfilesystem',
  directory:'mydir',
  filename:'myfile.json',
  uploadpolicy: 'interval:15s'
)
FORMAT USING JSONFormatter ()
INPUT FROM PosSource_TransformedStream;

END APPLICATION ADLSGen2Test;

Azure Blob Writer

Writes to a blob in a Microsoft Azure Storage account (see Creating apps using templates). 

property

type

default value

notes

Account Access Key

String

the account access key from Storage accounts > <account name> > Access keys

Account Name

String

the name of the Azure storage account for the blob container

Blob Name

String

The base name of the blobs to be written. See Setting output names and rollover / upload policies.

Client Configuration

String

If using a proxy, specify ProxyHost=<host name or IP address>,ProxyPort=<port number>.

Compression Type

String

Set to gzip when the input is in gzip format. Otherwise, leave blank.

Container Name

String

the blob container name from Storage accounts > <account name> > Containers

Folder Name

String

name of the directory to contain the blob (optional)

See Setting output names and rollover / upload policies for instructions on defining dynamic directory names.

Rollover on DDL

Boolean

True

Has effect only when the input stream is the output stream of a CDC reader source. With the default value of True, rolls over to a new file when a DDL event is received. Set to False to keep writing to the same file.

Upload Policy

String

eventcount:10000,interval:5m

The upload policy may include eventcount, interval, and/or filesize (see Setting output names and rollover / upload policies for syntax). Cached data is written to Azure every time any of the specified values is exceeded. With the default value, data will be written every five minutes or sooner if the cache contains 10,000 events. When the app is undeployed, all remaining data is written to Azure.

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.
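
A minimal sample target might look like the following sketch (account values are placeholders, the TQL property names are assumed to correspond to the properties listed above, and the input stream is the PosSource_TransformedStream from the ADLS examples):

CREATE TARGET AzureBlobTarget USING AzureBlobWriter (
  accountname: 'mystorageaccount',
  accountaccesskey: '********',
  containername: 'mycontainer',
  foldername: 'mydir',
  blobname: 'myblob.json',
  uploadpolicy: 'interval:15s'
)
FORMAT USING JSONFormatter ()
INPUT FROM PosSource_TransformedStream;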

Azure Event Hub Writer

Writes to an existing Azure event hub, which is equivalent to a Kafka topic.

Azure Event Hubs is similar to Kafka, compatible with many Kafka tools, and uses some of the same architectural elements, such as consumer groups and partitions. AzureEventHubWriter is generally similar to Kafka Writer in sync mode and its output formats are the same.

See Connections and sessions for information on firewall settings.

property

type

default value

notes

Batch Policy

String

Size:1000000, Interval:30s

The batch policy may include size or interval. Cached data is written to the target every time either of the specified values is exceeded. With the default setting, data will be written every 30 seconds or sooner if the cache contains 1,000,000 bytes. When the application is stopped any remaining data in the buffer is discarded.

Connection Retry

String

Retries:0, RetryBackOff:1m

With the default Retries:0, retry is disabled. To enable retries, set a positive value for Retries and in RetryBackOff specify the interval between retries in minutes (#m) or seconds (#s). For example, with the setting Retries:3, RetryBackOff:30s, if the first connection attempt is unsuccessful, Striim will try again in 30 seconds. If the second attempt is unsuccessful, Striim will try again in another 30 seconds. If the third attempt is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Consumer Group

String

If E1P is true, specify an Event Hub consumer group for Striim to use for tracking which events have been written.

E1P

Boolean

false

With the default value, after recovery (see Recovering applications) there may be some duplicate events. Set to true to ensure that there are no duplicates ("exactly once processing"). If recovery is not enabled for the application, this setting will have no effect.

When this property is set to true, the target event hub must be empty the first time the application is started, and other applications must not write to the event hub.

Known issue DEV-16624: When set to true, AzureEventHubWriter will use approximately 42 MB of memory per partition, so if the hub has 32 partitions, it will use 1.3 GB.

Event Hub Config

String

If Striim is connecting with Azure through a proxy server, provide the connection details in the format ProxyIP=<IP address>, ProxyPort=<port>, ProxyUsername=<user name>, ProxyPassword=<password>, for example, EventHubConfig='ProxyIP=192.0.2.100, ProxyPort=8080, ProxyUsername=myuser, ProxyPassword=passwd'.

Event Hub Name

String

the name of the event hub, which must exist when the application is started and have between two and 32 partitions

Event Hub Namespace

String

the namespace of the specified event hub

Operation Timeout

Integer

1m

amount of time Striim will wait for Azure to respond to requests (reading, writing, or closing connections) before the application will fail

Partition Key

String

The name of a field in the input stream whose values determine how events will be distributed among multiple partitions. Events with the same partition key field value will be written to the same partition.

If the input stream is of any type except WAEvent, specify the name of one of its fields.

If the input stream is of the WAEvent type, specify a field in the METADATA map (see HP NonStop reader WAEvent fields, MySQLReader WAEvent fields, OracleReader WAEvent fields, or MS SQL Reader WAEvent fields) using the syntax @METADATA(<field name>), or a field in the USERDATA map (see Adding user-defined data to WAEvent streams), using the syntax @USERDATA(<field name>).

SAS Key

String

the primary key associated with the SAS policy

SAS Policy Name

String

an Azure SAS policy to authenticate connections (see Shared Access Authorization Policies)

For samples of the output, see:

If E1P is set to true, the records will contain information Striim can use to ensure no duplicate records are written during recovery (see Recovering applications).

The following sample application will write data from PosDataPreview.csv to an event hub.

CREATE SOURCE PosSource USING FileReader (
  directory:'Samples/PosApp/AppData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:yes
)
OUTPUT TO RawStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM RawStream;

CREATE TARGET EventHubTarget USING AzureEventHubWriter (
  EventHubNamespace:'myeventhub-ns',
  EventHubName:'PosAppData',
  SASTokenName:'RootManageSharedAccessKey',
  SASToken:'******',
  PartitionKey:'merchantId'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;

The following sample application will replicate data from two Oracle tables to two partitions in an event hub.

CREATE SOURCE OracleSource USING OracleReader (
  Username:'myname',
  Password:'******',
  ConnectionURL: 'localhost:1521:XE',
  Tables:'QATEST.EMP;QATEST.DEPT'
) 
OUTPUT TO sourceStream;

CREATE TARGET EventHubTarget USING AzureEventHubWriter (
  EventHubNamespace:'myeventhub-ns',
  EventHubName:'OracleData',
  SASTokenName:'RootManageSharedAccessKey',
  SASToken:'******',
  PartitionKey:'@metadata(TableName)',
  E1P:'True',
  ConsumerGroup:'testconsumergroup'
)
FORMAT USING DSVFormatter()
INPUT FROM sourceStream;

Azure SQL DWH (Data Warehouse) Writer

See Azure Synapse Writer.

Azure Synapse Writer

Writes to Azure Synapse (formerly Azure SQL Data Warehouse).

Prerequisites:

  • Deploy an Azure Synapse instance.

  • Deploy an Azure Data Lake Storage Gen2 instance to be used for staging the data. (Alternatively, use an Azure Blob storage account, but Microsoft no longer recommends this.)

  • Optionally (strongly recommended by Microsoft), connect the Azure Synapse instance and the Azure Data Lake Storage Gen2 instance with an Azure Virtual Network (VNet). See Impact of using VNet Service Endpoints with Azure storage, particularly the prerequisites and the instructions for creating a database master key.

  • Create an Azure Synapse login for use by Striim.

  • Create an Azure Synapse database scoped credential with the storage account name as the IDENTITY and the storage account access key as the SECRET. For example:

    CREATE MASTER KEY ENCRYPTION BY PASSWORD='<password>';
    CREATE DATABASE SCOPED CREDENTIAL AppCred WITH IDENTITY = '<storage account name>',
    SECRET = '<access key>'; 

    You can view scoped credentials with the command:

    SELECT * FROM sys.database_scoped_credentials;

property

type

default value

notes

Account Access Key

String

the account access key for the Blob storage account from Storage accounts > <account name> > Access keys

Account Name

String

the Blob storage account name

Client Configuration

String

If using a proxy, specify ProxyHost=<host name or IP address>,ProxyPort=<port number>.

Column Delimiter

String

|

If the data to be written may contain the default column delimiter (ASCII / UTF-8 124), specify a different delimiter that will never appear in the data.

Connection URL

String

the JDBC connection URL for Azure Synapse, in the format jdbc:sqlserver://<fully qualified server name>:<port>;database=<database name>, for example, jdbc:sqlserver://mysqldw.database.windows.net:1433;database=mydb

Alternatively, you may use Active Directory authentication (see Supporting Active Directory authentication for Azure).

Excluded Tables

String

When a wildcard is specified for Tables, you may specify here any source tables you wish to exclude. Specify the value as for the source portion of Tables. For example, to include data from HR_EMPLOYEES and HR_DEPTS but not from HRMASTER when writing to SQL Server (since you cannot specify a literal underscore in the Tables string):

Tables='HR%',
ExcludedTables='HRMASTER'

Ignorable Exception Code

String

Set to TABLE_NOT_FOUND if you do not want the application to crash when Striim tries to write to a table that does not exist in the target database. See Handling "table not found" errors for more information.

Mode

String

MERGE

With the default value of MERGE, inserts and deletes in the source are handled as inserts and deletes in the target. With this setting:

  • Since Synapse does not have primary keys, you may include the keycolumns option in the Tables property to specify a column in the target table that will contain a unique identifier for each row: for example, Tables:'SCOTT.EMP,mydataset.employee keycolumns(emp_num)'.

  • You may use wildcards for the source table provided all the tables have the key columns: for example, Tables:'DEMO.%,mydataset.% KeyColumns(...)'.

  • If you do not specify keycolumns, Striim will concatenate all column values and use that as a unique identifier.

Set to APPENDONLY to handle all operations as inserts. With this setting:

  • Updates and deletes from DatabaseReader, IncrementalBatchReader, and SQL CDC sources are handled as inserts in the target.

  • Primary key updates result in two records in the target, one with the previous value and one with the new value. If the Tables setting has a ColumnMap that includes @METADATA(OperationName), the operation name for the first event will be DELETE and for the second INSERT.

Parallel Threads

Integer

See Creating multiple writer instances.

Password

encrypted password

The password for the specified user. See Encrypted passwords.

Storage Access Driver Type

String

WASBS

Set to ABFS if you are using an Azure Data Lake Storage Gen2 instance for staging the data, or if you are using a general-purpose blob storage instance connected to Synapse using VNet or across a firewall. (See The Azure Blob Filesystem driver (ABFS) for more information.)

Leave at the default setting WASBS if using a general-purpose V1 or V2 blob storage account without VNet or a firewall.

Tables

String

The name(s) of the table(s) to write to. The table(s) must exist in the DBMS and the user specified in Username must have insert permission.

When the target's input stream is a user-defined event, specify a single table.

If the source table has no primary key, you may use the KeyColumns option to define a unique identifier for each row in the target table: for example, Tables:'SOURCEDB.EMP,MYDB.MYSCHEMA.EMP KeyColumns(EMP_ID)'. If necessary to ensure uniqueness, specify multiple columns with the syntax KeyColumns(<column 1>,<column 2>,...). You may use wildcards for the source table, provided all the tables have the key columns: for example, Tables:'SOURCEDB.%,MYSCHEMA.% KeyColumns(...)'. If the source has no primary key and KeyColumns is not specified, the concatenated value of all source fields is used as the primary key in the target.

When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables. You may use wildcards for the table names, but not for the schema or database. For example:

source.emp,target.emp
source.db1,target.db1;source.db2,target.db2
source.%,target.%
source.mydatabase.emp%,target.mydb.%
source1.%,target1.%;source2.%,target2.%

MySQL and Oracle names are case-sensitive, SQL Server names are not. Specify names as <schema name>.<table name> for MySQL and Oracle and as <database name>.<schema name>.<table name> for SQL Server.

See Mapping columns for additional options.

Upload Policy

String

eventcount:10000, interval:5m

The upload policy may include eventcount, interval, and/or filesize (see Setting output names and rollover / upload policies for syntax). Cached data is written to the storage account every time any of the specified values is exceeded. With the default value, data will be written every five minutes or sooner if the cache contains 10,000 events. When the app is undeployed, all remaining data is written to the storage account.

Username

String

the user name Striim will use to log in to the Azure Synapse specified in ConnectionURL

The following sample application would read from Oracle using IncrementalBatchReader and write to Azure Synapse.

CREATE  SOURCE ibr2azdw_Source USING IncrementalBatchReader  ( 
  Username: 'striim',
  Password: '********',
  ConnectionURL: '192.0.2.1:1521:orcl',
  Tables: 'MYSCHEMA.TABLE1',
  CheckColumn: 'MYSCHEMA.TABLE1=UUID',
  StartPosition: 'MYSCHEMA.TABLE1=1234'
) 
OUTPUT TO ibr2azdw_Source_Stream ;

CREATE  TARGET ibr2azdw_AzureSynapseTarget1 USING AzureSQLDWHWriter  ( 
  Username: 'striim',
  Password: '********',
  ConnectionURL: 'jdbc:sqlserver://testserver.database.windows.net:1433;database=rlsdwdb',
  Tables: 'MYSCHEMA.TABLE1,dbo.TABLE1',
  AccountName: 'mystorageaccount',
  AccountAccessKey: '********'
) 
INPUT FROM ibr2azdw_Source_Stream;

Azure Synapse data type support and correspondence

TQL type

Azure Synapse type

java.lang.Byte

tinyint

java.lang.Double

float

java.lang.Float

float

java.lang.Integer

int

java.lang.Long

bigint

java.lang.Short

smallint

java.lang.String

char, nchar, nvarchar, varchar

org.joda.time.DateTime

datetime, datetime2, datetimeoffset

When the input of an Azure Synapse target is the output of a MySQL source (DatabaseReader, IncrementalBatchReader, or MySQLReader):

MySQL type

Azure Synapse type

bigint

bigint, numeric

bigint unsigned

bigint

binary

binary

char

nchar

date

date

datetime

datetime, datetime2, datetimeoffset

decimal

decimal

decimal unsigned

decimal

double

money, smallmoney

float

float, real

int

int

int unsigned

int

longblob

varbinary

longtext

varchar

mediumblob

binary

mediumint

int

mediumint unsigned

int

mediumtext

varchar

numeric unsigned

int

smallint

smallint

smallint unsigned

smallint

text

varchar

time

time

tinyblob

binary

tinyint

bit (if only one digit), tinyint

tinyint unsigned

tinyint

tinytext

varchar

varbinary

varbinary

varchar

nvarchar, varchar

year

varchar

When the input of an Azure Synapse target is the output of an Oracle source (DatabaseReader, IncrementalBatchReader, or OracleReader):

Oracle type

Azure Synapse type

binary_double

float

binary_float

real

blob

binary, varbinary

char

char

clob

nvarchar

date

date

float

float

nchar

nchar

nclob

varchar

number(1)

bit

number(10,4)

smallmoney

number(10)

int

number(19,4)

money

number(19)

bigint

number(3)

tinyint

number(5)

char, smallint

timestamp

datetime, datetime2, datetimeoffset

timestamp with local timezone

datetimeoffset

timestamp with timezone

datetimeoffset

varchar2

varchar

varchar2(30)

time

xmltype

varchar

When the input of an Azure Synapse target is the output of a SQL Server source (DatabaseReader, IncrementalBatchReader, or MSSQLReader):

SQL Server type

Azure Synapse type

bigint

bigint

binary

binary

bit

bit, char

date

date

datetime

datetime

datetime2

datetime2

datetimeoffset

datetimeoffset

decimal

decimal

float

float

image

varbinary

int

int

money

money

nchar

nchar

ntext

varchar

numeric

numeric

nvarchar

nvarchar

real

real

smalldatetime

smalldatetime

smallint

smallint

smallmoney

smallmoney

text

varchar

time

time

tinyint

tinyint

varbinary

varbinary

varchar

varchar

xml

varchar

BigQuery Writer

BigQuery is a serverless, highly scalable, and cost-effective cloud data warehouse offered by Google. Striim’s BigQueryWriter writes the data from various supported sources into Google’s BigQuery data warehouse to support real time data warehousing and reporting.

BigQuery Writer can be used to move data from transactional databases such as Oracle, Amazon RDS, Azure SQL DB, PostgreSQL, Microsoft SQL Server, MySQL, Google Spanner, and other supported databases into BigQuery with low latency and in real time. Striim supports complete table loads, change feeds (CDC), and incremental batch feeds into BigQuery. BigQuery Writer properties can be configured to support authentication, object mappings, batching, performance, and failure handling.

BigQuery Writer uses the google-cloud-bigquery client for Java API version 1.110.0.

You have a choice of two methods, using different parts of this API, for BigQuery Writer to use to write to its target tables. You cannot switch between these methods while an application is running.

  • Streaming (new in Striim 3.10.1): Incoming data is buffered locally as one memory buffer per target table. Once the upload condition is met, BigQuery Writer uses InsertAllResponse to stream the content of each memory buffer into its target table. We recommend using this method when you need low latency.

  • Load: Incoming data is buffered locally as one CSV file per target table. Once the upload condition for a file is met, BigQuery Writer uses TableDataWriteChannel to upload the content of the file to BigQuery, which writes it to the target table. This was the only method supported by BigQuery Writer in Striim 3.9.x and earlier releases. This method may be a good fit if your uploads are infrequent (for example, once an hour).

If BigQuery Writer applications you created with 3.9.8 or earlier are working well, you may keep using the load method. On the other hand, if you are spending a lot of time tuning those applications' batch policies or are running up against BigQuery's quotas, the streaming method may work better for you.
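
As a sketch, a continuous-replication target using the streaming method might be configured as follows (the stream, project, and table names are hypothetical, and the property names are assumed TQL equivalents of the properties documented below):

CREATE TARGET BigQueryStreamingTarget USING BigQueryWriter (
  ServiceAccountKey: '/<path>/<file name>.json',
  projectId: 'myprojectid',
  Tables: 'MYSCHEMA.%,mydataset.%',
  StreamingUpload: 'true',
  Mode: 'MERGE',
  BatchPolicy: 'eventCount:10000,Interval:30'
)
INPUT FROM OracleCDCStream;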

The most typical workflow when using BigQuery Writer is for streaming integration. Briefly, it works like this:

  1. Create tables in the target system corresponding to those in the source. You may do this using Striim's schema conversion utility (see Using the schema conversion utility) or any other tool you prefer.

  2. Create a Striim initial load application using Database Reader and BigQuery Writer. This application will use the load method and, to avoid potential BigQuery quota limitations, will use large batches. Run it once to load existing data to BigQuery.

  3. Create a Striim streaming integration application using the appropriate CDC reader and BigQuery Writer. This application will use the streaming method to minimize the time it takes for new data to arrive at the target. Streaming also avoids the quota limitation issues of the load method. Typically you will want to enable recovery for this application (see Recovering applications) and perhaps deploy it on a multi-server cluster configured for failover to ensure high availability (see Continuing operation after server failover). Run this application continuously to stream new data to BigQuery (in other words, to synchronize the source and target).

BigQuery architecture and terminology

Some of BigQuery's terminology may be confusing for users of relational databases that use different terms, or use the same terms to mean different things.

  • Project: contains one or more datasets, similar to the way an Oracle or PostgreSQL schema contains one or more databases.

  • Dataset: contains one or more tables, similar to a MySQL, Oracle, PostgreSQL, or SQL Server database.

  • Schema: defines column names and data types for a table, similar to a CREATE TABLE DDL statement in SQL.

  • Table: equivalent to a table in SQL.

BigQuery setup

Before you can use BigQuery Writer, you must create a service account (see Service Accounts). 

The service account must have the BigQuery Data Editor, BigQuery Job User, and BigQuery Resource Admin roles for the target tables (see BigQuery predefined Cloud IAM roles). Alternatively, you may create a custom role with the following permissions for the target tables (see BigQuery custom roles):

  • bigquery.datasets.get

  • bigquery.jobs.create

  • bigquery.jobs.get

  • bigquery.jobs.list

  • bigquery.jobs.listAll

  • bigquery.tables.create

  • bigquery.tables.delete

  • bigquery.tables.get

  • bigquery.tables.getData

  • bigquery.tables.list

  • bigquery.tables.update

  • bigquery.tables.updateData

  • bigquery.tables.updateTag

After you have created the service account, download its key file (see Authenticating with a service account key file) and copy it to the same location on each Striim server that will run this adapter, or to a network location accessible by all servers. You will specify the path and file name in BigQuery Writer's Service Account Key property.

BigQuery writer simple application

This simple application reads data from a .csv file from the PosApp sample application and writes it to BigQuery. In this case, BigQuery Writer has an input stream of a user-defined type. The sample code assumes that you have created the following table in BigQuery:

[screenshot of the BigQuery target table definition]
CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
    positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false )
OUTPUT TO PosSource_Stream;
 
CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET BigQueryTarget USING BigQueryWriter(
  ServiceAccountKey:"/<path>/<file name>.json",
  projectId:"myprojectid",
  Tables: "mydataset.mytable"
)
INPUT FROM PosSource_TransformedStream;

After running this application, run select * from mydataset.mytable; in BigQuery and you will see the data from the file. Since the default batch interval is 90 seconds, it may take that long after the application completes before you see all 1160 records in BigQuery.

BigQuery Writer properties

The adapter properties are:

property

type

default value

notes

Allow Quoted Newlines

Boolean

False

Set to True to allow quoted newlines in the delimited text files in which BigQueryWriter accumulates batched data.

Batch Policy

String

eventCount:1000000, Interval:90

The batch policy includes eventCount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to EventCount:1,Interval:0.

With the default setting, data will be written every 90 seconds or sooner if the buffer accumulates 1,000,000 events.

When Streaming Upload is False, use Interval:60 so as not to exceed the quota of 1,500 batches per table per day. When Streaming Upload is True, use EventCount:10000, since that is the quota for one batch. (Quotas are subject to change by Google.)

Do not exceed BigQuery's quotas or limits (see Load jobs in the "Quotas and limits" section of Google's BigQuery documentation). For example, if you exceed the quota of batches per table per day, BigQueryWriter will throw an exception such as error code 500, "An internal error occurred and the request could not be completed," and stop the application. To avoid this, reduce the number of batches by increasing the event count and/or interval.

Column Delimiter

String

| (UTF-8 007C)

The character(s) used to delimit fields in the delimited text files in which the adapter accumulates batched data. If the data will contain the | character, change the default value to a sequence of characters that will not appear in the data.

Connection Retry Policy

String

totalTimeout=600, initialRetryDelay=10, retryDelayMultiplier=2.0, maxRetryDelay=60 , maxAttempts=5, jittered=True, initialRpcTimeout=10, rpcTimeoutMultiplier=2.0, maxRpcTimeout=30

Do not change unless instructed to by Striim support.

Data Location

String

Specify the dataset's Data location property value if necessary (see Dataset Locations).

Encoding

String

UTF-8

Encoding for the delimited text files in which BigQueryWriter accumulates batched data. Currently the only supported encoding is UTF-8 (see Loading encoded data).

Ignorable Exception Code

String

Set to TABLE_NOT_FOUND if you do not want the application to crash when Striim tries to write to a table that does not exist in the target database. See Handling "table not found" errors for more information.

Include Insert ID

Boolean

True

When Streaming Upload is False, this setting is ignored.

When Mode is APPENDONLY and Streaming Upload is True, with the default setting of True, BigQuery will add a unique ID to every row. Set to False if you prefer that BigQuery not add unique IDs. For more information, see Ensuring data consistency and Disabling best effort de-duplication.

When Mode is MERGE, you may set this to False as Striim will de-duplicate the events before writing them to the target.

Mode

String

APPENDONLY

With the default value APPENDONLY:

  • Updates and deletes from DatabaseReader, IncrementalBatchReader, and SQL CDC sources are handled as inserts in the target.

  • Primary key updates result in two records in the target, one with the previous value and one with the new value. If the Tables setting has a ColumnMap that includes @METADATA(OperationName), the operation name for the first event will be DELETE and for the second INSERT.

  • Data should be available for querying immediately after it has been written, but copying and modification may not be possible for up to 90 minutes (see Checking for data availability).

Set to MERGE to handle updates and deletes as updates and deletes instead. When using MERGE:

  • Data will not be written to any target tables that have streaming buffers.

  • Since BigQuery does not have primary keys, you may include the keycolumns option in the Tables property to specify a column in the target table that will contain a unique identifier for each row: for example, Tables:'SCOTT.EMP,mydataset.employee keycolumns(emp_num)'.

  • You may use wildcards for the source table provided all the tables have the key columns: for example, Tables:'DEMO.%,mydataset.% KeyColumns(...)'.

  • If you do not specify keycolumns, Striim will concatenate all column values and use that as a unique identifier.

Null Marker

String

NULL

When Streaming Upload is False, a string inserted into fields in the delimited text files in which BigQueryWriter accumulates batched data to indicate that a field has a null value. These are converted back to nulls in the target tables. If any field might contain the string NULL, change this to a sequence of characters that will not appear in the data.

When Streaming Upload is True, this setting has no effect.

Optimized Merge

Boolean

false

Set to true only when the target's input stream is the output of an HP NonStop reader, MySQL Reader, or OracleReader source, and the source events will include partial records. For example, with Oracle Reader, when supplemental logging has not been enabled for all columns, partial records are sent for updates. When the source events will always include full records, leave this set to false.

Parallel Threads

Integer

See Creating multiple writer instances.

Project Id

String

Specify the project ID of the dataset's project.

Quote Character

String

" (UTF-8 0022)

The character(s) used to quote (escape) field values in the delimited text files in which the adapter accumulates batched data. If the data will contain ", change the default value to a sequence of characters that will not appear in the data.

Service Account Key

String

The path (from root or the Striim program directory) and file name to the .json credentials file downloaded from Google (see BigQuery setup).

Standard SQL

Boolean

True

With the default setting of True, BigQueryWriter constrains timestamp values to standard SQL. Set to False to use legacy SQL. See Migrating to Standard SQL for more information.

Streaming Upload

Boolean

False

With the default value of False, the writer uses the load method. Set to True to use the streaming method. See discussion in BigQuery Writer.

Tables

String

The name(s) of the table(s) to write to, in the format <dataset>.<table>. The table(s) must exist when the application is started.

When the target's input stream is a user-defined event, specify a single table.

When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables. You may use wildcards for the table names, but not for the schema or database. For example:

source.emp,target.emp
source.db1,target.db1;source.db2,target.db2
source.%,target.%
source.mydatabase.emp%,target.mydb.%
source1.%,target1.%;source2.%,target2.%

MySQL and Oracle names are case-sensitive; SQL Server names are not. Specify names as <schema name>.<table name> for MySQL and Oracle and as <database name>.<schema name>.<table name> for SQL Server.

Known issue DEV-21740: If the columns in the target are not in the same order as the source, writing will fail, even if the column names are the same. Workaround: Use ColumnMap to map at least one column.

See Mapping columns for additional options.

Transport Options

String

connectTimeout=300, readTimeout=120

Sets timeouts in BigQuery.
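
For example, the following TQL sketch creates a BigQuery Writer target that replicates change data using MERGE (the service account key path, project ID, dataset and table names, and input stream name are illustrative):

CREATE TARGET BigQueryMergeTarget USING BigQueryWriter (
  ServiceAccountKey: 'conf/myproject-ec6f8b0e3afe.json',
  ProjectId: 'myproject',
  Mode: 'MERGE',
  Tables: 'SCOTT.EMP,mydataset.employee keycolumns(emp_num)'
)
INPUT FROM OracleCDCStream;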

BigQuery data type support and correspondence

Oracle type

BigQuery type

BFILE

unsupported

BINARY_DOUBLE

NUMERIC

BINARY_FLOAT

FLOAT64

BLOB

BYTES

CHAR

STRING

CLOB

STRING

An insert or update containing a column of this type generates two CDC log entries: an insert or update in which the value for this column is null, followed by an update including the value.

DATE

DATE

FLOAT

FLOAT64

INTERVALDAYTOSECOND

STRING

INTERVALYEARTOMONTH

STRING

LONG

unsupported

LONG RAW

unsupported

NCHAR

STRING

NCLOB

STRING

NESTED TABLE

unsupported

NUMBER

NUMERIC

NVARCHAR2

STRING

RAW

BYTES

ROWID

unsupported

TIMESTAMP

DATETIME

TIMESTAMP WITH LOCAL TIME ZONE

TIMESTAMP

TIMESTAMP WITH TIME ZONE

TIMESTAMP

UROWID

unsupported

VARCHAR2

STRING

VARRAY

unsupported

XMLTYPE

unsupported

PostgreSQL type

BigQuery type

bigint

INT64

bigserial

INT64

bit

unsupported

bit varying

unsupported

boolean

BOOLEAN

box

unsupported

bytea

BYTES

character

STRING

character varying

STRING

cidr

unsupported

circle

unsupported

date

STRING

double precision

FLOAT64

inet

INT64

integer

INT64

int2

INT64

int4

INT64

int4range

STRING

int8

INT64

int8range

STRING

integer

INTEGER

interval

STRING

json

STRING

jsonb

STRING

line

unsupported

lseg

unsupported

macaddr

unsupported

money

unsupported

numeric

NUMERIC

path

unsupported

pg_lsn

unsupported

point

unsupported

polygon

unsupported

real

FLOAT64

smallint

INT64

smallserial

INT64

serial

INT64

text

STRING

time

STRING

time with time zone

TIMESTAMP

timestamp

DATETIME

timestamp with time zone

TIMESTAMP

tsquery

unsupported

tsvector

unsupported

txid_snapshot

unsupported

uuid

unsupported

xml

STRING

Cassandra Cosmos DB Writer

Writes to tables in Cosmos DB using the Cassandra API. Notes:

  • Add a Baltimore root certificate to Striim's Java environment following the instructions in To add a root certificate to the cacerts store.

  • Target tables must have primary keys.

  • Primary keys can not be updated.

  • During recovery (see Recovering applications), events with primary keys that already exist in the target will be updated with the new values.

  • When the input stream of a Cassandra Cosmos DB Writer target is the output of a SQL CDC source, Compression must be enabled in the source.

  • If Cassandra Cosmos DB Writer exceeds the number of Request Units per second provisioned for your Cosmos DB instance (see Request Units in Azure Cosmos DB), the application will crash.

property

type

default value

notes

Account Endpoint

String

Contact Point from the Azure Cosmos DB account's Connection String page

Account Key

encrypted password

Primary Password from the Azure Cosmos DB account's Connection String page's Read-write Keys tab

Checkpoint Table

String

CHKPOINT

To support recovery (see Recovering applications), a checkpoint table must be created in the target keyspace using the following DDL:

CREATE TABLE chkpoint (
  id varchar PRIMARY KEY,
  sourceposition blob,
  pendingddl int,
  ddl ascii);

If necessary you may use a different table name, in which case change the value of this property.

Column Name Escape Sequence

String

 

When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source, you may use this property to specify which characters Striim will use to escape column names that contain special characters or are on the List of reserved keywords. You may specify two characters to be added at the start and end of the name (for example, [] ), or one character to be added at both the start and end (for example, ").

Connection Retry

String

retryInterval=30, maxRetries=3

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval). If the second attempt is unsuccessful, in 30 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Consistency Level

String

ONE

How many replicas need to respond to the coordinator in order to consider the operation a success. Supported values are ONE, TWO, THREE, ANY, ALL, EACH QUORUM, and LOCAL QUORUM. For more information, see Consistency levels and Azure Cosmos DB APIs.

Excluded Tables

String

When a wildcard is specified for Tables, you may specify here any tables you wish to exclude. Specify the value as for Tables.

Flush Policy

String

EventCount:1000, Interval:60

If data is not flushed properly with the default setting, you may use this property to specify how many events Striim will accumulate before writing and/or the maximum number of seconds that will elapse between writes. For example:

  • flushpolicy:'eventcount:5000'

  • flushpolicy:'interval:10s'

  • flushpolicy:'interval:10s, eventcount:5000'

Note that changing this setting may significantly degrade performance.

With a setting of 'eventcount:1', each event will be written immediately. This can be useful during development, debugging, testing, and troubleshooting.

Ignorable Exception Code

String

By default, if the Cassandra API returns an error, the application will crash. Specify a portion of an error message to ignore errors and continue. This property is not case-sensitive.

When the input stream is the output of a SQL CDC source, and primary keys will be updated in the source, set this to primary key to ignore primary key errors and continue.

To capture the ignored exceptions, see Writing exceptions to a WActionStore.

Keyspace

String

the Cassandra keyspace containing the specified tables

Load Balancing Policy

String

TokenAwarePolicy(RoundRobinPolicy())

See Specifying load balancing policies for more information.

Overload Policy

String

retryInterval=10, maxRetries=3

With the default setting, if Cassandra Cosmos DB Writer exceeds the number of Request Units per second provisioned for your Cosmos DB instance (see Request Units in Azure Cosmos DB) and the Cassandra API reports an overload error, the adapter will try again in ten seconds (retryInterval). If the second attempt is unsuccessful, in ten seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Parallel Threads

Integer

See Creating multiple writer instances.

Port

String

10350

Port from the Azure Cosmos DB account's Connection String page

Tables

String

Cassandra table names must be lowercase. The tables must exist in Cassandra. Since columns in Cassandra tables are not usually created in the same order they are specified in the CREATE TABLE statement, when the input stream of the target is the output of a DatabaseReader or CDC source, the ColumnMap option is usually required (see Mapping columns) and wildcards are not supported. You may omit ColumnMap if you verify that the Cassandra columns are in the same order as the source columns.

For example:

CREATE TARGET CassandraTarget USING CassandraCosmosDBWriter (
  AccountEndpoint: 'myCosmosDBAccount.cassandra.cosmos.azure.com',
  AccountKey: '**************************************************************************************==',
  Keyspace: 'myKeyspace',
  Tables: 'myKeyspace.MyTable1,myKeyspace.MyTable2'
)
INPUT FROM FilteredDataStream;

Data type support and correspondence are the same as for Database Writer (see Database Writer data type support and correspondence).

Cassandra Writer

Writes to tables in Apache Cassandra or DataStax. The Cassandra JDBC driver is included with Striim.

If writing to tables in Cosmos DB using the Cassandra API, use Cassandra Cosmos DB Writer.

property

type

default value

notes

Batch Policy

String

eventcount:1000, interval:60

The batch policy includes eventcount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to BatchPolicy:'-1'.

With the default setting, events will be sent every 60 seconds or sooner if the buffer accumulates 1,000 events.

Checkpoint Table

String

CHKPOINT

The table where DatabaseWriter will store recovery information when recovery is enabled. See Creating the checkpoint table below for DDL to create the table. Multiple instances of DatabaseWriter may share the same table. If the table is not in the Oracle or SQL/MX schema being written to, or the same MySQL or SQL Server database specified in the connection URL, specify a fully qualified name.

Column Name Escape Sequence

String

When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source, you may use this property to specify which characters Striim will use to escape column names that are on the List of reserved keywords. You may specify two characters to be added at the start and end of the name (for example, [] ), or one character to be added at both the start and end.

If this value is blank, Striim will use the following escape characters for the specified target databases:

  • Oracle: " (ASCII / UTF-8 22)

  • MySQL: ` (ASCII / UTF-8 60)

  • PostgreSQL: " (ASCII / UTF-8 22)

  • SQL Server: []

Commit Policy

String

eventcount:1000, interval:60

The commit policy controls how often transactions are committed in the target database. The syntax is the same as for BatchPolicy. CommitPolicy values must always be equal to or greater than BatchPolicy values. To disable CommitPolicy, set to CommitPolicy:'-1'.

If BatchPolicy is disabled, each event is sent to the target database immediately and the transactions are committed as specified by CommitPolicy.

If BatchPolicy is enabled and CommitPolicy is disabled, each batch is committed as soon as it is received by the target database.

If BatchPolicy and CommitPolicy are both disabled, each event received by DatabaseWriter will be committed immediately. This may be useful in development and testing, but is inappropriate for a production environment.

Connection Retry Policy

String

retryInterval=30, maxRetries=3

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval). If the second attempt is unsuccessful, in 30 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Connection URL

String

Use the syntax jdbc:cassandra://<host name>:<port>/<keyspace>

See cassandra-jdbc-wrapper for additional options and more information.

Excluded Tables

String

When a wildcard is specified for Tables, you may specify here any source tables you wish to exclude. Specify the value as for the source portion of Tables. For example, to include data from HR_EMPLOYEES and HR_DEPTS but not from HRMASTER when writing to SQL Server (since you cannot specify a literal underscore in the Tables string):

Tables='HR%',
ExcludedTables='HRMASTER'

Ignorable Exception Code

String

By default, if the target DBMS returns an error, Cassandra Writer crashes the application. Use this property to specify errors to ignore, separated by commas. For example, to ignore "com.datastax.driver.core.exceptions.InvalidQueryException: PRIMARY KEY part id found in SET part," specify:

IgnorableExceptionCode: 'PRIMARY KEY'

When an ignorable exception occurs, Striim will write an "Ignoring VendorExceptionCode" message to the log, including the error number, and increment the "Number of exceptions ignored" value for the target.

To view the number of exceptions ignored in the web UI, go to the Monitor page, click the application name, click Targets, and click More Details next to the target.

For details of the individual ignored events, search striim.server.log for VendorExceptionCode (see Reading log files). Alternatively, to capture the ignored exceptions, see Writing exceptions to a WActionStore.

When replicating from MySQL/MariaDB, Oracle 12c, PostgreSQL, and SQL Server CDC readers, the following three generic (that is, not corresponding to any database-specific error code) exceptions can be specified:

  • NO_OP_UPDATE: could not update a row in the target (typically because there was no corresponding primary key)

  • NO_OP_PKUPDATE: could not update the primary key of a row in the target (typically because the "before" primary key could not be found); not supported when source is PostgreSQLReader

  • NO_OP_DELETE: could not delete a row in the target (typically because there was no corresponding primary key)

These exceptions typically occur when other applications besides Striim are writing to the target database. The unwritten events will be captured to the application's exception store, if one exists (see CREATE EXCEPTIONSTORE).

Parallel Threads

Integer

See Creating multiple writer instances.

Enabling recovery for the application disables parallel threads.

Password

encrypted password

The password for the specified user. See Encrypted passwords.

Tables

String

Specify the name(s) of the table(s) to write to. Cassandra table names must be lowercase. The tables must exist in Cassandra.

Since columns in Cassandra tables are not usually created in the same order they are specified in the CREATE TABLE statement, when the input stream of the target is the output of a DatabaseReader or CDC source, the ColumnMap option is usually required (see Mapping columns). You may omit ColumnMap if you verify that the Cassandra columns are in the same order as the source columns.

If a specified target table does not exist, the application will crash with an error. To skip writes to missing tables without crashing, specify TABLE_NOT_FOUND as an Ignorable Exception Code.

When the target's input stream is a user-defined event, specify a single table.

When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables. Wildcards are not supported.

source.emp,target.emp
source.db1,target.db1;source.db2,target.db2

MySQL and Oracle names are case-sensitive; SQL Server names are not. Specify names as <schema name>.<table name> for MySQL and Oracle and as <database name>.<schema name>.<table name> for SQL Server.

Username

String

The DBMS user name the adapter will use to log in to the server specified in Connection URL. The specified user must have MODIFY permission on the tables to be written to.

Vendor Configuration

String

Reserved.

Limitations:

  • If the Cassandra host goes offline, DatabaseWriter will log a "No host reachable" error and the application will crash.

  • Primary keys may not be updated (a limitation of Cassandra) and attempts to do so will crash the application. With DatabaseWriter and CDC sources, one way to avoid this is to filter out events where the PK_UPDATE metadata field is True. For example:

    CREATE CQ FilterPKUpdateCQ
    INSERT into CassandraStream
    SELECT * FROM CDCStream x WHERE META(x,PK_UPDATE) != True;

The sample application below assumes that you have created the following table in Cassandra:

CREATE TABLE mykeyspace.testtable (
  merchantid text PRIMARY KEY,
  datetime timestamp, 
  authamount decimal, 
  zip text);

The following TQL will write to that table:

CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
    positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false )
OUTPUT TO PosSource_Stream;
 
CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET CassandraTarget USING CassandraWriter(
  connectionurl: 'jdbc:cassandra://203.0.113.50:9042/mykeyspace',
  Username:'striim',
  Password:'******',
  Tables: 'mykeyspace.testtable'
)
INPUT FROM PosSource_TransformedStream;

Cosmos DB Writer

Writes to Azure Cosmos DB collections using the Cosmos DB SQL API. (To write to Cosmos DB using the Cassandra API, see Cassandra Cosmos DB Writer.) It may be used in four ways:

  • With an input stream of a user-defined type, CosmosDBWriter writes events as documents to a single collection.

    In this case, the key field of the input stream is used as the document ID. If there is no key field, the document ID is generated by concatenating the values of all fields. Alternatively, you may specify a subset of fields to be concatenated using the syntax <database name>.<collection name> keycolumns(<field1 name>, <field2 name>, ...) in the Collections property. Target document field names are taken from the input stream's event type.

  • With an input stream of type JSONNodeEvent that is the output stream of a source using JSONParser, CosmosDBWriter writes events as documents to a single collection.

When the JSON event contains an id field, its value is used as the Cosmos DB document ID. When the JSON event does not contain an id field, a unique ID (for example, 5abcD-56efgh0-ijkl43) is generated for each document. Since Cosmos DB is case-sensitive, when the JSON event includes a field named Id, iD, or ID, it is imported as a separate field.

  • With an input stream of type JSONNodeEvent that is the output stream of a MongoDBReader source, CosmosDBWriter writes each MongoDB collection to a separate Cosmos DB collection.

    MongoDB collections may be replicated in Cosmos DB by using wildcards in the Collections property (see  Replicating MongoDB data to Azure CosmosDB for details). Alternatively, you may manually map MongoDB collections to Cosmos DB collections as discussed in the notes for the Collections property.

    The MongoDB document ID (included in the JSONNodeEvent metadata map) is used as the Cosmos DB document ID.

    When a source event contains a field named id, in the target it will be renamed ID to avoid conflict with the Cosmos DB target document ID.

  • With an input stream of type WAEvent that is the output stream of a SQL CDC reader or DatabaseReader source, CosmosDBWriter writes data from each source table to a separate collection. The target collections may be in different databases.

    Source table data may be replicated to Cosmos DB collections of the same names by using wildcards in the Collections property. Note that data will be read only from tables that exist when the source starts. Additional tables added later will be ignored until the source is restarted. Alternatively, you may manually map source tables to Cosmos DB collections as discussed in the notes for the Collections property. When the source is a CDC reader, updates and deletes in source tables can be replicated in the corresponding Cosmos DB target collections. See Replicating Oracle data to Azure Cosmos DB.

    Each source row's primary key value (which may be a composite) is used as the document ID for the corresponding Cosmos DB document. If the table has no primary key, the document ID is generated by concatenating the values of all fields in the row. Alternatively, you may select a subset of fields to be concatenated using the keycolumns option as discussed in the notes for the Collections property.

    Each row in a source table is written to a document in the target collection mapped to the table. Target document field names are taken from the source event's metadata map and their values from its data array (see WAEvent contents for change data).

    When a source event contains a field named id, in the target it will be renamed ID to avoid conflict with the Cosmos DB target document ID.

Caution

Cosmos DB document IDs may not exceed 255 characters (see the Documents page in Microsoft's documentation). When using wildcards or keycolumns, be sure that the generated document IDs will not exceed that limit.

Striim provides templates for creating applications that read from various sources and write to Cosmos DB. See Creating a new application using a template for details.

property

type

default value

notes

Access Key

encrypted password

read-write key from the Azure Cosmos DB account's Keys tab

Batch Policy

String

EventCount:10000, Interval:30

The batch policy includes eventCount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to EventCount:1,Interval:0.

With the default setting, events will be sent every 30 seconds or sooner if the cache contains 10,000 events.

Collections

String

The collection(s) to write to. The collection(s) must exist when the application is started. Partition keys must match one of the fields of the input and matching is case-sensitive. Unpartitioned collections are not supported (see Migrate non-partitioned containers to partitioned containers).

When the input stream is of a user-defined type or of type JSONNodeEvent from JSONParser, specify a single collection with the syntax <database name>.<collection name>.

When the input stream is of type JSONNodeEvent from MongoDBReader, or of type WAEvent from DatabaseReader or a CDC reader, specify one or more source-target pairs using the syntax <database>.<collection / table>,<database>.<collection name>. Note that Cosmos DB collection names are case-sensitive, so must match the case of source collection / table names. You may use wildcards ($ for MongoDB, % for other sources and Cosmos DB) in place of collection and table names.

If you are not using wildcards, you may override the default document ID by specifying one or more source column names to concatenate as the document ID in the Collections property using the syntax <source database>.<source collection / table>, <target database>.<target collection> keycolumns(<column name>,...);... For example, Collections: sourcedb.sourcetable1, targetdb.targetcollection1 keycolumns(UUID); sourcedb.sourcetable2, targetdb.targetcollection2 keycolumns(company,branch). (Note that @userdata is not supported in keycolumns values.) If an update in the source changes any of the concatenated values, the ID of the corresponding Cosmos DB document will be updated.

Collections Config

String

By default, throughput is set to 5000 RUs/sec. (see Request units in Azure Cosmos DB). You may override this by specifying values manually using the syntax <database>.<collection> (throughput=<RUs/sec.>). You may use wildcards and specify multiple values separated by semicolons. A setting for a specific table overrides a wildcard setting, for example, db1.col%(throughput=4000); db1.col5(throughput=2000).

Connection Pool Size

Integer

1000

maximum number of database connections allowed (should not exceed connection pool size in Cosmos DB)

Connection Retry Policy

String

RetryInterval=60, MaxRetries=3

The connection retry policy includes retryInterval and maxRetries. With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 60 seconds (retryInterval). If the second attempt is unsuccessful, in 60 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Excluded Collections

String

Optionally, specify one or more collections to exclude from the set defined by the Collections property. Wildcards are not supported.

Ignorable Exception Code

String

By default, if Cosmos DB returns an error, CosmosDBWriter crashes the application. Use this property to specify errors to ignore, separated by commas. Supported values are:

  • PARTITION_KEY_NOT_FOUND (partition key is wrong or missing)

  • RESOURCE_ALREADY_EXISTS (the target collection has a document with the same id and partition key)

  • RESOURCE_NOT_FOUND (id is wrong or missing)

To capture the ignored exceptions, Writing exceptions to a WActionStore.

Key Separator

String

:

Inserted between values when generating document IDs by concatenating column or field values. If the values might contain a colon, change this to something that will not occur in those values.

Parallel Threads

Integer

See Creating multiple writer instances.

Service Endpoint

String

read-write connection string from the Azure Cosmos DB account's Keys tab
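
For example, the following sketch replicates all tables in an Oracle schema to same-named collections in a Cosmos DB database (the endpoint, key, database and schema names, and input stream name are illustrative):

CREATE TARGET CosmosDBTarget USING CosmosDBWriter (
  ServiceEndpoint: 'https://mycosmosaccount.documents.azure.com:443/',
  AccessKey: '********',
  Collections: 'MYSCHEMA.%,mydatabase.%'
)
INPUT FROM OracleCDCStream;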

Database Writer

Writes to one of the following:

  • HP NonStop SQL/MX (and SQL/MP via aliases in SQL/MX)

  • MemSQL

  • MySQL and compatible versions of MariaDB and MariaDB Galera Cluster

  • Oracle

  • PostgreSQL

  • SAP HANA

  • SQL Server

The driver for PostgreSQL is included with the Striim server. For the others, the appropriate JDBC driver must be installed as described in Installing third-party drivers.

property

type

default value

notes

Batch Policy

String

eventcount:1000, interval:60

The batch policy includes eventcount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to BatchPolicy:'-1'.

With the default setting, events will be sent every 60 seconds or sooner if the buffer accumulates 1,000 events.

Bidirectional Marker Table

String

When performing bidirectional replication, the fully qualified name of the marker table (see Bidirectional replication). This setting is case-sensitive.

Checkpoint Table

String

CHKPOINT

The table where DatabaseWriter will store recovery information when recovery is enabled. See Creating the checkpoint table below for DDL to create the table. Multiple instances of DatabaseWriter may share the same table. If the table is not in the Oracle or SQL/MX schema being written to, or the same MySQL or SQL Server database specified in the connection URL, specify a fully qualified name.

Column Name Escape Sequence

String

When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source, you may use this property to specify which characters Striim will use to escape column names that are on the List of reserved keywords. You may specify two characters to be added at the start and end of the name (for example, [] ), or one character to be added at both the start and end.

If this value is blank, Striim will use the following escape characters for the specified target databases:

  • Oracle: " (ASCII / UTF-8 22)

  • MySQL: ` (ASCII / UTF-8 60)

  • PostgreSQL: " (ASCII / UTF-8 22)

  • SQL Server: []

Commit Policy

String

eventcount:1000, interval:60

The commit policy controls how often transactions are committed in the target database. The syntax is the same as for BatchPolicy. CommitPolicy values must always be equal to or greater than BatchPolicy values. To disable CommitPolicy, set to CommitPolicy:'-1'.

If BatchPolicy is disabled, each event is sent to the target database immediately and the transactions are committed as specified by CommitPolicy.

If BatchPolicy is enabled and CommitPolicy is disabled, each batch is committed as soon as it is received by the target database.

If BatchPolicy and CommitPolicy are both disabled, each event received by DatabaseWriter will be committed immediately. This may be useful in development and testing, but is inappropriate for a production environment.

Connection Retry Policy

String

retryInterval=30, maxRetries=3

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval). If the second attempt is unsuccessful, in 30 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Connection URL

String

  • for HP NonStop SQL/MX: jdbc:t4sqlmx://<IP address>:<port> or jdbc:t4sqlmx://<IP address>:<port>/catalog=<catalog name>;schema=<schema name>

  • for MariaDB: jdbc:mariadb://<ip address>:<port>/<database name>

  • for MariaDB Galera Cluster: specify the IP address and port for each server in the cluster, separated by commas: jdbc:mariadb://<IP address>:<port>,<IP address>:<port>,...; optionally, append /<database name>

  • for MemSQL: same as MySQL

  • for MySQL: jdbc:mysql://<ip address>:<port>/<database name>

  • for Oracle: jdbc:oracle:thin:@<hostname>:<port>:<SID> (when using Oracle 12c with PDB, use the SID for the PDB service) or jdbc:oracle:thin:@<hostname>:<port>/<service name>

  • for PostgreSQL, jdbc:postgresql://<ip address>:<port>/<database name>

  • for SAP HANA: jdbc:sap://<ip address>:<port>/?databaseName=<database name>&currentSchema=<schema name>

  • for SQL Server: jdbc:sqlserver://<ip address>:<port>;DatabaseName=<database name>

    You may use Active Directory authentication with Azure SQL Database (see Supporting Active Directory authentication for Azure) or, when Striim or a Forwarding Agent is running in Windows, with SQL Server (see Supporting Active Directory authentication for SQL Server).

When writing to MySQL, performance may be improved by appending ?rewriteBatchedStatements=true to the connection URL (see Configuration Properties and MySQL and JDBC with rewriteBatchedStatements=true).
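
For example (host, port, and database name are illustrative):

ConnectionURL: 'jdbc:mysql://192.168.1.75:3306/mydb?rewriteBatchedStatements=true'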

Excluded Tables

String

When a wildcard is specified for Tables, you may specify here any source tables you wish to exclude. Specify the value as for the source portion of Tables. For example, to include data from HR_EMPLOYEES and HR_DEPTS but not from HRMASTER when writing to SQL Server (since you cannot specify a literal underscore in the Tables string):

Tables='HR%',
ExcludedTables='HRMASTER'

Ignorable Exception Code

String

By default, if the target DBMS returns an error, DatabaseWriter crashes the application. Use this property to specify errors to ignore, separated by commas. For example, to ignore Oracle ORA-00001 and ORA-00002, you would specify:

IgnorableExceptionCode: '1,2'

When an ignorable exception occurs, Striim will write an "Ignoring VendorExceptionCode" message to the log, including the error number, and increment the "Number of exceptions ignored" value for the target.

To view the number of exceptions ignored in the web UI, go to the Monitor page, click the application name, click Targets, and click More Details next to the target.

For details of the individual ignored events, search striim.server.log for VendorExceptionCode (see Reading log files). Alternatively, to capture the ignored exceptions, see Writing exceptions to a WActionStore.

When replicating from MySQL/MariaDB, Oracle 12c, PostgreSQL, and SQL Server CDC readers, the following three generic (that is, not corresponding to any database-specific error code) exceptions can be specified:

  • NO_OP_UPDATE: could not update a row in the target (typically because there was no corresponding primary key)

  • NO_OP_PKUPDATE: could not update the primary key of a row in the target (typically because the "before" primary key could not be found); not supported when source is PostgreSQLReader

  • NO_OP_DELETE: could not delete a row in the target (typically because there was no corresponding primary key)

These exceptions typically occur when other applications besides Striim are writing to the target database. The unwritten events will be captured to the application's exception store, if one exists (see CREATE EXCEPTIONSTORE).
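
For example, to continue past missed updates and deletes when another application may also be modifying the target (a sketch):

IgnorableExceptionCode: 'NO_OP_UPDATE,NO_OP_DELETE'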

Parallel Threads

Integer

See Creating multiple writer instances.

Enabling recovery for the application disables parallel threads.

Password

encrypted password

The password for the specified user. See Encrypted passwords.

Preserve Source Transaction Boundary

Boolean

False

Set to True to ensure that all operations in each transaction are committed together.

When the target's input stream is the output of an HP NonStop source or when writing to an HP NonStop database, this setting must be False.

This setting interacts with CommitPolicy as follows:

When PreserveSourceTransactionBoundary is True and CommitPolicy is disabled, each transaction will be committed when all of its operations have been received. For example, if you have a series of three transactions containing 300, 400, and 700 operations, there will be three commits.

When PreserveSourceTransactionBoundary is True and CommitPolicy has a positive EventCount value, that value is the minimum number of operations included in each commit. For example, if CommitPolicy includes EventCount=1000 and you have a series of three transactions containing 300, 400, and 700 operations, there will be one commit, after the third transaction (because the first two transactions had a total of only 700 operations, less than the EventCount value).
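
For example, with the following settings (a sketch), each commit would include at least 1,000 operations and would always fall on a source transaction boundary:

PreserveSourceTransactionBoundary: true,
CommitPolicy: 'EventCount:1000, Interval:60'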

Statement Cache Size

Integer

50

The number of prepared statements that Database Writer can cache. When the number of cached statements exceeds this number, the least recently used statement is dropped. If a DatabaseWriter Oracle target fails with the error "ORA-01000: maximum open cursors exceeded," reducing this value or increasing Oracle's OPEN_CURSORS setting may resolve the problem.

Tables

String

The name(s) of the table(s) to write to. The table(s) must exist in the DBMS and the user specified in Username must have insert permission.

If a specified target table does not exist, the application will crash with an error. To skip writes to missing tables without crashing, specify TABLE_NOT_FOUND as an Ignorable Exception Code.

When the target's input stream is a user-defined event, specify a single table.

When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables. You may use wildcards for the table names, but not for the schema or database. For example:

source.emp,target.emp
source.db1,target.db1;source.db2,target.db2
source.%,target.%
source.mydatabase.emp%,target.mydb.%
source1.%,target1.%;source2.%,target2.%

MySQL and Oracle names are case-sensitive; SQL Server names are not. Specify names as <schema name>.<table name> for MySQL and Oracle and as <database name>.<schema name>.<table name> for SQL Server.

See Mapping columns for additional options.

Username

String

the DBMS user name the adapter will use to log in to the server specified in ConnectionURL

Vendor Configuration

String

When the target is Oracle and it uses SSL, specify the required SSL properties (see the notes on SSL Config in Oracle Reader properties).

When the target is SQL Server, the following configuration options are supported. If the target table contains an identity, rowversion, or timestamp column and you do not specify the relevant option(s), the application will crash.

  • enableidentityInsert=true: for insert operations only (not updates), replicate identity column values from the source to the target using identity inserts

  • excludeColTypes={identity|rowversion|timestamp}: ignore any identity, rowversion, or timestamp values in the source and have the target database supply values; to specify multiple options, separate them with a comma, for example, excludeColTypes=identity,rowversion

  • To combine both options, separate them with a semicolon. For example, enableidentityInsert=true; excludeColTypes=timestamp would replicate identity column values and have the target database supply timestamp values.

Note

PostgreSQL does not allow NULL (\0x00) character values (not to be confused with database NULLs) in text columns. If writing to PostgreSQL from a source that contains such values, contact Striim support for a workaround.

The following example uses an input stream of a user-defined type. When the input is the output of a CDC or DatabaseReader source, see Replicating data from one Oracle instance to another.

The following TQL will write to a MySQL table created as follows in MySQL database mydb:

CREATE TABLE mydb.testtable (merchantId char(36), dateTime datetime, amount decimal(10,2), zip char(5));

The striim user must have insert permission on mydb.testtable.

CREATE SOURCE PosSource USING FileReader (
  directory:'Samples/PosApp/AppData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:yes
)
OUTPUT TO RawStream;
 
CREATE CQ CsvToPosData
INSERT INTO PosDataStream partition by merchantId
SELECT TO_STRING(data[1]) as merchantId,
       TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
       TO_DOUBLE(data[7]) as amount,
       TO_STRING(data[9]) as zip
FROM RawStream;

CREATE TARGET WriteMySQL USING DatabaseWriter (
  connectionurl: 'jdbc:mysql://192.168.1.75:3306/mydb',
  Username:'striim',
  Password:'******',
  Tables: 'mydb.testtable'
) INPUT FROM PosDataStream;
Creating the checkpoint table

When recovery is not enabled, there is no need to create the checkpoint table.

When recovery is enabled, DatabaseWriter uses the table specified by the CheckpointTable property to store information used to ensure that there are no missing or duplicate events after recovery (see Recovering applications). Before starting DatabaseWriter with recovery enabled, use the following DDL to create the table, and grant insert, update, and delete privileges to the user specified in the Username property. The table and column names are case-sensitive; do not change them.

HP NonStop SQL/MX (replace <catalog>.<schema> with the catalog and schema in which to create the table):

CREATE TABLE <catalog>.<schema>.CHKPOINT (
  ID VARCHAR(100) NOT NULL NOT DROPPABLE PRIMARY KEY,
  SOURCEPOSITION VARCHAR(30400),
  PENDINGDDL NUMERIC(1),
  DDL VARCHAR(2000)
) ATTRIBUTES BLOCKSIZE 32768;

Microsoft SQL Server:

CREATE TABLE CHKPOINT (
  id VARCHAR(100) PRIMARY KEY,
  sourceposition VARBINARY(MAX), 
  pendingddl BIT, 
  ddl VARCHAR(MAX));

MySQL:

CREATE TABLE CHKPOINT (
  id VARCHAR(100) PRIMARY KEY, 
  sourceposition BLOB, 
  pendingddl BIT(1), 
  ddl LONGTEXT);

Oracle:

CREATE TABLE CHKPOINT (
  ID VARCHAR2(100) PRIMARY KEY, 
  SOURCEPOSITION BLOB, 
  PENDINGDDL NUMBER(1), 
  DDL CLOB);

PostgreSQL:

create table chkpoint (
  id character varying(100) primary key,
  sourceposition bytea,
  pendingddl numeric(1), 
  ddl text);
Database Writer data type support and correspondence

Use the following when the input stream is of a user-defined type. (See the Change Data Capture Guide when the input is the output of a CDC or DatabaseReader source.)

Most Striim data types can map to any one of several column types in the target DBMS.

TQL type

Cassandra

MariaDB / MySQL

Oracle

java.lang.Byte

blob

  • BIGINT

  • LONGTEXT

  • MEDIUMINT

  • MEDIUMTEXT

  • SMALLINT

  • TEXT

  • TINYINT

  • INT

  • NUMBER

java.lang.Double

double

  • DOUBLE

  • REAL

  • BINARY_DOUBLE

  • BINARY_FLOAT

  • FLOAT

  • NUMBER

java.lang.Float

float

FLOAT

  • BINARY_DOUBLE

  • BINARY_FLOAT

  • FLOAT

  • NUMBER

java.lang.Integer

int

  • BIGINT

  • INT

  • MEDIUMINT

  • SMALLINT

  • TINYINT

  • INT

  • NUMBER

java.lang.Long

bigint

  • BIGINT

  • SMALLINT

  • TINYINT

  • INT

  • NUMBER

java.lang.Short

int

  • BIGINT

  • SMALLINT

  • TINYINT

  • INT

  • NUMBER

java.lang.String

varchar

  • CHAR

  • TINYTEXT

  • VARCHAR

  • CHAR

  • NCHAR

  • NVARCHAR

  • VARCHAR

  • VARCHAR2

org.joda.time.DateTime

timestamp

  • DATE

  • DATETIME

  • TIMESTAMP

  • YEAR

  • DATE

  • TIMESTAMP

TQL type

PostgreSQL

SAP HANA

SQL Server

java.lang.Byte

not supported

  • BLOB

  • VARBINARY

  • BIGINT

  • SMALLINT

  • TEXT

  • TINYINT

java.lang.Double

double precision

  • DOUBLE

  • FLOAT

FLOAT

java.lang.Float

float

  • FLOAT

  • REAL

  • FLOAT

  • REAL

java.lang.Integer

  • integer

  • serial

INTEGER

  • BIGINT

  • NUMERIC

  • SMALLINT

  • TINYINT

java.lang.Long

  • bigint

  • bigserial

BIGINTEGER

  • BIGINT

  • SMALLINT

  • TINYINT

java.lang.Short

  • smallint

  • smallserial

SMALLINT

  • BIGINT

  • SMALLINT

  • TINYINT

java.lang.String

  • character

  • character varying

  • date

  • numeric

  • text

  • timestamp with time zone

  • timestamp without time zone

  • ALPHANUM

  • NVARCHAR

  • VARCHAR

  • CHAR

  • NCHAR

  • NVARCHAR

  • TEXT

  • UNIQUEIDENTIFIER

  • VARCHAR

  • XML

org.joda.time.DateTime

  • timestamp with time zone

  • timestamp without time zone

  • DATE

  • SECONDDATE

  • TIME

  • TIMESTAMP

  • DATE

  • DATETIME

  • DATETIME2

  • TIME

Viewing discarded events

When DatabaseWriter can not find an expected target table, Striim writes the event to its server log and increments the Discarded Event Count for the target.

To view the Discarded Event Count in the Striim console, enter mon <namespace>.<target name>.
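
For example, for the WriteMySQL target from the example above, running in a namespace named myns (the namespace name is illustrative):

mon myns.WriteMySQL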

To view the Discarded Event Count in the web UI, go to the Monitor page, click the application name, click Targets, and click More Details next to the target.

For details of the individual discards, search striim.server.log for "No target found" (see Reading log files).

File Writer

Writes to files.

property

type

default value

notes

Directory

String

  • If no directory is specified, the file will be written to the Striim program directory.

  • If a directory name is specified without a path, it will be created in the Striim program directory.

  • If the specified directory does not exist, it will be created, provided the Striim server process has the necessary permissions.

  • In a multi-server environment, if the directory is local to the Striim server, each server's file will contain only the events processed on that server.

See Setting output names and rollover / upload policies for advanced options.

Encryption Policy

String

See Setting encryption policies.

Data Encryption Key Passphrase

encrypted password

See Setting encryption policies.

File Name

String

The base name of the files to be written. See Setting output names and rollover / upload policies.

Flush Policy

String

EventCount:10000, Interval:30s

If data is not flushed properly with the default setting, you may use this property to specify how many events FileWriter will accumulate before it writes to disk and/or the maximum number of seconds that will elapse between writes. For example:

  • 'eventcount:5000'

  • 'interval:10'

  • 'interval:10,eventcount:5000'

With a setting of 'eventcount:1', each event will be written to disk immediately. This can be useful during development, debugging, testing, and troubleshooting.

Rollover on DDL

Boolean

True

Has effect only when the input stream is the output stream of a CDC reader source. With the default value of True, rolls over to a new file when a DDL event is received. Set to False to keep writing to the same file.

Rollover Policy

String

EventCount:10000, Interval:30s

See Setting output names and rollover / upload policies.

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.
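
For example, the following sketch writes delimited text files to a FileWriterOutput directory, reusing the PosSource_TransformedStream stream defined in the Cassandra Writer example above (the directory and file names are illustrative):

CREATE TARGET FileOut USING FileWriter (
  directory: 'FileWriterOutput',
  filename: 'PosDataOutput'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosSource_TransformedStream;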

GCS Writer

Writes to Google Cloud Storage.

property

type

default value

notes

Bucket Name

String

The GCS bucket name. If it does not exist, it will be created (provided Location and Storage Class are specified).

Note the limitations in Google's Bucket and Object Naming Guidelines. Note particularly that bucket names must be unique not just within your project or account but across all Google Cloud Storage accounts.

See Setting output names and rollover / upload policies for advanced Striim options.

Client Configuration

String

Optionally, specify one or more of the following property-value pairs, separated by commas, to override Google's defaults (see Class RetrySettings):

  • connectionTimeout=<value>: default is 30000

  • maxErrorRetry=<value>: default is 3

  • retryDelay=<value>: default is 30000

Compression Type

String

Set to gzip when the input is in gzip format. Otherwise, leave blank.

Data Encryption Key Passphrase

encrypted password

See Setting encryption policies.

Encryption Policy

String

See Setting encryption policies.

Folder Name

String

Optionally, specify a folder within the specified bucket. If it does not exist, it will be created.

See Setting output names and rollover / upload policies for advanced options.

Location

String

The location of the bucket, which you can find on the bucket's overview tab (see Bucket Locations).

Object Name

String

The base name of the files to be written. See Google's Bucket and Object Naming Guidelines.

See Setting output names and rollover / upload policies for advanced Striim options.

Parallel Threads

Integer

See Creating multiple writer instances.

Partition Key

String

If you enable ParallelThreads, specify a field to be used to partition the events among the threads.  Events will be distributed among multiple folders based on this field's values. 

If the input stream is of any type except WAEvent, specify the name of one of its fields.

If the input stream is of the WAEvent type, specify a field in the METADATA map (see HP NonStop reader WAEvent fields, MySQLReader WAEvent fields, OracleReader WAEvent fields, or MS SQL Reader WAEvent fields) using the syntax @METADATA(<field name>), or a field in the USERDATA map (see Adding user-defined data to WAEvent streams), using the syntax @USERDATA(<field name>). If appropriate, you may concatenate multiple METADATA and/or USERDATA fields.
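
For example, to distribute WAEvent input from a CDC reader among folders by source table (a sketch; it assumes the reader's METADATA map includes a TableName field):

PartitionKey: '@METADATA(TableName)'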

Project Id

String

The Google Cloud Platform project for the bucket.

Roll Over on DDL

Boolean

True

Has effect only when the input stream is the output stream of a CDC reader source. With the default value of True, rolls over to a new file when a DDL event is received. Set to False to keep writing to the same file.

Service Account Key

String

The path (from root or the Striim program directory) and file name to the .json credentials file downloaded from Google (see Service Accounts). This file must be copied to the same location on each Striim server that will run this adapter, or to a network location accessible by all servers. The associated service account must have the Storage Legacy Bucket Writer role for the specified bucket.

Storage Class

String

The storage class of the bucket, which you can find on the bucket's overview tab (see Bucket Locations).

Upload Policy

String

eventcount:10000, interval:5m

The upload policy may include eventcount, interval, and/or filesize (see Setting output names and rollover / upload policies for syntax). Cached data is written to GCS every time any of the specified values is exceeded. With the default value, data will be written every five minutes or sooner if the cache contains 10,000 events. When the app is undeployed, all remaining data is written to GCS.

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

For example:

CREATE APPLICATION testGCS;

CREATE SOURCE PosSource USING FileReader ( 
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
  positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false ) 
OUTPUT TO PosSource_Stream;

CREATE CQ PosSource_Stream_CQ 
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET GCSOut USING GCSWriter  ( 
  bucketName: 'mybucket',
  objectName: 'myobjectname',
  serviceAccountKey: 'conf/myproject-ec6f8b0e3afe.json',
  projectId: 'myproject' ) 
FORMAT USING DSVFormatter  () 
INPUT FROM PosSource_TransformedStream;

END APPLICATION testGCS;

Google PubSub Writer

Writes to an existing topic in Google Cloud Pub/Sub.

Only async mode is supported. Consequently, events may be written out of order.

property

type

default value

notes

Batch Policy

String

EventCount:1000, Interval:1m, Size:1000000

Cached data is written to the target every time one of the specified values is exceeded. With the default value, data will be written once a minute or sooner if the buffer contains 1,000 events or 1,000,000 bytes of data. When the application is stopped, any remaining data in the buffer is discarded.

Due to google-cloud-java issue #4757, the maximum supported value for EventCount is 1000.

Project ID

String

the project to which the PubSub instance belongs

PubSub Config

String

RetryDelay: 1, MaxRetryDelay:60, TotalTimeout:600, InitialRpcTimeout:10, MaxRpcTimeout:600, RetryDelayMultiplier:2.0, NumThreads:10, MaxOutstandingElementCount:1000, MaxOutstandingRequestBytes:1000000

Do not change these values except as instructed by Striim support.

Service Account Key

String

The path (from root or the Striim program directory) and file name to the .json credentials file downloaded from Google (see Service Accounts). This file must be copied to the same location on each Striim server that will run this adapter, or to a network location accessible by all servers.

If a value for this property is not specified, Striim will use the $GOOGLE_APPLICATION_CREDENTIALS environment variable.

The service account must have the PubSub Publisher or higher role for the topic (see Access Control).

Topic

String

the topic to publish to

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

Sample application:

CREATE APPLICATION GooglePubSubWriterTest;

CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
  positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false )
OUTPUT TO PosSource_Stream;

CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET GooglePubSubTarget USING GooglePubSubWriter (
  ServiceAccountKey:'my-pubsub-cb179721c223.json',
  ProjectId:'my-pubsub',
  Topic:'mytopic'
)
FORMAT USING JSONFormatter ()
INPUT FROM PosSource_TransformedStream;

END APPLICATION GooglePubSubWriterTest;

Hazelcast Writer

Writes to Hazelcast maps.

The following describes use of HazelcastWriter with an input stream of a user-defined type. For use with a CDC reader or DatabaseReader input stream of type WAEvent, see Replicating Oracle data to a Hazelcast "hot cache".

property

type

default value

notes

BatchPolicy

String

eventCount:10000, Interval:30

This property is used only when the Mode is InitialLoad.

The batch policy includes eventCount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to EventCount:1,Interval:0.

With the default setting, data will be written every 30 seconds or sooner if the buffer accumulates 10,000 events.

ClusterName

String

the Hazelcast cluster group-name value, if required

ConnectionURL

String

<IP address>:<port> of the Hazelcast server (cannot be on the same host as Striim)

Maps

String

With an input stream of a user-defined type, the name of the Hazelcast map to write to. See the example below.

With an input stream of type WAEvent, <source table 1 name>,<Hazelcast map 1 name>;<source table 2 name>,<Hazelcast map 2 name>;... . For an example, see Replicating Oracle data to a Hazelcast "hot cache".

Mode

String

incremental

With an input stream of a user-defined type, do not change the default. See Replicating Oracle data to a Hazelcast "hot cache" for more information.

ORMFile

String

fully qualified filename of the Hazelcast object-relational mapping (ORM) file

Password

encrypted password

the group-password value corresponding to the group-name value specified in ClusterName

To use HazelcastWriter, you must:

  1. Write a Java class defining the Plain Old Java Objects (POJOs) corresponding to the input stream's type (see http://stackoverflow.com/questions/3527264/how-to-create-a-pojo for more information on POJOs), compile the Java class to a .jar file, copy it to the Striim/lib directory of each Striim server that will run the HazelcastWriter target, and restart the server. If the class is missing, a "ClassNotFound" error will be written to the log.

  2. Write an XML file defining the object-relational mapping to be used to map stream fields to Hazelcast maps (the "ORM file") and save it in a location accessible to the Striim cluster.

The following example assumes the following stream definition:

CREATE TYPE invType (
  SKU String key,
  STOCK String,
  NAME String,
  LAST_UPDATED DateTime);
CREATE STREAM invStream OF invType;

The following Java class defines a POJO corresponding to the stream:

package com.customer.vo;
import java.io.Serializable;
import java.util.Date;
public class ProductInvObject  implements Serializable {

   public Long sku = 0;
   public double stock = 0;
   public String name = null;
   public Date lastUpdated = null;

   public ProductInvObject ( ) {    }

   @Override
   public String toString() {
       return "sku : " + sku + ", STOCK:" + stock + ", NAME:" + name + ", LAST_UPDATED:" + lastUpdated  ;
   }
}

The following ORM file maps the input stream fields to Hazelcast maps (see the discussion of data type support below):

<?xml version="1.0" encoding="UTF-8"?>
<entity-mappings xmlns="http://www.eclipse.org/eclipselink/xsds/persistence/orm" version="2.4">

    <entity name="prodcInv" class="com.customer.vo.ProductInvObject" >  
        <table name="MYSCHEMA.INV"/>
        <attributes>
            <id name ="sku" attribute-type="Long"  >
                <column nullable="false" name="SKU" />
            </id>
            <basic name="stock" attribute-type="double" >
                <column nullable="false" name="STOCK"  />
            </basic>
            <basic name="name" attribute-type="String" >
                <column  name="NAME" />
            </basic>
            <basic name="lastUpdated"  attribute-type="java.util.Date" >
                <column name="LAST_UPDATED" />
            </basic>
        </attributes>
    </entity>

</entity-mappings>

Data types are converted as specified in the ORM file. Supported types on the Hazelcast side are:

  • binary (byte[])

  • Character, char

  • Double, double

  • Float, float

  • int, Integer

  • java.util.Date

  • Long, long

  • Short, short

  • String

Assuming that the ORM file has been saved to Striim/Samples/TypedData2HCast/invObject_orm.xml, the following TQL will write the input stream events to Hazelcast:

CREATE TARGET HazelOut USING HazelcastWriter (
  ConnectionURL: '203.0.113.50:5702',
  ormFile:"Samples/TypedData2HCast/invObject_orm.xml",
  mode: "incremental",
  maps: 'invCache'
)
INPUT FROM invStream;

If the application crashes, after recovery (see Recovering applications) the Hazelcast map may contain some duplicate events. If Hazelcast crashes, stop the application, restart Hazelcast, then restart the application.

HBase Writer

Writes to an HBase database.

The following describes use of HBaseWriter with an input stream of a user-defined type. For use with a CDC reader or DatabaseReader input stream of type WAEvent, see Replicating Oracle data to HBase.

For information on which firewall ports must be open, see the hbase-site.xml file specified by HBaseConfigurationPath.

property

type

default value

notes

Authentication Policy

String

If the target HBase instance is unsecured, leave this blank. If it uses Kerberos authentication, provide credentials in the format Kerberos, Principal:<Kerberos principal name>, KeytabPath:<fully qualified keytab file name>. For example: authenticationpolicy:'Kerberos, Principal:nn/ironman@EXAMPLE.COM, KeytabPath:/etc/security/keytabs/nn.service.keytab'

Batch Policy

String

eventCount:1000, Interval:30

The batch policy includes eventCount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to EventCount:1,Interval:0.

With the default setting, data will be written every 30 seconds or sooner if the buffer accumulates 1000 events.

HBase Configuration Path

String

Fully-qualified name of the hbase-site.xml file. Contact your HBase administrator to obtain a valid copy of the file if necessary, or to mark the host as a Hadoop client so that the file gets distributed automatically.

PK Update Handling Mode

String

ERROR

With the default value, when the input stream contains an update to a primary key, the application stops and its status is ERROR. With the value IGNORE, primary key update events are ignored and the application continues.

To support primary key updates, set to DELETEANDINSERT. The Compression property in the CDC source reader must be set to False.

Parallel Threads

Integer

See Creating multiple writer instances.

Tables

String

the input stream type and name of the HBase table to write to, in the format <table name>.<column family name> (case-sensitive; multiple tables are supported only when Replicating Oracle data to HBase)

The input stream's type must define a key field.

The following TQL will write to the HBase table posdata of ColumnFamily striim:

CREATE SOURCE PosSource USING FileReader (
  directory:'Samples/PosApp/AppData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:yes
)
OUTPUT TO RawStream;

CREATE TYPE PosData(
  merchantId String KEY, 
  dateTime DateTime, 
  amount Double, 
  zip String
);
CREATE STREAM PosDataStream OF PosData;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream 
SELECT TO_STRING(data[1]), 
       TO_DATEF(data[4],'yyyyMMddHHmmss'),
       TO_DOUBLE(data[7]),
       TO_STRING(data[9])
FROM RawStream;

CREATE TARGET WriteToHBase USING HBaseWriter(
  HBaseConfigurationPath:"/usr/local/HBase/conf/hbase-site.xml",
  Tables: "posdata.striim"
)
INPUT FROM PosDataStream;

In HBase, merchantId, dateTime, amount, and zip columns will automatically be created under the striim ColumnFamily if they do not already exist.

HDFS Writer

Writes to files in the Hadoop Distributed File System (HDFS). 

Warning

If your version of Hadoop does not include the fix for HADOOP-10786, HDFSWriter may crash due to Kerberos ticket expiration.

To write to MapR-FS, use MapRFSWriter. HDFSWriter and MapRFSWriter use the same properties except for the difference in hadoopurl noted below and the different names for the configuration path property.

property

type

default value

notes

authentication policy

String

If the HDFS cluster uses Kerberos authentication, provide credentials in the format Kerberos, Principal:<Kerberos principal name>, KeytabPath:<fully qualified keytab file name>. Otherwise, leave blank. For example: authenticationpolicy:'Kerberos, Principal:nn/ironman@EXAMPLE.COM, KeytabPath:/etc/security/keytabs/nn.service.keytab'

Directory

String

The full path to the directory in which to write the files. See Setting output names and rollover / upload policies for advanced options.

File Name

String 

The base name of the files to be written. See Setting output names and rollover / upload policies.

flush policy

String

eventcount:10000, interval:30s

If data is not flushed properly with the default setting, you may use this property to specify how many events Striim will accumulate before writing and/or the maximum number of seconds that will elapse between writes. For example:

  • flushpolicy:'eventcount:5000'

  • flushpolicy:'interval:10s'

  • flushpolicy:'interval:10s, eventcount:5000'

Note that changing this setting may significantly degrade performance.

With a setting of 'eventcount:1', each event will be written immediately. This can be useful during development, debugging, testing, and troubleshooting.

hadoopConfigurationPath

String

If using Kerberos authentication, specify the path to Hadoop configuration files such as core-site.xml and hdfs-site.xml. If this path is incorrect or the configuration changes, authentication may fail.

hadoopurl

String

The URI for the HDFS cluster NameNode. See below for an example. The default HDFS NameNode IPC port is 8020 or 9000 (depending on the distribution). Port 50070 is for the web UI and should not be specified here.

For an HDFS cluster with high availability, use the value of the dfs.nameservices property from hdfs-site.xml with the syntax hadoopurl:'hdfs://<value>', for example, hadoopurl:'hdfs://mycluster'. When the current NameNode fails, Striim will automatically connect to the next one.

When using MapRFSWriter, you may start the URL with hdfs:// or maprfs:/// (there is no functional difference).

MapRDBConfigurationPath

String

see notes for hadoopConfigurationPath

Rollover on DDL

Boolean

True

Has effect only when the input stream is the output stream of a CDC reader source. With the default value of True, rolls over to a new file when a DDL event is received. Set to False to keep writing to the same file.

Rollover Policy

String

interval:60s

See Setting output names and rollover / upload policies.

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

The following sample writes some of the PosApp sample data to the file hdfstestOut.txt in the /user/striim/PosAppOutput directory of the specified HDFS instance:

CREATE SOURCE CSVSource USING FileReader (
  directory:'Samples/PosApp/AppData',
  WildCard:'posdata.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:'yes'
)
OUTPUT TO CsvStream;

CREATE TYPE CSVType (
  merchantId String,
  dateTime DateTime,
  hourValue Integer,
  amount Double,
  zip String
);
CREATE STREAM TypedCSVStream OF CSVType;

CREATE CQ CsvToPosData
INSERT INTO TypedCSVStream
SELECT data[1],
  TO_DATEF(data[4],'yyyyMMddHHmmss'),
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')),
  TO_DOUBLE(data[7]),
  data[9]
FROM CsvStream;

CREATE TARGET hdfsOutput USING HDFSWriter(
  filename:'hdfstestOut.txt',
  hadoopurl:'hdfs://node8057.example.com:8020',
  flushpolicy:'interval:10s,eventcount:5000',
  authenticationpolicy:'Kerberos,Principal:striim/node8057.example.com@STRIIM.COM,
    KeytabPath:/etc/security/keytabs/striim.service.keytab',
  hadoopconfigurationpath:'/etc/hadoop/conf',
  directory:'/user/striim/PosAppOutput' 
)
FORMAT USING DSVFormatter (
)
INPUT FROM TypedCSVStream;

Hive Writer

Writes to one or more tables in Apache Hive.

When the input stream of a HiveWriter target is of a user-defined type, it can write to Hive tables that use Avro, ORC, Parquet, or text file storage formats, and writes use SQL APPEND or INSERT INTO.

When the input stream is the output stream of a DatabaseReader or CDC source:

  • and the Mode is InitialLoad, the storage format may be Avro, ORC, Parquet, or text file, and writes use SQL APPEND or INSERT INTO.

  • and the Mode is incremental and the storage format is Avro or Parquet, writes use SQL APPEND or INSERT INTO.

  • and the Mode is incremental and the storage format is ORC, Hive ACID transactions must be enabled and writes use SQL MERGE (which your version of Hive must support). In this case, there will be no duplicate events written to Hive ("exactly-once processing") after an application crash and recovery (see Recovering applications), as may happen when using SQL APPEND or INSERT INTO.

Limitations:

  • When the input stream is the output stream of a CDC reader, the reader's Compression property must be False.

  • DDL is not supported. If you need to alter the source tables, quiesce the application, change the source and target tables, and restart.

  • Columns specified in the Tables property's keycolumns option may not be updated. Any attempted update will be silently discarded.

  • Bucketed or partitioned columns may not be updated. This is a limitation of Hive, not HiveWriter.

  • Multiple instances of HiveWriter cannot write to the same table. When a HiveWriter target is deployed on multiple Striim servers, partition the input stream or use an environment variable in table mappings to ensure that they do not write to the same tables.
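
For example, one way to keep HiveWriter targets deployed on multiple Striim servers from writing to the same tables is to partition their input stream (a sketch; the stream, type, and field names are illustrative):

CREATE STREAM HiveInputStream OF HiveInputType PARTITION BY merchantId;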

property

type

default value

notes

Authentication Policy

String

If the HDFS cluster uses Kerberos authentication, provide credentials in the format Kerberos, Principal:<Kerberos principal name>, KeytabPath:<fully qualified keytab file name>. Otherwise, leave blank. For example: authenticationpolicy:'Kerberos, Principal:nn/ironman@EXAMPLE.COM, KeytabPath:/etc/security/keytabs/nn.service.keytab'

Connection URL

String

the JDBC connection URL, for example, ConnectionURL:'jdbc:hive2://192.0.2.5:10000'

Directory

String

By default, Striim will create an HDFS directory on the Hive server to use as a staging area. If Striim does not have permission to create the necessary directory, HiveWriter will crash with a "File Not Found" exception. To resolve that issue, create a staging directory manually and specify it here.

Hadoop Configuration Path

String

If using Kerberos authentication, specify the path to Hadoop configuration files such as core-site.xml and hdfs-site.xml. If this path is incorrect or the configuration changes, authentication may fail.

Hadoop URL

String

The URI for the HDFS cluster NameNode. See below for an example. The default HDFS NameNode IPC port is 8020 or 9000 (depending on the distribution). Port 50070 is for the web UI and should not be specified here.

For an HDFS cluster with high availability, use the value of the dfs.nameservices property from hdfs-site.xml with the syntax hadoopurl:'hdfs://<value>', for example, hadoopurl:'hdfs://mycluster'. When the current NameNode fails, Striim will automatically connect to the next one.

Ignorable Exception Code

String

Set to TABLE_NOT_FOUND if you do not want the application to crash when Striim tries to write to a table that does not exist in the target database. See Handling "table not found" errors for more information.

Merge Policy

String

eventcount:10000, interval:5m

With the default setting, events are written every five minutes or sooner if there are 10,000 events.
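
For example, to merge smaller batches more frequently you might set (a sketch using the same eventcount / interval syntax):

MergePolicy:'eventcount:5000, interval:2m'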

Mode

String

incremental

With an input stream of a user-defined type, do not change the default. See Replicating Oracle data to Hive.

Password

encrypted password

The password for the specified user. See Encrypted passwords.

Parallel Threads

Integer

See Creating multiple writer instances.

Tables

String

The name(s) of the table(s) to write to. The table(s) must exist in Hive.

When the input stream of the HiveWriter target is the output of a CDC reader or DatabaseReader source (that is, when replicating data from one database to another), HiveWriter can write to multiple tables. In this case, specify the names of both the source and target tables.

Since Hive does not have primary keys, you must use the keycolumns option to define a unique identifier for each row in the target table: for example, Tables:'DEMO.EMPLOYEE,employee keycolumns(emp_id)'. If necessary to ensure uniqueness, specify multiple columns with the syntax keycolumns(<column 1>,<column 2>,...). You may use wildcards for the source table provided all the tables have the key columns: for example, Tables:'DEMO.Ora%,HIVE.Hiv% KeyColumns(...)'.

See Mapping columns for additional options.

Username

String

A Hive user for the server specified in ConnectionURL. The user must have INSERT, UPDATE, DELETE, TRUNCATE, CREATE, DROP, and ALTER privileges on the specified tables.

The following sample code writes data from PosDataPreview.csv  to Hive  (to run this code, you must first create the target table in Hive):

CREATE SOURCE PosSource USING FileReader (
  directory:'Samples/PosApp/AppData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:yes
)
OUTPUT TO RawStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM RawStream;

CREATE TARGET HiveSample USING HiveWriter (
  ConnectionURL:'jdbc:hive2://192.0.2.76:10000',
  Username:'hiveuser', 
  Password:'********',
  hadoopurl:'hdfs://192.0.2.76:9000/',
  Tables:'posdata'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;
HiveWriter data type support and correspondence

When the input stream is of a user-defined type:

TQL type

Hive type

java.lang.Byte

BINARY

java.lang.Double

DOUBLE

java.lang.Float

FLOAT

java.lang.Integer

INTEGER

java.lang.Long

BIGINT

java.lang.Short

SMALLINT, TINYINT

java.lang.String

CHAR, DECIMAL, INTERVAL, NUMERIC, STRING, VARCHAR

org.joda.time.DateTime

TIMESTAMP

When the input stream of a HiveWriter target is the output of an Oracle source (DatabaseReader or OracleReader):

Oracle type

Hive type

BINARY_DOUBLE

DOUBLE

BINARY_FLOAT

FLOAT

BLOB

BINARY

CHAR

CHAR, STRING, VARCHAR

CLOB

STRING

DATE

TIMESTAMP

DECIMAL

DECIMAL, DOUBLE, FLOAT

FLOAT

FLOAT

INTEGER

BIGINT, INT, SMALLINT, TINYINT

LONG

BIGINT

NCHAR

STRING, VARCHAR

NUMBER

  • INT when the scale is 0 and the precision is less than 10

  • BIGINT when the scale is 0 and the precision is less than 19

  • DECIMAL when the scale is greater than 0 or the precision is greater than 19

NVARCHAR2

STRING, VARCHAR

SMALLINT

SMALLINT

TIMESTAMP

TIMESTAMP

TIMESTAMP WITH LOCAL TIME ZONE

TIMESTAMP

TIMESTAMP WITH TIME ZONE

TIMESTAMP

VARCHAR2

STRING, VARCHAR

Cloudera Hive Writer

ClouderaHiveWriter is identical to HiveWriter except that, since Cloudera's Hive distribution does not support SQL MERGE, there is no Mode property. ClouderaHiveWriter is always in InitialLoad mode, and writes always use SQL APPEND or INSERT INTO.

See Hive Writer for further discussion and documentation of the properties.

Hortonworks Hive Writer

Except for the name of the adapter, HortonworksHiveWriter is identical to HiveWriter. See Hive Writer for documentation of the properties.

HTTP Writer

Sends a custom response to an HTTP Reader source when the source's Defer Response property is set to True. If HTTPWriter does not return a response before the Defer Response Timeout specified in HTTPReader, HTTPReader will respond with error 408 with the body, "Request timed out. Make sure that there is an HTTPWriter in the current application with property Mode set to RESPOND and HTTPWriter property RequestContextKey is mapped correctly, or set HTTPReader property DeferResponse to FALSE, or check Striim server log for details."

property

type

default value

notes

Mode

String

RESPOND

Do not change default value.

Request Context Field

String

 

The name of the input stream field that contains the UUID of the HTTP Reader source that will send the response. This UUID is the value of the RequestContextField metadata field of the HTTPReader's output stream (see HttpCacheResponseApp.tql for an example).

Response Code Field

String

"200"

Status code for the custom response. Typically the default value of "200" will be appropriate unless your application has multiple instances of HTTP Writer that will handle events with various characteristics (see HttpCacheResponseApp.tql for an example).

Response Headers

String

Optionally, specify one or more header fields to be added to the custom response using the format <header name>=<value>. Separate multiple headers with semicolons. The value can be a static string (for example, Server="Striim") or the value of a specified input stream field (for example, Table=@metadata(TableName); Operation=@metadata(OperationName)).
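
The following is a minimal sketch of an HTTPWriter target (the stream name, field name, and header value are hypothetical, and the property spellings assume the TQL names match the property headings above; see HttpCacheResponseApp.tql for a complete working example):

CREATE TARGET SendHttpResponse USING HTTPWriter (
  Mode: 'RESPOND',
  RequestContextField: 'uuid',
  ResponseCodeField: '200',
  ResponseHeaders: 'Server="Striim"'
)
INPUT FROM ResponseStream;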

To see how this works, run striim/docs/HTTPWriter/HttpCacheResponseApp.tql, open a terminal or command prompt, and enter the following (if Striim is not running on your local system, change 127.0.0.1 to the IP address of the Striim server):

curl --location --request POST '127.0.0.1:8765' \
--header 'Content-Type: text/csv' \
--data-raw 'COMPANY 1'

That will return the cache entry for Company 1:

{
  "BUSINESS_NAME":"COMPANY 1",
  "MERCHANT_ID":"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",
  "PRIMARY_ACCOUNT_NUMBER":"6705362103919221351",
  "POS_DATA_CODE":0,
  "DATETIME":"2607-11-27T09:22:53.210-08:00",
  "EXP_DATE":"0916",
  "CURRENCY_CODE":"USD",
  "AUTH_AMOUNT":2.2,
  "TERMINAL_ID":"5150279519809946",
  "ZIP":"41363",
  "CITY":"Quicksand"
 }

Then enter the following:

curl --location --request POST '127.0.0.1:8765' \
--header 'Content-Type: text/csv' \
--data-raw 'COMPANY 99'

Since there is no entry for Company 99 in the cache, that will return:

{
  "MESSAGE":"Requested entry was not found in cache."
}

See the comments in HttpCacheResponseApp.tql for more information.

JMS Writer

Writes data using the JMS API.

property

type

default value

notes

Connection Factory Name

String

the name of the ConnectionFactory containing the queue or topic

Ctx

String

the JNDI initial context factory name

Message Type

String

TextMessage

the other supported value is BytesMessage

Password

encrypted password

see Encrypted passwords

Provider

String

Queue Name

String

leave blank if Topic is specified

Topic

String

leave blank if QueueName is specified

Username

String

a messaging system user with the necessary permissions

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

Sample code using DSVFormatter:

CREATE SOURCE JMSCSVSource USING FileReader (
  directory:'/opt/Striim/Samples/PosApp/appData',
  WildCard:'posdata.csv',
  positionByEOF:false,
  charset:'UTF-8'
)
PARSE USING DSVParser (
  header:'yes'
)
OUTPUT TO CsvStream;

CREATE TYPE CSVType (
  merchantName String,
  merchantId String,
  dateTime DateTime,
  hourValue Integer,
  amount Double,
  zip String
);

CREATE STREAM TypedCSVStream of CSVType;

CREATE CQ CsvToPosData
INSERT INTO TypedCSVStream
SELECT data[0],data[1],
  TO_DATEF(data[4],'yyyyMMddHHmmss'),
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')),
  TO_DOUBLE(data[7]),
  data[9]
FROM CsvStream;

CREATE TARGET JmsTarget USING JMSWriter (
  Provider:'tcp://192.168.123.101:61616',
  Ctx:'org.apache.activemq.jndi.ActiveMQInitialContextFactory',
  UserName:'striim',
  Password:'******',
  Topic:'dynamicTopics/Test'
)
FORMAT USING DSVFormatter (
)
INPUT FROM TypedCSVStream;

JPA Writer

Use only as directed by Striim support.

Kafka Writer

Writes to a topic in Apache Kafka.

There are five versions of KafkaWriter, 0.8.0,  0.9.0, 0.10.0, 0.11.0, and 2.1.0. Use the one that corresponds to the target Kafka broker. For example, to use 0.9.0, the syntax is CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. If writing to the internal Kafka instance, use 0.11.0.

Known issue DEV-13039: application with KafkaWriter 0.9 or 0.10 crashes if Kafka broker goes offline.

property

type

default value

notes

Broker Address

String

Kafka Config

String

Optionally, specify Kafka producer properties, separated by semicolons. See the table below for details.

Kafka Config Property Separator

String

;

Specify a different separator if one of the producer property values specified in KafkaConfig contains a semicolon.

Kafka Config Value Separator

String

=

Specify a different separator if one of the producer property values specified in KafkaConfig contains an equal symbol.
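
For example, to use a pipe character instead of the default semicolon between producer properties (a sketch; the TQL property spellings are assumed to match the headings above):

KafkaConfigPropertySeparator: '|',
KafkaConfig: 'compression.type=snappy|linger.ms=1000'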

Message Header

String

Optionally, if using Kafka 0.11 or later in async mode, or in sync mode with KafkaConfig batch.size=-1, specify one or more custom headers to be added to messages as key-value pairs. Values may be:

  • a field name from an input stream of a user-defined type: for example, MerchantID=merchantID

  • a static string: for example, Company="My Company"

  • a function: for example, to get the source table name from a WAEvent input stream that is the output of a CDC reader, TableName=@metadata(TableName)

To specify multiple custom headers, separate them with semicolons.

Message Key

String

Optionally, if using Kafka 0.11 or later in async mode, or in sync mode with KafkaConfig batch.size=-1, specify one or more keys to be added to messages as key-value pairs. The property value may be a static string, one or more fields from the input stream, or a combination of both. Examples:

MessageKey : CName="Striim"

MessageKey : Table=@metadata(TableName);
  Operation=@metadata(OperationName);key1=@userdata(key1)

MessageKey : CityName=City; Zipcode=zip

MessageKey : CName="Striim";Table=@metadata(TableName);
  Operation=@metadata(OperationName)

Among other possibilities, you may use this property to support log compaction or to allow downstream applications to use queries based on the message payload.

Mode

String

Sync

see Setting KafkaWriter's mode property: sync versus async

Parallel Threads

Integer

See Creating multiple writer instances.

Partition Key

String

The name of a field in the input stream whose values determine how events are distributed among multiple partitions. Events with the same partition key field value will be written to the same partition.

If the input stream is of any type except WAEvent, specify the name of one of its fields.

If the input stream is of the WAEvent type, specify a field in the METADATA map (see HP NonStop reader WAEvent fields, MySQLReader WAEvent fields, OracleReader WAEvent fields, or MS SQL Reader WAEvent fields) using the syntax @METADATA(<field name>), or a field in the USERDATA map (see Adding user-defined data to WAEvent streams), using the syntax @USERDATA(<field name>).
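
For example (the field names here are illustrative):

PartitionKey:'City'
PartitionKey:'@METADATA(TableName)'
PartitionKey:'@USERDATA(key1)'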

Topic

String

the existing Kafka topic to write to (will not be created if it does not exist)

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

Notes on the KafkaConfig property

With the exceptions noted in the following table, you may specify any Kafka producer property in KafkaConfig. 

Kafka producer property

notes

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size

linger.ms

retries

  • In sync mode, to prevent out-of-order events, any values set for these producer properties in KafkaConfig are ignored and left unchanged; Striim handles them internally.

  • In async mode, Striim will update the Kafka producer properties and these will be handled by Kafka.

  • In sync mode, you may set batch.size=-1 to write one event per Kafka message. This will seriously degrade performance so is not recommended in a production environment. With this setting, messages will be similar to those in async mode.

enable.idempotence

When using version 2.1.0 and async mode, set to true to write events in order (see Setting KafkaWriter's mode property: sync versus async).
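
For example, to enable idempotent writes:

KafkaConfig:'enable.idempotence=true'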

key.deserializer

value is always org.apache.kafka.common.serialization.ByteArrayDeserializer, cannot be overridden by KafkaConfig

Internally, KafkaWriter invokes KafkaConsumer for various purposes, and the WARNING from the consumer API caused by passing KafkaConfig properties to it can be safely ignored. See Configuring Kafka for more information about Kafka producer properties.

KafkaWriter sample application

The following sample code writes data from PosDataPreview.csv to the Kafka topic KafkaWriterSample. This topic already exists in Striim's internal Kafka instance. If you are using an external Kafka instance, you must create the topic before running the application.

CREATE SOURCE PosSource USING FileReader (
  directory:'Samples/PosApp/AppData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:yes
)
OUTPUT TO RawStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM RawStream;

CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'(
  brokeraddress:'localhost:9092',
  topic:'KafkaWriterSample'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;

You can verify that data was written to Kafka by running the Kafka Reader sample application.

The first field in the output (position) stores information required to avoid lost or duplicate events after recovery (see Recovering applications). If recovery is not enabled, its value is NULL.

MON output (see Using the MON command) for targets using KafkaWriter includes:

  • in async mode only, Sent Bytes Rate: how many megabytes per second were sent to the brokers

  • in both sync and async mode, Write Bytes Rate: how many megabytes per second were written by the brokers and acknowledged to Striim

Enabling compression

When you enable compression in KafkaWriter, the broker and consumer should handle the compressed batches automatically. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfig:'compression.codec=snappy'

To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gzip, lz4, and snappy. For example:

KafkaConfig:'compression.type=snappy'
Writing to multiple Kafka partitions

If the INPUT FROM stream is partitioned, events will be distributed among Kafka partitions based on the values in the input stream's PARTITION BY property. All events with the same value in the PARTITION BY field will be written to the same randomly selected partition. Striim will distribute the data evenly among the partitions to the extent allowed by the frequency of the various PARTITION BY field values (for example, if 80% of the events have the same value, then one of the Kafka partitions will contain at least 80% of the events). For example, to partition events by city, you could define the input stream as follows:

CREATE STREAM TransformedDataStream OF TransformedDataType PARTITION BY City;

To override this default behavior and send events to specific partitions based on the PARTITION BY field values, see Creating a custom Kafka partitioner.

Setting KafkaWriter's mode property: sync versus async

KafkaWriter performs differently depending on whether the mode property value is sync or async and whether recovery (see Recovering applications) is enabled for the application. The four possibilities are:

notes

sync with recovery 

Provides the most accurate output. Events are written in order with no duplicates ("exactly-once processing," also known as E1P), provided that you do not change the partitioner logic, number of partitions, or IP address used by Striim while the application is stopped.

To avoid duplicate events after recovery, the Kafka topic's retention period must be longer than the amount of time that elapses before recovery is initiated (see Recovering applications) and, if writing to multiple partitions, the brokers must be brought up in reverse of the order in which they went down.

With this configuration, two KafkaWriter targets (even if in the same application) cannot write to the same topic. Instead, use a single KafkaWriter, a topic with multiple partitions (see Writing to multiple Kafka partitions), and, if necessary, parallel threads (see Creating multiple writer instances).

async with recovery

Provides higher throughput with the tradeoff that events are written out of order (unless using KafkaWriter version 2.1 with enable.idempotence set to true in KafkaConfig) and recovery may result in some duplicates ("at-least-once processing," also known as A1P).

sync without recovery

Appropriate with non-recoverable sources (see Recovering applications) when you need events to be written in order. Otherwise, async will give better performance.

async without recovery

Appropriate with non-recoverable sources when you don't care whether events are written in order. Throughput will be slightly faster than async with recovery.

When using sync, multiple events are batched in a single Kafka message. The size of each batch is controlled by the batch.size parameter, which by default is one million bytes. The maximum amount of time KafkaWriter will wait between messages is set by the linger.ms parameter, which by default is 1000 milliseconds. Thus, by default, KafkaWriter will write a message after it has received a million bytes or one second has elapsed since the last message was written, whichever occurs first.

The batch.size value must be larger than the largest event KafkaWriter will receive, but must not exceed the max.message.bytes setting in the Kafka topic configuration.

The following setting would write a message every time KafkaWriter has received 500,000 bytes or two seconds has elapsed since the last message was written:

KafkaConfig:'batch.size=500000,linger.ms=2000'
Testing KafkaWriter performance

Striim includes two utility scripts that may be useful in tuning your Kafka configuration. Striim/tools/enhanced-producer-perf.sh writes to Kafka directly from the Striim host. Striim/tools/standalone-kafkawriter-perf.sh writes with KafkaWriter. The arguments are the same for both:

argument

notes

file

The file to be used for testing.

Specify none to use an in-memory data generator, in which case you must also specify record-size and num-records. 

input-format

If using a file, set to avro or dsv, or omit or set to none to read the file as is.

topic

The Kafka topic to write to.

producer-props

A file containing the Kafka server properties. See striim/tools/props for an example.

output-format

Set to json to format the data or omit or set to none to write as is.

record-size

If using --file none, the size of each generated record in bytes.

num-records

If using --file none, the number of records to write.

mode

Set to sync for synchronous writes, or omit or set to async for asynchronous writes (see Setting KafkaWriter's mode property: sync versus async).

single-partition

Set to true to write only to partition 0. Omit or set to false to write to all partitions of the topic.

For example, the following will generate one million 120-byte records and write them directly to Kafka.

./enhanced-producer-perf.sh --file none --topic test01  --producer-props props
 --record-size 120 --num-records 1000000 

Output from that command would be similar to:

Configuration used for testing Apache Kafka Producer:
File : none
Topic : test01
Partitions : 1
Single Partition : false
Mode : async
Output format : none
Num Records : 1000000
Record Size : 120
---------------------------------
Final Stats
Records sent : 1000000
Records/sec : 521920.6680584551
MB/sec : 62.63048016701461
Avg Latency(ms) : 559.206501
Max Latency(ms) : 1054
Avg message size (bytes): 120.0
Number of times Producer send called : 1000000
Avg time between 2 Producer send calls(ms) : 0.003199727455364971
Max time between 2 Producer send calls(ms) : 210
---------------------------------

The following will generate the same number of records of the same size but write them using KafkaWriter:

./standalone-kafkawriter-perf.sh --file none --topic test02  --producer-props props
 --record-size 120 --num-records 1000000

Output would be similar to:

Configuration used for testing Striim Kafka Writer:
File : none
Topic : test02
Partitions : 1
Single Partition : false
Mode : async
Output format : none
Num Records : 1000000
Record Size : 120
---------------------------------
Final Stats
Records sent : 1000000
Records/sec : 84918.47826086957
MB/sec : 3.9817331861413043
Avg Latency(ms) : 0.912115
Max Latency(ms) : 10018
Avg message size (bytes): 46.0
Number of times Producer send called : 1000000
Avg time between 2 Producer send calls(ms) : 0.011775
Max time between 2 Producer send calls(ms) : 10011
KafkaWriter output with AvroFormatter

In async mode, each Kafka message will contain a single event. The event will start with four bytes containing its length.

In sync mode, one message can contain multiple events. How many depends on the batch.size setting (see Setting KafkaWriter's mode property: sync versus async). Each event in the message will start with four bytes containing its length.

In sync mode, output will contain a nested record named __striimmetadata with a field position. With recovery on, this field will contain information Striim can use to ensure that no duplicate records are written during recovery (see Recovering applications). With recovery off, the value of this field will be null.

For input events of this user-defined Striim type:

Create Type PERSON (
  ID int,
  City String,
  Code String,
  Name String);

the schema would be:

{
    "type": "record",
    "name": "PERSON",
    "namespace": "AVRODEMO",
    "fields": [{
        "name": "ID",
        "type": ["null", "int"]
    }, {
        "name": "CITY",
        "type": ["null", "string"]
    }, {
        "name": "CODE",
        "type": ["null", "string"]
    }, {
        "name": "NAME",
        "type": ["null", "string"]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

For input events of type WAEvent, the schema would be similar to:

{
  "namespace": "WAEvent.avro",
  "type": "record",
  "name": "WAEvent",
  "fields": [{
      "name": "metadata",
      "type": ["null",
        {"type": "map", "values": ["null", "string"] }
      ]
    },
    {
      "name": "data",
      "type": ["null",
        {"type": "map","values": ["null", "string"] }
      ]
    },
    {
      "name": "before",
      "type": ["null",
        {"type": "map", "values": ["null", "string"] }
      ]
    },
    {
      "name": "userdata",
      "type": ["null",
        {"type": "map", "values": ["null", "string"] }
      ]
    },
    {
      "name": "__striimmetadata",
      "type": {
        "type": "record",
        "name": "StriimMeta_Record",
        "fields": [
          {"name": "position", "type": ["null", "string"] }
        ]
      }
    }
  ]
}

and output would be similar to:

{
"data" : { "ID" : "1" , "NAME" : "User One" },
"before" : null,
"metadata" : {
       "TABLENAME" : "Employee",
        "CommitTimestamp" : "12-Dec-2016 19:13:01",
         "OperationName" : "UPDATE"  },
"userdata" : null,
"__striimmetadata" :  { "position" : "SCN:1234002356" }
}
KafkaWriter output with DSVFormatter

Each output record will begin with a field containing information Striim can use to ensure that no duplicate records are written during recovery (see Recovering applications).

For input events of this user-defined Striim type:

CREATE TYPE emptype ( id int, name string);

output would be similar to:

1234002350,1,User One

For input events of type WAEvent, output (Position, TableName, CommitTimestamp, Operation, data) would be similar to:

SCN:1234002344,Employee,12-Dec-2016 19:13:00,INSERT,1,User One
KafkaWriter output with JSONFormatter

Each JSON output node will contain a field named __striimmetadata with a nested field position containing information Striim can use to ensure that no duplicate records are written during recovery (see Recovering applications).

For input events of this user-defined Striim type:

CREATE TYPE emptype ( id int, name string);

output would be similar to:

{
    "ID": 1,
    "Name": "User One", 
    "__striimmetadata"  :  {"position" : "SCN:1234002344" } 
}

For input events of type WAEvent, output would be similar to:

{
"metadata" : { "TABLENAME" : "Employee","CommitTimestamp" : "12-Dec-2016 19:13:00", 
  "OperationName" : "INSERT"  }
"data" : { 
      "ID" : "1",
      "NAME" : "User One"},
"__striimmetadata" :  { "position" : "SCN:1234002350" }   // but in binary format
}
KafkaWriter output with XMLFormatter

Each output record will include an extra element __striimmetadata with a nested element position containing information Striim can use to ensure that no duplicate records are written during recovery (see Recovering applications).

For input events of this user-defined Striim type:

CREATE TYPE emptype ( id int, name string);

output would be similar to:

...
<ID> 1 </ID>
<Name> User One </Name> 
<__striimmetadata>
  <position>1234002344</position>
</__striimmetadata> ...
Using the Confluent or Hortonworks schema registry

Note

This feature requires Kafka 0.11 (recommended) or 0.10.

You may use the Confluent or Hortonworks schema registry by selecting AvroFormatter and specifying its schemaRegistryURL property, for example, schemaRegistryURL:'http://198.51.100.55:8081'.

Tracking schema evolution of database tables

When a KafkaWriter target's input stream is the output stream of a DatabaseReader or CDC reader source, the schema registry allows you to track the evolution of database tables over time. The first time the Striim application is run, KafkaWriter creates a record in the schema registry for each table being read. Each schema record has a unique ID, which is stored in the next four bytes (bytes 4-7) after the record length (bytes 0-3) of the data records for the associated table.

Each time KafkaWriter receives a DDL event, it writes a new schema record for the referenced table. From then until the next schema change, that schema record's unique ID is stored in data records for the associated table.

In this case, in addition to specifying AvroFormatter's schemaRegistryURL property, you must set its formatAs property to table or native.

Tracking evolution of Striim stream types

When a KafkaWriter target's input stream is of a user-defined type, the schema registry allows you to track the evolution of that type over time. The first time the Striim application is run, KafkaWriter creates a record in the schema registry for the input stream's type. Each time the KafkaWriter's input stream's type is changed using ALTER and RECOMPILE, KafkaWriter will create a new schema record. As with database tables, the schema records' unique IDs associate them with their corresponding data records.

In this case, specify AvroFormatter's schemaRegistryURL property and leave formatAs unspecified or set to default.

Reading schema and data records together

Consuming applications can use the following code to combine schema and data records into Avro records:

package test.kafka.avro;
 
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
import org.apache.kafka.common.TopicPartition;
 
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Properties;
import java.util.List;
 
public class KafkaAvroConsumerUtilWithDeserializer {
 
    private KafkaConsumer<byte[], Object> consumer;
     
    public KafkaAvroConsumerUtilWithDeserializer(String configFileName) throws Exception {
         
        Properties props = new Properties();
        InputStream in = new FileInputStream(configFileName);
        props.load(in);
 
        this.consumer = new KafkaConsumer<byte[], Object>(props);
     
        TopicPartition tp = new TopicPartition(props.getProperty("topic.name"), 0);
        List<TopicPartition> tpList = new ArrayList<TopicPartition>();
        tpList.add(tp);
        this.consumer.assign(tpList);
        this.consumer.seekToBeginning(tpList);
    }
     
    public void consume() throws Exception {
        while(true) {
            ConsumerRecords<byte[], Object> records = consumer.poll(1000);
            for(ConsumerRecord<byte[], Object> record : records) {
                System.out.println("Topic " + record.topic() + " partition " + record.partition()
                    + " offset " + record.offset() + " timestamp " + record.timestamp());
                List<GenericRecord> avroRecordList = (List<GenericRecord>) record.value();
                for(GenericRecord avroRecord : avroRecordList) {
                    System.out.println(avroRecord);
                }
            }
        }
    }  
 
    public void close() throws Exception {
        if(this.consumer != null) {
            this.consumer.close();
            this.consumer = null;
        }
    }
     
    public static void help() {
        System.out.println("Usage :\n x.sh {path_to_config_file}");
    }
     
    public static void main(String[] args) throws Exception {
        if(args.length != 1) {
            help();
            System.exit(-1);
        }
        String configFileName = args[0];
        System.out.println("KafkaConsumer config file : " + configFileName);
        KafkaAvroConsumerUtilWithDeserializer consumerutil = null;
        try {
            consumerutil = new KafkaAvroConsumerUtilWithDeserializer(configFileName);
            consumerutil.consume();
        } finally {
            if(consumerutil != null) {
                consumerutil.close();
                consumerutil = null;
            }
        }
    }
}

The pom.xml for that class must include the following dependencies. Adjust the Kafka version to match your environment.

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.7</version>
    </dependency>
</dependencies>

StriimKafkaAvroDeserializer.jar (which you can find under the Striim program directory in Client/Kafka/lib) must be in the classpath when you start your application.

The config file for your Kafka consumer should be similar to:

bootstrap.servers=192.168.1.35:9092
topic.name=test_sync_registry
schemaregistry.url=http://192.168.1.35:8081/
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=com.striim.kafka.deserializer.KafkaAvroDeserializer
group.id=KafkaAvroDemoConsumer

The command to start the consumer with the deserializer should be something like:

java -Xmx1024m -Xms256m -Djava.library.path=/usr/local/lib:/usr/bin/java \
  -cp "target/classes:target/dependency/*:<path to \
  StriimKafkaAvroDeserializer.jar>" com.striim.kafka.KafkaAvroConsumerUtil $*
Schema registry REST API calls

The following Kafka schema registry REST API calls may be useful in using the schema registry.

To list all the subjects in the schema registry, use curl -X GET http://localhost:8081/subjects. This will return a list of subjects:

["AVRODEMO.EMP","AVRODEMO.DEPT","DDLRecord"]

This shows that there are schemas for the EMP and DEPT tables. DDLRecord defines the schemas for storing DDL events.

To list the versions for a particular schema, use curl -X GET http://localhost:8081/subjects/<subject>/versions. If there were three versions of the EMP table's schema, curl -X GET http://localhost:8081/subjects/AVRODEMO.EMP/versions would return:

[1,2,3]

To see the second version of that schema, you would use curl -X GET http://localhost:8081/subjects/AVRODEMO.EMP/versions/2:

{
    "subject": "AVRODEMO.EMP",
    "version": 2,
    "id": 261,
    "schema": {
        "type": "record",
        "name": "EMP",
        "namespace": "AVRODEMO",
        "fields": [{ ...

The first three lines are Kafka metadata. The subject property, which identifies the table, is the same for all schema records associated with that table. The  version property records the order in which the schema versions were created. id is the schema's unique identifier. The rest of the output is the schema definition in one of the formats shown below.

Record formats for database schema evolution

For example, say you have the following Oracle table:

CREATE TABLE EMP(  
  EMPNO    NUMBER(4,0),  
  ENAME    VARCHAR2(10),  
  JOB      VARCHAR2(9),  
  HIREDATE TIMESTAMP,  
  SAL      BINARY_DOUBLE,  
  COMM     BINARY_FLOAT,  
  DEPTNO   INT ...

Let's start by looking at the default format of an Avro-formatted Kafka record for an input type of WAEvent:

{
    "type": "record",
    "name": "WAEvent",
    "namespace": "WAEvent.avro",
    "fields": [{
        "name": "metadata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "data",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "before",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "userdata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

For more information, see WAEvent contents for change data and Parsing the fields of WAEvent for CDC readers.

AvroFormatter's formatAs property allows two other formats, table and native. When you use one of these, the first time an event is received from the table, a record is written in the schema registry in one of the following formats.

With formatAs: 'table', the schema record is:

{
    "type": "record",
    "name": "EMP",
    "namespace": "AVRODEMO",
    "fields": [{
        "name": "EMPNO",
        "type": ["null", "string"]
    }, {
        "name": "ENAME",
        "type": ["null", "string"]
    }, {
        "name": "JOB",
        "type": ["null", "string"]
    }, {
        "name": "HIREDATE",
        "type": ["null", "string"]
    }, {
        "name": "SAL",
        "type": ["null", "double"]
    }, {
        "name": "COMM",
        "type": ["null", "float"]
    }, {
        "name": "DEPTNO",
        "type": ["null", "string"]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

With formatAs: 'native', the schema record is:

{
    "type": "record",
    "name": "EMP",
    "namespace": "AVRODEMO",
    "fields": [{
        "name": "data",
        "type": ["null", {
            "type": "record",
            "name": "data_record",
            "namespace": "data_record",
            "fields": [{
                "name": "EMPNO",
                "type": ["null", "string"]
            }, {
                "name": "ENAME",
                "type": ["null", "string"]
            }, {
                "name": "JOB",
                "type": ["null", "string"]
            }, {
                "name": "HIREDATE",
                "type": ["null", "string"]
            }, {
                "name": "SAL",
                "type": ["null", "double"]
            }, {
                "name": "COMM",
                "type": ["null", "float"]
            }, {
                "name": "DEPTNO",
                "type": ["null", "string"]
            }]
        }]
    }, {
        "name": "before",
        "type": ["null", {
            "type": "record",
            "name": "before_record",
            "namespace": "before_record",
            "fields": [{
                "name": "EMPNO",
                "type": ["null", "string"]
            }, {
                "name": "ENAME",
                "type": ["null", "string"]
            }, {
                "name": "JOB",
                "type": ["null", "string"]
            }, {
                "name": "HIREDATE",
                "type": ["null", "string"]
            }, {
                "name": "SAL",
                "type": ["null", "double"]
            }, {
                "name": "COMM",
                "type": ["null", "float"]
            }, {
                "name": "DEPTNO",
                "type": ["null", "string"]
            }]
        }]
    }, {
        "name": "metadata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "userdata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "datapresenceinfo",
        "type": ["null", {
            "type": "record",
            "name": "datapresenceinfo_record",
            "namespace": "datapresenceinfo_record",
            "fields": [{
                "name": "EMPNO",
                "type": "boolean"
            }, {
                "name": "ENAME",
                "type": "boolean"
            }, {
                "name": "JOB",
                "type": "boolean"
            }, {
                "name": "HIREDATE",
                "type": "boolean"
            }, {
                "name": "SAL",
                "type": "boolean"
            }, {
                "name": "COMM",
                "type": "boolean"
            }, {
                "name": "DEPTNO",
                "type": "boolean"
            }]
        }]
    }, {
        "name": "beforepresenceinfo",
        "type": ["null", {
            "type": "record",
            "name": "beforepresenceinfo_record",
            "namespace": "beforepresenceinfo_record",
            "fields": [{
                "name": "EMPNO",
                "type": "boolean"
            }, {
                "name": "ENAME",
                "type": "boolean"
            }, {
                "name": "JOB",
                "type": "boolean"
            }, {
                "name": "HIREDATE",
                "type": "boolean"
            }, {
                "name": "SAL",
                "type": "boolean"
            }, {
                "name": "COMM",
                "type": "boolean"
            }, {
                "name": "DEPTNO",
                "type": "boolean"
            }]
        }]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

The native format includes more WAEvent data than the table format.

Records are also created for ALTER TABLE and CREATE TABLE DDL operations. The schema for DDL events is:

{
    "type": "record",
    "name": "DDLRecord",
    "namespace": "DDLRecord.avro",
    "fields": [{
        "name": "metadata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "data",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "userdata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

Records for DDL events have this format:

{
    "metadata": {
        "CURRENTSCN": "386346466",
        "OperationName": "ALTER",
        "OperationSubName": "ALTER_TABLE_ADD_COLUMN",
        "TimeStamp": "2018-04-17T04:31:48.000-07:00",
        "ObjectName": "EMP",
        "COMMITSCN": "386346473",
        "TxnID": "9.10.60399",
        "COMMIT_TIMESTAMP": "2018-04-17T04:31:48.000-07:00",
        "CatalogName": null,
        "CatalogObjectType": "TABLE",
        "OperationType": "DDL",
        "SchemaName": "AVRODEMO",
        "STARTSCN": "386346462",
        "SCN": "386346466"
    },
    "data": {
        "DDLCommand": "ALTER TABLE AVRODEMO.EMP\n  ADD phoneNo NUMBER(10,0)"
    },
    "userdata": null,
    "__striimmetadata": {
        "position": "TQAAQA="
    }
}

This example is for an ALTER TABLE EMP ADD phoneNo NUMBER(10,0); command.

Records are also created for BEGIN, COMMIT, and ROLLBACK control events. The schema for control events is:

{
    "type": "record",
    "name": "DDLRecord",
    "namespace": "DDLRecord.avro",
    "fields": [{
        "name": "metadata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "data",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "userdata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

Records for control events have this format:

{
	"metadata": {
		"RbaSqn": "2031",
		"AuditSessionId": "439843",
		"MockedBegin": null,
		"CURRENTSCN": "8328113",
		"OperationName": "COMMIT",
		"SQLRedoLength": "6",
		"BytesProcessed": "424",
		"ParentTxnID": "5.17.9129",
		"SessionInfo": "UNKNOWN",
		"RecordSetID": " 0x0007ef.000001e3.00e0 ",
		"TimeStamp": "2018-06-16T17:41:57.000-07:00",
		"TxnUserID": "QATEST",
		"RbaBlk": "483",
		"COMMITSCN": "8328113",
		"TxnID": "5.17.9129",
		"Serial": "2943",
		"ThreadID": "1",
		"SEQUENCE": "1",
		"COMMIT_TIMESTAMP": "2018-06-16T17:41:57.000-07:00",
		"TransactionName": "",
		"STARTSCN": "8328112",
		"SCN": "832811300005716756777309964480000",
		"Session": "135"
	},
	"userdata": null
}
Record format for Striim stream type evolution
CREATE TYPE Person_Type (
  ID int KEY, 
  CITY string,
  CODE string,
  NAME string);

The schema record for the Striim type above would be:

{
    "type": "record",
    "name": "Person_Type",
    "namespace": "AVRODEMO",
    "fields": [{
        "name": "ID",
        "type": ["null", "int"]
    }, {
        "name": "CITY",
        "type": ["null", "string"]
    }, {
        "name": "CODE",
        "type": ["null", "string"]
    }, {
        "name": "NAME",
        "type": ["null", "string"]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

This shows the input stream's type name (Person_Type), which will be the same in all schema registry records for this KafkaWriter target, and the names, default values, and types of its four fields at the time this record was created.

Sample record in sync mode:

{
  "ID": "1",
  "Code": "IYuqAbAQ07NS3lZO74VGPldfAUAGKwzR2k3",
  "City": "Chennai",
  "Name": "Ramesh",
  "__striimmetadata": {
    "position": "iUAB6JoOLYlCsaKn8nWYw"
  }
}
Schema registry sample application: OracleReader to KafkaWriter

This example assumes the following tables exist in Oracle:

create table DEPT(
  deptno int,
  dname varchar2(30),
  loc varchar2(13),
  constraint pk_dept primary key (deptno)
);
create table EMP(
  empno number(4,0),
  ename varchar2(10),
  job varchar2(9),
  hiredate Timestamp,
  sal binary_double,
  comm binary_float,
  deptno int,
  constraint pk_emp primary key (empno),
  constraint fk_deptno foreign key (deptno) references dept (deptno)
);

This application will read from those tables and write to Kafka in native format:

CREATE APPLICATION Oracle2Kafka;

CREATE OR REPLACE SOURCE OracleSource USING OracleReader  (
  DictionaryMode: 'OfflineCatalog',
  FetchSize: 1,
  Username: 'sample',
  Password: 'sample',
  ConnectionURL: '10.1.10.11:1521:orcl',
  Tables: 'sample.emp;sample.dept'
 )
OUTPUT TO OracleStream ;
 
CREATE OR REPLACE TARGET WriteToKafka USING KafkaWriter VERSION '0.11.0' (
  Mode: 'Sync',
  Topic: 'test',
  brokerAddress: 'localhost:9093',
  KafkaConfig: 'request.timeout.ms=60001;session.timeout.ms=60000'
 )
FORMAT USING AvroFormatter  (
  schemaregistryurl: 'http://localhost:8081/',
  formatAs: 'Native'
 )
INPUT FROM OracleStream;

END APPLICATION Oracle2Kafka;

See Avro Parser for a sample application that reads from that Kafka topic.

Kinesis Writer

Writes to an Amazon Kinesis stream.

property

type

default value

notes

Access Key ID

String

Specify an AWS access key ID (created on the AWS Security Credentials page) for a user with write permission on the stream.

When Striim is running in Amazon EC2 and there is an IAM role with that permission associated with the VM, leave accesskeyid and secretaccesskey blank to use the IAM role.

Batch Policy

String

Size:900000, Interval:1

The batch policy includes eventCount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to EventCount:1,Interval:0.

With the default setting, data will be written every second, or sooner if an event pushes the buffer past 900,000 bytes. The buffer will expand as necessary to include that last event in the batch.

Mode

String

Sync

Do not change from default.

Partition Key

String

Optionally, specify a field to be used to partition the events among multiple shards. See Kafka Writer for more details.

Region Name

String

the AWS region of the stream (see AWS Regions and Endpoints)

Secret Access Key

String

the secret access key for the stream

Session Token

String

If you are using a session token (see GetSessionToken), specify it here. See also Temporary Security Credentials.

Stream Name

String

the existing Kinesis stream to write to (will not be created if it does not exist)

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

Limitations:

  • Only sync mode is supported.

  • Parallel threads are not supported.

  • Do not write to a target stream with more than 249 shards.

  • Do not merge, split, start or stop encryption on, or change the shard count of the target stream.

  • QUIESCE is not supported in this release.

  • If multiple writers write to the same shard, there may be duplicate events after Recovering applications (in other words, exactly-once processing cannot be guaranteed).

  • Records cannot exceed 1 MB in length, including the metadata Striim adds to support recovery. (This is a limitation of Kinesis, not Striim.)

Example:

CREATE TARGET KinesisTest USING KinesisWriter (
  regionName:'us-east-1',
  streamName:'myStream',
  accesskeyid:'********************',
  secretaccesskey:'****************************************',
  partitionKey: 'merchantId'
)
FORMAT USING JSONFormatter ()
INPUT FROM PosSource_TransformedStream;
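
When Striim runs in Amazon EC2 and an IAM role with write permission on the stream is associated with the VM (see the Access Key ID notes above), the key properties can be left blank so the IAM role is used. The following is a minimal sketch of that variant; the region and stream names are illustrative:

CREATE TARGET KinesisIAMRoleTest USING KinesisWriter (
  regionName:'us-east-1',
  streamName:'myStream',
  /* leave accesskeyid and secretaccesskey blank to use the EC2 instance's IAM role */
  accesskeyid:'',
  secretaccesskey:'',
  partitionKey: 'merchantId'
)
FORMAT USING JSONFormatter ()
INPUT FROM PosSource_TransformedStream;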

Kudu Writer

Writes to Apache Kudu 1.4 or later.

property

type

default value

notes

Batch Policy

String

EventCount:1000, Interval:30

The batch policy includes eventCount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to EventCount:1,Interval:0.

With the default setting, events will be sent every 30 seconds or sooner if the buffer accumulates 1000 events.

Each batch may include events for only one table. When writing to multiple tables, the current batch will be sent and a new one started every time an event is received for a different table.

Checkpoint Table

String

CHKPOINT

A table with the specified value will be created automatically in Kudu and used by Striim for internal purposes.

Connection Retry Policy

String

retryInterval=30, maxRetries=3

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval). If the second attempt is unsuccessful, in 30 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Ignorable Exception

String

By default, if the target returns an error, KuduWriter crashes the application. Use this property to specify errors to ignore, separated by commas. Supported values are ALREADY_PRESENT and NOT_FOUND.

For example, to ignore ALREADY_PRESENT and NOT_FOUND errors, you would specify:

IgnorableExceptionCode: 'ALREADY_PRESENT,NOT_FOUND'

When an ignorable exception occurs, Striim will write an "Ignoring VendorExceptionCode" message to the log. Alternatively, to capture the ignored exceptions, see Writing exceptions to a WActionStore.

Kudu Client Config

String

Specify the master address, socket read timeout, and operation timeout properties for Kudu. For example:

master.addresses->192.168.56.101:7051;
socketreadtimeout->10000;
operationtimeout->30000

In a high availability environment, specify multiple master addresses, separated by commas. For example:

master.addresses->192.168.56.101:7051, 
192.168.56.102:7051; ...

Parallel Threads

Integer

See Creating multiple writer instances.

PK Update Handling Mode

String

ERROR

This property controls how KuduWriter will handle events that update the primary key, which is not supported by Kudu.

  • With the default setting of ERROR, the application will crash.

  • Set to IGNORE to ignore such events and continue.

  • Set to DELETEANDINSERT to drop the existing row and insert the one with the updated primary key. When using this setting, the Compression property in the CDC reader must be set to False.

Tables

String

The name(s) of the table(s) to write to. The table(s) must exist in Kudu and the user specified in Username must have access. Table names are case-sensitive. The columns must have only supported data types as described below.

When the input stream of a KuduWriter target is the output of a CDC reader or DatabaseReader source, KuduWriter can write to multiple tables. For example:

source.emp,target.emp
source.db1,target.db1;source.db2,target.db2
source.%,target.%
source.mydatabase.emp%,target.mydb.%
source1.%,target1.%;source2.%,target2.%

MySQL and Oracle names are case-sensitive, SQL Server names are not. Specify names as <schema name>.<table name> for MySQL and Oracle and as <database name>.<schema name>.<table name> for SQL Server.

Primary key columns must be first in Kudu (see Known Issues and Limitations), so you may need to map columns if the source table columns are not in the same order (see Mapping columns).

Update As Upsert

Boolean

False

With the default value of False, if an update fails, KuduWriter will crash. When set to True, if an update fails, KuduWriter will insert the row instead. Do not set to True when a source table has no primary key.

The following TQL will replicate data for the specified tables from Oracle to Kudu:

CREATE SOURCE OracleCDCIn USING OracleReader (
  Username:'striim',
  Password:'passwd',
  ConnectionURL:'203.0.113.49:1521:orcl',
  Tables:'MYSCHEMA.NAME,MYSCHEMA.DEPT'
)
OUTPUT TO OracleCDCStream;

CREATE TARGET KuduOut USING KuduWriter(
  KuduClientConfig:"master.addresses->203.0.113.88:7051;
    socketreadtimeout->10000;operationtimeout->30000",
  Tables: "MYSCHEMA.NAME,name;MYSCHEMA.DEPT,dept"
INPUT FROM OracleCDCStream;
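
The variant below also ignores ALREADY_PRESENT and NOT_FOUND errors and converts failed updates to inserts, as described in the Ignorable Exception and Update As Upsert notes above. This is a sketch only; UpdateAsUpsert is an assumed TQL spelling of the Update As Upsert property:

CREATE TARGET KuduUpsertOut USING KuduWriter(
  KuduClientConfig:"master.addresses->203.0.113.88:7051;
    socketreadtimeout->10000;operationtimeout->30000",
  Tables: "MYSCHEMA.NAME,name;MYSCHEMA.DEPT,dept",
  /* ignore duplicate-insert and missing-row errors instead of crashing the application */
  IgnorableExceptionCode: 'ALREADY_PRESENT,NOT_FOUND',
  /* assumed spelling of the Update As Upsert property; inserts the row when an update fails */
  UpdateAsUpsert: true
)
INPUT FROM OracleCDCStream;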
KuduWriter data type support and correspondence

Columns in target tables must use only the following supported data types.

If using Cloudera's Kudu, see Apache Kudu Schema Design and Apache Kudu Usage Limitations.

Striim data type

Kudu data type

java.lang.Byte[]

binary

java.lang.Double

double

java.lang.Float

float

java.lang.Integer

int32, int64

java.lang.Long

int64

java.lang.Short

int16

java.lang.String

string

org.joda.time.DateTime

unixtime_micros

When the input stream for a KuduWriter target is the output of an OracleReader source, the following combinations are supported:

Oracle type

Kudu type

BINARY_DOUBLE

double

BINARY_FLOAT

float

BLOB

binary, string

CHAR

string

CHAR(1)

bool

CLOB

string

DATE

unixtime_micros

DEC

float

DECIMAL

float

FLOAT

float

INT

int32

INTEGER

int32

LONG

int64, string

NCHAR

string

NUMBER

int64

NUMBER(1,0)

bool

NUMBER(10)

int64

NUMBER(19,0)

int64

NUMERIC

float

NVARCHAR2

string

SMALLINT

int16

TIMESTAMP

unixtime_micros

TIMESTAMP WITH LOCAL TIME ZONE

unixtime_micros

TIMESTAMP WITH TIME ZONE

unixtime_micros

VARCHAR2

string

When the input stream for a KuduWriter target is the output of an MSSQLReader source, the following combinations are supported:

SQL Server type

Kudu type

bigint

int64

bit

bool or int64

char

string

date

unixtime_micros

datetime

unixtime_micros

datetime2

unixtime_micros

decimal

float

float

double or float

image

binary

int

int64

money

double

nchar

string

ntext

string

numeric

float

nvarchar

string

nvarchar(max)

string

real

float

smalldatetime

unixtime_micros

smallint

int64

smallmoney

double

text

string

tinyint

int64

varbinary

binary

varbinary(max)

binary

varchar

string

varchar(max)

string

xml

string

MapR DB Writer

Writes to a table in MapR Converged Data Platform version 5.1.

Except for the name of the adapter and the name of the configuration path property, MapRDBWriter is identical to HBaseWriter. See HBase Writer for documentation of the properties.

CREATE TARGET Target2 using MapRDBWriter(
  MapRDBConfigurationPath:"/opt/mapr/hbase/hbase-1.1.8/conf",
  Tables: "SCOTT.BUSINESS,/tables/business.data",
  BatchPolicy: "eventCount:1")
INPUT FROM DataStream;

MapR FS Writer

Except for the name of the adapter, MapRFSWriter is identical to HDFSWriter. See HDFS Writer for documentation of the properties.

MapR Stream Writer

Writes to a stream in MapR Converged Data Platform version 5.1.

property

type

default value

notes

MapR Stream Config

String

Optionally specify Kafka producer properties, separated by semicolons.

Mode

String

Sync

See Setting KafkaWriter's mode property: sync versus async for discussion of this property.

Topic

String

Specify the fully-qualified topic name. The syntax is /<stream>:<topic>, where <stream> includes its path (which may be displayed using the command hadoop fs -ls /), for example, /tmp/myfirststream:topic1. The stream must already exist. Striim will create the topic if it does not exist.

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

Sample code:

CREATE TARGET MapRStreamSample USING MapRStreamWriter (
  MapRStreamConfig: "acks=0;batch.size=250000;max.request.size=40000000",
  Mode: "ASync",
  Topic:'/striim/myfirststream:topic1'
)
FORMAT USING JSONFormatter ()
INPUT FROM TypedCSVStream;

If the MapR topic is partitioned, events will be distributed among the partitions based on the target's input stream's PARTITION BY field. If the input stream is not partitioned, all events will be written to partition 0.

MapRStreamWriter is based on Kafka Writer and the KafkaWriter sample code can be used as a model by replacing the target with the sample code above and creating a MapR stream named striim. See Apache Kafka and MapR Streams: Terms, Techniques and New Designs for more information about MapR's Kafka implementation.

MemSQL

See Database Writer.

MongoDB Writer

Writes to MongoDB collections. It may be used in four ways:

  • With an input stream of a user-defined type, MongoDB Writer writes events as documents to a single collection.

    Target document field names are taken from the input stream's event type.

    The value of the key field of the input event is used as the document key (_id field value). If the input stream's type has no key, the target document's key is generated by concatenating the values of all fields, separated by the Key Separator string. Alternatively, you may specify a subset of fields to be concatenated using the syntax <database name>.<collection name> keycolumns(<field1 name>, <field2 name>, ...) in the Collections property.

  • With an input stream of type JSONNodeEvent that is the output stream of a source using JSONParser, MongoDB Writer writes events as documents to a single collection.

    Target document field names are taken from the input events' JSON field names.

    When the JSON event contains an _id field, its value is used as the MongoDB document key. Otherwise, MongoDB will generate an ObjectId for the document key.

  • With an input stream of type JSONNodeEvent that is the output stream of a MongoDBReader source, MongoDB Writer writes each MongoDB collection to a separate MongoDB collection.

    MongoDB collections may be replicated in another MongoDB instance by using wildcards in the Collections property. Alternatively, you may manually map source collections to target collections as discussed in the notes for the Collections property.

    The source document's primary key and field names are used as the target document's key and field names.

  • With an input stream of type WAEvent that is the output stream of a SQL CDC reader or DatabaseReader source, MongoDB Writer writes data from each source table to a separate collection. The target collections may be in different databases. In order to process updates and deletes, compression must be disabled in the source adapter (that is, WAEvents for update and delete operations must contain all values, not just the primary keys and, for updates, the modified values).

    Each row in a source table is written to a document in the target collection mapped to the table. Target document field names are taken from the source event's metadata map and their values from its data array (see WAEvent contents for change data).

    Source table data may be replicated to MongoDB collections of the same names by using wildcards in the Collections property. Note that data will be read only from tables that exist when the source starts. Additional tables added later will be ignored until the source is restarted. Alternatively, you may manually map source tables to MongoDB collections as discussed in the notes for the Collections property. When the source is a CDC reader, updates and deletes in source tables are replicated in the corresponding MongoDB target collections.

    Each source row's primary key value (which may be a composite) is used as the key (_id field value) for the corresponding MongoDB document. If the table has no primary key, the target document's key is generated by concatenating the values of all fields in the row, separated by the Key Separator string. Alternatively, you may select a subset of fields to be concatenated using the keycolumns option as discussed in the notes for the Collections property.

property

type

default value

notes

Auth DB

String

admin

Specify the authentication database for the specified username. If not specified, uses the admin database. When the specified authType uses an external authentication server, this setting is ignored.

Auth Type

String

SCRAM_SHA_1

Specify the authentication mechanism used by your MongoDB instance. The default setting uses MongoDB's default authentication mechanism, SCRAM. Other supported values are MONGODBCR and SCRAMSHA1. Set to NoAuth if authentication is not enabled. 

GSSAPI (Kerberos), MONGODBX509, and PLAIN (LDAP) are valid values for this property, but those authentication mechanisms have not been tested so are not supported.

Batch Policy

String

EventCount:10000, Interval:30

The batch policy includes eventCount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to EventCount:1,Interval:0.

With the default setting, data will be written every 30 seconds or sooner if the buffer accumulates 10,000 events.

Collections

String

The fully-qualified name(s) of the MongoDB collection(s) to write to, for example, mydb.mycollection. Separate multiple collections by commas.

You may use the % wildcard, for example, mydb.%. Note that data will be written only to collections that exist when the Striim application starts. Additional collections added later will be ignored until the application is restarted.

Connection URL

String

Specify <IP address>:<port> for the primary instance of the replica set, for example, 192.168.1.10:27017.

When connecting to a cloud instance of MongoDB, specify mongodb+srv://<host DNS name>/<database>, for example, mongodb+srv://abcdev3.gcp.mongodb.net/mydb.

Excluded Collections

String

Any collections to be excluded from the set specified in the Collections property. Specify as for the Collections property.

Ignorable Exception Code

String

By default, if the target returns an error, the application will crash. Specify DUPLICATE_KEY or KEY_NOT_FOUND to ignore such errors and continue. To specify both, separate them with a comma.

Key Separator

String

:

Inserted between values when generating document keys by concatenating column or field values. If the values might contain a colon, change this to something that will not occur in those values.

Parallel Threads

Integer

See Creating multiple writer instances.

Password

com.webaction.security.Password

The password for the specified Username.

SSL Config

String

If using the MongoDB API for CosmosDB, set to public.

Username

String

A MongoDB user with the readWrite role on the target collection(s).

MongoDB sample applications

This application writes data from a CSV file to MongoDB. It has an input stream of a user-defined type.

CREATE SOURCE FileSource USING FileReader  ( 
  directory: '/Users/user/Desktop', 
  wildcard: 'data.csv', 
  positionbyeof: false ) 
PARSE USING DSVParser() 
OUTPUT TO FileStream;

CREATE TYPE CqStream_Type (
  uid java.lang.Integer, 
  name java.lang.String , 
  zip java.lang.Long, 
  city java.lang.String);
CREATE STREAM CqStream OF CqStream_Type;

CREATE CQ Cq1 INSERT INTO CqStream
  SELECT TO_INT(data[0]) as uid,
  data[1] as name, 
  TO_LONG(data[2]) as zip, 
  data[3] as city 
FROM FileStream;

CREATE TARGET MongoTarget USING MongoDBWriter  ( 
  Collections: 'test.emp keycolumns(uid,name)', 
  ConnectionURL: 'localhost:27017', 
  AuthDB: 'admin', 
  UserName: 'waction', 
  keyseparator: ':', 
  Password: '********') 
INPUT FROM CqStream;

This application writes data from a JSON file to MongoDB. It has an input stream of type JSONNodeEvent from JSONParser.

CREATE SOURCE JsonSource USING FileReader  ( 
  directory: '/Users/user/Desktop', 
  wildcard: 'jsondata.txt', 
  positionbyeof: false
) 
PARSE USING JSONParser() 
OUTPUT TO JsonStream;

CREATE TARGET MongoTgt USING MongoDBWriter  ( 
  AuthType: 'SCRAM_SHA_1', 
  ConnectionURL: 'localhost:27017', 
  AuthDB: 'admin', 
  Collections: 'test.emp1', 
  UserName: 'waction', 
  Password: '********'
) 
INPUT FROM JsonStream;

This initial load application writes data from one MongoDB collection to another. It has an input stream of type JSONNodeEvent from MongoDB Reader.

CREATE SOURCE MongoSource USING MongoDBReader (
  Mode: 'InitialLoad',
  collections: 'qatest.col1',
  connectionUrl: 'localhost:27017'
 )
OUTPUT TO Mongostream;

CREATE TARGET MongoTarget USING MongoDBWriter (
  Collections: 'qatest.col1,db2.TEST',
  ServiceEndpoint: 'https://user.documents.azure.com:443/'
)
INPUT FROM Mongostream;

This streaming integration application writes data from Oracle to MongoDB. It has an input stream type of WAEvent from Oracle Reader (a SQL CDC source).

CREATE SOURCE Oracle_Source USING OracleReader  ( 
  Username: 'miner',
  Password: 'miner',
  ConnectionURL: 'jdbc:oracle:thin:@//192.168.1.49:1521/orcl',
  Tables: 'QATEST.%'
 ) 
OUTPUT TO DataStream;

CREATE TARGET MongoDBTarget1 USING MongoDBWriter  ( 
  Username: 'admin',
  Password: 'admin',
  ConnectionURL : 'localhost:27017',
  Collections: 'QATEST.%,MongoRegrDB01.%'
 ) 
INPUT FROM DataStream;
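
When the target is the MongoDB API for CosmosDB, the SSL Config property is set to public and the connection URL uses the cloud form shown above. The following is a sketch of that variant; SSLConfig is an assumed TQL spelling of the SSL Config property, and the collection name is illustrative:

CREATE TARGET CosmosMongoTarget USING MongoDBWriter (
  Username: 'admin',
  Password: '********',
  ConnectionURL: 'mongodb+srv://<host DNS name>/<database>',
  Collections: 'mydb.mycollection',
  /* assumed spelling of the SSL Config property; set to public for the MongoDB API for CosmosDB */
  SSLConfig: 'public'
)
INPUT FROM JsonStream;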

MQTT Writer

Writes messages to an MQTT broker.

See the MQTT FAQ for information on firewall settings.

property

type

default value

notes

Broker URI

String

format is tcp://<broker address>:<broker port>

Client ID

String

MQTT client ID (maximum 23 characters). Must be unique (not used by any other client) in order to identify this instance of MQTTWriter. The MQTT broker will use this ID to close the connection when MQTTWriter goes offline and to resend events after it restarts.

QoS

Integer

0

  • 0: at most once

  • 1: at least once

  • 2: exactly once

Topic

String

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

Sample:

CREATE CQ ConvertTemperatureToControl
INSERT INTO controlstream
SELECT TO_STRING(data.get('roomName')),
  TO_STRING(data.get('temperature'))
FROM tempstream;
CREATE TARGET accontrol USING MQTTWriter(
  brokerUri:'tcp://m2m.eclipse.org:1883',
  Topic:'/striim/room687/accontrol',
  QoS:0,
  clientId:"StriimMqttSource"
) 
FORMAT USING JSONFormatter (
    members:'roomName,targetTemperature'
) 
INPUT FROM controlstream;

Redshift Writer

Writes to one or more table(s) in an Amazon Redshift store via an Amazon S3 staging area. Before writing to Redshift, its JDBC driver must have been installed as described in Installing the Redshift JDBC driver.

Before you create a RedshiftWriter target, we suggest you first create an S3Writer for the staging area (see S3 Writer) and verify that Striim can write to it. We recommend that the Redshift cluster's zone be in the same region as the S3 bucket (see http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html). If it is not, you must set the S3 Region property.

After the data has been written to Redshift, the files in the S3 bucket are moved to a subdirectory called archive. They are not deleted automatically, so you should periodically delete them. This may be automated (see https://aws.amazon.com/code/Amazon-S3/943).

Specify either the access key and secret access key or an IAM role.

property

type

default value

notes

Access Key ID

String

an AWS access key ID (created on the AWS Security Credentials page) for a user with read permission on the S3 bucket (leave blank if using an IAM role)

Bucket Name

String

the S3 bucket name

Column Delimiter

String

| (UTF-8 007C)

The character(s) used to delimit fields in the delimited text files in which the adapter accumulates batched data. If the data will contain the | character, change the default value to a sequence of characters that will not appear in the data.

Connection URL

String

copy this from the JDBC URL field on the AWS Dashboard cluster-details page for the target cluster

Conversion Params

String

Optionally, specify one or more of the following Redshift Data Conversion Parameters, separated by commas:

  • ACCEPTINVCHARS="<replacement character>"

  • BLANKSASNULL

  • DATEFORMAT="<format string>"

  • EMPTYASNULL

  • ENCODING=<encoding type>

  • EXPLICIT_IDS

  • FILLRECORD

  • IGNOREBLANKLINES

  • IGNOREHEADER=<number of rows>

  • NULL AS="<string>"

  • ROUNDEC

  • TIMEFORMAT="<format string>"

  • TRIMBLANKS

  • TRUNCATECOLUMNS

For example, ConversionParams: 'IGNOREHEADER=2, NULL AS="NULL", ROUNDEC'

Mode

String

incremental

With an input stream of a user-defined type, do not change the default. See Replicating Oracle data to Amazon Redshift for more information.

Parallel Threads

Integer

See Creating multiple writer instances.

Password

encrypted password

the password for the Redshift user

Quote Character

String

" (UTF-8 0022)

The character(s) used to quote (escape) field values in the delimited text files in which the adapter accumulates batched data. If the data will contain ", change the default value to a sequence of characters that will not appear in the data.

S3 IAM Role

String

an AWS IAM role with read write permission on the bucket (leave blank if using an access key)

S3 Region

String

If the S3 staging area is in a different AWS region (not recommended), specify it here (see AWS Regions and Endpoints). Otherwise, leave blank.

Secret Access Key

encrypted password

the secret access key for the S3 staging area

Tables

String

The name(s) of the table(s) to write to. The table(s) must exist in Redshift and the user specified in Username must have insert permission.

When the input stream of the RedshiftWriter target is the output of a CDC reader or DatabaseReader source, RedshiftWriter can write to multiple tables. In this case, specify the names of both the source and target tables. You may use wildcards for the table names, but not for the schema or database. For example:

source.emp,target.emp
source.db1,target.db1;source.db2,target.db2
source.%,target.%
source.mydatabase.emp%,target.mydb.%
source1.%,target1.%;source2.%,target2.%

See Replicating Oracle data to Amazon Redshift for an example.

Upload Policy

String

eventcount:10000, interval:5m

see S3 Writer

Username

String

a Redshift user

The staging area in S3 will be created at the path <bucketname> / <namespace> / <target input stream type> / <table name>.

The following describes use of RedshiftWriter with an input stream of a user-defined type. When the input is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source, see Replicating Oracle data to Amazon Redshift.

The following example would write to a table called MyTable:

CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
    positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false )
OUTPUT TO PosSource_Stream;
 
CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;
 
CREATE TARGET testRedshiftTarget USING RedshiftWriter(
  ConnectionURL: 'jdbc:redshift://mys3bucket.c1ffd5l3urjx.us-west-2.redshift.amazonaws.com:5439/dev',
  Username:'mys3user',
  Password:'******',
  bucketname:'mys3bucket',
/* for striimuser */
  accesskeyid:'********************',
  secretaccesskey:'****************************************',
  Tables:'mytable'
)
INPUT FROM PosSource_TransformedStream;

If this application were deployed to the namespace RS1, the staging area in S3 would be mys3bucket / RS1 / PosSource_TransformedStream_Type / mytable.

The target table must match PosSource_TransformedStream_Type:

create table mytable(
MerchantId char(35),
DateTime timestamp,
AuthAmount float,
Zip char(5));

After the data is written to Redshift, the intermediate files will be moved to mys3bucket / RS1 / PosSource_TransformedStream_Type / mytable / archive.
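
If the staged files need any of the Redshift data conversion options listed under Conversion Params, they can be added to the same target. The following is a sketch based on the example above; the chosen conversion options are illustrative:

CREATE TARGET testRedshiftTargetWithConversion USING RedshiftWriter(
  ConnectionURL: 'jdbc:redshift://mys3bucket.c1ffd5l3urjx.us-west-2.redshift.amazonaws.com:5439/dev',
  Username:'mys3user',
  Password:'******',
  bucketname:'mys3bucket',
  accesskeyid:'********************',
  secretaccesskey:'****************************************',
  Tables:'mytable',
  /* Redshift data conversion parameters (see Conversion Params above) */
  ConversionParams: 'EMPTYASNULL, TRIMBLANKS, TRUNCATECOLUMNS'
)
INPUT FROM PosSource_TransformedStream;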

Redshift data type correspondence

Striim data type

Redshift data type

java.lang.Boolean

BOOLEAN

java.lang.Double

DOUBLE PRECISION

java.lang.Float

REAL

java.lang.Integer

INTEGER

java.lang.Long

BIGINT

java.lang.Short

SMALLINT

java.lang.String

CHAR or VARCHAR

org.joda.time.DateTime

TIMESTAMP

S3 Writer

Writes to Amazon S3.

See Port Requirements for information on firewall settings.

property

type

default value

notes

Access Key ID

String

Specify an AWS access key ID (created on the AWS Security Credentials page) for a user with "Write objects" permission on the bucket.

When Striim is running in Amazon EC2 and there is an IAM role with that permission associated with the VM, leave accesskeyid and secretaccesskey blank to use the IAM role.

Bucket Name

String

The S3 bucket name. If it does not exist, it will be created.

See Setting output names and rollover / upload policies for advanced options. If you use dynamic bucket names, you must specify a value for the Region property.

Note the limitations in Amazon's Rules for Bucket Naming.

Client Configuration

String

Optionally, specify one or more of the following property-value pairs, separated by commas.

If you access S3 through a proxy server, specify it here using the syntax ProxyHost=<IP address>,ProxyPort=<port number>,ProxyUserName=<user name>,ProxyPassword=<password>. Omit the user name and password if not required by your proxy server.

Specify any of the following to override Amazon's defaults:

  • ConnectionTimeout=<timeout in milliseconds>: how long to wait to establish the HTTP connection, default is 50000

  • MaxErrorRetry=<number of retries>: the number of times to retry failed requests (for example, 5xx errors), default is 3

  • SocketErrorSizeHints=<size in bytes>: TCP buffer size, default is 2000000

See http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/section-client-configuration.html for more information about these settings.

If you have enabled server-side encryption and have a bucket policy configured to require that upload requests have an x-amz-server-side-encryption:AES256 header, specify SSEAlgorithm=AES256 here.

Compression Type

String

Set to gzip when the input is in gzip format. Otherwise, leave blank.

Folder Name

String

Optionally, specify a folder within the specified bucket. If it does not exist, it will be created.

See Setting output names and rollover / upload policies for advanced options.

Object Name

String

The base name of the files to be written. See Setting output names and rollover / upload policies.

Object Tags

String

Optionally, specify one or more object tags (see Object Tagging) to be associated with the file as key-value pairs <tag name>=<value> separated by commas. Values may include field, metadata, and/or userdata values (see Setting output names and rollover / upload policies) and/or environment variables (specified as $<variable name>).

Parallel Threads

Integer

See Creating multiple writer instances.

Partition Key

String

If you enable ParallelThreads, specify a field to be used to partition the events among the threads. Events will be distributed among multiple S3 folders based on this field's values.

If the input stream is of any type except WAEvent, specify the name of one of its fields.

If the input stream is of the WAEvent type, specify a field in the METADATA map (see HP NonStop reader WAEvent fields, MySQLReader WAEvent fields, OracleReader WAEvent fields, or MS SQL Reader WAEvent fields) using the syntax @METADATA(<field name>), or a field in the USERDATA map (see Adding user-defined data to WAEvent streams), using the syntax @USERDATA(<field name>). If appropriate, you may concatenate multiple METADATA and/or USERDATA fields.

Region

String

Optionally, specify an AWS region. This is required to use dynamic bucket names (see Setting output names and rollover / upload policies).

Rollover on DDL

Boolean

True

Has effect only when the input stream is the output stream of a CDC reader source. With the default value of True, rolls over to a new file when a DDL event is received. Set to False to keep writing to the same file.

Secret Access Key

encrypted password

Specify the AWS secret access key for the specified access key.

Upload Policy

String

eventcount:10000, interval:5m

The upload policy may include eventcount, interval, and/or filesize (see Setting output names and rollover / upload policies for syntax). Cached data is written to S3 every time any of the specified values is exceeded. With the default value, data will be written every five minutes or sooner if the cache contains 10,000 events. When the app is undeployed, all remaining data is discarded.

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

For example:

CREATE APPLICATION testS3;

CREATE SOURCE PosSource USING FileReader ( 
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
    positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false ) 
OUTPUT TO PosSource_Stream;

CREATE CQ PosSource_Stream_CQ 
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET testS3target USING S3Writer (
  bucketname:'mybucket',
  objectname:'myfile.json',
  accesskeyid:'********************',
  secretaccesskey:'******************************',
  foldername:'myfolder')
FORMAT USING JSONFormatter ()
INPUT FROM PosSource_TransformedStream;

END APPLICATION testS3;

Note that since the test data set is less than 10,000 events and the application is using the default upload policy, the data will be uploaded to S3 after five minutes, or when you undeploy the application.
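
When Parallel Threads is enabled, the Partition Key property determines how events are distributed among threads and S3 folders, as described above. The sketch below assumes a WAEvent input from a CDC reader (OracleCDCStream stands in for such a stream); parallelthreads and partitionkey are assumed TQL spellings of those properties, and TableName is assumed to be an available metadata field:

CREATE TARGET partitionedS3target USING S3Writer (
  bucketname:'mybucket',
  objectname:'myfile.json',
  accesskeyid:'********************',
  secretaccesskey:'******************************',
  foldername:'myfolder',
  /* assumed spellings of the Parallel Threads and Partition Key properties */
  parallelthreads: 4,
  partitionkey: '@METADATA(TableName)'
)
FORMAT USING JSONFormatter ()
INPUT FROM OracleCDCStream;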

SAP Hana

See Database Writer.

Snowflake Writer

Writes to one or more existing tables in Snowflake. Events are staged to local storage, Azure Storage, or AWS S3, then written to Snowflake as per the Upload Policy setting. Before writing to Snowflake, its JDBC driver must be installed as described in Installing the Snowflake JDBC driver. See OCSP Certification Checks Require Port 80 for information on firewall settings.

To evaluate Striim with Snowflake, see Getting your free trial of Striim for Snowflake.

property

type

default value

notes

Append Only

Boolean

False

With the default value of False, updates and deletes in the source are handled as updates and deletes in the target.

Set to True to handle updates and deletes in the source as inserts in the target. With this setting:

  • Updates and deletes from DatabaseReader, IncrementalBatchReader, and SQL CDC sources are handled as inserts in the target.

  • Primary key updates result in two records in the target, one with the previous value and one with the new value. If the Tables setting has a ColumnMap that includes @METADATA(OperationName), the operation name for the first event will be DELETE and for the second INSERT.

Client Configuration

String

If using a proxy, specify ProxyHost=<host name or IP address>,ProxyPort=<port number>.

Column Delimiter

String

| (UTF-8 007C)

The character(s) used to delimit fields in the delimited text files in which the adapter accumulates batched data. If the data will contain the | character, change the default value to a sequence of characters that will not appear in the data.

Connection URL

String

The JDBC driver connection string for your Snowflake account. If you do not include the db=<database> option in the connection URL, specify tables as <database>.<schema>.<table> in the Tables property.

External Stage Type

String

local

With the default value, events are staged to local storage. To stage to Azure Storage, set to AzureBlob and set the Azure Storage properties. To stage to S3, set to S3 and set the S3 properties.

File Format Options

null_if = ""

Do not change unless instructed to by Striim support.

Ignorable Exception Code

String

Set to TABLE_NOT_FOUND if you do not want the application to crash when Striim tries to write to a table that does not exist in the target database. See Handling "table not found" errors for more information.

Null Marker

String

Optionally, specify a string inserted into fields in the stage files to indicate that a field has a null value. These are converted back to nulls in the target tables. If any field might contain the string NULL, change this to a sequence of characters that will not appear in the data.

When you set a value for Null Marker, set the same value for File Format Options. For example, if Null Marker is xnullx, File Format Options must be null_if="xnullx".

Optimized Merge

Boolean

false

Set to true only when the target's input stream is the output of an HP NonStop reader, MySQL Reader, or OracleReader source, and the source events will include partial records. For example, with Oracle Reader, when supplemental logging has not been enabled for all columns, partial records are sent for updates. When the source events will always include full records, leave this set to false.

Parallel Threads

Integer

See Creating multiple writer instances.

Password

encrypted password

The password for the specified user. See Encrypted passwords.

Tables

String

The name(s) of the table(s) to write to. The table(s) must exist in the DBMS and the user specified in Username must have insert permission.

Snowflake table names must be specified in uppercase. If you do not specify a database name in the connection URL, specify tables as <database>.<schema>.<table>.

If the source table has no primary key, you may use the KeyColumns option to define a unique identifier for each row in the target table: for example, Tables:'SOURCEDB.EMP,MYDB.MYSCHEMA.EMP KeyColumns(EMP_ID)'. If necessary to ensure uniqueness, specify multiple columns with the syntax KeyColumns(<column 1>,<column 2>,...). You may use wildcards for the source table, provided all the tables have the key columns: for example, Tables:'SOURCEDB.%,MYSCHEMA.% KeyColumns(...)'. If the source has no primary key and KeyColumns is not specified, the concatenated value of all source fields is used as the primary key in the target.

When the target's input stream is a user-defined event, specify a single table.

When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables. You may use wildcards for the table names, but not for the schema or database. For example:

source.emp,MYDB.MYSCHEMA.EMP
source.%,MYDB.MYSCHEMA.%

MySQL and Oracle names are case-sensitive, SQL Server names are not. Specify names as <schema name>.<table name> for MySQL and Oracle and as <database name>.<schema name>.<table name> for SQL Server.

See Mapping columns for additional options.

Upload Policy

String

eventcount:10000, interval:5m

The upload policy may include eventcount, interval, and/or filesize (see Setting output names and rollover / upload policies for syntax). Cached data is written to the storage account every time any of the specified values is exceeded. With the default value, data will be written every five minutes or sooner if the cache contains 10,000 events. When the app is undeployed, all remaining data is written to the storage account.

Username

String

a Snowflake user with SELECT, INSERT, UPDATE, and DELETE privileges on the tables to be written to and the CREATE TABLE privilege on the schema specified in the connection URL

Azure Storage properties for SnowflakeWriter

property

type

default value

notes

Azure Account Access Key

encrypted password

the account access key from Storage accounts > <account name> > Access keys

Azure Account Name

String

the name of the Azure storage account for the blob container

Azure Container Name

String

the blob container name from Storage accounts > <account name> > Containers

If it does not exist, it will be created.

S3 properties for SnowflakeWriter

Specify either the access key and secret access key or an IAM role.

property

type

default value

notes

S3 Access Key

String

an AWS access key ID (created on the AWS Security Credentials page) for a user with read and write permissions on the bucket (leave blank if using an IAM role)

S3 Bucket Name

String

Specify the S3 bucket to be used for staging. If it does not exist, it will be created.

S3 IAM Role

String

an AWS IAM role with read and write permissions on the bucket (leave blank if using an access key)

S3 Region

String

the AWS region of the bucket

S3 Secret Access Key

encrypted password

the secret access key for the access key

SnowflakeWriter sample application

The following sample application will write data from PosDataPreview.csv to Snowflake. The target table must exist.

CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
    positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false )
OUTPUT TO PosSource_Stream;
 
CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET SnowflakeDemo USING SnowflakeWriter (
  ConnectionURL: '<JDBC connection string>',
  username: 'striim',
  password: '********',
  Tables: 'MYDB.MYSCHEMA.POSDATA',
  appendOnly: true
)
INPUT FROM PosSource_TransformedStream;

The above example shows how to use SnowflakeWriter with an input of a user-defined type. For examples of applications where the input is the output of a CDC reader, DatabaseReader, or IncrementalBatchReader, see Replicating Oracle data to Snowflake.
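
To stage events to S3 rather than local storage, set External Stage Type to S3 and supply the S3 staging properties described above. The following is a sketch only; the concatenated property spellings (externalStageType, s3BucketName, s3Region, s3IAMRole) are assumed from the property names listed above, and the bucket, region, and role values are illustrative:

CREATE TARGET SnowflakeS3StagedDemo USING SnowflakeWriter (
  ConnectionURL: '<JDBC connection string>',
  username: 'striim',
  password: '********',
  Tables: 'MYDB.MYSCHEMA.POSDATA',
  /* stage to S3 instead of local storage; property spellings assumed from the list above */
  externalStageType: 'S3',
  s3BucketName: 'mystagingbucket',
  s3Region: 'us-west-2',
  s3IAMRole: 'striim-staging-role'
)
INPUT FROM PosSource_TransformedStream;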

SnowflakeWriter data type support and correspondence

TQL type

Snowflake type

Boolean

BOOLEAN

Byte, Integer, Long, Short

BIGINT, DOUBLE, FLOAT, INT, NUMBER, SMALLINT 

DateTime

DATE, DATETIME, TIMESTAMP_LTZ, TIMESTAMP_NTZ, TIMESTAMP_TZ

Double, Float

DOUBLE, FLOAT

String

CHAR, VARCHAR

When the input of a SnowflakeWriter target is the output of an Oracle source (DatabaseReader, IncrementalBatchReader, or OracleReader):

Oracle type

Snowflake type

BINARY

BINARY_DOUBLE

DOUBLE, FLOAT, NUMBER

BINARY_FLOAT

DOUBLE, FLOAT, NUMBER

BLOB

BINARY, VARCHAR

CHAR

CHAR, VARCHAR

CLOB

BINARY, VARCHAR

DATE

DATE

LONG

LONG RAW

NCHAR

CHAR, VARCHAR

NCLOB

NVARCHAR2

CHAR, VARCHAR

NUMBER

NUMBER(precision,scale)

RAW

TIMESTAMP

TIMESTAMP_NTZ

TIMESTAMP WITH LOCAL TIMEZONE

TIMESTAMP_LTZ

TIMESTAMP WITH TIMEZONE

TIMESTAMP_TZ

VARCHAR2

CHAR, VARCHAR

XMLTYPE

Spanner Writer

Writes to one or more tables in Google Cloud Spanner.

property

type

default value

notes

Batch Policy

String

eventCount: 1000, Interval: 60s

The batch policy includes eventcount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, all remaining events are sent to the target. To disable batching, set to EventCount:1,Interval:0.

With the default setting, data will be written every 60 seconds or sooner if the buffer accumulates 1000 events.

Checkpoint Table

String

CHKPOINT

To support recovery (see Recovering applications), a checkpoint table must be created in each target database using the following DDL:

CREATE TABLE CHKPOINT (
  ID STRING(MAX) NOT NULL,
  SOURCEPOSITION BYTES(MAX)
) PRIMARY KEY (ID);

If necessary you may use a different table name, in which case change the value of this property. All databases must use the same checkpoint table name.

Excluded Tables

String

When a wildcard is specified for Tables, you may specify here any tables you wish to exclude. Specify the value as for Tables.

Ignorable Exception Code

String

By default, if the target DBMS returns an error, Striim crashes the application. Use this property to specify one or more numeric gRPC response codes to ignore, separated by semicolons.

Instance ID

String

Specify the instance ID for the databases containing the tables to be written to. (Note: the instance ID may not be the same as the instance name.)

Parallel Threads

Integer

See Creating multiple writer instances.

Service Account Key

String

The path (from root or the Striim program directory) and file name to the .json credentials file downloaded from Google (see Service Accounts). This file must be copied to the same location on each Striim server that will run this adapter, or to a network location accessible by all servers. The associated service account must have the Cloud Spanner Database User or higher role for the instance (see Cloud Spanner Roles).

Tables

String

The name(s) of the table(s) to write to, in the format <database>.<table>. The table(s) must exist when the application is started.

The target table name(s) specified here must match the case shown in the Spanner UI. See Naming conventions.

When the target's input stream is a user-defined event, specify a single table.

When the input stream of the target is the output of a CDC or DatabaseReader source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables. You may use wildcards for the table names, but not for the schema or database. For example:

SOURCEDB.EMP,target.emp;SOURCEDB.DEPT,target.dep
SOURCEDB.%,targetdb.%

MySQL and Oracle names are case-sensitive, SQL Server names are not. Specify names as <schema name>.<table name> for MySQL and Oracle and as <database name>.<schema name>.<table name> for SQL Server.

When a target table has a commit timestamp column, by default its value will be Spanner's current system time when the transaction is committed. To use a different value, use ColumnMap. For example, to use the time the source transaction was committed in Oracle: ORADB1.%,spandb1.% ColumnMap (Ts @metadata(DBCommitTimestamp))

See Mapping columns for additional options.

The following example application will copy all tables from two Oracle source schemas to tables with the same names in two Spanner databases. All source and target tables must exist before the application is started.

CREATE SOURCE OracleSource1 USING OracleReader (
  Username:'myname',
  Password:'******',
  ConnectionURL: 'localhost:1521:XE',
  Tables:'MYDB1.%;MYDB2.%'
) 
OUTPUT TO sourceStream;

CREATE TARGET SpannerWriterTest USING SpannerWriter(
  Tables:'ORADB1.%,spandb1.%;ORADB2.%,spandb2.%',
  ServiceAccountKey: '<path>/<filename>.json',
  instanceId: 'myinstance'
)
INPUT FROM sourceStream;
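
When a target table has a commit timestamp column, the ColumnMap example from the Tables notes above can populate it from the source transaction's commit time rather than Spanner's system time. The following is a sketch using that same mapping; Ts is the illustrative column name from that note:

CREATE TARGET SpannerCommitTsTest USING SpannerWriter(
  /* map the commit timestamp column Ts to the source transaction's commit time */
  Tables:'ORADB1.%,spandb1.% ColumnMap (Ts @metadata(DBCommitTimestamp))',
  ServiceAccountKey: '<path>/<filename>.json',
  instanceId: 'myinstance'
)
INPUT FROM sourceStream;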
SpannerWriter data type support and correspondence

TQL type

Spanner type

notes

Boolean

BOOL

byte[]

BYTES

DateTime

DATE, TIMESTAMP

Double, Float

FLOAT64, NUMERIC

Integer, Long

INT64

String

STRING

  • maximum permitted length in Spanner is 2,621,440

  • If a String represents a timestamp value, use one of the TO_ZONEDDATETIME functions (see Date functions) to convert it to java.time.ZonedDateTime.

When the input of a SpannerWriter target is the output of an Oracle source (DatabaseReader, IncrementalBatchReader, or OracleReader):

Oracle type

Spanner type

notes

BINARY_DOUBLE

FLOAT64, NUMERIC

BINARY

FLOAT64, NUMERIC

BLOB

BYTES

CHAR

STRING

maximum permitted length in Spanner is 2,621,440

CLOB

DATE

DATE

FLOAT

FLOAT64, NUMERIC

LONG

STRING

maximum permitted length in Spanner is 2,621,440

NCHAR

STRING

maximum permitted length in Spanner is 2,621,440

NCLOB

STRING

maximum permitted length in Spanner is 2,621,440

NVARCHAR2

STRING

maximum permitted length in Spanner is 2,621,440

NUMBER

INT64

NUMBER(precision,scale)

FLOAT64, NUMERIC

RAW

BYTES

ROWID

STRING

maximum permitted length in Spanner is 2,621,440

TIMESTAMP

TIMESTAMP

TIMESTAMP

TIMESTAMP WITH LOCAL TIMEZONE

TIMESTAMP

TIMESTAMP

TIMESTAMP WITH TIMEZONE

TIMESTAMP

TIMESTAMP

UROWID

STRING

maximum permitted length in Spanner is 2,621,440

VARCHAR2

STRING

maximum permitted length in Spanner is 2,621,440

XMLTYPE

When the input of a SpannerWriter target is the output of a SQL Server source (DatabaseReader, IncrementalBatchReader, or MSSQLReader):

SQL Server type

Spanner type

bigint

INT64

binary

bit

bit

char

STRING

maximum permitted length in Spanner is 2,621,440

date

datetime

datetime2

datetimeoffset

decimal

FLOAT64, NUMERIC

float

FLOAT64, NUMERIC

image

int

INT64

money

nchar

ntext

numeric

nvarchar

nvarchar(max)

real

smalldatetime

smallint

INT64

smallmoney

text

STRING

maximum permitted length in Spanner is 2,621,440

time

tinyint

INT64

uniqueidentifier

varbinary

varchar

STRING

maximum permitted length in Spanner is 2,621,440

xml

SysOut

Writes the input stream to striim-node.log (see Reading log files), which can be useful during development and testing. If you are Running Striim as a process, output is written to the terminal running the server instead.

The only property is name, a string that precedes each event. This allows you to distinguish output from multiple targets. Here is a typical example:

CREATE TARGET SimpleOutput
USING SysOut(name:simple)
INPUT FROM RawDataStream; 

See Writing a simple TQL application for examples of how to use SysOut during development.

Note

SysOut cannot display Kafka streams in Avro format. To work around that limitation, use a CQ to convert the stream to string format. For example:

CREATE TYPE newStreamType (
  AvroString java.lang.String
);
CREATE STREAM newStream OF newStreamType;

CREATE OR REPLACE CQ ConvertAvro
INSERT INTO newStream
SELECT k.data().toString() 
FROM KafkaAvroStream k;

CREATE TARGET AvroOut 
USING SysOut (name: 'AvroOut')
INPUT FROM newStream;