
3.10.1 Advanced TQL programming

The topics in this section assume that you are familiar with material covered in Fundamentals of TQL programming and Intermediate TQL programming: common patterns.

Adapting TQL applications for multi-server deployment

First, implement your application for a single server and verify that it is working as expected.

Multi-server deployment for fault tolerance

If you are deploying to multiple servers solely for fault tolerance, no changes to the application are required. See Continuing operation after node failure.

Using PARTITION BY

The key to distributing an application across multiple servers in a Striim cluster is the PARTITION BY option when creating a stream. For example:

CREATE CQ CsvToPosData
INSERT INTO PosDataStream PARTITION BY merchantId
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM CsvStream;

With this PARTITION BY clause, when the output from PosDataStream is distributed among multiple servers, all of the events for each merchantId value go to the same server. This is essential for aggregating and performing calculations on the data. (Note that the PARTITION BY clause in CREATE WINDOW statements has no effect on how events are distributed among servers.)

You can also partition by expression. For example, you could partition the output stream of an OracleReader source using PARTITION BY meta(OraData, 'TableName') in order to distribute events among servers based on the source table names.
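
For example, a CQ along these lines (a sketch only; the OraData and OraDataByTable stream names and the pass-through CQ are hypothetical) would distribute OracleReader output among servers by source table name:

CREATE CQ RepartitionOraData
INSERT INTO OraDataByTable PARTITION BY meta(OraData, 'TableName')
SELECT * FROM OraData;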

Sources in a multi-server cluster

General rules for sources in a multi-server environment:

  • Put the source and the CQ that parses its data (see Parsing the data field of WAEvent) in a flow by themselves.

  • Partition the CQ's output stream as discussed above.

  • If the source ingests data from a remote host, deploy it to a single-server deployment group, or to one server of a two-server deployment group (to support failover as discussed in Continuing operation after node failure). The source cannot be distributed across multiple servers since Striim has no way of distributing the incoming data among servers until it is put into a partitioned stream. Note that a server can belong to more than one deployment group, so if the source is not resource-intensive, it might share a server with other flows in another deployment group.

  • If the source ingests local data on a remote host, it may be deployed to a deployment group containing multiple Forwarding Agents (see Using the Striim Forwarding Agent).

Which field the output stream should be partitioned by depends on which events need to end up on the same server for aggregation or calculations. If there is no such requirement, pick a field that should distribute events evenly, such as a timestamp.

Windows in a multi-server cluster

When a window is running on multiple servers, events are distributed among the servers based on the PARTITION BY field of its OVER stream. Put a window in the same flow as the CQ that consumes its data.

Caches in a multi-server cluster

Put a cache in the same flow as the CQ that consumes its data.

A cache's data is distributed among the servers of its deployment group based on the value of the keytomap field. For best performance, this should match the PARTITION BY field used to distribute the streams containing the data with which the cache data will be joined. In other words, partition the CQ's input stream on the cache key, and the distribution of the cache will match that of the CQ. This is much faster than doing cache lookups on another server.

For example, if you modified the PosApp sample application for use in a multi-server environment:

CREATE CQ CsvToPosData
INSERT INTO PosDataStream PARTITION BY merchantId ...

CREATE JUMPING WINDOW PosData5Minutes
OVER PosDataStream ...

CREATE CACHE HourlyAveLookup ... (keytomap:'merchantId') ...
CREATE STREAM MerchantTxRateOnlyStream ... PARTITION BY merchantId; ...

CREATE CQ GenerateMerchantTxRateOnly ... 
FROM PosData5Minutes p, HourlyAveLookup l
WHERE p.merchantId = l.merchantId ...

Since merchantId is the PARTITION BY field for PosDataStream and the cache key for HourlyAveLookup, the events for the join performed by GenerateMerchantTxRateOnly will be on the same servers.

When a server is added to a cluster, cache data is redistributed automatically.

The replicas property controls how many duplicate copies of each cache entry are maintained in the deployment group:

  • The default value of replicas is 1. This means the deployment group contains only a single copy of each entry, on the server to which it is distributed based on its keytomap field value.

  • Setting replicas to 2 enables fault tolerance. In the event one server goes down, all cache data is still available from other servers.

  • Setting replicas to all creates a full copy of each cache on each server. If joins are not always on the keytomap field, this may improve performance. Note that with large caches this may significantly increase memory requirements, potentially degrading performance.
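
For example, a cache definition along the following lines (a sketch based on the PosApp sample; this assumes replicas is specified alongside keytomap in the cache's QUERY properties, and the reader and parser properties are abbreviated) would keep two copies of each entry in the deployment group:

CREATE CACHE HourlyAveLookup USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'hourlyData.txt'
)
PARSE USING DSVParser (
  header:Yes
)
QUERY (keytomap:'merchantId', replicas:'2') OF MerchantHourlyAve;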

CQs in a multi-server cluster

When a CQ is running on multiple servers, events are distributed among the servers based on the PARTITION BY field of the stream referenced by its FROM clause. General rules for CQs:

  • Put any cache(s) or window(s) referenced by the FROM clause in the same flow as the CQ.

  • If the CQ's FROM clause references a cache, partition the CQ's input stream by the same key used for cache lookups.

  • Put the CQ's INSERT INTO (output) stream, if any, in the same flow as the CQ.

  • Partition the INSERT INTO stream if it is consumed by a CQ or target that is deployed to more than one server.

WActionStores in a multi-server cluster

There are no special programming issues when a WActionStore is running on multiple servers. It may be in any flow or its own flow.

Note that if a WActionStore runs out of memory it will automatically drop the oldest WActions. Putting a WActionStore in its own flow and deploying that flow to servers not used by other flows will give you maximum control over how much memory is reserved for the WActionStore.
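
For example, you could isolate the WActionStore in its own flow and deploy that flow to its own deployment group (a sketch; the application, flow, and deployment group names are hypothetical):

CREATE FLOW WActionStoreFlow;
CREATE WACTIONSTORE MerchantActivity ... ;
END FLOW WActionStoreFlow;
...
DEPLOY APPLICATION PosMonitor ON default
WITH WActionStoreFlow ON ALL IN wactionstoreGroup;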

Targets in a multi-server cluster

When a target is running on multiple servers, events are distributed among the servers based on the PARTITION BY field of its INPUT FROM stream.
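
For example, if PosDataStream is partitioned by merchantId as shown earlier, a target such as the following (a sketch using SysOut for illustration) could be deployed to multiple servers, with each server writing only the events for the merchantId values assigned to it:

CREATE TARGET PosDataOut USING SysOut(name:PosData)
INPUT FROM PosDataStream;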

Using multiple flows

You may create multiple flows within the application to control which components are deployed to which (and how many) servers in the cluster. See Managing deployment groups for more information.

Writing exceptions to a WActionStore

Exceptions (see Handling exceptions) are sent to Global.exceptionsStream and written to Striim's server log (Reading log files). If useful, you may store exceptions in a WActionStore using an application such as this:

CREATE APPLICATION StoreExceptions;

CREATE TYPE myExceptionType (
  entityName java.lang.String,
  className java.lang.String,
  message java.lang.String);

CREATE STREAM myExceptionsStream OF myExceptionType;

CREATE CQ selectExceptions
INSERT INTO myExceptionsStream
SELECT entityName,
  className,
  message
FROM Global.exceptionsStream;

CREATE WACTIONSTORE ExceptionsWAS
CONTEXT OF myExceptionType
EVENT TYPES (myExceptionType KEY (entityName));

CREATE CQ stream2WActionStore
  INSERT INTO ExceptionsWAS
  SELECT * from myExceptionsStream;

END APPLICATION StoreExceptions;

To store only certain events, add an appropriate WHERE clause to the CQ.
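
For example, to store only the exceptions raised by a particular component (a sketch; the component name is hypothetical), you could add a WHERE clause to selectExceptions:

CREATE CQ selectExceptions
INSERT INTO myExceptionsStream
SELECT entityName,
  className,
  message
FROM Global.exceptionsStream
WHERE entityName == 'CsvToPosData';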

Using the Striim Forwarding Agent

The Striim Forwarding Agent is a stripped-down version of a Striim server that can be used to run sources and CQs locally on a remote host. Windows, caches, and other components are not supported.

To use the Agent, first follow the instructions in Installing the Striim Forwarding Agent. Then, in your application, create a flow for the source that will run on the agent, and deploy it to the agent's deployment group.

Here is a simple example that reads from a file on the remote host and writes to a file on the Striim server:

CREATE APPLICATION agentTest;

CREATE FLOW AgentFlow WITH ENCRYPTION;
CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'posdata.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;
END FLOW AgentFlow;
  
CREATE FLOW ServerFlow;
CREATE TARGET t USING FileWriter ( filename:'AgentOut' )
FORMAT USING DSVFormatter ()
INPUT FROM CsvStream;
END FLOW ServerFlow;

END APPLICATION agentTest;

DEPLOY APPLICATION agentTest ON default
WITH AgentFlow ON ALL IN agent;

Be sure the Agent is running when you load the application. If there are multiple agents in the deployment group, the source will automatically combine their data. The WITH ENCRYPTION option encrypts the stream connecting the agent to the server (see CREATE APPLICATION ... END APPLICATION).

Note

CQs running on an Agent may not select from Kafka streams or include AS <field name>, GROUP BY, HAVING, ITERATOR, or MATCH_PATTERN.

The following variation on the beginning of the PosApp sample application filters out unneeded columns and partitions the stream (see Adapting TQL applications for multi-server deployment):

CREATE FLOW AgentFlow; 
 CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'posdata.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream PARTITION BY merchantId
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM CsvStream;

END FLOW AgentFlow; ...

PARTITION BY merchantId specifies that events with the same merchantId values will be processed on the same server. This is required for ServerFlow to be deployed to multiple servers. An unpartitioned source is automatically deployed to a single server.

See Filtering events using OUTPUT TO and WHERE for additional examples of filter syntax that are compatible with the Forwarding Agent.

To support recovery when a source running on the Forwarding Agent is in one application and the server-side components that consume its output are in one or more other applications, persist the source's output to Kafka (see Persisting a stream to Kafka). For example:

CREATE APPLICATION agentApp;
CREATE FLOW AgentFlow WITH ENCRYPTION;
CREATE STREAM PersistedStream OF Global.WAEvent PERSIST; 
CREATE SOURCE OracleCDCIn USING OracleReader (
  Username:'striim',
  Password:'passwd',
  ConnectionURL:'203.0.113.49:1521:orcl',
  Tables:'myschema.%'
) 
OUTPUT TO PersistedStream;
END FLOW AgentFlow;
END APPLICATION agentApp;
DEPLOY APPLICATION agentApp ON default
WITH AgentFlow ON ALL IN agent;
  
  
CREATE APPLICATION ServerApp;
CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.8.0' (
  Mode: 'Sync',
  Topic: 'OracleData',
  brokerAddress: '198.51.100.55:9092'
)
FORMAT USING AvroFormatter ( schemaFileName: 'OracleData.avro' )
INPUT FROM PersistedStream;
END APPLICATION ServerApp;
DEPLOY APPLICATION ServerApp ON default;

Creating a custom Kafka partitioner

The following simple example sends KafkaWriter output to partition 0 or 1 based on the PARTITION BY field of the target's input stream or the PartitionKey property value. This example assumes you are using Eclipse, but it may be adapted to any Java development environment.

  1. Create a new Java project.

  2. After naming the project and making any other necessary changes, click Next > Libraries > Add External JARs, navigate to Striim/lib, and double-click Common-3.10.1.jar.

  3. Finish creating the new Java project.

  4. Add a new class with package name com.custompartitioner and class name KafkaCityPartitioner.

  5. Replace the default contents of the new class with the following:

    package com.custompartitioner;
    
    import com.webaction.kafka.PartitionerIntf;
    
    import java.util.ArrayList;
    
    public class KafkaCityPartitioner implements PartitionerIntf {
    
    	@Override
    	public void close() {
    		// TODO Auto-generated method stub
    		
    	}
    
    	@Override
    	public int partition(String topic, Object keylist, Object event, int noOfPartitions) {
    		if(noOfPartitions < 2) {
    			throw new RuntimeException("Number of partitions is less than 2");
    		}
    		if(keylist != null) {
    			ArrayList<String> partitionKeyList = (ArrayList<String>) keylist;
    			String partitionKey = partitionKeyList.get(0);
    			if(partitionKey.equalsIgnoreCase("Amsterdam")) {
    				return 0;
    			}
    		}
    		return 1; 
    	}
    
    }
    

    If the partition key field value is Amsterdam, the event will be written to Kafka partition 0; otherwise, it will be written to partition 1. This logic may be extended to handle additional values and partitions by adding else clauses.

  6. If Eclipse's Build Automatically option is disabled, build the project.

  7. Click File > Export > Java > JAR File > Next, select the class, and click Finish.

  8. Save the JAR file in Striim/lib.

  9. Restart Striim.

  10. To test the custom partitioner, create an application using this TQL. If you are not using Striim's internal Kafka instance, change the broker address to that of your Kafka instance.

    CREATE SOURCE CSVSource USING FileReader  ( 
      positionbyeof: false,
      directory: 'Samples',
      wildcard: 'city.csv'
    ) 
    PARSE USING DSVParser () 
    OUTPUT TO CSVStream;
    
    CREATE CQ CQ1
    INSERT INTO CQStream
    SELECT data[0] as data0 java.lang.Object,
      data[1] as data1 java.lang.Object,
      data[2] as data2 java.lang.Object,
      data[3] as data3 java.lang.Object
    FROM CSVStream;
    
    
    CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.9.0' ( 
      Topic: 'test01',
      brokerAddress: 'localhost:9092',
      KafkaConfig: 'partitioner.class=com.custompartitioner.KafkaCityPartitioner'
    ) 
    FORMAT USING DSVFormatter()
    INPUT FROM CQStream;

    Save the following as Striim/Samples/city.csv:

    Amsterdam,0,0,0
    Los Angeles,1,1,1
    Amsterdam,2,2,0
    Los Angeles,3,3,1
    Amsterdam,4,4,0
    Los Angeles,5,5,1
    Amsterdam,6,6,0
    Los Angeles,7,7,1
    Amsterdam,8,8,0
    Los Angeles,9,9,1

    Deploy and run the application.

Reading a Kafka stream with an external Kafka consumer

To read a Kafka stream's persisted data in an external application:

  • Include dataformat:'avro' in the stream's property set.

  • Generate an Avro schema file for the stream using the console command EXPORT <namespace>.<stream name>. This will create a schema file <namespace>_<stream name>_schema.avsc in the Striim program directory. Optionally, you may specify a path or file name using EXPORT <namespace>.<stream name> '<path>' '<file name>'. The .avsc extension will be added automatically (see the example after this list).

  • Copy the schema file to a location accessible by the external application.

  • In the external application, use the Zookeeper and broker addresses in the stream's property set, and reference the stream using <namespace>_<stream name>.
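
For example, the following console commands (the path and file name shown are hypothetical) would export the schema for Samples.PosDataStream, first to the default location and then to a specified path and file name:

EXPORT Samples.PosDataStream;
EXPORT Samples.PosDataStream '/opt/schemas' 'PosDataStream';

The second command would create /opt/schemas/PosDataStream.avsc.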

Note

Recovery (see Recovering applications) is not supported for Avro-formatted Kafka streams.

For example, the following program would read from Samples.PosDataStream:

import java.io.File;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

public class SimpleKafkaConsumer
{
    /**
     * This method issues exactly one fetch request to a kafka topic/partition and prints out
     * all the data from the response of the fetch request.
     * @param topic_name Topic to fetch messages from
     * @param schema_filename Avro schema file used for deserializing the messages
     * @param host_name Host where the kafka broker is running
     * @param port Port on which the kafka broker is listening
     * @param clientId Unique id of a client doing the fetch request
     * @throws Exception
     */
    public void read(String topic_name, String schema_filename, String host_name, int port, String clientId) throws Exception
    {
        SimpleConsumer simpleConsumer = new SimpleConsumer(host_name, port, 100000, 64 * 1024, clientId);

        // This is just an example to read from partition 1 of a topic.
        int partitionId = 1;

        // Finds the first offset in the logs and starts fetching messages from that offset.
        long offset = getOffset(simpleConsumer, topic_name, partitionId, kafka.api.OffsetRequest.EarliestTime(), clientId);

        // Builds a fetch request.
        FetchRequestBuilder builder = new FetchRequestBuilder();
        builder.clientId(clientId);
        builder.addFetch(topic_name, partitionId, offset, 43264200);
        FetchRequest fetchRequest = builder.build();

        // Gets the response of the fetch request.
        FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);

        // Instantiates an Avro deserializer based on the schema file.
        SpecificDatumReader datumReader = getAvroReader(schema_filename);

        if (fetchResponse.hasError())
        {
            System.out.println("Error processing fetch request: Reason -> " + fetchResponse.errorCode(topic_name, 1));
        }
        else
        {
            ByteBufferMessageSet bbms = fetchResponse.messageSet(topic_name, partitionId);
            int count = 0;
            for (MessageAndOffset messageAndOffset : bbms)
            {
                ByteBuffer payload = messageAndOffset.message().payload();

                // The message format is Striim specific and it looks like:
                // 1. The first 4 bytes represent an integer (k) which gives the size of the actual message in the byte buffer.
                // 2. The next 'k' bytes store the actual message.
                // 3. This logic runs in a while loop until the byte buffer limit is reached.
                while (payload.hasRemaining())
                {
                    int size = payload.getInt();

                    // If the size is invalid, or there aren't enough bytes to process this chunk, just bail.
                    if ((payload.position() + size > payload.limit()) || size == 0)
                    {
                        break;
                    }
                    else
                    {
                        byte[] current_bytes = new byte[size];
                        payload.get(current_bytes, 0, size);
                        BinaryDecoder recordDecoder = DecoderFactory.get().binaryDecoder(current_bytes, null);
                        GenericData.Record record = (GenericData.Record) datumReader.read(null, recordDecoder);
                        System.out.println(count++ + ":" + record);
                    }
                }
            }
        }
    }

    private SpecificDatumReader getAvroReader(String schema_filename) throws Exception
    {
        File schemaFile = new File(schema_filename);
        if(schemaFile.exists() && !schemaFile.isDirectory())
        {
            Schema schema = new Schema.Parser().parse(schemaFile);
            SpecificDatumReader avroReader = new SpecificDatumReader(schema);
            return avroReader;
        }
        return null;
    }

    public long getOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName)
    {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
          kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError())
        {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    public static void main(String[] args)
    {
        SimpleKafkaConsumer readKafka = new SimpleKafkaConsumer();
        String topic_name = "Samples_PosDataStream";
        String schema_filename = "./Platform/conf/Samples_PosDataStream_schema.avsc";
        try
        {
            readKafka.read(topic_name, schema_filename, "localhost", 9092, "ItsAUniqueClient");
        } catch (Exception e)
        {
            System.out.println(e);
        }
    }
}

Changing and masking field values using MODIFY

This section covers the use of MODIFY on streams of user-defined types. For streams of type WAEvent, see Modifying and masking values in the WAEvent data array using MODIFY.

Using MODIFY in a CQ's SELECT statement allows you to change or mask the contents of specified fields. When a stream has many fields, this can be a much simpler and more efficient approach than writing a SELECT statement that handles each field individually.

The syntax is:

SELECT <field name> FROM <stream name> MODIFY (<field name> = <expression>)

The expression can use the same operators and functions as SELECT. The MODIFY clause may include CASE statements.

The following simple example would convert a monetary amount in the Amount field using an exchange rate of 1.09:

CREATE CQ ConvertAmount 
INSERT INTO ConvertedStream
SELECT * FROM UnconvertedStream
MODIFY(Amount = TO_FLOAT(Amount) * 1.09);

The next example illustrates the use of CASE statements. It uses the maskPhoneNumber function (see Masking functions) to mask individually identifiable information from US and India telephone numbers (as dialed from the US) while preserving the country and area codes. The US numbers have the format ###-###-####, where the first three digits are the area code. India numbers have the format 91-###-###-####, where 91 is the country code and the third through fifth digits are the subscriber trunk dialing (STD) code. The telephone numbers are in the PhoneNum field and the country codes are in the Country field.

CREATE CQ maskData 
INSERT INTO maskedDataStream
SELECT * FROM unmaskedDataStream
MODIFY(
PhoneNum = CASE
    WHEN Country == "US" THEN maskPhoneNumber(PhoneNum, "###-xxx-xxxx")
    ELSE maskPhoneNumber(PhoneNum, "#####x#xxx#xxxx")
  END
);

This could be extended with additional WHEN statements to mask numbers from additional countries, or with additional masking functions to mask individually identifiable information such as credit card, Social Security, and national identification numbers.
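
For example, an additional WHEN clause along these lines (a hypothetical illustration; the UK mask shown is not taken from the Striim documentation) would handle a third country code:

MODIFY(
PhoneNum = CASE
    WHEN Country == "US" THEN maskPhoneNumber(PhoneNum, "###-xxx-xxxx")
    WHEN Country == "UK" THEN maskPhoneNumber(PhoneNum, "##-xxxx-xxxxxx")
    ELSE maskPhoneNumber(PhoneNum, "#####x#xxx#xxxx")
  END
);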

See Masking functions for additional examples.

See also Modifying output using ColumnMap. In some cases that may be a more straightforward and efficient solution than using MODIFY.

Using namespaces

Every TQL component exists in a namespace, which is used to assign privileges and control access. Every user account has a personal namespace with the same name. For example, the admin user has an admin namespace.

When you create an application in the console, by default it will be created in the current namespace, which is shown in the prompt (for example, W (admin) >). You can override that default by preceding CREATE APPLICATION with USE <namespace name>; to change the current namespace.

When you create an application in the UI, by default it will be created in your personal namespace. The "Create new application" dialog allows you to override the default by selecting a different namespace or creating a new one.

Note

When you import a TQL file in the UI, any CREATE NAMESPACE <name>; and USE <namespace name>; statements in the file will override the choice in the UI.

If useful, you may create an empty namespace:

CREATE NAMESPACE <name>;

One use for this is to keep applications that use components with the same names from interfering with each other. For example, you might have the current version of an application in one namespace and the next version in another.
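
For example (the namespace and application names are hypothetical):

CREATE NAMESPACE PosAppV1;
CREATE NAMESPACE PosAppV2;

USE PosAppV1;
CREATE APPLICATION PosMonitor;
...

USE PosAppV2;
CREATE APPLICATION PosMonitor;
...

Both namespaces can contain applications and components with identical names without conflict.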

Another use for a namespace is to hold a library of common types to be shared by various applications. For example:

CREATE NAMESPACE CommonTypes;
USE CommonTypes;
CREATE TYPE MerchantName(
  merchantId String KEY,
  companyName String
);
USE my_user_name;
CREATE APPLICATION PosMonitor;
CREATE STREAM MerchantNames OF CommonTypes.MerchantName;
...

The MerchantName type is now available in any application by specifying CommonTypes.MerchantName. The USE my_user_name command stops creation of components in the CommonTypes namespace. If you left that out, the PosMonitor application would be created in the CommonTypes namespace rather than in your personal namespace.

See Managing users, permissions, and roles for information on the role namespaces play in the Striim security model.

Using EVENTLIST

When LINK SOURCE EVENT is specified in the CQ that populates the WActionStore, and the type for the linked events is specified in the EVENT TYPES clause of the CREATE WACTIONSTORE statement, you can use EVENTLIST in a subquery and ITERATOR over the subquery to return values from fields in the linked events.

For example, in MultiLogApp, the WActionStore ApiActivity includes linked events of the type ApiUsage:

CREATE TYPE ApiUsage (
  api String key, 
  sobject String, 
  count Integer, 
  logTime DateTime
);
CREATE TYPE ApiContext (
  api String key, 
  count Integer, 
  logTime DateTime
);
CREATE WACTIONSTORE ApiActivity 
CONTEXT OF ApiContext 
EVENT TYPES (
  ApiUsage KEY(api)) ...

CREATE CQ GetApiSummaryUsage 
INSERT INTO ApiActivity 
SELECT a.api,  
  sum(a.count),
  first(a.logTime)
FROM ApiSummaryWindow a 
GROUP BY a.api
LINK SOURCE EVENT;

The following query will return all fields from all linked events:

SELECT it.api, it.count, it.logTime, it.sobject FROM
  (SELECT EVENTLIST(t) AS list_of_events FROM Samples.ApiActivity t) z,
  ITERATOR(z.list_of_events) it;

Using ITERATOR

SELECT [DISTINCT] { <field name>, ... }
[ ISTREAM ]
FROM ITERATOR
  (<nested collection name>.<member name>[, <type>])
  <iterator name>, 
  <nested collection name>

  • ITERATOR(<var>) : <var> is of any supported type (see below)

  • ITERATOR(<var>, <type>) : <var> is bound to the specified <type>

The ITERATOR function allows you to access the members of nested collections in a dataset, so long as the collection implements the java.lang.Iterable interface or is a JsonNode.

For example, suppose you have the following statement:

create stream s1 (
    id string, 
    json_array JsonNode,
    list_of_objects java.util.List
);

The json_array and list_of_objects fields are both nested data collections.

Since java.util.List implements the java.lang.Iterable interface, you can create an iterator and use it to access the members of list_of_objects:

SELECT lst from s1, ITERATOR(s1.list_of_objects) lst;

Suppose the list_of_objects contains objects of this Java type:

package my.package;
class MyType {
    int attrA;
    int attrB;
}

You would access its members using this statement:

SELECT attrA, attrB FROM s1, ITERATOR(s1.list_of_objects, my.package.MyType) lst;

The stream also includes a JsonNode member. Suppose the following events are added to the stream:

('a', [1,2,3], null)
('b', [1,3,4], null)

Here is how you can iterate through json_array:

SELECT id, a FROM s1, iterator(s1.json_array) a;
 
OUTPUT:
=======
a 1
a 2
a 3
b 1
b 3
b 4

This statement illustrates a cross join between json_array and list_of_objects:

SELECT a, b FROM ITERATOR(s1.json_array) a, ITERATOR(s1.list_of_objects) b, s1;

You can iterate through multiple levels of nesting. For example, this statement iterates through 2 levels, where json_array contains another_nested_collection:

SELECT x FROM s1, ITERATOR(s1.json_array) a, ITERATOR(a.another_nested_collection) x;

Warning

Arbitrary data types, such as a Java List, cannot be used with a WActionStore that is persisted to Elasticsearch. Also, it is not possible to convert a JSON List into a Java List.

Using the META() function

You can use the META() function to query the metadata map of the WAEvent type, which is used by the output streams of several Striim readers. For example, the following creates a stream containing invalid records:

CREATE STREAM ExceptionStream of ExceptionRecord;
CREATE CQ CQExceptionRecord
INSERT INTO ExceptionStream
SELECT data[0]
FROM CsvStream
WHERE META(CsvStream, 'RecordStatus').toString() == 'INVALID_RECORD';

The elements of the metadata map vary depending on the reader and parser used.

reader

metadata elements

DatabaseReader

  • TableName (only if using the Tables property, omitted if using Query): the fully qualified name of the table to which the record belongs, either in the form <CATALOG>.<SCHEMA>.<TABLE> or <SCHEMA>.<TABLE>, depending on the database

  • OperationName: always SELECT

  • ColumnCount: number of columns in this record

FileReader

  • FileName: fully qualified file name

  • FileOffset: offset in bytes from the beginning of the file to the start of the current event

HDFSReader

  • FileName: Hadoop URL including the fully qualified file name

  • FileOffset: offset in bytes from the beginning of the file to the start of the current event

HTTPReader

  • ClientIPAddress: IP address of the client

  • ClientProtocolVersion: name and version of the protocol the request uses in the form protocol/majorVersion.minorVersion

  • ClientContentLength: length of the request body in bytes, -1 if length is not known.

  • ClientURL: URL the client used to make the request along with the query string

  • Referrer: HTTP header field specified by client that contains the address of the webpage linked to the resource being requested

JMSReader

no metadata returned

KafkaReader

  • TopicName: Kafka topic from which the current event was read

  • PartitionID: Kafka partition from which the current event was read

  • RecordOffset: offset of the current event within the partition

MultiFileReader

  • FileName: fully qualified file name

  • FileOffset: offset in bytes from the beginning of the file to the start of the current event

The following parsers append metadata elements to those of the associated reader:

parser

metadata elements

DSVParser

  • RecordStatus: value is always VALID_RECORD

  • RecordOffset: offset in characters from the beginning of the record to the start of the current event

  • OriginTimeStamp: event origin timestamp

  • RecordEnd: ending character offset of this record in the source

FreeFormTextParser

  • RecordOffset: starting character offset of this record in the source

  • RecordStatus: value is always VALID_RECORD

  • OriginTimeStamp: event origin timestamp

  • RecordEnd: ending character offset of this record in the source

NetflowParser (version 5)

  • version: 5

  • count: number of flows (data records) exported in this packet.

  • sys_uptime: current time in milliseconds since the export device booted

  • unix_secs: current count of seconds since 0000 UTC 1970

  • unix_nsec: residual nanoseconds since 0000 UTC 1970

  • flow_sequence: sequence counter of total flows seen

  • engine_type: type of flow-switching engine

  • engine_id: slot number of the flow-switching engine

  • sampling_interval: first two bits hold the sampling mode, remaining 14 bits hold value of sampling interval

NetflowParser (version 9)

  • Count: total number of records in the Export Packet, which is the sum of Options FlowSet records, Template FlowSet records, and Data FlowSet records

  • ErrorMsg (invalid record only): can be used for debugging

  • package_sequence: incremental sequence counter of all export packets sent from the current observation domain by the exporter

  • Reason (invalid record only): can be used for debugging

  • RecordType: Data (Netflow data record), Template (template details) or Options Template  (information about the Netflow process running in the export device)

  • source_id: 32-bit value that identifies the exporter observation domain

  • SourceIP: source address for packet

  • SourcePort: source port for packet

  • Status: Valid for valid record and template, INVALID for invalid record

  • sys_uptime: time in milliseconds since this export device was first booted

  • Template_ID: ID of the template record (a template record is metadata for a data record) used to decode the data record

  • TemplateStructure (invalid record only): can be used for debugging

  • unix_secs: time in seconds since 0000 UTC 1970, at which the export packet leaves the exporter

  • version: 9

SNMPParser

  • Type: SNMP type

  • Version: SNMP version

  • Community: Community name

  • AgentIp: SNMP agent IP address

  • AgentPort: SNMP agent port no

  • TrapTime: time in milliseconds since the trap was started

  • Enterprise: the management enterprise under whose registration authority the trap was defined

XMLParser

  • RecordOffset: starting character offset of this record in the source

  • RecordStatus: value is always VALID_RECORD

  • OriginTimeStamp: event origin timestamp

  • RecordEnd: ending character offset of this record in the source

Using pattern matching

You can use pattern matching in your queries to conveniently specify multiple combinations of events via a MATCH_PATTERN expression. Pattern expressions consist of references to stream and timer events according to this syntax:

CREATE CQ <CQ name>
INSERT INTO <output stream>
SELECT <list of expressions>
FROM <data source> [ <data source alias>] [, <data source> [ <data source alias> ], ...]
MATCH_PATTERN <pattern>
DEFINE <pattern variable> = [<data source or alias name>] '(' <predicate> ')'
[ PARTITION BY <data source field name> ];

In this syntax, <data source> is a stream or timer event.

The MATCH_PATTERN clause represents groups of events along with condition variables. New events are referenced in the predicates, where event variables are defined. Condition variables are expressions used to check additional conditions, such as timer events, before continuing to match the pattern.

The pattern matching expression syntax uses the following notation:

  • * : quantifier matching 0 or more occurrences

  • ? : quantifier matching 0 or 1 occurrence

  • + : quantifier matching 1 or more occurrences

  • {m,n} : quantifier matching m to n occurrences

  • | : logical OR (alternative)

  • & : logical AND (permutation)

  • ( ) : grouping

  • # : restart-matching marker for overlapping patterns

For example, the pattern A B* C+ matches exactly one new event for A, zero or more new events for B, and one or more new events for C.

NOTE: You cannot refer to new events in conditions or expressions contained in the SELECT clause.

The DEFINE clause defines a list of event variables and conditions. The conditions are described in the predicates associated with the event variables.

For example, A = S(sensor < 100) matches events on stream S where the sensor attribute is less than 100.

The optional PARTITION BY clause partitions the stream by key on sub-streams, and pattern matching is performed independently for every partition.

Let's examine this example:

CREATE CQ 
SELECT LIST(B.id) as eventValues, count(B.id) as eventCount, A.id as eventTime 
FROM eventStream S
MATCH_PATTERN T A W B* P C
DEFINE
  A = S(sensor < 100), 
  B = S(sensor >= 100),
  C = S(sensor < 100),
  T = TIMER(interval 60 second),
  W = STOP(T), 
  P  = (count(B) > 5)
PARTITION BY eventId;

The pattern of event variables in the MATCH_PATTERN clause (T A W B* P C) represents the following:

  • T A W: Event A is matched within 60 seconds. Since this pattern ends with W, subsequent events (B, P, C) can only be matched after the first 60 seconds have elapsed.

  • B* P C: After the first 60 seconds, the match pattern consists of 0 or more instances of event B, followed by event P, followed by event C.

Event Variables

Once you have established event variables, you can refer to groups of matched events using 0-based array index notation.

For example, B[3].sensor refers to the 4th event in group B. If you omit the index (for example, B.sensor), the last event in the group is returned.

If you pass event variables to built-in or user-defined functions, the compiler will generate code differently depending on whether a scalar or aggregating function is used.

In this example, the expression aggregates all sensor attributes for all matched events in event variable B.

avg(B.sensor)

In this example, the expression evaluates to the absolute value of the sensor attribute of the last matched event in event variable B:

abs(B.sensor)

Referring to Past Events

If a match on the next event depends on the value of an attribute in a previous event, use the PREV() built-in function. The only parameter is an integer constant indicating how far back to match in the event pattern. For example, PREV() or PREV(1) refers to the immediately preceding event, but PREV(2) refers to the event that occurred before the immediately preceding event. For example:

DEFINE
-- Compare the new event's sensor with the previous event's sensor.
-- The default index value of 1 is used.
A=streamx(sensor < PREV().sensor)  
-- Compare the new event's sensor with the sensor of the event before the previous event.
B=streamx(sensor < PREV(2).sensor) 

Timer Events

You can set timer events using the following functions:

  • timer(interval <int> {second|minute|hour}) : a condition that does not wait for a new event from any stream. When the timer expires, it sends a signal as an event from the timer stream.

  • signal(<variable>) : the next event is expected from the timer defined by the specified variable.

  • stop(<variable>) : stops the timer and cancels sending the timer event.

For example, this pattern matches no events for a period of 50 seconds:

PATTERN T W
DEFINE
T=timer(interval 50 second),
W=signal(T)

This pattern matches all events from streamA for 30 seconds (until event W is received from the timer):

PATTERN T A* W -- matching all events from streamA for 30 seconds
DEFINE
T=timer(interval 30 second),
A=streamA,
W=signal(T)

This pattern matches an event from streamA within 30 seconds. If an event from streamA does not occur within 30 seconds, an event from the timer will be received causing the pattern matching to fail:

PATTERN T A -- matching event from streamA within 30 seconds
DEFINE
T=timer(interval 30 second),
A=streamA

This pattern matches an event from streamA within 30 seconds, then an event from streamB within the following 30 seconds:

PATTERN T A C T2 B -- matching event A for 30 seconds and then event B within 30 seconds
DEFINE
T=timer(interval 30 second),
A=streamA,
C=stop(T),
T2=timer(interval 30 second),
B=streamB

Alternation ( | )

If you would like to specify several variations of an event sequence, use the alternation operator ( | ). If the alternation expression contains equivalent variations, the first (leftmost) variation is matched. For example, the following pattern matches A but never AA, since AA is equivalent to A:

PATTERN (A|AA|B|C)
DEFINE
A=streamA(sensor between 10 and 20),
AA=streamA(sensor between 10 and 20), -- it is same as A, and will never be matched
B=streamA(sensor > 20),
C=streamA(sensor < 10)

Matching overlapping patterns ( # )

Suppose you would like to match the sequence ABA and the stream contains events ABABA. Normally the first instance (ABA)BA will be matched, but the second instance AB(ABA) will not be matched. If you would like both instances to be matched, include a ( # ) operator in the pattern wherever you would like the engine to restart its matching (for example, AB#ABA). If the pattern contains multiple # operators, the matching restarts from the earliest occurrence of the last successful match.

For example, consider this pattern:

PATTERN A (B # A | C D # A)

If the actual event sequence is ABACDABACDA, the following subsequences will be matched:

  • ABA

  • ACDA

  • ABA

  • ACDA

Using analytics and regression functions

The com.webaction.analytics classes provide you with several regression functions that enable you to make predictions based on the regression model you have chosen. To get started, begin with the following IMPORT statements in your TQL:

IMPORT STATIC com.webaction.analytics.regressions.LeastSquares.*;
IMPORT STATIC com.webaction.analytics.regressions.LeastSquaresPredictor.*;
IMPORT STATIC com.webaction.analytics.regressions.PredictionResult.*;
IMPORT STATIC com.webaction.analytics.regressions.Helpers.*;

These imports provide you with the following regression functions, along with supporting helper methods. Use these aggregate functions in a CQ that selects from a window, which is used as the training set of the regression algorithm.

Regression Class

Description

SIMPLE_LINEAR_REGRESSION(Object yArg, Object xArg)

Performs simple linear regression using a single independent variable and a single dependent variable.

CONSTRAINED_SIMPLE_LINEAR_REGRESSION(Object yArg, Object xArg)

Performs simple linear regression, constrained so that the fitted line passes through the "rightmost" (newest) point in the window.

MULTIPLE_LINEAR_REGRESSION(Object yArg, Object... xArgs)

Performs multiple linear regression using multiple independent variables and a single dependent variable.

CONSTRAINED_MULTIPLE_LINEAR_REGRESSION(int numFixedPoints, Object yArg, Object... xArgs)

Performs multiple linear regression, constrained so that the fitted hyperplane passes through the desired number of "rightmost" (newest) points in the window.

POLYNOMIAL_REGRESSION(int degree, Object yArg, Object xArg)

Performs polynomial regression, creating a nonlinear model using a single independent variable and a single dependent variable.

CONSTRAINED_POLYNOMIAL_REGRESSION(int numFixedPoints, int degree, Object yArg, Object xArg)

Performs polynomial regression, constrained so that the fitted polynomial passes through the desired number of "rightmost" (newest) points in the window.

The InventoryPredictor example illustrates the usage of these regression analytics functions through the use of a prediction algorithm, which sends alerts when the inventory is predicted to be low. The general approach to using regression in time-series predictions is as follows:

  1. Create a window.

  2. Specify the regression model.

  3. Specify the variable on which predictions are made and the independent variable (in the following example, the timestamp).

  4. Determine whether there are enough points to make a valid prediction.

The syntax is:

SELECT [ISTREAM] [CONSTRAINED_]
  {SIMPLE_LINEAR_REGRESSION | MULTIPLE_LINEAR_REGRESSION | POLYNOMIAL_REGRESSION}
    (properties)
    AS pred,
      CASE IS_PREDICTOR_READY(pred)
        WHEN TRUE THEN PREDICT[_MULTIPLE](properties)
        ELSE ZERO_PREDICTION_RESULT([properties])
    END AS result,
    output1,
    output2,
    [COMPUTE_BOUNDS(properties),]
    [{GREATER|LESS}_THAN_PROBABILITY()], ...

In this first section of the TQL, we import the required com.webaction.analytics classes, create the application (InventoryPredictor), create the inventory stream (InventoryStream), and finally create a two-hour window over that inventory stream (InventoryChangesWindow):

IMPORT STATIC com.webaction.analytics.regressions.LeastSquares.*;
IMPORT STATIC com.webaction.analytics.regressions.LeastSquaresPredictor.*;
IMPORT STATIC com.webaction.analytics.regressions.PredictionResult.*;
IMPORT STATIC com.webaction.analytics.regressions.Helpers.*;

CREATE APPLICATION InventoryPredictor;

CREATE TYPE InventoryType(
  ts org.joda.time.DateTime,
  SKU java.lang.String,
  inventory java.lang.Double,
  location_id java.lang.Integer
);
CREATE STREAM InventoryStream OF InventoryType;
 
CREATE  SOURCE JSONSource USING FileReader (
  directory: 'Samples',
  WildCard: 'inventory.json',
  positionByEOF: false
 )
 PARSE USING JSONParser (
  eventType: 'InventoryType'
 )
OUTPUT TO InventoryStream;

CREATE WINDOW InventoryChangesWindow OVER InventoryStream KEEP WITHIN 2 HOUR PARTITION BY SKU;

In this example, simple linear regression will be chosen to create a model in which the inventory is predicted according to the current system time. We will begin by setting up an AlertStream based on inventory predictions:

CREATE STREAM AlertStream OF Global.AlertEvent;

CREATE CQ AlertOnInventoryPredictions
INSERT INTO AlertStream
. . .

The SELECT statement that follows sets up inventory alert messages using boolean variables indicating whether the inventory is low (isInventoryLow) or OK (isInventoryOK), based on a 90% confidence interval in the prediction model:

SELECT 
  "Low Inventory Alert",
  SKU,
  CASE WHEN isInventoryLow THEN "warning" WHEN isInventoryOK THEN "info" END,
  CASE WHEN isInventoryLow THEN "raise" WHEN isInventoryOK THEN "cancel" END,
  CASE WHEN isInventoryLow THEN "Inventory for SKU " + SKU + 
      " is expected to run low within 2 hours (90% confidence)."
    WHEN isInventoryOK THEN "Inventory status for SKU " + SKU +
      " looks OK for the next 2 hours (90% confidence)."
    END ...

The next part of the SELECT statement sets up the simple linear regression model (pred) based on the inventory variable (inventory) and a timestamp variable (ts):

FROM (SELECT ISTREAM
  SKU,
  DNOW() AS ts,
  SIMPLE_LINEAR_REGRESSION(inventory, ts) AS pred, ...

Next we determine whether there are enough points to make a valid prediction by calling the IS_PREDICTOR_READY function. If there are, we compute the probability of the event that the inventory has fallen below (or exceeded) a given critical threshold by calling the LESS_THAN_PROBABILITY (or GREATER_THAN_PROBABILITY) function. If the computed probabilities are higher than 90%, we raise the appropriate flag (isInventoryLow or isInventoryOK).

IS_PREDICTOR_READY(pred) AND LESS_THAN_PROBABILITY(PREDICT(pred,
  DADD(ts, DHOURS(2))), 5) > 0.9 AS isInventoryLow,
IS_PREDICTOR_READY(pred) AND GREATER_THAN_PROBABILITY(PREDICT(pred, 
  DADD(ts, DHOURS(2))), 5) > 0.9 AS isInventoryOK, ...

Note that the PREDICT function feeds the data into the prediction model, which returns a PredictionResult object that is subsequently passed to the probability utility functions.

Here is the entire CREATE CQ statement:

CREATE CQ AlertOnInventoryPredictions
INSERT INTO AlertStream
SELECT "Low Inventory Alert",
  SKU,
  CASE WHEN isInventoryLow THEN "warning" WHEN isInventoryOK THEN "info" END,
  CASE WHEN isInventoryLow THEN "raise" WHEN isInventoryOK THEN "cancel" END,
  CASE
  WHEN isInventoryLow THEN "Inventory for SKU " + SKU + 
    " is expected to run low within 2 hours (90% confidence)."
  WHEN isInventoryOK THEN "Inventory status for SKU " + SKU + 
    " looks OK for the next 2 hours (90% confidence)."
  END 
FROM
  (SELECT ISTREAM
    SKU,
    DNOW() AS ts,
    SIMPLE_LINEAR_REGRESSION(inventory, ts) AS pred,
    IS_PREDICTOR_READY(pred) AND LESS_THAN_PROBABILITY(PREDICT(pred,
      DADD(ts, DHOURS(2))), 5) > 0.9 AS isInventoryLow,
    IS_PREDICTOR_READY(pred) AND GREATER_THAN_PROBABILITY(PREDICT(pred,
      DADD(ts, DHOURS(2))), 5) > 0.9 AS isInventoryOK
  FROM InventoryChangesWindow
  GROUP BY SKU 
  HAVING isInventoryLow OR isInventoryOK)
  AS subQuery;

Having created the prediction model, we can send email alerts whenever there is at least 90% confidence that the inventory is low:

CREATE SUBSCRIPTION InventoryEmailAlert USING EmailAdapter (
  smtp_auth: false,
  smtpurl: "smtp.company.com",
  subject: "Low Inventory Alert",
  emailList: "sysadmin@company.com",
  senderEmail:"striim@company.com" 
)
INPUT FROM AlertStream;
END APPLICATION InventoryPredictor;

Using Apache Flume

Striim can receive data from Apache Flume using the WebActionSink (see Apache Flume integration) as a source. Its properties are defined in configuration files on the Flume server rather than in TQL.

The WebActionSink properties are:

  • agent.sinks.webactionSink.type: com.webaction.flume.WebActionSink

  • agent.sinks.webactionSink.serverUri: watp://<ip_address>:9080 (the IP address and port of the Striim server; adjust the port number if you are not using the default)

  • agent.sinks.webactionSink.username: the Striim login to be used by the WebActionSink

  • agent.sinks.webactionSink.password: the password for that login

  • agent.sinks.webactionSink.stream: flume:<stream name> (the stream name to be used in TQL; see the example below)

  • agent.sinks.webactionSink.parser.handler: DSVParser (in this release, only DSVParser is supported)

You must also specify the properties for the specified parser. See the example below.

The following example application assumes that Flume is running on the same system as Striim.

1. Perform the first two steps described in Apache Flume integration.

2. Save the following as a TQL file, then load, deploy, and start it:

CREATE APPLICATION flumeTest;
CREATE STREAM flumeStream of Global.WAEvent;
CREATE TARGET flumeOut USING SysOut(name:flumeTest) INPUT FROM flumeStream;
END APPLICATION flumeTest;

This application does not need a CREATE SOURCE statement because the data is being collected and parsed by Flume. The stream name must match the one specified in the WebActionSink properties and the type must be Global.WAEvent.

3. Save the following as waflume.conf in the flume/conf directory, replacing the two IP addresses with the test system's IP address and the username and password with the credentials you used to load the application:

# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

agent.sources = netcatSrc
agent.channels = memoryChannel
agent.sinks = webactionSink

# For each one of the sources, the type is defined
agent.sources.netcatSrc.type = netcat
agent.sources.netcatSrc.bind = 192.168.1.2
agent.sources.netcatSrc.port = 41414

# The channel can be defined as follows.
agent.sources.netcatSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.webactionSink.type = com.webaction.flume.WebActionSink
agent.sinks.webactionSink.serverUri = watp://192.168.1.2:9080
agent.sinks.webactionSink.username = flumeusr
agent.sinks.webactionSink.password = passwd
agent.sinks.webactionSink.stream = flume:flumeStream
agent.sinks.webactionSink.parser.handler = DSVParser
agent.sinks.webactionSink.parser.blocksize = 256
agent.sinks.webactionSink.parser.columndelimiter = ","
agent.sinks.webactionSink.parser.rowdelimiter = "\n" 
agent.sinks.webactionSink.parser.charset = "UTF-8"
agent.sinks.webactionSink.parser.blockAsCompleteRecord = "True"

#Specify the channel the sink should use
agent.sinks.webactionSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

4. Start Flume, specifying the configuration file:

bin/flume-ng agent --conf conf --conf-file conf/waflume.conf --name agent -Dflume.root.logger=INFO,console

5. Save the following as flumetestdata.csv:

100,first
200,second
300,third

6. Open a terminal, change to the directory where you saved flumetestdata.csv, and enter the following command, replacing the IP address with the test system's:

cat flumetestdata.csv | nc 192.168.1.2 41414

The following output should appear in striim-node.log (see Reading log files):

flumeTest: WAEvent{
  data: ["ID","Name"]
  metadata: {"RecordStatus":"VALID_RECORD","FileName":"","FileOffset":0}
  before: null
  dataPresenceBitMap: "AA=="
  beforePresenceBitMap: "AA=="
  typeUUID: null
};
flumeTest: WAEvent{
  data: ["100","first"]
  metadata: {"RecordStatus":"VALID_RECORD","FileName":"","FileOffset":0}
  before: null
  dataPresenceBitMap: "AA=="
  beforePresenceBitMap: "AA=="
  typeUUID: null
};
...

See Parsing the data field of WAEvent for more information about this data format.
