
Sharding: Horizontal Partitioning
Understanding database sharding and handling massive traffic through practical experience

What do you do when a DB gets slow? You've added indexes, optimized queries — still slow. Following that question to its end leads to sharding.
The first instinct is vertical scaling (Scale-up): more CPUs, more memory, faster SSDs. But in large-scale systems, this approach hits a wall. As data grows, queries slow down, disk fills up, and backups can take hours. There's a point where throwing more hardware at the problem stops working.
The next step is horizontal scaling (Scale-out): instead of one big DB, split it into multiple smaller ones. That's sharding. When data grows large enough, each shard only handles a portion of it — so queries stay fast and the load is distributed. The tradeoff, though, is significant complexity.
When I first encountered sharding, the most confusing part was "How do you split the data?" Do you just copy tables to multiple DBs? Or distribute data randomly?
Another confusion was "How do you choose the shard key?" Split by user ID? By region? By time? I had no sense of what problems wrong choices would cause.
And the biggest confusion was "How do you JOIN?" If data is scattered across multiple DBs, you can't JOIN data in different shards, right? So do you JOIN at the application level? Will performance be okay?
The decisive analogy that helped me understand sharding was "library branches."
What do you do when one library has too many books? Two methods:
Method 1: Build a bigger building (vertical scaling)
Method 2: Open branch libraries and split the collection among them (horizontal scaling)
Sharding is method 2. Splitting one giant database into multiple small databases (shards).
When I heard this analogy, it clicked. Ah, that's why the "shard key" is important. Just like deciding criteria for which branch gets which books, you need to decide criteria for which shard gets which data.
In code:
// Shard key: user_id
function getShardId(userId: number): number {
  const SHARD_COUNT = 10;
  return userId % SHARD_COUNT; // 0~9
}

// Query user data
async function getUser(userId: number) {
  const shardId = getShardId(userId);
  const db = getShardConnection(shardId); // connect to that user's shard
  return await db.query('SELECT * FROM users WHERE id = ?', [userId]);
}
Determine shard by user_id modulo 10. If user_id = 1234, then 1234 % 10 = 4, stored in shard 4.
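The getShardConnection helper isn't defined above; a minimal sketch, assuming the mysql2 library and one connection pool per shard (hostnames are placeholders):

import mysql from 'mysql2/promise';

const SHARD_COUNT = 10;

// One connection pool per shard (hostnames are placeholders)
const shardPools = Array.from({ length: SHARD_COUNT }, (_, i) =>
  mysql.createPool({
    host: `shard-${i}.db.example.com`,
    user: 'app',
    database: 'app',
  })
);

function getShardConnection(shardId: number) {
  return shardPools[shardId];
}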
Range-based sharding: split data by specific ranges of the shard key.
function getShardId(userId: number): number {
  if (userId < 1000000) return 0;
  if (userId < 2000000) return 1;
  if (userId < 3000000) return 2;
  // ...
  return 9;
}
Pros:
- Range queries on the shard key (e.g. "users 1M to 2M") hit a single shard
- Easy to reason about and to extend with new ranges
Watch out: Time-based sharding concentrates load on the shard holding the most recent data — this is the hotspot problem. Hash-based sharding avoids this by distributing data evenly regardless of when it was created.
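A hypothetical sketch (not from the original) of time-based range sharding makes the hotspot concrete: every new row routes to the newest shard.

// Hypothetical: shard by creation year
function getShardIdByDate(createdAt: Date): number {
  return createdAt.getFullYear() - 2020; // shard 0 = 2020, shard 1 = 2021, ...
}

// Every row inserted today lands on the same shard:
getShardIdByDate(new Date()); // always the latest shard -> write hotspot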
Hash-based sharding: distribute data evenly using a hash function.
function getShardId(userId: number): number {
  const SHARD_COUNT = 10;
  return userId % SHARD_COUNT;
}
Pros:
- Even distribution, so no single shard becomes a hotspot
- The shard is computed directly from the key; no lookup table needed
The weakness is resharding: with plain modulo, changing the shard count remaps almost every key. Consistent hashing is a technique that minimizes data movement when shards are added or removed:
class ConsistentHash {
  private ring: Map<number, number> = new Map();
  private VIRTUAL_NODES = 150; // virtual nodes per shard, to even out the ring

  addShard(shardId: number) {
    for (let i = 0; i < this.VIRTUAL_NODES; i++) {
      const hash = this.hash(`shard-${shardId}-${i}`);
      this.ring.set(hash, shardId);
    }
  }

  getShardId(key: string): number {
    const hash = this.hash(key);
    // Walk the ring clockwise: the first node at or past the key's hash owns it
    const sortedHashes = Array.from(this.ring.keys()).sort((a, b) => a - b);
    for (const ringHash of sortedHashes) {
      if (hash <= ringHash) {
        return this.ring.get(ringHash)!;
      }
    }
    // Wrapped past the end of the ring: the first node owns it
    return this.ring.get(sortedHashes[0])!;
  }

  private hash(key: string): number {
    // Simple string hash (use a better one, e.g. murmur3, in production)
    let hash = 0;
    for (let i = 0; i < key.length; i++) {
      hash = ((hash << 5) - hash) + key.charCodeAt(i);
      hash = hash & hash; // force to 32-bit integer
    }
    return Math.abs(hash);
  }
}
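A quick usage sketch of the class above (the key names and returned shard numbers are illustrative):

const ring = new ConsistentHash();
ring.addShard(0);
ring.addShard(1);
ring.addShard(2);

ring.getShardId('user-1234'); // e.g. 1

// Adding a shard only remaps keys in the ring segments it takes over;
// with plain modulo, nearly every key would move to a different shard.
ring.addShard(3);
ring.getShardId('user-1234'); // usually unchanged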
Directory-based sharding: determine the shard via a separate lookup table.
// shard_directory table
// user_id | shard_id
//    1    |    0
//    2    |    3
//    3    |    1
// ...

async function getShardId(userId: number): Promise<number> {
  const result = await directoryDb.query(
    'SELECT shard_id FROM shard_directory WHERE user_id = ?',
    [userId]
  );
  return result[0].shard_id;
}
Pros:
- Fully flexible: any key can map to any shard, and a single user can be migrated by updating one row
- The shard count can change without mass data movement
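The obvious cost is one extra directory query per request. A common mitigation, sketched here with a plain in-memory Map (the cache is my addition, not part of the original design), is caching the mapping, since it changes only when a user is migrated:

const shardIdCache = new Map<number, number>();

async function getShardIdCached(userId: number): Promise<number> {
  const cached = shardIdCache.get(userId);
  if (cached !== undefined) return cached;

  const result = await directoryDb.query(
    'SELECT shard_id FROM shard_directory WHERE user_id = ?',
    [userId]
  );
  const shardId = result[0].shard_id;
  shardIdCache.set(userId, shardId);
  return shardId; // invalidate this entry whenever the user is migrated
}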
Geo-based sharding: split shards by user region.
function getShardId(region: string): number {
  const regionMap: Record<string, number> = {
    'us-east': 0,
    'us-west': 1,
    'eu-west': 2,
    'ap-northeast': 3
  };
  // ?? rather than ||: 'us-east' maps to 0, which is falsy
  return regionMap[region] ?? 0;
}
Pros:
- Low latency: users are served by a nearby shard
- Helps meet data-residency requirements (e.g. keeping EU user data in the EU)
The first problem: you can't JOIN data that lives in different shards.
Problem:
-- What if users are in shard 0 and posts are in shard 3?
SELECT u.name, p.title
FROM users u
JOIN posts p ON u.id = p.user_id
WHERE u.id = 1234;
Solution 1: Store related data in the same shard
// Shard both users and posts by user_id
function getShardId(userId: number): number {
  return userId % SHARD_COUNT;
}
// users table: sharded by user_id
// posts table: sharded by user_id (not post_id!)
Solution 2: Application-level JOIN
// Fetch each piece separately and combine in application code
async function getUserWithPosts(userId: number) {
  const shardId = getShardId(userId);
  const db = getShardConnection(shardId);
  const [user] = await db.query('SELECT * FROM users WHERE id = ?', [userId]);
  const posts = await db.query('SELECT * FROM posts WHERE user_id = ?', [userId]);
  return { ...user, posts };
}
Solution 3: Data duplication (Denormalization)
// Add user_name column to posts table
// Can display author name by querying only posts, no JOIN needed
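In code, that might look like the following sketch (the createPost helper and the user_name column are illustrative):

// On post creation, copy the author's name into the posts row
async function createPost(userId: number, title: string) {
  const shard = getShardConnection(getShardId(userId));
  const [user] = await shard.query('SELECT name FROM users WHERE id = ?', [userId]);
  await shard.query(
    'INSERT INTO posts (user_id, user_name, title) VALUES (?, ?, ?)',
    [userId, user.name, title]
  );
}
// Tradeoff: if the user is renamed, every copied user_name must be updated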
The second problem: transactions that span multiple shards are complex.
Problem:
// What if user A is in shard 0 and user B is in shard 5?
// How do you run a money-transfer transaction between them?
Solution 1: 2PC (Two-Phase Commit)
async function transfer(fromUserId: number, toUserId: number, amount: number) {
  const shard1 = getShardConnection(getShardId(fromUserId));
  const shard2 = getShardConnection(getShardId(toUserId));

  // Phase 1: Prepare (simplified to plain BEGIN here; real 2PC uses
  // XA START / XA PREPARE so a coordinator can recover after a crash)
  await shard1.query('BEGIN');
  await shard2.query('BEGIN');
  try {
    await shard1.query('UPDATE users SET balance = balance - ? WHERE id = ?', [amount, fromUserId]);
    await shard2.query('UPDATE users SET balance = balance + ? WHERE id = ?', [amount, toUserId]);
    // Phase 2: Commit (caveat: if the process dies between these two commits,
    // the shards diverge; that window is exactly what real 2PC protects against)
    await shard1.query('COMMIT');
    await shard2.query('COMMIT');
  } catch (error) {
    await shard1.query('ROLLBACK');
    await shard2.query('ROLLBACK');
    throw error;
  }
}
Solution 2: Saga Pattern
async function transfer(fromUserId: number, toUserId: number, amount: number) {
  // Step 1: Deduct (a local transaction on the sender's shard)
  await deduct(fromUserId, amount);
  try {
    // Step 2: Deposit (a local transaction on the receiver's shard)
    await deposit(toUserId, amount);
  } catch (error) {
    // Compensating transaction: the deduct already succeeded, so undo it
    await deposit(fromUserId, amount);
    throw error;
  }
}
The third problem: an independent auto_increment counter per shard produces duplicate IDs.
Solution 1: Use UUID
import { v4 as uuidv4 } from 'uuid';
const userId = uuidv4(); // '550e8400-e29b-41d4-a716-446655440000'
Solution 2: Snowflake ID
// Twitter's Snowflake algorithm
// 64 bits = timestamp(41) + node id(10) + sequence(12)
// (Twitter splits the 10 node bits into datacenter(5) + worker(5);
//  here they carry the shard ID instead)
class SnowflakeId {
  private sequence = 0;
  private lastTimestamp = -1;

  generate(shardId: number): bigint {
    let timestamp = Date.now();
    if (timestamp === this.lastTimestamp) {
      // Same millisecond: bump the 12-bit sequence
      this.sequence = (this.sequence + 1) & 0xFFF;
      if (this.sequence === 0) {
        // Sequence exhausted within this millisecond: wait for the next one
        timestamp = this.waitNextMillis(timestamp);
      }
    } else {
      this.sequence = 0;
    }
    this.lastTimestamp = timestamp;
    return (BigInt(timestamp) << 22n) |
           (BigInt(shardId) << 12n) |
           BigInt(this.sequence);
  }

  private waitNextMillis(current: number): number {
    // Busy-wait until the clock moves to the next millisecond
    while (Date.now() <= current) {}
    return Date.now();
  }
}
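A usage sketch showing the properties that matter here (the decode line simply follows the bit layout above):

const generator = new SnowflakeId();

const id = generator.generate(4);   // a time-ordered 64-bit bigint
const next = generator.generate(4); // next > id: IDs sort by creation time

// The shard ID can be decoded back out of bits 12..21
const shardId = Number((id >> 12n) & 0x3FFn); // 4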
Sharding is hard to reverse once started. Prepare thoroughly.
Checklist:
- [ ] Measure current DB size and growth rate
- [ ] Select shard key (must be immutable!)
- [ ] Decide sharding strategy (Hash/Range/Directory)
- [ ] Decide shard count (hard to increase later)
- [ ] Plan application code modifications
- [ ] Plan migration (minimize downtime)
- [ ] Plan rollback
Moving all data at once is risky. Proceed gradually.
Steps:
// Step 1: Read from the old DB; write to both the old DB and the shards
async function createUser(userData) {
  // Save to old DB (still the source of truth at this stage)
  await masterDb.query('INSERT INTO users ...', userData);
  // Also save to the shard (best-effort: failures are logged, not fatal)
  const shardId = getShardId(userData.id);
  const shard = getShardConnection(shardId);
  await shard.query('INSERT INTO users ...', userData).catch(err => {
    logger.error('Shard write failed', err);
  });
}

// Step 2: Read from shards (fall back to the old DB)
async function getUser(userId) {
  const shardId = getShardId(userId);
  const shard = getShardConnection(shardId);
  let user = await shard.query('SELECT * FROM users WHERE id = ?', [userId]);
  if (!user) {
    // Not in the shard yet: query the old DB
    user = await masterDb.query('SELECT * FROM users WHERE id = ?', [userId]);
    if (user) {
      // Lazily copy the row into its shard
      await shard.query('INSERT INTO users ...', user);
    }
  }
  return user;
}

// Step 3: Backfill rows that were never read in Step 2, then remove the old DB
Hide sharding logic from application code.
class ShardedUserRepository {
  async findById(userId: number) {
    const shard = this.getShard(userId);
    return await shard.query('SELECT * FROM users WHERE id = ?', [userId]);
  }

  async create(userData) {
    const shard = this.getShard(userData.id);
    return await shard.query('INSERT INTO users ...', userData);
  }

  async findByRegion(region: string) {
    // region is not the shard key, so every shard must be queried (scatter-gather)
    const results = await Promise.all(
      this.getAllShards().map(shard =>
        shard.query('SELECT * FROM users WHERE region = ?', [region])
      )
    );
    return results.flat();
  }

  private getShard(userId: number) {
    const shardId = getShardId(userId);
    return getShardConnection(shardId);
  }

  private getAllShards() {
    return Array.from({ length: SHARD_COUNT }, (_, i) =>
      getShardConnection(i)
    );
  }
}
When user data grows large enough, the users table is typically the first candidate for sharding.
Shard key: user_id (immutable, evenly distributed)
Sharding strategy: hash-based (plain modulo below for simplicity; consistent hashing would reduce data movement when adding shards later)
Shard count: 16 (a power of 2; see the note after the code)
const SHARD_COUNT = 16;

function getShardId(userId: number): number {
  return userId % SHARD_COUNT;
}

// Connection pool per shard
const shardPools = Array.from({ length: SHARD_COUNT }, (_, i) =>
  mysql.createPool({
    host: `shard-${i}.db.example.com`,
    database: `users_shard_${i}`,
    // ...
  })
);
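Why a power of 2? An aside (my note, not from the original): the modulo becomes a bitmask, and if you ever double the shard count, each key either stays put or moves to exactly one new shard.

// With SHARD_COUNT = 16 (a power of 2), modulo is a single AND
function getShardIdFast(userId: number): number {
  return userId & (SHARD_COUNT - 1); // identical to userId % 16 for non-negative IDs
}
// Doubling to 32: key k moves from k % 16 to either k % 16 or (k % 16) + 16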
We sharded posts by user_id as well, so a user's posts live in the same shard as the user row and JOINs remain possible.
// Store users and posts in the same shard
function getShardId(userId: number): number {
  return userId % SHARD_COUNT;
}

// JOIN possible: both tables are co-located on the same shard
async function getUserWithPosts(userId: number) {
  const shardId = getShardId(userId);
  const shard = shardPools[shardId];
  return await shard.query(`
    SELECT u.*, p.*
    FROM users u
    LEFT JOIN posts p ON u.id = p.user_id
    WHERE u.id = ?
  `, [userId]);
}
Parallel processing when querying all shards:
async function searchUsers(keyword: string) {
  // Fan out the query to every shard in parallel
  const queries = shardPools.map(shard =>
    shard.query('SELECT * FROM users WHERE name LIKE ? LIMIT 10', [`%${keyword}%`])
  );
  const results = await Promise.all(queries);
  const merged = results.flat();
  // Merge, sort, and paginate in the application
  return merged
    .sort((a, b) => new Date(b.created_at).getTime() - new Date(a.created_at).getTime())
    .slice(0, 20);
}
Sharding is a technique that horizontally partitions one giant database into multiple small databases, distributing data based on shard keys to achieve scalability. Various strategies exist including hash-based, range-based, and directory-based, and you must solve problems like JOIN constraints, transaction complexity, and ID duplication. In production, gradual migration and shard routing abstraction are key.