Error Handling
Learn how to handle errors gracefully when using the Zaits API, including retry strategies, error classification, and debugging techniques.
Error Response Format
All Zaits API errors follow a consistent format:
{
"success": false,
"error": {
"code": "error_code",
"message": "Human-readable error message",
"details": {
"additional": "context-specific information"
}
}
}
HTTP Status Codes
200
Success
Request completed successfully
400
Bad Request
Invalid parameters or malformed request
401
Unauthorized
Missing or invalid API key
403
Forbidden
Insufficient permissions or limits exceeded
404
Not Found
Endpoint or resource not found
413
Payload Too Large
File size exceeds limits
429
Too Many Requests
Rate limit exceeded
500
Internal Server Error
Temporary server issue
502/503
Service Unavailable
Server maintenance or overload
Error Categories
1. Authentication Errors (401)
Common Authentication Issues
missing_api_key
No Authorization header
Add Authorization: Bearer YOUR_API_KEY
invalid_api_key
API key is invalid/expired
Check key in dashboard, generate new one
api_key_format_invalid
Malformed API key
Ensure key starts with sk_
Example Response
{
"success": false,
"error": {
"code": "invalid_api_key",
"message": "The provided API key is invalid or has expired",
"details": {
"key_prefix": "sk_1234..."
}
}
}
Handling Authentication Errors
/**
 * Handle an authentication (401) error returned by the API.
 *
 * For `invalid_api_key` the event is logged, an admin notification is
 * attempted, and an Error is thrown so the caller does not retry. Any other
 * error code is ignored and the returned promise resolves with `undefined`.
 *
 * @param {{code: string, endpoint?: string, clientIp?: string}} error - Parsed API error.
 * @returns {Promise<void>}
 * @throws {Error} 'Authentication failed - check API key' when the key is invalid.
 */
async function handleAuthError(error) {
  if (error.code !== 'invalid_api_key') {
    return;
  }

  // Log security event
  console.error('API key invalid - possible security issue');

  // Notify admin. This is best-effort: a failing notification channel must
  // not replace the authentication failure the caller needs to see.
  try {
    await notifyAdmin('Invalid API key detected', {
      timestamp: new Date(),
      endpoint: error.endpoint,
      ip: error.clientIp,
    });
  } catch (notifyError) {
    console.error('Failed to notify admin about invalid API key:', notifyError);
  }

  // Don't retry - require manual intervention
  throw new Error('Authentication failed - check API key');
}
// 2. Authorization Errors (403)
Common Authorization Issues
insufficient_permissions
API key lacks required permissions
Enable permissions in dashboard
ip_not_allowed
Request from non-whitelisted IP
Add IP to allowlist
subscription_required
Feature requires paid plan
Upgrade subscription
usage_limit_exceeded
Monthly usage limit reached
Upgrade plan or wait for reset
Example Response
{
"success": false,
"error": {
"code": "insufficient_permissions",
"message": "Your API key doesn't have write permissions",
"details": {
"required_permission": "write",
"current_permissions": ["read"]
}
}
}
3. Validation Errors (400)
Image Validation Errors
no_face_detected
No face found in image
Use different image with clear face
multiple_faces_detected
Multiple faces when one expected
Crop to single face or use detection API
image_too_small
Image resolution too low
Use higher resolution image
image_quality_low
Image quality insufficient
Use better lighting/focus
unsupported_format
Invalid file format
Use JPG, PNG, or WebP
Example Response
{
"success": false,
"error": {
"code": "no_face_detected",
"message": "No face was detected in the provided image",
"details": {
"image": "image1",
"min_face_size": "80x80",
"suggestions": [
"Ensure the face is clearly visible",
"Check image lighting and quality",
"Try a different angle or photo"
]
}
}
}
Handling Validation Errors
def handle_validation_error(error):
    """Handle validation errors with specific guidance.

    Dispatches on ``error['code']`` to a code-specific handler; codes without
    a dedicated handler get a generic, non-retriable result built from the
    error's own message and suggestions.

    Args:
        error: Parsed API error dict with at least ``code`` and ``message``
            keys and an optional ``details`` dict.

    Returns:
        Whatever the matching handler returns, or a dict with ``retry``,
        ``user_message`` and ``suggestions`` keys for the generic case.
    """
    # NOTE(review): only handle_no_face_error is shown alongside this
    # function; the other handlers are assumed to be defined by the caller.
    error_handlers = {
        'no_face_detected': handle_no_face_error,
        'image_too_small': handle_small_image_error,
        'image_quality_low': handle_quality_error,
        'unsupported_format': handle_format_error,
    }

    handler = error_handlers.get(error['code'])
    if handler:
        return handler(error)

    # Generic validation error. `details` may be absent or explicitly null.
    details = error.get('details') or {}
    return {
        'retry': False,
        'user_message': error['message'],
        'suggestions': details.get('suggestions', []),
    }
def handle_no_face_error(error):
    """Build the user-facing guidance for a ``no_face_detected`` error.

    Args:
        error: The API error dict; its contents are not inspected.

    Returns:
        A dict with ``retry`` (always False), a fixed ``user_message`` and a
        list of ``suggestions``.
    """
    return {
        'retry': False,
        'user_message': 'Please upload an image with a clearly visible face',
        'suggestions': [
            'Make sure the face takes up at least 1/4 of the image',
            'Ensure good lighting',
            'Face should be looking towards the camera',
        ],
    }
# 4. Rate Limiting Errors (429)
Rate Limit Error Response
{
"success": false,
"error": {
"code": "rate_limit_exceeded",
"message": "Too many requests. Please wait before retrying.",
"details": {
"limit": 100,
"window": "1 minute",
"retry_after": 45,
"reset_time": "2024-01-15T14:35:00Z"
}
}
}
Exponential Backoff Implementation
/**
 * Retries an async operation on rate-limit (429) and server (5xx) errors,
 * sleeping an exponentially growing, jittered delay between attempts.
 */
class ExponentialBackoff {
  /**
   * @param {number} [baseDelay=1000] - Delay before the first retry, in ms.
   * @param {number} [maxDelay=30000] - Upper bound for a computed backoff delay, in ms.
   * @param {number} [maxRetries=5] - Number of retries after the initial attempt.
   */
  constructor(baseDelay = 1000, maxDelay = 30000, maxRetries = 5) {
    this.baseDelay = baseDelay;
    this.maxDelay = maxDelay;
    this.maxRetries = maxRetries;
  }

  /**
   * Run `fn`, retrying while `shouldRetry` allows it.
   *
   * @param {() => Promise<*>} fn - Operation to run; errors are expected to carry `status`.
   * @param {object} [context={}] - Accepted for API compatibility; not used here.
   * @returns {Promise<*>} Whatever `fn` resolves to.
   * @throws The last error from `fn` when it is not retriable or retries are exhausted.
   */
  async execute(fn, context = {}) {
    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      try {
        return await fn();
      } catch (error) {
        // Only retry on rate limit or server errors
        if (!this.shouldRetry(error, attempt)) {
          throw error;
        }
        const delay = this.calculateDelay(attempt, error);
        console.log(`Request failed with status ${error.status}. Retrying in ${delay}ms (attempt ${attempt + 1}/${this.maxRetries + 1})`);
        await this.sleep(delay);
      }
    }
    // Defensive: shouldRetry() refuses the final attempt, so the loop normally
    // exits by returning or rethrowing before reaching this point.
    throw new Error('Maximum retries exceeded');
  }

  /**
   * @param {{status?: number}} error - Error thrown by the operation.
   * @param {number} attempt - Zero-based index of the attempt that just failed.
   * @returns {boolean} True when another attempt should be made.
   */
  shouldRetry(error, attempt) {
    if (attempt >= this.maxRetries) return false;
    // Retry on rate limits and server errors
    return error.status === 429 || error.status >= 500;
  }

  /**
   * Compute how long to wait before the next attempt.
   *
   * @param {number} attempt - Zero-based index of the attempt that just failed.
   * @param {{details?: {retry_after?: number}}} error - Error thrown by the operation.
   * @returns {number} Delay in milliseconds.
   */
  calculateDelay(attempt, error) {
    // Prefer the server-provided retry_after (seconds) from the error details
    if (error.details?.retry_after) {
      return error.details.retry_after * 1000;
    }
    // Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s (max)
    const backoff = Math.min(
      this.baseDelay * Math.pow(2, attempt),
      this.maxDelay,
    );
    // Add up to +30% jitter to prevent thundering herd
    const jitter = Math.random() * 0.3;
    // Clamp again: jitter must not push the delay past the configured maximum
    return Math.min(Math.floor(backoff * (1 + jitter)), this.maxDelay);
  }

  /**
   * @param {number} ms - Milliseconds to wait.
   * @returns {Promise<void>} Resolves after the timeout fires.
   */
  sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
// Usage
const backoff = new ExponentialBackoff();
/**
 * POST `data` as JSON to `endpoint`, retrying through the shared `backoff`
 * instance.
 *
 * @param {string} endpoint - URL to call.
 * @param {*} data - JSON-serializable request body.
 * @returns {Promise<*>} Parsed JSON body of a successful response.
 * @throws {Error} With `status`, `code` and `details` set when the response is
 *   not OK. `code` and `details` are undefined when the body is not the
 *   documented JSON error envelope.
 */
async function makeApiCallWithBackoff(endpoint, data) {
  return backoff.execute(async () => {
    const response = await fetch(endpoint, {
      method: 'POST',
      headers: {
        'Authorization': 'Bearer YOUR_API_KEY',
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(data),
    });

    if (response.ok) {
      return response.json();
    }

    // An error body is not guaranteed to be JSON (e.g. an HTML page from a
    // proxy on 502/503). A parse failure must not hide the HTTP status,
    // because the retry decision is made from `status`.
    let payload = null;
    try {
      payload = await response.json();
    } catch {
      payload = null;
    }

    const apiError = new Error(
      payload?.error?.message ?? `Request failed with status ${response.status}`,
    );
    apiError.status = response.status;
    apiError.code = payload?.error?.code;
    apiError.details = payload?.error?.details;
    throw apiError;
  });
}
// 5. Server Errors (5xx)
Server Error Handling Strategy
import logging
import time
from datetime import datetime, timedelta
class ServerErrorHandler:
    """Decides whether a server error is retriable and sleeps between retries."""

    # HTTP status codes considered worth retrying.
    RETRIABLE_STATUS_CODES = frozenset({500, 502, 503, 504})

    def __init__(self, max_retries=3, base_delay=2):
        """
        Args:
            max_retries: Number of retries allowed before giving up.
            base_delay: Delay in seconds before the first retry; doubled for
                each subsequent attempt.
        """
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.logger = logging.getLogger(__name__)

    def handle_server_error(self, error, attempt=0):
        """Handle server errors with appropriate retry logic.

        Blocks the calling thread with ``time.sleep`` for the computed delay.

        Args:
            error: Description of the failure, used only for logging.
            attempt: Zero-based count of retries already performed.

        Returns:
            True, indicating the caller should retry.

        Raises:
            ServerError: When ``attempt`` has reached ``max_retries``.
        """
        if attempt >= self.max_retries:
            self.logger.error(f"Server error after {attempt} retries: {error}")
            # NOTE(review): ServerError is not defined in this snippet; it is
            # assumed to be provided by the surrounding application.
            raise ServerError(f"Server unavailable after {attempt} retries")

        import random

        # Exponential backoff, scaled by a random factor in [0.5, 1.5] so that
        # concurrent clients do not retry in lockstep.
        delay = self.base_delay * (2 ** attempt) * random.uniform(0.5, 1.5)

        self.logger.warning(f"Server error (attempt {attempt + 1}). Retrying in {delay:.2f}s")
        time.sleep(delay)
        return True  # Indicate retry should happen

    def is_retriable_error(self, status_code):
        """Return True when ``status_code`` is one of the retriable 5xx codes."""
        return status_code in self.RETRIABLE_STATUS_CODES
# Usage in API client
class ZaitsAPIClient:
    """Minimal API client that retries server and network failures."""

    def __init__(self, api_key):
        """
        Args:
            api_key: API key sent as a Bearer token on every request.
        """
        self.api_key = api_key
        self.error_handler = ServerErrorHandler()

    def make_request(self, endpoint, data):
        """POST ``data`` as JSON to ``endpoint`` with retries.

        Args:
            endpoint: Path appended to ``https://api.zaits.net``.
            data: JSON-serializable request body.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            requests.exceptions.HTTPError: For a non-retriable status. This is
                raised immediately and never retried.
            NetworkError: When the request itself keeps failing after the
                configured number of retries.
        """
        max_retries = self.error_handler.max_retries
        attempt = 0
        while attempt <= max_retries:
            # Only the network call is guarded. Status handling happens below
            # so an HTTPError for a 4xx response is not mistaken for a
            # transient network failure and retried.
            try:
                response = requests.post(
                    f"https://api.zaits.net{endpoint}",
                    headers={'Authorization': f'Bearer {self.api_key}'},
                    json=data,
                    timeout=30
                )
            except requests.exceptions.RequestException as e:
                if attempt >= max_retries:
                    # NOTE(review): NetworkError is not defined in this
                    # snippet; assumed to be provided by the application.
                    raise NetworkError(f"Network error after retries: {e}") from e
                self.error_handler.handle_server_error(str(e), attempt)
                attempt += 1
                continue

            if response.status_code == 200:
                return response.json()

            if self.error_handler.is_retriable_error(response.status_code):
                # Sleeps, or raises once the retry budget is exhausted.
                self.error_handler.handle_server_error(response.text, attempt)
                attempt += 1
                continue

            # Non-retriable error
            response.raise_for_status()
            # raise_for_status() does nothing for statuses below 400. Fail
            # explicitly instead of looping forever on the same attempt.
            raise requests.exceptions.HTTPError(
                f"Unexpected status {response.status_code}", response=response
            )
# Error Recovery Patterns
1. Circuit Breaker Pattern
/**
 * Circuit breaker with three states:
 *  - CLOSED: calls pass through; failures are counted.
 *  - OPEN: calls are rejected until `recoveryTimeout` has elapsed.
 *  - HALF_OPEN: trial calls pass through; 3 successes close the circuit and
 *    any failure reopens it.
 */
class CircuitBreaker {
  /**
   * @param {object} [options]
   * @param {number} [options.failureThreshold=5] - Failures (since the last success) that open the circuit.
   * @param {number} [options.recoveryTimeout=60000] - Time in ms the circuit stays OPEN before a trial call.
   * @param {number} [options.monitoringPeriod=10000] - Stored on the instance; not read by this class.
   */
  constructor(options = {}) {
    // `??` rather than `||` so an explicit 0 is honoured
    this.failureThreshold = options.failureThreshold ?? 5;
    this.recoveryTimeout = options.recoveryTimeout ?? 60000;
    this.monitoringPeriod = options.monitoringPeriod ?? 10000;
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.failureCount = 0;
    this.nextAttempt = Date.now();
    this.successCount = 0;
  }

  /**
   * Run `operation` through the breaker.
   *
   * @param {() => Promise<*>} operation - Async operation to protect.
   * @returns {Promise<*>} The operation's result.
   * @throws {Error} 'Circuit breaker is OPEN' while the circuit is open, or
   *   whatever `operation` throws.
   */
  async call(operation) {
    if (this.state === 'OPEN') {
      if (Date.now() < this.nextAttempt) {
        throw new Error('Circuit breaker is OPEN');
      }
      this.state = 'HALF_OPEN';
      this.successCount = 0;
    }

    try {
      const result = await operation();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Record a successful call; closes the circuit after 3 HALF_OPEN successes. */
  onSuccess() {
    this.failureCount = 0;
    if (this.state !== 'HALF_OPEN') {
      return;
    }
    this.successCount++;
    if (this.successCount >= 3) { // Require 3 successes to fully close
      this.state = 'CLOSED';
    }
  }

  /** Record a failed call; opens the circuit when warranted. */
  onFailure() {
    this.failureCount++;
    // A failed trial call means the dependency has not recovered, so reopen
    // immediately instead of waiting for the threshold to be reached again.
    const trialFailed = this.state === 'HALF_OPEN';
    if (trialFailed || this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
      this.successCount = 0;
      this.nextAttempt = Date.now() + this.recoveryTimeout;
    }
  }

  /**
   * @returns {{state: string, failureCount: number, nextAttempt: string}}
   *   Snapshot of the breaker; `nextAttempt` is an ISO-8601 timestamp.
   */
  getState() {
    return {
      state: this.state,
      failureCount: this.failureCount,
      nextAttempt: new Date(this.nextAttempt).toISOString(),
    };
  }
}
// 2. Fallback Mechanisms
class FallbackManager:
    """Registry of per-operation fallbacks used when a primary call fails."""

    def __init__(self):
        # Maps operation name -> fallback callable
        self.fallback_strategies = {}

    def register_fallback(self, operation, fallback_fn):
        """Register a fallback function for an operation.

        Args:
            operation: Name used to look the fallback up later.
            fallback_fn: Callable (async or plain) invoked with the same
                arguments as the failed primary function.
        """
        self.fallback_strategies[operation] = fallback_fn

    async def execute_with_fallback(self, operation, primary_fn, *args, **kwargs):
        """Execute primary function with fallback on failure.

        Args:
            operation: Name under which a fallback may be registered.
            primary_fn: Async callable to try first.
            *args, **kwargs: Passed to ``primary_fn`` and to the fallback.

        Returns:
            The primary result, or the fallback result when the primary raised
            and a fallback is registered.

        Raises:
            Exception: The primary's exception, when no fallback is registered.
        """
        import inspect

        try:
            return await primary_fn(*args, **kwargs)
        except Exception as e:
            fallback = self.fallback_strategies.get(operation)
            if not fallback:
                raise
            logging.warning(f"Primary operation failed, using fallback: {e}")
            outcome = fallback(*args, **kwargs)
            # Accept plain functions too: only await what is awaitable.
            if inspect.isawaitable(outcome):
                outcome = await outcome
            return outcome
# Usage example
fallback_manager = FallbackManager()


async def fallback_local_verification(image1, image2):
    """Local fallback for face verification.

    Performs no comparison: it always reports the pair as unverified with
    zero confidence, so a failure of the primary call never yields a match.
    """
    # Implement simple local comparison
    return {
        'verified': False,  # Conservative default
        'confidence': 0.0,
        'source': 'local_fallback',
        'message': 'Used local fallback due to API unavailability'
    }


async def fallback_queue_for_later(document):
    """Queue document for later processing.

    Returns a placeholder result with empty text and status ``queued``.
    """
    # NOTE(review): `queue` and `datetime` are not defined in this snippet;
    # they are assumed to be provided by the surrounding application.
    await queue.add('ocr_processing', {
        'document': document,
        'timestamp': datetime.now().isoformat()
    })
    return {
        'text': '',
        'status': 'queued',
        'message': 'Document queued for processing when API is available'
    }


# Register fallbacks. This must come after the function definitions above:
# the names are looked up when these statements run.
fallback_manager.register_fallback(
    'face_verification',
    fallback_local_verification
)
fallback_manager.register_fallback(
    'ocr_extraction',
    fallback_queue_for_later
)
# 3. Dead Letter Queue
/**
 * Runs operations with background retries and parks permanently failing
 * payloads in a dead letter queue (DLQ) held in `storage`.
 */
class DeadLetterQueue {
  /**
   * @param {object} storage - Store exposing async `set(key, value)`,
   *   `getPattern(pattern)` and `delete(key)`.
   * @param {number} [retryLimit=3] - Retries allowed before an item is dead-lettered.
   * @param {number} [baseDelayMs=1000] - Delay before the first background
   *   retry, in ms; doubled for each subsequent retry.
   */
  constructor(storage, retryLimit = 3, baseDelayMs = 1000) {
    this.storage = storage;
    this.retryLimit = retryLimit;
    this.baseDelayMs = baseDelayMs;
  }

  /**
   * Run `operation(data)`. On failure, either schedule a background retry
   * and rethrow, or store the payload in the DLQ once retries are exhausted.
   *
   * @param {(data: *) => Promise<*>} operation - Operation to run.
   * @param {*} data - Payload passed to `operation`.
   * @param {object} [context={}] - May carry `id` (used in the DLQ key) and
   *   `retryCount` (retries already performed).
   * @returns {Promise<*>} The operation's result, or
   *   `{ status: 'failed', dlqId }` when the item was dead-lettered.
   * @throws The operation's error while retries remain.
   */
  async processWithDLQ(operation, data, context = {}) {
    const dlqKey = `dlq:${context.id || Date.now()}`;
    try {
      return await operation(data);
    } catch (error) {
      const retryCount = context.retryCount || 0;

      if (retryCount >= this.retryLimit) {
        // Send to dead letter queue
        await this.storage.set(dlqKey, {
          data,
          error: error.message,
          retryCount,
          timestamp: new Date().toISOString(),
          context,
        });
        console.error(`Operation failed permanently, sent to DLQ: ${dlqKey}`);
        return { status: 'failed', dlqId: dlqKey };
      }

      // Schedule retry. Nobody awaits the background retry, so its rejection
      // is handled here; otherwise every failed retry would surface as an
      // unhandled promise rejection.
      setTimeout(() => {
        this.processWithDLQ(operation, data, {
          ...context,
          retryCount: retryCount + 1,
        }).catch((retryError) => {
          console.warn(`Background retry ${retryCount + 1} failed: ${retryError.message}`);
        });
      }, Math.pow(2, retryCount) * this.baseDelayMs);

      throw error;
    }
  }

  /**
   * Walk every stored `dlq:*` item and remove it from the DLQ.
   *
   * NOTE(review): no reprocessing is performed yet; the step between the log
   * line and the delete is a placeholder, so items are deleted unconditionally.
   *
   * @returns {Promise<void>}
   */
  async reprocessDLQ() {
    const dlqItems = await this.storage.getPattern('dlq:*');
    for (const item of dlqItems) {
      try {
        console.log(`Reprocessing DLQ item: ${item.id}`);
        // Attempt to reprocess
        // If successful, remove from DLQ
        await this.storage.delete(item.id);
      } catch (error) {
        console.error(`Failed to reprocess ${item.id}: ${error}`);
      }
    }
  }
}
// Error Monitoring and Alerting
1. Error Tracking
/**
 * Keeps a rolling window of errors per `code`/`endpoint` pair and raises an
 * alert when too many occur within the window.
 */
class ErrorTracker {
  /**
   * @param {object} [options]
   * @param {{error_rate: number, consecutive_errors: number, time_window: number}} [options.alertThresholds]
   *   `consecutive_errors` is the count that triggers an alert; `time_window`
   *   is the window length in ms. `error_rate` is stored but not evaluated
   *   by this class.
   */
  constructor(options = {}) {
    this.errors = new Map();
    this.alertThresholds = options.alertThresholds || {
      error_rate: 0.05, // 5% error rate
      consecutive_errors: 10,
      time_window: 300000, // 5 minutes
    };
  }

  /**
   * Record an error and evaluate the alert conditions for its key.
   *
   * @param {{code: string}} error - The error to record.
   * @param {{endpoint?: string}} [context={}] - Extra context stored with the error.
   */
  trackError(error, context = {}) {
    const errorKey = `${error.code}_${context.endpoint}`;
    const timestamp = Date.now();

    if (!this.errors.has(errorKey)) {
      this.errors.set(errorKey, []);
    }
    this.errors.get(errorKey).push({
      timestamp,
      error,
      context,
    });

    this.checkAlertConditions(errorKey);
  }

  /**
   * Drop entries older than the time window and alert when the remaining
   * count reaches the `consecutive_errors` threshold.
   *
   * @param {string} errorKey - Key produced by `trackError`.
   */
  checkAlertConditions(errorKey) {
    const now = Date.now();
    const recentErrors = (this.errors.get(errorKey) ?? []).filter(
      (entry) => (now - entry.timestamp) < this.alertThresholds.time_window,
    );

    // Store the pruned list back; otherwise the map grows without bound.
    this.errors.set(errorKey, recentErrors);

    // Check consecutive errors
    if (recentErrors.length >= this.alertThresholds.consecutive_errors) {
      // Fire-and-forget, but never leave the rejection unhandled.
      this.sendAlert('consecutive_errors', {
        errorKey,
        count: recentErrors.length,
        timeWindow: this.alertThresholds.time_window / 1000,
      }).catch((alertError) => {
        console.error('Failed to send alert:', alertError);
      });
    }
  }

  /**
   * Log an alert and forward it to the monitoring hook.
   *
   * @param {string} type - Alert type, e.g. 'consecutive_errors'.
   * @param {object} data - Alert payload.
   * @returns {Promise<void>}
   */
  async sendAlert(type, data) {
    const alert = {
      type,
      timestamp: new Date().toISOString(),
      data,
      severity: this.getSeverity(type),
    };

    console.error('ALERT:', alert);
    // Send to monitoring service
    await this.notifyMonitoringService(alert);
  }

  /**
   * Extension point: override or assign to deliver alerts to a monitoring
   * service. The default does nothing.
   *
   * @param {object} alert - Alert built by `sendAlert`.
   * @returns {Promise<void>}
   */
  async notifyMonitoringService(alert) {}

  /**
   * @param {string} alertType
   * @returns {string} Severity for the alert type; 'low' when unknown.
   */
  getSeverity(alertType) {
    const severityMap = {
      consecutive_errors: 'high',
      error_rate_exceeded: 'medium',
      api_unavailable: 'critical',
    };
    return severityMap[alertType] || 'low';
  }
}
// 2. Health Check Implementation
from datetime import datetime, timedelta
import asyncio
class HealthChecker:
    """Probes a fixed set of API endpoints and tracks their health."""

    def __init__(self, api_client, check_interval=300):
        """
        Args:
            api_client: Client exposing async ``get_usage_summary()`` and
                ``check_endpoint_availability(endpoint)``.
            check_interval: Seconds to sleep between checks.
        """
        self.api_client = api_client
        self.check_interval = check_interval  # 5 minutes
        self.health_status = {
            'status': 'unknown',
            'last_check': None,
            'consecutive_failures': 0,
            'endpoints': {}
        }

    async def start_monitoring(self):
        """Start continuous health monitoring. Runs until cancelled."""
        while True:
            try:
                await self.perform_health_check()
                await asyncio.sleep(self.check_interval)
            except Exception as e:
                logging.error(f"Health check error: {e}")
                await asyncio.sleep(60)  # Shorter retry on error

    async def perform_health_check(self):
        """Perform comprehensive health check.

        Checks every endpoint, stores the per-endpoint results and overall
        status in ``self.health_status``, and sends an alert once three or
        more consecutive checks were not fully healthy.
        """
        import time

        endpoints = [
            ('usage', '/v1/usage/summary'),
            ('face', '/v1/face/verify'),
            ('ocr', '/v1/ocr/extract')
        ]
        endpoint_results = {}
        overall_healthy = True

        for name, endpoint in endpoints:
            try:
                # Monotonic clock: wall-clock adjustments must not distort
                # (or make negative) the measured latency.
                started = time.perf_counter()
                await self.check_endpoint(endpoint)
                response_time = time.perf_counter() - started
                endpoint_results[name] = {
                    'status': 'healthy',
                    'response_time': response_time,
                    'last_check': datetime.now().isoformat()
                }
            except Exception as e:
                overall_healthy = False
                endpoint_results[name] = {
                    'status': 'unhealthy',
                    'error': str(e),
                    'last_check': datetime.now().isoformat()
                }

        # Update health status
        previous_failures = self.health_status['consecutive_failures']
        self.health_status.update({
            'status': 'healthy' if overall_healthy else 'degraded',
            'last_check': datetime.now().isoformat(),
            'consecutive_failures': 0 if overall_healthy else previous_failures + 1,
            'endpoints': endpoint_results
        })

        # Alert on consecutive failures
        if self.health_status['consecutive_failures'] >= 3:
            await self.send_health_alert()

    async def check_endpoint(self, endpoint):
        """Check specific endpoint health.

        Args:
            endpoint: API path to probe.

        Returns:
            Whatever the underlying client call returns.
        """
        if endpoint == '/v1/usage/summary':
            return await self.api_client.get_usage_summary()
        # For endpoints requiring data, check if they respond to OPTIONS
        return await self.api_client.check_endpoint_availability(endpoint)

    async def send_health_alert(self):
        """Send health alert to monitoring system (currently a critical log)."""
        alert = {
            'type': 'api_health_degraded',
            'consecutive_failures': self.health_status['consecutive_failures'],
            'unhealthy_endpoints': [
                name for name, status in self.health_status['endpoints'].items()
                if status['status'] == 'unhealthy'
            ],
            'timestamp': datetime.now().isoformat()
        }
        # Send alert (implement your notification logic)
        logging.critical(f"API Health Alert: {alert}")
# Debugging Techniques
1. Request/Response Logging
/**
 * Console logger for API requests, responses and errors that masks
 * credentials and binary payload fields by default.
 */
class APILogger {
  /**
   * @param {object} [options]
   * @param {string} [options.logLevel='info'] - Stored on the instance; not read by this class.
   * @param {boolean} [options.logRequests=true]
   * @param {boolean} [options.logResponses=true]
   * @param {boolean} [options.logErrors=true]
   * @param {boolean} [options.maskSensitive=true] - Mask headers and payload fields before logging.
   */
  constructor(options = {}) {
    this.logLevel = options.logLevel || 'info';
    this.logRequests = options.logRequests !== false;
    this.logResponses = options.logResponses !== false;
    this.logErrors = options.logErrors !== false;
    this.maskSensitive = options.maskSensitive !== false;
  }

  /**
   * @param {string} endpoint
   * @param {object} data - Request body.
   * @param {object} headers - Request headers.
   */
  async logRequest(endpoint, data, headers) {
    if (!this.logRequests) return;
    const logData = {
      timestamp: new Date().toISOString(),
      type: 'request',
      endpoint,
      method: 'POST',
      headers: this.maskSensitive ? this.maskHeaders(headers) : headers,
      data: this.maskSensitive ? this.maskRequestData(data) : data,
    };
    console.log('API Request:', JSON.stringify(logData, null, 2));
  }

  /**
   * @param {string} endpoint
   * @param {{success: boolean}} response - Parsed response body.
   * @param {number} responseTime - Elapsed time reported by the caller.
   */
  async logResponse(endpoint, response, responseTime) {
    if (!this.logResponses) return;
    const logData = {
      timestamp: new Date().toISOString(),
      type: 'response',
      endpoint,
      success: response.success,
      responseTime,
      data: this.maskSensitive ? this.maskResponseData(response) : response,
    };
    console.log('API Response:', JSON.stringify(logData, null, 2));
  }

  /**
   * @param {string} endpoint
   * @param {{code?: string, message?: string, status?: number}} error
   * @param {object} [context={}] - Extra context logged alongside the error.
   */
  async logError(endpoint, error, context = {}) {
    if (!this.logErrors) return;
    const logData = {
      timestamp: new Date().toISOString(),
      type: 'error',
      endpoint,
      error: {
        code: error.code,
        message: error.message,
        status: error.status,
      },
      context,
    };
    console.error('API Error:', JSON.stringify(logData, null, 2));
  }

  /**
   * @param {object} headers
   * @returns {object} Shallow copy with any Authorization value replaced.
   */
  maskHeaders(headers) {
    const masked = { ...headers };
    // HTTP header names are case-insensitive, so match any casing
    for (const name of Object.keys(masked)) {
      if (name.toLowerCase() === 'authorization' && masked[name]) {
        masked[name] = 'Bearer sk_***';
      }
    }
    return masked;
  }

  /**
   * @param {object} data - Request body.
   * @returns {object} Shallow copy with binary fields replaced by placeholders.
   */
  maskRequestData(data) {
    // Remove or mask sensitive data in requests
    return this.maskBinaryFields(data);
  }

  /**
   * @param {object} response - Response body.
   * @returns {object} Shallow copy with binary fields replaced by placeholders.
   */
  maskResponseData(response) {
    return this.maskBinaryFields(response);
  }

  /**
   * Replace known binary payload fields with a `[FIELD_DATA]` placeholder.
   * Only top-level fields are inspected; the input is not modified.
   *
   * @param {object} payload
   * @returns {object} Masked shallow copy.
   */
  maskBinaryFields(payload) {
    const masked = { ...payload };
    for (const field of ['image', 'image1', 'image2', 'file', 'document']) {
      if (masked[field]) {
        masked[field] = `[${field.toUpperCase()}_DATA]`;
      }
    }
    return masked;
  }
}
// 2. Error Context Collection
import traceback
import sys
from datetime import datetime
class ErrorContext:
    """Helpers that gather diagnostic context for a failed API call."""

    @staticmethod
    def collect_context(error, request_data=None):
        """Collect comprehensive error context.

        Args:
            error: The exception to describe.
            request_data: Optional request payload dict; binary fields are
                replaced by placeholders before inclusion.

        Returns:
            A dict with ``error``, ``request``, ``system`` and
            ``environment`` sections.
        """
        # Format the traceback attached to `error` itself.
        # traceback.format_exc() describes whichever exception is currently
        # being handled, which is empty when this runs outside an except block.
        formatted_traceback = ''.join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        return {
            'error': {
                'type': type(error).__name__,
                'message': str(error),
                'traceback': formatted_traceback
            },
            'request': {
                'data': ErrorContext.sanitize_request_data(request_data),
                'timestamp': datetime.now().isoformat()
            },
            'system': {
                'python_version': sys.version,
                'platform': sys.platform
            },
            'environment': ErrorContext.get_environment_info()
        }

    @staticmethod
    def sanitize_request_data(data):
        """Remove sensitive data from request context.

        Args:
            data: Request payload dict, or a falsy value.

        Returns:
            None when ``data`` is falsy; otherwise a shallow copy in which the
            known binary fields are replaced by placeholder strings.
        """
        if not data:
            return None

        sanitized = data.copy()
        # Remove binary data
        for key in ['image', 'image1', 'image2', 'file', 'document']:
            if key in sanitized:
                sanitized[key] = f'[{key.upper()}_DATA_REMOVED]'
        return sanitized

    @staticmethod
    def get_environment_info():
        """Collect relevant environment information from environment variables."""
        import os

        return {
            'api_environment': os.getenv('ENVIRONMENT', 'unknown'),
            'api_version': os.getenv('API_VERSION', 'unknown'),
            'service_name': os.getenv('SERVICE_NAME', 'unknown')
        }
# Testing Error Scenarios
1. Error Simulation for Testing
// test-utils/api-mocker.js
/**
 * Test helper that injects configured API errors in front of a real call.
 */
class APIErrorSimulator {
  constructor() {
    // endpoint -> list of scenarios, in registration order
    this.errorScenarios = new Map();
  }

  /**
   * Register an error scenario for an endpoint.
   *
   * @param {string} endpoint - Endpoint the scenario applies to.
   * @param {string} errorType - Key of an error template known to `generateError`.
   * @param {object} [config={}] - `probability`, `delay` (ms), `override`, plus
   *   any extra fields; every field is copied onto the stored scenario.
   */
  addScenario(endpoint, errorType, config = {}) {
    const scenario = {
      type: errorType,
      probability: config.probability || 1.0,
      delay: config.delay || 0,
      ...config,
    };
    const registered = this.errorScenarios.get(endpoint);
    if (registered) {
      registered.push(scenario);
    } else {
      this.errorScenarios.set(endpoint, [scenario]);
    }
  }

  /**
   * Throw the first scenario whose probability roll succeeds; when none
   * fires, run the real call.
   *
   * @param {string} endpoint
   * @param {() => *} originalCall - Invoked when no scenario fires.
   * @returns {Promise<*>} Result of `originalCall`.
   */
  async simulateCall(endpoint, originalCall) {
    const candidates = this.errorScenarios.get(endpoint) || [];
    // find() stops at the first hit, so one roll is made per scenario until one fires
    const triggered = candidates.find((candidate) => Math.random() < candidate.probability);
    if (triggered) {
      await this.sleep(triggered.delay);
      throw this.generateError(triggered);
    }
    return originalCall();
  }

  /**
   * Build an Error from the scenario's template and optional `override`.
   *
   * @param {{type: string, override?: object}} scenario
   * @returns {Error} Error carrying the template fields.
   */
  generateError(scenario) {
    // Built per call so each generated error gets its own `details` object
    const templates = {
      rate_limit: {
        status: 429,
        code: 'rate_limit_exceeded',
        message: 'Too many requests',
        details: { retry_after: 60 },
      },
      server_error: {
        status: 500,
        code: 'internal_server_error',
        message: 'Internal server error',
      },
      validation_error: {
        status: 400,
        code: 'no_face_detected',
        message: 'No face detected in image',
      },
      auth_error: {
        status: 401,
        code: 'invalid_api_key',
        message: 'Invalid API key',
      },
    };
    const template = templates[scenario.type];
    return Object.assign(new Error(template.message), template, scenario.override || {});
  }

  /**
   * @param {number} ms - Milliseconds to wait.
   * @returns {Promise<void>}
   */
  sleep(ms) {
    return new Promise((done) => setTimeout(done, ms));
  }
}
// Usage in tests
describe('Error Handling', () => {
  let apiSimulator;

  beforeEach(() => {
    apiSimulator = new APIErrorSimulator();
  });

  // Verifies that a simulated rate-limit scenario rejects with status 429
  // after the configured delay has elapsed.
  it('should handle rate limit errors with exponential backoff', async () => {
    const simulatedDelayMs = 100;
    apiSimulator.addScenario('/v1/face/verify', 'rate_limit', {
      probability: 1.0,
      delay: simulatedDelayMs,
    });

    const startTime = Date.now();

    // `.rejects` fails the test by itself when the call resolves. Calling
    // fail() inside a try block would be swallowed by its own catch clause.
    await expect(
      apiSimulator.simulateCall('/v1/face/verify', mockApiCall),
    ).rejects.toMatchObject({ status: 429 });

    // Timers can fire a few ms early relative to Date.now(); a strict
    // "> 100" comparison against a 100 ms timer is flaky.
    expect(Date.now() - startTime).toBeGreaterThanOrEqual(simulatedDelayMs - 5);
  });
});
// 2. Error Recovery Testing
import pytest
import asyncio
from unittest.mock import AsyncMock, MagicMock
class TestErrorRecovery:
    """Recovery-path tests for the circuit breaker and the fallback manager."""

    @pytest.fixture
    def api_client(self):
        return MagicMock()

    @pytest.fixture
    def circuit_breaker(self):
        # NOTE(review): assumes a Python CircuitBreaker accepting these
        # keyword arguments exists; it is not shown in this snippet.
        return CircuitBreaker(
            failure_threshold=3,
            recovery_timeout=1000,
            monitoring_period=500
        )

    @pytest.mark.asyncio
    async def test_circuit_breaker_opens_after_failures(self, circuit_breaker, api_client):
        """Test circuit breaker opens after consecutive failures"""
        # Mock API to always fail
        api_client.face_verify = AsyncMock(side_effect=Exception("API Error"))

        # Make requests until circuit opens
        for i in range(5):
            try:
                await circuit_breaker.call(lambda: api_client.face_verify())
            except Exception:
                # Expected: the API error first, then the open-circuit error.
                pass

        # Circuit should now be open
        assert circuit_breaker.state == 'OPEN'

        # Next call should fail immediately without calling API
        with pytest.raises(Exception, match="Circuit breaker is OPEN"):
            await circuit_breaker.call(lambda: api_client.face_verify())

        # API should not be called when circuit is open
        assert api_client.face_verify.call_count == 3  # Only during threshold detection

    @pytest.mark.asyncio
    async def test_fallback_on_primary_failure(self, api_client):
        """Test fallback mechanism when primary service fails"""
        fallback_manager = FallbackManager()

        # Mock primary to fail
        primary_fn = AsyncMock(side_effect=Exception("Primary service down"))
        # Mock fallback to succeed
        fallback_fn = AsyncMock(return_value={'status': 'fallback_used'})
        fallback_manager.register_fallback('test_operation', fallback_fn)

        result = await fallback_manager.execute_with_fallback(
            'test_operation',
            primary_fn,
            'test_arg'
        )

        assert result['status'] == 'fallback_used'
        # `.called` is already true once the mock is invoked, even if the
        # coroutine is never awaited. Assert on awaits, and on the forwarded
        # argument, instead.
        primary_fn.assert_awaited_once_with('test_arg')
        fallback_fn.assert_awaited_once_with('test_arg')
# Next: Rate Limits Guide
Last updated