3.10.3 MongoDB

Striim supports MongoDB versions 2.6 through 4.0.

Striim provides a template for creating applications that read from MongoDB and write to Cosmos DB. See Creating a new application using a template for details.

MongoDB setup

MongoDBReader reads data from the Replica Set Oplog, so to use it you must be running a replica set (see Deploy a Replica Set or Convert a Standalone to a Replica Set). For more information, see Replication and Replica Set Data Synchronization.

In InitialLoad mode, the user specified in MongoDBReader's Username property must have read access to all databases containing the specified collections. In Incremental mode, the user must have read access to the local database and the oplog.rs collection.

The oplog is a capped collection, which means that the oldest data is automatically removed to keep it within the specified size. To support recovery (see Recovering applications), the replica set's oplog must be large enough to retain all the events generated while Striim is offline. See Oplog Size and Change Oplog Size for more information.

MongoDBReader properties

The MongoDB driver is bundled with Striim, so no installation is necessary.

The adapter properties are:

Each entry below gives the property, its type, its default value (if any), and notes.

authDB (String; default: admin)

Specify the authentication database for the specified username. If not specified, the admin database is used. When the specified authType uses an external authentication server, this setting is ignored.

authType (enum; default: Default)

Specify the authentication mechanism used by your MongoDB instance. The default setting uses MongoDB's default authentication mechanism, SCRAM. Other supported values are MONGODBCR and SCRAMSHA1. Set to NoAuth if authentication is not enabled.

GSSAPI (Kerberos), MONGODBX509, and PLAIN (LDAP) are valid values for this property, but those authentication mechanisms have not been tested, so they are not supported.

Collections (String)

The fully-qualified name(s) of the MongoDB collection(s) to read from, for example, mydb.mycollection. Separate multiple collections by commas.

You may use the $ wildcard, for example, mydb.$ or mydb.a$. Note that data will be read only from collections that exist when the Striim application starts; additional collections added later will be ignored until the application is restarted.

Connection Retry Policy (String; default: retryInterval=30, maxRetries=3)

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval). If the second attempt is unsuccessful, in 30 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.
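
The retry behavior described above can be sketched in Python. This is an illustration only, not Striim's implementation: parse_retry_policy and connect_with_retries are hypothetical names, and the sketch assumes that maxRetries counts the total number of connection attempts, as the description of the default setting suggests.

```python
import time


def parse_retry_policy(policy):
    """Parse a string such as 'retryInterval=30, maxRetries=3' into a dict of ints."""
    settings = {}
    for part in policy.split(","):
        key, _, value = part.strip().partition("=")
        number = int(value)
        if number < 0:
            # The documentation states that negative values are not supported.
            raise ValueError(f"negative value not supported: {key}={number}")
        settings[key] = number
    return settings


def connect_with_retries(connect, policy, sleep=time.sleep):
    """Call connect() until it succeeds or the allowed attempts run out.

    Assumption: maxRetries is the total number of attempts (3 by default).
    """
    settings = parse_retry_policy(policy)
    last_error = None
    for attempt in range(1, settings["maxRetries"] + 1):
        try:
            return connect()
        except ConnectionError as error:
            last_error = error
            if attempt < settings["maxRetries"]:
                sleep(settings["retryInterval"])  # wait before the next attempt
    raise ConnectionError("all connection attempts failed") from last_error
```

The sleep parameter is injectable so that the waiting can be replaced in tests; in normal use the default time.sleep pauses for retryInterval seconds between attempts.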

Connection URL (String)

Specify one or more instances in the replica set, for example, 192.168.1.10:27107,192.168.1.11:27107. If you specify more than one instance, one should be the primary.

When reading from a sharded cluster:

  • when the mode is InitialLoad, specify a mongos instance.

  • when the mode is Incremental, create a separate MongoDB Reader source for each shard.

Exclude Collections (String)

Any collections to be excluded from the set specified in the Collections property. Specify as for the Collections property.
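
To make the interplay of Collections and Exclude Collections concrete, here is a minimal Python sketch. It is not Striim code: pattern_to_regex and select_collections are hypothetical names, and the sketch assumes that the $ wildcard stands for any sequence of characters, which the documentation does not spell out.

```python
import re


def pattern_to_regex(pattern):
    """Translate a name such as 'mydb.a$' into a compiled regular expression.

    Assumption: '$' stands for any sequence of characters.
    """
    escaped = (re.escape(piece) for piece in pattern.strip().split("$"))
    return re.compile(".*".join(escaped))


def select_collections(existing, collections, exclude=""):
    """Return the names in `existing` matched by `collections` but not by `exclude`."""
    include = [pattern_to_regex(p) for p in collections.split(",") if p.strip()]
    omit = [pattern_to_regex(p) for p in exclude.split(",") if p.strip()]
    return [
        name
        for name in existing
        if any(rx.fullmatch(name) for rx in include)
        and not any(rx.fullmatch(name) for rx in omit)
    ]
```

For example, given the existing collections mydb.employee, mydb.accounts, and mydb.audit, calling select_collections with 'mydb.$' and an exclude value of 'mydb.audit' keeps mydb.employee and mydb.accounts. Because the list of existing collections is evaluated once, the sketch also mirrors the rule that collections created later are not picked up until a restart.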

Mode (String; default: InitialLoad)

With the default setting, MongoDBReader loads all existing data using db.collection.find() and then stops. In this mode, MongoDBReader is not a CDC reader.

Set to Incremental to read CDC data continuously from the oplog.

Password (encrypted password)

The password for the specified Username.

Read Preference (String; default: primaryPreferred)

See Read Preference Modes. Supported values are primary, primaryPreferred, secondary, secondaryPreferred, and nearest.

SSL Enabled (Boolean; default: False)

If your MongoDB instance is using SSL (see Configure mongod and mongos for TLS/SSL), set to True.

Start Timestamp (String)

Leave blank to read only new data. Specify a UTC DateTime value (for example, 2018-07-18T04:56:10) to read all data from that time forward or to wait to start reading until a time in the future. If the MongoDB and Striim servers are in different time zones, adjust the value to match the Striim time zone. If the oplog no longer contains data back to the specified time, reading will start from the beginning of the oplog.

If milliseconds are specified (for example, 2017-07-18T04:56:10.999), they will be interpreted as the incrementing ordinal for the MongoDB timestamp (see Timestamps).
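
The following Python sketch illustrates how a Start Timestamp value maps onto the two parts of a MongoDB timestamp (seconds since the epoch plus an incrementing ordinal). It is an illustration under stated assumptions, not Striim's parser: to_oplog_position is a hypothetical name, and using 0 as the ordinal when no fractional part is given is an assumption.

```python
from datetime import datetime, timezone


def to_oplog_position(start_timestamp):
    """Convert 'YYYY-MM-DDTHH:MM:SS[.nnn]' (UTC) into (epoch_seconds, ordinal).

    Following the description above, the digits after the decimal point are
    treated as the incrementing ordinal, not as a fraction of a second.
    """
    date_part, _, ordinal_part = start_timestamp.partition(".")
    moment = datetime.strptime(date_part, "%Y-%m-%dT%H:%M:%S")
    moment = moment.replace(tzinfo=timezone.utc)  # interpret the value as UTC
    ordinal = int(ordinal_part) if ordinal_part else 0  # assumed default
    return int(moment.timestamp()), ordinal
```

With this sketch, 2018-07-18T04:56:10 becomes (1531889770, 0), and 2017-07-18T04:56:10.999 becomes (1500353770, 999): the .999 selects an ordinal within that second rather than moving the time forward by 999 milliseconds.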

Username (String)

A MongoDB user with access as described in MongoDB setup.

MongoDBReader JSONNodeEvent fields

The output type for MongoDBReader is JSONNodeEvent. The fields are:

data: contains the field names and values of a document, for example:

data: {"_id":2441,"company":"Striim","city":"Palo Alto"}

Updates include only the modified values. Deletes include only the document ID.

metadata: contains the following elements:

  • CollectionName: the collection from which the document was read

  • OperationName: in InitialLoad mode, SELECT; in Incremental mode, INSERT, UPDATE, or DELETE

  • DatabaseName: the database of the collection

  • DocumentKey: the document ID (same as the _id value in data)

  • NameSpace: <database name>.<collection name>

  • TimeStamp: in InitialLoad mode, the current time of the Striim server when the document was read; in Incremental mode, the timestamp from the oplog

For example:

metadata: {"CollectionName":"employee","OperationName":"SELECT","DatabaseName":"test",
  "DocumentKey":1.0,"NameSpace":"test.employee","TimeStamp":1537433999609}

MongoDBReader example application and output

The following Striim application will write change data for the specified collection to SysOut. To run this yourself, replace striim and ****** with the user name and password for the MongoDB user account discussed in MongoDB setup, specify the correct connection URL for your instance, and replace mydb with the name of your database.

CREATE APPLICATION MongoDBTest;

CREATE SOURCE MongoDBIn USING MongoDBReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'192.168.1.10:27107',
  Mode:'Incremental',
  Collections:'mydb.employee'
)
OUTPUT TO MongoDBStream;

CREATE TARGET MongoDBCOut
USING SysOut(name:MongoDB)
INPUT FROM MongoDBStream;

END APPLICATION MongoDBTest;

With the above application running, the following MongoDB shell commands:

use mydb;
db.employee.save({_id:1,"firstname":"Larry","lastname":"Talbot","age":10,"comment":"new record"});
db.employee.save({_id:1,"firstname":"Larry","lastname":"Talbot","age":40,"comment":"updated record"});
db.employee.update({_id:1},{$set:{"comment":"partial update"}});
db.employee.remove({_id:1});

would produce output similar to the following:

data: {"_id":1.0,"firstname":"Larry","lastname":"Talbot","age":10.0,"comment":"new record"}
metadata: {"CollectionName":"employee","OperationName":"INSERT","DatabaseName":"mydb",
  "DocumentKey":1.0,"NameSpace":"mydb.employee","TimeStamp":1537250474}
...
data: {"_id":1.0,"firstname":"Larry","lastname":"Talbot","age":40.0,"comment":"updated record"}
metadata: {"CollectionName":"employee","OperationName":"UPDATE","DatabaseName":"mydb",
  "DocumentKey":1.0,"NameSpace":"mydb.employee","TimeStamp":1537250474}
...
data: {"_id":1.0,"comment":"partial update"}
metadata: {"CollectionName":"employee","OperationName":"UPDATE","DatabaseName":"mydb",
  "DocumentKey":1.0,"NameSpace":"mydb.employee","TimeStamp":1537250474}
...
data: {"_id":1.0}
metadata: {"CollectionName":"employee","OperationName":"DELETE","DatabaseName":"mydb",
  "DocumentKey":1.0,"NameSpace":"mydb.employee","TimeStamp":1537250477}

The decimals in the numeric values (for example, 1.0 for _id) are added by the MongoDB Java driver and have no effect.

Note that output for the "partial" update and delete operations includes only the fields specified in the shell commands. See Replicating MongoDB data to Azure CosmosDB for discussion of the issues this can cause when writing to targets and how to work around those issues.
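
To see why partial updates and deletes need care downstream, the sketch below applies events like the four shown above to an in-memory copy of the collection. It is hypothetical Python, not Striim code: apply_event and the dict-based store are assumptions made for illustration.

```python
def apply_event(store, data, metadata):
    """Apply one MongoDBReader-style event to `store`, a dict keyed by document ID."""
    key = metadata["DocumentKey"]
    operation = metadata["OperationName"]
    if operation in ("INSERT", "SELECT"):
        store[key] = dict(data)
    elif operation == "UPDATE":
        # An update may carry only the modified fields, so merge rather than
        # replace; replacing would silently drop the untouched fields.
        store.setdefault(key, {}).update(data)
    elif operation == "DELETE":
        # A delete carries only the document ID.
        store.pop(key, None)
    else:
        raise ValueError(f"unexpected OperationName: {operation}")
    return store
```

Merging is the safe choice for the partial update in the example, but it cannot detect a field that a full-document save has removed, and a target that needs more than the document ID to locate a record cannot act on the delete event alone.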

Converting MongoDB output to a user-defined type

Using the application and data from MongoDBReader example application and output, the following CQ would convert the JSONNodeEvent data array values to a Striim type, which could then be consumed by any other Striim component.

CREATE CQ JSON2StriimType
INSERT INTO EmployeeStream
SELECT data.get("_id").toString() AS id,
  data.get("firstname").toString() AS firstname,
  data.get("lastname").toString() AS lastname,
  data.get("age").toString() AS age
FROM MongoDBStream;

Adding user-defined data to JSONNodeEvent streams

Use the PutUserData function in a CQ to add a field to the JSONNodeEvent USERDATA map. For examples of how to use USERDATA elements in TQL, see Modifying output using ColumnMap and the discussions of PartitionKey in Kafka Writer and S3 Writer.

The following example would add the sixth element (index 5, counting from zero) in the JSONNodeEvent data array to USERDATA as the field "city":

CREATE CQ AddUserData
INSERT INTO EnrichedStream
SELECT putUserData(x, 'city', data[5])
FROM MongoDBSourceStream x;

To remove an element from the USERDATA map, use the removeUserData function (you may specify multiple elements, separated by commas):

CREATE CQ RemoveUserData
INSERT INTO EnrichedStream 
SELECT removeUserData(x, 'city')
FROM MongoDBSourceStream x;

To remove all elements from the USERDATA map, use the clearUserData function:

CREATE CQ ClearUserData
INSERT INTO EnrichedStream 
SELECT clearUserData(x)
FROM MongoDBSourceStream x;

Replicating MongoDB data to Azure CosmosDB

To replicate one or many MongoDB collections to Cosmos DB, specify multiple collections in the Collections properties of MongoDBReader and CosmosDBWriter. You may use wildcards ($ for MongoDB, % for Cosmos DB) to replicate all collections in a database, as in the example below, or specify multiple collections manually, as described in the notes for Cosmos DB Writer's Collections property.

You must create the target collections in Cosmos DB manually. Each collection's partition key name must match one of the fields in the MongoDB documents.

Data will be read only from collections that exist when the source starts. Additional collections added later will be ignored until the source is restarted. When the target collection is in a fixed container (see Partition and scale in Azure Cosmos DB), inserts, updates, and deletes are handled automatically. When the target collection is in an unlimited container, updates require special handling and deletes must be done manually, as discussed below.

If you wish to run the examples, adjust the MongoDBReader properties and Cosmos DB Writer properties to reflect your own environment.

When the target collection is in a fixed container

Note

Writing to a target collection in a fixed container will not be possible until Microsoft fixes the bug discussed in this Azure forum discussion.

  1. In Cosmos DB, create database mydb containing the collection employee with partition key /name (note that the collection and partition names are case-sensitive).

  2. In MongoDB, create the collection employee and populate it as follows:

    use mydb;
    db.employee.insertMany([
    {_id:1,"name":"employee1","company":"Striim","city":"Madras"},
    {_id:2,"name":"employee2","company":"Striim","city":"Seattle"},
    {_id:3,"name":"employee3","company":"Striim","city":"California"}
    ]);

  3. In Striim, run the following application to perform the initial load of the existing data:

    CREATE APPLICATION Mongo2CosmosInitialLoad; 
     
    CREATE SOURCE MongoDBIn USING MongoDBReader (
      Username:'striim',
      Password:'******',
      ConnectionURL:'<MongoDB connection string>',
      Collections:'mydb.$'
     )
    OUTPUT TO MongoDBStream;
     
    CREATE TARGET WriteToCosmos USING CosmosDBWriter (
      ServiceEndpoint: '<Cosmos DB connection string>',
      AccessKey: '<Cosmos DB account read-write key>',
      Collections: 'mydb.$,mydb.%'
    )
    INPUT FROM MongoDBStream;
     
    END APPLICATION Mongo2CosmosInitialLoad;

    After the application is finished, the Cosmos DB employee collection should contain the following:

    {
        "_id": 1,
        "name": "employee1",
        "company": "striim",
        "city": "madras",
        "id": "1.0",
        "_rid": "HnpSALVXpu4BAAAAAAAACA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAACA==/",
        "_etag": "\"0800b33d-0000-0000-0000-5bb5aafa0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632442
    }
    {
        "_id": 2,
        "name": "employee2",
        "company": "striim",
        "city": "seattle",
        "id": "2.0",
        "_rid": "HnpSALVXpu4BAAAAAAAADA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAADA==/",
        "_etag": "\"2b00f87b-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    {
        "_id": 3,
        "name": "employee3",
        "company": "striim",
        "city": "california",
        "id": "3.0",
        "_rid": "HnpSALVXpu4BAAAAAAAAAA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAA==/",
        "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    
  4. In Striim, run the following application to continuously replicate new data from MongoDB to Cosmos DB:

    CREATE APPLICATION Mongo2CosmosIncrementalFixedContainer; 
     
    CREATE SOURCE MongoDBIn USING MongoDBReader (
      Username:'striim',
      Password:'******',
      ConnectionURL:'<MongoDB connection string>',
      authType: 'NoAuth',
      Mode:'Incremental',
      startTimestamp: '<timestamp>',
      Collections:'mydb.$'
     )
    OUTPUT TO MongoDBStream;
    
    CREATE TARGET WriteToCosmos USING CosmosDBWriter (
      ServiceEndpoint:'<Cosmos DB connection string>',
      AccessKey:'<Cosmos DB account read-write key>',
      Collections:'mydb.$,mydb.%'
    )
    INPUT FROM MongoDBStream ;
    
    CREATE CQ SelectDeleteOperations
    INSERT INTO DeleteOpsStream
    SELECT META(MongoDBStream,"DatabaseName"),
      META(MongoDBStream,"CollectionName"),
      META(MongoDBStream,"DocumentKey")
    FROM MongoDBStream
    WHERE META(MongoDBStream,"OperationName").toString() = "DELETE";
    
    CREATE TARGET WriteIgnoredDeleteOps USING FileWriter (
      filename:'DeleteOperations.json'
    )
    FORMAT USING JSONFormatter()
    INPUT FROM DeleteOpsStream;
     
    END APPLICATION Mongo2CosmosIncrementalFixedContainer;

  5. In MongoDB, modify the employee collection as follows to add employee4, update employee1 and employee2, and remove employee3:

    use mydb;
    db.employee.save({_id:4,"name":"employee4","company":"Striim","city":"Palo Alto"});
    db.employee.save({_id:1,"name":"employee1","company":"Striim","city":"Seattle"});
    db.employee.update({_id:2},{$set : {"city":"Palo Alto"}});
    db.employee.remove({_id:3});

    Within 30 seconds, those changes should be replicated to the corresponding Cosmos DB collection with results similar to the following:

    {
        "_id": 1,
        "name": "employee1",
        "company": "striim",
        "city": "Seattle",
        "id": "1.0",
        "_rid": "HnpSALVXpu4BAAAAAAAACA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAACA==/",
        "_etag": "\"0800b33d-0000-0000-0000-5bb5aafa0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632442
    }
    {
        "_id": 2,
        "name": "employee2",
        "company": "striim",
        "city": "Palo Alto",
        "id": "2.0",
        "_rid": "HnpSALVXpu4BAAAAAAAADA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAADA==/",
        "_etag": "\"2b00f87b-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    {
        "_id": 3,
        "name": "employee3",
        "company": "striim",
        "city": "california",
        "id": "3.0",
        "_rid": "HnpSALVXpu4BAAAAAAAAAA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAA==/",
        "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    {
        "_id": 4,
        "name": "employee4",
        "company": "striim",
        "city": "Palo Alto",
        "id": "4.0",
        "_rid": "HnpSALVXpu4BAAAAAAAAAE==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAE==/",
        "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }

When the target collection is in an unlimited container

When a Cosmos DB collection is in an unlimited container, it must have a partition key, which must be specified when you create the collection.

  • When MongoDB save operations create new documents, all fields are included in MongoDBReader's output, so CosmosDBWriter can write to the correct partition.

  • When MongoDB save operations update existing documents, all fields are included in MongoDBReader's output, so CosmosDBWriter can use the partition key and document ID to update the correct target document.

  • MongoDB update operations do not include all fields, so the partition key may be missing from MongoDBReader's output. In those cases, the PartialRecordPolicy open processor retrieves the missing fields from MongoDB and adds them before passing the data to CosmosDBWriter.

  • MongoDB remove operations include only the document ID, so the partition key is missing from MongoDBReader's output. Since CosmosDBWriter would be unable to determine the correct partition, the application writes the database name, collection name, and document key to a DeleteOps collection in CosmosDB.

[Figure: incrementalMongo2Cosmos.png]

CREATE APPLICATION Mongo2CosmosIncrementalUnlimitedContainer; 
 
CREATE SOURCE MongoDBIn USING MongoDBReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'<MongoDB connection string>',
  authType: 'NoAuth',
  Mode:'Incremental',
  startTimestamp: '<timestamp>',
  Collections:'mydb.$'
 )
OUTPUT TO MongoDBStream;

CREATE STREAM FilteredMongoDBStream OF Global.JsonNodeEvent;

CREATE CQ ExcludeDeleteOperations
INSERT INTO FilteredMongoDBStream
SELECT *
FROM MongoDBStream
WHERE META(MongoDBStream,"OperationName").toString() != "DELETE";

CREATE STREAM FullDocstream OF Global.JsonNodeEvent;

CREATE OPEN PROCESSOR CompletePartialDocs USING MongoPartialRecordPolicy ( 
  ConnectionURL:'<MongoDB connection string>', 
  authType:'NoAuth',
  OnMissingDocument: 'Process'
)
INSERT INTO FullDocstream
FROM FilteredMongoDBStream;

CREATE TARGET WriteToCosmos USING CosmosDBWriter (
  ServiceEndpoint:'<Cosmos DB connection string>',
  AccessKey:'<Cosmos DB account read-write key>',
  Collections:'mydb.$,mydb.%',
  IgnorableExceptionCode:'PARTITION_KEY_NOT_FOUND'
)
INPUT FROM FullDocstream;

CREATE CQ SelectDeleteOperations
INSERT INTO DeleteOpsStream
SELECT TO_STRING(META(MongoDBStream,"DatabaseName")) AS DatabaseName,
  TO_STRING(META(MongoDBStream,"CollectionName")) AS CollectionName,
  TO_STRING(META(MongoDBStream,"DocumentKey")) AS DocumentKey
FROM MongoDBStream
WHERE META(MongoDBStream,"OperationName").toString() = "DELETE";

CREATE TARGET WriteDeleteOpsToCosmos USING CosmosDBWriter (
  ServiceEndpoint:'<Cosmos DB connection string>',
  AccessKey:'<Cosmos DB account read-write key>',
  Collections:'mydb.DeleteOps'
)
INPUT FROM DeleteOpsStream;
 
END APPLICATION Mongo2CosmosIncrementalUnlimitedContainer;
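
The routing performed by the application above can be summarized in a short Python sketch. It is a simplified illustration, not how Striim executes the application: route_event is a hypothetical name, fetch_full_document is a hypothetical callback standing in for the lookup done by the open processor, and the check for a missing partition key is an assumption about when that lookup is needed.

```python
def route_event(data, metadata, partition_key, fetch_full_document):
    """Return (destination, payload) for one event.

    destination is 'DeleteOps' for deletes and 'target' for everything else.
    fetch_full_document(metadata) is a hypothetical callback that returns the
    complete document from MongoDB.
    """
    if metadata["OperationName"] == "DELETE":
        # Deletes carry only the document ID, so record them for manual handling.
        return "DeleteOps", {
            "DatabaseName": str(metadata["DatabaseName"]),
            "CollectionName": str(metadata["CollectionName"]),
            "DocumentKey": str(metadata["DocumentKey"]),
        }
    if partition_key not in data:
        # Partial update: the partition key is missing, so fetch the whole document.
        data = fetch_full_document(metadata)
    return "target", data
```

With the partition key name used earlier, a delete of document 3 yields a DeleteOps record holding the database name, collection name, and document key, while an update that carries only the city field is completed from MongoDB before it is passed to the target.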