"""
|
|
Data Processor Module
|
|
Handles data loading, preprocessing, and feature detection
|
|
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from sklearn.model_selection import train_test_split
|
|
from sklearn.preprocessing import StandardScaler, LabelEncoder
|
|
import re
|
|
|
|
class DataProcessor:
    """Process and prepare data for analysis"""

    def __init__(self, df):
        self.df = df.copy()
        self.original_df = df.copy()
        self.target_column = None
        self.protected_attributes = []
        self.numerical_features = []
        self.categorical_features = []
        self.feature_names = []
        self.encoders = {}
        self.target_encoder = None  # encoder for a categorical target column
        self.scaler = StandardScaler()

        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

        # Auto-detect column types
        self._detect_column_types()

    def _detect_column_types(self):
        """Automatically detect numerical and categorical columns with enhanced logic"""
        for col in self.df.columns:
            # Skip if all null
            if self.df[col].isnull().all():
                continue

            # Get non-null values for analysis
            non_null_values = self.df[col].dropna()

            if len(non_null_values) == 0:
                continue

            # Check data type
            if self.df[col].dtype in ['int64', 'float64']:
                # Check if it's actually categorical despite being numeric
                unique_count = non_null_values.nunique()
                unique_ratio = unique_count / len(non_null_values)

                # Heuristics for categorical detection:
                # 1. Binary values (e.g. 0/1 or 1/2)
                # 2. Few unique values (< 10) AND a low unique ratio (< 5%)
                is_binary = unique_count == 2 and set(non_null_values.unique()).issubset({0, 1, 2})
                is_small_discrete = unique_count < 10 and unique_ratio < 0.05

                if is_binary or is_small_discrete:
                    self.categorical_features.append(col)
                else:
                    self.numerical_features.append(col)
            else:
                # String, object, or category type
                self.categorical_features.append(col)

    def _detect_pii_columns(self):
        """Detect potential PII columns"""
        pii_keywords = [
            'name', 'email', 'phone', 'address', 'ssn', 'social',
            'passport', 'license', 'id', 'zip', 'postal'
        ]

        pii_columns = []
        for col in self.df.columns:
            col_lower = col.lower()
            if any(keyword in col_lower for keyword in pii_keywords):
                pii_columns.append(col)

        return pii_columns

    def prepare_data(self, test_size=0.2, random_state=42):
        """Prepare data for model training with robust handling of edge cases"""
        # Handle missing values - use different strategies based on data type
        print(f"Initial dataset: {len(self.df)} rows, {len(self.df.columns)} columns")

        # Count missing values before handling
        missing_counts = self.df.isnull().sum()
        cols_with_missing = missing_counts[missing_counts > 0]
        if len(cols_with_missing) > 0:
            print(f"Columns with missing values: {dict(cols_with_missing)}")

        # For numerical columns: fill with median
        for col in self.numerical_features:
            if col in self.df.columns and self.df[col].isnull().any():
                median_val = self.df[col].median()
                # Assign back instead of fillna(..., inplace=True) on a column
                # slice, which is deprecated chained assignment in pandas 2.x
                self.df[col] = self.df[col].fillna(median_val)
                print(f"  Filled {col} missing values with median: {median_val}")

        # For categorical columns: fill with mode or 'Unknown'
        for col in self.categorical_features:
            if col in self.df.columns and self.df[col].isnull().any():
                if self.df[col].mode().empty:
                    self.df[col] = self.df[col].fillna('Unknown')
                    print(f"  Filled {col} missing values with 'Unknown'")
                else:
                    mode_val = self.df[col].mode()[0]
                    self.df[col] = self.df[col].fillna(mode_val)
                    print(f"  Filled {col} missing values with mode: {mode_val}")

        # Drop rows with remaining missing values
        rows_before = len(self.df)
        self.df = self.df.dropna()
        rows_dropped = rows_before - len(self.df)
        if rows_dropped > 0:
            print(f"Dropped {rows_dropped} rows with missing values")

        # Separate features and target
        if self.target_column is None:
            # Auto-detect target (keyword match, falling back to the last column)
            target_candidates = [col for col in self.df.columns
                                 if any(keyword in col.lower() for keyword in
                                        ['target', 'label', 'status', 'class', 'outcome', 'result'])]
            self.target_column = target_candidates[0] if target_candidates else self.df.columns[-1]
            print(f"Auto-detected target column: {self.target_column}")

        # Prepare features
        feature_cols = [col for col in self.df.columns if col != self.target_column]
        X = self.df[feature_cols].copy()
        y = self.df[self.target_column].copy()

        # Encode target variable if it's categorical
        if y.dtype == 'object' or y.dtype.name == 'category':
            self.target_encoder = LabelEncoder()
            y_encoded = self.target_encoder.fit_transform(y)
            y = pd.Series(y_encoded, index=y.index, name=self.target_column)
            encoding_map = dict(enumerate(self.target_encoder.classes_))
            print(f"Target '{self.target_column}' encoded: {encoding_map}")
        elif y.dtype in ['float64', 'int64']:
            # Check if numeric target needs binarization
            unique_values = y.unique()
            if len(unique_values) == 2:
                print(f"Binary target detected with values: {sorted(unique_values)}")
                # Ensure 0/1 encoding
                if not set(unique_values).issubset({0, 1}):
                    min_val = min(unique_values)
                    y = (y != min_val).astype(int)
                    print("Converted to 0/1 encoding (1 = positive class)")

        # Encode categorical variables with better handling
        for col in self.categorical_features:
            if col in X.columns:
                # Flag high-cardinality features (only warned about, not transformed)
                unique_count = X[col].nunique()
                if unique_count > 50:
                    print(f"  ⚠️ High cardinality feature '{col}' ({unique_count} unique values) - consider feature engineering")

                le = LabelEncoder()
                # Convert to string to handle mixed types
                X[col] = le.fit_transform(X[col].astype(str))
                self.encoders[col] = le
                print(f"Encoded '{col}': {unique_count} categories")

        # Store feature names
        self.feature_names = X.columns.tolist()

        # Check class balance
        class_counts = y.value_counts()
        print("\nTarget distribution:")
        for val, count in class_counts.items():
            print(f"  Class {val}: {count} ({count/len(y)*100:.1f}%)")

        # Stratify only for low-cardinality targets where every class has at
        # least 2 samples (train_test_split requires this to stratify)
        min_class_count = class_counts.min()
        use_stratify = y.nunique() < 10 and min_class_count >= 2

        # Split data
        if use_stratify:
            print(f"Using stratified split (min class count: {min_class_count})")
            self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
                X, y, test_size=test_size, random_state=random_state, stratify=y
            )
        else:
            print("Using random split (rare classes or high-cardinality target)")
            self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
                X, y, test_size=test_size, random_state=random_state
            )

        print(f"Train set: {len(self.X_train)} samples, Test set: {len(self.X_test)} samples")

        # Scale numerical features (fit on train only to avoid test-set leakage)
        numerical_cols = [col for col in self.numerical_features if col in self.X_train.columns]
        if numerical_cols:
            print(f"Scaling {len(numerical_cols)} numerical features")
            self.X_train[numerical_cols] = self.scaler.fit_transform(self.X_train[numerical_cols])
            self.X_test[numerical_cols] = self.scaler.transform(self.X_test[numerical_cols])

        return self.X_train, self.X_test, self.y_train, self.y_test

    def get_data_summary(self):
        """Get summary statistics of the dataset"""
        summary = {
            'total_records': len(self.df),
            'total_features': len(self.df.columns),
            'numerical_features': len(self.numerical_features),
            'categorical_features': len(self.categorical_features),
            'missing_values': self.df.isnull().sum().to_dict(),
            'target_column': self.target_column,
            'protected_attributes': self.protected_attributes,
            'pii_columns': self._detect_pii_columns(),
            'target_distribution': self.df[self.target_column].value_counts().to_dict() if self.target_column else {}
        }
        return summary

    def get_protected_attribute_stats(self):
        """Get statistics for protected attributes"""
        stats = {}
        for attr in self.protected_attributes:
            if attr in self.df.columns:
                stats[attr] = {
                    'unique_values': self.df[attr].nunique(),
                    'value_counts': self.df[attr].value_counts().to_dict(),
                    'missing_count': self.df[attr].isnull().sum()
                }
        return stats
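

if __name__ == "__main__":
    # Minimal usage sketch showing the intended call sequence. This block is
    # illustrative only: the DataFrame below is invented sample data, not part
    # of the original project, and the column names are hypothetical.
    sample = pd.DataFrame({
        'age': [25, 32, 47, 51, 38, 29, 44, 36],
        'income': [40000, 52000, 88000, 91000, 61000, 45000, 79000, 58000],
        'gender': ['F', 'M', 'F', 'M', 'F', 'M', 'M', 'F'],
        'outcome': ['yes', 'no', 'yes', 'yes', 'no', 'no', 'yes', 'no'],
    })

    processor = DataProcessor(sample)
    # 'outcome' is auto-detected as the target via the keyword heuristic;
    # 'gender' is label-encoded, 'age' and 'income' are scaled after the split
    X_train, X_test, y_train, y_test = processor.prepare_data(test_size=0.25)
    print(processor.get_data_summary())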