Best Practices

Learn the best practices for using the Zaits API efficiently, securely, and reliably in production applications.

API Security

API Key Management

✅ Secure Key Storage

// Good: Environment variables
const apiKey = process.env.ZAITS_API_KEY;

// Bad: Hardcoded in source code
const apiKey = "sk_live_1234567890abcdef"; // Don't do this!

✅ Key Rotation Strategy

  • Rotate keys every 90 days for production applications

  • Generate separate keys for different environments

  • Use dedicated keys for development and staging environments

  • Monitor key usage in your dashboard

# Example rotation script
# 1. Generate new key in dashboard
# 2. Update environment variable
export ZAITS_API_KEY="sk_live_newkey123456"
# 3. Deploy application
# 4. Delete old key after verification

✅ Principle of Least Privilege

Assign minimal required permissions to API keys:

Use Case
Permissions
Rationale

Analytics Dashboard

Read only

Cannot modify data

Mobile App

Face + OCR only

Limited functionality

Webhook Handler

Write only

Process events

IP Whitelisting

Configure IP restrictions for production applications:

# Example: Restrict to server IP ranges
# Entries are either CIDR ranges ("a.b.c.d/nn") or single addresses.
allowed_ips = [
    "203.0.113.0/24",    # Production servers
    "198.51.100.50",     # Backup server
    "192.168.1.0/24"     # Office network
                         # NOTE(review): 192.168.x.x is a private (RFC 1918)
                         # range; an allowlist evaluated on the public internet
                         # would presumably need the office's public egress
                         # address instead -- confirm.
]

Request Signing (Advanced)

For high-security applications, implement request signing:

const crypto = require('crypto');

/**
 * Build the signature header value for a request payload.
 *
 * The payload is serialized with JSON.stringify and authenticated with
 * HMAC-SHA256 under the given secret.
 *
 * @param {*} payload - JSON-serializable request body.
 * @param {string|Buffer} secret - Shared signing secret.
 * @returns {string} Hex digest prefixed with "sha256=".
 */
function signRequest(payload, secret) {
  const hmac = crypto.createHmac('sha256', secret);
  const serializedBody = JSON.stringify(payload);
  hmac.update(serializedBody);
  const hexDigest = hmac.digest('hex');

  return 'sha256=' + hexDigest;
}

// Include signature in request headers
headers: {
  'Authorization': 'Bearer YOUR_API_KEY',
  'X-Zaits-Signature': signRequest(requestBody, webhookSecret)
}

Performance Optimization

Image Optimization

✅ Image Size and Format

from PIL import Image
import io

def optimize_image_for_api(image_path, max_size=(1024, 1024), quality=85):
    """Load an image, shrink and normalize it, and return it as JPEG bytes.

    Args:
        image_path: Value handed to ``Image.open``.
        max_size: ``(width, height)`` bound; an image exceeding either
            dimension is downscaled in place with ``thumbnail``.
        quality: JPEG quality passed to ``save``.

    Returns:
        The encoded JPEG as ``bytes``.
    """
    modes_needing_rgb = ('RGBA', 'P', 'LA')

    with Image.open(image_path) as source:
        picture = source

        # Only downscale when the image is larger than the allowed box.
        width, height = picture.size
        if width > max_size[0] or height > max_size[1]:
            picture.thumbnail(max_size, Image.LANCZOS)

        # These modes are converted to RGB before the JPEG save.
        if picture.mode in modes_needing_rgb:
            picture = picture.convert('RGB')

        # Encode into memory rather than to a file on disk.
        buffer = io.BytesIO()
        picture.save(buffer, format='JPEG', quality=quality, optimize=True)
        return buffer.getvalue()

✅ Optimal Image Requirements

  • Format: JPEG for photos, PNG for graphics with transparency

  • Size: 300KB - 2MB (max 5MB)

  • Resolution: 300x300px minimum, 1920x1920px maximum

  • Quality: 80-90% JPEG quality for best balance

Caching Strategies

✅ Result Caching

const crypto = require('crypto');
const NodeCache = require('node-cache');
const cache = new NodeCache({ stdTTL: 3600 }); // 1 hour cache

/**
 * Verify two face images, reusing a cached result when the same pair was
 * verified before.
 *
 * The cache key is built from a separate SHA-256 digest of each image, so
 * the boundary between the two inputs is part of the key: ('ab', 'c') and
 * ('a', 'bc') get different entries, and binary inputs are hashed as raw
 * bytes instead of being coerced to strings by `+`.
 *
 * @param {string|Buffer|TypedArray|DataView} image1 - First image.
 * @param {string|Buffer|TypedArray|DataView} image2 - Second image.
 * @returns {Promise<object>} The API result, from cache or a fresh call.
 * @throws {TypeError} If an image is not a type `hash.update` accepts.
 */
async function verifyFaceWithCache(image1, image2) {
  const digestOf = (image) =>
    crypto.createHash('sha256').update(image).digest('hex');

  // Fixed-length digests joined by a separator cannot be confused with
  // each other, unlike a hash of the raw concatenation.
  const cacheKey = `face-verify:${digestOf(image1)}:${digestOf(image2)}`;

  // Check cache first
  const cachedResult = cache.get(cacheKey);
  if (cachedResult) {
    return cachedResult;
  }

  // Make API call
  const result = await zaitsApi.faceVerify(image1, image2);

  // Cache successful results only, so a failed call is attempted again
  // the next time the same pair is submitted.
  if (result.success) {
    cache.set(cacheKey, result, 3600);
  }

  return result;
}

✅ API Response Caching

import hashlib
import json
import redis
from datetime import timedelta

class ZaitsAPICache:
    """Redis-backed cache for API results, keyed by endpoint and request data."""

    def __init__(self, host='localhost', port=6379, db=0, redis_client=None):
        """Connect to Redis, or reuse a client supplied by the caller.

        Calling with no arguments keeps the previous behavior (a client for
        localhost:6379, db 0).

        Args:
            host: Redis host name.
            port: Redis port.
            db: Redis database index.
            redis_client: Optional ready-made client exposing ``get`` and
                ``setex``. When given, ``host``/``port``/``db`` are ignored.
        """
        if redis_client is None:
            redis_client = redis.Redis(host=host, port=port, db=db)
        self.redis_client = redis_client

    def cache_key(self, endpoint, data):
        """Generate a cache key from the endpoint and request data.

        ``data`` must be JSON-serializable. Keys are sorted so that dicts
        with the same content produce the same key regardless of insertion
        order.

        Returns:
            The MD5 hex digest of ``"<endpoint>:<json>"``.
        """
        key_data = f"{endpoint}:{json.dumps(data, sort_keys=True)}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def get_cached_result(self, endpoint, data, ttl_hours=1):
        """Return the cached result for this request, or ``None`` on a miss.

        Args:
            endpoint: API endpoint the result belongs to.
            data: Request data used to build the key.
            ttl_hours: Unused on reads; kept so existing callers that pass
                it keep working. Expiry is set when the entry is written.
        """
        cached = self.redis_client.get(self.cache_key(endpoint, data))
        if not cached:
            return None
        return json.loads(cached)

    def cache_result(self, endpoint, data, result, ttl_hours=1):
        """Store ``result`` as JSON under the request's key for ``ttl_hours``."""
        key = self.cache_key(endpoint, data)
        ttl = timedelta(hours=ttl_hours)
        self.redis_client.setex(key, ttl, json.dumps(result))

Batch Processing

✅ Process Multiple Files Efficiently

const pLimit = require('p-limit');

/**
 * Run OCR over many documents while capping the number of in-flight calls.
 *
 * Every document is attempted; one failure does not abort the rest.
 *
 * @param {Array<object>} documents - Items handed to processDocument.
 * @param {number} [concurrency=3] - Limit passed to pLimit.
 * @returns {Promise<{successful: Array, errors: Array}>} Fulfilled values
 *   and rejection reasons, each in input order.
 */
async function batchOCRProcessing(documents, concurrency = 3) {
  const throttle = pLimit(concurrency);

  const tasks = documents.map((doc) => throttle(() => processDocument(doc)));
  const outcomes = await Promise.allSettled(tasks);

  // Split the settled outcomes into the two result lists in a single pass.
  const successful = [];
  const errors = [];
  for (const outcome of outcomes) {
    if (outcome.status === 'fulfilled') {
      successful.push(outcome.value);
    } else if (outcome.status === 'rejected') {
      errors.push(outcome.reason);
    }
  }

  return { successful, errors };
}

/**
 * OCR a single document, logging and re-throwing any failure.
 *
 * @param {{name: string}} document - Document passed to zaitsApi.ocrExtract.
 * @returns {Promise<{document: string, result: *}>} The document name with
 *   its extraction result.
 * @throws The original error, unchanged, after it has been logged.
 */
async function processDocument(document) {
  try {
    const extracted = await zaitsApi.ocrExtract(document);
    const summary = { document: document.name, result: extracted };
    return summary;
  } catch (failure) {
    console.error(`Failed to process ${document.name}:`, failure);
    throw failure;
  }
}

Error Handling

Robust Error Handling

✅ Comprehensive Error Handling

import requests
import time
from typing import Dict, Any, Optional

class ZaitsAPIClient:
    """Small Zaits API client: POST with retries and typed exceptions."""

    def __init__(self, api_key: str, max_retries: int = 3):
        """Store the credentials and retry policy.

        Args:
            api_key: Key sent as a Bearer token.
            max_retries: Number of retries after the first attempt.
        """
        self.api_key = api_key
        self.max_retries = max_retries
        self.base_url = "https://api.zaits.net"

    def make_request(self, endpoint: str, data: Dict[Any, Any],
                    files: Optional[Dict] = None) -> Dict[Any, Any]:
        """Make API request with comprehensive error handling.

        Rate-limit (429), server (5xx) and transport errors are retried up
        to ``max_retries`` times with exponential backoff (1s, 2s, 4s, ...).
        Client errors (400/401/403) are raised immediately.

        Args:
            endpoint: Path appended to ``base_url``.
            data: Form fields for the POST body.
            files: Optional uploads, as accepted by ``requests.post``.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            ValidationError: HTTP 400.
            AuthenticationError: HTTP 401.
            AuthorizationError: HTTP 403.
            RateLimitError: Still HTTP 429 after all retries.
            ServerError: Still HTTP 5xx after all retries.
            NetworkError: Transport failure after all retries.
            ZaitsAPIError: Any other status code.
        """
        headers = {'Authorization': f'Bearer {self.api_key}'}
        url = f"{self.base_url}{endpoint}"

        # An upload stream is consumed by the first attempt; remember where
        # each one started so that a retry resends the same bytes.
        upload_offsets = self._snapshot_upload_offsets(files)

        for attempt in range(self.max_retries + 1):
            can_retry = attempt < self.max_retries
            if attempt > 0:
                self._rewind_uploads(upload_offsets)

            try:
                response = requests.post(
                    url,
                    headers=headers,
                    data=data,
                    files=files,
                    timeout=30
                )

                # Handle different response codes
                if response.status_code == 200:
                    return response.json()
                elif response.status_code == 400:
                    error_data = response.json()
                    raise ValidationError(error_data.get('error', 'Bad request'))
                elif response.status_code == 401:
                    raise AuthenticationError('Invalid API key')
                elif response.status_code == 403:
                    raise AuthorizationError('Insufficient permissions')
                elif response.status_code == 429:
                    # Rate limit - back off, but do not sleep when no
                    # attempt is left to benefit from the wait.
                    if can_retry:
                        time.sleep(2 ** attempt)
                        continue
                    raise RateLimitError('Rate limit exceeded after retries')
                elif response.status_code >= 500:
                    # Server error - retry with backoff
                    if can_retry:
                        time.sleep(2 ** attempt)
                        continue
                    raise ServerError('Server error after retries')
                else:
                    raise ZaitsAPIError(
                        f'Unexpected status code: {response.status_code}')

            except requests.exceptions.RequestException as e:
                if can_retry:
                    time.sleep(2 ** attempt)
                    continue
                raise NetworkError(f'Network error: {str(e)}') from e

        # Only reachable when max_retries is negative (no attempt was made).
        raise ZaitsAPIError('Maximum retries exceeded')

    @staticmethod
    def _snapshot_upload_offsets(files):
        """Return ``(stream, offset)`` pairs for every seekable upload."""
        snapshots = []
        for value in (files or {}).values():
            # Uploads may be given as (filename, fileobj, ...) tuples.
            is_tuple = isinstance(value, (tuple, list)) and len(value) > 1
            stream = value[1] if is_tuple else value
            try:
                snapshots.append((stream, stream.tell()))
            except (AttributeError, OSError, ValueError):
                # Plain bytes/str or a non-seekable stream: nothing to rewind.
                continue
        return snapshots

    @staticmethod
    def _rewind_uploads(snapshots):
        """Seek every recorded upload stream back to its starting offset."""
        for stream, offset in snapshots:
            try:
                stream.seek(offset)
            except (OSError, ValueError):
                # Stream was closed meanwhile; the retry proceeds best-effort.
                pass

# Exception hierarchy: everything derives from ZaitsAPIError, so callers can
# catch that single type or one of the specific subclasses.
class ZaitsAPIError(Exception):
    """Root of all Zaits API errors."""


class ValidationError(ZaitsAPIError):
    """The request was rejected as invalid (HTTP 400)."""


class AuthenticationError(ZaitsAPIError):
    """The credentials were not accepted (HTTP 401)."""


class AuthorizationError(ZaitsAPIError):
    """The credentials lack the required permissions (HTTP 403)."""


class RateLimitError(ZaitsAPIError):
    """Too many requests were sent (HTTP 429)."""


class ServerError(ZaitsAPIError):
    """The server failed to handle the request (HTTP 5xx)."""


class NetworkError(ZaitsAPIError):
    """The request failed at the network/connection level."""

Graceful Degradation

✅ Fallback Strategies

/**
 * Face verification with fallbacks for rate limiting and network failures.
 *
 * Relies on `this.zaitsApi` and `this.queueForLaterProcessing`, neither of
 * which is defined in this class.
 */
class FaceVerificationService {
  /**
   * Verify two images through the API, degrading gracefully on known errors.
   *
   * @param {*} image1 - First image.
   * @param {*} image2 - Second image.
   * @param {object} [options={}] - Currently unused.
   * @returns {Promise<object>} `{ verified, confidence, source }`, plus a
   *   `message` when the verification was queued.
   * @throws Any error that is neither a RateLimitError nor a NetworkError.
   */
  async verifyFace(image1, image2, options = {}) {
    try {
      // Primary API call
      const result = await this.zaitsApi.faceVerify(image1, image2);
      const { verified, confidence } = result.data;
      return { verified, confidence, source: 'zaits-api' };
    } catch (error) {
      // Rate limited: answer from the local fallback instead.
      if (error instanceof RateLimitError) {
        return this.fallbackLocalVerification(image1, image2);
      }

      // Anything other than a network failure is unexpected: re-throw.
      if (!(error instanceof NetworkError)) {
        throw error;
      }

      // Network failure: queue the pair and report that no verdict exists.
      await this.queueForLaterProcessing(image1, image2);
      return {
        verified: null,
        confidence: 0,
        source: 'queued',
        message: 'Verification queued due to network issues'
      };
    }
  }

  /**
   * Placeholder local verification that always declines to verify.
   *
   * @param {*} image1 - First image (unused).
   * @param {*} image2 - Second image (unused).
   * @returns {Promise<{verified: boolean, confidence: number, source: string}>}
   */
  async fallbackLocalVerification(image1, image2) {
    console.warn('Using fallback local verification');
    // Conservative default: never report a match without the API.
    const conservativeVerdict = {
      verified: false,
      confidence: 0,
      source: 'local-fallback'
    };
    return conservativeVerdict;
  }
}

Rate Limiting

Respect Rate Limits

✅ Implement Rate Limit Handling

import time
from datetime import datetime, timedelta
from collections import deque

class RateLimiter:
    """Sliding-window limiter that blocks the caller until a request fits."""

    def __init__(self, max_requests: int, time_window: int):
        """Configure the limiter.

        Args:
            max_requests: Maximum requests allowed per window (at least 1).
            time_window: Window length in seconds.

        Raises:
            ValueError: If ``max_requests`` is less than 1.
        """
        if max_requests < 1:
            raise ValueError('max_requests must be at least 1')
        self.max_requests = max_requests
        self.time_window = time_window  # in seconds
        # time.monotonic() readings of recorded requests, oldest first.
        self.requests = deque()

    def _drop_expired(self, now):
        """Forget requests that are a full window (or more) older than ``now``."""
        while self.requests and now - self.requests[0] >= self.time_window:
            self.requests.popleft()

    def wait_if_needed(self):
        """Wait if necessary to respect rate limits, then record the request.

        The request is recorded with the time at which it is actually
        released (after any sleep), so later calls see the true spacing.
        A monotonic clock is used so system clock changes cannot distort
        the window.
        """
        now = time.monotonic()
        self._drop_expired(now)

        if len(self.requests) >= self.max_requests:
            # The window is full: wait until its oldest request expires.
            wait_seconds = self.requests[0] + self.time_window - now
            if wait_seconds > 0:
                print(f"Rate limit reached. Waiting {wait_seconds:.2f} seconds...")
                time.sleep(wait_seconds)

            # Re-read the clock: this is when the request really goes out.
            now = time.monotonic()
            self._drop_expired(now)

        # Record this request
        self.requests.append(now)

# Usage
rate_limiter = RateLimiter(max_requests=100, time_window=60)  # 100 requests per minute

def api_call_with_rate_limiting():
    """Block until the shared limiter admits a request, then perform it."""
    rate_limiter.wait_if_needed()
    response = make_api_request()
    return response

Circuit Breaker Pattern

✅ Implement Circuit Breaker

/**
 * Circuit breaker around an async operation.
 *
 * States: CLOSED (calls pass through), OPEN (calls are rejected until
 * `nextAttempt`), HALF_OPEN (a call is let through after the timeout).
 */
class CircuitBreaker {
  /**
   * @param {number} [threshold=5] - Failure count at which the circuit opens.
   * @param {number} [timeout=60000] - Milliseconds the circuit stays open.
   */
  constructor(threshold = 5, timeout = 60000) {
    this.threshold = threshold;
    this.timeout = timeout;
    this.failureCount = 0;
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.nextAttempt = Date.now();
  }

  /**
   * Run `fn` through the breaker.
   *
   * @param {Function} fn - Operation to run; its result is awaited.
   * @returns {Promise<*>} Whatever `fn` resolves to.
   * @throws {Error} 'Circuit breaker is OPEN' while the timeout is running;
   *   otherwise whatever `fn` throws.
   */
  async call(fn) {
    const wasOpen = this.state === 'OPEN';
    if (wasOpen && Date.now() < this.nextAttempt) {
      throw new Error('Circuit breaker is OPEN');
    }
    if (wasOpen) {
      // Timeout elapsed: let this call through as a probe.
      this.state = 'HALF_OPEN';
    }

    try {
      const outcome = await fn();
      this.onSuccess();
      return outcome;
    } catch (failure) {
      this.onFailure();
      throw failure;
    }
  }

  /** Reset the failure counter and close the circuit. */
  onSuccess() {
    this.state = 'CLOSED';
    this.failureCount = 0;
  }

  /** Count a failure and open the circuit once the threshold is reached. */
  onFailure() {
    this.failureCount += 1;
    if (!(this.failureCount >= this.threshold)) {
      return;
    }
    this.state = 'OPEN';
    this.nextAttempt = Date.now() + this.timeout;
  }
}

// Usage
const circuitBreaker = new CircuitBreaker(5, 60000);

/**
 * Run a face verification through the shared circuit breaker.
 *
 * @returns {Promise<*>} The result of `circuitBreaker.call`.
 */
async function protectedApiCall() {
  const verify = () => zaitsApi.faceVerify(img1, img2);
  return circuitBreaker.call(verify);
}

Monitoring and Logging

Comprehensive Logging

✅ Structured Logging

const winston = require('winston');

// Shared structured logger for the API client.
const logger = winston.createLogger({
  // Default level for the logger; individual transports may override it.
  level: 'info',
  // Entries get a timestamp, error stack traces, and JSON serialization.
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.errors({ stack: true }),
    winston.format.json()
  ),
  // Attached to every entry so logs can be attributed to this client.
  defaultMeta: { service: 'zaits-api-client' },
  transports: [
    // error.log is configured at the 'error' level; combined.log and the
    // console use the logger's default level.
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' }),
    // The console transport overrides the JSON format with the simple one.
    new winston.transports.Console({
      format: winston.format.simple()
    })
  ]
});

/**
 * API client wrapper that logs every face-verification request.
 *
 * Relies on `this.generateRequestId` and `this.makeRequest`, which are not
 * defined in this class, and on a `logger` in the enclosing scope.
 */
class ZaitsAPIClient {
  /**
   * Verify two faces, logging the start, the outcome and the duration.
   *
   * @param {*} image1 - First image.
   * @param {*} image2 - Second image.
   * @returns {Promise<object>} The result returned by `makeRequest`.
   * @throws Whatever `makeRequest` throws, after it has been logged.
   */
  async faceVerify(image1, image2) {
    const endpoint = '/v1/face/verify';
    const startTime = Date.now();
    const requestId = this.generateRequestId();
    const elapsedMs = () => Date.now() - startTime;

    logger.info('API request started', {
      requestId,
      endpoint,
      timestamp: new Date().toISOString()
    });

    try {
      const result = await this.makeRequest(endpoint, { image1, image2 });

      logger.info('API request completed', {
        requestId,
        endpoint,
        duration: elapsedMs(),
        success: result.success,
        verified: result.data?.verified
      });

      return result;
    } catch (error) {
      // Log the failure with its duration, then let the caller handle it.
      logger.error('API request failed', {
        requestId,
        endpoint,
        duration: elapsedMs(),
        error: error.message,
        stack: error.stack
      });

      throw error;
    }
  }
}

Health Monitoring

✅ API Health Checks

import requests
import time
from datetime import datetime

class ZaitsAPIHealthMonitor:
    """Probe a fixed set of API endpoints and keep their latest status."""

    def __init__(self, api_key: str, check_interval: int = 300):
        """Store the credentials and start with no recorded checks.

        Args:
            api_key: Key sent as a Bearer token by the probes.
            check_interval: Intended seconds between checks (default 5
                minutes). Stored only; this class does no scheduling.
        """
        self.api_key = api_key
        self.check_interval = check_interval  # 5 minutes
        self.health_status = {}

    def check_api_health(self):
        """Perform health check on API endpoints.

        Each endpoint's result is written to ``self.health_status``: a
        probe that returns is recorded as healthy with its response time,
        and a probe that raises is recorded as unhealthy with the error.
        """
        endpoints = [
            '/v1/usage/summary',
            '/v1/face/verify',
            '/v1/ocr/extract'
        ]

        for endpoint in endpoints:
            try:
                # perf_counter is unaffected by system clock adjustments.
                start_time = time.perf_counter()

                if endpoint == '/v1/usage/summary':
                    self.check_usage_endpoint()
                else:
                    # For endpoints requiring data, use test mode
                    self.check_endpoint_availability(endpoint)

                response_time = time.perf_counter() - start_time

                self.health_status[endpoint] = {
                    'status': 'healthy',
                    'response_time': response_time,
                    'last_check': datetime.now().isoformat()
                }

            except Exception as e:
                # A failing probe must not stop the remaining endpoints.
                self.health_status[endpoint] = {
                    'status': 'unhealthy',
                    'error': str(e),
                    'last_check': datetime.now().isoformat()
                }

    def check_usage_endpoint(self):
        """Check usage endpoint (doesn't require data).

        Returns:
            The decoded JSON body.

        Raises:
            requests.exceptions.RequestException: On transport errors or a
                non-success status code.
        """
        headers = {'Authorization': f'Bearer {self.api_key}'}
        response = requests.get(
            'https://api.zaits.net/v1/usage/summary',
            headers=headers,
            timeout=10
        )
        response.raise_for_status()
        return response.json()

    def check_endpoint_availability(self, endpoint):
        """Probe an endpoint that needs request data.

        This hook has no default implementation; override it with a probe
        suited to the endpoint. Until then the endpoint is reported as
        unhealthy with this explicit message.

        Raises:
            NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError(
            f'No availability check implemented for {endpoint}')

    def get_health_summary(self):
        """Get overall health summary.

        Returns:
            A dict with ``overall_status`` ('unknown' before any check has
            run, 'healthy' when every endpoint is healthy, otherwise
            'degraded'), the healthy/total counts, and the per-endpoint
            details.
        """
        total_count = len(self.health_status)
        healthy_count = sum(1 for status in self.health_status.values()
                            if status['status'] == 'healthy')

        # With nothing checked yet, claiming "healthy" would be misleading.
        if total_count == 0:
            overall_status = 'unknown'
        elif healthy_count == total_count:
            overall_status = 'healthy'
        else:
            overall_status = 'degraded'

        return {
            'overall_status': overall_status,
            'healthy_endpoints': healthy_count,
            'total_endpoints': total_count,
            'endpoints': self.health_status
        }

Production Deployment

Environment Configuration

✅ Environment-Specific Configuration

# config/production.yml
zaits_api:
  base_url: https://api.zaits.net
  api_key: ${ZAITS_API_KEY}
  timeout: 30
  max_retries: 3
  rate_limit:
    requests_per_minute: 100
  cache:
    ttl: 3600
    redis_url: ${REDIS_URL}
  monitoring:
    enable_metrics: true
    log_level: info
# config/development.yml
zaits_api:
  base_url: https://api.zaits.net
  api_key: ${ZAITS_TEST_API_KEY}
  timeout: 10
  max_retries: 1
  rate_limit:
    requests_per_minute: 20
  cache:
    ttl: 300
    redis_url: redis://localhost:6379
  monitoring:
    enable_metrics: false
    log_level: debug

Deployment Checklist

✅ Pre-Production Checklist

✅ Post-Deployment Monitoring

// Example monitoring dashboard metrics
const metrics = {
  api_requests_total: 0,
  api_requests_success: 0,
  api_requests_error: 0,
  api_response_time_avg: 0,
  api_rate_limit_hits: 0,
  cache_hit_ratio: 0.85
};

// Health check endpoint for load balancers.
// Responds 200 when every dependency reports `status: 'ok'`, otherwise 503.
app.get('/health', (req, res) => {
  const services = {
    zaits_api: checkZaitsAPIHealth(),
    database: checkDatabaseHealth(),
    cache: checkCacheHealth()
  };

  // A check that returns nothing counts as unhealthy instead of crashing
  // the handler.
  const allHealthy = Object.values(services)
    .every((service) => service?.status === 'ok');

  // Keep the body's status in step with the HTTP code, so a 503 response
  // does not claim `status: 'ok'`.
  const health = {
    status: allHealthy ? 'ok' : 'degraded',
    timestamp: new Date().toISOString(),
    services
  };

  res.status(allHealthy ? 200 : 503).json(health);
});

Testing Best Practices

Unit Testing

✅ Mock API Responses

// tests/zaits-api.test.js
const nock = require('nock');
const ZaitsAPI = require('../src/zaits-api');

// Unit tests for the client; HTTP traffic to api.zaits.net is intercepted
// with nock, so no real request is made.
describe('ZaitsAPI', () => {
  let api;
  
  // Each test gets a fresh client, and nock.cleanAll() is called so mocks
  // registered by a previous test are not carried over.
  beforeEach(() => {
    api = new ZaitsAPI('test-api-key');
    nock.cleanAll();
  });
  
  it('should verify faces successfully', async () => {
    // Mock successful response
    nock('https://api.zaits.net')
      .post('/v1/face/verify')
      .reply(200, {
        success: true,
        data: {
          verified: true,
          confidence: 0.89,
          distance: 0.32
        }
      });
    
    const result = await api.faceVerify('image1.jpg', 'image2.jpg');
    
    // The mocked payload should be passed through to the caller.
    expect(result.success).toBe(true);
    expect(result.data.verified).toBe(true);
    expect(result.data.confidence).toBeGreaterThan(0.8);
  });
  
  it('should handle rate limit errors', async () => {
    // Mock rate limit response
    nock('https://api.zaits.net')
      .post('/v1/face/verify')
      .reply(429, {
        success: false,
        error: {
          code: 'rate_limit_exceeded',
          message: 'Too many requests'
        }
      });
    
    // NOTE(review): assumes the client puts the API error code into the
    // thrown error's message -- confirm against the client implementation.
    await expect(api.faceVerify('image1.jpg', 'image2.jpg'))
      .rejects.toThrow('rate_limit_exceeded');
  });
});

Integration Testing

✅ API Integration Tests

import pytest
import os
from zaits_api_client import ZaitsAPIClient

@pytest.fixture
def api_client():
    """Build a client from ZAITS_TEST_API_KEY, skipping when it is not set."""
    api_key = os.environ.get('ZAITS_TEST_API_KEY')
    if api_key:
        return ZaitsAPIClient(api_key)
    # An unset or empty key skips the test.
    pytest.skip('Test API key not configured')

@pytest.fixture
def sample_images():
    """Paths of the fixture files used by the integration tests."""
    fixtures_dir = 'tests/fixtures'
    filenames = {
        'face1': 'face1.jpg',
        'face2': 'face2.jpg',
        'document': 'document.pdf',
    }
    return {label: f'{fixtures_dir}/{filename}'
            for label, filename in filenames.items()}

def test_face_verification_integration(api_client, sample_images):
    """Test actual face verification API"""
    result = api_client.face_verify(
        sample_images['face1'], 
        sample_images['face2']
    )
    
    assert result['success'] is True
    assert 'verified' in result['data']
    assert 'confidence' in result['data']
    assert isinstance(result['data']['confidence'], float)
    assert 0 <= result['data']['confidence'] <= 1

def test_ocr_extraction_integration(api_client, sample_images):
    """Test actual OCR extraction API"""
    result = api_client.ocr_extract(sample_images['document'])
    
    assert result['success'] is True
    assert 'text' in result['data']
    assert len(result['data']['text']) > 0

@pytest.mark.performance
def test_api_response_time(api_client, sample_images):
    """Test API response time requirements"""
    import time
    
    start_time = time.time()
    result = api_client.face_verify(
        sample_images['face1'], 
        sample_images['face2']
    )
    response_time = time.time() - start_time
    
    assert result['success'] is True
    assert response_time < 5.0  # Should respond within 5 seconds

Next: Error Handling Guide

Last updated