3.10.1 DDL and component reference

This section covers the property editors in the Flow Designer and the corresponding syntax and usage of TQL data definition language (DDL) statements.

ALTER and RECOMPILE

Use these commands to modify applications that are loaded but not deployed or running (in other words, applications with status Stopped or Quiesced). Using ALTER instead of dropping and reloading retains persisted WActionStore and Kafka stream data. This could be useful, for example, when there is a DDL change to a source database table.

Note that some changes to the application can be made safely but others may be incompatible with persisted data, causing errors. For example:

The following changes should be compatible with persisted data:

  • add components or flows

  • add a field to a type

  • change a type from:

    • byte to integer, short, or long

    • integer to short or long

    • short to long

    • float to double

The following changes may be incompatible with persisted data:

  • remove components or flows

  • remove a field from a type

  • change a type from:

    • long to short, integer, or byte

    • short to integer or byte

    • integer to byte

    • double to float

Using ALTER when recovery is enabled (STOP vs. QUIESCE)

When you stop, alter, and restart an application with recovery enabled, it will resume operation with no missing or duplicate events ("exactly-once processing," also known as E1P), except as noted in Recovering applications and with the following possible exceptions:

The following changes should not interfere with exactly-once processing:

  • adding components to an unbranched data flow (that is, to a series of components in which the output of each component is the input of only one other component)

  • removing components from an unbranched data flow

  • adding a branch to a data flow

  • removing a branch from a data flow

  • simple modifications to a CQ

The following changes may result in duplicate or missing events:

  • modifying sources

  • changing a window's KEEP clause

  • changing a CQ's GROUP BY clause

  • changing the number of fields in a CQ's FROM clause

  • changing the size of a CQ's MATCH_PATTERN selection

  • changing the value of a CQ's LIMIT clause

  • changing a KafkaWriter's mode from sync to async

When you quiesce an application with recovery enabled, altering it in any way other than changing its recoverable sources will not interfere with exactly-once processing. However, there may be anomalous results (see the discussion of QUIESCE in Console commands for details).

Example

The workflow for ALTER and RECOMPILE is:

  1. If the application is running, STOP or QUIESCE it (see Console commands).

  2. Undeploy the application.

  3. Alter the application as described below.

  4. Recompile, deploy, and start the application.

To begin altering an application, use:

USE <application's namespace>;
ALTER APPLICATION <application name>;

At this point, enter CREATE, DROP, or CREATE OR REPLACE statements to modify the application, then complete the alteration with:

ALTER APPLICATION <application name> RECOMPILE;

For example, to add the email subscription described in Sending alerts from applications to the PosApp sample application:

USE Samples;
ALTER APPLICATION PosApp;
CREATE SUBSCRIPTION PosAppEmailAlert
USING EmailAdapter (
  SMTPUSER:'sender@example.com',
  SMTPPASSWORD:'password', 
  smtpurl:'smtp.gmail.com',
  starttls_enable:'true',
  subject:"test subject",
  emailList:"recipient@example.com,recipient2@example.com",
  senderEmail:"alertsender@example.com" 
)
INPUT FROM AlertStream;
ALTER APPLICATION PosApp RECOMPILE;

At this point you may deploy and start the modified application. If recovery was enabled for the application when it was loaded, when it is restarted, it will pick up source data (subject to the usual limitations detailed in Recovering applications) back to the time it went offline.
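
For example, to redeploy and restart the altered PosApp application (a minimal sketch; see Console commands for deployment options such as deployment groups):

DEPLOY APPLICATION PosApp;
START APPLICATION PosApp;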

Keep in mind that a change made to one component may require changes to multiple downstream components and their types. For example, to add the event_url field to the MeetupMap demo app (see Running the MeetupMap demo app), you would need to edit MeetupMap.tql and modify both the MeetupJSONType and the ParseJSON CQ:

CREATE TYPE MeetupJSONType (
  venue_id integer KEY,
  group_name string,
  event_name string,
  event_url string,
  time DateTime,
  venue_name string,
  group_city string,
  group_country string,
  lat double,
  lon double
);
CREATE STREAM ParsedJSONStream OF MeetupJSONType;

CREATE CQ ParseJSON
INSERT INTO ParsedJSONStream
SELECT
  CASE
    WHEN data.has("venue") and data.get("venue").has("venue_id") THEN data.get("venue").get("venue_id").intValue()
    ELSE 0
  END,
  CASE
    WHEN data.has("group") and data.get("group").has("group_name") THEN data.get("group").get("group_name").textValue()
    ELSE "NA"
  END,
  CASE
    WHEN data.has("event") and data.get("event").has("event_name") THEN data.get("event").get("event_name").textValue()
    ELSE "NA"
  END,
  CASE
    WHEN data.has("event") and data.get("event").has("event_url") THEN data.get("event").get("event_url").textValue()
    ELSE "NA"
  END, ...

Then you would also need to edit MeetupMapDash and change the map settings to add event_url to the custom fields that define the pop-up.


CREATE APPLICATION ... END APPLICATION

CREATE APPLICATION <application name>
[ WITH ENCRYPTION ]
[ RECOVERY <##> SECOND INTERVAL ]
[ EXCEPTIONHANDLER () ]
[ USE EXCEPTIONSTORE [ TTL: '<interval>' ] ];

END APPLICATION <application name>;

CREATE APPLICATION <application name>; creates an application in the current namespace. All subsequent CREATE statements until the END APPLICATION statement create components in that application.

The following illustrates typical usage in an application (see Writing a simple TQL application):

CREATE APPLICATION simple;

CREATE source SimpleSource USING FileReader (
  directory:'Samples',
  wildcard:'simple.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO RawDataStream;

CREATE TARGET SimpleOutput
USING SysOut(name:simple)
INPUT FROM RawDataStream;

END APPLICATION simple;

For a more complete example, see the PosApp sample application.

See Recovering applications for discussion of the RECOVERY option.

When the WITH ENCRYPTION option is specified, Striim will encrypt all streams that move data between Striim servers, or from a Forwarding Agent or HP NonStop source to a Striim server, to make them less vulnerable to network sniffers. Common use cases for this option are when a source resides outside the Striim cluster or outside your private network. This option may also be specified at the flow level, which may be useful to avoid the performance impact of encryption on streams not carrying sensitive data. If you are using Oracle JDK 8 or OpenJDK 8 version 1.8.0_161 or later, encryption will be AES-256. With earlier versions, encryption will be AES-128. In this release, encryption between Striim and HP NonStop sources is always AES-128.
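
For example, a minimal sketch of enabling encryption for a single flow rather than the whole application (the flow name AgentIngestFlow is hypothetical):

CREATE FLOW AgentIngestFlow WITH ENCRYPTION;
...
END FLOW AgentIngestFlow;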

See Handling exceptions for discussion of the EXCEPTIONHANDLER option.

See CREATE EXCEPTIONSTORE for discussion of the USE EXCEPTIONSTORE option.

CREATE CACHE

Note

A cache is loaded into memory when it is deployed, so deployment of an application or flow with a large cache may take some time. If your cache is too large to fit in memory, see CREATE EXTERNAL CACHE.

CREATE CACHE <name>
USING <reader name> { <properties> } 
[ PARSE USING <parser name> ( <properties> ) ]
QUERY ( keytomap:'<key field for type name>'
  [, refreshinterval:'<microseconds>' ]
  [, refreshstarttime:'<hh:mm:ss>' ]
  [, replicas:{<integer>|all} ]
  [, skipinvalid: 'true' ] )
OF <type name>;

The required and optional properties vary according to the reader selected (typically FileReader or DatabaseReader). See the Adapters reference topics for the reader (and parser, if any) you are using. If a property is not specified, the cache will use its default value. Required properties without default values must be specified.

  • The keytomap value (in the example below, zip) is the name of the field that will be used to index the cache and, in multi-server environments, to distribute it. For best performance, make the keytomap field the one used to join the cache data with stream data. Joins on other fields will be much slower.

  • The refreshinterval and refreshstarttime values specify when the cache is updated. See the discussion of these properties after the sample code below.

  • See Adapting TQL applications for multi-server deployment for discussion of the replicas property. This setting has no effect when running on a single server, so is normally of no concern during initial development of an application.

  • When skipinvalid has its default value of false, if the data in a cache does not match the defined format (for example, if it has fewer fields than are in the type, or the column delimiter specified in the PARSE USING clause is a comma but the data file is tab-delimited), deployment will fail with an error similar to:

    Deploy failed! Error invoking method CreateDeployFlowStatement: 
    java.lang.RuntimeException: com.webaction.exception.Warning: 
    java.util.concurrent.ExecutionException: 
    com.webaction.errorhandling.StriimRuntimeException: 
    Error in: Cache , error is: STRM-CACHE-1011 : 
    The size of this record is invalid for 
    {"class":"com.webaction.runtime.QueryValidator","method":"CreateDeployFlowStatement",
    "params":["01e6d1e9-3e67-bd31-adda-685b3587069e","APPLICATION",
    {"strategy":"any","flow":"dev9003","group":"default"},[]],"callbackIndex":5}

    To skip invalid records, set skipinvalid to true.

  • The OF type (in the example below, ZipCacheType) must correctly describe the data source.

The following illustrates typical usage in an application (see Writing a simple TQL application). In this example, the key field for ZipCache is zip, which is used in the join with FilteredDataStream:

CREATE TYPE ZipCacheType(
  zip String KEY,
  city String,
  state String,
  latVal double,
  longVal double
);
 
CREATE CACHE ZipCache
USING FileReader (
  directory: 'Samples',
  wildcard: 'zipdata.txt')
PARSE USING DSVParser (
  header: Yes,
  columndelimiter: '\t',
  trimquote:false
) QUERY (keytomap:'zip') OF ZipCacheType;
 
CREATE TYPE JoinedDataType(
  merchantId String KEY,
  zip String,
  city String,
  state String,
  latVal double,
  longVal double
);
CREATE STREAM JoinedDataStream OF JoinedDataType;
 
CREATE CQ JoinDataCQ
INSERT INTO JoinedDataStream
SELECT  f.merchantId,
  f.zip,
  z.city,
  z.state,
  z.latVal,
  z.longVal
FROM FilteredDataStream f, ZipCache z
WHERE f.zip = z.zip;

Using the sample code above, the cache data will be updated only when the cache is started or restarted. To refresh the cache at a set interval, add the refreshinterval option:

... QUERY (keytomap:'zip', refreshinterval:'3600000000') OF ZipCacheType;

With the above setting, the cache will be refreshed hourly. To refresh the cache at a specific time, add the refreshstarttime option:

... QUERY (keytomap:'zip', refreshstarttime:'13:00:00') OF ZipCacheType;

With the above setting, the cache will be refreshed daily at 1:00 pm. You may combine the two options:

... QUERY (keytomap:'zip', refreshinterval:'3600000000', refreshstarttime:'13:00:00') OF ZipCacheType;

With the above setting, the cache will be refreshed daily at 1:00 pm, and then hourly after that. This will ensure that the cache is refreshed at a specific time rather than relative to when it was started.

To see when a cache was last updated, use the console command MON <namespace>.<cache name>.
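
For example, assuming the ZipCache cache above is in the Samples namespace:

MON Samples.ZipCache;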

See Database Reader for examples of caches populated by querying databases.

CREATE CQ (query)

CREATE CQ <name>
INSERT INTO {
  <output stream name>  | 
  <WActionStore name> [ ( <field name>, ... ) ] 
}
SELECT [ DISTINCT ] { <expression or field name> [ AS <output stream field name> ], ... }
[ ISTREAM ]
FROM {
  <input stream name> | 
  <cache name> |
  <window name> | 
  <WActionStore name>, ... |
  ITERATOR ( <nested collection name>.<member name> ) 
  }
[ { INNER | CROSS | LEFT | LEFT OUTER | RIGHT | RIGHT OUTER | FULL | FULL OUTER } JOIN ]
[ [ JUMPING WITHIN <integer> { SECOND | MINUTE | HOUR | DAY } ] ]
[ ON { <expression> } ]
[ WHERE { <expression> } ]
[ GROUP BY { field name } ]
[ HAVING { <expression> } ]
[ ORDER BY { <expression> } [ ASC | DESC ] ]
[ LIMIT { <expression> } ]
[ SAMPLE BY <field names>,... [SELECTIVITY <#.#> | MAXLIMIT <number>] ]
[ LINK SOURCE EVENT ] ;

See Operators and Functions for more information about writing expressions.

  • INSERT INTO: When a CQ's INSERT INTO clause specifies a stream that does not exist, the stream and its type will be created automatically based on the SELECT statement. For an example, see Parsing the data field of WAEvent. This clause may include any of the options described in CREATE STREAM.

  • DISTINCT: When a CQ includes the DISTINCT option, at least one of the components in the FROM clause must be a cache or window.

  • SELECT timeStamp: When selecting from the output stream of a source, you may use SELECT timeStamp to get the system time in milliseconds (as a long) when the source processed the event. The timeStamp field exists only in the source's output stream and is dropped by any window, CQ, or target using that stream as a source.

  • ISTREAM: By default, a CQ will update calculated values when events are added to or removed from its input window. If you specify the ISTREAM option, the CQ will update calculated values only when new events are added to the window.

  • SELECT ... FROM: When a CQ's FROM clause includes multiple components, at least one must be a cache or window. See Joins.

  • SELECT ... FROM <WActionStore name>: supported only when the WActionStore is persisted. By default, this will run once, when the application is deployed. Add the [JUMPING WITHIN ...] clause to re-run the query periodically. (Note that the square brackets are required.) For example, [JUMPING WITHIN 5 MINUTE] will run the query every five minutes, each time (including the first) returning the most recent five minutes of data.

  • ITERATOR: see Using ITERATOR.

  • INNER JOIN: When a CQ includes the INNER JOIN option, two and only two components must be specified in the FROM clause.

  • GROUP BY: See Aggregate functions.

  • SAMPLE BY: When a CQ's FROM clause includes only a single WActionStore or jumping window, you may use the SAMPLE BY clause to reduce the number of events in the CQ's output. One use for this is to reduce the number of events to avoid overloading a dashboard (see Defining dashboard queries). For WActionStores, the CQ must include an ORDER BY clause to order the events by time.

    • <field name>,... specifies one or more fields from the WActionStore or window. The data will be sampled so as to preserve roughly the same distribution of those fields' values as in the total data set. The field(s) must be of type double, float, integer, long, or short.

    • SELECTIVITY sets the sample size as a decimal fraction between 0 and 1. For example, SELECTIVITY 0.05 would select approximately 5% of the events from the source.

    • MAXLIMIT sets the sample size as a maximum number of events. For example, MAXLIMIT 100 would select 100 events every time the CQ is run.

    • You may not specify both SELECTIVITY and MAXLIMIT in the same SELECT statement. If you specify neither, the default is SELECTIVITY 0.01.

    • When selecting from a very large WActionStore, you may want to reduce the amount of data returned before sampling. For example, SELECT DateTime, Temp FROM TemperatureWS ORDER BY DateTime DESC LIMIT 100000 SAMPLE BY Temp MAXLIMIT 500 would return a sample of 500 of the 100,000 most recent events.

    • The smaller the SELECTIVITY or MAXLIMIT value, the less representative of the full data set the sample will be. It may be helpful to experiment with various values.

  • LINK SOURCE EVENT: When a CQ's INSERT INTO clause specifies a WActionStore, the optional LINK SOURCE EVENT clause makes the details of the events in the FROM clause available in the WActionStore.

  • NESTING: Queries may be nested by enclosing the subquery in parentheses: SELECT ... FROM (SELECT ... FROM ...) ....

See Continuous query (CQ) and Intermediate TQL programming: common patterns for some examples of common query types.

CREATE DASHBOARD

CREATE DASHBOARD USING "<path>/<filename>";

Imports a dashboard from the specified JSON file. The dashboard will be created in the current namespace. If not specified from root, the path is relative to the Striim directory. Example from the PosApp sample application:

CREATE DASHBOARD USING "Samples/PosApp/PosAppDashboard.json";

CREATE EVENTTABLE

CREATE EVENTTABLE <name>
USING STREAM ( NAME: '<stream name>' ) 
[ DELETE USING STREAM ( '<stream name>') ]
QUERY ( 
  keytomap:'<key field for type name>'
  [, persistPolicy:'True' ]
)
OF <type name>;

An event table is similar to a cache, except it is populated by an input stream instead of by an external file or database. CQs can both INSERT INTO and SELECT FROM an event table.

Event tables retain only the most recent event for each value of the key field defined by keytomap. In other words, the key field is similar to a database table's primary key. The first event received for each key value adds an event to the table, and subsequent events for that key value update it.

Optionally, an event table may have a second input stream, defined by DELETE USING STREAM. Events inserted into this stream will delete the event with the corresponding key field value. The other values in the event are ignored, though they must be valid as regards the delete stream's type.

If persistPolicy is True, events will be persisted to Elasticsearch and retained (including through crashes and restarts) until the application is dropped. If persistPolicy is False, event table data will be lost when the application is undeployed or crashes.
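
For example, a minimal sketch of an event table whose data is persisted to Elasticsearch (based on the EventTableDemo example below; the name PersistedEventTable is hypothetical):

CREATE EVENTTABLE PersistedEventTable USING STREAM ( 
  name: 'EventTableSource_TransformedStream'
) 
QUERY ( 
  keytomap: 'id',
  persistPolicy: 'True'
) 
OF EventTableSource_TransformedStream_Type;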

Inserting events into an event table

Use a CQ to insert events into an event table.

To demonstrate this, save the following as striim/Samples/EventTable1.csv:

id,color,value
1,green,10
2,blue,20

Then create EventTable1 in namespace ns1 and run it:

CREATE NAMESPACE ns1;
USE ns1;
CREATE APPLICATION EventTable1;

CREATE SOURCE EventTableSource USING FileReader  ( 
  Wildcard: 'EventTable1.csv',
  Directory: 'Samples',
  PositionByEof: false
) 
PARSE USING DSVParser  ( 
  header: true
) 
OUTPUT TO EventTableSource_Stream ;

CREATE CQ EventTableSource_Stream_CQ 
INSERT INTO EventTableSource_TransformedStream
SELECT TO_INT(data[0]) as id,
  TO_STRING(data[1]) as color,
  TO_INT(data[2]) as value
FROM EventTableSource_Stream;

CREATE EVENTTABLE EventTableDemo USING STREAM ( 
  name: 'EventTableSource_TransformedStream'
) 
DELETE USING STREAM ( 
  name: 'ETTestDeleteStream'
) 
QUERY ( 
  keytomap: 'id'
) 
OF EventTableSource_TransformedStream_Type;

END APPLICATION EventTable1;
DEPLOY APPLICATION EventTable1;
START APPLICATION EventTable1;

Once the application has started, query the event table, and you will see that the contents match EventTable1.csv:

W (ns1) > select * from EventTableDemo;
Processing - select * from EventTableDemo
[
   id = 1
   color = green
   value = 10
]
[
   id = 2
   color = blue
   value = 20
]

-> SUCCESS 
Updating an event table

When an event table receives a new event with the same key field value as an existing event in the table, it updates its values.

To demonstrate this, save the following as striim/Samples/EventTable2.csv:

id,color,value
1,purple,25

Then create EventTable2, run it, and query the event table again. This application does not need a CQ since it uses the one from EventTable1.

CREATE APPLICATION EventTable2;

CREATE SOURCE EventTableSource2 USING FileReader  ( 
  Wildcard: 'EventTable2.csv',
  Directory: 'Samples',
  PositionByEof: false
) 
PARSE USING DSVParser  ( 
  header: true
) 
OUTPUT TO EventTableSource_Stream;

END APPLICATION EventTable2;
DEPLOY APPLICATION EventTable2;
START APPLICATION EventTable2;

select * from EventTableDemo;

The event with id 1 is updated to match the data in EventTable2.csv:

W (ns1) > select * from EventTableDemo;
Processing - select * from EventTableDemo
[
   id = 1
   color = purple
   value = 25
]
[
   id = 2
   color = blue
   value = 20
]
Deleting events from an event table using the delete stream

To delete an event, send an event with the same key field value to the event table's delete stream.

To demonstrate this, you can reuse EventTable2.csv. Create EventTable3, run it, and query the event table again:

STOP APPLICATION EventTable2;
CREATE APPLICATION EventTable3;

CREATE SOURCE EventTableDeleteSource USING FileReader  ( 
  Wildcard: 'EventTable2.csv',
  Directory: 'Samples',
  PositionByEof: false
) 
PARSE USING DSVParser  ( 
  header: true
) 
OUTPUT TO EventTableDelete_Stream ;

CREATE CQ EventTableDelete_Stream_CQ 
INSERT INTO ETTestDeleteStream
SELECT TO_INT(data[0]) as id,
  TO_STRING(data[1]) as color,
  TO_INT(data[2]) as value
FROM EventTableDelete_Stream;

END APPLICATION EventTable3;
DEPLOY APPLICATION EventTable3;
START APPLICATION EventTable3;

The event with id 1 was deleted from the event table:

W (ns1) > select * from EventTableDemo;
Processing - select * from EventTableDemo
[
   id = 2
   color = blue
   value = 20
]
Deleting events from an event table using convertToDeleteEvent()

When deleting events using the delete stream, there could be a race condition between the input stream and the delete stream when they both receive an event with the same key at almost the same time. To avoid that, use convertToDeleteEvent(), as follows:

CREATE CQ <name>
INSERT INTO <event table input stream>
SELECT et.convertToDeleteEvent()
FROM <event table> et
WHERE <criteria selecting events to delete>;

The alias (et) is required by convertToDeleteEvent(). You may use any alias you wish.

For example, to delete the remaining event from the EventTableDemo event table:

STOP APPLICATION EventTable3;
CREATE APPLICATION EventTable4;

CREATE CQ EventTableDelete_Stream_CQ 
INSERT INTO EventTableSource_TransformedStream
SELECT et.convertToDeleteEvent()
FROM EventTableDemo et
WHERE id=2;

END APPLICATION EventTable4;
DEPLOY APPLICATION EventTable4;
START APPLICATION EventTable4;

CREATE EXCEPTIONSTORE

CREATE EXCEPTIONSTORE FOR APPLICATION <name> [ TTL: '<interval>' ];

Alternatively, to create an exception store at the same time you create an application:

CREATE APPLICATION <name> USE EXCEPTIONSTORE  [ TTL: '<interval>' ];

You may also create and browse an exception store in the Flow Designer.


When you create an application using templates, this is enabled automatically. Turn it off if you do not want an exception store for the application.

An exception store collects exceptions for a single application along with events related to the exception and persists them to Elasticsearch.

  • The exception store's name is the application name with _ExceptionStore appended.

  • By default, the time to live (TTL) is 7d, which means events are discarded after seven days. Optionally, you may specify a different time to live as m (milliseconds), s (seconds), h (hours), d (days), or w (weeks). To change the time to live, use ALTER EXCEPTIONSTORE <name> TTL: '<interval>'; (recompile is not necessary). See the example after this list.

  • Dropping the application does not drop the exception store. This allows you to drop the application, modify the TQL, and reload it without losing the existing data in its exception store.
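
For example, a minimal sketch that changes the retention of the exception store for the ExceptionstoreDemo application (shown later in this section) to two days:

ALTER EXCEPTIONSTORE ExceptionstoreDemo_ExceptionStore TTL: '2d';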

The type for exception store events is Global.ExceptionEvent. Its fields are:

  • exceptionType (java.lang.String): one of AdapterException, ArithmeticException, ClassCastException, ConnectionException, InvalidDataException, NullPointerException, NumberFormatException, SystemException, UnexpectedDDLException, or UnknownException

  • action (java.lang.String): IGNORE or CRASH (see Handling exceptions)

  • appName (java.lang.String): the name of the associated application

  • entityType (java.lang.String): the type of component that threw the exception (source, CQ, target, etc.)

  • entityName (java.lang.String): the component's name

  • className (java.lang.String): the Java class name related to the exception

  • message (java.lang.String): the error message from the target DBMS

  • exceptionTime (org.joda.time.DateTime): the time of the exception

  • epochNumber (java.lang.Long): the epoch time of the exception

  • relatedActivity (java.lang.String): the Striim activity that caused the exception

  • relatedObjects (java.lang.String): the Striim event(s) affected by the exception

For a simple example, say you have the following Oracle table in SOURCEDB and TARGETDB schemas:

CREATE TABLE MYTABLE(
ID int PRIMARY KEY,
NAME varchar2(100),
CITY varchar2(100));

Replicate it from one Oracle instance to another using the following application:

CREATE APPLICATION ExceptionstoreDemo USE EXCEPTIONSTORE;
CREATE SOURCE OracleCDC USING OracleReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'10.211.55.3:1521:orcl1',
  Tables:'SOURCEDB.MYTABLE'
)
OUTPUT TO OracleCDCStream;
CREATE TARGET WriteToOracle USING DatabaseWriter (
  ConnectionURL:'jdbc:oracle:thin:@10.211.55.3:1521:orcl1',
  Username:'striim',
  Password:'******',
  Tables:'SOURCEDB.MYTABLE,TARGETDB.MYTABLE',
  IgnorableExceptionCode: 'NO_OP_UPDATE'
)
INPUT FROM OracleCDCStream;
END APPLICATION ExceptionstoreDemo;

Insert the identical row twice:

INSERT INTO MYTABLE VALUES (1,'name1','city1');
INSERT INTO MYTABLE VALUES (1,'name1','city1');

Since the primary key already exists, the second insert will throw an exception. Since NO_OP_UPDATE is specified as an ignorable exception (see discussion of Ignorable Exception Code property in Database Writer), the exception will be written to the application's exception store. You can query the exception store using the same syntax you would to query a WActionStore:

W (ns1) > select * from ExceptionstoreDemo_exceptionstore;
Processing - select * from ExceptionstoreDemo_exceptionstore
[
   exceptionType = AdapterException
   action = CRASH
   appName = ns1.NSDemo
   appid = 01ea1e42-8e77-7651-b9fa-52b54b45818e
   entityType = TARGET
   entityName = t
   className = java.sql.SQLIntegrityConstraintViolationException
   message = ORA-00001: unique constraint (MYUSERID.SYS_C007357) violated
   exceptionTime = 2019-12-14T12:57:32.067+05:30
   epochNumber = 1576308181383
   relatedActivity = target notify exception
   relatedObjects = {"_id":null,"timeStamp":1576308452050,"originTimeStamp":0,"key":null,
"sourceUUID":{"uuidstring":"01ea1e42-8ea5-3d11-b9fa-52b54b45818e"},"data":["1","name1","city1"],
"metadata":{"RbaSqn":"3","AuditSessionId":"30106","TableSpace":"USERS","CURRENTSCN":"1579373",
"SQLRedoLength":"78","BytesProcessed":null,"ParentTxnID":"1.27.567","SessionInfo":"UNKNOWN",
"RecordSetID":" 0x000003.000454d0.0010 ","DBCommitTimestamp":"1576308452000","COMMITSCN":1579374,
"SEQUENCE":"1","Rollback":"0","STARTSCN":"1579373","SegmentName":"MYTABLE","OperationName":"INSERT",
"TimeStamp":1576337252000,"TxnUserID":"MYUSERID","RbaBlk":"283856","SegmentType":"TABLE",
"TableName":"SOURCEDB.MYTABLE","TxnID":"1.27.567","Serial":"28518","ThreadID":"1",
"COMMIT_TIMESTAMP":1576337252000,"OperationType":"DML","ROWID":"AAAR+CAAHAAAADcAAB",
"DBTimeStamp":"1576308452000","TransactionName":"","SCN":"157937300000008444435329188000001",
"Session":"344"},"userdata":null,"before":null,"dataPresenceBitMap":"Bw==",
"beforePresenceBitMap":"AA==","typeUUID":{"uuidstring":"01ea1e42-9082-3a71-8672-52b54b45818e"}}
]

The following application demonstrates how you might query data from the exception store and write it to a file you could use as a starting point for inserting some or all of the logged events into the target. Contact Striim support if you need assistance in developing such an application.

CREATE APPLICATION ProcessExceptionstoreDemo;
CREATE TYPE ExceptionList_Type (
  evtlist java.util.List
);
CREATE STREAM ExceptionListStream OF ExceptionList_Type;
CREATE CQ ReadFromExceppStore 
INSERT INTO ExceptionListStream
SELECT to_waevent(s.relatedObjects) AS evtlist 
FROM ExceptionstoreDemo_exceptionstore [JUMPING WITHIN 5 SECOND] s;

CREATE STREAM RelatedEventStream OF Global.WAEvent;
CREATE CQ GetRelatedEvents 
INSERT INTO RelatedEventStream
SELECT com.webaction.proc.events.WAEvent.makecopy(cdcevent) 
FROM ExceptionListStream a, iterator(a.evtlist) cdcevent;

CREATE TARGET WriteToFileAsJSON USING FileWriter ( 
  filename: 'expEvent',
  directory: 'ExpStore_logs'
) 
FORMAT USING JSONFormatter()
INPUT FROM RelatedEventStream;
END APPLICATION ProcessExceptionstoreDemo;

CREATE EXTERNAL CACHE

Note

In this release, this is not a cache. Striim queries the external database as data is needed. If the same data is needed again, Striim queries it again.

CREATE EXTERNAL CACHE <name> (
  AdapterName:'DatabaseReader',
  Username: '<username>',
  Password: '<password>',
  ConnectionURL: '<database connection string>',
  ConnectionRetry: <parameters>,
  FetchSize: <count>,
  Table: '<table name>',
  Columns: '<columns to read>',
  KeyToMap: '<column name>',
  SkipInvalid: <True / False>
)  
OF <type name>;
  • AdapterName must be DatabaseReader.

  • For Username, Password, ConnectionURL, FetchSize and discussion of whether you need to install a JDBC driver, see Database Reader.

  • With the default ConnectionRetry setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval). If the second attempt is unsuccessful, after another 30 seconds it will try a third time (maxRetries). If that attempt is also unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

  • For Table, specify a single table. See the discussion of the Tables property in Database Reader for additional information.

  • For Columns, specify the names of the columns you wish to retrieve, separated by commas.

  • For KeyToMap specify the table's primary key column. If the table has no primary key, specify any column.

  • When skipinvalid has its default value of False, if the data in a cache does not match the defined format (for example, if it has fewer fields than are in the type), the application will crash. To skip invalid records, set it to True.

  • You may omit ConnectionRetry, FetchSize, and SkipInvalid from TQL if their default values are appropriate.

  • The OF type must match the order, number, and data types of the specified columns.

For example:

CREATE TYPE RackType(
  rack_id String KEY,
  datacenter_id String,
  rack_aisle java.lang.Integer,
  rack_row java.lang.Integer,
  slot_count java.lang.Integer
);
CREATE EXTERNAL CACHE ConfiguredRacks (
  AdapterName:'DatabaseReader',
  ConnectionURL:'jdbc:mysql://10.1.10.149/datacenter',
  Username:'username',
  Password:'passwd',
  Table:'RackList',
  Columns: "rack_id,datacenter_id,rack_aisle,rack_row,slot_count",
  KeyToMap: 'rack_id'
)
OF RackType;

CREATE FLOW ... END FLOW

CREATE FLOW <flow name> [ WITH ENCRYPTION ];
...
END FLOW <flow name>;

CREATE FLOW <flow name> creates a flow. All subsequent CREATE statements until the END FLOW statement will create components in that flow. See CREATE APPLICATION ... END APPLICATION for discussion of WITH ENCRYPTION.

See Flow for more information and MultiLogApp for a detailed discussion of a multi-flow application.

CREATE OR REPLACE

If the component exists, replaces it; if it does not exist, creates it. Every CREATE statement has a corresponding CREATE OR REPLACE statement. The syntax is the same, just add OR REPLACE after CREATE. For an example, see ALTER and RECOMPILE.
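
For example, a minimal sketch that replaces the SimpleOutput target of the simple application shown above while altering it (the new SysOut name is hypothetical):

ALTER APPLICATION simple;
CREATE OR REPLACE TARGET SimpleOutput
USING SysOut(name:simple2)
INPUT FROM RawDataStream;
ALTER APPLICATION simple RECOMPILE;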

CREATE PROPERTYVARIABLE

CREATE PROPERTYVARIABLE <name>='<value>';

Property variables allow you to store values for adapter properties in an encrypted form, so they may be used for passwords and tokens in applications without sharing the cleartext with users.

The following will create a property variable common.dbpass:

USE common;
CREATE PROPERTYVARIABLE dbpass='12345678';

You could then use that in an application as follows:

CREATE CACHE ConfiguredRacks USING DatabaseReader (
  Username:'striim',
  Password:'$common.dbpass'...

You may omit the namespace if the property variable is in the same namespace as the application:

CREATE CACHE ConfiguredRacks USING DatabaseReader (
  Username:'striim',
  Password:'$dbpass'...

Note

If a property variable has the same name as an environment variable, Striim will use the value of the property variable.

CREATE SORTER

CREATE SORTER <name> OVER 
<input stream 1 name> ON <timestamp field name> OUTPUT TO <output stream 1 name>,
<input stream 2 name> ON <timestamp field name> OUTPUT TO <output stream 2 name>
[, <input stream 3 name> ON <timestamp field name> OUTPUT TO <output stream 3 name> ... ] 
WITHIN <integer> { SECOND | MINUTE | HOUR | DAY } 
OUTPUT ERRORS TO <error output stream name>;

This ensures that events from multiple streams are processed in sync with each other based on timestamps in the streams. For example:

CREATE SORTER MySorter OVER
Stream1 ON logTime OUTPUT TO Stream1Sorted,
Stream2 ON logTime OUTPUT TO Stream2Sorted
WITHIN 2 second
OUTPUT ERRORS TO fooErrorStream;

If the events in Stream1 and Stream2 fall out of sync by more than two seconds, the stream with the more recent timestamps is buffered until the events from the other stream with matching timestamps arrive.

CREATE SOURCE

CREATE SOURCE <name>
USING <adapter name> ( <properties> ) 
[ PARSE USING <parser name> ( <properties> ) ] 
OUTPUT TO <stream name> 
  [SELECT ( <data field>, ...  [ WHERE <expression>, ... ] ), ...];

Whether the properties and PARSE USING clause are required depends on the adapter. See the Adapters reference section to determine the requirements of the adapter you are using.

If an optional property is not specified, the source will use its default value. Values for required properties must always be specified in TQL, even if they have default values (which are displayed automatically in the web UI). You may use environment variables in property values (see Using environment variables in source and target properties).

For information about the SELECT clause, see Filtering data in a source.

Here is a complete example of a CREATE SOURCE statement:

CREATE SOURCE AALSource using AAlReader (
  directory:'$ACCESSLOGPATH',
  wildcard:'$ACCESSLOGFILE:access_log')
OUTPUT TO AccessEntryStream;

If the OUTPUT TO stream does not exist, it will be created automatically, using the type associated with the adapter. When there is only one output stream, it is not mapped, and there is no SELECT clause, you may follow the stream name with any of the options described in CREATE STREAM. See also Using OUTPUT TO ... MAP.

Example from the PosApp sample application:

CREATE source CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'posdata.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;

CREATE STREAM

CREATE STREAM <name> OF <type name> 
[ PARTITION BY { <field name>, ... | <expression> } ]
[ PERSIST USING <property set> ]
[ GRACE PERIOD <integer> { SECOND | MINUTE | HOUR | DAY } ON <field name> ];

Creates a stream using a previously defined type.

Note

A stream and its type may also be created as part of a CREATE CQ declaration. See Parsing the data field of WAEvent for an example.

For example, PosApp defines the MerchantTxRate type and creates a stream based on that type:

CREATE TYPE MerchantTxRate(
  merchantId String KEY,
  zip String,
  startTime DateTime,
  count integer,
  totalAmount double,
  hourlyAve integer,
  upperLimit double,
  lowerLimit double,
  category String,
  status String
);
CREATE STREAM MerchantTxRateOnlyStream OF MerchantTxRate PARTITION BY merchantId;

PARTITION BY <field name> controls how events are distributed across multiple servers in a Striim cluster. See Adapting TQL applications for multi-server deployment for more information.

GRACE PERIOD should be used when data may be received out of order. For example, if you were collecting log data from servers all over the world, network latency might result in events with a logTime value of 1427890320 arriving from a nearby server before events with a timestamp of 1427890319 (one second earlier) from a server on another continent. If you knew that the maximum latency was two seconds, you could use the clause GRACE PERIOD 2 SECOND ON logTime to ensure that all events are processed in order. The events of the stream would then be buffered for two seconds, continuously sorted based on the logTime value, and passed to the next component in the application. If the time interval is too short, any out-of-order events received too late to be sorted in the correct order are discarded. Without the GRACE PERIOD option, out-of-order events are processed as they arrive, which may result in incorrect calculations.

For example, this sample sorter for out of order events is paired with a stream created with a 2 second grace period on the log time:

CREATE SORTER MySorter OVER
Stream1 ON logTime OUTPUT TO Stream1Sorted,
Stream2 ON logTime OUTPUT TO Stream2Sorted
WITHIN 2 second
OUTPUT ERRORS TO fooErrorStream;
 
CREATE STREAM EventStream1 OF EventType1 GRACE PERIOD 2 SECOND ON logTime;
Persisting a stream to Kafka

For an overview of this feature, see Introducing Kafka streams.

Note

Kafka and Zookeeper must be running when you create a Kafka-persisted stream, persist an existing stream, or import an application containing one.

CREATE STREAM <name> OF <type> PERSIST [USING <property set>];

To enable replay of a stream by persisting it to Kafka (see Replaying events using Kafka streams), use the syntax CREATE STREAM <name> OF <type> PERSIST USING <namespace>.<property set>, where <property set> is the name of a set of Kafka server properties. To persist to a Striim cluster's integrated Kafka broker, use the property set Global.DefaultKafkaProperties, for example:

CREATE STREAM MyStream of MyStreamType PERSIST USING Global.DefaultKafkaProperties;

To persist to an external Kafka broker, instead of Global.DefaultKafkaProperties specify a custom property set created as described in Configuring Kafka.

This memory-resident stream may be used in the usual way in a window or CQ. Alternatively, the persisted data may be read by KafkaReader using topic name <namespace>_<stream name> (see Reading a Kafka stream with KafkaReader). To use persisted stream data from the integrated Kafka broker outside of Striim, see Reading a Kafka stream with an external Kafka consumer.

If a persisted stream is created in an application or flow with encryption enabled (see CREATE APPLICATION ... END APPLICATION) it will be encrypted. It may be read by another application without encryption enabled.

Limitations:

  • Kafka streams may be used only on the output of a source or the output of a CQ that parses a source.

  • Implicit streams may not be persisted to Kafka.

  • In an application or flow running in a Forwarding Agent, a source or CQ may output to a Kafka stream, but any further processing of that stream must take place on the Striim server.

  • If the Kafka broker configuration delete.topic.enable is false (the default for Kafka 0.11 and all other releases prior to 1.0.0) and you drop a Striim application with a Kafka stream after a crash, creating the stream will fail when you reload the application. To avoid this, set delete.topic.enable=true.

Since implicit streams may not be persisted, the Kafka stream must be created explicitly before the source or CQ that populates it. Using MultiLogApp as an example, to persist the raw output of the access log source:

CREATE STREAM RawAccessStream OF Global.WAEvent
  PERSIST USING Global.DefaultKafkaProperties;

CREATE SOURCE AccessLogSource USING FileReader (
  directory:'Samples/MultiLogApp/appData',
  wildcard:'access_log',
  positionByEOF:false
)
PARSE USING DSVParser (
  ignoreemptycolumn:'Yes',
  quoteset:'[]~"',
  separator:'~'
)
OUTPUT TO RawAccessStream;

Alternatively, to persist the output of the CQ that parses that raw output:

CREATE TYPE AccessLogEntry (
    srcIp String KEY ...
);
CREATE STREAM AccessStream OF AccessLogEntry
  PERSIST USING Global.DefaultKafkaProperties;

CREATE CQ ParseAccessLog 
INSERT INTO AccessStream
SELECT data[0] ...
FROM RawAccessStream;

To distribute events among multiple Kafka partitions, use PARTITION BY <field>:

CREATE STREAM AccessStream OF AccessLogEntry
  PERSIST USING Global.DefaultKafkaProperties
  PARTITION BY srcIp;

All events with the same value in <field> will be written to the same randomly selected Kafka partition. Striim will distribute the data evenly among the partitions to the extent allowed by the frequency of the <field> values. For example, if 80% of the events have the same <field> value, then one of the Kafka partitions will contain 80% of the events.

By default, events may be distributed among up to 200 Kafka partitions. See Configuring Kafka for more information.

Dropping a persisted stream will automatically delete the associated Kafka topics.

If recovery (see Recovering applications) is enabled for an application containing a Kafka stream, the persisted data will include "CheckPoint" events used by the recovery process.

CREATE SUBSCRIPTION

See Sending alerts from applications.

CREATE TARGET

CREATE TARGET <name>
USING <adapter name> ( <properties> )
[ FORMAT USING <formatter name> ( <properties> ) ]
INPUT FROM <stream name>;

Whether the properties and the FORMAT USING clause are required vary according to the adapter. See the Adapters reference section for the adapter you are using.

If an optional property is not specified, the target will use its default value. Values for required properties must always be specified in TQL, even if they have default values (which are displayed automatically in the web UI). You may use environment variables in property values (see Using environment variables in source and target properties).

Here is an example of a target that writes server data to a file:

CREATE TARGET DSVFormatterOut using FileWriter(
  filename:'DSVFormatterOutput')
FORMAT USING DSVFormatter ()
INPUT FROM DSVTransformed_Stream;

CREATE TYPE

CREATE TYPE <name> ( <field name> <data type> [KEY], ... );

KEY defines the field that will be used to relate events in WActions when the type is specified as the CONTEXT OF in the CREATE WACTIONSTORE statement.

Example from the PosApp sample application:

CREATE TYPE ProductTrackingType (
  sku String KEY, 
  salesAmount double, 
  Count Integer, 
  startTime DateTime 
  );

Unlike other components, types do not need to be deployed. They are available for use in running applications as soon as they are created.

You cannot specify a composite key directly. Instead, create a CQ that concatenates two or more fields into a new field, then make that field the key. For example, if inputStream has this type:

CREATE TYPE inputType(
  SubKey1 String,
  SubKey2 String,
  Name String,
  UniqueID String KEY
  );

The following will create a composite key on the first two fields:

CREATE TYPE outputType(
  SubKey1 String,
  SubKey2 String,
  Name String,
  CompKey String KEY
  );
CREATE STREAM outputStream OF outputType;
CREATE CQ makeCompositeKey
  INSERT INTO outputStream
  SELECT SubKey1,
    SubKey2,
    Name,
    SubKey1 + SubKey2 KEY
  FROM inputStream;

CREATE WACTIONSTORE

CREATE WACTIONSTORE <name> 
CONTEXT OF { <type name> } 
EVENT TYPES ( <type name> KEY ( <key field name> ) [, <type name> KEY ( <key field name> )... ] ) 
[ USING { MEMORY | ( <properties> ) } ];
  • The CONTEXT OF type defines the fields that may be stored in the WActionStore. Two WActionStores may not use the same CONTEXT OF type. If necessary, define multiple identical types to work around this limitation.

  • If LINK SOURCE EVENT is not specified in the CQ that populates the WActionStore, specify the CONTEXT OF type as the sole event type. If LINK SOURCE EVENT is specified, specify the type of each component specified in the CQ's FROM clause in EVENT TYPES.

  • USING MEMORY disables persistence.

  • If you omit the USING clause, the WActionStore will persist to Elasticsearch with its default properties. This is functionally equivalent to USING (storageProvider:'elasticsearch'). Data is persisted to Striim/data/<cluster name>/nodes/<node number>/indices/<namespace>.<WActionStore name>. See https://www.elastic.co/blog/found-dive-into-elasticsearch-storage for more information about these paths.

For example, from MultiLogApp:

CREATE TYPE ZeroContentEventListType (
    srcIp String KEY,
    code Integer,
    size Integer,
    level String,
    message String,
    xception String);
    
CREATE WACTIONSTORE ZeroContentEventList
CONTEXT OF ZeroContentEventListType 
EVENT TYPES (
  ZeroContentEventListType KEY(srcIp) ...

CREATE CQ GenerateZeroContentEventList
INSERT INTO ZeroContentEventList
SELECT srcIp, code, size, level, message, xception ...

This stores the information used to populate the five-column table on the Zero Content details dashboard page.

See PosApp for a detailed discussion of how the queries, types, and WActionStore interact.

The following will persist to Elasticsearch:

CREATE WACTIONSTORE MerchantActivity  
CONTEXT OF MerchantActivityContext
EVENT TYPES (MerchantTxRate KEY (merchantId));

The following will retain data in Elasticsearch for a minimum of one day, after which it will be expunged. The exact time the data will be expunged is unpredictable.

CREATE WACTIONSTORE MerchantActivity  
CONTEXT OF MerchantActivityContext
EVENT TYPES (MerchantTxRate KEY (merchantId)) 
  USING (storageProvider:'elasticsearch', elasticsearch.time_to_live: '1d');

You may specify the time to live as m (milliseconds), s (seconds), h (hours), d (days), or w (weeks). 

The following does not persist the data to disk:

CREATE WACTIONSTORE MerchantActivity
  CONTEXT OF MerchantActivityContext
  EVENT TYPES ( MerchantTxRate KEY(merchantId) )
  USING MEMORY;

Striim also supports persistence to MySQL and Oracle. To use one of those options, specify USING (<properties>) with the appropriate properties for the DBMS as detailed below.

Warning

The correct JDBC driver for the DBMS must be installed as described in Installing third-party drivers. WActionStores with more than one event type cannot be persisted to MySQL or Oracle.

The properties for MySQL are:

storageProvider:'jdbc',
persistence_interval: '10 sec',
JDBC_DRIVER:'com.mysql.jdbc.Driver',
JDBC_URL:'jdbc:mysql://<host>/<database name>',
JDBC_USER:'<user name>',
JDBC_PASSWORD:'<password>',
DDL_GENERATION:'create-or-extend-tables'

The properties for Oracle are:

storageProvider:'jdbc',
persistence_interval: '10 sec',
JDBC_DRIVER:'oracle.jdbc.driver.OracleDriver',
JDBC_URL:'jdbc:oracle:thin:@<host IP address>:<port>:<host SID>',
JDBC_USER:'<user name>',
JDBC_PASSWORD:'<password>',
DDL_GENERATION:'create-or-extend-tables',
CONTEXT_TABLE:'<context table name>',
EVENT_TABLE:'<event table name>'

Warning

When persisting a WActionStore to Oracle, the context and event table names must be unique within the application and not exceed Oracle's 30-character limit, and the number of characters in the namespace and the WActionStore name must total no more than 24.

CREATE WINDOW

To aggregate, join, or perform calculations on the data, you must create a bounded data set. The usual way to do this is with a window, which bounds the stream by a specified number of events, a period of time, or both. As discussed in the Concepts Guide (see Window), this may be a sliding window, which always contains the most recent set of events, or a jumping window, which breaks the stream up into successive chunks.

A third alternative is a session window, which breaks the stream up into chunks when there are gaps in the flow of events, that is, when no new event has been received for a specified period of time (the "idle timeout").

The syntax for sliding and jumping windows is:

CREATE [ JUMPING ] WINDOW <name> 
OVER <stream name> 
KEEP {
 <int> ROWS |
 WITHIN <int> <time unit> |
 <int> ROWS WITHIN <int> <time unit>}
[ PARTITION BY <field name> ];

The syntax for session windows is:

CREATE SESSION WINDOW <name>
OVER <stream name>
IDLE TIMEOUT <int> <time unit>
[ PARTITION BY <field name> ];

If JUMPING or SESSION is not specified, the window uses the "sliding" mode (see the Concepts Guide for an explanation of the difference).

If PARTITION BY is specified, the criteria will be applied separately for each value of the specified field name.

  • With a count-based window, PARTITION BY will keep the specified number of events for each field value. For example, a window with KEEP 100 ROWS PARTITION BY merchantID would contain 100 events for each merchant.

  • With a time-based window, PARTITION BY will start the timer separately for each field value. This could be used, for example, to raise an alert when a device has multiple errors in a certain period of time.

  • With a session window, PARTITION BY will apply the idle timeout separately for each value of the specified field, which might be a user ID, session ID, or IP address.

This example from MultiLogApp creates a one-hour jumping window for each company's API usage stream:

TQL:

CREATE JUMPING WINDOW CompanyWindow 
OVER CompanyApiUsageStream 
KEEP WITHIN 1 HOUR ON logTime 
PARTITION BY company;

UI:

  • Mode: Jumping

  • Size of Window: Time

    • Time: 1 hour

    • Event Time

    • On: logTime

The hour will be timed separately for each company, starting when the first event for that company is received.

The following is a detailed guide to the syntax for various types of windows. For more detailed discussion of windows in the context of applications, see Bounding data with windows.

Bounding data in batches (jumping) by system time
CREATE JUMPING WINDOW <name> OVER <stream name>
KEEP WITHIN <int> { SECOND | MINUTE | HOUR | DAY }
[ PARTITION BY <field name> ];

With this syntax, the window will output a batch of data each time the specified time interval has elapsed. For example, this window will emit a set of data every five minutes:

TQL:

CREATE JUMPING WINDOW P2J_ST
OVER PosSource_TransformedStream
KEEP WITHIN 5 MINUTE;

UI:

  • Mode: Jumping

  • Size of Window: Time

    • Time: 5 minute

    • System Time

Bounding data in batches (jumping) by event time
CREATE JUMPING WINDOW <name> OVER <stream name>
KEEP WITHIN <int> { SECOND | MINUTE | HOUR | DAY } ON <timestamp field name>
[ PARTITION BY <field name> ];

With this syntax, the window will output a batch of data each time it receives an event in which the specified timestamp field's value exceeds that of the oldest event in the window by the specified amount of time. For example, assuming data is received continuously, this window will emit a set of data every five minutes:

TQL:

CREATE JUMPING WINDOW P5J_ET
OVER PosSource_TransformedStream
KEEP WITHIN 5 MINUTE 
ON dateTime;

UI:

  • Mode: Jumping

  • Size of Window: Time

    • Time: 5 minute

    • Event Time

    • On: dateTime

Bounding data in batches (jumping) by event count
CREATE JUMPING WINDOW <name>
OVER <stream name>
KEEP <int> ROWS
[ PARTITION BY <field name> ];

With this syntax, the window will output a batch of data each time it contains the specified number of events. For example, the following will break the stream up into batches of 100 events:

TQL:

CREATE JUMPING WINDOW P11J_ROWS
OVER PosSource_TransformedStream
KEEP 100 ROWS;

UI:

  • Mode: Jumping

  • Size of Window: Count

    • Events: 100

Bounding data continuously (sliding) by time
CREATE WINDOW <name> OVER <stream name>
KEEP WITHIN <int> { SECOND | MINUTE | HOUR | DAY }
[ ON <timestamp field name> ]
[ SLIDE <int> { SECOND | MINUTE | HOUR | DAY } ]
[ PARTITION BY <field name> ];

With this syntax, the window emits data every time an event is added to the window, first removing any events that exceed the specified time interval from the window. For example, the following will emit the events received in the past five minutes each time it receives a new event:

TQL:

CREATE WINDOW P1S_ST
OVER PosSource_TransformedStream
KEEP WITHIN 5 MINUTE;

UI:

  • Mode: Sliding

  • Size of Window: Time

    • Time: 5 minute

    • System Time

The following is similar but uses event time:

TQL:

CREATE WINDOW P4S_ET 
OVER PosSource_TransformedStream 
KEEP WITHIN 5 MINUTE 
ON dateTime;

UI:

  • Mode: Sliding

  • Size of Window: Time

    • Time: 5 minute

    • Event Time

    • On: dateTime

If you wish to get events less often, use the Output interval (UI) / SLIDE (TQL) option. For example, the following will emit the past five minutes of events once a minute:

TQL:

CREATE WINDOW P3S_ST_OI
OVER PosSource_TransformedStream
KEEP WITHIN 5 MINUTE
SLIDE 1 MINUTE;

UI:

  • Mode: Sliding

  • Size of Window: Time

    • Time: 5 minute

    • System Time

    • Output interval: 1 minute

CREATE WINDOW P6S_ET_OI
OVER PosSource_TransformedStream
KEEP WITHIN 5 MINUTE
ON dateTime
SLIDE 1 MINUTE;

UI:

  • Mode: Sliding

  • Size of Window: Time

    • Time: 5 minute

    • Event Time

    • On: dateTime

    • Output interval: 1 minute

Bounding data continuously (sliding) by event count
CREATE WINDOW <name> OVER <stream name>
KEEP <int> ROWS 
[ SLIDE <int> ]
[ PARTITION BY <field name> ];

With this syntax, when the window has received the specified number of events, it will emit its contents. From then on, every time it receives a new event, it will remove the event that has been in the window the longest, then emit the remaining events. For example, the following will send the first 100 events it receives, and from then on send the most recent 100 events every time it receives a new one:

TQL:

CREATE WINDOW P10S_ROWS
OVER PosSource_TransformedStream
KEEP 100 ROWS;

UI:

  • Mode: Sliding

  • Size of Window: Count

    • Events: 100

If you wish to get events less often, use the Output interval (UI) / SLIDE (TQL) option. For example, the following will emit the most recent 100 events every tenth event:

TQL:

CREATE WINDOW P12S_ROWS_OI 
OVER PosSource_TransformedStream 
KEEP 100 ROWS 
SLIDE 10; 

UI:

  • Mode: Sliding

  • Size of Window: Count

    • Events: 100

    • Output interval: 10

Advanced window settings (RANGE / Timeout)

When a window's size is based on events (KEEP ROWS) or event time (KEEP WITHIN <int> <time unit> ON <timestamp field name>), the window may go far longer than desired without jumping. Use the RANGE or Timeout property to force the window to jump within a set period. For example:

CREATE JUMPING WINDOW MyWindow OVER MyStream KEEP WITHIN 5 MINUTE ON DateTime;

If there is an hour gap between events, the window could be open for over an hour without sending any data. To prevent that, use the Timeout (UI) or RANGE (TQL) option. Use the following when the window size is based on event time:

CREATE [ JUMPING ] WINDOW <name> OVER <stream name>
KEEP RANGE <int> { SECOND | MINUTE | HOUR | DAY }
 ON <timestamp field name>
WITHIN <int> { SECOND | MINUTE | HOUR | DAY } 
[ PARTITION BY <field name> ];

Note

Note that Timeout / RANGE is always based on system time.

Use the following when the window size is based on event count:

CREATE [ JUMPING ] WINDOW <name> OVER <stream name>
KEEP <int> ROWS
WITHIN <int> { SECOND | MINUTE | HOUR | DAY }
[ PARTITION BY <field name> ];

Note

Note that when Timeout is used with Time it maps to RANGE, but when used with Events it maps to WITHIN.

Jumping by event time

With the following settings, when events enter the window steadily, the window will jump every five minutes, but if there is a gap in events, the window will always jump within six minutes.

TQL:

CREATE JUMPING WINDOW P8J_ET_TO
OVER PosSource_TransformedStream
KEEP RANGE 6 MINUTE
ON dateTime
WITHIN 5 MINUTE;

UI:

  • Mode: Jumping

  • Size of Window: Advanced

    • Time: 5 minute

    • Timeout: 6 minute

    • On: dateTime

Jumping by event count

The following will emit a batch of events every time the count reaches 100 or ten seconds has elapsed from the last time it emitted data:

TQL:

CREATE JUMPING WINDOW P14J_ROWS_TO
OVER PosSource_TransformedStream
KEEP 100 ROWS
WITHIN 10 SECOND;

UI:

  • Mode: Jumping

  • Size of Window: Advanced

    • Events: 100

    • Timeout: 10 second

Sliding by event time

With the following settings, when there is a gap in events, the window will always emit its contents within six minutes.

TQL:

CREATE WINDOW P7S_ET_TO
OVER PosSource_TransformedStream
KEEP RANGE 6 MINUTE
ON dateTime
WITHIN 5 MINUTE;

UI:

  • Mode: Sliding

  • Size of Window: Advanced

    • Time: 5 minute

    • Timeout: 6 minute

    • On: dateTime

Sliding by event count

With these settings, when there is a gap in events, the window will always emit its contents within ten seconds.

TQL:

CREATE WINDOW P13S_ROWS_TO
OVER PosSource_TransformedStream
KEEP 100 ROWS
WITHIN 10 SECOND;

UI:

  • Mode: Sliding

  • Size of Window: Advanced

    • Events: 100

    • Timeout: 10

Bounding data in batches by session timeout

Session windows bound events based on gaps in the data flow, that is, when no new event has been received for a specified period of time. For example, the following window would emit a set of events every time a minute passes between one event and the next. 

TQL:

CREATE SESSION WINDOW MySessionWindow 
OVER MyStream
IDLE TIMEOUT 1 MINUTE;

UI:

  • Mode: Advanced

    • Timeout: 1 minute

Each set could contain any number of events, accumulated over any length of time, and the gap between the last event in one session and the first event in the next session could be of any duration of a minute or longer.

If a session window is partitioned, the idle timeout will be applied separately to each value of the field it is partitioned by, and each set emitted will contain only events with that value. The partitioning field might be a session ID or a user ID.
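
For example, a minimal sketch of a session window partitioned by user (assuming MyStream's type has a userId field):

CREATE SESSION WINDOW UserSessionWindow
OVER MyStream
IDLE TIMEOUT 1 MINUTE
PARTITION BY userId;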

Supported combinations of window mode and size properties

The following lists all supported combinations of window properties (mode, time or event count basis, output interval, and timeout), with the corresponding Flow Designer settings and TQL. Note that in the UI the available Size of Window options change depending on whether the Mode is Sliding or Jumping.

  • sliding, system time, no output interval, no timeout
    Flow Designer: Mode: Sliding; Size of Window: Time; Time: 5 minute; System Time
    TQL: CREATE WINDOW P1S_ST OVER PosSource_TransformedStream KEEP WITHIN 5 MINUTE;

  • jumping, system time, no output interval, no timeout
    Flow Designer: Mode: Jumping; Size of Window: Time; Time: 5 minute; System Time
    TQL: CREATE JUMPING WINDOW P2J_ST OVER PosSource_TransformedStream KEEP WITHIN 5 MINUTE;

  • sliding, system time, output interval, no timeout
    Flow Designer: Mode: Sliding; Size of Window: Time; Time: 5 minute; System Time; Output interval: 1 minute
    TQL: CREATE WINDOW P3S_ST_OI OVER PosSource_TransformedStream KEEP WITHIN 5 MINUTE SLIDE 1 MINUTE;

  • sliding, event time, no output interval, no timeout
    Flow Designer: Mode: Sliding; Size of Window: Time; Time: 5 minute; Event Time; On: dateTime
    TQL: CREATE WINDOW P4S_ET OVER PosSource_TransformedStream KEEP WITHIN 5 MINUTE ON dateTime;

  • jumping, event time, no output interval, no timeout
    Flow Designer: Mode: Jumping; Size of Window: Time; Time: 5 minute; Event Time; On: dateTime
    TQL: CREATE JUMPING WINDOW P5J_ET OVER PosSource_TransformedStream KEEP WITHIN 5 MINUTE ON dateTime;

  • sliding, event time, output interval, no timeout
    Flow Designer: Mode: Sliding; Size of Window: Time; Time: 5 minute; Event Time; On: dateTime; Output interval: 1 minute
    TQL: CREATE WINDOW P6S_ET_OI OVER PosSource_TransformedStream KEEP WITHIN 5 MINUTE ON dateTime SLIDE 1 MINUTE;

  • sliding, event time, no output interval, timeout
    Flow Designer: Mode: Sliding; Size of Window: Advanced; Time: 5 minute; Timeout: 6 minute; On: dateTime
    TQL: CREATE WINDOW P7S_ET_TO OVER PosSource_TransformedStream KEEP RANGE 6 MINUTE ON dateTime WITHIN 5 MINUTE;

  • jumping, event time, no output interval, timeout
    Flow Designer: Mode: Jumping; Size of Window: Advanced; Time: 5 minute; Timeout: 6 minute; On: dateTime
    TQL: CREATE JUMPING WINDOW P8J_ET_TO OVER PosSource_TransformedStream KEEP RANGE 6 MINUTE ON dateTime WITHIN 5 MINUTE;

  • sliding, event time, output interval, timeout
    Flow Designer: Mode: Sliding; Size of Window: Advanced; Time: 5 minute; Timeout: 6 minute; On: dateTime; Output interval: 1 minute
    TQL: CREATE WINDOW P9SL_ET_TO_OI OVER PosSource_TransformedStream KEEP RANGE 6 MINUTE ON dateTime WITHIN 5 MINUTE SLIDE 1 MINUTE;

  • sliding, event count, no output interval, no timeout
    Flow Designer: Mode: Sliding; Size of Window: Count; Events: 100
    TQL: CREATE WINDOW P10S_ROWS OVER PosSource_TransformedStream KEEP 100 ROWS;

  • jumping, event count, no output interval, no timeout
    Flow Designer: Mode: Jumping; Size of Window: Count; Events: 100
    TQL: CREATE JUMPING WINDOW P11J_ROWS OVER PosSource_TransformedStream KEEP 100 ROWS;

  • sliding, event count, output interval, no timeout
    Flow Designer: Mode: Sliding; Size of Window: Count; Events: 100; Output interval: 10
    TQL: CREATE WINDOW P12S_ROWS_OI OVER PosSource_TransformedStream KEEP 100 ROWS SLIDE 10;

  • sliding, event count, no output interval, timeout
    Flow Designer: Mode: Sliding; Size of Window: Advanced; Events: 100; Timeout: 10
    TQL: CREATE WINDOW P13S_ROWS_TO OVER PosSource_TransformedStream KEEP 100 ROWS WITHIN 10 SECOND;

  • jumping, event count, no output interval, timeout
    Flow Designer: Mode: Jumping; Size of Window: Advanced; Events: 100; Timeout: 10 second
    TQL: CREATE JUMPING WINDOW P14J_ROWS_TO OVER PosSource_TransformedStream KEEP 100 ROWS WITHIN 10 SECOND;

  • sliding, event count, output interval, timeout
    Flow Designer: Mode: Sliding; Size of Window: Advanced; Events: 100; Timeout: 10; Output interval: 2
    TQL: CREATE WINDOW P15S_ROWS_TO_OI OVER PosSource_TransformedStream KEEP 100 ROWS WITHIN 10 SECOND SLIDE 2;

DROP

DROP { APPLICATION | FLOW | <component type> } <namespace>.<component name> [ CASCADE | FORCE ];

Removes a previously created component. For example:

DROP WACTIONSTORE Samples.MerchantActivity;

For an application or flow, if the CASCADE option is specified, all the components it contains are also removed. For a source that implicitly creates its output stream, if the CASCADE option is specified, the stream is also removed. For example:

DROP APPLICATION Samples.PosApp CASCADE;

The FORCE option works like CASCADE but will override any warnings that cause the DROP command to fail, such as components created in one application being used by another application. This may be used to drop applications that cannot be undeployed or to drop namespaces when DROP NAMESPACE ... CASCADE fails. Using FORCE may result in an invalid application. Using FORCE will remove all components from the metadata repository, but some components may remain in memory until the Striim cluster is restarted.

Note: DROP USER <user name> does not drop the corresponding <user name> namespace. If you wish to drop that as well, use DROP NAMESPACE <user name> CASCADE. (See Using namespaces for more about namespaces.)
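
For example, assuming a hypothetical user jsmith:

DROP USER jsmith;
DROP NAMESPACE jsmith CASCADE;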

IMPORT

TQL applications may import Java functions. For example, see Using imported Java functions for the code used in a Java class called AggFunctions.

To use the custom AggFunctions.concatStringAggAsDSV method in TQL, you would add the compiled JAR file to .../Striim/lib and restart Striim. Here is an example of how you could use the custom method in your TQL:

IMPORT STATIC com.webaction.custom.AggFunctions.*;
...
CREATE CQ selectCustom
INSERT INTO EmailBodyStream
SELECT concatStringAggAsDSV('Security Event ' + w.eventtype + ' at ' + w.timeGenerated +
  ' occurred ' + w.count + ' times by systems: ' + w.WorkStationName, '\n')
FROM SecEventReportWin w;