Chapter 07

Sharding

Scale horizontally across N nodes. Consistent hashing. Online rebalance with zero downtime.

When to shard

Scale               Recommendation
< 10M docs          Single file. Add a follower for HA. Done.
10M – 100M docs     Single file, 2–3 followers for read scale.
100M – 1B docs      Shard. Each shard holds 50–100M on commodity hardware.
> 1B docs           Shard with care. Pick your partition key thoughtfully; it's expensive to change.

DocumentForgeCluster

The router is a thin C# class that sits in front of N shards. Each shard is a full DocumentForgeDb accessed through an IShardTransport (in-process for tests, HTTP for real deployments).

using DocumentForge.Engine.Cluster;

using var cluster = new DocumentForgeCluster()
    .AddShard(new HttpShardTransport("dubai",     "http://dubai.internal:5000"))
    .AddShard(new HttpShardTransport("singapore", "http://sg.internal:5000"))
    .AddShard(new HttpShardTransport("london",    "http://ldn.internal:5000"))
    // per-collection policy
    .ShardCollection("orders",  shardKeyPath: "pnr")
    .ShardCollection("flights", shardKeyPath: "flightNumber")
    .ReplicateCollection("airports")
    .ReplicateCollection("aircraft");
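
The transport is the only seam between the router and a shard, which is what keeps the router thin. The exact members of IShardTransport are defined by the engine; the sketch below is only a mental model with made-up member names, not the real signatures:

using System.Threading;
using System.Threading.Tasks;

// Illustrative only: the real IShardTransport members may differ.
// The shape that matters is that the router never touches storage directly;
// it forwards each statement to whichever shard(s) the collection policy selects.
public interface IShardTransportSketch
{
    // Stable shard name (e.g. "dubai"), used for ring placement and logging.
    string Name { get; }

    // Execute one statement on this shard and return its raw result payload.
    Task<string> ExecuteAsync(string statement, CancellationToken ct = default);
}

An in-process implementation wrapping a local DocumentForgeDb is what makes cluster behaviour cheap to exercise in tests, while HttpShardTransport does the same work over HTTP.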

Per-collection policies

Two strategies; each collection uses exactly one:

Hash (sharded)

Each document lives on exactly one shard, chosen by hashing the shard key path. Good for large collections where most queries have the key in WHERE.

.ShardCollection("orders", shardKeyPath: "pnr")
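
The shard key is read out of the document at the configured path before hashing. A minimal sketch of that extraction, assuming a top-level property like "pnr" (nested paths would walk the document segment by segment; the helper name is made up):

using System.Text.Json;

// Hypothetical helper: pull the shard key value out of a document by path.
// "pnr" here is a top-level property; the owning shard is then picked by
// hashing this value onto the ring (see "Consistent hashing" below).
static string GetShardKey(string documentJson, string shardKeyPath)
{
    using var doc = JsonDocument.Parse(documentJson);
    return doc.RootElement.GetProperty(shardKeyPath).ToString();
}

// GetShardKey("{\"pnr\":\"ABC123\",\"total\":420}", "pnr")  ->  "ABC123"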

Replicated

Every shard holds a full copy. Good for small reference data so JOINs stay local (no cross-shard lookup).

.ReplicateCollection("airports")

Routing rules

Query pattern                   How it's routed
WHERE shardKey = 'X'            Straight to one shard — fast path
WHERE otherField = ...          Scatter to every shard, merge results
SELECT COUNT(*)                 Scatter; cluster sums the per-shard counts
SELECT SUM / MIN / MAX / AVG    Scatter; cluster merges across shards
GROUP BY                        Scatter with group-by; cluster merges buckets across shards
INSERT                          Hash the shard key, send to one shard (or all if replicated)
JOIN with replicated table      Stays local on each shard — no cross-shard fetch
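
One detail the scatter rows gloss over: aggregates are merged from per-shard partials rather than recomputed. COUNT and SUM simply add up, MIN and MAX keep the extreme, and AVG has to travel as a (sum, count) pair, because averaging per-shard averages is wrong whenever shards hold different numbers of matching rows. A minimal merge sketch in plain C#, independent of the engine's actual merge code:

using System;
using System.Collections.Generic;
using System.Linq;

// Per-shard partial aggregate for one result row (or one GROUP BY bucket).
public sealed record Partial(long Count, double Sum, double Min, double Max);

public static class AggregateMerge
{
    // Fold the partials returned by each shard into one global partial.
    public static Partial Merge(IEnumerable<Partial> perShard) =>
        perShard.Aggregate(
            new Partial(0, 0, double.MaxValue, double.MinValue),
            (acc, p) => new Partial(
                acc.Count + p.Count,         // COUNT: add per-shard counts
                acc.Sum + p.Sum,             // SUM:   add per-shard sums
                Math.Min(acc.Min, p.Min),    // MIN:   keep the smaller
                Math.Max(acc.Max, p.Max)));  // MAX:   keep the larger

    // AVG is derived once, at the end, from the merged sum and count.
    public static double Avg(Partial merged) =>
        merged.Count == 0 ? 0 : merged.Sum / merged.Count;
}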

Consistent hashing

Under the hood, shard selection uses a consistent hash ring (Ketama-style):

  1. Each shard gets 150 virtual nodes placed at deterministic positions on a 2^32 ring
  2. A key is hashed (FNV-1a 32-bit) to a ring position
  3. Walk clockwise to the next vnode — that's the owning shard

Why not just hash(key) % N? Because going from 3 shards to 4 would move ~75% of your keys. Consistent hashing moves only ~25% (the keys near the new shard's ring positions).
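
To make the ring concrete, here is a small self-contained sketch of the same technique: 150 virtual nodes per shard, FNV-1a 32-bit hashing, and a clockwise walk to the owning shard. It illustrates the algorithm rather than the engine's actual implementation; in particular, deriving vnode positions from a "shard#i" label is an assumption.

using System.Collections.Generic;
using System.Linq;
using System.Text;

public sealed class ConsistentHashRing
{
    // position on the 2^32 ring -> shard name
    private readonly SortedDictionary<uint, string> _ring = new();

    public ConsistentHashRing(IEnumerable<string> shards, int vnodesPerShard = 150)
    {
        foreach (var shard in shards)
            for (int i = 0; i < vnodesPerShard; i++)
                _ring[Fnv1a32($"{shard}#{i}")] = shard;   // deterministic vnode positions
    }

    // Hash the key, then walk clockwise to the next vnode; that vnode's shard owns the key.
    public string GetShard(string key)
    {
        uint h = Fnv1a32(key);
        foreach (var (position, shard) in _ring)
            if (position >= h) return shard;
        return _ring.First().Value;   // wrapped past the top of the ring
    }

    // FNV-1a, 32-bit
    private static uint Fnv1a32(string s)
    {
        uint hash = 2166136261;
        foreach (byte b in Encoding.UTF8.GetBytes(s))
        {
            hash ^= b;
            hash *= 16777619;
        }
        return hash;
    }
}

Calling GetShard("PNR-1234") on a ring built from the three sample shards is the entire routing decision for a hash-sharded collection; because every node reads the same config, every node builds the same ring.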

Cluster config file

Every node should read the same cluster.json. This is how they all agree on which document belongs where.

{
  "Version": 1,
  "VirtualNodesPerShard": 150,
  "Shards": [
    { "Name": "shard-a", "Endpoint": "http://host-a:5000" },
    { "Name": "shard-b", "Endpoint": "http://host-b:5000" },
    { "Name": "shard-c", "Endpoint": "http://host-c:5000" }
  ],
  "Collections": {
    "orders":   { "Strategy": "Hash", "ShardKeyPath": "pnr" },
    "flights":  { "Strategy": "Hash", "ShardKeyPath": "flightNumber" },
    "airports": { "Strategy": "Replicated" }
  }
}

Load it and build a cluster:

var config = ClusterConfig.Load("cluster.json");
using var cluster = DocumentForgeCluster.FromConfig(config,
    desc => new HttpShardTransport(desc.Name, desc.Endpoint));

Running locally (multi-node dev)

The repo ships with scripts to run a 3-node cluster on localhost — perfect for development and for understanding the system.

# PowerShell
.\scripts\start-cluster.ps1

# bash
./scripts/start-cluster.sh

This launches three dfdb processes (one per shard):

Node       URL                       Data folder
shard-a    http://localhost:5001     scripts/sample-cluster/data/shard-a
shard-b    http://localhost:5002     scripts/sample-cluster/data/shard-b
shard-c    http://localhost:5003     scripts/sample-cluster/data/shard-c

Each node reads its own node-X.json:

{
  "nodeName": "shard-a",
  "port": 5001,
  "dataDir": "./scripts/sample-cluster/data/shard-a"
}

Check that they are all alive:

dfdb health scripts/sample-cluster/cluster.json

Scale up (add shards)

Adding shards has two parts — update the config, then rebalance the data so keys that hash to new shards actually live there.

# 1. Build a new config (new-cluster.json) that adds the extra shards (via CLI or admin UI):
dfdb cluster add-shard new-cluster.json shard-d http://host-d:5000
dfdb cluster add-shard new-cluster.json shard-e http://host-e:5000

# 2. Preview what moves (no data change yet):
dfdb rebalance cluster.json new-cluster.json --plan-only

# 3. Execute (prompts for confirmation):
dfdb rebalance cluster.json new-cluster.json

With consistent hashing, going from 3 to 5 shards moves only ~40% of keys — not all of them.
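
If you want to sanity-check that figure before touching real data, the movement ratio can be estimated offline: build one ring per topology and count the keys whose owner changes. A sketch that reuses the illustrative ConsistentHashRing from the consistent-hashing section (the class, method, and sample keys here are made up for the example):

using System.Collections.Generic;
using System.Linq;

public static class RebalancePreview
{
    // Estimate the fraction of keys that would change owner when moving
    // from oldShards to newShards.
    public static double EstimateMovedFraction(
        IEnumerable<string> sampleKeys,
        IEnumerable<string> oldShards,
        IEnumerable<string> newShards)
    {
        var oldRing = new ConsistentHashRing(oldShards);
        var newRing = new ConsistentHashRing(newShards);

        long total = 0, moved = 0;
        foreach (var key in sampleKeys)
        {
            total++;
            if (oldRing.GetShard(key) != newRing.GetShard(key)) moved++;
        }
        return total == 0 ? 0 : (double)moved / total;
    }
}

// Example: expanding 3 -> 5 shards over 100k synthetic keys should land close to 0.40.
// var fraction = RebalancePreview.EstimateMovedFraction(
//     Enumerable.Range(0, 100_000).Select(i => $"PNR-{i}"),
//     new[] { "shard-a", "shard-b", "shard-c" },
//     new[] { "shard-a", "shard-b", "shard-c", "shard-d", "shard-e" });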

Scale down (drop a shard)

Same command, just omit the shard you want to drop from the new config. Its data redistributes to the surviving shards. After the rebalance completes, the dropped node ends up empty and you can decommission it.

# Build new config without shard-c, then run:
dfdb rebalance cluster-with-c.json cluster-without-c.json

Online rebalance (zero downtime)

The commands above run an offline rebalance — you need a maintenance window. For production traffic that can't pause, use the online rebalance API from your C# bootstrap code:

using System.Linq;
using DocumentForge.Engine.Cluster;

// 1. Build the NEW cluster with the new topology.
//    MakeTransport is the same factory shape shown earlier:
//    desc => new HttpShardTransport(desc.Name, desc.Endpoint)
var newConfig = ClusterConfig.Load("cluster-new.json");
using var cluster = DocumentForgeCluster.FromConfig(newConfig, MakeTransport);

// 2. Open connections to the OLD shards (for reads during migration)
var oldConfig = ClusterConfig.Load("cluster-old.json");
var previousShards = oldConfig.Shards.Select(MakeTransport).ToList();

// 3. Run the online rebalance - clients keep reading and writing
var report = await ClusterRebalancer.RunOnlineAsync(
    cluster, oldConfig, newConfig, previousShards,
    onProgress: p => Console.WriteLine($"{p.CollectionName}: moved {p.DocsMoved}"));

// 4. When done, drop the previous ring
ClusterRebalancer.CompleteOnlineRebalance(cluster, previousShards);

What happens during the migration

Verified: the test suite includes Cluster_OnlineRebalance_ZeroDataLossWithConcurrentWrites — 200 initial docs spread across 2 shards, with 50 concurrent writes issued while rebalancing to 4 shards. Every doc is still findable via single-shard queries after completion. No losses, no duplicates.