The topics in this section describe how to extend Striim's capabilities by writing your own functions and components.
TQL applications may import Java functions. The basic procedure is:
write a custom function
compile it to a .jar
add the .jar to .../Striim/lib
load the function into Striim
import the function to an application
Functions are loaded at the cluster level, so your new function will be available for import to any application in any namespace. Imported functions required by an application are included as IMPORT statements in exported TQL.
The rest of this section describes in detail how to create and use custom functions.
To develop your own custom functions, you must add the JAR files from Striim's library to your IDE project. The following instructions are for Eclipse. If you use another IDE, the procedure should be similar.
Copy the Striim JAR files, available in Striim/lib, into your Java project so you can reference them in your code. In Eclipse, import the external JARs during Java project creation. Otherwise, ensure the project name is selected in the Project Explorer, select File > Properties, select Java Build Path, click the Libraries tab, and click Add External JARs.
When you are finished with your development, copy your custom JAR file into Striim/lib and restart the Striim server.
The next step is to set up the following in your Java IDE:
A package to contain your custom abstract classes.
Import statements for required classes.
Abstract classes that will contain your custom functions.
A logger (recommended).
Caution
Do not use reserved keywords in your package name (see List of reserved keywords).
The following template will help you get started:
// You will need your own package name:
package com.mycompany.custom.packagename;

// You'll need to reference this Striim class in your own code if you
// define an Aggregate Window function:
import com.webaction.runtime.compiler.custom.AggHandlerDesc;

// Logging classes required by Striim
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

// Any other classes you may need for your development
import java.lang.Math;
import java.util.LinkedList;
import org.joda.time.DateTime;

// Abstract class needed to contain your custom functions
public abstract class MyCustomFunctions {

    // Required logger
    private static Logger logger = Logger.getLogger( MyCustomFunctions.class );

}
Single row functions are the simplest types of Striim functions. Start by defining your Java methods inside the custom abstract class you created (see Setting up packages and classes). A best practice is to use techniques that operate on a row of values by iterating on them without storing them, thereby reducing memory requirements and increasing performance.
For example, the following function iterates through a row of values and keeps track of the highest value. Once its loop has reached the termination condition by reaching the end of the row of values, it returns the highest value of that row:
public static double maxSingleRow(double... n) {
    int i = 0;
    double runningmax = n[i];
    // Iterate through the row and continually update runningmax to the highest value
    while (++i < n.length)
        if (n[i] > runningmax) runningmax = n[i];
    return runningmax;
}
Generate your custom JAR and deploy it to your Striim environment (see Setting up packages and classes).
Now you can import your static custom class into TQL and use the newly created custom function:
USE StriimWindows_NS2;
DROP APPLICATION StriimWindows2 CASCADE;
IMPORT STATIC com.mycompany.custom.packagename.MyCustomFunctions.*;
CREATE APPLICATION StriimWindows2;

CREATE OR REPLACE CQ myStream_CQ
INSERT INTO myStream_ST
SELECT maxSingleRow( field1, field2, field3, field4 )
FROM inputStream_ST;
Here are a few other single row functions you can include by defining the following methods in the same abstract class:
minSingleRow iterates through a row of values and keeps track of the lowest value
rangeSingleRow calculates the range of values in the row by using the previously created minSingleRow and maxSingleRow functions
avgSingleRow iterates through a row of values and calculates the average by maintaining a running total and dividing by the length
public static double minSingleRow(double... n) {
    int i = 0;
    double runningmin = n[i];
    while (++i < n.length)
        if (n[i] < runningmin) runningmin = n[i];
    return runningmin;
}

public static double rangeSingleRow(double... n) {
    double dRange = maxSingleRow( n ) - minSingleRow( n );
    return dRange;
}

public static double avgSingleRow(double... n) {
    int i = 0;
    double runningsum = 0;
    while (i < n.length) {
        runningsum += n[i];
        i++;
    }
    return ( runningsum / i );
}
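Before packaging the JAR, it can help to exercise the single-row functions outside Striim. The following is a minimal sketch, assuming a stand-alone copy of the four methods in a hypothetical class named SingleRowCheck (not part of Striim). It also shows what happens with an empty argument list, which the functions as written do not guard against.

```java
// Stand-alone copy of the single-row functions for local checks.
// SingleRowCheck is a hypothetical name used only for this sketch.
class SingleRowCheck {

    static double maxSingleRow(double... n) {
        int i = 0;
        double runningmax = n[i];   // throws if n is empty
        while (++i < n.length)
            if (n[i] > runningmax) runningmax = n[i];
        return runningmax;
    }

    static double minSingleRow(double... n) {
        int i = 0;
        double runningmin = n[i];   // throws if n is empty
        while (++i < n.length)
            if (n[i] < runningmin) runningmin = n[i];
        return runningmin;
    }

    static double rangeSingleRow(double... n) {
        return maxSingleRow(n) - minSingleRow(n);
    }

    static double avgSingleRow(double... n) {
        double runningsum = 0;
        for (double v : n) runningsum += v;
        return runningsum / n.length;   // 0.0 / 0 yields NaN for an empty row
    }

    public static void main(String[] args) {
        System.out.println("max:   " + maxSingleRow(3.0, 9.5, 1.0, 4.0));
        System.out.println("min:   " + minSingleRow(3.0, 9.5, 1.0, 4.0));
        System.out.println("range: " + rangeSingleRow(3.0, 9.5, 1.0, 4.0));
        System.out.println("avg:   " + avgSingleRow(1.0, 2.0, 3.0, 6.0));
        try {
            maxSingleRow();   // no arguments: n[0] does not exist
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("empty row is not handled by maxSingleRow");
        }
    }
}
```

If a CQ could ever pass no values, consider adding a length check at the top of each method and deciding explicitly what to return in that case.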
Multi-row functions operate across multiple rows in a Window. A best practice when designing multi-row functions is to use accumulators that eliminate the need to iterate through all the rows in a Window. As you add and eliminate rows from a Window, maintain state on the fly. While it is still possible to iterate through all the rows, the approach using accumulators and maintaining state will help improve performance.
To design a class supporting multi-row functions:
- Ensure that you've imported the class required to declare an aggregate function:
  import com.webaction.runtime.compiler.custom.AggHandlerDesc;
- Declare a handler that defines a Java class used to create the TQL function signature supporting the required accumulators and functions. For example:
  @AggHandlerDesc(handler=AvgDebug_Float.class)
- Declare an abstract method specifying the expected arguments and return type for the multi-row function. The Java signature will be converted to an equivalent TQL signature. For example:
  public abstract Float AvgDebug( String sKey, Float fArg );
- Define a public static nested class whose name is identical to the one you specified in your handler. After setting its initialization variables, your class must implement the following methods:
  decAggValue()
  incAggValue()
  getAggValue()
The following example illustrates these basic steps in a class that maintains a running sum and can return the average of all floating point values through its getAggValue() function. The incAggValue() function adds a new value to the running sum, and decAggValue() removes a value from the running sum:
// Required handler declaration:
// handler name must match the name of your static class.
@AggHandlerDesc(handler=AvgDebug_Float.class)

// Name of the function to be exported to TQL:
public abstract Float AvgDebug( String sKey, Float fArg );

// Handler class implementing the required accumulators:
// must match the name in the handler declaration.
public static class AvgDebug_Float {
    // initializers
    float fRunningSum = 0;
    int iRunningCount = 0;

    // Calculates and returns the average value at any moment
    public Float getAggValue() {
        logger.debug( "RunningSum: " + fRunningSum + ", RunningCount: " + iRunningCount + "\n");
        return (iRunningCount == 0) ? 0 : (fRunningSum / iRunningCount);
    }

    // Adds the new value to the running total, maintaining the count
    // of values included:
    public void incAggValue(String sKey, Float fArg) {
        if(fArg != null) {
            iRunningCount++;
            fRunningSum += fArg;
        }
        logger.debug( "[Key: " + sKey + ", Value: " + fArg + "] RunningSum is now: "
            + fRunningSum + ", RunningCount is now: " + iRunningCount );
    }

    // Removes the specified value from the running total, maintaining
    // the count of values included:
    public void decAggValue(String sKey, Float fArg) {
        if(fArg != null) {
            iRunningCount--;
            fRunningSum -= fArg;
        }
        logger.debug( "[Key: " + sKey + ", Value: " + fArg + "] RunningSum is now: "
            + fRunningSum + ", RunningCount is now: " + iRunningCount );
    }
}
The following TQL example illustrates the mapping between the Java code shown above and its equivalent class and function once deployed in the Striim environment:
USE StriimWindows_NS2;
DROP APPLICATION StriimWindows2 CASCADE;
IMPORT STATIC com.mycompany.custom.packagename.MyCustomFunctions.*;
CREATE APPLICATION StriimWindows2;

CREATE OR REPLACE CQ MovingAvg_CQ
INSERT INTO MovingAvg_ST
SELECT avgDebug( OrderId, OrderValue )
FROM MovingAvg3ES_WN;
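The running-sum handler above works because a sum can be undone by subtraction. Some aggregates, such as a maximum, cannot be reversed that way, so the handler has to remember which values are still in the window. The following is a minimal sketch of one way to do that with the same three-method shape. MaxTracker_Double is a hypothetical name, the Striim annotation and abstract method declaration are omitted so the class compiles without the Striim JARs, and the sketch assumes decAggValue() receives the same value that was earlier passed to incAggValue().

```java
import java.util.TreeMap;

// Hypothetical handler-style class that tracks the maximum of the values
// currently in a window. Not taken from the Striim documentation.
class MaxTracker_Double {

    // Maps each value to the number of times it currently appears in the window.
    private final TreeMap<Double, Integer> counts = new TreeMap<>();

    // Returns the largest value in the window, or null if the window is empty.
    public Double getAggValue() {
        return counts.isEmpty() ? null : counts.lastKey();
    }

    // Records a value entering the window.
    public void incAggValue(Double arg) {
        if (arg != null) {
            counts.merge(arg, 1, Integer::sum);
        }
    }

    // Records a value leaving the window. The entry is removed once its
    // count reaches zero (computeIfPresent drops it when the function returns null).
    public void decAggValue(Double arg) {
        if (arg != null) {
            counts.computeIfPresent(arg, (value, count) -> count == 1 ? null : count - 1);
        }
    }
}
```

Each call costs time logarithmic in the number of distinct values in the window, so the handler still avoids rescanning every row when the window changes.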
The following Java code implements a handler for an imported TQL function called LastBut, which returns the string located one position before the last specified index:
@AggHandlerDesc(handler=LastBut_String.class)
public abstract String LastBut(String arg, int idx);

public static class LastBut_String {
    LinkedList<String> list = new LinkedList<String>();
    int i;

    // Returns the string located in the position just before
    // the last specified index:
    public String getAggValue() {
        if( list.isEmpty() || (i < 0) || (i > (list.size()-1)) ) {
            return null;
        } else {
            return list.get( ( list.size()-i-1) );
        }
    }

    // Adds a string to the end of the linked list,
    // updating the index location:
    public void incAggValue(String arg, int idx) {
        list.addLast(arg);
        i = idx;
        // no safety logic required here -
        // all handled by the getAggValue() method
    }

    // Removes the string located at the head of the linked list,
    // assuming it is not an empty list:
    public void decAggValue(String arg, int idx) {
        if(!list.isEmpty()) list.removeFirst();
    }
}
Based on the Java code specified in the example above, the TQL class would be called LastBut_String, and the custom method that you could use in your TQL would be called LastBut. Based on this pattern, you can create a collection of functions that take advantage of the Java LinkedList generic class, substituting other types for String, such as Byte, Short, Integer, Long, Float, Double, Boolean, DateTime, and even Object. For example, the following extends the pattern to a linked list of Byte objects:
@AggHandlerDesc(handler=LastBut_Byte.class)
public abstract Byte LastBut(Byte arg, int idx);

public static class LastBut_Byte {
    LinkedList<Byte> list = new LinkedList<Byte>();
    int i;

    public Byte getAggValue() {
        if( list.isEmpty() || (i < 0) || (i > (list.size()-1)) ) {
            return null;
        } else {
            return list.get( ( list.size()-i-1) );
        }
    }

    public void incAggValue(Byte arg, int idx) {
        list.addLast(arg);
        i = idx;
        // no safety logic required here -
        // all handled by the getAggValue() method
    }

    public void decAggValue(Byte arg, int idx) {
        if(!list.isEmpty()) list.removeFirst();
    }
}
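To see how the idx argument selects a value, the handler logic can be driven by hand. The following sketch copies the LastBut_String logic into a hypothetical LastButDemo class (the Striim annotation is omitted so it compiles on its own) and calls the methods in the order a window might. With this logic, an idx of 0 selects the most recently added value, 1 selects the one before it, and an idx beyond the current list size yields null.

```java
import java.util.LinkedList;

// Hypothetical stand-alone driver for the LastBut handler logic.
class LastButDemo {

    // Same logic as LastBut_String, without the Striim annotation.
    static class LastBut_String {
        LinkedList<String> list = new LinkedList<String>();
        int i;

        public String getAggValue() {
            if (list.isEmpty() || (i < 0) || (i > (list.size() - 1))) {
                return null;
            }
            return list.get(list.size() - i - 1);
        }

        public void incAggValue(String arg, int idx) {
            list.addLast(arg);
            i = idx;
        }

        public void decAggValue(String arg, int idx) {
            if (!list.isEmpty()) list.removeFirst();
        }
    }

    public static void main(String[] args) {
        LastBut_String handler = new LastBut_String();
        handler.incAggValue("a", 1);
        handler.incAggValue("b", 1);
        handler.incAggValue("c", 1);
        // List is [a, b, c]; idx 1 selects the value one before the last.
        System.out.println(handler.getAggValue());

        handler.decAggValue("a", 1);
        // List is [b, c]; idx 1 now selects the head of the list.
        System.out.println(handler.getAggValue());
    }
}
```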
As you work with multiple rows of data, Striim provides you with the support you need to use sliding windows of data. To understand how this works, consider the following complete dataset:
OrderID | OrderDate | OrderValue |
---|---|---|
1111 | 1/1/2016 | 111.11 |
2222 | 2/1/2016 | 222.22 |
3333 | 3/1/2016 | 333.33 |
4444 | 4/1/2016 | 444.44 |
5555 | 5/1/2016 | 555.55 |
6666 | 6/1/2016 | 666.66 |
7777 | 7/1/2016 | 777.77 |
8888 | 8/1/2016 | 888.88 |
9999 | 9/1/2016 | 999.99 |
The following describes how the dataset is treated in a sliding window in which up to 3 rows of data at a time are included.
Initialization
When your object is initialized, you do not yet have a working record (a row of data), and the window has no data yet. Consider the following TQL statement:
SELECT AVG(OrderValue), OrderID, OrderDate, OrderValue FROM STREAM_ST
As you learned in Developing multi-row functions, the getAggValue() function defines return values, the incAggValue() function defines incremental logic, and decAggValue() defines decremental logic.
At the time of initialization, the running sum and count for the data would each be zero.
First row
Using the dataset above, the first row is added to the window: the incAggValue() function is called and adds the row's value to the running sum and count, and the getAggValue() function is then called to return the current average.
Here is the state of the data at this point. The working record is:
OrderID | OrderDate | OrderValue |
---|---|---|
1111 | 1/1/2016 | 111.11 |
The window is:
OrderID | OrderDate | OrderValue |
---|---|---|
1111 | 1/1/2016 | 111.11 |
Sum: 111.11
Count: 1
AVG: 111.11
Second row
Using the dataset above, the second row is added to the window: the incAggValue() function is called and adds the row's value to the running sum and count, and the getAggValue() function is then called to return the current average.
Here is the state of the data at this point. The working record is:
OrderID | OrderDate | OrderValue |
---|---|---|
2222 | 2/1/2016 | 222.22 |
The window is:
OrderID | OrderDate | OrderValue |
---|---|---|
1111 | 1/1/2016 | 111.11 |
2222 | 2/1/2016 | 222.22 |
Sum: 333.33
Count: 2
AVG: 166.665
Third row
Using the dataset above, the third row is added to the window: the incAggValue() function is called and adds the row's value to the running sum and count, and the getAggValue() function is then called to return the current average.
Here is the state of the data at this point. The working record is:
OrderID | OrderDate | OrderValue |
---|---|---|
3333 | 3/1/2016 | 333.33 |
The window is:
OrderID | OrderDate | OrderValue |
---|---|---|
1111 | 1/1/2016 | 111.11 |
2222 | 2/1/2016 | 222.22 |
3333 | 3/1/2016 | 333.33 |
Sum: 666.66
Count: 3
AVG: 222.22
Fourth row
Up to now, since the sliding window can contain up to 3 rows, for each new row the incAggValue() function added the row's value to the running sum and count, and the getAggValue() function was called to return the current average. With the fourth row, this changes. The decAggValue() function is called to remove the first row's value, the incAggValue() function adds the fourth row's value, and the getAggValue() function is called to return the updated average.
Here is the state of the data at this point. The working record is:
OrderID | OrderDate | OrderValue |
---|---|---|
4444 | 4/1/2016 | 444.44 |
The window now only includes rows 2-4 and excludes row 1, and the count holds steady at 3 rows:
OrderID | OrderDate | OrderValue |
---|---|---|
2222 | 2/1/2016 | 222.22 |
3333 | 3/1/2016 | 333.33 |
4444 | 4/1/2016 | 444.44 |
Sum: 999.99
Count: 3
AVG: 333.33
Toward the end
The trend continues as new rows are added: a row is dropped in order to add the new row, and the values are updated. Eventually, as input events cease and windowed events begin to time out, only the decAggValue() and getAggValue() functions are called.
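The walkthrough above can be reproduced outside Striim with a small simulation. The following is a sketch only: SlidingAvgDemo and its run() driver are hypothetical stand-ins for the window, and the call order (decAggValue(), then incAggValue(), then getAggValue()) follows the description above rather than any inspection of Striim internals. The handler uses double instead of Float to keep the arithmetic readable.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simulation of a 3-row sliding window driving an
// accumulator-style handler.
class SlidingAvgDemo {

    // Accumulator with the same three-method shape as the handlers above.
    static class AvgHandler {
        double runningSum = 0;
        int runningCount = 0;

        void incAggValue(double value) { runningSum += value; runningCount++; }
        void decAggValue(double value) { runningSum -= value; runningCount--; }
        double getAggValue() { return runningCount == 0 ? 0 : runningSum / runningCount; }
    }

    // Feeds the values through a window of the given size and returns
    // the average reported after each row arrives.
    static double[] run(double[] values, int windowSize) {
        AvgHandler handler = new AvgHandler();
        Deque<Double> window = new ArrayDeque<>();
        double[] averages = new double[values.length];
        for (int row = 0; row < values.length; row++) {
            if (window.size() == windowSize) {
                // Window is full: the oldest row leaves before the new one enters.
                handler.decAggValue(window.removeFirst());
            }
            window.addLast(values[row]);
            handler.incAggValue(values[row]);
            averages[row] = handler.getAggValue();
        }
        return averages;
    }

    public static void main(String[] args) {
        double[] orderValues = {111.11, 222.22, 333.33, 444.44, 555.55,
                                666.66, 777.77, 888.88, 999.99};
        double[] averages = run(orderValues, 3);
        for (int row = 0; row < averages.length; row++) {
            System.out.printf("row %d: AVG %.3f%n", row + 1, averages[row]);
        }
    }
}
```

Because the handler only adds and subtracts, each new row costs a constant amount of work regardless of the window size.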
We recommend that you use org.apache.log4j for your logging, as that is what Striim uses with its own functions.
Edit the conf/log4j.server.properties file and add the following new section:
log4j.logger.com.mycompany.custom.packagename.MyCustomFunctions=debug, MyCustomFunctionsAppender
log4j.additivity.com.mycompany.custom.packagename.MyCustomFunctions=false
log4j.appender.MyCustomFunctionsAppender=org.apache.log4j.FileAppender
log4j.appender.MyCustomFunctionsAppender.File=logs/MyCustomFunctions.log.out
# Define the layout for file appender
log4j.appender.MyCustomFunctionsAppender.layout=org.apache.log4j.EnhancedPatternLayout
log4j.appender.MyCustomFunctionsAppender.layout.conversionPattern=%-6p:%-15M:%m%n
In this example, a separate logs/MyCustomFunctions.log.out file will be created. Its log level will be debug, and it will output the log level, method, and message.
In the following Java class, the logger is used to output the running sum and count values:
@AggHandlerDesc(handler=AvgDebug_Float.class)
public abstract Float AvgDebug( String sKey, Float fArg );

public static class AvgDebug_Float {
    float fRunningSum = 0;
    int iRunningCount = 0;

    public Float getAggValue() {
        logger.debug( "RunningSum: " + fRunningSum + ", RunningCount: " + iRunningCount + "\n");
        return (iRunningCount == 0) ? 0 : (fRunningSum / iRunningCount);
    }

    public void incAggValue(String sKey, Float fArg) {
        if(fArg != null) {
            iRunningCount++;
            fRunningSum += fArg;
        }
        logger.debug( "[Key: " + sKey + ", Value: " + fArg + "] RunningSum is now: "
            + fRunningSum + ", RunningCount is now: " + iRunningCount );
    }

    public void decAggValue(String sKey, Float fArg) {
        if(fArg != null) {
            iRunningCount--;
            fRunningSum -= fArg;
        }
        logger.debug( "[Key: " + sKey + ", Value: " + fArg + "] RunningSum is now: "
            + fRunningSum + ", RunningCount is now: " + iRunningCount );
    }
}
One recommendation for your logging applications is to use StringBuffer to concatenate the strings used in your logging output. This will increase efficiency and ensure greater security in your applications.
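As an illustration of that recommendation, the following sketch builds the same message that incAggValue() logs above with a single StringBuffer. LogMessageDemo and buildMessage() are hypothetical names, and the logger itself is left out so the class compiles without log4j. In handler code you might also guard the call with log4j's logger.isDebugEnabled() so the message is only built when debug logging is on.

```java
// Hypothetical helper showing log message construction with StringBuffer.
class LogMessageDemo {

    // Builds the debug message in one buffer instead of chaining "+" operators.
    static String buildMessage(String sKey, Float fArg, float fRunningSum, int iRunningCount) {
        StringBuffer message = new StringBuffer(96);
        message.append("[Key: ").append(sKey)
               .append(", Value: ").append(fArg)
               .append("] RunningSum is now: ").append(fRunningSum)
               .append(", RunningCount is now: ").append(iRunningCount);
        return message.toString();
    }

    public static void main(String[] args) {
        // With log4j on the classpath, this string would be passed to logger.debug().
        System.out.println(buildMessage("A1", 2.5f, 7.5f, 3));
    }
}
```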
Striim supports dynamic changing of log levels without the need to restart the server. To dynamically set the log level to debug for your custom class:
set loglevel = {com.mycompany.custom.packagename.MyCustomFunctions : debug};
See Changing the log level for more information.
The following custom function will call a shell script specified in TQL.
import com.webaction.runtime.compiler.custom.Nondeterministic;
...
@Nondeterministic
public static int kickShell(String args) {
    int ret = 0;
    try {
        // Reject arguments that do not look like a shell script
        if (args.indexOf(".sh") == -1) {
            System.out.println("Error:" + args + " is not shell.");
            return ret;
        }
        // Run the script and wait for its exit code
        Process process = new ProcessBuilder("sh", args).start();
        ret = process.waitFor();
        System.out.println("return:" + ret);
    } catch (ArrayIndexOutOfBoundsException e) {
        System.out.println("Info: Please specify one argument.");
        System.out.println("return:" + ret);
    } catch (Exception e) {
        System.out.println("return:" + ret);
        e.printStackTrace();
    }
    // Return after the try block rather than from a finally block,
    // which would silently discard any uncaught error.
    return ret;
}
You could use that in TQL as follows:
CREATE CQ ... SELECT kickShell('/etc/striim/test.sh') as retVal ...
The import com.webaction.runtime.compiler.custom.Nondeterministic; statement and the @Nondeterministic annotation cause the script to be executed every time the CQ is run. Without those lines, the script would be called only once, on deployment.
Warning
Consider the potential security issues of allowing TQL applications to call scripts on the Striim host system. Depending on your environment and requirements, it may be more appropriate to call a specific script from the custom Java function.
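One way to follow that advice is to fix the set of runnable scripts in the Java code, so that TQL can only select from scripts you have approved. The following is a minimal sketch under that assumption. FixedScriptRunner, ALLOWED_SCRIPTS, isAllowed(), and runAllowedScript() are hypothetical names, and the single allowed path is reused from the TQL example above. In a Striim function class the method would also need to be public and carry the @Nondeterministic annotation, which is omitted here so the sketch compiles without the Striim JARs.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical variant of kickShell that only runs scripts named in the code.
class FixedScriptRunner {

    // Scripts that TQL applications are permitted to trigger (assumed list).
    private static final Set<String> ALLOWED_SCRIPTS = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList("/etc/striim/test.sh")));

    // True only for an exact match against the allow-list.
    static boolean isAllowed(String path) {
        return path != null && ALLOWED_SCRIPTS.contains(path);
    }

    // Returns the script's exit code, or -1 if the path is rejected
    // or the process could not be started or awaited.
    static int runAllowedScript(String path) {
        if (!isAllowed(path)) {
            System.out.println("Error: " + path + " is not an allowed script.");
            return -1;
        }
        try {
            Process process = new ProcessBuilder("sh", path).start();
            return process.waitFor();
        } catch (IOException e) {
            e.printStackTrace();
            return -1;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            return -1;
        }
    }
}
```

Because the check is an exact match, a TQL argument that appends extra text to an allowed path, or that points at any other file ending in .sh, is rejected before a process is started.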
Functions are loaded at the cluster level, so they are available for import to any application in any namespace. Imported functions are included as IMPORT statements in exported TQL. The LOAD and UNLOAD commands require the Global.admin role.
Before loading your custom function, copy its .jar file to striim/lib. Whenever Striim is restarted, the function will be loaded automatically.
To load the function using the console, enter LOAD "lib/<file name>.jar";
To import a loaded function in the console or a TQL file, use IMPORT STATIC <package name>.
To load and import a custom function in the web UI, select App Settings > Add Another Package, enter IMPORT STATIC <package name> in the Java Package field, and click Save.

To unload a custom function, in the console enter UNLOAD "lib/<file name>.jar";
To stop the function from reloading when Striim is restarted, delete its .jar file from striim/lib.
A Striim open processor contains a custom Java application that reads data from a window or stream, processes it, optionally enriching it with data from a cache, and writes to an output stream.
The SDK includes the following:
StriimOpenProcessor-SDK.jar, which contains classes to be included in the Java application, installed with Striim at striim/StriimSDK.
A Javadoc API reference for methods you may use in your Java application, installed with Striim at striim/docs/StriimOpenProcessor-SDKdocs.
The component must be built with Maven, since it requires the Maven Shade Plugin.
An open processor can be used only in the Striim namespace from which the types are exported.
The following simple example shows all the steps required to create an open processor and use it in a Striim application.
Step 1: define the input and output streams in Striim
The following TQL defines the input and output streams for the example open processor you will add later. It includes a FileReader source, a cache that will be specified in the open processor's ENRICH option, and a FileWriter target.
CREATE NAMESPACE ns1;
USE ns1;
CREATE APPLICATION OPExample;

CREATE source CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO CsvStream;

CREATE TYPE MerchantHourlyAve(
  merchantId String,
  hourValue integer,
  hourlyAve integer
);

CREATE CACHE HourlyAveLookup using FileReader (
  directory: 'Samples/PosApp/appData',
  wildcard: 'hourlyData.txt'
)
PARSE USING DSVParser (
  header: Yes,
  trimquote:false,
  trimwhitespace:true
)
QUERY (keytomap:'merchantId') OF MerchantHourlyAve;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream partition by merchantId
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_INT(data[9]) as zip
FROM CsvStream;

CREATE CQ cq2
INSERT INTO SendToOPStream
SELECT makeList(dateTime) as dateTime,
  makeList(zip) as zip
FROM PosDataStream;

CREATE TYPE ReturnFromOPStream_Type (
  time DateTime,
  val Integer
);
CREATE STREAM ReturnFromOPStream OF ReturnFromOPStream_Type;

CREATE TARGET OPExampleTarget
USING FileWriter (filename: 'OPExampleOut')
FORMAT USING JSONFormatter()
INPUT FROM ReturnFromOPStream;

END APPLICATION OPExample;
Step 2: export the input and output stream types
If you create OPExample in the ns1 namespace, the following Striim console command will export the types from the application to /home/myhome/OpExampleTypes.jar:
EXPORT TYPES OF ns1.OPExample TO "/home/myhome/OpExampleTypes.jar";
The EXPORT TYPES command requires read permission on the namespace. If you do not specify a path, the file will be created in the striim directory.
Step 3: set up Maven
Install the SDK and exported types .jar files:
mvn install:install-file -DgroupId=com.example -DartifactId=OpenProcessorSDK \
  -Dversion=1.0.0-SNAPSHOT -Dpackaging=jar -Dfile=/opt/striim/StriimSDK/StriimOpenProcessor-SDK.jar
mvn install:install-file -DgroupId=com.example -DartifactId=OPExample -Dversion=1.0.0-SNAPSHOT \
  -Dpackaging=jar -Dfile=/home/myhome/OpExampleTypes.jar
Create a Maven project in which you will create your custom Java application:
mvn archetype:generate -DgroupId=com.example.opexample -DartifactId=opexample \
  -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
Replace the default pom.xml created by Maven with the following, adjusting as necessary for your environment:
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.opexample</groupId>
  <artifactId>opexample</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>opexample</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- OpenProcessorSDK jar -->
    <dependency>
      <groupId>com.example</groupId>
      <artifactId>OpenProcessorSDK</artifactId>
      <version>1.0.0-SNAPSHOT</version>
      <scope>provided</scope>
    </dependency>
    <!-- exported types jar -->
    <dependency>
      <groupId>com.example</groupId>
      <artifactId>OPExample</artifactId>
      <version>1.0.0-SNAPSHOT</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-release-plugin</artifactId>
        <version>2.2.2</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <configuration>
          <createDependencyReducedPom>false</createDependencyReducedPom>
          <!-- The output SCM filename is defined here. It must match the
               sourceFile name used by the copy-rename plugin below. -->
          <finalName>OpExample.scm</finalName>
          <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <manifestEntries>
                <Striim-Module-Name>OPExample</Striim-Module-Name>
                <Striim-Service-Interface>
                  com.webaction.runtime.components.openprocessor.StriimOpenProcessor
                </Striim-Service-Interface>
                <Striim-Service-Implementation>
                  com.example.opexample.App
                </Striim-Service-Implementation>
              </manifestEntries>
            </transformer>
          </transformers>
          <artifactSet>
            <excludes>
              <exclude>org.slf4j:*</exclude>
              <exclude>log4j:*</exclude>
            </excludes>
          </artifactSet>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>com.coderplus.maven.plugins</groupId>
        <artifactId>copy-rename-maven-plugin</artifactId>
        <version>1.0</version>
        <executions>
          <execution>
            <id>copy-file</id>
            <phase>package</phase>
            <goals>
              <goal>copy</goal>
            </goals>
            <!-- The location and name for the .scm file to be imported into Striim
                 is defined here. Preferred location is module/modules folder under
                 the Maven project main folder. -->
            <configuration>
              <sourceFile>/home/myhome/opexample/target/OpExample.scm.jar</sourceFile>
              <destinationFile>/home/myhome/opexample/modules/OpExample.scm</destinationFile>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Step 4: write your Java application and build the .scm
Replace the default App.java with the following:
package com.example.opexample;

import wa.ns1.SendToOPStream_Type_1_0;
import wa.ns1.ReturnFromOPStream_Type_1_0;

import com.webaction.anno.PropertyTemplateProperty;
import com.webaction.runtime.components.openprocessor.StriimOpenProcessor;
import org.joda.time.DateTime;
import com.webaction.anno.AdapterType;
import com.webaction.anno.PropertyTemplate;
import com.webaction.runtime.containers.WAEvent;
import com.webaction.runtime.containers.IBatch;

import java.util.*;

@PropertyTemplate(name = "TupleConverter", type = AdapterType.process,
  properties = {
    @PropertyTemplateProperty(name="ahead", type=Integer.class, required=true, defaultValue="0"),
    @PropertyTemplateProperty(name="lastItemSeen", type=Boolean.class, required=true, defaultValue="0")
  },
  // The names of outputType and inputType are relative to Striim: output from a native Striim
  // code to your custom component, and input from your custom component to a native component.
  outputType = SendToOPStream_Type_1_0.class,
  inputType = ReturnFromOPStream_Type_1_0.class
)
public class App extends StriimOpenProcessor {

  public void run() {
    // Read the events added to the input since the last call
    IBatch<WAEvent> event = getAdded();
    Iterator<WAEvent> it = event.iterator();
    while (it.hasNext()) {
      SendToOPStream_Type_1_0 type = (SendToOPStream_Type_1_0) it.next().data;
      // ... Additional operations
    }

    // Build one output event and send it to the output stream
    ReturnFromOPStream_Type_1_0 result = new ReturnFromOPStream_Type_1_0();
    result.time = DateTime.now();
    Random rand = new Random(System.currentTimeMillis());
    result.val = rand.nextInt(50) + 1;
    send(result);
  }

  public void close() throws Exception {
    // TODO Auto-generated method stub
  }

  public Map getAggVec() {
    // TODO Auto-generated method stub
    return null;
  }

  public void setAggVec(Map aggVec) {
    // TODO Auto-generated method stub
  }
}
Change to the opexample directory created by Maven and enter mvn package.
Step 5: import the .scm into Striim
Loading an open processor requires the Global.admin permission (see Permissions).
Copy opexample/modules/OpExample.scm to a directory accessible by the Striim server, then use the following console command to load it:
LOAD OPEN PROCESSOR "<path>/OpExample.scm";
Alternatively, you may load it in Flow Designer at Configuration > App settings > Load / unload open processor.
In either case, when the application is restarted, Striim will reload the open processor from the same location.
Step 6: add the open processor to your application
Return to the application you created in step 1, open the Base Components section of the component palette, drag a Striim Open Processor into the workspace, set its options as follows, and click Save. Note that Ahead and Last Item Seen are defined by the Java class. The other properties will appear in all open processor components.

If you run the application, it will create output files in the striim directory.
Modifying an open processor
To modify your open processor, unload it in Flow Designer at Configuration > App settings > Load / unload open processor or using the command UNLOAD OPEN PROCESSOR "<path>/<file name>.scm";. Then make changes to the Java application, compile, and load the new .scm.
The LOAD and UNLOAD commands require the Global.admin role.
Caution
If you unload an open processor, revise it, and load it again, do not change the name of the .scm file.
To load an open processor using the console, enter: LOAD OPEN PROCESSOR "<path>/<file name>.scm";
To load an open processor in the Flow Designer, select App Settings, enter <path>/<file name>.scm in the Load/Unload Open Processor field, and click Load.

To unload an open processor using the console, enter: UNLOAD OPEN PROCESSOR "<path>/<file name>.scm";
To unload an open processor in the Flow Designer, select App Settings, enter <path>/<file name>.scm in the Load/Unload Open Processor field, and click Unload.