fix(security): harden bundle and plugin validation
This commit is contained in:
@@ -2,6 +2,8 @@ import os
|
|||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
|
from collections.abc import Mapping
|
||||||
|
from datetime import date, datetime
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
from _project_paths import find_repo_root
|
from _project_paths import find_repo_root
|
||||||
@@ -161,6 +163,27 @@ def infer_category(skill_info, metadata, body_text):
|
|||||||
|
|
||||||
return infer_dynamic_category(str(skill_info.get("id", "")))
|
return infer_dynamic_category(str(skill_info.get("id", "")))
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_yaml_value(value):
|
||||||
|
if isinstance(value, Mapping):
|
||||||
|
return {key: normalize_yaml_value(val) for key, val in value.items()}
|
||||||
|
if isinstance(value, list):
|
||||||
|
return [normalize_yaml_value(item) for item in value]
|
||||||
|
if isinstance(value, (date, datetime)):
|
||||||
|
return value.isoformat()
|
||||||
|
if isinstance(value, (bytes, bytearray)):
|
||||||
|
return bytes(value).decode("utf-8", errors="replace")
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
def coerce_metadata_text(value):
|
||||||
|
if value is None or isinstance(value, (Mapping, list, tuple, set)):
|
||||||
|
return None
|
||||||
|
if isinstance(value, str):
|
||||||
|
return value
|
||||||
|
return str(value)
|
||||||
|
|
||||||
|
|
||||||
def parse_frontmatter(content):
|
def parse_frontmatter(content):
|
||||||
"""
|
"""
|
||||||
Parses YAML frontmatter, sanitizing unquoted values containing @.
|
Parses YAML frontmatter, sanitizing unquoted values containing @.
|
||||||
@@ -190,7 +213,12 @@ def parse_frontmatter(content):
|
|||||||
sanitized_yaml = '\n'.join(sanitized_lines)
|
sanitized_yaml = '\n'.join(sanitized_lines)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return yaml.safe_load(sanitized_yaml) or {}
|
parsed = yaml.safe_load(sanitized_yaml) or {}
|
||||||
|
parsed = normalize_yaml_value(parsed)
|
||||||
|
if not isinstance(parsed, Mapping):
|
||||||
|
print("⚠️ YAML frontmatter must be a mapping/object")
|
||||||
|
return {}
|
||||||
|
return dict(parsed)
|
||||||
except yaml.YAMLError as e:
|
except yaml.YAMLError as e:
|
||||||
print(f"⚠️ YAML parsing error: {e}")
|
print(f"⚠️ YAML parsing error: {e}")
|
||||||
return {}
|
return {}
|
||||||
@@ -240,11 +268,25 @@ def generate_index(skills_dir, output_file):
|
|||||||
body = content[fm_match.end():].strip()
|
body = content[fm_match.end():].strip()
|
||||||
|
|
||||||
# Merge Metadata (frontmatter takes priority)
|
# Merge Metadata (frontmatter takes priority)
|
||||||
if "name" in metadata: skill_info["name"] = metadata["name"]
|
name = coerce_metadata_text(metadata.get("name"))
|
||||||
if "description" in metadata: skill_info["description"] = metadata["description"]
|
description = coerce_metadata_text(metadata.get("description"))
|
||||||
if "risk" in metadata: skill_info["risk"] = metadata["risk"]
|
risk = coerce_metadata_text(metadata.get("risk"))
|
||||||
if "source" in metadata: skill_info["source"] = metadata["source"]
|
source = coerce_metadata_text(metadata.get("source"))
|
||||||
if "date_added" in metadata: skill_info["date_added"] = metadata["date_added"]
|
date_added = coerce_metadata_text(metadata.get("date_added"))
|
||||||
|
category = coerce_metadata_text(metadata.get("category"))
|
||||||
|
|
||||||
|
if name is not None:
|
||||||
|
skill_info["name"] = name
|
||||||
|
if description is not None:
|
||||||
|
skill_info["description"] = description
|
||||||
|
if risk is not None:
|
||||||
|
skill_info["risk"] = risk
|
||||||
|
if source is not None:
|
||||||
|
skill_info["source"] = source
|
||||||
|
if date_added is not None:
|
||||||
|
skill_info["date_added"] = date_added
|
||||||
|
if category is not None:
|
||||||
|
skill_info["category"] = category
|
||||||
|
|
||||||
# Category: prefer frontmatter, then folder structure, then default
|
# Category: prefer frontmatter, then folder structure, then default
|
||||||
inferred_category, confidence, reason = infer_category(skill_info, metadata, body)
|
inferred_category, confidence, reason = infer_category(skill_info, metadata, body)
|
||||||
|
|||||||
@@ -808,8 +808,18 @@ def normalize_yaml_value(value):
|
|||||||
return [normalize_yaml_value(item) for item in value]
|
return [normalize_yaml_value(item) for item in value]
|
||||||
if isinstance(value, (date, datetime)):
|
if isinstance(value, (date, datetime)):
|
||||||
return value.isoformat()
|
return value.isoformat()
|
||||||
|
if isinstance(value, (bytes, bytearray)):
|
||||||
|
return bytes(value).decode("utf-8", errors="replace")
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
def coerce_metadata_text(value):
|
||||||
|
if value is None or isinstance(value, (Mapping, list, tuple, set)):
|
||||||
|
return None
|
||||||
|
if isinstance(value, str):
|
||||||
|
return value
|
||||||
|
return str(value)
|
||||||
|
|
||||||
def parse_frontmatter(content):
|
def parse_frontmatter(content):
|
||||||
"""
|
"""
|
||||||
Parses YAML frontmatter, sanitizing unquoted values containing @.
|
Parses YAML frontmatter, sanitizing unquoted values containing @.
|
||||||
@@ -905,15 +915,27 @@ def generate_index(skills_dir, output_file, compatibility_report=None):
|
|||||||
metadata = parse_frontmatter(content)
|
metadata = parse_frontmatter(content)
|
||||||
|
|
||||||
# Merge Metadata (frontmatter takes priority)
|
# Merge Metadata (frontmatter takes priority)
|
||||||
if "name" in metadata: skill_info["name"] = metadata["name"]
|
name = coerce_metadata_text(metadata.get("name"))
|
||||||
if "description" in metadata: skill_info["description"] = metadata["description"]
|
description = coerce_metadata_text(metadata.get("description"))
|
||||||
if "risk" in metadata: skill_info["risk"] = metadata["risk"]
|
risk = coerce_metadata_text(metadata.get("risk"))
|
||||||
if "source" in metadata: skill_info["source"] = metadata["source"]
|
source = coerce_metadata_text(metadata.get("source"))
|
||||||
if "date_added" in metadata: skill_info["date_added"] = metadata["date_added"]
|
date_added = coerce_metadata_text(metadata.get("date_added"))
|
||||||
|
category = coerce_metadata_text(metadata.get("category"))
|
||||||
|
|
||||||
|
if name is not None:
|
||||||
|
skill_info["name"] = name
|
||||||
|
if description is not None:
|
||||||
|
skill_info["description"] = description
|
||||||
|
if risk is not None:
|
||||||
|
skill_info["risk"] = risk
|
||||||
|
if source is not None:
|
||||||
|
skill_info["source"] = source
|
||||||
|
if date_added is not None:
|
||||||
|
skill_info["date_added"] = date_added
|
||||||
|
|
||||||
# Category: prefer frontmatter, then folder structure, then conservative inference
|
# Category: prefer frontmatter, then folder structure, then conservative inference
|
||||||
if "category" in metadata:
|
if category is not None:
|
||||||
skill_info["category"] = metadata["category"]
|
skill_info["category"] = category
|
||||||
elif skill_info["category"] is None:
|
elif skill_info["category"] is None:
|
||||||
inferred_category = infer_category(
|
inferred_category = infer_category(
|
||||||
skill_info["id"],
|
skill_info["id"],
|
||||||
|
|||||||
@@ -65,6 +65,8 @@ def _normalize_yaml_value(value: Any) -> Any:
|
|||||||
return [_normalize_yaml_value(item) for item in value]
|
return [_normalize_yaml_value(item) for item in value]
|
||||||
if isinstance(value, (date, datetime)):
|
if isinstance(value, (date, datetime)):
|
||||||
return value.isoformat()
|
return value.isoformat()
|
||||||
|
if isinstance(value, (bytes, bytearray)):
|
||||||
|
return bytes(value).decode("utf-8", errors="replace")
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
@@ -114,6 +116,7 @@ def _runtime_dependency_files(skill_dir: Path) -> list[str]:
|
|||||||
|
|
||||||
def _local_link_reasons(content: str, skill_dir: Path) -> set[str]:
|
def _local_link_reasons(content: str, skill_dir: Path) -> set[str]:
|
||||||
reasons: set[str] = set()
|
reasons: set[str] = set()
|
||||||
|
resolved_skill_dir = skill_dir.resolve()
|
||||||
|
|
||||||
for link in LOCAL_LINK_RE.findall(content):
|
for link in LOCAL_LINK_RE.findall(content):
|
||||||
link_clean = link.split("#", 1)[0].strip()
|
link_clean = link.split("#", 1)[0].strip()
|
||||||
@@ -125,6 +128,11 @@ def _local_link_reasons(content: str, skill_dir: Path) -> set[str]:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
target_path = (skill_dir / link_clean).resolve(strict=False)
|
target_path = (skill_dir / link_clean).resolve(strict=False)
|
||||||
|
try:
|
||||||
|
target_path.relative_to(resolved_skill_dir)
|
||||||
|
except ValueError:
|
||||||
|
reasons.add("escaped_local_reference")
|
||||||
|
continue
|
||||||
if not target_path.exists():
|
if not target_path.exists():
|
||||||
reasons.add("broken_local_reference")
|
reasons.add("broken_local_reference")
|
||||||
|
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ from update_readme import configure_utf8_output, load_metadata
|
|||||||
SAFE_SKILL_ID_RE = re.compile(
|
SAFE_SKILL_ID_RE = re.compile(
|
||||||
r"^(?!.*(?:^|/)\.{1,2}(?:/|$))[A-Za-z0-9._-]+(?:/[A-Za-z0-9._-]+)*$"
|
r"^(?!.*(?:^|/)\.{1,2}(?:/|$))[A-Za-z0-9._-]+(?:/[A-Za-z0-9._-]+)*$"
|
||||||
)
|
)
|
||||||
|
SAFE_BUNDLE_ID_RE = re.compile(r"^[A-Za-z0-9](?:[A-Za-z0-9._-]*[A-Za-z0-9])?$")
|
||||||
REPO_URL = "https://github.com/sickn33/antigravity-awesome-skills"
|
REPO_URL = "https://github.com/sickn33/antigravity-awesome-skills"
|
||||||
AUTHOR = {
|
AUTHOR = {
|
||||||
"name": "sickn33 and contributors",
|
"name": "sickn33 and contributors",
|
||||||
@@ -182,6 +183,11 @@ def _validate_bundle_skill_id(skill_id: str) -> None:
|
|||||||
raise ValueError(f"Invalid skill id in editorial bundles manifest: {skill_id!r}")
|
raise ValueError(f"Invalid skill id in editorial bundles manifest: {skill_id!r}")
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_bundle_id(bundle_id: str) -> None:
|
||||||
|
if not SAFE_BUNDLE_ID_RE.fullmatch(bundle_id):
|
||||||
|
raise ValueError(f"Invalid editorial bundle id: {bundle_id!r}")
|
||||||
|
|
||||||
|
|
||||||
def _validate_editorial_bundles(root: Path, payload: dict[str, Any]) -> list[dict[str, Any]]:
|
def _validate_editorial_bundles(root: Path, payload: dict[str, Any]) -> list[dict[str, Any]]:
|
||||||
bundles = payload.get("bundles")
|
bundles = payload.get("bundles")
|
||||||
if not isinstance(bundles, list) or not bundles:
|
if not isinstance(bundles, list) or not bundles:
|
||||||
@@ -199,6 +205,7 @@ def _validate_editorial_bundles(root: Path, payload: dict[str, Any]) -> list[dic
|
|||||||
bundle_name = str(bundle.get("name", "")).strip()
|
bundle_name = str(bundle.get("name", "")).strip()
|
||||||
if not bundle_id or not bundle_name:
|
if not bundle_id or not bundle_name:
|
||||||
raise ValueError("Each editorial bundle requires non-empty 'id' and 'name'.")
|
raise ValueError("Each editorial bundle requires non-empty 'id' and 'name'.")
|
||||||
|
_validate_bundle_id(bundle_id)
|
||||||
if bundle_id in seen_bundle_ids:
|
if bundle_id in seen_bundle_ids:
|
||||||
raise ValueError(f"Duplicate editorial bundle id: {bundle_id}")
|
raise ValueError(f"Duplicate editorial bundle id: {bundle_id}")
|
||||||
if bundle_name in seen_bundle_names:
|
if bundle_name in seen_bundle_names:
|
||||||
|
|||||||
@@ -106,6 +106,30 @@ class EditorialBundlesTests(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
self.assertEqual(generated_plugins, expected_plugins)
|
self.assertEqual(generated_plugins, expected_plugins)
|
||||||
|
|
||||||
|
def test_manifest_rejects_bundle_ids_with_path_traversal(self):
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
temp_root = pathlib.Path(temp_dir)
|
||||||
|
skill_dir = temp_root / "skills" / "safe-skill"
|
||||||
|
skill_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"bundles": [
|
||||||
|
{
|
||||||
|
"id": "../../outside",
|
||||||
|
"name": "Safe Bundle",
|
||||||
|
"group": "Security",
|
||||||
|
"emoji": "🛡️",
|
||||||
|
"tagline": "Test bundle",
|
||||||
|
"audience": "Testers",
|
||||||
|
"description": "Testers",
|
||||||
|
"skills": [{"id": "safe-skill", "summary": "ok"}],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
with self.assertRaisesRegex(ValueError, "Invalid editorial bundle id"):
|
||||||
|
editorial_bundles._validate_editorial_bundles(temp_root, payload)
|
||||||
|
|
||||||
def test_sample_bundle_copy_matches_source_file_inventory(self):
|
def test_sample_bundle_copy_matches_source_file_inventory(self):
|
||||||
sample_bundle = next(bundle for bundle in self.manifest_bundles if bundle["id"] == "documents-presentations")
|
sample_bundle = next(bundle for bundle in self.manifest_bundles if bundle["id"] == "documents-presentations")
|
||||||
plugin_skills_root = REPO_ROOT / "plugins" / "antigravity-bundle-documents-presentations" / "skills"
|
plugin_skills_root = REPO_ROOT / "plugins" / "antigravity-bundle-documents-presentations" / "skills"
|
||||||
|
|||||||
@@ -21,6 +21,10 @@ def load_module(relative_path: str, module_name: str):
|
|||||||
|
|
||||||
|
|
||||||
generate_index = load_module("tools/scripts/generate_index.py", "generate_index")
|
generate_index = load_module("tools/scripts/generate_index.py", "generate_index")
|
||||||
|
legacy_generate_index = load_module(
|
||||||
|
"skill_categorization/tools/scripts/generate_index.py",
|
||||||
|
"legacy_generate_index",
|
||||||
|
)
|
||||||
validate_skills = load_module("tools/scripts/validate_skills.py", "validate_skills")
|
validate_skills = load_module("tools/scripts/validate_skills.py", "validate_skills")
|
||||||
|
|
||||||
|
|
||||||
@@ -80,6 +84,34 @@ class FrontmatterParsingSecurityTests(unittest.TestCase):
|
|||||||
self.assertEqual(skills[0]["date_added"], "2026-03-15")
|
self.assertEqual(skills[0]["date_added"], "2026-03-15")
|
||||||
self.assertIn('"date_added": "2026-03-15"', output_file.read_text(encoding="utf-8"))
|
self.assertIn('"date_added": "2026-03-15"', output_file.read_text(encoding="utf-8"))
|
||||||
|
|
||||||
|
def test_generate_index_normalizes_binary_frontmatter_scalars(self):
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
root = Path(temp_dir)
|
||||||
|
skills_dir = root / "skills"
|
||||||
|
skill_dir = skills_dir / "demo"
|
||||||
|
output_file = root / "skills_index.json"
|
||||||
|
|
||||||
|
skill_dir.mkdir(parents=True)
|
||||||
|
(skill_dir / "SKILL.md").write_text(
|
||||||
|
"---\nname: !!binary aGVsbG8=\ndescription: !!binary d29ybGQ=\n---\nBody\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
|
||||||
|
skills = generate_index.generate_index(str(skills_dir), str(output_file))
|
||||||
|
|
||||||
|
self.assertEqual(skills[0]["name"], "hello")
|
||||||
|
self.assertEqual(skills[0]["description"], "world")
|
||||||
|
self.assertIn('"name": "hello"', output_file.read_text(encoding="utf-8"))
|
||||||
|
|
||||||
|
def test_legacy_generate_index_rejects_non_mapping_and_normalizes_binary_scalars(self):
|
||||||
|
self.assertEqual(legacy_generate_index.parse_frontmatter("---\njust-a-string\n---\nbody\n"), {})
|
||||||
|
metadata = legacy_generate_index.parse_frontmatter(
|
||||||
|
"---\nname: !!binary aGVsbG8=\ndescription: !!binary d29ybGQ=\n---\nbody\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(metadata["name"], "hello")
|
||||||
|
self.assertEqual(metadata["description"], "world")
|
||||||
|
|
||||||
def test_generate_index_ignores_symlinked_skill_markdown(self):
|
def test_generate_index_ignores_symlinked_skill_markdown(self):
|
||||||
with tempfile.TemporaryDirectory() as temp_dir:
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
root = Path(temp_dir)
|
root = Path(temp_dir)
|
||||||
|
|||||||
@@ -78,6 +78,30 @@ class PluginCompatibilityTests(unittest.TestCase):
|
|||||||
self.assertEqual(entry["targets"]["claude"], "blocked")
|
self.assertEqual(entry["targets"]["claude"], "blocked")
|
||||||
self.assertIn("undeclared_runtime_dependency", entry["reasons"])
|
self.assertIn("undeclared_runtime_dependency", entry["reasons"])
|
||||||
|
|
||||||
|
def test_relative_links_cannot_escape_skill_directory(self):
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
skills_dir = pathlib.Path(temp_dir) / "skills"
|
||||||
|
self._write_skill(
|
||||||
|
skills_dir,
|
||||||
|
"escaping-link-skill",
|
||||||
|
(
|
||||||
|
"---\n"
|
||||||
|
"name: escaping-link-skill\n"
|
||||||
|
"description: Example\n"
|
||||||
|
"---\n"
|
||||||
|
"Read [secret](../../outside/secret.txt)\n"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
outside_dir = pathlib.Path(temp_dir) / "outside"
|
||||||
|
outside_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
(outside_dir / "secret.txt").write_text("secret", encoding="utf-8")
|
||||||
|
|
||||||
|
report = plugin_compatibility.build_report(skills_dir)
|
||||||
|
entry = report["skills"][0]
|
||||||
|
self.assertEqual(entry["targets"]["codex"], "blocked")
|
||||||
|
self.assertEqual(entry["targets"]["claude"], "blocked")
|
||||||
|
self.assertIn("escaped_local_reference", entry["reasons"])
|
||||||
|
|
||||||
def test_manual_setup_metadata_can_make_runtime_skill_supported(self):
|
def test_manual_setup_metadata_can_make_runtime_skill_supported(self):
|
||||||
with tempfile.TemporaryDirectory() as temp_dir:
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
skills_dir = pathlib.Path(temp_dir) / "skills"
|
skills_dir = pathlib.Path(temp_dir) / "skills"
|
||||||
|
|||||||
Reference in New Issue
Block a user