Python SDK
A comprehensive Python client for the IdentityCall API with both synchronous and asynchronous support.
Installation
pip install requests
# For async support
pip install aiohttpQuick Start
from identitycall import IdentityCallClient
client = IdentityCallClient()
# List recordings
recordings = client.recordings.list()
for recording in recordings["data"]:
print(f"{recording['id']}: {recording['name']}")Complete Client Implementation
Create a file called identitycall.py:
identitycall.py
import os
import time
import requests
from typing import Optional, Dict, Any, BinaryIO
class IdentityCallError(Exception):
"""Base exception for IdentityCall API errors."""
def __init__(self, message: str, status_code: int = None, response: dict = None):
self.message = message
self.status_code = status_code
self.response = response
super().__init__(self.message)
class AuthenticationError(IdentityCallError):
"""Raised when API key is invalid or missing."""
pass
class PermissionError(IdentityCallError):
"""Raised when API key lacks required permissions."""
pass
class NotFoundError(IdentityCallError):
"""Raised when requested resource doesn't exist."""
pass
class ValidationError(IdentityCallError):
"""Raised when request data is invalid."""
pass
class RateLimitError(IdentityCallError):
"""Raised when rate limit is exceeded."""
def __init__(self, message: str, retry_after: int = 60, **kwargs):
self.retry_after = retry_after
super().__init__(message, **kwargs)
class RecordingsResource:
"""Recordings API resource."""
def __init__(self, client: "IdentityCallClient"):
self._client = client
def list(self, page: int = 1, per_page: int = 20) -> Dict[str, Any]:
"""List all recordings with pagination."""
return self._client._request(
"GET", "/recordings",
params={"page": page, "per_page": per_page}
)
def create(
self,
file: BinaryIO,
filename: str = "recording.mp3",
language: str = "en",
name: Optional[str] = None
) -> Dict[str, Any]:
"""Upload a new recording."""
files = {"file": (filename, file, "audio/mpeg")}
data = {"language": language}
if name:
data["name"] = name
return self._client._request("POST", "/recordings", files=files, data=data)
def get(self, recording_id: int) -> Dict[str, Any]:
"""Get a specific recording."""
return self._client._request("GET", f"/recordings/{recording_id}")
def update(self, recording_id: int, **kwargs) -> Dict[str, Any]:
"""Update a recording."""
return self._client._request("PATCH", f"/recordings/{recording_id}", json=kwargs)
def delete(self, recording_id: int) -> None:
"""Delete a recording."""
self._client._request("DELETE", f"/recordings/{recording_id}")
def transcription(self, recording_id: int) -> Dict[str, Any]:
"""Get transcription for a recording."""
return self._client._request("GET", f"/recordings/{recording_id}/transcription")
def results(self, recording_id: int) -> Dict[str, Any]:
"""Get analysis results for a recording."""
return self._client._request("GET", f"/recordings/{recording_id}/results")
def summary(self, recording_id: int) -> Dict[str, Any]:
"""Get summary for a recording."""
return self._client._request("GET", f"/recordings/{recording_id}/summary")
def wait_for_completion(
self,
recording_id: int,
timeout: int = 600,
poll_interval: int = 5
) -> Dict[str, Any]:
"""Wait for a recording to finish processing."""
start_time = time.time()
while True:
recording = self.get(recording_id)["data"]
if recording["status"] == "completed":
return recording
if recording["status"] == "failed":
raise IdentityCallError("Transcription failed")
if time.time() - start_time > timeout:
raise TimeoutError("Transcription timed out")
time.sleep(poll_interval)
class IdentityCallClient:
"""IdentityCall API client."""
DEFAULT_BASE_URL = "https://api.identitycall.com/api/v1/public"
DEFAULT_TIMEOUT = 30
MAX_RETRIES = 3
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
timeout: int = DEFAULT_TIMEOUT
):
self.api_key = api_key or os.environ.get("IDENTITYCALL_API_KEY")
if not self.api_key:
raise ValueError("API key is required. Set IDENTITYCALL_API_KEY or pass api_key parameter.")
self.base_url = base_url or self.DEFAULT_BASE_URL
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {self.api_key}",
"Accept": "application/json"
})
# Resources
self.recordings = RecordingsResource(self)
def _request(
self,
method: str,
path: str,
params: Optional[Dict] = None,
json: Optional[Dict] = None,
data: Optional[Dict] = None,
files: Optional[Dict] = None
) -> Optional[Dict[str, Any]]:
"""Make an API request with retry logic."""
url = f"{self.base_url}{path}"
for attempt in range(self.MAX_RETRIES):
try:
response = self.session.request(
method,
url,
params=params,
json=json,
data=data,
files=files,
timeout=self.timeout
)
return self._handle_response(response)
except RateLimitError as e:
if attempt < self.MAX_RETRIES - 1:
time.sleep(e.retry_after)
else:
raise
except IdentityCallError as e:
if e.status_code and e.status_code >= 500 and attempt < self.MAX_RETRIES - 1:
time.sleep(2 ** attempt)
else:
raise
def _handle_response(self, response: requests.Response) -> Optional[Dict[str, Any]]:
"""Handle API response and raise appropriate exceptions."""
if response.status_code == 204:
return None
try:
data = response.json()
except ValueError:
data = {}
if response.ok:
return data
error_message = data.get("error", "Unknown error")
if response.status_code == 401:
raise AuthenticationError(error_message, response.status_code, data)
elif response.status_code == 403:
raise PermissionError(error_message, response.status_code, data)
elif response.status_code == 404:
raise NotFoundError(error_message, response.status_code, data)
elif response.status_code == 422:
raise ValidationError(error_message, response.status_code, data)
elif response.status_code == 429:
retry_after = data.get("retry_after", 60)
raise RateLimitError(error_message, retry_after, status_code=429, response=data)
else:
raise IdentityCallError(error_message, response.status_code, data)
def close(self):
"""Close the session."""
self.session.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()Usage Examples
List Recordings
from identitycall import IdentityCallClient
client = IdentityCallClient()
# Get first page
result = client.recordings.list()
for recording in result["data"]:
print(f"[{recording['id']}] {recording['name']} - {recording['status']}")
# Pagination
result = client.recordings.list(page=2, per_page=50)
print(f"Page {result['meta']['current_page']} of {result['meta']['total_pages']}")Upload Recording
with open("call.mp3", "rb") as f:
result = client.recordings.create(
file=f,
filename="customer-call.mp3",
language="en",
name="Support Call #123"
)
recording_id = result["data"]["id"]
print(f"Uploaded! Recording ID: {recording_id}")Wait for Transcription
# Upload and wait for completion
with open("call.mp3", "rb") as f:
result = client.recordings.create(file=f)
recording_id = result["data"]["id"]
# Wait for processing to complete
recording = client.recordings.wait_for_completion(recording_id, timeout=300)
print(f"Completed! Duration: {recording['duration_ms']}ms")
# Get transcription
transcription = client.recordings.transcription(recording_id)
print(transcription["data"]["full_text"])Get Analysis Results
# Get all analysis data
results = client.recordings.results(recording_id)
# Goals
for goal in results["data"]["goals"]:
status = "✓" if goal["met"] else "✗"
print(f"{status} {goal['goal_name']}: {goal['score']:.2f}")
# Keywords
for keyword in results["data"]["keywords"]:
print(f"'{keyword['keyword_name']}' mentioned {keyword['count']} times")Error Handling
from identitycall import (
IdentityCallClient,
AuthenticationError,
PermissionError,
NotFoundError,
RateLimitError
)
client = IdentityCallClient()
try:
recording = client.recordings.get(999999)
except NotFoundError:
print("Recording not found")
except PermissionError:
print("You don't have permission to view this recording")
except AuthenticationError:
print("Invalid API key")
except RateLimitError as e:
print(f"Rate limited. Retry after {e.retry_after} seconds")Async Client
For applications requiring async support:
identitycall_async.py
import os
import asyncio
import aiohttp
from typing import Optional, Dict, Any
class AsyncIdentityCallClient:
"""Async IdentityCall API client."""
DEFAULT_BASE_URL = "https://api.identitycall.com/api/v1/public"
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None
):
self.api_key = api_key or os.environ.get("IDENTITYCALL_API_KEY")
self.base_url = base_url or self.DEFAULT_BASE_URL
self._session: Optional[aiohttp.ClientSession] = None
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
headers={
"Authorization": f"Bearer {self.api_key}",
"Accept": "application/json"
}
)
return self._session
async def _request(self, method: str, path: str, **kwargs) -> Dict[str, Any]:
session = await self._get_session()
url = f"{self.base_url}{path}"
async with session.request(method, url, **kwargs) as response:
if response.status == 204:
return None
data = await response.json()
if not response.ok:
raise Exception(data.get("error", "Unknown error"))
return data
async def list_recordings(self, page: int = 1, per_page: int = 20):
return await self._request(
"GET", "/recordings",
params={"page": page, "per_page": per_page}
)
async def get_recording(self, recording_id: int):
return await self._request("GET", f"/recordings/{recording_id}")
async def get_transcription(self, recording_id: int):
return await self._request("GET", f"/recordings/{recording_id}/transcription")
async def wait_for_completion(self, recording_id: int, timeout: int = 600):
start = asyncio.get_event_loop().time()
while True:
recording = await self.get_recording(recording_id)
if recording["data"]["status"] == "completed":
return recording["data"]
if asyncio.get_event_loop().time() - start > timeout:
raise TimeoutError("Transcription timed out")
await asyncio.sleep(5)
async def close(self):
if self._session:
await self._session.close()
async def __aenter__(self):
return self
async def __aexit__(self, *args):
await self.close()
# Usage
async def main():
async with AsyncIdentityCallClient() as client:
result = await client.list_recordings()
for recording in result["data"]:
print(f"{recording['id']}: {recording['name']}")
asyncio.run(main())