fix: harden filesystem trust boundaries
This commit is contained in:
@@ -8,6 +8,9 @@ def fix_skills(skills_dir):
|
||||
dirs[:] = [d for d in dirs if not d.startswith('.')]
|
||||
if "SKILL.md" in files:
|
||||
skill_path = os.path.join(root, "SKILL.md")
|
||||
if os.path.islink(skill_path):
|
||||
print(f"⚠️ Skipping symlinked skill file: {skill_path}")
|
||||
continue
|
||||
with open(skill_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
@@ -25,6 +28,10 @@ def fix_skills(skills_dir):
|
||||
print(f"⚠️ {skill_path}: YAML error - {e}")
|
||||
continue
|
||||
|
||||
if not isinstance(metadata, dict):
|
||||
print(f"⚠️ {skill_path}: Frontmatter must be a mapping, skipping")
|
||||
continue
|
||||
|
||||
changed = False
|
||||
|
||||
# 1. Fix Name
|
||||
|
||||
@@ -2,6 +2,7 @@ import os
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from collections.abc import Mapping
|
||||
|
||||
import yaml
|
||||
from _project_paths import find_repo_root
|
||||
@@ -41,7 +42,11 @@ def parse_frontmatter(content):
|
||||
sanitized_yaml = '\n'.join(sanitized_lines)
|
||||
|
||||
try:
|
||||
return yaml.safe_load(sanitized_yaml) or {}
|
||||
parsed = yaml.safe_load(sanitized_yaml) or {}
|
||||
if not isinstance(parsed, Mapping):
|
||||
print("⚠️ YAML frontmatter must be a mapping/object")
|
||||
return {}
|
||||
return dict(parsed)
|
||||
except yaml.YAMLError as e:
|
||||
print(f"⚠️ YAML parsing error: {e}")
|
||||
return {}
|
||||
@@ -56,6 +61,9 @@ def generate_index(skills_dir, output_file):
|
||||
|
||||
if "SKILL.md" in files:
|
||||
skill_path = os.path.join(root, "SKILL.md")
|
||||
if os.path.islink(skill_path):
|
||||
print(f"⚠️ Skipping symlinked SKILL.md: {skill_path}")
|
||||
continue
|
||||
dir_name = os.path.basename(root)
|
||||
parent_dir = os.path.basename(os.path.dirname(root))
|
||||
|
||||
|
||||
@@ -7,53 +7,66 @@ const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = path.dirname(__filename);
|
||||
const require = createRequire(import.meta.url);
|
||||
const { findProjectRoot } = require('../lib/project-root');
|
||||
const { resolveSafeRealPath } = require('../lib/symlink-safety');
|
||||
|
||||
const ROOT_DIR = findProjectRoot(__dirname);
|
||||
const WEB_APP_PUBLIC = path.join(ROOT_DIR, 'apps', 'web-app', 'public');
|
||||
|
||||
// Ensure public dir exists
|
||||
if (!fs.existsSync(WEB_APP_PUBLIC)) {
|
||||
fs.mkdirSync(WEB_APP_PUBLIC, { recursive: true });
|
||||
}
|
||||
|
||||
// 1. Copy skills_index.json
|
||||
const sourceIndex = path.join(ROOT_DIR, 'skills_index.json');
|
||||
const destIndex = path.join(WEB_APP_PUBLIC, 'skills.json');
|
||||
|
||||
console.log(`Copying ${sourceIndex} -> ${destIndex}...`);
|
||||
fs.copyFileSync(sourceIndex, destIndex);
|
||||
|
||||
// 2. Copy skills directory content
|
||||
// Note: Symlinking is better, but Windows often requires admin for symlinks.
|
||||
// We will try to copy for reliability in this environment.
|
||||
const sourceSkills = path.join(ROOT_DIR, 'skills');
|
||||
const destSkills = path.join(WEB_APP_PUBLIC, 'skills');
|
||||
|
||||
console.log(`Copying skills directory...`);
|
||||
|
||||
// Recursive copy function (follows symlinks to copy resolved content)
|
||||
function copyFolderSync(from, to) {
|
||||
function copyFolderSync(from, to, rootDir = from) {
|
||||
if (!fs.existsSync(to)) fs.mkdirSync(to, { recursive: true });
|
||||
|
||||
fs.readdirSync(from).forEach(element => {
|
||||
const srcPath = path.join(from, element);
|
||||
const destPath = path.join(to, element);
|
||||
const stat = fs.statSync(srcPath); // statSync follows symlinks
|
||||
const stat = fs.lstatSync(srcPath);
|
||||
const realPath = stat.isSymbolicLink() ? resolveSafeRealPath(rootDir, srcPath) : srcPath;
|
||||
|
||||
if (stat.isFile()) {
|
||||
fs.copyFileSync(srcPath, destPath);
|
||||
} else if (stat.isDirectory()) {
|
||||
copyFolderSync(srcPath, destPath);
|
||||
if (!realPath) {
|
||||
console.warn(`[app:setup] Skipping symlink outside skills root: ${srcPath}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const realStat = fs.statSync(realPath);
|
||||
|
||||
if (realStat.isFile()) {
|
||||
fs.copyFileSync(realPath, destPath);
|
||||
} else if (realStat.isDirectory()) {
|
||||
copyFolderSync(realPath, destPath, rootDir);
|
||||
}
|
||||
// Skip other types (e.g. sockets, FIFOs)
|
||||
});
|
||||
}
|
||||
|
||||
// Check if destination exists and remove it to ensure fresh copy
|
||||
if (fs.existsSync(destSkills)) {
|
||||
fs.rmSync(destSkills, { recursive: true, force: true });
|
||||
function main() {
|
||||
if (!fs.existsSync(WEB_APP_PUBLIC)) {
|
||||
fs.mkdirSync(WEB_APP_PUBLIC, { recursive: true });
|
||||
}
|
||||
|
||||
const sourceIndex = path.join(ROOT_DIR, 'skills_index.json');
|
||||
const destIndex = path.join(WEB_APP_PUBLIC, 'skills.json');
|
||||
console.log(`Copying ${sourceIndex} -> ${destIndex}...`);
|
||||
fs.copyFileSync(sourceIndex, destIndex);
|
||||
|
||||
const sourceSkills = path.join(ROOT_DIR, 'skills');
|
||||
const destSkills = path.join(WEB_APP_PUBLIC, 'skills');
|
||||
|
||||
console.log(`Copying skills directory...`);
|
||||
|
||||
// Check if destination exists and remove it to ensure fresh copy
|
||||
if (fs.existsSync(destSkills)) {
|
||||
fs.rmSync(destSkills, { recursive: true, force: true });
|
||||
}
|
||||
|
||||
copyFolderSync(sourceSkills, destSkills, sourceSkills);
|
||||
|
||||
console.log('✅ Web app assets setup complete!');
|
||||
}
|
||||
|
||||
copyFolderSync(sourceSkills, destSkills);
|
||||
if (process.argv[1] === fileURLToPath(import.meta.url)) {
|
||||
main();
|
||||
}
|
||||
|
||||
console.log('✅ Web app assets setup complete!');
|
||||
export { copyFolderSync, main };
|
||||
|
||||
@@ -16,6 +16,16 @@ from pathlib import Path
|
||||
SKILLS_DIR = Path(__file__).parent.parent / "skills"
|
||||
DISABLED_DIR = SKILLS_DIR / ".disabled"
|
||||
|
||||
|
||||
def resolve_skill_path(base_dir: Path, skill_name: str) -> Path | None:
|
||||
candidate = (base_dir / skill_name).resolve()
|
||||
try:
|
||||
candidate.relative_to(base_dir.resolve())
|
||||
return candidate
|
||||
except ValueError:
|
||||
print(f"❌ Invalid skill name: {skill_name}")
|
||||
return None
|
||||
|
||||
def list_active():
|
||||
"""List all active skills"""
|
||||
print("🟢 Active Skills:\n")
|
||||
@@ -51,8 +61,11 @@ def list_disabled():
|
||||
|
||||
def enable_skill(skill_name):
|
||||
"""Enable a disabled skill"""
|
||||
source = DISABLED_DIR / skill_name
|
||||
target = SKILLS_DIR / skill_name
|
||||
source = resolve_skill_path(DISABLED_DIR, skill_name)
|
||||
target = resolve_skill_path(SKILLS_DIR, skill_name)
|
||||
|
||||
if source is None or target is None:
|
||||
return False
|
||||
|
||||
if not source.exists():
|
||||
print(f"❌ Skill '{skill_name}' not found in .disabled/")
|
||||
@@ -68,8 +81,11 @@ def enable_skill(skill_name):
|
||||
|
||||
def disable_skill(skill_name):
|
||||
"""Disable an active skill"""
|
||||
source = SKILLS_DIR / skill_name
|
||||
target = DISABLED_DIR / skill_name
|
||||
source = resolve_skill_path(SKILLS_DIR, skill_name)
|
||||
target = resolve_skill_path(DISABLED_DIR, skill_name)
|
||||
|
||||
if source is None or target is None:
|
||||
return False
|
||||
|
||||
if not source.exists():
|
||||
print(f"❌ Skill '{skill_name}' not found")
|
||||
|
||||
@@ -49,7 +49,13 @@ def cleanup_previous_sync():
|
||||
if not flat_name:
|
||||
continue
|
||||
|
||||
skill_dir = TARGET_DIR / flat_name
|
||||
sanitized = sanitize_flat_name(flat_name, "")
|
||||
if not sanitized:
|
||||
continue
|
||||
|
||||
skill_dir = TARGET_DIR / sanitized
|
||||
if not is_path_within(TARGET_DIR, skill_dir):
|
||||
continue
|
||||
if skill_dir.exists() and skill_dir.is_dir():
|
||||
shutil.rmtree(skill_dir)
|
||||
removed_count += 1
|
||||
@@ -61,6 +67,50 @@ def cleanup_previous_sync():
|
||||
|
||||
import yaml
|
||||
|
||||
SAFE_FLAT_NAME_PATTERN = re.compile(r"[^A-Za-z0-9._-]+")
|
||||
|
||||
|
||||
def is_path_within(base_dir: Path, target_path: Path) -> bool:
|
||||
"""Return True when target_path resolves inside base_dir."""
|
||||
try:
|
||||
target_path.resolve().relative_to(base_dir.resolve())
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def sanitize_flat_name(candidate: str | None, fallback: str) -> str:
|
||||
"""Accept only flat skill directory names; fall back on unsafe values."""
|
||||
if not candidate:
|
||||
return fallback
|
||||
|
||||
stripped = candidate.strip()
|
||||
parts = Path(stripped).parts
|
||||
if (
|
||||
not stripped
|
||||
or Path(stripped).is_absolute()
|
||||
or any(part in ("..", ".") for part in parts)
|
||||
or "/" in stripped
|
||||
or "\\" in stripped
|
||||
):
|
||||
return fallback
|
||||
|
||||
sanitized = SAFE_FLAT_NAME_PATTERN.sub("-", stripped).strip("-.")
|
||||
return sanitized or fallback
|
||||
|
||||
|
||||
def copy_safe_skill_files(source_dir: Path, target_dir: Path, source_root: Path):
|
||||
"""Copy regular files only when their resolved path stays inside source_root."""
|
||||
for file_item in source_dir.iterdir():
|
||||
if file_item.name == "SKILL.md" or file_item.is_symlink() or not file_item.is_file():
|
||||
continue
|
||||
|
||||
resolved = file_item.resolve()
|
||||
if not is_path_within(source_root, resolved):
|
||||
continue
|
||||
|
||||
shutil.copy2(resolved, target_dir / file_item.name)
|
||||
|
||||
def extract_skill_name(skill_md_path: Path) -> str | None:
|
||||
"""Extract the 'name' field from SKILL.md YAML frontmatter using PyYAML."""
|
||||
try:
|
||||
@@ -95,6 +145,7 @@ def find_skills_in_directory(source_dir: Path):
|
||||
Returns list of dicts: {relative_path, skill_md_path, source_dir}.
|
||||
"""
|
||||
skills_source = source_dir / "skills"
|
||||
source_root = source_dir.resolve()
|
||||
results = []
|
||||
|
||||
if not skills_source.exists():
|
||||
@@ -110,6 +161,8 @@ def find_skills_in_directory(source_dir: Path):
|
||||
if item.is_symlink():
|
||||
try:
|
||||
resolved = item.resolve()
|
||||
if not is_path_within(source_root, resolved):
|
||||
continue
|
||||
if (resolved / "SKILL.md").exists():
|
||||
skill_md = resolved / "SKILL.md"
|
||||
actual_dir = resolved
|
||||
@@ -207,10 +260,11 @@ def sync_skills_flat(source_dir: Path, target_dir: Path):
|
||||
used_names: dict[str, str] = {}
|
||||
|
||||
for entry in all_skill_entries:
|
||||
skill_name = extract_skill_name(entry["skill_md"])
|
||||
fallback_name = generate_fallback_name(entry["relative_path"])
|
||||
skill_name = sanitize_flat_name(
|
||||
extract_skill_name(entry["skill_md"]), fallback_name)
|
||||
|
||||
if not skill_name:
|
||||
skill_name = generate_fallback_name(entry["relative_path"])
|
||||
if skill_name == fallback_name:
|
||||
print(
|
||||
f" ⚠️ No frontmatter name for {entry['relative_path']}, using fallback: {skill_name}")
|
||||
|
||||
@@ -241,9 +295,7 @@ def sync_skills_flat(source_dir: Path, target_dir: Path):
|
||||
shutil.copy2(entry["skill_md"], target_skill_dir / "SKILL.md")
|
||||
|
||||
# Copy other files from the skill directory
|
||||
for file_item in entry["source_dir"].iterdir():
|
||||
if file_item.name != "SKILL.md" and file_item.is_file():
|
||||
shutil.copy2(file_item, target_skill_dir / file_item.name)
|
||||
copy_safe_skill_files(entry["source_dir"], target_skill_dir, source_dir)
|
||||
|
||||
skill_metadata.append({
|
||||
"flat_name": skill_name,
|
||||
@@ -265,9 +317,8 @@ def sync_skills_flat(source_dir: Path, target_dir: Path):
|
||||
if plugin_entries:
|
||||
print(f"\n 📦 Found {len(plugin_entries)} additional plugin skills")
|
||||
for entry in plugin_entries:
|
||||
skill_name = extract_skill_name(entry["skill_md"])
|
||||
if not skill_name:
|
||||
skill_name = entry["source_dir"].name
|
||||
skill_name = sanitize_flat_name(
|
||||
extract_skill_name(entry["skill_md"]), entry["source_dir"].name)
|
||||
|
||||
if skill_name in synced_names:
|
||||
skill_name = f"{skill_name}-plugin"
|
||||
@@ -288,9 +339,7 @@ def sync_skills_flat(source_dir: Path, target_dir: Path):
|
||||
|
||||
shutil.copy2(entry["skill_md"], target_skill_dir / "SKILL.md")
|
||||
|
||||
for file_item in entry["source_dir"].iterdir():
|
||||
if file_item.name != "SKILL.md" and file_item.is_file():
|
||||
shutil.copy2(file_item, target_skill_dir / file_item.name)
|
||||
copy_safe_skill_files(entry["source_dir"], target_skill_dir, source_dir)
|
||||
|
||||
skill_metadata.append({
|
||||
"flat_name": skill_name,
|
||||
@@ -309,9 +358,8 @@ def sync_skills_flat(source_dir: Path, target_dir: Path):
|
||||
print(
|
||||
f"\n <20> Found {len(github_skill_entries)} skills in .github/skills/ not linked from skills/")
|
||||
for entry in github_skill_entries:
|
||||
skill_name = extract_skill_name(entry["skill_md"])
|
||||
if not skill_name:
|
||||
skill_name = entry["source_dir"].name
|
||||
skill_name = sanitize_flat_name(
|
||||
extract_skill_name(entry["skill_md"]), entry["source_dir"].name)
|
||||
|
||||
if skill_name in synced_names:
|
||||
skill_name = f"{skill_name}-github"
|
||||
@@ -331,9 +379,7 @@ def sync_skills_flat(source_dir: Path, target_dir: Path):
|
||||
|
||||
shutil.copy2(entry["skill_md"], target_skill_dir / "SKILL.md")
|
||||
|
||||
for file_item in entry["source_dir"].iterdir():
|
||||
if file_item.name != "SKILL.md" and file_item.is_file():
|
||||
shutil.copy2(file_item, target_skill_dir / file_item.name)
|
||||
copy_safe_skill_files(entry["source_dir"], target_skill_dir, source_dir)
|
||||
|
||||
skill_metadata.append({
|
||||
"flat_name": skill_name,
|
||||
|
||||
@@ -90,7 +90,7 @@ MISSING_COUNT=0
|
||||
|
||||
for skill in "${RECOMMENDED_SKILLS[@]}"; do
|
||||
if [ -d "$GITHUB_REPO/$skill" ]; then
|
||||
cp -r "$GITHUB_REPO/$skill" "$LOCAL_LIBRARY/"
|
||||
cp -RP "$GITHUB_REPO/$skill" "$LOCAL_LIBRARY/"
|
||||
echo " ✅ $skill"
|
||||
((SUCCESS_COUNT++))
|
||||
else
|
||||
|
||||
53
tools/scripts/tests/copy_security.test.js
Normal file
53
tools/scripts/tests/copy_security.test.js
Normal file
@@ -0,0 +1,53 @@
|
||||
const assert = require("assert");
|
||||
const fs = require("fs");
|
||||
const os = require("os");
|
||||
const path = require("path");
|
||||
|
||||
const { copyRecursiveSync } = require("../../bin/install");
|
||||
|
||||
async function main() {
|
||||
const { copyFolderSync } = await import("../../scripts/setup_web.js");
|
||||
|
||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "copy-security-"));
|
||||
try {
|
||||
const safeRoot = path.join(root, "safe-root");
|
||||
const destRoot = path.join(root, "dest-root");
|
||||
const outsideDir = path.join(root, "outside");
|
||||
|
||||
fs.mkdirSync(path.join(safeRoot, "nested"), { recursive: true });
|
||||
fs.mkdirSync(outsideDir, { recursive: true });
|
||||
|
||||
fs.writeFileSync(path.join(safeRoot, "nested", "ok.txt"), "ok");
|
||||
fs.writeFileSync(path.join(outsideDir, "secret.txt"), "secret");
|
||||
fs.symlinkSync(outsideDir, path.join(safeRoot, "escape-link"));
|
||||
|
||||
copyRecursiveSync(safeRoot, path.join(destRoot, "install-copy"), safeRoot);
|
||||
copyFolderSync(safeRoot, path.join(destRoot, "web-copy"), safeRoot);
|
||||
|
||||
assert.strictEqual(
|
||||
fs.existsSync(path.join(destRoot, "install-copy", "escape-link", "secret.txt")),
|
||||
false,
|
||||
"installer copy must not follow symlinks outside the cloned root",
|
||||
);
|
||||
assert.strictEqual(
|
||||
fs.existsSync(path.join(destRoot, "web-copy", "escape-link", "secret.txt")),
|
||||
false,
|
||||
"web setup copy must not follow symlinks outside the skills root",
|
||||
);
|
||||
assert.strictEqual(
|
||||
fs.readFileSync(path.join(destRoot, "install-copy", "nested", "ok.txt"), "utf8"),
|
||||
"ok",
|
||||
);
|
||||
assert.strictEqual(
|
||||
fs.readFileSync(path.join(destRoot, "web-copy", "nested", "ok.txt"), "utf8"),
|
||||
"ok",
|
||||
);
|
||||
} finally {
|
||||
fs.rmSync(root, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
|
||||
main().catch((error) => {
|
||||
console.error(error);
|
||||
process.exit(1);
|
||||
});
|
||||
37
tools/scripts/tests/skill_utils_security.test.js
Normal file
37
tools/scripts/tests/skill_utils_security.test.js
Normal file
@@ -0,0 +1,37 @@
|
||||
const assert = require("assert");
|
||||
const fs = require("fs");
|
||||
const os = require("os");
|
||||
const path = require("path");
|
||||
|
||||
const { listSkillIds } = require("../../lib/skill-utils");
|
||||
|
||||
function withTempDir(fn) {
|
||||
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "skill-utils-security-"));
|
||||
try {
|
||||
fn(dir);
|
||||
} finally {
|
||||
fs.rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
|
||||
withTempDir((root) => {
|
||||
const skillsDir = path.join(root, "skills");
|
||||
const outsideDir = path.join(root, "outside-secret");
|
||||
|
||||
fs.mkdirSync(skillsDir, { recursive: true });
|
||||
fs.mkdirSync(outsideDir, { recursive: true });
|
||||
|
||||
fs.mkdirSync(path.join(skillsDir, "safe-skill"));
|
||||
fs.writeFileSync(path.join(skillsDir, "safe-skill", "SKILL.md"), "# safe\n");
|
||||
|
||||
fs.writeFileSync(path.join(outsideDir, "SKILL.md"), "# secret\n");
|
||||
fs.symlinkSync(outsideDir, path.join(skillsDir, "linked-secret"));
|
||||
|
||||
const skillIds = listSkillIds(skillsDir);
|
||||
|
||||
assert.deepStrictEqual(
|
||||
skillIds,
|
||||
["safe-skill"],
|
||||
"symlinked skill directories must not be treated as local skills",
|
||||
);
|
||||
});
|
||||
37
tools/scripts/tests/symlink_safety.test.js
Normal file
37
tools/scripts/tests/symlink_safety.test.js
Normal file
@@ -0,0 +1,37 @@
|
||||
const assert = require("assert");
|
||||
const fs = require("fs");
|
||||
const os = require("os");
|
||||
const path = require("path");
|
||||
|
||||
const { resolveSafeRealPath } = require("../../lib/symlink-safety");
|
||||
|
||||
function withTempDir(fn) {
|
||||
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "symlink-safety-"));
|
||||
try {
|
||||
fn(dir);
|
||||
} finally {
|
||||
fs.rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
|
||||
withTempDir((root) => {
|
||||
const safeRoot = path.join(root, "safe-root");
|
||||
const internalDir = path.join(safeRoot, "internal");
|
||||
const outsideDir = path.join(root, "outside");
|
||||
const internalLink = path.join(safeRoot, "internal-link");
|
||||
const outsideLink = path.join(safeRoot, "outside-link");
|
||||
|
||||
fs.mkdirSync(internalDir, { recursive: true });
|
||||
fs.mkdirSync(outsideDir, { recursive: true });
|
||||
fs.writeFileSync(path.join(internalDir, "data.txt"), "ok");
|
||||
fs.writeFileSync(path.join(outsideDir, "secret.txt"), "secret");
|
||||
|
||||
fs.symlinkSync(internalDir, internalLink);
|
||||
fs.symlinkSync(outsideDir, outsideLink);
|
||||
|
||||
const internalResolved = resolveSafeRealPath(safeRoot, internalLink);
|
||||
const outsideResolved = resolveSafeRealPath(safeRoot, outsideLink);
|
||||
|
||||
assert.strictEqual(internalResolved, fs.realpathSync(internalDir));
|
||||
assert.strictEqual(outsideResolved, null);
|
||||
});
|
||||
36
tools/scripts/tests/test_fix_skills_metadata_security.py
Normal file
36
tools/scripts/tests/test_fix_skills_metadata_security.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
TOOLS_SCRIPTS_DIR = Path(__file__).resolve().parents[1]
|
||||
if str(TOOLS_SCRIPTS_DIR) not in sys.path:
|
||||
sys.path.insert(0, str(TOOLS_SCRIPTS_DIR))
|
||||
|
||||
import fix_skills_metadata
|
||||
|
||||
|
||||
class FixSkillsMetadataSecurityTests(unittest.TestCase):
|
||||
def test_skips_symlinked_skill_markdown(self):
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
root = Path(temp_dir)
|
||||
skill_dir = root / "safe-skill"
|
||||
outside_dir = root / "outside"
|
||||
skill_dir.mkdir()
|
||||
outside_dir.mkdir()
|
||||
|
||||
target = outside_dir / "SKILL.md"
|
||||
target.write_text("---\nname: outside\n---\nbody\n", encoding="utf-8")
|
||||
(skill_dir / "SKILL.md").symlink_to(target)
|
||||
|
||||
fix_skills_metadata.fix_skills(root)
|
||||
|
||||
self.assertEqual(
|
||||
target.read_text(encoding="utf-8"),
|
||||
"---\nname: outside\n---\nbody\n",
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
53
tools/scripts/tests/test_generate_index_security.py
Normal file
53
tools/scripts/tests/test_generate_index_security.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import importlib.util
|
||||
import json
|
||||
import pathlib
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
|
||||
REPO_ROOT = pathlib.Path(__file__).resolve().parents[3]
|
||||
sys.path.insert(0, str(REPO_ROOT / "tools" / "scripts"))
|
||||
|
||||
|
||||
def load_module(module_path: str, module_name: str):
|
||||
spec = importlib.util.spec_from_file_location(module_name, REPO_ROOT / module_path)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
generate_index = load_module("tools/scripts/generate_index.py", "generate_index")
|
||||
|
||||
|
||||
class GenerateIndexSecurityTests(unittest.TestCase):
|
||||
def test_parse_frontmatter_rejects_non_mapping_yaml(self):
|
||||
metadata = generate_index.parse_frontmatter("---\njust-a-string\n---\nbody\n")
|
||||
self.assertEqual(metadata, {})
|
||||
|
||||
def test_generate_index_skips_symlinked_skill_markdown(self):
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
skills_dir = pathlib.Path(temp_dir) / "skills"
|
||||
safe_skill_dir = skills_dir / "safe-skill"
|
||||
linked_skill_dir = skills_dir / "linked-skill"
|
||||
outside_dir = pathlib.Path(temp_dir) / "outside"
|
||||
output_file = pathlib.Path(temp_dir) / "skills_index.json"
|
||||
|
||||
safe_skill_dir.mkdir(parents=True)
|
||||
linked_skill_dir.mkdir(parents=True)
|
||||
outside_dir.mkdir()
|
||||
|
||||
(safe_skill_dir / "SKILL.md").write_text("---\nname: Safe Skill\n---\nbody\n", encoding="utf-8")
|
||||
target = outside_dir / "secret.txt"
|
||||
target.write_text("outside data", encoding="utf-8")
|
||||
(linked_skill_dir / "SKILL.md").symlink_to(target)
|
||||
|
||||
skills = generate_index.generate_index(str(skills_dir), str(output_file))
|
||||
|
||||
self.assertEqual([skill["id"] for skill in skills], ["safe-skill"])
|
||||
written = json.loads(output_file.read_text(encoding="utf-8"))
|
||||
self.assertEqual([skill["id"] for skill in written], ["safe-skill"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
41
tools/scripts/tests/test_office_unpack_security.py
Normal file
41
tools/scripts/tests/test_office_unpack_security.py
Normal file
@@ -0,0 +1,41 @@
|
||||
import importlib.util
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[3]
|
||||
|
||||
|
||||
def load_module(relative_path: str, module_name: str):
|
||||
module_path = REPO_ROOT / relative_path
|
||||
spec = importlib.util.spec_from_file_location(module_name, module_path)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
assert spec.loader is not None
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
class OfficeUnpackSecurityTests(unittest.TestCase):
|
||||
def test_extract_archive_safely_blocks_zip_slip(self):
|
||||
module = load_module("skills/docx/ooxml/scripts/unpack.py", "docx_unpack")
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
temp_path = Path(temp_dir)
|
||||
archive_path = temp_path / "payload.zip"
|
||||
output_dir = temp_path / "output"
|
||||
|
||||
with zipfile.ZipFile(archive_path, "w") as archive:
|
||||
archive.writestr("../escape.txt", "escape")
|
||||
archive.writestr("word/document.xml", "<w:document/>")
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
module.extract_archive_safely(archive_path, output_dir)
|
||||
|
||||
self.assertFalse((temp_path / "escape.txt").exists())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
33
tools/scripts/tests/test_skills_manager_security.py
Normal file
33
tools/scripts/tests/test_skills_manager_security.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
TOOLS_SCRIPTS_DIR = Path(__file__).resolve().parents[1]
|
||||
if str(TOOLS_SCRIPTS_DIR) not in sys.path:
|
||||
sys.path.insert(0, str(TOOLS_SCRIPTS_DIR))
|
||||
|
||||
import skills_manager
|
||||
|
||||
|
||||
class SkillsManagerSecurityTests(unittest.TestCase):
|
||||
def test_rejects_path_traversal_skill_names(self):
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
root = Path(temp_dir)
|
||||
skills_manager.SKILLS_DIR = root / "skills"
|
||||
skills_manager.DISABLED_DIR = skills_manager.SKILLS_DIR / ".disabled"
|
||||
skills_manager.SKILLS_DIR.mkdir(parents=True)
|
||||
skills_manager.DISABLED_DIR.mkdir(parents=True)
|
||||
|
||||
outside = root / "outside"
|
||||
outside.mkdir()
|
||||
escaped = skills_manager.DISABLED_DIR.parent / "escaped-skill"
|
||||
escaped.mkdir()
|
||||
|
||||
self.assertFalse(skills_manager.enable_skill("../escaped-skill"))
|
||||
self.assertTrue(escaped.exists())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
46
tools/scripts/tests/test_sync_microsoft_skills_security.py
Normal file
46
tools/scripts/tests/test_sync_microsoft_skills_security.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
TOOLS_SCRIPTS_DIR = Path(__file__).resolve().parents[1]
|
||||
if str(TOOLS_SCRIPTS_DIR) not in sys.path:
|
||||
sys.path.insert(0, str(TOOLS_SCRIPTS_DIR))
|
||||
|
||||
import sync_microsoft_skills as sms
|
||||
|
||||
|
||||
class SyncMicrosoftSkillsSecurityTests(unittest.TestCase):
|
||||
def test_sanitize_flat_name_rejects_path_traversal(self):
|
||||
sanitized = sms.sanitize_flat_name("../../.ssh", "fallback-name")
|
||||
self.assertEqual(sanitized, "fallback-name")
|
||||
|
||||
def test_find_skills_ignores_symlinks_outside_clone(self):
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
root = Path(temp_dir)
|
||||
skills_dir = root / "skills"
|
||||
skills_dir.mkdir()
|
||||
|
||||
safe_skill = root / ".github" / "skills" / "safe-skill"
|
||||
safe_skill.mkdir(parents=True)
|
||||
(safe_skill / "SKILL.md").write_text("---\nname: safe-skill\n---\n", encoding="utf-8")
|
||||
(skills_dir / "safe-skill").symlink_to(safe_skill, target_is_directory=True)
|
||||
|
||||
outside = Path(tempfile.mkdtemp())
|
||||
try:
|
||||
(outside / "SKILL.md").write_text("---\nname: leaked\n---\n", encoding="utf-8")
|
||||
(skills_dir / "escape").symlink_to(outside, target_is_directory=True)
|
||||
|
||||
entries = sms.find_skills_in_directory(root)
|
||||
relative_paths = {str(entry["relative_path"]) for entry in entries}
|
||||
|
||||
self.assertEqual(relative_paths, {"safe-skill"})
|
||||
finally:
|
||||
for child in outside.iterdir():
|
||||
child.unlink()
|
||||
outside.rmdir()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user