AI Leases
AI Leases: Long-running Task Management
Section titled “AI Leases: Long-running Task Management”AI Leases are the OnceOnly primitive for long-running asynchronous operations. Lease TTL is plan-dependent and can range from minutes to multiple days.
- Exclusive ownership — Only one agent can own a lease
- Polling-safe — Other agents can check status without interfering
- 7-day finished-state retention — Completed and failed leases stay readable for 7 days
- Optional result storage — If you send a
resulton completion,/v1/ai/resultcan return it during retention - TTL extension — Extend the deadline if needed
- Failure handling — Track errors and failure states
When to Use Check-Lock vs AI Leases
Section titled “When to Use Check-Lock vs AI Leases”| Need | Use | Why | Example |
|---|---|---|---|
| Fast synchronous work with duplicate protection | check-lock | Blocks duplicate execution, but does not track a long-running lifecycle | Stripe charge, email send |
| Long-running work that needs status tracking | ai/lease | Tracks in_progress, completed, or failed, supports polling, extend, and result retrieval | Support chat, PDF parsing |
| Long-running key-based work with automatic worker kickoff | ai/run | Uses the same lease lifecycle, but starts the queued run from one request | Background AI workflow, document pipeline |
Rule of thumb: Duration is only a signal. Use check-lock for simple idempotent requests, and use AI leases when the work may outlive a single request or needs lifecycle tracking.
If you want the key-based lease flow plus automatic worker execution from one call, use POST /v1/ai/run.
Basic Flow: Acquire → Execute → Complete
Section titled “Basic Flow: Acquire → Execute → Complete”Agent 1 OnceOnly Agent 2 │ │ │ ├─ POST /v1/ai/lease ─→│ │ │ key=chat_1 │ │ │ ttl=1800 │ │ │←─ "acquired" ───────┤ │ │ lease_id=L123 │ │ │ │ │ │ [Doing work for 20 min...] │ │ │ │ │ │←─ GET /v1/ai/status ─┤ │ │ key=chat_1 │ │ ├─ "in_progress" ────→│ │ │ ttl_left=1000 │ │ │ │ ├─ POST /v1/ai/complete ─→│ │ │ lease_id=L123 │ │ │ result={...} │ │ │←─ "completed" ──────┤ │ │ │ │ │ │←─ GET /v1/ai/result ─┤ │ ├─ result data ──────→│POST /v1/ai/lease may return polling when another worker already owns the active lease. GET /v1/ai/status uses the canonical state name in_progress.
Step-by-step Example: Customer Support Chat
Section titled “Step-by-step Example: Customer Support Chat”1. Agent Acquires Lease
Section titled “1. Agent Acquires Lease”import requestsimport json
ONCEONLY_API = "https://api.onceonly.tech"API_KEY = "once_live_xxxxxxxxxxxxx"
def start_support_chat(customer_id: str, chat_id: str): """Start a long-running customer support chat"""
# Acquire lease for 30 minutes response = requests.post( f"{ONCEONLY_API}/v1/ai/lease", headers={"Authorization": f"Bearer {API_KEY}"}, json={ "key": f"support_chat_{chat_id}", "ttl": 1800, # 30 minutes "metadata": { "customer_id": customer_id, "chat_id": chat_id, "priority": "high" } } )
lease = response.json()
if lease["status"] == "acquired": # ✓ This agent owns the lease print(f"✓ Lease acquired: {lease['lease_id']}") return lease
elif lease["status"] == "polling": # ⏳ Another agent is already handling this print(f"⏳ Chat already in progress. Lease: {lease['lease_id']}") print(f" Wait {lease['ttl_left']} more seconds...") return lease
elif lease["status"] in ("completed", "failed"): # This key already finished earlier print(f"ℹ Existing finished state: {lease['status']}") return leaseResponse (Acquired):
{ "ok": true, "status": "acquired", "key": "support_chat_abc123", "lease_id": "lease_lkj123xyz", "ttl": 1800, "first_seen_at": "2025-01-15T10:30:00Z", "version": 1}Response (Polling — Another Agent Owns It):
{ "ok": true, "status": "polling", "key": "support_chat_abc123", "lease_id": "lease_lkj123xyz", "ttl": 1800, "ttl_left": 1750, "version": 1}The same POST /v1/ai/lease call can also return completed or failed if the key already finished earlier.
2. Check Status During Execution
Section titled “2. Check Status During Execution”While the agent is working, other processes can check status:
def check_chat_progress(chat_id: str): """Check if chat is still running"""
response = requests.get( f"{ONCEONLY_API}/v1/ai/status", headers={"Authorization": f"Bearer {API_KEY}"}, params={"key": f"support_chat_{chat_id}"} )
status = response.json()
if status["status"] == "in_progress": print(f"⏳ Chat in progress. Time left: {status['ttl_left']}s") return status
elif status["status"] == "completed": print(f"✓ Chat completed at {status['done_at']}") return status
elif status["status"] == "failed": print(f"✗ Chat failed: {status['error_code']}") return status
else: print("❌ Chat not found") return statusResponse (In Progress):
{ "ok": true, "status": "in_progress", "key": "support_chat_abc123", "lease_id": "lease_lkj123xyz", "ttl_left": 1200, "version": 1}3. Extend TTL If Needed
Section titled “3. Extend TTL If Needed”If the task is taking longer than expected, extend the lease:
def extend_lease(chat_id: str, lease_id: str, ttl: int = 600): """Refresh the lease TTL from now"""
response = requests.post( f"{ONCEONLY_API}/v1/ai/extend", headers={"Authorization": f"Bearer {API_KEY}"}, json={ "key": f"support_chat_{chat_id}", "lease_id": lease_id, "ttl": ttl # Reset remaining TTL to 10 minutes from now } )
result = response.json()
if result["status"] == "extended": print(f"✓ Lease extended. New TTL: {result['ttl']}s") else: print(f"✗ Extend failed: {result['status']}")
return resultExample: Support Chat Takes Longer
import time
def support_chat_with_extension(customer_id: str, chat_id: str, chat_messages: list[str]): """Support chat that auto-extends if needed"""
# Acquire initial lease (30 min) lease = start_support_chat(customer_id, chat_id)
if lease["status"] == "polling": return {"status": "already_in_progress"} if lease["status"] in ("completed", "failed"): return {"status": lease["status"]}
lease_id = lease["lease_id"] last_extend = time.time()
try: # Start support chat for message in chat_messages: # About every 10 minutes, refresh the TTL if we still need the lease if time.time() - last_extend >= 600: if should_extend(): # Your logic extend_lease(chat_id, lease_id, ttl=1800) last_extend = time.time()
# Process message response = get_ai_response(message) send_to_customer(response)
# Chat complete complete_chat(chat_id, lease_id, {"status": "resolved"})
except Exception as e: handle_chat_error(chat_id, lease_id, e) raise4. Complete the Lease
Section titled “4. Complete the Lease”When the task finishes successfully:
import hashlibimport json
def complete_chat(chat_id: str, lease_id: str, result: dict): """Mark lease as completed with result""" result_payload = { "status": "resolved", "messages": result.get("message_count", 0), "resolution": result.get("resolution", ""), "satisfaction": result.get("satisfaction", 0), }
response = requests.post( f"{ONCEONLY_API}/v1/ai/complete", headers={"Authorization": f"Bearer {API_KEY}"}, json={ "key": f"support_chat_{chat_id}", "lease_id": lease_id, "result": result_payload, "result_hash": hashlib.sha256( json.dumps(result_payload, sort_keys=True).encode("utf-8") ).hexdigest() } )
return response.json()Request:
{ "key": "support_chat_abc123", "lease_id": "lease_lkj123xyz", "result": { "status": "resolved", "messages": 15, "resolution": "Issue fixed with password reset", "satisfaction": 5 }}Response:
{ "ok": true, "status": "completed", "key": "support_chat_abc123", "version": 2}5. Get Result After Completion
Section titled “5. Get Result After Completion”If you completed the lease with a result payload, you can fetch it during the 7-day retention window:
def get_chat_result(chat_id: str): """Retrieve the completed chat result"""
response = requests.get( f"{ONCEONLY_API}/v1/ai/result", headers={"Authorization": f"Bearer {API_KEY}"}, params={"key": f"support_chat_{chat_id}"} )
data = response.json()
if data["status"] == "completed": print(f"Result: {data['result']}") return data["result"] else: print(f"Chat not done yet: {data['status']}") return NoneResponse:
{ "ok": true, "status": "completed", "key": "support_chat_abc123", "result": { "status": "resolved", "messages": 15, "resolution": "Issue fixed with password reset", "satisfaction": 5 }, "result_hash": "abc123def456", "done_at": "2025-01-15T11:05:00Z"}If the lease finished without a result payload, /v1/ai/result still returns status, done_at, and optional result_hash, but result can be null.
Error Handling: Fail or Cancel
Section titled “Error Handling: Fail or Cancel”If the task fails:
import hashlib
def handle_chat_error(chat_id: str, lease_id: str, error: Exception): """Report lease failure"""
response = requests.post( f"{ONCEONLY_API}/v1/ai/fail", headers={"Authorization": f"Bearer {API_KEY}"}, json={ "key": f"support_chat_{chat_id}", "lease_id": lease_id, "error_code": "connection_lost", "error_hash": hashlib.sha256(str(error).encode()).hexdigest() } )
return response.json()Response:
{ "ok": true, "status": "failed", "key": "support_chat_abc123", "version": 2}If the task is intentionally stopped, use POST /v1/ai/cancel. It finishes the lease as failed and sets error_code to canceled or canceled:<reason>.
Complete Example: Email Newsletter Processing
Section titled “Complete Example: Email Newsletter Processing”class NewsletterProcessor: """Process bulk email newsletter (takes 30 min - 2 hours)"""
def __init__(self, api_key: str): self.api_key = api_key
def process_newsletter(self, newsletter_id: str, recipients: list) -> dict: """ Process a large newsletter send. May take 1-2 hours depending on recipient count. """
key = f"newsletter_{newsletter_id}"
# 1. Acquire lease (60 minutes initial) lease = self._acquire_lease(key, ttl=3600)
if lease["status"] == "polling": return { "status": "already_processing", "message": "Another agent is already processing this" }
if lease["status"] in ("completed", "failed"): return {"status": lease["status"], "lease_id": lease.get("lease_id")}
lease_id = lease["lease_id"]
try: # 2. Process recipients processed = 0 failed = 0 start_time = time.time()
for batch in self._batch_recipients(recipients, batch_size=100): # Auto-extend if taking too long if (time.time() - start_time) > 50 * 60: # 50 min self._extend_lease(key, lease_id, ttl=3600) start_time = time.time()
# Send batch sent, errors = self._send_batch(batch) processed += sent failed += len(errors)
# 3. Complete lease with results result = { "recipients_processed": processed, "recipients_failed": failed, "completed_at": datetime.now().isoformat() }
self._complete_lease(key, lease_id, result)
return {"status": "completed", "result": result}
except Exception as e: # 3b. Report failure self._fail_lease(key, lease_id, error_code="processing_error") raise
def _acquire_lease(self, key: str, ttl: int) -> dict: """Acquire a lease from OnceOnly""" response = requests.post( f"{ONCEONLY_API}/v1/ai/lease", headers={"Authorization": f"Bearer {self.api_key}"}, json={"key": key, "ttl": ttl} ) return response.json()
def _extend_lease(self, key: str, lease_id: str, ttl: int) -> dict: """Refresh TTL on an existing lease""" response = requests.post( f"{ONCEONLY_API}/v1/ai/extend", headers={"Authorization": f"Bearer {self.api_key}"}, json={"key": key, "lease_id": lease_id, "ttl": ttl} ) return response.json()
def _complete_lease(self, key: str, lease_id: str, result: dict): """Mark lease as completed""" requests.post( f"{ONCEONLY_API}/v1/ai/complete", headers={"Authorization": f"Bearer {self.api_key}"}, json={"key": key, "lease_id": lease_id, "result": result} )
def _fail_lease(self, key: str, lease_id: str, error_code: str): """Mark lease as failed""" requests.post( f"{ONCEONLY_API}/v1/ai/fail", headers={"Authorization": f"Bearer {self.api_key}"}, json={"key": key, "lease_id": lease_id, "error_code": error_code} )
def _batch_recipients(self, recipients: list, batch_size: int): """Yield batches of recipients""" for i in range(0, len(recipients), batch_size): yield recipients[i:i+batch_size]
def _send_batch(self, batch: list) -> tuple[int, list]: """Send a batch of emails""" # Your email sending logic here return len(batch), [] # (sent, errors)Lease Lifecycle States
Section titled “Lease Lifecycle States”No lease │ └─ POST /v1/ai/lease │ ├─ acquired │ └─ owner can /v1/ai/extend, /v1/ai/complete, /v1/ai/fail, or /v1/ai/cancel │ ├─ polling │ └─ client-facing alias from /v1/ai/lease when another worker already owns the in-progress lease │ ├─ completed │ └─ finished state retained for 7 days │ └─ failed └─ finished state retained for 7 days
Canonical state checks: GET /v1/ai/status - in_progress - completed - failed - not_found
Result lookup: GET /v1/ai/result - completed => optional stored result payload - failed => failure metadata only - in_progress => no result yetBest Practices
Section titled “Best Practices”- Use descriptive keys — Include resource ID and operation type
- Set appropriate TTL — Account for worst-case execution time
- Extend if needed — Check progress periodically and extend
- Send a final
resultwhen useful — Then/v1/ai/resultcan return structured output during retention - Log milestones on your side — Track start, extend, complete/fail events if you need business-level observability
key = f"email_campaign_{campaign_id}_send"ttl = 3600 * 2 # 2 hours for large campaigns
lease = acquire_lease(key, ttl)if lease["status"] == "acquired": logger.info(f"Started campaign send: {campaign_id}") try: send_emails(campaign_id) complete_lease(key, lease["lease_id"], result) except Exception: fail_lease(key, lease["lease_id"], "send_error")❌ DON’T
Section titled “❌ DON’T”- Don’t choose leases by time alone — Use
check-lockfor simple one-shot idempotent work - Don’t ignore polling status — It means another agent owns it
- Don’t set TTL too short — Add buffer for unexpected delays
- Don’t forget to complete/fail — Let OnceOnly know when done
- Don’t assume extend adds time on top — It refreshes the remaining TTL from now
Monitoring Leases
Section titled “Monitoring Leases”def monitor_lease(chat_id: str): """Check the current lease state for a keyed task"""
response = requests.get( f"{ONCEONLY_API}/v1/ai/status", headers={"Authorization": f"Bearer {API_KEY}"}, params={"key": f"support_chat_{chat_id}"} )
return response.json()Example Response:
{ "ok": true, "status": "in_progress", "key": "support_chat_abc123", "lease_id": "lease_lkj123xyz", "ttl_left": 942, "version": 2, "retry_after_sec": 10}For keyed background runs started through POST /v1/ai/run, you can also inspect Runs & Events API for run-level timelines.
Summary
Section titled “Summary”- AI Leases manage long-running async tasks with plan-dependent TTLs
- Acquire → Execute → Complete/Fail is the lifecycle
POST /v1/ai/leasecan returnpolling;GET /v1/ai/statususes the canonicalin_progress- Extend TTL refreshes the remaining TTL from now
/v1/ai/resultreturns the stored payload only if you completed the lease withresult- For short tasks, use Idempotency instead