The topics in this section assume that you are familiar with material covered in Fundamentals of TQL programming and Intermediate TQL programming: common patterns.
First, implement your application for a single server and verify that it is working as expected.
Multi-server deployment for fault tolerance
If you are deploying to multiple servers solely for fault tolerance, no changes to the application are required. See Continuing operation after node failure.
Using PARTITION BY
The key to distributing an application across multiple servers in a Striim cluster is the PARTITION BY
option when creating a stream. For example:
CREATE CQ CsvToPosData INSERT INTO PosDataStream PARTITION BY merchantId SELECT TO_STRING(data[1]) as merchantId, TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime, DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue, TO_DOUBLE(data[7]) as amount, TO_STRING(data[9]) as zip FROM CsvStream;
With this PARTITION BY
clause, when the output from PosDataStream
is distributed among multiple servers, all of the events for each merchantId
value go to the same server. This is essential for aggregating and performing calculations on the data. (Note that the PARTITION BY
clause in CREATE WINDOW statements has no effect on how events are distributed among servers.)
You can also partition by expression. For example, you could partition the output stream of an OracleReader source using PARTITION BY meta(OraData, 'TableName')
in order to distribute events among servers based on the source table names.
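For instance, a minimal sketch of one way that might look, reusing the OracleReader properties from the Forwarding Agent example later in this section (the source, stream, and CQ names here are hypothetical, and the exact placement of the PARTITION BY clause should be verified against your release):

CREATE SOURCE OracleCDCIn USING OracleReader (
    Username:'striim',
    Password:'passwd',
    ConnectionURL:'203.0.113.49:1521:orcl',
    Tables:'myschema.%'
) OUTPUT TO OraData;

-- partition the source output by table name so that all events from a
-- given table are processed on the same server
CREATE CQ PartitionOraData
INSERT INTO OraByTableStream PARTITION BY meta(OraData, 'TableName')
SELECT * FROM OraData;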
Sources in a multi-server cluster
General rules for sources in a multi-server environment:
Put the source and the CQ that parses its data (see Parsing the data field of WAEvent) in a flow by themselves.
Partition the CQ's output stream as discussed above.
If the source ingests data from a remote host, deploy it to a single-server deployment group, or to one server of a two-server deployment group (to support failover as discussed in Continuing operation after node failure). The source cannot be distributed across multiple servers since Striim has no way of distributing the incoming data among servers until it is put into a partitioned stream. Note that a server can belong to more than one deployment group, so if the source is not resource-intensive, it might share a server with other flows in another deployment group.
If the source ingests local data on a remote host, it may be deployed to a deployment group containing multiple Forwarding Agents (see Using the Striim Forwarding Agent).
Which field the output stream should be partitioned by depends on which events need to end up on the same server for aggregation or calculations. If there is no such requirement, pick a field that should distribute events evenly, such as a timestamp.
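For example, a minimal sketch that applies these rules to the PosApp source and parsing CQ shown above (the flow and deployment group names are hypothetical):

CREATE FLOW IngestFlow;
-- the source and the CQ that parses its WAEvent output are alone in this flow
CREATE SOURCE CsvDataSource USING FileReader (
    directory:'Samples/PosApp/appData',
    wildcard:'posdata.csv',
    positionByEOF:false
) PARSE USING DSVParser ( header:Yes )
OUTPUT TO CsvStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream PARTITION BY merchantId
SELECT TO_STRING(data[1]) as merchantId,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM CsvStream;
END FLOW IngestFlow;

-- deploy the ingest flow to a single-server deployment group; the rest of
-- the application can be deployed to a multi-server group
DEPLOY APPLICATION PosApp ON default WITH IngestFlow ON ALL IN IngestGroup;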
Windows in a multi-server cluster
When a window is running on multiple servers, events are distributed among the servers based on the PARTITION BY
field of its OVER
stream. Put a window in the same flow as the CQ that consumes its data.
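For example, a minimal sketch based on a simplified version of the PosApp window (the flow, CQ, and output stream names are hypothetical):

CREATE FLOW AggregateFlow;
-- the window and the CQ that reads from it share a flow
CREATE JUMPING WINDOW PosData5Minutes
OVER PosDataStream KEEP WITHIN 5 MINUTE
PARTITION BY merchantId;

CREATE CQ MerchantActivity
INSERT INTO MerchantActivityStream
SELECT p.merchantId, SUM(p.amount) as totalAmount
FROM PosData5Minutes p
GROUP BY p.merchantId;
END FLOW AggregateFlow;

Because PosDataStream is partitioned by merchantId, each server's copy of the window holds all the events for the merchants assigned to that server.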
Caches in a multi-server cluster
Put a cache in the same flow as the CQ that consumes its data.
A cache's data is distributed among the servers of its distribution group based on the value of the keytomap
field. For best performance, this should match the PARTITION BY
field used to distribute streams containing the data with which the cache data will be joined. In other words, partition the CQ's input stream on the cache key, and distribution of the cache will match that of the CQ. This will be much faster than doing a cache lookup on another server.
For example, if you modified the PosApp sample application for use in a multi-server environment:
CREATE CQ CsvToPosData INSERT INTO PosDataStream PARTITION BY merchantId ... CREATE JUMPING WINDOW PosData5Minutes OVER PosDataStream ... CREATE CACHE HourlyAveLookup ... (keytomap:'merchantId') ... CREATE STREAM MerchantTxRateOnlyStream ... PARTITION BY merchantId; ... CREATE CQ GenerateMerchantTxRateOnly ... FROM PosData5Minutes p, HourlyAveLookup l WHERE p.merchantId = l.merchantId ...
Since merchantId
is the PARTITION BY
field for PosDataStream
and the cache key for HourlyAveLookup
, the events for the join performed by GenerateMerchantTxRateOnly
will be on the same servers.
When a server is added to a cluster, cache data is redistributed automatically.
The replicas
property controls how many duplicate copies of each cache entry are maintained in the deployment group:
The default value of replicas is 1. This means the deployment group contains only a single copy of each entry, on the server to which it is distributed based on its keytomap field value.
Setting replicas to 2 enables fault tolerance. If one server goes down, all cache data is still available from the other servers.
Setting replicas to all creates a full copy of the cache on each server. If joins are not always on the keytomap field, this may improve performance. Note that with large caches this may significantly increase memory requirements, potentially degrading performance.
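For example, a sketch of a fault-tolerant cache definition modeled on the PosApp sample (the reader properties and type name are illustrative, and the placement of the replicas property alongside keytomap should be verified for your release):

CREATE CACHE HourlyAveLookup USING FileReader (
    directory:'Samples/PosApp/appData',
    wildcard:'hourlyData.txt'
) PARSE USING DSVParser ( header:Yes )
QUERY ( keytomap:'merchantId', replicas:'2' )
OF MerchantHourlyAve;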
CQs in a multi-server cluster
When a CQ is running on multiple servers, events are distributed among the servers based on the PARTITION BY
field of the stream referenced by its FROM
clause. General rules for CQs:
put any cache(s) or window(s) referenced by the FROM clause in the same flow as the CQ
if the CQ's FROM clause references a cache, partition the CQ's input stream by the same key used for cache lookups
put the CQ's INSERT INTO (output) stream, if any, in the same flow as the CQ
partition the INSERT INTO stream if it is consumed by a CQ or target that is deployed to more than one server
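A sketch of the last two rules, using the PosApp names from the fragment above (the flow name and the output stream's type are hypothetical; per the first rule, the window and cache the CQ reads from would be created in this same flow):

CREATE FLOW TxRateFlow;
-- the CQ's output stream is declared in the same flow and partitioned
-- because downstream components run on more than one server
CREATE STREAM MerchantTxRateOnlyStream OF MerchantTxRate PARTITION BY merchantId;

CREATE CQ GenerateMerchantTxRateOnly
INSERT INTO MerchantTxRateOnlyStream
SELECT p.merchantId, COUNT(p.merchantId) as txCount
FROM PosData5Minutes p, HourlyAveLookup l
WHERE p.merchantId = l.merchantId
GROUP BY p.merchantId;
END FLOW TxRateFlow;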
WActionStores in a multi-server cluster
There are no special programming issues when a WActionStore is running on multiple servers. It may be in any flow or its own flow.
Note that if a WActionStore runs out of memory it will automatically drop the oldest WActions. Putting a WActionStore in its own flow and deploying that flow to servers not used by other flows will give you maximum control over how much memory is reserved for the WActionStore.
Targets in a multi-server cluster
When a target is running on multiple servers, events are distributed among the servers based on the PARTITION BY
field of its INPUT FROM
stream.
Using multiple flows
You may create multiple flows within the application to control which components are deployed to which (and how many) servers in the cluster. See Managing deployment groups for more information.
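For example, a sketch of one way a DEPLOY statement might place an ingest flow, an analytics flow, and a WActionStore flow on different deployment groups (the application, flow, and group names are hypothetical, and the available placement options are covered in Managing deployment groups):

DEPLOY APPLICATION PosMonitor ON default
    WITH IngestFlow ON ALL IN IngestGroup,
         AnalyticsFlow ON ALL IN AnalyticsGroup,
         WActionStoreFlow ON ALL IN StorageGroup;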
Exceptions (see Handling exceptions) are sent to Global.exceptionsStream and written to Striim's server log (Reading log files). If useful, you may store exceptions in a WActionStore using an application such as this:
CREATE APPLICATION StoreExceptions; CREATE TYPE myExceptionType ( entityName java.lang.String, className java.lang.String, message java.lang.String); CREATE STREAM myExceptionsStream OF myExceptionType; CREATE CQ selectExceptions INSERT INTO myExceptionsStream SELECT entityName, className, message FROM Global.exceptionsStream; CREATE WACTIONSTORE ExceptionsWAS CONTEXT OF myExceptionType EVENT TYPES (myExceptionType KEY (entityId)); CREATE CQ stream2WActionStore INSERT INTO ExceptionsWAS SELECT * from myExceptionsStream; END APPLICATION StoreExceptions;
To store only certain events, add an appropriate WHERE clause to the CQ.
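For example, a sketch of the same CQ restricted to exceptions raised by a single component (the entityName value shown is hypothetical):

CREATE CQ selectExceptions
INSERT INTO myExceptionsStream
SELECT entityName, className, message
FROM Global.exceptionsStream
WHERE entityName == 'CsvDataSource';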
The Striim Forwarding Agent is a stripped-down version of a Striim server that can be used to run sources and CQs locally on a remote host. Windows, caches, and other components are not supported.
To use the Agent, first follow the instructions in Installing the Striim Forwarding Agent. Then, in your application, create a flow for the source that will run on the agent, and deploy it to the agent's deployment group.
Here is a simple example that reads from a file on the remote host and writes to a file on the Striim server:
CREATE APPLICATION agentTest; CREATE FLOW AgentFlow WITH ENCRYPTION; CREATE SOURCE CsvDataSource USING FileReader ( directory:'Samples/PosApp/appData', wildcard:'posdata.csv', blocksize: 10240, positionByEOF:false ) PARSE USING DSVParser ( header:Yes, trimquote:false ) OUTPUT TO CsvStream; END FLOW AgentFlow; CREATE FLOW ServerFlow; CREATE TARGET t USING FileWriter( filename:'AgentOut') FORMAT USING DSVFormatter () INPUT FROM CsvStream END FLOW ServerFlow; END APPLICATION agentTest; DEPLOY APPLICATION agentTest ON default WITH AgentFlow ON ALL IN agent;
Be sure the Agent is running when you load the application. If there are multiple agents in the deployment group the source will automatically combine their data. The WITH ENCRYPTION option will encrypt the stream connecting the agent to the server (see CREATE APPLICATION ... END APPLICATION).
Note
CQs running on an Agent may not select from Kafka streams or include AS <field name>
, GROUP BY
, HAVING
, ITERATOR
, or MATCH_PATTERN
.
The following variation on the beginning of the PosApp sample application filters out unneeded columns and partitions the stream (see Adapting TQL applications for multi-server deployment):
CREATE FLOW AgentFlow; CREATE SOURCE CsvDataSource USING FileReader ( directory:'Samples/PosApp/appData', wildcard:'posdata.csv', blocksize: 10240, positionByEOF:false ) PARSE USING DSVParser ( header:Yes, trimquote:false ) OUTPUT TO CsvStream; CREATE CQ CsvToPosData INSERT INTO PosDataStream partition by merchantId SELECT TO_STRING(data[1]) as merchantId, TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime, DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue, TO_DOUBLE(data[7]) as amount, TO_STRING(data[9]) as zip FROM CsvStream; END FLOW AgentFlow; ...
PARTITION BY merchantId
specifies that events with the same merchantId
values will be processed on the same server. This is required for ServerFlow
to be deployed to multiple servers. An unpartitioned source is automatically deployed to a single server.
See Filtering events using OUTPUT TO and WHERE for additional examples of filter syntax that are compatible with the Forwarding Agent.
To support recovery when a source running on the Forwarding Agent is in one application and the server-side components that consume its output are in one or more other applications, persist the source's output to Kafka (see Persisting a stream to Kafka). For example:
CREATE APPLICATION agentApp; CREATE FLOW AgentFlow WITH ENCRYPTION; CREATE STREAM PersistedStream OF Global.WAEvent PERSIST; CREATE SOURCE OracleCDCIn USING OracleReader ( Username:'striim', Password:'passwd', ConnectionURL:'203.0.113.49:1521:orcl', Tables:'myschema.%' ) OUTPUT TO PersistedStream; END FLOW AgentFlow; END APPLICATION agentApp; DEPLOY APPLICATION agentApp ON default WITH AgentFlow ON ALL IN agent; CREATE APPLICATION ServerApp; CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.8.0' ( Mode: 'Sync', Topic: 'OracleData', brokerAddress: '198.51.100.55:9092' ) FORMAT USING AvroFormatter ( schemaFileName: 'OracleData.avro' ) INPUT FROM PersistedStream; END APPLICATION ServerApp; DEPLOY APPLICATION serverApp ON default;
The following simple example sends KafkaWriter output to partition 0 or 1 based on the PARTITION BY field of the target's input stream or the PartitionKey property value. This example assumes you are using Eclipse, but it may be adapted to any Java development environment.
Create a new Java project.
After naming the project and making any other necessary changes, click Next > Libraries > Add External JARs, navigate to Striim/lib, and double-click Common-3.10.3.jar.
Finish creating the new Java project.
Add a new class with package name com.custompartitioner and class name KafkaCityPartitioner.
Replace the default contents of the new class with the following:
package com.custompartitioner; import com.webaction.kafka.PartitionerIntf; import java.util.ArrayList; public class KafkaCityPartitioner implements PartitionerIntf { @Override public void close() { // TODO Auto-generated method stub } @Override public int partition(String topic, Object keylist, Object event, int noOfPartitions) { if(noOfPartitions < 2) { throw new RuntimeException("Number of partitions is less than 2"); } if(keylist != null) { ArrayList<String> partitionKeyList = (ArrayList<String>) keylist; String partitionKey = partitionKeyList.get(0); if(partitionKey.equalsIgnoreCase("Amsterdam")) { return 0; } } return 1; } }
If the partition key field value is Amsterdam, the event will be written to Kafka partition 0, otherwise it will be written to partition 1. This logic may be extended to handle additional values and partitions by adding else clauses. To guarantee that there will be no duplicate events after recovery, for each partition key the partitioning logic must always write to the same partition number.
If Eclipse's Build Automatically option is disabled, build the project.
Click File > Export > Java > JAR File > Next, select the class, and click Finish.
Save the JAR file in Striim/lib.
Restart Striim.
To test the custom partitioner, create an application using this TQL. If you are not using Striim's internal Kafka instance, change the broker address to that of your Kafka instance.
CREATE SOURCE CSVSource USING FileReader ( positionbyeof: false, directory: 'Samples', wildcard: 'city.csv' ) PARSE USING DSVParser () OUTPUT TO CSVStream; CREATE CQ CQ1 INSERT INTO CQStream SELECT data[0] as data0 java.lang.Object, data[1] as data1 java.lang.Object, data[2] as data2 java.lang.Object, data[3] as data3 java.lang.Object FROM CSVStream; CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.9.0' ( Topic: 'test01', brokerAddress: 'localhost:9092', KafkaConfig: 'partitioner.class=com.custompartitioner.KafkaCityPartitioner' ) FORMAT USING DSVFormatter() INPUT FROM CQStream;
Save the following as Striim/Samples/city.csv:
Amsterdam,0,0,0
Los Angeles,1,1,1
Amsterdam,2,2,0
Los Angeles,3,3,1
Amsterdam,4,4,0
Los Angeles,5,5,1
Amsterdam,6,6,0
Los Angeles,7,7,1
Amsterdam,8,8,0
Los Angeles,9,9,1
Deploy and run the application.
To read a Kafka stream's persisted data in an external application:
Include dataformat:'avro' in the stream's property set.
Generate an Avro schema file for the stream using the console command EXPORT <namespace>.<stream name>. This will create a schema file <namespace>_<stream name>_schema.avsc in the Striim program directory. Optionally, you may specify a path or file name using EXPORT <namespace>.<stream name> '<path>' '<file name>'. The .avsc extension will be added automatically.
Copy the schema file to a location accessible by the external application.
In the external application, use the Zookeeper and broker addresses in the stream's property set, and reference the stream using <namespace>_<stream name>.
Note
Recovery (see Recovering applications) is not supported for Avro-formatted Kafka streams.
For example, the following program would read from Samples.PosDataStream:
import java.io.File; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.javaapi.FetchResponse; import kafka.javaapi.OffsetResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.MessageAndOffset; public class SimpleKafkaConsumer { /** * This method issues exactly one fetch request to a kafka topic/partition and prints out * all the data from the response of the fetch request. * @param topic_name Topic to fetch messages from * @param schema_filename Avro schema file used for deserializing the messages * @param host_name Host where the kafka broker is running * @param port Port on which the kafka broker is listening * @param clientId Unique id of a client doing the fetch request * @throws Exception */ public void read(String topic_name, String schema_filename, String host_name, int port, String clientId) throws Exception { SimpleConsumer simpleConsumer = new SimpleConsumer(host_name, port, 100000, 64 * 1024, clientId); // This is just an example to read from partition 1 of a topic. int partitionId = 1; // Finds the first offset in the logs and starts fetching messages from that offset. long offset = getOffset(simpleConsumer, topic_name, partitionId, kafka.api.OffsetRequest.EarliestTime(), clientId); // Builds a fetch request. FetchRequestBuilder builder = new FetchRequestBuilder(); builder.clientId(clientId); builder.addFetch(topic_name, partitionId, offset, 43264200); FetchRequest fetchRequest = builder.build(); // Get the response of the fetch request. FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest); // Instantiates an avro deserializer based on the schema file. SpecificDatumReader datumReader = getAvroReader(schema_filename); if (fetchResponse.hasError()) { System.out.println("Error processing fetch request: Reason -> "+fetchResponse.errorCode(topic_name, 1)); } else { ByteBufferMessageSet bbms = fetchResponse.messageSet(topic_name, partitionId); int count = 0; for (MessageAndOffset messageAndOffset : bbms) { ByteBuffer payload = messageAndOffset.message().payload(); { // The message format is Striim specific and it looks like : // 1. First 4 bytes represent an integer(k) which tells the size of the actual message in the byte buffer. // 2. Next 'k' bytes stores the actual message. // 3. This logic runs in a while loop until the byte buffer limit. while (payload.hasRemaining()) { int size = payload.getInt(); // if the size is invalid, or if there aren't enough bytes to process this chunk, then just bail. 
if ((payload.position() + size > payload.limit()) || size == 0) { break; } else { byte[] current_bytes = new byte[size]; payload.get(current_bytes, 0, size); BinaryDecoder recordDecoder = DecoderFactory.get().binaryDecoder(current_bytes, null); GenericData.Record record = (GenericData.Record) datumReader.read(null, recordDecoder); System.out.println(count++ +":"+record); } } } } } } private SpecificDatumReader getAvroReader(String schema_filename) throws Exception { File schemaFile = new File(schema_filename); if(schemaFile.exists() && !schemaFile.isDirectory()) { Schema schema = new Schema.Parser().parse(schemaFile); SpecificDatumReader avroReader = new SpecificDatumReader(schema); return avroReader; } return null; } public long getOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) ); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } public static void main(String[] args) { SimpleKafkaConsumer readKafka = new SimpleKafkaConsumer(); String topic_name = "Samples_PosDataStream"; String schema_filename = "./Platform/conf/Samples_PosDataStream_schema.avsc"; try { readKafka.read(topic_name, schema_filename, "localhost", 9092, "ItsAUniqueClient"); } catch (Exception e) { System.out.println(e); } } }
This section covers the use of MODIFY on streams of user-defined types. For streams of type WAEvent, see Modifying and masking values in the WAEvent data array using MODIFY.
Using MODIFY in a CQ's SELECT statement allows you to change or mask the contents of specified fields. When a stream has many fields, this can be a much simpler and more efficient approach than writing a SELECT statement that handles each field individually.
The syntax is:
SELECT <field name> FROM <stream name> MODIFY (<field name> = <expression>)
The expression can use the same operators and functions as SELECT. The MODIFY clause may include CASE statements.
The following simple example would convert a monetary amount in the Amount
field using an exchange rate of 1.09:
CREATE CQ ConvertAmount INSERT INTO ConvertedStream SELECT * FROM UnconvertedStream MODIFY(Amount = TO_FLOAT(Amount) * 1.09);
The next example illustrates the use of CASE statements. It uses the maskPhoneNumber function (see Masking functions) to mask individually identifiable information from US and India telephone numbers (as dialed from the US) while preserving the country and area codes. The US numbers have the format ###-###-####, where the first three digits are the area code. India numbers have the format 91-###-###-####, where 91 is the country code and the third through fifth digits are the subscriber trunk dialing (STD) code. The telephone numbers are in the PhoneNum
field and the country codes are in the Country
field.
CREATE CQ maskData INSERT INTO maskedDataStream SELECT * FROM unmaskedDataStream MODIFY( PhoneNum = CASE WHEN Country == "US" THEN maskPhoneNumber(PhoneNum, "###-xxx-xxx") ELSE maskPhoneNumber(PhoneNum, "#####x#xxx#xxxx") END );
This could be extended with additional WHEN statements to mask numbers from additional countries, or with additional masking functions to mask individually identifiable information such as credit card, Social Security, and national identification numbers.
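For instance, a sketch that makes the India branch explicit and adds a catch-all (the "IN" country-code value and the final mask pattern are illustrative):

CREATE CQ maskData
INSERT INTO maskedDataStream
SELECT * FROM unmaskedDataStream
MODIFY( PhoneNum = CASE
    WHEN Country == "US" THEN maskPhoneNumber(PhoneNum, "###-xxx-xxx")
    WHEN Country == "IN" THEN maskPhoneNumber(PhoneNum, "#####x#xxx#xxxx")
    ELSE maskPhoneNumber(PhoneNum, "xxxxxxxxxx")
END );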
See Masking functions for additional examples.
See also Modifying output using ColumnMap. In some cases that may be a more straightforward and efficient solution than using MODIFY.
Every TQL component exists in a namespace, which is used to assign privileges and control access. Every user account has a personal namespace with the same name. For example, the admin
user has an admin
namespace.
When you create an application in the console, by default it will be created in the current namespace, which is shown in the prompt (for example, W (admin) >). You can override that default by preceding CREATE APPLICATION
with USE <namespace name>;
to change the current namespace.
When you create an application in the UI, by default it will be created in your personal namespace. The "Create new application" dialog allows you to override the default by selecting a different namespace or creating a new one.
Note
When you import a TQL file in the UI, any CREATE NAMESPACE <name>;
and USE <namespace name>;
statements in the file will override the choice in the UI.
If useful, you may create an empty namespace:
CREATE NAMESPACE <name>;
One use for this is to keep applications that use components with the same names from interfering with each other. For example, you might have the current version of an application in one namespace and the next version in another.
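For example, a sketch that keeps two versions of the same application side by side (the namespace names are hypothetical):

CREATE NAMESPACE PosAppV1;
USE PosAppV1;
CREATE APPLICATION PosApp;
...
END APPLICATION PosApp;

CREATE NAMESPACE PosAppV2;
USE PosAppV2;
CREATE APPLICATION PosApp;
...
END APPLICATION PosApp;

The two copies of PosApp and their components can then be loaded, deployed, and dropped independently.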
Another use for a namespace is to hold a library of common types to be shared by various applications. For example:
CREATE NAMESPACE CommonTypes;
USE CommonTypes;
CREATE TYPE MerchantName(
merchantId String KEY,
companyName String
);
USE my_user_name;
CREATE APPLICATION PosMonitor;
CREATE STREAM MerchantNames OF CommonTypes.MerchantName;
...
The MerchantName
type is now available in any application by specifying CommonTypes.MerchantName
. The USE
my_user_name
command stops creation of components in the CommonTypes
namespace. If you left that out, the PosMonitor
application would be created in the CommonTypes
namespace rather than in your personal namespace.
See Managing users, permissions, and roles for information on the role namespaces play in the Striim security model.
When LINK SOURCE EVENT
is specified in the CQ that populates the WActionStore, and the type for the linked events is specified in the EVENT TYPES
clause of the CREATE WACTIONSTORE
statement, you can use EVENTLIST
in a subquery and ITERATOR
over the subquery to return values from fields in the linked events.
For example, in MultiLogApp, the WActionStore ApiActivity
includes linked events of the type ApiUsage
:
CREATE TYPE ApiUsage ( api String key, sobject String, count Integer, logTime DateTime ); CREATE TYPE ApiContext ( api String key, count Integer, logTime DateTime ); CREATE WACTIONSTORE ApiActivity CONTEXT OF ApiContext EVENT TYPES ( ApiUsage KEY(api)) ... CREATE CQ GetApiSummaryUsage INSERT INTO ApiActivity SELECT a.api, sum(a.count), first(a.logTime) FROM ApiSummaryWindow a GROUP BY a.api LINK SOURCE EVENT;
The following query will return all fields from all linked events:
SELECT it.api, it.count, it.logTime, it.sobject FROM (SELECT EVENTLIST(t) AS list_of_events FROM Samples.ApiActivity t) z, ITERATOR(z.list_of_events) it;
SELECT [DISTINCT] { <field name>, ... } [ ISTREAM ] FROM ITERATOR (<nested collection name>.<member name>[, <type>]) <iterator name>, <nested collection name>
The ITERATOR function allows you to access the members of nested collections in a dataset, so long as the collection implements the java.lang.Iterable interface or is a JsonNode.
For example, suppose you have the following statement:
create stream s1 ( id string, json_array JsonNode, list_of_objects java.util.List );
The json_array
and list_of_objects
are both nested data collections.
Since java.util.List implements the java.lang.Iterable interface, you can create an iterator and use it to access the members of list_of_objects
:
SELECT lst from s1, ITERATOR(s1.list_of_objects) lst;
Suppose the list_of_objects
contains objects of this Java type:
package my.package; class MyType { int attrA; int attrB; }
You would access its members using this statement:
SELECT attrA, attrB FROM s1, ITERATOR(s1.list_of_objects, my.package.MyType) lst;
The stream also includes a JsonNode member. Suppose the following events are added to the stream:
('a', [1,2,3], null) ('b', [1,3,4], null)
Here is how you can iterate through json_array
:
SELECT id, a FROM s1, iterator(s1.json_array) a; OUTPUT: ======= a 1 a 2 a 3 b 1 b 3 b 4
This statement illustrates a cross join between json_array
and list_of_objects
:
SELECT a, b FROM ITERATOR(s1.json_array) a, ITERATOR(s1.list_of_objects) b, s1;
You can iterate through multiple levels of nesting. For example, this statement iterates through 2 levels, where json_array
contains another_nested_collection
:
SELECT x FROM s1, ITERATOR(s1.json_array) a, ITERATOR(a.another_nested_collection) x;
Warning
Arbitrary data types, such as a Java List, cannot be used with a WActionStore that is persisted to Elasticsearch. Also, it is not possible to convert a JSON List into a Java List.
You can use the META()
function to query the metadata
map of the WAEvent type, which is used by the output streams of several Striim readers. For example, the following creates a stream containing invalid records:
CREATE STREAM ExceptionStream of ExceptionRecord; CREATE CQ CQExceptionRecord INSERT INTO ExceptionStream SELECT data[0] FROM CsvStream WHERE META(CsvStream, 'RecordStatus').toString() == 'INVALID_RECORD';
The elements of the metadata map vary depending on the reader and parser used.
Readers whose WAEvent output includes metadata: DatabaseReader, FileReader, HDFSReader, HTTPReader, KafkaReader, and MultiFileReader. JMSReader returns no metadata.
The following parsers append metadata elements to those of the associated reader: DSVParser, FreeFormTextParser, NetflowParser (version 5), NetflowParser (version 9), SNMPParser, and XMLParser.
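For example, when the source uses FileReader, you could tag each event with the name of the file it came from (the CQ and output stream names are hypothetical; FileName appears in FileReader metadata, as shown in the WAEvent output at the end of this section):

CREATE CQ AddFileName
INSERT INTO FileTaggedStream
SELECT META(CsvStream, 'FileName').toString() as sourceFile,
  data[0] as firstField
FROM CsvStream;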
You can use pattern matching in your queries to conveniently specify multiple combinations of events via a MATCH_PATTERN
expression. Pattern expressions consist of references to stream and timer events, according to this syntax:
CREATE CQ <CQ name> INSERT INTO <output stream> SELECT <list of expressions> FROM <data source> [ <data source alias>] [, <data source> [ <data source alias> ], ...] MATCH_PATTERN <pattern> DEFINE <pattern variable> = [<data source or alias name>] '(' <predicate> ')' [ PARTITION BY <data source field name> ];
In this syntax, <data source> is a stream or timer event.
The MATCH_PATTERN
clause represents groups of events along with condition variables. New events are referenced in the predicates, where event variables are defined. Condition variables are expressions used to check additional conditions, such as timer events, before continuing to match the pattern.
The pattern matching expression syntax uses the following notation:
Notation | Description
---|---
* | 0 or more occurrences
? | 0 or 1 occurrence
+ | 1 or more occurrences
{m,n} | m to n occurrences
| | logical OR (alternative)
& | logical AND (permutation)
( ) | grouping
# | restart matching markers for overlapping patterns
For example, the pattern A B* C+ would contain exactly 1 new event matching A, 0 or more new events matching B, and 1 or more new events matching C.
NOTE: You cannot refer to new events in conditions or expressions contained in the SELECT clause.
The DEFINE clause defines a list of event variables and conditions. The conditions are described in the predicates associated with the event variables.
For example, A = S(sensor < 100) matches events on stream S where the sensor attribute is less than 100.
The optional PARTITION BY clause partitions the stream by key on sub-streams, and pattern matching is performed independently for every partition.
Let's examine this example:
CREATE CQ SELECT LIST(B.id) as eventValues, count(B.id) as eventCount, A.id as eventTime FROM eventStream S MATCH_PATTERN T A W B* P C DEFINE A = S(sensor < 100), B = S(sensor >= 100), C = S(sensor < 100), T = TIMER(interval 60 second), W = STOP(T), P = (count(B) > 5) PARTITION BY eventId;
The pattern of event variables in the MATCH_PATTERN
clause (T A W B* P C) represents the following:
T A W: Event A is matched within 60 seconds. Since this pattern ends with W, subsequent events (B, P, C) can only be matched after the first 60 seconds have elapsed.
B* P C: After the first 60 seconds, the match pattern consists of 0 or more instances of event B, followed by event P, followed by event C.
Once you have established event variables, you can refer to groups of matched events using 0-based array index notation.
For example, B[3].sensor
refers to the 4th event in group B. If you omit the index (for example, B.sensor
), the last event in the group is returned.
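For example, a sketch of the SELECT portion of such a CQ, using the sensor pattern from the example above (the output field names are illustrative):

SELECT A.id as startId,
  B[0].sensor as firstHighReading,
  B.sensor as lastHighReading,
  count(B.id) as highReadingCount
FROM eventStream S
MATCH_PATTERN A B* C
DEFINE
  A = S(sensor < 100),
  B = S(sensor >= 100),
  C = S(sensor < 100);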
If you pass event variables to built-in or user-defined functions, the compiler will generate code differently depending on whether a scalar or aggregating function is used.
In this example, the expression aggregates all sensor attributes for all matched events in event variable B.
avg(B.sensor)
In this example, the expression evaluates to the absolute value of the sensor attribute of the last matched event in event variable B:
abs(B.sensor)
If a match on the next event depends on the value of an attribute in a previous event, use the PREV() built-in function. The only parameter is an integer constant indicating how far back to match in the event pattern. For example, PREV() or PREV(1) refers to the immediately preceding event, but PREV(2) refers to the event that occurred before the immediately preceding event. For example:
DEFINE -- Compare the new event's sensor with the previous event's sensor. -- The default index value of 1 is used. A=streamx(sensor < PREV().sensor) -- Compare the new event's sensor with the sensor of the event before the previous sensor. B=streamx(sensor < PREV(2).sensor)
You can set timer events using the following functions:
Function | Description
---|---
TIMER(interval <n> second) | A condition that does not wait for a new event from any stream. When the timer expires, it sends a signal as an event from the timer stream.
SIGNAL(<timer variable>) | The next event is expected from the timer defined by the specified variable.
STOP(<timer variable>) | Stops the timer and cancels sending the timer event.
For example, this pattern matches no events for a period of 50 seconds:
PATTERN T W DEFINE T=timer(interval 50 second), W=signal(T)
This pattern matches all events from streamA for 30 seconds (until event W is received from the timer):
PATTERN T A* W -- matching all events from streamA for 30 seconds DEFINE T=timer(interval 30 second), A=streamA, W=signal(T)
This pattern matches an event from streamA within 30 seconds. If an event from streamA does not occur within 30 seconds, an event from the timer will be received causing the pattern matching to fail:
PATTERN T A -- matching event from streamA within 30 seconds DEFINE T=timer(interval 30 second), A=streamA
This pattern matches events from streamA for 30 seconds. It subsequently matches events from streamB for 30 seconds:
PATTERN T A C T2 B -- matching event A for 30 seconds and then event B within 30 seconds DEFINE T=timer(interval 30 second), A=streamA, C=stop(T), T2=timer(interval 30 second), B=streamB
If you would like to specify several variations of an event sequence, use the alternation operator ( | ). In case there are equivalent variations in the alternation expression, the first (leftmost) variation will be matched first. For example, the following pattern matches A but not AA since it is equivalent to A:
PATTERN (A|AA|B|C) DEFINE A=streamA(sensor between 10 and 20), AA=streamA(sensor between 10 and 20), -- it is same as A, and will never be matched B=streamA(sensor > 20), C=streamA(sensor < 10)
Suppose you would like to match the sequence ABA and the stream contains events ABABA. Normally the first instance (ABA)BA will be matched, but the second instance AB(ABA) will not be matched. If you would like both instances to be matched, include a ( # ) operator in the pattern wherever you would like the engine to restart its matching (for example, AB#ABA). If the pattern contains multiple # operators, the matching restarts from the earliest occurrence of the last successful match.
For example, consider this pattern:
PATTERN A (B # A | C D # A)
If the actual event sequence is ABACDABACDA, the following subsequences will be matched:
ABA
ACDA
ABA
ACDA
The com.webaction.analytics
classes provide you with several regression functions that enable you to make predictions based on the regression model you have chosen. To get started, begin with the following IMPORT statements in your TQL:
IMPORT STATIC com.webaction.analytics.regressions.LeastSquares.*; IMPORT STATIC com.webaction.analytics.regressions.LeastSquaresPredictor.*; IMPORT STATIC com.webaction.analytics.regressions.PredictionResult.*; IMPORT STATIC com.webaction.analytics.regressions.Helpers.*;
These imports provide you with the following regression functions, along with supporting helper methods. Use these aggregate functions in a CQ that selects from a window, which is used as the training set of the regression algorithm.
Regression Class | Description
---|---
SIMPLE_LINEAR_REGRESSION | Performs simple linear regression using a single independent variable and a single dependent variable.
CONSTRAINED_SIMPLE_LINEAR_REGRESSION | Performs simple linear regression, constrained so that the fitted line passes through the "rightmost" (newest) point in the window.
MULTIPLE_LINEAR_REGRESSION | Performs multiple linear regression using multiple independent variables and a single dependent variable.
CONSTRAINED_MULTIPLE_LINEAR_REGRESSION | Performs multiple linear regression, constrained so that the fitted hyperplane passes through the desired number of "rightmost" (newest) points in the window.
POLYNOMIAL_REGRESSION | Performs polynomial regression, creating a nonlinear model using a single independent variable and a single dependent variable.
CONSTRAINED_POLYNOMIAL_REGRESSION | Performs polynomial regression, constrained so that the fitted polynomial passes through the desired number of "rightmost" (newest) points in the window.
The InventoryPredictor example illustrates the usage of these regression analytics functions through the use of a prediction algorithm, which sends alerts when the inventory is predicted to be low. The general approach to using regression in time-series predictions is as follows:
Create a window.
Specify the regression model.
Specify the variable on which predictions are made and the independent variable (in the following example, the timestamp).
Determine whether there are enough points to make a valid prediction.
The syntax is:
SELECT [ISTREAM] [CONSTRAINED_] {SIMPLE_LINEAR_REGRESSION | MULTIPLE_LINEAR_REGRESSION | POLYNOMIAL_REGRESSION} (properties) AS pred, CASE IS_PREDICTOR_READY(pred) WHEN TRUE THEN PREDICT[_MULTIPLE](properties) ELSE ZERO_PREDICTION_RESULT([properties]) END AS result, output1, output2, [COMPUTE_BOUNDS(properties),] [{GREATER|LESS}_THAN_PROBABILITY()], ...
In this first section of the TQL, we import the required com.webaction.analytics
classes, create the application (InventoryPredictor
), create the inventory stream (InventoryStream
), and finally create a two-hour window over that inventory stream (InventoryChangesWindow
):
IMPORT STATIC com.webaction.analytics.regressions.LeastSquares.*; IMPORT STATIC com.webaction.analytics.regressions.LeastSquaresPredictor.*; IMPORT STATIC com.webaction.analytics.regressions.PredictionResult.*; IMPORT STATIC com.webaction.analytics.regressions.Helpers.*; CREATE APPLICATION InventoryPredictor; CREATE TYPE InventoryType( ts org.joda.time.DateTime, SKU java.lang.String, inventory java.lang.Double, location_id java.lang.Integer ); CREATE STREAM InventoryStream OF InventoryType; CREATE SOURCE JSONSource USING FileReader ( directory: 'Samples', WildCard: 'inventory.json', positionByEOF: false ) PARSE USING JSONParser ( eventType: 'InventoryType' ) OUTPUT TO InventoryStream; CREATE WINDOW InventoryChangesWindow OVER InventoryStream KEEP WITHIN 2 HOUR PARTITION BY SKU;
In this example, simple linear regression will be chosen to create a model in which the inventory is predicted according to the current system time. We will begin by setting up an AlertStream
based on inventory predictions:
CREATE STREAM AlertStream OF Global.AlertEvent; CREATE CQ AlertOnInventoryPredictions INSERT INTO AlertStream . . .
The SELECT
statement that follows sets up inventory alert messages based on boolean variables indicating whether the inventory is low (isInventoryLow
) or ok (isInventoryOK
) based on a 90% confidence interval used in the prediction model:
SELECT "Low Inventory Alert", SKU, CASE WHEN isInventoryLow THEN "warning" WHEN isInventoryOK THEN "info" END, CASE WHEN isInventoryLow THEN "raise" WHEN isInventoryOK THEN "cancel" END, CASE WHEN isInventoryLow THEN "Inventory for SKU " + SKU + " is expected to run low within 2 hours (90% confidence)." WHEN isInventoryOK THEN "Inventory status for SKU " + SKU + " looks OK for the next 2 hours (90% confidence)." END ...
The next part of the SELECT
statement sets up the simple linear regression model (pred
) based on the inventory variable (inventory
) and a timestamp variable (ts
):
FROM (SELECT ISTREAM SKU, DNOW() AS ts, SIMPLE_LINEAR_REGRESSION(inventory, ts) AS pred, ...
Next we determine whether there are enough points to make a valid prediction by calling the IS_PREDICTOR_READY
function. If there are, we compute the probability of the event that the inventory has fallen below (or exceeded) a given critical threshold by calling the LESS_THAN_PROBABILITY
(or GREATER_THAN_PROBABILITY
) function. If the computed probabilities are higher than 90%, we raise the appropriate flag (isInventoryLow
or isInventoryOK
).
IS_PREDICTOR_READY(pred) AND LESS_THAN_PROBABILITY(PREDICT(pred, DADD(ts, DHOURS(2))), 5) > 0.9 AS isInventoryLow, IS_PREDICTOR_READY(pred) AND GREATER_THAN_PROBABILITY(PREDICT(pred, DADD(ts, DHOURS(2))), 5) > 0.9 AS isInventoryOK, ...
Note that the PREDICT
function feeds the data into the prediction model, which returns a PredictionResult
object that is subsequently passed to the probability utility functions.
Here is the entire CREATE CQ
statement:
CREATE CQ AlertOnInventoryPredictions INSERT INTO AlertStream SELECT "Low Inventory Alert", SKU, CASE WHEN isInventoryLow THEN "warning" WHEN isInventoryOK THEN "info" END, CASE WHEN isInventoryLow THEN "raise" WHEN isInventoryOK THEN "cancel" END, CASE WHEN isInventoryLow THEN "Inventory for SKU " + SKU + " is expected to run low within 2 hours (90% confidence)." WHEN isInventoryOK THEN "Inventory status for SKU " + SKU + " looks OK for the next 2 hours (90% confidence)." END FROM (SELECT ISTREAM SKU, DNOW() AS ts, SIMPLE_LINEAR_REGRESSION(inventory, ts) AS pred, IS_PREDICTOR_READY(pred) AND LESS_THAN_PROBABILITY(PREDICT(pred, DADD(ts, DHOURS(2))), 5) > 0.9 AS isInventoryLow, IS_PREDICTOR_READY(pred) AND GREATER_THAN_PROBABILITY(PREDICT(pred, DADD(ts, DHOURS(2))), 5) > 0.9 AS isInventoryOK FROM InventoryChangesWindow GROUP BY SKU HAVING isInventoryLow OR isInventoryOK) AS subQuery;
Having created the prediction model, we can send email alerts whenever there is at least 90% confidence that the inventory is low:
CREATE SUBSCRIPTION InventoryEmailAlert USING EmailAdapter ( smtp_auth: false, smtpurl: "smtp.company.com", subject: "Low Inventory Alert", emailList: "sysadmin@company.com", senderEmail:"striim@company.com" ) INPUT FROM AlertStream; END APPLICATION InventoryPredictor;
Striim can receive data from Apache Flume using the WebActionSink (see Apache Flume integration) as a source. Its properties are defined in configuration files on the Flume server rather than in TQL.
The WebActionSink properties are:
property | value | notes
---|---|---
agent.sinks.webactionSink.type | com.webaction.flume.WebActionSink |
agent.sinks.webactionSink.serverUri | watp://<IP address>:<port> | the IP address and port of the Striim server (adjust the port number if you are not using the default)
agent.sinks.webactionSink.username | the Striim login to be used by the WebActionSink |
agent.sinks.webactionSink.password | the password for that login |
agent.sinks.webactionSink.stream | flume:<stream name> | specify the stream name to be used in TQL (see example below)
agent.sinks.webactionSink.parser.handler | DSVParser | in this release, only DSVParser is supported
You must also specify the properties for the specified parser. See the example below.
The following example application assumes that Flume is running on the same system as Striim.
1. Perform the first two steps described in Apache Flume integration.
2. Save the following as a TQL file, then load, deploy, and start it:
CREATE APPLICATION flumeTest; CREATE STREAM flumeStream of Global.WAEvent; CREATE TARGET flumeOut USING SysOut(name:flumeTest) INPUT FROM flumeStream; END APPLICATION flumeTest;
This application does not need a CREATE SOURCE
statement because the data is being collected and parsed by Flume. The stream name must match the one specified in the WebActionSink properties and the type must be Global.WAEvent.
3. Save the following as waflume.conf in the flume/conf directory, replacing the two IP addresses with the test system's IP address and the username and password with the credentials you used to load the application:
# Sources, channels and sinks are defined per agent, # in this case called 'agent' agent.sources = netcatSrc agent.channels = memoryChannel agent.sinks = webactionSink # For each one of the sources, the type is defined agent.sources.netcatSrc.type = netcat agent.sources.netcatSrc.bind = 192.168.1.2 agent.sources.netcatSrc.port = 41414 # The channel can be defined as follows. agent.sources.netcatSrc.channels = memoryChannel # Each sink's type must be defined agent.sinks.webactionSink.type = com.webaction.flume.WebActionSink agent.sinks.webactionSink.serverUri = watp://192.168.1.2:9080 agent.sinks.webactionSink.username = flumeusr agent.sinks.webactionSink.password = passwd agent.sinks.webactionSink.stream = flume:flumeStream agent.sinks.webactionSink.parser.handler = DSVParser agent.sinks.webactionSink.parser.blocksize = 256 agent.sinks.webactionSink.parser.columndelimiter = "," agent.sinks.webactionSink.parser.rowdelimiter = "\n" agent.sinks.webactionSink.parser.charset = "UTF-8" agent.sinks.webactionSink.parser.blockAsCompleteRecord = "True" #Specify the channel the sink should use agent.sinks.webactionSink.channel = memoryChannel # Each channel's type is defined. agent.channels.memoryChannel.type = memory # Other config values specific to each type of channel(sink or source) # can be defined as well # In this case, it specifies the capacity of the memory channel agent.channels.memoryChannel.capacity = 100
4. Start Flume, specifying the configuration file:
bin/flume-ng agent --conf conf --conf-file conf/waflume.conf --name agent -Dflume.root.logger=INFO,console
5. Save the following as flumetestdata.csv:
100,first 200,second 300,third
6. Open a terminal, change to the directory where you saved flumetestdata.csv, and enter the following command, replacing the IP address with the test system's:
cat flumetestdata.csv | nc 192.168.1.2 41414
The following output should appear in striim-node.log
(see Reading log files):
flumeTest: WAEvent{ data: ["ID","Name"] metadata: {"RecordStatus":"VALID_RECORD","FileName":"","FileOffset":0} before: null dataPresenceBitMap: "AA==" beforePresenceBitMap: "AA==" typeUUID: null }; flumeTest: WAEvent{ data: ["100","first"] metadata: {"RecordStatus":"VALID_RECORD","FileName":"","FileOffset":0} before: null dataPresenceBitMap: "AA==" beforePresenceBitMap: "AA==" typeUUID: null }; ...
See Parsing the data field of WAEvent for more information about this data format.