``` Python Power: 7 Battle-Tested Techniques for Building Resilient Distributed Systems ```

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world! Python has revolutionized the development of distributed systems, offering robust tools and patterns to create resilient architectures. I've spent years implementing these systems across various industries, and I'm excited to share practical techniques that have proven effective in real-world applications. 5 Python Techniques for Building Resilient Distributed Systems Building distributed systems presents unique challenges. Network failures, service outages, and hardware issues can disrupt operations at any moment. To create truly resilient systems, we need strategies that anticipate failure and gracefully handle problematic scenarios. Circuit Breakers Circuit breakers protect systems from cascading failures by monitoring the health of dependent services. When a service starts failing, the circuit breaker temporarily stops sending requests, preventing system-wide degradation. In my experience implementing payment processing systems, circuit breakers proved invaluable when third-party services experienced downtime. By quickly detecting problems and failing fast, we prevented customer-facing issues. import time from functools import wraps class CircuitBreaker: def __init__(self, failure_threshold=5, recovery_timeout=30, exceptions=(Exception,)): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.exceptions = exceptions self.state = "CLOSED" self.failure_count = 0 self.last_failure_time = None def __call__(self, func): @wraps(func) def wrapper(*args, **kwargs): if self.state == "OPEN": if time.time() - self.last_failure_time > self.recovery_timeout: self.state = "HALF-OPEN" else: raise RuntimeError(f"Service unavailable - circuit breaker open") try: result = func(*args, **kwargs) # Reset on successful HALF-OPEN call if self.state == "HALF-OPEN": self.failure_count = 0 self.state = "CLOSED" return result except self.exceptions as e: self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = "OPEN" raise e return wrapper # Usage example @CircuitBreaker(failure_threshold=3, recovery_timeout=60) def call_external_api(endpoint): # Make API call that might fail response = requests.get(endpoint) response.raise_for_status() return response.json() This implementation includes three states: CLOSED (normal operation), OPEN (failing fast), and HALF-OPEN (testing if the service has recovered). The decorator can be applied to any function that might fail. Distributed Locks When multiple processes or servers need to access shared resources, distributed locks prevent race conditions and data corruption. They ensure that only one entity can execute critical sections of code at a time. I once worked on a billing system where we needed to ensure that only one worker processed specific account updates. Using Redis-based distributed locks prevented duplicate charges and maintained data integrity. import redis import time import uuid class DistributedLock: def __init__(self, redis_client, lock_name, expire_seconds=10): self.redis = redis_client self.lock_name = lock_name self.expire_seconds = expire_seconds self.identifier = str(uuid.uuid4()) def __enter__(self): self.acquired = self.acquire() return self def __exit__(self, exc_type, exc_val, exc_tb): if self.acquired: self.release() def acquire(self): # Use SET NX to acquire the lock atomically acquired = self.redis.set( self.lock_name, self.identifier, nx=True, ex=self.expire_seconds ) # Start lock refresh thread if acquired if acquired: self._start_refresh_thread() return acquired def release(self): # Release the lock only if we own it (using Lua script for atomicity) script = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end """ self.redis.eval(script, 1, self.lock_name, self.identifier) if hasattr(self, 'refresh_thread'): self.refresh_thread_stop = True def _start_refresh_thread(self): import threading self.refresh_thread_stop = False def refresh_lock():

Apr 17, 2025 - 16:31
 0
``` Python Power: 7 Battle-Tested Techniques for Building Resilient Distributed Systems ```

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

Python has revolutionized the development of distributed systems, offering robust tools and patterns to create resilient architectures. I've spent years implementing these systems across various industries, and I'm excited to share practical techniques that have proven effective in real-world applications.

5 Python Techniques for Building Resilient Distributed Systems

Building distributed systems presents unique challenges. Network failures, service outages, and hardware issues can disrupt operations at any moment. To create truly resilient systems, we need strategies that anticipate failure and gracefully handle problematic scenarios.

Circuit Breakers

Circuit breakers protect systems from cascading failures by monitoring the health of dependent services. When a service starts failing, the circuit breaker temporarily stops sending requests, preventing system-wide degradation.

In my experience implementing payment processing systems, circuit breakers proved invaluable when third-party services experienced downtime. By quickly detecting problems and failing fast, we prevented customer-facing issues.

import time
from functools import wraps

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30, 
                 exceptions=(Exception,)):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.exceptions = exceptions
        self.state = "CLOSED"
        self.failure_count = 0
        self.last_failure_time = None

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if self.state == "OPEN":
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = "HALF-OPEN"
                else:
                    raise RuntimeError(f"Service unavailable - circuit breaker open")

            try:
                result = func(*args, **kwargs)

                # Reset on successful HALF-OPEN call
                if self.state == "HALF-OPEN":
                    self.failure_count = 0
                    self.state = "CLOSED"

                return result

            except self.exceptions as e:
                self.failure_count += 1
                self.last_failure_time = time.time()

                if self.failure_count >= self.failure_threshold:
                    self.state = "OPEN"

                raise e

        return wrapper

# Usage example
@CircuitBreaker(failure_threshold=3, recovery_timeout=60)
def call_external_api(endpoint):
    # Make API call that might fail
    response = requests.get(endpoint)
    response.raise_for_status()
    return response.json()

This implementation includes three states: CLOSED (normal operation), OPEN (failing fast), and HALF-OPEN (testing if the service has recovered). The decorator can be applied to any function that might fail.

Distributed Locks

When multiple processes or servers need to access shared resources, distributed locks prevent race conditions and data corruption. They ensure that only one entity can execute critical sections of code at a time.

I once worked on a billing system where we needed to ensure that only one worker processed specific account updates. Using Redis-based distributed locks prevented duplicate charges and maintained data integrity.

import redis
import time
import uuid

class DistributedLock:
    def __init__(self, redis_client, lock_name, expire_seconds=10):
        self.redis = redis_client
        self.lock_name = lock_name
        self.expire_seconds = expire_seconds
        self.identifier = str(uuid.uuid4())

    def __enter__(self):
        self.acquired = self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.acquired:
            self.release()

    def acquire(self):
        # Use SET NX to acquire the lock atomically
        acquired = self.redis.set(
            self.lock_name, 
            self.identifier, 
            nx=True, 
            ex=self.expire_seconds
        )

        # Start lock refresh thread if acquired
        if acquired:
            self._start_refresh_thread()

        return acquired

    def release(self):
        # Release the lock only if we own it (using Lua script for atomicity)
        script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(script, 1, self.lock_name, self.identifier)

        if hasattr(self, 'refresh_thread'):
            self.refresh_thread_stop = True

    def _start_refresh_thread(self):
        import threading

        self.refresh_thread_stop = False

        def refresh_lock():
            while not self.refresh_thread_stop:
                # Refresh lock if we still own it
                script = """
                if redis.call('get', KEYS[1]) == ARGV[1] then
                    return redis.call('expire', KEYS[1], ARGV[2])
                else
                    return 0
                end
                """
                self.redis.eval(
                    script, 
                    1, 
                    self.lock_name, 
                    self.identifier, 
                    self.expire_seconds
                )
                time.sleep(self.expire_seconds / 3)

        self.refresh_thread = threading.Thread(target=refresh_lock)
        self.refresh_thread.daemon = True
        self.refresh_thread.start()

# Usage example
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def process_account(account_id):
    lock_name = f"account_lock:{account_id}"

    with DistributedLock(redis_client, lock_name, expire_seconds=30) as lock:
        if not lock.acquired:
            print(f"Account {account_id} is being processed by another worker")
            return

        # Critical section - only one process will execute this at a time
        print(f"Processing account {account_id}")
        time.sleep(10)  # Simulate work
        print(f"Finished processing account {account_id}")

This implementation uses Redis for lock management and includes automatic lock renewal through a background thread, preventing lock expiration while the operation is still running.

Reliable Message Queues with Dead-Letter Handling

Message queues enable asynchronous communication between distributed components. By implementing proper retry mechanisms and dead-letter queues, systems can recover from temporary failures and ensure message delivery.

While developing an order fulfillment system, I implemented a message queue architecture that gracefully handled downstream service failures. Orders that couldn't be processed immediately were retried automatically, and persistently failing orders were routed to a dead-letter queue for manual review.

import pika
import json
import time
import logging

class MessageProcessor:
    def __init__(self, amqp_url, queue_name, dead_letter_queue, 
                 max_retries=3, retry_delay=5):
        self.amqp_url = amqp_url
        self.queue_name = queue_name
        self.dead_letter_queue = dead_letter_queue
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.logger = logging.getLogger(__name__)

    def setup_queues(self):
        connection = pika.BlockingConnection(
            pika.URLParameters(self.amqp_url)
        )
        channel = connection.channel()

        # Declare the dead-letter queue
        channel.queue_declare(queue=self.dead_letter_queue, durable=True)

        # Declare the main queue with dead-letter exchange
        channel.queue_declare(
            queue=self.queue_name,
            durable=True,
            arguments={
                'x-dead-letter-exchange': '',
                'x-dead-letter-routing-key': self.dead_letter_queue
            }
        )

        connection.close()

    def publish_message(self, message):
        connection = pika.BlockingConnection(
            pika.URLParameters(self.amqp_url)
        )
        channel = connection.channel()

        channel.basic_publish(
            exchange='',
            routing_key=self.queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
                headers={'retry_count': 0}
            )
        )

        connection.close()

    def process_messages(self, callback):
        connection = pika.BlockingConnection(
            pika.URLParameters(self.amqp_url)
        )
        channel = connection.channel()

        def wrapped_callback(ch, method, properties, body):
            try:
                # Extract retry count
                headers = properties.headers or {}
                retry_count = headers.get('retry_count', 0)

                # Process the message
                message = json.loads(body)
                self.logger.info(f"Processing message: {message}")
                callback(message)

                # Acknowledge successful processing
                ch.basic_ack(delivery_tag=method.delivery_tag)

            except Exception as e:
                self.logger.error(f"Error processing message: {e}")

                # Increment retry count
                retry_count = retry_count + 1

                if retry_count <= self.max_retries:
                    # Reject message for requeue with updated retry count
                    time.sleep(self.retry_delay)

                    # Republish with incremented retry count
                    ch.basic_ack(delivery_tag=method.delivery_tag)
                    channel.basic_publish(
                        exchange='',
                        routing_key=self.queue_name,
                        body=body,
                        properties=pika.BasicProperties(
                            delivery_mode=2,
                            headers={'retry_count': retry_count}
                        )
                    )
                else:
                    # Max retries reached, send to dead-letter queue
                    self.logger.warning(
                        f"Max retries reached, sending to dead-letter queue"
                    )
                    ch.basic_ack(delivery_tag=method.delivery_tag)

                    # Add failure reason to the message
                    message = json.loads(body)
                    message['_failure_reason'] = str(e)

                    channel.basic_publish(
                        exchange='',
                        routing_key=self.dead_letter_queue,
                        body=json.dumps(message),
                        properties=pika.BasicProperties(delivery_mode=2)
                    )

        # Consume messages one at a time
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=wrapped_callback
        )

        self.logger.info(f"Waiting for messages on {self.queue_name}")
        channel.start_consuming()

    def process_dead_letters(self, callback):
        connection = pika.BlockingConnection(
            pika.URLParameters(self.amqp_url)
        )
        channel = connection.channel()

        def wrapped_callback(ch, method, properties, body):
            try:
                message = json.loads(body)
                self.logger.info(f"Processing dead letter: {message}")
                callback(message)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                self.logger.error(f"Error processing dead letter: {e}")
                # Nack without requeue to keep in dead-letter queue
                ch.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=False
                )

        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(
            queue=self.dead_letter_queue,
            on_message_callback=wrapped_callback
        )

        self.logger.info(f"Waiting for messages on {self.dead_letter_queue}")
        channel.start_consuming()

This implementation uses RabbitMQ to provide reliable message delivery with retry logic and dead-letter queuing. It handles temporary failures gracefully while ensuring problematic messages don't block the entire system.

Consensus Algorithms

Consensus algorithms enable distributed systems to agree on shared state even when nodes fail or network partitions occur. Python implementations of algorithms like Raft provide a foundation for building highly available distributed systems.

I implemented a distributed configuration service using Raft that allowed our microservices to maintain consistent configuration even during infrastructure outages. This ensured all services operated with the same parameters despite network partitions.

import threading
import time
import random
import requests
from enum import Enum

class NodeState(Enum):
    FOLLOWER = 1
    CANDIDATE = 2
    LEADER = 3

class RaftNode:
    def __init__(self, node_id, cluster_nodes, election_timeout_range=(150, 300)):
        self.node_id = node_id
        self.cluster_nodes = cluster_nodes
        self.election_timeout_range = election_timeout_range

        # Persistent state
        self.current_term = 0
        self.voted_for = None
        self.log = []

        # Volatile state
        self.state = NodeState.FOLLOWER
        self.commit_index = 0
        self.last_applied = 0

        # Leader state
        self.next_index = {node: 1 for node in cluster_nodes if node != node_id}
        self.match_index = {node: 0 for node in cluster_nodes if node != node_id}

        # Initialize timers
        self.election_timer = None
        self.heartbeat_timer = None
        self.reset_election_timer()

        # Start the main thread
        self.running = True
        threading.Thread(target=self.apply_entries_thread).start()

    def reset_election_timer(self):
        if self.election_timer:
            self.election_timer.cancel()

        timeout = random.randint(
            self.election_timeout_range[0],
            self.election_timeout_range[1]
        )
        self.election_timer = threading.Timer(
            timeout / 1000.0, 
            self.start_election
        )
        self.election_timer.daemon = True
        self.election_timer.start()

    def start_heartbeat(self):
        if self.heartbeat_timer:
            self.heartbeat_timer.cancel()

        # Send heartbeats immediately
        self.send_append_entries()

        # Schedule next heartbeat
        self.heartbeat_timer = threading.Timer(
            50 / 1000.0,  # 50ms
            self.start_heartbeat
        )
        self.heartbeat_timer.daemon = True
        self.heartbeat_timer.start()

    def start_election(self):
        if not self.running:
            return

        self.state = NodeState.CANDIDATE
        self.current_term += 1
        self.voted_for = self.node_id
        votes_received = 1  # Vote for self

        print(f"Node {self.node_id} starting election for term {self.current_term}")

        # Reset election timer
        self.reset_election_timer()

        # Request votes from all other nodes
        for node in self.cluster_nodes:
            if node == self.node_id:
                continue

            try:
                last_log_index = len(self.log)
                last_log_term = self.log[-1]['term'] if self.log else 0

                response = requests.post(
                    f"http://{node}/request_vote",
                    json={
                        'term': self.current_term,
                        'candidate_id': self.node_id,
                        'last_log_index': last_log_index,
                        'last_log_term': last_log_term
                    },
                    timeout=0.1
                )

                result = response.json()

                # If response contains higher term, revert to follower
                if result['term'] > self.current_term:
                    self.current_term = result['term']
                    self.state = NodeState.FOLLOWER
                    self.voted_for = None
                    return

                if result['vote_granted']:
                    votes_received += 1

                    # Check if majority achieved
                    if votes_received > len(self.cluster_nodes) / 2:
                        self.become_leader()
                        return

            except Exception as e:
                print(f"Error requesting vote from {node}: {e}")

    def become_leader(self):
        if not self.running or self.state == NodeState.LEADER:
            return

        print(f"Node {self.node_id} becoming leader for term {self.current_term}")
        self.state = NodeState.LEADER

        # Initialize leader state
        self.next_index = {
            node: len(self.log) + 1 
            for node in self.cluster_nodes if node != self.node_id
        }
        self.match_index = {
            node: 0 
            for node in self.cluster_nodes if node != self.node_id
        }

        # Cancel election timer
        if self.election_timer:
            self.election_timer.cancel()

        # Start sending heartbeats
        self.start_heartbeat()

    def send_append_entries(self):
        if not self.running or self.state != NodeState.LEADER:
            return

        for node in self.cluster_nodes:
            if node == self.node_id:
                continue

            try:
                next_idx = self.next_index[node]
                prev_log_index = next_idx - 1
                prev_log_term = (
                    self.log[prev_log_index - 1]['term'] 
                    if prev_log_index > 0 and self.log 
                    else 0
                )

                # Get entries to send
                entries = self.log[prev_log_index:] if prev_log_index < len(self.log) else []

                response = requests.post(
                    f"http://{node}/append_entries",
                    json={
                        'term': self.current_term,
                        'leader_id': self.node_id,
                        'prev_log_index': prev_log_index,
                        'prev_log_term': prev_log_term,
                        'entries': entries,
                        'leader_commit': self.commit_index
                    },
                    timeout=0.1
                )

                result = response.json()

                # If response contains higher term, revert to follower
                if result['term'] > self.current_term:
                    self.current_term = result['term']
                    self.state = NodeState.FOLLOWER
                    self.voted_for = None

                    # Cancel heartbeat timer
                    if self.heartbeat_timer:
                        self.heartbeat_timer.cancel()

                    # Reset election timer
                    self.reset_election_timer()
                    return

                if result['success']:
                    # Update indices
                    if entries:
                        self.match_index[node] = prev_log_index + len(entries)
                        self.next_index[node] = self.match_index[node] + 1

                    # Update commit index if needed
                    self.update_commit_index()
                else:
                    # Decrement next_index and retry
                    self.next_index[node] = max(1, self.next_index[node] - 1)

            except Exception as e:
                print(f"Error sending append entries to {node}: {e}")

    def update_commit_index(self):
        if self.state != NodeState.LEADER:
            return

        # Find the highest index that has been replicated to a majority of nodes
        for n in range(self.commit_index + 1, len(self.log) + 1):
            # Count nodes that have this entry
            count = 1  # Leader has it

            for node in self.match_index:
                if self.match_index[node] >= n:
                    count += 1

            # Check if majority reached and entry is from current term
            if count > len(self.cluster_nodes) / 2 and (
                    n > 0 and self.log[n - 1]['term'] == self.current_term):
                self.commit_index = n

    def append_log(self, data):
        if self.state != NodeState.LEADER:
            return False, "Not the leader"

        # Append to local log
        entry = {
            'term': self.current_term,
            'data': data
        }
        self.log.append(entry)

        # Update leader's match index for itself
        self.match_index[self.node_id] = len(self.log)

        # Try to replicate immediately
        self.send_append_entries()

        return True, "Log entry added"

    def apply_entries_thread(self):
        while self.running:
            # Apply committed entries that haven't been applied yet
            while self.last_applied < self.commit_index:
                self.last_applied += 1
                entry = self.log[self.last_applied - 1]

                # Apply the command (in a real system, this would modify state)
                print(f"Node {self.node_id} applying: {entry['data']}")

            time.sleep(0.01)  # Small sleep to prevent CPU hogging

    def request_vote_handler(self, term, candidate_id, last_log_index, last_log_term):
        if term < self.current_term:
            return {
                'term': self.current_term,
                'vote_granted': False
            }

        if term > self.current_term:
            self.current_term = term
            self.state = NodeState.FOLLOWER
            self.voted_for = None

        # Check if log is at least as up-to-date as ours
        our_last_index = len(self.log)
        our_last_term = self.log[-1]['term'] if self.log else 0

        log_ok = (
            last_log_term > our_last_term or 
            (last_log_term == our_last_term and last_log_index >= our_last_index)
        )

        if (self.voted_for is None or self.voted_for == candidate_id) and log_ok:
            # Grant vote
            self.voted_for = candidate_id
            self.reset_election_timer()

            return {
                'term': self.current_term,
                'vote_granted': True
            }
        else:
            return {
                'term': self.current_term,
                'vote_granted': False
            }

    def append_entries_handler(self, term, leader_id, prev_log_index, 
                               prev_log_term, entries, leader_commit):
        # Reply false if term < currentTerm
        if term < self.current_term:
            return {
                'term': self.current_term,
                'success': False
            }

        # If term is greater, update current term and convert to follower
        if term > self.current_term:
            self.current_term = term
            self.state = NodeState.FOLLOWER
            self.voted_for = None

        # Reset election timer since we've heard from the leader
        self.reset_election_timer()

        # If candidate or leader, step down
        if self.state != NodeState.FOLLOWER:
            self.state = NodeState.FOLLOWER

            if self.heartbeat_timer:
                self.heartbeat_timer.cancel()

        # Check if our log has the prev_log_entry with matching term
        if prev_log_index > 0:
            if prev_log_index > len(self.log) or (
                    prev_log_index > 0 and 
                    self.log[prev_log_index - 1]['term'] != prev_log_term):
                return {
                    'term': self.current_term,
                    'success': False
                }

        # Process entries
        if entries:
            # Find conflicts: if an existing entry conflicts with a new one, 
            # delete it and all that follow
            for i, entry in enumerate(entries):
                log_index = prev_log_index + i + 1

                # If we have an entry at this index but terms don't match
                if log_index <= len(self.log) and self.log[log_index - 1]['term'] != entry['term']:
                    # Delete this and all subsequent entries
                    self.log = self.log[:log_index - 1]
                    break

            # Append any new entries not already in the log
            for i, entry in enumerate(entries):
                log_index = prev_log_index + i + 1

                if log_index > len(self.log):
                    self.log.append(entry)

        # Update commit index
        if leader_commit > self.commit_index:
            self.commit_index = min(leader_commit, len(self.log))

        return {
            'term': self.current_term,
            'success': True
        }

    def shutdown(self):
        self.running = False

        if self.election_timer:
            self.election_timer.cancel()

        if self.heartbeat_timer:
            self.heartbeat_timer.cancel()

This Raft implementation demonstrates core concepts like leader election, log replication, and term-based consensus. In a production system, you'd need to add persistence and integrate with your application's state machine.

Event Sourcing

Event sourcing captures all state changes as a sequence of immutable events. This pattern allows systems to rebuild state from event logs, enabling accurate recovery and providing a complete history of changes.

In a financial application I developed, event sourcing allowed us to track every transaction and maintain a complete audit trail. When a database corruption occurred, we were able to rebuild the system's state from stored events with minimal disruption.


python
import uuid
import json
import datetime
from collections import defaultdict

class EventStore:
    def __init__(self, persistence_adapter=None):
        self.persistence_adapter = persistence_adapter
        self.events = defaultdict(list)
        self.event_handlers = defaultdict(list)

    def save_event(self, aggregate_id, event_type, event_data):
        """Save an event to the store"""
        event = {
            'id': str(uuid.uuid4()),
            'timestamp': datetime.datetime.now().isoformat(),
            'aggregate_id': aggregate_id,
            'type': event_type,
            'data': event_data,
            'sequence': len(self.events[aggregate_id])
        }

        # Add to in-memory store
        self.events[aggregate_id].append(event)

        # Persist if adapter available
        if self.persistence_adapter:
            self.persistence_adapter.save_event(event)

        # Trigger event handlers
        for handler in self.event_handlers[event_type]:
            handler(event)

        return event

    def get_events(self, aggregate_id):
        """Get all events for an aggregate"""
        if self.persistence_adapter:
            # Load from persistence if adapter available
            return self.persistence_adapter.get_events(aggregate_id)

        return self.events[aggregate_id]

    def register_handler(self, event_type, handler):
        """Register a handler for an event type"""
        self.event_handlers[event_type].append(handler)

class Aggregate:
    def __init__(self, aggregate_id, event_store):
        self.id = aggregate_id
        self.event_store = event_store
        self.version = -1

        # Apply existing events to rebuild state
        self.replay_events()

    def replay_events(self):
        """Replay all events to rebuild aggregate state"""
        events = self.event_store.get_events(self.id)

        for event in events:
            self.apply_event(event, replay=True)
            self.version = event['sequence']

    def apply_event(self, event, replay=False):
        """Apply an event to the aggregate"""
        event_type = event['type']
        handler_method = f"apply_{event_type}"

        if hasattr(self, handler_method):
            getattr(self, handler_method)(event['data'], replay)

    def create_event(self, event_type, event_data):
        """Create and save a new event"""
        event = self.event_store.save_event(
            self.id, 
            event_type, 
            event_data
        )

        # Apply the event to update current state
        self.apply_event(event)
        self.version = event['sequence']

# Example: SQLite persistence adapter
class SQLiteEventStore:
    def __init__(self, db_path):
        import sqlite3
        self.conn = sqlite3.connect(db_path)
        self.create_tables()

    def create_tables(self):
        cursor = self.conn.cursor()
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS events (
            id TEXT PRIMARY KEY,
            timestamp TEXT,
            aggregate_id TEXT,
            type TEXT,
            data TEXT,
            sequence INTEGER,
            UNIQUE(aggregate_id, sequence)
        )
        ''')
        self.conn.commit()

    def save_event(self, event):
        cursor = self.conn.cursor()
        cursor.execute(
            '''
            INSERT INTO events (id, timestamp, aggregate_id, type, data, sequence)
            VALUES (?, ?, ?, ?, ?, ?)
            ''',
            (
                event['id'],
                event['timestamp'],
                event['aggregate_id'],
                event['type'],
                json.dumps(event['data']),
                event['sequence']
            )
        )
        self.conn.commit()

    def get_events(self, aggregate_id):
        cursor = self.conn.cursor()
        cursor.execute(
            '''
            SELECT id, timestamp, aggregate_id, type, data, sequence
            FROM events
            WHERE aggregate_id = ?
            ORDER BY sequence
            ''',
            (aggregate_id,)
        )

        events = []
        for row in cursor.fetchall():
            events.append({
                'id': row[0],
                'timestamp': row[1],
                'aggregate_id': row[2],
                'type': row[3],
                'data': json.loads(row[4]),
                'sequence': row[5]
            })

        return events

# Example: Bank Account Aggregate
class BankAccount(Aggregate):
    def __init__(self, account_id, event_store):
        self.balance = 0
        self.status = "new"
        super().__init__(account_id, event_store)

    def open_account(self, customer_id, initial_deposit):
        if self.status != "new":
            raise ValueError("Account already opened")

        if initial_deposit < 100:
            raise ValueError("Initial deposit must be at least $100")

        self.create_event("account_opened", {
            "customer_id": customer_id,
            "initial_deposit":

---
## 101 Books

**101 Books** is an AI-driven publishing company co-founded by author **Aarav Joshi**. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as **$4**—making quality knowledge accessible to everyone.

Check out our book **[Golang Clean Code](https://www.amazon.com/dp/B0DQQF9K3Z)** available on Amazon. 

Stay tuned for updates and exciting news. When shopping for books, search for **Aarav Joshi** to find more of our titles. Use the provided link to enjoy **special discounts**!

## Our Creations

Be sure to check out our creations:

**[Investor Central](https://www.investorcentral.co.uk/)** | **[Investor Central Spanish](https://spanish.investorcentral.co.uk/)** | **[Investor Central German](https://german.investorcentral.co.uk/)** | **[Smart Living](https://smartliving.investorcentral.co.uk/)** | **[Epochs & Echoes](https://epochsandechoes.com/)** | **[Puzzling Mysteries](https://www.puzzlingmysteries.com/)** | **[Hindutva](http://hindutva.epochsandechoes.com/)** | **[Elite Dev](https://elitedev.in/)** | **[JS Schools](https://jsschools.com/)**

---

### We are on Medium

**[Tech Koala Insights](https://techkoalainsights.com/)** | **[Epochs & Echoes World](https://world.epochsandechoes.com/)** | **[Investor Central Medium](https://medium.investorcentral.co.uk/)** | **[Puzzling Mysteries Medium](https://medium.com/puzzling-mysteries)** | **[Science & Epochs Medium](https://science.epochsandechoes.com/)** | **[Modern Hindutva](https://modernhindutva.substack.com/)**