3.10.3 Extending Striim with custom Java code

The topics in this section describe how to extend Striim's capabilities by writing your own functions and components.

Using custom Java functions

TQL applications may import Java functions. The basic procedure is:

  • write a custom function

  • compile it to a .jar

  • add the .jar to .../Striim/lib

  • load the function into Striim

  • import the function to an application

Functions are loaded at the cluster level, so your new function would be available for import to any application in any namespace. Imported functions required by an application are included as IMPORT statements in exported TQL.

The rest of this section describes in detail how to create and use custom functions.

Importing Striim JAR files into your IDE

To develop your own custom functions, you must copy the JAR files from Striim's library into your IDE. The following instructions are for Eclipse. If you use another IDE, the procedure should be similar.

  1. Copy the Striim JAR files, available in Striim/lib, into your Java project so you can reference them in your code.

  2. In Eclipse, add the external JARs during Java project creation. Otherwise, make sure the project is selected in the Project Explorer, select File > Properties, select Java Build Path, click the Libraries tab, and click Add External JARs.

  3. When you are finished with your development, copy your custom JAR file into Striim/lib and restart the Striim server.

Setting up packages and classes

The next step is to set up the following in your Java IDE:

  • A package to contain your custom abstract classes.

  • Import statements for required classes.

  • Abstract classes that will contain your custom functions.

  • A logger (recommended).

Caution

Do not use reserved keywords in your package name (see List of reserved keywords).

The following template will help you get started:

// You will need your own package name:
package com.mycompany.custom.packagename;  

 
// You'll need to reference this Striim class in your own code if you
// define an Aggregate Window function:
import com.webaction.runtime.compiler.custom.AggHandlerDesc;

// Logging classes required by Striim  
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
  
// Any other classes you may need for your development
import java.lang.Math;
import java.util.LinkedList;
import org.joda.time.DateTime;

// Abstract class needed to contain your custom functions
public abstract class MyCustomFunctions {

  // Required logger
  private static Logger logger = Logger.getLogger( MyCustomFunctions.class );
}

Developing single-row functions

Single-row functions are the simplest type of Striim function. Start by defining your Java methods inside the custom abstract class you created (see Setting up packages and classes). A best practice is to use techniques that operate on a row of values by iterating on them without storing them, thereby reducing memory requirements and increasing performance.

For example, the following function iterates through a row of values and keeps track of the highest value. Once its loop has reached the termination condition by reaching the end of the row of values, it returns the highest value of that row:

public static double maxSingleRow(double... n) {
  int    i=0;
  double runningmax = n[i];
 
  // Iterate through the row and continually update runningmax to the highest value
  while (++i < n.length)
    if (n[i] > runningmax)
      runningmax = n[i];
 
  return runningmax;
}

Generate your custom JAR and deploy it to your Striim environment (see Importing Striim JAR files into your IDE).

Now you can import your static custom class into TQL and use the newly created custom function:

USE StriimWindows_NS2;
DROP APPLICATION StriimWindows2 CASCADE;
 
IMPORT STATIC com.mycompany.custom.packagename.MyCustomFunctions.*;
 
CREATE APPLICATION StriimWindows2;
 
CREATE OR REPLACE CQ myStream_CQ
INSERT INTO myStream_ST
SELECT
      maxSingleRow( field1, field2, field3, field4 )
FROM
      inputStream_ST;

Here are a few other single-row functions you can include by defining the following methods in the same abstract class:

  • minSingleRow iterates through a row of values and keeps track of the lowest value

  • rangeSingleRow calculates the range of values in the row by using the previously created minSingleRow and maxSingleRow functions

  • avgSingleRow iterates through a row of values and calculates the average by maintaining a running total and dividing by the length

public static double minSingleRow(double... n) {
  int    i = 0;
  double runningmin = n[i];

  // Iterate through the row and continually update runningmin to the lowest value
  while (++i < n.length)
    if (n[i] < runningmin)
      runningmin = n[i];

  return runningmin;
}

public static double rangeSingleRow(double... n) {
  double dRange = maxSingleRow( n ) - minSingleRow( n );
  return dRange;
}

public static double avgSingleRow(double... n) {
  int    i = 0;
  double runningsum = 0;

  // Maintain a running total of the row's values
  while (i < n.length) {
    runningsum += n[i];
    i++;
  }

  return ( runningsum / i );
}
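
The single-row functions above assume that at least one value is passed. Called with no arguments, maxSingleRow and minSingleRow would throw an ArrayIndexOutOfBoundsException when reading n[0], and avgSingleRow would return NaN (0.0 divided by 0). The following is a minimal sketch of guarded variants that return NaN for an empty row. The class name SafeRowFunctions, the method names, and the choice of NaN as the empty-row result are assumptions made for illustration and are not part of Striim.

```java
// Hypothetical class for illustration. In Striim these methods would live in
// your own public abstract class (for example MyCustomFunctions); the class is
// declared without "public" here only so the sketch compiles in any file.
abstract class SafeRowFunctions {

    // Returns the highest value in the row, or NaN if the row is empty.
    public static double maxSingleRowSafe(double... n) {
        if (n == null || n.length == 0) {
            return Double.NaN;
        }
        double runningMax = n[0];
        for (int i = 1; i < n.length; i++) {
            if (n[i] > runningMax) {
                runningMax = n[i];
            }
        }
        return runningMax;
    }

    // Returns the average of the row, or NaN if the row is empty.
    public static double avgSingleRowSafe(double... n) {
        if (n == null || n.length == 0) {
            return Double.NaN;
        }
        double runningSum = 0;
        for (double value : n) {
            runningSum += value;
        }
        return runningSum / n.length;
    }
}
```

Whether NaN, zero, or an exception is the right result for an empty row depends on how the downstream CQ treats the value, so treat this choice as a placeholder.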

Developing multi-row functions

Multi-row functions operate across multiple rows in a Window. A best practice when designing multi-row functions is to use accumulators that eliminate the need to iterate through all the rows in a Window. As you add and eliminate rows from a Window, maintain state on the fly. While it is still possible to iterate through all the rows, the approach using accumulators and maintaining state will help improve performance.

To design a class supporting multi-row functions:

  1. Ensure that you've imported the class required to declare an aggregate function:

    import com.webaction.runtime.compiler.custom.AggHandlerDesc;

  2. Declare a handler that defines a Java class used to create the TQL function signature supporting the required accumulators and functions. For example:

    @AggHandlerDesc(handler=AvgDebug_Float.class)

  3. Declare an abstract method specifying the expected arguments and return type for the multi-row function. The Java signature will be converted to an equivalent TQL signature. For example:

    public abstract Float AvgDebug( String sKey, Float fArg );

  4. Define a public static nested class whose name is identical to the one you specified in your handler. After setting its initialization variables, your class must implement the following methods:

    • decAggValue()

    • incAggValue()

    • getAggValue()

The following example illustrates these basic steps in a class that maintains a running sum and can return the average of all floating point values through its getAggValue() function. The incAggValue() function adds a new value to the running sum, and decAggValue() removes a value from the running sum:

    // Required handler declaration:
    // handler name must match the name of your static class. 
    @AggHandlerDesc(handler=AvgDebug_Float.class)

    // Name of the function to be exported to TQL:
    public abstract Float AvgDebug( String sKey, Float fArg );

    // Handler class implementing the required accumulators:
    // must match the name in the handler declaration.
    public static class AvgDebug_Float
    {
        // initializers
        float fRunningSum = 0;
        int iRunningCount = 0;
        
        // Calculates and returns the average value at any moment
        public Float getAggValue()
        {
            logger.debug( "RunningSum: " + fRunningSum
              + ",  RunningCount: " + iRunningCount + "\n");
            return (iRunningCount == 0) ? 0 : (fRunningSum / iRunningCount);
        }
        // Adds the new value to the running total, maintaining the count
        // of values included:
        public void incAggValue(String sKey, Float fArg)
        {
            if(fArg != null) {
                iRunningCount++;
                fRunningSum += fArg;
            }
            logger.debug( "[Key: " + sKey + ", Value: " +  fArg + "] "
              + "RunningSum is now: " + fRunningSum + ",  RunningCount is now: "
              + iRunningCount );
        }

        // Removes the specified value from the running total, maintaining
        // the count of values included:
        public void decAggValue(String sKey, Float fArg)
        {
            if(fArg != null) {
                iRunningCount--;
                fRunningSum -= fArg;
            }
            logger.debug( "[Key: " + sKey + ", Value: " +  fArg + "] "
              + "RunningSum is now: " + fRunningSum + ",  RunningCount is now: "
              + iRunningCount );
        }
    }

The following TQL example illustrates the mapping between the Java code shown above and its equivalent class and function once deployed in the Striim environment:

USE StriimWindows_NS2;
DROP APPLICATION StriimWindows2 CASCADE;
 
IMPORT STATIC com.mycompany.custom.packagename.MyCustomFunctions.*;
 
CREATE APPLICATION StriimWindows2;
 
CREATE OR REPLACE CQ MovingAvg_CQ
INSERT INTO MovingAvg_ST
SELECT
     avgDebug( OrderId, OrderValue )
FROM
     MovingAvg3ES_WN;

The following Java code implements a handler for an imported TQL function called LastBut, which returns the string located idx positions before the most recently added string in the window (so an index of 1 returns the string just before the last one):

    @AggHandlerDesc(handler=LastBut_String.class)
    public abstract String LastBut(String arg, int idx);
    public static class LastBut_String
    {
        LinkedList<String> list = new LinkedList<String>();
        int i;
        
        // Returns the string located in the position just before 
        // the last specified index:
        public String getAggValue() 
        { 
            if( list.isEmpty()  || (i < 0) || (i > (list.size()-1)) ) { 
                return null;
            } else {
                return list.get( ( list.size()-i-1) );
            }
        }
 
        // Adds a string to the end of the linked list, 
        // updating the index location:
        public void incAggValue(String arg, int idx) 
        { 
            list.addLast(arg);
            i = idx;  // no safety logic required here - 
                      // all handled by the getAggValue() method
        }


        // Removes the string located at the head of the linked list,
        // assuming it is not an empty list:
        public void decAggValue(String arg, int idx) 
        { 
            if(!list.isEmpty()) list.removeFirst();
        }
    }

Based on the Java code in the example above, the TQL class would be called LastBut_String, and the custom function that you could use in your TQL would be called LastBut. Following this pattern, you can create a collection of functions that take advantage of the Java LinkedList generic class, substituting other types for String, such as Byte, Short, Integer, Long, Float, Double, Boolean, DateTime, and even Object. For instance, the following code extends the pattern to a linked list of Byte objects:

    @AggHandlerDesc(handler=LastBut_Byte.class)
    public abstract Byte LastBut(Byte arg, int idx);
    public static class LastBut_Byte
    {
        LinkedList<Byte> list = new LinkedList<Byte>();
        int i;
        
        public Byte getAggValue() 
        { 
            if( list.isEmpty()  || (i < 0) || (i > (list.size()-1)) ) { 
                return null;
            } else {
                return list.get( ( list.size()-i-1) );
            }
        }
        public void incAggValue(Byte arg, int idx) 
        { 
            list.addLast(arg);
            i = idx;  // no safety logic required here - 
                      // all handled by the getAggValue() method
        }
        public void decAggValue(Byte arg, int idx) 
        { 
            if(!list.isEmpty()) list.removeFirst();
        }
    }
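
To see what LastBut returns, the handler logic can be exercised outside Striim. The following sketch copies the LastBut_String logic into a plain class and calls the methods in the order a window would. The class name LastButDemo and the sample values are assumptions made for illustration. The value returned is the element idx positions back from the most recently added one, so an index of 0 gives the newest element and an index of 1 gives the one before it.

```java
import java.util.LinkedList;

// Hypothetical standalone copy of the LastBut_String handler logic.
class LastButDemo {
    private final LinkedList<String> list = new LinkedList<>();
    private int i;

    // Returns the element i positions before the newest one, or null if
    // the list does not hold enough elements.
    public String getAggValue() {
        if (list.isEmpty() || i < 0 || i > list.size() - 1) {
            return null;
        }
        return list.get(list.size() - i - 1);
    }

    // Called when a value enters the window.
    public void incAggValue(String arg, int idx) {
        list.addLast(arg);
        i = idx;
    }

    // Called when the oldest value leaves the window.
    public void decAggValue(String arg, int idx) {
        if (!list.isEmpty()) {
            list.removeFirst();
        }
    }

    public static void main(String[] args) {
        LastButDemo handler = new LastButDemo();
        handler.incAggValue("a", 1);
        System.out.println(handler.getAggValue()); // null: only one element, index 1 is out of range
        handler.incAggValue("b", 1);
        handler.incAggValue("c", 1);
        System.out.println(handler.getAggValue()); // b: one position before the newest element, c
        handler.decAggValue("a", 1);               // the oldest element leaves the window
        System.out.println(handler.getAggValue()); // b: the list is now [b, c]
    }
}
```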

Understanding windows in custom functions

As you work with multiple rows of data, Striim provides you with the support you need to use sliding windows of data. To understand how this works, consider the following complete dataset:

OrderID   OrderDate   OrderValue
1111      1/1/2016    111.11
2222      2/1/2016    222.22
3333      3/1/2016    333.33
4444      4/1/2016    444.44
5555      5/1/2016    555.55
6666      6/1/2016    666.66
7777      7/1/2016    777.77
8888      8/1/2016    888.88
9999      9/1/2016    999.99

The following describes how the dataset is treated in a sliding window in which up to 3 rows of data at a time are included.

Initialization

When your object is initialized, you do not yet have a working record (a row of data), and the window has no data yet. Consider the following TQL statement:

SELECT
  AVG(OrderValue),
  OrderID,
  OrderDate,
  OrderValue
FROM
  STREAM_ST

As you learned in Developing multi-row functions, the getAggValue() function defines return values, the incAggValue() function defines incremental logic, and decAggValue() defines decremental logic.

At the time of initialization, the running sum and count for the data would each be zero.

First row

Using the dataset above, incAggValue() is called as the first row enters the window, adding its value to the running sum and incrementing the count. getAggValue() is then called to return the current average.

Here is the state of the data at this point. The working record is:

OrderID   OrderDate   OrderValue
1111      1/1/2016    111.11

The window is:

OrderID   OrderDate   OrderValue
1111      1/1/2016    111.11

Sum: 111.11

Count: 1

AVG: 111.11

Second row

Using the dataset above, incAggValue() is called as the second row enters the window, adding its value to the running sum and incrementing the count. getAggValue() is then called to return the current average.

Here is the state of the data at this point. The working record is:

OrderID   OrderDate   OrderValue
2222      2/1/2016    222.22

The window is:

OrderID   OrderDate   OrderValue
1111      1/1/2016    111.11
2222      2/1/2016    222.22

Sum: 333.33

Count: 2

AVG: 166.665

Third row

Using the dataset above, incAggValue() is called as the third row enters the window, adding its value to the running sum and incrementing the count. getAggValue() is then called to return the current average.

Here is the state of the data at this point. The working record is:

OrderID   OrderDate   OrderValue
3333      3/1/2016    333.33

The window is:

OrderID   OrderDate   OrderValue
1111      1/1/2016    111.11
2222      2/1/2016    222.22
3333      3/1/2016    333.33

Sum: 666.66

Count: 3

AVG: 222.22

Fourth row

Until now, since the sliding window could contain up to 3 rows, each new row triggered a call to incAggValue(), which added the row's value to the running sum and incremented the count, followed by a call to getAggValue(), which returned the current average. With a 4th row, this sequence changes. decAggValue() is called to remove the 1st row's value from the running sum and count, incAggValue() is called to add the 4th row's value, and getAggValue() is called to return the current average.

Here is the state of the data at this point. The working record is:

OrderID   OrderDate   OrderValue
4444      4/1/2016    444.44

The window now only includes rows 2-4 and excludes row 1, and the count holds steady at 3 rows:

OrderID   OrderDate   OrderValue
2222      2/1/2016    222.22
3333      3/1/2016    333.33
4444      4/1/2016    444.44

Sum: 999.99

Count: 3

AVG: 333.33

Toward the end

The trend continues as new rows are added: a row is dropped in order to add the new row, and the values are updated. Eventually, as input events cease and windowed events begin to time out, only the decAggValue() and getAggValue() functions are called.
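
The call sequence described above can be reproduced outside Striim with a small simulation. The following is a sketch only: the class name SlidingAvgDemo, the averages() helper, and the use of an ArrayDeque to stand in for the window are assumptions made for illustration, and in a real application Striim makes these calls itself. The sketch uses double rather than Float to keep the arithmetic simple.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical standalone simulation of a sliding window of rows driving
// an averaging handler through decAggValue, incAggValue, and getAggValue.
class SlidingAvgDemo {
    private double runningSum = 0;
    private int runningCount = 0;

    public void incAggValue(double value) { runningSum += value; runningCount++; }
    public void decAggValue(double value) { runningSum -= value; runningCount--; }
    public double getAggValue() { return runningCount == 0 ? 0 : runningSum / runningCount; }

    // Feeds the values through a window holding at most `size` rows and
    // returns the average reported after each row arrives.
    public static double[] averages(double[] values, int size) {
        SlidingAvgDemo handler = new SlidingAvgDemo();
        Deque<Double> window = new ArrayDeque<>();
        double[] result = new double[values.length];
        for (int row = 0; row < values.length; row++) {
            if (window.size() == size) {
                handler.decAggValue(window.removeFirst()); // oldest row leaves the window
            }
            window.addLast(values[row]);
            handler.incAggValue(values[row]);              // new row enters the window
            result[row] = handler.getAggValue();           // current average
        }
        return result;
    }

    public static void main(String[] args) {
        double[] orderValues = {111.11, 222.22, 333.33, 444.44, 555.55};
        for (double avg : averages(orderValues, 3)) {
            System.out.printf("%.3f%n", avg);
        }
    }
}
```

No row is ever revisited: each arrival costs one decrement (once the window is full), one increment, and one division, regardless of the window size.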

Understanding logging in custom functions

We recommend that you use org.apache.log4j for your logging, as that is what Striim uses with its own functions.

Configuring Log4j

Edit the conf/log4j.server.properties file and add the following new section:

log4j.logger.com.mycompany.custom.packagename.MyCustomFunctions=debug, MyCustomFunctionsAppender
log4j.additivity.com.mycompany.custom.packagename.MyCustomFunctions=false
log4j.appender.MyCustomFunctionsAppender=org.apache.log4j.FileAppender
log4j.appender.MyCustomFunctionsAppender.File=logs/MyCustomFunctions.log.out
# Define the layout for file appender
log4j.appender.MyCustomFunctionsAppender.layout=org.apache.log4j.EnhancedPatternLayout
log4j.appender.MyCustomFunctionsAppender.layout.conversionPattern=%-6p:%-15M:%m%n

In this example, a separate logs/MyCustomFunctions.log.out file will be created. Its log level will be debug, and it will output the log level, method, and message.

In the following Java class, the logger is used to output the running sum and count values:

    @AggHandlerDesc(handler=AvgDebug_Float.class)
    public abstract Float AvgDebug( String sKey, Float fArg );
    public static class AvgDebug_Float
    {
        float fRunningSum = 0;
        int iRunningCount = 0;
        
        public Float getAggValue()
        {
            logger.debug( "RunningSum: " + fRunningSum + ",  RunningCount: " + iRunningCount + "\n");
            return (iRunningCount == 0) ? 0 : (fRunningSum / iRunningCount);
        }
        public void incAggValue(String sKey, Float fArg)
        {
            if(fArg != null) {
                iRunningCount++;
                fRunningSum += fArg;
            }
            logger.debug( "[Key: " + sKey + ", Value: " +  fArg + "] RunningSum is now: "
              + fRunningSum + ",  RunningCount is now: " + iRunningCount );
        }
        public void decAggValue(String sKey, Float fArg)
        {
            if(fArg != null) {
                iRunningCount--;
                fRunningSum -= fArg;
            }
            logger.debug( "[Key: " + sKey + ", Value: " +  fArg + "] RunningSum is now: "
              + fRunningSum + ",  RunningCount is now: " + iRunningCount );
        }
    }

One recommendation for your logging applications is to use StringBuffer to concatenate the strings used in your logging output. This will increase efficiency and ensure greater security in your applications.
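
As a rough illustration of that recommendation, the following sketch builds the handler's log message with a single builder. It uses StringBuilder, the unsynchronized counterpart of StringBuffer with the same append methods; substitute StringBuffer if you want its synchronization. The class name LogMessages and the method name state are assumptions made for illustration.

```java
// Hypothetical helper; the class and method names are illustrative only.
class LogMessages {

    // Builds the same message the AvgDebug handler logs, using one builder
    // instead of repeated string concatenation.
    static String state(String key, Float value, float runningSum, int runningCount) {
        StringBuilder sb = new StringBuilder(96);
        sb.append("[Key: ").append(key)
          .append(", Value: ").append(value)
          .append("] RunningSum is now: ").append(runningSum)
          .append(",  RunningCount is now: ").append(runningCount);
        return sb.toString();
    }
}
```

In a handler, this might be called as logger.debug( LogMessages.state( sKey, fArg, fRunningSum, iRunningCount ) ), optionally wrapped in a logger.isDebugEnabled() check so the message is built only when debug logging is on.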

Set the log level

Striim supports dynamic changing of log levels without the need to restart the server. To dynamically set the log level to debug for your custom class:

set loglevel = {com.mycompany.custom.packagename.MyCustomFunctions : debug};

See Changing the log level for more information.

Calling a shell script from a custom Java function

The following custom function will call a shell script specified in TQL.

import com.webaction.runtime.compiler.custom.Nondeterministic; 
...

@Nondeterministic
public static int kickShell(String args) {
    int ret = 0;
    // Only run files whose names end in .sh
    if (args == null || !args.endsWith(".sh")) {
        System.out.println("Error:" + args + " is not shell.");
        return ret;
    }
    try {
        Process process = new ProcessBuilder("sh", args).start();
        // Wait for the script to finish and capture its exit code
        ret = process.waitFor();
    } catch (InterruptedException e) {
        // Preserve the interrupt status for the calling thread
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("return:" + ret);
    return ret;
}

You could use that in TQL as follows:

CREATE CQ ...
SELECT kickShell('/etc/striim/test.sh') as retVal ...

The import of com.webaction.runtime.compiler.custom.Nondeterministic and the @Nondeterministic annotation cause the script to be executed every time the CQ is run. Without them, the script would be called only once, on deployment.

Warning

Consider the potential security issues of allowing TQL applications to call scripts on the Striim host system. Depending on your environment and requirements, it may be more appropriate to call a specific script from the custom Java function.
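
One way to follow the suggestion in the warning above is to have TQL pass a short name rather than a path, and let the Java function map that name to a fixed script. The following is a minimal sketch under that assumption. The class name ScriptRunner, the function name kickNamedShell, the name "test", and the -1 error code are all hypothetical. In Striim the method would also carry the @Nondeterministic annotation shown earlier, which is omitted here so the sketch compiles without the Striim JAR files.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical variant of kickShell: only scripts registered here can run.
class ScriptRunner {
    private static final Map<String, String> ALLOWED_SCRIPTS = new HashMap<>();
    static {
        // Placeholder entry; register your own script names and paths.
        ALLOWED_SCRIPTS.put("test", "/etc/striim/test.sh");
    }

    // Returns the script's exit code, or -1 if the name is not registered
    // or the script could not be run.
    public static int kickNamedShell(String name) {
        String path = ALLOWED_SCRIPTS.get(name);
        if (path == null) {
            System.out.println("Error: " + name + " is not a registered script.");
            return -1;
        }
        try {
            // inheritIO() sends the script's output to the server's own
            // stdout and stderr so the process cannot block on a full pipe.
            Process process = new ProcessBuilder("sh", path).inheritIO().start();
            return process.waitFor();
        } catch (IOException e) {
            e.printStackTrace();
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }
}
```

With this approach a TQL application could call kickNamedShell('test') but could not point the function at an arbitrary file on the Striim host.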

Loading and unloading custom functions

Functions are loaded at the cluster level, so they are available for import to any application in any namespace. Imported functions are included as IMPORT statements in exported TQL. The LOAD and UNLOAD commands require the Global.admin role.

Before loading your custom function, copy its .jar file to striim/lib. Whenever Striim is restarted, the function will be loaded automatically.

To load the function using the console, enter LOAD "lib/<file name>.jar";

To import a loaded function in the console or a TQL file, use IMPORT STATIC <package name>.

To load and import a custom function in the web UI, select App Settings > Add Another Package, enter IMPORT STATIC <package name> in the Java Package field, and click Save.

[Screenshot: load_custom_function.png]

To unload a custom function, in the console enter UNLOAD "lib/<file name>.jar";

To stop the function from reloading when Striim is restarted, delete its .jar file from striim/lib.

Creating an open processor component

A Striim open processor contains a custom Java application that reads data from a window or stream, processes it, optionally enriching it with data from a cache, and writes to an output stream.

The SDK includes the following:

  • StriimOpenProcessor-SDK.jar, which contains classes to be included in the Java application, installed with Striim at striim/StriimSDK.

  • A Javadoc API reference for methods you may use in your Java application, installed with Striim at striim/docs/StriimOpenProcessor-SDKdocs.

The component must be built with Maven, since it requires the Maven Shade Plugin.

An open processor can be used only in the Striim namespace from which the types are exported.

The following simple example shows all the steps required to create an open processor and use it in a Striim application.

Step 1: define the input and output streams in Striim

The following TQL defines the input and output streams for the example open processor you will add later. It includes a FileReader source, a cache that will be specified in the open processor's ENRICH option, and a FileWriter target.

CREATE NAMESPACE ns1;
USE ns1;
CREATE APPLICATION OPExample;

CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO CsvStream;
 
CREATE TYPE MerchantHourlyAve(
  merchantId String,
  hourValue integer,
  hourlyAve integer
);

CREATE CACHE HourlyAveLookup using FileReader (
  directory: 'Samples/PosApp/appData',
  wildcard: 'hourlyData.txt'
)
PARSE USING DSVParser (
  header: Yes,
  trimquote:false,
  trimwhitespace:true
) 
QUERY (keytomap:'merchantId') 
OF MerchantHourlyAve;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream partition by merchantId
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_INT(data[9]) as zip
FROM CsvStream;
 
CREATE CQ cq2
INSERT INTO SendToOPStream
SELECT makeList(dateTime) as dateTime,
  makeList(zip) as zip
FROM PosDataStream;
 
CREATE TYPE ReturnFromOPStream_Type ( time DateTime , val Integer );
CREATE STREAM ReturnFromOPStream OF ReturnFromOPStream_Type;

CREATE TARGET OPExampleTarget 
USING FileWriter (filename: 'OPExampleOut') 
FORMAT USING JSONFormatter() 
INPUT FROM ReturnFromOPStream;
 
END APPLICATION OPExample;

Step 2: export the input and output stream types

If you create OPExample in the ns1 namespace, the following Striim console command will export the types from the application to /home/myhome/OpExampleTypes.jar:

EXPORT TYPES OF ns1.OPExample TO "/home/myhome/OpExampleTypes.jar";

The EXPORT TYPES command requires read permission on the namespace. If you do not specify a path, the file will be created in the striim directory.

Step 3: set up Maven

Install the SDK and exported types .jar files:

mvn install:install-file -DgroupId=com.example -DartifactId=OpenProcessorSDK \
  -Dversion=1.0.0-SNAPSHOT -Dpackaging=jar -Dfile=/opt/striim/StriimSDK/StriimOpenProcessor-SDK.jar
mvn install:install-file -DgroupId=com.example -DartifactId=OPExample -Dversion=1.0.0-SNAPSHOT \
  -Dpackaging=jar -Dfile=/home/myhome/OpExampleTypes.jar

Create a Maven project in which you will create your custom Java application:

mvn archetype:generate -DgroupId=com.example.opexample -DartifactId=opexample \
  -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

Replace the default pom.xml created by Maven with the following, adjusting as necessary for your environment:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.opexample</groupId>
  <artifactId>opexample</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>opexample</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- OpenProcessorSDK jar -->
    <dependency>
      <groupId>com.example</groupId>
      <artifactId>OpenProcessorSDK</artifactId>
      <version>1.0.0-SNAPSHOT</version>
      <scope>provided</scope>
    </dependency>
    <!-- exported types jar -->
    <dependency>
      <groupId>com.example</groupId>
      <artifactId>OPExample</artifactId>
      <version>1.0.0-SNAPSHOT</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-release-plugin</artifactId>
                <version>2.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                    <!-- The output SCM filename is defined here. It must match the sourceFile name used by the copy-rename plugin below. -->
                    <finalName>OpExample.scm</finalName>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <manifestEntries>
                                <Striim-Module-Name>OPExample</Striim-Module-Name>
                                <Striim-Service-Interface>
                                  com.webaction.runtime.components.openprocessor.StriimOpenProcessor
                                </Striim-Service-Interface>
                                <Striim-Service-Implementation>
                                  com.example.opexample.App
                                </Striim-Service-Implementation>
                            </manifestEntries>
                        </transformer>
                    </transformers>
                    <artifactSet>
                        <excludes>
                            <exclude>org.slf4j:*</exclude>
                            <exclude>log4j:*</exclude>
                        </excludes>
                    </artifactSet>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>com.coderplus.maven.plugins</groupId>
                <artifactId>copy-rename-maven-plugin</artifactId>
                <version>1.0</version>
                <executions>
                    <execution>
                        <id>copy-file</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy</goal>
                        </goals>
                        <!--
                        The location and name for the .scm file to be imported into Striim is defined here.
                        Preferred location is module/modules folder under the Maven project main folder.
                        -->
                        <configuration>
                            <sourceFile>/home/myhome/opexample/target/OpExample.scm.jar</sourceFile>
                            <destinationFile>/home/myhome/opexample/modules/OpExample.scm</destinationFile>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Step 4: write your Java application and build the .scm

Replace the default App.java with the following:

package com.example.opexample;
 
import wa.ns1.SendToOPStream_Type_1_0;
import wa.ns1.ReturnFromOPStream_Type_1_0;
 
import com.webaction.anno.PropertyTemplateProperty;
import com.webaction.runtime.components.openprocessor.StriimOpenProcessor;
import org.joda.time.DateTime;
 
import com.webaction.anno.AdapterType;
import com.webaction.anno.PropertyTemplate;
import com.webaction.runtime.containers.WAEvent;
 
import com.webaction.runtime.containers.IBatch;
import java.util.*;
 
@PropertyTemplate(name = "TupleConverter", type = AdapterType.process,
properties = {
@PropertyTemplateProperty(name="ahead", type=Integer.class, required=true, defaultValue="0"),
@PropertyTemplateProperty(name="lastItemSeen", type=Boolean.class, required=true, defaultValue="0")
},
// The names of outputType and inputType are relative to Striim: output from a native Striim
// code to your custom component, and input from your custom component to a native component.
outputType = SendToOPStream_Type_1_0.class,
inputType = ReturnFromOPStream_Type_1_0.class
)
public class App extends StriimOpenProcessor
{
  
    public void run() {
        // Iterate over the batch of incoming events
        IBatch<WAEvent> event = getAdded();
        Iterator<WAEvent> it = event.iterator();
        while (it.hasNext()) {
            SendToOPStream_Type_1_0 type = (SendToOPStream_Type_1_0) it.next().data;
            //  ... Additional operations
        }

        // Build one output event holding the current time and a random value from 1 to 50
        ReturnFromOPStream_Type_1_0 result = new ReturnFromOPStream_Type_1_0();
        result.time = DateTime.now();
        Random rand = new Random(System.currentTimeMillis());
        result.val = rand.nextInt(50) + 1;
        send(result);
    }

    public void close() throws Exception {
        // TODO Auto-generated method stub

    }
 
    public Map getAggVec() {
        // TODO Auto-generated method stub
        return null;
    }
 
    public void setAggVec(Map aggVec) {
        // TODO Auto-generated method stub
 
    }
}

Change to the opexample directory created by Maven and enter mvn package.

Step 5: import the .scm into Striim

Loading an open processor requires the Global.admin role (see Permissions).

Copy opexample/modules/OpExample.scm to a directory accessible by the Striim server, then use the following console command to load it:

LOAD OPEN PROCESSOR "<path>/OpExample.scm";

Alternatively, you may load it in Flow Designer at Configuration > App settings > Load / unload open processor.

In either case, when the application is restarted, Striim will reload the open processor from the same location.

Step 6: add the open processor to your application

Return to the application you created in step 1, open the Base Components section of the component palette, drag a Striim Open Processor into the workspace, set its options as follows, and click Save. Note that Ahead and Last Item Seen are defined by the Java class. The other properties will appear in all open processor components.

[Screenshot: ExampleOP_settings.png]

If you run the application, it will create output files in the striim directory.

Modifying an open processor

To modify your open processor, unload it in Flow Designer at Configuration > App settings > Load / unload open processor or using the command UNLOAD OPEN PROCESSOR "<path>/<file name>.scm";. Then make changes to the Java application, compile, and load the new .scm.

Loading and unloading open processors

The LOAD and UNLOAD commands require the Global.admin role.

Caution

If you unload an open processor, revise it, and load it again, do not change the name of the .scm file.

To load an open processor using the console, enter: LOAD OPEN PROCESSOR "<path>/<file name>.scm";

To load an open processor in the Flow Designer, select App Settings, enter <path>/<file name>.scm in the Load/Unload Open Processor field, and click Load.

[Screenshot: load_open_processor.png]

To unload an open processor using the console, enter: UNLOAD OPEN PROCESSOR "<path>/<file name>.scm";

To unload an open processor in the Flow Designer, select App Settings, enter <path>/<file name>.scm in the Load/Unload Open Processor field, and click Unload.
