Python Examples
Practical Python code examples for integrating with the Zaits API in various scenarios.
Setup
pip install zaits-python requests pillow opencv-python
from zaits import ZaitsClient
client = ZaitsClient(api_key='your-api-key')
Face Recognition Examples
Identity Verification System
import cv2
import numpy as np
from PIL import Image
from pathlib import Path
import sqlite3
import hashlib
class IdentityVerificationSystem:
    """Face-based identity registry backed by a local SQLite database.

    Face matching and liveness checks are delegated to the injected Zaits
    client; this class only stores the resulting metadata (photo path,
    photo hash and average verification score) in the ``identities`` table.
    """

    def __init__(self, zaits_client, db_path='identity_db.sqlite'):
        """Keep the client and database path, then ensure the table exists."""
        self.client = zaits_client
        self.db_path = db_path
        self.setup_database()

    def _run_query(self, sql, params=(), fetch_one=False):
        """Run one statement on a short-lived connection.

        The connection is always closed, even when the statement raises,
        so a failed INSERT/SELECT no longer leaks a handle.

        Returns the first row when ``fetch_one`` is true, otherwise ``None``.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(sql, params)
            row = cursor.fetchone() if fetch_one else None
            conn.commit()
            return row
        finally:
            conn.close()

    def setup_database(self):
        """Initialize the identity database"""
        self._run_query('''
            CREATE TABLE IF NOT EXISTS identities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT UNIQUE,
                name TEXT,
                photo_hash TEXT,
                photo_path TEXT,
                verification_score REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

    def register_identity(self, user_id, name, photo_path, verification_photos):
        """Register a new identity with multiple verification photos.

        Every verification photo must match the base photo with a confidence
        of at least 0.8, and the base photo must pass a liveness check with a
        confidence of at least 0.7.

        Returns True when the identity was stored, False on any failure
        (the reason is printed).
        """
        try:
            verification_photos = list(verification_photos or [])
            # Fail early with a clear message instead of a ZeroDivisionError
            # when averaging an empty score list further down.
            if not verification_photos:
                raise ValueError("At least one verification photo is required")

            # Verify all photos are of the same person
            base_photo = photo_path
            verification_scores = []
            for verify_photo in verification_photos:
                result = self.client.face.verify(base_photo, verify_photo)
                if not result.verified or result.confidence < 0.8:
                    raise ValueError(f"Photo {verify_photo} doesn't match base photo")
                verification_scores.append(result.confidence)

            # Check for liveness
            liveness = self.client.face.liveness(base_photo)
            if not liveness.is_live or liveness.confidence < 0.7:
                raise ValueError("Liveness check failed - photo may be fake")

            # Calculate average verification score
            avg_score = sum(verification_scores) / len(verification_scores)

            # Generate photo hash for quick comparison
            photo_hash = self._generate_photo_hash(base_photo)

            # Store in database (a duplicate user_id violates the UNIQUE
            # constraint and is reported through the except branch below)
            self._run_query(
                '''
                INSERT INTO identities (user_id, name, photo_hash, photo_path, verification_score)
                VALUES (?, ?, ?, ?, ?)
                ''',
                (user_id, name, photo_hash, base_photo, avg_score),
            )
            print(f"✅ Identity registered: {name} (Score: {avg_score:.3f})")
            return True
        except Exception as e:
            print(f"❌ Registration failed: {e}")
            return False

    def verify_identity(self, user_id, photo_path, threshold=0.85):
        """Verify identity against stored photo.

        Returns a dict whose ``verified`` key is True only when the face
        match reaches ``threshold`` and the liveness check passes (>= 0.7).
        On lookup failure or error the dict carries a ``reason`` instead of
        the score fields.
        """
        try:
            # Get stored identity
            row = self._run_query(
                'SELECT name, photo_path, verification_score FROM identities WHERE user_id = ?',
                (user_id,),
                fetch_one=True,
            )
            if not row:
                return {'verified': False, 'reason': 'Identity not found'}
            stored_name, stored_photo, stored_score = row

            # Perform face verification
            verification = self.client.face.verify(stored_photo, photo_path)

            # Check liveness
            liveness = self.client.face.liveness(photo_path)

            verified = (
                verification.verified and
                verification.confidence >= threshold and
                liveness.is_live and
                liveness.confidence >= 0.7
            )
            return {
                'verified': verified,
                'name': stored_name,
                'confidence': verification.confidence,
                'liveness_score': liveness.confidence,
                'stored_score': stored_score,
                'threshold': threshold
            }
        except Exception as e:
            print(f"❌ Verification failed: {e}")
            return {'verified': False, 'reason': str(e)}

    def _generate_photo_hash(self, photo_path):
        """Generate hash of photo for quick comparison.

        MD5 is used purely as a content fingerprint, not for security.
        The file is read in chunks so large photos are not loaded whole.
        """
        digest = hashlib.md5()
        with open(photo_path, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), b''):
                digest.update(chunk)
        return digest.hexdigest()
# Usage example
async def demo_identity_system():
    """Register one sample identity, then verify a fresh photo against it."""
    identity_system = IdentityVerificationSystem(client)

    # Enrol the user: one base photo plus two photos used to cross-check it.
    registered = identity_system.register_identity(
        user_id='user_123',
        name='John Doe',
        photo_path='./photos/john_base.jpg',
        verification_photos=[
            './photos/john_verify1.jpg',
            './photos/john_verify2.jpg'
        ]
    )
    if not registered:
        return

    # Compare a new photo with the stored identity and report the outcome.
    outcome = identity_system.verify_identity('user_123', './photos/john_test.jpg')
    if not outcome['verified']:
        print(f"❌ Identity verification failed: {outcome.get('reason', 'Unknown')}")
        return

    print(f"🎉 Identity verified: {outcome['name']}")
    print(f"📊 Confidence: {outcome['confidence']:.3f}")


# Webcam Face Detection with OpenCV
import cv2
import numpy as np
import threading
import queue
from datetime import datetime
class RealTimeFaceAnalyzer:
    """Show a mirrored webcam feed and overlay periodic face-analysis results.

    Frames are displayed on the calling thread; roughly one frame per 30 is
    handed to a background thread that sends it to the Zaits client.
    """

    def __init__(self, zaits_client):
        self.client = zaits_client
        self.cap = cv2.VideoCapture(0)
        self.analysis_queue = queue.Queue()
        self.current_analysis = None
        self.analyzing = False
        # Signals the background worker to exit once the display loop ends.
        self._stop_event = threading.Event()

    def start_analysis(self):
        """Start real-time face analysis.

        Blocks until the stream ends or 'q' is pressed. The camera and the
        window are released even if the loop raises.
        """
        self._stop_event.clear()

        # Start analysis thread
        analysis_thread = threading.Thread(target=self._analysis_worker)
        analysis_thread.daemon = True
        analysis_thread.start()

        frame_count = 0
        try:
            while True:
                ret, frame = self.cap.read()
                if not ret:
                    break

                # Mirror the frame horizontally
                frame = cv2.flip(frame, 1)

                # Add frame to analysis queue every 30 frames (~1 second at 30 FPS)
                if frame_count % 30 == 0 and not self.analyzing:
                    self.analysis_queue.put(frame.copy())

                # Draw analysis results on frame
                self._draw_analysis_results(frame)

                # Display the frame
                cv2.imshow('Real-time Face Analysis', frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                frame_count += 1
        finally:
            self._stop_event.set()
            self.cap.release()
            cv2.destroyAllWindows()

    def _analysis_worker(self):
        """Background thread for face analysis.

        Runs until the stop event is set. Each queued frame is written to a
        unique temporary JPEG (removed afterwards) and analysed; a failure
        clears the current result instead of killing the thread.
        """
        import os
        import tempfile

        while not self._stop_event.is_set():
            try:
                frame = self.analysis_queue.get(timeout=1)
            except queue.Empty:
                continue

            self.analyzing = True
            temp_path = None
            try:
                # Save frame temporarily under a unique name so concurrent
                # runs in the same directory cannot overwrite each other.
                fd, temp_path = tempfile.mkstemp(suffix='.jpg')
                os.close(fd)
                if not cv2.imwrite(temp_path, frame):
                    raise RuntimeError(f"could not write frame to {temp_path}")

                # Analyze face
                analysis = self.client.face.analyze(
                    temp_path,
                    actions=['age', 'gender', 'emotion'],
                    return_face_region=True
                )
                self.current_analysis = {
                    'age': analysis.age,
                    'gender': analysis.gender.prediction,
                    'gender_confidence': analysis.gender.confidence,
                    'emotion': analysis.emotion.dominant_emotion,
                    'face_region': analysis.region,
                    'timestamp': datetime.now()
                }
            except Exception as e:
                print(f"Analysis failed: {e}")
                self.current_analysis = None
            finally:
                # Always reset the flag; otherwise one failure would stop
                # the display loop from ever queueing another frame.
                self.analyzing = False
                if temp_path is not None:
                    try:
                        os.remove(temp_path)
                    except OSError:
                        pass

    def _draw_analysis_results(self, frame):
        """Draw analysis results on the frame"""
        # Read the shared result once: the worker thread may replace it
        # (or reset it to None) while this method is running.
        analysis = self.current_analysis
        if analysis is None:
            cv2.putText(frame, 'No face detected', (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            return

        # Draw face rectangle if available
        if analysis.get('face_region'):
            region = analysis['face_region']
            cv2.rectangle(frame, (region.x, region.y),
                          (region.x + region.w, region.y + region.h),
                          (0, 255, 0), 2)

        # Draw analysis text
        y_offset = 30
        texts = [
            f"Age: {analysis['age']}",
            f"Gender: {analysis['gender']} ({analysis['gender_confidence']:.2f})",
            f"Emotion: {analysis['emotion']}",
        ]
        if self.analyzing:
            texts.append("Analyzing...")
        for text in texts:
            cv2.putText(frame, text, (10, y_offset),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            y_offset += 30
# Usage
analyzer = RealTimeFaceAnalyzer(client)
analyzer.start_analysis() # Press 'q' to quit
OCR Examples
Intelligent Document Processor
import os
import json
import pandas as pd
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
class IntelligentDocumentProcessor:
    """Run OCR extraction and analysis over documents and report the results.

    Each document goes through ``client.ocr.extract`` and
    ``client.ocr.analyze``; the outcome is a plain dict that can be
    summarised as JSON (``generate_report``) or exported to Excel
    (``export_to_excel``).
    """

    # (regex, label) pairs tried in order by _extract_currency. Codes use
    # word boundaries so "EUR" does not match inside words like "European".
    _CURRENCY_PATTERNS = (
        (r'\$', '$'),       # Dollar
        (r'€', '€'),        # Euro
        (r'£', '£'),        # Pound
        (r'¥', '¥'),        # Yen
        (r'\bUSD\b', 'USD'),
        (r'\bEUR\b', 'EUR'),
        (r'\bGBP\b', 'GBP'),
        (r'\bJPY\b', 'JPY'),
    )

    def __init__(self, zaits_client):
        self.client = zaits_client
        self.supported_formats = ['.pdf', '.jpg', '.jpeg', '.png', '.tiff']
        self.setup_logging()

    def setup_logging(self):
        """Configure root logging (file + console) and create ``self.logger``."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('document_processing.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def process_document(self, file_path, document_type='auto'):
        """Process a single document with intelligent analysis.

        Returns a result dict. On failure the dict contains an ``error`` key
        instead of the extraction fields; no exception escapes.
        """
        try:
            self.logger.info(f"📄 Processing: {file_path}")

            # Extract text
            extraction_result = self.client.ocr.extract(
                file_path,
                language='en',
                extract_tables=True,
                output_format='structured'
            )

            # Analyze document
            analysis_result = self.client.ocr.analyze(
                file_path,
                document_type=document_type,
                extract_fields='auto',  # Auto-detect relevant fields
                validate_document=True
            )

            # Intelligent field extraction based on document type
            structured_data = self._extract_structured_data(
                analysis_result.document_type,
                analysis_result.extracted_fields,
                extraction_result.text
            )

            result = {
                'file_path': str(file_path),
                'document_type': analysis_result.document_type,
                'text': extraction_result.text,
                'confidence': extraction_result.confidence,
                'structured_data': structured_data,
                'is_valid': analysis_result.validation.is_valid,
                'processing_time': extraction_result.processing_time,
                'timestamp': datetime.now().isoformat()
            }
            self.logger.info(f"✅ Successfully processed: {file_path} (Type: {analysis_result.document_type})")
            return result
        except Exception as e:
            self.logger.error(f"❌ Failed to process {file_path}: {str(e)}")
            return {
                'file_path': str(file_path),
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }

    def batch_process_directory(self, directory_path, max_workers=3):
        """Process all documents in a directory concurrently.

        The directory is searched recursively. Extensions are compared
        case-insensitively, so ``SCAN.PDF`` is picked up as well.
        Returns the list of per-document result dicts (empty if none found).
        """
        directory = Path(directory_path)

        # Find all supported document files
        wanted = {ext.lower() for ext in self.supported_formats}
        files_to_process = sorted(
            path for path in directory.rglob('*')
            if path.is_file() and path.suffix.lower() in wanted
        )
        if not files_to_process:
            self.logger.warning(f"No supported documents found in {directory_path}")
            return []

        total = len(files_to_process)
        self.logger.info(f"🔄 Processing {total} documents with {max_workers} workers")

        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks
            future_to_file = {
                executor.submit(self.process_document, file_path): file_path
                for file_path in files_to_process
            }

            # Collect results as they complete
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    results.append(future.result())

                    # Progress update
                    processed_count = len(results)
                    progress = (processed_count / total) * 100
                    self.logger.info(f"📊 Progress: {processed_count}/{total} ({progress:.1f}%)")
                except Exception as e:
                    self.logger.error(f"❌ Error processing {file_path}: {str(e)}")
        return results

    def _extract_structured_data(self, document_type, extracted_fields, raw_text):
        """Extract structured data based on document type"""
        if document_type == 'invoice':
            return self._extract_invoice_data(extracted_fields, raw_text)
        elif document_type == 'receipt':
            return self._extract_receipt_data(extracted_fields, raw_text)
        elif document_type == 'contract':
            return self._extract_contract_data(extracted_fields, raw_text)
        else:
            return extracted_fields

    def _extract_invoice_data(self, fields, text):
        """Extract structured invoice data.

        NOTE(review): assumes ``fields`` supports ``.get`` (dict-like);
        confirm against the client's ``extracted_fields`` type.
        """
        return {
            'invoice_number': fields.get('invoice_number'),
            'date': fields.get('date'),
            'due_date': fields.get('due_date'),
            'vendor': fields.get('vendor'),
            'customer': fields.get('customer'),
            'subtotal': fields.get('subtotal'),
            'tax': fields.get('tax'),
            'total': fields.get('total'),
            'currency': self._extract_currency(text),
            'payment_terms': fields.get('payment_terms'),
            'line_items': self._extract_line_items(text)
        }

    def _extract_receipt_data(self, fields, text):
        """Extract structured receipt data"""
        return {
            'merchant': fields.get('merchant'),
            'date': fields.get('date'),
            'time': fields.get('time'),
            'total': fields.get('total'),
            'tax': fields.get('tax'),
            'payment_method': fields.get('payment_method'),
            'items': self._extract_receipt_items(text)
        }

    def _extract_contract_data(self, fields, text):
        """Return contract fields unchanged.

        Placeholder: this example defines no contract-specific mapping, so
        contracts are handled like unrecognised document types.
        """
        return fields

    def _extract_line_items(self, text):
        """Return invoice line items parsed from ``text``.

        Placeholder: line-item parsing is not implemented in this example,
        so an empty list is returned.
        """
        return []

    def _extract_receipt_items(self, text):
        """Return receipt items parsed from ``text``.

        Placeholder: item parsing is not implemented in this example, so an
        empty list is returned.
        """
        return []

    def _extract_currency(self, text):
        """Extract currency from text.

        Returns the first matching symbol (``$``, ``€``, ``£``, ``¥``) or
        ISO code (``USD``, ``EUR``, ``GBP``, ``JPY``), or ``'Unknown'``.
        Codes are matched case-insensitively as whole words.
        """
        import re
        if not text:
            return 'Unknown'
        for pattern, label in self._CURRENCY_PATTERNS:
            if re.search(pattern, text, re.IGNORECASE):
                # Return the readable label, not the regex source
                # (which would be the escaped "\$" for dollars).
                return label
        return 'Unknown'

    def generate_report(self, results, output_path='processing_report.json'):
        """Generate a comprehensive processing report.

        Writes the report as JSON to ``output_path`` and returns it as a dict.
        """
        successful = [r for r in results if 'error' not in r]
        failed = [r for r in results if 'error' in r]

        # Calculate statistics
        if successful:
            avg_confidence = sum(r['confidence'] for r in successful) / len(successful)
            doc_types = {}
            for result in successful:
                doc_type = result['document_type']
                doc_types[doc_type] = doc_types.get(doc_type, 0) + 1
        else:
            avg_confidence = 0
            doc_types = {}

        report = {
            'summary': {
                'total_documents': len(results),
                'successful': len(successful),
                'failed': len(failed),
                'success_rate': (len(successful) / len(results) * 100) if results else 0,
                'average_confidence': avg_confidence,
                'document_types': doc_types
            },
            'successful_documents': successful,
            'failed_documents': failed,
            'generated_at': datetime.now().isoformat()
        }

        # Save report
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, default=str)
        self.logger.info(f"📊 Report saved to: {output_path}")
        return report

    @staticmethod
    def _safe_sheet_name(doc_type):
        """Turn a document type into a valid Excel sheet name.

        Excel rejects names longer than 31 characters or containing any of
        ``[ ] : * ? / \\``.
        """
        name = str(doc_type) if doc_type else 'unknown'
        for forbidden in '[]:*?/\\':
            name = name.replace(forbidden, '_')
        return name[:31]

    def export_to_excel(self, results, output_path='documents_data.xlsx'):
        """Export structured data to Excel, one sheet per document type."""
        successful_results = [r for r in results if 'error' not in r]
        if not successful_results:
            self.logger.warning("No successful results to export")
            return

        # Group by document type
        by_type = {}
        for result in successful_results:
            # Flatten structured data
            flat_data = {
                'file_path': result['file_path'],
                'confidence': result['confidence'],
                'is_valid': result['is_valid'],
                'processing_time': result['processing_time'],
            }
            structured = result.get('structured_data')
            # Only mapping-shaped data can be spread into columns.
            if isinstance(structured, dict):
                flat_data.update(structured)
            by_type.setdefault(result['document_type'], []).append(flat_data)

        # Create Excel file with separate sheets for each document type
        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
            for doc_type, data in by_type.items():
                if data:
                    df = pd.DataFrame(data)
                    df.to_excel(writer, sheet_name=self._safe_sheet_name(doc_type), index=False)
        self.logger.info(f"📊 Data exported to Excel: {output_path}")
# Usage example
async def process_business_documents():
    """Process ./business_documents, write the JSON and Excel outputs, print a summary."""
    doc_processor = IntelligentDocumentProcessor(client)

    # Run the whole directory through the processor with four workers.
    processed = doc_processor.batch_process_directory(
        './business_documents',
        max_workers=4
    )

    # Persist the outcome in both formats.
    report = doc_processor.generate_report(processed, 'business_docs_report.json')
    doc_processor.export_to_excel(processed, 'business_docs_data.xlsx')

    # Print the headline numbers from the report.
    summary = report['summary']
    print(f"\n📊 Processing Summary:")
    print(f"Total documents: {summary['total_documents']}")
    print(f"Successful: {summary['successful']}")
    print(f"Failed: {summary['failed']}")
    print(f"Success rate: {summary['success_rate']:.1f}%")
    print(f"Average confidence: {summary['average_confidence']:.3f}")
    print(f"Document types found: {list(summary['document_types'].keys())}")
# Run the processor
if __name__ == "__main__":
import asyncio
asyncio.run(process_business_documents())
PDF Table Extractor
import pandas as pd
from pathlib import Path
import json
class PDFTableExtractor:
    """Extract tables from PDFs via the Zaits OCR client and export them.

    Tables are converted to cleaned pandas DataFrames and written to the
    current working directory as Excel, CSV or JSON.
    """

    def __init__(self, zaits_client):
        self.client = zaits_client

    def extract_tables(self, pdf_path, output_format='excel'):
        """Extract tables from PDF and export to various formats.

        Returns a dict with ``tables``, ``output_files`` and ``table_count``,
        or ``None`` when no tables were found or an error occurred.
        """
        try:
            print(f"📄 Processing PDF: {pdf_path}")

            # Extract with table detection
            result = self.client.ocr.extract(
                pdf_path,
                extract_tables=True,
                output_format='structured',
                language='en'
            )
            if not hasattr(result, 'tables') or not result.tables:
                print("❌ No tables found in the PDF")
                return None
            print(f"✅ Found {len(result.tables)} tables")

            # Process each table
            processed_tables = [
                self._process_table(table, i)
                for i, table in enumerate(result.tables)
            ]

            # Export tables
            output_files = self._export_tables(
                processed_tables,
                pdf_path,
                output_format
            )
            return {
                'tables': processed_tables,
                'output_files': output_files,
                'table_count': len(processed_tables)
            }
        except Exception as e:
            print(f"❌ Error extracting tables: {e}")
            return None

    def _process_table(self, table_data, table_index):
        """Process raw table data into structured format.

        Accepts a list of rows (first row is the header), a list of row
        dicts, a column-oriented dict, or anything else whose string form is
        CSV-like. An empty list yields an empty table.
        """
        # Convert table data to DataFrame
        if isinstance(table_data, list):
            if not table_data:
                # No header row to split off; avoid the IndexError.
                df = pd.DataFrame()
            elif isinstance(table_data[0], dict):
                # Rows already carry their column names.
                df = pd.DataFrame(table_data)
            else:
                # Handle list of rows
                header, *rows = table_data
                df = pd.DataFrame(rows, columns=header)
        elif isinstance(table_data, dict):
            # Handle structured table data
            df = pd.DataFrame(table_data)
        else:
            # Handle string data (parse as CSV-like)
            import io
            df = pd.read_csv(io.StringIO(str(table_data)))

        # Clean the DataFrame
        df = self._clean_dataframe(df)
        return {
            'index': table_index,
            'shape': df.shape,
            'columns': df.columns.tolist(),
            'data': df.to_dict('records'),
            'dataframe': df
        }

    def _clean_dataframe(self, df):
        """Clean and standardize DataFrame.

        Drops all-empty rows and columns, strips whitespace from string
        cells, and converts a column to numeric only when every non-null
        value in it is numeric, so mixed columns keep their text.
        """
        # Remove empty rows and columns
        df = df.dropna(how='all').dropna(axis=1, how='all').copy()

        for col in df.columns:
            series = df[col]
            if pd.api.types.is_numeric_dtype(series):
                continue

            # Strip whitespace cell by cell; ``.str.strip()`` would turn
            # every non-string cell of a mixed column into NaN.
            series = series.map(lambda v: v.strip() if isinstance(v, str) else v)

            # Try to convert to numeric, but never at the cost of data:
            # a single non-numeric value keeps the column as-is.
            converted = pd.to_numeric(series, errors='coerce')
            present = series.notna()
            if present.any() and converted[present].notna().all():
                series = converted
            df[col] = series
        return df

    def _export_tables(self, tables, source_pdf, output_format):
        """Export tables to various formats.

        Supported formats: ``excel``/``xlsx``, ``csv``, ``json``
        (case-insensitive). Returns the list of files written; an unknown
        format writes nothing and prints a warning.
        """
        output_files = []
        base_name = Path(source_pdf).stem
        fmt = output_format.lower()

        if fmt in ['excel', 'xlsx']:
            # Export to Excel with multiple sheets
            excel_path = f"{base_name}_tables.xlsx"
            with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
                for table in tables:
                    sheet_name = f"Table_{table['index'] + 1}"
                    table['dataframe'].to_excel(
                        writer,
                        sheet_name=sheet_name,
                        index=False
                    )
            output_files.append(excel_path)
            print(f"📊 Exported to Excel: {excel_path}")
        elif fmt == 'csv':
            # Export each table as separate CSV
            for table in tables:
                csv_path = f"{base_name}_table_{table['index'] + 1}.csv"
                table['dataframe'].to_csv(csv_path, index=False)
                output_files.append(csv_path)
            print(f"📊 Exported {len(tables)} CSV files")
        elif fmt == 'json':
            # Export as JSON
            json_path = f"{base_name}_tables.json"
            json_data = {
                'source_pdf': str(source_pdf),
                'extraction_timestamp': pd.Timestamp.now().isoformat(),
                'tables': [
                    {
                        'index': table['index'],
                        'shape': table['shape'],
                        'columns': table['columns'],
                        'data': table['data']
                    }
                    for table in tables
                ]
            }
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(json_data, f, indent=2, default=str)
            output_files.append(json_path)
            print(f"📊 Exported to JSON: {json_path}")
        else:
            # Previously a typo in the format silently produced no files.
            print(f"⚠️ Unknown output format '{output_format}' - nothing exported")
        return output_files

    def batch_extract_directory(self, directory_path, output_format='excel'):
        """Extract tables from all PDFs in a directory.

        Only the top level of the directory is searched. Returns one result
        dict per PDF, or an empty list when no PDFs are found.
        """
        directory = Path(directory_path)
        pdf_files = list(directory.glob('*.pdf'))
        if not pdf_files:
            print(f"❌ No PDF files found in {directory_path}")
            # Always return a list so callers can iterate the result.
            return []

        print(f"🔄 Processing {len(pdf_files)} PDF files...")
        results = []
        for pdf_file in pdf_files:
            try:
                result = self.extract_tables(pdf_file, output_format)
                if result:
                    results.append({
                        'pdf_file': str(pdf_file),
                        'success': True,
                        **result
                    })
                else:
                    results.append({
                        'pdf_file': str(pdf_file),
                        'success': False,
                        'error': 'No tables found'
                    })
            except Exception as e:
                results.append({
                    'pdf_file': str(pdf_file),
                    'success': False,
                    'error': str(e)
                })

        # Generate summary report
        successful = [r for r in results if r['success']]
        total_tables = sum(r.get('table_count', 0) for r in successful)
        print(f"\n📊 Batch Processing Summary:")
        print(f"PDFs processed: {len(pdf_files)}")
        print(f"Successful extractions: {len(successful)}")
        print(f"Total tables extracted: {total_tables}")
        print(f"Success rate: {len(successful)/len(pdf_files)*100:.1f}%")
        return results
# Usage example
extractor = PDFTableExtractor(client)
# Extract tables from single PDF
result = extractor.extract_tables('./financial_report.pdf', 'excel')
# Extract tables from all PDFs in directory
batch_results = extractor.batch_extract_directory('./reports/', 'excel')
Document Signing Examples
Automated Contract Workflow
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
import time
class ContractWorkflowManager:
    """Create signing requests and follow them until completed or expired.

    Active workflows are kept in memory in ``self.active_contracts`` keyed by
    document id. E-mails are sent over SMTP when ``email_config`` is given;
    without it, notifications are skipped.
    """

    # Minimum number of days between two reminders for the same contract.
    REMINDER_INTERVAL_DAYS = 7

    def __init__(self, zaits_client, email_config=None):
        self.client = zaits_client
        self.email_config = email_config or {}
        self.active_contracts = {}

    @staticmethod
    def _signer_field(signer, key):
        """Read ``key`` from a signer given as a dict or as an object.

        Signers supplied by the caller are dicts, while the code reads the
        signers on a status response by attribute; both shapes are accepted.
        """
        if isinstance(signer, dict):
            return signer.get(key)
        return getattr(signer, key, None)

    @staticmethod
    def _describe_expiry(expires_in):
        """Render an ``'<N>d'`` expiry such as ``'14d'`` as ``'14 days'``.

        Any other value is returned as its plain string form.
        """
        value = str(expires_in)
        if value.endswith('d') and value[:-1].isdigit():
            days = int(value[:-1])
            return f"{days} day" + ("" if days == 1 else "s")
        return value

    def create_contract_workflow(self, contract_data):
        """Create a complete contract signing workflow.

        Returns the new document id, or ``None`` if creation failed.
        """
        try:
            print(f"📝 Creating contract workflow: {contract_data['title']}")
            expires_in = contract_data.get('expires_in', '30d')

            # Create signing request
            signing_request = self.client.signing.create({
                'document': contract_data['document_path'],
                'signers': contract_data['signers'],
                'title': contract_data['title'],
                'message': contract_data.get('message', 'Please review and sign this contract.'),
                'expires_in': expires_in,
                'required_fields': contract_data.get('required_fields', [
                    {'name': 'signature', 'type': 'signature', 'required': True},
                    {'name': 'date', 'type': 'date', 'required': True}
                ])
            })

            # Store contract info
            workflow_data = {
                'document_id': signing_request.document_id,
                'signing_url': signing_request.signing_url,
                'created_at': datetime.now(),
                'status': 'pending',
                'signers': contract_data['signers'],
                'title': contract_data['title'],
                'expires_in': expires_in,
                'reminders_sent': 0,
                'max_reminders': contract_data.get('max_reminders', 3)
            }
            self.active_contracts[signing_request.document_id] = workflow_data

            # Send initial notifications
            self._send_signing_invitations(workflow_data)
            print(f"✅ Contract workflow created: {signing_request.document_id}")
            return signing_request.document_id
        except Exception as e:
            print(f"❌ Failed to create contract workflow: {e}")
            return None

    def monitor_contracts(self, check_interval=3600):  # Check every hour
        """Monitor active contracts and send reminders.

        Blocks until no active contracts remain.
        """
        print(f"🔄 Starting contract monitoring (checking every {check_interval/3600:.1f} hours)")
        while self.active_contracts:
            for document_id in list(self.active_contracts.keys()):
                try:
                    self._check_contract_status(document_id)
                    time.sleep(10)  # Small delay between checks
                except Exception as e:
                    print(f"❌ Error checking contract {document_id}: {e}")

            # Do not wait a full interval when the last contract just finished.
            if not self.active_contracts:
                break
            print(f"⏰ Waiting {check_interval} seconds until next check...")
            time.sleep(check_interval)

    def _check_contract_status(self, document_id):
        """Check status of a specific contract"""
        workflow = self.active_contracts[document_id]
        try:
            # Get current status
            status = self.client.signing.get_status(document_id)
            if status.status == 'completed':
                print(f"🎉 Contract completed: {workflow['title']}")

                # Download signed document
                signed_doc_path = f"signed_{document_id}.pdf"
                self.client.signing.download(document_id, signed_doc_path)

                # Remove from active contracts before notifying, so a
                # notification problem cannot cause repeated downloads.
                self.active_contracts.pop(document_id, None)

                # Send completion notifications
                self._send_completion_notification(workflow, signed_doc_path)
            elif status.status == 'expired':
                print(f"⏰ Contract expired: {workflow['title']}")
                self.active_contracts.pop(document_id, None)
                self._send_expiration_notification(workflow)
            else:
                # Check if reminders are needed
                self._check_reminders_needed(workflow, status)
        except Exception as e:
            print(f"❌ Error checking status for {document_id}: {e}")

    def _check_reminders_needed(self, workflow, status):
        """Check if reminder emails should be sent.

        A reminder goes out when at least ``REMINDER_INTERVAL_DAYS`` days have
        passed since creation (or since the previous reminder), the reminder
        budget is not used up, and at least one signer is still pending.
        """
        sent = workflow.get('reminders_sent', 0)
        if sent >= workflow.get('max_reminders', 3):
            return

        now = datetime.now()
        # Measure from the last reminder so repeated checks on the same day
        # do not send duplicates, and a missed day is caught up later.
        reference = workflow.get('last_reminder_at') or workflow['created_at']
        if (now - reference).days < self.REMINDER_INTERVAL_DAYS:
            return

        # Find pending signers
        pending_signers = [
            signer for signer in status.signers
            if self._signer_field(signer, 'status') == 'pending'
        ]
        if pending_signers:
            self._send_reminder_emails(workflow, pending_signers)
            workflow['reminders_sent'] = sent + 1
            workflow['last_reminder_at'] = now

    def _send_signing_invitations(self, workflow):
        """Send initial signing invitations"""
        if not self.email_config:
            print("📧 Email not configured - skipping invitations")
            return

        subject = f"Please sign: {workflow['title']}"
        expiry = self._describe_expiry(workflow.get('expires_in', '30d'))
        for signer in workflow['signers']:
            body = "\n".join([
                f"Hello {self._signer_field(signer, 'name')},",
                "",
                f"You have been requested to sign a document: {workflow['title']}",
                "",
                "Please click the link below to review and sign:",
                f"{workflow['signing_url']}",
                "",
                f"This request will expire in {expiry}.",
                "",
                "Best regards,",
                "Contract Management System",
            ])
            self._send_email(self._signer_field(signer, 'email'), subject, body)
        print(f"📧 Signing invitations sent to {len(workflow['signers'])} signers")

    def _send_reminder_emails(self, workflow, pending_signers):
        """Send reminder emails to pending signers"""
        if not self.email_config:
            print("📧 Email not configured - skipping reminders")
            return

        subject = f"Reminder: Please sign {workflow['title']}"
        for signer in pending_signers:
            body = "\n".join([
                f"Hello {self._signer_field(signer, 'name')},",
                "",
                f"This is a reminder that you have a pending document to sign: {workflow['title']}",
                "",
                "Please click the link below to review and sign:",
                f"{workflow['signing_url']}",
                "",
                "Please complete this at your earliest convenience.",
                "",
                "Best regards,",
                "Contract Management System",
            ])
            self._send_email(self._signer_field(signer, 'email'), subject, body)
        print(f"📧 Reminder sent to {len(pending_signers)} pending signers")

    def _send_completion_notification(self, workflow, signed_doc_path):
        """Tell every signer that the contract has been fully signed."""
        if not self.email_config:
            print("📧 Email not configured - skipping completion notification")
            return

        subject = f"Completed: {workflow['title']}"
        for signer in workflow['signers']:
            body = "\n".join([
                f"Hello {self._signer_field(signer, 'name')},",
                "",
                f"All parties have signed the document: {workflow['title']}",
                f"The signed copy has been saved as: {signed_doc_path}",
                "",
                "Best regards,",
                "Contract Management System",
            ])
            self._send_email(self._signer_field(signer, 'email'), subject, body)
        print(f"📧 Completion notification sent to {len(workflow['signers'])} signers")

    def _send_expiration_notification(self, workflow):
        """Tell every signer that the signing request has expired."""
        if not self.email_config:
            print("📧 Email not configured - skipping expiration notification")
            return

        subject = f"Expired: {workflow['title']}"
        for signer in workflow['signers']:
            body = "\n".join([
                f"Hello {self._signer_field(signer, 'name')},",
                "",
                f"The signing request for the following document has expired: {workflow['title']}",
                "",
                "Best regards,",
                "Contract Management System",
            ])
            self._send_email(self._signer_field(signer, 'email'), subject, body)
        print(f"📧 Expiration notification sent to {len(workflow['signers'])} signers")

    def _send_email(self, to_email, subject, body):
        """Send email using SMTP.

        Best effort: failures are printed, never raised. The connection is
        closed even when login or sending fails.
        """
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email_config['from_email']
            msg['To'] = to_email
            msg['Subject'] = subject
            msg.attach(MIMEText(body, 'plain'))

            with smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port']) as server:
                server.starttls()
                server.login(self.email_config['username'], self.email_config['password'])
                server.send_message(msg)
            print(f"📧 Email sent to {to_email}")
        except Exception as e:
            print(f"❌ Failed to send email to {to_email}: {e}")
# Usage example
email_config = {
'smtp_server': 'smtp.gmail.com',
'smtp_port': 587,
'username': '[email protected]',
'password': 'your-app-password',
'from_email': '[email protected]'
}
workflow_manager = ContractWorkflowManager(client, email_config)
# Create contract workflow
contract_data = {
'document_path': './contracts/service_agreement.pdf',
'title': 'Software Development Service Agreement',
'message': 'Please review and sign this service agreement for our upcoming project.',
'signers': [
{
'name': 'John Doe',
'email': '[email protected]',
'role': 'client'
},
{
'name': 'Jane Smith',
'email': '[email protected]',
'role': 'service_provider'
}
],
'expires_in': '14d',
'max_reminders': 2
}
document_id = workflow_manager.create_contract_workflow(contract_data)
if document_id:
# Start monitoring (this will run indefinitely)
workflow_manager.monitor_contracts(check_interval=3600) # Check every hour
Next: cURL Examples
Last updated