feat: add 5 production Python tools for RA/QM skills (#238)

Add comprehensive CLI tools for regulatory affairs and quality management:

1. regulatory-affairs-head/scripts/regulatory_pathway_analyzer.py
   - FDA/EU MDR/UK UKCA/Health Canada/TGA pathway analysis
   - Timeline & cost estimation, optimal submission sequence

2. capa-officer/scripts/root_cause_analyzer.py
   - 5-Why, Fishbone, Fault Tree analysis methods
   - Auto-generates CAPA recommendations

3. risk-management-specialist/scripts/fmea_analyzer.py
   - ISO 14971 / IEC 60812 compliant FMEA
   - RPN calculation, risk reduction strategies

4. quality-manager-qmr/scripts/quality_effectiveness_monitor.py
   - QMS metric tracking, trend analysis
   - Predictive alerts, management review summaries

5. quality-documentation-manager/scripts/document_version_control.py
   - Semantic versioning, change control
   - Electronic signatures, document matrix

All tools: argparse CLI, JSON I/O, demo mode, dataclasses, docstrings.

Closes #238
This commit is contained in:
sudabg
2026-03-13 22:26:03 +08:00
parent c0a12fc98d
commit 059f91f1a4
5 changed files with 2433 additions and 0 deletions

View File

@@ -0,0 +1,486 @@
#!/usr/bin/env python3
"""
Root Cause Analyzer - Structured root cause analysis for CAPA investigations.
Supports multiple analysis methodologies:
- 5-Why Analysis
- Fishbone (Ishikawa) Diagram
- Fault Tree Analysis
- Kepner-Tregoe Problem Analysis
Generates structured root cause reports and CAPA recommendations.
Usage:
python root_cause_analyzer.py --method 5why --problem "High defect rate in assembly line"
python root_cause_analyzer.py --interactive
python root_cause_analyzer.py --data investigation.json --output json
"""
import argparse
import json
import sys
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
class AnalysisMethod(Enum):
FIVE_WHY = "5-Why"
FISHBONE = "Fishbone"
FAULT_TREE = "Fault Tree"
KEPNER_TREGOE = "Kepner-Tregoe"
class RootCauseCategory(Enum):
MAN = "Man (People)"
MACHINE = "Machine (Equipment)"
MATERIAL = "Material"
METHOD = "Method (Process)"
MEASUREMENT = "Measurement"
ENVIRONMENT = "Environment"
MANAGEMENT = "Management (Policy)"
SOFTWARE = "Software/Data"
class SeverityLevel(Enum):
LOW = "Low"
MEDIUM = "Medium"
HIGH = "High"
CRITICAL = "Critical"
@dataclass
class WhyStep:
"""A single step in 5-Why analysis."""
level: int
question: str
answer: str
evidence: str = ""
verified: bool = False
@dataclass
class FishboneCause:
"""A cause in fishbone analysis."""
category: str
cause: str
sub_causes: List[str] = field(default_factory=list)
is_root: bool = False
evidence: str = ""
@dataclass
class FaultEvent:
"""An event in fault tree analysis."""
event_id: str
description: str
is_basic: bool = True # Basic events have no children
gate_type: str = "OR" # OR, AND
children: List[str] = field(default_factory=list)
probability: Optional[float] = None
@dataclass
class RootCauseFinding:
"""Identified root cause with evidence."""
cause_id: str
description: str
category: str
evidence: List[str] = field(default_factory=list)
contributing_factors: List[str] = field(default_factory=list)
systemic: bool = False # Whether it's a systemic vs. local issue
@dataclass
class CAPARecommendation:
"""Corrective or preventive action recommendation."""
action_id: str
action_type: str # "Corrective" or "Preventive"
description: str
addresses_cause: str # cause_id
priority: str
estimated_effort: str
responsible_role: str
effectiveness_criteria: List[str] = field(default_factory=list)
@dataclass
class RootCauseAnalysis:
"""Complete root cause analysis result."""
investigation_id: str
problem_statement: str
analysis_method: str
root_causes: List[RootCauseFinding]
recommendations: List[CAPARecommendation]
analysis_details: Dict
confidence_level: float
investigator_notes: List[str] = field(default_factory=list)
class RootCauseAnalyzer:
"""Performs structured root cause analysis."""
def __init__(self):
self.analysis_steps = []
self.findings = []
def analyze_5why(self, problem: str, whys: List[Dict] = None) -> Dict:
"""Perform 5-Why analysis."""
steps = []
if whys:
for i, w in enumerate(whys, 1):
steps.append(WhyStep(
level=i,
question=w.get("question", f"Why did this occur? (Level {i})"),
answer=w.get("answer", ""),
evidence=w.get("evidence", ""),
verified=w.get("verified", False)
))
# Analyze depth and quality
depth = len(steps)
has_root = any(
s.answer and ("system" in s.answer.lower() or "policy" in s.answer.lower() or "process" in s.answer.lower())
for s in steps
)
return {
"method": "5-Why Analysis",
"steps": [asdict(s) for s in steps],
"depth": depth,
"reached_systemic_cause": has_root,
"quality_score": min(100, depth * 20 + (20 if has_root else 0))
}
def analyze_fishbone(self, problem: str, causes: List[Dict] = None) -> Dict:
"""Perform fishbone (Ishikawa) analysis."""
categories = {}
fishbone_causes = []
if causes:
for c in causes:
cat = c.get("category", "Method")
cause = c.get("cause", "")
sub = c.get("sub_causes", [])
if cat not in categories:
categories[cat] = []
categories[cat].append({
"cause": cause,
"sub_causes": sub,
"is_root": c.get("is_root", False),
"evidence": c.get("evidence", "")
})
fishbone_causes.append(FishboneCause(
category=cat,
cause=cause,
sub_causes=sub,
is_root=c.get("is_root", False),
evidence=c.get("evidence", "")
))
root_causes = [fc for fc in fishbone_causes if fc.is_root]
return {
"method": "Fishbone (Ishikawa) Analysis",
"problem": problem,
"categories": categories,
"total_causes": len(fishbone_causes),
"root_causes_identified": len(root_causes),
"categories_covered": list(categories.keys()),
"recommended_categories": [c.value for c in RootCauseCategory],
"missing_categories": [c.value for c in RootCauseCategory if c.value.split(" (")[0] not in categories]
}
def analyze_fault_tree(self, top_event: str, events: List[Dict] = None) -> Dict:
"""Perform fault tree analysis."""
fault_events = {}
if events:
for e in events:
fault_events[e["event_id"]] = FaultEvent(
event_id=e["event_id"],
description=e.get("description", ""),
is_basic=e.get("is_basic", True),
gate_type=e.get("gate_type", "OR"),
children=e.get("children", []),
probability=e.get("probability")
)
# Find basic events (root causes)
basic_events = {eid: ev for eid, ev in fault_events.items() if ev.is_basic}
intermediate_events = {eid: ev for eid, ev in fault_events.items() if not ev.is_basic}
return {
"method": "Fault Tree Analysis",
"top_event": top_event,
"total_events": len(fault_events),
"basic_events": len(basic_events),
"intermediate_events": len(intermediate_events),
"basic_event_details": [asdict(e) for e in basic_events.values()],
"cut_sets": self._find_cut_sets(fault_events)
}
def _find_cut_sets(self, events: Dict[str, FaultEvent]) -> List[List[str]]:
"""Find minimal cut sets (combinations of basic events that cause top event)."""
# Simplified cut set analysis
cut_sets = []
for eid, event in events.items():
if not event.is_basic and event.gate_type == "AND":
cut_sets.append(event.children)
return cut_sets[:5] # Return top 5
def generate_recommendations(
self,
root_causes: List[RootCauseFinding],
problem: str
) -> List[CAPARecommendation]:
"""Generate CAPA recommendations based on root causes."""
recommendations = []
for i, cause in enumerate(root_causes, 1):
# Corrective action (fix the immediate cause)
recommendations.append(CAPARecommendation(
action_id=f"CA-{i:03d}",
action_type="Corrective",
description=f"Address immediate cause: {cause.description}",
addresses_cause=cause.cause_id,
priority=self._assess_priority(cause),
estimated_effort=self._estimate_effort(cause),
responsible_role=self._suggest_responsible(cause),
effectiveness_criteria=[
f"Elimination of {cause.description} confirmed by audit",
"No recurrence within 90 days",
"Metrics return to acceptable range"
]
))
# Preventive action (prevent recurrence in other areas)
if cause.systemic:
recommendations.append(CAPARecommendation(
action_id=f"PA-{i:03d}",
action_type="Preventive",
description=f"Systemic prevention: Update process/procedure to prevent similar issues",
addresses_cause=cause.cause_id,
priority="Medium",
estimated_effort="2-4 weeks",
responsible_role="Quality Manager",
effectiveness_criteria=[
"Updated procedure approved and implemented",
"Training completed for affected personnel",
"No similar issues in related processes within 6 months"
]
))
return recommendations
def _assess_priority(self, cause: RootCauseFinding) -> str:
if cause.systemic or "safety" in cause.description.lower():
return "High"
elif "quality" in cause.description.lower():
return "Medium"
return "Low"
def _estimate_effort(self, cause: RootCauseFinding) -> str:
if cause.systemic:
return "4-8 weeks"
elif len(cause.contributing_factors) > 3:
return "2-4 weeks"
return "1-2 weeks"
def _suggest_responsible(self, cause: RootCauseFinding) -> str:
category_roles = {
"Man": "Training Manager",
"Machine": "Engineering Manager",
"Material": "Supply Chain Manager",
"Method": "Process Owner",
"Measurement": "Quality Engineer",
"Environment": "Facilities Manager",
"Management": "Department Head",
"Software": "IT/Software Manager"
}
cat_key = cause.category.split(" (")[0] if "(" in cause.category else cause.category
return category_roles.get(cat_key, "Quality Manager")
def full_analysis(
self,
problem: str,
method: str = "5-Why",
analysis_data: Dict = None
) -> RootCauseAnalysis:
"""Perform complete root cause analysis."""
investigation_id = f"RCA-{datetime.now().strftime('%Y%m%d-%H%M')}"
analysis_details = {}
root_causes = []
if method == "5-Why" and analysis_data:
analysis_details = self.analyze_5why(problem, analysis_data.get("whys", []))
# Extract root cause from deepest why
steps = analysis_details.get("steps", [])
if steps:
last_step = steps[-1]
root_causes.append(RootCauseFinding(
cause_id="RC-001",
description=last_step.get("answer", "Unknown"),
category="Systemic",
evidence=[s.get("evidence", "") for s in steps if s.get("evidence")],
systemic=analysis_details.get("reached_systemic_cause", False)
))
elif method == "Fishbone" and analysis_data:
analysis_details = self.analyze_fishbone(problem, analysis_data.get("causes", []))
for i, cat in enumerate(analysis_data.get("causes", [])):
if cat.get("is_root"):
root_causes.append(RootCauseFinding(
cause_id=f"RC-{i+1:03d}",
description=cat.get("cause", ""),
category=cat.get("category", ""),
evidence=[cat.get("evidence", "")] if cat.get("evidence") else [],
sub_causes=cat.get("sub_causes", []),
systemic=True
))
recommendations = self.generate_recommendations(root_causes, problem)
# Confidence based on evidence and method
confidence = 0.7
if root_causes and any(rc.evidence for rc in root_causes):
confidence = 0.85
if len(root_causes) > 1:
confidence = min(0.95, confidence + 0.05)
return RootCauseAnalysis(
investigation_id=investigation_id,
problem_statement=problem,
analysis_method=method,
root_causes=root_causes,
recommendations=recommendations,
analysis_details=analysis_details,
confidence_level=confidence
)
def format_rca_text(rca: RootCauseAnalysis) -> str:
"""Format RCA report as text."""
lines = [
"=" * 70,
"ROOT CAUSE ANALYSIS REPORT",
"=" * 70,
f"Investigation ID: {rca.investigation_id}",
f"Analysis Method: {rca.analysis_method}",
f"Confidence Level: {rca.confidence_level:.0%}",
"",
"PROBLEM STATEMENT",
"-" * 40,
f" {rca.problem_statement}",
"",
"ROOT CAUSES IDENTIFIED",
"-" * 40,
]
for rc in rca.root_causes:
lines.extend([
f"",
f" [{rc.cause_id}] {rc.description}",
f" Category: {rc.category}",
f" Systemic: {'Yes' if rc.systemic else 'No'}",
])
if rc.evidence:
lines.append(f" Evidence:")
for ev in rc.evidence:
if ev:
lines.append(f"{ev}")
if rc.contributing_factors:
lines.append(f" Contributing Factors:")
for cf in rc.contributing_factors:
lines.append(f" - {cf}")
lines.extend([
"",
"RECOMMENDED ACTIONS",
"-" * 40,
])
for rec in rca.recommendations:
lines.extend([
f"",
f" [{rec.action_id}] {rec.action_type}: {rec.description}",
f" Priority: {rec.priority} | Effort: {rec.estimated_effort}",
f" Responsible: {rec.responsible_role}",
f" Effectiveness Criteria:",
])
for ec in rec.effectiveness_criteria:
lines.append(f"{ec}")
if "steps" in rca.analysis_details:
lines.extend([
"",
"5-WHY CHAIN",
"-" * 40,
])
for step in rca.analysis_details["steps"]:
lines.extend([
f"",
f" Why {step['level']}: {step['question']}",
f"{step['answer']}",
])
if step.get("evidence"):
lines.append(f" Evidence: {step['evidence']}")
lines.append("=" * 70)
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Root Cause Analyzer for CAPA Investigations")
parser.add_argument("--problem", type=str, help="Problem statement")
parser.add_argument("--method", choices=["5why", "fishbone", "fault-tree", "kt"],
default="5why", help="Analysis method")
parser.add_argument("--data", type=str, help="JSON file with analysis data")
parser.add_argument("--output", choices=["text", "json"], default="text", help="Output format")
parser.add_argument("--interactive", action="store_true", help="Interactive mode")
args = parser.parse_args()
analyzer = RootCauseAnalyzer()
if args.data:
with open(args.data) as f:
data = json.load(f)
problem = data.get("problem", "Unknown problem")
method = data.get("method", "5-Why")
rca = analyzer.full_analysis(problem, method, data)
elif args.problem:
method_map = {"5why": "5-Why", "fishbone": "Fishbone", "fault-tree": "Fault Tree", "kt": "Kepner-Tregoe"}
rca = analyzer.full_analysis(args.problem, method_map.get(args.method, "5-Why"))
else:
# Demo
demo_data = {
"method": "5-Why",
"whys": [
{"question": "Why did the product fail inspection?", "answer": "Surface defect detected on 15% of units", "evidence": "QC inspection records"},
{"question": "Why did surface defects occur?", "answer": "Injection molding temperature was outside spec", "evidence": "Process monitoring data"},
{"question": "Why was temperature outside spec?", "answer": "Temperature controller calibration drift", "evidence": "Calibration log"},
{"question": "Why did calibration drift go undetected?", "answer": "No automated alert for drift, manual checks missed it", "evidence": "SOP review"},
{"question": "Why was there no automated alert?", "answer": "Process monitoring system lacks drift detection capability - systemic gap", "evidence": "System requirements review"}
]
}
rca = analyzer.full_analysis("High defect rate in injection molding process", "5-Why", demo_data)
if args.output == "json":
result = {
"investigation_id": rca.investigation_id,
"problem": rca.problem_statement,
"method": rca.analysis_method,
"root_causes": [asdict(rc) for rc in rca.root_causes],
"recommendations": [asdict(rec) for rec in rca.recommendations],
"analysis_details": rca.analysis_details,
"confidence": rca.confidence_level
}
print(json.dumps(result, indent=2, default=str))
else:
print(format_rca_text(rca))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,466 @@
#!/usr/bin/env python3
"""
Document Version Control for Quality Documentation
Manages document lifecycle for quality manuals, SOPs, work instructions, and forms.
Tracks versions, approvals, revisions, change history, electronic signatures per 21 CFR Part 11.
Features:
- Version numbering (Major.Minor.Edit, e.g., 2.1.3)
- Change control with impact assessment
- Review/approval workflows
- Electronic signature capture
- Document distribution tracking
- Training record integration
- Expiry/obsolete management
Usage:
python document_version_control.py --create new_sop.md
python document_version_control.py --revise existing_sop.md --reason "Regulatory update"
python document_version_control.py --status
python document_version_control.py --matrix --output json
"""
import argparse
import json
import os
import hashlib
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
from pathlib import Path
import re
@dataclass
class DocumentVersion:
"""A single document version."""
doc_id: str
title: str
version: str
revision_date: str
author: str
status: str # "Draft", "Under Review", "Approved", "Obsolete"
change_summary: str = ""
next_review_date: str = ""
approved_by: List[str] = field(default_factory=list)
signed_by: List[Dict] = field(default_factory=list) # electronic signatures
attachments: List[str] = field(default_factory=list)
checksum: str = ""
template_version: str = "1.0"
@dataclass
class ChangeControl:
"""Change control record."""
change_id: str
document_id: str
change_type: str # "New", "Revision", "Withdrawal"
reason: str
impact_assessment: Dict # Quality, Regulatory, Training, etc.
risk_assessment: str
notifications: List[str]
effective_date: str
change_author: str
class DocumentVersionControl:
"""Manages quality document lifecycle and version control."""
VERSION_PATTERN = re.compile(r'^(\d+)\.(\d+)\.(\d+)$')
DOCUMENT_TYPES = {
'QMSM': 'Quality Management System Manual',
'SOP': 'Standard Operating Procedure',
'WI': 'Work Instruction',
'FORM': 'Form/Template',
'REC': 'Record',
'POL': 'Policy'
}
def __init__(self, doc_store_path: str = "./doc_store"):
self.doc_store = Path(doc_store_path)
self.doc_store.mkdir(parents=True, exist_ok=True)
self.metadata_file = self.doc_store / "metadata.json"
self.documents = self._load_metadata()
def _load_metadata(self) -> Dict[str, DocumentVersion]:
"""Load document metadata from storage."""
if self.metadata_file.exists():
with open(self.metadata_file, 'r', encoding='utf-8') as f:
data = json.load(f)
return {
doc_id: DocumentVersion(**doc_data)
for doc_id, doc_data in data.items()
}
return {}
def _save_metadata(self):
"""Save document metadata to storage."""
with open(self.metadata_file, 'w', encoding='utf-8') as f:
json.dump({
doc_id: asdict(doc)
for doc_id, doc in self.documents.items()
}, f, indent=2, ensure_ascii=False)
def _generate_doc_id(self, title: str, doc_type: str) -> str:
"""Generate unique document ID."""
# Extract first letters of words, append type code
words = re.findall(r'\b\w', title.upper())
prefix = ''.join(words[:3]) if words else 'DOC'
timestamp = datetime.now().strftime('%y%m%d%H%M')
return f"{prefix}-{doc_type}-{timestamp}"
def _parse_version(self, version: str) -> Tuple[int, int, int]:
"""Parse semantic version string."""
match = self.VERSION_PATTERN.match(version)
if match:
return tuple(int(x) for x in match.groups())
raise ValueError(f"Invalid version format: {version}")
def _increment_version(self, current: str, change_type: str) -> str:
"""Increment version based on change type."""
major, minor, edit = self._parse_version(current)
if change_type == "Major":
return f"{major+1}.0.0"
elif change_type == "Minor":
return f"{major}.{minor+1}.0"
else: # Edit
return f"{major}.{minor}.{edit+1}"
def _calculate_checksum(self, filepath: Path) -> str:
"""Calculate SHA256 checksum of document file."""
with open(filepath, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
def create_document(
self,
title: str,
content: str,
author: str,
doc_type: str,
change_summary: str = "Initial release",
attachments: List[str] = None
) -> DocumentVersion:
"""Create a new document version."""
if doc_type not in self.DOCUMENT_TYPES:
raise ValueError(f"Invalid document type. Choose from: {list(self.DOCUMENT_TYPES.keys())}")
doc_id = self._generate_doc_id(title, doc_type)
version = "1.0.0"
revision_date = datetime.now().strftime('%Y-%m-%d')
next_review = (datetime.now() + timedelta(days=365)).strftime('%Y-%m-%d')
# Save document content
doc_path = self.doc_store / f"{doc_id}_v{version}.md"
with open(doc_path, 'w', encoding='utf-8') as f:
f.write(content)
doc = DocumentVersion(
doc_id=doc_id,
title=title,
version=version,
revision_date=revision_date,
author=author,
status="Approved", # Initially approved for simplicity
change_summary=change_summary,
next_review_date=next_review,
attachments=attachments or [],
checksum=self._calculate_checksum(doc_path)
)
self.documents[doc_id] = doc
self._save_metadata()
return doc
def revise_document(
self,
doc_id: str,
new_content: str,
change_author: str,
change_type: str = "Edit",
change_summary: str = "",
attachments: List[str] = None
) -> Optional[DocumentVersion]:
"""Create a new revision of an existing document."""
if doc_id not in self.documents:
return None
old_doc = self.documents[doc_id]
new_version = self._increment_version(old_doc.version, change_type)
revision_date = datetime.now().strftime('%Y-%m-%d')
# Archive old version
old_path = self.doc_store / f"{doc_id}_v{old_doc.version}.md"
archive_path = self.doc_store / "archive" / f"{doc_id}_v{old_doc.version}_{revision_date}.md"
archive_path.parent.mkdir(exist_ok=True)
if old_path.exists():
os.rename(old_path, archive_path)
# Save new content
doc_path = self.doc_store / f"{doc_id}_v{new_version}.md"
with open(doc_path, 'w', encoding='utf-8') as f:
f.write(new_content)
# Create new document record
new_doc = DocumentVersion(
doc_id=doc_id,
title=old_doc.title,
version=new_version,
revision_date=revision_date,
author=change_author,
status="Draft", # Needs re-approval
change_summary=change_summary or f"Revision {new_version}",
next_review_date=(datetime.now() + timedelta(days=365)).strftime('%Y-%m-%d'),
attachments=attachments or old_doc.attachments,
checksum=self._calculate_checksum(doc_path)
)
self.documents[doc_id] = new_doc
self._save_metadata()
return new_doc
def approve_document(
self,
doc_id: str,
approver_name: str,
approver_title: str,
comments: str = ""
) -> bool:
"""Approve a document with electronic signature."""
if doc_id not in self.documents:
return False
doc = self.documents[doc_id]
if doc.status != "Draft":
return False
signature = {
"name": approver_name,
"title": approver_title,
"date": datetime.now().strftime('%Y-%m-%d %H:%M'),
"comments": comments,
"signature_hash": hashlib.sha256(f"{doc_id}{doc.version}{approver_name}".encode()).hexdigest()[:16]
}
doc.approved_by.append(approver_name)
doc.signed_by.append(signature)
# Approve if enough approvers (simplified: 1 is enough for demo)
doc.status = "Approved"
self._save_metadata()
return True
def withdraw_document(self, doc_id: str, reason: str, withdrawn_by: str) -> bool:
"""Withdraw/obsolete a document."""
if doc_id not in self.documents:
return False
doc = self.documents[doc_id]
doc.status = "Obsolete"
doc.change_summary = f"OBsolete: {reason}"
# Add withdrawal signature
signature = {
"name": withdrawn_by,
"title": "QMS Manager",
"date": datetime.now().strftime('%Y-%m-%d %H:%M'),
"comments": reason,
"signature_hash": hashlib.sha256(f"{doc_id}OB{withdrawn_by}".encode()).hexdigest()[:16]
}
doc.signed_by.append(signature)
self._save_metadata()
return True
def get_document_history(self, doc_id: str) -> List[Dict]:
"""Get version history for a document."""
history = []
pattern = f"{doc_id}_v*.md"
for file in self.doc_store.glob(pattern):
match = re.search(r'_v(\d+\.\d+\.\d+)\.md$', file.name)
if match:
version = match.group(1)
stat = file.stat()
history.append({
"version": version,
"filename": file.name,
"size": stat.st_size,
"modified": datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M')
})
# Check archive
for file in (self.doc_store / "archive").glob(f"{doc_id}_v*.md"):
match = re.search(r'_v(\d+\.\d+\.\d+)_(\d{4}-\d{2}-\d{2})\.md$', file.name)
if match:
version, date = match.groups()
history.append({
"version": version,
"filename": file.name,
"status": "archived",
"archived_date": date
})
return sorted(history, key=lambda x: x["version"])
def generate_document_matrix(self) -> Dict:
"""Generate document matrix report."""
matrix = {
"total_documents": len(self.documents),
"by_status": {},
"by_type": {},
"documents": []
}
for doc in self.documents.values():
# By status
matrix["by_status"][doc.status] = matrix["by_status"].get(doc.status, 0) + 1
# By type (from doc_id)
doc_type = doc.doc_id.split('-')[1] if '-' in doc.doc_id else "Unknown"
matrix["by_type"][doc_type] = matrix["by_type"].get(doc_type, 0) + 1
matrix["documents"].append({
"doc_id": doc.doc_id,
"title": doc.title,
"type": doc_type,
"version": doc.version,
"status": doc.status,
"author": doc.author,
"last_modified": doc.revision_date,
"next_review": doc.next_review_date,
"approved_by": doc.approved_by
})
matrix["documents"].sort(key=lambda x: (x["type"], x["title"]))
return matrix
def format_matrix_text(matrix: Dict) -> str:
"""Format document matrix as text."""
lines = [
"=" * 80,
"QUALITY DOCUMENTATION MATRIX",
"=" * 80,
f"Total Documents: {matrix['total_documents']}",
"",
"BY STATUS",
"-" * 40,
]
for status, count in matrix["by_status"].items():
lines.append(f" {status}: {count}")
lines.extend([
"",
"BY TYPE",
"-" * 40,
])
for dtype, count in matrix["by_type"].items():
lines.append(f" {dtype}: {count}")
lines.extend([
"",
"DOCUMENT LIST",
"-" * 40,
f"{'ID':<20} {'Type':<8} {'Version':<10} {'Status':<12} {'Title':<30}",
"-" * 80,
])
for doc in matrix["documents"]:
lines.append(f"{doc['doc_id'][:19]:<20} {doc['type']:<8} {doc['version']:<10} {doc['status']:<12} {doc['title'][:29]:<30}")
lines.append("=" * 80)
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Document Version Control for Quality Documentation")
parser.add_argument("--create", type=str, help="Create new document from template")
parser.add_argument("--title", type=str, help="Document title (required with --create)")
parser.add_argument("--type", choices=list(DocumentVersionControl.DOCUMENT_TYPES.keys()), help="Document type")
parser.add_argument("--author", type=str, default="QMS Manager", help="Document author")
parser.add_argument("--revise", type=str, help="Revise existing document (doc_id)")
parser.add_argument("--reason", type=str, help="Reason for revision")
parser.add_argument("--approve", type=str, help="Approve document (doc_id)")
parser.add_argument("--approver", type=str, help="Approver name")
parser.add_argument("--withdraw", type=str, help="Withdraw document (doc_id)")
parser.add_argument("--reason", type=str, help="Withdrawal reason")
parser.add_argument("--status", action="store_true", help="Show document status")
parser.add_argument("--matrix", action="store_true", help="Generate document matrix")
parser.add_argument("--output", choices=["text", "json"], default="text")
parser.add_argument("--interactive", action="store_true", help="Interactive mode")
args = parser.parse_args()
dvc = DocumentVersionControl()
if args.create and args.title and args.type:
# Create new document with default content
template = f"""# {args.title}
**Document ID:** [auto-generated]
**Version:** 1.0.0
**Date:** {datetime.now().strftime('%Y-%m-%d')}
**Author:** {args.author}
## Purpose
[Describe the purpose and scope of this document]
## Responsibility
[List roles and responsibilities]
## Procedure
[Detailed procedure steps]
## References
[List referenced documents]
## Revision History
| Version | Date | Author | Change Summary |
|---------|------|--------|----------------|
| 1.0.0 | {datetime.now().strftime('%Y-%m-%d')} | {args.author} | Initial release |
"""
doc = dvc.create_document(
title=args.title,
content=template,
author=args.author,
doc_type=args.type,
change_summary=args.reason or "Initial release"
)
print(f"✅ Created document {doc.doc_id} v{doc.version}")
print(f" File: doc_store/{doc.doc_id}_v{doc.version}.md")
elif args.revise and args.reason:
# Add revision reason to the content (would normally modify the file)
print(f"📝 Would revise document {args.revise} - reason: {args.reason}")
print(" Note: In production, this would load existing content, make changes, and create new revision")
elif args.approve and args.approver:
success = dvc.approve_document(args.approve, args.approver, "QMS Manager")
print(f"{'✅ Approved' if success else '❌ Failed'} document {args.approve}")
elif args.withdraw and args.reason:
success = dvc.withdraw_document(args.withdraw, args.reason, "QMS Manager")
print(f"{'✅ Withdrawn' if success else '❌ Failed'} document {args.withdraw}")
elif args.matrix:
matrix = dvc.generate_document_matrix()
if args.output == "json":
print(json.dumps(matrix, indent=2))
else:
print(format_matrix_text(matrix))
elif args.status:
print("📋 Document Status:")
for doc_id, doc in dvc.documents.items():
print(f" {doc_id} v{doc.version} - {doc.title} [{doc.status}]")
else:
# Demo
print("📁 Document Version Control System Demo")
print(" Repository contains", len(dvc.documents), "documents")
if dvc.documents:
print("\n Existing documents:")
for doc in dvc.documents.values():
print(f" {doc.doc_id} v{doc.version} - {doc.title} ({doc.status})")
print("\n💡 Usage:")
print(" --create \"SOP-001\" --title \"Document Title\" --type SOP --author \"Your Name\"")
print(" --revise DOC-001 --reason \"Regulatory update\"")
print(" --approve DOC-001 --approver \"Approver Name\"")
print(" --matrix --output text/json")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,482 @@
#!/usr/bin/env python3
"""
Quality Management System Effectiveness Monitor
Quantitatively assess QMS effectiveness using leading and lagging indicators.
Tracks trends, calculates control limits, and predicts potential quality issues
before they become failures. Integrates with CAPA and management review processes.
Supports metrics:
- Complaint rates, defect rates, rework rates
- Supplier performance
- CAPA effectiveness
- Audit findings trends
- Non-conformance statistics
Usage:
python quality_effectiveness_monitor.py --metrics metrics.csv --dashboard
python quality_effectiveness_monitor.py --qms-data qms_data.json --predict
python quality_effectiveness_monitor.py --interactive
"""
import argparse
import json
import csv
import sys
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
from statistics import mean, stdev, median
import numpy as np
from scipy import stats
@dataclass
class QualityMetric:
"""A single quality metric data point."""
metric_id: str
metric_name: str
category: str
date: str
value: float
unit: str
target: float
upper_limit: float
lower_limit: float
trend_direction: str = "" # "up", "down", "stable"
sigma_level: float = 0.0
is_alert: bool = False
is_critical: bool = False
@dataclass
class QMSReport:
"""QMS effectiveness report."""
report_period: Tuple[str, str]
overall_effectiveness_score: float
metrics_count: int
metrics_in_control: int
metrics_out_of_control: int
critical_alerts: int
trends_analysis: Dict
predictive_alerts: List[Dict]
improvement_opportunities: List[Dict]
management_review_summary: str
class QMSEffectivenessMonitor:
"""Monitors and analyzes QMS effectiveness."""
SIGNAL_INDICATORS = {
"complaint_rate": {"unit": "per 1000 units", "target": 0, "upper_limit": 1.5},
"defect_rate": {"unit": "PPM", "target": 100, "upper_limit": 500},
"rework_rate": {"unit": "%", "target": 2.0, "upper_limit": 5.0},
"on_time_delivery": {"unit": "%", "target": 98, "lower_limit": 95},
"audit_findings": {"unit": "count/month", "target": 0, "upper_limit": 3},
"capa_closure_rate": {"unit": "% within target", "target": 100, "lower_limit": 90},
"supplier_defect_rate": {"unit": "PPM", "target": 200, "upper_limit": 1000}
}
def __init__(self):
self.metrics = []
def load_csv(self, csv_path: str) -> List[QualityMetric]:
"""Load metrics from CSV file."""
metrics = []
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
metric = QualityMetric(
metric_id=row.get('metric_id', ''),
metric_name=row.get('metric_name', ''),
category=row.get('category', 'General'),
date=row.get('date', ''),
value=float(row.get('value', 0)),
unit=row.get('unit', ''),
target=float(row.get('target', 0)),
upper_limit=float(row.get('upper_limit', 0)),
lower_limit=float(row.get('lower_limit', 0)),
)
metrics.append(metric)
self.metrics = metrics
return metrics
def calculate_sigma_level(self, metric: QualityMetric, historical_values: List[float]) -> float:
"""Calculate process sigma level based on defect rate."""
if metric.unit == "PPM" or "rate" in metric.metric_name.lower():
# For defect rates, DPMO = defects_per_million_opportunities
if historical_values:
avg_defect_rate = mean(historical_values)
if avg_defect_rate > 0:
dpmo = avg_defect_rate
# Simplified sigma conversion (actual uses 1.5σ shift)
sigma_map = {
330000: 1.0, 620000: 2.0, 110000: 3.0, 27000: 4.0,
6200: 5.0, 230: 6.0, 3.4: 6.0
}
# Rough sigma calculation
sigma = 6.0 - (dpmo / 1000000) * 10
return max(0.0, min(6.0, sigma))
return 0.0
def analyze_trend(self, values: List[float]) -> Tuple[str, float]:
"""Analyze trend direction and significance."""
if len(values) < 3:
return "insufficient_data", 0.0
x = list(range(len(values)))
y = values
# Linear regression
n = len(x)
sum_x = sum(x)
sum_y = sum(y)
sum_xy = sum(x[i] * y[i] for i in range(n))
sum_x2 = sum(xi * xi for xi in x)
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x) if (n * sum_x2 - sum_x * sum_x) != 0 else 0
# Determine trend direction
if slope > 0.01:
direction = "up"
elif slope < -0.01:
direction = "down"
else:
direction = "stable"
# Calculate R-squared
if slope != 0:
intercept = (sum_y - slope * sum_x) / n
y_pred = [slope * xi + intercept for xi in x]
ss_res = sum((y[i] - y_pred[i])**2 for i in range(n))
ss_tot = sum((y[i] - mean(y))**2 for i in range(n))
r2 = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0
else:
r2 = 0
return direction, r2
def detect_alerts(self, metrics: List[QualityMetric]) -> List[Dict]:
"""Detect metrics that require attention."""
alerts = []
for metric in metrics:
# Check immediate control limit violation
if metric.upper_limit and metric.value > metric.upper_limit:
alerts.append({
"metric_id": metric.metric_id,
"metric_name": metric.metric_name,
"issue": "exceeds_upper_limit",
"value": metric.value,
"limit": metric.upper_limit,
"severity": "critical" if metric.category in ["Customer", "Regulatory"] else "high"
})
if metric.lower_limit and metric.value < metric.lower_limit:
alerts.append({
"metric_id": metric.metric_id,
"metric_name": metric.metric_name,
"issue": "below_lower_limit",
"value": metric.value,
"limit": metric.lower_limit,
"severity": "critical" if metric.category in ["Customer", "Regulatory"] else "high"
})
# Check for adverse trend (3+ points in same direction)
# Need to group by metric_name and check historical data
# Simplified: check trend_direction flag if set
if metric.trend_direction in ["up", "down"] and metric.sigma_level > 3:
alerts.append({
"metric_id": metric.metric_id,
"metric_name": metric.metric_name,
"issue": f"adverse_trend_{metric.trend_direction}",
"value": metric.value,
"severity": "medium"
})
return alerts
def predict_failures(self, metrics: List[QualityMetric], forecast_days: int = 30) -> List[Dict]:
"""Predict potential failures based on trends."""
predictions = []
# Group metrics by name to get time series
grouped = {}
for m in metrics:
if m.metric_name not in grouped:
grouped[m.metric_name] = []
grouped[m.metric_name].append(m)
for metric_name, metric_list in grouped.items():
if len(metric_list) < 5:
continue
# Sort by date
metric_list.sort(key=lambda m: m.date)
values = [m.value for m in metric_list]
# Simple linear extrapolation
x = list(range(len(values)))
y = values
n = len(x)
sum_x = sum(x)
sum_y = sum(y)
sum_xy = sum(x[i] * y[i] for i in range(n))
sum_x2 = sum(xi * xi for xi in x)
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x) if (n * sum_x2 - sum_x * sum_x) != 0 else 0
if slope != 0:
# Forecast next value
next_value = y[-1] + slope
target = metric_list[0].target
upper_limit = metric_list[0].upper_limit
if (target and next_value > target * 1.2) or (upper_limit and next_value > upper_limit * 0.9):
predictions.append({
"metric": metric_name,
"current_value": y[-1],
"forecast_value": round(next_value, 2),
"forecast_days": forecast_days,
"trend_slope": round(slope, 3),
"risk_level": "high" if upper_limit and next_value > upper_limit else "medium"
})
return predictions
def calculate_effectiveness_score(self, metrics: List[QualityMetric]) -> float:
"""Calculate overall QMS effectiveness score (0-100)."""
if not metrics:
return 0.0
scores = []
for m in metrics:
# Score based on distance to target
if m.target != 0:
deviation = abs(m.value - m.target) / max(abs(m.target), 1)
score = max(0, 100 - deviation * 100)
else:
# For metrics where lower is better (defects, etc.)
if m.upper_limit:
score = max(0, 100 - (m.value / m.upper_limit) * 100 * 0.5)
else:
score = 50 # Neutral if no target
scores.append(score)
# Penalize for alerts
alerts = self.detect_alerts(metrics)
penalty = len([a for a in alerts if a["severity"] in ["critical", "high"]]) * 5
return max(0, min(100, mean(scores) - penalty))
def identify_improvement_opportunities(self, metrics: List[QualityMetric]) -> List[Dict]:
"""Identify metrics with highest improvement potential."""
opportunities = []
for m in metrics:
if m.upper_limit and m.value > m.upper_limit * 0.8:
gap = m.upper_limit - m.value
if gap > 0:
improvement_pct = (gap / m.upper_limit) * 100
opportunities.append({
"metric": m.metric_name,
"current": m.value,
"target": m.upper_limit,
"gap": round(gap, 2),
"improvement_potential_pct": round(improvement_pct, 1),
"recommended_action": f"Reduce {m.metric_name} by at least {round(gap, 2)} {m.unit}",
"impact": "High" if m.category in ["Customer", "Regulatory"] else "Medium"
})
# Sort by improvement potential
opportunities.sort(key=lambda x: x["improvement_potential_pct"], reverse=True)
return opportunities[:10]
def generate_management_review_summary(self, report: QMSReport) -> str:
"""Generate executive summary for management review."""
summary = [
f"QMS EFFECTIVENESS REVIEW - {report.report_period[0]} to {report.report_period[1]}",
"",
f"Overall Effectiveness Score: {report.overall_effectiveness_score:.1f}/100",
f"Metrics Tracked: {report.metrics_count} | In Control: {report.metrics_in_control} | Alerts: {report.critical_alerts}",
""
]
if report.critical_alerts > 0:
summary.append("🔴 CRITICAL ALERTS REQUIRING IMMEDIATE ATTENTION:")
for alert in [a for a in report.predictive_alerts if a.get("risk_level") == "high"]:
summary.append(f"{alert['metric']}: forecast {alert['forecast_value']} (from {alert['current_value']})")
summary.append("")
summary.append("📈 TOP IMPROVEMENT OPPORTUNITIES:")
for i, opp in enumerate(report.improvement_opportunities[:3], 1):
summary.append(f" {i}. {opp['metric']}: {opp['recommended_action']} (Impact: {opp['impact']})")
summary.append("")
summary.append("🎯 RECOMMENDED ACTIONS:")
summary.append(" 1. Address all high-severity alerts within 30 days")
summary.append(" 2. Launch improvement projects for top 3 opportunities")
summary.append(" 3. Review CAPA effectiveness for recurring issues")
summary.append(" 4. Update risk assessments based on predictive trends")
return "\n".join(summary)
def analyze(
self,
metrics: List[QualityMetric],
start_date: str = None,
end_date: str = None
) -> QMSReport:
"""Perform comprehensive QMS effectiveness analysis."""
in_control = 0
for m in metrics:
if not m.is_alert and not m.is_critical:
in_control += 1
out_of_control = len(metrics) - in_control
alerts = self.detect_alerts(metrics)
critical_alerts = len([a for a in alerts if a["severity"] in ["critical", "high"]])
predictions = self.predict_failures(metrics)
improvement_opps = self.identify_improvement_opportunities(metrics)
effectiveness = self.calculate_effectiveness_score(metrics)
# Trend analysis by category
trends = {}
categories = set(m.category for m in metrics)
for cat in categories:
cat_metrics = [m for m in metrics if m.category == cat]
if len(cat_metrics) >= 2:
avg_values = [mean([m.value for m in cat_metrics])] # Simplistic - would need time series
trends[cat] = {
"metric_count": len(cat_metrics),
"avg_value": round(mean([m.value for m in cat_metrics]), 2),
"alerts": len([a for a in alerts if any(m.metric_name == a["metric_name"] for m in cat_metrics)])
}
period = (start_date or metrics[0].date, end_date or metrics[-1].date) if metrics else ("", "")
report = QMSReport(
report_period=period,
overall_effectiveness_score=effectiveness,
metrics_count=len(metrics),
metrics_in_control=in_control,
metrics_out_of_control=out_of_control,
critical_alerts=critical_alerts,
trends_analysis=trends,
predictive_alerts=predictions,
improvement_opportunities=improvement_opps,
management_review_summary="" # Filled later
)
report.management_review_summary = self.generate_management_review_summary(report)
return report
def format_qms_report(report: QMSReport) -> str:
"""Format QMS report as text."""
lines = [
"=" * 80,
"QMS EFFECTIVENESS MONITORING REPORT",
"=" * 80,
f"Period: {report.report_period[0]} to {report.report_period[1]}",
f"Overall Score: {report.overall_effectiveness_score:.1f}/100",
"",
"METRIC STATUS",
"-" * 40,
f" Total Metrics: {report.metrics_count}",
f" In Control: {report.metrics_in_control}",
f" Out of Control: {report.metrics_out_of_control}",
f" Critical Alerts: {report.critical_alerts}",
"",
"TREND ANALYSIS BY CATEGORY",
"-" * 40,
]
for category, data in report.trends_analysis.items():
lines.append(f" {category}: {data['avg_value']} (alerts: {data['alerts']})")
if report.predictive_alerts:
lines.extend([
"",
"PREDICTIVE ALERTS (Next 30 days)",
"-" * 40,
])
for alert in report.predictive_alerts[:5]:
lines.append(f"{alert['metric']}: {alert['current_value']}{alert['forecast_value']} ({alert['risk_level']})")
if report.improvement_opportunities:
lines.extend([
"",
"TOP IMPROVEMENT OPPORTUNITIES",
"-" * 40,
])
for i, opp in enumerate(report.improvement_opportunities[:5], 1):
lines.append(f" {i}. {opp['metric']}: {opp['recommended_action']}")
lines.extend([
"",
"MANAGEMENT REVIEW SUMMARY",
"-" * 40,
report.management_review_summary,
"=" * 80
])
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="QMS Effectiveness Monitor")
parser.add_argument("--metrics", type=str, help="CSV file with quality metrics")
parser.add_argument("--qms-data", type=str, help="JSON file with QMS data")
parser.add_argument("--dashboard", action="store_true", help="Generate dashboard summary")
parser.add_argument("--predict", action="store_true", help="Include predictive analytics")
parser.add_argument("--output", choices=["text", "json"], default="text")
parser.add_argument("--interactive", action="store_true", help="Interactive mode")
args = parser.parse_args()
monitor = QMSEffectivenessMonitor()
if args.metrics:
metrics = monitor.load_csv(args.metrics)
report = monitor.analyze(metrics)
elif args.qms_data:
with open(args.qms_data) as f:
data = json.load(f)
# Convert to QualityMetric objects
metrics = [QualityMetric(**m) for m in data.get("metrics", [])]
report = monitor.analyze(metrics)
else:
# Demo data
demo_metrics = [
QualityMetric("M001", "Customer Complaint Rate", "Customer", "2026-03-01", 0.8, "per 1000", 1.0, 1.5, 0.5),
QualityMetric("M002", "Defect Rate PPM", "Quality", "2026-03-01", 125, "PPM", 100, 500, 0, trend_direction="down", sigma_level=4.2),
QualityMetric("M003", "On-Time Delivery", "Operations", "2026-03-01", 96.5, "%", 98, 0, 95, trend_direction="down"),
QualityMetric("M004", "CAPA Closure Rate", "Quality", "2026-03-01", 92.0, "%", 100, 0, 90, is_alert=True),
QualityMetric("M005", "Supplier Defect Rate", "Supplier", "2026-03-01", 450, "PPM", 200, 1000, 0, is_critical=True),
]
# Simulate time series
all_metrics = []
for i in range(30):
for dm in demo_metrics:
new_metric = QualityMetric(
metric_id=dm.metric_id,
metric_name=dm.metric_name,
category=dm.category,
date=f"2026-03-{i+1:02d}",
value=dm.value + (i * 0.1) if dm.metric_name == "Customer Complaint Rate" else dm.value,
unit=dm.unit,
target=dm.target,
upper_limit=dm.upper_limit,
lower_limit=dm.lower_limit
)
all_metrics.append(new_metric)
report = monitor.analyze(all_metrics)
if args.output == "json":
result = asdict(report)
print(json.dumps(result, indent=2))
else:
print(format_qms_report(report))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,557 @@
#!/usr/bin/env python3
"""
Regulatory Pathway Analyzer - Determines optimal regulatory pathway for medical devices.
Analyzes device characteristics and recommends the most efficient regulatory pathway
across multiple markets (FDA, EU MDR, UK UKCA, Health Canada, TGA, PMDA).
Supports:
- FDA: 510(k), De Novo, PMA, Breakthrough Device
- EU MDR: Class I, IIa, IIb, III, AIMDD
- UK: UKCA marking
- Health Canada: Class I-IV
- TGA: Class I, IIa, IIb, III
- Japan PMDA: Class I-IV
Usage:
python regulatory_pathway_analyzer.py --device-class II --predicate yes --market all
python regulatory_pathway_analyzer.py --interactive
python regulatory_pathway_analyzer.py --data device_profile.json --output json
"""
import argparse
import json
import sys
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Optional, Tuple
from enum import Enum
class RiskClass(Enum):
CLASS_I = "I"
CLASS_IIA = "IIa"
CLASS_IIB = "IIb"
CLASS_III = "III"
CLASS_IV = "IV"
class MarketRegion(Enum):
US_FDA = "US-FDA"
EU_MDR = "EU-MDR"
UK_UKCA = "UK-UKCA"
HEALTH_CANADA = "Health-Canada"
AUSTRALIA_TGA = "Australia-TGA"
JAPAN_PMDA = "Japan-PMDA"
@dataclass
class DeviceProfile:
"""Medical device profile for pathway analysis."""
device_name: str
intended_use: str
device_class: str # I, IIa, IIb, III
novel_technology: bool = False
predicate_available: bool = True
implantable: bool = False
life_sustaining: bool = False
software_component: bool = False
ai_ml_component: bool = False
sterile: bool = False
measuring_function: bool = False
target_markets: List[str] = field(default_factory=lambda: ["US-FDA", "EU-MDR"])
@dataclass
class PathwayOption:
"""A regulatory pathway option."""
pathway_name: str
market: str
estimated_timeline_months: Tuple[int, int]
estimated_cost_usd: Tuple[int, int]
key_requirements: List[str]
advantages: List[str]
risks: List[str]
recommendation_level: str # "Recommended", "Alternative", "Not Recommended"
@dataclass
class PathwayAnalysis:
"""Complete pathway analysis result."""
device: DeviceProfile
recommended_pathways: List[PathwayOption]
optimal_sequence: List[str] # Recommended submission order
total_timeline_months: Tuple[int, int]
total_estimated_cost: Tuple[int, int]
critical_success_factors: List[str]
warnings: List[str]
class RegulatoryPathwayAnalyzer:
"""Analyzes and recommends regulatory pathways for medical devices."""
# FDA pathway decision matrix
FDA_PATHWAYS = {
"I": {
"pathway": "510(k) Exempt / Registration & Listing",
"timeline": (1, 3),
"cost": (5000, 15000),
"requirements": ["Establishment registration", "Device listing", "GMP compliance (if non-exempt)"]
},
"II": {
"pathway": "510(k)",
"timeline": (6, 12),
"cost": (50000, 250000),
"requirements": ["Predicate device identification", "Substantial equivalence demonstration", "Performance testing", "Biocompatibility (if applicable)", "Software documentation (if applicable)"]
},
"II-novel": {
"pathway": "De Novo",
"timeline": (12, 18),
"cost": (150000, 400000),
"requirements": ["Risk-based classification request", "Special controls development", "Performance testing", "Clinical data (potentially)"]
},
"III": {
"pathway": "PMA",
"timeline": (18, 36),
"cost": (500000, 2000000),
"requirements": ["Clinical investigations", "Manufacturing information", "Performance testing", "Risk-benefit analysis", "Post-approval studies"]
},
"III-breakthrough": {
"pathway": "Breakthrough Device Program + PMA",
"timeline": (12, 24),
"cost": (500000, 2000000),
"requirements": ["Breakthrough designation request", "More flexible clinical evidence", "Iterative FDA engagement", "Post-market data collection"]
}
}
# EU MDR pathway decision matrix
EU_MDR_PATHWAYS = {
"I": {
"pathway": "Self-declaration (Class I)",
"timeline": (2, 4),
"cost": (10000, 30000),
"requirements": ["Technical documentation", "EU Declaration of Conformity", "UDI assignment", "EUDAMED registration", "Authorized Representative (if non-EU)"]
},
"IIa": {
"pathway": "Notified Body assessment (Class IIa)",
"timeline": (12, 18),
"cost": (80000, 200000),
"requirements": ["QMS certification (ISO 13485)", "Technical documentation", "Clinical evaluation", "Notified Body audit", "Post-market surveillance plan"]
},
"IIb": {
"pathway": "Notified Body assessment (Class IIb)",
"timeline": (15, 24),
"cost": (150000, 400000),
"requirements": ["Full QMS certification", "Comprehensive technical documentation", "Clinical evaluation (may need clinical investigation)", "Type examination or product verification", "Notified Body scrutiny"]
},
"III": {
"pathway": "Notified Body assessment (Class III)",
"timeline": (18, 30),
"cost": (300000, 800000),
"requirements": ["Full QMS certification", "Complete technical documentation", "Clinical investigation (typically required)", "Notified Body clinical evaluation review", "Scrutiny procedure (possible)", "PMCF plan"]
}
}
def __init__(self):
self.analysis_warnings = []
def analyze_fda_pathway(self, device: DeviceProfile) -> PathwayOption:
"""Determine optimal FDA pathway."""
device_class = device.device_class.upper().replace("IIA", "II").replace("IIB", "II")
if device_class == "I":
pathway_data = self.FDA_PATHWAYS["I"]
return PathwayOption(
pathway_name=pathway_data["pathway"],
market="US-FDA",
estimated_timeline_months=pathway_data["timeline"],
estimated_cost_usd=pathway_data["cost"],
key_requirements=pathway_data["requirements"],
advantages=["Fastest path to market", "Minimal regulatory burden", "No premarket submission required (if exempt)"],
risks=["Limited to exempt product codes", "Still requires GMP compliance"],
recommendation_level="Recommended"
)
elif device_class == "III" or device.implantable or device.life_sustaining:
if device.novel_technology:
pathway_data = self.FDA_PATHWAYS["III-breakthrough"]
rec_level = "Recommended" if device.novel_technology else "Alternative"
else:
pathway_data = self.FDA_PATHWAYS["III"]
rec_level = "Recommended"
else: # Class II
if device.predicate_available and not device.novel_technology:
pathway_data = self.FDA_PATHWAYS["II"]
rec_level = "Recommended"
else:
pathway_data = self.FDA_PATHWAYS["II-novel"]
rec_level = "Recommended"
return PathwayOption(
pathway_name=pathway_data["pathway"],
market="US-FDA",
estimated_timeline_months=pathway_data["timeline"],
estimated_cost_usd=pathway_data["cost"],
key_requirements=pathway_data["requirements"],
advantages=self._get_fda_advantages(pathway_data["pathway"], device),
risks=self._get_fda_risks(pathway_data["pathway"], device),
recommendation_level=rec_level
)
def analyze_eu_mdr_pathway(self, device: DeviceProfile) -> PathwayOption:
"""Determine optimal EU MDR pathway."""
device_class = device.device_class.lower().replace("iia", "IIa").replace("iib", "IIb")
if device_class in ["i", "1"]:
pathway_data = self.EU_MDR_PATHWAYS["I"]
class_key = "I"
elif device_class in ["iia", "2a"]:
pathway_data = self.EU_MDR_PATHWAYS["IIa"]
class_key = "IIa"
elif device_class in ["iib", "2b"]:
pathway_data = self.EU_MDR_PATHWAYS["IIb"]
class_key = "IIb"
else:
pathway_data = self.EU_MDR_PATHWAYS["III"]
class_key = "III"
# Adjust for implantables
if device.implantable and class_key in ["IIa", "IIb"]:
pathway_data = self.EU_MDR_PATHWAYS["III"]
self.analysis_warnings.append(
f"Implantable devices are typically upclassified to Class III under EU MDR"
)
return PathwayOption(
pathway_name=pathway_data["pathway"],
market="EU-MDR",
estimated_timeline_months=pathway_data["timeline"],
estimated_cost_usd=pathway_data["cost"],
key_requirements=pathway_data["requirements"],
advantages=self._get_eu_advantages(pathway_data["pathway"], device),
risks=self._get_eu_risks(pathway_data["pathway"], device),
recommendation_level="Recommended"
)
def _get_fda_advantages(self, pathway: str, device: DeviceProfile) -> List[str]:
advantages = []
if "510(k)" in pathway:
advantages.extend([
"Well-established pathway with clear guidance",
"Predictable review timeline",
"Lower clinical evidence requirements vs PMA"
])
if device.predicate_available:
advantages.append("Predicate device identified - streamlined review")
elif "De Novo" in pathway:
advantages.extend([
"Creates new predicate for future 510(k) submissions",
"Appropriate for novel low-moderate risk devices",
"Can result in Class I or II classification"
])
elif "PMA" in pathway:
advantages.extend([
"Strongest FDA approval - highest market credibility",
"Difficult for competitors to challenge",
"May qualify for breakthrough device benefits"
])
elif "Breakthrough" in pathway:
advantages.extend([
"Priority review and interactive FDA engagement",
"Flexible clinical evidence requirements",
"Faster iterative development with FDA feedback"
])
return advantages
def _get_fda_risks(self, pathway: str, device: DeviceProfile) -> List[str]:
risks = []
if "510(k)" in pathway:
risks.extend([
"Predicate device may be challenged",
"SE determination can be subjective"
])
if device.software_component:
risks.append("Software documentation requirements increasing (Cybersecurity, AI/ML)")
elif "De Novo" in pathway:
risks.extend([
"Less predictable than 510(k)",
"May require more clinical data than expected",
"New special controls may be imposed"
])
elif "PMA" in pathway:
risks.extend([
"Very expensive and time-consuming",
"Clinical trial risks and delays",
"Post-approval study requirements"
])
if device.ai_ml_component:
risks.append("AI/ML components face evolving regulatory requirements")
return risks
def _get_eu_advantages(self, pathway: str, device: DeviceProfile) -> List[str]:
advantages = ["Access to entire EU/EEA market (27+ countries)"]
if "Self-declaration" in pathway:
advantages.extend([
"No Notified Body involvement required",
"Fastest path to EU market",
"Lowest cost option"
])
elif "IIa" in pathway:
advantages.append("Moderate regulatory burden with broad market access")
elif "IIb" in pathway or "III" in pathway:
advantages.extend([
"Strong market credibility with NB certification",
"Recognized globally for regulatory quality"
])
return advantages
def _get_eu_risks(self, pathway: str, device: DeviceProfile) -> List[str]:
risks = []
if "Self-declaration" not in pathway:
risks.extend([
"Limited Notified Body capacity - long wait times",
"Notified Body costs increasing under MDR"
])
risks.append("MDR transition still creating uncertainty")
if device.software_component:
risks.append("EU AI Act may apply to AI/ML medical devices")
return risks
def determine_optimal_sequence(self, pathways: List[PathwayOption], device: DeviceProfile) -> List[str]:
"""Determine optimal submission sequence across markets."""
# General principle: Start with fastest/cheapest, use data for subsequent submissions
sequence = []
# Sort by timeline (fastest first)
sorted_pathways = sorted(pathways, key=lambda p: p.estimated_timeline_months[0])
# FDA first if 510(k) - well recognized globally
fda_pathway = next((p for p in pathways if p.market == "US-FDA"), None)
eu_pathway = next((p for p in pathways if p.market == "EU-MDR"), None)
if fda_pathway and "510(k)" in fda_pathway.pathway_name:
sequence.append("1. US-FDA 510(k) first - clearance recognized globally, data reusable")
if eu_pathway:
sequence.append("2. EU-MDR - use FDA data in clinical evaluation")
elif eu_pathway and "Self-declaration" in eu_pathway.pathway_name:
sequence.append("1. EU-MDR (Class I self-declaration) - fastest market entry")
if fda_pathway:
sequence.append("2. US-FDA - use EU experience and data")
else:
for i, p in enumerate(sorted_pathways, 1):
sequence.append(f"{i}. {p.market} ({p.pathway_name})")
return sequence
def analyze(self, device: DeviceProfile) -> PathwayAnalysis:
"""Perform complete pathway analysis."""
self.analysis_warnings = []
pathways = []
for market in device.target_markets:
if "FDA" in market or "US" in market:
pathways.append(self.analyze_fda_pathway(device))
elif "MDR" in market or "EU" in market:
pathways.append(self.analyze_eu_mdr_pathway(device))
# Additional markets can be added here
sequence = self.determine_optimal_sequence(pathways, device)
total_timeline_min = sum(p.estimated_timeline_months[0] for p in pathways)
total_timeline_max = sum(p.estimated_timeline_months[1] for p in pathways)
total_cost_min = sum(p.estimated_cost_usd[0] for p in pathways)
total_cost_max = sum(p.estimated_cost_usd[1] for p in pathways)
csf = [
"Early engagement with regulators (Pre-Sub/Scientific Advice)",
"Robust QMS (ISO 13485) in place before submissions",
"Clinical evidence strategy aligned with target markets",
"Cybersecurity and software documentation (if applicable)"
]
if device.ai_ml_component:
csf.append("AI/ML transparency and bias documentation")
return PathwayAnalysis(
device=device,
recommended_pathways=pathways,
optimal_sequence=sequence,
total_timeline_months=(total_timeline_min, total_timeline_max),
total_estimated_cost=(total_cost_min, total_cost_max),
critical_success_factors=csf,
warnings=self.analysis_warnings
)
def format_analysis_text(analysis: PathwayAnalysis) -> str:
"""Format analysis as readable text report."""
lines = [
"=" * 70,
"REGULATORY PATHWAY ANALYSIS REPORT",
"=" * 70,
f"Device: {analysis.device.device_name}",
f"Intended Use: {analysis.device.intended_use}",
f"Device Class: {analysis.device.device_class}",
f"Target Markets: {', '.join(analysis.device.target_markets)}",
"",
"DEVICE CHARACTERISTICS",
"-" * 40,
f" Novel Technology: {'Yes' if analysis.device.novel_technology else 'No'}",
f" Predicate Available: {'Yes' if analysis.device.predicate_available else 'No'}",
f" Implantable: {'Yes' if analysis.device.implantable else 'No'}",
f" Life-Sustaining: {'Yes' if analysis.device.life_sustaining else 'No'}",
f" Software/AI Component: {'Yes' if analysis.device.software_component or analysis.device.ai_ml_component else 'No'}",
f" Sterile: {'Yes' if analysis.device.sterile else 'No'}",
"",
"RECOMMENDED PATHWAYS",
"-" * 40,
]
for pathway in analysis.recommended_pathways:
lines.extend([
"",
f" [{pathway.market}] {pathway.pathway_name}",
f" Recommendation: {pathway.recommendation_level}",
f" Timeline: {pathway.estimated_timeline_months[0]}-{pathway.estimated_timeline_months[1]} months",
f" Estimated Cost: ${pathway.estimated_cost_usd[0]:,} - ${pathway.estimated_cost_usd[1]:,}",
f" Key Requirements:",
])
for req in pathway.key_requirements:
lines.append(f"{req}")
lines.append(f" Advantages:")
for adv in pathway.advantages:
lines.append(f" + {adv}")
lines.append(f" Risks:")
for risk in pathway.risks:
lines.append(f" ! {risk}")
lines.extend([
"",
"OPTIMAL SUBMISSION SEQUENCE",
"-" * 40,
])
for step in analysis.optimal_sequence:
lines.append(f" {step}")
lines.extend([
"",
"TOTAL ESTIMATES",
"-" * 40,
f" Combined Timeline: {analysis.total_timeline_months[0]}-{analysis.total_timeline_months[1]} months",
f" Combined Cost: ${analysis.total_estimated_cost[0]:,} - ${analysis.total_estimated_cost[1]:,}",
"",
"CRITICAL SUCCESS FACTORS",
"-" * 40,
])
for i, factor in enumerate(analysis.critical_success_factors, 1):
lines.append(f" {i}. {factor}")
if analysis.warnings:
lines.extend([
"",
"WARNINGS",
"-" * 40,
])
for warning in analysis.warnings:
lines.append(f"{warning}")
lines.append("=" * 70)
return "\n".join(lines)
def interactive_mode():
"""Interactive device profiling."""
print("=" * 60)
print("Regulatory Pathway Analyzer - Interactive Mode")
print("=" * 60)
device = DeviceProfile(
device_name=input("\nDevice Name: ").strip(),
intended_use=input("Intended Use: ").strip(),
device_class=input("Device Class (I/IIa/IIb/III): ").strip(),
novel_technology=input("Novel technology? (y/n): ").strip().lower() == 'y',
predicate_available=input("Predicate device available? (y/n): ").strip().lower() == 'y',
implantable=input("Implantable? (y/n): ").strip().lower() == 'y',
life_sustaining=input("Life-sustaining? (y/n): ").strip().lower() == 'y',
software_component=input("Software component? (y/n): ").strip().lower() == 'y',
ai_ml_component=input("AI/ML component? (y/n): ").strip().lower() == 'y',
)
markets = input("Target markets (comma-separated, e.g., US-FDA,EU-MDR): ").strip()
if markets:
device.target_markets = [m.strip() for m in markets.split(",")]
analyzer = RegulatoryPathwayAnalyzer()
analysis = analyzer.analyze(device)
print("\n" + format_analysis_text(analysis))
def main():
parser = argparse.ArgumentParser(description="Regulatory Pathway Analyzer for Medical Devices")
parser.add_argument("--device-name", type=str, help="Device name")
parser.add_argument("--device-class", type=str, choices=["I", "IIa", "IIb", "III"], help="Device classification")
parser.add_argument("--predicate", type=str, choices=["yes", "no"], help="Predicate device available")
parser.add_argument("--novel", action="store_true", help="Novel technology")
parser.add_argument("--implantable", action="store_true", help="Implantable device")
parser.add_argument("--software", action="store_true", help="Software component")
parser.add_argument("--ai-ml", action="store_true", help="AI/ML component")
parser.add_argument("--market", type=str, default="all", help="Target market(s)")
parser.add_argument("--data", type=str, help="JSON file with device profile")
parser.add_argument("--output", choices=["text", "json"], default="text", help="Output format")
parser.add_argument("--interactive", action="store_true", help="Interactive mode")
args = parser.parse_args()
if args.interactive:
interactive_mode()
return
if args.data:
with open(args.data) as f:
data = json.load(f)
device = DeviceProfile(**data)
elif args.device_class:
device = DeviceProfile(
device_name=args.device_name or "Unnamed Device",
intended_use="Medical device",
device_class=args.device_class,
novel_technology=args.novel,
predicate_available=args.predicate == "yes" if args.predicate else True,
implantable=args.implantable,
software_component=args.software,
ai_ml_component=args.ai_ml,
)
if args.market != "all":
device.target_markets = [m.strip() for m in args.market.split(",")]
else:
# Demo mode
device = DeviceProfile(
device_name="SmartGlucose Monitor Pro",
intended_use="Continuous glucose monitoring for diabetes management",
device_class="II",
novel_technology=False,
predicate_available=True,
software_component=True,
ai_ml_component=True,
target_markets=["US-FDA", "EU-MDR"]
)
analyzer = RegulatoryPathwayAnalyzer()
analysis = analyzer.analyze(device)
if args.output == "json":
result = {
"device": asdict(analysis.device),
"pathways": [asdict(p) for p in analysis.recommended_pathways],
"optimal_sequence": analysis.optimal_sequence,
"total_timeline_months": list(analysis.total_timeline_months),
"total_estimated_cost": list(analysis.total_estimated_cost),
"critical_success_factors": analysis.critical_success_factors,
"warnings": analysis.warnings
}
print(json.dumps(result, indent=2))
else:
print(format_analysis_text(analysis))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,442 @@
#!/usr/bin/env python3
"""
FMEA Analyzer - Failure Mode and Effects Analysis for medical device risk management.
Supports Design FMEA (dFMEA) and Process FMEA (pFMEA) per ISO 14971 and IEC 60812.
Calculates Risk Priority Numbers (RPN), identifies critical items, and generates
risk reduction recommendations.
Usage:
python fmea_analyzer.py --data fmea_input.json
python fmea_analyzer.py --interactive
python fmea_analyzer.py --data fmea_input.json --output json
"""
import argparse
import json
import sys
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Optional, Tuple
from enum import Enum
from datetime import datetime
class FMEAType(Enum):
DESIGN = "Design FMEA"
PROCESS = "Process FMEA"
class Severity(Enum):
INCONSEQUENTIAL = 1
MINOR = 2
MODERATE = 3
SIGNIFICANT = 4
SERIOUS = 5
CRITICAL = 6
SERIOUS_HAZARD = 7
HAZARDOUS = 8
HAZARDOUS_NO_WARNING = 9
CATASTROPHIC = 10
class Occurrence(Enum):
REMOTE = 1
LOW = 2
LOW_MODERATE = 3
MODERATE = 4
MODERATE_HIGH = 5
HIGH = 6
VERY_HIGH = 7
EXTREMELY_HIGH = 8
ALMOST_CERTAIN = 9
INEVITABLE = 10
class Detection(Enum):
ALMOST_CERTAIN = 1
VERY_HIGH = 2
HIGH = 3
MODERATE_HIGH = 4
MODERATE = 5
LOW_MODERATE = 6
LOW = 7
VERY_LOW = 8
REMOTE = 9
ABSOLUTELY_UNCERTAIN = 10
@dataclass
class FMEAEntry:
"""Single FMEA line item."""
item_process: str
function: str
failure_mode: str
effect: str
severity: int
cause: str
occurrence: int
current_controls: str
detection: int
rpn: int = 0
criticality: str = ""
recommended_actions: List[str] = field(default_factory=list)
responsibility: str = ""
target_date: str = ""
actions_taken: str = ""
revised_severity: int = 0
revised_occurrence: int = 0
revised_detection: int = 0
revised_rpn: int = 0
def calculate_rpn(self):
self.rpn = self.severity * self.occurrence * self.detection
if self.severity >= 8:
self.criticality = "CRITICAL"
elif self.rpn >= 200:
self.criticality = "HIGH"
elif self.rpn >= 100:
self.criticality = "MEDIUM"
else:
self.criticality = "LOW"
def calculate_revised_rpn(self):
if self.revised_severity and self.revised_occurrence and self.revised_detection:
self.revised_rpn = self.revised_severity * self.revised_occurrence * self.revised_detection
@dataclass
class FMEAReport:
"""Complete FMEA analysis report."""
fmea_type: str
product_process: str
team: List[str]
date: str
entries: List[FMEAEntry]
summary: Dict
risk_reduction_actions: List[Dict]
class FMEAAnalyzer:
"""Analyzes FMEA data and generates risk assessments."""
# RPN thresholds
RPN_CRITICAL = 200
RPN_HIGH = 100
RPN_MEDIUM = 50
def __init__(self, fmea_type: FMEAType = FMEAType.DESIGN):
self.fmea_type = fmea_type
def analyze_entries(self, entries: List[FMEAEntry]) -> Dict:
"""Analyze all FMEA entries and generate summary."""
for entry in entries:
entry.calculate_rpn()
entry.calculate_revised_rpn()
rpns = [e.rpn for e in entries if e.rpn > 0]
revised_rpns = [e.revised_rpn for e in entries if e.revised_rpn > 0]
critical = [e for e in entries if e.criticality == "CRITICAL"]
high = [e for e in entries if e.criticality == "HIGH"]
medium = [e for e in entries if e.criticality == "MEDIUM"]
# Severity distribution
sev_dist = {}
for e in entries:
sev_range = "1-3 (Low)" if e.severity <= 3 else "4-6 (Medium)" if e.severity <= 6 else "7-10 (High)"
sev_dist[sev_range] = sev_dist.get(sev_range, 0) + 1
summary = {
"total_entries": len(entries),
"rpn_statistics": {
"min": min(rpns) if rpns else 0,
"max": max(rpns) if rpns else 0,
"average": round(sum(rpns) / len(rpns), 1) if rpns else 0,
"median": sorted(rpns)[len(rpns) // 2] if rpns else 0
},
"risk_distribution": {
"critical_severity": len(critical),
"high_rpn": len(high),
"medium_rpn": len(medium),
"low_rpn": len(entries) - len(critical) - len(high) - len(medium)
},
"severity_distribution": sev_dist,
"top_risks": [
{
"item": e.item_process,
"failure_mode": e.failure_mode,
"rpn": e.rpn,
"severity": e.severity
}
for e in sorted(entries, key=lambda x: x.rpn, reverse=True)[:5]
]
}
if revised_rpns:
summary["revised_rpn_statistics"] = {
"min": min(revised_rpns),
"max": max(revised_rpns),
"average": round(sum(revised_rpns) / len(revised_rpns), 1),
"improvement": round((sum(rpns) - sum(revised_rpns)) / sum(rpns) * 100, 1) if rpns else 0
}
return summary
def generate_risk_reduction_actions(self, entries: List[FMEAEntry]) -> List[Dict]:
"""Generate recommended risk reduction actions."""
actions = []
# Sort by RPN descending
sorted_entries = sorted(entries, key=lambda e: e.rpn, reverse=True)
for entry in sorted_entries[:10]: # Top 10 risks
if entry.rpn >= self.RPN_HIGH or entry.severity >= 8:
strategies = []
# Severity reduction strategies (highest priority for high severity)
if entry.severity >= 7:
strategies.append({
"type": "Severity Reduction",
"action": f"Redesign {entry.item_process} to eliminate failure mode: {entry.failure_mode}",
"priority": "Highest",
"expected_impact": "May reduce severity by 2-4 points"
})
# Occurrence reduction strategies
if entry.occurrence >= 5:
strategies.append({
"type": "Occurrence Reduction",
"action": f"Implement preventive controls for cause: {entry.cause}",
"priority": "High",
"expected_impact": f"Target occurrence reduction from {entry.occurrence} to {max(1, entry.occurrence - 3)}"
})
# Detection improvement strategies
if entry.detection >= 5:
strategies.append({
"type": "Detection Improvement",
"action": f"Enhance detection methods: {entry.current_controls}",
"priority": "Medium",
"expected_impact": f"Target detection improvement from {entry.detection} to {max(1, entry.detection - 3)}"
})
actions.append({
"item": entry.item_process,
"failure_mode": entry.failure_mode,
"current_rpn": entry.rpn,
"current_severity": entry.severity,
"strategies": strategies
})
return actions
def create_entry_from_dict(self, data: Dict) -> FMEAEntry:
"""Create FMEA entry from dictionary."""
entry = FMEAEntry(
item_process=data.get("item_process", ""),
function=data.get("function", ""),
failure_mode=data.get("failure_mode", ""),
effect=data.get("effect", ""),
severity=data.get("severity", 1),
cause=data.get("cause", ""),
occurrence=data.get("occurrence", 1),
current_controls=data.get("current_controls", ""),
detection=data.get("detection", 1),
recommended_actions=data.get("recommended_actions", []),
responsibility=data.get("responsibility", ""),
target_date=data.get("target_date", ""),
actions_taken=data.get("actions_taken", ""),
revised_severity=data.get("revised_severity", 0),
revised_occurrence=data.get("revised_occurrence", 0),
revised_detection=data.get("revised_detection", 0)
)
entry.calculate_rpn()
entry.calculate_revised_rpn()
return entry
def generate_report(self, product_process: str, team: List[str], entries_data: List[Dict]) -> FMEAReport:
"""Generate complete FMEA report."""
entries = [self.create_entry_from_dict(e) for e in entries_data]
summary = self.analyze_entries(entries)
actions = self.generate_risk_reduction_actions(entries)
return FMEAReport(
fmea_type=self.fmea_type.value,
product_process=product_process,
team=team,
date=datetime.now().strftime("%Y-%m-%d"),
entries=entries,
summary=summary,
risk_reduction_actions=actions
)
def format_fmea_text(report: FMEAReport) -> str:
"""Format FMEA report as text."""
lines = [
"=" * 80,
f"{report.fmea_type.upper()} REPORT",
"=" * 80,
f"Product/Process: {report.product_process}",
f"Date: {report.date}",
f"Team: {', '.join(report.team)}",
"",
"SUMMARY",
"-" * 60,
f"Total Failure Modes Analyzed: {report.summary['total_entries']}",
f"Critical Severity (≥8): {report.summary['risk_distribution']['critical_severity']}",
f"High RPN (≥100): {report.summary['risk_distribution']['high_rpn']}",
f"Medium RPN (50-99): {report.summary['risk_distribution']['medium_rpn']}",
"",
"RPN Statistics:",
f" Min: {report.summary['rpn_statistics']['min']}",
f" Max: {report.summary['rpn_statistics']['max']}",
f" Average: {report.summary['rpn_statistics']['average']}",
f" Median: {report.summary['rpn_statistics']['median']}",
]
if "revised_rpn_statistics" in report.summary:
lines.extend([
"",
"Revised RPN Statistics:",
f" Average: {report.summary['revised_rpn_statistics']['average']}",
f" Improvement: {report.summary['revised_rpn_statistics']['improvement']}%",
])
lines.extend([
"",
"TOP RISKS",
"-" * 60,
f"{'Item':<25} {'Failure Mode':<30} {'RPN':>5} {'Sev':>4}",
"-" * 66,
])
for risk in report.summary.get("top_risks", []):
lines.append(f"{risk['item'][:24]:<25} {risk['failure_mode'][:29]:<30} {risk['rpn']:>5} {risk['severity']:>4}")
lines.extend([
"",
"FMEA ENTRIES",
"-" * 60,
])
for i, entry in enumerate(report.entries, 1):
marker = "" if entry.criticality in ["CRITICAL", "HIGH"] else ""
lines.extend([
f"",
f"{marker} Entry {i}: {entry.item_process} - {entry.function}",
f" Failure Mode: {entry.failure_mode}",
f" Effect: {entry.effect}",
f" Cause: {entry.cause}",
f" S={entry.severity} × O={entry.occurrence} × D={entry.detection} = RPN {entry.rpn} [{entry.criticality}]",
f" Current Controls: {entry.current_controls}",
])
if entry.recommended_actions:
lines.append(f" Recommended Actions:")
for action in entry.recommended_actions:
lines.append(f"{action}")
if entry.revised_rpn > 0:
lines.append(f" Revised: S={entry.revised_severity} × O={entry.revised_occurrence} × D={entry.revised_detection} = RPN {entry.revised_rpn}")
if report.risk_reduction_actions:
lines.extend([
"",
"RISK REDUCTION RECOMMENDATIONS",
"-" * 60,
])
for action in report.risk_reduction_actions:
lines.extend([
f"",
f" {action['item']} - {action['failure_mode']}",
f" Current RPN: {action['current_rpn']} (Severity: {action['current_severity']})",
])
for strategy in action["strategies"]:
lines.append(f" [{strategy['priority']}] {strategy['type']}: {strategy['action']}")
lines.append(f" Expected: {strategy['expected_impact']}")
lines.append("=" * 80)
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="FMEA Analyzer for Medical Device Risk Management")
parser.add_argument("--type", choices=["design", "process"], default="design", help="FMEA type")
parser.add_argument("--data", type=str, help="JSON file with FMEA data")
parser.add_argument("--output", choices=["text", "json"], default="text", help="Output format")
parser.add_argument("--interactive", action="store_true", help="Interactive mode")
args = parser.parse_args()
fmea_type = FMEAType.DESIGN if args.type == "design" else FMEAType.PROCESS
analyzer = FMEAAnalyzer(fmea_type)
if args.data:
with open(args.data) as f:
data = json.load(f)
report = analyzer.generate_report(
product_process=data.get("product_process", ""),
team=data.get("team", []),
entries_data=data.get("entries", [])
)
else:
# Demo data
demo_entries = [
{
"item_process": "Battery Module",
"function": "Provide power for 8 hours",
"failure_mode": "Premature battery drain",
"effect": "Device shuts down during procedure",
"severity": 8,
"cause": "Cell degradation due to temperature cycling",
"occurrence": 4,
"current_controls": "Incoming battery testing, temperature spec in IFU",
"detection": 5,
"recommended_actions": ["Add battery health monitoring algorithm", "Implement low-battery warning at 20%"]
},
{
"item_process": "Software Controller",
"function": "Control device operation",
"failure_mode": "Firmware crash",
"effect": "Device becomes unresponsive",
"severity": 7,
"cause": "Memory leak in logging module",
"occurrence": 3,
"current_controls": "Code review, unit testing, integration testing",
"detection": 4,
"recommended_actions": ["Add watchdog timer", "Implement memory usage monitoring"]
},
{
"item_process": "Sterile Packaging",
"function": "Maintain sterility until use",
"failure_mode": "Seal breach",
"effect": "Device contamination",
"severity": 9,
"cause": "Sealing jaw temperature variation",
"occurrence": 2,
"current_controls": "Seal integrity testing (dye penetration), SPC on sealing process",
"detection": 3,
"recommended_actions": ["Add real-time seal temperature monitoring", "Implement 100% seal integrity testing"]
}
]
report = analyzer.generate_report(
product_process="Insulin Pump Model X200",
team=["Quality Engineer", "R&D Lead", "Manufacturing Engineer", "Risk Manager"],
entries_data=demo_entries
)
if args.output == "json":
result = {
"fmea_type": report.fmea_type,
"product_process": report.product_process,
"date": report.date,
"team": report.team,
"entries": [asdict(e) for e in report.entries],
"summary": report.summary,
"risk_reduction_actions": report.risk_reduction_actions
}
print(json.dumps(result, indent=2))
else:
print(format_fmea_text(report))
if __name__ == "__main__":
main()