Merge pull request #2 from dionjoshualobo/main

Cleaning.py added
ShovinDsouza
2025-11-07 05:21:33 +05:30
committed by GitHub
3 changed files with 1235 additions and 0 deletions

cleaning.py (new file, 740 lines)

@@ -0,0 +1,740 @@
"""
Data Cleaning Module - PII Detection and Anonymization
Handles GDPR-compliant data cleaning using Presidio for PII detection
"""
import pandas as pd
import numpy as np
import json
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any
from collections import defaultdict
try:
from presidio_analyzer import AnalyzerEngine
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
PRESIDIO_AVAILABLE = True
except ImportError:
PRESIDIO_AVAILABLE = False
print("Warning: Presidio not installed. Run: pip install presidio-analyzer presidio-anonymizer")
class CleaningConfig:
"""Configuration for data cleaning strategies"""
# Anonymization strategy mapping based on entity type and risk level
STRATEGY_MAP = {
# HIGH RISK - Remove completely (sensitive financial/identity data)
"CREDIT_CARD": "REMOVE",
"CRYPTO": "REMOVE",
"IBAN_CODE": "REMOVE",
"US_SSN": "REMOVE",
"US_BANK_NUMBER": "REMOVE",
"US_DRIVER_LICENSE": "REMOVE",
"US_PASSPORT": "REMOVE",
"MEDICAL_LICENSE": "REMOVE",
# MEDIUM RISK - Hash (deterministic, irreversible)
"EMAIL_ADDRESS": "HASH",
"PHONE_NUMBER": "HASH",
"PERSON": "HASH", # Names
"URL": "HASH",
"IP_ADDRESS": "HASH",
"AU_ABN": "HASH",
"AU_ACN": "HASH",
"AU_TFN": "HASH",
# LOW RISK - Mask (keep format, hide details)
"LOCATION": "MASK",
"DATE_TIME": "GENERALIZE",
"NRP": "MASK", # Nationality/religious/political
"US_ITIN": "MASK",
# Numeric identifiers - depends on context
"UK_NHS": "HASH",
"SG_NRIC_FIN": "HASH",
"IN_PAN": "HASH",
}
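# The map can be tuned without editing this class; a minimal sketch, assuming
# the subclass instance is passed as DataCleaner(df, config=StrictConfig()):
#   class StrictConfig(CleaningConfig):
#       STRATEGY_MAP = {**CleaningConfig.STRATEGY_MAP, "URL": "MASK"}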
# Confidence thresholds
HIGH_CONFIDENCE = 0.85
MEDIUM_CONFIDENCE = 0.60
# Risk levels
RISK_LEVELS = {
"REMOVE": "HIGH",
"HASH": "MEDIUM",
"MASK": "LOW",
"GENERALIZE": "LOW"
}
# GDPR compliance mapping
GDPR_ARTICLE_MAPPING = {
"CREDIT_CARD": "Art. 4(1) - Personal data identifier",
"US_SSN": "Art. 4(1) - Personal data identifier",
"EMAIL_ADDRESS": "Art. 4(1) - Personal data identifier",
"PHONE_NUMBER": "Art. 4(1) - Personal data identifier",
"PERSON": "Art. 4(1) - Personal data (name)",
"LOCATION": "Art. 4(1) - Personal data (location)",
"IP_ADDRESS": "Art. 4(1) - Online identifier",
"MEDICAL_LICENSE": "Art. 9(1) - Special category data (health)",
"NRP": "Art. 9(1) - Special category data (political/religious views)",
}
class DataCleaner:
"""
Main class for detecting and anonymizing PII in datasets
Example:
>>> cleaner = DataCleaner(df)
>>> cleaned_df, audit_report = cleaner.clean(
... risky_features=['email', 'phone'],
... interactive=True
... )
"""
def __init__(self, df: pd.DataFrame, config: Optional[CleaningConfig] = None):
"""
Initialize the data cleaner
Args:
df: Input DataFrame to clean
config: Optional custom configuration
"""
self.df = df.copy()
self.config = config or CleaningConfig()
self.audit_log = []
self.cleaning_actions = {}
# Initialize Presidio engines
if PRESIDIO_AVAILABLE:
self._init_presidio()
else:
raise ImportError(
"Presidio is required for data cleaning. "
"Install with: pip install presidio-analyzer presidio-anonymizer"
)
def _init_presidio(self):
"""Initialize Presidio analyzer and anonymizer engines"""
# Create NLP engine configuration
configuration = {
"nlp_engine_name": "spacy",
"models": [{"lang_code": "en", "model_name": "en_core_web_sm"}],
}
try:
# Create NLP engine
provider = NlpEngineProvider(nlp_configuration=configuration)
nlp_engine = provider.create_engine()
# Create analyzer with NLP engine
self.analyzer = AnalyzerEngine(nlp_engine=nlp_engine)
self.anonymizer = AnonymizerEngine()
print("✓ Presidio engines initialized successfully")
except Exception as e:
# Fallback to default configuration if spaCy model not available
print(f"Warning: Could not load spaCy model, using default configuration: {e}")
print("Download spaCy model with: python -m spacy download en_core_web_sm")
self.analyzer = AnalyzerEngine()
self.anonymizer = AnonymizerEngine()
def clean(
self,
risky_features: Optional[List[str]] = None,
interactive: bool = True,
scan_all_cells: bool = True
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
"""
Main cleaning method - detect and anonymize PII
Args:
risky_features: List of column names flagged as risky (from RiskAnalyzer)
interactive: Whether to prompt user for ambiguous cases
scan_all_cells: Whether to scan cell contents for embedded PII
Returns:
Tuple of (cleaned_df, audit_report)
"""
print("\n" + "="*70)
print("🔒 GDPR-COMPLIANT DATA CLEANING - PRESIDIO PII DETECTION")
print("="*70 + "\n")
cleaned_df = self.df.copy()
# Step 1: Detect PII in flagged columns and text fields
print("Step 1/4: Detecting PII using Presidio...")
pii_detections = self._detect_pii(cleaned_df, risky_features, scan_all_cells)
if not pii_detections:
print("✓ No PII detected in dataset")
return cleaned_df, self._generate_audit_report(cleaned_df)
# Step 2: Classify by risk level
print("\nStep 2/4: Classifying PII by risk level...")
risk_classification = self._classify_risk(pii_detections)
self._display_risk_summary(risk_classification)
# Step 3: Apply anonymization strategies
print("\nStep 3/4: Applying anonymization strategies...")
for column, detections in pii_detections.items():
cleaned_df = self._process_column(
cleaned_df,
column,
detections,
interactive
)
# Step 4: Generate audit report
print("\nStep 4/4: Generating audit report...")
audit_report = self._generate_audit_report(cleaned_df)
print("\n" + "="*70)
print("✓ DATA CLEANING COMPLETED")
print("="*70 + "\n")
return cleaned_df, audit_report
def _detect_pii(
self,
df: pd.DataFrame,
risky_columns: Optional[List[str]],
scan_all_cells: bool
) -> Dict[str, List[Dict]]:
"""
Detect PII at column and cell level
Returns:
Dictionary mapping column names to list of detected entities
"""
pii_detections = defaultdict(list)
# Determine which columns to scan
if risky_columns:
columns_to_scan = [col for col in risky_columns if col in df.columns]
else:
# Scan all text/object columns if no risky features specified
columns_to_scan = df.select_dtypes(include=['object']).columns.tolist()
# Also scan all text columns if requested
if scan_all_cells:
text_columns = df.select_dtypes(include=['object']).columns.tolist()
columns_to_scan = list(dict.fromkeys(columns_to_scan + text_columns)) # dedupe, preserve order
print(f" Scanning {len(columns_to_scan)} columns: {columns_to_scan}")
for column in columns_to_scan:
print(f" Analyzing '{column}'...", end=" ")
# Sample values for analysis (avoid scanning millions of rows)
sample_values = df[column].dropna().astype(str).head(1000).tolist()
if not sample_values:
print("(empty)")
continue
# Combine sample values for batch analysis
combined_text = " | ".join(sample_values[:100]) # Limit to first 100
# Analyze with Presidio
results = self.analyzer.analyze(
text=combined_text,
language='en',
entities=None # Detect all entity types
)
if results:
# Aggregate by entity type
entity_summary = defaultdict(lambda: {'count': 0, 'scores': []})
for result in results:
entity_summary[result.entity_type]['count'] += 1
entity_summary[result.entity_type]['scores'].append(result.score)
# Store detection results
for entity_type, info in entity_summary.items():
avg_confidence = np.mean(info['scores'])
pii_detections[column].append({
'entity_type': entity_type,
'count': info['count'],
'avg_confidence': avg_confidence,
'max_confidence': max(info['scores']),
'min_confidence': min(info['scores'])
})
detected_types = [d['entity_type'] for d in pii_detections[column]]
print(f"✓ Found: {', '.join(detected_types)}")
else:
print("(no PII)")
return dict(pii_detections)
def _classify_risk(self, pii_detections: Dict[str, List[Dict]]) -> Dict[str, Dict]:
"""
Classify detected PII by risk level
Returns:
Dictionary with HIGH/MEDIUM/LOW risk classifications
"""
risk_classification = {
'HIGH': defaultdict(list),
'MEDIUM': defaultdict(list),
'LOW': defaultdict(list),
'UNKNOWN': defaultdict(list)
}
for column, detections in pii_detections.items():
for detection in detections:
entity_type = detection['entity_type']
strategy = self.config.STRATEGY_MAP.get(entity_type, 'UNKNOWN')
risk_level = self.config.RISK_LEVELS.get(strategy, 'UNKNOWN')
risk_classification[risk_level][column].append({
'entity_type': entity_type,
'strategy': strategy,
'confidence': detection['avg_confidence'],
'count': detection['count']
})
return risk_classification
def _display_risk_summary(self, risk_classification: Dict[str, Dict]):
"""Display risk summary to user"""
for risk_level in ['HIGH', 'MEDIUM', 'LOW', 'UNKNOWN']:
detections = risk_classification[risk_level]
if detections:
symbol = "🔴" if risk_level == "HIGH" else "🟡" if risk_level == "MEDIUM" else "🟢"
print(f"\n {symbol} {risk_level} RISK:")
for column, entities in detections.items():
entity_list = [f"{e['entity_type']} ({e['count']})" for e in entities]
print(f" - {column}: {', '.join(entity_list)}")
def _process_column(
self,
df: pd.DataFrame,
column: str,
detections: List[Dict],
interactive: bool
) -> pd.DataFrame:
"""
Process a single column with detected PII
Args:
df: DataFrame to modify
column: Column name
detections: List of PII detections in this column
interactive: Whether to prompt user
Returns:
Modified DataFrame
"""
# Determine strategies for each entity type
strategies = {}
needs_prompt = []
for detection in detections:
entity_type = detection['entity_type']
confidence = detection['avg_confidence']
default_strategy = self.config.STRATEGY_MAP.get(entity_type)
# Decide if we need to prompt user
if confidence < self.config.MEDIUM_CONFIDENCE or default_strategy is None:
needs_prompt.append(detection)
else:
strategies[entity_type] = default_strategy
# Interactive prompts for ambiguous cases
if interactive and needs_prompt:
print(f"\n ⚠️ Column '{column}' has ambiguous PII detections:")
for i, detection in enumerate(needs_prompt, 1):
print(f" {i}. {detection['entity_type']} "
f"(confidence: {detection['avg_confidence']:.2f}, "
f"count: {detection['count']})")
strategy = self._prompt_user_strategy(column, needs_prompt)
for detection in needs_prompt:
strategies[detection['entity_type']] = strategy
# Apply strategies
action_log = {
'column': column,
'detections': detections,
'strategies': strategies,
'examples': []
}
# Determine overall column strategy (most conservative)
if 'REMOVE' in strategies.values():
# Remove entire column
df = df.drop(columns=[column])
action_log['action'] = 'REMOVED_COLUMN'
action_log['reason'] = "Contains HIGH risk PII requiring removal"
print(f" ❌ Removed column '{column}' (HIGH risk PII)")
else:
# Anonymize cell-by-cell
original_values = df[column].copy()
df[column] = df[column].apply(
lambda x: self._anonymize_value(str(x), strategies) if pd.notna(x) else x
)
# Collect examples
action_log['examples'] = self._collect_examples(original_values, df[column], 5)
action_log['action'] = 'ANONYMIZED'
action_log['num_affected'] = int(((original_values != df[column]) & original_values.notna()).sum()) # ignore NaN rows
strategy_desc = ', '.join(set(strategies.values()))
print(f" ✓ Anonymized column '{column}' using {strategy_desc}")
self.cleaning_actions[column] = action_log
return df
def _anonymize_value(self, value: str, strategies: Dict[str, str]) -> str:
"""
Anonymize a single cell value based on detected PII types
Args:
value: Original value
strategies: Dictionary of entity_type -> strategy
Returns:
Anonymized value
"""
if not value or value == 'nan':
return value
# Analyze this specific value
results = self.analyzer.analyze(text=value, language='en')
if not results:
return value # No PII detected
# Apply anonymization using Presidio
anonymized_result = self.anonymizer.anonymize(
text=value,
analyzer_results=results,
operators=self._get_presidio_operators(strategies)
)
return anonymized_result.text
def _get_presidio_operators(self, strategies: Dict[str, str]) -> Dict[str, OperatorConfig]:
"""
Convert our strategies to Presidio operators
Args:
strategies: Dictionary of entity_type -> strategy
Returns:
Dictionary of entity_type -> OperatorConfig
"""
operators = {}
for entity_type, strategy in strategies.items():
if strategy == 'HASH':
operators[entity_type] = OperatorConfig("hash", {"hash_type": "sha256"})
elif strategy == 'MASK':
operators[entity_type] = OperatorConfig("mask", {
"masking_char": "*",
"chars_to_mask": 100,
"from_end": False
})
elif strategy == 'GENERALIZE':
operators[entity_type] = OperatorConfig("replace", {"new_value": "[REDACTED]"})
elif strategy == 'KEEP':
# Without an explicit operator Presidio falls back to its default
# replace, so a user-chosen KEEP must map to the "keep" operator.
operators[entity_type] = OperatorConfig("keep")
else: # REMOVE handled at column level; anything unmapped is blanked
operators[entity_type] = OperatorConfig("replace", {"new_value": ""})
return operators
def _prompt_user_strategy(self, column: str, detections: List[Dict]) -> str:
"""
Prompt user to choose anonymization strategy
Args:
column: Column name
detections: List of ambiguous detections
Returns:
Chosen strategy
"""
print(f"\n Choose strategy for column '{column}':")
print(" [1] REMOVE - Delete entire column (HIGH risk)")
print(" [2] HASH - One-way hash (MEDIUM risk, irreversible)")
print(" [3] MASK - Hide with *** (LOW risk, format preserved)")
print(" [4] KEEP - No changes (not recommended)")
while True:
try:
choice = input("\n Choice (1-4): ").strip()
if choice == '1':
return 'REMOVE'
elif choice == '2':
return 'HASH'
elif choice == '3':
return 'MASK'
elif choice == '4':
return 'KEEP'
else:
print(" Invalid choice. Please enter 1-4.")
except Exception:
print(" Invalid input. Please enter 1-4.")
def _collect_examples(
self,
original: pd.Series,
anonymized: pd.Series,
n: int = 5
) -> List[Dict[str, str]]:
"""
Collect example transformations for audit report
Args:
original: Original values
anonymized: Anonymized values
n: Number of examples to collect
Returns:
List of before/after examples
"""
examples = []
changes = (original != anonymized) & original.notna() # NaN != NaN would be a false positive
changed_indices = original[changes].index[:n]
for idx in changed_indices:
examples.append({
'before': str(original[idx])[:50], # Truncate long values
'after': str(anonymized[idx])[:50]
})
return examples
def _generate_audit_report(self, cleaned_df: pd.DataFrame) -> Dict[str, Any]:
"""
Generate comprehensive audit report
Returns:
Detailed audit report with explanations
"""
report = {
'metadata': {
'timestamp': datetime.now().isoformat(),
'original_rows': len(self.df),
'original_columns': len(self.df.columns),
'cleaned_rows': len(cleaned_df),
'cleaned_columns': len(cleaned_df.columns),
'presidio_status': 'enabled' if PRESIDIO_AVAILABLE else 'disabled'
},
'summary': {
'columns_removed': [],
'columns_anonymized': [],
'total_cells_affected': 0
},
'details': {},
'compliance': {
'gdpr_articles_applied': set(),
'risk_mitigation': {}
}
}
# Process each action
for column, action_log in self.cleaning_actions.items():
if action_log['action'] == 'REMOVED_COLUMN':
report['summary']['columns_removed'].append(column)
# Build detailed entry
detail = {
'action': 'REMOVED',
'reason': action_log['reason'],
'entity_types_found': [d['entity_type'] for d in action_log['detections']],
'risk_level': 'HIGH',
'presidio_metrics': {
'detections': action_log['detections']
},
'gdpr_compliance': self._get_gdpr_explanation(action_log['detections'])
}
else: # ANONYMIZED
report['summary']['columns_anonymized'].append(column)
report['summary']['total_cells_affected'] += action_log.get('num_affected', 0)
# Build detailed entry
detail = {
'action': 'ANONYMIZED',
'strategies_applied': list(set(action_log['strategies'].values())),
'reason': self._explain_anonymization(action_log),
'entity_types_found': [d['entity_type'] for d in action_log['detections']],
'num_affected_rows': action_log.get('num_affected', 0),
'percentage_affected': f"{(action_log.get('num_affected', 0) / len(self.df) * 100):.1f}%",
'examples': action_log.get('examples', [])[:3], # Show top 3
'presidio_metrics': {
'avg_confidence': np.mean([d['avg_confidence'] for d in action_log['detections']]),
'detections': action_log['detections']
},
'gdpr_compliance': self._get_gdpr_explanation(action_log['detections'])
}
report['details'][column] = detail
# Track GDPR articles
for gdpr_ref in detail['gdpr_compliance']:
report['compliance']['gdpr_articles_applied'].add(gdpr_ref)
# Convert set to list for JSON serialization
report['compliance']['gdpr_articles_applied'] = list(
report['compliance']['gdpr_articles_applied']
)
return report
def _explain_anonymization(self, action_log: Dict) -> str:
"""Generate human-readable explanation of anonymization"""
entity_types = [d['entity_type'] for d in action_log['detections']]
strategies = list(set(action_log['strategies'].values()))
explanation = f"Contains {', '.join(entity_types)} entities. "
explanation += f"Applied {', '.join(strategies).lower()} anonymization to protect privacy."
return explanation
def _get_gdpr_explanation(self, detections: List[Dict]) -> List[str]:
"""Get GDPR article references for detected entities"""
gdpr_refs = []
for detection in detections:
entity_type = detection['entity_type']
if entity_type in self.config.GDPR_ARTICLE_MAPPING:
gdpr_refs.append(self.config.GDPR_ARTICLE_MAPPING[entity_type])
return list(set(gdpr_refs)) # Remove duplicates
def save_cleaned_data(self, cleaned_df: pd.DataFrame, output_path: str) -> str:
"""
Save cleaned dataset to CSV
Args:
cleaned_df: Cleaned DataFrame
output_path: Path to save file
Returns:
Path to saved file
"""
cleaned_df.to_csv(output_path, index=False)
print(f"✓ Cleaned data saved to: {output_path}")
return output_path
def save_audit_report(self, audit_report: Dict, output_path: str) -> str:
"""
Save audit report to JSON
Args:
audit_report: Audit report dictionary
output_path: Path to save file
Returns:
Path to saved file
"""
# Convert numpy types to native Python types for JSON serialization
def convert_numpy(obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, dict):
return {key: convert_numpy(value) for key, value in obj.items()}
elif isinstance(obj, list):
return [convert_numpy(item) for item in obj]
elif isinstance(obj, set):
return list(obj)
return obj
audit_report = convert_numpy(audit_report)
with open(output_path, 'w') as f:
json.dump(audit_report, f, indent=2)
print(f"✓ Audit report saved to: {output_path}")
return output_path
def print_audit_summary(self, audit_report: Dict):
"""
Print human-readable audit summary
Args:
audit_report: Audit report dictionary
"""
print("\n" + "="*70)
print("📊 CLEANING AUDIT SUMMARY")
print("="*70)
summary = audit_report['summary']
metadata = audit_report['metadata']
print(f"\n📈 Dataset Changes:")
print(f" Original: {metadata['original_rows']} rows × {metadata['original_columns']} columns")
print(f" Cleaned: {metadata['cleaned_rows']} rows × {metadata['cleaned_columns']} columns")
if summary['columns_removed']:
print(f"\n❌ Removed Columns ({len(summary['columns_removed'])}):")
for col in summary['columns_removed']:
print(f" - {col}")
if summary['columns_anonymized']:
print(f"\n🔒 Anonymized Columns ({len(summary['columns_anonymized'])}):")
for col in summary['columns_anonymized']:
detail = audit_report['details'][col]
print(f" - {col}: {detail['num_affected_rows']} rows affected "
f"({detail['percentage_affected']})")
print(f"\n📝 Total cells anonymized: {summary['total_cells_affected']}")
print(f"\n⚖️ GDPR Compliance:")
for article in audit_report['compliance']['gdpr_articles_applied']:
print(f" - {article}")
print("\n" + "="*70 + "\n")
def main():
"""Example usage and testing"""
import sys
if len(sys.argv) < 2:
print("Usage: python cleaning.py <data_file.csv>")
print("Example: python cleaning.py Datasets/loan_data.csv")
sys.exit(1)
data_path = sys.argv[1]
# Load data
print(f"Loading data from {data_path}...")
df = pd.read_csv(data_path)
print(f"Loaded {len(df)} rows × {len(df.columns)} columns")
# Initialize cleaner
cleaner = DataCleaner(df)
# Run cleaning (interactive mode)
cleaned_df, audit_report = cleaner.clean(
risky_features=None, # Auto-detect
interactive=True,
scan_all_cells=True
)
# Save results
output_base = data_path.replace('.csv', '_cleaned')
cleaner.save_cleaned_data(cleaned_df, f"{output_base}.csv")
cleaner.save_audit_report(audit_report, f"{output_base}_audit.json")
# Print summary
cleaner.print_audit_summary(audit_report)
if __name__ == '__main__':
main()

cleaning_config.py (new file, 227 lines)

@@ -0,0 +1,227 @@
"""
Cleaning Configuration
Customize PII detection and anonymization strategies
"""
# Anonymization Strategy Definitions
STRATEGIES = {
'REMOVE': {
'description': 'Delete entire column',
'risk_level': 'HIGH',
'reversible': False,
'use_cases': ['Credit cards', 'SSN', 'Bank accounts']
},
'HASH': {
'description': 'One-way SHA-256 hash',
'risk_level': 'MEDIUM',
'reversible': False,
'use_cases': ['Emails', 'Phone numbers', 'Names']
},
'MASK': {
'description': 'Replace with asterisks',
'risk_level': 'LOW',
'reversible': False,
'use_cases': ['Partial identifiers', 'Locations']
},
'GENERALIZE': {
'description': 'Reduce precision',
'risk_level': 'LOW',
'reversible': False,
'use_cases': ['Dates', 'Ages', 'ZIP codes']
},
'KEEP': {
'description': 'No changes',
'risk_level': 'NONE',
'reversible': True,
'use_cases': ['Non-sensitive data']
}
}
# Entity Type to Strategy Mapping
# Customize these based on your compliance requirements
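# This map is deliberately stricter than CleaningConfig.STRATEGY_MAP in
# cleaning.py (e.g., US_ITIN, UK_NHS, SG_NRIC_FIN, and IN_PAN are REMOVE here
# rather than MASK/HASH); reconcile the two before relying on either.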
ENTITY_STRATEGY_MAP = {
# Financial Identifiers - HIGHEST RISK
'CREDIT_CARD': 'REMOVE',
'CRYPTO': 'REMOVE',
'IBAN_CODE': 'REMOVE',
'US_BANK_NUMBER': 'REMOVE',
# Government IDs - HIGH RISK
'US_SSN': 'REMOVE',
'US_DRIVER_LICENSE': 'REMOVE',
'US_PASSPORT': 'REMOVE',
'US_ITIN': 'REMOVE',
'UK_NHS': 'REMOVE',
'SG_NRIC_FIN': 'REMOVE',
'IN_PAN': 'REMOVE',
# Health Information - HIGH RISK (GDPR Art. 9)
'MEDICAL_LICENSE': 'REMOVE',
# Contact Information - MEDIUM RISK
'EMAIL_ADDRESS': 'HASH',
'PHONE_NUMBER': 'HASH',
'URL': 'HASH',
# Personal Identifiers - MEDIUM RISK
'PERSON': 'HASH', # Names
'IP_ADDRESS': 'HASH',
# Geographic Information - LOW RISK
'LOCATION': 'MASK',
'US_ZIP_CODE': 'GENERALIZE',
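# (US_ZIP_CODE is not a built-in Presidio entity; it assumes a custom recognizer)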
# Temporal Information - LOW RISK
'DATE_TIME': 'GENERALIZE',
# Special Categories - MEDIUM RISK (GDPR Art. 9)
'NRP': 'HASH', # Nationality, religious, political views
# Business/Tax Identifiers - MEDIUM RISK (hashed)
'AU_ABN': 'HASH',
'AU_ACN': 'HASH',
'AU_TFN': 'HASH',
}
# Confidence Score Thresholds
CONFIDENCE_THRESHOLDS = {
'HIGH': 0.85, # Auto-apply strategy
'MEDIUM': 0.60, # Prompt user in interactive mode
'LOW': 0.40, # Treat as potential false positive
}
# GDPR Article Mappings
GDPR_COMPLIANCE = {
'CREDIT_CARD': 'Art. 4(1) - Personal data identifier',
'US_SSN': 'Art. 4(1) - Personal data identifier',
'US_BANK_NUMBER': 'Art. 4(1) - Personal data identifier',
'EMAIL_ADDRESS': 'Art. 4(1) - Personal data identifier',
'PHONE_NUMBER': 'Art. 4(1) - Personal data identifier',
'PERSON': 'Art. 4(1) - Personal data (name)',
'LOCATION': 'Art. 4(1) - Personal data (location)',
'IP_ADDRESS': 'Art. 4(1) - Online identifier',
'MEDICAL_LICENSE': 'Art. 9(1) - Special category data (health)',
'NRP': 'Art. 9(1) - Special category data (political/religious views)',
'DATE_TIME': 'Art. 4(1) - Personal data (temporal information)',
}
# Presidio Analyzer Settings
PRESIDIO_CONFIG = {
'language': 'en',
'score_threshold': 0.5, # Minimum confidence to report
'entities': None, # None = detect all, or specify list like ['EMAIL_ADDRESS', 'PHONE_NUMBER']
'allow_list': [], # Terms to ignore (e.g., company names that look like PII)
}
# Custom Recognizers (domain-specific patterns)
# Add patterns specific to your industry/use case
CUSTOM_PATTERNS = {
'LOAN_ID': {
'pattern': r'LN\d{8}',
'score': 0.9,
'strategy': 'HASH'
},
'EMPLOYEE_ID': {
'pattern': r'EMP\d{6}',
'score': 0.9,
'strategy': 'HASH'
},
'ACCOUNT_NUMBER': {
'pattern': r'ACC\d{10}',
'score': 0.95,
'strategy': 'REMOVE'
}
}
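# These patterns are declarative only; nothing registers them yet. A minimal
# sketch of wiring one into Presidio (uses the PatternRecognizer API from
# presidio_analyzer; the variable names are illustrative):
#   from presidio_analyzer import Pattern, PatternRecognizer
#   loan_id = CUSTOM_PATTERNS['LOAN_ID']
#   recognizer = PatternRecognizer(
#       supported_entity='LOAN_ID',
#       patterns=[Pattern(name='loan_id', regex=loan_id['pattern'],
#                         score=loan_id['score'])])
#   analyzer.registry.add_recognizer(recognizer)  # analyzer: AnalyzerEngine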
# Column Name Heuristics
# Auto-flag columns based on name patterns
RISKY_COLUMN_PATTERNS = [
r'.*email.*',
r'.*phone.*',
r'.*ssn.*',
r'.*social.*security.*',
r'.*credit.*card.*',
r'.*passport.*',
r'.*license.*',
r'.*address.*',
r'.*ip.*addr.*',
]
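# A minimal sketch of applying these heuristics (hypothetical helper, not yet
# called from cleaning.py):
#   import re
#   def flag_risky_columns(columns):
#       return [c for c in columns
#               if any(re.fullmatch(p, c.lower()) for p in RISKY_COLUMN_PATTERNS)]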
# Protected Attributes Configuration
# These are needed for bias analysis but may contain PII
PROTECTED_ATTRIBUTES_HANDLING = {
'default_strategy': 'KEEP', # Keep for bias analysis
'warn_user': True, # Warn about privacy implications
'alternative': 'Use generalization (e.g., age_group instead of exact age)'
}
# Audit Report Settings
AUDIT_CONFIG = {
'include_examples': True,
'max_examples_per_column': 3,
'truncate_values': 50, # Max characters to show in examples
'include_presidio_metrics': True,
'include_gdpr_references': True,
'include_recommendations': True
}
# Performance Settings
PERFORMANCE_CONFIG = {
'sample_size_for_detection': 1000, # Max rows to analyze per column
'batch_size': 100, # Rows to process per batch
'enable_parallel': False, # Future: parallel column processing
}
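# cleaning.py currently hardcodes the equivalents of these values
# (head(1000) per column, first 100 values per analyzer batch); this block
# is where they should eventually be read from.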
# Output Settings
OUTPUT_CONFIG = {
'cleaned_suffix': '_cleaned',
'audit_suffix': '_audit',
'format': 'csv', # Future: support parquet, json
'compression': None, # Future: gzip, bz2
}
def get_strategy_for_entity(entity_type: str) -> str:
"""
Get anonymization strategy for an entity type
Args:
entity_type: Presidio entity type (e.g., 'EMAIL_ADDRESS')
Returns:
Strategy name (e.g., 'HASH')
"""
return ENTITY_STRATEGY_MAP.get(entity_type, 'HASH') # Default to HASH if unknown
def get_risk_level(strategy: str) -> str:
"""
Get risk level for a strategy
Args:
strategy: Strategy name (e.g., 'HASH')
Returns:
Risk level (e.g., 'MEDIUM')
"""
return STRATEGIES.get(strategy, {}).get('risk_level', 'UNKNOWN')
def is_high_confidence(score: float) -> bool:
"""Check if confidence score is high enough for auto-processing"""
return score >= CONFIDENCE_THRESHOLDS['HIGH']
def is_medium_confidence(score: float) -> bool:
"""Check if confidence score requires user confirmation"""
return CONFIDENCE_THRESHOLDS['MEDIUM'] <= score < CONFIDENCE_THRESHOLDS['HIGH']
def is_low_confidence(score: float) -> bool:
"""Check if confidence score might be false positive"""
return score < CONFIDENCE_THRESHOLDS['MEDIUM']
# Example usage in cleaning.py:
# from cleaning_config import ENTITY_STRATEGY_MAP, get_strategy_for_entity
# strategy = get_strategy_for_entity('EMAIL_ADDRESS') # Returns 'HASH'
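#
# A sketch of routing on the confidence helpers (illustrative only):
# if is_high_confidence(score): apply the mapped strategy automatically
# elif is_medium_confidence(score): prompt the user (interactive mode)
# else: treat the detection as a potential false positive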

test_cleaning.py (new file, 268 lines)

@@ -0,0 +1,268 @@
"""
Test script for data cleaning module
Demonstrates PII detection and anonymization on the loan dataset
"""
import pandas as pd
import sys
import os
# Add this script's directory to the path so cleaning.py can be imported
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cleaning import DataCleaner
def test_basic_cleaning():
"""Test basic cleaning functionality"""
print("\n" + "="*70)
print("TEST 1: Basic PII Detection on Loan Dataset")
print("="*70)
# Load loan data
df = pd.read_csv('Datasets/loan_data.csv')
print(f"\n✓ Loaded dataset: {len(df)} rows × {len(df.columns)} columns")
print(f" Columns: {list(df.columns)}")
# Initialize cleaner
cleaner = DataCleaner(df)
# Run cleaning in non-interactive mode (auto-apply strategies)
print("\n🔍 Running PII detection...")
cleaned_df, audit_report = cleaner.clean(
risky_features=None, # Auto-detect all
interactive=False, # Non-interactive for testing
scan_all_cells=True
)
# Display results
cleaner.print_audit_summary(audit_report)
return cleaned_df, audit_report
def test_with_risky_features():
"""Test cleaning with specific risky features flagged"""
print("\n" + "="*70)
print("TEST 2: Cleaning with Pre-Flagged Risky Features")
print("="*70)
# Load loan data
df = pd.read_csv('Datasets/loan_data.csv')
# Simulate risky features from RiskAnalyzer
risky_features = ['person_education', 'loan_intent', 'person_home_ownership']
print(f"\n⚠️ Risky features flagged by RiskAnalyzer: {risky_features}")
# Initialize cleaner
cleaner = DataCleaner(df)
# Run cleaning on flagged features only
cleaned_df, audit_report = cleaner.clean(
risky_features=risky_features,
interactive=False,
scan_all_cells=False # Only scan risky columns
)
# Display results
cleaner.print_audit_summary(audit_report)
return cleaned_df, audit_report
def test_with_synthetic_pii():
"""Test with synthetic PII data"""
print("\n" + "="*70)
print("TEST 3: Synthetic PII Detection")
print("="*70)
# Create test DataFrame with obvious PII
test_data = pd.DataFrame({
'customer_id': [1, 2, 3, 4, 5],
'email': [
'john.doe@example.com',
'alice.smith@company.org',
'bob.jones@email.com',
'carol.white@test.net',
'dave.brown@sample.com'
],
'phone': [
'+1-555-123-4567',
'555-234-5678',
'(555) 345-6789',
'555.456.7890',
'5555678901'
],
'ssn': [
'123-45-6789',
'234-56-7890',
'345-67-8901',
'456-78-9012',
'567-89-0123'
],
'notes': [
'Customer called from 192.168.1.1',
'Contact via email: test@example.com',
'SSN verified: 111-22-3333',
'Previous address: 123 Main St, Boston',
'Phone backup: 555-999-8888'
],
'amount': [1000, 2000, 1500, 3000, 2500]
})
print(f"\n✓ Created synthetic dataset with PII:")
print(test_data.head())
# Initialize cleaner
cleaner = DataCleaner(test_data)
# Run cleaning
cleaned_df, audit_report = cleaner.clean(
risky_features=None,
interactive=False,
scan_all_cells=True
)
print("\n🔒 Cleaned dataset:")
print(cleaned_df.head())
# Display results
cleaner.print_audit_summary(audit_report)
# Save outputs
os.makedirs('output', exist_ok=True)
cleaner.save_cleaned_data(cleaned_df, 'output/synthetic_cleaned.csv')
cleaner.save_audit_report(audit_report, 'output/synthetic_audit.json')
return cleaned_df, audit_report
def test_interactive_mode():
"""Test interactive mode (requires user input)"""
print("\n" + "="*70)
print("TEST 4: Interactive Mode (Manual Decisions)")
print("="*70)
# Create ambiguous test data
test_data = pd.DataFrame({
'id': [1, 2, 3],
'description': [
'Customer from Paris contacted us', # Paris = location or name?
'Spoke with Jordan about the account', # Jordan = location or name?
'Meeting scheduled for March 15th' # Date
],
'value': [100, 200, 300]
})
print(f"\n✓ Created dataset with ambiguous PII:")
print(test_data)
print("\n⚠️ This test requires user input for ambiguous cases.")
print(" You'll be prompted to choose anonymization strategies.")
proceed = input("\nProceed with interactive test? (y/n): ").strip().lower()
if proceed == 'y':
cleaner = DataCleaner(test_data)
cleaned_df, audit_report = cleaner.clean(
risky_features=None,
interactive=True, # Enable interactive prompts
scan_all_cells=True
)
print("\n🔒 Cleaned dataset:")
print(cleaned_df)
cleaner.print_audit_summary(audit_report)
else:
print(" Skipped interactive test.")
def demonstrate_integration_with_analysis():
"""Demonstrate how cleaning integrates with AI governance pipeline"""
print("\n" + "="*70)
print("INTEGRATION DEMO: Cleaning → Analysis Workflow")
print("="*70)
# Load data
df = pd.read_csv('Datasets/loan_data.csv')
print("\n📊 Workflow:")
print(" 1. Original dataset → Risk Analysis")
print(" 2. Risk Analysis → Identifies risky features")
print(" 3. Risky features → Data Cleaning (this step)")
print(" 4. Cleaned dataset → Re-run Analysis (optional)")
# Simulate risky features from analysis
simulated_risky_features = ['person_education', 'loan_intent']
print(f"\n⚠️ Step 2 Output (simulated): Risky features = {simulated_risky_features}")
# Step 3: Clean data
print("\n🔒 Step 3: Cleaning risky features...")
cleaner = DataCleaner(df)
cleaned_df, audit_report = cleaner.clean(
risky_features=simulated_risky_features,
interactive=False,
scan_all_cells=False
)
# Save both datasets
os.makedirs('output', exist_ok=True)
df.to_csv('output/loan_data_original.csv', index=False)
cleaner.save_cleaned_data(cleaned_df, 'output/loan_data_cleaned.csv')
cleaner.save_audit_report(audit_report, 'output/cleaning_audit.json')
print("\n💾 Saved files:")
print(" - output/loan_data_original.csv (original)")
print(" - output/loan_data_cleaned.csv (cleaned)")
print(" - output/cleaning_audit.json (audit report)")
print("\n📈 Step 4: User can now choose which dataset to analyze:")
print(" Option A: Analyze cleaned dataset (privacy-compliant)")
print(" Option B: Analyze original dataset (for comparison)")
print(" Option C: Analyze both and compare results")
cleaner.print_audit_summary(audit_report)
def main():
"""Run all tests"""
print("\n" + "="*70)
print("🧪 DATA CLEANING MODULE - TEST SUITE")
print("="*70)
print("\nAvailable tests:")
print(" 1. Basic PII detection on loan dataset")
print(" 2. Cleaning with pre-flagged risky features")
print(" 3. Synthetic PII detection (comprehensive)")
print(" 4. Interactive mode (requires user input)")
print(" 5. Integration workflow demonstration")
print(" 6. Run all non-interactive tests")
choice = input("\nSelect test (1-6): ").strip()
if choice == '1':
test_basic_cleaning()
elif choice == '2':
test_with_risky_features()
elif choice == '3':
test_with_synthetic_pii()
elif choice == '4':
test_interactive_mode()
elif choice == '5':
demonstrate_integration_with_analysis()
elif choice == '6':
print("\n🏃 Running all non-interactive tests...\n")
test_basic_cleaning()
test_with_risky_features()
test_with_synthetic_pii()
demonstrate_integration_with_analysis()
print("\n✅ All tests completed!")
else:
print("Invalid choice. Run: python test_cleaning.py")
if __name__ == '__main__':
main()