3.10.1 Parsers
When a reader supports multiple input types, the type is selected by specifying the appropriate parser.
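For example, this minimal sketch (the file, stream, and source names are placeholders) attaches DSVParser to a FileReader source with the PARSE USING clause:

CREATE SOURCE SampleCsvSource USING FileReader (
  directory:'Samples',
  wildcard:'sample.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:'yes'
)
OUTPUT TO SampleCsvStream;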

AAL (Apache access log) Parser

Parses Apache access logs. See Supported reader-parser combinations for compatible readers.

Archive Dir (String): if specified, the adapter will also read the rotated log files from the archive directory

Charset (String, default UTF-8)

Column Delimit Till (Integer, default -1): With the default value of -1, all delimiters are interpreted as columns. If a positive value is specified, that number of delimiters are interpreted as columns, and any additional delimiters are treated as if escaped. For example, if the columndelimiter value is a space, and columndelimittill is 4, this row:

2012-12-10 10:30:30:256 10.1.10.12 jsmith User Login Error, invalid username or password

would be interpreted as five columns:

2012-12-10
10:30:30:256
10.1.10.12
jsmith
User Login Error, invalid username or password

Column Delimiter (String, default is one space, UTF-8 0x20)

Ignore Empty Column (Boolean, default True)

Quote Set (String, default []~\"): characters that mark the start and end of each field

Row Delimiter (String, default \n): see Setting rowdelimiter values

Separator (String, default ~)

The output type of a source using AALParser is WAEvent.

Sample application:

CREATE SOURCE AALSource USING FileReader (
  directory:'Samples/appData',
  wildcard:'access_log.log',
  positionByEOF:false
)
PARSE USING AALParser ()
OUTPUT TO RawAccessStream;
	
CREATE TYPE AccessLogEntry (
  srcIp String KEY,
  accessTime DateTime,
  timeStr String,
  request String);

CREATE STREAM AccessStream OF AccessLogEntry;

CREATE CQ ParseAccessLog
INSERT INTO AccessStream
SELECT data[0],
  TO_DATE(data[3],"dd/MMM/yyyy:HH:mm:ss Z"),
  data[3],
  data[4]
FROM RawAccessStream;

Avro Parser

Parses input in Avro format. See Supported reader-parser combinations for compatible readers.

When reading .avro files, no properties are required. For non-file sources, specify one of the following two properties.

Schema File Name (String): the path and name of the Avro schema file

Schema Registry URI (String): the URI for a Confluent or Hortonworks schema registry, for example, http://198.51.100.55:8081

For detailed discussion of the schema registry, see Using the Confluent or Hortonworks schema registry.
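For example, a sketch of a KafkaReader source that parses Avro-encoded messages using a schema registry might look like this (the broker address, topic, and registry URI are placeholders):

CREATE SOURCE AvroRegistrySource USING KafkaReader VERSION '0.11.0' (
  brokerAddress:'localhost:9092',
  Topic:'test',
  startOffset:0
)
PARSE USING AvroParser (
  schemaRegistryUri:'http://198.51.100.55:8081'
)
OUTPUT TO AvroRegistryStream;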

The output type of a source using AvroParser is AvroEvent, which contains the elements of the metadata map (see Using the META() function) and the data, an org.apache.avro.generic.GenericRecord.

The following application will generate an Avro schema file PosDataPreview.avsc and convert Samples/PosApp/appData/PosDataPreview.csv to an Avro data file PosDataPreview.avro:

CREATE APPLICATION WritePosData2Avro;
CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;
 
CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT
  TO_STRING(data[0]) as businessName,
  TO_STRING(data[1]) as merchantId,
  TO_STRING(data[2]) as primaryAccountNumber,
  TO_STRING(data[3]) as posDataCode,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  TO_STRING(data[5])  as expDate,
  TO_STRING(data[6]) as currencyCode,
  TO_DOUBLE(data[7]) as authAmount,
  TO_STRING(data[8]) as terminalId,
  TO_STRING(data[9]) as zip,
  TO_STRING(data[10]) as city
FROM CsvStream;
 
CREATE TARGET AvroFileOut USING FileWriter(
  filename:'Samples.PosDataPreview.avro'
)
FORMAT USING AvroFormatter (
  schemaFileName:'Samples.PosDataPreview.avsc'
)
INPUT FROM PosDataStream;
END APPLICATION WritePosData2Avro;

See AVROFormatter for more information.

The following sample application uses the files created by WritePosData2Avro and writes a subset of the fields from PosDataPreview.avro to SysOut:

CREATE APPLICATION AvroParserTest;

CREATE SOURCE AvroSource USING FileReader (
  directory:'Samples',
  WildCard:'PosDataPreview.avro',
  positionByEOF:false
)
PARSE USING AvroParser (
  schemaFileName:"Samples/PosDataPreview.avsc"
)
OUTPUT TO AvroStream;

CREATE CQ parseAvroStream 
INSERT INTO ParsedAvroStream
SELECT  
-- conversion from org.apache.avro.util.Utf8 to String is required here
  data.get("merchantId").toString() as merchantId,
  TO_DATE(data.get("dateTime").toString()) as dateTime,
  TO_DOUBLE (data.get("authAmount")) as amount,
  data.get("zip").toString() as zip
FROM AvroStream; 
  
CREATE TARGET AvroOut
USING SysOut (name:Avro)
INPUT FROM ParsedAvroStream;

END APPLICATION AvroParserTest;

The following sample application will read from the Kafka topic created by the Oracle2Kafka sample application from Using the Confluent or Hortonworks schema registry.

CREATE APPLICATION ReadFromKafka RECOVERY 1 SECOND INTERVAL;
 
CREATE SOURCE Kafkasource USING KafkaReader VERSION '0.11.0'(
brokerAddress:'localhost:9092',
Topic:'test',
startOffset:0
)
PARSE USING AvroParser()
OUTPUT TO DataStream;
 
CREATE TYPE CompleteRecord(
    completedata com.fasterxml.jackson.databind.JsonNode
);
 
CREATE STREAM CompleteRecordInJSONStream OF CompleteRecord;
 
CREATE CQ AvroTOJSONCQ
    INSERT INTO CompleteRecordInJSONStream
        SELECT AvroToJson(y.data) FROM DataStream y;
 
CREATE TYPE ElementsOfNativeRecord(
    datarecord com.fasterxml.jackson.databind.JsonNode,
    before com.fasterxml.jackson.databind.JsonNode,
    metadata com.fasterxml.jackson.databind.JsonNode,
    userdata com.fasterxml.jackson.databind.JsonNode,
    datapresenceinfo com.fasterxml.jackson.databind.JsonNode,
    beforepresenceinfo com.fasterxml.jackson.databind.JsonNode
);
 
CREATE STREAM NativeRecordStream OF ElementsOfNativeRecord;
 
CREATE CQ GetNativeRecordInJSONCQ
INSERT INTO NativeRecordStream
SELECT
    completedata.get("data"),
    completedata.get("before"),
    completedata.get("metadata"),
    completedata.get("userdata"),
    completedata.get("datapresenceinfo"),
    completedata.get("beforepresenceinfo")
FROM CompleteRecordInJSONStream;
 
CREATE TARGET bar USING SysOut(name:'complete_record') INPUT FROM CompleteRecordInJSONStream;
 
END APPLICATION ReadFromKafka;

Binary Parser

Can be configured to parse a wide variety of binary formats. See Supported reader-parser combinations for compatible readers.

Endian (Boolean, default False): False means the format is little endian; set to True if the format is big endian

Metadata (String): path from root or relative to the Striim program directory and the name of a JSON file defining how to parse the data

String Terminated By Null (Boolean, default True): With the default value of True, strings must be terminated by a single null character (ASCII 00). If set to False, string length must be defined in the JSON file specified by the metadata property.

The output type of a source using BinaryParser is WAEvent.

The data types supported by the metadata JSON are:

BYTE: 1 byte
DOUBLE: 8 bytes
FLOAT: 4 bytes
INTEGER: 4 bytes
LONG: 8 bytes
SHORT: 2 bytes
STRING: length as set by stringColumnLength

You can find insorders.c, metadata.json, and bin.tql under the Striim program directory at docs/BinaryParser. To run this sample application:

1. Compile and run insorders.c, then copy its test.bin output to the Samples directory.

/***** insorders.c ******************************************
**                                                          *
** Generate order records in binary format                  *
** Usage insorders <recordcount>                            *
** Max recordcount 1000000                                  *
** Note                                                     *
**                                                          *
************************************************************/
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <stdlib.h>
/* Define some useful constants */
#define HIGH  1
#define MEDIUM 2
#define LOW 3
#define PRICE_TYPES 3
#define FILENAME2               "/Users/mahadevan/binary/test.bin"
#define MAX_LEN_CNAME           20
#define MAX_DIG_CNAME           10
#define NAMELEN                 20
#define MAX_ITEM_COUNT          200
#define EXPENSIVE_ITEM_PRICE    745.99
#define MEDIUM_ITEM_PRICE       215.99
#define LOW_ITEM_PRICE          34.79
 
/* Function prototypes */
int testdata(int numRecords);
double setPrice(int cid);
int main(int argc, char *argv[])
{
  int count = 1000;
  int ret = 1;
  /* just basic error checking for now is sufficient */
  if (argc > 1) count = atoi(argv[1]);

  printf(" Inserting %d Records\n", count);

  if ((ret = testdata(count)) !=0)
     printf("TestData Failed\n");
  return 0;
} /* End Main */
/* Start Function for test data generation */
int testdata(int numRecords)
{
  int i,nm1,nm2 = 0;
  
  /* Declare variables for random record creation */
  int order_id=10000;
  int district_id=0;
  int warehouse_id = 100;
  char cname[numRecords][MAX_LEN_CNAME];  
  char PrefixName[NAMELEN]="John";
  char numRecStr[MAX_DIG_CNAME];
  int count_id=0;
  double price=0;
  /* time_t now; Just generate sysdate for now */
  memset(numRecStr, 0, MAX_DIG_CNAME);
  FILE *fptr_bin = fopen(FILENAME2, "wb"); /* "wb": binary output */
  if (fptr_bin==NULL) return -1;
   for (nm1=0; nm1 < numRecords; nm1++)
   {
      sprintf(numRecStr, " %d",nm1); 
      strcat(PrefixName,numRecStr); 
      strcpy(cname[nm1],PrefixName); 
      //printf("Generated Name is %s\n", cname[nm1]);
      /* Re-init to base root for name */
      strcpy(PrefixName, "John");
      /* Generate a random count of items between 0 and 20 */
      count_id = rand()%MAX_ITEM_COUNT;
      price = setPrice(count_id);
      
      printf("Price is %f\n",price);
      short cnamelen = strlen(cname[nm1]);
      /* Generate record with the following fields */
      fwrite((const void *) (&order_id), sizeof(int), 1, fptr_bin); order_id++;
      fwrite((const void *) (&district_id), sizeof(int), 1, fptr_bin); district_id++;
      fwrite((const void *) (&warehouse_id), sizeof(int), 1, fptr_bin); warehouse_id++;
      fwrite((const void *) (&cnamelen), sizeof(short), 1, fptr_bin); 
      fwrite((const void *) (cname[nm1]), sizeof(char), strlen(cname[nm1]), fptr_bin); 
      fwrite((const void *) (&count_id), sizeof(int), 1, fptr_bin); count_id++;
      fwrite((const void *) (&price), sizeof(double), 1, fptr_bin); 
  }
  fclose(fptr_bin);
  return 0;
}
/* Start setPrice */
  double setPrice(int cid)
  {
    short i;
    double price, total_price=0;

    /* rand()%PRICE_TYPES yields 0..2; add 1 to map onto HIGH..LOW */
    i = rand()%PRICE_TYPES + 1;
    switch (i)
    {
      case(HIGH):
          price = EXPENSIVE_ITEM_PRICE;
          break;
      case(MEDIUM):
          price = MEDIUM_ITEM_PRICE;
          break;
      default: /* LOW */
          price = LOW_ITEM_PRICE;
     }
     total_price = cid*price;
     return total_price;
  } /* End setPrice */

2. Copy metadata.json to the Samples directory.

{
     "namespace": "test",
     "name": "SensorFeed",
     "version": "1.0",
     "type": "record",
     "fields": [
       {
        "name": "order_id",
        "type": "INTEGER",
         "position":"0"
        },        
       {
        "name": "district_id",
        "type": "INTEGER",
         "position":"1"
        },        
       {
        "name": "warehouse_id",
        "type": "INTEGER",
         "position":"2"
        },        
       {
        "name": "customer_name",
        "type": "STRING",
         "position":"3"
        },        
       {
        "name": "count_id",
        "type": "INTEGER",
         "position":"4"
        },        
       {
        "name": "price",
        "type": "DOUBLE",
         "position":"5"
        }        
      ]
    }

3. Load bin.tql and run the bin application.

CREATE APPLICATION bin;
 
CREATE SOURCE BinarySource using FileReader (
  directory:'Samples',
  wildcard:'test.bin',
  positionByEOF:false
)
PARSE USING BinaryParser (
  metadata:'Samples/metadata.json',	
  endian:true
)
OUTPUT TO BinaryStream;

CREATE TARGET BinaryDump USING LogWriter(
  name:BinaryReader, 
  filename:'out.log'
) INPUT FROM BinaryStream;

CREATE TYPE OrderType (
  OrderId integer,
  DistrictId integer,
  WarehouseId integer,
  Name String,
  CountId integer,
  Price double);
     
CREATE STREAM OrderStream OF OrderType;

CREATE CQ OrderCQ 
INSERT into OrderStream
SELECT data[0],
  data[1],
  data[2],
  data[3],
  data[4],
  TO_DOUBLE(data[5])
FROM BinaryStream;

CREATE TARGET DSVOut
USING FileWriter(filename: binary_output)
FORMAT USING DSVFormatter()
INPUT FROM OrderStream;

END APPLICATION bin;

The binary_output.csv output file will contain records like these (exact values vary because the item counts and prices are randomly generated):

10000,0,100,John 0,7,243.53
10001,1,101,John 1,73,2539.67
10002,2,102,John 2,130,4522.7
10003,3,103,John 3,144,5009.76
10004,4,104,John 4,123,4279.17
10005,5,105,John 5,40,1391.6

Collectd Parser

After configuring a remote host to collect and transmit system information as detailed in collectd configuration, create a UDPReader and select this parser to use the data in an application.

This parser has a single property, authfilelocation. If the remote host has security enabled for collectd, specify the path (relative to the Striim program directory) and name of the authentication file, otherwise leave the property blank. See "Server setup" in https://collectd.org/wiki/index.php/Networking_introduction for more information.
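For example, a source reading from a collectd host with security enabled might look like the following sketch (the port number and the authentication file Samples/collectd.auth are placeholders):

CREATE SOURCE SecureCollectdSource USING UDPReader (
  IpAddress:'127.0.0.1',
  PortNo:'25826'
)
PARSE USING CollectdParser (
  authfilelocation:'Samples/collectd.auth'
)
OUTPUT TO SecureCollectdStream;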

The output type is CollectdEvent. Its fields are:

data: varies depending on the ds-type setting for the pluginName value in collectd's types.db (see http://collectd.org/documentation/manpages/types.db.5.shtml for more information): for ABSOLUTE, COUNTER, and DERIVE, the type is long or long[]; for GAUGE, the type is double or double[]

hostName: string
intervalHighResolution: long
message: string
pluginInstanceName: string
pluginName: string
severity: long
time: DateTime
timeHighResolution: DateTime
timeInterval: long
typeInstanceName: string
typeName: string

The fields correspond to collectd part types (see https://collectd.org/wiki/index.php/Binary_protocol#Part_types). data corresponds to the Values part type.

The following example application receives CPU metrics and computes min, max, last, and average values for each one-minute block of data:

CREATE APPLICATION collectd;
CREATE SOURCE CollectdSource USING UDPReader (
  IpAddress:'127.0.0.1',
  PortNo:'25826'
)
PARSE USING CollectdParser ()
OUTPUT TO CollectdStream;

CREATE TYPE CpuUsageType (
  hname String,
  tStamp DateTime,
  tInstanceName String,
  pInstanceName Integer,
  cUsage double      	
);
CREATE STREAM CpuUsageStream OF CpuUsageType;

CREATE CQ CpuUsage 
INSERT INTO CpuUsageStream 
SELECT hostname, 
  TimeHighResolution,
  TypeInstanceName,
  TO_INT(PluginInstanceName),
  TO_DOUBLE(data[0]) 
FROM CollectdStream
WHERE PluginName = 'cpu';
        
CREATE JUMPING WINDOW CpuUsageWindow OVER CpuUsageStream KEEP WITHIN 1 MINUTE ON tStamp;

CREATE TYPE CpuStatisticsType (
  cpuName Integer,
  cpuType String,
  min double,
  max double,
  last double,
  avg double
);
CREATE STREAM CpuStatisticsStream OF CpuStatisticsType;

CREATE CQ CpuStatisticsCQ 
INSERT INTO CpuStatisticsStream
SELECT x.pInstanceName,
  x.tInstanceName,
  MIN(x.cUsage),
  MAX(x.cUsage),
  x.cUsage,
  AVG(x.cUsage)
FROM CpuUsageWindow x
GROUP BY x.pInstanceName,x.tInstanceName;

CREATE TARGET CpuStatisticsDump
USING SysOut(name:Stat)
INPUT FROM CpuStatisticsStream;

END APPLICATION collectd;

The output would look like this:

Stat: CpuStatisticsType_1_0{
  cpuName: "6"
  cpuType: "system"
  min: 252157.0
  max: 252312.0
  last: 252312.0
  avg: 252236.5
};
Stat: CpuStatisticsType_1_0{
  cpuName: "7"
  cpuType: "user"
  min: 33387.0
  max: 33393.0
  last: 33393.0
  avg: 33390.166666666664
};

DSV Parser

Parses delimited text. See Supported reader-parser combinations for compatible readers.

Block as Complete Record (Boolean, default False): With JMSReader or UDPReader, if set to True, the end of a block will be considered the end of the last record in the block, even if the rowdelimiter is missing. Do not change the default value with other readers.

Charset (String, default UTF-8)

Column Delimiter (String, default ,): use \t for tab

Column Delimit Till (Integer, default -1): With the default value of -1, all delimiters are interpreted as columns. If a positive value is specified, that number of delimiters are interpreted as columns, and any additional delimiters are treated as if escaped. For example, if the columndelimiter value is a space, and columndelimittill is 4, this row:

2012-12-10 10:30:30:256 10.1.10.12 jsmith User Login Error, invalid username or password

would be interpreted as five columns:

2012-12-10
10:30:30:256
10.1.10.12
jsmith
User Login Error, invalid username or password

Comment Character (Character): if specified, lines beginning with this character will be skipped

Event Type (String): reserved

Header (Boolean, default False): Set to True if the first row (or the row specified by headerlineno) contains field names. When DSVParser is used with FileReader, the output stream type can be created automatically from the header (see Creating the FileReader output stream type automatically).

Header Line No (Integer, default 0): if the header is not the first line of the file, set this to the line number of the header row

Ignore Empty Column (Boolean, default False): if set to True, empty columns will be skipped instead of output as null values

Ignore Multiple Record Begin (Boolean, default True): see FreeFormTextParser

Ignore Row Delimiter in Quote (Boolean, default False): if set to True, when the rowdelimiter character appears between a pair of quoteset characters it is treated as if escaped

Line Number (Integer, default -1): With the default value of -1, reads all lines. Set to n to skip the first n-1 lines and begin with line number n.

No Column Delimiter (Boolean, default False): if set to True, columndelimiter is ignored and the entire line is output as data[0]

Quote Set (String, default "): character or characters that mark the start and end of each field; you may specify different start and end characters, such as [] or {}

Record Begin (String): see FreeFormTextParser

Record End (String): see FreeFormTextParser

Row Delimiter (String, default \n): see Setting rowdelimiter values

Separator (String, default :): character used to separate multiple values for columndelimiter, quoteset, or rowdelimiter (for example, ,:\t to recognize both comma and tab as delimiters)

Trim Quote (Boolean, default True): if set to False, the quoteset and quotecharacter characters are not removed from the output

Trim Whitespace (Boolean, default False): set to True if the data has spaces between values and delimiters (for example, "1" , "2")

The output type of a source using DSVParser is WAEvent.

Example:

... PARSE USING DSVParser (
	header:'yes'
)...
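A fuller sketch, assuming tab-delimited input with a header row and whitespace around the values (the file, stream, and source names are placeholders):

CREATE SOURCE TsvSource USING FileReader (
  directory:'Samples',
  wildcard:'sample.tsv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:'yes',
  columndelimiter:'\t',
  trimwhitespace:true
)
OUTPUT TO TsvStream;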

Free Form Text Parser

Use with a compatible reader to parse unstructured data, such as log files with events that span multiple lines, using a regular expression.

Block as Complete Record (Boolean, default False): With JMSReader or UDPReader, if blockascompleterecord is set to True, the end of a block will be considered the end of the last record in the block, even if the row delimiter is missing. Do not change the default value with other readers.

Charset (String, default UTF-8)

Ignore Multiple Record Begin (Boolean, default True): With the default setting of True, additional occurrences of the RecordBegin string before the next RecordEnd string will be ignored. Set to False to treat each occurrence of RecordBegin as the beginning of a new event.

Record Begin (String): Specify a string that defines the beginning of each event. The string may include date expressions and/or %IP_ADDRESS% (which will match any IP address). If a RecordBegin pattern starts with the ^ character, the pattern will be excluded from the data. NOTE: RecordBegin does not support regex.

Record End (String): An optional string that defines the end of each event. If a RecordEnd pattern starts with the ^ character, the pattern will be excluded from the data. NOTE: RecordEnd does not support regex.

Regex (String): A regular expression (regex) defining the beginning and end of each field to be included in the output (see Using regular expressions (regex)). To apply more than one pattern, separate the patterns using the '|' character. For example:

regex: '((^([\\w]+)) |
((?<=Author: ).*(\n)) |
((?<=\n\n).*))'

NOTE: You cannot specify a regex pattern in a RecordBegin or RecordEnd string.

Separator (String, default ~): the separator between multiple values in other properties. For example, if the end of the record could be specified by either "millisec" or "processed," with the default separator ~ the RecordEnd value would be millisec~processed.

Timestamp (String): Defines the format of the timestamp in the source data. The values are output to the originTimeStamp key in WAEvent's metadata map and, as shown in the sample code below, can be retrieved using SELECT META(stream_name,'<originTimeStamp>'). Supported pattern strings are:

"EEE, d MMM yyyy HH:mm:ss Z"
"EEE, MMM d, ''yy"
"h:mm a"
"hh 'o''clock' a, zzzz"
"K:mm a, z"
"yyMMddHHmmssZ"
"YYYY-'W'ww-u"
"yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
"yyyy-MM-dd'T'HH:mm:ss.SSSZ"
"yyyy.MM.dd G HH:mm:ss z"
"yyyyy.MMMMM.dd GGG hh:mm aaa"

For more information, see the documentation for the Java class SimpleDateFormat.

The output type of a source using FreeFormTextParser is WAEvent.

Sample application:

CREATE SOURCE fftpSource USING FileReader (
        directory:'Samples/',
        WildCard:'catalina*.log',
        charset:'UTF-8',
        positionByEOF:false
)
PARSE USING FreeFormTextParser (
    -- Timestamp format in log is  "Aug 21, 2014 8:33:56 AM"
        TimeStamp:'%mon %d, %yyyy %H:%M:%S %p',
        RecordBegin:'%mon %d, %yyyy %H:%M:%S %p',
        regex:'(SEVERE:.*|WARNING:.*)'
)
OUTPUT TO fftpInStream;
CREATE TYPE fftpOutType (
        msg String,
        origTs long
);
CREATE STREAM fftpOutStream OF fftpOutType;
CREATE CQ fftpOutCQ
INSERT INTO fftpOutStream
SELECT data[0],
  TO_LONG(META(x,'OriginTimestamp'))
FROM fftpInStream x;
CREATE TARGET fftpTarget
USING SysOut(name:fftpInfo)
INPUT FROM fftpOutStream;

The RecordBegin value %mon %d, %yyyy %H:%M:%S %p defines the beginning of an event as a timestamp like the one at the beginning of the sample shown below. This is also the event timestamp, as defined by the TimeStamp value.

The regular expression (SEVERE:.*|WARNING:.*) looks in each event for a string starting with SEVERE or WARNING. If one is found, the parser returns everything until the next linefeed. If an event does not include SEVERE or WARNING, it is omitted from the output.

The following is the beginning of one of the potentially very long log messages this application is designed to process:

Aug 22, 2014 11:17:19 AM org.apache.solr.common.SolrException log
SEVERE: org.apache.solr.common.SolrException: org.apache.solr.search.SyntaxError: 
Cannot parse '((suggest_title:(04) AND suggest_title:(gmc) AND suggest_title: 
AND suggest_title:(6.6l) AND suggest_title:(lb7) AND suggest_title:(p1094,p0234)) 
AND NOT (deleted:(true)))': Encountered " <AND> "AND "" at line 1, column 64.
Was expecting one of:
    <BAREOPER> ...
    "(" ...
    "*" ...
    <QUOTED> ...
    <TERM> ...
    <PREFIXTERM> ...
    <WILDTERM> ...
    <REGEXPTERM> ...
    "[" ...
    "{" ...
    <LPARAMS> ...
    <NUMBER> ...
    
    at org.apache.solr.handler.component.QueryComponent.prepare(QueryComponent.java:147)
    at org.apache.solr.handler.component.SearchHandler.handleRequestBody(SearchHandler.java:187)
    at org.apache.solr.handler.RequestHandlerBase.handleRequest(RequestHandlerBase.java:135) ...

The parser discards everything except the line beginning with SEVERE, and the TO_LONG function in the CQ converts the log entry's timestamp to the format required by Striim:

fftpInfo: fftpOutType_1_0{
  msg: "SEVERE: org.apache.solr.common.SolrException: org.apache.solr.search.SyntaxError:
Cannot parse '((suggest_title:(04) AND suggest_title:(gmc) AND suggest_title: AND 
suggest_title:(6.6l) AND suggest_title:(lb7) AND suggest_title:(p1094,p0234)) AND NOT
(deleted:(true)))': Encountered \" <AND> \"AND \"\" at line 1, column 64."
  origTs: 1408731439000
};

GG (GoldenGate) Trail Parser

See Oracle GoldenGate.

JSON Parser

Parses JSON data. See Supported reader-parser combinations for compatible readers.

Event Type (String): the Striim data type to be used (leave blank to parse manually)

Field Name (String): if the JSON includes fields, specify the one containing the events defined by eventType (see example below)

The output type of a source using JSONParser is JSONNodeEvent.

For example, assume that the JSON being parsed has the following format:

{
    "ConnectedToWifi": false,
    "IpAddress": "",
    "WifiSsid": "",
    "device": "LGE Nexus 5",
    "OsVersion": "21",
    "platfrom": "Android",
    "scanresult": {
        "timestamp1": "1424434411",
        "rssi": -90
    },
    "serviceUUID": "8AA10000-0A46-115F-D94E-5A966A3DDBB7",
    "majorId": 15,
    "minorId": 562
}

The following code would extract the timestamp1 and rssi properties from the scanresult field:

CREATE TYPE ScanResultType (
  timestamp1 String,
  rssi String
);
CREATE STREAM ScanResultStream OF ScanResultType;

CREATE SOURCE JSONSource USING FileReader (
  directory: 'Samples',
  WildCard: 'sample.json',
  positionByEOF: false
)
PARSE USING JSONParser (
  eventType: 'ScanResultType',
  fieldName: 'scanresult'
)
OUTPUT TO ScanResultStream;

In the UI, when creating a source that uses JSONParser, create the output stream first, specifying a data type corresponding to the fields in the JSON data, then enter the name of that data type as the value for the eventType property in the source.

If the data you need is spread among two or more JSON fields, you can parse the file manually by leaving eventType blank. For example, assume your JSON had this format:

{
    "venue": {
        "lat": 41.773228,
        "lon": -88.149109,
        "venue_id": 23419382,
        "venue_name": "Naperville Theater"
    },
    "event": {
        "event_id": "418361985",
        "event_name": "Naperville Film Festival"
    },
    "group": {
        "group_city": "Naperville",
        "group_state": "IL",
        "group_country": "us",
        "group_id": 8625752,
        "group_name": "NFF"
    }
  }

The following would get properties from all three fields:

CREATE SOURCE RawMeetupJSON USING FileReader (
  directory:'./Samples/meetup',
  wildcard:'one_event_pretty.json',
  positionByEOF:false
) 
PARSE USING JSONParser (
  eventType:''
)
OUTPUT TO RawJSONStream;
 
CREATE TYPE MeetupJSONType (
  venue_id string KEY,
  group_name string,
  event_name string,
  venue_name string,
  group_city string,
  group_country string,
  lat double,
  lon double
);
CREATE STREAM ParsedJSONStream of MeetupJSONType;
 
CREATE CQ ParseJSON
INSERT INTO ParsedJSONStream
SELECT
  data.get('venue').get('venue_id').textValue(),
  data.get('group').get('group_name').textValue(),
  data.get('event').get('event_name').textValue(),
  data.get('venue').get('venue_name').textValue(),
  data.get('group').get('group_city').textValue(),
  data.get('group').get('group_country').textValue(),
  data.get('venue').get('lat').doubleValue(),
  data.get('venue').get('lon').doubleValue()
FROM RawJSONStream 
WHERE data IS NOT NULL;
CREATE TARGET MeetupJSONOut USING SysOut(name:meetup) INPUT FROM ParsedJSONStream;

The output for the JSON shown above would be as follows (venue_id is null because textValue() returns null for a numeric node):

meetup: MeetupJSONType_1_0{
  venue_id: null
  group_name: "NFF"
  event_name: "Naperville Film Festival"
  venue_name: "Naperville Theater"
  group_city: "Naperville"
  group_country: "us"
  lat: 41.773228
  lon: -88.149109
};

NetFlow Parser

This adapter supports NetFlow v5 and v9 and requires UDPReader. Its single property is version, which has a default value of all. With this setting, the adapter automatically detects the packet type and parses it accordingly.
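To set the property explicitly with its documented default value:

... PARSE USING NetflowParser (
  version:'all'
)...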

The output type of a source using NetflowParser is WAEvent.

Example:

CREATE SOURCE NetflowV5Source USING UDPReader (
  IPAddress:'192.0.2.0',
  portno:'9915'
)
PARSE USING NetflowParser()
OUTPUT TO NetflowV5Stream;

The following application counts Type Of Service (TOS) in a NetFlow v9 export packet. The assumption is that a Cisco router has a NetFlow process running that is configured to monitor the type of service, which has three key fields, input interface, output interface, and TOS, plus two non-key fields, in_bytes and in_pkts. The collector address in the NetFlow process is the IP address and port of the Striim server running the application.

CREATE SOURCE NetflowV9Source USING UDPReader (
  IPAddress:'192.0.2.0', 
  portno:'9915'
)
PARSE USING NetflowParser ()
OUTPUT TO NetflowV9Stream;

CREATE TYPE NetflowTOS_Type (
  protocol string,
  source_ip string,
  dest_ip  string,
  input_interface integer,
  output_interface  integer,
  src_tos string,
  in_pkts integer,
  in_bytes integer
);

CREATE TYPE TOS_Type (
  source_ip string,
  dest_ip  string,
  input_interface integer,
  src_tos string,
  type_of_service String,
  count integer
);

CREATE STREAM NetflowTOSMonitorStream of NetflowTOS_Type;

CREATE JUMPING WINDOW NetflowTOSWindow
OVER NetflowTOSMonitorStream KEEP 10 ROWS
PARTITION BY src_tos;

CREATE STREAM TOSCountStream of TOS_Type;

CREATE CQ NetflowTOSMonitorCQ
INSERT INTO NetflowTOSMonitorStream
SELECT VALUE(x,'PROTOCOL').toString(),
  VALUE(x,'IPV4_SRC_ADDR'), 
  VALUE(x,'IPV4_DST_ADDR'),
  VALUE(x,'INPUT_SNMP'),
  VALUE(x,'OUTPUT_SNMP'),
  VALUE(x,'SRC_TOS').toString(),
  VALUE(x,'IN_PKTS'),
  VALUE(x,'IN_BYTES')
FROM NetflowV9Stream x
WHERE META(x,"RecordType").toString() = "Data";

CREATE CQ NetflowTOSCountCQ
INSERT INTO TOSCountStream
SELECT x.source_ip, x.dest_ip, x.input_interface, x.src_tos.toString(), 
CASE WHEN x.src_tos = '0' THEN "Routine"
  WHEN x.src_tos = '1' THEN "Priority"
  WHEN x.src_tos = '2' THEN "Immediate"
  WHEN x.src_tos = '3' THEN "Flash"
  WHEN x.src_tos = '4' THEN "Flash Override"
  WHEN x.src_tos = '5' THEN "CRITIC/ECP"
  WHEN x.src_tos = '6' THEN "Internetwork Control"
  WHEN x.src_tos = '7' THEN "Network Control"
  ELSE "Unsupported Type" END,
COUNT(x.src_tos.toString())
FROM NetflowTOSWindow x
GROUP BY src_tos.toString();

CREATE TARGET NetflowV9StreamDump 
USING SysOut(name:NetflowV9) 
INPUT FROM NetflowTOSMonitorStream;

CREATE TARGET OperationLog USING LogWriter(
  name:NetflowTOSMonitor,
  filename:'NetflowTOSMonitor.log'
)
INPUT FROM TOSCountStream;

NVP (name-value pair) Parser

Parses name-value pairs. See Supported reader-parser combinations for compatible readers.

Block as Complete Record (Boolean, default False)

Charset (String, default UTF-8)

Pair Delimiter (String, default is one space, UTF-8 0x20)

Quote Set (String, default ")

Row Delimiter (String, default \n)

Trim Quote (Boolean, default True)

Value Delimiter (String, default =)

The output type of a source using NVPParser is WAEvent.

Output from a source using this parser can be selected using VALUE(x,"<name>"). For example, if given the following input event:

2014-08-22T11:51:52.920281+03:00 10.184.2.46 date=2014-08-22 time=11:51:52 
devname=fw000a08 devid=FGT118 logid=0000000015 type=traffic subtype=forward level=notice 
vd=fbb-dmz srcip=10.46.227.81 srcport=29200 srcintf="Int-Channel1" dstip=195.39.224.106 
dstport=443 dstintf="Mango" sessionid=102719642 status=start policyid=265 
dstcountry="Japan" srccountry="Japan" trandisp=dnat tranip=10.1.1.1 tranport=443 
service=HTTPS proto=6 duration=0 sentbyte=0 rcvdbyte=0

the following code:

CREATE SOURCE NVPSource USING FileReader (
  directory:'Samples',
  WildCard:'NVPTestData.txt',
  positionByEOF:false)
PARSE USING NVPParser ()
OUTPUT TO NvpStream;

CREATE TYPE nvptype (
  ipaddress String,
  deviceName String,
  status String,
  policyid int);
CREATE STREAM nvptypedstream OF nvptype;

CREATE CQ typeconversion
  INSERT INTO nvptypedstream
  SELECT VALUE(x,"column1"), VALUE(x,"devid"),VALUE(x,"status"),TO_INT(VALUE(x,"policyid")) 
  FROM NvpStream x;

CREATE TARGET t USING SysOut(name:NVPtest) INPUT FROM nvptypedstream;

will produce the following output:

NVPtest: nvptype_1_0{
  ipaddress: "10.184.2.46"
  deviceName: "FGT118"
  status: "start"
  policyid: 265
}; 

Note that fields 0 and 1 in the input event are a timestamp and an IP address rather than key-value pairs. The IP address is selected using value(x,"column1"). This syntax can be used only for fields at the beginning of the event, before the first key-value pair.
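By the same pattern, the leading timestamp could presumably be selected as column0, though that name is an assumption extrapolated from column1 above:

CREATE CQ leadingFieldsCQ
INSERT INTO LeadingFieldsStream
SELECT VALUE(x,"column0"), -- assumed name for field 0, the timestamp
  VALUE(x,"column1")       -- field 1, the IP address
FROM NvpStream x;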

SNMP Parser

This parser requires the Oracle JDK (see System requirements).

After configuring one or more remote hosts to collect and transmit system information as detailed in SNMP configuration, create a UDPReader and select this parser to use the data in an application.

This parser's single property is alias (string), which can be used to specify a file containing human-readable aliases for SNMP OIDs. For example:

1.3.6.1.4.1.2021.10.1.5.2=laLoadInt.2
1.3.6.1.2.1.88.2.1.4=mteHotOID
1.3.6.1.4.1.2021.10.1.100.1=laErrorFlag.1
1.3.6.1.4.1.2021.10.1.100.2=laErrorFlag.2
1.3.6.1.2.1.1.5=sysName
1.3.6.1.2.1.88.2.1.1=mteHotTrigger
1.3.6.1.4.1.2021.10.1.100.3=laErrorFlag.3
1.3.6.1.4.1.2021.10.1.5.1=laLoadInt.1
1.3.6.1.4.1.2021.10.1.5.3=laLoadInt.3
1.3.6.1.4.1.311.1.13.1.9999.25.0=HOST
1.3.6.1.4.1.311.1.13.1.9999.12.0=MACHINENAME
1.3.6.1.4.1.311.1.13.1.9999.11.0=USERID
1.3.6.1.4.1.311.1.13.1.9999.5.0=EVENTCODE
1.3.6.1.4.1.311.1.13.1.9999.15.0=STATUS
1.3.6.1.4.1.311.1.13.1.9999.13.0=SUBSTATUS
1.3.6.1.2.1.88.2.1.2.0=mteHotTargetName.0
1.3.6.1.2.1.88.2.1.3.0=mteHotContextName.0
1.3.6.1.2.1.88.2.1.5.0=mteHotValue.0 
1.3.6.1.2.1.25.1.2=hrSystemDate
1.3.6.1.2.1.2.2.1.5=ifSpeed
1.3.6.1.2.1.31.1.1.1.15=ifHighSpeed
1.3.6.1.2.1.2.2.1.6=ifPhysAddress
1.3.6.1.2.1.2.2.1.10=ifInOctets 
1.3.6.1.2.1.2.2.1.11=ifInUcastPkts 
1.3.6.1.2.1.2.2.1.16=ifOutOctets
1.3.6.1.2.1.2.2.1.17=ifOutUcastPkts 
1.3.6.1.2.1.2.2.1.19=ifOutDiscards 
1.3.6.1.2.1.2.2.1.20=ifOutErrors 
1.3.6.1.2.1.2.2.1.8=ifOperStatus
1.3.6.1.2.1.2.2.1.6=ifPhysAddress
1.3.6.1.2.1.1.3=sysUpTime

This sample application listens for the data sent using the sample settings from SNMP configuration.

CREATE APPLICATION snmpnt;
CREATE SOURCE SNMPSource USING UDPReader (
  ipaddress:'0.0.0.0',
  PortNo:'15021'
)
PARSE USING SNMPParser (
  alias: 'Samples/snmpalias.txt'
)
OUTPUT TO SNMPStream;

CREATE TYPE networkType(
  mteHotTrigger String,
  mteHotValue Integer, 
  mteHotOID String,
  Sysname String,
  MacAddr String,
  Uptime Integer,
  ifSpeed2 Integer, 
  ifHighSpeed Integer,
  ifInOctets2 Integer, 
  ifInUcastPkts Integer, 
  ifOutOctets2 Integer, 
  ifOutUcastPkts2 Integer,
  ifOutDiscards2 Integer,
  ifOutOfErrors2 Integer,
  ifOperStatus2 Integer);
CREATE STREAM NetworkStream OF networkType;
CREATE CQ ntcq
INSERT INTO NetworkStream
SELECT
  TO_STRING(VALUE(x,'mteHotTrigger')),
  TO_INT(VALUE(x,'mteHotValue')),
  TO_STRING(VALUE(x,'mteHotOID')),
  TO_STRING(VALUE(x,'sysName')),
  TO_MACID(VALUE(x, 'ifPhysAddress'),'-'),
  TO_INT(VALUE(x, 'sysUpTime')),
  TO_INT(VALUE(x,'ifSpeed')), 
  TO_INT(VALUE(x,'ifHighSpeed')), 
  TO_INT(VALUE(x,'ifInOctets')), 
  TO_INT(VALUE(x,'ifInUcastPkts')), 
  TO_INT(VALUE(x,'ifOutOctets')),
  TO_INT(VALUE(x,'ifOutUcastPkts')), 
  TO_INT(VALUE(x,'ifOutDiscards')), 
  TO_INT(VALUE(x,'ifOutErrors')), 
  TO_INT(VALUE(x,'ifOperStatus'))
FROM SNMPStream x;

CREATE TARGET SnmpNetWorkInterface
USING SysOut(name:SNMPNT)
INPUT FROM NetworkStream;

END APPLICATION snmpnt;

The output looks like this:

SNMPNT: networkType_1_0{
  mteHotTrigger: "Interface Details"
  mteHotValue: null
  mteHotOID: "1.3.6.1.2.1.2.2.1.16.2"
  Sysname: "centos57"
  MacAddr: "08-00-27-AE-36-99"
  Uptime: 18006
  ifSpeed2: 1000000000
  ifHighSpeed: 1000
  ifInOctets2: 613514937
  ifInUcastPkts: 658628
  ifOutOctets2: 35572407
  ifOutUcastPkts2: 272710
  ifOutDiscards2: 0
  ifOutOfErrors2: 0
  ifOperStatus2: 1
};

Striim Parser

Use with KafkaReader when reading from a Kafka stream. See Reading a Kafka stream with KafkaReader.

XML Parser

Parses XML. See Supported reader-parser combinations for compatible readers.

Column List (String): if left blank, all key-value pairs will be returned as part of the data array

Root Node (String)

Separator (String): optional

The output type of a source using XMLParser is WAEvent.

Example:

... PARSE USING XMLParser(
  rootnode:'/log4j:event',
  columnlist:'log4j:event/@timestamp,
    log4j:event/@level,
    log4j:event/log4j:message,
    log4j:event/log4j:throwable,
    log4j:event/log4j:locationInfo/@class,
    log4j:event/log4j:locationInfo/@method,
    log4j:event/log4j:locationInfo/@file,
    log4j:event/log4j:locationInfo/@line'
)...

See the discussion of Log4JSource in MultiLogApp for a detailed explanation.

XML Parser V2

Parses XML. See Supported reader-parser combinations for compatible readers.

XMLParserV2 has only a single property, Root Node. Its output type is XMLNodeEvent, which comprises a java.util.Map containing event metadata followed by an org.dom4j.Element containing the data. For example, this is an event from the first sample application's XMLRawStream:

<?xml version="1.0" encoding="UTF-8"?>
<PurchaseOrder>
<event>
<event-metadata SOURCE NAME="PurchaseOrders1.xml" LINE NUMBER="5" CHARACTER OFFSET="209"
FileName="PurchaseOrders1.xml" FileOffset="0" SEQUENCENO="1" COLUMN NUMBER="1"/>
<event-data>
<PurchaseOrder PurchaseOrderNumber="1">
<CustomerName>Jon snow</CustomerName>
<CustomerAddress>Palo Alto</CustomerAddress>
<DeliveryNotes>Please leave packages in shed by driveway.</DeliveryNotes>
</PurchaseOrder>
</event-data>
</event>

For debugging purposes, you may write unparsed XMLNodeEvent to FileWriter and SysOut.
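For example, a debugging target for the first sample application's raw stream might look like this sketch:

CREATE TARGET XMLRawDebug USING SysOut(name:xmlraw) INPUT FROM XMLRawStream;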

Sample application 1

This example simply converts XML input into JSON.

Save the following as Striim/Samples/PurchaseOrders1.xml:

<PurchaseOrder PurchaseOrderNumber="1">
    <CustomerName>Jon snow</CustomerName>
    <CustomerAddress>Palo Alto</CustomerAddress>
    <DeliveryNotes>Please leave packages in shed by driveway.</DeliveryNotes>
</PurchaseOrder>
<PurchaseOrder PurchaseOrderNumber="2">
    <CustomerName>Tyrion</CustomerName>
    <CustomerAddress>Seattle</CustomerAddress>
    <DeliveryNotes>Preferred time : post 4 PM</DeliveryNotes>
</PurchaseOrder>

Then run the following application:

CREATE APPLICATION XMLParserV2Test1;

CREATE SOURCE XMLSource USING FileReader (
  directory:'Samples',
  wildcard:'PurchaseOrders1.xml',
  positionByEOF:false
) 
PARSE USING XMLParserV2(
  rootnode:'PurchaseOrder')
OUTPUT TO XMLRawStream;

CREATE CQ XmlCQ
INSERT INTO XmlParsedStream
SELECT
  data.attributeValue("PurchaseOrderNumber") as PONumber,
  data.element("CustomerName").getText() as CustomerName,
  data.element("CustomerAddress").getText() as CustomerAddress,
  data.element("DeliveryNotes").getText() as DeliveryNotes
FROM XMLRawStream;

CREATE TARGET XMLParsedOut USING FileWriter(
  filename:'parsed.json',
  directory: 'XMLParserV2Test1')
FORMAT USING JSONFormatter ()
INPUT FROM XmlParsedStream;

END APPLICATION XMLParserV2Test1;

Striim/XMLParserV2Test1/parsed.00.txt should contain the following:

[
 {
  "PONumber":"1",
  "CustomerName":"Jon snow",
  "CustomerAddress":"Palo Alto",
  "DeliveryNotes":"Please leave packages in shed by driveway."
 },
 {
  "PONumber":"2",
  "CustomerName":"Tyrion",
  "CustomerAddress":"Seattle",
  "DeliveryNotes":"Preferred time : post 4 PM"
 }
]

Sample application 2

This example iterates through child elements (line items in a purchase order).

Save the following as Striim/Samples/PurchaseOrders2.xml:

<PurchaseOrder PurchaseOrderNumber="1">
    <Details>
        <CustomerName>Jon snow</CustomerName>
        <CustomerAddress>Palo Alto</CustomerAddress>
        <DeliveryNotes>Please leave packages in shed by driveway.</DeliveryNotes>
    </Details>
    <Items>
        <Item ItemNumber="1">
            <ProductName>EarPhones</ProductName>
            <USPrice>148.95</USPrice>
        </Item>
        <Item ItemNumber="2">
            <ProductName>Mouse</ProductName>
            <USPrice>39.98</USPrice>
        </Item>
    </Items>
</PurchaseOrder>
<PurchaseOrder PurchaseOrderNumber="2">
    <Details>
        <CustomerName>Tyrion</CustomerName>
        <CustomerAddress>Seattle</CustomerAddress>
        <DeliveryNotes>Preffered time : post 4 PM</DeliveryNotes>
    </Details>
    <Items>
        <Item ItemNumber="1">
            <ProductName>Monitor</ProductName>
            <USPrice>148.95</USPrice>
        </Item>
        <Item ItemNumber="2">
            <ProductName>Keyboard</ProductName>
            <USPrice>39.98</USPrice>
        </Item>
    </Items>
</PurchaseOrder>

Then run the following application:

CREATE APPLICATION XMLParserV2Test2;

CREATE SOURCE XMLSource USING FileReader (
  directory:'Samples',
  wildcard:'PurchaseOrders2.xml',
  positionByEOF:false
) 
PARSE USING XMLParserV2(
  rootnode:'PurchaseOrder' )
OUTPUT TO XMLRawStream;

CREATE TARGET RawXMLFileOut USING FileWriter(
  filename:'raw.txt',
  directory: 'XMLParserV2Test2')
FORMAT USING XMLFormatter(
  rootelement:'PurchaseOrder')
INPUT FROM XMLRawStream;

CREATE CQ IntermediateTransformation 
INSERT INTO IntermediateStream 
SELECT 
  data.attributeValue("PurchaseOrderNumber") as PONumber,
  data.element("Details") PODetails,
  data.element("Items").elements("Item") itemlist
FROM XMLRawStream;

-- iterates over the items in PO and appends common PO details to each item
CREATE CQ XmlCQ
INSERT INTO XmlParsedStream
SELECT 
  PO.PONumber as PONumber, 
  PO.PODetails.element("CustomerName").getText() as CustomerName,
  PO.PODetails.element("CustomerAddress").getText() as CustomerAddress,
  PO.PODetails.element("DeliveryNotes").getText() as DeliveryNotes, 
  item.attributeValue("ItemNumber") as ItemNumber,
  item.element("ProductName").getText() as ProductName,
  item.element("USPrice").getText() as USPrice
FROM IntermediateStream PO, iterator(PO.itemlist, org.dom4j.Element) item;

CREATE TARGET XMLParsedOut using FileWriter(
  filename:'parsed.json',
  directory: 'XMLParserV2Test2')
FORMAT USING JSONFormatter ()
INPUT FROM XmlParsedStream;

END APPLICATION XMLParserV2Test2;

Striim/XMLParserV2Test2/parsed.00.txt should contain the following:

[
 {
  "PONumber":"1",
  "CustomerName":"Jon snow",
  "CustomerAddress":"Palo Alto",
  "DeliveryNotes":"Please leave packages in shed by driveway.",
  "ItemNumber":"1",
  "ProductName":"EarPhones",
  "USPrice":"148.95"
 },
 {
  "PONumber":"1",
  "CustomerName":"Jon snow",
  "CustomerAddress":"Palo Alto",
  "DeliveryNotes":"Please leave packages in shed by driveway.",
  "ItemNumber":"2",
  "ProductName":"Mouse",
  "USPrice":"39.98"
 },
 {
  "PONumber":"2",
  "CustomerName":"Tyrion",
  "CustomerAddress":"Seattle",
  "DeliveryNotes":"Preffered time : post 4 PM",
  "ItemNumber":"1",
  "ProductName":"Monitor",
  "USPrice":"148.95"
 },
 {
  "PONumber":"2",
  "CustomerName":"Tyrion",
  "CustomerAddress":"Seattle",
  "DeliveryNotes":"Preffered time : post 4 PM",
  "ItemNumber":"2",
  "ProductName":"Keyboard",
  "USPrice":"39.98"
 }
]