Build Distributed Queues with Object Storage
Build lightweight distributed queues on S3 or any object store for serverless workloads, and skip heavyweight message brokers your workload doesn't justify.
In my work scaling serverless applications, I keep running into the same pattern: teams reach for heavyweight message brokers when their workloads don't justify the operational complexity. SQS works great until you need cross-cloud portability. RabbitMQ is powerful but requires constant care and feeding. Kafka is overkill for most batch processing workflows.
What if I told you that object storage—S3, GCS, or any blob store—could function as a surprisingly effective distributed queue for specific use cases?
Understanding Object Storage Queue Patterns
The core insight is simple: object storage provides atomic writes and effectively unlimited horizontal scaling, with consistency semantics that vary by provider (S3 and GCS, for example, offer strong read-after-write consistency for new objects and overwrites). For distributed queues that can tolerate slight delays and don't require strict message ordering, this becomes a viable queuing mechanism. The pattern relies on conditional requests and optimistic concurrency (for example, ETags with If-Match), not on any assumption of eventual consistency.
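Here's the primitive the whole pattern rests on, as a minimal sketch. The bucket and key names are placeholders, and it assumes a store that supports conditional writes on PUT (S3 added If-Match and If-None-Match support for PutObject in 2024; GCS has equivalent preconditions):

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client('s3')

# Create the object only if it doesn't already exist (If-None-Match: *)
try:
    s3.put_object(Bucket='my-bucket', Key='queue/state.json',
                  Body=b'{"messages": []}', IfNoneMatch='*')
except ClientError as e:
    if e.response['Error']['Code'] != 'PreconditionFailed':
        raise  # anything but "someone beat us to it" is a real error

# Overwrite only if the object hasn't changed since we last read it (If-Match)
response = s3.get_object(Bucket='my-bucket', Key='queue/state.json')
try:
    s3.put_object(Bucket='my-bucket', Key='queue/state.json',
                  Body=b'{"messages": ["hello"]}', IfMatch=response['ETag'])
except ClientError as e:
    if e.response['Error']['Code'] == 'PreconditionFailed':
        pass  # lost the race: re-read, reapply the change, retry

A failed precondition is the signal to re-read and retry; that loop is the entire concurrency story.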
I’ve used this pattern for:
- Batch job coordination across multiple serverless functions
- Event processing where ordering isn’t critical
- Dead letter queues with built-in durability
- Cross-cloud task distribution without vendor lock-in
The key is understanding when this pattern fits and when it doesn’t.
Implement a Distributed Queue with Python
Here’s a practical implementation using Python and boto3 for object storage queues. The queue state lives in a single JSON file, updated atomically using conditional writes.
import json
import uuid

import boto3
from datetime import datetime
from botocore.exceptions import ClientError

# Error codes S3 returns when a conditional write loses a race:
# 412 PreconditionFailed, or 409 ConditionalRequestConflict when
# concurrent conditional writes hit the same key at the same time.
CONFLICT_CODES = ('PreconditionFailed', 'ConditionalRequestConflict')


class ObjectStorageQueue:
    def __init__(self, bucket, key):
        self.s3 = boto3.client('s3')
        self.bucket = bucket
        self.key = key

    def _get_queue_state(self):
        """Fetch current queue state plus its ETag (our version token)."""
        try:
            response = self.s3.get_object(Bucket=self.bucket, Key=self.key)
            etag = response['ETag'].strip('"')
            state = json.loads(response['Body'].read())
            return state, etag
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                # Queue doesn't exist yet
                return {'messages': [], 'processing': [], 'processed': []}, None
            raise

    def _put_queue_state(self, state, etag):
        """Conditional write: raises ClientError if we lost the race."""
        args = {
            'Bucket': self.bucket,
            'Key': self.key,
            'Body': json.dumps(state),
            'ContentType': 'application/json'
        }
        if etag:
            # Only succeed if the ETag still matches what we read
            args['IfMatch'] = etag
        else:
            # First write: If-None-Match avoids a creation race
            args['IfNoneMatch'] = '*'
        self.s3.put_object(**args)

    def enqueue(self, message, max_retries=5):
        """Add a message to the queue with optimistic locking."""
        for _ in range(max_retries):
            state, etag = self._get_queue_state()
            # Add message with timestamp
            state['messages'].append({
                'id': f"{datetime.utcnow().isoformat()}-{uuid.uuid4()}",
                'payload': message,
                'enqueued_at': datetime.utcnow().isoformat()
            })
            try:
                self._put_queue_state(state, etag)
                return True
            except ClientError as e:
                if e.response['Error']['Code'] in CONFLICT_CODES:
                    continue  # state changed underneath us: retry
                raise
        return False

    def dequeue(self, max_retries=5):
        """Pop the next message and park it in 'processing'."""
        for _ in range(max_retries):
            state, etag = self._get_queue_state()
            if not state['messages']:
                return None
            # Take the first message and stamp it
            message = state['messages'].pop(0)
            message['processing_started'] = datetime.utcnow().isoformat()
            state.setdefault('processing', []).append(message)
            try:
                self._put_queue_state(state, etag)
                return message
            except ClientError as e:
                if e.response['Error']['Code'] in CONFLICT_CODES:
                    continue
                raise
        return None

    def ack(self, message_id, max_retries=5):
        """Mark a message as processed."""
        for _ in range(max_retries):
            state, etag = self._get_queue_state()
            # Remove from processing, add to processed
            state['processing'] = [m for m in state.get('processing', [])
                                   if m['id'] != message_id]
            state.setdefault('processed', []).append({
                'id': message_id,
                'completed_at': datetime.utcnow().isoformat()
            })
            try:
                self._put_queue_state(state, etag)
                return True
            except ClientError as e:
                if e.response['Error']['Code'] in CONFLICT_CODES:
                    continue
                raise
        return False
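A quick round trip looks like this (the bucket name is a placeholder):

queue = ObjectStorageQueue('my-queue-bucket', 'jobs/pending.json')
queue.enqueue({'report': 'q3-compliance'})

msg = queue.dequeue()
print(msg['payload'])  # {'report': 'q3-compliance'}
queue.ack(msg['id'])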
Handle Race Conditions in Distributed Queues
The critical piece for distributed queues is the IfMatch conditional write. S3’s ETag serves as our optimistic lock—if two workers try to dequeue simultaneously, only one succeeds. The other retries with fresh state.
I learned this the hard way when my first implementation had concurrent workers corrupting queue state. The conditional write prevents this entirely.
Here’s how you’d use it in a Lambda function:
import os

from object_storage_queue import ObjectStorageQueue


def lambda_handler(event, context):
    queue = ObjectStorageQueue(
        bucket=os.environ['QUEUE_BUCKET'],
        key='jobs/pending.json'
    )
    # Process messages until queue is empty
    while True:
        message = queue.dequeue()
        if not message:
            break
        try:
            # Process the message
            process_job(message['payload'])
            # Acknowledge successful processing
            queue.ack(message['id'])
        except Exception as e:
            # Message stays in 'processing' state
            # Implement visibility timeout logic separately
            print(f"Failed to process {message['id']}: {e}")
    return {'processed': 'complete'}


def process_job(payload):
    """Your actual job processing logic"""
    print(f"Processing: {payload}")
When This Pattern Works
This approach shines in specific scenarios:
Batch Processing: When you’re coordinating hourly or daily jobs across multiple workers, the slight latency is irrelevant. I use this for ETL pipelines that process data dumps.
Cross-Cloud Coordination: Need to coordinate work between AWS Lambda and GCP Cloud Functions? Object storage works everywhere. No vendor lock-in.
Durability Over Speed: Every message is persisted to object storage immediately. You’ll never lose a message due to broker failure.
Cost Optimization: For infrequent workloads, you pay pennies for storage. No idle broker infrastructure.
When It Doesn’t Work
Be honest about the limitations:
High Throughput: If you’re processing thousands of messages per second, don’t use this. The conditional write retry logic becomes a bottleneck.
Strict Ordering: This pattern doesn't guarantee FIFO delivery; concurrent enqueues and visibility-timeout requeues can reorder messages. If order matters, use a real queue.
Low Latency: The round-trip to object storage adds 50-200ms. Not suitable for real-time systems.
Large Queue State: Rewriting the entire state file on every operation doesn't scale beyond a few thousand messages, and large payloads make each rewrite slower still.
Scale Object Storage Queues with Sharding
For larger workloads, partition your distributed queue:
# Instead of one queue file
queue/pending.json
# Use multiple shards
queue/shard-0/pending.json
queue/shard-1/pending.json
queue/shard-2/pending.json
queue/shard-3/pending.json
Workers can claim a shard using a similar optimistic locking pattern:
import json
from datetime import datetime

import boto3
from botocore.exceptions import ClientError


def claim_shard(bucket, shard_id, worker_id):
    """Attempt to claim a shard for exclusive processing."""
    s3 = boto3.client('s3')
    lock_key = f"queue/shard-{shard_id}/lock.json"
    try:
        s3.put_object(
            Bucket=bucket,
            Key=lock_key,
            Body=json.dumps({
                'worker': worker_id,
                'claimed_at': datetime.utcnow().isoformat()
            }),
            # Only succeed if the lock doesn't already exist
            IfNoneMatch='*'
        )
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'PreconditionFailed':
            return False  # another worker holds the lock
        raise
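Routing enqueues across shards can be as simple as hashing the job ID. A sketch, where NUM_SHARDS and the key layout are assumptions matching the four-shard example above:

import hashlib

NUM_SHARDS = 4  # matches the shard-0..shard-3 layout above


def shard_key_for(job_id):
    """Pick a shard deterministically by hashing the job ID."""
    digest = hashlib.sha256(job_id.encode()).hexdigest()
    return f"queue/shard-{int(digest, 16) % NUM_SHARDS}/pending.json"

# Each shard is an independent ObjectStorageQueue, so contention on any
# one state file drops roughly by a factor of NUM_SHARDS:
# queue = ObjectStorageQueue(bucket, shard_key_for(job_id))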
Monitor Object Storage Queue Performance
One advantage I’ve found: monitoring distributed queues in object storage is trivial. The queue state is just JSON—you can query it directly:
def get_queue_metrics(bucket, key):
    """Get queue depth and processing stats"""
    state, _ = ObjectStorageQueue(bucket, key)._get_queue_state()
    return {
        'pending': len(state['messages']),
        'processing': len(state.get('processing', [])),
        'processed': len(state.get('processed', [])),
        'oldest_message': (state['messages'][0]['enqueued_at']
                           if state['messages'] else None)
    }
Expose this via CloudWatch custom metrics or Prometheus, and you have full visibility without complex broker instrumentation.
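For the CloudWatch route, publishing these numbers takes a few lines. A sketch assuming the get_queue_metrics helper above; the namespace is made up:

import boto3


def publish_queue_metrics(bucket, key):
    """Push queue depth to CloudWatch as custom metrics."""
    metrics = get_queue_metrics(bucket, key)
    cloudwatch = boto3.client('cloudwatch')
    cloudwatch.put_metric_data(
        Namespace='ObjectStorageQueue',  # hypothetical namespace
        MetricData=[
            {'MetricName': 'PendingMessages',
             'Value': metrics['pending'], 'Unit': 'Count'},
            {'MetricName': 'ProcessingMessages',
             'Value': metrics['processing'], 'Unit': 'Count'},
        ]
    )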
Real-World Application
I deployed this pattern for a client processing regulatory compliance reports. They needed to coordinate batch jobs across AWS and Azure, running 3-4 times per day. Traditional message brokers would have cost $500+/month just to sit idle.
With object storage queues:
- Monthly cost: ~$2 in S3 storage and API calls
- Zero operational overhead
- Built-in disaster recovery (S3 replication)
- Cross-cloud compatibility
The latency didn’t matter—jobs ran on schedules measured in hours, not milliseconds.
Testing Strategy
Here's a Go test (written against a Go port of the same queue, not shown here) demonstrating race condition handling:
package queue

import (
	"sync"
	"testing"
)

func TestConcurrentDequeue(t *testing.T) {
	queue := NewObjectStorageQueue("test-bucket", "test-queue.json")

	// Enqueue 100 messages
	for i := 0; i < 100; i++ {
		queue.Enqueue(map[string]interface{}{
			"job": i,
		})
	}

	// Launch 10 concurrent workers
	var wg sync.WaitGroup
	processed := make(map[int]bool)
	var mu sync.Mutex

	for w := 0; w < 10; w++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for {
				msg := queue.Dequeue()
				if msg == nil {
					break
				}
				jobID := int(msg.Payload["job"].(float64))
				mu.Lock()
				if processed[jobID] {
					t.Errorf("Worker %d got duplicate job %d", workerID, jobID)
				}
				processed[jobID] = true
				mu.Unlock()
				queue.Ack(msg.ID)
			}
		}(w)
	}
	wg.Wait()

	// Verify all messages processed exactly once
	if len(processed) != 100 {
		t.Errorf("Expected 100 processed messages, got %d", len(processed))
	}
}
Alternatives and Trade-offs
Before implementing this, consider:
SQS/Cloud Tasks: If you’re already in a single cloud, use the managed service. It’s battle-tested and costs pennies.
Redis Lists: For sub-second latency requirements, Redis queues are faster and still simple.
Full Message Brokers: If you need features like message routing, topics, or guaranteed ordering, invest in RabbitMQ or Kafka.
The object storage queue pattern sits in a sweet spot: simpler than full brokers, more portable than cloud-native queues, more durable than in-memory solutions.
Harden Production Object Storage Queues
A few lessons from production distributed queue deployments:
Implement Visibility Timeouts: Add logic to move stuck messages from ‘processing’ back to ‘messages’ after a timeout.
Archive Old Messages: Periodically move processed messages to a separate archive file to prevent unbounded growth (see the archiving sketch below).
Add Retry Limits: Track retry counts per message and move to a dead letter queue after max attempts (a sketch follows the visibility timeout code).
Monitor ETag Conflicts: High conflict rates indicate you need to shard your queue.
Here’s a visibility timeout implementation:
import json
from datetime import datetime


def requeue_stale_messages(queue, timeout_seconds=300):
    """Move messages stuck in processing back to pending."""
    state, etag = queue._get_queue_state()
    now = datetime.utcnow()
    stale_messages = []
    active_messages = []
    for msg in state.get('processing', []):
        started = datetime.fromisoformat(msg['processing_started'])
        if (now - started).total_seconds() > timeout_seconds:
            stale_messages.append(msg)
        else:
            active_messages.append(msg)
    if stale_messages:
        # Move stale messages back to pending
        state['processing'] = active_messages
        state['messages'].extend(stale_messages)
        args = {
            'Bucket': queue.bucket,
            'Key': queue.key,
            'Body': json.dumps(state),
            'ContentType': 'application/json'
        }
        if etag:
            args['IfMatch'] = etag
        # A conflict here means another worker touched the state first;
        # it's safe to let the next scheduled run pick the work back up
        queue.s3.put_object(**args)
    return len(stale_messages)
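For retry limits, one approach is to let the retry count ride along in the payload. A sketch, not part of the class above; it assumes dict payloads and treats the dead letter queue as just a second ObjectStorageQueue state file:

MAX_ATTEMPTS = 3  # assumed policy


def handle_failure(queue, dlq, msg):
    """Requeue a failed message, or dead-letter it after MAX_ATTEMPTS."""
    payload = dict(msg['payload'])
    attempts = payload.get('_attempts', 0) + 1
    payload['_attempts'] = attempts
    # Remove the failed message from 'processing' either way
    queue.ack(msg['id'])
    if attempts >= MAX_ATTEMPTS:
        dlq.enqueue(payload)    # e.g. a queue over jobs/dead-letter.json
    else:
        queue.enqueue(payload)  # back to the end of the pending list

And for archiving, a sketch along the same lines; the archive key layout is an assumption:

def archive_processed(queue, archive_prefix='queue/archive'):
    """Move the processed list to a timestamped archive object and clear it."""
    state, etag = queue._get_queue_state()
    if not state.get('processed'):
        return 0
    # Write the archive first: a failure here just means we re-archive
    # on the next run, never that we lose queue state
    queue.s3.put_object(
        Bucket=queue.bucket,
        Key=f"{archive_prefix}/{datetime.utcnow().isoformat()}.json",
        Body=json.dumps(state['processed']),
        ContentType='application/json'
    )
    archived = len(state['processed'])
    state['processed'] = []
    args = {
        'Bucket': queue.bucket,
        'Key': queue.key,
        'Body': json.dumps(state),
        'ContentType': 'application/json'
    }
    if etag:
        args['IfMatch'] = etag
    queue.s3.put_object(**args)
    return archived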
Conclusion
Distributed queues built on object storage won’t replace traditional message brokers for high-throughput, low-latency workloads. But for batch processing, cross-cloud coordination, and cost-sensitive architectures, they’re a pragmatic solution.
I’ve deployed this object storage queue pattern in production systems processing millions of jobs per month with zero queue-related incidents. The key is matching the tool to the use case.
The next time you’re reaching for a message broker, ask yourself: do I really need sub-second latency and strict ordering? Or can I leverage object storage I’m already paying for?
Sometimes the simplest distributed queue solution is the one hiding in plain sight.