Relay MongoDB Change Stream into CrateDB
What’s inside
Documentation and an example program showing how to relay data from MongoDB into CrateDB, using MongoDB Change Streams.
Change streams allow applications to access real-time data changes without the prior complexity and risk of manually tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them.
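As a minimal sketch of what subscribing to a single collection can look like from Python, the following uses pymongo's `Collection.watch()`. The function names `summarize_change` and `follow_changes` are illustrative and not part of the example program; the database and collection names are supplied by the caller.

```python
from typing import Any, Dict, Iterator


def summarize_change(event: Dict[str, Any]) -> str:
    """Return a one-line summary of a change stream event document."""
    op = event.get("operationType", "unknown")
    ns = event.get("ns", {})
    doc_id = event.get("documentKey", {}).get("_id")
    return f"{op} {ns.get('db')}.{ns.get('coll')} _id={doc_id}"


def follow_changes(mongodb_url: str, database: str, collection: str) -> Iterator[str]:
    """Yield a summary for each change on one collection, as it happens."""
    # Imported lazily, so `summarize_change` works without the driver installed.
    from pymongo import MongoClient

    client = MongoClient(mongodb_url)
    # `watch()` opens a change stream. `updateLookup` asks the server to
    # include the current full document on update events.
    with client[database][collection].watch(full_document="updateLookup") as stream:
        for event in stream:
            yield summarize_change(event)
```

Iterating over `follow_changes("mongodb://localhost/", "testdrive", "demo")` blocks until changes arrive; the names `testdrive` and `demo` are placeholders here.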
Note
commons-codec includes MongoDBFullLoadTranslator and MongoDBCDCTranslator.
This document and the example program are exclusively about the latter.
Prerequisites
Start the CrateDB and MongoDB services.
Start CrateDB
docker run --rm --name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=2g \
    docker.io/crate:latest '-Cdiscovery.type=single-node'
Start MongoDB
Please note that change streams are only available for replica sets and
sharded clusters, so let’s define a replica set by using the
--replSet rs-testdrive option when starting the MongoDB server.
docker run --rm --name=mongodb --publish=27017:27017 \
    docker.io/mongo:8 mongod --replSet rs-testdrive
Now, initialize the replica set by using the mongosh command to invoke
the rs.initiate() operation.
export MONGODB_URL="mongodb://localhost/"
docker run -i --rm --network=host docker.io/mongo:8 mongosh "${MONGODB_URL}" <<EOF
disableTelemetry()
config = {
    _id: "rs-testdrive",
    members: [{ _id : 0, host : "localhost:27017"}]
};
rs.initiate(config);
EOF
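After `rs.initiate()`, the node needs a moment to elect itself primary. The following sketch polls the server's `hello` command until the node reports itself as the writable primary of the `rs-testdrive` replica set. The helper names and the timeout values are assumptions for illustration, not part of the example program.

```python
import time
from typing import Any, Mapping


def is_writable_primary(hello: Mapping[str, Any], replset: str = "rs-testdrive") -> bool:
    """Check a `hello` response for an initialized replica set primary."""
    return hello.get("setName") == replset and bool(hello.get("isWritablePrimary"))


def wait_for_primary(mongodb_url: str = "mongodb://localhost/", timeout: float = 30.0) -> bool:
    """Poll the server until it is a writable primary, or the timeout expires."""
    # Imported lazily, so `is_writable_primary` works without the driver installed.
    from pymongo import MongoClient

    # A direct connection talks to this node even before the set is initialized.
    client = MongoClient(mongodb_url, directConnection=True)
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_writable_primary(client.admin.command("hello")):
            return True
        time.sleep(0.5)
    return False
```

Calling `wait_for_primary()` before starting the relay is one way to avoid racing the replica set election.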
Install
Acquire and set up the basic relay program.
# Install dependencies.
pip install 'commons-codec[mongodb]' pymongo sqlalchemy-cratedb
# Download program.
wget https://github.com/daq-tools/commons-codec/raw/main/examples/mongodb_cdc_cratedb.py
Usage
Configure settings.
export CRATEDB_SQLALCHEMY_URL="crate://"
export MONGODB_URL="mongodb://localhost/"
Invoke relay program.
python mongodb_cdc_cratedb.py cdc-relay
Invoke database workload.
python mongodb_cdc_cratedb.py db-workload
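Conceptually, a relay like this turns each change event into an SQL statement for CrateDB. The sketch below is a simplified stand-in for that step, not the implementation of MongoDBCDCTranslator. It assumes a hypothetical target table `"testdrive"."demo"` with an `oid` column for the document identifier and a `data` column for the document, and a change stream opened with `full_document="updateLookup"`.

```python
from typing import Any, Dict, Optional, Tuple

# A parameterized SQL statement: (sql, parameters).
Statement = Tuple[str, Dict[str, Any]]


def change_to_sql(
    event: Dict[str, Any], table: str = '"testdrive"."demo"'
) -> Optional[Statement]:
    """Map one change event to a parameterized statement, or None to skip it."""
    op = event.get("operationType")
    oid = str(event.get("documentKey", {}).get("_id"))
    if op in ("insert", "update", "replace"):
        # With `updateLookup`, update events carry the current full document.
        # It can be missing if the document was deleted in the meantime.
        document = event.get("fullDocument")
        if document is None:
            return None
        if op == "insert":
            sql = f"INSERT INTO {table} (oid, data) VALUES (:oid, :data)"
        else:
            sql = f"UPDATE {table} SET data = :data WHERE oid = :oid"
        return sql, {"oid": oid, "data": document}
    if op == "delete":
        return f"DELETE FROM {table} WHERE oid = :oid", {"oid": oid}
    # Other event types (drop, rename, invalidate, ...) are ignored here.
    return None
```

Each resulting statement could then be executed through an SQLAlchemy connection created from `CRATEDB_SQLALCHEMY_URL`.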
Troubleshooting
When you see the following error, which pymongo raises on the client side, it indicates that you tried to configure a replica set, but have not initialized it yet.
pymongo.errors.OperationFailure: The $changeStream stage is only supported on
replica sets, full error: {'ok': 0.0, 'errmsg': 'The $changeStream stage is
only supported on replica sets', 'code': 40573, 'codeName': 'Location40573'}
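A program can recognize this condition by inspecting the `code` attribute that pymongo's `OperationFailure` carries. The following sketch maps the error code from the message above to a hint; the function name and the hint texts are illustrative assumptions.

```python
# Error code taken from the message above.
CHANGE_STREAM_UNSUPPORTED = 40573


def diagnose_change_stream_error(exc: Exception) -> str:
    """Return a human-readable hint for an exception raised by `watch()`."""
    # pymongo's OperationFailure exposes the server error code as `.code`.
    if getattr(exc, "code", None) == CHANGE_STREAM_UNSUPPORTED:
        return "Change streams need an initialized replica set; run rs.initiate()."
    return "Unrelated error; inspect the full message."
```

It could be called from an `except pymongo.errors.OperationFailure as exc:` block around the code that opens the change stream.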
When you see a Failed to refresh key cache error message in MongoDB’s server
log, it indicates that the server previously ran as a replica set, but the
replica set has, again, not been correctly initialized for the current run.