Sharding
Scale horizontally across N nodes with consistent hashing and zero-downtime online rebalancing. A stateless router fronts the cluster and spreads collections across shards.
When to shard
| Scale | Recommendation |
|---|---|
| Under 10M docs | Single file. Add a follower for HA. Done. |
| 10M – 100M docs | Single file, 2–3 followers for read scale. |
| 100M – 1B docs | Shard. Each shard holds 50–100M on commodity hardware. |
| Over 1B docs | Shard with care. Pick your shard key thoughtfully; it’s expensive to change. |
DocumentForgeCluster
The router is a thin, stateless C# class that sits in front of N shards. Each shard is a full DocumentForgeDb accessed through an IShardTransport (in-process for tests, HTTP for real deployments).
using DocumentForge.Engine.Cluster;
using var cluster = new DocumentForgeCluster()
    .AddShard(new HttpShardTransport("dubai", "http://dubai.internal:5000"))
    .AddShard(new HttpShardTransport("singapore", "http://sg.internal:5000"))
    .AddShard(new HttpShardTransport("london", "http://ldn.internal:5000"))
    // per-collection policy
    .ShardCollection("orders", shardKeyPath: "pnr")
    .ShardCollection("flights", shardKeyPath: "flightNumber")
    .ReplicateCollection("airports")
    .ReplicateCollection("aircraft");
Per-collection policies
Two strategies, one per collection.
Hash (sharded)
Each document lives on exactly one shard, chosen by hashing the shard key path. Good for large collections where most queries have the key in WHERE.
.ShardCollection("orders", shardKeyPath: "pnr")
Replicated
Every shard holds a full copy. Good for small reference data so JOINs stay local (no cross-shard lookup).
.ReplicateCollection("airports")
Routing rules
| Query pattern | How it’s routed |
|---|---|
| WHERE shardKey = 'X' | Straight to one shard (fast path) |
| WHERE otherField = ... | Scatter to every shard, merge results |
| SELECT COUNT(*) | Scatter; cluster sums the per-shard counts |
| SELECT SUM / MIN / MAX / AVG | Scatter; cluster merges across shards |
| GROUP BY | Scatter with group-by; cluster merges buckets across shards |
| INSERT | Hash the shard key, send to one shard (or all if replicated) |
| JOIN with replicated table | Stays local on each shard; no cross-shard fetch |
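The scatter rows in the table imply a merge step on the router. The sketch below shows one plausible way to combine per-shard partial results. `PartialAggregate` and `ScatterGather` are hypothetical names invented for this illustration, not part of the DocumentForge API. It assumes each shard returns a count, sum, min, and max, so that AVG can be derived after the merge rather than by averaging per-shard averages.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Three shards answer the same scatter query; the router merges the partials.
var partials = new[]
{
    new PartialAggregate(Count: 3, Sum: 30, Min: 5, Max: 15),
    new PartialAggregate(Count: 1, Sum: 100, Min: 100, Max: 100),
    PartialAggregate.Empty,   // a shard with no matching rows
};
var total = partials.Aggregate(PartialAggregate.Empty, PartialAggregate.Merge);
// Merged AVG is 130 / 4 = 32.5; averaging the per-shard averages (10 and 100) would give 55.
Console.WriteLine($"count={total.Count} sum={total.Sum} avg={total.Average}");

// GROUP BY: each shard returns its own buckets, keyed by group value.
var shardA = new Dictionary<string, PartialAggregate> { ["DXB"] = new(2, 20, 5, 15) };
var shardB = new Dictionary<string, PartialAggregate> { ["DXB"] = new(1, 7, 7, 7), ["LHR"] = new(1, 9, 9, 9) };
var groups = ScatterGather.MergeGroups(new[] { shardA, shardB });
Console.WriteLine($"DXB count={groups["DXB"].Count}, LHR count={groups["LHR"].Count}");

// Hypothetical per-shard partial result (an assumption, not an engine type).
public readonly record struct PartialAggregate(long Count, double Sum, double Min, double Max)
{
    public static PartialAggregate Empty =>
        new(0, 0, double.PositiveInfinity, double.NegativeInfinity);

    // COUNT and SUM add up; MIN and MAX take the extreme of the two sides.
    public static PartialAggregate Merge(PartialAggregate a, PartialAggregate b) =>
        new(a.Count + b.Count, a.Sum + b.Sum, Math.Min(a.Min, b.Min), Math.Max(a.Max, b.Max));

    // AVG comes from the merged SUM and COUNT, so large shards weigh more than small ones.
    public double? Average => Count == 0 ? (double?)null : Sum / Count;
}

public static class ScatterGather
{
    // Merge buckets that share a group key across shards.
    public static Dictionary<string, PartialAggregate> MergeGroups(
        IEnumerable<IReadOnlyDictionary<string, PartialAggregate>> perShard)
    {
        var merged = new Dictionary<string, PartialAggregate>();
        foreach (var shard in perShard)
            foreach (var (key, partial) in shard)
                merged[key] = merged.TryGetValue(key, out var acc)
                    ? PartialAggregate.Merge(acc, partial)
                    : partial;
        return merged;
    }
}
```

Carrying the sum and count separately is what keeps AVG correct when shards hold different numbers of matching rows.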
Consistent hashing
Under the hood, shard selection uses a consistent hash ring (Ketama-style):
- Each shard gets 150 virtual nodes placed at deterministic positions on a 2^32 ring.
- A key is hashed (FNV-1a 32-bit) to a ring position.
- Walk clockwise to the next vnode — that’s the owning shard.
Why not just hash(key) % N? Because going from 3 shards to 4 would move ~75% of your keys. Consistent hashing moves only ~25% (the keys near the new shard’s ring positions).
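To make the ring walk concrete, here is a minimal sketch of a consistent hash ring using FNV-1a 32-bit and 150 virtual nodes per shard, as described above. `HashRing` is a hypothetical class written for this illustration, and the vnode label format (`shard-a#0`, `shard-a#1`, ...) is an assumption; the engine's actual vnode placement may differ. The demo counts how many keys change owner when growing from 3 shards to 4, once on the ring and once with `hash % N`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Count how many keys change owner when growing from 3 shards to 4.
var keys = Enumerable.Range(0, 100_000).Select(i => $"pnr-{i}").ToList();
var before = new HashRing(new[] { "shard-a", "shard-b", "shard-c" });
var after = new HashRing(new[] { "shard-a", "shard-b", "shard-c", "shard-d" });

int movedOnRing = keys.Count(k => before.OwnerOf(k) != after.OwnerOf(k));
int movedWithModulo = keys.Count(k => HashRing.Fnv1a32(k) % 3 != HashRing.Fnv1a32(k) % 4);

Console.WriteLine($"ring:   {100.0 * movedOnRing / keys.Count:F1}% moved");
Console.WriteLine($"modulo: {100.0 * movedWithModulo / keys.Count:F1}% moved");

// Illustrative ring only; not DocumentForge's implementation.
public sealed class HashRing
{
    private readonly uint[] _positions;   // vnode positions, sorted ascending
    private readonly string[] _owners;    // shard that owns each position

    public HashRing(IEnumerable<string> shards, int vnodesPerShard = 150)
    {
        var ring = new SortedDictionary<uint, string>();
        foreach (var shard in shards)
            for (var i = 0; i < vnodesPerShard; i++)
                ring[Fnv1a32($"{shard}#{i}")] = shard;   // assumed label format
        _positions = ring.Keys.ToArray();
        _owners = ring.Values.ToArray();
    }

    // FNV-1a 32-bit over the UTF-8 bytes of the input.
    public static uint Fnv1a32(string s)
    {
        unchecked
        {
            var hash = 2166136261u;              // FNV offset basis
            foreach (var b in Encoding.UTF8.GetBytes(s))
            {
                hash ^= b;
                hash *= 16777619u;               // FNV prime
            }
            return hash;
        }
    }

    // Walk clockwise: the first vnode at or after the key's position owns it.
    public string OwnerOf(string key)
    {
        var idx = Array.BinarySearch(_positions, Fnv1a32(key));
        if (idx < 0) idx = ~idx;                 // not found: index of next larger vnode
        if (idx == _positions.Length) idx = 0;   // past the last vnode: wrap around
        return _owners[idx];
    }
}
```

One property worth noting in this sketch: because the existing shards' vnodes do not move, every key that changes owner lands on the newly added shard. The exact percentages printed depend on the key set and on how evenly the hash spreads the vnodes.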
Cluster config file
Every node should read the same cluster.json so that they all agree on which document belongs where. The file lists the shards, their leader endpoints, and the per-collection strategies.
{
  "Version": 1,
  "VirtualNodesPerShard": 150,
  "Shards": [
    { "Name": "shard-a", "Endpoint": "http://host-a:5000" },
    { "Name": "shard-b", "Endpoint": "http://host-b:5000" },
    { "Name": "shard-c", "Endpoint": "http://host-c:5000" }
  ],
  "Collections": {
    "orders": { "Strategy": "Hash", "ShardKeyPath": "pnr" },
    "flights": { "Strategy": "Hash", "ShardKeyPath": "flightNumber" },
    "airports": { "Strategy": "Replicated" }
  }
}
Load it and build a cluster:
var config = ClusterConfig.Load("cluster.json");
using var cluster = DocumentForgeCluster.FromConfig(config,
    desc => new HttpShardTransport(desc.Name, desc.Endpoint));
Running locally (multi-node dev)
The repo ships with scripts to run a 3-node cluster on localhost — perfect for development and for understanding the system.
# PowerShell
.\scripts\start-cluster.ps1
# bash
./scripts/start-cluster.sh
This launches three dfdb processes (one per shard):
| Node | URL | Data folder |
|---|---|---|
| shard-a | http://localhost:5001 | scripts/sample-cluster/data/shard-a |
| shard-b | http://localhost:5002 | scripts/sample-cluster/data/shard-b |
| shard-c | http://localhost:5003 | scripts/sample-cluster/data/shard-c |
Each node reads its own node-X.json:
{
  "nodeName": "shard-a",
  "port": 5001,
  "dataDir": "./scripts/sample-cluster/data/shard-a"
}
Check they are all alive:
dfdb health scripts/sample-cluster/cluster.json
Scale up (add shards)
Adding shards has two parts — update the config, then rebalance the data so keys that hash to new shards actually live there.
# 1. Add the new shards to the new config, new-cluster.json (via CLI or admin UI):
dfdb cluster add-shard new-cluster.json shard-d http://host-d:5000
dfdb cluster add-shard new-cluster.json shard-e http://host-e:5000
# 2. Preview what moves (no data change yet):
dfdb rebalance cluster.json new-cluster.json --plan-only
# 3. Execute (prompts for confirmation):
dfdb rebalance cluster.json new-cluster.json
With consistent hashing, going from 3 to 5 shards moves only ~40% of keys, not all of them.
Scale down (drop a shard)
Same command, just omit the shard you want to drop from the new config. Its data redistributes to the surviving shards. After the rebalance completes, the dropped node ends up empty and you can decommission it.
# Build new config without shard-c, then run:
dfdb rebalance cluster-with-c.json cluster-without-c.json
Online rebalance (zero downtime)
The commands above run an offline rebalance — you need a maintenance window. For production traffic that can’t pause, use the online rebalance API from your C# bootstrap code:
using System;
using System.Linq;
using DocumentForge.Engine.Cluster;
// MakeTransport is assumed to be the same transport factory used when loading a config,
// e.g. desc => new HttpShardTransport(desc.Name, desc.Endpoint)
// 1. Build the NEW cluster with the new topology
var newConfig = ClusterConfig.Load("cluster-new.json");
using var cluster = DocumentForgeCluster.FromConfig(newConfig, MakeTransport);
// 2. Open connections to the OLD shards (for reads during migration)
var oldConfig = ClusterConfig.Load("cluster-old.json");
var previousShards = oldConfig.Shards.Select(MakeTransport).ToList();
// 3. Run the online rebalance; clients keep reading and writing
var report = await ClusterRebalancer.RunOnlineAsync(
    cluster, oldConfig, newConfig, previousShards,
    onProgress: p => Console.WriteLine($"{p.CollectionName}: moved {p.DocsMoved}"));
// 4. When done, drop the previous ring
ClusterRebalancer.CompleteOnlineRebalance(cluster, previousShards);
What happens during the migration
- Writes go to the new ring. The cluster also issues a best-effort delete on the previous ring location to clean stale copies.
- Reads by shard key try the new ring first; on miss, fall back to the previous ring (the doc may not have migrated yet).
- Scatter reads query both rings and dedup by _id so no document appears twice even mid-move.
- The rebalancer copies data from old to new in the background. Idempotent: if a client already wrote a newer version at the destination, the rebalancer leaves it alone.
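The four rules above can be sketched with in-memory dictionaries standing in for the two rings. Everything here is hypothetical and simplified for illustration: `Doc`, `Migration`, and the numeric `Version` field are assumptions (the source does not say how the engine decides which copy is newer), and each ring is modelled as a single dictionary rather than a set of shards.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

var oldRing = new Dictionary<string, Doc>
{
    ["A1"] = new Doc("A1", 1, "original"),
    ["B2"] = new Doc("B2", 1, "original"),
};
var newRing = new Dictionary<string, Doc>
{
    ["A1"] = new Doc("A1", 2, "rewritten by a client mid-migration"),
};

// B2 has not migrated yet, so the point read falls back to the previous ring.
Console.WriteLine(Migration.Read("B2", newRing, oldRing)?.Body);
// A1 exists on both rings; the scatter read returns it once, from the new ring.
Console.WriteLine(Migration.Scatter(newRing, oldRing).Count);
// Background copy: B2 is copied, the client's newer A1 is left alone.
foreach (var doc in oldRing.Values.ToList())
    Console.WriteLine($"{doc.Id} copied: {Migration.CopyUnlessNewer(doc, newRing)}");

// Version is an assumed ordering field, used only for this sketch.
public sealed record Doc(string Id, long Version, string Body);

public static class Migration
{
    // Writes go to the new ring, with a best-effort delete of the stale copy.
    public static void Write(Doc doc, IDictionary<string, Doc> newRing, IDictionary<string, Doc> oldRing)
    {
        newRing[doc.Id] = doc;
        oldRing.Remove(doc.Id);
    }

    // Reads by key try the new ring first, then the previous ring.
    public static Doc? Read(string id, IReadOnlyDictionary<string, Doc> newRing,
                            IReadOnlyDictionary<string, Doc> oldRing)
    {
        if (newRing.TryGetValue(id, out var doc)) return doc;
        return oldRing.TryGetValue(id, out doc) ? doc : null;
    }

    // Scatter reads query both rings and keep the first copy seen per _id.
    public static List<Doc> Scatter(IReadOnlyDictionary<string, Doc> newRing,
                                    IReadOnlyDictionary<string, Doc> oldRing)
    {
        var seen = new HashSet<string>();
        var results = new List<Doc>();
        // New ring is enumerated first, so its copy wins for duplicates.
        foreach (var doc in newRing.Values.Concat(oldRing.Values))
            if (seen.Add(doc.Id)) results.Add(doc);
        return results;
    }

    // Idempotent copy: skip when the destination already has this version or a newer one.
    public static bool CopyUnlessNewer(Doc source, IDictionary<string, Doc> destination)
    {
        if (destination.TryGetValue(source.Id, out var existing) && existing.Version >= source.Version)
            return false;
        destination[source.Id] = source;
        return true;
    }
}
```

Running the copy step a second time changes nothing, which is the property that lets the rebalancer retry safely.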
Verified: the test suite includes Cluster_OnlineRebalance_ZeroDataLossWithConcurrentWrites — 200 initial docs on 2 shards, 50 concurrent writes during the rebalance to 4 shards. Every doc still findable via single-shard queries after completion. No losses, no duplicates.
Multi-document writes across shards are coordinated by two-phase commit; see transactions. Each shard can itself run a leader with followers — see replication.