Best Practices
Learn the best practices for using the Zaits API efficiently, securely, and reliably in production applications.
API Security
API Key Management
✅ Secure Key Storage
// Good: Environment variables
const apiKey = process.env.ZAITS_API_KEY;
// Bad: Hardcoded in source code
const apiKey = "sk_live_1234567890abcdef"; // Don't do this!
✅ Key Rotation Strategy
Rotate keys every 90 days for production applications
Generate separate keys for different environments
Use dedicated keys for development and staging environments
Monitor key usage in your dashboard
# Example rotation script
# 1. Generate new key in dashboard
# 2. Update environment variable
export ZAITS_API_KEY="sk_live_newkey123456"
# 3. Deploy application
# 4. Delete old key after verification
✅ Permission Principle
Assign minimal required permissions to API keys:
Analytics Dashboard
Read only
Cannot modify data
Mobile App
Face + OCR only
Limited functionality
Webhook Handler
Write only
Process events
IP Whitelisting
Configure IP restrictions for production applications:
# Example: Restrict to server IP ranges
allowed_ips = [
"203.0.113.0/24", # Production servers
"198.51.100.50", # Backup server
"192.168.1.0/24" # Office network
]
Request Signing (Advanced)
For high-security applications, implement request signing:
const crypto = require('crypto');
/**
 * Compute an HMAC-SHA256 signature over the JSON serialization of a payload.
 *
 * @param {*} payload - Value to sign; it is serialized with `JSON.stringify`.
 * @param {string|Buffer} secret - Shared secret used as the HMAC key.
 * @returns {string} The hex digest prefixed with `sha256=`.
 */
function signRequest(payload, secret) {
  const hmac = crypto.createHmac('sha256', secret);
  const serialized = JSON.stringify(payload);
  hmac.update(serialized);
  const digest = hmac.digest('hex');
  return `sha256=${digest}`;
}
// Include signature in request headers
headers: {
'Authorization': 'Bearer YOUR_API_KEY',
'X-Zaits-Signature': signRequest(requestBody, webhookSecret)
}
Performance Optimization
Image Optimization
✅ Image Size and Format
from PIL import Image
import io
def optimize_image_for_api(image_path, max_size=(1024, 1024), quality=85):
"""Optimize image for API processing"""
with Image.open(image_path) as img:
# Resize if needed
if img.size[0] > max_size[0] or img.size[1] > max_size[1]:
img.thumbnail(max_size, Image.LANCZOS)
# Convert to RGB if needed
if img.mode in ('RGBA', 'P', 'LA'):
img = img.convert('RGB')
# Save optimized version
output = io.BytesIO()
img.save(output, format='JPEG', quality=quality, optimize=True)
return output.getvalue()
✅ Optimal Image Requirements
Format: JPEG for photos, PNG for graphics with transparency
Size: 300KB - 2MB (max 5MB)
Resolution: 300x300px minimum, 1920x1920px maximum
Quality: 80-90% JPEG quality for best balance
Caching Strategies
✅ Result Caching
const NodeCache = require('node-cache');
const cache = new NodeCache({ stdTTL: 3600 }); // 1 hour cache
async function verifyFaceWithCache(image1, image2) {
// Create cache key from image hashes
const cacheKey = crypto
.createHash('md5')
.update(image1 + image2)
.digest('hex');
// Check cache first
const cachedResult = cache.get(cacheKey);
if (cachedResult) {
return cachedResult;
}
// Make API call
const result = await zaitsApi.faceVerify(image1, image2);
// Cache successful results only
if (result.success) {
cache.set(cacheKey, result, 3600);
}
return result;
}✅ API Response Caching
import hashlib
import json
import redis
from datetime import timedelta
class ZaitsAPICache:
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
def cache_key(self, endpoint, data):
"""Generate cache key from endpoint and data"""
key_data = f"{endpoint}:{json.dumps(data, sort_keys=True)}"
return hashlib.md5(key_data.encode()).hexdigest()
def get_cached_result(self, endpoint, data, ttl_hours=1):
"""Get cached API result"""
key = self.cache_key(endpoint, data)
cached = self.redis_client.get(key)
if cached:
return json.loads(cached)
return None
def cache_result(self, endpoint, data, result, ttl_hours=1):
"""Cache API result"""
key = self.cache_key(endpoint, data)
ttl = timedelta(hours=ttl_hours)
self.redis_client.setex(key, ttl, json.dumps(result))
Batch Processing
✅ Process Multiple Files Efficiently
const pLimit = require('p-limit');
/**
 * Run OCR over a list of documents with a cap on concurrent requests.
 *
 * Every document is attempted; one failure does not abort the others.
 *
 * @param {Array<object>} documents - Documents to hand to `processDocument`.
 * @param {number} [concurrency=3] - Maximum number of requests in flight.
 * @returns {Promise<{successful: Array, errors: Array}>} Fulfilled values and
 *   rejection reasons, each in input order.
 */
async function batchOCRProcessing(documents, concurrency = 3) {
  const limit = pLimit(concurrency);
  const runLimited = (doc) => limit(() => processDocument(doc));
  const outcomes = await Promise.allSettled(documents.map(runLimited));

  // Partition the settled outcomes in one pass
  const successful = [];
  const errors = [];
  for (const outcome of outcomes) {
    if (outcome.status === 'fulfilled') {
      successful.push(outcome.value);
    } else if (outcome.status === 'rejected') {
      errors.push(outcome.reason);
    }
  }
  return { successful, errors };
}
async function processDocument(document) {
try {
const result = await zaitsApi.ocrExtract(document);
return { document: document.name, result };
} catch (error) {
console.error(`Failed to process ${document.name}:`, error);
throw error;
}
}
Error Handling
Robust Error Handling
✅ Comprehensive Error Handling
import requests
import time
from typing import Dict, Any, Optional
class ZaitsAPIClient:
    """Zaits API client that retries transient failures and raises typed errors.

    HTTP 429, HTTP 5xx and network-level failures are retried up to
    ``max_retries`` times with exponential backoff (1s, 2s, 4s, ...).
    All failures are reported as ``ZaitsAPIError`` or one of its subclasses.
    """

    def __init__(self, api_key: str, max_retries: int = 3):
        self.api_key = api_key
        self.max_retries = max_retries
        self.base_url = "https://api.zaits.net"

    @staticmethod
    def _rewind_files(files: Optional[Dict]) -> None:
        """Seek seekable upload streams back to the start before a retry."""
        for value in (files or {}).values():
            # An entry is either a bare file object or a (filename, fileobj, ...) tuple
            is_tuple = isinstance(value, (tuple, list)) and len(value) > 1
            fileobj = value[1] if is_tuple else value
            seekable = getattr(fileobj, 'seekable', None)
            if callable(seekable) and seekable():
                fileobj.seek(0)

    def make_request(self, endpoint: str, data: Dict[Any, Any],
                     files: Optional[Dict] = None) -> Dict[Any, Any]:
        """Make API request with comprehensive error handling.

        Args:
            endpoint: Path appended to ``base_url`` (e.g. ``/v1/face/verify``).
            data: Form fields sent in the POST body.
            files: Optional multipart uploads, passed through to ``requests.post``.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            ValidationError: HTTP 400.
            AuthenticationError: HTTP 401.
            AuthorizationError: HTTP 403.
            RateLimitError: HTTP 429 on every attempt.
            ServerError: HTTP 5xx on every attempt.
            NetworkError: Request-level failure on every attempt.
            ZaitsAPIError: Any other unexpected status code.
        """
        headers = {'Authorization': f'Bearer {self.api_key}'}
        url = f"{self.base_url}{endpoint}"

        for attempt in range(self.max_retries + 1):
            can_retry = attempt < self.max_retries
            backoff = 2 ** attempt
            if attempt > 0:
                # The previous attempt consumed the upload streams; without
                # rewinding, the retry would send empty file contents.
                self._rewind_files(files)
            try:
                response = requests.post(
                    url,
                    headers=headers,
                    data=data,
                    files=files,
                    timeout=30
                )
                status = response.status_code

                if status == 200:
                    return response.json()
                if status == 400:
                    # A 400 body is not guaranteed to be JSON; do not let a
                    # decode failure be mistaken for a network error.
                    try:
                        error_data = response.json()
                    except ValueError:
                        error_data = None
                    if isinstance(error_data, dict):
                        raise ValidationError(error_data.get('error', 'Bad request'))
                    raise ValidationError('Bad request')
                if status == 401:
                    raise AuthenticationError('Invalid API key')
                if status == 403:
                    raise AuthorizationError('Insufficient permissions')
                if status == 429:
                    # Rate limit - exponential backoff, but never sleep after
                    # the final attempt; report the rate limit instead.
                    if can_retry:
                        time.sleep(backoff)
                        continue
                    raise RateLimitError('Rate limit exceeded after retries')
                if status >= 500:
                    # Server error - retry with backoff
                    if can_retry:
                        time.sleep(backoff)
                        continue
                    raise ServerError('Server error after retries')
                raise ZaitsAPIError(f'Unexpected status code: {response.status_code}')
            except requests.exceptions.RequestException as e:
                if can_retry:
                    time.sleep(backoff)
                    continue
                raise NetworkError(f'Network error: {str(e)}') from e

        # Safety net: every path above returns, raises, or continues
        raise ZaitsAPIError('Maximum retries exceeded')
# Custom exception classes
class ZaitsAPIError(Exception):
"""Base exception for Zaits API errors"""
pass
class ValidationError(ZaitsAPIError):
"""Validation error (400)"""
pass
class AuthenticationError(ZaitsAPIError):
"""Authentication error (401)"""
pass
class AuthorizationError(ZaitsAPIError):
"""Authorization error (403)"""
pass
class RateLimitError(ZaitsAPIError):
"""Rate limit error (429)"""
pass
class ServerError(ZaitsAPIError):
"""Server error (5xx)"""
pass
class NetworkError(ZaitsAPIError):
"""Network/connection error"""
pass
Graceful Degradation
✅ Fallback Strategies
class FaceVerificationService {
async verifyFace(image1, image2, options = {}) {
try {
// Primary API call
const result = await this.zaitsApi.faceVerify(image1, image2);
return {
verified: result.data.verified,
confidence: result.data.confidence,
source: 'zaits-api'
};
} catch (error) {
if (error instanceof RateLimitError) {
// Fallback to local processing
return this.fallbackLocalVerification(image1, image2);
}
if (error instanceof NetworkError) {
// Queue for later processing
await this.queueForLaterProcessing(image1, image2);
return {
verified: null,
confidence: 0,
source: 'queued',
message: 'Verification queued due to network issues'
};
}
throw error; // Re-throw unexpected errors
}
}
async fallbackLocalVerification(image1, image2) {
// Implement basic local verification
console.warn('Using fallback local verification');
return {
verified: false, // Conservative default
confidence: 0,
source: 'local-fallback'
};
}
}
Rate Limiting
Respect Rate Limits
✅ Implement Rate Limit Handling
import time
from datetime import datetime, timedelta
from collections import deque
class RateLimiter:
    """Sliding-window rate limiter that blocks until a request slot is free.

    ``requests`` holds the timestamps of the calls admitted during the current
    window, oldest first; it never holds more than ``max_requests`` entries.
    """

    def __init__(self, max_requests: int, time_window: int):
        """
        Args:
            max_requests: Maximum number of calls admitted per window (>= 1).
            time_window: Window length in seconds.

        Raises:
            ValueError: If ``max_requests`` is smaller than 1.
        """
        if max_requests < 1:
            raise ValueError('max_requests must be at least 1')
        self.max_requests = max_requests
        self.time_window = time_window  # in seconds
        self.requests = deque()

    def _drop_expired(self, now):
        """Remove timestamps that are a full window (or more) older than ``now``."""
        window = timedelta(seconds=self.time_window)
        while self.requests and (now - self.requests[0]) >= window:
            self.requests.popleft()

    def wait_if_needed(self):
        """Wait if necessary to respect rate limits, then record this request."""
        now = datetime.now()
        self._drop_expired(now)

        # Loop rather than check once: after sleeping, the clock is re-read and
        # the expired slot evicted, so the recorded timestamp is the moment the
        # request is really released and the window never overfills.
        while len(self.requests) >= self.max_requests:
            wait_until = self.requests[0] + timedelta(seconds=self.time_window)
            wait_seconds = (wait_until - now).total_seconds()
            if wait_seconds > 0:
                print(f"Rate limit reached. Waiting {wait_seconds:.2f} seconds...")
                time.sleep(wait_seconds)
            now = datetime.now()
            self._drop_expired(now)

        # Record this request
        self.requests.append(now)
# Usage
rate_limiter = RateLimiter(max_requests=100, time_window=60) # 100 requests per minute
def api_call_with_rate_limiting():
rate_limiter.wait_if_needed()
return make_api_request()
Circuit Breaker Pattern
✅ Implement Circuit Breaker
/**
 * Circuit breaker that stops calling a failing dependency for a cool-down period.
 *
 * States: CLOSED (calls pass through), OPEN (calls are rejected until
 * `nextAttempt`), HALF_OPEN (a call is let through after the cool-down).
 */
class CircuitBreaker {
  /**
   * @param {number} [threshold=5] - Failure count at which the circuit opens.
   * @param {number} [timeout=60000] - Cool-down in milliseconds while OPEN.
   */
  constructor(threshold = 5, timeout = 60000) {
    this.threshold = threshold;
    this.timeout = timeout;
    this.failureCount = 0;
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.nextAttempt = Date.now();
  }

  /**
   * Invoke `fn` through the breaker.
   *
   * @param {Function} fn - Operation to run; its result is awaited.
   * @returns {Promise<*>} Whatever `fn` resolves to.
   * @throws {Error} 'Circuit breaker is OPEN' during the cool-down, or the
   *   error thrown by `fn` itself.
   */
  async call(fn) {
    const wasOpen = this.state === 'OPEN';
    if (wasOpen && Date.now() < this.nextAttempt) {
      throw new Error('Circuit breaker is OPEN');
    }
    if (wasOpen) {
      // Cool-down elapsed: let this call through as a probe
      this.state = 'HALF_OPEN';
    }

    try {
      const outcome = await fn();
      this.onSuccess();
      return outcome;
    } catch (failure) {
      this.onFailure();
      throw failure;
    }
  }

  /** Reset the failure counter and close the circuit. */
  onSuccess() {
    this.failureCount = 0;
    this.state = 'CLOSED';
  }

  /** Count a failure and open the circuit once the threshold is reached. */
  onFailure() {
    this.failureCount += 1;
    const thresholdReached = this.failureCount >= this.threshold;
    if (!thresholdReached) {
      return;
    }
    this.state = 'OPEN';
    this.nextAttempt = Date.now() + this.timeout;
  }
}
// Usage
const circuitBreaker = new CircuitBreaker(5, 60000);
async function protectedApiCall() {
return circuitBreaker.call(() => zaitsApi.faceVerify(img1, img2));
}
Monitoring and Logging
Comprehensive Logging
✅ Structured Logging
const winston = require('winston');
// Structured logger for the API client, configured with three transports.
const logger = winston.createLogger({
  level: 'info',
  // Combined format: timestamp, error stack capture, then JSON serialization
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.errors({ stack: true }),
    winston.format.json()
  ),
  // Metadata attached by default to log entries
  defaultMeta: { service: 'zaits-api-client' },
  transports: [
    // File transport restricted to the 'error' level
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    // File transport with no level of its own
    new winston.transports.File({ filename: 'combined.log' }),
    // Console transport with its own format (simple instead of JSON)
    new winston.transports.Console({
      format: winston.format.simple()
    })
  ]
});
class ZaitsAPIClient {
async faceVerify(image1, image2) {
const startTime = Date.now();
const requestId = this.generateRequestId();
logger.info('API request started', {
requestId,
endpoint: '/v1/face/verify',
timestamp: new Date().toISOString()
});
try {
const result = await this.makeRequest('/v1/face/verify', {
image1,
image2
});
const duration = Date.now() - startTime;
logger.info('API request completed', {
requestId,
endpoint: '/v1/face/verify',
duration,
success: result.success,
verified: result.data?.verified
});
return result;
} catch (error) {
const duration = Date.now() - startTime;
logger.error('API request failed', {
requestId,
endpoint: '/v1/face/verify',
duration,
error: error.message,
stack: error.stack
});
throw error;
}
}
}
Health Monitoring
✅ API Health Checks
import requests
import time
from datetime import datetime
class ZaitsAPIHealthMonitor:
def __init__(self, api_key: str, check_interval: int = 300):
self.api_key = api_key
self.check_interval = check_interval # 5 minutes
self.health_status = {}
def check_api_health(self):
"""Perform health check on API endpoints"""
endpoints = [
'/v1/usage/summary',
'/v1/face/verify',
'/v1/ocr/extract'
]
for endpoint in endpoints:
try:
start_time = time.time()
if endpoint == '/v1/usage/summary':
response = self.check_usage_endpoint()
else:
# For endpoints requiring data, use test mode
response = self.check_endpoint_availability(endpoint)
response_time = time.time() - start_time
self.health_status[endpoint] = {
'status': 'healthy',
'response_time': response_time,
'last_check': datetime.now().isoformat()
}
except Exception as e:
self.health_status[endpoint] = {
'status': 'unhealthy',
'error': str(e),
'last_check': datetime.now().isoformat()
}
def check_usage_endpoint(self):
"""Check usage endpoint (doesn't require data)"""
headers = {'Authorization': f'Bearer {self.api_key}'}
response = requests.get(
'https://api.zaits.net/v1/usage/summary',
headers=headers,
timeout=10
)
response.raise_for_status()
return response.json()
def get_health_summary(self):
"""Get overall health summary"""
healthy_count = sum(1 for status in self.health_status.values()
if status['status'] == 'healthy')
total_count = len(self.health_status)
return {
'overall_status': 'healthy' if healthy_count == total_count else 'degraded',
'healthy_endpoints': healthy_count,
'total_endpoints': total_count,
'endpoints': self.health_status
}
Production Deployment
Environment Configuration
✅ Environment-Specific Configuration
# config/production.yml
zaits_api:
base_url: https://api.zaits.net
api_key: ${ZAITS_API_KEY}
timeout: 30
max_retries: 3
rate_limit:
requests_per_minute: 100
cache:
ttl: 3600
redis_url: ${REDIS_URL}
monitoring:
enable_metrics: true
log_level: info
# config/development.yml
zaits_api:
base_url: https://api.zaits.net
api_key: ${ZAITS_TEST_API_KEY}
timeout: 10
max_retries: 1
rate_limit:
requests_per_minute: 20
cache:
ttl: 300
redis_url: redis://localhost:6379
monitoring:
enable_metrics: false
log_level: debug
Deployment Checklist
✅ Pre-Production Checklist
✅ Post-Deployment Monitoring
// Example monitoring dashboard metrics
const metrics = {
api_requests_total: 0,
api_requests_success: 0,
api_requests_error: 0,
api_response_time_avg: 0,
api_rate_limit_hits: 0,
cache_hit_ratio: 0.85
};
// Health check endpoint for load balancers
app.get('/health', (req, res) => {
const health = {
status: 'ok',
timestamp: new Date().toISOString(),
services: {
zaits_api: checkZaitsAPIHealth(),
database: checkDatabaseHealth(),
cache: checkCacheHealth()
}
};
const allHealthy = Object.values(health.services)
.every(service => service.status === 'ok');
res.status(allHealthy ? 200 : 503).json(health);
});
Testing Best Practices
Unit Testing
✅ Mock API Responses
// tests/zaits-api.test.js
const nock = require('nock');
const ZaitsAPI = require('../src/zaits-api');
describe('ZaitsAPI', () => {
let api;
beforeEach(() => {
api = new ZaitsAPI('test-api-key');
nock.cleanAll();
});
it('should verify faces successfully', async () => {
// Mock successful response
nock('https://api.zaits.net')
.post('/v1/face/verify')
.reply(200, {
success: true,
data: {
verified: true,
confidence: 0.89,
distance: 0.32
}
});
const result = await api.faceVerify('image1.jpg', 'image2.jpg');
expect(result.success).toBe(true);
expect(result.data.verified).toBe(true);
expect(result.data.confidence).toBeGreaterThan(0.8);
});
it('should handle rate limit errors', async () => {
// Mock rate limit response
nock('https://api.zaits.net')
.post('/v1/face/verify')
.reply(429, {
success: false,
error: {
code: 'rate_limit_exceeded',
message: 'Too many requests'
}
});
await expect(api.faceVerify('image1.jpg', 'image2.jpg'))
.rejects.toThrow('rate_limit_exceeded');
});
});
Integration Testing
✅ API Integration Tests
import pytest
import os
from zaits_api_client import ZaitsAPIClient
@pytest.fixture
def api_client():
"""Test API client with test key"""
api_key = os.getenv('ZAITS_TEST_API_KEY')
if not api_key:
pytest.skip('Test API key not configured')
return ZaitsAPIClient(api_key)
@pytest.fixture
def sample_images():
"""Sample test images"""
return {
'face1': 'tests/fixtures/face1.jpg',
'face2': 'tests/fixtures/face2.jpg',
'document': 'tests/fixtures/document.pdf'
}
def test_face_verification_integration(api_client, sample_images):
"""Test actual face verification API"""
result = api_client.face_verify(
sample_images['face1'],
sample_images['face2']
)
assert result['success'] is True
assert 'verified' in result['data']
assert 'confidence' in result['data']
assert isinstance(result['data']['confidence'], float)
assert 0 <= result['data']['confidence'] <= 1
def test_ocr_extraction_integration(api_client, sample_images):
"""Test actual OCR extraction API"""
result = api_client.ocr_extract(sample_images['document'])
assert result['success'] is True
assert 'text' in result['data']
assert len(result['data']['text']) > 0
@pytest.mark.performance
def test_api_response_time(api_client, sample_images):
"""Test API response time requirements"""
import time
start_time = time.time()
result = api_client.face_verify(
sample_images['face1'],
sample_images['face2']
)
response_time = time.time() - start_time
assert result['success'] is True
assert response_time < 5.0 # Should respond within 5 seconds
Next: Error Handling Guide
Last updated