Modern voice AI systems face unique security challenges that go beyond traditional IT security concerns.
Data Interception:
Spoofing & Deepfakes:
Fraud via IVR:
class VoiceSecurityThreats:
"""Common security threats in voice AI systems"""
def __init__(self):
self.threat_categories = {
"interception": {
"description": "Unauthorized access to voice data",
"mitigation": ["End-to-end encryption", "Secure transmission protocols"]
},
"spoofing": {
"description": "Voice impersonation attacks",
"mitigation": ["Voice biometrics", "Liveness detection", "MFA"]
},
"fraud": {
"description": "Exploitation of voice systems",
"mitigation": ["Rate limiting", "Behavioral analysis", "Fraud detection"]
}
}
def assess_threat_level(self, system_type: str, data_sensitivity: str) -> Dict[str, str]:
"""Assess threat level for different system types"""
if system_type in ["banking", "healthcare", "government"]:
return {"level": "high", "recommendations": self.threat_categories}
elif system_type in ["ecommerce", "utilities", "insurance"]:
return {"level": "medium", "recommendations": self.threat_categories}
else:
return {"level": "low", "recommendations": self.threat_categories}
from cryptography.fernet import Fernet
import re
class VoiceEncryption:
"""Voice data encryption and secure transmission"""
def __init__(self):
self.encryption_key = Fernet.generate_key()
self.cipher_suite = Fernet(self.encryption_key)
def encrypt_voice_data(self, audio_data: bytes) -> bytes:
"""Encrypt voice audio data"""
return self.cipher_suite.encrypt(audio_data)
def decrypt_voice_data(self, encrypted_data: bytes) -> bytes:
"""Decrypt voice audio data"""
return self.cipher_suite.decrypt(encrypted_data)
def mask_sensitive_data(self, text: str) -> str:
"""Mask sensitive information in voice transcripts"""
# Mask credit card numbers
text = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CARD_NUMBER]', text)
# Mask SSN
text = re.sub(r'\b\d{3}[\s-]?\d{2}[\s-]?\d{4}\b', '[SSN]', text)
# Mask phone numbers
text = re.sub(r'\b\d{3}[\s-]?\d{3}[\s-]?\d{4}\b', '[PHONE]', text)
return text
import hashlib
import secrets
import time
from typing import Dict, List, Optional
class VoiceIAM:
"""Identity and Access Management for voice systems"""
def __init__(self):
self.users = {}
self.api_keys = {}
self.session_tokens = {}
def create_user(self, username: str, password: str, role: str = "user") -> Dict[str, str]:
"""Create a new user with secure password hashing"""
# Generate salt and hash password
salt = secrets.token_hex(16)
password_hash = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt.encode('utf-8'),
100000
).hex()
user_id = secrets.token_hex(16)
self.users[user_id] = {
"username": username,
"password_hash": password_hash,
"salt": salt,
"role": role,
"created_at": time.time(),
"mfa_enabled": False
}
return {"user_id": user_id, "status": "created"}
def authenticate_user(self, username: str, password: str, mfa_code: Optional[str] = None) -> Dict[str, Any]:
"""Authenticate user with MFA support"""
# Find user by username
user_id = None
for uid, user_data in self.users.items():
if user_data["username"] == username:
user_id = uid
break
if not user_id:
return {"authenticated": False, "error": "User not found"}
user = self.users[user_id]
# Verify password
password_hash = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
user["salt"].encode('utf-8'),
100000
).hex()
if password_hash != user["password_hash"]:
return {"authenticated": False, "error": "Invalid password"}
# Check MFA if enabled
if user["mfa_enabled"] and not mfa_code:
return {"authenticated": False, "error": "MFA code required"}
# Generate session token
session_token = secrets.token_hex(32)
self.session_tokens[session_token] = {
"user_id": user_id,
"created_at": time.time(),
"expires_at": time.time() + 3600 # 1 hour
}
return {
"authenticated": True,
"user_id": user_id,
"role": user["role"],
"session_token": session_token
}
from datetime import datetime, timedelta
from typing import Dict, List, Optional
class GDPRCompliance:
"""GDPR compliance management for voice systems"""
def __init__(self):
self.consent_records = {}
self.retention_policies = {
"voice_recordings": 30, # days
"transcripts": 90, # days
"user_profiles": 365, # days
}
def record_consent(self, user_id: str, consent_type: str,
consent_given: bool) -> str:
"""Record user consent for data processing"""
consent_id = f"consent_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{user_id}"
self.consent_records[consent_id] = {
"user_id": user_id,
"consent_type": consent_type,
"consent_given": consent_given,
"timestamp": datetime.now()
}
return consent_id
def check_consent(self, user_id: str, consent_type: str) -> bool:
"""Check if user has given consent for specific processing"""
# Find most recent consent for this user and type
latest_consent = None
latest_timestamp = None
for consent_id, consent_data in self.consent_records.items():
if (consent_data["user_id"] == user_id and
consent_data["consent_type"] == consent_type):
if latest_timestamp is None or consent_data["timestamp"] > latest_timestamp:
latest_consent = consent_data
latest_timestamp = consent_data["timestamp"]
if latest_consent is None:
return False
return latest_consent["consent_given"]
def process_data_subject_request(self, user_id: str, request_type: str) -> Dict[str, Any]:
"""Process GDPR data subject requests"""
if request_type == "access":
return {
"request_type": "access",
"user_id": user_id,
"data": self._get_user_personal_data(user_id),
"timestamp": datetime.now()
}
elif request_type == "deletion":
return {
"request_type": "deletion",
"user_id": user_id,
"status": "deletion_scheduled",
"completion_date": datetime.now() + timedelta(days=30)
}
else:
return {"error": "Unknown request type"}
def _get_user_personal_data(self, user_id: str) -> Dict[str, Any]:
"""Get user's personal data"""
return {
"name": "John Doe",
"email": "john.doe@example.com",
"phone": "+1234567890",
"voice_profile": "voice_profile_hash"
}
class HIPAACompliance:
"""HIPAA compliance for healthcare voice applications"""
def __init__(self):
self.phi_records = {} # Protected Health Information
self.access_logs = {}
def handle_phi_data(self, patient_id: str, data_type: str,
data_content: str, user_id: str) -> Dict[str, Any]:
"""Handle Protected Health Information with HIPAA compliance"""
# Log access
access_id = self._log_access(patient_id, user_id, data_type)
# Encrypt PHI data
encrypted_data = self._encrypt_phi_data(data_content)
# Store with audit trail
self.phi_records[access_id] = {
"patient_id": patient_id,
"data_type": data_type,
"encrypted_data": encrypted_data,
"user_id": user_id,
"timestamp": datetime.now(),
"purpose": "treatment"
}
return {
"access_id": access_id,
"status": "phi_handled",
"compliance_verified": True
}
def _log_access(self, patient_id: str, user_id: str, data_type: str) -> str:
"""Log access to PHI"""
access_id = f"access_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{user_id}"
self.access_logs[access_id] = {
"patient_id": patient_id,
"user_id": user_id,
"data_type": data_type,
"timestamp": datetime.now(),
"action": "access"
}
return access_id
def _encrypt_phi_data(self, data: str) -> str:
"""Encrypt PHI data"""
return f"encrypted_{hash(data)}"
class VoiceAuditSystem:
"""Comprehensive audit system for voice applications"""
def __init__(self):
self.audit_logs = []
self.audit_config = {
"retention_days": 2555, # 7 years
"sensitive_fields": ["password", "ssn", "credit_card", "api_key"]
}
def log_audit_event(self, event_type: str, user_id: str,
action: str, details: Dict[str, Any],
severity: str = "INFO") -> str:
"""Log audit event with comprehensive details"""
audit_id = f"audit_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
audit_entry = {
"audit_id": audit_id,
"timestamp": datetime.now(),
"event_type": event_type,
"user_id": user_id,
"action": action,
"details": self._sanitize_details(details),
"severity": severity
}
self.audit_logs.append(audit_entry)
return audit_id
def _sanitize_details(self, details: Dict[str, Any]) -> Dict[str, Any]:
"""Remove sensitive information from audit details"""
sanitized = details.copy()
for field in self.audit_config["sensitive_fields"]:
if field in sanitized:
sanitized[field] = "[REDACTED]"
return sanitized
def generate_audit_report(self, start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Generate comprehensive audit report"""
period_logs = [
log for log in self.audit_logs
if start_date <= log["timestamp"] <= end_date
]
# Analyze by event type
event_counts = {}
for log in period_logs:
event_type = log["event_type"]
event_counts[event_type] = event_counts.get(event_type, 0) + 1
return {
"report_period": f"{start_date} to {end_date}",
"total_events": len(period_logs),
"event_type_breakdown": event_counts,
"unique_users": len(set(log["user_id"] for log in period_logs)),
"compliance_status": "compliant"
}
class ResponsibleAI:
"""Responsible AI practices for voice applications"""
def __init__(self):
self.ai_ethics_guidelines = {
"transparency": ["disclose_ai_usage", "explain_ai_decisions"],
"fairness": ["bias_detection", "equal_treatment"],
"privacy": ["data_minimization", "consent_management"],
"accountability": ["decision_logging", "human_oversight"]
}
self.decision_logs = []
def disclose_ai_usage(self, interaction_type: str) -> str:
"""Generate AI disclosure message"""
disclosures = {
"greeting": "Hello, I'm an AI assistant. How can I help you today?",
"confirmation": "I'm an AI system processing your request.",
"escalation": "I'm connecting you with a human agent who can better assist you.",
"closing": "Thank you for using our AI-powered service."
}
return disclosures.get(interaction_type, "I'm an AI assistant.")
def log_ai_decision(self, decision_type: str, input_data: str,
output_data: str, confidence: float,
user_id: str) -> str:
"""Log AI decision for transparency and accountability"""
decision_id = f"decision_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
decision_log = {
"decision_id": decision_id,
"timestamp": datetime.now(),
"decision_type": decision_type,
"input_data": self._sanitize_input(input_data),
"output_data": output_data,
"confidence": confidence,
"user_id": user_id,
"model_version": "voice_ai_v1.2"
}
self.decision_logs.append(decision_log)
return decision_id
def _sanitize_input(self, input_data: str) -> str:
"""Sanitize input data for logging"""
import re
# Mask personal information
sanitized = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CARD]', input_data)
sanitized = re.sub(r'\b\d{3}[\s-]?\d{2}[\s-]?\d{4}\b', '[SSN]', sanitized)
return sanitized
def monitor_bias(self, model_outputs: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Monitor for bias in AI model outputs"""
bias_metrics = {
"gender_bias": 0.0,
"accent_bias": 0.0,
"language_bias": 0.0
}
total_outputs = len(model_outputs)
if total_outputs > 0:
for output in model_outputs:
if "gender" in output and output["gender"] == "female":
bias_metrics["gender_bias"] += 1
if "accent" in output and output["accent"] != "standard":
bias_metrics["accent_bias"] += 1
# Normalize metrics
for key in bias_metrics:
bias_metrics[key] = bias_metrics[key] / total_outputs
return {
"timestamp": datetime.now(),
"bias_metrics": bias_metrics,
"total_samples": total_outputs,
"bias_detected": any(metric > 0.1 for metric in bias_metrics.values())
}
Security and compliance are non-negotiable pillars in modern voice applications. This chapter has covered:
A well-implemented security and compliance strategy ensures:
✅ This closes Chapter 8.
Chapter 9 will cover deployment strategies, scaling considerations, and production best practices for enterprise voice AI systems.