
3.10.1 Formatters

When a writer supports multiple output file formats, the format is specified by selecting the appropriate formatter. For example, a target using the FileWriter adapter can format its output as DSV, JSON, or XML depending on whether DSVFormatter, JSONFormatter, or XMLFormatter is selected. See Supported writer-formatter combinations for more information.

AVRO Formatter

Formats a writer's output for use by Apache Avro and generates an Avro schema file. See Supported writer-formatter combinations.

• Format As (String, default: default): Do not change the default value except as directed in Using the Confluent or Hortonworks schema registry.

• Schema File Name (String): The path and name of the Avro schema file Striim will create based on the type of the target's input stream. (Be sure Striim has write permission for the specified directory.) If no path is specified, the file is created in the Striim program directory. To generate the schema file, deploy the application, then compile the schema file as directed in the Avro documentation for use in your application.

• Schema Registry URL (String): Leave blank except as directed in Using the Confluent or Hortonworks schema registry.

The following sample application filters and parses part of PosApp's data and writes it to a file using AvroFormatter:

CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM CsvStream;

CREATE TARGET AvroFileOut USING FileWriter(
  filename:'AvroTestOutput'
)
FORMAT USING AvroFormatter (
  schemaFileName:'AvroTestParsed.avsc'
)
INPUT FROM PosDataStream;

If you deploy the above application in the namespace avrons, AvroTestParsed.avsc is created with the following contents:

{"namespace": "PosDataStream_Type.avrons",
  "type" : "record",
  "name": "Typed_Record",
  "fields": [
{"name" : "merchantId", "type" : "string"},
{"name" : "dateTime", "type" : "string"},
{"name" : "hourValue", "type" : "int"},
{"name" : "amount", "type" : "double"},
{"name" : "zip", "type" : "string"}
 ]
}

The following application simply writes the raw data in WAEvent format:

CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;

CREATE TARGET AvroFileOut USING FileWriter(
  filename:'AvroTestOutput'
)
FORMAT USING AvroFormatter (
  schemaFileName:'AvroTestRaw.avsc'
)
INPUT FROM CsvStream;

If you deploy the above application in the namespace avrons, AvroTestRaw.avsc is created with the following contents:

{"namespace": "WAEvent.avrons",
  "type" : "record",
  "name": "WAEvent_Record",
  "fields": [
    {"name" : "data",
    "type" : ["null" , { "type": "map","values":[ "null" , "string"] }]
     },
    {"name" : "before",
    "type" : ["null" , { "type": "map","values":[ "null" , "string"] }]
    },
    {"name" : "metadata",
     "type" : { "type": "map","values":"string" }
    }
  ]
}

See Parsing the data field of WAEvent and Using the META() function for information about this format.

DSV Formatter

Formats a writer's output as delimited text.

• Charset (String)

• Column Delimiter (String, default: ,)

• Header (Boolean, default: False): Set to True to add a header to the output files for FileWriter or S3Writer. When the target's input stream is of a user-defined type, the header includes the field names. When the target's input stream is the output of a DatabaseReader or CDC reader source, the header includes the source table's column names; in this case, the writer's properties must include directory: '%@metadata(TableName)%' or, in S3, bucketname: '%@metadata(TableName)%' or foldername: '%@metadata(TableName)%', and events for each table are written to a separate directory or S3 bucket (see the sketch following this list). Known issue DEV-14229: in this release, headers may be incorrect if the writer's rolloverpolicy includes an interval. Workaround: use only eventcount and/or filesize in the rolloverpolicy.

• Members (String): A comma-separated list of fields to be selected from the writer's input stream; if left blank, all fields are selected. One use for this property is to remove fields used only to name the output directory in the target (see Setting output names and rollover / upload policies).

• Null Value (String, default: NULL)

• Quote Character (String, default: ")

• Row Delimiter (String, default: \n): See Setting rowdelimiter values.

• Standard (String, default: none): Set to RFC4180 to format output using the RFC 4180 standard and ignore any conflicting values in other properties.

• Use Quotes (Boolean, default: False): Set to True to escape values of type String using the quotecharacter.
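As an illustration of the Header property, here is a minimal sketch of a target that writes a separate file with a header for each source table. It assumes a hypothetical input stream OracleCDCStream carrying the output of an Oracle Reader source:

CREATE TARGET DSVPerTableOut USING FileWriter (
  filename: 'TableData',
  directory: '%@metadata(TableName)%'
)
FORMAT USING DSVFormatter (
  header: True
)
INPUT FROM OracleCDCStream;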

For example, this variation on the PosApp sample application writes to a file using DSVFormatter:

CREATE SOURCE DSVFormatterTestSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO DSVSource_Stream;

CREATE CQ CsvToPosData
INSERT INTO DSVTransformed_Stream
SELECT TO_STRING(data[1]),
       TO_DATEF(data[4],'yyyyMMddHHmmss'),
       DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')),
       TO_DOUBLE(data[7]),
       TO_STRING(data[9])
FROM DSVSource_Stream;

CREATE TARGET DSVFormatterOut using FileWriter(
  filename:'DSVFormatterOutput')
FORMAT USING DSVFormatter ()
INPUT FROM DSVTransformed_Stream;

The first lines of DSVFormatterOutput are:

D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu,2013-03-12T17:32:10.000-07:00,17,2.2,41363
OFp6pKTMg26n1iiFY00M9uSqh9ZfMxMBRf1,2013-03-12T17:32:10.000-07:00,17,22.78,16950
ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx,2013-03-12T17:32:10.000-07:00,17,218.57,18224

If you set DSVFormatter to escape the strings, as follows:

FORMAT USING DSVFormatter (
  usequotes:True,
  quotecharacter:'"')

then the first lines of DSVFormatterOutput would be:

"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",2013-03-12T17:32:10.000+01:00,17,2.2,"41363"
"OFp6pKTMg26n1iiFY00M9uSqh9ZfMxMBRf1",2013-03-12T17:32:10.000+01:00,17,22.78,"16950"
"ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx",2013-03-12T17:32:10.000+01:00,17,218.57,"18224"

JSON Formatter

Formats a writer's output as JSON.

• Charset (String)

• Events as Array of JSON Objects (Boolean, default: True): With the default value True, output is an array:

  [
    {field1:value1,field2:value2,.....} ,
    {field1:value1,field2:value2,.....} ,
    {field1:value1,field2:value2,.....} ]

  Set to False to output a collection:

  {field1:value1,field2:value2,.....}
  {field1:value1,field2:value2,.....}
  {field1:value1,field2:value2,.....}

• JSON Member Delimiter (String, default: \n)

• JSON Object Delimiter (String, default: \n)

• Members (String): A comma-separated list of fields to be selected from the writer's input stream; if left blank, all fields are selected. One use for this property is to remove fields used only to name the output directory in the target (see Setting output names and rollover / upload policies).
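As an illustration of the Members property, the following sketch would limit the output to two fields (assuming the writer's input stream has fields named merchantId and amount):

FORMAT USING JSONFormatter (
  members: 'merchantId,amount'
)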

For example, this variation on the PosApp sample application writes to a file using JSONFormatter:

CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;
  
CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  TO_DOUBLE(data[7]) as amount
FROM CsvStream;

CREATE TARGET JFFileOut USING FileWriter(
  filename:'JFTestOutput.json'
)
FORMAT USING JSONFormatter()
INPUT FROM PosDataStream;

The first lines of JFTestOutput.json are:

[
 {
  "merchantId":"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",
  "dateTime":"2013-03-12T17:32:10.000-07:00",
  "amount":2.2
 },
 {
  "merchantId":"OFp6pKTMg26n1iiFY00M9uSqh9ZfMxMBRf1",
  "dateTime":"2013-03-12T17:32:10.000-07:00",
  "amount":22.78
 },

Parquet Formatter

Formats a writer's output for use by Apache Parquet and generates one or more schema files. See Supported writer-formatter combinations.

Notes:

  • Encryption Policy cannot be set for the associated writer.

  • Data written using Parquet Formatter cannot be consumed until the target file is closed (rolls over).

• Block Size (Long, default: 128000000): Sets the parquet.block.size property in Parquet.

• Compression Type (String, default: UNCOMPRESSED): Optionally, specify the target's compression format. Supported types are GZIP, LZO, and SNAPPY.

• Format As (String, default: Default): With the Default setting, a single schema file is created.

  • With an input stream of a user-defined type, the output includes the name and value of each field.

  • With an input stream of type WAEvent (from any source), the output includes all contents of the event excluding dataPresenceBitMap, beforePresenceBitMap, and typeUUID.

  The two other settings are supported only with an input stream of type WAEvent from a Database Reader, Incremental Batch Reader, or SQL CDC reader source. A dynamic directory, folder, or bucket name must be specified in the writer (see Setting output names and rollover / upload policies, and the sketch following this list). A schema file with a timestamp appended to its name is created in each directory, folder, or bucket. With S3 Writer, if both the bucket name and the folder name are dynamic, each combination of bucket and folder will have its own schema file. If there is a DDL change in the source, a new schema file is created and the output file(s) roll over.

  • When Format As is set to Native, the output includes all contents of the WAEvent except for typeUUID.

  • When Format As is set to Table, the output includes only the column names and values.

  See Parquet Formatter examples for sample schema files and output for each setting.

• Members (String): Optionally:

  • With an input stream of type WAEvent, specify a comma-separated list of elements to include in the output.

  • With an input stream of type WAEvent from a Database Reader, Incremental Batch Reader, or SQL CDC reader source, specify additional elements (for example, from the METADATA or USERDATA maps) to include in addition to the data array values. See Parquet Formatter examples for a sample schema file and output.

• Schema File Name (String): The fully qualified name of the Parquet schema file Striim will create when the application runs. When a dynamic directory is specified in the writer, Striim in some cases writes the files in the target directories and/or appends a timestamp to the file names. See the notes for Format As for more details.
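As an illustration, here is a minimal sketch of a Parquet target using Format As = Table. It assumes a hypothetical input stream OracleCDCStream carrying WAEvents from an Oracle Reader source, and it uses a dynamic directory name as required by the Table setting:

CREATE TARGET ParquetFileOut USING FileWriter (
  filename: 'ParquetOutput',
  directory: '%@metadata(TableName)%'
)
FORMAT USING ParquetFormatter (
  schemaFileName: 'OracleTables.schema',
  formatAs: 'Table',
  compressionType: 'SNAPPY'
)
INPUT FROM OracleCDCStream;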

Parquet Formatter data type support and correspondence

The following apply when the input stream is of a user-defined type.

• Byte → INT32

• DateTime → INT32 (Unix epoch: number of days from 1 January 1970)

• Double → DOUBLE (IEEE 64-bit)

• Float → FLOAT (IEEE 32-bit)

• Integer → INT32 (32-bit signed)

• Long → INT64 (64-bit signed)

• Short → INT32

• String → BYTE_ARRAY (UTF-8)

The following apply when the input stream is the output of a Database Reader or Incremental Batch Reader source.

• Types.BIGINT → INT32 (32-bit signed)

• Types.BIT → INT32

• Types.CHAR → BYTE_ARRAY (UTF-8)

• Types.DATE → INT32 (Unix epoch: number of days from 1 January 1970)

• Types.DECIMAL → BYTE_ARRAY (UTF-8)

• Types.DOUBLE → DOUBLE (IEEE 64-bit)

• Types.FLOAT → FLOAT (IEEE 32-bit)

• Types.INTEGER → INT32 (32-bit signed)

• Types.NUMERIC → BYTE_ARRAY (UTF-8)

• Types.REAL → FLOAT (IEEE 32-bit)

• Types.SMALLINT → INT32 (Unix epoch: number of days from 1 January 1970)

• Types.TIMESTAMP → INT32

• Types.TINYINT → INT32

• Types.VARCHAR → BYTE_ARRAY (UTF-8)

• other types → BYTE_ARRAY (UTF-8)

The following apply when the input stream is the output of an Oracle Reader source.

• ADT → unsupported

• ARRAY → unsupported

• BFILE → unsupported

• BINARY_DOUBLE → DOUBLE

• BINARY_FLOAT → FLOAT

• BLOB → BYTE_ARRAY

• CHAR → BYTE_ARRAY

• CLOB → BYTE_ARRAY

• DATE → BYTE_ARRAY

• FLOAT → BYTE_ARRAY

• INTERVAL DAY TO SECOND → unsupported

• INTERVAL YEAR TO MONTH → unsupported

• LONG → unsupported

• LONG RAW → unsupported

• NCHAR → BYTE_ARRAY

• NCLOB → BYTE_ARRAY

• NESTED TABLE → unsupported

• NUMBER → BYTE_ARRAY

• NVARCHAR2 → unsupported

• RAW → unsupported

• REF → unsupported

• ROWID → unsupported

• TIMESTAMP → BYTE_ARRAY

• TIMESTAMP WITH LOCAL TIME ZONE → BYTE_ARRAY

• TIMESTAMP WITH TIME ZONE → BYTE_ARRAY

• UDT → unsupported

• UROWID → unsupported

• VARCHAR2 → BYTE_ARRAY

• VARRAY → unsupported

• XMLTYPE → unsupported

Parquet Formatter examples

The output of Parquet Formatter varies depending on the type of the input stream and the Format As setting.

Input stream of user-defined type, Format As = Default

Input stream type:

Create Type PERSON (
  ID Integer,
  City String,
  Code String,
  Name String);

Schema:

message Person.Person {
  optional int32 ID;  
  optional binary city (UTF8);
  optional binary code (UTF8);
  optional binary name (UTF8);
}

Sample output:

{"ID":1216,"city":"South Kent","code":"USD","name":"COMPANY 4999992"}

Input stream of Oracle Reader WAEvent, Format As = Default

Source table:

CREATE TABLE TABLES1 (
  "EMPNO" NUMBER(4),
  "ENAME" VARCHAR2(10),
  "JOB" VARCHAR(20),
  "HIREDATE" TIMESTAMP(6),
  "SAL" NUMBER DEFAULT 0
);

Schema:

message WAEvent.avro.WAEvent {
  optional group metadata (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
  optional group data (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
  optional group before (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
  optional group userdata (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
}

Sample output, insert:

{
  "metadata": {
    "map": [
      {
        "key": "RbaSqn",
        "value": "29"
      },
      {
        "key": "AuditSessionId",
        "value": "174320"
      },
      {
        "key": "TableSpace",
        "value": "USERS"
      },
      {
        "key": "CURRENTSCN",
        "value": "1215123"
      },
      {
        "key": "SQLRedoLength",
        "value": "152"
      },
      {
        "key": "BytesProcessed"
      },
      {
        "key": "ParentTxnID",
        "value": "1.20.571"
      },
      {
        "key": "SessionInfo",
        "value": "UNKNOWN"
      },
      {
        "key": "RecordSetID",
        "value": " 0010 "
      },
      {
        "key": "DBCommitTimestamp",
        "value": "1587561609000"
      },
      {
        "key": "COMMITSCN",
        "value": "1215124"
      },
      {
        "key": "SEQUENCE",
        "value": "1"
      },
      {
        "key": "Rollback",
        "value": "0"
      },
      {
        "key": "STARTSCN",
        "value": "1215123"
      },
      {
        "key": "SegmentName",
        "value": "TABLES1"
      },
      {
        "key": "OperationName",
        "value": "INSERT"
      },
      {
        "key": "TimeStamp",
        "value": "2020-04-22T13:20:09.000-07:00"
      },
      {
        "key": "TxnUserID",
        "value": "QATEST"
      },
      {
        "key": "RbaBlk",
        "value": "2808"
      },
      {
        "key": "SegmentType",
        "value": "TABLE"
      },
      {
        "key": "TableName",
        "value": "QATEST.TABLES1"
      },
      {
        "key": "TxnID",
        "value": "1.20.571"
      },
      {
        "key": "Serial",
        "value": "12481"
      },
      {
        "key": "ThreadID",
        "value": "1"
      },
      {
        "key": "COMMIT_TIMESTAMP",
        "value": "2020-04-22T13:20:09.000-07:00"
      },
      {
        "key": "OperationType",
        "value": "DML"
      },
      {
        "key": "ROWID",
        "value": "AAAFSWAAEAAAApcAAB"
      },
      {
        "key": "DBTimeStamp",
        "value": "1587561609000"
      },
      {
        "key": "TransactionName",
        "value": ""
      },
      {
        "key": "SCN",
        "value": "121512300000081627745086341280001"
      },
      {
        "key": "Session",
        "value": "142"
      }
    ]
  },
  "data": {
    "map": [
      {
        "key": "ENAME",
        "value": "tanuja"
      },
      {
        "key": "EMPNO",
        "value": "1"
      },
      {
        "key": "JOB",
        "value": "So"
      },
      {
        "key": "HIREDATE",
        "value": "2020-04-22T13:20:09.074-07:00"
      },
      {
        "key": "SAL",
        "value": "60000"
      }
    ]
  }
}

Sample output, update:

{
  "metadata": {
    "map": [
      {
        "key": "RbaSqn",
        "value": "59"
      },
      {
        "key": "AuditSessionId",
        "value": "174782"
      },
      {
        "key": "TableSpace",
        "value": "USERS"
      },
      {
        "key": "CURRENTSCN",
        "value": "1397647"
      },
      {
        "key": "SQLRedoLength",
        "value": "189"
      },
      {
        "key": "BytesProcessed"
      },
      {
        "key": "ParentTxnID",
        "value": "8.10.809"
      },
      {
        "key": "SessionInfo",
        "value": "UNKNOWN"
      },
      {
        "key": "RecordSetID",
        "value": " 0x00003b.00013dfb.0010 "
      },
      {
        "key": "DBCommitTimestamp",
        "value": "1587636418000"
      },
      {
        "key": "COMMITSCN",
        "value": "1397648"
      },
      {
        "key": "SEQUENCE",
        "value": "1"
      },
      {
        "key": "Rollback",
        "value": "0"
      },
      {
        "key": "STARTSCN",
        "value": "1397647"
      },
      {
        "key": "SegmentName",
        "value": "TABLES4"
      },
      {
        "key": "OperationName",
        "value": "UPDATE"
      },
      {
        "key": "TimeStamp",
        "value": "2020-04-23T10:06:58.000-07:00"
      },
      {
        "key": "TxnUserID",
        "value": "QATEST"
      },
      {
        "key": "RbaBlk",
        "value": "81403"
      },
      {
        "key": "SegmentType",
        "value": "TABLE"
      },
      {
        "key": "TableName",
        "value": "QATEST.TABLES4"
      },
      {
        "key": "TxnID",
        "value": "8.10.809"
      },
      {
        "key": "Serial",
        "value": "9289"
      },
      {
        "key": "ThreadID",
        "value": "1"
      },
      {
        "key": "COMMIT_TIMESTAMP",
        "value": "2020-04-23T10:06:58.000-07:00"
      },
      {
        "key": "OperationType",
        "value": "DML"
      },
      {
        "key": "ROWID",
        "value": "AAAFprAAEAAAAqlAAA"
      },
      {
        "key": "DBTimeStamp",
        "value": "1587636418000"
      },
      {
        "key": "TransactionName",
        "value": ""
      },
      {
        "key": "SCN",
        "value": "139764700000166070289607557280000"
      },
      {
        "key": "Session",
        "value": "9"
      }
    ]
  },
  "data": {
    "map": [
      {
        "key": "ENAME",
        "value": "tanuja"
      },
      {
        "key": "EMPNO",
        "value": "6"
      },
      {
        "key": "JOB",
        "value": "So"
      },
      {
        "key": "HIREDATE",
        "value": "2020-04-23T10:05:03.381-07:00"
      },
      {
        "key": "SAL",
        "value": "70000"
      }
    ]
  },
  "before": {
    "map": [
      {
        "key": "ENAME",
        "value": "tanuja"
      },
      {
        "key": "EMPNO",
        "value": "6"
      },
      {
        "key": "JOB",
        "value": "So"
      },
      {
        "key": "HIREDATE",
        "value": "2020-04-23T10:05:03.381-07:00"
      },
      {
        "key": "SAL",
        "value": "60000"
      }
    ]
  }
}

Sample output, delete:

{
  "metadata": {
    "map": [
      {
        "key": "RbaSqn",
        "value": "59"
      },
      {
        "key": "AuditSessionId",
        "value": "174782"
      },
      {
        "key": "TableSpace",
        "value": "USERS"
      },
      {
        "key": "CURRENTSCN",
        "value": "1397672"
      },
      {
        "key": "SQLRedoLength",
        "value": "174"
      },
      {
        "key": "BytesProcessed"
      },
      {
        "key": "ParentTxnID",
        "value": "3.33.844"
      },
      {
        "key": "SessionInfo",
        "value": "UNKNOWN"
      },
      {
        "key": "RecordSetID",
        "value": " 0x00003b.00013dfe.0010 "
      },
      {
        "key": "DBCommitTimestamp",
        "value": "1587636481000"
      },
      {
        "key": "COMMITSCN",
        "value": "1397673"
      },
      {
        "key": "SEQUENCE",
        "value": "1"
      },
      {
        "key": "Rollback",
        "value": "0"
      },
      {
        "key": "STARTSCN",
        "value": "1397672"
      },
      {
        "key": "SegmentName",
        "value": "TABLES4"
      },
      {
        "key": "OperationName",
        "value": "DELETE"
      },
      {
        "key": "TimeStamp",
        "value": "2020-04-23T10:08:01.000-07:00"
      },
      {
        "key": "TxnUserID",
        "value": "QATEST"
      },
      {
        "key": "RbaBlk",
        "value": "81406"
      },
      {
        "key": "SegmentType",
        "value": "TABLE"
      },
      {
        "key": "TableName",
        "value": "QATEST.TABLES4"
      },
      {
        "key": "TxnID",
        "value": "3.33.844"
      },
      {
        "key": "Serial",
        "value": "9289"
      },
      {
        "key": "ThreadID",
        "value": "1"
      },
      {
        "key": "COMMIT_TIMESTAMP",
        "value": "2020-04-23T10:08:01.000-07:00"
      },
      {
        "key": "OperationType",
        "value": "DML"
      },
      {
        "key": "ROWID",
        "value": "AAAFprAAEAAAAqlAAA"
      },
      {
        "key": "DBTimeStamp",
        "value": "1587636481000"
      },
      {
        "key": "TransactionName",
        "value": ""
      },
      {
        "key": "SCN",
        "value": "139767200000166070289609523360000"
      },
      {
        "key": "Session",
        "value": "9"
      }
    ]
  },
  "data": {
    "map": [
      {
        "key": "ENAME",
        "value": "tanuja"
      },
      {
        "key": "EMPNO",
        "value": "6"
      },
      {
        "key": "JOB",
        "value": "So"
      },
      {
        "key": "HIREDATE",
        "value": "2020-04-23T10:05:03.381-07:00"
      },
      {
        "key": "SAL",
        "value": "70000"
      }
    ]
  }
}

Input stream of Oracle Reader WAEvent, Format As = Native

Source table:

CREATE TABLE TABLES1 (
  "EMPNO" NUMBER(4),
  "ENAME" VARCHAR2(10),
  "JOB" VARCHAR(20),
  "HIREDATE" TIMESTAMP(6),
  "SAL" NUMBER DEFAULT 0
);

Schema:

message QATEST.TABLES3 {
  optional group data {
    optional binary EMPNO (UTF8);
    optional binary ENAME (UTF8);
    optional binary JOB (UTF8);
    optional binary HIREDATE (UTF8);
    optional binary SAL (UTF8);
  }
  optional group before {
    optional binary EMPNO (UTF8);
    optional binary ENAME (UTF8);
    optional binary JOB (UTF8);
    optional binary HIREDATE (UTF8);
    optional binary SAL (UTF8);
  }
  optional group metadata (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
  optional group userdata (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
  optional group datapresenceinfo {
    required boolean EMPNO;
    required boolean ENAME;
    required boolean JOB;
    required boolean HIREDATE;
    required boolean SAL;
  }
  optional group beforepresenceinfo {
    required boolean EMPNO;
    required boolean ENAME;
    required boolean JOB;
    required boolean HIREDATE;
    required boolean SAL;
  }
}

Sample output, insert:

{
  "data": {
    "EMPNO": "5",
    "ENAME": "tanuja",
    "JOB": "So",
    "HIREDATE": "2020-04-23T09:01:46.172-07:00",
    "SAL": "60000"
  },
  "metadata": {
    "map": [
      {
        "key": "RbaSqn",
        "value": "52"
      },
      {
        "key": "AuditSessionId",
        "value": "174782"
      },
      {
        "key": "TableSpace",
        "value": "USERS"
      },
      {
        "key": "CURRENTSCN",
        "value": "1353271"
      },
      {
        "key": "SQLRedoLength",
        "value": "152"
      },
      {
        "key": "BytesProcessed"
      },
      {
        "key": "ParentTxnID",
        "value": "2.31.778"
      },
      {
        "key": "SessionInfo",
        "value": "UNKNOWN"
      },
      {
        "key": "RecordSetID",
        "value": " 0x000034.00006bf0.0010 "
      },
      {
        "key": "DBCommitTimestamp",
        "value": "1587632506000"
      },
      {
        "key": "COMMITSCN",
        "value": "1353272"
      },
      {
        "key": "SEQUENCE",
        "value": "1"
      },
      {
        "key": "Rollback",
        "value": "0"
      },
      {
        "key": "STARTSCN",
        "value": "1353271"
      },
      {
        "key": "SegmentName",
        "value": "TABLES3"
      },
      {
        "key": "OperationName",
        "value": "INSERT"
      },
      {
        "key": "TimeStamp",
        "value": "2020-04-23T09:01:46.000-07:00"
      },
      {
        "key": "TxnUserID",
        "value": "QATEST"
      },
      {
        "key": "RbaBlk",
        "value": "27632"
      },
      {
        "key": "SegmentType",
        "value": "TABLE"
      },
      {
        "key": "TableName",
        "value": "QATEST.TABLES3"
      },
      {
        "key": "TxnID",
        "value": "2.31.778"
      },
      {
        "key": "Serial",
        "value": "9289"
      },
      {
        "key": "ThreadID",
        "value": "1"
      },
      {
        "key": "COMMIT_TIMESTAMP",
        "value": "2020-04-23T09:01:46.000-07:00"
      },
      {
        "key": "OperationType",
        "value": "DML"
      },
      {
        "key": "ROWID",
        "value": "AAAFpNAAEAAAAqdAAG"
      },
      {
        "key": "DBTimeStamp",
        "value": "1587632506000"
      },
      {
        "key": "TransactionName",
        "value": ""
      },
      {
        "key": "SCN",
        "value": "135327100000146367005998448800006"
      },
      {
        "key": "Session",
        "value": "9"
      }
    ]
  },
  "datapresenceinfo": {
    "EMPNO": true,
    "ENAME": true,
    "JOB": true,
    "HIREDATE": true,
    "SAL": true
  },
  "beforepresenceinfo": {
    "EMPNO": false,
    "ENAME": false,
    "JOB": false,
    "HIREDATE": false,
    "SAL": false
  }
}

Sample output, update:

{
  "data": {
    "EMPNO": "3",
    "ENAME": "tanuja",
    "JOB": "So",
    "HIREDATE": "2020-04-23T09:53:17.459-07:00",
    "SAL": "70000"
  },
  "before": {
    "EMPNO": "3",
    "ENAME": "tanuja",
    "JOB": "So",
    "HIREDATE": "2020-04-23T09:53:17.459-07:00",
    "SAL": "60000"
  },
  "metadata": {
    "map": [
      {
        "key": "RbaSqn",
        "value": "55"
      },
      {
        "key": "AuditSessionId",
        "value": "174782"
      },
      {
        "key": "TableSpace",
        "value": "USERS"
      },
      {
        "key": "CURRENTSCN",
        "value": "1366172"
      },
      {
        "key": "SQLRedoLength",
        "value": "189"
      },
      {
        "key": "BytesProcessed"
      },
      {
        "key": "ParentTxnID",
        "value": "5.30.788"
      },
      {
        "key": "SessionInfo",
        "value": "UNKNOWN"
      },
      {
        "key": "RecordSetID",
        "value": " 0x000037.000128c2.0010 "
      },
      {
        "key": "DBCommitTimestamp",
        "value": "1587635716000"
      },
      {
        "key": "COMMITSCN",
        "value": "1366173"
      },
      {
        "key": "SEQUENCE",
        "value": "1"
      },
      {
        "key": "Rollback",
        "value": "0"
      },
      {
        "key": "STARTSCN",
        "value": "1366172"
      },
      {
        "key": "SegmentName",
        "value": "TABLES4"
      },
      {
        "key": "OperationName",
        "value": "UPDATE"
      },
      {
        "key": "TimeStamp",
        "value": "2020-04-23T09:55:16.000-07:00"
      },
      {
        "key": "TxnUserID",
        "value": "QATEST"
      },
      {
        "key": "RbaBlk",
        "value": "75970"
      },
      {
        "key": "SegmentType",
        "value": "TABLE"
      },
      {
        "key": "TableName",
        "value": "QATEST.TABLES4"
      },
      {
        "key": "TxnID",
        "value": "5.30.788"
      },
      {
        "key": "Serial",
        "value": "9289"
      },
      {
        "key": "ThreadID",
        "value": "1"
      },
      {
        "key": "COMMIT_TIMESTAMP",
        "value": "2020-04-23T09:55:16.000-07:00"
      },
      {
        "key": "OperationType",
        "value": "DML"
      },
      {
        "key": "ROWID",
        "value": "AAAFprAAEAAAAqlAAA"
      },
      {
        "key": "DBTimeStamp",
        "value": "1587635716000"
      },
      {
        "key": "TransactionName",
        "value": ""
      },
      {
        "key": "SCN",
        "value": "136617200000154811286978560160000"
      },
      {
        "key": "Session",
        "value": "9"
      }
    ]
  },
  "datapresenceinfo": {
    "EMPNO": true,
    "ENAME": true,
    "JOB": true,
    "HIREDATE": true,
    "SAL": true
  },
  "beforepresenceinfo": {
    "EMPNO": true,
    "ENAME": true,
    "JOB": true,
    "HIREDATE": true,
    "SAL": true
  }
}

Sample output, delete:

{
  "data": {
    "EMPNO": "3",
    "ENAME": "tanuja",
    "JOB": "So",
    "HIREDATE": "2020-04-23T09:53:17.459-07:00",
    "SAL": "70000"
  },
  "metadata": {
    "map": [
      {
        "key": "RbaSqn",
        "value": "55"
      },
      {
        "key": "AuditSessionId",
        "value": "174782"
      },
      {
        "key": "TableSpace",
        "value": "USERS"
      },
      {
        "key": "CURRENTSCN",
        "value": "1366251"
      },
      {
        "key": "SQLRedoLength",
        "value": "174"
      },
      {
        "key": "BytesProcessed"
      },
      {
        "key": "ParentTxnID",
        "value": "7.31.697"
      },
      {
        "key": "SessionInfo",
        "value": "UNKNOWN"
      },
      {
        "key": "RecordSetID",
        "value": " 0x000037.000128df.0010 "
      },
      {
        "key": "DBCommitTimestamp",
        "value": "1587635881000"
      },
      {
        "key": "COMMITSCN",
        "value": "1366252"
      },
      {
        "key": "SEQUENCE",
        "value": "1"
      },
      {
        "key": "Rollback",
        "value": "0"
      },
      {
        "key": "STARTSCN",
        "value": "1366251"
      },
      {
        "key": "SegmentName",
        "value": "TABLES4"
      },
      {
        "key": "OperationName",
        "value": "DELETE"
      },
      {
        "key": "TimeStamp",
        "value": "2020-04-23T09:58:01.000-07:00"
      },
      {
        "key": "TxnUserID",
        "value": "QATEST"
      },
      {
        "key": "RbaBlk",
        "value": "75999"
      },
      {
        "key": "SegmentType",
        "value": "TABLE"
      },
      {
        "key": "TableName",
        "value": "QATEST.TABLES4"
      },
      {
        "key": "TxnID",
        "value": "7.31.697"
      },
      {
        "key": "Serial",
        "value": "9289"
      },
      {
        "key": "ThreadID",
        "value": "1"
      },
      {
        "key": "COMMIT_TIMESTAMP",
        "value": "2020-04-23T09:58:01.000-07:00"
      },
      {
        "key": "OperationType",
        "value": "DML"
      },
      {
        "key": "ROWID",
        "value": "AAAFprAAEAAAAqlAAA"
      },
      {
        "key": "DBTimeStamp",
        "value": "1587635881000"
      },
      {
        "key": "TransactionName",
        "value": ""
      },
      {
        "key": "SCN",
        "value": "136625100000154811286997565600000"
      },
      {
        "key": "Session",
        "value": "9"
      }
    ]
  },
  "datapresenceinfo": {
    "EMPNO": true,
    "ENAME": true,
    "JOB": true,
    "HIREDATE": true,
    "SAL": true
  },
  "beforepresenceinfo": {
    "EMPNO": false,
    "ENAME": false,
    "JOB": false,
    "HIREDATE": false,
    "SAL": false
  }
}

Input stream of Oracle Reader WAEvent, Format As = Table

Source table:

CREATE TABLE TABLES1 (
  "EMPNO" NUMBER(4),
  "ENAME" VARCHAR2(10),
  "JOB" VARCHAR(20),
  "HIREDATE" TIMESTAMP(6),
  "SAL" NUMBER DEFAULT 0
);

Schema:

message QATEST.TABLES1 {
  optional binary EMPNO (UTF8);
  optional binary ENAME (UTF8);
  optional binary JOB (UTF8);
  optional binary HIREDATE (UTF8);
  optional binary SAL (UTF8);
}

Sample output, insert:

{"EMPNO":"1","ENAME":"tanuja","JOB":"So","HIREDATE":"2020-04-22T13:30:25.432-07:00","SAL":"60000"}

Sample output, update:

{"EMPNO":"5","ENAME":"tanuja","JOB":"So","HIREDATE":"2020-04-23T10:01:10.614-07:00","SAL":"70000"}

Sample output, delete:

{"EMPNO":"5","ENAME":"tanuja","JOB":"So","HIREDATE":"2020-04-23T10:01:10.614-07:00","SAL":"70000"}

Input stream of Database Reader WAEvent, Format As = Table, using Members

Source table:

CREATE TABLE ORACLETOPARQUET1 (
  ID INTEGER,
  NAME VARCHAR(30),
  COST BINARY_FLOAT,
  CREATED_TS TIMESTAMP WITH TIME ZONE,
  LARGE_DATA CLOB
);

Members property value:

Table=@metadata(TableName),OpName=@metadata(OperationName)

Schema:

message QATEST.ORACLETOPARQUET1 {
  optional binary ID (UTF8);
  optional binary NAME (UTF8);
  optional float COST;
  optional binary CREATED_TS (UTF8);
  optional binary LARGE_DATA (UTF8);
  optional binary Table (UTF8);
  optional binary OpName (UTF8);
}

Sample output:

{"ID":"79","NAME":"abc","COST":39.5,"CREATED_TS":"2020-05-18T05:54:30.000Z",
"LARGE_DATA":"This is a character literal 1","Table":"QATEST.ORACLETOPARQUET1",
"OpName":"INSERT"}

XML Formatter

Formats a writer's output as XML.

• Charset (String)

• Element Tuple (String): Defines the elements and attributes of the XML output; see the example below.

• Root Element (String)

• Row Delimiter (String, default: \n): See Setting rowdelimiter values.

For example, this variation on the PosApp sample application writes to a file using XMLFormatter:

CREATE SOURCE XMLFormatterTestSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO XMLSource_Stream;

CREATE CQ CsvToPosData
INSERT INTO XMLTransformed_Stream
SELECT TO_STRING(data[0]),
  TO_STRING(data[1]),
  TO_DATEF(data[4],'yyyyMMddHHmmss'),
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')),
  TO_DOUBLE(data[7]),
  TO_STRING(data[9])
FROM XMLSource_Stream;

CREATE TARGET XMLFormatterOut using FileWriter(
  filename:'XMLFormatterOutput')
FORMAT USING XMLFormatter (
  rootelement:'document',
  elementtuple:'MerchantName:merchantid:text=merchantname'
)
INPUT FROM XMLTransformed_Stream; 

The first lines of XMLFormatterOutput are:

<?xml version="1.0" encoding="UTF-8"?>
<document>
 <MerchantName merchantid="D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu" > COMPANY 1 </MerchantName>
 <MerchantName merchantid="OFp6pKTMg26n1iiFY00M9uSqh9ZfMxMBRf1" > COMPANY 2 </MerchantName>
 <MerchantName merchantid="ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx" > COMPANY 3 </MerchantName>