When to shard
| Scale | Recommendation |
|---|---|
| < 10M docs | Single file. Add a follower for HA. Done. |
| 10M – 100M docs | Single file, 2–3 followers for read scale. |
| 100M – 1B docs | Shard. Each shard holds 50–100M on commodity hardware. |
| > 1B docs | Shard with care. Pick your partition key thoughtfully; it's expensive to change. |
DocumentForgeCluster
The router is a thin C# class that sits in front of N shards. Each shard is a full DocumentForgeDb accessed through an IShardTransport (in-process for tests, HTTP for real deployments).
using DocumentForge.Engine.Cluster;

using var cluster = new DocumentForgeCluster()
    .AddShard(new HttpShardTransport("dubai", "http://dubai.internal:5000"))
    .AddShard(new HttpShardTransport("singapore", "http://sg.internal:5000"))
    .AddShard(new HttpShardTransport("london", "http://ldn.internal:5000"))
    // per-collection policy
    .ShardCollection("orders", shardKeyPath: "pnr")
    .ShardCollection("flights", shardKeyPath: "flightNumber")
    .ReplicateCollection("airports")
    .ReplicateCollection("aircraft");
Per-collection policies
Two strategies, one per collection:
Hash (sharded)
Each document lives on exactly one shard, chosen by hashing the shard key path. Good for large collections where most queries have the key in WHERE.
.ShardCollection("orders", shardKeyPath: "pnr")
Replicated
Every shard holds a full copy. Good for small reference data so JOINs stay local (no cross-shard lookup).
.ReplicateCollection("airports")
Routing rules
| Query pattern | How it's routed |
|---|---|
| WHERE shardKey = 'X' | Straight to one shard — fast path |
| WHERE otherField = ... | Scatter to every shard, merge results |
| SELECT COUNT(*) | Scatter; cluster sums the per-shard counts |
| SELECT SUM / MIN / MAX / AVG | Scatter; cluster merges across shards |
| GROUP BY | Scatter with group-by; cluster merges buckets across shards |
| INSERT | Hash the shard key, send to one shard (or all if replicated) |
| JOIN with replicated table | Stays local on each shard — no cross-shard fetch |
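To make the fast path versus the scatter path concrete, here is what the two query shapes look like from client code. ExecuteQuery is an assumed method name used only for illustration; this page doesn't show the cluster's query API, so substitute whatever query entry point your client actually exposes.

// ExecuteQuery is an assumed method name, used here only to illustrate routing.
// Shard key in the WHERE clause: hashed and routed to exactly one shard.
var order = cluster.ExecuteQuery("SELECT * FROM orders WHERE pnr = 'ABC123'");

// No shard key in the filter: scattered to every shard, per-shard counts summed by the cluster.
var delayed = cluster.ExecuteQuery("SELECT COUNT(*) FROM flights WHERE status = 'DELAYED'");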
Consistent hashing
Under the hood, shard selection uses a consistent hash ring (Ketama-style):
- Each shard gets 150 virtual nodes placed at deterministic positions on a 2^32 ring
- A key is hashed (FNV-1a 32-bit) to a ring position
- Walk clockwise to the next vnode — that's the owning shard
Why not hash(key) % N? Because going from 3 shards to 4 would move ~75% of your keys. Consistent hashing moves only ~25% (the keys near the new shard's ring positions).
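Here is a minimal, self-contained sketch of that ring, for intuition only; the class, field, and method names are invented and this is not DocumentForge's actual implementation.

using System.Collections.Generic;
using System.Linq;
using System.Text;

// Illustrative Ketama-style ring, not DocumentForge's real code.
public sealed class ConsistentHashRing
{
    private readonly SortedDictionary<uint, string> _ring = new();

    public ConsistentHashRing(IEnumerable<string> shards, int virtualNodesPerShard = 150)
    {
        foreach (var shard in shards)
            for (int v = 0; v < virtualNodesPerShard; v++)
                _ring[Fnv1a32($"{shard}#{v}")] = shard;   // deterministic vnode positions
    }

    public string GetShard(string key)
    {
        uint h = Fnv1a32(key);
        // Walk clockwise: first vnode at or after the key's position, wrapping to the start.
        foreach (var kv in _ring)
            if (kv.Key >= h) return kv.Value;
        return _ring.First().Value;
    }

    private static uint Fnv1a32(string s)
    {
        uint hash = 2166136261;
        foreach (byte b in Encoding.UTF8.GetBytes(s))
        {
            hash ^= b;
            hash *= 16777619;
        }
        return hash;
    }
}

Adding a fourth shard to that constructor only claims the ring positions its 150 vnodes land on; keys hashing everywhere else keep their current owner, which is where the ~25% figure above comes from.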
Cluster config file
Every node should read the same cluster.json. This is how they all agree on which document belongs where.
{
"Version": 1,
"VirtualNodesPerShard": 150,
"Shards": [
{ "Name": "shard-a", "Endpoint": "http://host-a:5000" },
{ "Name": "shard-b", "Endpoint": "http://host-b:5000" },
{ "Name": "shard-c", "Endpoint": "http://host-c:5000" }
],
"Collections": {
"orders": { "Strategy": "Hash", "ShardKeyPath": "pnr" },
"flights": { "Strategy": "Hash", "ShardKeyPath": "flightNumber" },
"airports": { "Strategy": "Replicated" }
}
}
Load it and build a cluster:
var config = ClusterConfig.Load("cluster.json");

using var cluster = DocumentForgeCluster.FromConfig(
    config,
    desc => new HttpShardTransport(desc.Name, desc.Endpoint));
Running locally (multi-node dev)
The repo ships with scripts to run a 3-node cluster on localhost — perfect for development and for understanding the system.
# PowerShell
.\scripts\start-cluster.ps1

# bash
./scripts/start-cluster.sh
This launches three dfdb processes (one per shard):
| Node | URL | Data folder |
|---|---|---|
| shard-a | http://localhost:5001 | scripts/sample-cluster/data/shard-a |
| shard-b | http://localhost:5002 | scripts/sample-cluster/data/shard-b |
| shard-c | http://localhost:5003 | scripts/sample-cluster/data/shard-c |
Each node reads its own node-X.json:
{
"nodeName": "shard-a",
"port": 5001,
"dataDir": "./scripts/sample-cluster/data/shard-a"
}
Check that they're all alive:
dfdb health scripts/sample-cluster/cluster.json
Scale up (add shards)
Adding shards has two parts — update the config, then rebalance the data so keys that hash to new shards actually live there.
# 1. Edit cluster.json to add more shards (via CLI or admin UI):
dfdb cluster add-shard new-cluster.json shard-d http://host-d:5000
dfdb cluster add-shard new-cluster.json shard-e http://host-e:5000

# 2. Preview what moves (no data change yet):
dfdb rebalance cluster.json new-cluster.json --plan-only

# 3. Execute (prompts for confirmation):
dfdb rebalance cluster.json new-cluster.json
With consistent hashing, going from 3 to 5 shards moves only ~40% of keys, not all of them: the two new shards claim roughly 2/5 of the ring, and only keys falling in their vnode ranges relocate.
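To sanity-check that figure, you can replay sample keys through the illustrative ConsistentHashRing sketch from the consistent-hashing section above (not engine code) and count how many change owner:

using System;

var before = new ConsistentHashRing(new[] { "shard-a", "shard-b", "shard-c" });
var after  = new ConsistentHashRing(new[] { "shard-a", "shard-b", "shard-c", "shard-d", "shard-e" });

// Hash the same sample keys against both topologies and count the ones that move.
int moved = 0, total = 100_000;
for (int i = 0; i < total; i++)
    if (before.GetShard($"PNR-{i}") != after.GetShard($"PNR-{i}")) moved++;

Console.WriteLine($"moved: {100.0 * moved / total:F1}% of keys");   // lands around 40%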
Scale down (drop a shard)
Same command, just omit the shard you want to drop from the new config. Its data redistributes to the surviving shards. After the rebalance completes, the dropped node ends up empty and you can decommission it.
# Build new config without shard-c, then run:
dfdb rebalance cluster-with-c.json cluster-without-c.json
Online rebalance (zero downtime)
The commands above run an offline rebalance — you need a maintenance window. For production traffic that can't pause, use the online rebalance API from your C# bootstrap code:
using DocumentForge.Engine.Cluster;

// 1. Build the NEW cluster with the new topology
var newConfig = ClusterConfig.Load("cluster-new.json");
using var cluster = DocumentForgeCluster.FromConfig(newConfig, MakeTransport);

// 2. Open connections to the OLD shards (for reads during migration)
var oldConfig = ClusterConfig.Load("cluster-old.json");
var previousShards = oldConfig.Shards.Select(MakeTransport).ToList();

// 3. Run the online rebalance - clients keep reading and writing
var report = await ClusterRebalancer.RunOnlineAsync(
    cluster, oldConfig, newConfig, previousShards,
    onProgress: p => Console.WriteLine($"{p.CollectionName}: moved {p.DocsMoved}"));

// 4. When done, drop the previous ring
ClusterRebalancer.CompleteOnlineRebalance(cluster, previousShards);
What happens during the migration
- Writes go to the new ring. The cluster also issues a best-effort delete on the previous ring location to clean stale copies.
- Reads by shard key try the new ring first; on miss, fall back to the previous ring (the doc may not have migrated yet).
- Scatter reads query both rings and dedup by _id, so no document appears twice even mid-move (see the read-path sketch below).
- The rebalancer copies data from old to new in the background. Idempotent — if a client already wrote a newer version at the destination, the rebalancer leaves it alone.
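A rough sketch of that read path during a migration. Doc, IShardRead, FindByIdAsync, and QueryAsync are invented names, not DocumentForge's real types; the point is the new-ring-first fallback and the _id-keyed merge.

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Illustrative types only; DocumentForge's actual document and transport APIs are not shown here.
public sealed record Doc(string Id);

public interface IShardRead
{
    Task<Doc?> FindByIdAsync(string collection, string id);
    Task<IReadOnlyList<Doc>> QueryAsync(string collection, string filter);
}

public static class MigrationReads
{
    // Point read by shard key: new ring first (writes land there), old ring as fallback.
    public static async Task<Doc?> ReadByKeyAsync(
        IShardRead newOwner, IShardRead oldOwner, string collection, string id)
    {
        return await newOwner.FindByIdAsync(collection, id)
               ?? await oldOwner.FindByIdAsync(collection, id);
    }

    // Scatter read: ask both rings, keep one copy per _id, letting the new ring win.
    public static async Task<IReadOnlyCollection<Doc>> ScatterReadAsync(
        IEnumerable<IShardRead> oldShards, IEnumerable<IShardRead> newShards,
        string collection, string filter)
    {
        var byId = new Dictionary<string, Doc>();
        foreach (var shard in oldShards.Concat(newShards))   // new ring last, so it overwrites stale copies
            foreach (var doc in await shard.QueryAsync(collection, filter))
                byId[doc.Id] = doc;
        return byId.Values;
    }
}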
The Cluster_OnlineRebalance_ZeroDataLossWithConcurrentWrites test exercises this path: 200 initial docs on 2 shards, 50 concurrent writes during the rebalance to 4 shards. Every doc is still findable via single-shard queries after completion. No losses, no duplicates.