Skip to content

AI Leases

AI Leases are the OnceOnly primitive for long-running asynchronous operations that can take anywhere from minutes to 24 hours. They provide:

  • Exclusive ownership — Only one agent can own a lease
  • Polling-safe — Other agents can check status without interfering
  • Automatic cleanup — Results are stored for 7 days
  • TTL extension — Extend the deadline if needed
  • Failure handling — Track errors and stack traces
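| Operation        | Duration  | Use        | Example          |
|------------------|-----------|------------|------------------|
| Payment          | < 1 sec   | check-lock | Stripe charge    |
| Email send       | 1-5 sec   | check-lock | SendGrid email   |
| Support chat     | 5-30 min  | ai/lease   | Customer support |
| Document process | 1-2 hours | ai/lease   | PDF parsing      |
| Bulk operation   | 24 hours  | ai/lease   | Data migration   |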

Rule of thumb: If the operation takes > 60 seconds, use AI Leases.


Basic Flow: Acquire → Execute → Complete

Section titled “Basic Flow: Acquire → Execute → Complete”
Agent 1 OnceOnly Agent 2
│ │ │
├─ POST /ai/lease ──→│ │
│ key=chat_1 │ │
│ ttl=1800 │ │
│←─ "acquired" ─────┤ │
│ lease_id=L123 │ │
│ │ │
│ [Doing work for 20 min...] │
│ │ │
│ │←─ GET /ai/status ─┤
│ │ key=chat_1 │
│ ├─ "in_progress" ──→│
│ │ ttl_left=1000 │
│ │ │
├─ POST /ai/complete ──→│ │
│ lease_id=L123 │ │
│ result={...} │ │
│←─ "completed" ────┤ │
│ │ │
│ │←─ GET /ai/result ─┤
│ ├─ result data ────→│

Step-by-step Example: Customer Support Chat

Section titled “Step-by-step Example: Customer Support Chat”
import hashlib
import json
import time
from datetime import datetime

import requests
# Base URL of the OnceOnly API; the snippets below append endpoint paths
# such as /v1/ai/lease to it.
ONCEONLY_API = "https://api.onceonly.tech"
# Bearer token sent in the Authorization header by the module-level helper
# functions. NOTE(review): this looks like a placeholder value — replace it
# with a real key before running the examples.
API_KEY = "once_live_xxxxxxxxxxxxx"
def start_support_chat(customer_id: str, chat_id: str):
    """Start a long-running customer support chat.

    Requests a 30-minute lease on the key ``support_chat_{chat_id}`` via
    ``POST /v1/ai/lease`` and prints whether this agent now owns it.

    Args:
        customer_id: Customer identifier, sent as lease metadata.
        chat_id: Chat identifier; used to build the lease key and sent as
            lease metadata.

    Returns:
        The decoded JSON response. Its ``status`` is ``"acquired"`` when this
        agent owns the lease and ``"polling"`` when another agent already
        holds it. Any other response is returned unchanged (instead of
        falling through to ``None``) so the caller can always inspect
        ``status``.
    """
    # Acquire lease for 30 minutes
    response = requests.post(
        f"{ONCEONLY_API}/v1/ai/lease",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "key": f"support_chat_{chat_id}",
            "ttl": 1800,  # 30 minutes
            "metadata": {
                "customer_id": customer_id,
                "chat_id": chat_id,
                "priority": "high",
            },
        },
    )
    lease = response.json()

    if lease["status"] == "acquired":
        # ✓ This agent owns the lease
        print(f"✓ Lease acquired: {lease['lease_id']}")
    elif lease["status"] == "polling":
        # ⏳ Another agent is already handling this
        print(f"⏳ Chat already in progress. Lease: {lease['lease_id']}")
        print(f" Wait {lease['ttl_left']} more seconds...")

    # Always hand the response back, including for unexpected statuses.
    return lease

Response (Acquired):

{
"ok": true,
"status": "acquired",
"key": "support_chat_abc123",
"lease_id": "lease_lkj123xyz",
"ttl": 1800,
"first_seen_at": "2025-01-15T10:30:00Z",
"version": 0
}

Response (Polling — Another Agent Owns It):

{
"ok": true,
"status": "polling",
"key": "support_chat_abc123",
"lease_id": "lease_lkj123xyz",
"ttl_left": 1750,
"version": 1
}

While the agent is working, other processes can check status:

def check_chat_progress(chat_id: str):
    """Report the current state of a support chat lease.

    Queries ``GET /v1/ai/status`` for the key ``support_chat_{chat_id}``,
    prints a one-line summary and returns the decoded JSON response.

    Args:
        chat_id: Chat identifier; used to build the lease key.

    Returns:
        The decoded JSON response, whatever its ``status`` is. Statuses other
        than ``"in_progress"``, ``"completed"`` and ``"failed"`` are reported
        as "not found".
    """
    reply = requests.get(
        f"{ONCEONLY_API}/v1/ai/status",
        params={"key": f"support_chat_{chat_id}"},
        headers={"Authorization": f"Bearer {API_KEY}"},
    )
    status = reply.json()
    state = status["status"]

    # Pick the summary line first; every branch returns the same payload.
    if state == "in_progress":
        summary = f"⏳ Chat in progress. Time left: {status['ttl_left']}s"
    elif state == "completed":
        summary = f"✓ Chat completed at {status['done_at']}"
    elif state == "failed":
        summary = f"✗ Chat failed: {status['error_code']}"
    else:
        summary = "❌ Chat not found"

    print(summary)
    return status

Response (In Progress):

{
"ok": true,
"status": "in_progress",
"key": "support_chat_abc123",
"lease_id": "lease_lkj123xyz",
"ttl_left": 1200,
"version": 1
}

If the task is taking longer than expected, extend the lease:

def extend_lease(chat_id: str, lease_id: str, additional_ttl: int = 600):
    """Ask OnceOnly to extend the lease held on a support chat.

    Posts to ``/v1/ai/extend`` for the key ``support_chat_{chat_id}`` and
    prints whether the extension was accepted.

    Args:
        chat_id: Chat identifier; used to build the lease key.
        lease_id: Identifier of the lease to extend.
        additional_ttl: Value sent as ``ttl`` in the request, in seconds
            (600 by default, i.e. 10 minutes).

    Returns:
        The decoded JSON response.
    """
    payload = {
        "key": f"support_chat_{chat_id}",
        "lease_id": lease_id,
        "ttl": additional_ttl,  # Add 10 more minutes
    }
    reply = requests.post(
        f"{ONCEONLY_API}/v1/ai/extend",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json=payload,
    )
    result = reply.json()

    was_extended = result["status"] == "extended"
    print(
        f"✓ Lease extended. New TTL: {result['ttl']}s"
        if was_extended
        else f"✗ Extend failed: {result['status']}"
    )
    return result

Example: Support Chat Takes Longer

def support_chat_with_extension(customer_id: str, chat_id: str):
    """Run a support chat, extending its lease while work continues.

    Acquires a 30-minute lease, processes every message in ``chat_messages``
    and then completes the lease. Roughly every 10 minutes of elapsed time it
    asks ``should_extend()`` whether the lease needs another 30 minutes.

    ``acquire_lease``, ``complete_lease``, ``fail_lease``, ``chat_messages``,
    ``should_extend``, ``get_ai_response`` and ``send_to_customer`` are not
    defined in this snippet and must be provided by the surrounding code.

    Args:
        customer_id: Customer identifier (not used by this function).
        chat_id: Chat identifier passed to the lease helpers.

    Returns:
        ``{"status": "already_in_progress"}`` when the lease could not be
        acquired; otherwise ``None`` once the lease has been completed.

    Raises:
        Exception: Any error raised while processing is re-raised after the
            lease has been marked as failed with ``error_code="chat_error"``.
    """
    # Acquire initial lease (30 min)
    lease = acquire_lease(chat_id, ttl=1800)
    if lease["status"] != "acquired":
        return {"status": "already_in_progress"}

    lease_id = lease["lease_id"]
    check_interval = 600  # seconds between extension checks (10 minutes)
    # Compare elapsed time against the interval. A float modulo test such as
    # `elapsed % 600 == 0` is practically never true, so it would never extend.
    last_check = time.monotonic()
    try:
        # Start support chat
        for message in chat_messages:
            # Every 10 minutes, check if we need more time
            now = time.monotonic()
            if now - last_check >= check_interval:
                last_check = now
                if should_extend():  # Your logic
                    # Passed positionally: extend_lease names this parameter
                    # `additional_ttl`, so `ttl=1800` would raise TypeError.
                    extend_lease(chat_id, lease_id, 1800)
            # Process message
            response = get_ai_response(message)
            send_to_customer(response)
        # Chat complete
        complete_lease(chat_id, lease_id, {"status": "resolved"})
    except Exception:
        # Release the lease as failed, then let the caller see the error.
        fail_lease(chat_id, lease_id, error_code="chat_error")
        raise

When the task finishes successfully:

def complete_chat(chat_id: str, lease_id: str, result: dict):
    """Mark a support chat lease as completed and store its result.

    Posts to ``/v1/ai/complete`` for the key ``support_chat_{chat_id}``.

    Args:
        chat_id: Chat identifier; used to build the lease key.
        lease_id: Identifier of the lease being completed.
        result: Chat outcome. The optional keys ``message_count``,
            ``resolution`` and ``satisfaction`` are read from it; missing
            keys fall back to ``0``, ``""`` and ``0`` respectively.

    Returns:
        The decoded JSON response.
    """
    payload = {
        "status": "resolved",
        "messages_exchanged": result.get("message_count", 0),
        "resolution": result.get("resolution", ""),
        "customer_satisfaction": result.get("satisfaction", 0),
    }
    # sha256 needs bytes and the request body needs a JSON-serialisable
    # value, so encode the JSON text and send the hex digest. The hash covers
    # the payload actually sent (keys sorted for a stable digest) so that
    # `result` and `result_hash` describe the same data.
    # NOTE(review): the canonical form the service expects for result_hash is
    # not shown here — confirm against the API reference.
    payload_hash = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode("utf-8")
    ).hexdigest()

    response = requests.post(
        f"{ONCEONLY_API}/v1/ai/complete",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "key": f"support_chat_{chat_id}",
            "lease_id": lease_id,
            "result": payload,
            "result_hash": payload_hash,
        },
    )
    return response.json()

Request:

{
"key": "support_chat_abc123",
"lease_id": "lease_lkj123xyz",
"result": {
"status": "resolved",
"messages": 15,
"resolution": "Issue fixed with password reset",
"satisfaction": 5
}
}

Response:

{
"ok": true,
"status": "completed",
"key": "support_chat_abc123",
"version": 5
}

def get_chat_result(chat_id: str):
    """Fetch the stored result of a support chat, if it has completed.

    Queries ``GET /v1/ai/result`` for the key ``support_chat_{chat_id}``.

    Args:
        chat_id: Chat identifier; used to build the lease key.

    Returns:
        The ``result`` field of the response when its ``status`` is
        ``"completed"``; otherwise ``None``.
    """
    reply = requests.get(
        f"{ONCEONLY_API}/v1/ai/result",
        params={"key": f"support_chat_{chat_id}"},
        headers={"Authorization": f"Bearer {API_KEY}"},
    )
    data = reply.json()

    # Guard clause: anything other than "completed" has no result to return.
    if data["status"] != "completed":
        print(f"Chat not done yet: {data['status']}")
        return None

    print(f"Result: {data['result']}")
    return data["result"]

Response:

{
"ok": true,
"status": "completed",
"key": "support_chat_abc123",
"result": {
"status": "resolved",
"messages": 15,
"resolution": "Issue fixed with password reset",
"satisfaction": 5
},
"result_hash": "abc123def456",
"done_at": "2025-01-15T11:05:00Z"
}

If the task fails:

def handle_chat_error(
    chat_id: str,
    lease_id: str,
    error: Exception,
    error_code: str = "connection_lost",
):
    """Report a support chat lease as failed.

    Posts to ``/v1/ai/fail`` for the key ``support_chat_{chat_id}``. The
    exception text itself is not sent; only the SHA-256 hex digest of
    ``str(error)`` is included as ``error_hash``.

    Args:
        chat_id: Chat identifier; used to build the lease key.
        lease_id: Identifier of the lease being failed.
        error: The exception that caused the failure.
        error_code: Short code sent as ``error_code``. Defaults to
            ``"connection_lost"``, the value this function previously
            hard-coded, so existing callers are unaffected.

    Returns:
        The decoded JSON response.
    """
    response = requests.post(
        f"{ONCEONLY_API}/v1/ai/fail",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "key": f"support_chat_{chat_id}",
            "lease_id": lease_id,
            "error_code": error_code,
            "error_hash": hashlib.sha256(str(error).encode()).hexdigest(),
        },
    )
    return response.json()

Response:

{
"ok": true,
"status": "failed",
"key": "support_chat_abc123",
"version": 2
}

Complete Example: Email Newsletter Processing

Section titled “Complete Example: Email Newsletter Processing”
class NewsletterProcessor:
    """Process a bulk email newsletter send under an AI Lease.

    The send is expected to be long-running (the examples mention 30 minutes
    to 2 hours), so the lease is acquired for one hour and extended while
    batches are still being sent.
    """

    def __init__(self, api_key: str):
        """Store the API key used as the bearer token for every request."""
        self.api_key = api_key

    def process_newsletter(self, newsletter_id: str, recipients: list) -> dict:
        """Send a newsletter to ``recipients`` in batches of 100.

        Args:
            newsletter_id: Newsletter identifier; the lease key is
                ``newsletter_{newsletter_id}``.
            recipients: Recipients to send to.

        Returns:
            ``{"status": "completed", "result": {...}}`` on success,
            ``{"status": "already_processing", ...}`` when another agent
            holds the lease, or ``{"status": "not_acquired", ...}`` for any
            other response that did not grant the lease.

        Raises:
            Exception: Any error raised while sending is re-raised after the
                lease has been marked as failed with
                ``error_code="processing_error"``.
        """
        key = f"newsletter_{newsletter_id}"

        # 1. Acquire lease (60 minutes initial)
        lease = self._acquire_lease(key, ttl=3600)
        if lease["status"] == "polling":
            return {
                "status": "already_processing",
                "message": "Another agent is already processing this"
            }
        if lease["status"] != "acquired":
            # Never send without owning the lease: only "acquired" grants
            # ownership, so any other status must stop here as well.
            return {
                "status": "not_acquired",
                "lease_status": lease["status"],
                "message": "Lease was not acquired; newsletter was not sent",
            }

        lease_id = lease["lease_id"]
        try:
            # 2. Process recipients
            processed = 0
            failed = 0
            start_time = time.time()
            for batch in self._batch_recipients(recipients, batch_size=100):
                # Auto-extend if taking too long
                if (time.time() - start_time) > 50 * 60:  # 50 min
                    self._extend_lease(key, lease_id, ttl=3600)
                    # Restart the clock so the next extension is requested
                    # 50 minutes from now, not on every later batch.
                    start_time = time.time()
                # Send batch
                sent, errors = self._send_batch(batch)
                processed += sent
                failed += len(errors)

            # 3. Complete lease with results
            result = {
                "recipients_processed": processed,
                "recipients_failed": failed,
                "completed_at": datetime.now().isoformat()
            }
            self._complete_lease(key, lease_id, result)
            return {"status": "completed", "result": result}
        except Exception:
            # 3b. Report failure, then let the caller see the original error
            self._fail_lease(key, lease_id, error_code="processing_error")
            raise

    def _acquire_lease(self, key: str, ttl: int) -> dict:
        """Acquire a lease from OnceOnly and return the decoded response."""
        response = requests.post(
            f"{ONCEONLY_API}/v1/ai/lease",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"key": key, "ttl": ttl}
        )
        return response.json()

    def _extend_lease(self, key: str, lease_id: str, ttl: int) -> dict:
        """Extend an existing lease and return the decoded response."""
        response = requests.post(
            f"{ONCEONLY_API}/v1/ai/extend",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"key": key, "lease_id": lease_id, "ttl": ttl}
        )
        return response.json()

    def _complete_lease(self, key: str, lease_id: str, result: dict):
        """Mark lease as completed; the response is not inspected."""
        requests.post(
            f"{ONCEONLY_API}/v1/ai/complete",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"key": key, "lease_id": lease_id, "result": result}
        )

    def _fail_lease(self, key: str, lease_id: str, error_code: str):
        """Mark lease as failed; the response is not inspected."""
        requests.post(
            f"{ONCEONLY_API}/v1/ai/fail",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"key": key, "lease_id": lease_id, "error_code": error_code}
        )

    def _batch_recipients(self, recipients: list, batch_size: int):
        """Yield consecutive slices of ``recipients`` of at most ``batch_size``."""
        for i in range(0, len(recipients), batch_size):
            yield recipients[i:i+batch_size]

    def _send_batch(self, batch: list) -> tuple[int, list]:
        """Send a batch of emails and return ``(sent_count, errors)``.

        Placeholder: reports every recipient as sent and no errors.
        """
        # Your email sending logic here
        return len(batch), []  # (sent, errors)

┌─────────────────────────────────────┐
│ No Lease │
│ (Task hasn't started) │
└────────────────┬────────────────────┘
POST /ai/lease
┌───────┴───────┐
▼ ▼
┌─────────┐ ┌──────────┐
│ACQUIRED │ │POLLING │
│(this │ │(another │
│agent) │ │ agent) │
└────┬────┘ └────┬─────┘
│ │
/ai/extend /ai/status
/ai/complete (wait)
/ai/fail
┌──┴────────────┐
▼ ▼
┌─────────┐ ┌─────────┐
│COMPLETED│ │FAILED │
└─────────┘ └─────────┘
(7 day TTL) (7 day TTL)
/ai/result /ai/result

  1. Use descriptive keys — Include resource ID and operation type
  2. Set appropriate TTL — Account for worst-case execution time
  3. Extend if needed — Check progress periodically and extend
  4. Cache results — Store final result for 7 days minimum
  5. Log milestones — Track start, extend, complete/fail events
# Descriptive key: resource type, resource ID and operation.
key = f"email_campaign_{campaign_id}_send"
ttl = 3600 * 2  # 2 hours for large campaigns
lease = acquire_lease(key, ttl)
if lease["status"] == "acquired":
    logger.info(f"Started campaign send: {campaign_id}")
    try:
        send_emails(campaign_id)
        # NOTE(review): `result` is not assigned in this snippet; it is
        # expected to be provided by the surrounding code — confirm.
        complete_lease(key, lease["lease_id"], result)
    except Exception:
        fail_lease(key, lease["lease_id"], "send_error")
        # Re-raise so the failure is not silently swallowed after the lease
        # has been marked as failed.
        raise
else:
    # Don't ignore a non-acquired status: this agent does not own the lease.
    logger.info(
        f"Campaign send not started (lease status: {lease['status']}): {campaign_id}"
    )
  1. Don’t use for quick tasks — Use check-lock for < 1 min
  2. Don’t ignore polling status — It means another agent owns it
  3. Don’t set TTL too short — Add buffer for unexpected delays
  4. Don’t forget to complete/fail — Let OnceOnly know when done
  5. Don’t retry without extending — Extend TTL if you need more time

def get_agent_metrics(agent_id: str):
    """Fetch an agent's metrics from ``/v1/agents/{agent_id}/metrics``.

    The request is made with ``period=day``.

    Args:
        agent_id: Identifier of the agent whose metrics are requested.

    Returns:
        The decoded JSON response.
    """
    response = requests.get(
        f"{ONCEONLY_API}/v1/agents/{agent_id}/metrics",
        # Use the module-level API_KEY constant like the other helpers; the
        # lowercase `api_key` name is not defined at module level.
        headers={"Authorization": f"Bearer {API_KEY}"},
        params={"period": "day"},
    )
    return response.json()

Response:

{
"agent_id": "newsletter_agent",
"period": "day",
"total_actions": 450,
"blocked_actions": 5,
"total_spend_usd": 125.50,
"top_tools": [
{"tool": "send_email", "count": 450},
{"tool": "log_event", "count": 200}
]
}

  • AI Leases manage long-running async tasks (1 min - 24 hours)
  • Acquire → Execute → Complete/Fail is the lifecycle
  • Extend TTL if task takes longer than expected
  • Polling status is safe — won’t interfere with owner
  • For short tasks, use Idempotency instead