# Python Examples

Practical Python code examples for integrating with the Zaits API in various scenarios.

## Setup

```bash
pip install zaits-python requests pillow opencv-python
```

```python
from zaits import ZaitsClient
# Shared client referenced as `client` by the later examples on this page;
# replace the placeholder string with your own API key.
client = ZaitsClient(api_key='your-api-key')
```

## Face Recognition Examples

### Identity Verification System

```python
import cv2
import numpy as np
from PIL import Image
from pathlib import Path
import sqlite3
import hashlib

class IdentityVerificationSystem:
    """Register and verify identities with Zaits face checks backed by SQLite.

    Each identity is stored as one row in the ``identities`` table of the
    SQLite file at ``db_path``. The Zaits client is used for face matching
    (``client.face.verify``) and liveness detection (``client.face.liveness``).
    """

    # Minimum confidence for a verification photo to count as a match
    # during registration.
    REGISTRATION_MATCH_MIN = 0.8
    # Minimum confidence for a liveness result to be accepted.
    LIVENESS_MIN = 0.7

    def __init__(self, zaits_client, db_path='identity_db.sqlite'):
        """Store the client and database path, then ensure the table exists."""
        self.client = zaits_client
        self.db_path = db_path
        self.setup_database()

    def setup_database(self):
        """Create the ``identities`` table if it does not exist yet."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS identities (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id TEXT UNIQUE,
                    name TEXT,
                    photo_hash TEXT,
                    photo_path TEXT,
                    verification_score REAL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()
        finally:
            # Close even when the statement fails so the handle is not leaked.
            conn.close()

    def register_identity(self, user_id, name, photo_path, verification_photos):
        """Register a new identity after cross-checking it against extra photos.

        Args:
            user_id: Unique identifier for the person (UNIQUE in the table).
            name: Display name stored with the identity.
            photo_path: Path of the base photo that later checks compare against.
            verification_photos: Non-empty iterable of additional photo paths
                that must all match the base photo.

        Returns:
            True when the identity was stored; False when any check or the
            insert failed (the reason is printed).
        """
        try:
            # Fail fast with a clear message instead of dividing by zero
            # below, and before spending an API call on the liveness check.
            if not verification_photos:
                raise ValueError("At least one verification photo is required")

            base_photo = photo_path
            verification_scores = []

            # Every verification photo must match the base photo.
            for verify_photo in verification_photos:
                result = self.client.face.verify(base_photo, verify_photo)
                if not result.verified or result.confidence < self.REGISTRATION_MATCH_MIN:
                    raise ValueError(f"Photo {verify_photo} doesn't match base photo")
                verification_scores.append(result.confidence)

            # Check for liveness
            liveness = self.client.face.liveness(base_photo)
            if not liveness.is_live or liveness.confidence < self.LIVENESS_MIN:
                raise ValueError("Liveness check failed - photo may be fake")

            avg_score = sum(verification_scores) / len(verification_scores)
            photo_hash = self._generate_photo_hash(base_photo)

            conn = sqlite3.connect(self.db_path)
            try:
                conn.execute('''
                    INSERT INTO identities (user_id, name, photo_hash, photo_path, verification_score)
                    VALUES (?, ?, ?, ?, ?)
                ''', (user_id, name, photo_hash, base_photo, avg_score))
                conn.commit()
            finally:
                # A duplicate user_id raises IntegrityError; still close.
                conn.close()

            print(f"Identity registered: {name} (Score: {avg_score:.3f})")
            return True

        except Exception as e:
            print(f"Registration failed: {e}")
            return False

    def verify_identity(self, user_id, photo_path, threshold=0.85):
        """Verify a photo against the stored base photo for ``user_id``.

        Args:
            user_id: Identifier used at registration time.
            photo_path: Path of the photo to check.
            threshold: Minimum face-match confidence required.

        Returns:
            A dict that always contains ``'verified'``. On a completed check it
            also carries ``name``, ``confidence``, ``liveness_score``,
            ``stored_score`` and ``threshold``. Whenever ``verified`` is False
            a human-readable ``'reason'`` is included.
        """
        try:
            conn = sqlite3.connect(self.db_path)
            try:
                row = conn.execute(
                    'SELECT name, photo_path, verification_score FROM identities WHERE user_id = ?',
                    (user_id,)
                ).fetchone()
            finally:
                conn.close()

            if not row:
                return {'verified': False, 'reason': 'Identity not found'}

            stored_name, stored_photo, stored_score = row

            verification = self.client.face.verify(stored_photo, photo_path)
            liveness = self.client.face.liveness(photo_path)

            face_ok = bool(verification.verified) and verification.confidence >= threshold
            live_ok = bool(liveness.is_live) and liveness.confidence >= self.LIVENESS_MIN

            outcome = {
                'verified': face_ok and live_ok,
                'name': stored_name,
                'confidence': verification.confidence,
                'liveness_score': liveness.confidence,
                'stored_score': stored_score,
                'threshold': threshold
            }

            # Tell the caller which check failed instead of leaving the
            # failure unexplained.
            reasons = []
            if not face_ok:
                reasons.append('Face does not match stored photo')
            if not live_ok:
                reasons.append('Liveness check failed')
            if reasons:
                outcome['reason'] = '; '.join(reasons)

            return outcome

        except Exception as e:
            print(f"Verification failed: {e}")
            return {'verified': False, 'reason': str(e)}

    def _generate_photo_hash(self, photo_path):
        """Return the hex MD5 digest of the file at ``photo_path``.

        MD5 serves here only as a content fingerprint for quick comparison,
        not as a security measure. The file is read in chunks so large images
        are not loaded into memory at once.
        """
        digest = hashlib.md5()
        with open(photo_path, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), b''):
                digest.update(chunk)
        return digest.hexdigest()

# Usage example
async def demo_identity_system():
    """Walk through registering one identity and then verifying it.

    Declared as a coroutine function, although the body awaits nothing.
    Uses the module-level ``client`` and fixed sample photo paths.
    """
    identity_system = IdentityVerificationSystem(client)

    # Extra photos that must match the base photo during registration
    extra_photos = [
        './photos/john_verify1.jpg',
        './photos/john_verify2.jpg'
    ]

    registered = identity_system.register_identity(
        user_id='user_123',
        name='John Doe',
        photo_path='./photos/john_base.jpg',
        verification_photos=extra_photos
    )

    # Nothing to verify if registration did not go through
    if not registered:
        return

    outcome = identity_system.verify_identity('user_123', './photos/john_test.jpg')

    if not outcome['verified']:
        print(f"Identity verification failed: {outcome.get('reason', 'Unknown')}")
        return

    print(f"🎉 Identity verified: {outcome['name']}")
    print(f"Confidence: {outcome['confidence']:.3f}")
```

### Webcam Face Detection with OpenCV

```python
import cv2
import numpy as np
import threading
import queue
from datetime import datetime

class RealTimeFaceAnalyzer:
    """Show a mirrored webcam feed annotated with periodic face analysis.

    The display loop runs in the calling thread. A daemon worker thread takes
    frames from ``analysis_queue``, sends them to ``client.face.analyze`` and
    publishes the latest result in ``current_analysis`` (``None`` when there is
    no result yet or the last analysis failed).
    """

    def __init__(self, zaits_client):
        """Open capture device 0 and initialise the shared analysis state."""
        self.client = zaits_client
        self.cap = cv2.VideoCapture(0)
        self.analysis_queue = queue.Queue()
        self.current_analysis = None
        self.analyzing = False

    def start_analysis(self):
        """Run the capture/display loop until 'q' is pressed or a read fails.

        The capture device and the OpenCV windows are released when the loop
        ends, including when it ends because of an exception.
        """
        analysis_thread = threading.Thread(target=self._analysis_worker)
        analysis_thread.daemon = True
        analysis_thread.start()

        frame_count = 0

        try:
            while True:
                ret, frame = self.cap.read()
                if not ret:
                    break

                # Mirror the frame horizontally
                frame = cv2.flip(frame, 1)

                # Queue one frame every 30 frames (~1 second at 30 FPS),
                # unless the worker is still busy with the previous one.
                if frame_count % 30 == 0 and not self.analyzing:
                    self.analysis_queue.put(frame.copy())

                self._draw_analysis_results(frame)
                cv2.imshow('Real-time Face Analysis', frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

                frame_count += 1
        finally:
            # Always free the camera and close the window, even on error.
            self.cap.release()
            cv2.destroyAllWindows()

    def _analysis_worker(self):
        """Background loop: analyze queued frames and publish the result."""
        while True:
            try:
                frame = self.analysis_queue.get(timeout=1)
            except queue.Empty:
                continue

            self.analyzing = True
            try:
                # The client is given a file path, so write the frame to disk.
                temp_path = 'temp_frame.jpg'
                if not cv2.imwrite(temp_path, frame):
                    raise IOError(f"Could not write {temp_path}")

                analysis = self.client.face.analyze(
                    temp_path,
                    actions=['age', 'gender', 'emotion'],
                    return_face_region=True
                )

                self.current_analysis = {
                    'age': analysis.age,
                    'gender': analysis.gender.prediction,
                    'gender_confidence': analysis.gender.confidence,
                    'emotion': analysis.emotion.dominant_emotion,
                    'face_region': analysis.region,
                    'timestamp': datetime.now()
                }

            except Exception as e:
                # Keep the worker alive: a failed frame must not stop analysis.
                print(f"Analysis failed: {e}")
                self.current_analysis = None
            finally:
                # Reset the busy flag on every path so the display loop can
                # queue further frames.
                self.analyzing = False

    def _draw_analysis_results(self, frame):
        """Draw the latest analysis result (or a placeholder) onto ``frame``."""
        # Read the shared attribute once: the worker thread may replace it
        # with None between a check and a later use.
        analysis = self.current_analysis

        if analysis is None:
            cv2.putText(frame, 'No face detected', (10, 30),
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            return

        # Draw face rectangle if available
        region = analysis.get('face_region')
        if region:
            cv2.rectangle(frame, (region.x, region.y),
                         (region.x + region.w, region.y + region.h),
                         (0, 255, 0), 2)

        texts = [
            f"Age: {analysis['age']}",
            f"Gender: {analysis['gender']} ({analysis['gender_confidence']:.2f})",
            f"Emotion: {analysis['emotion']}",
        ]

        if self.analyzing:
            texts.append("Analyzing...")

        # One line of text every 30 pixels, starting at y=30
        y_offset = 30
        for text in texts:
            cv2.putText(frame, text, (10, y_offset),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            y_offset += 30

# Usage
# Constructing the analyzer opens capture device 0; start_analysis() then
# blocks in the display loop until 'q' is pressed or a frame read fails.
analyzer = RealTimeFaceAnalyzer(client)
analyzer.start_analysis()  # Press 'q' to quit
```

## OCR Examples

### ID Extraction (INE)

```python
import requests

API_KEY = 'YOUR_API_KEY'
BASE_URL = 'https://api.zaits.net'

def extract_ine(front_path, back_path=None):
    """Extract structured data from an INE and print a short summary.

    Args:
        front_path: Path to the image of the front side.
        back_path: Optional path to the image of the back side. The back
            image enables MRZ validation.

    Returns:
        The decoded JSON response.

    Raises:
        RuntimeError: If the response does not report ``success``.
    """
    files = {}
    try:
        files['front'] = open(front_path, 'rb')
        if back_path:
            files['back'] = open(back_path, 'rb')

        response = requests.post(
            f'{BASE_URL}/v1/ocr/extract/id',
            headers={'Authorization': f'Bearer {API_KEY}'},
            files=files,
            data={'document_type': 'ine'},
        )
    finally:
        # Close every image that was opened, including when the second
        # open() or the request itself fails.
        for handle in files.values():
            handle.close()

    result = response.json()

    if not result.get('success'):
        raise RuntimeError(result.get('message') or result.get('error'))

    fields     = result['fields']
    validation = result['validation']

    print(f"Nombre:    {fields['surname']}")
    print(f"CURP:      {fields['curp']}")
    print(f"Domicilio: {fields['address']}")
    print(f"MRZ:       {validation['mrz']}")       # 'ok' | 'failed' | 'not_present'
    print(f"Vencido:   {validation['expired']}")   # True | False | None

    return result
```

### ID Extraction with Authenticity

```python
def extract_ine_with_authenticity(front_path, back_path=None):
    """Extract INE data with inline authenticity verification (extra credit charge).

    Args:
        front_path: Path to the image of the front side.
        back_path: Optional path to the image of the back side.

    Returns:
        The decoded JSON response, returned as-is; ``success`` is not checked
        here. When the response has an ``authenticity`` section, its
        ``validity_check`` values are printed.
    """
    files = {}
    try:
        files['front'] = open(front_path, 'rb')
        if back_path:
            files['back'] = open(back_path, 'rb')

        data = {
            'document_type': 'ine',
            'include_authenticity': 'true',
        }

        response = requests.post(
            f'{BASE_URL}/v1/ocr/extract/id',
            headers={'Authorization': f'Bearer {API_KEY}'},
            files=files,
            data=data,
        )
    finally:
        # Close every image that was opened, including when the second
        # open() or the request itself fails.
        for handle in files.values():
            handle.close()

    result = response.json()

    if result.get('authenticity'):
        auth = result['authenticity']['validity_check']
        print(f"Auténtico:    {auth['is_authentic']}")
        print(f"Confianza:    {auth['confidence']:.0%}")
        print(f"Manipulación: {auth['tampering_detected']}")

    return result
```

### Passport Extraction

```python
def extract_passport(image_path):
    """Extract structured data from a Mexican passport.

    Args:
        image_path: Path to the passport image, uploaded as the ``front`` file.

    Returns:
        The decoded JSON response.
    """
    # The context manager closes the image once the upload has finished.
    with open(image_path, 'rb') as front_file:
        response = requests.post(
            f'{BASE_URL}/v1/ocr/extract/id',
            headers={'Authorization': f'Bearer {API_KEY}'},
            files={'front': front_file},
            data={'document_type': 'passport'},
        )
    return response.json()
```

### Address Proof (CFE / TELMEX / IZZI)

```python
def extract_address_proof(image_path, doc_type='cfe'):
    """Extract data from a Mexican utility bill and print a short summary.

    Args:
        image_path: Path to the image of the bill.
        doc_type: 'cfe' | 'telmex' | 'izzi'

    Returns:
        The decoded JSON response.

    Raises:
        RuntimeError: If the response does not report ``success``.
    """
    # The context manager closes the image once the upload has finished.
    with open(image_path, 'rb') as image_file:
        response = requests.post(
            f'{BASE_URL}/v1/ocr/extract/document',
            headers={'Authorization': f'Bearer {API_KEY}'},
            files={'image': image_file},
            data={'document_type': doc_type},
        )
    result = response.json()

    if not result.get('success'):
        raise RuntimeError(result.get('message') or result.get('error'))

    fields = result['fields']
    print(f"Nombre:    {fields['name']}")
    print(f"Domicilio: {fields['fullAddress']}")
    print(f"CP:        {fields['zipCode']}")
    print(f"Total:     {fields['totalPayment']}")

    return result

# Usage
# Each call uploads one bill image and prints the extracted fields; the second
# argument is the doc_type sent with the request.
extract_address_proof('./recibo_cfe.jpg', 'cfe')
extract_address_proof('./recibo_telmex.jpg', 'telmex')
```

### Batch ID Verification

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_extract_ids(documents, max_workers=3):
    """Process multiple ID documents concurrently.

    Args:
        documents: list of dicts with keys 'front', optional 'back', and
            'type' ('ine' or 'passport'; defaults to 'ine' when omitted).
        max_workers: Maximum number of documents processed at the same time.

    Returns:
        A list with one entry per document, in completion order (not input
        order). Each entry is ``{'file': <front path>, 'result': <response>}``
        on success or ``{'file': <front path>, 'error': <message>}`` on failure.
    """
    results = []

    def process(doc):
        # Route each document to the extractor for its declared type, so
        # passports are not sent through the INE extractor.
        doc_type = doc.get('type', 'ine')
        if doc_type == 'ine':
            return extract_ine(doc['front'], doc.get('back'))
        if doc_type == 'passport':
            return extract_passport(doc['front'])
        raise ValueError(f"Unsupported document type: {doc_type!r}")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process, doc): doc for doc in documents}

        for future in as_completed(futures):
            doc = futures[future]
            # .get() keeps the error path from raising when 'front' is missing
            front = doc.get('front')
            try:
                result = future.result()
                results.append({'file': front, 'result': result})
            except Exception as exc:
                results.append({'file': front, 'error': str(exc)})

    ok     = [r for r in results if 'error' not in r]
    failed = [r for r in results if 'error' in r]
    print(f'Processed: {len(ok)} OK, {len(failed)} failed')
    return results

# Usage
# One dict per document: 'front' is always present, 'back' is optional
# (the passport entry supplies only a front image).
documents = [
    {'front': './user1_frente.jpg', 'back': './user1_reverso.jpg', 'type': 'ine'},
    {'front': './user2_frente.jpg', 'back': './user2_reverso.jpg', 'type': 'ine'},
    {'front': './user3_pasaporte.jpg', 'type': 'passport'},
]
batch_extract_ids(documents, max_workers=3)
```

## Document Signing Examples

### Automated Contract Workflow

```python
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
import time

class ContractWorkflowManager:
    """Create signing requests, then track them until completed or expired.

    Active workflows are kept in ``active_contracts``, keyed by document id.
    Emails are sent over SMTP using ``email_config``; when it is empty, email
    steps are skipped and only console output is produced.
    """

    # Minimum number of days between creation and the first reminder, and
    # between consecutive reminders.
    REMINDER_INTERVAL_DAYS = 7

    def __init__(self, zaits_client, email_config=None):
        """Store the client and the optional SMTP settings."""
        self.client = zaits_client
        self.email_config = email_config or {}
        self.active_contracts = {}

    def create_contract_workflow(self, contract_data):
        """Create a signing request and start tracking it.

        Args:
            contract_data: Dict with 'document_path', 'signers' and 'title',
                plus optional 'message', 'expires_in', 'required_fields' and
                'max_reminders'.

        Returns:
            The document id of the new signing request, or None on failure
            (the error is printed).
        """
        try:
            print(f"📝 Creating contract workflow: {contract_data['title']}")

            expires_in = contract_data.get('expires_in', '30d')

            signing_request = self.client.signing.create({
                'document': contract_data['document_path'],
                'signers': contract_data['signers'],
                'title': contract_data['title'],
                'message': contract_data.get('message', 'Please review and sign this contract.'),
                'expires_in': expires_in,
                'required_fields': contract_data.get('required_fields', [
                    {'name': 'signature', 'type': 'signature', 'required': True},
                    {'name': 'date', 'type': 'date', 'required': True}
                ])
            })

            workflow_data = {
                'document_id': signing_request.document_id,
                'signing_url': signing_request.signing_url,
                'created_at': datetime.now(),
                'status': 'pending',
                'signers': contract_data['signers'],
                'title': contract_data['title'],
                # Kept so the invitation can state the real expiry period.
                'expires_in': expires_in,
                'reminders_sent': 0,
                'max_reminders': contract_data.get('max_reminders', 3)
            }

            self.active_contracts[signing_request.document_id] = workflow_data

            self._send_signing_invitations(workflow_data)

            print(f"Contract workflow created: {signing_request.document_id}")
            return signing_request.document_id

        except Exception as e:
            print(f"Failed to create contract workflow: {e}")
            return None

    def monitor_contracts(self, check_interval=3600):  # Check every hour
        """Poll active contracts until none remain.

        Args:
            check_interval: Seconds to wait between polling rounds.
        """
        print(f"🔄 Starting contract monitoring (checking every {check_interval/3600:.1f} hours)")

        while self.active_contracts:
            # Iterate over a copy: checks may remove entries.
            for document_id in list(self.active_contracts):
                try:
                    self._check_contract_status(document_id)
                    time.sleep(10)  # Small delay between checks
                except Exception as e:
                    print(f"Error checking contract {document_id}: {e}")

            # Return right away once the last contract is gone instead of
            # sleeping through one more interval.
            if not self.active_contracts:
                break

            print(f"⏰ Waiting {check_interval} seconds until next check...")
            time.sleep(check_interval)

    def _check_contract_status(self, document_id):
        """Fetch the status of one contract and act on it.

        Completed contracts are downloaded, removed from ``active_contracts``
        and announced. Expired contracts are removed and announced. Anything
        else is considered for a reminder.
        """
        workflow = self.active_contracts[document_id]

        try:
            status = self.client.signing.get_status(document_id)

            if status.status == 'completed':
                print(f"🎉 Contract completed: {workflow['title']}")

                signed_doc_path = f"signed_{document_id}.pdf"
                self.client.signing.download(document_id, signed_doc_path)

                # Stop tracking before notifying, so a notification problem
                # cannot cause the document to be downloaded again next round.
                del self.active_contracts[document_id]
                self._send_completion_notification(workflow, signed_doc_path)

            elif status.status == 'expired':
                print(f"⏰ Contract expired: {workflow['title']}")
                del self.active_contracts[document_id]
                self._send_expiration_notification(workflow)

            else:
                self._check_reminders_needed(workflow, status)

        except Exception as e:
            print(f"Error checking status for {document_id}: {e}")

    def _check_reminders_needed(self, workflow, status):
        """Send a reminder when one is due and signers are still pending.

        A reminder is due when at least ``REMINDER_INTERVAL_DAYS`` days have
        passed since creation and since the previous reminder, and fewer than
        ``max_reminders`` have been sent.
        """
        if workflow['reminders_sent'] >= workflow['max_reminders']:
            return

        now = datetime.now()
        if (now - workflow['created_at']).days < self.REMINDER_INTERVAL_DAYS:
            return

        # Without this check every polling round on a due day would send
        # another reminder.
        last_reminder_at = workflow.get('last_reminder_at')
        if (last_reminder_at is not None and
                (now - last_reminder_at).days < self.REMINDER_INTERVAL_DAYS):
            return

        pending_signers = [
            signer for signer in status.signers
            if self._signer_field(signer, 'status') == 'pending'
        ]

        if pending_signers:
            self._send_reminder_emails(workflow, pending_signers)
            workflow['reminders_sent'] += 1
            workflow['last_reminder_at'] = now

    @staticmethod
    def _signer_field(signer, field, default=None):
        """Read ``field`` from a signer given as a dict or as an object."""
        if isinstance(signer, dict):
            return signer.get(field, default)
        return getattr(signer, field, default)

    @staticmethod
    def _format_expiry(expires_in):
        """Turn an 'Nd' expiry value into 'N days'; return other values unchanged."""
        text = str(expires_in)
        if text.endswith('d') and text[:-1].isdigit():
            days = int(text[:-1])
            return f"{days} day" if days == 1 else f"{days} days"
        return text

    def _send_signing_invitations(self, workflow):
        """Send initial signing invitations"""
        if not self.email_config:
            print("📧 Email not configured - skipping invitations")
            return

        subject = f"Please sign: {workflow['title']}"
        expiry_text = self._format_expiry(workflow.get('expires_in', '30d'))

        for signer in workflow['signers']:
            body = f"""
            Hello {signer['name']},
            
            You have been requested to sign a document: {workflow['title']}
            
            Please click the link below to review and sign:
            {workflow['signing_url']}
            
            This request will expire in {expiry_text}.
            
            Best regards,
            Contract Management System
            """

            self._send_email(signer['email'], subject, body)

        print(f"📧 Signing invitations sent to {len(workflow['signers'])} signers")

    def _send_reminder_emails(self, workflow, pending_signers):
        """Send reminder emails to pending signers"""
        if not self.email_config:
            print("📧 Email not configured - skipping reminders")
            return

        subject = f"Reminder: Please sign {workflow['title']}"

        for signer in pending_signers:
            # Pending signers come from the status response and may be
            # objects rather than dicts, so read fields through the helper.
            signer_name = self._signer_field(signer, 'name')
            signer_email = self._signer_field(signer, 'email')

            body = f"""
            Hello {signer_name},
            
            This is a reminder that you have a pending document to sign: {workflow['title']}
            
            Please click the link below to review and sign:
            {workflow['signing_url']}
            
            Please complete this at your earliest convenience.
            
            Best regards,
            Contract Management System
            """

            self._send_email(signer_email, subject, body)

        print(f"📧 Reminder sent to {len(pending_signers)} pending signers")

    def _send_completion_notification(self, workflow, signed_doc_path):
        """Tell every signer that the document has been fully signed."""
        print(f"Signed document saved to {signed_doc_path}")

        if not self.email_config:
            print("📧 Email not configured - skipping completion notification")
            return

        subject = f"Completed: {workflow['title']}"

        for signer in workflow['signers']:
            body = f"""
            Hello {self._signer_field(signer, 'name')},
            
            The document "{workflow['title']}" has been signed by all parties.
            
            Best regards,
            Contract Management System
            """

            self._send_email(self._signer_field(signer, 'email'), subject, body)

    def _send_expiration_notification(self, workflow):
        """Tell every signer that the signing request has expired."""
        if not self.email_config:
            print("📧 Email not configured - skipping expiration notification")
            return

        subject = f"Expired: {workflow['title']}"

        for signer in workflow['signers']:
            body = f"""
            Hello {self._signer_field(signer, 'name')},
            
            The signing request for "{workflow['title']}" has expired and can no longer be signed.
            
            Best regards,
            Contract Management System
            """

            self._send_email(self._signer_field(signer, 'email'), subject, body)

    def _send_email(self, to_email, subject, body):
        """Send one plain-text email over SMTP; failures are printed, not raised."""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email_config['from_email']
            msg['To'] = to_email
            msg['Subject'] = subject

            msg.attach(MIMEText(body, 'plain'))

            # The context manager closes the connection even when login or
            # sending fails.
            with smtplib.SMTP(self.email_config['smtp_server'],
                              self.email_config['smtp_port']) as server:
                server.starttls()
                server.login(self.email_config['username'], self.email_config['password'])
                server.send_message(msg)

            print(f"📧 Email sent to {to_email}")

        except Exception as e:
            print(f"Failed to send email to {to_email}: {e}")

# Usage example
# SMTP settings read by ContractWorkflowManager._send_email; the values below
# are placeholders to replace with real credentials.
email_config = {
    'smtp_server': 'smtp.gmail.com',
    'smtp_port': 587,
    'username': 'your-email@gmail.com',
    'password': 'your-app-password',
    'from_email': 'your-email@gmail.com'
}

workflow_manager = ContractWorkflowManager(client, email_config)

# Create contract workflow
# 'document_path', 'title' and 'signers' are required by
# create_contract_workflow; the remaining keys override its defaults.
contract_data = {
    'document_path': './contracts/service_agreement.pdf',
    'title': 'Software Development Service Agreement',
    'message': 'Please review and sign this service agreement for our upcoming project.',
    'signers': [
        {
            'name': 'John Doe',
            'email': 'john@client.com',
            'role': 'client'
        },
        {
            'name': 'Jane Smith',
            'email': 'jane@development-company.com',
            'role': 'service_provider'
        }
    ],
    'expires_in': '14d',
    'max_reminders': 2
}

# Returns the new document id, or None when creation failed
document_id = workflow_manager.create_contract_workflow(contract_data)

if document_id:
    # Start monitoring: this call blocks, looping for as long as
    # active_contracts is non-empty
    workflow_manager.monitor_contracts(check_interval=3600)  # Check every hour
```

***

**Next:** [cURL Examples](/api/code-examples/curl.md)


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://zaits.gitbook.io/api/code-examples/python.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
