3.10.1 Intermediate TQL programming: common patterns

The topics in this section assume that you have read Fundamentals of TQL programming.

Getting data from sources

The appropriate programming pattern for getting data from a source depends on the output type of the source's adapter.

Sources with WAEvent output

The basic programming pattern for a source using an adapter with the WAEvent output type is:

source > stream > CQ > stream

You do not need to define the streams explicitly: instead, define them implicitly as part of the source and CQ, and they will be created automatically. For example, from PosApp:

CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'posdata.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO CsvStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM CsvStream;

In addition to the explicitly defined CsvDataSource and CsvToPosData, this creates the "implicit" stream CsvStream (of type WAEvent), the type PosDataStream_Type, and the stream PosDataStream:

(Flow diagram: CsvDataSource > CsvStream > CsvToPosData > PosDataStream)

You can inspect these using the DESCRIBE command, for example:

W (admin) > describe type Samples.PosDataStream_Type;
Processing - describe type Samples.PosDataStream_Type
TYPE Samples.PosDataStream_Type CREATED 2016-01-22 12:35:34
ATTRIBUTES (
  merchantId java.lang.String
  dateTime org.joda.time.DateTime
  hourValue java.lang.Integer
  amount java.lang.Double
  zip java.lang.String
)

For more information, see Parsing the data field of WAEvent.
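
As a quick illustration of why the CQ indexes into data[#], a raw event on CsvStream resembles the following sketch (the values and metadata keys shown here are illustrative only, not actual PosApp output):

WAEvent{
  data: ["COMPANY NAME","MERCHANT_ID_1","...","...","20130312173210","...","...","329.64","...","10011"]
  metadata: {"FileName":"posdata.csv","FileOffset":0}
};

CsvToPosData picks out data[1], data[4], data[7], and data[9] from this array and converts them to typed fields.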

The code for XML sources is slightly different. For example, from MultiLogApp:

CREATE SOURCE Log4JSource USING FileReader (
  directory:'Samples/MultiLogApp/appData',
  wildcard:'log4jLog.xml',
  positionByEOF:false
) 
PARSE USING XMLParser(
  rootnode:'/log4j:event',
  columnlist:'log4j:event/@timestamp,
    log4j:event/@level,
    log4j:event/log4j:message,
    log4j:event/log4j:throwable,
    log4j:event/log4j:locationInfo/@class,
    log4j:event/log4j:locationInfo/@method,
    log4j:event/log4j:locationInfo/@file,
    log4j:event/log4j:locationInfo/@line'
)
OUTPUT TO RawXMLStream;

Here, fields 0-7 in the WAEvent data array are defined by columnlist. These are then parsed by the CQ ParseLog4J:

CREATE TYPE Log4JEntry (
  logTime DateTime,
  level String,
  message String,
  api String,
  sessionId String,
  userId String,
  sobject String,
  xception String,
  className String,
  method String,
  fileName String,
  lineNum String
);
CREATE STREAM Log4JStream OF Log4JEntry;

CREATE CQ ParseLog4J
INSERT INTO Log4JStream
SELECT TO_DATE(TO_LONG(data[0])),
  data[1],
  data[2], 
  MATCH(data[2], '\\\\[api=([a-zA-Z0-9]*)\\\\]'),
  MATCH(data[2], '\\\\[session=([a-zA-Z0-9\\-]*)\\\\]'),
  MATCH(data[2], '\\\\[user=([a-zA-Z0-9\\-]*)\\\\]'),
  MATCH(data[2], '\\\\[sobject=([a-zA-Z0-9]*)\\\\]'),
  data[3],
  data[4],
  data[5],
  data[6],
  data[7]
FROM RawXMLStream;

See MultiLogApp for a more detailed discussion, including an explanation of the MATCH function.

If you prefer, instead of defining Log4JStream separately as above, you can define it implicitly within the ParseLog4J CQ, as follows:

CREATE CQ ParseLog4J
INSERT INTO Log4JStream
SELECT TO_DATE(TO_LONG(data[0])) as logTime,
  TO_STRING(data[1]) as level,
  TO_STRING(data[2]) as message,
  TO_STRING(MATCH(data[2], '\\\\[api=([a-zA-Z0-9]*)\\\\]')) as api,
  TO_STRING(MATCH(data[2], '\\\\[session=([a-zA-Z0-9\\-]*)\\\\]')) as sessionId,
  TO_STRING(MATCH(data[2], '\\\\[user=([a-zA-Z0-9\\-]*)\\\\]')) as userId,
  TO_STRING(MATCH(data[2], '\\\\[sobject=([a-zA-Z0-9]*)\\\\]')) as sobject,
  TO_STRING(data[3]) as xception,
  TO_STRING(data[4]) as className,
  TO_STRING(data[5]) as method,
  TO_STRING(data[6]) as fileName,
  TO_STRING(data[7]) as lineNum
FROM RawXMLStream;

With this approach, Log4JStream's automatically generated type would be Log4JStream_Type, so you would have to replace the four other references to Log4JEntry in the application with Log4JStream_Type.
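
For example, if one of those references is a stream declaration, it would change from the explicitly created type to the generated one (a sketch, assuming Log4ErrorWarningStream is declared this way in the application):

-- before
CREATE STREAM Log4ErrorWarningStream OF Log4JEntry;

-- after, using the automatically generated type
CREATE STREAM Log4ErrorWarningStream OF Log4JStream_Type;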

JSON sources with custom output types

The basic programming pattern for sources using adapters with the JSONParser is:

type, stream, source

First define the type for the output stream, then the stream, then the source. For example:

CREATE TYPE ScanResultType (
  timestamp1 String,
  rssi String
);
CREATE STREAM ScanResultStream OF ScanResultType;

CREATE SOURCE JSONSource USING FileReader (
  directory: 'Samples',
  WildCard: 'sample.json',
  positionByEOF: false
)
PARSE USING JSONParser (
  eventType: 'ScanResultType',
  fieldName: 'scanresult'
)
OUTPUT TO ScanResultStream;
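
The layout of sample.json is not reproduced here, but as a rough illustration, a file consistent with these properties could contain one object per event under the scanresult node named by fieldName (hypothetical values):

{"scanresult": {"timestamp1": "1457321021", "rssi": "-72"}}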

See JSONParser for a more detailed discussion.

Filtering data in a source

The following examples give an idea of various possibilities for filtering source data using OUTPUT TO and SELECT in sources. This syntax is supported both by the server and the Forwarding Agent (see Using the Striim Forwarding Agent).

In sources, SELECT statements must use the data[#] syntax (see also Parsing the data field of WAEvent). To select using the DATA() or META() functions, you must create a CQ on the output of the source (see also Using the DATA() and BEFORE() functions).
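
For example, to carry a metadata value alongside parsed data, put a CQ on the source's output stream (a sketch; NamedCsvStream is a hypothetical stream, and the 'FileName' metadata key is an assumption about FileReader output):

CREATE CQ AddFileName
INSERT INTO NamedCsvStream
SELECT TO_STRING(data[1]) AS merchantId,
  TO_STRING(META(x,'FileName')) AS sourceFile
FROM CsvStream x;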

Select only events where the fifth column value is greater than 10,000:

CREATE SOURCE ...
OUTPUT TO CSVStream (a String, b String, c String, d String, e String)
  SELECT * WHERE TO_INT(data[4]) > 10000;

Filter out second and third columns:

... OUTPUT TO CSVStream (a String, b String, c String)
  SELECT data[0], data[3], data[4];

Cast the first and third columns as integers:

... OUTPUT TO CSVStream (a Integer, b String, c Integer, d String, e String)
  SELECT TO_INT(data[0]), data[1], TO_INT(data[2]), data[3], data[4];

Cast the first and third columns as integers and select only events where the fifth column value is greater than 10,000:

... OUTPUT TO CSVStream (a Integer, b String, c Integer, d String, e String)
SELECT TO_INT(data[0]), data[1], TO_INT(data[2]), data[3], data[4]
  WHERE TO_INT(data[4]) > 10000;

Add the first and third columns and output as a single field a:

... OUTPUT TO CSVStream (a Integer, b String, c String, d String) 
  SELECT TO_INT(data[0]) + TO_INT(data[2]), data[1], data[3], data[4];

You can also use OUTPUT TO to split events among multiple streams based on their field values.

When the fifth column value is over 10,000, output the event to HighOrderStream; when the value is 10,000 or less, output it to LowOrderStream:

... OUTPUT to HighOrderStream (a Integer, b String, c Integer, d String, e String)
  SELECT TO_INT(data[0]), data[1], TO_INT(data[2]), data[3], data[4]
  WHERE TO_INT(data[4]) > 10000,
OUTPUT to LowOrderStream (a Integer, b String, c Integer, d String, e String)
  SELECT TO_INT(data[0]), data[1], TO_INT(data[2]), data[3], data[4]
  WHERE TO_INT(data[4]) <= 10000;

Output all events to FullStream and only events where the fifth column value is 10,000 or less to LowOrderStream:

... OUTPUT to FullStream,
OUTPUT to LowOrderStream (a Integer, b String, c Integer, d String, e String)
SELECT TO_INT(data[0]), data[1], TO_INT(data[2]), data[3], data[4]
  WHERE TO_INT(data[4]) <= 10000;

Parsing sources with regular expressions

Regular expressions are frequently used to parse source data. See Using regular expressions (regex) for an introduction.

Parsing HTTP log entries

The following example uses the FreeFormTextParser regex property to match several patterns in the following log entry.

log {start 1234567890.123456} {addr 123.456.789.012} {port 12345} {method POST} {url /abc/def.ghi} {agent {Mozilla/4.0 (compatible; MSIE 6.0; MS Web Services Client Protocol 1.1.12345.1234)}} {bytes 1234} {status 200} {end 1234567890.123456} {host 123.456.789.012}
...

In this case we use a positive lookbehind construct to match the start, addr, port, method, url, bytes, status, end, and host patterns, while excluding the log and agent patterns:

regex:'((?<=start ).[^}]+)|((?<=addr ).[^}]+)|((?<=port ).[^}]+)|((?<=method ).[^}]+)|((?<=url ).[^}]+)|((?<=bytes ).[^}]+)|(?<=\\(\\#)[^\\)]+|((?<=status ).[^}]+)|((?<=end ).[^}]+)|((?<=host ).[^}]+)'

Note that each capture group uses two sets of parentheses. For example,

((?<=start ).[^}]+)

The inner parentheses are used for the positive lookbehind syntax:

(?<=start )

The outer parentheses are used for the capture group. In this example, group[1]=1234567890.123456, which is used by the parser in its data array.

Here is the TQL of the PARSE statement using the regex expression within a FreeFormTextParser:

PARSE USING FreeFormTextParser (
  RecordBegin:'^log ',
  RecordEnd:'\n',
  regex:'((?<=start ).[^}]+)|((?<=addr ).[^}]+)|((?<=port ).[^}]+)|((?<=method ).[^}]+)|((?<=url ).[^}]+)|((?<=bytes ).[^}]+)|(?<=\\(\\#)[^\\)]+|((?<=status ).[^}]+)|((?<=end ).[^}]+)|((?<=host ).[^}]+)',
  separator:'~'
)

Extracting substrings from log entries

The MATCH function matches a string against a regular expression (see Functions for details). Its third parameter indicates which capture group is returned, which is useful when the input string can produce multiple capture groups with differing values. In the following TQL example, session information is extracted from log data using basic regular expressions:

MATCH(data[5], "(?<=process: )([a-zA-Z0-9//$]*)",1), /* process */ 
MATCH(data[5], "(?<=pathway: )([a-zA-Z0-9//$]*)",1), /* pathway */ 
MATCH(data[5], "(?<=service code: )([a-zA-Z0-9//_]*)",1), /* service code */ 
MATCH(data[5], "(?<=model: )([a-zA-Z0-9]*)",1), /* model */ 
MATCH(data[5], "(?<=user id: )([0-9]*)",1), /* userId */ 
MATCH(data[5], "(?<=session IP: )([a-zA-Z0-9//.]*)",1), /* session IP */ 
MATCH(data[5], "(?<=source: )([a-zA-Z0-9//.//_///]*)",1), /* source */ 
MATCH(data[5], "(?<=detail: )(.*$)",1) /* detail message */

Here is an example of a typical log entry that may contain SEVERE or WARNING messages:

Aug 22, 2014 11:17:19 AM org.apache.solr.common.SolrException log
SEVERE: org.apache.solr.common.SolrException: org.apache.solr.search.SyntaxError: Cannot parse '((suggest_title:(04) AND suggest_title:(gmc) AND suggest_title: AND suggest_title:(6.6l) AND suggest_title:(lb7) AND suggest_title:(p1094,p0234)) AND NOT (deleted:(true)))': Encountered " <AND> "AND "" at line 1, column 64.
Was expecting one of:
    <BAREOPER> ...
    "(" ...
    "*" ...
    <QUOTED> ...
    <TERM> ...
    <PREFIXTERM> ...
    <WILDTERM> ...
    <REGEXPTERM> ...
    "[" ...
    "{" ...
    <LPARAMS> ...
    <NUMBER> ...
     
    at org.apache.solr.handler.component.QueryComponent.prepare(QueryComponent.java:147)
    at org.apache.solr.handler.component.SearchHandler.handleRequestBody(SearchHandler.java:187)
    at org.apache.solr.handler.RequestHandlerBase.handleRequest(RequestHandlerBase.java:135) ...

We would like to reduce this information to the following:

fftpInfo: fftpOutType_1_0{
  msg: "SEVERE: org.apache.solr.common.SolrException: org.apache.solr.search.SyntaxError: Cannot parse '((suggest_title:(04) AND suggest_title:(gmc) AND suggest_title: AND suggest_title:(6.6l) AND suggest_title:(lb7) AND suggest_title:(p1094,p0234)) AND NOT (deleted:(true)))': Encountered \" <AND> \"AND \"\" at line 1, column 64."
  origTs: 1408731439000
};

To do this, we will include the following regex in the FreeFormTextParser properties:

regex:'(SEVERE:.*|WARNING:.*)'

Here is the complete TQL:

create source fftpSource using FileReader (
  directory:'Samples/',
  WildCard:'catalina*.log',
  charset:'UTF-8',
  positionByEOF:false
)
parse using FreeFormTextParser (
-- Timestamp format in log is  "Aug 21, 2014 8:33:56 AM"
  TimeStamp:'%mon %d, %yyyy %H:%M:%S %p',
  RecordBegin:'%mon %d, %yyyy %H:%M:%S %p',
  regex:'(SEVERE:.*|WARNING:.*)'
)
OUTPUT TO fftpInStream;

CREATE TYPE fftpOutType (
  msg String,
  origTs long
);

create stream fftpOutStream of fftpOutType;

create cq fftpOutCQ
insert into fftpOutStream
select data[0],
  TO_LONG(META(x,'OriginTimestamp'))
from fftpInStream x;

create Target fftpTarget using SysOut(name:fftpInfo) input from fftpOutStream;

See FreeFormTextParser for more information.

Matching IPv4 subnet octets

As noted previously, multiple escapes for [ and ] in regular expressions are required because they are reserved characters in both the Striim TQL compiler and Java.

match(s.srcIp,'(^\\\\d{1,3}\\\\.)')                                -- first octet
match(s.srcIp,'(^\\\\d{1,3}\\\\.\\\\d{1,3})\\\\.')                 -- first and second octets
match(s.srcIp,'(^\\\\d{1,3}\\\\.\\\\d{1,3}\\\\.\\\\d{1,3}\\\\.)')  -- first, second, and third octets
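
Used in a CQ, these expressions can populate a stream with each address's subnet prefixes (a sketch; IpStream and SubnetStream are hypothetical, and srcIp is assumed to be a String field):

CREATE CQ ExtractSubnets
INSERT INTO SubnetStream
SELECT s.srcIp,
  match(s.srcIp,'(^\\\\d{1,3}\\\\.)') AS firstOctet,
  match(s.srcIp,'(^\\\\d{1,3}\\\\.\\\\d{1,3})\\\\.') AS firstTwoOctets,
  match(s.srcIp,'(^\\\\d{1,3}\\\\.\\\\d{1,3}\\\\.\\\\d{1,3}\\\\.)') AS firstThreeOctets
FROM IpStream s;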

Parsing SOAP entries

We will use the FreeFormTextParser regex property to match several patterns in the following log entry:

>> 2015/1/14 16:20:10: :<< Received request from remote address: 123.45.6.789
>> 2015/1/14 16:20:10: :<< Path Name: $name1, Class Name: CLASS-1
>> 2015/1/14 16:20:11: :<< Service Name: Service_1, Response Time: 123.456789 milliseconds
<model>E</model>
<userid>0000000103</userid>
...

In this case we also use a positive lookbehind construct to match the remote address, path name, service name, response time, model, and user ID:

regex:'((?<=remote address: )[\\d\\.]+)|((?<=Path Name: )[^ ]+)|((?<=\\<\\< Service Name: )[^,]+)|
  ((?<=Response Time: )[^ ]+)|((?<=\\<model\\>)([a-zA-Z0-9]+))|((?<=\\<userid\\>)([0-9]+))',

Here is the TQL of the PARSE statement using the regex expression within a FreeFormTextParser:

  PARSE USING FreeFormTextParser (
    RecordBegin:'Start>>> POST INPUT',
    TimeStamp:'>> %yyyy/%m/%d %H:%M:%S: :<<',
    linecontains:'>> %yyyy/%m/%d %H:%M:%S: :<<',
    RecordEnd:' milliseconds',
    regex:'((?<=remote address: )[\\d\\.]+)|((?<=Path Name: )[^ ]+)|
      ((?<=\\<\\< Service Name: )[^,]+)|
      ((?<=Response Time: )[^ ]+)|((?<=\\<model\\>)([a-zA-Z0-9]+))|
      ((?<=\\<userid\\>)([0-9]+))',
    separator:'~'
  )

Bounding data with windows

When a CQ's source is a stream, it is limited to acting on single events, as in this example from MultiLogApp:

CREATE CQ GetErrors 
INSERT INTO ErrorStream 
SELECT log4j 
FROM Log4ErrorWarningStream log4j WHERE log4j.level = 'ERROR';

This triggers an alert whenever an error appears in the log.

To aggregate, join, or perform calculations on the data, you must create a bounded data set. The usual way to do this is with a window, which bounds the stream by a specified number of events or period of time. As discussed in the Concepts Guide (see Window), this may be a sliding window, which always contains the most recent set of events, or a jumping window, which breaks the stream up into successive chunks.

The basic programming pattern for a window is:

stream > window > CQ

The properties a window should have depend on the nature of the data in the input stream and what you want the CQ to do with it. There are six basic variations, depending on whether you are bounding the data in batches (jumping) or continuously (sliding), and whether you are bounding by time, event count, or both.

Bound data in batches by time

The following uses aggregate functions to summarize events in a stream every fifteen minutes:

CREATE TYPE OrderType(
  storeId      String,
  orderId      String,
  sku          String,
  orderAmount  double,
  dateTime     DateTime
);
CREATE STREAM RetailOrders OF OrderType;
...
CREATE JUMPING WINDOW ProductData_15MIN
OVER RetailOrders
KEEP WITHIN 15 MINUTE ON dateTime;

CREATE CQ GetProductActivity
INSERT INTO ProductTrackingStream
SELECT pd.sku, COUNT(*), SUM(pd.orderAmount), FIRST(pd.dateTime)
FROM ProductData_15MIN pd;

Every 15 minutes, the output stream receives one event per SKU for which there was at least one order. Each SKU's event includes the number of orders (COUNT(*)), the total amount of those orders (SUM(pd.orderAmount)), and the timestamp of the first order in the batch (FIRST(pd.dateTime)). You could use this data to graph changes over time or trigger alerts when the count or total amount vary significantly from what you expect.

Say that orders are not received 24 hours a day, seven days a week, but instead drop to zero after closing hours. In that case, the code above could leave events in the window overnight, where they would be mistakenly reported as occurring in the morning. To avoid that, use RANGE to add a timeout:

CREATE JUMPING WINDOW ProductData_15MIN
OVER RetailOrders
KEEP RANGE 15 MINUTE ON dateTime WITHIN 16 MINUTE;

Now, so long as orders are being placed, the window will jump every 15 minutes, but if orders stop, the window will jump after 16 minutes.

Bound data in batches by event count

Alternatively, you can aggregate data in batches of n number of events. For example:

CREATE JUMPING WINDOW ProductData_100
OVER RetailOrders
KEEP 100 ROWS
PARTITION BY storeId;

CREATE CQ GetProductActivity
INSERT INTO ProductTrackingStream
SELECT pd.storeId, COUNT(*), SUM(pd.orderAmount), FIRST(pd.dateTime)
FROM ProductData_100 pd;

The output stream receives an event for each batch of 100 events from each store. You might use code like this to trigger a potential fraud alert when a store's order count or amount is anomalously high. (PARTITION BY storeId means the window will contain the data for the most recent 100 orders for each store. Without this clause, the window would contain 100 events total for all stores.)

Bound data continuously by time

The following variation on ProductData_15MIN summarizes events continuously:

CREATE WINDOW ProductData_15MIN
OVER RetailOrders
KEEP WITHIN 15 MINUTE ON dateTime;

CREATE CQ GetProductActivity
INSERT INTO ProductTrackingStream
SELECT pd.sku, COUNT(*), SUM(pd.orderAmount)
FROM ProductData_15MIN pd;

Omitting JUMPING from the window definition creates a sliding window (see Window in the Concepts Guide). Every time a new order is received, or one is dropped because it has been in the window for 15 minutes, the output stream receives a new event updating the number of orders and total amount of those orders for the past 15 minutes. FIRST(pd.dateTime) is unnecessary since you know the window always contains the most recent orders.

Bound data continuously by event count

Alternatively, you could bound the window by event count:

CREATE WINDOW ProductData_100
OVER RetailOrders
KEEP 100 ROWS
PARTITION BY sku;

CREATE CQ GetProductActivity100
INSERT INTO ProductTrackingStream
SELECT pd.sku, SUM(pd.orderAmount)
FROM ProductData_100 pd;

Every time an order is received, the oldest order for that SKU is dropped from the window and the output stream receives a new event updating the total amount of the 100 most recent orders for that SKU. COUNT(*) is unnecessary since the window always contains 100 events. (PARTITION BY sku means the window will contain the data for the most recent 100 orders for each SKU. Without this clause, the window would contain 100 events total for all SKUs.)

Bounding by both time and event count

When appropriate, you may bound a window by both time and event count. For example:

CREATE WINDOW StoreTimeoutWindow
OVER RetailOrders
KEEP 1 ROWS WITHIN 5 MINUTE 
PARTITION BY storeId;

CREATE TYPE StoreNameData(
  storeId String KEY,
  storeName String
);
CREATE CACHE StoreNameLookup using FileReader (
  directory: 'stores',
  wildcard: 'StoreNames.csv'
)
PARSE USING DSVParser(
  header: Yes
) QUERY(keytomap:'storeId') OF StoreNameData;

CREATE CQ StoreTimeoutCheck
INSERT INTO StoreTimeoutStream
SELECT c.storeId
FROM StoreTimeoutWindow w
LEFT OUTER JOIN StoreNameLookup c
ON w.storeId = c.storeId
WHERE w.storeId IS NULL
GROUP BY c.storeId;

This five-minute sliding window contains the most recent order event (KEEP 1 ROWS) for each store. The output from the CQ could be used to generate an alert whenever five minutes have passed without a new order (indicating that the store is probably offline).
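
A downstream CQ could turn those events into readable messages for an alert or dashboard (a sketch; OfflineStoreMsgStream is a hypothetical stream, and the field name storeId is assumed from SELECT c.storeId):

CREATE CQ FormatStoreTimeout
INSERT INTO OfflineStoreMsgStream
SELECT 'No orders received from store ' + storeId + ' in the past five minutes'
FROM StoreTimeoutStream;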

Using a window to define an alert threshold

The following could be used to send an alert when the number of events in the window exceeds 15:

CREATE TYPE myEventType (
  EventTime  DateTime,
  KeyVal     String,
  EventText  String
);
...
CREATE WINDOW oneHourSlidingWindow 
  OVER eventStream
  KEEP WITHIN 1 HOUR ON EventTime
  PARTITION BY KeyVal;

CREATE CQ WactionStore_q INSERT INTO WactionStore_ws
SELECT ISTREAM
  KeyVal,
  EventTime,
  EventText,
  COUNT(KeyVal)
FROM oneHourSlidingWindow
GROUP BY KeyVal
HAVING COUNT(KeyVal) > 15;

The ISTREAM option prevents the query from emitting events when COUNT(KeyVal) decreases as events are removed from the window after the one-hour timeout.

Joining cache data with CQs

The standard programming pattern for getting data from a cache is:

type, cache > CQ

Caches are similar to sources and use the same adapters. The type must correctly describe the data source, with the correct data type for each field or column.

The following is typical TQL for a cache that is loaded from a file on disk:

CREATE TYPE ZipCache_Type(
  Zip String, 
  City String, 
  State String, 
  LatVal Double, 
  LongVal Double  
);
CREATE  CACHE ZipCache USING FileReader ( 
  directory: 'Samples/PosApp/appData/',
  wildcard: 'USAddressesPreview.txt',
  charset: 'UTF-8',
  blockSize: '64',
  positionbyeof: 'false'
) 
PARSE USING DSVPARSER ( 
  columndelimiter: '\t',
  header: 'true'
) 
QUERY (keytomap: 'Zip') OF ZipCache_Type;

CREATE CQ GenerateWactionContext
INSERT INTO PosSourceData
SELECT p.MERCHANTID,
  p.DATETIME,
  p.AUTHAMOUNT,
  z.Zip,
  z.City,
  z.State,
  z.LatVal,
  z.LongVal
FROM PosSource_TransformedStream p, ZipCache z
WHERE p.ZIP = z.Zip;

GenerateWactionContext enriches PosSource_TransformedStream with location information from ZipCache by matching values in the stream's ZIP field with values in the cache's Zip field. (Though the keyword JOIN does not appear, this is an inner join.) The PosSourceData WActionStore can then be used to populate maps. To track any un-joinable events, see TrapZipMatchingErrors in Handling nulls with CQs.

When a cache's source is a database, use the refreshinterval property to control how often the cache is updated:

CREATE  CACHE ZipCache USING DatabaseReader (
  ConnectionURL:'jdbc:mysql://10.1.10.149/datacenter',
  Username:'username',
  Password:'passwd',
  Query: "SELECT * FROM ZipData"
)
PARSE USING DSVPARSER ( 
  columndelimiter: '\t',
  header: 'true'
) 
QUERY (keytomap: 'Zip', refreshinterval: '60000000') OF ZipCache_Type;

See CREATE CACHE for more details.

Filtering data with CQs

The basic pattern for a CQ that filters data is:

stream / window / cache / WActionStore > CQ > stream / WActionStore

A CQ may join data from multiple inputs.

Simple filtering by a single criterion

The GetErrors query, from the MultiLogApp sample application, filters the log file data in Log4ErrorWarningStream to pass only error messages to ErrorStream:

CREATE CQ GetErrors 
INSERT INTO ErrorStream 
SELECT log4j 
FROM Log4ErrorWarningStream log4j WHERE log4j.level = 'ERROR';

Messages with any other error level (such as WARN or INFO) are discarded.

Filtering fields

A CQ can select desired fields from a stream, cache, or WActionStore and discard the rest. For example, this CQ from MultiLogApp selects only two of the fields (accessTime and srcIp) from its input stream:

CREATE TYPE AccessLogEntry (
    srcIp String KEY,
    userId String,
    sessionId String,
    accessTime DateTime ...

CREATE STREAM HackerStream OF AccessLogEntry;
...

CREATE CQ SendHackingAlerts 
INSERT INTO HackingAlertStream 
SELECT 'HackingAlert', ''+accessTime, 'warning', 'raise',
  'Possible Hacking Attempt from ' + srcIp + ' in ' + IP_COUNTRY(srcIp)
FROM HackerStream;

Selecting events based on cache entries

This CQ, from MultiLogApp, selects only events where the IP address is found in a blacklist cache. Events with IP addresses that are not on the blacklist are discarded.

CREATE CQ FindHackers
INSERT INTO HackerStream
SELECT ale 
FROM AccessStream ale, BlackListLookup bll
WHERE ale.srcIp = bll.ip;

In this context, SELECT ale selects all the fields from AccessStream (since its alias is ale) and none from BlackListLookup.
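
If you also want fields from the cache, list fields from both inputs explicitly. A sketch, assuming a hypothetical HackerDetailStream and using the bll.ip field implied by the join condition:

CREATE CQ FindHackersDetail
INSERT INTO HackerDetailStream
SELECT ale.srcIp, ale.userId, ale.accessTime, bll.ip
FROM AccessStream ale, BlackListLookup bll
WHERE ale.srcIp = bll.ip;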

Using multiple CQs for complex criteria

Applications often combine multiple CQs and windows to select events based on complex criteria. For example, from MultiLogApp:

CREATE CQ GetLog4JErrorWarning
INSERT INTO Log4ErrorWarningStream
SELECT l FROM Log4JStream l
WHERE l.level = 'ERROR' OR l.level = 'WARN';

CREATE WINDOW Log4JErrorWarningActivity 
OVER Log4ErrorWarningStream KEEP 300 ROWS;
...

CREATE CQ FindLargeRT
INSERT INTO LargeRTStream
SELECT ale
FROM AccessStream ale
WHERE ale.responseTime > 2000;

CREATE WINDOW LargeRTActivity 
OVER LargeRTStream KEEP 100 ROWS; 
...

CREATE CQ MergeLargeRTAPI
INSERT INTO LargeRTAPIStream
SELECT lrt.accessTime, lrt.sessionId, lrt.srcIp, lrt.userId  ...
FROM LargeRTActivity lrt, Log4JErrorWarningActivity log4j
WHERE lrt.sessionId = log4j.sessionId
  AND lrt.accessTime = log4j.logTime;   

  • The Log4JErrorWarningActivity window, populated by the GetLog4JErrorWarning CQ, contains the most recent 300 error and warning messages from the application log.

  • The LargeRTActivity window, populated by the FindLargeRT CQ, contains the most recent 100 messages from the web server access log with response times over 2000 microseconds.

  • The MergeLargeRTAPI CQ joins events from the two windows that have matching session IDs and access times and filters out unneeded fields. This filtered and joined data triggers alerts about the unusually long response times and is also used to populate dashboard displays.

See MultiLogApp for more details. See TQL programming rules and best practices for discussion of why the windows are required for the join.

Aggregating data with CQs

To aggregate data or perform any sort of calculation on it (see Functions), a CQ must select from a window (see Bounding data with windows). The basic pattern for a CQ that aggregates data is:

window / cache / WActionStore > CQ > stream / WActionStore

A CQ that aggregates data may have multiple inputs, including one or more streams, but at least one input must be a window, cache, or WActionStore. See TQL programming rules and best practices for more about this limitation.

Here is a simple example, from PosApp:

CREATE JUMPING WINDOW PosData5Minutes
OVER PosDataStream KEEP WITHIN 5 MINUTE ON dateTime PARTITION BY merchantId;

CREATE CQ GenerateMerchantTxRateOnly
INSERT INTO MerchantTxRateOnlyStream
SELECT p.merchantId,
  FIRST(p.zip),
  FIRST(p.dateTime),
  COUNT(p.merchantId),
  SUM(p.amount) ...
FROM PosData5Minutes p ...
GROUP BY p.merchantId;

Every time the PosData5Minutes window jumps, the GenerateMerchantTxRateOnly CQ will generate a summary event for each merchant including the merchant ID, the first Zip code and timestamp of the set, the number of transactions, and the total amount of the transactions. For more details, see PosApp.

Applications may use multiple windows and multiple CQs to perform more complex aggregation tasks. For example, from MultiLogApp:

CREATE JUMPING WINDOW ApiWindow 
OVER ApiEnrichedStream KEEP WITHIN 1 HOUR ON logTime 
PARTITION BY api;

CREATE CQ GetApiUsage 
INSERT INTO ApiUsageStream 
SELECT a.api, a.sobject, COUNT(a.userId), FIRST(a.logTime) 
FROM ApiWindow a ...

CREATE JUMPING WINDOW ApiSummaryWindow 
OVER ApiUsageStream KEEP WITHIN 1 HOUR ON logTime 
PARTITION BY api;

CREATE CQ GetApiSummaryUsage 
INSERT INTO ApiActivity 
SELECT a.api, sum(a.count), first(a.logTime)
FROM ApiSummaryWindow a ...

  • The jumping ApiWindow aggregates the application log events from ApiEnrichedStream into one-hour sets for each API call (PARTITION BY api) based on the events' log times.

  • Once an hour, when ApiWindow emits its latest set of data, the GetApiUsage CQ sends a summary event for each sobject in each API call including the name of the API call, the name of the sobject, the count of the sobject, and the log time of the first occurrence of that sobject during that hour (SELECT a.api, a.sobject, COUNT(a.userId), FIRST(a.logTime)).

  • The ApiSummaryWindow contains the summary events emitted by GetApiUsage. This window jumps in sync with ApiWindow since both use KEEP WITHIN 1 HOUR ON logTime.

  • The GetApiSummaryUsage CQ discards the sobject details and generates summary events for each API call.

Handling nulls with CQs

The following CQ would insert events from PosSource_TransformedStream (see Joining cache data with CQs) for which there was no matching zip code in ZipCache into NoMatchingZipStream:

CREATE CQ TrapZipMatchingErrors
INSERT INTO NoMatchingZipStream
SELECT p.MERCHANTID,
  p.DATETIME,
  p.AUTHAMOUNT
FROM PosSource_TransformedStream p
LEFT OUTER JOIN ZipCache z
ON p.ZIP = z.Zip WHERE z.Zip IS NULL;

The following CQ joins events from two streams using the MATCH_PATTERN (see Using pattern matching) clause. If after 24 hours no matching event has been received, the event is output with <Timeout> and null in place of the matching event's data source name and record.

CREATE CQ MatchedRecordsCQ 
INSERT INTO PatternMatchStream
SELECT a.unique_id, a.data_source_name, a.data_source_record,
  CASE WHEN b IS NOT NULL THEN b.data_source_name ELSE "<Timeout>" END, 
  CASE WHEN b IS NOT NULL THEN b.data_source_record ELSE null END
FROM MergedDataSourcesStream 
  MATCH_PATTERN (T a (b | W))
    DEFINE T = timer(interval 24 hour),
      a = MergedDataSourcesStream(),
      b = MergedDataSourcesStream(unique_id == a.unique_id),
      W = wait(T)
PARTITION BY unique_id;

Handling variable-length events with CQs

The following TQL shows an example of handling events with a varying number of fields. The events may have 6, 8, 10, or 12 fields depending on whether they have zero, one, two, or three pairs of objectName and objectValue fields. This TQL discards the events without object fields and consolidates all the object fields from the other events into objectStream.

CREATE TYPE objectStreamType(
  dateTime org.joda.time.DateTime KEY , 
  eventName java.lang.String KEY , 
  objectName java.lang.String KEY , 
  objectValue java.lang.Long  
);
CREATE STREAM objectStream OF objectStreamType;

CREATE CQ RawParser1 
INSERT INTO objectStream
-- the first pair of object fields is present when there are at least 8 fields in the record
SELECT 
  TO_DATEF(data[0],'yyyy-MM-dd HH:mm:ss.SSSZ') AS dateTime,
  TO_STRING(data[1]) AS eventName,
  TO_STRING(data[7]) AS objectName,
  TO_STRING(data[8]) AS objectValue
FROM rawData
WHERE arlen(data) >= 8;

CREATE CQ RawParser2 
INSERT INTO objectStream
-- the second pair of object fields is present when there are at least 10 fields in the record
SELECT 
  TO_DATEF(data[0],'yyyy-MM-dd HH:mm:ss.SSSZ') AS dateTime,
  TO_STRING(data[1]) AS eventName,
  TO_STRING(data[9]) AS objectName,
  TO_STRING(data[10]) AS objectValue
FROM rawData
WHERE arlen(data) >= 10;

CREATE CQ RawParser3 
INSERT INTO objectStream
-- the third pair of object fields is present when there are at least 12 fields in the record
SELECT 
  TO_DATEF(data[0],'yyyy-MM-dd HH:mm:ss.SSSZ') AS dateTime,
  TO_STRING(data[1]) AS eventName,
  TO_STRING(data[11]) AS objectName,
  TO_STRING(data[12]) AS objectValue
FROM rawData
WHERE arlen(data) >= 12;

Sending data to targets

The basic pattern for a target is:

CQ > stream > target

For example, from Writing a simple TQL application:

CREATE CQ JoinDataCQ
INSERT INTO JoinedDataStream ...
 
CREATE TARGET JoinedDataTarget
USING SysOut(name:JoinedData)
INPUT FROM JoinedDataStream;

This writes the output from JoinedDataStream to SysOut.

When writing an application with a more complex target, it may be most efficient to write an end-to-end application that simply gets the data from the source and writes it to the target, then add the "intelligence" to the application once you know that the data is being read and written correctly. For example, this application will read PosApp's sample data, parse it (see Getting data from sources), and write it to Hadoop:

CREATE SOURCE CSVSource USING FileReader (
  directory:'Samples/PosApp/appData',
  WildCard:'posdata.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:'yes'
)
OUTPUT TO CsvStream;

CREATE TYPE CSVType (
  merchantId String,
  dateTime DateTime,
  hourValue Integer,
  amount Double,
  zip String
);
CREATE STREAM TypedCSVStream OF CSVType;

CREATE CQ CsvToPosData
INSERT INTO TypedCSVStream
SELECT data[1],
  TO_DATEF(data[4],'yyyyMMddHHmmss'),
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')),
  TO_DOUBLE(data[7]),
  data[9]
FROM CsvStream;

CREATE TARGET hdfsOutput USING HDFSWriter(
  filename:'hdfstestOut',
  hadoopurl:'hdfs://192.168.1.13:8020/output/',
  flushinterval: '1'
)
FORMAT USING DSVFormatter (
)
INPUT FROM TypedCSVStream;

After verifying that the data is being written to the target correctly, you could then add additional components between CsvToPosData and hdfsOutput to filter, aggregate, or enrich the data.
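
For example, you might add a CQ between CsvToPosData and hdfsOutput that passes only larger transactions to Hadoop (a sketch; FilteredCSVStream is a hypothetical stream and the 500 threshold is arbitrary):

CREATE STREAM FilteredCSVStream OF CSVType;

CREATE CQ FilterLargeOrders
INSERT INTO FilteredCSVStream
SELECT t
FROM TypedCSVStream t
WHERE t.amount > 500;

You would then change hdfsOutput's INPUT FROM clause to read from FilteredCSVStream instead of TypedCSVStream.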

For additional end-to-end examples, see Database Writer, JMSWriter, and Kafka Writer.

Sending data to WActionStores

The development pattern for WActionStores is:

CQ > WActionStore

More than one CQ may output to the same WActionStore.

Here is a simple example created by following the instructions in Modifying an application using the Flow Designer. Note that the WActionStore's types must be created before the WActionStore, which must be created before the CQ that populates it.

CREATE TYPE PosSourceContext (
  MerchantId String KEY , 
  DateTime org.joda.time.DateTime , 
  Amount Double , 
  Zip String , 
  City String , 
  State String , 
  LatVal Double , 
  LongVal Double  
);

CREATE WACTIONSTORE PosSourceData
CONTEXT OF PosSourceContext
EVENT TYPES (PosSourceContext KEY (MerchantId))  
PERSIST NONE USING();

CREATE CQ GenerateWactionContext 
INSERT INTO PosSourceData
SELECT p.MERCHANTID,
  p.DATETIME,
  p.AUTHAMOUNT,
  z.Zip,
  z.City,
  z.State,
  z.LatVal,
  z.LongVal
FROM PosSource_TransformedStream p, ZipCache z
WHERE p.ZIP = z.Zip;

This is used to populate the dashboard created by following the instructions in Creating a dashboard.

In MultiLogApp, the CQs GenerateHackerContext, GenerateLargeRTContext, GenerateProxyContext, and GenerateZeroContentContext output to the WActionStore UnusualActivity:

CREATE TYPE AccessLogEntry (
  srcIp String KEY,
  userId String,
  sessionId String,
  accessTime DateTime,
  request String,
  code integer,
  size integer,
  referrer String,
  userAgent String,
  responseTime integer
);
...

CREATE TYPE UnusualContext (
  typeOfActivity String,
  accessTime DateTime,
  accessSessionId String,
  srcIp String KEY,
  userId String,
  country String,
  city String,
  lat double,
  lon double
);
CREATE TYPE MergedEntry (
  accessTime DateTime,
  accessSessionId String,
  srcIp String KEY,
  userId String,
  request String,
  code integer,
  size integer,
  referrer String,
  userAgent String,
  responseTime integer,
  logTime DateTime,
  logSessionId String,
  level String,
  message String,
  api String,
  sobject String,
  xception String,
  className String,
  method String,
  fileName String,
  lineNum String
);
CREATE WACTIONSTORE UnusualActivity 
CONTEXT OF UnusualContext 
EVENT TYPES (
  MergedEntry KEY(srcIp),
  AccessLogEntry KEY(srcIp)
);
...

CREATE CQ GenerateHackerContext
INSERT INTO UnusualActivity
SELECT 'HackAttempt', accessTime, sessionId, srcIp, userId,
 IP_COUNTRY(srcIp), IP_CITY(srcIP), IP_LAT(srcIP), IP_LON(srcIP)
FROM HackerStream
LINK SOURCE EVENT;
...

CREATE CQ GenerateLargeRTContext
INSERT INTO UnusualActivity
SELECT 'LargeResponseTime', accessTime, accessSessionId, srcIp, userId,
  IP_COUNTRY(srcIp), IP_CITY(srcIP), IP_LAT(srcIP), IP_LON(srcIP)
FROM LargeRTAPIStream
LINK SOURCE EVENT;
...

CREATE CQ GenerateProxyContext
INSERT INTO UnusualActivity
SELECT 'ProxyAccess', accessTime, sessionId, srcIp, userId,
  IP_COUNTRY(srcIp), IP_CITY(srcIP), IP_LAT(srcIP), IP_LON(srcIP)
FROM ProxyStream
LINK SOURCE EVENT;
...

CREATE CQ GenerateZeroContentContext
INSERT INTO UnusualActivity
SELECT 'ZeroContent', accessTime, accessSessionId, srcIp, userId,
  IP_COUNTRY(srcIp), IP_CITY(srcIP), IP_LAT(srcIP), IP_LON(srcIP)
FROM ZeroContentAPIStream
LINK SOURCE EVENT;

This WActionStore stores not just events of the UnusualContext type used by these four CQs, but also of the linked source events of MergedEntry and AccessLogEntry types. See Using EVENTLIST for details on querying the linked source events.

Using FIRST and LAST

The FIRST and LAST functions return a java.lang.Object type no matter what type they operate on. They support all Supported data types except Byte.

For example, in the following, suppose x is an integer:

SELECT
FIRST(x) - LAST(x) AS difference
FROM
MyWindow;

This results in an "invalid type of operand" error because you can't perform simple arithmetic on a java.lang.Object. To work around this, you must recast the object as an integer:

SELECT
TO_INT(FIRST(x)) - TO_INT(LAST(x)) AS difference
FROM
MyWindow;

The following example uses the PosApp sample data:

CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO CsvStream;
 
CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_INT(data[3]) AS PosData
FROM CsvStream;

CREATE WINDOW Window1 OVER PosDataStream KEEP 2 ROWS;
 
CREATE CQ CQ1
INSERT INTO output_ST
SELECT TO_INT(FIRST( Window1.PosData ))
  - TO_INT(LAST( Window1.PosData )) AS LastPosData
FROM Window1;

CREATE TARGET Target1 USING SysOut (
  name: 'SubFirstFromLast'
)
INPUT FROM output_ST;

Detecting device status changes

Code similar to the following can be used to detect when a device goes offline:

CREATE TYPE MyUser (
  name java.lang.String,
  myid java.lang.String
);

CREATE CACHE LoadIDsCache USING FileReader (
  wildcard: 'user.csv',
  directory: '.'
)
PARSE USING Global.DSVParser ()
QUERY (keytomap: 'myid') OF MyUser;

CREATE CQ PopulateMasterStream
INSERT INTO MasterStream
SELECT * FROM admin.LoadIDsCache c, heartbeat(interval 5 second) h;

CREATE WINDOW MasterWindow OVER MasterStream KEEP 1 ROWS PARTITION BY myid;

CREATE SOURCE LiveEventStream USING TCPReader (
  IPAddress: 'localhost',
  portno: 1234
)
PARSE USING DSVParser (
  headerlineno: 0
)
OUTPUT TO liveEvStream;

CREATE OR REPLACE CQ CreateLiveEventsCQ 
  INSERT INTO liveObjectStream
  SELECT TO_STRING(data[0]) as myId FROM liveEvStream l;

CREATE OR REPLACE JUMPING WINDOW LiveObjWindow
  OVER liveObjectStream KEEP 1 ROWS WITHIN 10 second PARTITION BY myId;

/* select * from admin.NonExistObjStream;
[
   myID = 1
   cnt = 1
   status = existent
]
[
   myID = null 
   cnt = 4
   status = non-existent
]
*/

select lw.myId as myID, count(*) as cnt, 
CASE WHEN lw.myId IS NULL 
  THEN "non-existent" 
  ELSE "existent" END as status
from MasterWindow mw 
left join LiveObjWindow lw on mw.myid = lw.myId
group by lw.myId;

The cache file has the format:

device_name_a,1
device_name_b,2
...

Code similar to the following can be used to detect both when a device goes offline and when it comes back online:

CREATE OR REPLACE CQ UserStateCQ
INSERT INTO CurrentUserStateStream
SELECT
	lw.myId as myID, 
	count(*) as cnt, 
	DNOW() as StateTime,
	CASE WHEN lw.myId IS NULL THEN "offline" ELSE "online" 
	END as status
	
FROM MasterWindow mw 
LEFT JOIN LiveObjWindow lw on mw.myid = lw.myId
GROUP BY lw.myId;

CREATE SLIDING WINDOW UserState OVER CurrentUserStateStream KEEP 2 ROWS PARTITION BY myId;

CREATE OR REPLACE CQ WatchUserStateCQ
INSERT INTO ChangedUserStateStream
SELECT
	w.myID as myID, 
	TO_LONG(LAST(w.StateTime) - FIRST(w.StateTime)) as StateChangeInterval,
	CASE 
		WHEN LAST(w.status) == 'online' AND FIRST(w.status) == 'online'  THEN "up"
		WHEN LAST(w.status) == 'online' AND FIRST(w.status) == 'offline'  THEN "online"
		WHEN LAST(w.status) == 'offline' AND FIRST(w.status) == 'online'  THEN "down"
		WHEN LAST(w.status) == 'offline' AND FIRST(w.status) == 'offline'  THEN "offline"
	END as ChangeLabel
FROM   UserState w 
GROUP BY w.myID
HAVING ChangeLabel = 'down' OR ChangeLabel = 'online';

The two-row window acts as a two-node state machine. Adding further CQs downstream can generate events that meet business requirements, such as:

  • alerts on state changes

  • accumulating statistics on device uptime, downtime, frequency of change, and so on (see the sketch following this list)
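
For instance, a window and CQ downstream of ChangedUserStateStream could count how many times each device went down over the past hour (a sketch; StateChanges1Hour and DeviceDownCountStream are hypothetical names):

CREATE WINDOW StateChanges1Hour
OVER ChangedUserStateStream KEEP WITHIN 1 HOUR
PARTITION BY myID;

CREATE CQ CountDownEvents
INSERT INTO DeviceDownCountStream
SELECT w.myID, COUNT(*) AS downCount
FROM StateChanges1Hour w
WHERE w.ChangeLabel = 'down'
GROUP BY w.myID;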
