"""
|
|
Enhanced Risk Analyzer Module - Presidio-Powered
|
|
Comprehensive privacy, security, and ethical risk assessment
|
|
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
import re
|
|
from datetime import datetime
|
|
from typing import List, Dict, Any, Optional
|
|
from collections import defaultdict
|
|
|
|
# Presidio imports
|
|
try:
|
|
from presidio_analyzer import AnalyzerEngine
|
|
from presidio_analyzer.nlp_engine import NlpEngineProvider
|
|
PRESIDIO_AVAILABLE = True
|
|
except ImportError:
|
|
PRESIDIO_AVAILABLE = False
|
|
print("⚠️ Presidio not available. Install with: pip install presidio-analyzer")
|
|
|
|
|
|
class RiskAnalyzer:
    """Comprehensive risk analysis with Presidio-enhanced PII detection"""

    # Class-level cache for Presidio analyzer
    _presidio_analyzer = None
    _presidio_initialized = False
    _presidio_init_failed = False

    def __init__(self, df, model_results, bias_results, protected_attributes, target_column, use_presidio=False):
        self.df = df
        self.model_results = model_results
        self.bias_results = bias_results
        self.protected_attributes = protected_attributes
        self.target_column = target_column
        self.results = {}
        self.use_presidio = use_presidio

        # Initialize Presidio only if requested and not already failed
        if self.use_presidio and PRESIDIO_AVAILABLE and not RiskAnalyzer._presidio_init_failed:
            if not RiskAnalyzer._presidio_initialized:
                self._init_presidio()
            self.analyzer = RiskAnalyzer._presidio_analyzer
        else:
            self.analyzer = None

    def _init_presidio(self):
        """Initialize Presidio analyzer (cached at class level)"""
        try:
            print("⏳ Initializing Presidio for risk analysis...")

            # Check if spaCy and model are available
            try:
                import spacy

                # Check if model exists WITHOUT loading it
                model_name = "en_core_web_sm"
                if not spacy.util.is_package(model_name):
                    print(f"⚠️ spaCy model '{model_name}' not found.")
                    print(f" To enable Presidio, install the model with:")
                    print(f" python -m spacy download {model_name}")
                    print(" Continuing with regex-only PII detection...")
                    RiskAnalyzer._presidio_init_failed = True
                    return

                # Model exists, now load it
                print(f"✓ spaCy model '{model_name}' found, loading...")
                nlp = spacy.load(model_name)

            except ImportError:
                print("⚠️ spaCy not installed. Continuing with regex-only detection...")
                print(" Install spaCy with: pip install spacy")
                RiskAnalyzer._presidio_init_failed = True
                return
            except Exception as e:
                print(f"⚠️ Error loading spaCy model: {e}")
                print(" Continuing with regex-only PII detection...")
                RiskAnalyzer._presidio_init_failed = True
                return

            # Create NLP engine configuration (prevent auto-download)
            from presidio_analyzer.nlp_engine import NlpEngineProvider

            configuration = {
                "nlp_engine_name": "spacy",
                "models": [{"lang_code": "en", "model_name": model_name}],
            }

            provider = NlpEngineProvider(nlp_configuration=configuration)
            nlp_engine = provider.create_engine()

            # Initialize analyzer
            RiskAnalyzer._presidio_analyzer = AnalyzerEngine(nlp_engine=nlp_engine)
            RiskAnalyzer._presidio_initialized = True
            print("✓ Presidio initialized for risk analysis")

        except Exception as e:
            print(f"⚠️ Could not initialize Presidio: {e}")
            print(" Continuing with regex-only PII detection...")
            RiskAnalyzer._presidio_init_failed = True
            RiskAnalyzer._presidio_analyzer = None

    # Enhanced PII patterns for fallback regex detection
    PII_PATTERNS = {
        'EMAIL': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        'PHONE_US': r'\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b',
        'PHONE_INTERNATIONAL': r'\+\d{1,3}[-.\s]?\d{1,4}[-.\s]?\d{1,4}[-.\s]?\d{1,9}',
        'SSN': r'\b\d{3}-\d{2}-\d{4}\b',
        'CREDIT_CARD': r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
        'IP_ADDRESS': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
        'MAC_ADDRESS': r'\b(?:[0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}\b',
        'US_ADDRESS': r'\b\d{1,5}\s+[\w\s]+(?:street|st|avenue|ave|road|rd|highway|hwy)\b',
        'ZIP_CODE': r'\b\d{5}(?:-\d{4})?\b',
        'DATE_OF_BIRTH': r'\b(?:0?[1-9]|1[0-2])[/-](?:0?[1-9]|[12]\d|3[01])[/-](?:19|20)\d{2}\b',
        'PASSPORT': r'\b[A-Z]{1,2}\d{6,9}\b',
        'DRIVERS_LICENSE': r'\b[A-Z]{1,2}\d{6,8}\b',
        'BANK_ACCOUNT': r'\b\d{8,17}\b',
        'ROUTING_NUMBER': r'\b[0-9]{9}\b',
        'MEDICAL_RECORD': r'\b(?:MRN|MR#)[\s:]*[A-Z0-9]{6,12}\b',
    }

    # Presidio entity types to detect
    PRESIDIO_ENTITIES = [
        'CREDIT_CARD', 'CRYPTO', 'EMAIL_ADDRESS', 'IBAN_CODE',
        'IP_ADDRESS', 'LOCATION', 'PERSON', 'PHONE_NUMBER',
        'MEDICAL_LICENSE', 'US_BANK_NUMBER', 'US_DRIVER_LICENSE',
        'US_ITIN', 'US_PASSPORT', 'US_SSN', 'UK_NHS',
        'SG_NRIC_FIN', 'AU_ABN', 'AU_ACN', 'AU_TFN', 'AU_MEDICARE'
    ]

    def analyze(self):
        """Perform comprehensive risk analysis with Presidio integration"""
        print("\n" + "=" * 70)
        print("COMPREHENSIVE RISK ANALYSIS WITH PRESIDIO")
        print("=" * 70)

        # Enhanced risk analysis
        privacy_risks = self._analyze_privacy_risks_enhanced()
        ethical_risks = self._analyze_ethical_risks_enhanced()
        compliance_risks = self._analyze_compliance_risks_enhanced()
        security_risks = self._analyze_security_risks()
        operational_risks = self._analyze_operational_risks()
        data_quality_risks = self._analyze_data_quality_risks_enhanced()

        # Calculate category scores
        category_scores = {
            'privacy': privacy_risks.get('risk_score', 0.0),
            'ethical': ethical_risks.get('risk_score', 0.0),
            'compliance': compliance_risks.get('risk_score', 0.0),
            'security': security_risks.get('risk_score', 0.0),
            'operational': operational_risks.get('risk_score', 0.0),
            'data_quality': data_quality_risks.get('risk_score', 0.0)
        }

        # Calculate weighted overall risk
        overall_risk_score = self._calculate_weighted_risk_score(category_scores)
        risk_level = self._classify_risk_level(overall_risk_score)

        # Detect violations
        violations = self._detect_all_violations(
            privacy_risks, ethical_risks, compliance_risks,
            security_risks, operational_risks, data_quality_risks
        )

        # Generate insights
        insights = self._generate_risk_insights(
            category_scores, violations, privacy_risks, ethical_risks
        )

        self.results = {
            'privacy_risks': privacy_risks,
            'ethical_risks': ethical_risks,
            'model_performance_risks': self._analyze_model_performance_risks(),
            'compliance_risks': compliance_risks,
            'data_quality_risks': data_quality_risks,
            'security_risks': security_risks,
            'operational_risks': operational_risks,
            'risk_categories': category_scores,
            'overall_risk_score': overall_risk_score,
            'risk_level': risk_level,
            'violations': violations,
            'insights': insights,
            'timestamp': datetime.now().isoformat(),
            'presidio_enabled': self.analyzer is not None
        }

        self._print_risk_summary()
        return self.results

    def _analyze_privacy_risks_enhanced(self):
        """Enhanced privacy analysis with Presidio"""
        print("⏳ Analyzing privacy risks...")

        # Detect PII using Presidio and/or regex
        pii_detections = self._detect_pii_comprehensive()

        # Calculate re-identification risk
        reidentification_risk = self._calculate_reidentification_risk(pii_detections)

        # Analyze data minimization
        data_minimization_score = self._analyze_data_minimization()

        # Check anonymization techniques
        anonymization_level = self._assess_anonymization(pii_detections)

        # Group-level privacy risk
        group_privacy_risks = self._analyze_group_privacy_risks(pii_detections)

        # Calculate overall privacy risk score
        pii_count = len(pii_detections)
        pii_risk = min(pii_count * 0.1, 1.0)  # 0.1 per PII type, capped at 1.0

        privacy_risk_score = (
            pii_risk * 0.4 +
            reidentification_risk * 0.3 +
            (1 - data_minimization_score) * 0.2 +
            (1 if anonymization_level == 'NONE' else 0.5 if anonymization_level == 'PARTIAL' else 0) * 0.1
        )

        return {
            'risk_score': privacy_risk_score,
            'pii_detected': pii_detections,
            'pii_count': pii_count,
            'reidentification_risk': reidentification_risk,
            'data_minimization_score': data_minimization_score,
            'anonymization_level': anonymization_level,
            'group_privacy_risks': group_privacy_risks,
            'sensitive_attributes': self.protected_attributes,
            'detection_method': 'Presidio' if self.analyzer else 'Regex',
            'recommendations': self._generate_privacy_recommendations(
                pii_detections, reidentification_risk, anonymization_level
            )
        }

    def _detect_pii_comprehensive(self):
        """Comprehensive PII detection using Presidio + regex"""
        pii_detections = []
        detected_types = set()

        for col in self.df.columns:
            col_lower = col.lower()

            # Column name-based detection
            column_pii = self._detect_pii_from_column_name(col, col_lower)
            if column_pii:
                pii_type = column_pii['type']
                if pii_type not in detected_types:
                    pii_detections.append(column_pii)
                    detected_types.add(pii_type)

            # Content-based detection with Presidio
            if self.analyzer and self.df[col].dtype == 'object':
                content_pii = self._detect_pii_with_presidio(col)
                for pii in content_pii:
                    if pii['type'] not in detected_types:
                        pii_detections.append(pii)
                        detected_types.add(pii['type'])

            # Regex fallback for content
            elif self.df[col].dtype == 'object':
                regex_pii = self._detect_pii_with_regex(col)
                for pii in regex_pii:
                    if pii['type'] not in detected_types:
                        pii_detections.append(pii)
                        detected_types.add(pii['type'])

        # Order detections from most to least severe (a plain string sort on the
        # severity labels would not put CRITICAL first)
        severity_rank = {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2, 'LOW': 3}
        return sorted(pii_detections, key=lambda x: severity_rank.get(x['severity'], 4))

    def _detect_pii_from_column_name(self, col, col_lower):
        """Detect PII from column names"""
        name_patterns = {
            'EMAIL': ['email', 'e-mail', 'mail'],
            'PHONE': ['phone', 'mobile', 'tel', 'telephone'],
            'SSN': ['ssn', 'social security', 'social_security'],
            'ADDRESS': ['address', 'street', 'location', 'residence'],
            'ZIP_CODE': ['zip', 'postal', 'postcode'],
            'NAME': ['name', 'firstname', 'lastname', 'fullname'],
            'DOB': ['dob', 'birth', 'birthday', 'dateofbirth'],
            'ID': ['id', 'identifier', 'userid', 'user_id'],
            'IP_ADDRESS': ['ip', 'ipaddress', 'ip_address'],
            'CREDIT_CARD': ['card', 'credit', 'creditcard'],
            'PASSPORT': ['passport'],
            'LICENSE': ['license', 'licence', 'driver'],
            'BANK_ACCOUNT': ['account', 'bank_account', 'banking'],
        }

        for pii_type, keywords in name_patterns.items():
            if any(kw in col_lower for kw in keywords):
                severity = self._determine_pii_severity(pii_type)
                return {
                    'column': col,
                    'type': pii_type,
                    'severity': severity,
                    'detection_method': 'column_name',
                    'confidence': 0.9
                }
        return None

    def _detect_pii_with_presidio(self, column):
        """Detect PII in column content using Presidio"""
        detections = []

        # Sample values from column (max 100 for performance)
        sample_size = min(100, len(self.df))
        samples = self.df[column].dropna().sample(min(sample_size, len(self.df[column].dropna()))).astype(str)

        entity_counts = defaultdict(int)

        for value in samples:
            if len(str(value)) > 5:  # Skip very short values
                try:
                    results = self.analyzer.analyze(
                        text=str(value),
                        entities=self.PRESIDIO_ENTITIES,
                        language='en'
                    )

                    for result in results:
                        if result.score > 0.5:  # Confidence threshold
                            entity_counts[result.entity_type] += 1
                except Exception:
                    continue

        # If entity detected in >20% of samples, mark column as PII
        threshold = sample_size * 0.2
        for entity_type, count in entity_counts.items():
            if count > threshold:
                severity = self._determine_pii_severity(entity_type)
                detections.append({
                    'column': column,
                    'type': entity_type,
                    'severity': severity,
                    'detection_method': 'presidio',
                    'confidence': min(count / sample_size, 1.0),
                    'occurrences': count
                })

        return detections

    def _detect_pii_with_regex(self, column):
        """Fallback PII detection using regex patterns"""
        detections = []

        # Sample values
        sample_size = min(100, len(self.df))
        samples = self.df[column].dropna().sample(min(sample_size, len(self.df[column].dropna()))).astype(str)

        for pii_type, pattern in self.PII_PATTERNS.items():
            matches = 0
            for value in samples:
                if re.search(pattern, str(value), re.IGNORECASE):
                    matches += 1

            # If pattern matches >15% of samples, mark as PII
            if matches > sample_size * 0.15:
                severity = self._determine_pii_severity(pii_type)
                detections.append({
                    'column': column,
                    'type': pii_type,
                    'severity': severity,
                    'detection_method': 'regex',
                    'confidence': min(matches / sample_size, 1.0),
                    'occurrences': matches
                })

        return detections

    def _determine_pii_severity(self, pii_type):
        """Determine severity level for PII type"""
        critical = ['SSN', 'US_SSN', 'CREDIT_CARD', 'US_BANK_NUMBER', 'PASSPORT',
                    'US_PASSPORT', 'MEDICAL_LICENSE', 'MEDICAL_RECORD', 'CRYPTO']
        high = ['EMAIL_ADDRESS', 'PHONE_NUMBER', 'US_DRIVER_LICENSE', 'BANK_ACCOUNT',
                'IBAN_CODE', 'UK_NHS', 'AU_TFN', 'AU_MEDICARE']
        medium = ['PERSON', 'LOCATION', 'IP_ADDRESS', 'ADDRESS', 'DOB']

        pii_upper = pii_type.upper()
        if pii_upper in critical or any(c in pii_upper for c in critical):
            return 'CRITICAL'
        elif pii_upper in high or any(h in pii_upper for h in high):
            return 'HIGH'
        elif pii_upper in medium or any(m in pii_upper for m in medium):
            return 'MEDIUM'
        else:
            return 'LOW'

    def _calculate_reidentification_risk(self, pii_detections):
        """Calculate risk of re-identifying individuals"""
        # Count quasi-identifiers
        quasi_identifiers = ['AGE', 'ZIP_CODE', 'GENDER', 'DOB', 'LOCATION']
        quasi_id_count = sum(1 for pii in pii_detections
                             if any(qi in pii['type'].upper() for qi in quasi_identifiers))

        # Direct identifiers
        direct_identifiers = ['SSN', 'EMAIL', 'PHONE', 'NAME', 'PASSPORT']
        direct_id_count = sum(1 for pii in pii_detections
                              if any(di in pii['type'].upper() for di in direct_identifiers))

        # Calculate risk
        if direct_id_count > 0:
            return 1.0  # Very high risk with direct identifiers
        elif quasi_id_count >= 3:
            return 0.8  # High risk with multiple quasi-identifiers
        elif quasi_id_count >= 2:
            return 0.5  # Medium risk
        elif quasi_id_count >= 1:
            return 0.3  # Low-medium risk
        else:
            return 0.1  # Low risk

    def _analyze_data_minimization(self):
        """Assess if data collection follows minimization principle"""
        total_columns = len(self.df.columns)
        # Assume target + 1-2 protected attributes + 5-10 features is reasonable
        expected_min = 7
        expected_max = 15

        if total_columns <= expected_max:
            score = 1.0
        elif total_columns <= expected_max * 1.5:
            score = 0.7
        elif total_columns <= expected_max * 2:
            score = 0.4
        else:
            score = 0.2

        return score

    def _assess_anonymization(self, pii_detections):
        """Assess anonymization level"""
        critical_pii = [p for p in pii_detections if p['severity'] == 'CRITICAL']
        high_pii = [p for p in pii_detections if p['severity'] == 'HIGH']

        if len(critical_pii) > 0:
            return 'NONE'
        elif len(high_pii) > 2:
            return 'NONE'
        elif len(pii_detections) > 0:
            return 'PARTIAL'
        else:
            return 'FULL'

    def _analyze_group_privacy_risks(self, pii_detections):
        """Analyze privacy risks per demographic group"""
        group_risks = []

        for attr in self.protected_attributes:
            if attr in self.df.columns:
                groups = self.df[attr].unique()
                for group in groups:
                    if pd.notna(group):
                        group_size = len(self.df[self.df[attr] == group])

                        # K-anonymity check: groups with <5 members at high risk
                        if group_size < 5:
                            group_risks.append({
                                'attribute': attr,
                                'group': str(group),
                                'size': int(group_size),
                                'risk': 'CRITICAL',
                                'issue': f'Group too small (n={group_size}) - re-identification risk'
                            })
                        elif group_size < 10:
                            group_risks.append({
                                'attribute': attr,
                                'group': str(group),
                                'size': int(group_size),
                                'risk': 'HIGH',
                                'issue': f'Small group size (n={group_size}) - elevated privacy risk'
                            })

        return group_risks

    def _generate_privacy_recommendations(self, pii_detections, reidentification_risk, anonymization_level):
        """Generate privacy recommendations"""
        recommendations = []

        if anonymization_level == 'NONE':
            recommendations.append({
                'priority': 'CRITICAL',
                'recommendation': 'Implement data anonymization techniques (k-anonymity, l-diversity, t-closeness)',
                'rationale': 'High volume of PII detected without anonymization'
            })

        if reidentification_risk > 0.7:
            recommendations.append({
                'priority': 'CRITICAL',
                'recommendation': 'Remove or hash direct identifiers (SSN, email, phone numbers)',
                'rationale': 'High re-identification risk from direct identifiers'
            })
            recommendations.append({
                'priority': 'HIGH',
                'recommendation': 'Apply differential privacy techniques to protect individual records',
                'rationale': 'Prevent inference attacks on individual data points'
            })

        critical_pii = [p for p in pii_detections if p['severity'] == 'CRITICAL']
        if len(critical_pii) > 0:
            recommendations.append({
                'priority': 'CRITICAL',
                'recommendation': f"Remove or encrypt critical PII: {', '.join(set(p['type'] for p in critical_pii))}",
                'rationale': 'Critical PII types detected that pose severe privacy risks'
            })

        recommendations.append({
            'priority': 'HIGH',
            'recommendation': 'Implement data encryption at rest and in transit',
            'rationale': 'Protect sensitive data from unauthorized access'
        })

        recommendations.append({
            'priority': 'MEDIUM',
            'recommendation': 'Establish data retention and deletion policies',
            'rationale': 'Minimize privacy risk by limiting data lifecycle'
        })

        recommendations.append({
            'priority': 'MEDIUM',
            'recommendation': 'Conduct regular privacy impact assessments (PIA)',
            'rationale': 'Continuous monitoring of privacy risks'
        })

        return recommendations

    def _analyze_ethical_risks_enhanced(self):
        """Enhanced ethical risk analysis"""
        print("⏳ Analyzing ethical risks...")

        # Extract bias information
        bias_score = self.bias_results.get('overall_bias_score', 0.0)
        fairness_violations = self.bias_results.get('fairness_violations', [])

        # Transparency assessment
        transparency_score = self._assess_transparency()

        # Accountability measures
        accountability_score = self._assess_accountability()

        # Autonomy and consent
        autonomy_score = self._assess_autonomy()

        # Social impact
        social_impact_risk = self._assess_social_impact(bias_score)

        # Calculate ethical risk score
        ethical_risk_score = (
            bias_score * 0.35 +  # Fairness is most important
            (1 - transparency_score) * 0.25 +
            (1 - accountability_score) * 0.20 +
            (1 - autonomy_score) * 0.10 +
            social_impact_risk * 0.10
        )

        return {
            'risk_score': ethical_risk_score,
            'fairness_issues': fairness_violations,
            'bias_score': bias_score,
            'transparency_score': transparency_score,
            'accountability_score': accountability_score,
            'autonomy_score': autonomy_score,
            'social_impact_risk': social_impact_risk,
            'affected_groups': self.protected_attributes,
            'recommendations': self._generate_ethical_recommendations(
                bias_score, transparency_score, accountability_score
            )
        }

    def _assess_transparency(self):
        """Assess model transparency"""
        model_type = self.model_results.get('model_type', 'Unknown')

        # Interpretable models
        if model_type in ['LogisticRegression', 'DecisionTreeClassifier', 'LinearRegression']:
            return 0.9
        # Partially interpretable
        elif model_type in ['RandomForestClassifier', 'GradientBoostingClassifier', 'XGBClassifier']:
            return 0.6
        # Black box models
        elif model_type in ['MLPClassifier', 'SVC', 'KNeighborsClassifier']:
            return 0.3
        else:
            return 0.5

    def _assess_accountability(self):
        """Assess accountability measures"""
        # This would check for logging, versioning, audit trails
        # For now, return moderate score
        return 0.6

    def _assess_autonomy(self):
        """Assess respect for autonomy and consent"""
        # Would check for consent mechanisms, opt-out options
        # For now, return moderate score
        return 0.5

    def _assess_social_impact(self, bias_score):
        """Assess potential social impact"""
        # High bias = high social impact risk
        if bias_score > 0.7:
            return 0.9
        elif bias_score > 0.5:
            return 0.7
        elif bias_score > 0.3:
            return 0.4
        else:
            return 0.2

    def _generate_ethical_recommendations(self, bias_score, transparency_score, accountability_score):
        """Generate ethical recommendations"""
        recommendations = []

        if bias_score > 0.5:
            recommendations.append({
                'priority': 'CRITICAL',
                'recommendation': 'Address fairness violations immediately - implement bias mitigation techniques',
                'rationale': f'High bias score ({bias_score:.1%}) indicates significant fairness issues'
            })

        if transparency_score < 0.5:
            recommendations.append({
                'priority': 'HIGH',
                'recommendation': 'Implement explainable AI techniques (SHAP, LIME) for model interpretability',
                'rationale': 'Low transparency score - users cannot understand model decisions'
            })

        if accountability_score < 0.6:
            recommendations.append({
                'priority': 'HIGH',
                'recommendation': 'Establish model governance framework with versioning, logging, and audit trails',
                'rationale': 'Insufficient accountability mechanisms in place'
            })

        recommendations.append({
            'priority': 'MEDIUM',
            'recommendation': 'Create ethics review board for model deployment and monitoring',
            'rationale': 'Ensure ongoing ethical oversight'
        })

        recommendations.append({
            'priority': 'MEDIUM',
            'recommendation': 'Implement feedback mechanisms for affected individuals',
            'rationale': 'Allow users to contest decisions and provide input'
        })

        return recommendations

    def _analyze_compliance_risks_enhanced(self):
        """Enhanced compliance risk analysis"""
        print("⏳ Analyzing compliance risks...")

        # GDPR compliance
        gdpr_compliance = self._assess_gdpr_compliance()

        # CCPA compliance
        ccpa_compliance = self._assess_ccpa_compliance()

        # HIPAA (if healthcare data)
        hipaa_compliance = self._assess_hipaa_compliance()

        # Equal Credit Opportunity Act (if credit/lending)
        ecoa_compliance = self._assess_ecoa_compliance()

        # Calculate compliance risk score
        compliance_scores = [
            gdpr_compliance['score'],
            ccpa_compliance['score'],
            hipaa_compliance['score'],
            ecoa_compliance['score']
        ]
        compliance_risk_score = 1 - (sum(compliance_scores) / len(compliance_scores))

        return {
            'risk_score': compliance_risk_score,
            'gdpr': gdpr_compliance,
            'ccpa': ccpa_compliance,
            'hipaa': hipaa_compliance,
            'ecoa': ecoa_compliance,
            'recommendations': self._generate_compliance_recommendations(
                gdpr_compliance, ccpa_compliance, hipaa_compliance, ecoa_compliance
            )
        }

    def _assess_gdpr_compliance(self):
        """Assess GDPR compliance"""
        checks = {
            'data_minimization': len(self.df.columns) < 20,  # Simplified check
            'purpose_limitation': False,  # Cannot determine from data
            'storage_limitation': False,  # Cannot determine
            'right_to_access': True,  # Data is accessible
            'right_to_erasure': False,  # Cannot determine
            'data_portability': True,  # CSV format
            'consent': False,  # Cannot determine
        }

        score = sum(checks.values()) / len(checks)
        issues = [k for k, v in checks.items() if not v]

        return {
            'score': score,
            'compliant_checks': [k for k, v in checks.items() if v],
            'non_compliant_checks': issues,
            'status': 'COMPLIANT' if score > 0.7 else 'PARTIAL' if score > 0.4 else 'NON_COMPLIANT'
        }

    def _assess_ccpa_compliance(self):
        """Assess CCPA compliance"""
        checks = {
            'notice_at_collection': False,
            'right_to_know': True,
            'right_to_delete': False,
            'right_to_opt_out': False,
            'non_discrimination': True
        }

        score = sum(checks.values()) / len(checks)

        return {
            'score': score,
            'compliant_checks': [k for k, v in checks.items() if v],
            'non_compliant_checks': [k for k, v in checks.items() if not v],
            'status': 'COMPLIANT' if score > 0.7 else 'PARTIAL' if score > 0.4 else 'NON_COMPLIANT'
        }

    def _assess_hipaa_compliance(self):
        """Assess HIPAA compliance (if healthcare data)"""
        # Check for health-related PII
        health_indicators = ['medical', 'health', 'diagnosis', 'treatment', 'prescription', 'mrn']
        has_health_data = any(any(ind in col.lower() for ind in health_indicators) for col in self.df.columns)

        if not has_health_data:
            return {'score': 1.0, 'applicable': False, 'status': 'NOT_APPLICABLE'}

        checks = {
            'encryption': False,
            'access_controls': False,
            'audit_trails': False,
            'breach_notification': False
        }

        score = sum(checks.values()) / len(checks)

        return {
            'score': score,
            'applicable': True,
            'compliant_checks': [k for k, v in checks.items() if v],
            'non_compliant_checks': [k for k, v in checks.items() if not v],
            'status': 'COMPLIANT' if score > 0.7 else 'PARTIAL' if score > 0.4 else 'NON_COMPLIANT'
        }

    def _assess_ecoa_compliance(self):
        """Assess Equal Credit Opportunity Act compliance"""
        # Check if this is credit/lending data
        credit_indicators = ['credit', 'loan', 'lending', 'mortgage', 'debt', 'income']
        is_credit_data = any(any(ind in col.lower() for ind in credit_indicators) for col in self.df.columns)

        if not is_credit_data:
            return {'score': 1.0, 'applicable': False, 'status': 'NOT_APPLICABLE'}

        # Check for prohibited basis discrimination
        bias_score = self.bias_results.get('overall_bias_score', 0.0)

        checks = {
            'no_discrimination': bias_score < 0.3,
            'adverse_action_notices': False,  # Cannot determine
            'record_retention': False,  # Cannot determine
            'monitoring': True  # We're doing it now
        }

        score = sum(checks.values()) / len(checks)

        return {
            'score': score,
            'applicable': True,
            'bias_score': bias_score,
            'compliant_checks': [k for k, v in checks.items() if v],
            'non_compliant_checks': [k for k, v in checks.items() if not v],
            'status': 'COMPLIANT' if score > 0.7 else 'PARTIAL' if score > 0.4 else 'NON_COMPLIANT'
        }

    def _generate_compliance_recommendations(self, gdpr, ccpa, hipaa, ecoa):
        """Generate compliance recommendations"""
        recommendations = []

        if gdpr['status'] != 'COMPLIANT':
            recommendations.append({
                'priority': 'HIGH',
                'recommendation': f"Address GDPR non-compliance: {', '.join(gdpr['non_compliant_checks'])}",
                'rationale': 'GDPR violations can result in significant fines'
            })

        if ccpa['status'] != 'COMPLIANT':
            recommendations.append({
                'priority': 'HIGH',
                'recommendation': f"Address CCPA requirements: {', '.join(ccpa['non_compliant_checks'])}",
                'rationale': 'CCPA compliance required for California residents'
            })

        if hipaa.get('applicable') and hipaa['status'] != 'COMPLIANT':
            recommendations.append({
                'priority': 'CRITICAL',
                'recommendation': f"Implement HIPAA safeguards: {', '.join(hipaa['non_compliant_checks'])}",
                'rationale': 'Healthcare data requires stringent HIPAA protections'
            })

        if ecoa.get('applicable') and ecoa['status'] != 'COMPLIANT':
            recommendations.append({
                'priority': 'CRITICAL',
                'recommendation': 'Address discriminatory patterns in credit decisions',
                'rationale': 'ECOA violations in lending can result in legal action'
            })

        return recommendations

    def _analyze_security_risks(self):
        """Analyze security risks"""
        print("⏳ Analyzing security risks...")

        # Adversarial attack vulnerability
        model_vulnerability = self._assess_adversarial_vulnerability()

        # Data poisoning risk
        poisoning_risk = self._assess_data_poisoning_risk()

        # Model extraction risk
        extraction_risk = self._assess_model_extraction_risk()

        # Membership inference risk
        membership_risk = self._assess_membership_inference_risk()

        security_risk_score = (
            model_vulnerability * 0.3 +
            poisoning_risk * 0.25 +
            extraction_risk * 0.25 +
            membership_risk * 0.20
        )

        return {
            'risk_score': security_risk_score,
            'adversarial_vulnerability': model_vulnerability,
            'data_poisoning_risk': poisoning_risk,
            'model_extraction_risk': extraction_risk,
            'membership_inference_risk': membership_risk,
            'recommendations': self._generate_security_recommendations(
                model_vulnerability, poisoning_risk
            )
        }

    def _assess_adversarial_vulnerability(self):
        """Assess vulnerability to adversarial attacks"""
        model_type = self.model_results.get('model_type', 'Unknown')

        # Deep learning models more vulnerable
        if 'Neural' in model_type or 'MLP' in model_type:
            return 0.8
        # Tree-based models more robust
        elif 'Tree' in model_type or 'Forest' in model_type:
            return 0.3
        # Linear models moderately robust
        elif 'Linear' in model_type or 'Logistic' in model_type:
            return 0.5
        else:
            return 0.6

    def _assess_data_poisoning_risk(self):
        """Assess risk of data poisoning attacks"""
        # Check data quality indicators
        missing_pct = self.df.isnull().sum().sum() / (len(self.df) * len(self.df.columns))

        if missing_pct > 0.2:
            return 0.7  # High missing data = higher risk
        elif missing_pct > 0.1:
            return 0.5
        else:
            return 0.3

    def _assess_model_extraction_risk(self):
        """Assess risk of model extraction attacks"""
        # Simple models easier to extract
        model_type = self.model_results.get('model_type', 'Unknown')

        if 'Linear' in model_type or 'Logistic' in model_type:
            return 0.7
        elif 'Tree' in model_type:
            return 0.6
        else:
            return 0.5

    def _assess_membership_inference_risk(self):
        """Assess membership inference attack risk"""
        # Models that overfit are more vulnerable
        train_acc = self.model_results.get('train_accuracy', 0)
        test_acc = self.model_results.get('accuracy', 0)

        if train_acc - test_acc > 0.15:  # Overfitting
            return 0.8
        elif train_acc - test_acc > 0.10:
            return 0.6
        else:
            return 0.4

    def _generate_security_recommendations(self, adversarial_vuln, poisoning_risk):
        """Generate security recommendations"""
        recommendations = []

        if adversarial_vuln > 0.6:
            recommendations.append({
                'priority': 'HIGH',
                'recommendation': 'Implement adversarial training and input validation',
                'rationale': 'Model vulnerable to adversarial attacks'
            })

        if poisoning_risk > 0.6:
            recommendations.append({
                'priority': 'HIGH',
                'recommendation': 'Implement data validation and anomaly detection in training pipeline',
                'rationale': 'Training data vulnerable to poisoning attacks'
            })

        recommendations.append({
            'priority': 'MEDIUM',
            'recommendation': 'Implement model access controls and rate limiting',
            'rationale': 'Prevent model extraction attacks'
        })

        recommendations.append({
            'priority': 'MEDIUM',
            'recommendation': 'Add differential privacy to model training',
            'rationale': 'Protect against membership inference attacks'
        })

        return recommendations

    def _analyze_operational_risks(self):
        """Analyze operational risks"""
        print("⏳ Analyzing operational risks...")

        # Model performance degradation risk
        performance_risk = self._assess_performance_degradation_risk()

        # Data drift risk
        drift_risk = self._assess_data_drift_risk()

        # Scalability risk
        scalability_risk = self._assess_scalability_risk()

        operational_risk_score = (
            performance_risk * 0.4 +
            drift_risk * 0.4 +
            scalability_risk * 0.2
        )

        return {
            'risk_score': operational_risk_score,
            'performance_degradation_risk': performance_risk,
            'data_drift_risk': drift_risk,
            'scalability_risk': scalability_risk,
            'recommendations': self._generate_operational_recommendations()
        }

    def _assess_performance_degradation_risk(self):
        """Assess risk of performance degradation over time"""
        # Check model accuracy
        accuracy = self.model_results.get('accuracy', 0)

        if accuracy < 0.7:
            return 0.8  # Already low performance
        elif accuracy < 0.8:
            return 0.5
        else:
            return 0.3

    def _assess_data_drift_risk(self):
        """Assess risk of data drift"""
        # Would need historical data for proper assessment
        # For now, return moderate risk
        return 0.5

    def _assess_scalability_risk(self):
        """Assess scalability risk"""
        model_type = self.model_results.get('model_type', 'Unknown')
        dataset_size = len(self.df)

        # KNN doesn't scale well
        if 'KNeighbors' in model_type:
            return 0.7
        # Tree-based scale reasonably
        elif 'Tree' in model_type or 'Forest' in model_type:
            return 0.4
        # Linear models scale well
        elif 'Linear' in model_type or 'Logistic' in model_type:
            return 0.2
        else:
            return 0.5

    def _generate_operational_recommendations(self):
        """Generate operational recommendations"""
        return [
            {
                'priority': 'HIGH',
                'recommendation': 'Implement continuous monitoring for model performance and data drift',
                'rationale': 'Detect degradation early'
            },
            {
                'priority': 'MEDIUM',
                'recommendation': 'Establish model retraining pipeline and schedule',
                'rationale': 'Maintain model accuracy over time'
            },
            {
                'priority': 'MEDIUM',
                'recommendation': 'Set up alerting for performance drops below threshold',
                'rationale': 'Enable rapid response to issues'
            }
        ]

    def _analyze_data_quality_risks_enhanced(self):
        """Enhanced data quality risk analysis"""
        print("⏳ Analyzing data quality risks...")

        # Missing data
        missing_pct = self.df.isnull().sum().sum() / (len(self.df) * len(self.df.columns))

        # Data completeness
        completeness_score = 1 - missing_pct

        # Data consistency
        consistency_score = self._assess_data_consistency()

        # Data accuracy proxy
        accuracy_score = self._assess_data_accuracy()

        # Sample size adequacy
        sample_size_score = self._assess_sample_size()

        data_quality_risk_score = (
            (1 - completeness_score) * 0.3 +
            (1 - consistency_score) * 0.25 +
            (1 - accuracy_score) * 0.25 +
            (1 - sample_size_score) * 0.20
        )

        return {
            'risk_score': data_quality_risk_score,
            'completeness_score': completeness_score,
            'consistency_score': consistency_score,
            'accuracy_score': accuracy_score,
            'sample_size_score': sample_size_score,
            'missing_percentage': missing_pct,
            'total_records': len(self.df),
            'recommendations': self._generate_data_quality_recommendations(
                completeness_score, consistency_score, sample_size_score
            )
        }

    def _assess_data_consistency(self):
        """Assess data consistency"""
        inconsistencies = 0
        total_checks = 0

        for col in self.df.select_dtypes(include=['object']).columns:
            total_checks += 1
            unique_count = self.df[col].nunique()
            total_count = len(self.df[col].dropna())

            # High cardinality in categorical = potential inconsistency
            if total_count > 0 and unique_count / total_count > 0.8:
                inconsistencies += 1

        if total_checks == 0:
            return 1.0

        return 1 - (inconsistencies / total_checks)

    def _assess_data_accuracy(self):
        """Assess data accuracy (proxy measures)"""
        accuracy_indicators = []

        # Check for outliers in numerical columns
        for col in self.df.select_dtypes(include=[np.number]).columns:
            Q1 = self.df[col].quantile(0.25)
            Q3 = self.df[col].quantile(0.75)
            IQR = Q3 - Q1
            outliers = ((self.df[col] < (Q1 - 3 * IQR)) | (self.df[col] > (Q3 + 3 * IQR))).sum()
            outlier_pct = outliers / len(self.df)
            accuracy_indicators.append(1 - min(outlier_pct, 0.5))

        if len(accuracy_indicators) == 0:
            return 0.7  # Default moderate score

        return np.mean(accuracy_indicators)

    def _assess_sample_size(self):
        """Assess if sample size is adequate"""
        n = len(self.df)
        n_features = len(self.df.columns) - 1  # Exclude target

        # Rule of thumb: 10-20 samples per feature
        min_required = n_features * 10
        ideal_required = n_features * 20

        if n >= ideal_required:
            return 1.0
        elif n >= min_required:
            return 0.7
        elif n >= min_required * 0.5:
            return 0.4
        else:
            return 0.2

    def _generate_data_quality_recommendations(self, completeness, consistency, sample_size):
        """Generate data quality recommendations"""
        recommendations = []

        if completeness < 0.8:
            recommendations.append({
                'priority': 'HIGH',
                'recommendation': 'Address missing data through imputation or removal',
                'rationale': f'Low data completeness ({completeness:.1%})'
            })

        if consistency < 0.7:
            recommendations.append({
                'priority': 'HIGH',
                'recommendation': 'Standardize data formats and values',
                'rationale': 'Inconsistent data detected'
            })

        if sample_size < 0.6:
            recommendations.append({
                'priority': 'MEDIUM',
                'recommendation': 'Collect more data samples',
                'rationale': 'Sample size may be inadequate for reliable modeling'
            })

        recommendations.append({
            'priority': 'MEDIUM',
            'recommendation': 'Implement data validation rules and checks',
            'rationale': 'Prevent future data quality issues'
        })

        return recommendations

    def _analyze_model_performance_risks(self):
        """Analyze risks related to model performance"""
        performance_risks = {
            'accuracy_risk': 'UNKNOWN',
            'precision_risk': 'UNKNOWN',
            'recall_risk': 'UNKNOWN',
            'overfitting_risk': 'UNKNOWN',
            'underfitting_risk': 'UNKNOWN',
            'recommendations': []
        }

        accuracy = self.model_results.get('accuracy', 0)
        precision = self.model_results.get('precision', 0)
        recall = self.model_results.get('recall', 0)
        train_accuracy = self.model_results.get('train_accuracy', accuracy)

        # Accuracy risk
        if accuracy < 0.7:
            performance_risks['accuracy_risk'] = 'HIGH'
            performance_risks['recommendations'].append("Model accuracy is low - consider feature engineering or model selection")
        elif accuracy < 0.8:
            performance_risks['accuracy_risk'] = 'MEDIUM'
        else:
            performance_risks['accuracy_risk'] = 'LOW'

        # Precision risk
        if precision < 0.7:
            performance_risks['precision_risk'] = 'HIGH'
            performance_risks['recommendations'].append("Low precision - high false positive rate")
        elif precision < 0.8:
            performance_risks['precision_risk'] = 'MEDIUM'
        else:
            performance_risks['precision_risk'] = 'LOW'

        # Recall risk
        if recall < 0.7:
            performance_risks['recall_risk'] = 'HIGH'
            performance_risks['recommendations'].append("Low recall - high false negative rate")
        elif recall < 0.8:
            performance_risks['recall_risk'] = 'MEDIUM'
        else:
            performance_risks['recall_risk'] = 'LOW'

        # Overfitting risk
        if train_accuracy - accuracy > 0.15:
            performance_risks['overfitting_risk'] = 'HIGH'
            performance_risks['recommendations'].append("Model shows signs of overfitting - consider regularization")
        elif train_accuracy - accuracy > 0.10:
            performance_risks['overfitting_risk'] = 'MEDIUM'
        else:
            performance_risks['overfitting_risk'] = 'LOW'

        # Underfitting risk
        if train_accuracy < 0.75:
            performance_risks['underfitting_risk'] = 'HIGH'
            performance_risks['recommendations'].append("Model may be underfitting - consider more complex model or feature engineering")
        elif train_accuracy < 0.85:
            performance_risks['underfitting_risk'] = 'MEDIUM'
        else:
            performance_risks['underfitting_risk'] = 'LOW'

        if not performance_risks['recommendations']:
            performance_risks['recommendations'].append("Model performance is acceptable - continue monitoring")

        return performance_risks

    def _calculate_weighted_risk_score(self, category_scores):
        """Calculate weighted overall risk score"""
        # Weights for each category
        weights = {
            'privacy': 0.25,
            'ethical': 0.25,
            'compliance': 0.20,
            'security': 0.15,
            'operational': 0.08,
            'data_quality': 0.07
        }

        weighted_score = sum(category_scores.get(cat, 0) * weight
                             for cat, weight in weights.items())

        return weighted_score

    def _classify_risk_level(self, risk_score):
        """Classify overall risk level"""
        if risk_score >= 0.7:
            return 'CRITICAL'
        elif risk_score >= 0.5:
            return 'HIGH'
        elif risk_score >= 0.3:
            return 'MEDIUM'
        else:
            return 'LOW'

    def _detect_all_violations(self, privacy, ethical, compliance, security, operational, data_quality):
        """Detect all risk violations across categories"""
        violations = []

        # Privacy violations
        if privacy['risk_score'] > 0.7:
            violations.append({
                'category': 'privacy',
                'severity': 'CRITICAL' if privacy['risk_score'] > 0.8 else 'HIGH',
                'message': f"High privacy risk detected ({privacy['risk_score']:.1%})",
                'details': f"{privacy['pii_count']} PII types found, {privacy['anonymization_level']} anonymization"
            })

        critical_pii = [p for p in privacy['pii_detected'] if p['severity'] == 'CRITICAL']
        if len(critical_pii) > 0:
            violations.append({
                'category': 'privacy',
                'severity': 'CRITICAL',
                'message': 'Critical PII types detected without protection',
                'details': f"Types: {', '.join(set(p['type'] for p in critical_pii))}"
            })

        # Ethical violations
        if ethical['risk_score'] > 0.6:
            violations.append({
                'category': 'ethical',
                'severity': 'HIGH' if ethical['risk_score'] > 0.7 else 'MEDIUM',
                'message': f"Ethical concerns identified ({ethical['risk_score']:.1%})",
                'details': f"Bias score: {ethical['bias_score']:.1%}, Transparency: {ethical['transparency_score']:.1%}"
            })

        # Compliance violations
        compliance_issues = []
        if compliance['gdpr']['status'] != 'COMPLIANT':
            compliance_issues.append('GDPR')
        if compliance['ccpa']['status'] != 'COMPLIANT':
            compliance_issues.append('CCPA')
        if compliance.get('hipaa', {}).get('applicable') and compliance['hipaa']['status'] != 'COMPLIANT':
            compliance_issues.append('HIPAA')
        if compliance.get('ecoa', {}).get('applicable') and compliance['ecoa']['status'] != 'COMPLIANT':
            compliance_issues.append('ECOA')

        if compliance_issues:
            violations.append({
                'category': 'compliance',
                'severity': 'HIGH',
                'message': 'Compliance violations detected',
                'details': f"Non-compliant regulations: {', '.join(compliance_issues)}"
            })

        # Security violations
        if security['risk_score'] > 0.6:
            violations.append({
                'category': 'security',
                'severity': 'HIGH',
                'message': f"Security risks identified ({security['risk_score']:.1%})",
                'details': f"Adversarial vulnerability: {security['adversarial_vulnerability']:.1%}"
            })

        # Data quality violations
        if data_quality['risk_score'] > 0.5:
            violations.append({
                'category': 'data_quality',
                'severity': 'MEDIUM',
                'message': f"Data quality issues detected ({data_quality['risk_score']:.1%})",
                'details': f"Completeness: {data_quality['completeness_score']:.1%}, Consistency: {data_quality['consistency_score']:.1%}"
            })

        return sorted(violations, key=lambda x: {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2, 'LOW': 3}[x['severity']])

    def _generate_risk_insights(self, category_scores, violations, privacy, ethical):
        """Generate key risk insights"""
        insights = []

        # Overall risk insight
        overall_risk = self._calculate_weighted_risk_score(category_scores)
        insights.append(
            f"Overall risk score: {overall_risk:.1%} ({self._classify_risk_level(overall_risk)} risk)"
        )

        # Highest risk category
        if category_scores:
            max_cat = max(category_scores.items(), key=lambda x: x[1])
            insights.append(
                f"Highest risk category: {max_cat[0].title()} ({max_cat[1]:.1%})"
            )

        # Critical violations
        critical_violations = [v for v in violations if v['severity'] == 'CRITICAL']
        if critical_violations:
            insights.append(
                f"{len(critical_violations)} CRITICAL violations require immediate attention"
            )

        # PII detection
        if privacy['pii_count'] > 0:
            insights.append(
                f"{privacy['pii_count']} PII types detected using {privacy['detection_method']}"
            )

        # Bias impact
        if ethical['bias_score'] > 0.5:
            insights.append(
                f"High bias score ({ethical['bias_score']:.1%}) indicates fairness concerns"
            )

        return insights

    def _print_risk_summary(self):
        """Print risk analysis summary"""
        print("\n" + "=" * 70)
        print("RISK ANALYSIS SUMMARY")
        print("=" * 70)

        print(f"\n📊 Overall Risk: {self.results['overall_risk_score']:.1%} ({self.results['risk_level']})")
        print(f"🔒 Presidio: {'Enabled' if self.results['presidio_enabled'] else 'Disabled'}")

        print("\n📈 Category Scores:")
        for category, score in self.results['risk_categories'].items():
            emoji = "🔴" if score > 0.7 else "🟠" if score > 0.5 else "🟡" if score > 0.3 else "🟢"
            print(f"  {emoji} {category.title()}: {score:.1%}")

        print(f"\n⚠️ Violations: {len(self.results['violations'])}")
        for v in self.results['violations'][:5]:  # Show top 5
            print(f"  • [{v['severity']}] {v['message']}")

        print(f"\n💡 Key Insights:")
        for insight in self.results['insights'][:5]:
            print(f"  • {insight}")

        print("\n" + "=" * 70)
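

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): a minimal, hypothetical example of driving
# RiskAnalyzer end to end. The DataFrame, model_results, and bias_results below
# are made-up placeholders; in the real pipeline these come from the project's
# own training and bias-analysis steps. Presidio stays disabled here so the
# sketch runs with column-name and regex detection only.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo_df = pd.DataFrame({
        'age': [25, 34, 52, 41, 29],
        'gender': ['F', 'M', 'F', 'M', 'F'],
        'email': ['a@example.com', 'b@example.com', 'c@example.com',
                  'd@example.com', 'e@example.com'],
        'income': [42000, 58000, 61000, 39000, 47000],
        'approved': [1, 0, 1, 0, 1],
    })
    demo_model_results = {
        'model_type': 'LogisticRegression',
        'accuracy': 0.82,
        'train_accuracy': 0.86,
        'precision': 0.80,
        'recall': 0.78,
    }
    demo_bias_results = {'overall_bias_score': 0.2, 'fairness_violations': []}

    risk_analyzer = RiskAnalyzer(
        df=demo_df,
        model_results=demo_model_results,
        bias_results=demo_bias_results,
        protected_attributes=['gender'],
        target_column='approved',
        use_presidio=False,
    )
    risk_analyzer.analyze()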