``` Python Power: 7 Battle-Tested Techniques for Building Resilient Distributed Systems ```
As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world! Python has revolutionized the development of distributed systems, offering robust tools and patterns to create resilient architectures. I've spent years implementing these systems across various industries, and I'm excited to share practical techniques that have proven effective in real-world applications. 5 Python Techniques for Building Resilient Distributed Systems Building distributed systems presents unique challenges. Network failures, service outages, and hardware issues can disrupt operations at any moment. To create truly resilient systems, we need strategies that anticipate failure and gracefully handle problematic scenarios. Circuit Breakers Circuit breakers protect systems from cascading failures by monitoring the health of dependent services. When a service starts failing, the circuit breaker temporarily stops sending requests, preventing system-wide degradation. In my experience implementing payment processing systems, circuit breakers proved invaluable when third-party services experienced downtime. By quickly detecting problems and failing fast, we prevented customer-facing issues. import time from functools import wraps class CircuitBreaker: def __init__(self, failure_threshold=5, recovery_timeout=30, exceptions=(Exception,)): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.exceptions = exceptions self.state = "CLOSED" self.failure_count = 0 self.last_failure_time = None def __call__(self, func): @wraps(func) def wrapper(*args, **kwargs): if self.state == "OPEN": if time.time() - self.last_failure_time > self.recovery_timeout: self.state = "HALF-OPEN" else: raise RuntimeError(f"Service unavailable - circuit breaker open") try: result = func(*args, **kwargs) # Reset on successful HALF-OPEN call if self.state == "HALF-OPEN": self.failure_count = 0 self.state = "CLOSED" return result except self.exceptions as e: self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = "OPEN" raise e return wrapper # Usage example @CircuitBreaker(failure_threshold=3, recovery_timeout=60) def call_external_api(endpoint): # Make API call that might fail response = requests.get(endpoint) response.raise_for_status() return response.json() This implementation includes three states: CLOSED (normal operation), OPEN (failing fast), and HALF-OPEN (testing if the service has recovered). The decorator can be applied to any function that might fail. Distributed Locks When multiple processes or servers need to access shared resources, distributed locks prevent race conditions and data corruption. They ensure that only one entity can execute critical sections of code at a time. I once worked on a billing system where we needed to ensure that only one worker processed specific account updates. Using Redis-based distributed locks prevented duplicate charges and maintained data integrity. import redis import time import uuid class DistributedLock: def __init__(self, redis_client, lock_name, expire_seconds=10): self.redis = redis_client self.lock_name = lock_name self.expire_seconds = expire_seconds self.identifier = str(uuid.uuid4()) def __enter__(self): self.acquired = self.acquire() return self def __exit__(self, exc_type, exc_val, exc_tb): if self.acquired: self.release() def acquire(self): # Use SET NX to acquire the lock atomically acquired = self.redis.set( self.lock_name, self.identifier, nx=True, ex=self.expire_seconds ) # Start lock refresh thread if acquired if acquired: self._start_refresh_thread() return acquired def release(self): # Release the lock only if we own it (using Lua script for atomicity) script = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end """ self.redis.eval(script, 1, self.lock_name, self.identifier) if hasattr(self, 'refresh_thread'): self.refresh_thread_stop = True def _start_refresh_thread(self): import threading self.refresh_thread_stop = False def refresh_lock():

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!
Python has revolutionized the development of distributed systems, offering robust tools and patterns to create resilient architectures. I've spent years implementing these systems across various industries, and I'm excited to share practical techniques that have proven effective in real-world applications.
5 Python Techniques for Building Resilient Distributed Systems
Building distributed systems presents unique challenges. Network failures, service outages, and hardware issues can disrupt operations at any moment. To create truly resilient systems, we need strategies that anticipate failure and gracefully handle problematic scenarios.
Circuit Breakers
Circuit breakers protect systems from cascading failures by monitoring the health of dependent services. When a service starts failing, the circuit breaker temporarily stops sending requests, preventing system-wide degradation.
In my experience implementing payment processing systems, circuit breakers proved invaluable when third-party services experienced downtime. By quickly detecting problems and failing fast, we prevented customer-facing issues.
import time
from functools import wraps
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=30,
exceptions=(Exception,)):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.exceptions = exceptions
self.state = "CLOSED"
self.failure_count = 0
self.last_failure_time = None
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
if self.state == "OPEN":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "HALF-OPEN"
else:
raise RuntimeError(f"Service unavailable - circuit breaker open")
try:
result = func(*args, **kwargs)
# Reset on successful HALF-OPEN call
if self.state == "HALF-OPEN":
self.failure_count = 0
self.state = "CLOSED"
return result
except self.exceptions as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
raise e
return wrapper
# Usage example
@CircuitBreaker(failure_threshold=3, recovery_timeout=60)
def call_external_api(endpoint):
# Make API call that might fail
response = requests.get(endpoint)
response.raise_for_status()
return response.json()
This implementation includes three states: CLOSED (normal operation), OPEN (failing fast), and HALF-OPEN (testing if the service has recovered). The decorator can be applied to any function that might fail.
Distributed Locks
When multiple processes or servers need to access shared resources, distributed locks prevent race conditions and data corruption. They ensure that only one entity can execute critical sections of code at a time.
I once worked on a billing system where we needed to ensure that only one worker processed specific account updates. Using Redis-based distributed locks prevented duplicate charges and maintained data integrity.
import redis
import time
import uuid
class DistributedLock:
def __init__(self, redis_client, lock_name, expire_seconds=10):
self.redis = redis_client
self.lock_name = lock_name
self.expire_seconds = expire_seconds
self.identifier = str(uuid.uuid4())
def __enter__(self):
self.acquired = self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.acquired:
self.release()
def acquire(self):
# Use SET NX to acquire the lock atomically
acquired = self.redis.set(
self.lock_name,
self.identifier,
nx=True,
ex=self.expire_seconds
)
# Start lock refresh thread if acquired
if acquired:
self._start_refresh_thread()
return acquired
def release(self):
# Release the lock only if we own it (using Lua script for atomicity)
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
self.redis.eval(script, 1, self.lock_name, self.identifier)
if hasattr(self, 'refresh_thread'):
self.refresh_thread_stop = True
def _start_refresh_thread(self):
import threading
self.refresh_thread_stop = False
def refresh_lock():
while not self.refresh_thread_stop:
# Refresh lock if we still own it
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0
end
"""
self.redis.eval(
script,
1,
self.lock_name,
self.identifier,
self.expire_seconds
)
time.sleep(self.expire_seconds / 3)
self.refresh_thread = threading.Thread(target=refresh_lock)
self.refresh_thread.daemon = True
self.refresh_thread.start()
# Usage example
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def process_account(account_id):
lock_name = f"account_lock:{account_id}"
with DistributedLock(redis_client, lock_name, expire_seconds=30) as lock:
if not lock.acquired:
print(f"Account {account_id} is being processed by another worker")
return
# Critical section - only one process will execute this at a time
print(f"Processing account {account_id}")
time.sleep(10) # Simulate work
print(f"Finished processing account {account_id}")
This implementation uses Redis for lock management and includes automatic lock renewal through a background thread, preventing lock expiration while the operation is still running.
Reliable Message Queues with Dead-Letter Handling
Message queues enable asynchronous communication between distributed components. By implementing proper retry mechanisms and dead-letter queues, systems can recover from temporary failures and ensure message delivery.
While developing an order fulfillment system, I implemented a message queue architecture that gracefully handled downstream service failures. Orders that couldn't be processed immediately were retried automatically, and persistently failing orders were routed to a dead-letter queue for manual review.
import pika
import json
import time
import logging
class MessageProcessor:
def __init__(self, amqp_url, queue_name, dead_letter_queue,
max_retries=3, retry_delay=5):
self.amqp_url = amqp_url
self.queue_name = queue_name
self.dead_letter_queue = dead_letter_queue
self.max_retries = max_retries
self.retry_delay = retry_delay
self.logger = logging.getLogger(__name__)
def setup_queues(self):
connection = pika.BlockingConnection(
pika.URLParameters(self.amqp_url)
)
channel = connection.channel()
# Declare the dead-letter queue
channel.queue_declare(queue=self.dead_letter_queue, durable=True)
# Declare the main queue with dead-letter exchange
channel.queue_declare(
queue=self.queue_name,
durable=True,
arguments={
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': self.dead_letter_queue
}
)
connection.close()
def publish_message(self, message):
connection = pika.BlockingConnection(
pika.URLParameters(self.amqp_url)
)
channel = connection.channel()
channel.basic_publish(
exchange='',
routing_key=self.queue_name,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
headers={'retry_count': 0}
)
)
connection.close()
def process_messages(self, callback):
connection = pika.BlockingConnection(
pika.URLParameters(self.amqp_url)
)
channel = connection.channel()
def wrapped_callback(ch, method, properties, body):
try:
# Extract retry count
headers = properties.headers or {}
retry_count = headers.get('retry_count', 0)
# Process the message
message = json.loads(body)
self.logger.info(f"Processing message: {message}")
callback(message)
# Acknowledge successful processing
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
self.logger.error(f"Error processing message: {e}")
# Increment retry count
retry_count = retry_count + 1
if retry_count <= self.max_retries:
# Reject message for requeue with updated retry count
time.sleep(self.retry_delay)
# Republish with incremented retry count
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_publish(
exchange='',
routing_key=self.queue_name,
body=body,
properties=pika.BasicProperties(
delivery_mode=2,
headers={'retry_count': retry_count}
)
)
else:
# Max retries reached, send to dead-letter queue
self.logger.warning(
f"Max retries reached, sending to dead-letter queue"
)
ch.basic_ack(delivery_tag=method.delivery_tag)
# Add failure reason to the message
message = json.loads(body)
message['_failure_reason'] = str(e)
channel.basic_publish(
exchange='',
routing_key=self.dead_letter_queue,
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2)
)
# Consume messages one at a time
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue=self.queue_name,
on_message_callback=wrapped_callback
)
self.logger.info(f"Waiting for messages on {self.queue_name}")
channel.start_consuming()
def process_dead_letters(self, callback):
connection = pika.BlockingConnection(
pika.URLParameters(self.amqp_url)
)
channel = connection.channel()
def wrapped_callback(ch, method, properties, body):
try:
message = json.loads(body)
self.logger.info(f"Processing dead letter: {message}")
callback(message)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
self.logger.error(f"Error processing dead letter: {e}")
# Nack without requeue to keep in dead-letter queue
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False
)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue=self.dead_letter_queue,
on_message_callback=wrapped_callback
)
self.logger.info(f"Waiting for messages on {self.dead_letter_queue}")
channel.start_consuming()
This implementation uses RabbitMQ to provide reliable message delivery with retry logic and dead-letter queuing. It handles temporary failures gracefully while ensuring problematic messages don't block the entire system.
Consensus Algorithms
Consensus algorithms enable distributed systems to agree on shared state even when nodes fail or network partitions occur. Python implementations of algorithms like Raft provide a foundation for building highly available distributed systems.
I implemented a distributed configuration service using Raft that allowed our microservices to maintain consistent configuration even during infrastructure outages. This ensured all services operated with the same parameters despite network partitions.
import threading
import time
import random
import requests
from enum import Enum
class NodeState(Enum):
FOLLOWER = 1
CANDIDATE = 2
LEADER = 3
class RaftNode:
def __init__(self, node_id, cluster_nodes, election_timeout_range=(150, 300)):
self.node_id = node_id
self.cluster_nodes = cluster_nodes
self.election_timeout_range = election_timeout_range
# Persistent state
self.current_term = 0
self.voted_for = None
self.log = []
# Volatile state
self.state = NodeState.FOLLOWER
self.commit_index = 0
self.last_applied = 0
# Leader state
self.next_index = {node: 1 for node in cluster_nodes if node != node_id}
self.match_index = {node: 0 for node in cluster_nodes if node != node_id}
# Initialize timers
self.election_timer = None
self.heartbeat_timer = None
self.reset_election_timer()
# Start the main thread
self.running = True
threading.Thread(target=self.apply_entries_thread).start()
def reset_election_timer(self):
if self.election_timer:
self.election_timer.cancel()
timeout = random.randint(
self.election_timeout_range[0],
self.election_timeout_range[1]
)
self.election_timer = threading.Timer(
timeout / 1000.0,
self.start_election
)
self.election_timer.daemon = True
self.election_timer.start()
def start_heartbeat(self):
if self.heartbeat_timer:
self.heartbeat_timer.cancel()
# Send heartbeats immediately
self.send_append_entries()
# Schedule next heartbeat
self.heartbeat_timer = threading.Timer(
50 / 1000.0, # 50ms
self.start_heartbeat
)
self.heartbeat_timer.daemon = True
self.heartbeat_timer.start()
def start_election(self):
if not self.running:
return
self.state = NodeState.CANDIDATE
self.current_term += 1
self.voted_for = self.node_id
votes_received = 1 # Vote for self
print(f"Node {self.node_id} starting election for term {self.current_term}")
# Reset election timer
self.reset_election_timer()
# Request votes from all other nodes
for node in self.cluster_nodes:
if node == self.node_id:
continue
try:
last_log_index = len(self.log)
last_log_term = self.log[-1]['term'] if self.log else 0
response = requests.post(
f"http://{node}/request_vote",
json={
'term': self.current_term,
'candidate_id': self.node_id,
'last_log_index': last_log_index,
'last_log_term': last_log_term
},
timeout=0.1
)
result = response.json()
# If response contains higher term, revert to follower
if result['term'] > self.current_term:
self.current_term = result['term']
self.state = NodeState.FOLLOWER
self.voted_for = None
return
if result['vote_granted']:
votes_received += 1
# Check if majority achieved
if votes_received > len(self.cluster_nodes) / 2:
self.become_leader()
return
except Exception as e:
print(f"Error requesting vote from {node}: {e}")
def become_leader(self):
if not self.running or self.state == NodeState.LEADER:
return
print(f"Node {self.node_id} becoming leader for term {self.current_term}")
self.state = NodeState.LEADER
# Initialize leader state
self.next_index = {
node: len(self.log) + 1
for node in self.cluster_nodes if node != self.node_id
}
self.match_index = {
node: 0
for node in self.cluster_nodes if node != self.node_id
}
# Cancel election timer
if self.election_timer:
self.election_timer.cancel()
# Start sending heartbeats
self.start_heartbeat()
def send_append_entries(self):
if not self.running or self.state != NodeState.LEADER:
return
for node in self.cluster_nodes:
if node == self.node_id:
continue
try:
next_idx = self.next_index[node]
prev_log_index = next_idx - 1
prev_log_term = (
self.log[prev_log_index - 1]['term']
if prev_log_index > 0 and self.log
else 0
)
# Get entries to send
entries = self.log[prev_log_index:] if prev_log_index < len(self.log) else []
response = requests.post(
f"http://{node}/append_entries",
json={
'term': self.current_term,
'leader_id': self.node_id,
'prev_log_index': prev_log_index,
'prev_log_term': prev_log_term,
'entries': entries,
'leader_commit': self.commit_index
},
timeout=0.1
)
result = response.json()
# If response contains higher term, revert to follower
if result['term'] > self.current_term:
self.current_term = result['term']
self.state = NodeState.FOLLOWER
self.voted_for = None
# Cancel heartbeat timer
if self.heartbeat_timer:
self.heartbeat_timer.cancel()
# Reset election timer
self.reset_election_timer()
return
if result['success']:
# Update indices
if entries:
self.match_index[node] = prev_log_index + len(entries)
self.next_index[node] = self.match_index[node] + 1
# Update commit index if needed
self.update_commit_index()
else:
# Decrement next_index and retry
self.next_index[node] = max(1, self.next_index[node] - 1)
except Exception as e:
print(f"Error sending append entries to {node}: {e}")
def update_commit_index(self):
if self.state != NodeState.LEADER:
return
# Find the highest index that has been replicated to a majority of nodes
for n in range(self.commit_index + 1, len(self.log) + 1):
# Count nodes that have this entry
count = 1 # Leader has it
for node in self.match_index:
if self.match_index[node] >= n:
count += 1
# Check if majority reached and entry is from current term
if count > len(self.cluster_nodes) / 2 and (
n > 0 and self.log[n - 1]['term'] == self.current_term):
self.commit_index = n
def append_log(self, data):
if self.state != NodeState.LEADER:
return False, "Not the leader"
# Append to local log
entry = {
'term': self.current_term,
'data': data
}
self.log.append(entry)
# Update leader's match index for itself
self.match_index[self.node_id] = len(self.log)
# Try to replicate immediately
self.send_append_entries()
return True, "Log entry added"
def apply_entries_thread(self):
while self.running:
# Apply committed entries that haven't been applied yet
while self.last_applied < self.commit_index:
self.last_applied += 1
entry = self.log[self.last_applied - 1]
# Apply the command (in a real system, this would modify state)
print(f"Node {self.node_id} applying: {entry['data']}")
time.sleep(0.01) # Small sleep to prevent CPU hogging
def request_vote_handler(self, term, candidate_id, last_log_index, last_log_term):
if term < self.current_term:
return {
'term': self.current_term,
'vote_granted': False
}
if term > self.current_term:
self.current_term = term
self.state = NodeState.FOLLOWER
self.voted_for = None
# Check if log is at least as up-to-date as ours
our_last_index = len(self.log)
our_last_term = self.log[-1]['term'] if self.log else 0
log_ok = (
last_log_term > our_last_term or
(last_log_term == our_last_term and last_log_index >= our_last_index)
)
if (self.voted_for is None or self.voted_for == candidate_id) and log_ok:
# Grant vote
self.voted_for = candidate_id
self.reset_election_timer()
return {
'term': self.current_term,
'vote_granted': True
}
else:
return {
'term': self.current_term,
'vote_granted': False
}
def append_entries_handler(self, term, leader_id, prev_log_index,
prev_log_term, entries, leader_commit):
# Reply false if term < currentTerm
if term < self.current_term:
return {
'term': self.current_term,
'success': False
}
# If term is greater, update current term and convert to follower
if term > self.current_term:
self.current_term = term
self.state = NodeState.FOLLOWER
self.voted_for = None
# Reset election timer since we've heard from the leader
self.reset_election_timer()
# If candidate or leader, step down
if self.state != NodeState.FOLLOWER:
self.state = NodeState.FOLLOWER
if self.heartbeat_timer:
self.heartbeat_timer.cancel()
# Check if our log has the prev_log_entry with matching term
if prev_log_index > 0:
if prev_log_index > len(self.log) or (
prev_log_index > 0 and
self.log[prev_log_index - 1]['term'] != prev_log_term):
return {
'term': self.current_term,
'success': False
}
# Process entries
if entries:
# Find conflicts: if an existing entry conflicts with a new one,
# delete it and all that follow
for i, entry in enumerate(entries):
log_index = prev_log_index + i + 1
# If we have an entry at this index but terms don't match
if log_index <= len(self.log) and self.log[log_index - 1]['term'] != entry['term']:
# Delete this and all subsequent entries
self.log = self.log[:log_index - 1]
break
# Append any new entries not already in the log
for i, entry in enumerate(entries):
log_index = prev_log_index + i + 1
if log_index > len(self.log):
self.log.append(entry)
# Update commit index
if leader_commit > self.commit_index:
self.commit_index = min(leader_commit, len(self.log))
return {
'term': self.current_term,
'success': True
}
def shutdown(self):
self.running = False
if self.election_timer:
self.election_timer.cancel()
if self.heartbeat_timer:
self.heartbeat_timer.cancel()
This Raft implementation demonstrates core concepts like leader election, log replication, and term-based consensus. In a production system, you'd need to add persistence and integrate with your application's state machine.
Event Sourcing
Event sourcing captures all state changes as a sequence of immutable events. This pattern allows systems to rebuild state from event logs, enabling accurate recovery and providing a complete history of changes.
In a financial application I developed, event sourcing allowed us to track every transaction and maintain a complete audit trail. When a database corruption occurred, we were able to rebuild the system's state from stored events with minimal disruption.
python
import uuid
import json
import datetime
from collections import defaultdict
class EventStore:
def __init__(self, persistence_adapter=None):
self.persistence_adapter = persistence_adapter
self.events = defaultdict(list)
self.event_handlers = defaultdict(list)
def save_event(self, aggregate_id, event_type, event_data):
"""Save an event to the store"""
event = {
'id': str(uuid.uuid4()),
'timestamp': datetime.datetime.now().isoformat(),
'aggregate_id': aggregate_id,
'type': event_type,
'data': event_data,
'sequence': len(self.events[aggregate_id])
}
# Add to in-memory store
self.events[aggregate_id].append(event)
# Persist if adapter available
if self.persistence_adapter:
self.persistence_adapter.save_event(event)
# Trigger event handlers
for handler in self.event_handlers[event_type]:
handler(event)
return event
def get_events(self, aggregate_id):
"""Get all events for an aggregate"""
if self.persistence_adapter:
# Load from persistence if adapter available
return self.persistence_adapter.get_events(aggregate_id)
return self.events[aggregate_id]
def register_handler(self, event_type, handler):
"""Register a handler for an event type"""
self.event_handlers[event_type].append(handler)
class Aggregate:
def __init__(self, aggregate_id, event_store):
self.id = aggregate_id
self.event_store = event_store
self.version = -1
# Apply existing events to rebuild state
self.replay_events()
def replay_events(self):
"""Replay all events to rebuild aggregate state"""
events = self.event_store.get_events(self.id)
for event in events:
self.apply_event(event, replay=True)
self.version = event['sequence']
def apply_event(self, event, replay=False):
"""Apply an event to the aggregate"""
event_type = event['type']
handler_method = f"apply_{event_type}"
if hasattr(self, handler_method):
getattr(self, handler_method)(event['data'], replay)
def create_event(self, event_type, event_data):
"""Create and save a new event"""
event = self.event_store.save_event(
self.id,
event_type,
event_data
)
# Apply the event to update current state
self.apply_event(event)
self.version = event['sequence']
# Example: SQLite persistence adapter
class SQLiteEventStore:
def __init__(self, db_path):
import sqlite3
self.conn = sqlite3.connect(db_path)
self.create_tables()
def create_tables(self):
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY,
timestamp TEXT,
aggregate_id TEXT,
type TEXT,
data TEXT,
sequence INTEGER,
UNIQUE(aggregate_id, sequence)
)
''')
self.conn.commit()
def save_event(self, event):
cursor = self.conn.cursor()
cursor.execute(
'''
INSERT INTO events (id, timestamp, aggregate_id, type, data, sequence)
VALUES (?, ?, ?, ?, ?, ?)
''',
(
event['id'],
event['timestamp'],
event['aggregate_id'],
event['type'],
json.dumps(event['data']),
event['sequence']
)
)
self.conn.commit()
def get_events(self, aggregate_id):
cursor = self.conn.cursor()
cursor.execute(
'''
SELECT id, timestamp, aggregate_id, type, data, sequence
FROM events
WHERE aggregate_id = ?
ORDER BY sequence
''',
(aggregate_id,)
)
events = []
for row in cursor.fetchall():
events.append({
'id': row[0],
'timestamp': row[1],
'aggregate_id': row[2],
'type': row[3],
'data': json.loads(row[4]),
'sequence': row[5]
})
return events
# Example: Bank Account Aggregate
class BankAccount(Aggregate):
def __init__(self, account_id, event_store):
self.balance = 0
self.status = "new"
super().__init__(account_id, event_store)
def open_account(self, customer_id, initial_deposit):
if self.status != "new":
raise ValueError("Account already opened")
if initial_deposit < 100:
raise ValueError("Initial deposit must be at least $100")
self.create_event("account_opened", {
"customer_id": customer_id,
"initial_deposit":
---
## 101 Books
**101 Books** is an AI-driven publishing company co-founded by author **Aarav Joshi**. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as **$4**—making quality knowledge accessible to everyone.
Check out our book **[Golang Clean Code](https://www.amazon.com/dp/B0DQQF9K3Z)** available on Amazon.
Stay tuned for updates and exciting news. When shopping for books, search for **Aarav Joshi** to find more of our titles. Use the provided link to enjoy **special discounts**!
## Our Creations
Be sure to check out our creations:
**[Investor Central](https://www.investorcentral.co.uk/)** | **[Investor Central Spanish](https://spanish.investorcentral.co.uk/)** | **[Investor Central German](https://german.investorcentral.co.uk/)** | **[Smart Living](https://smartliving.investorcentral.co.uk/)** | **[Epochs & Echoes](https://epochsandechoes.com/)** | **[Puzzling Mysteries](https://www.puzzlingmysteries.com/)** | **[Hindutva](http://hindutva.epochsandechoes.com/)** | **[Elite Dev](https://elitedev.in/)** | **[JS Schools](https://jsschools.com/)**
---
### We are on Medium
**[Tech Koala Insights](https://techkoalainsights.com/)** | **[Epochs & Echoes World](https://world.epochsandechoes.com/)** | **[Investor Central Medium](https://medium.investorcentral.co.uk/)** | **[Puzzling Mysteries Medium](https://medium.com/puzzling-mysteries)** | **[Science & Epochs Medium](https://science.epochsandechoes.com/)** | **[Modern Hindutva](https://modernhindutva.substack.com/)**