Change Streams

React to database changes in real-time with NexaDB's powerful event streaming system.

Real-Time Event Notifications

Get notified instantly when data changes. Perfect for cache invalidation, real-time dashboards, audit logging, notifications, and event-driven architectures.

Quick Start

Watch for changes with a simple client.watch() method. NexaDB streams events to your application as they happen.

Python - Watch for Changes
from nexaclient import NexaClient

# Connect to NexaDB
client = NexaClient(
    host='localhost',
    port=6970,
    username='root',
    password='nexadb123'
)
client.connect()

# Watch for changes on 'orders' collection
for change in client.watch('orders'):
    operation = change['operationType']

    if operation == 'insert':
        order = change['fullDocument']
        print(f"New order from {order['customer']}: ${order['total']}")

    elif operation == 'update':
        doc_id = change['documentKey']['_id']
        updates = change['updateDescription']['updatedFields']
        print(f"Order {doc_id} updated: {updates}")

    elif operation == 'delete':
        doc_id = change['documentKey']['_id']
        print(f"Order {doc_id} was deleted")
JavaScript - Watch for Changes
const NexaClient = require('nexaclient');

// Connect to NexaDB
const client = new NexaClient({
  host: 'localhost',
  port: 6970,
  username: 'root',
  password: 'nexadb123'
});
await client.connect();

// Watch for changes using async iteration
for await (const change of client.watch('orders')) {
  const operation = change.operationType;

  if (operation === 'insert') {
    const order = change.fullDocument;
    console.log('New order from ' + order.customer + ': $' + order.total);
  }

  else if (operation === 'update') {
    const docId = change.documentKey._id;
    const updates = change.updateDescription.updatedFields;
    console.log('Order ' + docId + ' updated:', updates);
  }

  else if (operation === 'delete') {
    const docId = change.documentKey._id;
    console.log('Order ' + docId + ' was deleted');
  }
}

Real-World Use Cases

šŸ’¾ Cache Invalidation

Automatically invalidate cache entries when data changes

import redis cache = redis.Redis() for change in client.watch('products', operations=['update', 'delete']): product_id = change['documentKey']['_id'] cache.delete(f"product:{product_id}") print(f"Cache invalidated for: {product_id}")

šŸ”” Real-Time Notifications

Send instant notifications when events occur

for change in client.watch('orders', operations=['insert']): order = change['fullDocument'] send_email( to=order['customer_email'], subject=f"Order {order['_id']} Confirmed", body=f"Total: ${order['total']}" )

šŸ“ Audit Logging

Track all database changes for compliance

import logging logger = logging.getLogger('audit') # Watch ALL collections for change in client.watch(): logger.info( f"{change['operationType']} on " f"{change['ns']['coll']} " f"(doc: {change['documentKey']['_id']})" )

šŸ“Š Real-Time Dashboards

Update dashboards instantly as data changes

import websocket ws = websocket.WebSocket() for change in client.watch('analytics'): # Push to browser via WebSocket ws.send(json.dumps({ 'type': 'update', 'data': change['fullDocument'] }))

Change Event Format

All change events follow a consistent format with detailed information about what changed.

Change Event Structure
{
  "operationType": "insert",  // insert, update, delete, dropCollection

  "ns": {
    "db": "nexadb",
    "coll": "orders"
  },

  "documentKey": {
    "_id": "abc123def456"
  },

  "fullDocument": {  // Only for insert/update operations
    "_id": "abc123def456",
    "customer": "Alice",
    "total": 99.99,
    "status": "pending",
    "_created_at": "2025-11-27T10:30:00Z",
    "_updated_at": "2025-11-27T10:30:00Z"
  },

  "updateDescription": {  // Only for update operations
    "updatedFields": {
      "status": "shipped",
      "tracking": "XYZ123"
    }
  },

  "timestamp": 1700000000.123
}

Filtering Events

Filter events by collection or operation type to receive only the changes you care about.

Watch Specific Collection
# Only watch 'orders' collection
for change in client.watch('orders'):
    print(f"Order changed: {change['documentKey']['_id']}")
Watch Specific Operations
# Only watch inserts and updates (skip deletes)
for change in client.watch('orders', operations=['insert', 'update']):
    print(f"New or updated order: {change['fullDocument']}")
Watch All Collections
# Watch ALL collections for ALL operations
for change in client.watch():
    collection = change['ns']['coll']
    operation = change['operationType']
    print(f"{operation} on {collection}")

Complete Example: Order Processing System

Production-Ready Change Stream Listener
from nexaclient import NexaClient
import redis
import smtplib
import logging

# Setup
client = NexaClient(host='localhost', port=6970, username='root', password='nexadb123')
client.connect()

cache = redis.Redis()
logger = logging.getLogger('orders')

print("šŸ“” Watching for order changes...")

try:
    for change in client.watch('orders'):
        operation = change['operationType']
        doc_id = change['documentKey']['_id']

        if operation == 'insert':
            # New order received
            order = change['fullDocument']

            # Log the event
            logger.info(f"New order {doc_id} from {order['customer']}")

            # Send confirmation email
            send_email(
                to=order['customer_email'],
                subject=f"Order {doc_id} Confirmed",
                body=f"Thank you! Your order for ${order['total']} is confirmed."
            )

            # Cache the order
            cache.set(f"order:{doc_id}", json.dumps(order), ex=3600)

        elif operation == 'update':
            # Order updated
            updates = change['updateDescription']['updatedFields']

            logger.info(f"Order {doc_id} updated: {updates}")

            # Invalidate cache
            cache.delete(f"order:{doc_id}")

            # Send status update if shipped
            if 'status' in updates and updates['status'] == 'shipped':
                # Get full document for email
                full_order = change.get('fullDocument', {})
                send_email(
                    to=full_order.get('customer_email', 'customer@example.com'),
                    subject=f"Order {doc_id} Shipped!",
                    body=f"Tracking: {updates.get('tracking', 'N/A')}"
                )

        elif operation == 'delete':
            # Order cancelled
            logger.warning(f"Order {doc_id} was cancelled")

            # Invalidate cache
            cache.delete(f"order:{doc_id}")

except KeyboardInterrupt:
    print("\nšŸ‘‹ Stopped watching. Goodbye!")
    client.disconnect()

Key Features

🌐 Works Over Network

Connect to remote NexaDB servers from anywhere. No filesystem access needed - just a network connection.

⚔ Real-Time Performance

Events are pushed instantly via binary protocol. Low latency, high throughput, minimal overhead.

šŸ”’ Thread-Safe

Production-ready implementation with thread safety, automatic cleanup, and error handling built-in.

šŸŽÆ Flexible Filtering

Filter by collection and operation type. Watch everything or specific changes - you control the stream.

šŸ’” Perfect For

  • āœ… Real-time notifications
  • āœ… Cache invalidation
  • āœ… Audit logging
  • āœ… Data synchronization
  • āœ… Analytics pipelines
  • āœ… Event-driven architectures
  • āœ… Real-time dashboards
  • āœ… Workflow automation
# Clone the repo and run the example
git clone https://github.com/krishcdbry/nexadb.git
cd nexadb
python3 network_change_listener_example.py

# In another terminal, insert some data
nexa -u root -p
nexa> use orders
nexa(orders)> create {"customer": "Alice", "total": 99.99}

# Watch the change stream listener receive the event instantly!