
Idempotency: Preventing Duplicate Execution


Idempotency means an operation can be safely repeated without unintended side effects. OnceOnly guarantees this through distributed locking.

Timeline of a Payment with No Idempotency Protection:
10:30:00 │ Agent: "Charge $100"
│ ↓
├────────────────────────────────────┐
│ Stripe API processes charge │
│ Returns: Charge ID ch_123 │
│ But network latency occurs... │
│ │
10:30:05 │ Agent: (timeout, no response) │
│ Network still processing │
│ │
10:30:10 │ Agent: Retry! "Charge $100 again" │
│ ↓ │
│ Stripe: OK! New charge: ch_124 │
│ │
RESULT: │ Customer charged $200 instead of │
│ $100! 💥 │

This happens because:

  1. Network timeouts — Agent doesn’t receive response
  2. Server errors — Stripe charges, but returns 500 error
  3. Retry logic — Agent automatically retries, not knowing it already succeeded
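The failure can be reproduced in a few lines. This is a simulation only: the flaky `charge` function and the retry loop below are hypothetical stand-ins for a real payment client, built to show how one timeout plus a naive retry produces two charges:

```python
# Minimal simulation of the double-charge failure mode.
# `charges` stands in for Stripe's ledger; TimeoutError for network latency.
charges = []

def charge(amount: int) -> str:
    """The charge is processed, but the first response is lost to a timeout."""
    charge_id = f"ch_{len(charges) + 123}"
    charges.append((charge_id, amount))
    if len(charges) == 1:
        raise TimeoutError("response never reached the agent")
    return charge_id

def naive_agent(amount: int) -> str:
    for _ in range(2):              # simple retry, no idempotency key
        try:
            return charge(amount)
        except TimeoutError:
            continue                # agent wrongly assumes the charge failed
    raise RuntimeError("gave up")

naive_agent(100)
# charges now holds two entries: the customer paid twice
```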

The Solution: OnceOnly’s Idempotency Lock


OnceOnly uses a unique key per action combined with a Redis lock to ensure the action happens exactly once.

Timeline of a Payment WITH OnceOnly:
10:30:00 │ Agent: POST /check-lock
│ key="payment_123"
│ ↓
│ OnceOnly: "locked" ✓ (first time)
│ Agent: Charge $100
│ → Stripe: OK ch_123
10:30:05 │ (Network timeout, agent confused)
10:30:10 │ Agent: Retry POST /check-lock
│ key="payment_123"
│ ↓
│ OnceOnly: "duplicate" ⚠️
│ Agent: Returns previous result
│ Does NOT charge again!
RESULT: │ Customer charged $100 (once) ✓
Request 1:
POST /v1/check-lock {"key": "payment_123", "ttl": 3600}
→ Check Redis: lock exists? NO
→ Create lock in Redis with a 1-hour expiry
→ Return: {"success": true, "status": "locked", "first_seen_at": null}
Agent executes the payment.

Request 2 (within 1 hour):
POST /v1/check-lock {"key": "payment_123", "ttl": 3600}
→ Check Redis: lock exists? YES
→ Return: {"success": false, "status": "duplicate", "first_seen_at": "10:30:00Z"}
Agent does NOT execute the payment; it returns the cached result instead.
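Later snippets call a `check_lock(key, ttl)` helper. Here is a minimal sketch of its semantics against an in-memory dict rather than the real service (in production the body would instead POST to /v1/check-lock as shown above):

```python
import time

# In-memory stand-in for OnceOnly's Redis-backed lock store.
# Maps key -> (first_seen_at, expiry). A real client would POST to
# https://api.onceonly.tech/v1/check-lock instead of touching this dict.
_locks = {}

def check_lock(key: str, ttl: int = 3600) -> dict:
    now = time.time()
    entry = _locks.get(key)
    if entry and now < entry[1]:                  # lock exists and not expired
        return {"success": False, "status": "duplicate",
                "first_seen_at": entry[0]}
    _locks[key] = (now, now + ttl)                # create the lock
    return {"success": True, "status": "locked", "first_seen_at": None}

first = check_lock("payment_123", ttl=3600)    # status: "locked"
second = check_lock("payment_123", ttl=3600)   # status: "duplicate"
```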

Your key should be unique, stable, and deterministic.

# Invoice payment — unique per invoice
key = f"payment_invoice_{invoice_id}"  # payment_invoice_INV-123

# Email sending — unique per user and email type
key = f"email_{user_id}_welcome"  # email_user_456_welcome

# Database record creation — unique per intended record
key = f"create_user_{email}"  # create_user_customer@example.com

# API call — unique per resource and operation
key = f"update_profile_{user_id}_{timestamp}"  # Includes timestamp if needed

# Too generic — will collide with other operations
key = "payment"  # ❌ Multiple payments will share lock!

# Non-deterministic — different each time
key = f"payment_{uuid.uuid4()}"  # ❌ New UUID each retry!

# External ID — may change unexpectedly
key = f"payment_{api_response.id}"  # ❌ ID might change!

# Time-based — defeats idempotency
key = f"payment_{time.time()}"  # ❌ Different each second!

The ttl parameter determines how long OnceOnly remembers this action.

# Quick operations: database writes, API calls
ttl = 60 # Remember for 1 minute (example; default depends on plan/server config)
# Operations with longer network retry window
ttl = 300 # 5 minutes

Reason: Retries typically happen within seconds. After 1-5 minutes, it’s probably a different user action, not a retry.

# If your retry logic has exponential backoff
# (1s, 2s, 4s, 8s, 16s...)
ttl = 60 # 1 minute covers most retry attempts
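As a sanity check on that choice, the cumulative delay of the backoff schedule above stays well under one minute:

```python
# Exponential backoff schedule from the comment above: 1s, 2s, 4s, 8s, 16s
delays = [2 ** i for i in range(5)]   # [1, 2, 4, 8, 16]
total_retry_window = sum(delays)      # 31 seconds in total
assert total_retry_window < 60        # so ttl = 60 covers every retry
```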
# ❌ DON'T use check-lock for long-running tasks!
# These take minutes or hours. Use AI Lease instead.
# ✅ Use AI Lease
POST /v1/ai/lease {"key": "support_chat_1", "ttl": 1800}

OnceOnly only tells you whether an action is new (“locked”) or a repeat (“duplicate”). Caching the result so you can return it on duplicates is your responsibility.

import redis
import json
import requests

redis_client = redis.Redis(host='localhost', port=6379)

class CacheExpired(Exception):
    pass

def process_with_cache(action_key: str, processor_fn, ttl: int = 3600) -> dict:
    """
    Execute action with idempotency and result caching.

    processor_fn: Callable that does the actual work
    Returns the result (cached on duplicate)
    """
    # Step 1: Check OnceOnly for idempotency
    result = requests.post(
        "https://api.onceonly.tech/v1/check-lock",
        headers={"Authorization": f"Bearer {api_key}"},
        json={"key": action_key, "ttl": ttl},
    ).json()

    # Step 2: If duplicate, return cached result
    if result["status"] == "duplicate":
        cached = redis_client.get(f"result:{action_key}")
        if cached:
            return json.loads(cached)
        # Cache expired (shouldn't happen if TTLs match)
        raise CacheExpired(f"Cache miss for {action_key}")

    # Step 3: If new, execute the processor
    output = processor_fn()

    # Step 4: Cache the result for future duplicates
    redis_client.setex(f"result:{action_key}", ttl, json.dumps(output))
    return output

# Usage
def charge_customer(invoice_id: str, amount: float) -> dict:
    key = f"payment_inv_{invoice_id}"
    return process_with_cache(
        action_key=key,
        processor_fn=lambda: stripe.Charge.create(amount=int(amount * 100)),
        ttl=3600,
    )
from datetime import datetime, timedelta

cache = {}

def process_with_memory_cache(action_key: str, processor_fn, ttl: int = 3600) -> dict:
    """Cache results in memory (suitable for single-process apps)."""
    # Check OnceOnly
    result = requests.post(...).json()

    if result["status"] == "duplicate":
        if action_key in cache:
            cached_result, expiry = cache[action_key]
            if datetime.now() < expiry:
                return cached_result
        raise CacheExpired(f"Cache miss for {action_key}")

    # New action
    output = processor_fn()

    # Cache result
    cache[action_key] = (output, datetime.now() + timedelta(seconds=ttl))
    return output

When you get "duplicate", you have several options:

Option 1: Return Cached Result (Recommended)

def charge_payment(invoice_id):
    lock = check_lock(f"payment_inv_{invoice_id}")
    if lock["status"] == "locked":
        # New action
        charge = stripe.create_charge()
        cache[invoice_id] = charge
        return {"status": "charged", "id": charge.id}
    else:
        # Duplicate — return cached result
        cached = cache.get(invoice_id)
        if cached:
            return {"status": "cached", "id": cached.id}
        else:
            return {"error": "cache_miss"}
Option 2: Log and Return Cached Data (Read-Only Operations)

def fetch_user_data(user_id):
    lock = check_lock(f"fetch_user_{user_id}")
    if lock["status"] == "locked":
        data = db.query(user_id)
        cache[user_id] = data
        return data
    else:
        # For read-only ops, logging dupes is fine
        logger.info(f"Duplicate fetch attempt for user {user_id}")
        return cache.get(user_id)
Option 3: Fail Safely (Financial Operations)

def transfer_money(account_a, account_b, amount):
    lock = check_lock(f"transfer_{account_a}_{account_b}_{amount}")
    if lock["status"] == "locked":
        return perform_transfer(account_a, account_b, amount)
    else:
        # For financial ops, fail safely if cache is lost
        return {
            "error": "duplicate_detected",
            "message": "This transfer was already attempted",
            "first_seen_at": lock["first_seen_at"],
        }

  1. Use stable identifiers — Base keys on resource IDs, not random tokens
  2. Match cache TTL to lock TTL — Keep them in sync
  3. Store full results — Cache the complete response
  4. Log duplicates — For debugging retry patterns
  5. Use descriptive keys — Makes logs easier to read
# Good practice example
key = f"email_user_{user_id}_reset_password"
ttl = 3600  # 1 hour

result = check_lock(key, ttl)
if result["status"] == "locked":
    output = send_reset_email(user_id)
    cache.set(f"result:{key}", output, ttl)
else:
    output = cache.get(f"result:{key}")
    logger.info(f"Duplicate email send: {key}")
  1. Don’t retry with new keys — Defeats idempotency
  2. Don’t use timestamps in keys — TTL should be enough
  3. Don’t forget to cache results — Then duplicates have nothing to return
  4. Don’t use for long operations — Use AI Lease instead
  5. Don’t ignore duplicate responses — They’re important signals

| Feature     | Check-Lock                   | AI Lease                           |
|-------------|------------------------------|------------------------------------|
| Best for    | < 1 min operations           | 1-24 hour operations               |
| Max TTL     | 3600 seconds                 | 86400 seconds (24h)                |
| Can extend  | No                           | Yes (via /extend)                  |
| Ownership   | Any agent can query          | Only owner can complete            |
| Use case    | Payments, emails, DB writes  | Support chats, document processing |

For long operations, use AI Leases instead.


# ❌ WRONG
def pay_invoice(invoice_id):
    key = f"payment_{uuid.uuid4()}"  # New UUID!
    check_lock(key)  # Lock is different each time
    charge_stripe()

# ❌ WRONG
def send_email(user_id):
    key = f"email_{user_id}"
    check_lock(key, ttl=5)  # Too short!
    # If the client retries after 6 seconds, the lock has already
    # expired and the email goes out twice!
    send_email_api()

# ❌ WRONG
def create_record(name):
    key = f"create_{name}"
    result = check_lock(key)
    if result["status"] == "duplicate":
        return "already created"  # But which one? Lost!
    return create_db_record(name)

# ❌ WRONG
def process_file(file_id):
    key = f"file_{file_id}"
    check_lock(key, ttl=3600)  # File might take 2 hours!
    process_heavy_file()
    # If another agent retries after 1 hour, it will also start processing!

# ✅ RIGHT
def create_record(name):
    key = f"create_record_{name}"
    result = check_lock(key, ttl=300)  # 5 min window
    if result["status"] == "locked":
        # New action
        record = db.create(name)
        cache.set(f"result:{key}", record, ttl=300)
        return record
    else:
        # Duplicate
        cached = cache.get(f"result:{key}")
        if cached:
            return cached  # Return the original record
        raise CacheExpiredError()

Track idempotency patterns in your logs:

import logging

logger = logging.getLogger(__name__)

def tracked_check_lock(key, ttl=3600):
    result = check_lock(key, ttl)
    if result["status"] == "locked":
        logger.info(f"idempotency_new key={key}")
    else:
        logger.warning(f"idempotency_duplicate key={key} first_seen={result['first_seen_at']}")
    return result

# High duplicate rate might indicate:
# - Overly aggressive retry logic
# - Network instability
# - Clock skew (same timestamp used)
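To act on those signals, you can tally outcomes and compute a duplicate rate. A minimal sketch (the counter names are arbitrary; a real deployment would push these counts into its metrics system):

```python
from collections import Counter

# Tally lock outcomes so retry storms show up as a rising duplicate rate.
outcomes = Counter()

def record(status: str) -> None:
    outcomes[status] += 1

# Example stream of check-lock results
for status in ["locked", "locked", "duplicate", "locked", "duplicate"]:
    record(status)

duplicate_rate = outcomes["duplicate"] / sum(outcomes.values())
# 2 duplicates out of 5 checks gives a rate of 0.4
```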

  • Idempotency prevents duplicate execution via distributed locks
  • Key selection is critical — be descriptive and stable
  • TTL depends on your retry window (usually 60-300 seconds)
  • Caching results is your responsibility
  • For long tasks use AI Leases instead