Build a Shared-Nothing Distributed Queue with SQLite and Python

Need a lightweight job queue that runs across multiple machines without Redis, RabbitMQ, or cloud services? In this post, we’ll build a surprisingly capable distributed queue using SQLite, Python, and a little file-locking magic. This pattern works best when you need a shared queue across local or networked disks — think cron clusters, render farms, or batch processors — without spinning up infrastructure. Why Use SQLite as a Queue? No external services to maintain Great for jobs with infrequent updates Works well on shared network storage (NFS, SMB) Step 1: Create the Queue Table We’ll use a simple table with a claimed flag and a timestamp: import sqlite3 def init_db(): conn = sqlite3.connect("queue.db") conn.execute(""" CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY, task TEXT, claimed_by TEXT, claimed_at DATETIME ) """) conn.commit() conn.close() Step 2: Enqueue Jobs Add a job by inserting a row: def enqueue(task): conn = sqlite3.connect("queue.db") conn.execute("INSERT INTO jobs (task) VALUES (?)", (task,)) conn.commit() conn.close() Step 3: Claim a Job with Locking To safely claim jobs across machines, we’ll use a conditional update: import datetime, socket def claim_job(): conn = sqlite3.connect("queue.db", isolation_level="IMMEDIATE") conn.row_factory = sqlite3.Row hostname = socket.gethostname() now = datetime.datetime.utcnow().isoformat() cur = conn.execute(""" UPDATE jobs SET claimed_by = ?, claimed_at = ? WHERE id = ( SELECT id FROM jobs WHERE claimed_by IS NULL LIMIT 1 ) RETURNING * """, (hostname, now)) job = cur.fetchone() conn.commit() conn.close() return job Step 4: Process and Delete the Job def process(job): print(f"Processing: {job['task']}") conn = sqlite3.connect("queue.db") conn.execute("DELETE FROM jobs WHERE id = ?", (job["id"],)) conn.commit() conn.close() Step 5: Worker Loop This can run in a cron or systemd timer on multiple machines: if name == "main": init_db() job = claim_job() if job: process(job) Pros and Cons ✅ Pros Zero external dependencies Easy to inspect/debug Works across NFS or shared volumes ⚠️ Cons Not suitable for high-throughput workloads Concurrent writes are serialized SQLite file locking can behave differently per OS

Apr 23, 2025 - 02:57
 0
Build a Shared-Nothing Distributed Queue with SQLite and Python

Need a lightweight job queue that runs across multiple machines without Redis, RabbitMQ, or cloud services? In this post, we’ll build a surprisingly capable distributed queue using SQLite, Python, and a little file-locking magic. This pattern works best when you need a shared queue across local or networked disks — think cron clusters, render farms, or batch processors — without spinning up infrastructure.

Why Use SQLite as a Queue?


  • No external services to maintain
  • Great for jobs with infrequent updates
  • Works well on shared network storage (NFS, SMB)

Step 1: Create the Queue Table


We’ll use a simple table with a claimed flag and a timestamp:

import sqlite3

def init_db():
conn = sqlite3.connect("queue.db")
conn.execute("""
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY,
task TEXT,
claimed_by TEXT,
claimed_at DATETIME
)
""")
conn.commit()
conn.close()

Step 2: Enqueue Jobs


Add a job by inserting a row:

def enqueue(task):
conn = sqlite3.connect("queue.db")
conn.execute("INSERT INTO jobs (task) VALUES (?)", (task,))
conn.commit()
conn.close()

Step 3: Claim a Job with Locking


To safely claim jobs across machines, we’ll use a conditional update:

import datetime, socket

def claim_job():
conn = sqlite3.connect("queue.db", isolation_level="IMMEDIATE")
conn.row_factory = sqlite3.Row
hostname = socket.gethostname()
now = datetime.datetime.utcnow().isoformat()

cur = conn.execute("""
UPDATE jobs
SET claimed_by = ?, claimed_at = ?
WHERE id = (
SELECT id FROM jobs WHERE claimed_by IS NULL LIMIT 1
)
RETURNING *
""", (hostname, now))

job = cur.fetchone()
conn.commit()
conn.close()
return job

Step 4: Process and Delete the Job


def process(job):
print(f"Processing: {job['task']}")

conn = sqlite3.connect("queue.db")
conn.execute("DELETE FROM jobs WHERE id = ?", (job["id"],))
conn.commit()
conn.close()

Step 5: Worker Loop


This can run in a cron or systemd timer on multiple machines:

if name == "main":
init_db()
job = claim_job()
if job:
process(job)

Pros and Cons


✅ Pros


  • Zero external dependencies
  • Easy to inspect/debug
  • Works across NFS or shared volumes

⚠️ Cons


  • Not suitable for high-throughput workloads
  • Concurrent writes are serialized
  • SQLite file locking can behave differently per OS