Feat: Create the basic AI Governance package to use as a guide

2025-11-06 23:26:50 +05:30
parent 61d02d1dac
commit 9a3d073815
14 changed files with 47207 additions and 0 deletions

src/__init__.py Normal file

@@ -0,0 +1,145 @@
"""
AI Governance Module - Bias Detection and Risk Analysis
"""
import json

import pandas as pd

from .data_processor import DataProcessor
from .model_trainer import GeneralizedModelTrainer
from .bias_analyzer import BiasAnalyzer
from .risk_analyzer import RiskAnalyzer
from .report_generator import ReportGenerator, NumpyEncoder
__version__ = '1.0.0'
__all__ = [
'DataProcessor',
'GeneralizedModelTrainer',
'BiasAnalyzer',
'RiskAnalyzer',
'ReportGenerator',
'NumpyEncoder',
'AIGovernanceAnalyzer'
]
class AIGovernanceAnalyzer:
"""
Main interface for AI Governance analysis
Example:
>>> analyzer = AIGovernanceAnalyzer()
>>> report = analyzer.analyze('data.csv', 'target', ['gender', 'age'])
>>> print(f"Bias Score: {report['summary']['overall_bias_score']:.3f}")
"""
def __init__(self):
"""Initialize the analyzer"""
self.processor = None
self.trainer = None
self.bias_analyzer = None
self.risk_analyzer = None
self.report_generator = None
def analyze(self, data_path, target_column, protected_attributes):
"""
Run complete AI governance analysis from file
Args:
data_path (str): Path to CSV file
target_column (str): Name of target column
protected_attributes (list): List of protected attribute column names
Returns:
dict: Complete analysis report
"""
df = pd.read_csv(data_path)
return self.analyze_dataframe(df, target_column, protected_attributes)
def analyze_dataframe(self, df, target_column, protected_attributes):
"""
Run complete AI governance analysis from DataFrame
Args:
df (pd.DataFrame): Input dataframe
target_column (str): Name of target column
protected_attributes (list): List of protected attribute column names
Returns:
dict: Complete analysis report
"""
# Step 1: Process data
self.processor = DataProcessor(df)
self.processor.target_column = target_column
self.processor.protected_attributes = protected_attributes
self.processor.prepare_data()
# Step 2: Train model
self.trainer = GeneralizedModelTrainer(
self.processor.X_train,
self.processor.X_test,
self.processor.y_train,
self.processor.y_test,
self.processor.feature_names
)
self.trainer.train()
self.trainer.evaluate()
# Step 3: Analyze bias
self.bias_analyzer = BiasAnalyzer(
self.processor.X_test,
self.processor.y_test,
self.trainer.y_pred,
self.processor.df,
self.processor.protected_attributes,
self.processor.target_column
)
bias_results = self.bias_analyzer.analyze()
# Step 4: Assess risks
self.risk_analyzer = RiskAnalyzer(
self.processor.df,
self.trainer.results,
bias_results,
self.processor.protected_attributes,
self.processor.target_column
)
risk_results = self.risk_analyzer.analyze()
# Step 5: Generate report
self.report_generator = ReportGenerator(
self.trainer.results,
bias_results,
risk_results,
self.processor.df
)
return self.report_generator.generate_report()
def save_report(self, report, output_path):
"""
Save report to JSON file
Args:
report (dict): Analysis report
output_path (str): Path to save JSON file
Returns:
str: Path to saved file
"""
with open(output_path, 'w') as f:
json.dump(report, f, indent=2, cls=NumpyEncoder)
return output_path
def get_summary(self, report):
"""
Get executive summary from report
Args:
report (dict): Analysis report
Returns:
dict: Summary metrics
"""
return report.get('summary', {})
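
A minimal end-to-end sketch of the intended workflow (the file name, target column, and protected attributes are illustrative placeholders, assuming the package is importable as src):

from src import AIGovernanceAnalyzer

analyzer = AIGovernanceAnalyzer()
# Run the full pipeline: data prep, training, bias analysis, risk assessment, report.
report = analyzer.analyze('data.csv', 'target', ['gender', 'age'])

# Pull the executive summary, then persist the complete report as JSON.
summary = analyzer.get_summary(report)
print(f"Bias Score: {summary.get('overall_bias_score', 0.0):.3f}")
analyzer.save_report(report, 'governance_report.json')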

src/report_generator.py Normal file

@@ -0,0 +1,263 @@
"""
Report Generator Module
Generates comprehensive JSON reports
"""
import json
import numpy as np
from datetime import datetime
class NumpyEncoder(json.JSONEncoder):
"""Custom JSON encoder for numpy types"""
    def default(self, obj):
        # Convert numpy scalar and array types to native Python for JSON serialization.
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, np.bool_):
            return bool(obj)
        return super().default(obj)
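
# Usage note (illustrative): numpy scalars serialize to native JSON types, e.g.
# >>> json.dumps({'n': np.int64(5), 'ok': np.bool_(True)}, cls=NumpyEncoder)
# '{"n": 5, "ok": true}'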
class ReportGenerator:
"""Generate comprehensive analysis reports"""
def __init__(self, model_results, bias_results, risk_results, df):
self.model_results = model_results
self.bias_results = bias_results
self.risk_results = risk_results
self.df = df
def generate_report(self):
"""Generate comprehensive JSON report"""
report = {
'metadata': self._generate_metadata(),
'summary': self._generate_summary(),
'model_performance': self._format_model_results(),
'bias_analysis': self._format_bias_results(),
'risk_assessment': self._format_risk_results(),
'key_findings': self._extract_key_findings(),
'recommendations': self._compile_recommendations(),
'detailed_metrics': self._compile_detailed_metrics()
}
return report
def _generate_metadata(self):
"""Generate report metadata"""
return {
'report_id': f"AIGov_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
'generated_at': datetime.now().isoformat(),
'report_version': '1.0',
'dataset_info': {
'total_records': len(self.df),
'total_features': len(self.df.columns),
'columns': list(self.df.columns)
}
}
def _generate_summary(self):
"""Generate executive summary"""
model_metrics = self.model_results.get('metrics', {})
return {
'overall_bias_score': self.bias_results.get('overall_bias_score', 0.0),
'overall_risk_score': self.risk_results.get('overall_risk_score', 0.0),
'risk_level': self.risk_results.get('risk_level', 'UNKNOWN'),
'model_accuracy': model_metrics.get('accuracy', 0.0),
'fairness_violations_count': len(self.bias_results.get('fairness_violations', [])),
'passes_fairness_threshold': self.bias_results.get('fairness_assessment', {}).get('passes_fairness_threshold', False)
}
def _format_model_results(self):
"""Format model performance results"""
return {
'model_type': self.model_results.get('model_type', 'Unknown'),
'metrics': self.model_results.get('metrics', {}),
'confusion_matrix': self.model_results.get('confusion_matrix', []),
'top_features': dict(list(self.model_results.get('feature_importance', {}).items())[:10])
}
def _format_bias_results(self):
"""Format bias analysis results"""
return {
'overall_bias_score': self.bias_results.get('overall_bias_score', 0.0),
'fairness_metrics': self.bias_results.get('fairness_metrics', {}),
'fairness_violations': self.bias_results.get('fairness_violations', []),
'fairness_assessment': self.bias_results.get('fairness_assessment', {}),
'demographic_bias_summary': self._summarize_demographic_bias()
}
def _format_risk_results(self):
"""Format risk assessment results"""
return {
'overall_risk_score': self.risk_results.get('overall_risk_score', 0.0),
'risk_level': self.risk_results.get('risk_level', 'UNKNOWN'),
'risk_categories': self.risk_results.get('risk_categories', {}),
'privacy_risks': self._summarize_privacy_risks(),
'ethical_risks': self._summarize_ethical_risks()
}
def _summarize_demographic_bias(self):
"""Summarize demographic bias"""
demo_bias = self.bias_results.get('demographic_bias', {})
summary = {}
for attr, data in demo_bias.items():
summary[attr] = {
'max_disparity': data.get('max_disparity', 0),
'groups_analyzed': len(data.get('approval_rates', {}))
}
return summary
def _summarize_privacy_risks(self):
"""Summarize privacy risks"""
privacy = self.risk_results.get('privacy_risks', {})
return {
'pii_count': len(privacy.get('pii_detected', [])),
'anonymization_level': privacy.get('anonymization_level', 'UNKNOWN'),
'exposure_risk_count': len(privacy.get('exposure_risks', [])),
'gdpr_compliance_score': privacy.get('gdpr_compliance', {}).get('compliance_score', 0)
}
def _summarize_ethical_risks(self):
"""Summarize ethical risks"""
ethical = self.risk_results.get('ethical_risks', {})
return {
'fairness_issues_count': len(ethical.get('fairness_issues', [])),
'transparency_score': ethical.get('transparency_score', 0),
'bias_amplification_risk': ethical.get('bias_amplification_risk', 'UNKNOWN'),
'social_impact': ethical.get('social_impact_assessment', {})
}
def _extract_key_findings(self):
"""Extract key findings from analysis"""
findings = []
# Model performance findings
accuracy = self.model_results.get('metrics', {}).get('accuracy', 0)
if accuracy >= 0.8:
findings.append(f"✓ Model achieves good accuracy ({accuracy:.2%})")
else:
findings.append(f"⚠ Model accuracy is below optimal ({accuracy:.2%})")
# Bias findings
bias_score = self.bias_results.get('overall_bias_score', 0)
if bias_score < 0.3:
findings.append("✓ Low bias detected across protected attributes")
elif bias_score < 0.5:
findings.append("⚠ Moderate bias detected - monitoring recommended")
else:
findings.append("❌ High bias detected - immediate action required")
# Fairness violations
violations = self.bias_results.get('fairness_violations', [])
if violations:
            high_sev = sum(1 for v in violations if v.get('severity') == 'HIGH')
findings.append(f"{len(violations)} fairness violations detected ({high_sev} high severity)")
else:
findings.append("✓ No fairness violations detected")
# Privacy findings
privacy = self.risk_results.get('privacy_risks', {})
pii_count = len(privacy.get('pii_detected', []))
if pii_count > 0:
findings.append(f"{pii_count} columns contain potential PII")
else:
findings.append("✓ No obvious PII detected in dataset")
# Risk level
risk_level = self.risk_results.get('risk_level', 'UNKNOWN')
findings.append(f"Overall Risk Level: {risk_level}")
return findings
def _compile_recommendations(self):
"""Compile all recommendations"""
# Get recommendations from each component
privacy_recs = self.risk_results.get('privacy_risks', {}).get('recommendations', [])
ethical_recs = self.risk_results.get('ethical_risks', {}).get('recommendations', [])
performance_recs = self.risk_results.get('model_performance_risks', {}).get('recommendations', [])
compliance_recs = self.risk_results.get('compliance_risks', {}).get('recommendations', [])
# Prioritize recommendations
all_recs = []
# High priority (from violations and high risks)
violations = self.bias_results.get('fairness_violations', [])
if violations:
all_recs.append({
'priority': 'HIGH',
'category': 'Fairness',
'recommendation': 'Address fairness violations in protected attributes'
})
        if privacy_recs:
all_recs.append({
'priority': 'HIGH',
'category': 'Privacy',
'recommendation': privacy_recs[0]
})
# Medium priority
for rec in ethical_recs[:2]:
all_recs.append({
'priority': 'MEDIUM',
'category': 'Ethics',
'recommendation': rec
})
# Lower priority
for rec in performance_recs[:2]:
all_recs.append({
'priority': 'MEDIUM',
'category': 'Performance',
'recommendation': rec
})
for rec in compliance_recs[:2]:
all_recs.append({
'priority': 'MEDIUM',
'category': 'Compliance',
'recommendation': rec
})
# Convert to simple list with formatting
recommendations = [
f"[{r['priority']}] {r['category']}: {r['recommendation']}"
for r in all_recs[:10] # Limit to top 10
]
return recommendations
def _compile_detailed_metrics(self):
"""Compile detailed metrics for analysis"""
return {
'bias_metrics': {
'by_attribute': self.bias_results.get('fairness_metrics', {}),
'demographic_analysis': self.bias_results.get('demographic_bias', {})
},
'risk_breakdown': {
'privacy': self.risk_results.get('privacy_risks', {}),
'ethical': self.risk_results.get('ethical_risks', {}),
'compliance': self.risk_results.get('compliance_risks', {}),
'data_quality': self.risk_results.get('data_quality_risks', {})
},
'model_details': {
'classification_report': self.model_results.get('classification_report', {}),
'feature_importance': self.model_results.get('feature_importance', {})
}
}
def save_report(self, filepath):
"""Save report to JSON file"""
report = self.generate_report()
with open(filepath, 'w') as f:
json.dump(report, f, indent=2, cls=NumpyEncoder)
return filepath
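
A hedged sketch of driving ReportGenerator directly; the dicts below are placeholder stubs standing in for the real trainer/analyzer outputs (every key shown is read via .get() above, so the stubs only need to exercise the defaults):

import numpy as np
import pandas as pd
from src.report_generator import ReportGenerator  # assuming src is importable

# Hypothetical minimal inputs; real runs pass the full results dicts.
model_results = {'model_type': 'RandomForest', 'metrics': {'accuracy': np.float64(0.87)}}
bias_results = {'overall_bias_score': 0.12, 'fairness_violations': []}
risk_results = {'overall_risk_score': 0.35, 'risk_level': 'MEDIUM'}
df = pd.DataFrame({'age': [25, 40], 'approved': [1, 0]})

generator = ReportGenerator(model_results, bias_results, risk_results, df)
path = generator.save_report('governance_report.json')  # NumpyEncoder converts the numpy values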