Change Streams
React to database changes in real-time with NexaDB's powerful event streaming system.
Real-Time Event Notifications
Get notified instantly when data changes. Perfect for cache invalidation, real-time dashboards, audit logging, notifications, and event-driven architectures.
Quick Start
Watch for changes with a single client.watch() call. NexaDB streams events to your application as they happen.
from nexaclient import NexaClient
# Connect to NexaDB
client = NexaClient(
host='localhost',
port=6970,
username='root',
password='nexadb123'
)
client.connect()
# Watch for changes on 'orders' collection
for change in client.watch('orders'):
operation = change['operationType']
if operation == 'insert':
order = change['fullDocument']
print(f"New order from {order['customer']}: ${order['total']}")
elif operation == 'update':
doc_id = change['documentKey']['_id']
updates = change['updateDescription']['updatedFields']
print(f"Order {doc_id} updated: {updates}")
elif operation == 'delete':
doc_id = change['documentKey']['_id']
print(f"Order {doc_id} was deleted")
const NexaClient = require('nexaclient');
// Connect to NexaDB
const client = new NexaClient({
host: 'localhost',
port: 6970,
username: 'root',
password: 'nexadb123'
});
await client.connect();
// Watch for changes using async iteration
for await (const change of client.watch('orders')) {
const operation = change.operationType;
if (operation === 'insert') {
const order = change.fullDocument;
console.log('New order from ' + order.customer + ': $' + order.total);
}
else if (operation === 'update') {
const docId = change.documentKey._id;
const updates = change.updateDescription.updatedFields;
console.log('Order ' + docId + ' updated:', updates);
}
else if (operation === 'delete') {
const docId = change.documentKey._id;
console.log('Order ' + docId + ' was deleted');
}
}
Real-World Use Cases
💾 Cache Invalidation
Automatically invalidate cache entries when data changes
import redis
cache = redis.Redis()
for change in client.watch('products',
operations=['update', 'delete']):
product_id = change['documentKey']['_id']
cache.delete(f"product:{product_id}")
print(f"Cache invalidated for: {product_id}")
🔔 Real-Time Notifications
Send instant notifications when events occur
for change in client.watch('orders',
operations=['insert']):
order = change['fullDocument']
send_email(
to=order['customer_email'],
subject=f"Order {order['_id']} Confirmed",
body=f"Total: ${order['total']}"
)
📝 Audit Logging
Track all database changes for compliance
import logging
logger = logging.getLogger('audit')
# Watch ALL collections
for change in client.watch():
logger.info(
f"{change['operationType']} on "
f"{change['ns']['coll']} "
f"(doc: {change['documentKey']['_id']})"
)
📊 Real-Time Dashboards
Update dashboards instantly as data changes
import websocket
ws = websocket.WebSocket()
for change in client.watch('analytics'):
# Push to browser via WebSocket
ws.send(json.dumps({
'type': 'update',
'data': change['fullDocument']
}))
Change Event Format
All change events follow a consistent format with detailed information about what changed.
{
"operationType": "insert", // insert, update, delete, dropCollection
"ns": {
"db": "nexadb",
"coll": "orders"
},
"documentKey": {
"_id": "abc123def456"
},
"fullDocument": { // Only for insert/update operations
"_id": "abc123def456",
"customer": "Alice",
"total": 99.99,
"status": "pending",
"_created_at": "2025-11-27T10:30:00Z",
"_updated_at": "2025-11-27T10:30:00Z"
},
"updateDescription": { // Only for update operations
"updatedFields": {
"status": "shipped",
"tracking": "XYZ123"
}
},
"timestamp": 1700000000.123
}
Filtering Events
Filter events by collection or operation type to receive only the changes you care about.
# Only watch 'orders' collection
for change in client.watch('orders'):
print(f"Order changed: {change['documentKey']['_id']}")
# Only watch inserts and updates (skip deletes)
for change in client.watch('orders', operations=['insert', 'update']):
print(f"New or updated order: {change['fullDocument']}")
# Watch ALL collections for ALL operations
for change in client.watch():
collection = change['ns']['coll']
operation = change['operationType']
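print(f"{operation} on {collection}")
Complete Example: Order Processing System
Note: this example calls json.dumps() and send_email(); add `import json` and supply your own send_email() helper, since neither is imported or defined in the snippet.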
from nexaclient import NexaClient
import redis
import smtplib
import logging
# Setup
client = NexaClient(host='localhost', port=6970, username='root', password='nexadb123')
client.connect()
cache = redis.Redis()
logger = logging.getLogger('orders')
print("📡 Watching for order changes...")
try:
for change in client.watch('orders'):
operation = change['operationType']
doc_id = change['documentKey']['_id']
if operation == 'insert':
# New order received
order = change['fullDocument']
# Log the event
logger.info(f"New order {doc_id} from {order['customer']}")
# Send confirmation email
send_email(
to=order['customer_email'],
subject=f"Order {doc_id} Confirmed",
body=f"Thank you! Your order for ${order['total']} is confirmed."
)
# Cache the order
cache.set(f"order:{doc_id}", json.dumps(order), ex=3600)
elif operation == 'update':
# Order updated
updates = change['updateDescription']['updatedFields']
logger.info(f"Order {doc_id} updated: {updates}")
# Invalidate cache
cache.delete(f"order:{doc_id}")
# Send status update if shipped
if 'status' in updates and updates['status'] == 'shipped':
# Get full document for email
full_order = change.get('fullDocument', {})
send_email(
to=full_order.get('customer_email', 'customer@example.com'),
subject=f"Order {doc_id} Shipped!",
body=f"Tracking: {updates.get('tracking', 'N/A')}"
)
elif operation == 'delete':
# Order cancelled
logger.warning(f"Order {doc_id} was cancelled")
# Invalidate cache
cache.delete(f"order:{doc_id}")
except KeyboardInterrupt:
print("\n👋 Stopped watching. Goodbye!")
client.disconnect()
Key Features
🌐 Works Over Network
Connect to remote NexaDB servers from anywhere. No filesystem access needed - just a network connection.
⚡ Real-Time Performance
Events are pushed instantly via binary protocol. Low latency, high throughput, minimal overhead.
🔒 Thread-Safe
Production-ready implementation with thread safety, automatic cleanup, and error handling built-in.
🎯 Flexible Filtering
Filter by collection and operation type. Watch everything or specific changes - you control the stream.
💡 Perfect For
- ✓ Real-time notifications
- ✓ Cache invalidation
- ✓ Audit logging
- ✓ Data synchronization
- ✓ Analytics pipelines
- ✓ Event-driven architectures
- ✓ Real-time dashboards
- ✓ Workflow automation
# Clone the repo and run the example
git clone https://github.com/krishcdbry/nexadb.git
cd nexadb
python3 network_change_listener_example.py
# In another terminal, insert some data
nexa -u root -p
nexa> use orders
nexa(orders)> create {"customer": "Alice", "total": 99.99}
# Watch the change stream listener receive the event instantly!