AI Leases

AI Leases are the OnceOnly primitive for long-running asynchronous operations. Lease TTL is plan-dependent and can range from minutes to multiple days.

  • Exclusive ownership — Only one agent can own a lease
  • Polling-safe — Other agents can check status without interfering
  • 7-day finished-state retention — Completed and failed leases stay readable for 7 days
  • Optional result storage — If you send a result on completion, /v1/ai/result can return it during retention
  • TTL extension — Extend the deadline if needed
  • Failure handling — Track errors and failure states
| Need | Use | Why | Example |
| --- | --- | --- | --- |
| Fast synchronous work with duplicate protection | check-lock | Blocks duplicate execution, but does not track a long-running lifecycle | Stripe charge, email send |
| Long-running work that needs status tracking | ai/lease | Tracks in_progress, completed, or failed, supports polling, extend, and result retrieval | Support chat, PDF parsing |
| Long-running key-based work with automatic worker kickoff | ai/run | Uses the same lease lifecycle, but starts the queued run from one request | Background AI workflow, document pipeline |

Rule of thumb: Duration is only a signal. Use check-lock for simple idempotent requests, and use AI leases when the work may outlive a single request or needs lifecycle tracking.

If you want the key-based lease flow plus automatic worker execution from one call, use POST /v1/ai/run.


Basic Flow: Acquire → Execute → Complete

Agent 1                   OnceOnly                    Agent 2
   │                          │                          │
   ├─ POST /v1/ai/lease ─────→│                          │
   │    key=chat_1            │                          │
   │    ttl=1800              │                          │
   │←─ "acquired" ────────────┤                          │
   │    lease_id=L123         │                          │
   │                          │                          │
   │  [Doing work for 20 min...]                         │
   │                          │                          │
   │                          │←─ GET /v1/ai/status ─────┤
   │                          │     key=chat_1           │
   │                          ├─ "in_progress" ─────────→│
   │                          │     ttl_left=1000        │
   │                          │                          │
   ├─ POST /v1/ai/complete ──→│                          │
   │    lease_id=L123         │                          │
   │    result={...}          │                          │
   │←─ "completed" ───────────┤                          │
   │                          │                          │
   │                          │←─ GET /v1/ai/result ─────┤
   │                          ├─ result data ───────────→│

POST /v1/ai/lease may return polling when another worker already owns the active lease. GET /v1/ai/status uses the canonical state name in_progress.
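As a minimal sketch of how a client might reconcile the two vocabularies, the helper below maps the status values returned by POST /v1/ai/lease onto the canonical state names used by GET /v1/ai/status. The function names are hypothetical and not part of the OnceOnly API. Mapping acquired to in_progress is an assumption based on the flow above, where an owned lease is reported as in_progress to other agents.

```python
# Hypothetical client-side helpers; these names are not part of the OnceOnly API.

def canonical_state(lease_status: str) -> str:
    """Map a POST /v1/ai/lease status onto the GET /v1/ai/status vocabulary."""
    if lease_status in ("acquired", "polling"):
        # Both mean an active lease exists; they differ only in who owns it.
        return "in_progress"
    # completed, failed, and anything else pass through unchanged.
    return lease_status


def caller_owns_lease(lease_status: str) -> bool:
    """Only an 'acquired' response means this caller should do the work."""
    return lease_status == "acquired"
```

With this split, the caller decides whether to execute the task from caller_owns_lease and can log or compare states using canonical_state.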


Step-by-step Example: Customer Support Chat

import requests
import json

ONCEONLY_API = "https://api.onceonly.tech"
API_KEY = "once_live_xxxxxxxxxxxxx"


def start_support_chat(customer_id: str, chat_id: str):
    """Start a long-running customer support chat"""
    # Acquire lease for 30 minutes
    response = requests.post(
        f"{ONCEONLY_API}/v1/ai/lease",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "key": f"support_chat_{chat_id}",
            "ttl": 1800,  # 30 minutes
            "metadata": {
                "customer_id": customer_id,
                "chat_id": chat_id,
                "priority": "high"
            }
        }
    )
    lease = response.json()
    if lease["status"] == "acquired":
        # ✓ This agent owns the lease
        print(f"✓ Lease acquired: {lease['lease_id']}")
        return lease
    elif lease["status"] == "polling":
        # ⏳ Another agent is already handling this
        print(f"⏳ Chat already in progress. Lease: {lease['lease_id']}")
        print(f"  Wait {lease['ttl_left']} more seconds...")
        return lease
    elif lease["status"] in ("completed", "failed"):
        # This key already finished earlier
        print(f"ℹ Existing finished state: {lease['status']}")
        return lease

Response (Acquired):

{
  "ok": true,
  "status": "acquired",
  "key": "support_chat_abc123",
  "lease_id": "lease_lkj123xyz",
  "ttl": 1800,
  "first_seen_at": "2025-01-15T10:30:00Z",
  "version": 1
}

Response (Polling — Another Agent Owns It):

{
  "ok": true,
  "status": "polling",
  "key": "support_chat_abc123",
  "lease_id": "lease_lkj123xyz",
  "ttl": 1800,
  "ttl_left": 1750,
  "version": 1
}

The same POST /v1/ai/lease call can also return completed or failed if the key already finished earlier.


While the agent is working, other processes can check status:

def check_chat_progress(chat_id: str):
    """Check if chat is still running"""
    response = requests.get(
        f"{ONCEONLY_API}/v1/ai/status",
        headers={"Authorization": f"Bearer {API_KEY}"},
        params={"key": f"support_chat_{chat_id}"}
    )
    status = response.json()
    if status["status"] == "in_progress":
        print(f"⏳ Chat in progress. Time left: {status['ttl_left']}s")
        return status
    elif status["status"] == "completed":
        print(f"✓ Chat completed at {status['done_at']}")
        return status
    elif status["status"] == "failed":
        print(f"✗ Chat failed: {status['error_code']}")
        return status
    else:
        print("❌ Chat not found")
        return status

Response (In Progress):

{
  "ok": true,
  "status": "in_progress",
  "key": "support_chat_abc123",
  "lease_id": "lease_lkj123xyz",
  "ttl_left": 1200,
  "version": 1
}
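A non-owning agent usually wants to wait until the lease leaves in_progress. The loop below is a sketch of that pattern, not an official client. wait_until_finished and its parameters are hypothetical names. It also assumes that retry_after_sec, which appears in the status example response later on this page, is a hint for how long to wait before the next poll. The status fetcher and the sleep function are injected so the loop can be exercised without network access.

```python
import time
from typing import Callable


def wait_until_finished(
    fetch_status: Callable[[], dict],
    max_wait_sec: float = 3600.0,
    default_interval_sec: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the status is no longer in_progress, then return it.

    fetch_status is any zero-argument callable returning a parsed
    GET /v1/ai/status response, e.g. lambda: check_chat_progress(chat_id).
    """
    waited = 0.0
    while True:
        status = fetch_status()
        if status.get("status") != "in_progress":
            # completed, failed, or not_found: nothing more to wait for.
            return status
        # Assumption: retry_after_sec is a polling hint in seconds.
        interval = float(status.get("retry_after_sec", default_interval_sec))
        if waited + interval > max_wait_sec:
            raise TimeoutError(f"lease still in_progress after {waited:.0f}s")
        sleep(interval)
        waited += interval
```

Bounding the total wait keeps a poller from blocking forever if the owner crashes and the lease is repeatedly re-acquired.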

If the task is taking longer than expected, extend the lease:

def extend_lease(chat_id: str, lease_id: str, ttl: int = 600):
    """Refresh the lease TTL from now"""
    response = requests.post(
        f"{ONCEONLY_API}/v1/ai/extend",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "key": f"support_chat_{chat_id}",
            "lease_id": lease_id,
            "ttl": ttl  # Reset remaining TTL to 10 minutes from now
        }
    )
    result = response.json()
    if result["status"] == "extended":
        print(f"✓ Lease extended. New TTL: {result['ttl']}s")
    else:
        print(f"✗ Extend failed: {result['status']}")
    return result

Example: Support Chat Takes Longer

import time


# should_extend, get_ai_response, and send_to_customer are placeholders
# for your own application logic.
def support_chat_with_extension(customer_id: str, chat_id: str, chat_messages: list[str]):
    """Support chat that auto-extends if needed"""
    # Acquire initial lease (30 min)
    lease = start_support_chat(customer_id, chat_id)
    if lease["status"] == "polling":
        return {"status": "already_in_progress"}
    if lease["status"] in ("completed", "failed"):
        return {"status": lease["status"]}
    lease_id = lease["lease_id"]
    last_extend = time.time()
    try:
        # Start support chat
        for message in chat_messages:
            # About every 10 minutes, refresh the TTL if we still need the lease
            if time.time() - last_extend >= 600:
                if should_extend():  # Your logic
                    extend_lease(chat_id, lease_id, ttl=1800)
                    last_extend = time.time()
            # Process message
            response = get_ai_response(message)
            send_to_customer(response)
        # Chat complete
        complete_chat(chat_id, lease_id, {"status": "resolved"})
    except Exception as e:
        handle_chat_error(chat_id, lease_id, e)
        raise

When the task finishes successfully:

import hashlib
import json


def complete_chat(chat_id: str, lease_id: str, result: dict):
    """Mark lease as completed with result"""
    result_payload = {
        "status": "resolved",
        "messages": result.get("message_count", 0),
        "resolution": result.get("resolution", ""),
        "satisfaction": result.get("satisfaction", 0),
    }
    response = requests.post(
        f"{ONCEONLY_API}/v1/ai/complete",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "key": f"support_chat_{chat_id}",
            "lease_id": lease_id,
            "result": result_payload,
            "result_hash": hashlib.sha256(
                json.dumps(result_payload, sort_keys=True).encode("utf-8")
            ).hexdigest()
        }
    )
    return response.json()

Request:

{
  "key": "support_chat_abc123",
  "lease_id": "lease_lkj123xyz",
  "result": {
    "status": "resolved",
    "messages": 15,
    "resolution": "Issue fixed with password reset",
    "satisfaction": 5
  }
}

Response:

{
  "ok": true,
  "status": "completed",
  "key": "support_chat_abc123",
  "version": 2
}

If you completed the lease with a result payload, you can fetch it during the 7-day retention window:

def get_chat_result(chat_id: str):
    """Retrieve the completed chat result"""
    response = requests.get(
        f"{ONCEONLY_API}/v1/ai/result",
        headers={"Authorization": f"Bearer {API_KEY}"},
        params={"key": f"support_chat_{chat_id}"}
    )
    data = response.json()
    if data["status"] == "completed":
        print(f"Result: {data['result']}")
        return data["result"]
    else:
        print(f"Chat not done yet: {data['status']}")
        return None

Response:

{
  "ok": true,
  "status": "completed",
  "key": "support_chat_abc123",
  "result": {
    "status": "resolved",
    "messages": 15,
    "resolution": "Issue fixed with password reset",
    "satisfaction": 5
  },
  "result_hash": "abc123def456",
  "done_at": "2025-01-15T11:05:00Z"
}

If the lease finished without a result payload, /v1/ai/result still returns status, done_at, and optional result_hash, but result can be null.
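If the completing agent sent a result_hash computed as in complete_chat (SHA-256 over the key-sorted JSON of the result), a reader can recompute it to check that the payload is intact. The sketch below assumes the service returns result_hash exactly as it was submitted and that the result survives the JSON round trip unchanged. Neither is confirmed here, and both helper names are hypothetical.

```python
import hashlib
import json
from typing import Optional


def compute_result_hash(result: dict) -> str:
    """SHA-256 hex digest of the key-sorted JSON encoding of result."""
    canonical = json.dumps(result, sort_keys=True).encode("utf-8")
    return hashlib.sha256(canonical).hexdigest()


def result_matches_hash(response: dict) -> bool:
    """Check a parsed /v1/ai/result response against its own result_hash.

    Returns False when either field is missing, since there is nothing
    to verify in that case.
    """
    result: Optional[dict] = response.get("result")
    expected: Optional[str] = response.get("result_hash")
    if result is None or expected is None:
        return False
    return compute_result_hash(result) == expected
```

Sorting keys before hashing makes the digest independent of dictionary ordering, so producer and consumer agree as long as both serialize the same way.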


If the task fails:

import hashlib


def handle_chat_error(chat_id: str, lease_id: str, error: Exception):
    """Report lease failure"""
    response = requests.post(
        f"{ONCEONLY_API}/v1/ai/fail",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "key": f"support_chat_{chat_id}",
            "lease_id": lease_id,
            "error_code": "connection_lost",
            "error_hash": hashlib.sha256(str(error).encode()).hexdigest()
        }
    )
    return response.json()

Response:

{
  "ok": true,
  "status": "failed",
  "key": "support_chat_abc123",
  "version": 2
}

If the task is intentionally stopped, use POST /v1/ai/cancel. It finishes the lease as failed and sets error_code to canceled or canceled:<reason>.
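Because a canceled lease ends in the same failed state as a genuine error, a consumer may want to tell the two apart from error_code. The following is a small illustrative parser for the canceled and canceled:<reason> forms described above. parse_cancel is a hypothetical name, and the sketch makes no assumption about the request body of POST /v1/ai/cancel.

```python
from typing import Optional, Tuple


def parse_cancel(error_code: Optional[str]) -> Tuple[bool, Optional[str]]:
    """Return (was_canceled, reason) for a failed lease's error_code.

    reason is None when the lease was not canceled or when no reason
    was attached (plain "canceled").
    """
    prefix = "canceled:"
    if error_code == "canceled":
        return True, None
    if error_code is not None and error_code.startswith(prefix):
        return True, error_code[len(prefix):]
    return False, None
```

For example, a status response with error_code "canceled:user_request" yields (True, "user_request"), while "connection_lost" yields (False, None).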


Complete Example: Email Newsletter Processing

import time
from datetime import datetime

import requests

# ONCEONLY_API is the base URL defined in the support chat example above.


class NewsletterProcessor:
    """Process bulk email newsletter (takes 30 min - 2 hours)"""

    def __init__(self, api_key: str):
        self.api_key = api_key

    def process_newsletter(self, newsletter_id: str, recipients: list) -> dict:
        """
        Process a large newsletter send.
        May take 1-2 hours depending on recipient count.
        """
        key = f"newsletter_{newsletter_id}"
        # 1. Acquire lease (60 minutes initial)
        lease = self._acquire_lease(key, ttl=3600)
        if lease["status"] == "polling":
            return {
                "status": "already_processing",
                "message": "Another agent is already processing this"
            }
        if lease["status"] in ("completed", "failed"):
            return {"status": lease["status"], "lease_id": lease.get("lease_id")}
        lease_id = lease["lease_id"]
        try:
            # 2. Process recipients
            processed = 0
            failed = 0
            start_time = time.time()
            for batch in self._batch_recipients(recipients, batch_size=100):
                # Auto-extend if taking too long
                if (time.time() - start_time) > 50 * 60:  # 50 min
                    self._extend_lease(key, lease_id, ttl=3600)
                    start_time = time.time()
                # Send batch
                sent, errors = self._send_batch(batch)
                processed += sent
                failed += len(errors)
            # 3. Complete lease with results
            result = {
                "recipients_processed": processed,
                "recipients_failed": failed,
                "completed_at": datetime.now().isoformat()
            }
            self._complete_lease(key, lease_id, result)
            return {"status": "completed", "result": result}
        except Exception:
            # 3b. Report failure
            self._fail_lease(key, lease_id, error_code="processing_error")
            raise

    def _acquire_lease(self, key: str, ttl: int) -> dict:
        """Acquire a lease from OnceOnly"""
        response = requests.post(
            f"{ONCEONLY_API}/v1/ai/lease",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"key": key, "ttl": ttl}
        )
        return response.json()

    def _extend_lease(self, key: str, lease_id: str, ttl: int) -> dict:
        """Refresh TTL on an existing lease"""
        response = requests.post(
            f"{ONCEONLY_API}/v1/ai/extend",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"key": key, "lease_id": lease_id, "ttl": ttl}
        )
        return response.json()

    def _complete_lease(self, key: str, lease_id: str, result: dict):
        """Mark lease as completed"""
        requests.post(
            f"{ONCEONLY_API}/v1/ai/complete",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"key": key, "lease_id": lease_id, "result": result}
        )

    def _fail_lease(self, key: str, lease_id: str, error_code: str):
        """Mark lease as failed"""
        requests.post(
            f"{ONCEONLY_API}/v1/ai/fail",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"key": key, "lease_id": lease_id, "error_code": error_code}
        )

    def _batch_recipients(self, recipients: list, batch_size: int):
        """Yield batches of recipients"""
        for i in range(0, len(recipients), batch_size):
            yield recipients[i:i+batch_size]

    def _send_batch(self, batch: list) -> tuple[int, list]:
        """Send a batch of emails"""
        # Your email sending logic here
        return len(batch), []  # (sent, errors)

No lease
└─ POST /v1/ai/lease
   ├─ acquired
   │  └─ owner can /v1/ai/extend, /v1/ai/complete, /v1/ai/fail, or /v1/ai/cancel
   ├─ polling
   │  └─ client-facing alias from /v1/ai/lease when another worker already owns the in-progress lease
   ├─ completed
   │  └─ finished state retained for 7 days
   └─ failed
      └─ finished state retained for 7 days

Canonical state checks:
  GET /v1/ai/status
    - in_progress
    - completed
    - failed
    - not_found

Result lookup:
  GET /v1/ai/result
    - completed => optional stored result payload
    - failed => failure metadata only
    - in_progress => no result yet
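The result lookup rules above could be folded into one client-side helper. This is only a sketch: interpret_result_response and the labels it returns are hypothetical, and any status outside the three listed for /v1/ai/result is reported as "unavailable" rather than guessed at.

```python
def interpret_result_response(data: dict) -> str:
    """Classify a parsed GET /v1/ai/result response.

    Returns one of: "result_available", "completed_without_result",
    "failed", "pending", "unavailable" (labels are illustrative).
    """
    status = data.get("status")
    if status == "completed":
        # The stored payload is optional, so result may be null.
        if data.get("result") is not None:
            return "result_available"
        return "completed_without_result"
    if status == "failed":
        return "failed"  # failure metadata only, no payload
    if status == "in_progress":
        return "pending"  # no result yet
    return "unavailable"
```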

  1. Use descriptive keys — Include resource ID and operation type
  2. Set appropriate TTL — Account for worst-case execution time
  3. Extend if needed — Check progress periodically and extend
  4. Send a final result when useful — Then /v1/ai/result can return structured output during retention
  5. Log milestones on your side — Track start, extend, complete/fail events if you need business-level observability
# acquire_lease, complete_lease, fail_lease, and send_emails are your own
# wrappers; send_emails is assumed to return a result summary dict.
key = f"email_campaign_{campaign_id}_send"
ttl = 3600 * 2  # 2 hours for large campaigns
lease = acquire_lease(key, ttl)
if lease["status"] == "acquired":
    logger.info(f"Started campaign send: {campaign_id}")
    try:
        result = send_emails(campaign_id)
        complete_lease(key, lease["lease_id"], result)
    except Exception:
        fail_lease(key, lease["lease_id"], "send_error")
  1. Don’t choose leases by time alone — Use check-lock for simple one-shot idempotent work
  2. Don’t ignore polling status — It means another agent owns it
  3. Don’t set TTL too short — Add buffer for unexpected delays
  4. Don’t forget to complete/fail — Let OnceOnly know when done
  5. Don’t assume extend adds time on top — It refreshes the remaining TTL from now
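To make the last point concrete, here is a small worked sketch of the refresh arithmetic, together with one possible rule for deciding when to extend. Both function names, the estimated_work_left input, and the 60-second safety margin are illustrative assumptions, not OnceOnly behavior.

```python
def remaining_after_extend(ttl_left: int, new_ttl: int) -> int:
    """Seconds left after /v1/ai/extend under refresh-from-now semantics."""
    # ttl_left is deliberately ignored: the deadline restarts from now,
    # so extending with ttl=1800 at ttl_left=300 leaves 1800s, not 2100s.
    return new_ttl


def needs_extend(ttl_left: int, estimated_work_left: int, safety_margin: int = 60) -> bool:
    """Extend when the lease would expire before the remaining work is done."""
    return ttl_left < estimated_work_left + safety_margin
```

Read literally, refresh semantics also imply that extending with a ttl smaller than the current ttl_left would shorten the lease, so it seems safest to extend only when needs_extend is true.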

def monitor_lease(chat_id: str):
    """Check the current lease state for a keyed task"""
    response = requests.get(
        f"{ONCEONLY_API}/v1/ai/status",
        headers={"Authorization": f"Bearer {API_KEY}"},
        params={"key": f"support_chat_{chat_id}"}
    )
    return response.json()

Example Response:

{
  "ok": true,
  "status": "in_progress",
  "key": "support_chat_abc123",
  "lease_id": "lease_lkj123xyz",
  "ttl_left": 942,
  "version": 2,
  "retry_after_sec": 10
}

For keyed background runs started through POST /v1/ai/run, you can also inspect the Runs & Events API for run-level timelines.


  • AI Leases manage long-running async tasks with plan-dependent TTLs
  • Acquire → Execute → Complete/Fail is the lifecycle
  • POST /v1/ai/lease can return polling; GET /v1/ai/status uses the canonical in_progress
  • Extend TTL refreshes the remaining TTL from now
  • /v1/ai/result returns the stored payload only if you completed the lease with result
  • For simple one-shot idempotent requests, use Idempotency (check-lock) instead