"""
|
|
PII Detection Router
|
|
Detects risky features WITHOUT anonymizing them
|
|
Returns risk classification for user review
|
|
"""
|
|
|
|
from fastapi import APIRouter, File, UploadFile, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
import pandas as pd
|
|
import numpy as np
|
|
import io
|
|
import os
|
|
import sys
|
|
from typing import Dict, Any, List
|
|
|
|
# Import cleaning module
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
|
|
from data_cleaning.cleaner import DataCleaner
|
|
from data_cleaning.config import (
|
|
ENTITY_STRATEGY_MAP,
|
|
STRATEGIES,
|
|
GDPR_COMPLIANCE,
|
|
COLUMN_CONTEXT_FILTERS,
|
|
EXCLUSION_PATTERNS,
|
|
get_strategy_for_entity,
|
|
get_risk_level
|
|
)
|
|
|
|
router = APIRouter()
|
|
|
|
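
# NOTE (assumption): this router is presumably registered on the main FastAPI
# app elsewhere, e.g. `app.include_router(router)`; any prefix supplied there
# is prepended to the /detect-pii path defined below.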
|
|
def convert_to_serializable(obj):
|
|
"""Convert numpy/pandas types to native Python types for JSON serialization"""
|
|
if isinstance(obj, (np.integer, np.int64, np.int32)):
|
|
return int(obj)
|
|
elif isinstance(obj, (np.floating, np.float64, np.float32)):
|
|
return float(obj)
|
|
elif isinstance(obj, np.ndarray):
|
|
return obj.tolist()
|
|
elif isinstance(obj, dict):
|
|
return {key: convert_to_serializable(value) for key, value in obj.items()}
|
|
elif isinstance(obj, list):
|
|
return [convert_to_serializable(item) for item in obj]
|
|
return obj
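
# Illustrative round-trip (doctest-style; 0.5 is chosen because it is exactly
# representable as a float32, so the converted value prints cleanly):
#
#   >>> convert_to_serializable({'count': np.int64(3), 'score': np.float32(0.5)})
#   {'count': 3, 'score': 0.5}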


@router.post("/detect-pii")
async def detect_pii(file: UploadFile = File(...)):
    """
    Detect PII in an uploaded file WITHOUT anonymizing it.

    - **file**: CSV, JSON, or TXT file to analyze for PII

    Returns:
    - List of risky features with severity and recommended strategies
    - Detection confidence scores
    - GDPR article references
    - Example values for review
    """

    try:
        # Read uploaded file
        contents = await file.read()
        file_extension = os.path.splitext(file.filename)[1].lower()

        # Determine file type and parse accordingly
        if file_extension == '.csv':
            df = pd.read_csv(io.BytesIO(contents))
            file_type = 'csv'
        elif file_extension == '.json':
            df = pd.read_json(io.BytesIO(contents))
            file_type = 'json'
        elif file_extension in ['.txt', '.text']:
            # For plain text, create a single-column dataframe
            text_content = contents.decode('utf-8', errors='ignore')
            # Split into lines for better granularity
            lines = [line.strip() for line in text_content.split('\n') if line.strip()]
            df = pd.DataFrame({'text_content': lines})
            file_type = 'text'
        else:
            # Unknown extension: try to auto-detect the format
            try:
                # Try CSV first
                df = pd.read_csv(io.BytesIO(contents))
                file_type = 'csv'
            except Exception:
                try:
                    # Then JSON
                    df = pd.read_json(io.BytesIO(contents))
                    file_type = 'json'
                except Exception:
                    # Fall back to plain text
                    text_content = contents.decode('utf-8', errors='ignore')
                    lines = [line.strip() for line in text_content.split('\n') if line.strip()]
                    df = pd.DataFrame({'text_content': lines})
                    file_type = 'text'

        if df.empty:
            raise HTTPException(status_code=400, detail="Uploaded file is empty")

        print(f"Detecting PII in: {file.filename} ({file_type} format, {len(df)} rows, {len(df.columns)} columns)")

        # Initialize Data Cleaner (with GPU if available)
        cleaner = DataCleaner(df, use_gpu=True)

        # Detect PII without cleaning
        pii_detections = cleaner._detect_pii(
            df=df,
            risky_columns=None,  # Scan all columns
            scan_all_cells=True
        )

        # Classify by risk level
        risk_classification = cleaner._classify_risk(pii_detections)
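        # Shape (inferred from the loop below): risk_classification maps each
        # risk level ('HIGH'/'MEDIUM'/'LOW'/'UNKNOWN') to {column: [entity_info, ...]},
        # where each entity_info carries entity_type, strategy, confidence, and count.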

        # Build response with detailed feature information
        risky_features = []

        for risk_level in ['HIGH', 'MEDIUM', 'LOW', 'UNKNOWN']:
            detections = risk_classification[risk_level]

            for column, entities in detections.items():
                for entity_info in entities:
                    entity_type = entity_info['entity_type']
                    strategy = entity_info['strategy']

                    # Get example values from the column (first 5 non-null; 3 are returned)
                    sample_values = df[column].dropna().head(5).astype(str).tolist()

                    # Get GDPR article
                    gdpr_article = GDPR_COMPLIANCE.get(entity_type, 'Not classified')

                    # Get strategy details
                    strategy_details = STRATEGIES.get(strategy, {})

                    risky_features.append({
                        'column': column,
                        'entity_type': entity_type,
                        'risk_level': risk_level,
                        'confidence': float(entity_info['confidence']),
                        'detection_count': int(entity_info['count']),
                        'recommended_strategy': strategy,
                        'strategy_description': strategy_details.get('description', ''),
                        'reversible': strategy_details.get('reversible', False),
                        'use_cases': strategy_details.get('use_cases', []),
                        'gdpr_article': gdpr_article,
                        'sample_values': sample_values[:3],  # Show 3 examples
                        'explanation': _generate_risk_explanation(entity_type, risk_level, strategy)
                    })

        # Sort by risk level (HIGH -> MEDIUM -> LOW -> UNKNOWN), then by column name
        risk_order = {'HIGH': 0, 'MEDIUM': 1, 'LOW': 2, 'UNKNOWN': 3}
        risky_features.sort(key=lambda x: (risk_order[x['risk_level']], x['column']))

        # Prepare summary statistics
        summary = {
            'total_columns_scanned': len(df.columns),
            'risky_columns_found': len(set(f['column'] for f in risky_features)),
            'high_risk_count': sum(1 for f in risky_features if f['risk_level'] == 'HIGH'),
            'medium_risk_count': sum(1 for f in risky_features if f['risk_level'] == 'MEDIUM'),
            'low_risk_count': sum(1 for f in risky_features if f['risk_level'] == 'LOW'),
            'unique_entity_types': len(set(f['entity_type'] for f in risky_features))
        }

        response_data = {
            'status': 'success',
            'filename': file.filename,
            'file_type': file_type,
            'dataset_info': {
                'rows': len(df),
                'columns': len(df.columns),
                'column_names': df.columns.tolist()
            },
            'summary': summary,
            'risky_features': risky_features,
            'available_strategies': STRATEGIES,
            'message': f"Found {summary['risky_columns_found']} columns with PII ({summary['high_risk_count']} HIGH risk, {summary['medium_risk_count']} MEDIUM risk, {summary['low_risk_count']} LOW risk)"
        }

        # Convert all numpy/pandas types to native Python types
        response_data = convert_to_serializable(response_data)

        return JSONResponse(content=response_data)

    except HTTPException:
        # Re-raise deliberate HTTP errors (e.g., the empty-file 400) so the
        # generic handler below does not rewrap them as 500s
        raise
    except pd.errors.EmptyDataError:
        raise HTTPException(status_code=400, detail="File is empty or invalid CSV format")
    except ImportError:
        raise HTTPException(
            status_code=500,
            detail="Presidio not installed. Please install: pip install presidio-analyzer presidio-anonymizer"
        )
    except Exception as e:
        print(f"Error during PII detection: {str(e)}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"PII detection failed: {str(e)}")
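
# Illustrative request (host, port, file name, and any router prefix are
# assumptions; they depend on how the app is served):
#
#   curl -X POST http://localhost:8000/detect-pii -F "file=@customers.csv"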


def _generate_risk_explanation(entity_type: str, risk_level: str, strategy: str) -> str:
    """Generate human-readable explanation for why a feature is risky"""

    explanations = {
        'CREDIT_CARD': "Credit card numbers are highly sensitive financial identifiers protected under GDPR Art. 4(1) and PCI-DSS regulations. Unauthorized disclosure can lead to fraud and identity theft.",
        'US_SSN': "Social Security Numbers are government-issued identifiers that can be used for identity theft. They are strictly protected under US federal law and GDPR Art. 4(1).",
        'EMAIL_ADDRESS': "Email addresses are personal identifiers under GDPR Art. 4(1) that can be used to re-identify individuals and track behavior across services.",
        'PHONE_NUMBER': "Phone numbers are direct personal identifiers under GDPR Art. 4(1) that enable contact and can be used to track individuals.",
        'PERSON': "Personal names are explicit identifiers under GDPR Art. 4(1) that directly identify individuals and must be protected in datasets.",
        'LOCATION': "Location data reveals personal information about individuals' movements and residence, protected under GDPR Art. 4(1) as personal data.",
        'IP_ADDRESS': "IP addresses are online identifiers under GDPR Art. 4(1) that can be used to track individuals across the internet.",
        'DATE_TIME': "Temporal data can be used to re-identify individuals when combined with other data points, especially for rare events.",
        'MEDICAL_LICENSE': "Medical information is special category data under GDPR Art. 9(1) requiring heightened protection due to health privacy concerns.",
        'NRP': "Nationality, religious, or political views are special category data under GDPR Art. 9(1) that can lead to discrimination.",
        'US_BANK_NUMBER': "Bank account numbers are financial identifiers that enable unauthorized access to accounts and are protected under GDPR Art. 4(1).",
        'CRYPTO': "Cryptocurrency addresses are financial identifiers that can reveal transaction history and wealth, requiring protection.",
        'FI_PERSONAL_ID': "Finnish personal identity numbers (HETU) are highly sensitive national identifiers under GDPR Art. 4(1) + Recital 26, granting access to government services.",
        'SE_PERSONAL_ID': "Swedish Personnummer are national identifiers protected under GDPR Art. 4(1) + Recital 26, used across all government and private services.",
        'NO_PERSONAL_ID': "Norwegian Fødselsnummer are national ID numbers under GDPR Art. 4(1) + Recital 26, used for all official identification.",
        'DK_PERSONAL_ID': "Danish CPR numbers are national identifiers protected under GDPR Art. 4(1) + Recital 26, critical for government services.",
        'FI_BUSINESS_ID': "Finnish business IDs (Y-tunnus) are organizational identifiers with lower risk than personal IDs, but still require protection for business privacy.",
    }

    base_explanation = explanations.get(
        entity_type,
        f"{entity_type} detected as {risk_level} risk personal data under GDPR regulations requiring appropriate protection measures."
    )

    strategy_note = f" Recommended action: {strategy} - this {'permanently removes' if strategy == 'REMOVE' else 'anonymizes'} the data to ensure compliance."

    return base_explanation + strategy_note
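
# Illustrative output (abridged; full text comes from the explanations map above):
#
#   >>> _generate_risk_explanation('EMAIL_ADDRESS', 'HIGH', 'REMOVE')  # doctest: +SKIP
#   "Email addresses are personal identifiers under GDPR Art. 4(1) ... Recommended
#    action: REMOVE - this permanently removes the data to ensure compliance."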