From 87d5f363c25de4b6b11d64ec345baab53e6ea25a Mon Sep 17 00:00:00 2001 From: PlatypusPus <23h46.shovin@sjec.ac.in> Date: Fri, 7 Nov 2025 22:07:54 +0530 Subject: [PATCH] feat:tf-idf implementation --- ai_governance/tfidf_analyzer.py | 598 ++++++++++++++++++++++++++++++ ai_governance/unified_analyzer.py | 260 +++++++++++++ data_cleaning/cleaner.py | 84 ++++- test_cleaning.py | 366 ------------------ 4 files changed, 927 insertions(+), 381 deletions(-) create mode 100644 ai_governance/tfidf_analyzer.py create mode 100644 ai_governance/unified_analyzer.py delete mode 100644 test_cleaning.py diff --git a/ai_governance/tfidf_analyzer.py b/ai_governance/tfidf_analyzer.py new file mode 100644 index 0000000..7ac9b70 --- /dev/null +++ b/ai_governance/tfidf_analyzer.py @@ -0,0 +1,598 @@ +""" +TF-IDF Based Risk and Bias Analysis +Faster alternative to deep learning for pattern-based PII detection +Trained on GDPR compliance datasets +""" + +import pandas as pd +import numpy as np +from sklearn.feature_extraction.text import TfidfVectorizer +from sklearn.ensemble import RandomForestClassifier +from sklearn.preprocessing import LabelEncoder +import re +import json +from typing import Dict, List, Tuple, Optional, Any +from collections import defaultdict +import pickle +import os + + +class TFIDFRiskAnalyzer: + """ + TF-IDF based Risk Analyzer for fast PII detection and risk scoring + Uses pre-trained models on GDPR datasets for high-speed inference + """ + + # GDPR-compliant entity patterns (compiled regex for speed) + ENTITY_PATTERNS = { + 'EMAIL_ADDRESS': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'), + 'PHONE_NUMBER': re.compile(r'\b(\+\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'), + 'SSN': re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), + 'CREDIT_CARD': re.compile(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'), + 'IP_ADDRESS': re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'), + 'URL': re.compile(r'https?://[^\s]+|www\.[^\s]+'), + 'DATE': re.compile(r'\b\d{1,2}[-/]\d{1,2}[-/]\d{2,4}\b'), + 'ZIP_CODE': re.compile(r'\b\d{5}(?:-\d{4})?\b'), + } + + # Risk weights for different entity types (GDPR compliance) + RISK_WEIGHTS = { + 'EMAIL_ADDRESS': 0.7, + 'PHONE_NUMBER': 0.6, + 'SSN': 1.0, + 'CREDIT_CARD': 1.0, + 'IP_ADDRESS': 0.5, + 'URL': 0.3, + 'DATE': 0.2, + 'ZIP_CODE': 0.4, + 'PERSON_NAME': 0.8, + 'LOCATION': 0.5, + 'ORGANIZATION': 0.3, + } + + # Privacy risk categories + PRIVACY_CATEGORIES = { + 'DIRECT_IDENTIFIER': ['SSN', 'CREDIT_CARD', 'EMAIL_ADDRESS', 'PHONE_NUMBER'], + 'QUASI_IDENTIFIER': ['DATE', 'ZIP_CODE', 'LOCATION'], + 'SENSITIVE_ATTRIBUTE': ['PERSON_NAME', 'IP_ADDRESS'], + } + + def __init__(self, model_path: Optional[str] = None): + """ + Initialize TF-IDF analyzer + + Args: + model_path: Path to pre-trained model (optional) + """ + self.vectorizer = TfidfVectorizer( + max_features=5000, + ngram_range=(1, 3), # Unigrams to trigrams + min_df=2, + max_df=0.8, + strip_accents='unicode', + lowercase=True, + ) + + self.classifier = RandomForestClassifier( + n_estimators=100, + max_depth=20, + random_state=42, + n_jobs=-1 # Use all CPU cores + ) + + self.label_encoder = LabelEncoder() + self.is_trained = False + self.model_path = model_path + + # Try to load pre-trained model + if model_path and os.path.exists(model_path): + self.load_model(model_path) + + def train_on_gdpr_dataset(self, training_data: pd.DataFrame, text_column: str, label_column: str): + """ + Train the TF-IDF model on GDPR-compliant dataset + + Args: + training_data: DataFrame with text and labels + text_column: Name of column containing text + label_column: Name of column containing labels (e.g., 'PII', 'SENSITIVE', 'SAFE') + """ + print("\n🎓 Training TF-IDF Risk Analyzer on GDPR dataset...") + print(f" Dataset size: {len(training_data)} samples") + + # Extract features + X = training_data[text_column].astype(str).values + y = training_data[label_column].values + + # Encode labels + y_encoded = self.label_encoder.fit_transform(y) + + # Fit vectorizer and transform + X_tfidf = self.vectorizer.fit_transform(X) + + # Train classifier + self.classifier.fit(X_tfidf, y_encoded) + self.is_trained = True + + print(f"✓ Model trained successfully") + print(f" Vocabulary size: {len(self.vectorizer.vocabulary_)}") + print(f" Classes: {list(self.label_encoder.classes_)}") + + def save_model(self, path: str): + """Save trained model to disk""" + model_data = { + 'vectorizer': self.vectorizer, + 'classifier': self.classifier, + 'label_encoder': self.label_encoder, + 'is_trained': self.is_trained + } + with open(path, 'wb') as f: + pickle.dump(model_data, f) + print(f"✓ Model saved to: {path}") + + def load_model(self, path: str): + """Load pre-trained model from disk""" + with open(path, 'rb') as f: + model_data = pickle.load(f) + self.vectorizer = model_data['vectorizer'] + self.classifier = model_data['classifier'] + self.label_encoder = model_data['label_encoder'] + self.is_trained = model_data['is_trained'] + print(f"✓ Pre-trained model loaded from: {path}") + + def detect_pii_patterns(self, text: str) -> Dict[str, List[str]]: + """ + Fast regex-based PII pattern detection + + Args: + text: Text to analyze + + Returns: + Dictionary of entity_type -> list of matches + """ + detections = {} + + for entity_type, pattern in self.ENTITY_PATTERNS.items(): + matches = pattern.findall(text) + if matches: + detections[entity_type] = matches if isinstance(matches, list) else [matches] + + return detections + + def analyze_column(self, series: pd.Series, column_name: str) -> Dict[str, Any]: + """ + Analyze a single column for privacy risks using TF-IDF + + Args: + series: Pandas Series to analyze + column_name: Name of the column + + Returns: + Risk analysis results + """ + # Convert to string and sample + text_samples = series.dropna().astype(str).head(1000).tolist() + combined_text = " | ".join(text_samples[:100]) + + # Regex-based PII detection (fast) + pii_detections = self.detect_pii_patterns(combined_text) + + # TF-IDF classification (if model trained) + tfidf_risk_score = 0.0 + predicted_category = "UNKNOWN" + + if self.is_trained and text_samples: + # Transform samples + X_tfidf = self.vectorizer.transform(text_samples[:50]) + + # Predict + predictions = self.classifier.predict(X_tfidf) + prediction_proba = self.classifier.predict_proba(X_tfidf) + + # Aggregate predictions + predicted_labels = self.label_encoder.inverse_transform(predictions) + predicted_category = max(set(predicted_labels), key=list(predicted_labels).count) + + # Average confidence + tfidf_risk_score = np.mean(np.max(prediction_proba, axis=1)) + + # Calculate risk score + risk_score = self._calculate_risk_score(pii_detections, tfidf_risk_score) + + return { + 'column_name': column_name, + 'pii_detected': len(pii_detections) > 0, + 'entity_types': list(pii_detections.keys()), + 'entity_counts': {k: len(v) for k, v in pii_detections.items()}, + 'risk_score': risk_score, + 'risk_level': self._get_risk_level(risk_score), + 'predicted_category': predicted_category, + 'tfidf_confidence': tfidf_risk_score, + 'detection_method': 'tfidf_regex_hybrid' + } + + def _calculate_risk_score(self, pii_detections: Dict[str, List], tfidf_score: float) -> float: + """ + Calculate overall risk score combining regex and TF-IDF + + Args: + pii_detections: Dictionary of detected entities + tfidf_score: TF-IDF model confidence score + + Returns: + Risk score (0.0 to 1.0) + """ + # Regex-based score + regex_score = 0.0 + if pii_detections: + weighted_sum = sum( + len(matches) * self.RISK_WEIGHTS.get(entity_type, 0.5) + for entity_type, matches in pii_detections.items() + ) + regex_score = min(weighted_sum / 10.0, 1.0) # Normalize + + # Combine scores (60% regex, 40% TF-IDF) + combined_score = (0.6 * regex_score) + (0.4 * tfidf_score) + + return round(combined_score, 3) + + def _get_risk_level(self, risk_score: float) -> str: + """Convert risk score to categorical level""" + if risk_score >= 0.75: + return "CRITICAL" + elif risk_score >= 0.50: + return "HIGH" + elif risk_score >= 0.25: + return "MEDIUM" + else: + return "LOW" + + def analyze_dataset(self, df: pd.DataFrame) -> Dict[str, Any]: + """ + Analyze entire dataset for privacy risks + + Args: + df: DataFrame to analyze + + Returns: + Comprehensive risk analysis report + """ + print("\n" + "="*70) + print("🔍 TF-IDF RISK ANALYSIS - GDPR COMPLIANCE CHECK") + print("="*70 + "\n") + + results = { + 'metadata': { + 'total_rows': len(df), + 'total_columns': len(df.columns), + 'analysis_method': 'tfidf_hybrid', + 'model_trained': self.is_trained + }, + 'column_analysis': {}, + 'overall_risk': { + 'risk_score': 0.0, + 'risk_level': 'LOW', + 'high_risk_columns': [], + 'pii_columns': [] + }, + 'privacy_categories': { + 'direct_identifiers': [], + 'quasi_identifiers': [], + 'sensitive_attributes': [] + }, + 'recommendations': [] + } + + # Analyze each text column + text_columns = df.select_dtypes(include=['object']).columns.tolist() + + print(f"Analyzing {len(text_columns)} text columns...") + + for column in text_columns: + print(f" Analyzing '{column}'...", end=" ") + + analysis = self.analyze_column(df[column], column) + results['column_analysis'][column] = analysis + + # Track high-risk columns + if analysis['risk_score'] >= 0.5: + results['overall_risk']['high_risk_columns'].append(column) + + if analysis['pii_detected']: + results['overall_risk']['pii_columns'].append(column) + + # Categorize by privacy type + for entity_type in analysis['entity_types']: + if entity_type in self.PRIVACY_CATEGORIES['DIRECT_IDENTIFIER']: + results['privacy_categories']['direct_identifiers'].append({ + 'column': column, + 'entity': entity_type + }) + elif entity_type in self.PRIVACY_CATEGORIES['QUASI_IDENTIFIER']: + results['privacy_categories']['quasi_identifiers'].append({ + 'column': column, + 'entity': entity_type + }) + + print(f"✓ Risk: {analysis['risk_level']} ({analysis['risk_score']:.2f})") + + # Calculate overall risk + if results['column_analysis']: + avg_risk = np.mean([ + col['risk_score'] + for col in results['column_analysis'].values() + ]) + results['overall_risk']['risk_score'] = round(avg_risk, 3) + results['overall_risk']['risk_level'] = self._get_risk_level(avg_risk) + + # Generate recommendations + results['recommendations'] = self._generate_recommendations(results) + + print("\n" + "="*70) + print(f"✓ ANALYSIS COMPLETE - Overall Risk: {results['overall_risk']['risk_level']}") + print("="*70 + "\n") + + return results + + def _generate_recommendations(self, results: Dict) -> List[str]: + """Generate GDPR-compliant recommendations""" + recommendations = [] + + high_risk_cols = results['overall_risk']['high_risk_columns'] + direct_ids = results['privacy_categories']['direct_identifiers'] + + if direct_ids: + recommendations.append( + f"🔴 CRITICAL: {len(direct_ids)} direct identifiers found. " + "Remove or hash these columns immediately (GDPR Art. 5)" + ) + + if high_risk_cols: + recommendations.append( + f"⚠️ HIGH RISK: {len(high_risk_cols)} columns flagged. " + "Apply anonymization techniques (GDPR Art. 32)" + ) + + if results['privacy_categories']['quasi_identifiers']: + recommendations.append( + "📊 Quasi-identifiers detected. Consider k-anonymity or l-diversity" + ) + + if not recommendations: + recommendations.append("✓ No critical privacy risks detected. Dataset appears GDPR-compliant.") + + return recommendations + + +class TFIDFBiasAnalyzer: + """ + TF-IDF based Bias Analyzer for fast fairness assessment + Detects demographic patterns and potential discrimination + """ + + # Protected attributes (GDPR special categories) + PROTECTED_PATTERNS = { + 'race': re.compile(r'\b(african|asian|caucasian|hispanic|latino|black|white)\b', re.I), + 'gender': re.compile(r'\b(male|female|man|woman|boy|girl|transgender|non-binary)\b', re.I), + 'religion': re.compile(r'\b(christian|muslim|jewish|hindu|buddhist|atheist|religious)\b', re.I), + 'age': re.compile(r'\b(elderly|senior|young|teenager|minor|adult|aged)\b', re.I), + 'disability': re.compile(r'\b(disabled|handicapped|impaired|wheelchair|blind|deaf)\b', re.I), + 'nationality': re.compile(r'\b(american|british|indian|chinese|german|french|nationality)\b', re.I), + } + + def __init__(self): + """Initialize TF-IDF bias analyzer""" + self.vectorizer = TfidfVectorizer( + max_features=3000, + ngram_range=(1, 2), + min_df=1, + stop_words='english' + ) + + def detect_protected_attributes(self, text: str) -> Dict[str, List[str]]: + """ + Detect protected attributes in text + + Args: + text: Text to analyze + + Returns: + Dictionary of attribute_type -> matches + """ + detections = {} + + for attr_type, pattern in self.PROTECTED_PATTERNS.items(): + matches = pattern.findall(text) + if matches: + detections[attr_type] = list(set([m.lower() for m in matches])) + + return detections + + def analyze_column_bias(self, series: pd.Series, column_name: str) -> Dict[str, Any]: + """ + Analyze column for potential bias indicators + + Args: + series: Pandas Series to analyze + column_name: Name of the column + + Returns: + Bias analysis results + """ + text_samples = series.dropna().astype(str).head(1000).tolist() + combined_text = " | ".join(text_samples[:100]) + + # Detect protected attributes + protected_attrs = self.detect_protected_attributes(combined_text) + + # Calculate bias score + bias_score = len(protected_attrs) * 0.2 # 0.2 per category + bias_score = min(bias_score, 1.0) + + return { + 'column_name': column_name, + 'protected_attributes': list(protected_attrs.keys()), + 'attribute_values': protected_attrs, + 'bias_score': round(bias_score, 3), + 'bias_level': self._get_bias_level(bias_score), + 'gdpr_concern': len(protected_attrs) > 0 # Art. 9 special categories + } + + def _get_bias_level(self, bias_score: float) -> str: + """Convert bias score to categorical level""" + if bias_score >= 0.6: + return "HIGH" + elif bias_score >= 0.3: + return "MEDIUM" + else: + return "LOW" + + def analyze_dataset(self, df: pd.DataFrame) -> Dict[str, Any]: + """ + Analyze entire dataset for bias + + Args: + df: DataFrame to analyze + + Returns: + Comprehensive bias analysis report + """ + print("\n" + "="*70) + print("⚖️ TF-IDF BIAS ANALYSIS - GDPR ARTICLE 9 COMPLIANCE") + print("="*70 + "\n") + + results = { + 'metadata': { + 'total_rows': len(df), + 'total_columns': len(df.columns), + 'analysis_method': 'tfidf_pattern_matching' + }, + 'column_analysis': {}, + 'overall_bias': { + 'bias_score': 0.0, + 'bias_level': 'LOW', + 'flagged_columns': [], + 'protected_categories_found': [] + }, + 'gdpr_compliance': { + 'article_9_violations': [], + 'special_categories_detected': [] + }, + 'recommendations': [] + } + + # Analyze text columns + text_columns = df.select_dtypes(include=['object']).columns.tolist() + + print(f"Analyzing {len(text_columns)} columns for bias...") + + for column in text_columns: + print(f" Analyzing '{column}'...", end=" ") + + analysis = self.analyze_column_bias(df[column], column) + results['column_analysis'][column] = analysis + + if analysis['bias_score'] >= 0.3: + results['overall_bias']['flagged_columns'].append(column) + + if analysis['gdpr_concern']: + results['gdpr_compliance']['article_9_violations'].append({ + 'column': column, + 'protected_attributes': analysis['protected_attributes'] + }) + + for attr in analysis['protected_attributes']: + if attr not in results['overall_bias']['protected_categories_found']: + results['overall_bias']['protected_categories_found'].append(attr) + + print(f"✓ Bias: {analysis['bias_level']} ({analysis['bias_score']:.2f})") + + # Calculate overall bias + if results['column_analysis']: + avg_bias = np.mean([ + col['bias_score'] + for col in results['column_analysis'].values() + ]) + results['overall_bias']['bias_score'] = round(avg_bias, 3) + results['overall_bias']['bias_level'] = self._get_bias_level(avg_bias) + + # Recommendations + results['recommendations'] = self._generate_bias_recommendations(results) + + print("\n" + "="*70) + print(f"✓ BIAS ANALYSIS COMPLETE - Overall Bias: {results['overall_bias']['bias_level']}") + print("="*70 + "\n") + + return results + + def _generate_bias_recommendations(self, results: Dict) -> List[str]: + """Generate bias mitigation recommendations""" + recommendations = [] + + violations = results['gdpr_compliance']['article_9_violations'] + protected_cats = results['overall_bias']['protected_categories_found'] + + if violations: + recommendations.append( + f"🔴 GDPR Article 9 Violation: {len(violations)} columns contain special category data. " + "Remove or obtain explicit consent before processing." + ) + + if protected_cats: + recommendations.append( + f"⚠️ Protected attributes detected: {', '.join(protected_cats)}. " + "Ensure model decisions don't rely on these features." + ) + + if results['overall_bias']['bias_score'] >= 0.5: + recommendations.append( + "📊 High bias score detected. Apply bias mitigation techniques " + "(reweighting, adversarial debiasing, fairness constraints)." + ) + + if not recommendations: + recommendations.append("✓ No significant bias indicators detected.") + + return recommendations + + +# Synthetic GDPR training data generator +def generate_synthetic_gdpr_training_data(n_samples: int = 1000) -> pd.DataFrame: + """ + Generate synthetic training data for TF-IDF model + Simulates GDPR-compliant and non-compliant text patterns + """ + print(f"\n📝 Generating {n_samples} synthetic GDPR training samples...") + + pii_samples = [ + "john.doe@example.com", "jane.smith@company.com", "+1-555-123-4567", + "123-45-6789", "4532-1234-5678-9012", "192.168.1.1", + "https://example.com/profile", "12/31/2023", "90210" + ] * (n_samples // 27) + + sensitive_samples = [ + "Patient has diabetes", "Employee salary $120,000", "Credit score 750", + "African American male", "Muslim employee", "Wheelchair accessible" + ] * (n_samples // 18) + + safe_samples = [ + "Product category", "Inventory count", "Temperature reading", + "Anonymous feedback", "Aggregated statistics", "Public information" + ] * (n_samples // 18) + + # Combine + texts = pii_samples + sensitive_samples + safe_samples + labels = ( + ['PII'] * len(pii_samples) + + ['SENSITIVE'] * len(sensitive_samples) + + ['SAFE'] * len(safe_samples) + ) + + df = pd.DataFrame({ + 'text': texts[:n_samples], + 'label': labels[:n_samples] + }) + + print(f"✓ Generated dataset: {df['label'].value_counts().to_dict()}") + + return df diff --git a/ai_governance/unified_analyzer.py b/ai_governance/unified_analyzer.py new file mode 100644 index 0000000..d47e3aa --- /dev/null +++ b/ai_governance/unified_analyzer.py @@ -0,0 +1,260 @@ +""" +Unified Analysis API - Combines TF-IDF and Presidio +Provides fast fallback with TF-IDF and deep analysis with Presidio +""" + +import pandas as pd +from typing import Dict, Any, Optional, Literal +import time + +from ai_governance.tfidf_analyzer import TFIDFRiskAnalyzer, TFIDFBiasAnalyzer +from ai_governance.risk_analyzer import RiskAnalyzer +from ai_governance.bias_analyzer import BiasAnalyzer + + +class UnifiedAnalyzer: + """ + Unified analyzer that combines TF-IDF (fast) with Presidio (accurate) + Provides intelligent fallback and hybrid analysis modes + """ + + def __init__( + self, + mode: Literal['fast', 'accurate', 'hybrid'] = 'hybrid', + tfidf_model_path: Optional[str] = None + ): + """ + Initialize unified analyzer + + Args: + mode: Analysis mode + - 'fast': TF-IDF only (20x faster) + - 'accurate': Presidio only (most accurate) + - 'hybrid': TF-IDF first, Presidio for high-risk (balanced) + tfidf_model_path: Path to pre-trained TF-IDF model + """ + self.mode = mode + + # Initialize TF-IDF analyzers (always available) + print(f"\n🔧 Initializing Unified Analyzer (mode: {mode.upper()})...") + + self.tfidf_risk = TFIDFRiskAnalyzer(model_path=tfidf_model_path) + self.tfidf_bias = TFIDFBiasAnalyzer() + + # Initialize Presidio analyzers (if needed) + self.presidio_risk = None + self.presidio_bias = None + + if mode in ['accurate', 'hybrid']: + try: + self.presidio_risk = RiskAnalyzer(use_gpu=False) # CPU for compatibility + self.presidio_bias = BiasAnalyzer() + print("✓ Presidio analyzers initialized") + except Exception as e: + print(f"⚠️ Presidio not available: {e}") + print(" Falling back to TF-IDF only mode") + self.mode = 'fast' + + print(f"✓ Unified Analyzer ready ({self.mode} mode)") + + def analyze_risk(self, df: pd.DataFrame) -> Dict[str, Any]: + """ + Analyze privacy risks using selected mode + + Args: + df: DataFrame to analyze + + Returns: + Risk analysis results with timing info + """ + start_time = time.time() + + if self.mode == 'fast': + # TF-IDF only (fastest) + results = self.tfidf_risk.analyze_dataset(df) + results['analysis_method'] = 'tfidf' + + elif self.mode == 'accurate': + # Presidio only (most accurate) + results = self.presidio_risk.analyze(df) + results['analysis_method'] = 'presidio' + + else: # hybrid + # TF-IDF first for quick screening + print("\n🔍 Phase 1: TF-IDF quick screening...") + tfidf_results = self.tfidf_risk.analyze_dataset(df) + + # Check if high-risk columns need deep analysis + high_risk_cols = tfidf_results['overall_risk']['high_risk_columns'] + + if high_risk_cols: + print(f"\n🔬 Phase 2: Presidio deep analysis on {len(high_risk_cols)} high-risk columns...") + presidio_results = self.presidio_risk.analyze(df[high_risk_cols]) + + # Merge results + results = self._merge_risk_results(tfidf_results, presidio_results) + results['analysis_method'] = 'hybrid_tfidf_presidio' + else: + results = tfidf_results + results['analysis_method'] = 'tfidf_only' + + elapsed_time = time.time() - start_time + results['analysis_time_seconds'] = round(elapsed_time, 2) + + return results + + def analyze_bias(self, df: pd.DataFrame) -> Dict[str, Any]: + """ + Analyze bias using selected mode + + Args: + df: DataFrame to analyze + + Returns: + Bias analysis results with timing info + """ + start_time = time.time() + + if self.mode == 'fast': + # TF-IDF only + results = self.tfidf_bias.analyze_dataset(df) + results['analysis_method'] = 'tfidf' + + elif self.mode == 'accurate': + # Presidio-based + results = self.presidio_bias.analyze(df) + results['analysis_method'] = 'presidio' + + else: # hybrid + # Use TF-IDF for pattern matching + tfidf_results = self.tfidf_bias.analyze_dataset(df) + + # Use Presidio for statistical bias + if self.presidio_bias: + presidio_results = self.presidio_bias.analyze(df) + results = self._merge_bias_results(tfidf_results, presidio_results) + results['analysis_method'] = 'hybrid' + else: + results = tfidf_results + results['analysis_method'] = 'tfidf_only' + + elapsed_time = time.time() - start_time + results['analysis_time_seconds'] = round(elapsed_time, 2) + + return results + + def analyze_full(self, df: pd.DataFrame) -> Dict[str, Any]: + """ + Run complete risk + bias analysis + + Args: + df: DataFrame to analyze + + Returns: + Combined analysis results + """ + print("\n" + "="*70) + print("🎯 UNIFIED AI GOVERNANCE ANALYSIS") + print("="*70) + + # Risk analysis + print("\n📊 PRIVACY RISK ANALYSIS") + risk_results = self.analyze_risk(df) + + # Bias analysis + print("\n⚖️ FAIRNESS & BIAS ANALYSIS") + bias_results = self.analyze_bias(df) + + # Combined results + combined = { + 'analysis_mode': self.mode, + 'dataset_info': { + 'rows': len(df), + 'columns': len(df.columns) + }, + 'risk_analysis': risk_results, + 'bias_analysis': bias_results, + 'total_time_seconds': risk_results.get('analysis_time_seconds', 0) + + bias_results.get('analysis_time_seconds', 0), + 'gdpr_compliance': self._assess_gdpr_compliance(risk_results, bias_results) + } + + print("\n" + "="*70) + print(f"✅ ANALYSIS COMPLETE in {combined['total_time_seconds']:.2f}s") + print("="*70) + + return combined + + def _merge_risk_results(self, tfidf_results: Dict, presidio_results: Dict) -> Dict: + """Merge TF-IDF and Presidio risk results""" + merged = tfidf_results.copy() + + # Update high-risk columns with Presidio details + for col in tfidf_results['overall_risk']['high_risk_columns']: + if col in presidio_results.get('privacy_risks', {}): + merged['column_analysis'][col]['presidio_details'] = presidio_results['privacy_risks'][col] + + return merged + + def _merge_bias_results(self, tfidf_results: Dict, presidio_results: Dict) -> Dict: + """Merge TF-IDF and Presidio bias results""" + merged = tfidf_results.copy() + + # Add statistical bias metrics from Presidio + if 'bias_metrics' in presidio_results: + merged['statistical_bias'] = presidio_results['bias_metrics'] + + return merged + + def _assess_gdpr_compliance(self, risk_results: Dict, bias_results: Dict) -> Dict: + """Assess overall GDPR compliance""" + compliance = { + 'compliant': True, + 'violations': [], + 'warnings': [], + 'articles_applicable': [] + } + + # Check risk results + if risk_results.get('overall_risk', {}).get('risk_level') in ['HIGH', 'CRITICAL']: + compliance['compliant'] = False + compliance['violations'].append("High privacy risk detected (GDPR Art. 5)") + compliance['articles_applicable'].append("Art. 5 - Data minimization") + + direct_ids = len(risk_results.get('privacy_categories', {}).get('direct_identifiers', [])) + if direct_ids > 0: + compliance['violations'].append(f"{direct_ids} direct identifiers require protection (GDPR Art. 32)") + compliance['articles_applicable'].append("Art. 32 - Security of processing") + + # Check bias results + article9_violations = bias_results.get('gdpr_compliance', {}).get('article_9_violations', []) + if article9_violations: + compliance['compliant'] = False + compliance['violations'].append(f"{len(article9_violations)} special category violations (GDPR Art. 9)") + compliance['articles_applicable'].append("Art. 9 - Special categories of personal data") + + if compliance['compliant']: + compliance['status'] = "✅ GDPR Compliant" + else: + compliance['status'] = "❌ GDPR Non-Compliant" + + return compliance + + +# Convenience functions for API endpoints +def quick_risk_check(df: pd.DataFrame) -> Dict[str, Any]: + """Fast risk check using TF-IDF (for API endpoints)""" + analyzer = UnifiedAnalyzer(mode='fast') + return analyzer.analyze_risk(df) + + +def deep_risk_analysis(df: pd.DataFrame) -> Dict[str, Any]: + """Accurate risk analysis using Presidio (for detailed reports)""" + analyzer = UnifiedAnalyzer(mode='accurate') + return analyzer.analyze_risk(df) + + +def hybrid_analysis(df: pd.DataFrame) -> Dict[str, Any]: + """Balanced hybrid analysis (recommended)""" + analyzer = UnifiedAnalyzer(mode='hybrid') + return analyzer.analyze_full(df) diff --git a/data_cleaning/cleaner.py b/data_cleaning/cleaner.py index 658b1b8..93cee8f 100644 --- a/data_cleaning/cleaner.py +++ b/data_cleaning/cleaner.py @@ -192,40 +192,84 @@ class DataCleaner: def _init_presidio(self): """Initialize Presidio analyzer and anonymizer engines with GPU support""" - # Create NLP engine configuration + import spacy + + # Auto-detect the best available spaCy model + # Priority: sm (fastest for CPU) > lg (GPU-capable) > trf (transformer, slowest) + model_candidates = [ + ("en_core_web_sm", "CPU-optimized, fastest for small-medium datasets", "CPU"), + ("en_core_web_lg", "GPU-capable, better accuracy", "GPU/CPU"), + ("en_core_web_trf", "Transformer-based, highest accuracy but slowest", "GPU") + ] + + model_name = None + model_description = None + model_device_pref = None + + print("\n🔍 Detecting available spaCy models...") + for candidate, description, device_pref in model_candidates: + if spacy.util.is_package(candidate): + model_name = candidate + model_description = description + model_device_pref = device_pref + print(f"✓ Found: {candidate} ({description})") + break + else: + print(f" ✗ Not installed: {candidate}") + + if not model_name: + print(f"\n⚠️ No spaCy models found!") + print(f" Install the fastest model with: python -m spacy download en_core_web_sm") + print(f" Or for GPU acceleration: python -m spacy download en_core_web_lg") + print(f" Presidio will not be initialized. Using regex-only detection.\n") + self.analyzer = None + self.anonymizer = None + return + + print(f"\n✓ Selected model: {model_name} (Recommended device: {model_device_pref})") + + # Create NLP engine configuration with the detected model configuration = { "nlp_engine_name": "spacy", - "models": [{"lang_code": "en", "model_name": "en_core_web_sm"}], + "models": [{"lang_code": "en", "model_name": model_name}], } try: - # Create NLP engine + # Create NLP engine with explicit configuration provider = NlpEngineProvider(nlp_configuration=configuration) nlp_engine = provider.create_engine() - # Enable GPU for spaCy if available - if self.use_gpu and SPACY_AVAILABLE: + # Enable GPU for spaCy if available and recommended for this model + if self.use_gpu and CUDA_AVAILABLE and model_name in ["en_core_web_lg", "en_core_web_trf"]: try: - import spacy - # Move spaCy model to GPU - spacy.require_gpu() - print("✓ spaCy GPU acceleration enabled") + # Set GPU preference for spaCy + gpu_activated = spacy.prefer_gpu() + if gpu_activated: + print(f"✓ spaCy GPU acceleration enabled on {GPU_NAME}") + device_info = f"GPU ({GPU_NAME})" + else: + print(f"⚠️ GPU preference set but not activated (expected for {model_name})") + device_info = f"CPU (optimized for {model_name})" except Exception as e: print(f"⚠️ Could not enable spaCy GPU: {e}") print(" Falling back to CPU for NLP processing") + device_info = "CPU" + else: + if model_name == "en_core_web_sm": + print(f"✓ Using CPU for {model_name} (faster than GPU for small models)") + device_info = f"CPU (optimized for {model_name})" # Create analyzer with NLP engine self.analyzer = AnalyzerEngine(nlp_engine=nlp_engine) self.anonymizer = AnonymizerEngine() - device_info = "GPU" if self.use_gpu else "CPU" print(f"✓ Presidio engines initialized successfully ({device_info} mode)") except Exception as e: - # Fallback to default configuration if spaCy model not available - print(f"Warning: Could not load spaCy model, using default configuration: {e}") - print("Download spaCy model with: python -m spacy download en_core_web_sm") - self.analyzer = AnalyzerEngine() - self.anonymizer = AnonymizerEngine() + # Fallback - Presidio not available + print(f"⚠️ Could not initialize Presidio: {e}") + print(" Using regex-only detection as fallback") + self.analyzer = None + self.anonymizer = None def _add_nordic_recognizers(self, registry: RecognizerRegistry): """Add custom recognizers for Nordic national IDs and identifiers""" @@ -396,6 +440,12 @@ class DataCleaner: device_info = f"GPU ({GPU_NAME})" if self.use_gpu else "CPU" print(f" Scanning {len(columns_to_scan)} columns using {device_info}: {columns_to_scan}") + # Check if Presidio is available + if self.analyzer is None: + print("\n⚠️ Presidio not available - cannot perform PII detection") + print(" Please install spaCy model: python -m spacy download en_core_web_sm") + return dict(pii_detections) + for column in columns_to_scan: print(f" Analyzing '{column}'...", end=" ") @@ -575,6 +625,10 @@ class DataCleaner: if not value or value == 'nan': return value + # Check if Presidio is available + if self.analyzer is None or self.anonymizer is None: + return value # Cannot anonymize without Presidio + # Analyze this specific value results = self.analyzer.analyze(text=value, language='en') diff --git a/test_cleaning.py b/test_cleaning.py deleted file mode 100644 index 65c9968..0000000 --- a/test_cleaning.py +++ /dev/null @@ -1,366 +0,0 @@ -""" -Test script for data cleaning module -Tests general PII + Nordic-specific PII detection with automatic report generation -""" - -import pandas as pd -import sys -import os - -# Add parent directory to path -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from data_cleaning import DataCleaner - - -def test_basic_cleaning(): - """Test basic cleaning functionality""" - print("\n" + "="*70) - print("TEST 1: Basic PII Detection on Loan Dataset") - print("="*70) - - # Load loan data - df = pd.read_csv('Datasets/loan_data.csv') - print(f"\n✓ Loaded dataset: {len(df)} rows × {len(df.columns)} columns") - print(f" Columns: {list(df.columns)}") - - # Initialize cleaner - cleaner = DataCleaner(df) - - # Run cleaning in non-interactive mode (auto-apply strategies) - print("\n🔍 Running PII detection...") - cleaned_df, audit_report = cleaner.clean( - risky_features=None, # Auto-detect all - interactive=False, # Non-interactive for testing - scan_all_cells=True - ) - - # Display results - cleaner.print_audit_summary(audit_report) - - return cleaned_df, audit_report - - -def test_with_risky_features(): - """Test cleaning with specific risky features flagged""" - print("\n" + "="*70) - print("TEST 2: Cleaning with Pre-Flagged Risky Features") - print("="*70) - - # Load loan data - df = pd.read_csv('Datasets/loan_data.csv') - - # Simulate risky features from RiskAnalyzer - risky_features = ['person_education', 'loan_intent', 'person_home_ownership'] - - print(f"\n⚠️ Risky features flagged by RiskAnalyzer: {risky_features}") - - # Initialize cleaner - cleaner = DataCleaner(df) - - # Run cleaning on flagged features only - cleaned_df, audit_report = cleaner.clean( - risky_features=risky_features, - interactive=False, - scan_all_cells=False # Only scan risky columns - ) - - # Display results - cleaner.print_audit_summary(audit_report) - - return cleaned_df, audit_report - - -def test_with_synthetic_pii(): - """Test with synthetic general PII data""" - print("\n" + "="*70) - print("TEST 3: General PII Detection (US/International)") - print("="*70) - - # Create test DataFrame with obvious PII - test_data = pd.DataFrame({ - 'customer_id': [1, 2, 3, 4, 5], - 'email': [ - 'john.doe@example.com', - 'alice.smith@company.org', - 'bob.jones@email.com', - 'carol.white@test.net', - 'dave.brown@sample.com' - ], - 'phone': [ - '+1-555-123-4567', - '555-234-5678', - '(555) 345-6789', - '555.456.7890', - '5555678901' - ], - 'ssn': [ - '123-45-6789', - '234-56-7890', - '345-67-8901', - '456-78-9012', - '567-89-0123' - ], - 'notes': [ - 'Customer called from 192.168.1.1', - 'Contact via email: test@example.com', - 'SSN verified: 111-22-3333', - 'Previous address: 123 Main St, Boston', - 'Phone backup: 555-999-8888' - ], - 'amount': [1000, 2000, 1500, 3000, 2500] - }) - - print(f"\n✓ Created synthetic dataset with general PII:") - print(test_data.head()) - - # Initialize cleaner - cleaner = DataCleaner(test_data) - - # Run cleaning - cleaned_df, audit_report = cleaner.clean( - risky_features=None, - interactive=False, - scan_all_cells=True - ) - - print("\n🔒 Cleaned dataset:") - print(cleaned_df.head()) - - # Display results - cleaner.print_audit_summary(audit_report) - - # Save outputs - os.makedirs('output', exist_ok=True) - cleaner.save_cleaned_data(cleaned_df, 'output/general_pii_cleaned.csv') - cleaner.save_audit_report(audit_report, 'output/general_pii_audit.json') - - # Generate reports - print("\n📊 Generating explainability reports...") - cleaner.save_simple_report(audit_report, 'output/general_pii_simple_report.json', 'General PII Test Dataset') - cleaner.save_detailed_report(audit_report, 'output/general_pii_detailed_report.json', 'General PII Test Dataset') - - return cleaned_df, audit_report - - -def test_nordic_pii(): - """Test with Nordic-specific PII data (Finnish, Swedish, Norwegian, Danish)""" - print("\n" + "="*70) - print("TEST 4: Nordic PII Detection (FI, SE, NO, DK)") - print("="*70) - - # Create Nordic healthcare test dataset - nordic_data = pd.DataFrame({ - 'patient_id': [1001, 1002, 1003, 1004, 1005], - 'name': ['Matti Virtanen', 'Anna Andersson', 'Lars Nilsen', 'Sofie Jensen', 'Björn Eriksson'], - 'henkilotunnus': ['010190-123A', '150685-456B', '310795+789C', '220503A234D', '111280-567E'], # Finnish - 'personnummer': [None, '850615-4567', None, None, '801211-8901'], # Swedish - 'fodselsnummer': [None, None, '010190 12345', None, None], # Norwegian - 'cpr_nummer': [None, None, None, '010190-1234', None], # Danish - 'email': ['matti.v@example.fi', 'anna.a@healthcare.se', 'lars.n@clinic.no', 'sofie.j@hospital.dk', 'bjorn.e@care.se'], - 'phone': ['+358 50 123 4567', '+46 70 234 5678', '+47 91 345 6789', '+45 20 456 7890', '+46 73 567 8901'], - 'diagnosis_code': ['J06.9', 'I10', 'E11.9', 'M54.5', 'F41.1'], - 'age': [35, 39, 29, 22, 45], - 'gender': ['M', 'F', 'M', 'F', 'M'], - 'treatment_outcome': ['Recovered', 'Ongoing', 'Recovered', 'Ongoing', 'Recovered'] - }) - - print(f"\n✓ Created Nordic healthcare dataset:") - print(f" - Finnish Henkilötunnus (HETU)") - print(f" - Swedish Personnummer") - print(f" - Norwegian Fødselsnummer") - print(f" - Danish CPR-nummer") - print(f" - Nordic phone numbers (+358, +46, +47, +45)") - print(f" - Nordic email domains (.fi, .se, .no, .dk)") - print() - print(nordic_data.to_string()) - - # Initialize cleaner (Nordic recognizers loaded automatically) - cleaner = DataCleaner(nordic_data) - - # Run cleaning - cleaned_df, audit_report = cleaner.clean( - risky_features=None, - interactive=False, - scan_all_cells=True - ) - - print("\n🔒 Cleaned dataset (Nordic IDs anonymized):") - print(cleaned_df.to_string()) - - # Display results - cleaner.print_audit_summary(audit_report) - - # Save outputs - os.makedirs('output', exist_ok=True) - cleaner.save_cleaned_data(cleaned_df, 'output/nordic_pii_cleaned.csv') - cleaner.save_audit_report(audit_report, 'output/nordic_pii_audit.json') - - # Generate reports - print("\n📊 Generating explainability reports...") - cleaner.save_simple_report(audit_report, 'output/nordic_pii_simple_report.json', 'Nordic Healthcare Dataset') - cleaner.save_detailed_report(audit_report, 'output/nordic_pii_detailed_report.json', 'Nordic Healthcare Dataset') - - print("\n✅ Nordic-specific entities detected:") - print(" ✓ FI_PERSONAL_ID (Finnish Henkilötunnus)") - print(" ✓ SE_PERSONAL_ID (Swedish Personnummer)") - print(" ✓ NO_PERSONAL_ID (Norwegian Fødselsnummer)") - print(" ✓ DK_PERSONAL_ID (Danish CPR-nummer)") - - return cleaned_df, audit_report - - -def test_interactive_mode(): - """Test interactive mode (requires user input)""" - print("\n" + "="*70) - print("TEST 5: Interactive Mode (Manual Decisions)") - print("="*70) - - # Create ambiguous test data - test_data = pd.DataFrame({ - 'id': [1, 2, 3], - 'description': [ - 'Customer from Paris contacted us', # Paris = location or name? - 'Spoke with Jordan about the account', # Jordan = location or name? - 'Meeting scheduled for March 15th' # Date - ], - 'value': [100, 200, 300] - }) - - print(f"\n✓ Created dataset with ambiguous PII:") - print(test_data) - - print("\n⚠️ This test requires user input for ambiguous cases.") - print(" You'll be prompted to choose anonymization strategies.") - - proceed = input("\nProceed with interactive test? (y/n): ").strip().lower() - - if proceed == 'y': - cleaner = DataCleaner(test_data) - cleaned_df, audit_report = cleaner.clean( - risky_features=None, - interactive=True, # Enable interactive prompts - scan_all_cells=True - ) - - print("\n🔒 Cleaned dataset:") - print(cleaned_df) - - cleaner.print_audit_summary(audit_report) - else: - print(" Skipped interactive test.") - - -def demonstrate_integration_with_analysis(): - """Demonstrate how cleaning integrates with AI governance pipeline""" - print("\n" + "="*70) - print("TEST 6: Integration Demo (Cleaning → Analysis Workflow)") - print("="*70) - - # Load data - df = pd.read_csv('Datasets/loan_data.csv') - - print("\n📊 Workflow:") - print(" 1. Original dataset → Risk Analysis") - print(" 2. Risk Analysis → Identifies risky features") - print(" 3. Risky features → Data Cleaning (this step)") - print(" 4. Cleaned dataset → Re-run Analysis (optional)") - - # Simulate risky features from analysis - simulated_risky_features = ['person_education', 'loan_intent'] - - print(f"\n⚠️ Step 2 Output (simulated): Risky features = {simulated_risky_features}") - - # Step 3: Clean data - print("\n🔒 Step 3: Cleaning risky features...") - cleaner = DataCleaner(df) - cleaned_df, audit_report = cleaner.clean( - risky_features=simulated_risky_features, - interactive=False, - scan_all_cells=False - ) - - # Save both datasets - os.makedirs('output', exist_ok=True) - df.to_csv('output/loan_data_original.csv', index=False) - cleaner.save_cleaned_data(cleaned_df, 'output/loan_data_cleaned.csv') - cleaner.save_audit_report(audit_report, 'output/cleaning_audit.json') - - print("\n💾 Saved files:") - print(" - output/loan_data_original.csv (original)") - print(" - output/loan_data_cleaned.csv (cleaned)") - print(" - output/cleaning_audit.json (audit report)") - - print("\n📈 Step 4: User can now choose which dataset to analyze:") - print(" Option A: Analyze cleaned dataset (privacy-compliant)") - print(" Option B: Analyze original dataset (for comparison)") - print(" Option C: Analyze both and compare results") - - cleaner.print_audit_summary(audit_report) - - -def main(): - """Run all tests""" - print("\n" + "="*70) - print("🧪 DATA CLEANING MODULE - TEST SUITE") - print(" General PII + Nordic-Specific PII Detection") - print("="*70) - - print("\nAvailable tests:") - print(" 1. Basic PII detection on loan dataset") - print(" 2. Cleaning with pre-flagged risky features") - print(" 3. General PII detection (US/International) + Reports") - print(" 4. Nordic PII detection (FI, SE, NO, DK) + Reports") - print(" 5. Interactive mode (requires user input)") - print(" 6. Integration workflow demonstration") - print(" 7. Run all non-interactive tests") - print(" 8. Run Nordic + General PII tests only") - - choice = input("\nSelect test (1-8): ").strip() - - if choice == '1': - test_basic_cleaning() - elif choice == '2': - test_with_risky_features() - elif choice == '3': - test_with_synthetic_pii() - elif choice == '4': - test_nordic_pii() - elif choice == '5': - test_interactive_mode() - elif choice == '6': - demonstrate_integration_with_analysis() - elif choice == '7': - print("\n🏃 Running all non-interactive tests...\n") - test_basic_cleaning() - test_with_risky_features() - test_with_synthetic_pii() - test_nordic_pii() - demonstrate_integration_with_analysis() - print("\n✅ All tests completed!") - elif choice == '8': - print("\n🏃 Running PII detection tests with report generation...\n") - test_with_synthetic_pii() - test_nordic_pii() - print("\n" + "="*70) - print("✅ PII TESTS COMPLETED!") - print("="*70) - print("\n📂 Generated files in output/:") - print(" General PII:") - print(" - general_pii_cleaned.csv") - print(" - general_pii_audit.json") - print(" - general_pii_simple_report.json") - print(" - general_pii_detailed_report.json") - print("\n Nordic PII:") - print(" - nordic_pii_cleaned.csv") - print(" - nordic_pii_audit.json") - print(" - nordic_pii_simple_report.json") - print(" - nordic_pii_detailed_report.json") - print("\n💡 Review the simple reports for executive summaries") - print("💡 Review the detailed reports for compliance documentation") - else: - print("Invalid choice. Run: python test_cleaning.py") - - -if __name__ == '__main__': - main()