Relay MongoDB Change Stream into CrateDB

What’s inside

Documentation and an example program showing how to relay data from MongoDB into CrateDB, using MongoDB Change Streams.

Change streams allow applications to access real-time data changes without the prior complexity and risk of manually tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them.
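To make the subscription model concrete, here is a minimal sketch of following a change stream with PyMongo's `Collection.watch()`. It is an illustration, not the relay program itself. The database and collection names (`testdrive`, `demo`) and the helper names `describe_change` and `follow_changes` are assumptions made for this example.

```python
from typing import Any, Mapping


def describe_change(event: Mapping[str, Any]) -> str:
    """Summarize a change event as `<operation> <db>.<collection> _id=<key>`."""
    ns = event.get("ns", {})
    key = event.get("documentKey", {}).get("_id")
    return f"{event['operationType']} {ns.get('db')}.{ns.get('coll')} _id={key}"


def follow_changes(url: str, database: str, collection: str) -> None:
    """Print one summary line per change on a collection. Blocks until interrupted."""
    # Imported lazily so that `describe_change` is usable without PyMongo installed.
    from pymongo import MongoClient

    client = MongoClient(url)
    # `full_document="updateLookup"` asks the server to attach the current
    # version of the document to update events, not only the changed fields.
    with client[database][collection].watch(full_document="updateLookup") as stream:
        for event in stream:
            print(describe_change(event))


# Shape of an insert event, abridged to the fields used above.
# The names `testdrive` and `demo` are hypothetical.
sample = {
    "operationType": "insert",
    "ns": {"db": "testdrive", "coll": "demo"},
    "documentKey": {"_id": 1},
    "fullDocument": {"_id": 1, "temperature": 42.5},
}
print(describe_change(sample))
```

Calling `follow_changes("mongodb://localhost/", "testdrive", "demo")` needs a running replica set, so it is left out of the block's top-level code.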

Note

commons-codec includes MongoDBFullLoadTranslator and MongoDBCDCTranslator. This document and the example program are exclusively about the latter.
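As a rough idea of what a CDC translation step involves, the following sketch maps individual change events to parameterized SQL statements. It is a hand-written illustration, not the implementation or API of MongoDBCDCTranslator. The function name `translate_event`, the table name, and the two-column layout (`oid`, `data`) are assumptions made for this example.

```python
from typing import Any, Dict, Mapping, Optional, Tuple

# A statement is the SQL text plus its named parameters.
Statement = Tuple[str, Dict[str, Any]]


def translate_event(event: Mapping[str, Any], table: str) -> Optional[Statement]:
    """Map one change event to a parameterized SQL statement.

    Returns None for operation types this sketch does not handle.
    Assumes a hypothetical target table with columns `oid` and `data`.
    """
    op = event["operationType"]
    oid = str(event["documentKey"]["_id"])
    if op == "insert":
        return (
            f"INSERT INTO {table} (oid, data) VALUES (:oid, :data)",
            {"oid": oid, "data": dict(event["fullDocument"])},
        )
    if op in ("update", "replace"):
        # Update events only carry `fullDocument` when the stream was
        # opened with `full_document="updateLookup"`.
        return (
            f"UPDATE {table} SET data = :data WHERE oid = :oid",
            {"oid": oid, "data": dict(event["fullDocument"])},
        )
    if op == "delete":
        return (f"DELETE FROM {table} WHERE oid = :oid", {"oid": oid})
    return None


delete_event = {"operationType": "delete", "documentKey": {"_id": 1}}
print(translate_event(delete_event, "demo"))
```

Keeping the translation a pure function of the event makes it easy to test without either database running.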

Prerequisites

Start the CrateDB and MongoDB services.

Start CrateDB

docker run --rm --name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=2g \
    docker.io/crate:latest '-Cdiscovery.type=single-node'

Start MongoDB

Please note that change streams are only available for replica sets and sharded clusters, so let’s define a replica set by using the --replSet rs-testdrive option when starting the MongoDB server.

docker run --rm --name=mongodb --publish=27017:27017 \
    docker.io/mongo:8 mongod --replSet rs-testdrive

Now, initialize the replica set by using the mongosh command to invoke the rs.initiate() operation.

export MONGODB_URL="mongodb://localhost/"
docker run -i --rm --network=host docker.io/mongo:8 mongosh "${MONGODB_URL}" <<EOF

disableTelemetry()
config = {
    _id: "rs-testdrive",
    members: [{ _id : 0, host : "localhost:27017"}]
};
rs.initiate(config);

EOF
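If mongosh is not at hand, the same initialization can be sketched from Python by sending the `replSetInitiate` database command through PyMongo. This is an alternative under stated assumptions, not part of the example program. The helper names `replica_set_config` and `initiate_replica_set` are made up for this sketch.

```python
from typing import Any, Dict


def replica_set_config(name: str, host: str) -> Dict[str, Any]:
    """Build a single-member replica set configuration document."""
    return {"_id": name, "members": [{"_id": 0, "host": host}]}


def initiate_replica_set(url: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """Send `replSetInitiate` to the server and return its response."""
    # Imported lazily so that building the config works without PyMongo installed.
    from pymongo import MongoClient

    # Connect directly to the single node, because there is no
    # initialized replica set to discover yet.
    client = MongoClient(url, directConnection=True)
    return client.admin.command("replSetInitiate", config)


# Same values as in the mongosh snippet above.
config = replica_set_config("rs-testdrive", "localhost:27017")
print(config)
```

Calling `initiate_replica_set("mongodb://localhost/", config)` once against the freshly started server should be equivalent to the `rs.initiate(config)` call.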

Install

Acquire and set up the basic relay program.

# Install dependencies.
pip install 'commons-codec[mongodb]' pymongo sqlalchemy-cratedb

# Download program.
wget https://github.com/daq-tools/commons-codec/raw/main/examples/mongodb_cdc_cratedb.py

Usage

Configure settings.

export CRATEDB_SQLALCHEMY_URL="crate://"
export MONGODB_URL="mongodb://localhost/"
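For orientation, a program might pick up these two settings from the environment along the following lines. This is a sketch, not a copy of how mongodb_cdc_cratedb.py reads its configuration. The `Settings` type and `load_settings` function are hypothetical, and the fallback values simply mirror the exports above.

```python
import os
from typing import Mapping, NamedTuple


class Settings(NamedTuple):
    cratedb_sqlalchemy_url: str
    mongodb_url: str


def load_settings(environ: Mapping[str, str] = os.environ) -> Settings:
    """Read connection URLs from environment variables, with fallbacks."""
    return Settings(
        cratedb_sqlalchemy_url=environ.get("CRATEDB_SQLALCHEMY_URL", "crate://"),
        mongodb_url=environ.get("MONGODB_URL", "mongodb://localhost/"),
    )


# Passing an explicit mapping makes the function easy to exercise.
settings = load_settings({"MONGODB_URL": "mongodb://localhost:27017/"})
print(settings.cratedb_sqlalchemy_url, settings.mongodb_url)
```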

Invoke relay program.

python mongodb_cdc_cratedb.py cdc-relay

Invoke database workload.

python mongodb_cdc_cratedb.py db-workload

Troubleshooting

When the relay program fails with the following PyMongo error, it indicates you tried to configure a replica set, but did not initialize it yet.

pymongo.errors.OperationFailure: The $changeStream stage is only supported on
replica sets, full error: {'ok': 0.0, 'errmsg': 'The $changeStream stage is
only supported on replica sets', 'code': 40573, 'codeName': 'Location40573'}

When you see a Failed to refresh key cache error message in MongoDB’s server log, it indicates that the server was successfully running as a replica set during an earlier session, but, again, the replica set has not been correctly initialized this time.
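To tell these situations apart, it can help to look at the server's response to the `hello` command. The sketch below classifies such a response. The function names and state labels are made up for this example. It relies on the assumption that a node started with --replSet but not yet initialized reports `isreplicaset: true` without a `setName`, which is worth verifying against your MongoDB version.

```python
from typing import Any, Dict, Mapping


def replica_set_state(hello: Mapping[str, Any]) -> str:
    """Classify a `hello` response into a coarse replica set state."""
    if hello.get("setName"):
        # The node belongs to an initialized replica set.
        return "primary" if hello.get("isWritablePrimary") else "member"
    if hello.get("isreplicaset"):
        # Assumption: started with --replSet, but rs.initiate() has not run yet.
        return "uninitialized"
    return "standalone"


def fetch_hello(url: str) -> Dict[str, Any]:
    """Ask the server at `url` for its `hello` response."""
    # Imported lazily so that `replica_set_state` works without PyMongo installed.
    from pymongo import MongoClient

    client = MongoClient(url, directConnection=True)
    return client.admin.command("hello")


print(replica_set_state({"setName": "rs-testdrive", "isWritablePrimary": True}))
```

With the services running, `replica_set_state(fetch_hello("mongodb://localhost/"))` should report `primary` once the replica set has been initialized as described in the prerequisites.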