Python Examples

Practical Python code examples for integrating with the Zaits API in various scenarios.

Setup

pip install zaits-python requests pillow opencv-python

from zaits import ZaitsClient

client = ZaitsClient(api_key='your-api-key')
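
For anything beyond a quick test, avoid hardcoding the key. A minimal sketch that reads it from an environment variable instead (assuming only the ZaitsClient constructor shown above):

import os
from zaits import ZaitsClient

# Read the key from the environment so it never lands in source control
api_key = os.environ.get('ZAITS_API_KEY')
if not api_key:
    raise RuntimeError('Set the ZAITS_API_KEY environment variable first')

client = ZaitsClient(api_key=api_key)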

Face Recognition Examples

Identity Verification System

import sqlite3
import hashlib

class IdentityVerificationSystem:
    def __init__(self, zaits_client, db_path='identity_db.sqlite'):
        self.client = zaits_client
        self.db_path = db_path
        self.setup_database()
    
    def setup_database(self):
        """Initialize the identity database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS identities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT UNIQUE,
                name TEXT,
                photo_hash TEXT,
                photo_path TEXT,
                verification_score REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def register_identity(self, user_id, name, photo_path, verification_photos):
        """Register a new identity with multiple verification photos"""
        try:
            # Verify all photos are of the same person
            base_photo = photo_path
            verification_scores = []
            
            for verify_photo in verification_photos:
                result = self.client.face.verify(base_photo, verify_photo)
                if not result.verified or result.confidence < 0.8:
                    raise ValueError(f"Photo {verify_photo} doesn't match base photo")
                verification_scores.append(result.confidence)
            
            # Check for liveness
            liveness = self.client.face.liveness(base_photo)
            if not liveness.is_live or liveness.confidence < 0.7:
                raise ValueError("Liveness check failed - photo may be fake")
            
            # Calculate average verification score
            avg_score = sum(verification_scores) / len(verification_scores)
            
            # Generate photo hash for quick comparison
            photo_hash = self._generate_photo_hash(base_photo)
            
            # Store in database
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT INTO identities (user_id, name, photo_hash, photo_path, verification_score)
                VALUES (?, ?, ?, ?, ?)
            ''', (user_id, name, photo_hash, base_photo, avg_score))
            
            conn.commit()
            conn.close()
            
            print(f"✅ Identity registered: {name} (Score: {avg_score:.3f})")
            return True
            
        except Exception as e:
            print(f"❌ Registration failed: {e}")
            return False
    
    def verify_identity(self, user_id, photo_path, threshold=0.85):
        """Verify identity against stored photo"""
        try:
            # Get stored identity
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute(
                'SELECT name, photo_path, verification_score FROM identities WHERE user_id = ?',
                (user_id,)
            )
            result = cursor.fetchone()
            conn.close()
            
            if not result:
                return {'verified': False, 'reason': 'Identity not found'}
            
            stored_name, stored_photo, stored_score = result
            
            # Perform face verification
            verification = self.client.face.verify(stored_photo, photo_path)
            
            # Check liveness
            liveness = self.client.face.liveness(photo_path)
            
            verified = (
                verification.verified and 
                verification.confidence >= threshold and
                liveness.is_live and 
                liveness.confidence >= 0.7
            )
            
            return {
                'verified': verified,
                'name': stored_name,
                'confidence': verification.confidence,
                'liveness_score': liveness.confidence,
                'stored_score': stored_score,
                'threshold': threshold
            }
            
        except Exception as e:
            print(f"❌ Verification failed: {e}")
            return {'verified': False, 'reason': str(e)}
    
    def _generate_photo_hash(self, photo_path):
        """Generate hash of photo for quick comparison"""
        with open(photo_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()

# Usage example
def demo_identity_system():
    system = IdentityVerificationSystem(client)
    
    # Register new identity
    success = system.register_identity(
        user_id='user_123',
        name='John Doe',
        photo_path='./photos/john_base.jpg',
        verification_photos=[
            './photos/john_verify1.jpg',
            './photos/john_verify2.jpg'
        ]
    )
    
    if success:
        # Verify identity
        result = system.verify_identity('user_123', './photos/john_test.jpg')
        
        if result['verified']:
            print(f"🎉 Identity verified: {result['name']}")
            print(f"📊 Confidence: {result['confidence']:.3f}")
        else:
            print(f"❌ Identity verification failed: {result.get('reason', 'Unknown')}")

Webcam Face Detection with OpenCV

import cv2
import threading
import queue
from datetime import datetime

class RealTimeFaceAnalyzer:
    def __init__(self, zaits_client):
        self.client = zaits_client
        self.cap = cv2.VideoCapture(0)
        self.analysis_queue = queue.Queue()
        self.current_analysis = None
        self.analyzing = False
        
    def start_analysis(self):
        """Start real-time face analysis"""
        # Start analysis thread
        analysis_thread = threading.Thread(target=self._analysis_worker)
        analysis_thread.daemon = True
        analysis_thread.start()
        
        frame_count = 0
        
        while True:
            ret, frame = self.cap.read()
            if not ret:
                break
            
            # Mirror the frame horizontally
            frame = cv2.flip(frame, 1)
            
            # Add frame to analysis queue every 30 frames (~1 second at 30 FPS)
            if frame_count % 30 == 0 and not self.analyzing:
                self.analysis_queue.put(frame.copy())
            
            # Draw analysis results on frame
            self._draw_analysis_results(frame)
            
            # Display the frame
            cv2.imshow('Real-time Face Analysis', frame)
            
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            
            frame_count += 1
        
        self.cap.release()
        cv2.destroyAllWindows()
    
    def _analysis_worker(self):
        """Background thread for face analysis"""
        while True:
            try:
                frame = self.analysis_queue.get(timeout=1)
                self.analyzing = True
                
                # Save frame temporarily
                temp_path = 'temp_frame.jpg'
                cv2.imwrite(temp_path, frame)
                
                # Analyze face
                try:
                    analysis = self.client.face.analyze(
                        temp_path,
                        actions=['age', 'gender', 'emotion'],
                        return_face_region=True
                    )
                    
                    self.current_analysis = {
                        'age': analysis.age,
                        'gender': analysis.gender.prediction,
                        'gender_confidence': analysis.gender.confidence,
                        'emotion': analysis.emotion.dominant_emotion,
                        'face_region': analysis.region,
                        'timestamp': datetime.now()
                    }
                    
                except Exception as e:
                    print(f"Analysis failed: {e}")
                    self.current_analysis = None
                
                self.analyzing = False
                
            except queue.Empty:
                continue
    
    def _draw_analysis_results(self, frame):
        """Draw analysis results on the frame"""
        if self.current_analysis is None:
            cv2.putText(frame, 'No face detected', (10, 30),
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            return
        
        analysis = self.current_analysis
        
        # Draw face rectangle if available
        if analysis.get('face_region'):
            region = analysis['face_region']
            cv2.rectangle(frame, (region.x, region.y),
                         (region.x + region.w, region.y + region.h),
                         (0, 255, 0), 2)
        
        # Draw analysis text
        y_offset = 30
        texts = [
            f"Age: {analysis['age']}",
            f"Gender: {analysis['gender']} ({analysis['gender_confidence']:.2f})",
            f"Emotion: {analysis['emotion']}",
        ]
        
        if self.analyzing:
            texts.append("Analyzing...")
        
        for text in texts:
            cv2.putText(frame, text, (10, y_offset),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            y_offset += 30

# Usage
analyzer = RealTimeFaceAnalyzer(client)
analyzer.start_analysis()  # Press 'q' to quit
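
One caveat: the analyzer writes every sampled frame to the same temp_frame.jpg, so two analyzers running in the same directory would overwrite each other. A small sketch of a safer variant that uses a unique temporary file per call (still assuming the SDK takes file paths, as above):

import os
import tempfile
import cv2

def analyze_frame_safely(client, frame):
    """Write the frame to a unique temp file, analyze it, then clean up."""
    fd, temp_path = tempfile.mkstemp(suffix='.jpg')
    os.close(fd)  # cv2.imwrite opens the path itself
    try:
        cv2.imwrite(temp_path, frame)
        return client.face.analyze(
            temp_path,
            actions=['age', 'gender', 'emotion'],
            return_face_region=True
        )
    finally:
        os.remove(temp_path)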

OCR Examples

Intelligent Document Processor

import json
import pandas as pd
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

class IntelligentDocumentProcessor:
    def __init__(self, zaits_client):
        self.client = zaits_client
        self.supported_formats = ['.pdf', '.jpg', '.jpeg', '.png', '.tiff']
        self.setup_logging()
    
    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('document_processing.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def process_document(self, file_path, document_type='auto'):
        """Process a single document with intelligent analysis"""
        try:
            self.logger.info(f"📄 Processing: {file_path}")
            
            # Extract text
            extraction_result = self.client.ocr.extract(
                file_path,
                language='en',
                extract_tables=True,
                output_format='structured'
            )
            
            # Analyze document
            analysis_result = self.client.ocr.analyze(
                file_path,
                document_type=document_type,
                extract_fields='auto',  # Auto-detect relevant fields
                validate_document=True
            )
            
            # Intelligent field extraction based on document type
            structured_data = self._extract_structured_data(
                analysis_result.document_type,
                analysis_result.extracted_fields,
                extraction_result.text
            )
            
            result = {
                'file_path': str(file_path),
                'document_type': analysis_result.document_type,
                'text': extraction_result.text,
                'confidence': extraction_result.confidence,
                'structured_data': structured_data,
                'is_valid': analysis_result.validation.is_valid,
                'processing_time': extraction_result.processing_time,
                'timestamp': datetime.now().isoformat()
            }
            
            self.logger.info(f"✅ Successfully processed: {file_path} (Type: {analysis_result.document_type})")
            return result
            
        except Exception as e:
            self.logger.error(f"❌ Failed to process {file_path}: {str(e)}")
            return {
                'file_path': str(file_path),
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }
    
    def batch_process_directory(self, directory_path, max_workers=3):
        """Process all documents in a directory concurrently"""
        directory = Path(directory_path)
        
        # Find all supported document files
        files_to_process = []
        for ext in self.supported_formats:
            files_to_process.extend(directory.glob(f'**/*{ext}'))
        
        if not files_to_process:
            self.logger.warning(f"No supported documents found in {directory_path}")
            return []
        
        self.logger.info(f"🔄 Processing {len(files_to_process)} documents with {max_workers} workers")
        
        results = []
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks
            future_to_file = {
                executor.submit(self.process_document, file_path): file_path
                for file_path in files_to_process
            }
            
            # Collect results as they complete
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    result = future.result()
                    results.append(result)
                    
                    # Progress update
                    processed_count = len(results)
                    progress = (processed_count / len(files_to_process)) * 100
                    self.logger.info(f"📊 Progress: {processed_count}/{len(files_to_process)} ({progress:.1f}%)")
                    
                except Exception as e:
                    self.logger.error(f"❌ Error processing {file_path}: {str(e)}")
        
        return results
    
    def _extract_structured_data(self, document_type, extracted_fields, raw_text):
        """Extract structured data based on document type"""
        if document_type == 'invoice':
            return self._extract_invoice_data(extracted_fields, raw_text)
        elif document_type == 'receipt':
            return self._extract_receipt_data(extracted_fields, raw_text)
        elif document_type == 'contract':
            return self._extract_contract_data(extracted_fields, raw_text)
        else:
            return extracted_fields
    
    def _extract_invoice_data(self, fields, text):
        """Extract structured invoice data"""
        return {
            'invoice_number': fields.get('invoice_number'),
            'date': fields.get('date'),
            'due_date': fields.get('due_date'),
            'vendor': fields.get('vendor'),
            'customer': fields.get('customer'),
            'subtotal': fields.get('subtotal'),
            'tax': fields.get('tax'),
            'total': fields.get('total'),
            'currency': self._extract_currency(text),
            'payment_terms': fields.get('payment_terms'),
            'line_items': self._extract_line_items(text)
        }
    
    def _extract_receipt_data(self, fields, text):
        """Extract structured receipt data"""
        return {
            'merchant': fields.get('merchant'),
            'date': fields.get('date'),
            'time': fields.get('time'),
            'total': fields.get('total'),
            'tax': fields.get('tax'),
            'payment_method': fields.get('payment_method'),
            'items': self._extract_receipt_items(text)
        }

    def _extract_contract_data(self, fields, text):
        """Extract structured contract data (fields depend on what the API detects)"""
        return {
            'parties': fields.get('parties'),
            'effective_date': fields.get('effective_date'),
            'termination_date': fields.get('termination_date'),
            'contract_value': fields.get('contract_value')
        }
    
    def _extract_line_items(self, text):
        """Pull likely line items out of raw text (simple placeholder heuristic)"""
        lines = [line.strip() for line in text.splitlines() if line.strip()]
        # Keep lines that contain a digit - crude, but a starting point
        return [line for line in lines if any(ch.isdigit() for ch in line)]
    
    def _extract_receipt_items(self, text):
        """Pull likely receipt items out of raw text"""
        return self._extract_line_items(text)
    
    def _extract_currency(self, text):
        """Extract an ISO currency code from text"""
        import re
        # Map currency symbols and codes to ISO codes rather than
        # returning the raw regex pattern
        currency_map = [
            (r'\$', 'USD'),
            (r'€', 'EUR'),
            (r'£', 'GBP'),
            (r'¥', 'JPY'),
            (r'\bUSD\b', 'USD'),
            (r'\bEUR\b', 'EUR'),
            (r'\bGBP\b', 'GBP'),
            (r'\bJPY\b', 'JPY'),
        ]
        
        for pattern, code in currency_map:
            if re.search(pattern, text, re.IGNORECASE):
                return code
        
        return 'Unknown'
    
    def generate_report(self, results, output_path='processing_report.json'):
        """Generate a comprehensive processing report"""
        successful = [r for r in results if 'error' not in r]
        failed = [r for r in results if 'error' in r]
        
        # Calculate statistics
        if successful:
            avg_confidence = sum(r['confidence'] for r in successful) / len(successful)
            doc_types = {}
            for result in successful:
                doc_type = result['document_type']
                doc_types[doc_type] = doc_types.get(doc_type, 0) + 1
        else:
            avg_confidence = 0
            doc_types = {}
        
        report = {
            'summary': {
                'total_documents': len(results),
                'successful': len(successful),
                'failed': len(failed),
                'success_rate': (len(successful) / len(results) * 100) if results else 0,
                'average_confidence': avg_confidence,
                'document_types': doc_types
            },
            'successful_documents': successful,
            'failed_documents': failed,
            'generated_at': datetime.now().isoformat()
        }
        
        # Save report
        with open(output_path, 'w') as f:
            json.dump(report, f, indent=2, default=str)
        
        self.logger.info(f"📊 Report saved to: {output_path}")
        return report
    
    def export_to_excel(self, results, output_path='documents_data.xlsx'):
        """Export structured data to Excel"""
        successful_results = [r for r in results if 'error' not in r]
        
        if not successful_results:
            self.logger.warning("No successful results to export")
            return
        
        # Group by document type
        by_type = {}
        for result in successful_results:
            doc_type = result['document_type']
            if doc_type not in by_type:
                by_type[doc_type] = []
            
            # Flatten structured data
            flat_data = {
                'file_path': result['file_path'],
                'confidence': result['confidence'],
                'is_valid': result['is_valid'],
                'processing_time': result['processing_time'],
                **result['structured_data']
            }
            by_type[doc_type].append(flat_data)
        
        # Create Excel file with separate sheets for each document type
        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
            for doc_type, data in by_type.items():
                if data:
                    df = pd.DataFrame(data)
                    df.to_excel(writer, sheet_name=doc_type, index=False)
        
        self.logger.info(f"📊 Data exported to Excel: {output_path}")

# Usage example
def process_business_documents():
    processor = IntelligentDocumentProcessor(client)
    
    # Process all documents in a directory
    results = processor.batch_process_directory(
        './business_documents',
        max_workers=4
    )
    
    # Generate comprehensive report
    report = processor.generate_report(results, 'business_docs_report.json')
    
    # Export to Excel
    processor.export_to_excel(results, 'business_docs_data.xlsx')
    
    # Print summary
    print(f"\n📊 Processing Summary:")
    print(f"Total documents: {report['summary']['total_documents']}")
    print(f"Successful: {report['summary']['successful']}")
    print(f"Failed: {report['summary']['failed']}")
    print(f"Success rate: {report['summary']['success_rate']:.1f}%")
    print(f"Average confidence: {report['summary']['average_confidence']:.3f}")
    print(f"Document types found: {list(report['summary']['document_types'].keys())}")

# Run the processor
if __name__ == "__main__":
    process_business_documents()
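
The JSON report is easy to mine after the fact. As a sketch, this loads the report and tallies invoice totals (assuming some invoices were processed and their total fields hold numeric values):

import json
import pandas as pd

with open('business_docs_report.json') as f:
    report = json.load(f)

# Collect the structured data for every successfully processed invoice
invoices = [
    doc['structured_data']
    for doc in report['successful_documents']
    if doc['document_type'] == 'invoice'
]

if invoices:
    df = pd.DataFrame(invoices)
    # Coerce totals to numbers; anything unparseable becomes NaN
    df['total'] = pd.to_numeric(df['total'], errors='coerce')
    print(f"Invoices: {len(df)}, combined total: {df['total'].sum():.2f}")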

PDF Table Extractor

import pandas as pd
from pathlib import Path
import json

class PDFTableExtractor:
    def __init__(self, zaits_client):
        self.client = zaits_client
    
    def extract_tables(self, pdf_path, output_format='excel'):
        """Extract tables from PDF and export to various formats"""
        try:
            print(f"📄 Processing PDF: {pdf_path}")
            
            # Extract with table detection
            result = self.client.ocr.extract(
                pdf_path,
                extract_tables=True,
                output_format='structured',
                language='en'
            )
            
            if not hasattr(result, 'tables') or not result.tables:
                print("❌ No tables found in the PDF")
                return None
            
            print(f"✅ Found {len(result.tables)} tables")
            
            # Process each table
            processed_tables = []
            for i, table in enumerate(result.tables):
                processed_table = self._process_table(table, i)
                processed_tables.append(processed_table)
            
            # Export tables
            output_files = self._export_tables(
                processed_tables, 
                pdf_path, 
                output_format
            )
            
            return {
                'tables': processed_tables,
                'output_files': output_files,
                'table_count': len(processed_tables)
            }
            
        except Exception as e:
            print(f"❌ Error extracting tables: {e}")
            return None
    
    def _process_table(self, table_data, table_index):
        """Process raw table data into structured format"""
        # Convert table data to DataFrame
        if isinstance(table_data, list):
            # Handle list of rows
            df = pd.DataFrame(table_data[1:], columns=table_data[0])
        elif isinstance(table_data, dict):
            # Handle structured table data
            df = pd.DataFrame(table_data)
        else:
            # Handle string data (parse as CSV-like)
            import io
            df = pd.read_csv(io.StringIO(str(table_data)))
        
        # Clean the DataFrame
        df = self._clean_dataframe(df)
        
        return {
            'index': table_index,
            'shape': df.shape,
            'columns': df.columns.tolist(),
            'data': df.to_dict('records'),
            'dataframe': df
        }
    
    def _clean_dataframe(self, df):
        """Clean and standardize DataFrame"""
        # Remove empty rows and columns
        df = df.dropna(how='all').dropna(axis=1, how='all')
        
        # Strip whitespace from string values (leaving non-string cells untouched)
        string_columns = df.select_dtypes(include=['object']).columns
        for col in string_columns:
            df[col] = df[col].map(lambda v: v.strip() if isinstance(v, str) else v)
        
        # Convert a column to numeric only if every non-null value parses,
        # so mixed text columns aren't silently corrupted
        for col in df.columns:
            numeric_series = pd.to_numeric(df[col], errors='coerce')
            if numeric_series.notna().sum() == df[col].notna().sum():
                df[col] = numeric_series
        
        return df
    
    def _export_tables(self, tables, source_pdf, output_format):
        """Export tables to various formats"""
        output_files = []
        base_name = Path(source_pdf).stem
        
        if output_format.lower() in ['excel', 'xlsx']:
            # Export to Excel with multiple sheets
            excel_path = f"{base_name}_tables.xlsx"
            
            with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
                for table in tables:
                    sheet_name = f"Table_{table['index'] + 1}"
                    table['dataframe'].to_excel(
                        writer, 
                        sheet_name=sheet_name, 
                        index=False
                    )
            
            output_files.append(excel_path)
            print(f"📊 Exported to Excel: {excel_path}")
        
        elif output_format.lower() == 'csv':
            # Export each table as separate CSV
            for table in tables:
                csv_path = f"{base_name}_table_{table['index'] + 1}.csv"
                table['dataframe'].to_csv(csv_path, index=False)
                output_files.append(csv_path)
            
            print(f"📊 Exported {len(tables)} CSV files")
        
        elif output_format.lower() == 'json':
            # Export as JSON
            json_path = f"{base_name}_tables.json"
            
            json_data = {
                'source_pdf': str(source_pdf),
                'extraction_timestamp': pd.Timestamp.now().isoformat(),
                'tables': [
                    {
                        'index': table['index'],
                        'shape': table['shape'],
                        'columns': table['columns'],
                        'data': table['data']
                    }
                    for table in tables
                ]
            }
            
            with open(json_path, 'w') as f:
                json.dump(json_data, f, indent=2, default=str)
            
            output_files.append(json_path)
            print(f"📊 Exported to JSON: {json_path}")
        
        return output_files
    
    def batch_extract_directory(self, directory_path, output_format='excel'):
        """Extract tables from all PDFs in a directory"""
        directory = Path(directory_path)
        pdf_files = list(directory.glob('*.pdf'))
        
        if not pdf_files:
            print(f"❌ No PDF files found in {directory_path}")
            return
        
        print(f"🔄 Processing {len(pdf_files)} PDF files...")
        
        results = []
        for pdf_file in pdf_files:
            try:
                result = self.extract_tables(pdf_file, output_format)
                if result:
                    results.append({
                        'pdf_file': str(pdf_file),
                        'success': True,
                        **result
                    })
                else:
                    results.append({
                        'pdf_file': str(pdf_file),
                        'success': False,
                        'error': 'No tables found'
                    })
            except Exception as e:
                results.append({
                    'pdf_file': str(pdf_file),
                    'success': False,
                    'error': str(e)
                })
        
        # Generate summary report
        successful = [r for r in results if r['success']]
        total_tables = sum(r.get('table_count', 0) for r in successful)
        
        print(f"\n📊 Batch Processing Summary:")
        print(f"PDFs processed: {len(pdf_files)}")
        print(f"Successful extractions: {len(successful)}")
        print(f"Total tables extracted: {total_tables}")
        print(f"Success rate: {len(successful)/len(pdf_files)*100:.1f}%")
        
        return results

# Usage example
extractor = PDFTableExtractor(client)

# Extract tables from single PDF
result = extractor.extract_tables('./financial_report.pdf', 'excel')

# Extract tables from all PDFs in directory
batch_results = extractor.batch_extract_directory('./reports/', 'excel')
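
Each processed table carries its DataFrame, so combining every table from a PDF into a single frame takes one pandas call. A minimal sketch, assuming the single-PDF extraction above succeeded:

import pandas as pd

if result:
    # Tag each row with the table it came from, then stack the tables
    combined = pd.concat(
        [t['dataframe'].assign(source_table=t['index']) for t in result['tables']],
        ignore_index=True
    )
    print(f"Combined {result['table_count']} tables into {len(combined)} rows")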

Document Signing Examples

Automated Contract Workflow

import smtplib
import textwrap
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
import time

class ContractWorkflowManager:
    def __init__(self, zaits_client, email_config=None):
        self.client = zaits_client
        self.email_config = email_config or {}
        self.active_contracts = {}
    
    def create_contract_workflow(self, contract_data):
        """Create a complete contract signing workflow"""
        try:
            print(f"📝 Creating contract workflow: {contract_data['title']}")
            
            # Create signing request
            signing_request = self.client.signing.create({
                'document': contract_data['document_path'],
                'signers': contract_data['signers'],
                'title': contract_data['title'],
                'message': contract_data.get('message', 'Please review and sign this contract.'),
                'expires_in': contract_data.get('expires_in', '30d'),
                'required_fields': contract_data.get('required_fields', [
                    {'name': 'signature', 'type': 'signature', 'required': True},
                    {'name': 'date', 'type': 'date', 'required': True}
                ])
            })
            
            # Store contract info
            workflow_data = {
                'document_id': signing_request.document_id,
                'signing_url': signing_request.signing_url,
                'created_at': datetime.now(),
                'status': 'pending',
                'signers': contract_data['signers'],
                'title': contract_data['title'],
                'reminders_sent': 0,
                'max_reminders': contract_data.get('max_reminders', 3)
            }
            
            self.active_contracts[signing_request.document_id] = workflow_data
            
            # Send initial notifications
            self._send_signing_invitations(workflow_data)
            
            print(f"✅ Contract workflow created: {signing_request.document_id}")
            return signing_request.document_id
            
        except Exception as e:
            print(f"❌ Failed to create contract workflow: {e}")
            return None
    
    def monitor_contracts(self, check_interval=3600):  # Check every hour
        """Monitor active contracts and send reminders"""
        print(f"🔄 Starting contract monitoring (checking every {check_interval/3600:.1f} hours)")
        
        while self.active_contracts:
            for document_id in list(self.active_contracts.keys()):
                try:
                    self._check_contract_status(document_id)
                    time.sleep(10)  # Small delay between checks
                except Exception as e:
                    print(f"❌ Error checking contract {document_id}: {e}")
            
            print(f"⏰ Waiting {check_interval} seconds until next check...")
            time.sleep(check_interval)
    
    def _check_contract_status(self, document_id):
        """Check status of a specific contract"""
        workflow = self.active_contracts[document_id]
        
        try:
            # Get current status
            status = self.client.signing.get_status(document_id)
            
            if status.status == 'completed':
                print(f"🎉 Contract completed: {workflow['title']}")
                
                # Download signed document
                signed_doc_path = f"signed_{document_id}.pdf"
                self.client.signing.download(document_id, signed_doc_path)
                
                # Send completion notifications
                self._send_completion_notification(workflow, signed_doc_path)
                
                # Remove from active contracts
                del self.active_contracts[document_id]
                
            elif status.status == 'expired':
                print(f"⏰ Contract expired: {workflow['title']}")
                self._send_expiration_notification(workflow)
                del self.active_contracts[document_id]
                
            else:
                # Check if reminders are needed
                self._check_reminders_needed(workflow, status)
                
        except Exception as e:
            print(f"❌ Error checking status for {document_id}: {e}")
    
    def _check_reminders_needed(self, workflow, status):
        """Check if reminder emails should be sent"""
        now = datetime.now()
        last_sent = workflow.get('last_reminder_at', workflow['created_at'])
        
        # Send at most one reminder per 7 days, up to max_reminders;
        # tracking the last send time avoids re-sending on every hourly check
        if (now - last_sent >= timedelta(days=7) and
            workflow['reminders_sent'] < workflow['max_reminders']):
            
            # Find pending signers
            pending_signers = [
                signer for signer in status.signers 
                if signer.status == 'pending'
            ]
            
            if pending_signers:
                self._send_reminder_emails(workflow, pending_signers)
                workflow['reminders_sent'] += 1
                workflow['last_reminder_at'] = now
    
    def _send_signing_invitations(self, workflow):
        """Send initial signing invitations"""
        if not self.email_config:
            print("📧 Email not configured - skipping invitations")
            return
        
        subject = f"Please sign: {workflow['title']}"
        
        for signer in workflow['signers']:
            body = textwrap.dedent(f"""\
                Hello {signer['name']},
                
                You have been requested to sign a document: {workflow['title']}
                
                Please click the link below to review and sign:
                {workflow['signing_url']}
                
                This request expires automatically, so please sign promptly.
                
                Best regards,
                Contract Management System
                """)
            
            self._send_email(signer['email'], subject, body)
        
        print(f"📧 Signing invitations sent to {len(workflow['signers'])} signers")
    
    def _send_reminder_emails(self, workflow, pending_signers):
        """Send reminder emails to pending signers"""
        subject = f"Reminder: Please sign {workflow['title']}"
        
        for signer in pending_signers:
            body = textwrap.dedent(f"""\
                Hello {signer['name']},
                
                This is a reminder that you have a pending document to sign: {workflow['title']}
                
                Please click the link below to review and sign:
                {workflow['signing_url']}
                
                Please complete this at your earliest convenience.
                
                Best regards,
                Contract Management System
                """)
            
            self._send_email(signer['email'], subject, body)
        
        print(f"📧 Reminder sent to {len(pending_signers)} pending signers")
    
    def _send_email(self, to_email, subject, body):
        """Send email using SMTP"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email_config['from_email']
            msg['To'] = to_email
            msg['Subject'] = subject
            
            msg.attach(MIMEText(body, 'plain'))
            
            server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
            server.starttls()
            server.login(self.email_config['username'], self.email_config['password'])
            
            server.send_message(msg)
            server.quit()
            
            print(f"📧 Email sent to {to_email}")
            
        except Exception as e:
            print(f"❌ Failed to send email to {to_email}: {e}")

# Usage example
email_config = {
    'smtp_server': 'smtp.gmail.com',
    'smtp_port': 587,
    'username': '[email protected]',
    'password': 'your-app-password',
    'from_email': '[email protected]'
}

workflow_manager = ContractWorkflowManager(client, email_config)

# Create contract workflow
contract_data = {
    'document_path': './contracts/service_agreement.pdf',
    'title': 'Software Development Service Agreement',
    'message': 'Please review and sign this service agreement for our upcoming project.',
    'signers': [
        {
            'name': 'John Doe',
            'email': '[email protected]',
            'role': 'client'
        },
        {
            'name': 'Jane Smith',
            'email': '[email protected]',
            'role': 'service_provider'
        }
    ],
    'expires_in': '14d',
    'max_reminders': 2
}

document_id = workflow_manager.create_contract_workflow(contract_data)

if document_id:
    # Start monitoring (this will run indefinitely)
    workflow_manager.monitor_contracts(check_interval=3600)  # Check every hour
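
Because monitor_contracts blocks until every contract resolves, you may prefer to run it on a background thread so the rest of your application stays responsive. A minimal sketch with the standard library:

import threading

# Daemon thread: monitoring stops automatically when the main program exits
monitor_thread = threading.Thread(
    target=workflow_manager.monitor_contracts,
    kwargs={'check_interval': 3600},
    daemon=True
)
monitor_thread.start()

# ... main application work continues here ...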

Next: cURL Examples
