Dillon Browne

Build Distributed Queues with Object Storage

Build lightweight distributed queues on S3 or any object store for serverless workloads, skipping the operational weight of a full message broker.


In my work scaling serverless applications, I’ve often encountered a recurring pattern: teams reaching for heavyweight message brokers when their workloads don’t justify the operational complexity. SQS works great until you need cross-cloud portability. RabbitMQ is powerful but requires constant care and feeding. Kafka is overkill for most batch processing workflows.

What if I told you that object storage—S3, GCS, or any blob store—could function as a surprisingly effective distributed queue for specific use cases?

Understanding Object Storage Queue Patterns

The core insight is simple: object storage provides atomic writes, effectively unlimited horizontal scaling, and, on the major providers, strong consistency (S3 and GCS both offer strong read-after-write for new objects and overwrites). For queues that can tolerate slight delays and don't require strict message ordering, that is enough to build on. The pattern rests on conditional requests for optimistic concurrency, such as ETags with If-Match, not on any eventual-consistency assumption.

I’ve used this pattern for:

  • Batch job coordination across multiple serverless functions
  • Event processing where ordering isn’t critical
  • Dead letter queues with built-in durability
  • Cross-cloud task distribution without vendor lock-in

The key is understanding when this pattern fits and when it doesn’t.

Implement a Distributed Queue with Python

Here’s a practical implementation using Python and boto3 for object storage queues. The queue state lives in a single JSON file, updated atomically using conditional writes.

import json
import uuid
import boto3
from datetime import datetime
from botocore.exceptions import ClientError

class ObjectStorageQueue:
    def __init__(self, bucket, key):
        self.s3 = boto3.client('s3')
        self.bucket = bucket
        self.key = key

    def _get_queue_state(self):
        """Fetch current queue state with version"""
        try:
            response = self.s3.get_object(Bucket=self.bucket, Key=self.key)
            etag = response['ETag'].strip('"')
            state = json.loads(response['Body'].read())
            return state, etag
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                # Queue doesn't exist yet
                return {'messages': [], 'processed': []}, None
            raise

    def enqueue(self, message):
        """Add message to queue with optimistic locking"""
        max_retries = 5
        for attempt in range(max_retries):
            state, etag = self._get_queue_state()
            
            # Add message with timestamp
            state['messages'].append({
                'id': f"{datetime.utcnow().isoformat()}-{uuid.uuid4()}",
                'payload': message,
                'enqueued_at': datetime.utcnow().isoformat()
            })
            
            try:
                # Conditional write: only succeed if ETag matches
                args = {
                    'Bucket': self.bucket,
                    'Key': self.key,
                    'Body': json.dumps(state),
                    'ContentType': 'application/json'
                }
                if etag:
                    args['IfMatch'] = etag
                else:
                    # First write: use If-None-Match to avoid race condition
                    args['IfNoneMatch'] = '*'
                    
                self.s3.put_object(**args)
                return True
            except ClientError as e:
                if e.response['Error']['Code'] == 'PreconditionFailed':
                    # State changed, retry
                    continue
                raise
        
        return False
    
    def dequeue(self):
        """Get next message and mark as processing"""
        max_retries = 5
        for attempt in range(max_retries):
            state, etag = self._get_queue_state()
            
            if not state['messages']:
                return None
            
            # Get first message
            message = state['messages'].pop(0)
            message['processing_started'] = datetime.utcnow().isoformat()
            
            # Move to processing (temporary)
            if 'processing' not in state:
                state['processing'] = []
            state['processing'].append(message)
            
            try:
                args = {
                    'Bucket': self.bucket,
                    'Key': self.key,
                    'Body': json.dumps(state),
                    'ContentType': 'application/json'
                }
                if etag:
                    args['IfMatch'] = etag
                    
                self.s3.put_object(**args)
                return message
            except ClientError as e:
                if e.response['Error']['Code'] == 'PreconditionFailed':
                    continue
                raise
        
        return None
    
    def ack(self, message_id):
        """Mark message as processed"""
        max_retries = 5
        for attempt in range(max_retries):
            state, etag = self._get_queue_state()
            
            # Remove from processing, add to processed
            state['processing'] = [m for m in state.get('processing', []) 
                                  if m['id'] != message_id]
            state['processed'].append({
                'id': message_id,
                'completed_at': datetime.utcnow().isoformat()
            })
            
            try:
                args = {
                    'Bucket': self.bucket,
                    'Key': self.key,
                    'Body': json.dumps(state),
                    'ContentType': 'application/json'
                }
                if etag:
                    args['IfMatch'] = etag
                    
                self.s3.put_object(**args)
                return True
            except ClientError as e:
                if e.response['Error']['Code'] == 'PreconditionFailed':
                    continue
                raise
        
        return False

Handle Race Conditions in Distributed Queues

The critical piece for distributed queues is the IfMatch conditional write. S3’s ETag serves as our optimistic lock—if two workers try to dequeue simultaneously, only one succeeds. The other retries with fresh state.

I learned this the hard way when my first implementation had concurrent workers corrupting queue state. The conditional write prevents this entirely.

Here’s how you’d use it in a Lambda function:

import os
from object_storage_queue import ObjectStorageQueue

def lambda_handler(event, context):
    queue = ObjectStorageQueue(
        bucket=os.environ['QUEUE_BUCKET'],
        key='jobs/pending.json'
    )
    
    # Process messages until queue is empty
    while True:
        message = queue.dequeue()
        if not message:
            break
            
        try:
            # Process the message
            process_job(message['payload'])
            
            # Acknowledge successful processing
            queue.ack(message['id'])
        except Exception as e:
            # Message stays in 'processing' state
            # Implement visibility timeout logic separately
            print(f"Failed to process {message['id']}: {e}")
            
    return {'processed': 'complete'}

def process_job(payload):
    """Your actual job processing logic"""
    print(f"Processing: {payload}")

When This Pattern Works

This approach shines in specific scenarios:

Batch Processing: When you’re coordinating hourly or daily jobs across multiple workers, the slight latency is irrelevant. I use this for ETL pipelines that process data dumps.

Cross-Cloud Coordination: Need to coordinate work between AWS Lambda and GCP Cloud Functions? Object storage works everywhere. No vendor lock-in.

Durability Over Speed: Every message is persisted to object storage immediately. You’ll never lose a message due to broker failure.

Cost Optimization: For infrequent workloads, you pay pennies for storage. No idle broker infrastructure.

When It Doesn’t Work

Be honest about the limitations:

High Throughput: If you’re processing thousands of messages per second, don’t use this. The conditional write retry logic becomes a bottleneck.

Strict Ordering: This pattern doesn’t guarantee FIFO delivery; concurrent workers retrying conditional writes can reorder messages. If order matters, use a real queue.

Low Latency: The round-trip to object storage adds 50-200ms. Not suitable for real-time systems.

Large Messages: Rewriting the entire queue state for every operation doesn’t scale beyond a few thousand messages.

Scale Object Storage Queues with Sharding

For larger workloads, partition your distributed queue:

# Instead of one queue file
queue/pending.json

# Use multiple shards
queue/shard-0/pending.json
queue/shard-1/pending.json
queue/shard-2/pending.json
queue/shard-3/pending.json
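Producers then need a deterministic way to pick a shard. One simple approach is to hash the message ID into a shard index; the helper below is a sketch, not part of the class above, and the shard count and key layout are assumptions:

```python
import hashlib

NUM_SHARDS = 4  # assumed shard count; raise it if ETag conflicts climb

def shard_key(message_id: str, num_shards: int = NUM_SHARDS) -> str:
    """Map a message ID to a stable shard queue key."""
    digest = hashlib.sha256(message_id.encode('utf-8')).digest()
    shard = int.from_bytes(digest[:4], 'big') % num_shards
    return f"queue/shard-{shard}/pending.json"
```

The same ID always lands on the same shard, so a producer and its retries agree on placement without any coordination.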

Workers can claim a shard using a similar optimistic locking pattern:

import json
import boto3
from datetime import datetime
from botocore.exceptions import ClientError

def claim_shard(bucket, shard_id, worker_id):
    """Attempt to claim a shard for exclusive processing"""
    s3 = boto3.client('s3')
    lock_key = f"queue/shard-{shard_id}/lock.json"
    
    try:
        s3.put_object(
            Bucket=bucket,
            Key=lock_key,
            Body=json.dumps({
                'worker': worker_id,
                'claimed_at': datetime.utcnow().isoformat()
            }),
            # Only succeed if lock doesn't exist
            IfNoneMatch='*'
        )
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'PreconditionFailed':
            return False
        raise
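The lock also needs a release path when a worker finishes. This `release_shard` helper is a sketch under the same assumed lock layout; it reads the lock back first so a worker only deletes a lock it still owns:

```python
import json

def owns_lock(lock: dict, worker_id: str) -> bool:
    """True if the lock document records this worker as the holder."""
    return lock.get('worker') == worker_id

def release_shard(bucket, shard_id, worker_id):
    """Delete the shard lock, but only if this worker still holds it."""
    import boto3  # imported lazily so owns_lock stays easy to unit test
    from botocore.exceptions import ClientError

    s3 = boto3.client('s3')
    lock_key = f"queue/shard-{shard_id}/lock.json"
    try:
        response = s3.get_object(Bucket=bucket, Key=lock_key)
        if not owns_lock(json.loads(response['Body'].read()), worker_id):
            return False  # another worker took over after a timeout
        s3.delete_object(Bucket=bucket, Key=lock_key)
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchKey':
            return False  # lock already released
        raise
```

There is still a small window between the read and the delete; if that matters for your workload, keep the ETag from the get and delete conditionally where the provider supports conditional deletes.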

Monitor Object Storage Queue Performance

One advantage I’ve found: monitoring distributed queues in object storage is trivial. The queue state is just JSON—you can query it directly:

def get_queue_metrics(bucket, key):
    """Get queue depth and processing stats"""
    state, _ = ObjectStorageQueue(bucket, key)._get_queue_state()
    
    return {
        'pending': len(state['messages']),
        'processing': len(state.get('processing', [])),
        'processed': len(state.get('processed', [])),
        'oldest_message': state['messages'][0]['enqueued_at'] if state['messages'] else None
    }

Expose this via CloudWatch custom metrics or Prometheus, and you have full visibility without complex broker instrumentation.
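For CloudWatch, the numeric fields of that dict map directly onto `put_metric_data` entries. A sketch; the namespace and dimension name here are assumptions:

```python
def build_metric_data(metrics, queue_name):
    """Convert numeric queue metrics into CloudWatch MetricData entries."""
    return [
        {
            'MetricName': name,
            'Value': float(value),
            'Unit': 'Count',
            'Dimensions': [{'Name': 'QueueName', 'Value': queue_name}],
        }
        for name, value in metrics.items()
        if isinstance(value, (int, float))  # skips oldest_message (a string)
    ]

def publish_queue_metrics(metrics, queue_name, namespace='ObjectStorageQueue'):
    """Push queue depth metrics to CloudWatch."""
    import boto3  # imported lazily so build_metric_data stays unit-testable
    boto3.client('cloudwatch').put_metric_data(
        Namespace=namespace,
        MetricData=build_metric_data(metrics, queue_name),
    )
```

Run this on a schedule (a one-minute EventBridge rule works) and alarm on the pending depth like any other queue.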

Real-World Application

I deployed this pattern for a client processing regulatory compliance reports. They needed to coordinate batch jobs across AWS and Azure, running 3-4 times per day. Traditional message brokers would have cost $500+/month just to sit idle.

With object storage queues:

  • Monthly cost: ~$2 in S3 storage and API calls
  • Zero operational overhead
  • Built-in disaster recovery (S3 replication)
  • Cross-cloud compatibility

The latency didn’t matter—jobs ran on schedules measured in hours, not milliseconds.

Testing Strategy

Here’s a Go test showing race condition handling:

package queue

import (
    "sync"
    "testing"
)

func TestConcurrentDequeue(t *testing.T) {
    queue := NewObjectStorageQueue("test-bucket", "test-queue.json")
    
    // Enqueue 100 messages
    for i := 0; i < 100; i++ {
        queue.Enqueue(map[string]interface{}{
            "job": i,
        })
    }
    
    // Launch 10 concurrent workers
    var wg sync.WaitGroup
    processed := make(map[int]bool)
    var mu sync.Mutex
    
    for w := 0; w < 10; w++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            
            for {
                msg := queue.Dequeue()
                if msg == nil {
                    break
                }
                
                jobID := int(msg.Payload["job"].(float64))
                
                mu.Lock()
                if processed[jobID] {
                    t.Errorf("Worker %d got duplicate job %d", workerID, jobID)
                }
                processed[jobID] = true
                mu.Unlock()
                
                queue.Ack(msg.ID)
            }
        }(w)
    }
    
    wg.Wait()
    
    // Verify all messages processed exactly once
    if len(processed) != 100 {
        t.Errorf("Expected 100 processed messages, got %d", len(processed))
    }
}
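If you'd rather exercise the Python class without a real bucket, the conditional-write semantics are small enough to fake in memory. `FakeConditionalStore` below is hypothetical test scaffolding, not a boto3 API; it mimics If-Match / If-None-Match with a version counter standing in for the ETag:

```python
class PreconditionFailed(Exception):
    """Raised when an If-Match / If-None-Match condition does not hold."""

class FakeConditionalStore:
    """In-memory stand-in for a single object with conditional writes."""
    def __init__(self):
        self._body = None
        self._version = 0

    def get(self):
        """Return (body, etag); raises KeyError if the object is absent."""
        if self._body is None:
            raise KeyError('NoSuchKey')
        return self._body, str(self._version)

    def put(self, body, if_match=None, if_none_match=None):
        """Write body if the condition holds; returns the new etag."""
        if if_none_match == '*' and self._body is not None:
            raise PreconditionFailed('object already exists')
        if if_match is not None and if_match != str(self._version):
            raise PreconditionFailed('stale etag')
        self._body = body
        self._version += 1
        return str(self._version)
```

Two writers that read the same ETag will see exactly one succeed, which is the invariant the dequeue retry loop depends on.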

Alternatives and Trade-offs

Before implementing this, consider:

SQS/Cloud Tasks: If you’re already in a single cloud, use the managed service. It’s battle-tested and costs pennies.

Redis Lists: For sub-second latency requirements, Redis queues are faster and still simple.

Full Message Brokers: If you need features like message routing, topics, or guaranteed ordering, invest in RabbitMQ or Kafka.

The object storage queue pattern sits in a sweet spot: simpler than full brokers, more portable than cloud-native queues, more durable than in-memory solutions.

Harden Production Object Storage Queues

A few lessons from production distributed queue deployments:

Implement Visibility Timeouts: Add logic to move stuck messages from ‘processing’ back to ‘messages’ after a timeout.

Archive Old Messages: Periodically move processed messages to a separate archive file to prevent unbounded growth.

Add Retry Limits: Track retry counts per message and move to a dead letter queue after max attempts.

Monitor ETag Conflicts: High conflict rates indicate you need to shard your queue.

Here’s a visibility timeout implementation:

import json
from datetime import datetime
from botocore.exceptions import ClientError

def requeue_stale_messages(queue, timeout_seconds=300):
    """Move messages stuck in processing back to pending"""
    state, etag = queue._get_queue_state()

    now = datetime.utcnow()
    stale_messages = []
    active_messages = []

    for msg in state.get('processing', []):
        started = datetime.fromisoformat(msg['processing_started'])
        if (now - started).total_seconds() > timeout_seconds:
            stale_messages.append(msg)
        else:
            active_messages.append(msg)

    if stale_messages:
        # Move stale back to pending
        state['processing'] = active_messages
        state['messages'].extend(stale_messages)

        args = {
            'Bucket': queue.bucket,
            'Key': queue.key,
            'Body': json.dumps(state),
            'ContentType': 'application/json'
        }
        if etag:
            args['IfMatch'] = etag

        try:
            queue.s3.put_object(**args)
        except ClientError as e:
            if e.response['Error']['Code'] == 'PreconditionFailed':
                # Another worker changed the state; pick it up next sweep
                return 0
            raise

    return len(stale_messages)
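Retry limits compose naturally with that requeue sweep: increment an attempt counter as each stale message comes back, and divert exhausted messages to a dead-letter list instead. A sketch; the `attempts` and `dead_letter` fields are assumptions, not part of the class shown earlier:

```python
MAX_ATTEMPTS = 3  # assumed retry budget per message

def route_requeued(state, message, max_attempts=MAX_ATTEMPTS):
    """Return a stale message to pending, or dead-letter it once its
    retry budget is spent. Returns where the message ended up."""
    message['attempts'] = message.get('attempts', 0) + 1
    if message['attempts'] > max_attempts:
        state.setdefault('dead_letter', []).append(message)
        return 'dead_letter'
    state['messages'].append(message)
    return 'requeued'
```

Call it from the requeue sweep for each stale message instead of extending state['messages'] directly, then persist the state with the same conditional write.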

Conclusion

Distributed queues built on object storage won’t replace traditional message brokers for high-throughput, low-latency workloads. But for batch processing, cross-cloud coordination, and cost-sensitive architectures, they’re a pragmatic solution.

I’ve deployed this object storage queue pattern in production systems processing millions of jobs per month with zero queue-related incidents. The key is matching the tool to the use case.

The next time you’re reaching for a message broker, ask yourself: do I really need sub-second latency and strict ordering? Or can I leverage object storage I’m already paying for?

Sometimes the simplest distributed queue solution is the one hiding in plain sight.
