Zipf's law and sharding

Nicolae Vartolomei · 2022/06

You are an online business with some millions of customers.

customer_count = 1_000 # Enough to illustrate the point and have a quick simulation.

Customers are sending events to your business. You process and store these events.

import collections

class Server:
    def __init__(self):
        self.events_by_customer = collections.defaultdict(int)
    def process_event(self, customer: int, event: dict = None):
        # In practice, more like `self.events_by_customer[customer].append(event)`.
        self.events_by_customer[customer] += 1

After a while, you discover that a single server can’t handle all customers anymore. You add more servers, and using a method you found in a clever book, you route customers to different servers.

import cityhash

class Cluster:
    def __init__(self, servers: list[Server]):
        self.servers = servers
    def process_event(self, customer: int, event: dict = None):
        self._choose_server(customer).process_event(customer, event)
    def _choose_server(self, customer):
        # Consistent hashing is in a different chapter.
        return self.servers[cityhash.CityHash64(customer) % len(self.servers)]

To make sure that you’ve done everything right, you run a simulation:

import numpy as np

server_count = 9
event_count = 100_000_000

cluster = Cluster([Server() for _ in range(server_count)])

for customer in np.random.randint(0, customer_count, event_count):
    cluster.process_event(str(customer))

And check the results:

import matplotlib.pyplot as plt

plt.bar(
    range(0, len(cluster.servers)),
    [sum(server.events_by_customer.values()) for server in cluster.servers],
)
plt.show()

Uniform distribution

You pat yourself on the back and deploy to production. And, you discover Zipf’s law1 🥴.

Customers are large and small, Mostly small, but few are very, very large. You check individual server utilization, and you see a completely different picture.

You love simulations. So, you quickly run a new one which illustrates the reality much better:

cluster = Cluster([Server() for _ in range(server_count)])

event_draws = 0
while event_draws < event_count:
    maybe_customer = np.random.zipf(1.2) - 1
    if maybe_customer < customer_count:
        cluster.process_event(str(maybe_customer))
        event_draws += 1

plt.clf()
plt.bar(
    range(0, len(cluster.servers)),
    [sum(server.events_by_customer.values()) for server in cluster.servers],
)
plt.show()

Zipf's distribution

You cry a bit 😭, and you check event distribution by customer:

plt.clf()
plt.bar(
    range(0, customer_count),
    [
        sum(
            [
                collections.Counter(server.events_by_customer)
                for server in cluster.servers
            ],
            collections.Counter(),
        )[str(customer)]
        for customer in range(0, customer_count)
    ],
)
plt.show()

Zipf's distribution

And, if you are lucky, you’ll have a lightbulb moment 💡: distributing a few large customers (ones on the left side of the chart) to more servers will result in a uniform distribution of events across all servers.

import math

class Cluster:
    def __init__(self, servers: list[Server]):
        self.servers = servers
        # It is enough to track only |servers| * log(|servers|) keys. 18 keys in this example!
        self.very_large_customers = [str(ix) for ix in range(len(servers) * int(math.log(len(servers))))]
    def process_event(self, customer: int, event: dict = None):
        self._choose_server(customer).process_event(customer, event)
    def _choose_server(self, customer):
        if customer in self.very_large_customers:
            return self.servers[np.random.randint(len(self.servers))]
        return self.servers[cityhash.CityHash64(customer) % len(self.servers)]

cluster = Cluster([Server() for _ in range(server_count)])

event_draws = 0
while event_draws < event_count:
    maybe_customer = np.random.zipf(1.2) - 1
    if maybe_customer < customer_count:
        cluster.process_event(str(maybe_customer))
        event_draws += 1

plt.clf()
plt.bar(
    range(0, len(cluster.servers)),
    [sum(server.events_by_customer.values()) for server in cluster.servers],
)
plt.show()

Zipf's distribution mitigated

This is looking much better. You take a break ☕️, and digest what you’ve learned today.

Conclusion

I’ll write one later. For now, you can continue with reading the Pegasus2 paper to see this used in practice.