fix(repo): Harden catalog sync and release integrity

Tighten the repo-state automation so canonical bot commits remain
predictable while leaving main clean after each sync.

Make the public catalog UI more honest by hiding dev-only sync,
turning stars into explicit browser-local saves, aligning risk types,
and removing hardcoded catalog counts.

Add shared public asset URL helpers, risk suggestion plumbing,
safer unpack/sync guards, and CI coverage gates so release and
maintainer workflows catch drift earlier.
This commit is contained in:
sickn33
2026-03-29 09:22:09 +02:00
parent 141fd58568
commit 08a31cacf5
46 changed files with 1903 additions and 523 deletions

View File

@@ -12,6 +12,7 @@ from datetime import datetime, timezone
from pathlib import Path
from _project_paths import find_repo_root
from risk_classifier import suggest_risk
from validate_skills import configure_utf8_output, has_when_to_use_section, parse_frontmatter
@@ -34,6 +35,7 @@ SECURITY_DISCLAIMER_PATTERN = re.compile(r"AUTHORIZED USE ONLY", re.IGNORECASE)
VALID_RISK_LEVELS = {"none", "safe", "critical", "offensive", "unknown"}
DEFAULT_MARKDOWN_TOP_FINDINGS = 15
DEFAULT_MARKDOWN_TOP_SKILLS = 20
DEFAULT_MARKDOWN_TOP_RISK_SUGGESTIONS = 20
@dataclass(frozen=True)
@@ -48,8 +50,6 @@ class Finding:
"code": self.code,
"message": self.message,
}
def has_examples(content: str) -> bool:
    """Return True when *content* contains usage examples.

    A skill counts as having examples if it matches the module's
    fenced-code-block pattern, or any of its examples-heading patterns.
    """
    if FENCED_CODE_BLOCK_PATTERN.search(content):
        return True
    return any(heading.search(content) for heading in EXAMPLES_HEADING_PATTERNS)
@@ -110,6 +110,7 @@ def build_skill_report(skill_root: Path, skills_dir: Path) -> dict[str, object]:
risk = metadata.get("risk")
source = metadata.get("source")
date_added = metadata.get("date_added")
risk_suggestion = suggest_risk(content, metadata)
if name != skill_root.name:
findings.append(
@@ -162,6 +163,17 @@ def build_skill_report(skill_root: Path, skills_dir: Path) -> dict[str, object]:
)
)
if risk_suggestion.risk not in ("unknown", "none"):
risk_needs_review = risk is None or risk == "unknown" or risk != risk_suggestion.risk
if risk_needs_review:
findings.append(
Finding(
"info" if risk in (None, "unknown") else "warning",
"risk_suggestion",
f"Suggested risk is {risk_suggestion.risk} based on: {', '.join(risk_suggestion.reasons[:3])}.",
)
)
if source is None:
findings.append(Finding("warning", "missing_source", "Missing source attribution."))
@@ -211,10 +223,25 @@ def build_skill_report(skill_root: Path, skills_dir: Path) -> dict[str, object]:
)
)
return finalize_skill_report(rel_dir, rel_file, findings)
return finalize_skill_report(
rel_dir,
rel_file,
findings,
risk=risk,
suggested_risk=risk_suggestion.risk,
suggested_risk_reasons=list(risk_suggestion.reasons),
)
def finalize_skill_report(skill_id: str, rel_file: str, findings: list[Finding]) -> dict[str, object]:
def finalize_skill_report(
skill_id: str,
rel_file: str,
findings: list[Finding],
*,
risk: str | None = None,
suggested_risk: str = "unknown",
suggested_risk_reasons: list[str] | None = None,
) -> dict[str, object]:
severity_counts = Counter(finding.severity for finding in findings)
if severity_counts["error"] > 0:
status = "error"
@@ -230,6 +257,9 @@ def finalize_skill_report(skill_id: str, rel_file: str, findings: list[Finding])
"error_count": severity_counts["error"],
"warning_count": severity_counts["warning"],
"info_count": severity_counts["info"],
"risk": risk,
"suggested_risk": suggested_risk,
"suggested_risk_reasons": suggested_risk_reasons or [],
"findings": [finding.to_dict() for finding in findings],
}
@@ -250,19 +280,30 @@ def audit_skills(skills_dir: str | Path) -> dict[str, object]:
code_counts = Counter()
severity_counts = Counter()
risk_suggestion_counts = Counter()
for report in reports:
for finding in report["findings"]:
code_counts[finding["code"]] += 1
severity_counts[finding["severity"]] += 1
if report["suggested_risk"] not in (None, "unknown", "none"):
risk_suggestion_counts[report["suggested_risk"]] += 1
summary = {
"skills_scanned": len(reports),
"skills_ok": sum(report["status"] == "ok" for report in reports),
"skills_with_errors": sum(report["status"] == "error" for report in reports),
"skills_with_warnings_only": sum(report["status"] == "warning" for report in reports),
"skills_with_suggested_risk": sum(
report["suggested_risk"] not in ("unknown", "none")
for report in reports
),
"errors": severity_counts["error"],
"warnings": severity_counts["warning"],
"infos": severity_counts["info"],
"risk_suggestions": [
{"risk": risk, "count": count}
for risk, count in risk_suggestion_counts.most_common()
],
"top_finding_codes": [
{"code": code, "count": count}
for code, count in code_counts.most_common()
@@ -284,6 +325,15 @@ def write_markdown_report(report: dict[str, object], destination: str | Path) ->
top_skills = [
skill for skill in skills if skill["status"] != "ok"
][:DEFAULT_MARKDOWN_TOP_SKILLS]
risk_suggestions = [
skill
for skill in skills
if skill.get("suggested_risk") not in (None, "unknown", "none")
and (
skill.get("risk") in (None, "unknown")
or skill.get("risk") != skill.get("suggested_risk")
)
][:DEFAULT_MARKDOWN_TOP_RISK_SUGGESTIONS]
lines = [
"# Skills Audit Report",
@@ -296,15 +346,28 @@ def write_markdown_report(report: dict[str, object], destination: str | Path) ->
f"- Skills ready: **{summary['skills_ok']}**",
f"- Skills with errors: **{summary['skills_with_errors']}**",
f"- Skills with warnings only: **{summary['skills_with_warnings_only']}**",
f"- Skills with suggested risk: **{summary['skills_with_suggested_risk']}**",
f"- Total errors: **{summary['errors']}**",
f"- Total warnings: **{summary['warnings']}**",
"",
"## Top Finding Codes",
"",
"| Code | Count |",
"| --- | ---: |",
f"- Total info findings: **{summary['infos']}**",
]
if summary.get("risk_suggestions"):
summary_text = ", ".join(
f"{item['risk']}: {item['count']}" for item in summary["risk_suggestions"]
)
lines.append(f"- Suggested risks: **{summary_text}**")
lines.extend(
[
"",
"## Top Finding Codes",
"",
"| Code | Count |",
"| --- | ---: |",
]
)
if top_findings:
lines.extend(f"| `{item['code']}` | {item['count']} |" for item in top_findings)
else:
@@ -328,6 +391,24 @@ def write_markdown_report(report: dict[str, object], destination: str | Path) ->
else:
lines.append("| _none_ | ok | 0 | 0 |")
lines.extend(
[
"",
"## Risk Suggestions",
"",
"| Skill | Current | Suggested | Why |",
"| --- | --- | --- | --- |",
]
)
if risk_suggestions:
lines.extend(
f"| `{skill['id']}` | {skill.get('risk') or 'unknown'} | {skill.get('suggested_risk') or 'unknown'} | {', '.join(skill.get('suggested_risk_reasons', [])[:3]) or '_n/a_'} |"
for skill in risk_suggestions
)
else:
lines.append("| _none_ | _none_ | _none_ | _n/a_ |")
Path(destination).write_text("\n".join(lines) + "\n", encoding="utf-8")
@@ -338,8 +419,16 @@ def print_summary(report: dict[str, object]) -> None:
print(f" Ready: {summary['skills_ok']}")
print(f" Warning only: {summary['skills_with_warnings_only']}")
print(f" With errors: {summary['skills_with_errors']}")
print(f" With suggested risk: {summary['skills_with_suggested_risk']}")
print(f" Total warnings: {summary['warnings']}")
print(f" Total errors: {summary['errors']}")
print(f" Total info findings: {summary['infos']}")
if summary.get("risk_suggestions"):
risk_summary = ", ".join(
f"{item['risk']}: {item['count']}"
for item in summary["risk_suggestions"]
)
print(f" Suggested risks: {risk_summary}")
top_findings = summary["top_finding_codes"][:10]
if top_findings:

View File

@@ -815,7 +815,7 @@ def parse_frontmatter(content):
Parses YAML frontmatter, sanitizing unquoted values containing @.
Handles single values and comma-separated lists by quoting the entire line.
"""
fm_match = re.search(r'^---\s*\n(.*?)\n---', content, re.DOTALL)
fm_match = re.search(r'^---\s*\n(.*?)\n?---(?:\s*\n|$)', content, re.DOTALL)
if not fm_match:
return {}

View File

@@ -15,6 +15,7 @@ from datetime import datetime
from pathlib import Path
import yaml
from _project_paths import find_repo_root
from risk_classifier import suggest_risk
def get_project_root():
"""Get the project root directory."""
@@ -32,9 +33,10 @@ def parse_frontmatter(content):
except yaml.YAMLError:
return None
def generate_skills_report(output_file=None, sort_by='date'):
def generate_skills_report(output_file=None, sort_by='date', project_root=None):
"""Generate a report of all skills with their metadata."""
skills_dir = os.path.join(get_project_root(), 'skills')
root = str(project_root or get_project_root())
skills_dir = os.path.join(root, 'skills')
skills_data = []
for root, dirs, files in os.walk(skills_dir):
@@ -52,14 +54,22 @@ def generate_skills_report(output_file=None, sort_by='date'):
metadata = parse_frontmatter(content)
if metadata is None:
continue
suggested_risk = suggest_risk(content, metadata)
date_added = metadata.get('date_added', None)
if date_added is not None:
date_added = date_added.isoformat() if hasattr(date_added, 'isoformat') else str(date_added)
skill_info = {
'id': metadata.get('id', skill_name),
'name': metadata.get('name', skill_name),
'description': metadata.get('description', ''),
'date_added': metadata.get('date_added', None),
'date_added': date_added,
'source': metadata.get('source', 'unknown'),
'risk': metadata.get('risk', 'unknown'),
'suggested_risk': suggested_risk.risk,
'suggested_risk_reasons': list(suggested_risk.reasons),
'category': metadata.get('category', metadata.get('id', '').split('-')[0] if '-' in metadata.get('id', '') else 'other'),
}
@@ -80,6 +90,11 @@ def generate_skills_report(output_file=None, sort_by='date'):
'total_skills': len(skills_data),
'skills_with_dates': sum(1 for s in skills_data if s['date_added']),
'skills_without_dates': sum(1 for s in skills_data if not s['date_added']),
'skills_with_suggested_risk': sum(1 for s in skills_data if s['suggested_risk'] != 'unknown'),
'suggested_risk_counts': {
risk: sum(1 for skill in skills_data if skill['suggested_risk'] == risk)
for risk in sorted({skill['suggested_risk'] for skill in skills_data if skill['suggested_risk'] != 'unknown'})
},
'coverage_percentage': round(
sum(1 for s in skills_data if s['date_added']) / len(skills_data) * 100 if skills_data else 0,
1

View File

@@ -0,0 +1,99 @@
#!/usr/bin/env python3
from __future__ import annotations
import re
from dataclasses import dataclass
from collections.abc import Mapping
@dataclass(frozen=True)
class RiskSuggestion:
risk: str
reasons: tuple[str, ...]
OFFENSIVE_HINTS = [
(re.compile(r"AUTHORIZED USE ONLY", re.IGNORECASE), "explicit offensive disclaimer"),
(
re.compile(
r"\b(?:pentest(?:ing)?|penetration testing|red team(?:ing)?|exploit(?:ation)?|malware|phishing|sql injection|xss|csrf|jailbreak|sandbox escape|credential theft|exfiltrat\w*|prompt injection)\b",
re.IGNORECASE,
),
"offensive security language",
),
]
CRITICAL_HINTS = [
(re.compile(r"\bcurl\b[^\n]*\|\s*(?:bash|sh)\b", re.IGNORECASE), "curl pipes into a shell"),
(re.compile(r"\bwget\b[^\n]*\|\s*(?:bash|sh)\b", re.IGNORECASE), "wget pipes into a shell"),
(re.compile(r"\birm\b[^\n]*\|\s*iex\b", re.IGNORECASE), "PowerShell invoke-expression"),
(re.compile(r"\brm\s+-rf\b", re.IGNORECASE), "destructive filesystem delete"),
(re.compile(r"\bgit\s+(?:commit|push|merge|reset)\b", re.IGNORECASE), "git mutation"),
(re.compile(r"\b(?:npm|pnpm|yarn|bun)\s+publish\b", re.IGNORECASE), "package publication"),
(re.compile(r"\b(?:kubectl\s+apply|terraform\s+apply|ansible-playbook|docker\s+push)\b", re.IGNORECASE), "deployment or infrastructure mutation"),
(
re.compile(r"\b(?:POST|PUT|PATCH|DELETE)\b", re.IGNORECASE),
"mutating HTTP verb",
),
(
re.compile(r"\b(?:insert|update|upsert|delete|drop|truncate|alter)\b", re.IGNORECASE),
"state-changing data operation",
),
(
re.compile(r"\b(?:api key|api[_ -]?key|token|secret|password|bearer token|oauth token)\b", re.IGNORECASE),
"secret or token handling",
),
(
re.compile(
r"\b(?:write|overwrite|append|create|modify|remove|rename|move)\b[^\n]{0,60}\b(?:file|files|directory|repo|repository|config|skill|document|artifact|database|table|record|row|branch|release|production|server|endpoint|resource)\b",
re.IGNORECASE,
),
"state-changing instruction",
),
]
SAFE_HINTS = [
(
re.compile(
r"\b(?:echo|cat|ls|rg|grep|find|sed\s+-n|git\s+status|git\s+diff|pytest|npm\s+test|ruff|eslint|tsc)\b",
re.IGNORECASE,
),
"non-mutating command example",
),
(re.compile(r"^```", re.MULTILINE), "contains fenced examples"),
(
re.compile(r"\b(?:read|inspect|analyze|audit|validate|test|search|summarize|monitor|review|list|fetch|get|query|lint)\b", re.IGNORECASE),
"read-only or diagnostic language",
),
(
re.compile(r"\b(?:api|http|graphql|webhook|endpoint|cli|sdk|docs?|database|log|logs)\b", re.IGNORECASE),
"technical or integration language",
),
]
def _collect_reasons(text: str, patterns: list[tuple[re.Pattern[str], str]]) -> list[str]:
return [reason for pattern, reason in patterns if pattern.search(text)]
def suggest_risk(content: str, metadata: Mapping[str, object] | None = None) -> RiskSuggestion:
text = content if isinstance(content, str) else str(content or "")
if metadata:
if isinstance(metadata.get("description"), str):
text = f"{metadata['description']}\n{text}"
if isinstance(metadata.get("name"), str):
text = f"{metadata['name']}\n{text}"
offensive_reasons = _collect_reasons(text, OFFENSIVE_HINTS)
if offensive_reasons:
return RiskSuggestion("offensive", tuple(offensive_reasons))
critical_reasons = _collect_reasons(text, CRITICAL_HINTS)
if critical_reasons:
return RiskSuggestion("critical", tuple(critical_reasons))
safe_reasons = _collect_reasons(text, SAFE_HINTS)
if safe_reasons:
return RiskSuggestion("safe", tuple(safe_reasons))
return RiskSuggestion("none", ())

View File

@@ -207,6 +207,9 @@ def find_plugin_skills(source_dir: Path, already_synced_names: set):
skill_dir = skill_file.parent
skill_name = skill_dir.name
if not is_safe_regular_file(skill_file, source_dir):
continue
if skill_name not in already_synced_names:
results.append({
"relative_path": Path("plugins") / skill_name,

View File

@@ -98,6 +98,31 @@ assert.match(
/GH_TOKEN: \$\{\{ github\.token \}\}/,
"main CI should provide GH_TOKEN for contributor synchronization",
);
assert.match(
ciWorkflow,
/main-validation-and-sync:[\s\S]*?concurrency:[\s\S]*?group: canonical-main-sync[\s\S]*?cancel-in-progress: false/,
"main validation should serialize canonical sync writers",
);
assert.match(
ciWorkflow,
/pip install -r tools\/requirements\.txt/g,
"CI workflows should install Python dependencies from tools/requirements.txt",
);
assert.match(
ciWorkflow,
/- name: Audit npm dependencies[\s\S]*?run: npm audit --audit-level=high/,
"CI should run npm audit at high severity",
);
assert.match(
ciWorkflow,
/main-validation-and-sync:[\s\S]*?- name: Audit npm dependencies[\s\S]*?run: npm audit --audit-level=high/,
"main validation should enforce npm audit before syncing canonical state",
);
assert.doesNotMatch(
ciWorkflow,
/main-validation-and-sync:[\s\S]*?continue-on-error: true/,
"main validation should not treat high-severity npm audit findings as non-blocking",
);
assert.doesNotMatch(
ciWorkflow,
/^ - name: Generate index$/m,
@@ -113,16 +138,46 @@ assert.doesNotMatch(
/^ - name: Build catalog$/m,
"main CI should not keep the old standalone Build catalog step",
);
assert.match(
ciWorkflow,
/git commit -m "chore: sync repo state \[ci skip\]"/,
"main CI should keep bot-generated canonical sync commits out of the normal CI loop",
);
assert.match(
ciWorkflow,
/git ls-files --others --exclude-standard/,
"main CI should fail if canonical sync leaves unmanaged untracked drift",
);
assert.match(
ciWorkflow,
/git diff --name-only/,
"main CI should fail if canonical sync leaves unmanaged tracked drift",
);
assert.ok(fs.existsSync(hygieneWorkflowPath), "repo hygiene workflow should exist");
const hygieneWorkflow = readText(".github/workflows/repo-hygiene.yml");
assert.match(hygieneWorkflow, /^on:\n workflow_dispatch:\n schedule:/m, "repo hygiene workflow should support schedule and manual runs");
assert.match(
hygieneWorkflow,
/concurrency:\n\s+group: canonical-main-sync\n\s+cancel-in-progress: false/,
"repo hygiene workflow should serialize canonical sync writers with main CI",
);
assert.match(
hygieneWorkflow,
/GH_TOKEN: \$\{\{ github\.token \}\}/,
"repo hygiene workflow should provide GH_TOKEN for gh-based contributor sync",
);
assert.match(
hygieneWorkflow,
/pip install -r tools\/requirements\.txt/,
"repo hygiene workflow should install Python dependencies from tools/requirements.txt",
);
assert.match(
hygieneWorkflow,
/run: npm audit --audit-level=high/,
"repo hygiene workflow should block on high-severity npm audit findings before syncing",
);
assert.match(
hygieneWorkflow,
/run: npm run sync:repo-state/,
@@ -133,8 +188,33 @@ assert.match(
/generated_files\.js --include-mixed/,
"repo hygiene workflow should resolve and stage the mixed generated files contract",
);
assert.match(
hygieneWorkflow,
/git commit -m "chore: scheduled repo hygiene sync \[ci skip\]"/,
"repo hygiene workflow should keep bot-generated sync commits out of the normal CI loop",
);
assert.match(
hygieneWorkflow,
/git ls-files --others --exclude-standard/,
"repo hygiene workflow should fail if canonical sync leaves unmanaged untracked drift",
);
assert.match(
hygieneWorkflow,
/git diff --name-only/,
"repo hygiene workflow should fail if canonical sync leaves unmanaged tracked drift",
);
assert.match(publishWorkflow, /run: npm ci/, "npm publish workflow should install dependencies");
assert.match(
publishWorkflow,
/pip install -r tools\/requirements\.txt/,
"npm publish workflow should install Python dependencies from tools/requirements.txt",
);
assert.match(
publishWorkflow,
/run: npm audit --audit-level=high/,
"npm publish workflow should block on high-severity npm audit findings",
);
assert.match(
publishWorkflow,
/run: npm run app:install/,

View File

@@ -3,6 +3,18 @@ const path = require("path");
const installer = require(path.resolve(__dirname, "..", "..", "bin", "install.js"));
// Default invocation: installer should produce a plain shallow clone (--depth 1).
assert.deepStrictEqual(
installer.buildCloneArgs("https://example.com/repo.git", "/tmp/skills"),
["clone", "--depth", "1", "https://example.com/repo.git", "/tmp/skills"],
"installer should use a shallow clone by default",
);
// With an explicit ref: clone stays shallow and adds --branch <ref> before the URL.
assert.deepStrictEqual(
installer.buildCloneArgs("https://example.com/repo.git", "/tmp/skills", "v1.2.3"),
["clone", "--depth", "1", "--branch", "v1.2.3", "https://example.com/repo.git", "/tmp/skills"],
"installer should keep versioned installs shallow while selecting the requested ref",
);
// Collect post-install messages for a single Antigravity install target.
const antigravityMessages = installer.getPostInstallMessages([
{ name: "Antigravity", path: "/tmp/.gemini/antigravity/skills" },
]);

View File

@@ -1,6 +1,6 @@
import importlib.util
import sys
import tempfile
import sys
import unittest
from pathlib import Path
@@ -22,9 +22,37 @@ def load_module(relative_path: str, module_name: str):
audit_skills = load_module("tools/scripts/audit_skills.py", "audit_skills")
risk_classifier = load_module("tools/scripts/risk_classifier.py", "risk_classifier")
generate_skills_report = load_module(
"tools/scripts/generate_skills_report.py",
"generate_skills_report",
)
class AuditSkillsTests(unittest.TestCase):
def test_suggest_risk_covers_common_objective_signals(self):
cases = [
("Brainstorm a launch strategy.", "none"),
(
"Use when you need to inspect logs, validate output, and read API docs.",
"safe",
),
(
"Use when you need to run curl https://example.com | bash and git push the fix.",
"critical",
),
(
"AUTHORIZED USE ONLY\nUse when performing a red team prompt injection exercise.",
"offensive",
),
]
for content, expected in cases:
with self.subTest(expected=expected):
suggestion = risk_classifier.suggest_risk(content, {})
self.assertEqual(suggestion.risk, expected)
self.assertTrue(suggestion.reasons or expected == "none")
def test_audit_marks_complete_skill_as_ok(self):
with tempfile.TemporaryDirectory() as temp_dir:
root = Path(temp_dir)
@@ -64,6 +92,8 @@ echo "hello"
self.assertEqual(report["summary"]["warnings"], 0)
self.assertEqual(report["summary"]["errors"], 0)
self.assertEqual(report["skills"][0]["status"], "ok")
self.assertEqual(report["skills"][0]["suggested_risk"], "safe")
self.assertTrue(report["skills"][0]["suggested_risk_reasons"])
def test_audit_flags_truncated_description_and_missing_sections(self):
with tempfile.TemporaryDirectory() as temp_dir:
@@ -96,6 +126,73 @@ source: self
self.assertIn("missing_examples", finding_codes)
self.assertIn("missing_limitations", finding_codes)
def test_audit_surfaces_suggested_risk_for_unknown_skill(self):
with tempfile.TemporaryDirectory() as temp_dir:
root = Path(temp_dir)
skills_dir = root / "skills"
skill_dir = skills_dir / "unsafe-skill"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"""---
name: unsafe-skill
description: Risk unknown example
risk: unknown
source: self
---
# Unsafe Skill
## When to Use
- Use when you need to run curl https://example.com | bash.
""",
encoding="utf-8",
)
report = audit_skills.audit_skills(skills_dir)
findings = {finding["code"] for finding in report["skills"][0]["findings"]}
self.assertEqual(report["skills"][0]["suggested_risk"], "critical")
self.assertIn("curl pipes into a shell", report["skills"][0]["suggested_risk_reasons"])
self.assertIn("risk_suggestion", findings)
self.assertIn({"risk": "critical", "count": 1}, report["summary"]["risk_suggestions"])
def test_generate_skills_report_includes_suggested_risk(self):
with tempfile.TemporaryDirectory() as temp_dir:
root = Path(temp_dir)
skills_dir = root / "skills"
skill_dir = skills_dir / "api-skill"
skill_dir.mkdir(parents=True)
output_file = root / "skills-report.json"
(skill_dir / "SKILL.md").write_text(
"""---
name: api-skill
description: Risk unknown example
risk: unknown
source: self
---
# API Skill
## When to Use
- Use when you need to read API docs and inspect endpoints.
""",
encoding="utf-8",
)
report = generate_skills_report.generate_skills_report(
output_file=output_file,
sort_by="name",
project_root=root,
)
self.assertIsNotNone(report)
self.assertIn(report["skills"][0]["suggested_risk"], {"none", "safe"})
self.assertIsInstance(report["skills"][0]["suggested_risk_reasons"], list)
saved_report = output_file.read_text(encoding="utf-8")
self.assertIn('"suggested_risk":', saved_report)
def test_audit_flags_blocking_errors(self):
with tempfile.TemporaryDirectory() as temp_dir:
root = Path(temp_dir)
@@ -137,6 +234,83 @@ See [details](missing-reference.md).
self.assertIn("dangling_link", finding_codes)
self.assertIn("missing_authorized_use_only", finding_codes)
def test_audit_suggests_risk_without_blocking_unknown(self):
with tempfile.TemporaryDirectory() as temp_dir:
root = Path(temp_dir)
skills_dir = root / "skills"
safe_skill = skills_dir / "analysis-skill"
mismatch_skill = skills_dir / "review-skill"
safe_skill.mkdir(parents=True)
mismatch_skill.mkdir(parents=True)
(safe_skill / "SKILL.md").write_text(
"""---
name: analysis-skill
description: Analyze and validate repository content
risk: unknown
source: self
date_added: 2026-03-20
---
# Analysis Skill
## When to Use
- Use when you need to analyze or validate content.
## Examples
- Inspect the repository content and validate findings.
## Limitations
- Read-only.
""",
encoding="utf-8",
)
(mismatch_skill / "SKILL.md").write_text(
"""---
name: review-skill
description: Review prompt injection scenarios
risk: safe
source: self
date_added: 2026-03-20
---
# Review Skill
## When to Use
- Use when you need to test prompt injection defenses.
## Examples
```bash
echo "prompt injection"
```
## Limitations
- Demo only.
""",
encoding="utf-8",
)
report = audit_skills.audit_skills(skills_dir)
by_id = {skill["id"]: skill for skill in report["skills"]}
analysis_findings = {finding["code"] for finding in by_id["analysis-skill"]["findings"]}
review_findings = {finding["code"] for finding in by_id["review-skill"]["findings"]}
self.assertEqual(by_id["analysis-skill"]["status"], "ok")
self.assertEqual(by_id["analysis-skill"]["suggested_risk"], "safe")
self.assertIn("risk_suggestion", analysis_findings)
self.assertEqual(by_id["review-skill"]["status"], "warning")
self.assertEqual(by_id["review-skill"]["suggested_risk"], "offensive")
self.assertIn("risk_suggestion", review_findings)
markdown_path = root / "audit.md"
audit_skills.write_markdown_report(report, markdown_path)
markdown = markdown_path.read_text(encoding="utf-8")
self.assertIn("## Risk Suggestions", markdown)
self.assertIn("analysis-skill", markdown)
self.assertIn("review-skill", markdown)
if __name__ == "__main__":
unittest.main()

View File

@@ -38,6 +38,23 @@ class FrontmatterParsingSecurityTests(unittest.TestCase):
self.assertIsNone(metadata)
self.assertTrue(any("mapping" in error.lower() for error in errors))
def test_validate_skills_empty_frontmatter_is_schema_checked(self):
with tempfile.TemporaryDirectory() as temp_dir:
root = Path(temp_dir)
skills_dir = root / "skills"
skill_dir = skills_dir / "demo"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text("---\n---\n# Demo\n", encoding="utf-8")
results = validate_skills.collect_validation_results(str(skills_dir))
self.assertTrue(any("Missing 'name'" in error for error in results["errors"]))
self.assertTrue(any("Missing 'description'" in error for error in results["errors"]))
self.assertFalse(
any("Missing or malformed YAML frontmatter" in error for error in results["errors"])
)
def test_validate_skills_normalizes_unquoted_yaml_dates(self):
content = "---\nname: demo\ndescription: ok\ndate_added: 2026-03-15\n---\nbody\n"
metadata, errors = validate_skills.parse_frontmatter(content)

View File

@@ -2,6 +2,7 @@ import importlib.util
import sys
import tempfile
import unittest
import stat
import zipfile
from pathlib import Path
@@ -20,21 +21,51 @@ def load_module(relative_path: str, module_name: str):
class OfficeUnpackSecurityTests(unittest.TestCase):
def test_extract_archive_safely_blocks_zip_slip(self):
module = load_module("skills/docx/ooxml/scripts/unpack.py", "docx_unpack")
for relative_path, module_name in [
("skills/docx-official/ooxml/scripts/unpack.py", "docx_unpack"),
("skills/pptx-official/ooxml/scripts/unpack.py", "pptx_unpack"),
]:
module = load_module(relative_path, module_name)
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
archive_path = temp_path / "payload.zip"
output_dir = temp_path / "output"
with self.subTest(module=relative_path):
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
archive_path = temp_path / "payload.zip"
output_dir = temp_path / "output"
with zipfile.ZipFile(archive_path, "w") as archive:
archive.writestr("../escape.txt", "escape")
archive.writestr("word/document.xml", "<w:document/>")
with zipfile.ZipFile(archive_path, "w") as archive:
archive.writestr("../escape.txt", "escape")
archive.writestr("word/document.xml", "<w:document/>")
with self.assertRaises(ValueError):
module.extract_archive_safely(archive_path, output_dir)
with self.assertRaises(ValueError):
module.extract_archive_safely(archive_path, output_dir)
self.assertFalse((temp_path / "escape.txt").exists())
self.assertFalse((temp_path / "escape.txt").exists())
def test_extract_archive_safely_blocks_zip_symlinks(self):
for relative_path, module_name in [
("skills/docx-official/ooxml/scripts/unpack.py", "docx_unpack_symlink"),
("skills/pptx-official/ooxml/scripts/unpack.py", "pptx_unpack_symlink"),
]:
module = load_module(relative_path, module_name)
with self.subTest(module=relative_path):
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
archive_path = temp_path / "payload.zip"
output_dir = temp_path / "output"
with zipfile.ZipFile(archive_path, "w") as archive:
symlink_info = zipfile.ZipInfo("word/link")
symlink_info.create_system = 3
symlink_info.external_attr = (stat.S_IFLNK | 0o777) << 16
archive.writestr(symlink_info, "../escape.txt")
archive.writestr("word/document.xml", "<w:document/>")
with self.assertRaises(ValueError):
module.extract_archive_safely(archive_path, output_dir)
self.assertFalse((temp_path / "escape.txt").exists())
if __name__ == "__main__":

View File

@@ -95,6 +95,33 @@ class SyncMicrosoftSkillsSecurityTests(unittest.TestCase):
target.unlink()
outside.rmdir()
def test_find_plugin_skills_ignores_symlinked_skill_markdown(self):
with tempfile.TemporaryDirectory() as temp_dir:
root = Path(temp_dir)
github_plugins = root / ".github" / "plugins"
github_plugins.mkdir(parents=True)
safe_plugin = github_plugins / "safe-plugin"
safe_plugin.mkdir()
(safe_plugin / "SKILL.md").write_text("---\nname: safe-plugin\n---\n", encoding="utf-8")
linked_plugin = github_plugins / "linked-plugin"
linked_plugin.mkdir()
outside = Path(tempfile.mkdtemp())
try:
target = outside / "SKILL.md"
target.write_text("---\nname: escaped\n---\n", encoding="utf-8")
(linked_plugin / "SKILL.md").symlink_to(target)
entries = sms.find_plugin_skills(root, set())
relative_paths = {str(entry["relative_path"]) for entry in entries}
self.assertEqual(relative_paths, {"plugins/safe-plugin"})
finally:
target.unlink()
outside.rmdir()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,26 @@
const assert = require("assert");
const fs = require("fs");
const path = require("path");
const repoRoot = path.resolve(__dirname, "..", "..", "..");
const readme = fs.readFileSync(path.join(repoRoot, "apps", "web-app", "README.md"), "utf8");
assert.doesNotMatch(
readme,
/^# React \+ Vite$/m,
"web app README should be project-specific, not the default Vite template",
);
for (const section of [
"## What This App Does",
"## Development",
"## Environment Variables",
"## Deploy Model",
"## Testing",
]) {
assert.match(
readme,
new RegExp(`^${section.replace(/[.*+?^${}()|[\]\\\\]/g, "\\$&")}$`, "m"),
`web app README should document ${section}`,
);
}

View File

@@ -53,7 +53,7 @@ def parse_frontmatter(content, rel_path=None):
Parse frontmatter using PyYAML for robustness.
Returns a dict of key-values and a list of error messages.
"""
fm_match = re.search(r'^---\s*\n(.*?)\n---', content, re.DOTALL)
fm_match = re.search(r'^---\s*\n(.*?)\n?---(?:\s*\n|$)', content, re.DOTALL)
if not fm_match:
return None, ["Missing or malformed YAML frontmatter"]
@@ -109,7 +109,7 @@ def collect_validation_results(skills_dir, strict_mode=False):
# 1. Frontmatter Check
metadata, fm_errors = parse_frontmatter(content, rel_path)
if not metadata:
if metadata is None:
errors.append(f"{rel_path}: Missing or malformed YAML frontmatter")
continue # Cannot proceed without metadata