
3.10.3 Readers


Readers are used by specifying them in sources and caches.
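For example, a minimal sketch of both usage forms, with hypothetical file, table, and stream names (see the adapter sections below for the full property lists):

-- a reader used in a source
CREATE SOURCE MySource USING FileReader (
  directory: 'Samples',
  wildcard: 'data*.csv',
  positionByEOF: false
)
PARSE USING DSVParser ( header: Yes )
OUTPUT TO MyStream;

-- a reader used in a cache
CREATE TYPE LookupType (
  id String KEY,
  name String
);
CREATE CACHE MyLookupCache USING DatabaseReader (
  ConnectionURL: 'jdbc:mysql://192.0.2.10:3306/mydb',
  Username: 'striim',
  Password: '******',
  Query: 'SELECT id, name FROM lookup_table'
)
QUERY (keytomap:'id') OF LookupType;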

Readers overview

The following is a summary of reader capabilities.

Database Reader
  • input(s): JDBC from a supported DBMS (see Database Reader)
  • output stream type(s): WAEvent
  • supports replication: insert only
  • recoverable: if output is persisted to a Kafka stream (or use Incremental Batch Reader instead)
  • supports LEE: yes

File Reader
  • input(s): Apache access log, Avro, binary, delimited text, free-form text (using RegEx), GoldenGate trail file, JSON, name-value pairs, XML
  • output stream type(s): WAEvent, AvroEvent (when input is Avro), or user-defined JSON (when input is JSON)
  • supports replication: for GoldenGate only
  • recoverable: yes
  • supports LEE: yes

HDFS Reader
  • input(s): Apache access log, binary, delimited text, free-form text (using RegEx), JSON, name-value pairs, XML
  • output stream type(s): WAEvent or user-defined JSON (when input is JSON)
  • supports replication: no
  • recoverable: yes
  • supports LEE: yes

HP NonStop Enscribe, SQL/MP, and SQL/MX Readers
  • input(s): NonStop TMF audit trail
  • output stream type(s): WAEvent
  • supports replication: yes
  • recoverable: yes
  • supports LEE: yes

HTTP Reader
  • input(s): Apache access log, binary, free-form text (using RegEx), JSON, name-value pairs, XML
  • output stream type(s): WAEvent or user-defined JSON (when input is JSON)
  • supports replication: no
  • recoverable: if output is persisted to a Kafka stream
  • supports LEE: yes

Incremental Batch Reader
  • input(s): JDBC from same sources as Database Reader
  • output stream type(s): WAEvent
  • supports replication: insert only
  • recoverable: yes
  • supports LEE: yes

JMS Reader
  • input(s): Apache access log, delimited text, free-form text (using RegEx), JSON, name-value pairs, XML
  • output stream type(s): WAEvent or user-defined JSON (when input is JSON)
  • supports replication: no
  • recoverable: yes
  • supports LEE: yes

JMX Reader
  • input(s): Java Management Extensions (JMX)
  • output stream type(s): JSONNodeEvent
  • supports replication: no
  • recoverable: yes
  • supports LEE: yes

Kafka Reader
  • input(s): Apache access log, Avro, delimited text, free-form text (using RegEx), JSON, name-value pairs, XML
  • output stream type(s): WAEvent, AvroEvent (when input is Avro), or user-defined JSON (when input is JSON)
  • supports replication: no
  • recoverable: yes
  • supports LEE: yes

MapR FS Reader
  • input(s): Apache access log, binary, delimited text, free-form text (using RegEx), JSON, name-value pairs, XML
  • output stream type(s): WAEvent or user-defined JSON (when input is JSON)
  • supports replication: no
  • recoverable: yes
  • supports LEE: yes

MariaDB
  • input(s): MariaDB Galera Cluster binary log (binlog)
  • output stream type(s): WAEvent
  • supports replication: yes
  • recoverable: yes
  • supports LEE: no

MongoDB Reader
  • input(s): MongoDB replica set operations log (oplog.rs)
  • output stream type(s): JSONNodeEvent
  • supports replication: yes
  • recoverable: yes
  • supports LEE: yes

MQTT Reader
  • input(s): Avro, delimited text, JSON, name-value pairs, XML
  • output stream type(s): WAEvent, AvroEvent (when input is Avro), or user-defined JSON (when input is JSON)
  • supports replication: no
  • recoverable: yes
  • supports LEE: no

MS SQL Reader
  • input(s): SQL Server transaction log
  • output stream type(s): WAEvent
  • supports replication: yes
  • recoverable: yes
  • supports LEE: yes

MultiFile Reader
  • input(s): Apache access log, Avro, binary, delimited text, free-form text (using RegEx), JSON, name-value pairs, XML
  • output stream type(s): WAEvent, AvroEvent (when input is Avro), or user-defined JSON (when input is JSON)
  • supports replication: no
  • recoverable: if output is persisted to a Kafka stream
  • supports LEE: yes

MySQL
  • input(s): MySQL binary log (binlog)
  • output stream type(s): WAEvent
  • supports replication: yes
  • recoverable: yes
  • supports LEE: yes

OPCUA Reader
  • input(s): an OPC-UA server
  • output stream type(s): OPCUA Data Change Event
  • supports replication: no
  • recoverable: yes
  • supports LEE: no

Oracle Database
  • input(s): LogMiner or XStream Out
  • output stream type(s): WAEvent
  • supports replication: yes
  • recoverable: yes
  • supports LEE: yes

PostgreSQL
  • input(s): PostgreSQL logical replication slot
  • output stream type(s): WAEvent
  • supports replication: yes
  • recoverable: yes
  • supports LEE: yes

Salesforce Reader
  • input(s): Force.com REST API
  • output stream type(s): WAEvent
  • supports replication: yes
  • recoverable: yes
  • supports LEE: yes

Salesforce Platform Event Reader
  • input(s): Salesforce platform event message subscription
  • output stream type(s): WAEvent
  • supports replication: insert only
  • recoverable: yes
  • supports LEE: yes

Salesforce Push Topic Reader
  • input(s): Salesforce Streaming API
  • output stream type(s): WAEvent
  • supports replication: yes
  • recoverable: yes
  • supports LEE: yes

S3 Reader
  • input(s): Apache access log, Avro, binary, delimited text, free-form text (using RegEx), JSON, name-value pairs, XML
  • output stream type(s): WAEvent, AvroEvent (when input is Avro), or user-defined JSON (when input is JSON)
  • supports replication: no
  • recoverable: yes
  • supports LEE: yes

TCP Reader
  • input(s): Apache access log, binary, delimited text, free-form text (using RegEx), JSON, name-value pairs, XML
  • output stream type(s): WAEvent or user-defined JSON (when input is JSON)
  • supports replication: no
  • recoverable: if output is persisted to a Kafka stream
  • supports LEE: yes

UDP Reader
  • input(s): Apache access log, binary, collectd, delimited text, free-form text (using RegEx), JSON, Kafka stream, name-value pairs, NetFlow v5 or v9, SNMP, XML
  • output stream type(s): WAEvent, CollectdEvent (when input is collectd), or user-defined JSON (when input is JSON)
  • supports replication: no
  • recoverable: if output is persisted to a Kafka stream
  • supports LEE: yes

Windows Event Log Reader
  • input(s): Windows Application, Security, or System event log
  • output stream type(s): WindowsLogEvent
  • supports replication: no
  • recoverable: yes
  • supports LEE: no


Continuous Data Generator

This is a mock source that is used by the guided tour.


To use this source in your own applications, create an app from scratch (go to the Apps page and click Add App > Start from Scratch), click the here link, and choose either the "simple" or "advanced" source.

The only user-configurable property is Throughput, which by default is set to Low, 10 events per second. You may change that to Medium for around 700 events per second or Unrestricted to generate events at the maximum rate possible (which will vary depending on the number of cores in the Striim server and other factors). Note that if you choose Unrestricted the source may generate a gigabyte or more of data per minute.

Database Reader

Returns data from a JDBC query against one or more tables in a supported database (see Connection URL below). If the connection to the database is interrupted, the application will crash.

Warning

Except for Oracle, PostgreSQL, and SQL Server 2012 or later, the JDBC driver for the DBMS must be installed as described in Installing database drivers.

Connection URL (String)
The following databases are supported.

  • for HP NonStop SQL/MX: jdbc:t4sqlmx://<IP address>:<port> or jdbc:t4sqlmx://<IP address>:<port>/catalog=<catalog name>;schema=<schema name>

  • for MariaDB: jdbc:mariadb://<ip address>:<port>/<database name>

  • for MariaDB Galera Cluster: specify the IP address and port for each server in the cluster, separated by commas: jdbc:mariadb://<IP address>:<port>,<IP address>:<port>,...; optionally, append /<database name>

  • for MySQL: jdbc:mysql://<ip address>:<port>/<database name>

  • for Oracle: jdbc:oracle:thin:@<hostname>:<port>:<SID> (using Oracle 12c with PDB, use the SID for the PDB service) or jdbc:oracle:thin:@<hostname>:<port>/<service name>

  • for PostgreSQL: jdbc:postgresql://<ip address>:<port>/<database name>

  • for SQL Server: jdbc:sqlserver://<ip address>:<port>;DatabaseName=<database name>

    You may use Active Directory authentication with Azure SQL Database (see Supporting Active Directory authentication for Azure) or, when Striim or a Forwarding Agent is running in Windows, with SQL Server (see Supporting Active Directory authentication for SQL Server).

  • for Teradata: jdbc:teradata://<ip address>/DBS_PORT=<port>,DATABASE=<database name>

Database Provider Type (String, default: Default)
Controls which icon appears in the Flow Designer.

Excluded Tables (String)
When a wildcard is specified for Tables, you may specify here any tables you wish to exclude from the query. Specify the value exactly as for Tables (with the same database-specific limitations and options, such as specifying multiple tables or views). For example, to include data from HR_EMPLOYEES and HR_DEPTS but not HRMASTER when reading from SQL Server (since you cannot specify a literal underscore in the Tables string):

Tables='HR%',
ExcludedTables='HRMASTER'

Fetch Size (Integer, default: 100)
maximum number of records to be fetched from the database in a single JDBC method execution (see the discussion of fetchsize in the documentation for your JDBC driver)

Password (encrypted password)
The password for the specified user. See Encrypted passwords.

Query (String)
SQL statement specifying the data to return. You may query tables, aliases, synonyms, and views.

When Query is specified and Tables is not, the WAEvent TableName metadata field value will be QUERY. When both Query and Tables are specified, the data specified by Query will be returned, and the Tables setting will be used only to populate the TableName field.

If the query includes a synonym containing a period, it must be enclosed in escaped quotes. For example: select * from \"synonym.name\"

If using a query when the output of a DatabaseReader source is the input of a DatabaseWriter target, specify the target table name as the value of DatabaseReader's Tables field.
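For example, a sketch of the Query-plus-Tables pattern described above, with hypothetical table and stream names:

CREATE SOURCE FilteredOrders USING DatabaseReader (
  ConnectionURL: 'jdbc:mysql://192.0.2.10:3306/sales',
  Username: 'striim',
  Password: '******',
  Query: 'SELECT order_id, order_total FROM orders WHERE order_total > 100',
  Tables: 'sales.orders'
)
OUTPUT TO OrderStream;

Here the WAEvent TableName metadata field is populated from the Tables value rather than defaulting to QUERY.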

Quiesce on IL Completion (Boolean, default: False)
With the default value of False, you must stop the application manually after all data has been read.

Set to True to automatically quiesce the application after all data has been read (see discussion of QUIESCE in Console commands). When you see on the Apps page that the application is in the Quiescing state, it means that all the data that existed when the query was submitted has been read and that the target(s) are writing it. When you see that the application is in the Quiesced state, you know that all the data has been written to the target(s). At that point, you can undeploy the Database Reader application and then start another application for continuous replication of new data.

Note: Set to True only if all targets in the application support auto-quiesce (see Writers overview).

Return DateTime As (String, default: Joda)
Set to String to return timestamp values as strings rather than Joda timestamps. The primary purpose of this option is to avoid losing precision when microsecond timestamps are converted to Joda milliseconds. The format of the string is yyyy-mm-dd hh:mm:ss.ffffff.

Tables (String)
The table(s) or view(s) to be read. MySQL and Oracle names are case-sensitive, SQL Server names are not. Specify names as <schema name>.<table name> for MySQL and Oracle and as <database name>.<schema name>.<table name> for SQL Server.

If you are using the Query property, specify QUERY as the table name.

For Oracle and SQL Server, you may specify multiple tables and views as a list separated by semicolons or with the following wildcards:

  • %: any series of characters

  • \_: any single character in MySQL or Oracle (note that the underscore does not need to be escaped when using OracleReader)

  • _: any single character in SQL Server

For example, HR.% would read all tables in the HR schema.

When reading from Oracle, _ is a literal underscore, so, for example, HR_% would include HR_EMPLOYEES and HR_DEPTS but not HRMASTER. (Note that when using OracleReader the opposite is the case: _ is the wildcard and \_ is a literal underscore.)

When reading from SQL Server, there is no way to specify a literal underscore.

Username (String)
the DBMS user name the adapter will use to log in to the server specified in ConnectionURL

Vendor Config (String)
If the source is Oracle and it uses SSL, specify the required SSL properties (see the notes on SSL Config in Oracle Reader properties).

The output type is WAEvent.

Note

To read from tables in both Oracle CDB and PDB databases, you must create two instances of DatabaseReader, one for each.

The following example creates a cache of data retrieved from a MySQL table:

CREATE TYPE RackType(
  rack_id String KEY,
  datacenter_id String,
  rack_aisle java.lang.Integer,
  rack_row java.lang.Integer,
  slot_count java.lang.Integer
);
CREATE CACHE ConfiguredRacks USING DatabaseReader (
  ConnectionURL:'jdbc:mysql://10.1.10.149/datacenter',
  Username:'username',
  Password:'passwd',
  Query: "SELECT rack_id,datacenter_id,rack_aisle,rack_row,slot_count FROM RackList"
)
QUERY (keytomap:'rack_id') OF RackType;

The following example creates a cache of data retrieved from an Oracle table:

CREATE TYPE CustomerType (
  IPAddress  String KEY,
  RouterId  String,
  ConnectionMode  String,
  CustomerId  String,
  CustomerName  String
);
CREATE CACHE Customers USING DatabaseReader (
  Password: 'password',
  Username: 'striim',
  ConnectionURL: 'jdbc:oracle:thin:@node05.example.com:1521:test5',
  Query: 'SELECT ip_address, router_id, connection_mode, customer_id, customer_name FROM customers',
  FetchSize: 1000
)
QUERY (keytomap:'IPAddress') OF CustomerType;

DatabaseReader data type support and correspondence

JDBC column types map to TQL types as follows:

  • Types.BIGINT: java.lang.Long
  • Types.BIT: java.lang.Boolean
  • Types.CHAR: java.lang.String
  • Types.DATE: org.joda.time.LocalDate
  • Types.DECIMAL: java.lang.String
  • Types.DOUBLE: java.lang.Double
  • Types.FLOAT: java.lang.Double
  • Types.INTEGER: java.lang.Integer
  • Types.NUMERIC: java.lang.String
  • Types.REAL: java.lang.Float
  • Types.SMALLINT: java.lang.Short
  • Types.TIMESTAMP: org.joda.time.DateTime
  • Types.TINYINT: java.lang.Short
  • Types.VARCHAR: java.lang.String
  • other types: java.lang.String

DatabaseReader cannot read Oracle RAW or LONG RAW columns (Oracle Reader can).

File Reader

Reads files from disk using a compatible parser.

You can create FileReader sources in the web UI using Source Preview.

See Supported reader-parser combinations for parsing options.

When used with DSV Parser, the type for the output stream can be created automatically from the file header (see Creating the FileReader output stream type automatically).

Striim also provides templates for creating applications that read from files and write to various targets. See Creating a new application using a template for details.

Block Size (Integer, default: 64)
amount of data in KB for each read operation

Compression Type (String)
Set to gzip when wildcard specifies a file or files in gzip format. Otherwise, leave blank.

Directory (String)
Specify the path to the directory containing the file(s). The path may be relative to the Striim installation directory (for example, Samples/PosApp/appdata) or from the root.

Include Subdirectories (Boolean, default: False)
Set to True if the files are written to subdirectories of the Directory path, for example, if each day's files are in a subdirectory named by date.

Position By EOF (Boolean, default: True)

  • If set to True, reading starts at the end of the file, so only new data is acquired.

  • If set to False, reading starts at the beginning of the file and then continues with new data.

  • When FileReader is used with a cache, this setting is ignored and reading always begins from the beginning of the file.

  • When you create a FileReader using Source Preview, this is set to False.

Rollover Style (String, default: Default)
Set to log4j if reading Log4J files created using RollingFileAppender.

Skip BOM (Boolean, default: True)
If set to True, when the wildcard value specifies multiple files, Striim will read the Byte Order Mark (BOM) in the first file and skip the BOM in all other files. If set to False, it will read the BOM in every file.

Wildcard (String)
Specify the name of the file, or a wildcard pattern to match multiple files (for example, *.xml). When reading multiple files, Striim will read them in the default order for the operating system. Once Striim has read a file, it will ignore any further updates to it.

The output type is WAEvent except when using Avro Parser or JSONParser.

An example from the PosApp sample application:

CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'posdata.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO CsvStream;

See PosApp for a detailed explanation and MultiLogApp for additional examples.

Creating the FileReader output stream type automatically

When FileReader is used with DSV Parser, the type for the output stream can be created automatically from the file header using OUTPUT TO <stream name> MAP(filename:'<source file name>'). A regular, unmapped output stream must also be specified. For example:

CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview*.csv',
  directory: 'Samples/PosApp/appData',
  positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO PosSource_Stream,
OUTPUT TO PosSource_Mapped_Stream MAP(filename:'PosDataPreview.csv');

Notes:

  • When you use a MAP clause, you may not specify the Column Delimit Till, Header Line No, Line Number, or No Column Delimiter properties.

  • The file specified in the MAP clause must be in the directory specified by FileReader's directory property when the source is created.

  • The header must be the first line of that file.

  • The column names in the header can contain only alphanumeric, _ (underscore) and $ (dollar sign) characters, and may not begin with numbers.

  • All files to be read must be similar to the one specified in the MAP clause, with headers (which will be ignored) in the first line and the same number of fields.

  • All fields in the output stream type will be of type String.

  • In this release, this feature is available only in the console, the MAP clause cannot be edited in the web UI, and changing the Wildcard property value in the web UI will break the source.

GG Trail Reader

See Oracle GoldenGate.

HDFS Reader

Reads files from Hadoop Distributed File System (HDFS) volumes. You can create HDFSReader sources in the web UI using Source Preview.

See Supported reader-parser combinations for parsing options.

Authentication Policy (String)
If the HDFS cluster uses Kerberos authentication, provide credentials in the format Kerberos, Principal:<Kerberos principal name>, KeytabPath:<fully qualified keytab file name>. Otherwise, leave blank. For example: authenticationpolicy:'Kerberos, Principal:nn/ironman@EXAMPLE.COM, KeytabPath:/etc/security/keytabs/nn.service.keytab'

Compression Type (String)
Set to gzip when wildcard specifies a file or files in gzip format. Otherwise, leave blank.

Directory (String)
optional directory from which the files specified by the wildcard property will be read; otherwise files will be read relative to the Hadoop URL

EOF Delay (Integer, default: 100)
milliseconds to wait after reaching the end of a file before starting the next read operation

Hadoop Configuration Path (String)
If using Kerberos authentication, specify the path to Hadoop configuration files such as core-site.xml and hdfs-site.xml. If this path is incorrect or the configuration changes, authentication may fail.

Hadoop URL (String)
The URI for the HDFS cluster NameNode. See below for an example. The default HDFS NameNode IPC port is 8020 or 9000 (depending on the distribution). Port 50070 is for the web UI and should not be specified here.

For an HDFS cluster with high availability, use the value of the dfs.nameservices property from hdfs-site.xml with the syntax hadoopurl:'hdfs://<value>', for example, hadoopurl:'hdfs://mycluster'. When the current NameNode fails, Striim will automatically connect to the next one. (A sketch appears after the example below.)

In MapRFSReader, you may start the URL with hdfs:// or maprfs:/// (there is no functional difference).

Include Subdirectories (Boolean, default: False)
Set to True to read files in subdirectories.

Position by EOF (Boolean, default: True)
If set to True, reading starts at the end of the file, so only new data is acquired. If set to False, reading starts at the beginning of the file and then continues with new data.

Rollover Style (String, default: Default)
Do not change.

Skip BOM (Boolean, default: True)
If set to True, when the wildcard value specifies multiple files, Striim will read the Byte Order Mark (BOM) in the first file and skip the BOM in all other files. If set to False, it will read the BOM in every file.

Wildcard (String)
name of the file, or a wildcard pattern to match multiple files (for example, *.xml)

The output type is WAEvent except when using JSONParser.

CREATE SOURCE CSVSource USING HDFSReader (
  hadoopurl:'hdfs://myserver:9000/',
  WildCard:'posdata.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes
)
OUTPUT TO CSVStream;
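
For a high-availability cluster, a sketch using a hypothetical dfs.nameservices value of mycluster, as described under Hadoop URL above:

CREATE SOURCE HACSVSource USING HDFSReader (
  hadoopurl:'hdfs://mycluster/',
  WildCard:'posdata.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes
)
OUTPUT TO HACSVStream;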

HP NonStop Enscribe, SQL/MP, and SQL/MX Readers

See HP NonStop.

HTTP Reader

This reader requires the Oracle JDK (see System requirements).

Listens for HTTP POST requests on the specified port.

See Supported reader-parser combinations for parsing options.

Authenticate Client (Boolean, default: False)
Set to True to have the server authenticate the client.

Compression Type (String)
Set to gzip when the input is in gzip format. Otherwise, leave blank.

Defer Response (Boolean, default: False)
With the default setting of False, HTTP Reader returns code 200 (success) or 400 (failure) to the client. Set to True to return a custom response defined with HTTP Writer.

Defer Response Timeout (String, default: 5s)
When Defer Response is True, the amount of time to wait for HTTP Writer to respond. See HTTP Writer for more details.

IP Address (String)
The Striim server binding IP address for the TCP socket. Set to 0.0.0.0 to allow any available network interface.

Keystore (String)
Location of the Java keystore file containing the Striim application's own certificate and private key. If this is blank and a value is specified for Keystore Type, an empty keystore is created. If Keystore Type is blank, leave this blank as well.

Keystore Password (encrypted password)
Provide a password if required to unlock the keystore or to check the integrity of the keystore data. Otherwise, leave blank. See Encrypted passwords.

Keystore Type (String)
Set to JKS, JCEKS, or PKCS12 to enable SSL. Otherwise, leave blank.

Port Number (Integer)
The TCP socket listening port on the Striim server, typically 80 for HTTP or 443 for HTTPS. This port must not be used by another process. In TQL, the property is portno.

Thread Count (Integer, default: 10)
The number of threads to be initialized for handling multiple concurrent HTTP connections. Valid values are 1 to 500.

The output type is WAEvent except when using JSONParser.

Sample code using the DSVParser:

CREATE SOURCE HTTPSource USING HTTPReader (
  PortNo:'10000'
)
PARSE USING DSVParser (
  header:'yes'
)
OUTPUT TO HttpDataStream;
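
To enable SSL using the Keystore properties described above, a sketch with a hypothetical keystore path:

CREATE SOURCE HTTPSSource USING HTTPReader (
  PortNo:'443',
  KeystoreType:'JKS',
  Keystore:'/path/to/striim.jks', -- hypothetical keystore location
  KeystorePassword:'******'
)
PARSE USING JSONParser ( eventType:'' )
OUTPUT TO HttpsDataStream;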

See HTTP Writer for a complete sample application.

Incremental Batch Reader

Works like DatabaseReader but has two additional properties, Check Column and Start Position, which allow you to specify that reading will begin at a user-selected position. To specify the starting point, the table(s) to be read must have a column containing either a timestamp or a sequential number. The most common use case is for populating data warehouses.

See also Spanner Batch Reader.

See Connection URL below for a list of supported databases. See Database Reader for supported data types.

Warning

Except for Oracle, PostgreSQL, and SQL Server 2012 or later, the JDBC driver for the DBMS must be installed as described in Installing database drivers.

Check Column (String)
Specify the name of the column containing the start position value. The values must be unique and continuously increasing, such as the creation timestamp or a unique identifier such as an employee ID.

MySQL and Oracle names are case-sensitive, SQL Server names are not. Use the syntax <schema name>.<table name>=<column name> for MySQL and Oracle and <database name>.<schema name>.<table name>=<column name> for SQL Server.

If you specify multiple tables in the Tables property, you may specify different check columns for the tables separated by semicolons. In this case, you may specify the check column for the remaining tables using wildcards: for example, MYSCHEMA.TABLE1=UUID; MYSCHEMA.%=LAST_UPDATED would use UUID as the start column for TABLE1 and LAST_UPDATED as the start column for the other tables.

Connection URL (String)
The following databases are supported.

  • for MariaDB: jdbc:mariadb://<ip address>:<port>/<database name>

  • for MariaDB Galera Cluster: specify the IP address and port for each server in the cluster, separated by commas: jdbc:mariadb://<IP address>:<port>,<IP address>:<port>,...; optionally, append /<database name>

  • for MySQL: jdbc:mysql://<ip address>:<port>/<database name>

  • for Oracle: jdbc:oracle:thin:@<hostname>:<port>:<SID> (using Oracle 12c with PDB, use the SID for the PDB service) or jdbc:oracle:thin:@<hostname>:<port>/<service name>

  • for PostgreSQL: jdbc:postgresql://<ip address>:<port>/<database name>

  • for SQL Server: jdbc:sqlserver://<ip address>:<port>;DatabaseName=<database name>

    You may use Active Directory authentication with Azure SQL Database (see Supporting Active Directory authentication for Azure) or, when Striim or a Forwarding Agent is running in Windows, with SQL Server (see Supporting Active Directory authentication for SQL Server).

  • for Teradata: jdbc:teradata://<ip address>/DBS_PORT=<port>,DATABASE=<database name>

Database Provider Type (String, default: Default)
Controls which icon appears in the Flow Designer.

Excluded Tables (String)
When a wildcard is specified for Tables, you may specify here any tables you wish to exclude from the query. Specify the value exactly as for Tables (with the same database-specific limitations and options, such as specifying multiple tables or views). For example, to include data from HR_EMPLOYEES and HR_DEPTS but not HRMASTER when reading from SQL Server (since you cannot specify a literal underscore in the Tables string):

Tables='HR%',
ExcludedTables='HRMASTER'

Fetch Size (Integer, default: 100)
maximum number of records to be fetched from the database in a single JDBC method execution (see the discussion of fetchsize in the documentation for your JDBC driver)

Password (encrypted password)
The password for the specified user. See Encrypted passwords.

Polling Interval (String, default: 120sec)
time to wait between fetches; may be specified in seconds (as in the default) or milliseconds (for example, 500ms)

Return DateTime As (String, default: Joda)
Set to String to return timestamp values as strings rather than Joda timestamps. The primary purpose of this option is to avoid losing precision when microsecond timestamps are converted to Joda milliseconds. The format of the string is yyyy-mm-dd hh:mm:ss.ffffff.

Start Position (String)
The value in the specified check column from which Striim will start reading. Striim will read rows in which the check column's value is equal to or greater (or later) than this value and skip the other rows. Since Check Column may specify multiple tables, you must specify the corresponding table name or wildcard for each value. With the Check Column example above, the Start Position value could be MYSCHEMA.TABLE1=1234; MYSCHEMA.%=2018-OCT-07 18:37:55. (A multi-table sketch appears after the example below.)

Tables (String)
The table(s) or view(s) to be read. MySQL and Oracle names are case-sensitive, SQL Server names are not. Specify names as <schema name>.<table name> for MySQL and Oracle and as <database name>.<schema name>.<table name> for SQL Server.

For Oracle and SQL Server, you may specify multiple tables and views as a list separated by semicolons or with the following wildcards:

  • %: any series of characters

  • \_: any single character in MySQL or Oracle (note that the underscore does not need to be escaped when using OracleReader)

  • _: any single character in SQL Server

For example, HR.% would read all tables in the HR schema.

When reading from Oracle, _ is a literal underscore, so, for example, HR_% would include HR_EMPLOYEES and HR_DEPTS but not HRMASTER. (Note that when using OracleReader the opposite is the case: _ is the wildcard and \_ is a literal underscore.)

When reading from SQL Server, there is no way to specify a literal underscore.

Username (String)
the DBMS user name the adapter will use to log in to the server specified in ConnectionURL

Vendor Config (String)
If the source is Oracle and it uses SSL, specify the required SSL properties (see the notes on SSL Config in Oracle Reader properties).

The following would read rows from TABLE1 with UUID column values equal to or greater than 1234: 

CREATE SOURCE OraSource USING IncrementalBatchReader  ( 
  Username: 'striim',
  Password: '********',
  ConnectionURL: 'jdbc:oracle:thin:@192.0.2.0:1521:orcl',
  Tables: 'MYSCHEMA.TABLE1',
  CheckColumn: 'MYSCHEMA.TABLE1=UUID',
  StartPosition: 'MYSCHEMA.TABLE1=1234'
) 
OUTPUT TO OraSourceOutput;
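
A sketch of the multi-table form described under Check Column and Start Position, with a hypothetical Oracle schema:

CREATE SOURCE MultiTableIBR USING IncrementalBatchReader (
  Username: 'striim',
  Password: '********',
  ConnectionURL: 'jdbc:oracle:thin:@192.0.2.0:1521:orcl',
  Tables: 'MYSCHEMA.%',
  CheckColumn: 'MYSCHEMA.TABLE1=UUID; MYSCHEMA.%=LAST_UPDATED',
  StartPosition: 'MYSCHEMA.TABLE1=1234; MYSCHEMA.%=2018-OCT-07 18:37:55'
)
OUTPUT TO IBROutput;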

If IncrementalBatchReader sends duplicate records to a DatabaseWriter target, by default the application will crash. This can happen, for example, when recovery is enabled (see Recovering applications), there are multiple rows with the same CheckColumn timestamp, and only some of them were written before a system failure. To avoid this, specify the appropriate IgnorableException in the target: for example, for CosmosDBWriter, RESOURCE_ALREADY_EXISTS.

JMS Reader

Reads data from the Java Message Service.

See Supported reader-parser combinations for parsing options.

Compression Type (String)
Set to gzip when the input is in gzip format. Otherwise, leave blank.

Connection Factory Name (String)
the name of the ConnectionFactory containing the queue or topic

Crash On Unsupported Message Type (Boolean, default: True)
With the default value of True, when JMSReader encounters a message of an unsupported type, the application will crash. Set to False to ignore such messages.

Ctx (String)
the JNDI initial context factory name

Durable Subscriber Name (String)
Leave blank to create a nondurable subscription. Specify a subscriber name to create a durable subscription.

JMS Provider Config (String)
Optionally, specify any required path variables as <property>=<value>, separated by semicolons. For example:

com.tibco.tibjms.naming.security_protocol=ssl;
ssl_enable_verify_hostname=false;
com.tibco.tibjms.naming.ssl_identity=client_identity.p12;
com.tibco.tibjms.naming.ssl_password=password;
com.tibco.tibjms.naming.ssl_trusted_certs=server_root.cert.pem;
java.property.https.protocols=SSLv3;
com.tibco.tibjms.naming.ssl_trace=true

Password (encrypted password)
see Encrypted passwords

Provider (String)
the path to the JNDI binding

Queue Name (String)
leave blank if Topic is specified

Topic (String)
leave blank if QueueName is specified

Transaction Policy (String)
Specify a message count and/or interval (s / m / h / d) to have the JMS broker group messages as transactions. This will ensure that all messages are processed by JMSReader before they are removed from the queue.

For example, with the setting TransactionPolicy:'MessageCount:100, Interval:10s', JMSReader will send a commit message to the broker every ten seconds, or sooner if it accumulates 100 messages. If JMSReader is stopped or crashes before sending a commit, the broker will resend the messages in the current transaction when JMSReader is restarted.

This feature has been tested with ActiveMQ and WebLogic. A sketch of a source using a transaction policy appears after this list.

User Name (String)
a messaging system user with the necessary permissions
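
A sketch of a source using the Transaction Policy described above, based on the ActiveMQ example below (the queue and policy values are hypothetical):

CREATE SOURCE AMQTxnSource USING JMSReader (
  ConnectionFactoryName:'jms/TestConnectionFactory',
  Ctx:'org.apache.activemq.jndi.ActiveMQInitialContextFactory',
  Provider:'tcp://192.168.123.200:61616',
  QueueName:'jms/TestJMSQueue',
  UserName:'striim',
  Password:'******',
  TransactionPolicy:'MessageCount:100, Interval:10s'
) ...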

Note that JMSReader's properties must accurately reflect your configuration. See Using JMSReader with IBM WebSphere MQ for a detailed discussion.

The output type is WAEvent except when using JSONParser.

The following example is for ActiveMQ:

CREATE SOURCE AMQSource USING JMSReader (
  ConnectionFactoryName:'jms/TestConnectionFactory',
  Ctx:'org.apache.activemq.jndi.ActiveMQInitialContextFactory',
  Provider:'tcp://192.168.123.200:61616',
  QueueName:'jms/TestJMSQueue',
  UserName:'striim',
  Password:'******'
) ...

Message headers are included in the output. For example:

SNMPNT: WAEvent{
  data: ["abc","def"]
  metadata: {"RecordEnd":9,"JMSType":"","RecordOffset":0,"JMSExpiration":0,
  "JMSDestinationName":"TanuTopic","JMSRedelivered":false,"AMQ_SCHEDULED_REPEAT":3,
  "JMSTimestamp":1599633667256,
  "messageid":"ID:Apples-MacBook-Pro-2.local-54631-1599632751529-4:1:1:1:1",
  "JMSDestinationType":"Topic","JMSDeliveryMode":1,"JMSPriority":0,"JMSCorrelationID":"",
  "RecordStatus":"VALID_RECORD"}
  userdata: null
};
TIBCO_EMS_SSL_sysout: JsonNodeEvent{
  data: {"idx":"0","Test":"Test0"}
  metadata: {"JMSPriority":4,"JMSType":null,"JMSXDeliveryCount":1,"JMSExpiration":0,
  "JMSDestinationName":"newqueue5","JMSRedelivered":false,"JMSTimestamp":1598523499812,
  "JMSCorrelationID":null,"JMSDestinationType":"Queue","JMSDeliveryMode":2}
  userdata: null
};

Using JMSReader with IBM WebSphere MQ

The following summary assumes that you are, or are working with, an experienced WebSphere system administrator. This has been tested on WMQ version 7.5.

WMQ configuration

  1. If an appropriate user account for use by JMSReader does not already exist, create one. Specify this as the value for JMSReader's UserName property.

  2. Add the Striim user to the mqm group.

  3. Create a QueueManager.

  4. Start the QueueManager’s listener.

  5. Create a server connection channel for the QueueManager.

  6. Create a queue.

  7. Copy JMSAdmin.config from <WMQ installation location>/java/lib/ to the JNDI-Directory where you want to create the binding for use by Striim.

  8. Edit this file and set INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory and PROVIDER_URL=<path from root>/JNDI-Directory.

  9. Using MQ Explorer, under JMS administered objects create a new initial context and connection factory as described in chapter 3 of IBM's white paper, "Configuring and running simple JMS P2P and Pub/Sub applications in MQ 7.0, 7.1, 7.5 and 8.0." Specify this connection factory's name (for example, StriimCF) as the value for JMSReader's ConnectionFactoryName property.

  10. Create a JMS queue using the queue, initial context, and connection factory from the steps above. Specify this queue's name (for example, StriimJMSQueue) as the value for JMSReader's QueueName property.

The following steps may also be necessary:

  • Provide access to MCAUSER and configure CHLAUTH rules accordingly.

  • Configure firewall to allow inbound connections if needed.

Striim configuration

  1. Copy the following files from <WMQ installation location>/java/ to Striim/lib and restart Striim:

    • com.ibm.mqjms.jar

    • com.ibm.mq.jmqi.jar

    • com.ibm.mq.headers.jar

    • fscontext.jar

    • providerutil.jar

    • dhbcore.jar

  2. Copy the .bindings file from the JNDI-Directory you created above to a location accessible to Striim (for example, /users/striim/JNDI). This is the location to specify in JMSReader's Provider property. Since this is a file, set JMSReader's Ctx property to com.sun.jndi.fscontext.RefFSContextFactory.

  3. Edit the .bindings file and replace all occurrences of localhost with the IP address of the server hosting the queue. For example, if the IP address were 198.51.100.1, you would change localhost(1414) to 198.51.100.1(1414).

JMSReader properties

With the above configuration, the JMSReader properties would be:

CREATE SOURCE WMQSource using JMSReader (
  connectionfactoryname:'StriimCF',
  Ctx:'com.sun.jndi.fscontext.RefFSContextFactory',
  Provider:'file:/users/striim/JNDI/',
  Queuename:'StriimJMSQueue',
  username:'striim',
  password:'******'
) ...

JMX Reader

Reads data from Java virtual machines running Java applications that support Java Management Extensions (JMX).

Connection Retry Policy (String, default: retryInterval=30, maxRetries=3)
With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval). If the second attempt is unsuccessful, in 30 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Poll Interval (Double)
how often JMXReader will poll the JVM(s), in seconds

Service URL (String)
a URL specifying how to connect to the JVM(s); you may specify multiple URLs separated by commas

The output type is JSONNodeEvent.

If multiple JVMs are specified in serviceurl and the application is deployed on multiple servers, the JVMs will automatically be distributed among the servers.

If the internal Kafka server is running with JMX enabled (see Configuring Kafka), the following application will write Kafka broker operational metrics to a WActionStore:

CREATE SOURCE JmxSrc USING JmxReader (
  serviceurl:"service:jmx:rmi:///jndi/rmi://localhost:9998/jmxrmi",
  pollinterval:"1"
) 
OUTPUT TO jmxstream;

CREATE TYPE MBeanType (
  time datetime,
  objectname string,
  attributes com.fasterxml.jackson.databind.JsonNode
);
CREATE STREAM MetricStream OF MBeanType;

CREATE CQ getMetric
INSERT INTO MetricStream
SELECT DNOW(),
  data.get("ObjectName").textValue(),
  data.get("Attributes")
FROM jmxstream;

CREATE TYPE MetricType (
  objectname string,
  time datetime,
  metricname string,
  metrictype string,
  metricval string
);
CREATE WACTIONSTORE MetricStore CONTEXT OF MetricType;

CREATE CQ getAttr
INSERT INTO MetricStore
SELECT ms.objectname,
  ms.time, 
  attritr.get(0).textValue(),
  attritr.get(1).textValue(),
  attritr.get(2).textValue()
FROM MetricStream ms, iterator(ms.attributes) attritr;

The query SELECT * FROM MetricStore; will return events similar to:

[
   objectname = kafka.network:type=RequestMetrics,name=LocalTimeMs,request=Offsets
   time = 2018-06-05T09:01:00.087-07:00
   metricname = 50thPercentile
   metrictype = double
   metricval = 0.0
]

Kafka Reader

Reads data from Apache Kafka 0.8, 0.9, 0.10, 0.11, or 2.1.

See Supported reader-parser combinations for parsing options.

Auto Map Partition (Boolean, default: True)
When reading from multiple partitions, if there are multiple servers in the Striim deployment group on which KafkaReader is deployed ON ALL, partitions will be distributed automatically among the servers. Partitions will be rebalanced automatically as Striim servers are added to or removed from the group.

When deploying on a Forwarding Agent, set to False.

Block Size (Integer, default: 10240)
for KafkaReader 0.8 only: size of the fetch buffer in KB, which must be greater than or equal to the Kafka broker's message.max.bytes size

Broker Address (String)

Kafka Config (String)
Optionally, specify Kafka consumer properties, separated by semicolons. See Configuring Kafka for more information. (A tuning sketch appears after the sample application below.)

Set value.deserializer=com.striim.avro.deserializer.SingleRecordAvroRecordDeserializer when each message contains one Avro record and it is not length-delimited.

When using KafkaReader 0.9 or later, the default is:

max.partition.fetch.bytes=10485760;
fetch.min.bytes=1048576;
fetch.max.wait.ms=1000;
receive.buffer.bytes=2000000;
poll.timeout.ms=10000

When using KafkaReader 0.8, the default is blank. We recommend setting:

retry.backoff.ms=10000

Kafka Config Property Separator (String, default: ;)
Specify a different separator if one of the consumer property values specified in KafkaConfig contains a semicolon.

Kafka Config Value Separator (String, default: =)
Specify a different separator if one of the consumer property values specified in KafkaConfig contains an equal symbol.

Partition ID List (String)
partition numbers to read from, separated by semicolons (for example, 0;1), or leave blank to read from all partitions

Start Offset (Long, default: -1)
With the default value of -1, reads from the end of the partition. Change to 0 to read from the beginning of the partition.

If you specify startOffset, leave startTimestamp at its default value.

Start Timestamp (String)
For KafkaReader 0.10 and later only. If not specified, only new transactions (based on current Kafka host system time) are read. Specify a value in the format yyyy-MM-dd hh:mm:ss:SSS (for example, 2017-10-20 13:55:55.000) to start reading from an earlier point.

If the Kafka host and Striim host are not in the same time zone, specify the start time using the Striim host's time zone.

If you specify startTimestamp, leave startOffset at its default value.

Topic (String)
name of the Kafka topic to read from

Specify the Kafka version for the broker being read using VERSION '0.#.0': for example, to use 2.1, CREATE SOURCE <name> USING KafkaReader VERSION '2.1.0'. Striim's internal Kafka server (see Configuring Kafka) is version 0.11.

The output type is WAEvent except when using Avro Parser or JSONParser.

The following sample application will read the data written to Kafka by the Kafka Writer sample application and write it to striim/KR11Output.00:

CREATE SOURCE KR11Sample USING KafkaReader VERSION '0.11.0'(
  brokerAddress:'localhost:9092',
  topic:'KafkaWriterSample',
  startOffset:'0'
)
PARSE USING DSVParser ()
OUTPUT TO RawKafkaStream;

CREATE TARGET KR11Out USING FileWriter (
  filename:'KR11Output'
)
FORMAT USING DSVFormatter ()
INPUT FROM RawKafkaStream;
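
To override the default consumer properties described under Kafka Config, a sketch with hypothetical fetch sizes:

CREATE SOURCE TunedKafkaSource USING KafkaReader VERSION '2.1.0' (
  brokerAddress:'localhost:9092',
  topic:'KafkaWriterSample',
  startOffset:'0',
  -- hypothetical values; tune to your broker and message sizes
  KafkaConfig:'max.partition.fetch.bytes=20971520;fetch.min.bytes=2097152'
)
PARSE USING DSVParser ()
OUTPUT TO TunedKafkaStream;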

Reading a Kafka stream with KafkaReader

For an overview of this feature, see Introducing Kafka streams.

Reading from a Kafka stream can be useful for development tasks such as doing A/B comparisons of variations on a TQL application. If you modified Samples.PosApp.tql to persist PosDataStream to Kafka, the following source would read the persisted data from Kafka.

CREATE SOURCE AccessLogSource USING KafkaReader VERSION '0.11.0'(
  brokerAddress:'localhost:9998',
  Topic:'Samples_PosDataStream',
  PartitionIDList:'0',
  startOffset:0
)
PARSE USING StriimParser ()
OUTPUT TO KafkaDSVStream;

For more information, see Kafka Reader.

MapR FS Reader

Reads from a file in the MapR File System. Except for the name of the adapter, it is functionally identical to HDFSReader. See HDFS Reader for documentation of the properties.

MariaDB

See Database Reader and MySQL / MariaDB.

MongoDB Reader

See MongoDB.

MQTT Reader

Reads messages from an MQTT broker.

See the MQTT FAQ for information on firewall settings.

See Supported reader-parser combinations for parsing options.

Broker URI (String)
format is tcp://<broker address>:<broker port>

Client ID (String)
MQTT client ID (maximum 23 characters). Must be unique (not used by any other client) in order to identify this instance of MQTTReader. The MQTT broker will use this ID to close the connection when MQTTReader goes offline and to resend events when it restarts.

Keystore (String)
Location of the Java keystore file containing the Striim application's own certificate and private key. If this is blank and a value is specified for Keystore Type, an empty keystore is created. If Keystore Type is blank, leave this blank as well.

Keystore Password (encrypted password)
Provide a password if required to unlock the keystore or to check the integrity of the keystore data. Otherwise, leave blank. See Encrypted passwords.

Keystore Type (String)
Set to DKS, JKS, JCEKS, PKCS11, or PKCS12 to enable SSL. Otherwise, leave blank.

Password (encrypted password)
The password for Username, if specified. See Encrypted passwords.

QoS (Integer, default: 0)
0: at most once
1: at least once
2: exactly once

Topic (String)

Username (String)
If the MQTT broker is using application-level authentication, provide a username. Otherwise leave blank.

The output type is WAEvent except when using Avro Parser or JSONParser.

Sample:

CREATE SOURCE tempsensor 
USING MQTTReader (
  brokerUri:'tcp://m2m.eclipse.org:1883',
  Topic:'/striim/room687/temperature',
  QoS:0,
  clientId:'Striim'
)
PARSE USING JSONParser ( eventType:'') 
OUTPUT TO tempstream;

MS SQL Reader

See Microsoft SQL Server.

MultiFile Reader

Reads files from disk. This reader is similar to File Reader except that it reads from multiple files at once.

See Supported reader-parser combinations for parsing options.

Block Size (Integer, default: 64)
amount of data in KB for each read operation

Compression Type (String)
Set to gzip when wildcard specifies a file or files in gzip format. Otherwise, leave blank.

Directory (String)
Specify the path to the directory containing the file(s). The path may be relative to the Striim installation directory (for example, Samples/PosApp/appdata) or from the root.

Group Pattern (String)
a regular expression defining the rollover pattern for each set of files (see Using regular expressions (regex))

Position by EOF (Boolean, default: True)
If set to True, reading starts at the end of the file, so only new data is acquired. If set to False, reading starts at the beginning of the file and then continues with new data.

Rollover Style (String, default: Default)
Set to log4j if reading Log4J files created using RollingFileAppender.

Skip BOM (Boolean, default: True)
If set to True, when the wildcard value specifies multiple files, Striim will read the Byte Order Mark (BOM) in the first file and skip the BOM in all other files. If set to False, it will read the BOM in every file.

Thread Pool Size (Integer, default: 20)
For best performance, set to the maximum number of files that will be read at once.

Wildcard (String)
name of the file, or a wildcard pattern to match multiple files (for example, *.xml)

Yield After (Integer, default: 20)
the number of events after which a thread will be handed off to the next read process

The output type is WAEvent except when using Avro Parser or JSONParser.

This example would recognize log.proc1.0 and log.proc1.1 as parts of one log and log.proc2.0 and log.proc2.1 as parts of another, ensuring that all the events from each log will be read in the correct order.

CREATE SOURCE MFRtest USING MultiFileReader (
  directory:'Samples',
  WildCard:'log.proc*',
  grouppattern:'(?:(?:(?:<[^>]+>)*[^<.]*)*.){2}'
)

Alternatively, you can use this statement to ensure the events from each log are read in the correct order:

CREATE SOURCE MFRtest USING MultiFileReader (
  directory:'Samples',
  WildCard:'log.proc*',
  grouppattern:'log\\.proc[0-9]{1,3}'
)

MySQL

See Database Reader and MySQL.

OPCUA Reader

Reads data from an OPC-UA server. See the OPC UA Unified Architecture Specification, Part 4: Services for information on the MonitoredItem, Session, and Subscription service sets used by this reader.

If PKCS 12 is required, generate a certificate for Striim and add its public key in DER format to the OPC-UA server's keystore. See OPC UA Unified Architecture Specification, Part 6: Mappings, Annex E.

See Firewall Settings for information on which firewall ports must be open.

App URI (String)
URI to connect to the OPC-UA server: for example, urn:striim:opcua:sensor:connector

If a PKCS 12 certificate has been provided, make sure that its AppUriName field matches this value.

Client Alias (String)
PKCS 12 keystore certificate alias, required when keystore is specified

Keep Alive Count (Long, default: 10)
number of publish intervals with no data after which Striim will send an empty notification to let the server know the client is still running

Keystore (String)
fully qualified name of the PKCS 12 certificate: for example, /path/to/certificate.pfx

Keystore Password (encrypted password)
PKCS 12 keystore password, required when keystore is specified

Lifetime Count (Long, default: 20)
number of publish intervals with no requests from Striim after which the OPC-UA server will assume that the Striim client is no longer running and remove its subscription; must be higher than the keepAliveCount

Max Notification per Publish (Integer, default: 2)
number of events to receive per published notification

Message Security Mode (String, default: None)
supported values (case-sensitive) are None, Sign, SignAndEncrypt: see OPC UA Unified Architecture Specification, Part 4: Services, section 7.15, and Part 6: Mappings, section 6.1

Node ID List (String)
comma-separated list (case-sensitive) of variable or object nodes to be monitored: for example, ns=1;s=Sensor/Temperature,ns=2;s=Sensor/Pressure; if an object node is specified, all its variables are returned

OPCUA Endpoint URL (String)
endpoint for the OPC-UA server: for example, opc.tcp://localhost:12686/example

Password (encrypted password)
the password for the specified username

Publish Interval (Double, default: 2000)
how often (in milliseconds) the OPC-UA server checks for requests from Striim

Do not set to -1 or 0. Instead of using a very small publishInterval, consider using a smaller samplingInterval.

Queue Size (Integer, default: 10)
see OPC UA Unified Architecture Specification, Part 4: Services, section 5.12.15

Read Timeout (Long, default: 10000)
time (in milliseconds) to wait for the server to respond to a connection request before crashing the application

Sampling Interval (Double, default: 2000)
how often (in milliseconds) the OPC-UA server updates the values for the variables specified in nodeIdList

Security Policy (String, default: None)
supported values (case-sensitive) are Basic128Rsa15, Basic256, Basic256Sha256, and None: see OPC UA Unified Architecture Specification, Part 7: Profiles, sections 6.5.147-150

Severity (Integer, default: 15)
see OPC UA Unified Architecture Specification, Part 5: Information Model, section 6.4.2

Username (String)
see OPC UA Unified Architecture Specification, Part 4: Services, section 7.36.3

The output format is OPCUADataChangeEvent. Each Striim field is listed below with its type, the corresponding UA built-in type / field name / data type where applicable, and notes:

dataValue (Object): DataValue / Value / Variant; the new value after the change

dataIsNull (Boolean)

dataIsNotNull (Boolean)

dataTypeNodeId (String): NodeId; see OPC UA Unified Architecture Specification, Part 6: Mappings, section 5.2.2.9 and A.3

sourceTime (Long): DataValue / SourceTimestamp / DateTime; the time the change was made in the source device or program

sourcePicoSeconds (Long): DataValue / SourcePicoSeconds / UInt16; see OPC UA Unified Architecture Specification, Part 6: Mappings, section 5.2.2.17

serverTime (Long): DataValue / ServerTimestamp / DateTime; the time the server recorded the change

serverPicoSeconds (Long): DataValue / ServerPicoSeconds / UInt16; see OPC UA Unified Architecture Specification, Part 6: Mappings, section 5.2.2.17

statusCodeValue (Long): DataValue / Status / StatusCode; see https://github.com/OPCFoundation/UA-.NET/blob/master/Stack/Core/Schema/Opc.Ua.StatusCodes.csv for a list of possible values

statusCodeIsGood (Boolean): derived from http://www.opcfoundation.org/UA/schemas/1.02/StatusCode.csv

statusCodeIsBad (Boolean): derived from http://www.opcfoundation.org/UA/schemas/1.02/StatusCode.csv

statusCodeIsUncertain (Boolean): derived from http://www.opcfoundation.org/UA/schemas/1.02/StatusCode.csv

statusCodehasOverflowSet (Boolean): see OPC UA Unified Architecture Specification, Part 4: Services, section 5.12.1.5

metadata (java.util.Map): see example in the sample event below

Sample OPCUADataChangeEvent:

{
  dataValue: 3.14
  dataIsNull: false
  dataIsNotNull: true
  dataTypeNodeId: "ns=0;i=11"
  sourceTime: 131517670918430000
  sourcePicoSeconds: null
  serverTime: 131517670918430000
  serverPicoSeconds: null
  statusCodeValue: 0
  statusCodeIsGood: true
  statusCodeIsBad: false
  statusCodeIsUncertain: false
  statusCodehasOverflowSet: false
  metadata: 
{
    "Description": {
        "locale": null,
        "text": null
    },
    "monitoringMode": "Reporting",
    "requestedQueueSize": 10,
    "readValueId": {
        "typeId": {
            "namespaceIndex": 0,
            "identifier": 626,
            "type": "Numeric",
            "null": false,
            "notNull": true
        },
        "nodeId": {
            "namespaceIndex": 2,
            "identifier": "HelloWorld/ScalarTypes/Double",
            "type": "String",
            "null": false,
            "notNull": true
        },
        "binaryEncodingId": {
            "namespaceIndex": 0,
            "identifier": 628,
            "type": "Numeric",
            "null": false,
            "notNull": true
        },
        "xmlEncodingId": {
            "namespaceIndex": 0,
            "identifier": 627,
            "type": "Numeric",
            "null": false,
            "notNull": true
        },
        "attributeId": 13,
        "indexRange": null,
        "dataEncoding": {
            "namespaceIndex": 0,
            "name": null,
            "null": true,
            "notNull": false
        }
    },
    "ValueRank": -1,
    "requestedSamplingInterval": 2000.0,
    "revisedSamplingInterval": 2000.0,
    "filterResult": {
        "bodyType": "ByteString",
        "encoded": null,
        "encodingTypeId": {
            "namespaceIndex": 0,
            "identifier": 0,
            "type": "Numeric",
            "null": true,
            "notNull": false
        }
    },
    "BrowseName": {
        "namespaceIndex": 2,
        "name": "Double",
        "null": false,
        "notNull": true
    },
    "ArrayDimensions": "-1",
    "NodeId": {
        "namespaceIndex": 2,
        "identifier": "HelloWorld/ScalarTypes/Double",
        "type": "String",
        "null": false,
        "notNull": true
    },
    "DataType": "Double",
    "clientHandle": 11,
    "monitoredItemId": 11,
    "revisedQueueSize": 10
}
}

The following sample application writes OPC-UA data to an HBase table. The table contains the most recently reported value for each node. The application is divided into two flows so that the source and CQ can run in a Forwarding Agent on the OPC-UA server.

CREATE APPLICATION OPCUAapp;

CREATE FLOW OPCUAFlow;

CREATE SOURCE OPCUASource USING OPCUAReader (
  OPCUAEndpointURL:'opc.tcp://mfactorengineering.com:4840',
  nodeIdList:'ns=1;s=EVR2.state.Empty_Box_Timer'
)
OUTPUT TO NotificationStream;

CREATE TYPE OPCUADataChange (
  data java.lang.Object,
  dataIsNull java.lang.Boolean,
  dataIsNotNull java.lang.Boolean,
  dataTypeNodeId java.lang.Object,
  sourceTime java.lang.Long,
  serverTime java.lang.Long,
  sourcePicoSeconds java.lang.Long,
  serverPicoSeconds java.lang.Long,
  statusCodeValue java.lang.Long,
  statusCodeGood java.lang.Boolean,
  statusCodeBad java.lang.Boolean,
  statusCodeUncertain java.lang.Boolean,
  statusCodeHasOverflowSet java.lang.Boolean,
  dataType java.lang.String,
  valueRank java.lang.Integer,
  arrayDimensions java.lang.Object,
  BrowseName java.lang.Object,
  Description java.lang.Object,
  monitoringMode java.lang.String,
  nodeIdNS java.lang.Integer,
  nodeIdIdentifier java.lang.Object,
  displayName java.lang.String  KEY
);
CREATE STREAM OPCUAStream OF OPCUADataChange;

CREATE CQ OPCUADataCQ
INSERT INTO OPCUAStream
SELECT dataValue,
  dataIsNull,
  dataIsNotNull,
  dataTypeNodeId,
  sourceTime,
  serverTime,
  sourcePicoSeconds,
  serverPicoSeconds,
  statusCodeValue,
  statusCodeIsGood,
  statusCodeIsBad,
  statusCodeIsUncertain,
  statusCodehasOverflowSet,
  META(x,"DataType"),
  META(x,"ValueRank"),
  TO_JSON_NODE(META(x,"ArrayDimensions")),
  TO_JSON_NODE(META(x,"BrowseName")),
  META(x,"Description"),
  META(x,"monitoringMode"),
  TO_JSON_NODE(META(x,"NodeId")).get("namespaceIndex").asInt(),
  TO_JSON_NODE(META(x,"NodeId")).get("identifier"),
  TO_JSON_NODE(META(x,"displayName")).get("text").asText()
FROM NotificationStream x;

END FLOW OPCUAFlow;

CREATE FLOW HBASEFlow;
CREATE TARGET HBaseTarget USING HBaseWriter (
  HBaseConfigurationPath:"/path/to/hbase/conf/hbase-site.xml",
  Tables: "OPCUAEvents.data",
  PKUpdateHandlingMode: "DELETEANDINSERT"
)
INPUT FROM OPCUAStream;
END FLOW HBASEFlow;

END APPLICATION OPCUAapp;

Oracle Database

See Database Reader and Oracle Database.

PostgreSQL

See Database Reader and PostgreSQL.

Salesforce Reader

Reads Salesforce sObject data. See also Salesforce Platform Event Reader and Salesforce Push Topic Reader.

See Getting Started with the Force.com REST API for information on which firewall ports must be open.

property

type

default value

notes

API End Point

String

endpoint for your Force.com REST API

Auth Token

encrypted password

If autoAuthTokenRenewal is set to false , specify your Salesforce access token (see Set Up Authorization on developer.salesforce.com: the first section, "Setting Up OAuth 2.0," explains how to create a "connected app"; the second section, "Session ID Authorization," explains how to get the token using curl).

If autoAuthTokenRenewal is set to true, leave blank.

Auto Auth Token Renewal

String

false

Set to true if you want the access token to be renewed automatically so the application can run continuously. 

Connection Retry Policy

String

retryInterval=30, maxRetries=3

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval. If the second attempt is unsuccessful, in 30 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

When autoAuthTokenRenewal is false, this setting has no effect.

Consumer Key

String

If autoAuthTokenRenewal is set to true, specify the Consumer Key (see Set Up Authorization on developer.salesforce.com).

Consumer Secret

encrypted password

If autoAuthTokenRenewal is set to true, specify the Consumer Secret (see Set Up Authorization on developer.salesforce.com).

Mode

String

InitialLoad

With the default setting, the adapter will load all existing data using force-rest-api 0.28, then stop.

Set to BulkLoad to load all existing data using the Salesforce Bulk API and stop.

Set to Incremental to read new data continuously using force-rest-api 0.28.

Password

encrypted password

If autoAuthTokenRenewal is set to true, specify the password for the specified username.

Polling Interval

String

5 min

how often the data will be read, in minutes; Salesforce recommends the default value, and the maximum is 120

Security Token

String

If autoAuthTokenRenewal is set to true, specify the security token for the specified username (see Reset Your Security Token on help.salesforce.com).

sObject

String

the object to read, for example, Customer or PO (the account associated with the authToken must have View All Data permission for the object)

Start Timestamp

String

By default, Salesforce Reader reads only new events. Optionally, specify the time (based on LastModifiedDate) from which to start reading older events in the format YYYY-MM-DDTHH:MM:SS. If the Salesforce organization's time zone is not the same as Striim's, convert the Salesforce start time to UTC (GMT+00:00) and include a Z at the end of the string. See SimpleDateFormat for more information.

Username

String

If autoAuthTokenRenewal is set to true, specify an appropriate username (see Add a Single User on help.salesforce.com).

The output type is WAEvent.

Use the following cURL commands (see Using cURL in the REST Examples and curl.haxx.se) to verify your configuration and get information about available resources and sObjects.

  1. Get an access token using the Salesforce login URL.

    curl https://login.salesforce.com/services/oauth2/token -d "grant_type=password" \
    -d "client_id=<your consumer key>" -d "client_secret=<your consumer secret>" \
    -d "username=<your username>" -d "password=<your password>"
    
  2. Using the access token returned by that command, test the REST API URL for your organization. The instance is typically the first part of the URL you see in your browser when logged into Salesforce, such as "mycompany" in mycompany.salesforce.com. Alternatively, ask your Salesforce technical administrator for access to a connected app. (For more information, see Understanding the Username-Password OAuth Authentication Flow.)

    If you do not have a proxy server:

    curl https://<your Salesforce instance>.salesforce.com/services/data/ \
    -H 'Authorization: Bearer <token>'

    If you have a proxy server (change the proxy server URL to match yours):

    curl -x http://mycompany.proxy.server.com:8080/ \
    https://<your Salesforce instance>.salesforce.com/services/data/ \
    -H 'Authorization: Bearer <token>'
  3. List available REST resources and sObjects (see List Available REST Resources and Get a List of Objects).

    curl https://<your Salesforce instance>.salesforce.com/services/data/v41.0 \
    -H 'Authorization: Bearer <token>'
    curl https://<your Salesforce instance>.salesforce.com/services/data/v41.0/sobjects \
    -H 'Authorization: Bearer <token>'
    

For additional information, see Salesforce's REST API Developer Guide.

The following TQL will read from the Business__c sObject and create an appropriate typed stream:

CREATE SOURCE SFPoller USING SalesforceReader ( 
  sObject: 'Business__c',
  authToken: '********',
  mode: 'InitialLoad'
) 
OUTPUT TO DataStream;

CREATE TYPE OpStream_Type  (
  Id String KEY,
  Name String, 
  POSDataCode__c String, 
  Currency__c String 
);
CREATE OR REPLACE STREAM OpStream OF OpStream_Type;

CREATE OR REPLACE CQ CQ1 
INSERT INTO OpStream
SELECT data[0],data[1],data[2],data[3]
FROM DataStream;
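
If Auto Auth Token Renewal is enabled, the source supplies OAuth credentials instead of a static access token. The following is a minimal sketch, assuming the credential properties use the lower-camel-case names implied by the table above (consumerKey, consumerSecret, username, password, securityToken); all values are placeholders:

CREATE SOURCE SFPollerAutoRenew USING SalesforceReader (
  sObject: 'Business__c',
  autoAuthTokenRenewal: 'true',
  consumerKey: '<your consumer key>',
  consumerSecret: '<your consumer secret>',
  username: '<your username>',
  password: '<your password>',
  securityToken: '<your security token>',
  mode: 'Incremental',
  pollingInterval: '5 min'
)
OUTPUT TO AutoRenewStream;
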
Salesforce data type support and correspondence

Salesforce data type

Striim type

base64

java.lang.Object

boolean

java.lang.String

byte

java.lang.Byte

date

org.joda.time.LocalDate

dateTime

org.joda.time.DateTime

double

java.lang.Double

int

java.lang.Long

string

java.lang.String

time

java.lang.String

sObject field

Striim type

address

java.lang.String (see discussion below)

anyType

java.lang.String

calculated

java.lang.String

combobox

java.lang.String

currency

java.lang.Double

DataCategoryGroupReference

java.lang.String

email

java.lang.String

encryptedstring

java.lang.String

ID

java.lang.String

JunctionIdList

java.lang.String

location

java.lang.String (see discussion below)

masterrecord

java.lang.String

multipicklist

java.lang.String

percent

java.lang.Double

phone

java.lang.String

picklist

java.lang.String

reference

java.lang.String

textarea

java.lang.String

url

java.lang.String

The address and location fields are compound types. The java.lang.String data field contains a JSON representation of the sObject values and the WAEvent metadata map. The following is the WAEvent for a location:

data: ["a067F00000B52obQAB","a067F00000B52ob","{latitude=1.0, longitude=1.0}"]
metadata: {"LastModifiedDate":"2018-09-11T05:45:43.000+0000","IsDeleted":false,
  "CustomObject":true,"OperationName":"INSERT","SystemModstamp":"2018-09-11T05:45:43.000+0000",
  "TableName":"compoundobject__c","OwnerId":"0057F000001oImoQAE","CreatedById":"0057F000001oImoQAE",
  "location__Latitude__s":1.0,"CreatedDate":"2018-09-11T05:45:43.000+0000",
  "location__Longitude__s":1.0,"attributes":{"type":"compoundobject__c",
  "url":"/services/data/v34.0/sobjects/compoundobject__c/a067F00000B52obQAB"},
  "LastModifiedById":"0057F000001oImoQAE"}
Replicating Salesforce data to Oracle

The following TQL will load all data from the Salesforce Business__c object to the Oracle table BUSINESS_ORACLE_P, then stop writing (in other words, new data will be ignored). Replace ******** with your Salesforce access token (see Understanding the Web Server OAuth Authentication Flow on developer.salesforce.com).

CREATE SOURCE SalesforceCloud USING SalesforceReader ( 
  sObject: 'Business__c',
  authToken: '********',
  mode: 'BulkLoad'
 ) 
OUTPUT TO DataStream;

CREATE TARGET OracleOnPremise USING DatabaseWriter ( 
  DriverName:'oracle.jdbc.OracleDriver',
  ConnectionURL:'jdbc:oracle:thin:@192.168.123.18:1521/XE',
  Username:'system',
  Password:'manager',
  Tables: 'Business__c,SYSTEM.BUSINESS_ORACLE_P',
  CommitTransaction: 'true'
 ) 
INPUT FROM DataStream;

The following TQL will replicate new data continuously:

CREATE SOURCE SalesforceCloud USING SalesforceReader ( 
  sObject: 'Business__c',
  authToken: '********',
  pollingInterval: '5 min',
  apiEndPoint: 'https://ap2.salesforce.com',
  mode: 'Incremental'
 ) 
OUTPUT TO DataStream;

CREATE TARGET OracleOnPremise USING DatabaseWriter ( 
  DriverName:'oracle.jdbc.OracleDriver',
  ConnectionURL:'jdbc:oracle:thin:@192.168.123.18:1521/XE',
  Username:'system',
  Password:'manager',
  Tables: 'Business__c,SYSTEM.BUSINESS_ORACLE_P',
  CommitTransaction: 'true'
 ) 
INPUT FROM DataStream;
Salesforce Platform Event Reader

Reads Salesforce platform events (user-defined notifications) using a subscription model (see Delivering Custom Notifications with Platform Events).

This adapter is based on Salesforce Reader. The output type and data type support are the same. The properties are the same, except for the following:

  • Mode and Polling Interval: Since platform events use a subscription model, these properties are omitted. Platform events are received as they are published by Salesforce.

  • Event Name: Specify the name of the platform event to subscribe to. The account associated with the authToken must have View All Data permission for the platform event. Any fields added to the platform event while the reader is running will be ignored.

  • Replay From (default value Earliest): With the default value, the adapter will read all platform events currently in the retention window. To read only new events, set to Tip. To start reading from a specific point in the retention window, set to a valid ReplayID value (see Platform Event Fields).

  • sObject: Omitted since this adapter reads platform events, not sObjects.

When the output of this adapter is the input for DatabaseWriter and other table-based targets, events are insert-only. (Since platform events are one-time notifications, it does not make sense to update or delete them.)

Recovery (see Recovering applications) is supported provided that Striim is restarted within 24 hours, the length of time Salesforce holds events in the retention window.
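
For example, a source subscribing to a hypothetical Order_Event__e platform event might look like the following sketch (the adapter and property names are assumed to follow the same conventions as Salesforce Reader):

CREATE SOURCE PlatformEventSource USING SalesforcePlatformEventReader (
  apiEndPoint: 'https://ap2.salesforce.com',
  authToken: '********',
  eventName: 'Order_Event__e',
  replayFrom: 'Earliest'
)
OUTPUT TO PlatformEventStream;
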

Salesforce Push Topic Reader

Reads Salesforce sObject data using a PushTopic subscription (see Salesforce's Streaming API Developer Guide).

This gives you the latest data sooner than SalesforceReader. Note, however, that Salesforce restricts use of PushTopics in various ways, including limiting how many events you can read in a day and how many PushTopics you can create. See the PushTopic section of Streaming API Allocations for details.

This adapter is based on Salesforce Reader. The output type and data type support are the same. The properties are the same, except for the following:

  • Mode and Polling Interval: Since this uses a subscription model, these properties are omitted. sObject data is received as it is published by Salesforce.

  • PushTopic: The PushTopic to subscribe to. You must also specify the sObject to be read. (To read multiple sObjects from a PushTopic, create a source for each.) The account associated with the authToken must have View All Data permissions for both the PushTopic and the sObject. 

  • Replay From (default value Earliest): With the default value, the adapter will read all events currently in the PushTopic retention window. To read only new events, set to Tip. To start reading from a specific point in the retention window, set to a valid ReplayID value (see Message Durability).

Recovery (see Recovering applications) is supported provided that Striim is restarted within 24 hours, the length of time Salesforce holds events in the retention window.
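
For example, a source reading Account records from a hypothetical AccountUpdates PushTopic might look like the following sketch (the adapter and property names are assumed to follow the same conventions as Salesforce Reader):

CREATE SOURCE PushTopicSource USING SalesforcePushTopicReader (
  apiEndPoint: 'https://ap2.salesforce.com',
  authToken: '********',
  pushTopic: 'AccountUpdates',
  sObject: 'Account',
  replayFrom: 'Tip'
)
OUTPUT TO PushTopicStream;
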

Spanner Batch Reader

SpannerBatchReader is identical to Incremental Batch Reader, minus the Username and Password properties, plus the Augment Query Clause property.

Specify the connection URL as jdbc:cloudspanner:/projects/<project ID>/instances/<instance ID>/databases/<database name>?credentials=<service account key>. See Spanner Writer for a detailed description of the service account key.

Optionally, use the Augment Query Clause property to specify FORCE_INDEX directives (see Table hints) using the syntax:

{"<table name>": {"tablehints": {"FORCE_INDEX": "<index name>"}}

You may specify multiple indexes (no more than one per table) separated by commas, for example:

{"order": {"tablehints": {"FORCE_INDEX": "order_amount_index"}},
{"customer": {"tablehints": {"FORCE_INDEX": "customer_ID_index"}}

SQL Server

See Database Reader and SQL Server.

S3 Reader

Reads from Amazon S3.

See Supported reader-parser combinations for parsing options.

property

type

default value

notes

accesskeyid

String

Specify an AWS access key ID (created on the AWS Security Credentials page) for a user with read permissions (ListBucket, GetObject) on the bucket.

When Striim is running in Amazon EC2 and there is an IAM role with those permissions associated with the VM, leave accesskeyid and secretaccesskey blank to use the IAM role.

blocksize

Integer

64

amount of data in KB for each read operation

bucketname

String

S3 bucket to read from

clientconfiguration

String

Optionally, specify one or more of the following property-value pairs, separated by commas.

If you access S3 through a proxy server, specify it here using the syntax ProxyHost=<IP address>,ProxyPort=<port number>,ProxyUserName=<user name>,ProxyPassword=<password>. Omit the user name and password if not required by your proxy server.

Specify any of the following to override Amazon's defaults:

  • ConnectionTimeout=<timeout in milliseconds>: how long to wait to establish the HTTP connection, default is 50000

  • MaxErrorRetry=<number of retries>: the number of times to retry failed requests (for example, 5xx errors), default is 3

  • SocketErrorSizeHints=<size in bytes>: TCP buffer size, default is 2000000

See http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/section-client-configuration.html for more information about these settings.

compressiontype

String

Set to gzip when the files to be read are in gzip format. Otherwise, leave blank.

foldername

String

Specify a folder within the bucket, or leave blank to read from the root.

objectnameprefix

String

The start of the names of the files to be read. For example, myfile will read myfile*.*. Specify * to read all files.

secretaccesskey

encrypted password

Specify the AWS secret access key for the specified access key.

The output type is WAEvent except when using Avro Parser or JSONParser.

Example:

CREATE SOURCE S3Source USING S3Reader (
  bucketname:'MyBucket',
  objectnameprefix:'posdata',
  accesskeyid:'********************',
  secretaccesskey:'****************************************',
  foldername:'MyFolder'
)
PARSE USING DSVParser ()
OUTPUT TO S3SourceStream;
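
If you access S3 through a proxy server, the same source might add a clientconfiguration setting, as in this sketch (the proxy host, port, and timeout values are illustrative):

CREATE SOURCE S3ProxySource USING S3Reader (
  bucketname:'MyBucket',
  objectnameprefix:'posdata',
  accesskeyid:'********************',
  secretaccesskey:'****************************************',
  foldername:'MyFolder',
  clientconfiguration:'ProxyHost=192.0.2.10,ProxyPort=8080,ConnectionTimeout=60000'
)
PARSE USING DSVParser ()
OUTPUT TO S3ProxyStream;
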

TCP Reader

Reads data via TCP.

See Supported reader-parser combinations for parsing options.

property

type

default value

notes

Block Size

Integer

64

amount of data in KB for each read operation

Compression Type

String

Set to gzip when the input is in gzip format. Otherwise, leave blank.

IP Address

String

localhost

If the server has more than one IP address, specify the one to use.

Max Concurrent Clients

Integer

5

Port No

Integer

10000

port number where the TCPReader will listen

The output type is WAEvent except when using JSONParser.

Sample code using the DSV parser (the output stream name is illustrative):

CREATE SOURCE CSVSource USING TCPReader (
  IpAddress:'10.1.10.55',
  PortNo:'3546'
)
PARSE USING DSVParser (
  header:'yes'
)
OUTPUT TO CSVStream;

Teradata

See Database Reader.

UDP Reader

Reads data via UDP.

See Supported reader-parser combinations for parsing options.

property

type

default value

notes

Block Size

Integer

64

amount of data in KB for each read operation

Compression Type

String

Set to gzip when the input is in gzip format. Otherwise, leave blank.

IP Address

String

localhost

the IP address of the Striim server that will receive the UDP data (do not use localhost unless testing with a source running on the same system as Striim)

Port No

Integer

10000

port number where the UDPReader will listen

The output type is WAEvent, except when using CollectdParser or JSONParser.

Sample code using the DSV parser (the output stream name is illustrative):

CREATE SOURCE CSVSource USING UDPReader (
  IpAddress:'192.0.2.0',
  PortNo:'3546'
)
PARSE USING DSVParser (
  header:'yes'
)
OUTPUT TO CSVStream;

Windows Event Log Reader

Use with the Forwarding Agent (Using the Striim Forwarding Agent) to read Windows event logs.

property

type

default value

notes

EOF Delay

Integer

1000

milliseconds to wait after reaching the end of a file before starting the next read operation

Event Source Name

String

Security

  • the log to read

  • other supported values are Application and System

  • you may also specify any custom log name 

Include Event ID List

String

*

specify a comma-separated list of eventIDs to output only those events, or use the default value to output all events

Start Event Record Number

Integer

-1

  • the recordNumber from which to begin reading the log

  • with the default value, reads new events only 

  • set to 1 to read the entire log

This adapter uses Microsoft's OpenEventLog function, so it returns only the data provided by that function. In some cases, this may not include all of the fields displayed in the Event Log UI.

The following example reads only Security log events with EventID 4625 (logon failures):

CREATE SOURCE WindowsLogSource USING WindowsEventLogReader(
   includeEventIDList:'4625'
)
OUTPUT TO SecurityLogStream;
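
To capture more than one event ID, specify them as a comma-separated list. For example, this sketch reads both successful (4624) and failed (4625) logon events; the IDs shown are illustrative:

CREATE SOURCE WindowsLogonSource USING WindowsEventLogReader(
   includeEventIDList:'4624,4625'
)
OUTPUT TO LogonEventStream;
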

The data type for the output is WindowsLogEvent, which contains a single field, data, an array containing the event's fields. The first nine fields are always the same and are selected using data.<field name> (as shown in the example below):

field name

type

sample value

sourceName

string

Microsoft-Windows-Security-Auditing

computerName

string

wsrv2012-00

userSid

string

recordNumber

long

1138

timeGenerated

DateTime

1400798337

timeWritten

DateTime

1400798337

eventID

long

4625

eventType

long

16

eventCategory

long

12544

The remaining fields are selected using data.stringPayload[#] (as shown in the example below). How many fields there are and what they contain vary depending on the EventID. For example, for Windows 2012 Security Log EventID 4625:

#

field name

sample value

0

SubjectUserSid

S-1-5-18

1

SubjectUserName

WSRV2012-00$

2

SubjectDomainName

WORKGROUP

3

SubjectLogonId

0x3e7

4

TargetUserSid

S-1-0-0

5

TargetUserName

Administrator

6

TargetDomainName

WSRV2012-00

7

Status

0xc000006d

8

FailureReason

%%2313

9

SubStatus

0xc000006a

10

LogonType

7

11

LogonProcessName

User32

12

AuthenticationPackageName

Negotiate

13

WorkstationName

WSRV2012-00

14

TransmittedServices

15

LmPackageName

16

KeyLength

0

17

ProcessId

0x738

18

ProcessName

C:\Windows\System32\winlogon.exe

19

IpAddress

10.1.10.180

20

IpPort

0

The following example creates a stream FailedLogonStream containing all the fields for Windows 2012 Security Log events with EventID 4625 ("an account failed to log on"). See Using the Striim Forwarding Agent for an explanation of the DEPLOY statement.

CREATE APPLICATION EventId4625;

CREATE FLOW agentFlow;

CREATE SOURCE WindowsEventLogReaderSource USING WindowsEventLogReader ( 
  includeEventIDList: '4625',
  eventSourceName: 'Security'
 ) 
OUTPUT TO rawLog;

END FLOW agentFlow;

CREATE FLOW serverFlow;

CREATE TYPE WindowsSecurityLogType (
  sourceName String,
  computerName String,
  userSid String,
  recordNumber long,
  timeGenerated DateTime,
  timeWritten DateTime,
  eventID long,
  eventType long,
  eventCategory long,
  SubjectUserSid String,
  SubjectUserName String,
  SubjectDomainName String,
  SubjectLogonId String,
  TargetUserSid String,
  TargetUserName String,
  TargetDomainName String,
  Status String,
  FailureReason String,
  SubStatus String,
  LogonType String,
  LogonProcessName String,
  AuthenticationPackageName String,
  WorkstationName String,
  TransmittedServices String,
  LmPackageName String,
  KeyLength String,
  ProcessId String,
  ProcessName String,
  IpAddress String,
  IpPort String
);
CREATE STREAM FailedLogonStream OF WindowsSecurityLogType;

CREATE CQ MappingCQ 
INSERT INTO FailedLogonStream
SELECT 
  data.sourceName,
  data.computerName,
  data.userSid,
  data.recordNumber,
  data.timeGenerated,
  data.timeWritten,
  data.eventID,
  data.eventType,
  data.eventCategory,
  data.stringPayload[0],
  data.stringPayload[1],
  data.stringPayload[2],
  data.stringPayload[3],
  data.stringPayload[4],
  data.stringPayload[5],
  data.stringPayload[6],
  data.stringPayload[7],
  data.stringPayload[8],
  data.stringPayload[9],
  data.stringPayload[10],
  data.stringPayload[11],
  data.stringPayload[12],
  data.stringPayload[13],
  data.stringPayload[14],
  data.stringPayload[15],
  data.stringPayload[16],
  data.stringPayload[17],
  data.stringPayload[18],
  data.stringPayload[19],  
  data.stringPayload[20]  
FROM rawLog;

CREATE TARGET winlogLout USING SysOut ( 
  name:winlog
 ) 
INPUT FROM FailedLogonStream;

END FLOW serverFlow;

END APPLICATION EventId4625;

DEPLOY APPLICATION EventId4625 with agentFlow in agent, serverFlow in default;

See Handling variable-length events with CQs for an example of handling multiple EventIDs.

Reading from other sources

When there is no compatible reader for your data source, consider Using Apache Flume or Using the Java event publishing API.
