diff --git a/cleaning.py b/cleaning.py new file mode 100644 index 0000000..f97eac6 --- /dev/null +++ b/cleaning.py @@ -0,0 +1,740 @@ +""" +Data Cleaning Module - PII Detection and Anonymization +Handles GDPR-compliant data cleaning using Presidio for PII detection +""" + +import pandas as pd +import numpy as np +import hashlib +import json +from datetime import datetime +from typing import Dict, List, Tuple, Optional, Any +from collections import defaultdict + +try: + from presidio_analyzer import AnalyzerEngine, RecognizerRegistry + from presidio_analyzer.nlp_engine import NlpEngineProvider + from presidio_anonymizer import AnonymizerEngine + from presidio_anonymizer.entities import OperatorConfig + PRESIDIO_AVAILABLE = True +except ImportError: + PRESIDIO_AVAILABLE = False + print("Warning: Presidio not installed. Run: pip install presidio-analyzer presidio-anonymizer") + + +class CleaningConfig: + """Configuration for data cleaning strategies""" + + # Anonymization strategy mapping based on entity type and risk level + STRATEGY_MAP = { + # HIGH RISK - Remove completely (sensitive financial/identity data) + "CREDIT_CARD": "REMOVE", + "CRYPTO": "REMOVE", + "IBAN_CODE": "REMOVE", + "US_SSN": "REMOVE", + "US_BANK_NUMBER": "REMOVE", + "US_DRIVER_LICENSE": "REMOVE", + "US_PASSPORT": "REMOVE", + "MEDICAL_LICENSE": "REMOVE", + + # MEDIUM RISK - Hash (deterministic, irreversible) + "EMAIL_ADDRESS": "HASH", + "PHONE_NUMBER": "HASH", + "PERSON": "HASH", # Names + "URL": "HASH", + "IP_ADDRESS": "HASH", + "AU_ABN": "HASH", + "AU_ACN": "HASH", + "AU_TFN": "HASH", + + # LOW RISK - Mask (keep format, hide details) + "LOCATION": "MASK", + "DATE_TIME": "GENERALIZE", + "NRP": "MASK", # Nationality/religious/political + "US_ITIN": "MASK", + + # Numeric identifiers - depends on context + "UK_NHS": "HASH", + "SG_NRIC_FIN": "HASH", + "IN_PAN": "HASH", + } + + # Confidence thresholds + HIGH_CONFIDENCE = 0.85 + MEDIUM_CONFIDENCE = 0.60 + + # Risk levels + RISK_LEVELS = { + "REMOVE": "HIGH", + "HASH": "MEDIUM", + "MASK": "LOW", + "GENERALIZE": "LOW" + } + + # GDPR compliance mapping + GDPR_ARTICLE_MAPPING = { + "CREDIT_CARD": "Art. 4(1) - Personal data identifier", + "US_SSN": "Art. 4(1) - Personal data identifier", + "EMAIL_ADDRESS": "Art. 4(1) - Personal data identifier", + "PHONE_NUMBER": "Art. 4(1) - Personal data identifier", + "PERSON": "Art. 4(1) - Personal data (name)", + "LOCATION": "Art. 4(1) - Personal data (location)", + "IP_ADDRESS": "Art. 4(1) - Online identifier", + "MEDICAL_LICENSE": "Art. 9(1) - Special category data (health)", + "NRP": "Art. 9(1) - Special category data (political/religious views)", + } + + +class DataCleaner: + """ + Main class for detecting and anonymizing PII in datasets + + Example: + >>> cleaner = DataCleaner(df) + >>> cleaned_df, audit_report = cleaner.clean( + ... risky_features=['email', 'phone'], + ... interactive=True + ... ) + """ + + def __init__(self, df: pd.DataFrame, config: Optional[CleaningConfig] = None): + """ + Initialize the data cleaner + + Args: + df: Input DataFrame to clean + config: Optional custom configuration + """ + self.df = df.copy() + self.config = config or CleaningConfig() + self.audit_log = [] + self.cleaning_actions = {} + + # Initialize Presidio engines + if PRESIDIO_AVAILABLE: + self._init_presidio() + else: + raise ImportError( + "Presidio is required for data cleaning. 
" + "Install with: pip install presidio-analyzer presidio-anonymizer" + ) + + def _init_presidio(self): + """Initialize Presidio analyzer and anonymizer engines""" + # Create NLP engine configuration + configuration = { + "nlp_engine_name": "spacy", + "models": [{"lang_code": "en", "model_name": "en_core_web_sm"}], + } + + try: + # Create NLP engine + provider = NlpEngineProvider(nlp_configuration=configuration) + nlp_engine = provider.create_engine() + + # Create analyzer with NLP engine + self.analyzer = AnalyzerEngine(nlp_engine=nlp_engine) + self.anonymizer = AnonymizerEngine() + + print("โœ“ Presidio engines initialized successfully") + except Exception as e: + # Fallback to default configuration if spaCy model not available + print(f"Warning: Could not load spaCy model, using default configuration: {e}") + print("Download spaCy model with: python -m spacy download en_core_web_sm") + self.analyzer = AnalyzerEngine() + self.anonymizer = AnonymizerEngine() + + def clean( + self, + risky_features: Optional[List[str]] = None, + interactive: bool = True, + scan_all_cells: bool = True + ) -> Tuple[pd.DataFrame, Dict[str, Any]]: + """ + Main cleaning method - detect and anonymize PII + + Args: + risky_features: List of column names flagged as risky (from RiskAnalyzer) + interactive: Whether to prompt user for ambiguous cases + scan_all_cells: Whether to scan cell contents for embedded PII + + Returns: + Tuple of (cleaned_df, audit_report) + """ + print("\n" + "="*70) + print("๐Ÿ”’ GDPR-COMPLIANT DATA CLEANING - PRESIDIO PII DETECTION") + print("="*70 + "\n") + + cleaned_df = self.df.copy() + + # Step 1: Detect PII in flagged columns and text fields + print("Step 1/4: Detecting PII using Presidio...") + pii_detections = self._detect_pii(cleaned_df, risky_features, scan_all_cells) + + if not pii_detections: + print("โœ“ No PII detected in dataset") + return cleaned_df, self._generate_audit_report(cleaned_df) + + # Step 2: Classify by risk level + print("\nStep 2/4: Classifying PII by risk level...") + risk_classification = self._classify_risk(pii_detections) + self._display_risk_summary(risk_classification) + + # Step 3: Apply anonymization strategies + print("\nStep 3/4: Applying anonymization strategies...") + for column, detections in pii_detections.items(): + cleaned_df = self._process_column( + cleaned_df, + column, + detections, + interactive + ) + + # Step 4: Generate audit report + print("\nStep 4/4: Generating audit report...") + audit_report = self._generate_audit_report(cleaned_df) + + print("\n" + "="*70) + print("โœ“ DATA CLEANING COMPLETED") + print("="*70 + "\n") + + return cleaned_df, audit_report + + def _detect_pii( + self, + df: pd.DataFrame, + risky_columns: Optional[List[str]], + scan_all_cells: bool + ) -> Dict[str, List[Dict]]: + """ + Detect PII at column and cell level + + Returns: + Dictionary mapping column names to list of detected entities + """ + pii_detections = defaultdict(list) + + # Determine which columns to scan + if risky_columns: + columns_to_scan = [col for col in risky_columns if col in df.columns] + else: + # Scan all text/object columns if no risky features specified + columns_to_scan = df.select_dtypes(include=['object']).columns.tolist() + + # Also scan all text columns if requested + if scan_all_cells: + text_columns = df.select_dtypes(include=['object']).columns.tolist() + columns_to_scan = list(set(columns_to_scan + text_columns)) + + print(f" Scanning {len(columns_to_scan)} columns: {columns_to_scan}") + + for column in columns_to_scan: + 
print(f" Analyzing '{column}'...", end=" ") + + # Sample values for analysis (avoid scanning millions of rows) + sample_values = df[column].dropna().astype(str).head(1000).tolist() + + if not sample_values: + print("(empty)") + continue + + # Combine sample values for batch analysis + combined_text = " | ".join(sample_values[:100]) # Limit to first 100 + + # Analyze with Presidio + results = self.analyzer.analyze( + text=combined_text, + language='en', + entities=None # Detect all entity types + ) + + if results: + # Aggregate by entity type + entity_summary = defaultdict(lambda: {'count': 0, 'scores': []}) + + for result in results: + entity_summary[result.entity_type]['count'] += 1 + entity_summary[result.entity_type]['scores'].append(result.score) + + # Store detection results + for entity_type, info in entity_summary.items(): + avg_confidence = np.mean(info['scores']) + pii_detections[column].append({ + 'entity_type': entity_type, + 'count': info['count'], + 'avg_confidence': avg_confidence, + 'max_confidence': max(info['scores']), + 'min_confidence': min(info['scores']) + }) + + detected_types = [d['entity_type'] for d in pii_detections[column]] + print(f"โœ“ Found: {', '.join(detected_types)}") + else: + print("(no PII)") + + return dict(pii_detections) + + def _classify_risk(self, pii_detections: Dict[str, List[Dict]]) -> Dict[str, Dict]: + """ + Classify detected PII by risk level + + Returns: + Dictionary with HIGH/MEDIUM/LOW risk classifications + """ + risk_classification = { + 'HIGH': defaultdict(list), + 'MEDIUM': defaultdict(list), + 'LOW': defaultdict(list), + 'UNKNOWN': defaultdict(list) + } + + for column, detections in pii_detections.items(): + for detection in detections: + entity_type = detection['entity_type'] + strategy = self.config.STRATEGY_MAP.get(entity_type, 'UNKNOWN') + risk_level = self.config.RISK_LEVELS.get(strategy, 'UNKNOWN') + + risk_classification[risk_level][column].append({ + 'entity_type': entity_type, + 'strategy': strategy, + 'confidence': detection['avg_confidence'], + 'count': detection['count'] + }) + + return risk_classification + + def _display_risk_summary(self, risk_classification: Dict[str, Dict]): + """Display risk summary to user""" + for risk_level in ['HIGH', 'MEDIUM', 'LOW', 'UNKNOWN']: + detections = risk_classification[risk_level] + if detections: + symbol = "๐Ÿ”ด" if risk_level == "HIGH" else "๐ŸŸก" if risk_level == "MEDIUM" else "๐ŸŸข" + print(f"\n {symbol} {risk_level} RISK:") + for column, entities in detections.items(): + entity_list = [f"{e['entity_type']} ({e['count']})" for e in entities] + print(f" - {column}: {', '.join(entity_list)}") + + def _process_column( + self, + df: pd.DataFrame, + column: str, + detections: List[Dict], + interactive: bool + ) -> pd.DataFrame: + """ + Process a single column with detected PII + + Args: + df: DataFrame to modify + column: Column name + detections: List of PII detections in this column + interactive: Whether to prompt user + + Returns: + Modified DataFrame + """ + # Determine strategies for each entity type + strategies = {} + needs_prompt = [] + + for detection in detections: + entity_type = detection['entity_type'] + confidence = detection['avg_confidence'] + default_strategy = self.config.STRATEGY_MAP.get(entity_type) + + # Decide if we need to prompt user + if confidence < self.config.MEDIUM_CONFIDENCE or default_strategy is None: + needs_prompt.append(detection) + else: + strategies[entity_type] = default_strategy + + # Interactive prompts for ambiguous cases + if interactive and 
+        # Interactive prompts for ambiguous cases
+        if interactive and needs_prompt:
+            print(f"\n  ⚠️ Column '{column}' has ambiguous PII detections:")
+            for i, detection in enumerate(needs_prompt, 1):
+                print(f"    {i}. {detection['entity_type']} "
+                      f"(confidence: {detection['avg_confidence']:.2f}, "
+                      f"count: {detection['count']})")
+
+            strategy = self._prompt_user_strategy(column, needs_prompt)
+            for detection in needs_prompt:
+                strategies[detection['entity_type']] = strategy
+
+        # Apply strategies
+        action_log = {
+            'column': column,
+            'detections': detections,
+            'strategies': strategies,
+            'examples': []
+        }
+
+        # Determine overall column strategy (most conservative)
+        if 'REMOVE' in strategies.values():
+            # Remove entire column
+            df = df.drop(columns=[column])
+            action_log['action'] = 'REMOVED_COLUMN'
+            action_log['reason'] = "Contains HIGH risk PII requiring removal"
+            print(f"  ❌ Removed column '{column}' (HIGH risk PII)")
+        else:
+            # Anonymize cell-by-cell
+            original_values = df[column].copy()
+            df[column] = df[column].apply(
+                lambda x: self._anonymize_value(str(x), strategies) if pd.notna(x) else x
+            )
+
+            # Collect examples
+            action_log['examples'] = self._collect_examples(original_values, df[column], 5)
+            action_log['action'] = 'ANONYMIZED'
+            action_log['num_affected'] = (original_values != df[column]).sum()
+
+            strategy_desc = ', '.join(set(strategies.values()))
+            print(f"  ✓ Anonymized column '{column}' using {strategy_desc}")
+
+        self.cleaning_actions[column] = action_log
+        return df
+
+    def _anonymize_value(self, value: str, strategies: Dict[str, str]) -> str:
+        """
+        Anonymize a single cell value based on detected PII types
+
+        Args:
+            value: Original value
+            strategies: Dictionary of entity_type -> strategy
+
+        Returns:
+            Anonymized value
+        """
+        if not value or value == 'nan':
+            return value
+
+        # Analyze this specific value
+        results = self.analyzer.analyze(text=value, language='en')
+
+        if not results:
+            return value  # No PII detected
+
+        # Apply anonymization using Presidio
+        anonymized_result = self.anonymizer.anonymize(
+            text=value,
+            analyzer_results=results,
+            operators=self._get_presidio_operators(strategies)
+        )
+
+        return anonymized_result.text
+
+    def _get_presidio_operators(self, strategies: Dict[str, str]) -> Dict[str, OperatorConfig]:
+        """
+        Convert our strategies to Presidio operators
+
+        Args:
+            strategies: Dictionary of entity_type -> strategy
+
+        Returns:
+            Dictionary of entity_type -> OperatorConfig
+        """
+        operators = {}
+
+        for entity_type, strategy in strategies.items():
+            if strategy == 'HASH':
+                operators[entity_type] = OperatorConfig("hash", {"hash_type": "sha256"})
+            elif strategy == 'MASK':
+                operators[entity_type] = OperatorConfig("mask", {
+                    "masking_char": "*",
+                    "chars_to_mask": 100,
+                    "from_end": False
+                })
+            elif strategy == 'GENERALIZE':
+                # Simplified generalization: redact outright (a custom operator
+                # could instead reduce precision, e.g. a full date to its year)
+                operators[entity_type] = OperatorConfig("replace", {"new_value": "[REDACTED]"})
+            elif strategy == 'KEEP':
+                # Leave the value unchanged when the user explicitly chose KEEP
+                operators[entity_type] = OperatorConfig("keep")
+            else:  # REMOVE handled at column level
+                operators[entity_type] = OperatorConfig("replace", {"new_value": ""})
+
+        return operators
+
+    def _prompt_user_strategy(self, column: str, detections: List[Dict]) -> str:
+        """
+        Prompt user to choose anonymization strategy
+
+        Args:
+            column: Column name
+            detections: List of ambiguous detections
+
+        Returns:
+            Chosen strategy
+        """
+        print(f"\n  Choose strategy for column '{column}':")
+        print("    [1] REMOVE - Delete entire column (HIGH risk)")
+        print("    [2] HASH - One-way hash (MEDIUM risk, irreversible)")
+        print("    [3] MASK - Hide with *** (LOW risk, format preserved)")
+        print("    [4] KEEP - No changes (not recommended)")
+
+        while True:
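+            # Re-prompt until a valid option is chosen; failed reads such as
+            # EOFError are caught below and simply re-display the menu.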
try: + choice = input("\n Choice (1-4): ").strip() + if choice == '1': + return 'REMOVE' + elif choice == '2': + return 'HASH' + elif choice == '3': + return 'MASK' + elif choice == '4': + return 'KEEP' + else: + print(" Invalid choice. Please enter 1-4.") + except Exception: + print(" Invalid input. Please enter 1-4.") + + def _collect_examples( + self, + original: pd.Series, + anonymized: pd.Series, + n: int = 5 + ) -> List[Dict[str, str]]: + """ + Collect example transformations for audit report + + Args: + original: Original values + anonymized: Anonymized values + n: Number of examples to collect + + Returns: + List of before/after examples + """ + examples = [] + changes = original != anonymized + changed_indices = original[changes].index[:n] + + for idx in changed_indices: + examples.append({ + 'before': str(original[idx])[:50], # Truncate long values + 'after': str(anonymized[idx])[:50] + }) + + return examples + + def _generate_audit_report(self, cleaned_df: pd.DataFrame) -> Dict[str, Any]: + """ + Generate comprehensive audit report + + Returns: + Detailed audit report with explanations + """ + report = { + 'metadata': { + 'timestamp': datetime.now().isoformat(), + 'original_rows': len(self.df), + 'original_columns': len(self.df.columns), + 'cleaned_rows': len(cleaned_df), + 'cleaned_columns': len(cleaned_df.columns), + 'presidio_version': 'enabled' if PRESIDIO_AVAILABLE else 'disabled' + }, + 'summary': { + 'columns_removed': [], + 'columns_anonymized': [], + 'total_cells_affected': 0 + }, + 'details': {}, + 'compliance': { + 'gdpr_articles_applied': set(), + 'risk_mitigation': {} + } + } + + # Process each action + for column, action_log in self.cleaning_actions.items(): + if action_log['action'] == 'REMOVED_COLUMN': + report['summary']['columns_removed'].append(column) + + # Build detailed entry + detail = { + 'action': 'REMOVED', + 'reason': action_log['reason'], + 'entity_types_found': [d['entity_type'] for d in action_log['detections']], + 'risk_level': 'HIGH', + 'presidio_metrics': { + 'detections': action_log['detections'] + }, + 'gdpr_compliance': self._get_gdpr_explanation(action_log['detections']) + } + + else: # ANONYMIZED + report['summary']['columns_anonymized'].append(column) + report['summary']['total_cells_affected'] += action_log.get('num_affected', 0) + + # Build detailed entry + detail = { + 'action': 'ANONYMIZED', + 'strategies_applied': list(set(action_log['strategies'].values())), + 'reason': self._explain_anonymization(action_log), + 'entity_types_found': [d['entity_type'] for d in action_log['detections']], + 'num_affected_rows': action_log.get('num_affected', 0), + 'percentage_affected': f"{(action_log.get('num_affected', 0) / len(self.df) * 100):.1f}%", + 'examples': action_log.get('examples', [])[:3], # Show top 3 + 'presidio_metrics': { + 'avg_confidence': np.mean([d['avg_confidence'] for d in action_log['detections']]), + 'detections': action_log['detections'] + }, + 'gdpr_compliance': self._get_gdpr_explanation(action_log['detections']) + } + + report['details'][column] = detail + + # Track GDPR articles + for gdpr_ref in detail['gdpr_compliance']: + report['compliance']['gdpr_articles_applied'].add(gdpr_ref) + + # Convert set to list for JSON serialization + report['compliance']['gdpr_articles_applied'] = list( + report['compliance']['gdpr_articles_applied'] + ) + + return report + + def _explain_anonymization(self, action_log: Dict) -> str: + """Generate human-readable explanation of anonymization""" + entity_types = [d['entity_type'] for d in 
action_log['detections']]
+        strategies = list(set(action_log['strategies'].values()))
+
+        explanation = f"Contains {', '.join(entity_types)} entities. "
+        explanation += f"Applied {', '.join(strategies).lower()} anonymization to protect privacy."
+
+        return explanation
+
+    def _get_gdpr_explanation(self, detections: List[Dict]) -> List[str]:
+        """Get GDPR article references for detected entities"""
+        gdpr_refs = []
+
+        for detection in detections:
+            entity_type = detection['entity_type']
+            if entity_type in self.config.GDPR_ARTICLE_MAPPING:
+                gdpr_refs.append(self.config.GDPR_ARTICLE_MAPPING[entity_type])
+
+        return list(set(gdpr_refs))  # Remove duplicates
+
+    def save_cleaned_data(self, cleaned_df: pd.DataFrame, output_path: str) -> str:
+        """
+        Save cleaned dataset to CSV
+
+        Args:
+            cleaned_df: Cleaned DataFrame
+            output_path: Path to save file
+
+        Returns:
+            Path to saved file
+        """
+        cleaned_df.to_csv(output_path, index=False)
+        print(f"✓ Cleaned data saved to: {output_path}")
+        return output_path
+
+    def save_audit_report(self, audit_report: Dict, output_path: str) -> str:
+        """
+        Save audit report to JSON
+
+        Args:
+            audit_report: Audit report dictionary
+            output_path: Path to save file
+
+        Returns:
+            Path to saved file
+        """
+        # Convert numpy types to native Python types for JSON serialization
+        # (np is already imported at module level)
+        def convert_numpy(obj):
+            if isinstance(obj, np.integer):
+                return int(obj)
+            elif isinstance(obj, np.floating):
+                return float(obj)
+            elif isinstance(obj, np.ndarray):
+                return obj.tolist()
+            elif isinstance(obj, dict):
+                return {key: convert_numpy(value) for key, value in obj.items()}
+            elif isinstance(obj, list):
+                return [convert_numpy(item) for item in obj]
+            elif isinstance(obj, set):
+                return list(obj)
+            return obj
+
+        audit_report = convert_numpy(audit_report)
+
+        with open(output_path, 'w') as f:
+            json.dump(audit_report, f, indent=2)
+        print(f"✓ Audit report saved to: {output_path}")
+        return output_path
+
+    def print_audit_summary(self, audit_report: Dict):
+        """
+        Print human-readable audit summary
+
+        Args:
+            audit_report: Audit report dictionary
+        """
+        print("\n" + "="*70)
+        print("📊 CLEANING AUDIT SUMMARY")
+        print("="*70)
+
+        summary = audit_report['summary']
+        metadata = audit_report['metadata']
+
+        print(f"\n📈 Dataset Changes:")
+        print(f"  Original: {metadata['original_rows']} rows × {metadata['original_columns']} columns")
+        print(f"  Cleaned: {metadata['cleaned_rows']} rows × {metadata['cleaned_columns']} columns")
+
+        if summary['columns_removed']:
+            print(f"\n❌ Removed Columns ({len(summary['columns_removed'])}):")
+            for col in summary['columns_removed']:
+                print(f"  - {col}")
+
+        if summary['columns_anonymized']:
+            print(f"\n🔒 Anonymized Columns ({len(summary['columns_anonymized'])}):")
+            for col in summary['columns_anonymized']:
+                detail = audit_report['details'][col]
+                print(f"  - {col}: {detail['num_affected_rows']} rows affected "
+                      f"({detail['percentage_affected']})")
+
+        print(f"\n📝 Total cells anonymized: {summary['total_cells_affected']}")
+
+        print(f"\n⚖️ GDPR Compliance:")
+        for article in audit_report['compliance']['gdpr_articles_applied']:
+            print(f"  - {article}")
+
+        print("\n" + "="*70 + "\n")
+
+
+def main():
+    """Example usage and testing"""
+    import sys
+
+    if len(sys.argv) < 2:
+        print("Usage: python cleaning.py <path_to_csv>")
+        print("Example: python cleaning.py Datasets/loan_data.csv")
+        sys.exit(1)
+
+    data_path = sys.argv[1]
+
+    # Load data
+    print(f"Loading data from {data_path}...")
+    df = 
pd.read_csv(data_path) + print(f"Loaded {len(df)} rows ร— {len(df.columns)} columns") + + # Initialize cleaner + cleaner = DataCleaner(df) + + # Run cleaning (interactive mode) + cleaned_df, audit_report = cleaner.clean( + risky_features=None, # Auto-detect + interactive=True, + scan_all_cells=True + ) + + # Save results + output_base = data_path.replace('.csv', '_cleaned') + cleaner.save_cleaned_data(cleaned_df, f"{output_base}.csv") + cleaner.save_audit_report(audit_report, f"{output_base}_audit.json") + + # Print summary + cleaner.print_audit_summary(audit_report) + + +if __name__ == '__main__': + main() diff --git a/cleaning_config.py b/cleaning_config.py new file mode 100644 index 0000000..8f6e645 --- /dev/null +++ b/cleaning_config.py @@ -0,0 +1,227 @@ +""" +Cleaning Configuration +Customize PII detection and anonymization strategies +""" + +# Anonymization Strategy Definitions +STRATEGIES = { + 'REMOVE': { + 'description': 'Delete entire column', + 'risk_level': 'HIGH', + 'reversible': False, + 'use_cases': ['Credit cards', 'SSN', 'Bank accounts'] + }, + 'HASH': { + 'description': 'One-way SHA-256 hash', + 'risk_level': 'MEDIUM', + 'reversible': False, + 'use_cases': ['Emails', 'Phone numbers', 'Names'] + }, + 'MASK': { + 'description': 'Replace with asterisks', + 'risk_level': 'LOW', + 'reversible': False, + 'use_cases': ['Partial identifiers', 'Locations'] + }, + 'GENERALIZE': { + 'description': 'Reduce precision', + 'risk_level': 'LOW', + 'reversible': False, + 'use_cases': ['Dates', 'Ages', 'ZIP codes'] + }, + 'KEEP': { + 'description': 'No changes', + 'risk_level': 'NONE', + 'reversible': True, + 'use_cases': ['Non-sensitive data'] + } +} + +# Entity Type to Strategy Mapping +# Customize these based on your compliance requirements +ENTITY_STRATEGY_MAP = { + # Financial Identifiers - HIGHEST RISK + 'CREDIT_CARD': 'REMOVE', + 'CRYPTO': 'REMOVE', + 'IBAN_CODE': 'REMOVE', + 'US_BANK_NUMBER': 'REMOVE', + + # Government IDs - HIGH RISK + 'US_SSN': 'REMOVE', + 'US_DRIVER_LICENSE': 'REMOVE', + 'US_PASSPORT': 'REMOVE', + 'US_ITIN': 'REMOVE', + 'UK_NHS': 'REMOVE', + 'SG_NRIC_FIN': 'REMOVE', + 'IN_PAN': 'REMOVE', + + # Health Information - HIGH RISK (GDPR Art. 9) + 'MEDICAL_LICENSE': 'REMOVE', + + # Contact Information - MEDIUM RISK + 'EMAIL_ADDRESS': 'HASH', + 'PHONE_NUMBER': 'HASH', + 'URL': 'HASH', + + # Personal Identifiers - MEDIUM RISK + 'PERSON': 'HASH', # Names + 'IP_ADDRESS': 'HASH', + + # Geographic Information - LOW RISK + 'LOCATION': 'MASK', + 'US_ZIP_CODE': 'GENERALIZE', + + # Temporal Information - LOW RISK + 'DATE_TIME': 'GENERALIZE', + + # Special Categories - MEDIUM RISK (GDPR Art. 9) + 'NRP': 'HASH', # Nationality, religious, political views + + # Business Identifiers - LOW RISK + 'AU_ABN': 'HASH', + 'AU_ACN': 'HASH', + 'AU_TFN': 'HASH', +} + +# Confidence Score Thresholds +CONFIDENCE_THRESHOLDS = { + 'HIGH': 0.85, # Auto-apply strategy + 'MEDIUM': 0.60, # Prompt user in interactive mode + 'LOW': 0.40, # Treat as potential false positive +} + +# GDPR Article Mappings +GDPR_COMPLIANCE = { + 'CREDIT_CARD': 'Art. 4(1) - Personal data identifier', + 'US_SSN': 'Art. 4(1) - Personal data identifier', + 'US_BANK_NUMBER': 'Art. 4(1) - Personal data identifier', + 'EMAIL_ADDRESS': 'Art. 4(1) - Personal data identifier', + 'PHONE_NUMBER': 'Art. 4(1) - Personal data identifier', + 'PERSON': 'Art. 4(1) - Personal data (name)', + 'LOCATION': 'Art. 4(1) - Personal data (location)', + 'IP_ADDRESS': 'Art. 4(1) - Online identifier', + 'MEDICAL_LICENSE': 'Art. 
9(1) - Special category data (health)', + 'NRP': 'Art. 9(1) - Special category data (political/religious views)', + 'DATE_TIME': 'Art. 4(1) - Personal data (temporal information)', +} + +# Presidio Analyzer Settings +PRESIDIO_CONFIG = { + 'language': 'en', + 'score_threshold': 0.5, # Minimum confidence to report + 'entities': None, # None = detect all, or specify list like ['EMAIL_ADDRESS', 'PHONE_NUMBER'] + 'allow_list': [], # Terms to ignore (e.g., company names that look like PII) +} + +# Custom Recognizers (domain-specific patterns) +# Add patterns specific to your industry/use case +CUSTOM_PATTERNS = { + 'LOAN_ID': { + 'pattern': r'LN\d{8}', + 'score': 0.9, + 'strategy': 'HASH' + }, + 'EMPLOYEE_ID': { + 'pattern': r'EMP\d{6}', + 'score': 0.9, + 'strategy': 'HASH' + }, + 'ACCOUNT_NUMBER': { + 'pattern': r'ACC\d{10}', + 'score': 0.95, + 'strategy': 'REMOVE' + } +} + +# Column Name Heuristics +# Auto-flag columns based on name patterns +RISKY_COLUMN_PATTERNS = [ + r'.*email.*', + r'.*phone.*', + r'.*ssn.*', + r'.*social.*security.*', + r'.*credit.*card.*', + r'.*passport.*', + r'.*license.*', + r'.*address.*', + r'.*ip.*addr.*', +] + +# Protected Attributes Configuration +# These are needed for bias analysis but may contain PII +PROTECTED_ATTRIBUTES_HANDLING = { + 'default_strategy': 'KEEP', # Keep for bias analysis + 'warn_user': True, # Warn about privacy implications + 'alternative': 'Use generalization (e.g., age_group instead of exact age)' +} + +# Audit Report Settings +AUDIT_CONFIG = { + 'include_examples': True, + 'max_examples_per_column': 3, + 'truncate_values': 50, # Max characters to show in examples + 'include_presidio_metrics': True, + 'include_gdpr_references': True, + 'include_recommendations': True +} + +# Performance Settings +PERFORMANCE_CONFIG = { + 'sample_size_for_detection': 1000, # Max rows to analyze per column + 'batch_size': 100, # Rows to process per batch + 'enable_parallel': False, # Future: parallel column processing +} + +# Output Settings +OUTPUT_CONFIG = { + 'cleaned_suffix': '_cleaned', + 'audit_suffix': '_audit', + 'format': 'csv', # Future: support parquet, json + 'compression': None, # Future: gzip, bz2 +} + + +def get_strategy_for_entity(entity_type: str) -> str: + """ + Get anonymization strategy for an entity type + + Args: + entity_type: Presidio entity type (e.g., 'EMAIL_ADDRESS') + + Returns: + Strategy name (e.g., 'HASH') + """ + return ENTITY_STRATEGY_MAP.get(entity_type, 'HASH') # Default to HASH if unknown + + +def get_risk_level(strategy: str) -> str: + """ + Get risk level for a strategy + + Args: + strategy: Strategy name (e.g., 'HASH') + + Returns: + Risk level (e.g., 'MEDIUM') + """ + return STRATEGIES.get(strategy, {}).get('risk_level', 'UNKNOWN') + + +def is_high_confidence(score: float) -> bool: + """Check if confidence score is high enough for auto-processing""" + return score >= CONFIDENCE_THRESHOLDS['HIGH'] + + +def is_medium_confidence(score: float) -> bool: + """Check if confidence score requires user confirmation""" + return CONFIDENCE_THRESHOLDS['MEDIUM'] <= score < CONFIDENCE_THRESHOLDS['HIGH'] + + +def is_low_confidence(score: float) -> bool: + """Check if confidence score might be false positive""" + return score < CONFIDENCE_THRESHOLDS['MEDIUM'] + + +# Example usage in cleaning.py: +# from cleaning_config import ENTITY_STRATEGY_MAP, get_strategy_for_entity +# strategy = get_strategy_for_entity('EMAIL_ADDRESS') # Returns 'HASH' diff --git a/test_cleaning.py b/test_cleaning.py new file mode 100644 index 0000000..c711adb --- 
/dev/null
+++ b/test_cleaning.py
@@ -0,0 +1,268 @@
+"""
+Test script for data cleaning module
+Demonstrates PII detection and anonymization on loan dataset
+"""
+
+import pandas as pd
+import sys
+import os
+
+# Add parent directory to path
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+from cleaning import DataCleaner, CleaningConfig
+
+
+def test_basic_cleaning():
+    """Test basic cleaning functionality"""
+    print("\n" + "="*70)
+    print("TEST 1: Basic PII Detection on Loan Dataset")
+    print("="*70)
+
+    # Load loan data
+    df = pd.read_csv('Datasets/loan_data.csv')
+    print(f"\n✓ Loaded dataset: {len(df)} rows × {len(df.columns)} columns")
+    print(f"  Columns: {list(df.columns)}")
+
+    # Initialize cleaner
+    cleaner = DataCleaner(df)
+
+    # Run cleaning in non-interactive mode (auto-apply strategies)
+    print("\n🔍 Running PII detection...")
+    cleaned_df, audit_report = cleaner.clean(
+        risky_features=None,   # Auto-detect all
+        interactive=False,     # Non-interactive for testing
+        scan_all_cells=True
+    )
+
+    # Display results
+    cleaner.print_audit_summary(audit_report)
+
+    return cleaned_df, audit_report
+
+
+def test_with_risky_features():
+    """Test cleaning with specific risky features flagged"""
+    print("\n" + "="*70)
+    print("TEST 2: Cleaning with Pre-Flagged Risky Features")
+    print("="*70)
+
+    # Load loan data
+    df = pd.read_csv('Datasets/loan_data.csv')
+
+    # Simulate risky features from RiskAnalyzer
+    risky_features = ['person_education', 'loan_intent', 'person_home_ownership']
+
+    print(f"\n⚠️ Risky features flagged by RiskAnalyzer: {risky_features}")
+
+    # Initialize cleaner
+    cleaner = DataCleaner(df)
+
+    # Run cleaning on flagged features only
+    cleaned_df, audit_report = cleaner.clean(
+        risky_features=risky_features,
+        interactive=False,
+        scan_all_cells=False  # Only scan risky columns
+    )
+
+    # Display results
+    cleaner.print_audit_summary(audit_report)
+
+    return cleaned_df, audit_report
+
+
+def test_with_synthetic_pii():
+    """Test with synthetic PII data"""
+    print("\n" + "="*70)
+    print("TEST 3: Synthetic PII Detection")
+    print("="*70)
+
+    # Create test DataFrame with obvious PII
+    test_data = pd.DataFrame({
+        'customer_id': [1, 2, 3, 4, 5],
+        'email': [
+            'john.doe@example.com',
+            'alice.smith@company.org',
+            'bob.jones@email.com',
+            'carol.white@test.net',
+            'dave.brown@sample.com'
+        ],
+        'phone': [
+            '+1-555-123-4567',
+            '555-234-5678',
+            '(555) 345-6789',
+            '555.456.7890',
+            '5555678901'
+        ],
+        'ssn': [
+            '123-45-6789',
+            '234-56-7890',
+            '345-67-8901',
+            '456-78-9012',
+            '567-89-0123'
+        ],
+        'notes': [
+            'Customer called from 192.168.1.1',
+            'Contact via email: test@example.com',
+            'SSN verified: 111-22-3333',
+            'Previous address: 123 Main St, Boston',
+            'Phone backup: 555-999-8888'
+        ],
+        'amount': [1000, 2000, 1500, 3000, 2500]
+    })
+
+    print(f"\n✓ Created synthetic dataset with PII:")
+    print(test_data.head())
+
+    # Initialize cleaner
+    cleaner = DataCleaner(test_data)
+
+    # Run cleaning
+    cleaned_df, audit_report = cleaner.clean(
+        risky_features=None,
+        interactive=False,
+        scan_all_cells=True
+    )
+
+    print("\n🔒 Cleaned dataset:")
+    print(cleaned_df.head())
+
+    # Display results
+    cleaner.print_audit_summary(audit_report)
+
+    # Save outputs
+    os.makedirs('output', exist_ok=True)
+    cleaner.save_cleaned_data(cleaned_df, 'output/synthetic_cleaned.csv')
+    cleaner.save_audit_report(audit_report, 'output/synthetic_audit.json')
+
+    return cleaned_df, audit_report
+
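+# Expected outcome for test_with_synthetic_pii, given the default STRATEGY_MAP:
+# the 'ssn' column (US_SSN -> REMOVE) should be dropped outright and 'email' /
+# 'phone' hashed; the free-text 'notes' column depends on which entities
+# Presidio detects in it, so exact results vary with recognizer confidence.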
input)""" + print("\n" + "="*70) + print("TEST 4: Interactive Mode (Manual Decisions)") + print("="*70) + + # Create ambiguous test data + test_data = pd.DataFrame({ + 'id': [1, 2, 3], + 'description': [ + 'Customer from Paris contacted us', # Paris = location or name? + 'Spoke with Jordan about the account', # Jordan = location or name? + 'Meeting scheduled for March 15th' # Date + ], + 'value': [100, 200, 300] + }) + + print(f"\nโœ“ Created dataset with ambiguous PII:") + print(test_data) + + print("\nโš ๏ธ This test requires user input for ambiguous cases.") + print(" You'll be prompted to choose anonymization strategies.") + + proceed = input("\nProceed with interactive test? (y/n): ").strip().lower() + + if proceed == 'y': + cleaner = DataCleaner(test_data) + cleaned_df, audit_report = cleaner.clean( + risky_features=None, + interactive=True, # Enable interactive prompts + scan_all_cells=True + ) + + print("\n๐Ÿ”’ Cleaned dataset:") + print(cleaned_df) + + cleaner.print_audit_summary(audit_report) + else: + print(" Skipped interactive test.") + + +def demonstrate_integration_with_analysis(): + """Demonstrate how cleaning integrates with AI governance pipeline""" + print("\n" + "="*70) + print("INTEGRATION DEMO: Cleaning โ†’ Analysis Workflow") + print("="*70) + + # Load data + df = pd.read_csv('Datasets/loan_data.csv') + + print("\n๐Ÿ“Š Workflow:") + print(" 1. Original dataset โ†’ Risk Analysis") + print(" 2. Risk Analysis โ†’ Identifies risky features") + print(" 3. Risky features โ†’ Data Cleaning (this step)") + print(" 4. Cleaned dataset โ†’ Re-run Analysis (optional)") + + # Simulate risky features from analysis + simulated_risky_features = ['person_education', 'loan_intent'] + + print(f"\nโš ๏ธ Step 2 Output (simulated): Risky features = {simulated_risky_features}") + + # Step 3: Clean data + print("\n๐Ÿ”’ Step 3: Cleaning risky features...") + cleaner = DataCleaner(df) + cleaned_df, audit_report = cleaner.clean( + risky_features=simulated_risky_features, + interactive=False, + scan_all_cells=False + ) + + # Save both datasets + os.makedirs('output', exist_ok=True) + df.to_csv('output/loan_data_original.csv', index=False) + cleaner.save_cleaned_data(cleaned_df, 'output/loan_data_cleaned.csv') + cleaner.save_audit_report(audit_report, 'output/cleaning_audit.json') + + print("\n๐Ÿ’พ Saved files:") + print(" - output/loan_data_original.csv (original)") + print(" - output/loan_data_cleaned.csv (cleaned)") + print(" - output/cleaning_audit.json (audit report)") + + print("\n๐Ÿ“ˆ Step 4: User can now choose which dataset to analyze:") + print(" Option A: Analyze cleaned dataset (privacy-compliant)") + print(" Option B: Analyze original dataset (for comparison)") + print(" Option C: Analyze both and compare results") + + cleaner.print_audit_summary(audit_report) + + +def main(): + """Run all tests""" + print("\n" + "="*70) + print("๐Ÿงช DATA CLEANING MODULE - TEST SUITE") + print("="*70) + + print("\nAvailable tests:") + print(" 1. Basic PII detection on loan dataset") + print(" 2. Cleaning with pre-flagged risky features") + print(" 3. Synthetic PII detection (comprehensive)") + print(" 4. Interactive mode (requires user input)") + print(" 5. Integration workflow demonstration") + print(" 6. 
+
+    choice = input("\nSelect test (1-6): ").strip()
+
+    if choice == '1':
+        test_basic_cleaning()
+    elif choice == '2':
+        test_with_risky_features()
+    elif choice == '3':
+        test_with_synthetic_pii()
+    elif choice == '4':
+        test_interactive_mode()
+    elif choice == '5':
+        demonstrate_integration_with_analysis()
+    elif choice == '6':
+        print("\n🏃 Running all non-interactive tests...\n")
+        test_basic_cleaning()
+        test_with_risky_features()
+        test_with_synthetic_pii()
+        demonstrate_integration_with_analysis()
+        print("\n✅ All tests completed!")
+    else:
+        print("Invalid choice. Run: python test_cleaning.py")
+
+
+if __name__ == '__main__':
+    main()
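Reviewer note (outside the diff): cleaning_config.py defines CUSTOM_PATTERNS, but nothing in cleaning.py registers those patterns with the analyzer, so they are currently dead configuration. Below is a minimal sketch of how they could be wired in using Presidio's Pattern/PatternRecognizer API; the helper name add_custom_recognizers is hypothetical and not part of this PR.

from presidio_analyzer import Pattern, PatternRecognizer

from cleaning import DataCleaner
from cleaning_config import CUSTOM_PATTERNS


def add_custom_recognizers(cleaner: DataCleaner) -> None:
    """Hypothetical helper: register CUSTOM_PATTERNS with the analyzer."""
    for entity_type, spec in CUSTOM_PATTERNS.items():
        # Each domain-specific regex (e.g. LOAN_ID -> r"LN\d{8}") becomes a
        # Presidio recognizer so analyzer.analyze() reports that entity type.
        pattern = Pattern(name=f"{entity_type.lower()}_pattern",
                          regex=spec["pattern"], score=spec["score"])
        recognizer = PatternRecognizer(supported_entity=entity_type,
                                       patterns=[pattern])
        cleaner.analyzer.registry.add_recognizer(recognizer)
        # The per-pattern strategy must also reach the strategy map, or the new
        # entity types fall through to the prompt / Presidio default operator.
        cleaner.config.STRATEGY_MAP[entity_type] = spec["strategy"]

Usage would be along the lines of: cleaner = DataCleaner(df); add_custom_recognizers(cleaner); cleaned_df, report = cleaner.clean(...). Registering before clean() matters because _detect_pii() only sees recognizers already present in the analyzer's registry.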