From 3efff111d20593112607350659650cb217d174d9 Mon Sep 17 00:00:00 2001 From: sickn33 Date: Sat, 21 Mar 2026 11:34:38 +0100 Subject: [PATCH] fix(security): Harden skill tooling file handling Guard metadata repair and doc sync scripts against symlink targets so repo maintenance tasks cannot overwrite arbitrary local files. Replace recursive skill discovery with an iterative walk that skips symlinked directories, and harden the VideoDB listener to write only private regular files in the user-owned state directory. Also fix the broken pr:preflight script entry and make the last30days skill stop embedding raw user arguments directly in the shell command. --- package.json | 2 +- skills/last30days/SKILL.md | 7 +- skills/videodb/scripts/ws_listener.py | 69 ++++++++++++--- tools/lib/skill-utils.js | 55 +++++++++--- tools/scripts/_safe_files.py | 11 +++ .../cleanup_synthetic_skill_sections.py | 7 ++ tools/scripts/fix_missing_skill_metadata.py | 7 ++ tools/scripts/fix_missing_skill_sections.py | 7 ++ tools/scripts/fix_truncated_descriptions.py | 7 ++ tools/scripts/sync_repo_metadata.py | 2 +- .../tests/skill_utils_security.test.js | 77 ++++++++++++++++- .../test_cleanup_synthetic_skill_sections.py | 26 ++++++ .../tests/test_fix_missing_skill_metadata.py | 26 ++++++ .../tests/test_fix_missing_skill_sections.py | 26 ++++++ .../tests/test_fix_truncated_descriptions.py | 28 ++++++ .../tests/test_last30days_skill_security.py | 18 ++++ .../scripts/tests/test_sync_repo_metadata.py | 20 +++++ .../tests/test_ws_listener_security.py | 85 +++++++++++++++++++ .../scripts/tests/workflow_contracts.test.js | 4 +- 19 files changed, 453 insertions(+), 31 deletions(-) create mode 100644 tools/scripts/_safe_files.py create mode 100644 tools/scripts/tests/test_last30days_skill_security.py create mode 100644 tools/scripts/tests/test_ws_listener_security.py diff --git a/package.json b/package.json index 42ad3237..b0881081 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ "audit:consistency:github": "node tools/scripts/run-python.js tools/scripts/audit_consistency.py --check-github-about", "audit:maintainer": "node tools/scripts/run-python.js tools/scripts/maintainer_audit.py", "security:docs": "node tools/scripts/tests/docs_security_content.test.js", - "pr:preflight": "node tools/scripts/pr_preflight.js", + "pr:preflight": "node tools/scripts/pr_preflight.cjs", "release:preflight": "node tools/scripts/release_workflow.js preflight", "release:prepare": "node tools/scripts/release_workflow.js prepare", "release:publish": "node tools/scripts/release_workflow.js publish", diff --git a/skills/last30days/SKILL.md b/skills/last30days/SKILL.md index 44a52460..8b7e83f1 100644 --- a/skills/last30days/SKILL.md +++ b/skills/last30days/SKILL.md @@ -93,7 +93,12 @@ echo "Edit to add your API keys for enhanced research." **Step 1: Run the research script** ```bash -python3 ~/.claude/skills/last30days/scripts/last30days.py "$ARGUMENTS" --emit=compact 2>&1 +TOPIC_FILE="$(mktemp)" +trap 'rm -f "$TOPIC_FILE"' EXIT +cat <<'LAST30DAYS_TOPIC' > "$TOPIC_FILE" +$ARGUMENTS +LAST30DAYS_TOPIC +python3 ~/.claude/skills/last30days/scripts/last30days.py "$(cat "$TOPIC_FILE")" --emit=compact 2>&1 ``` The script will automatically: diff --git a/skills/videodb/scripts/ws_listener.py b/skills/videodb/scripts/ws_listener.py index aac8821c..3316de4a 100644 --- a/skills/videodb/scripts/ws_listener.py +++ b/skills/videodb/scripts/ws_listener.py @@ -6,7 +6,7 @@ Usage: python scripts/ws_listener.py [OPTIONS] [output_dir] Arguments: - output_dir Directory for output files (default: /tmp or VIDEODB_EVENTS_DIR env var) + output_dir Directory for output files (default: XDG_STATE_HOME/videodb-events or VIDEODB_EVENTS_DIR env var) Options: --clear Clear the events file before starting (use when starting a new session) @@ -20,14 +20,15 @@ Output (first line, for parsing): WS_ID= Examples: - python scripts/ws_listener.py & # Run in background - python scripts/ws_listener.py --clear # Clear events and start fresh - python scripts/ws_listener.py --clear /tmp/mydir # Custom dir with clear - kill $(cat /tmp/videodb_ws_pid) # Stop the listener + python scripts/ws_listener.py & # Run in background + python scripts/ws_listener.py --clear # Clear events and start fresh + python scripts/ws_listener.py --clear /path/mydir # Custom dir with clear + kill $(cat ~/.local/state/videodb-events/videodb_ws_pid) # Stop the listener """ import os import sys import json +import stat import signal import asyncio from datetime import datetime, timezone @@ -42,6 +43,8 @@ import videodb MAX_RETRIES = 10 INITIAL_BACKOFF = 1 # seconds MAX_BACKOFF = 60 # seconds +FILE_MODE = 0o600 +DIR_MODE = 0o700 # Parse arguments def parse_args(): @@ -78,18 +81,62 @@ def log(msg: str): print(f"[{ts}] {msg}", flush=True) +def ensure_output_dir(): + """Create the output directory if needed and reject symlinked paths.""" + if OUTPUT_DIR.exists(): + if OUTPUT_DIR.is_symlink(): + raise OSError(f"Refusing to use symlinked output directory: {OUTPUT_DIR}") + if not OUTPUT_DIR.is_dir(): + raise OSError(f"Output path is not a directory: {OUTPUT_DIR}") + return + + OUTPUT_DIR.mkdir(parents=True, mode=DIR_MODE, exist_ok=True) + + +def secure_open(path: Path, *, append: bool): + """Open a regular file without following symlinks.""" + ensure_output_dir() + flags = os.O_WRONLY | os.O_CREAT + flags |= os.O_APPEND if append else os.O_TRUNC + flags |= getattr(os, "O_NOFOLLOW", 0) + + fd = os.open(path, flags, FILE_MODE) + try: + file_stat = os.fstat(fd) + if not stat.S_ISREG(file_stat.st_mode): + raise OSError(f"Refusing to write non-regular file: {path}") + if file_stat.st_nlink != 1: + raise OSError(f"Refusing to write multiply linked file: {path}") + return fd + except Exception: + os.close(fd) + raise + + +def secure_write_text(path: Path, content: str): + """Write text to a regular file with private permissions.""" + fd = secure_open(path, append=False) + with os.fdopen(fd, "w", encoding="utf-8") as handle: + handle.write(content) + + +def secure_append_text(path: Path, content: str): + """Append text to a regular file with private permissions.""" + fd = secure_open(path, append=True) + with os.fdopen(fd, "a", encoding="utf-8") as handle: + handle.write(content) + + def append_event(event: dict): """Append event to JSONL file with timestamps.""" event["ts"] = datetime.now(timezone.utc).isoformat() event["unix_ts"] = datetime.now(timezone.utc).timestamp() - with open(EVENTS_FILE, "a") as f: - f.write(json.dumps(event) + "\n") + secure_append_text(EVENTS_FILE, json.dumps(event) + "\n") def write_pid(): """Write PID file for easy process management.""" - OUTPUT_DIR.mkdir(parents=True, exist_ok=True) - PID_FILE.write_text(str(os.getpid())) + secure_write_text(PID_FILE, str(os.getpid())) def cleanup_pid(): @@ -115,7 +162,7 @@ async def listen_with_retry(): ws_id = ws.connection_id # Ensure output directory exists - OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + ensure_output_dir() # Clear events file only on first connection if --clear flag is set if _first_connection and CLEAR_EVENTS: @@ -124,7 +171,7 @@ async def listen_with_retry(): _first_connection = False # Write ws_id to file for easy retrieval - WS_ID_FILE.write_text(ws_id) + secure_write_text(WS_ID_FILE, ws_id) # Print ws_id (parseable format for LLM) if retry_count == 0: diff --git a/tools/lib/skill-utils.js b/tools/lib/skill-utils.js index 42ebeca0..9a24c249 100644 --- a/tools/lib/skill-utils.js +++ b/tools/lib/skill-utils.js @@ -2,6 +2,24 @@ const fs = require('fs'); const path = require('path'); const yaml = require('yaml'); +function isSafeDirectory(dirPath) { + try { + const stats = fs.lstatSync(dirPath); + return stats.isDirectory() && !stats.isSymbolicLink(); + } catch { + return false; + } +} + +function isSafeSkillFile(skillPath) { + try { + const stats = fs.lstatSync(skillPath); + return stats.isFile() && !stats.isSymbolicLink(); + } catch { + return false; + } +} + function stripQuotes(value) { if (typeof value !== 'string') return value; if (value.length < 2) return value.trim(); @@ -152,11 +170,9 @@ function listSkillIds(skillsDir) { .filter(entry => { if (entry.startsWith('.')) return false; const dirPath = path.join(skillsDir, entry); - const entryStats = fs.lstatSync(dirPath); - if (entryStats.isSymbolicLink() || !entryStats.isDirectory()) return false; + if (!isSafeDirectory(dirPath)) return false; const skillPath = path.join(dirPath, 'SKILL.md'); - if (!fs.existsSync(skillPath)) return false; - return !fs.lstatSync(skillPath).isSymbolicLink(); + return isSafeSkillFile(skillPath); }) .sort(); } @@ -166,17 +182,28 @@ function listSkillIds(skillsDir) { * Matches generate_index.py behavior so catalog includes nested skills (e.g. game-development/2d-games). */ function listSkillIdsRecursive(skillsDir, baseDir = skillsDir, acc = []) { - const entries = fs.readdirSync(baseDir, { withFileTypes: true }); - for (const entry of entries) { - if (entry.name.startsWith('.')) continue; - if (!entry.isDirectory()) continue; - const dirPath = path.join(baseDir, entry.name); - const skillPath = path.join(dirPath, 'SKILL.md'); - const relPath = path.relative(skillsDir, dirPath); - if (fs.existsSync(skillPath) && !fs.lstatSync(skillPath).isSymbolicLink()) { - acc.push(relPath); + const stack = [baseDir]; + const visited = new Set(); + + while (stack.length > 0) { + const currentDir = stack.pop(); + const resolvedDir = path.resolve(currentDir); + if (visited.has(resolvedDir)) continue; + visited.add(resolvedDir); + + const entries = fs.readdirSync(currentDir, { withFileTypes: true }); + for (const entry of entries) { + if (entry.name.startsWith('.')) continue; + const dirPath = path.join(currentDir, entry.name); + if (!isSafeDirectory(dirPath)) continue; + + const skillPath = path.join(dirPath, 'SKILL.md'); + const relPath = path.relative(skillsDir, dirPath); + if (isSafeSkillFile(skillPath)) { + acc.push(relPath); + } + stack.push(dirPath); } - listSkillIdsRecursive(skillsDir, dirPath, acc); } return acc.sort(); } diff --git a/tools/scripts/_safe_files.py b/tools/scripts/_safe_files.py new file mode 100644 index 00000000..f273b628 --- /dev/null +++ b/tools/scripts/_safe_files.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +from pathlib import Path + + +def is_safe_regular_file(path: str | Path) -> bool: + candidate = Path(path) + try: + return candidate.is_file() and not candidate.is_symlink() + except OSError: + return False diff --git a/tools/scripts/cleanup_synthetic_skill_sections.py b/tools/scripts/cleanup_synthetic_skill_sections.py index 546d4c12..39d19c82 100644 --- a/tools/scripts/cleanup_synthetic_skill_sections.py +++ b/tools/scripts/cleanup_synthetic_skill_sections.py @@ -8,6 +8,7 @@ import subprocess import sys from pathlib import Path +from _safe_files import is_safe_regular_file from _project_paths import find_repo_root from fix_missing_skill_sections import ( build_examples_section, @@ -47,6 +48,9 @@ def remove_exact_section(content: str, section_text: str) -> str: def cleanup_skill_file(repo_root: Path, skill_path: Path) -> tuple[bool, list[str]]: + if not is_safe_regular_file(skill_path): + return False, [] + current_content = skill_path.read_text(encoding="utf-8") metadata, _ = parse_frontmatter(current_content, skill_path.as_posix()) if not metadata: @@ -100,6 +104,9 @@ def main() -> int: continue skill_path = Path(root) / "SKILL.md" + if not is_safe_regular_file(skill_path): + print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]") + continue current_content = skill_path.read_text(encoding="utf-8") metadata, _ = parse_frontmatter(current_content, skill_path.as_posix()) if not metadata or not isinstance(metadata.get("description"), str): diff --git a/tools/scripts/fix_missing_skill_metadata.py b/tools/scripts/fix_missing_skill_metadata.py index 39dca512..0c88b988 100644 --- a/tools/scripts/fix_missing_skill_metadata.py +++ b/tools/scripts/fix_missing_skill_metadata.py @@ -7,6 +7,7 @@ import re import sys from pathlib import Path +from _safe_files import is_safe_regular_file from _project_paths import find_repo_root from validate_skills import configure_utf8_output, parse_frontmatter @@ -141,6 +142,9 @@ def insert_metadata_keys(frontmatter_text: str, additions: dict[str, str]) -> st def update_skill_file(skill_path: Path) -> tuple[bool, list[str]]: + if not is_safe_regular_file(skill_path): + return False, [] + content = skill_path.read_text(encoding="utf-8") repaired_content = repair_malformed_injected_metadata(content) if repaired_content != content: @@ -197,6 +201,9 @@ def main() -> int: continue skill_path = Path(root) / "SKILL.md" + if not is_safe_regular_file(skill_path): + print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]") + continue content = skill_path.read_text(encoding="utf-8") repaired_content = repair_malformed_injected_metadata(content) if repaired_content != content: diff --git a/tools/scripts/fix_missing_skill_sections.py b/tools/scripts/fix_missing_skill_sections.py index e315faba..e53de856 100644 --- a/tools/scripts/fix_missing_skill_sections.py +++ b/tools/scripts/fix_missing_skill_sections.py @@ -7,6 +7,7 @@ import re import sys from pathlib import Path +from _safe_files import is_safe_regular_file from _project_paths import find_repo_root from validate_skills import configure_utf8_output, has_when_to_use_section, parse_frontmatter @@ -115,6 +116,9 @@ def append_section(content: str, section_text: str) -> str: def update_skill_file(skill_path: Path, *, add_missing: bool = False) -> tuple[bool, list[str]]: + if not is_safe_regular_file(skill_path): + return False, [] + content = skill_path.read_text(encoding="utf-8") metadata, _ = parse_frontmatter(content, skill_path.as_posix()) if not metadata: @@ -166,6 +170,9 @@ def main() -> int: continue skill_path = Path(root) / "SKILL.md" + if not is_safe_regular_file(skill_path): + print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]") + continue content = skill_path.read_text(encoding="utf-8") metadata, _ = parse_frontmatter(content, skill_path.as_posix()) if not metadata or not isinstance(metadata.get("description"), str): diff --git a/tools/scripts/fix_truncated_descriptions.py b/tools/scripts/fix_truncated_descriptions.py index 00a2c00f..5ca9a281 100644 --- a/tools/scripts/fix_truncated_descriptions.py +++ b/tools/scripts/fix_truncated_descriptions.py @@ -7,6 +7,7 @@ import re import sys from pathlib import Path +from _safe_files import is_safe_regular_file from _project_paths import find_repo_root from validate_skills import configure_utf8_output, parse_frontmatter @@ -167,6 +168,9 @@ def replace_description(frontmatter_text: str, new_description: str) -> str: def update_skill_file(skill_path: Path) -> tuple[bool, str | None]: + if not is_safe_regular_file(skill_path): + return False, None + content = skill_path.read_text(encoding="utf-8") match = FRONTMATTER_PATTERN.search(content) if not match: @@ -215,6 +219,9 @@ def main() -> int: continue skill_path = Path(root) / "SKILL.md" + if not is_safe_regular_file(skill_path): + print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]") + continue content = skill_path.read_text(encoding="utf-8") metadata, _ = parse_frontmatter(content, skill_path.as_posix()) description = metadata.get("description") if metadata else None diff --git a/tools/scripts/sync_repo_metadata.py b/tools/scripts/sync_repo_metadata.py index e34a4bb8..16c4c567 100644 --- a/tools/scripts/sync_repo_metadata.py +++ b/tools/scripts/sync_repo_metadata.py @@ -218,7 +218,7 @@ def sync_regex_text(content: str, replacements: list[tuple[str, str]]) -> str: def update_text_file(path: Path, transform, metadata: dict, dry_run: bool) -> bool: - if not path.is_file(): + if not path.is_file() or path.is_symlink(): return False original = path.read_text(encoding="utf-8") diff --git a/tools/scripts/tests/skill_utils_security.test.js b/tools/scripts/tests/skill_utils_security.test.js index f71df8fb..3903e239 100644 --- a/tools/scripts/tests/skill_utils_security.test.js +++ b/tools/scripts/tests/skill_utils_security.test.js @@ -3,7 +3,7 @@ const fs = require("fs"); const os = require("os"); const path = require("path"); -const { listSkillIds } = require("../../lib/skill-utils"); +const { listSkillIds, listSkillIdsRecursive } = require("../../lib/skill-utils"); function withTempDir(fn) { const dir = fs.mkdtempSync(path.join(os.tmpdir(), "skill-utils-security-")); @@ -35,3 +35,78 @@ withTempDir((root) => { "symlinked skill directories must not be treated as local skills", ); }); + +withTempDir((root) => { + const skillsDir = path.join(root, "skills"); + const outsideDir = path.join(root, "outside-secret"); + + fs.mkdirSync(skillsDir, { recursive: true }); + fs.mkdirSync(outsideDir, { recursive: true }); + + fs.mkdirSync(path.join(skillsDir, "nested", "safe-skill"), { recursive: true }); + fs.writeFileSync(path.join(skillsDir, "nested", "safe-skill", "SKILL.md"), "# safe\n"); + + fs.mkdirSync(path.join(outsideDir, "loop-target"), { recursive: true }); + fs.symlinkSync(outsideDir, path.join(skillsDir, "nested", "linked-secret")); + + const skillIds = listSkillIdsRecursive(skillsDir); + + assert.deepStrictEqual( + skillIds, + ["nested/safe-skill"], + "recursive skill listing must ignore symlinked directories", + ); +}); + +withTempDir((root) => { + const skillsDir = path.join(root, "skills"); + let currentDir = skillsDir; + + fs.mkdirSync(skillsDir, { recursive: true }); + currentDir = skillsDir; + const originalReaddirSync = fs.readdirSync; + const originalLstatSync = fs.lstatSync; + + try { + const depth = 1500; + const directorySet = new Set([skillsDir]); + for (let i = 0; i < depth; i += 1) { + currentDir = path.join(currentDir, `d${i}`); + directorySet.add(currentDir); + } + const deepestSkill = path.join(currentDir, "SKILL.md"); + + fs.readdirSync = (targetPath, options) => { + if (targetPath === skillsDir) { + return [{ name: "d0", isDirectory: () => true }]; + } + + const match = targetPath.match(/\/d(\d+)$/); + if (!match) return originalReaddirSync(targetPath, options); + + const index = Number(match[1]); + if (index >= depth - 1) { + return []; + } + return [{ name: `d${index + 1}`, isDirectory: () => true }]; + }; + + fs.lstatSync = (targetPath) => { + if (directorySet.has(targetPath)) { + return { isDirectory: () => true, isSymbolicLink: () => false, isFile: () => false }; + } + if (targetPath === deepestSkill) { + return { isDirectory: () => false, isSymbolicLink: () => false, isFile: () => true }; + } + return originalLstatSync(targetPath); + }; + + const skillIds = listSkillIdsRecursive(skillsDir); + + assert.strictEqual(skillIds.length, 1, "deep trees should still produce exactly one skill"); + assert.match(skillIds[0], /d1499$/, "deepest nested skill should be discovered without stack overflow"); + } finally { + fs.readdirSync = originalReaddirSync; + fs.lstatSync = originalLstatSync; + } +}); diff --git a/tools/scripts/tests/test_cleanup_synthetic_skill_sections.py b/tools/scripts/tests/test_cleanup_synthetic_skill_sections.py index 2eba4a05..b1ec5ac6 100644 --- a/tools/scripts/tests/test_cleanup_synthetic_skill_sections.py +++ b/tools/scripts/tests/test_cleanup_synthetic_skill_sections.py @@ -157,6 +157,32 @@ description: {description} self.assertEqual(changes, []) self.assertEqual(updated, current_content) + def test_cleanup_skill_file_skips_symlinked_skill_markdown(self): + with tempfile.TemporaryDirectory() as temp_dir: + repo_root = Path(temp_dir) + skill_dir = repo_root / "skills" / "demo" + outside_dir = repo_root / "outside" + skill_dir.mkdir(parents=True) + outside_dir.mkdir() + + target = outside_dir / "SKILL.md" + original = """--- +name: demo +description: Build and distribute Expo development clients locally or via TestFlight. +--- + +# Demo +""" + target.write_text(original, encoding="utf-8") + skill_path = skill_dir / "SKILL.md" + skill_path.symlink_to(target) + + changed, changes = cleanup_synthetic_skill_sections.cleanup_skill_file(repo_root, skill_path) + + self.assertFalse(changed) + self.assertEqual(changes, []) + self.assertEqual(target.read_text(encoding="utf-8"), original) + if __name__ == "__main__": unittest.main() diff --git a/tools/scripts/tests/test_fix_missing_skill_metadata.py b/tools/scripts/tests/test_fix_missing_skill_metadata.py index 4d16cf52..64ea12df 100644 --- a/tools/scripts/tests/test_fix_missing_skill_metadata.py +++ b/tools/scripts/tests/test_fix_missing_skill_metadata.py @@ -137,6 +137,32 @@ source: community repaired = fix_missing_skill_metadata.repair_malformed_injected_metadata(content) self.assertIn("risk: unknown\nsource: community\nmetadata:\n author: sanjay3290", repaired) + def test_update_skill_file_skips_symlinked_skill_markdown(self): + with tempfile.TemporaryDirectory() as temp_dir: + root = Path(temp_dir) + skill_dir = root / "skill" + outside_dir = root / "outside" + skill_dir.mkdir() + outside_dir.mkdir() + + target = outside_dir / "SKILL.md" + original = """--- +name: outside +description: "External file." +--- + +# Outside +""" + target.write_text(original, encoding="utf-8") + skill_path = skill_dir / "SKILL.md" + skill_path.symlink_to(target) + + changed, changes = fix_missing_skill_metadata.update_skill_file(skill_path) + + self.assertFalse(changed) + self.assertEqual(changes, []) + self.assertEqual(target.read_text(encoding="utf-8"), original) + if __name__ == "__main__": unittest.main() diff --git a/tools/scripts/tests/test_fix_missing_skill_sections.py b/tools/scripts/tests/test_fix_missing_skill_sections.py index 96117163..dcdd9b3c 100644 --- a/tools/scripts/tests/test_fix_missing_skill_sections.py +++ b/tools/scripts/tests/test_fix_missing_skill_sections.py @@ -122,6 +122,32 @@ Activate this skill when: self.assertIn("## When to Use", updated) self.assertNotIn("## Examples", updated) + def test_update_skill_file_skips_symlinked_skill_markdown(self): + with tempfile.TemporaryDirectory() as temp_dir: + root = Path(temp_dir) + skill_dir = root / "skill" + outside_dir = root / "outside" + skill_dir.mkdir() + outside_dir.mkdir() + + target = outside_dir / "SKILL.md" + original = """--- +name: outside +description: Demo description. +--- + +# Outside +""" + target.write_text(original, encoding="utf-8") + skill_path = skill_dir / "SKILL.md" + skill_path.symlink_to(target) + + changed, changes = fix_missing_skill_sections.update_skill_file(skill_path, add_missing=True) + + self.assertFalse(changed) + self.assertEqual(changes, []) + self.assertEqual(target.read_text(encoding="utf-8"), original) + if __name__ == "__main__": unittest.main() diff --git a/tools/scripts/tests/test_fix_truncated_descriptions.py b/tools/scripts/tests/test_fix_truncated_descriptions.py index fdf68a98..5ed20db3 100644 --- a/tools/scripts/tests/test_fix_truncated_descriptions.py +++ b/tools/scripts/tests/test_fix_truncated_descriptions.py @@ -107,6 +107,34 @@ Lightweight calendar automation with standalone OAuth authentication and event m updated, ) + def test_update_skill_file_skips_symlinked_skill_markdown(self): + with tempfile.TemporaryDirectory() as temp_dir: + root = Path(temp_dir) + skill_dir = root / "skill" + outside_dir = root / "outside" + skill_dir.mkdir() + outside_dir.mkdir() + + target = outside_dir / "SKILL.md" + original = """--- +name: outside +description: "This description is truncated..." +--- + +# Outside + +This paragraph should never be written back through a symlink. +""" + target.write_text(original, encoding="utf-8") + skill_path = skill_dir / "SKILL.md" + skill_path.symlink_to(target) + + changed, new_description = fix_truncated_descriptions.update_skill_file(skill_path) + + self.assertFalse(changed) + self.assertIsNone(new_description) + self.assertEqual(target.read_text(encoding="utf-8"), original) + if __name__ == "__main__": unittest.main() diff --git a/tools/scripts/tests/test_last30days_skill_security.py b/tools/scripts/tests/test_last30days_skill_security.py new file mode 100644 index 00000000..7d1286b4 --- /dev/null +++ b/tools/scripts/tests/test_last30days_skill_security.py @@ -0,0 +1,18 @@ +import unittest +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[3] +SKILL_PATH = REPO_ROOT / "skills" / "last30days" / "SKILL.md" + + +class Last30DaysSkillSecurityTests(unittest.TestCase): + def test_skill_does_not_embed_user_arguments_directly_in_shell_command(self): + content = SKILL_PATH.read_text(encoding="utf-8") + + self.assertNotIn('last30days.py "$ARGUMENTS"', content) + self.assertIn("cat <<'LAST30DAYS_TOPIC'", content) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/scripts/tests/test_sync_repo_metadata.py b/tools/scripts/tests/test_sync_repo_metadata.py index f9ad61f7..1b40f0d2 100644 --- a/tools/scripts/tests/test_sync_repo_metadata.py +++ b/tools/scripts/tests/test_sync_repo_metadata.py @@ -151,6 +151,26 @@ If you want a faster answer than "browse all 1,273+ skills", start with a tool-s self.assertIn("names[]=claude-code", topics_args) self.assertIn("names[]=skill-library", topics_args) + def test_update_text_file_skips_symlinked_targets(self): + metadata = {"version": "8.4.0"} + + with tempfile.TemporaryDirectory() as temp_dir: + root = Path(temp_dir) + outside = root / "outside.md" + outside.write_text("original", encoding="utf-8") + linked = root / "README.md" + linked.symlink_to(outside) + + changed = sync_repo_metadata.update_text_file( + linked, + lambda content, current_metadata: "rewritten", + metadata, + dry_run=False, + ) + + self.assertFalse(changed) + self.assertEqual(outside.read_text(encoding="utf-8"), "original") + if __name__ == "__main__": unittest.main() diff --git a/tools/scripts/tests/test_ws_listener_security.py b/tools/scripts/tests/test_ws_listener_security.py new file mode 100644 index 00000000..26603d95 --- /dev/null +++ b/tools/scripts/tests/test_ws_listener_security.py @@ -0,0 +1,85 @@ +import importlib.util +import os +import stat +import sys +import tempfile +import types +import unittest +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[3] +MODULE_PATH = REPO_ROOT / "skills" / "videodb" / "scripts" / "ws_listener.py" + + +def load_module(module_name: str, state_home: Path): + fake_videodb = types.ModuleType("videodb") + fake_dotenv = types.ModuleType("dotenv") + fake_dotenv.load_dotenv = lambda: None + + old_argv = sys.argv[:] + old_xdg = os.environ.get("XDG_STATE_HOME") + sys.argv = [str(MODULE_PATH)] + os.environ["XDG_STATE_HOME"] = str(state_home) + sys.modules["videodb"] = fake_videodb + sys.modules["dotenv"] = fake_dotenv + try: + spec = importlib.util.spec_from_file_location(module_name, MODULE_PATH) + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + sys.modules[module_name] = module + spec.loader.exec_module(module) + return module + finally: + sys.argv = old_argv + if old_xdg is None: + os.environ.pop("XDG_STATE_HOME", None) + else: + os.environ["XDG_STATE_HOME"] = old_xdg + + +class WsListenerSecurityTests(unittest.TestCase): + def test_write_pid_rejects_symlink_target(self): + with tempfile.TemporaryDirectory() as temp_dir: + state_home = Path(temp_dir) + module = load_module("ws_listener_security_pid", state_home) + + module.OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + outside = state_home / "outside.txt" + outside.write_text("secret", encoding="utf-8") + module.PID_FILE.symlink_to(outside) + + with self.assertRaises(OSError): + module.write_pid() + + self.assertEqual(outside.read_text(encoding="utf-8"), "secret") + + def test_append_event_creates_private_regular_file(self): + with tempfile.TemporaryDirectory() as temp_dir: + state_home = Path(temp_dir) + module = load_module("ws_listener_security_append", state_home) + + module.append_event({"channel": "demo"}) + + self.assertTrue(module.EVENTS_FILE.is_file()) + file_mode = stat.S_IMODE(module.EVENTS_FILE.stat().st_mode) + self.assertEqual(file_mode, 0o600) + + def test_append_event_rejects_symlink_target(self): + with tempfile.TemporaryDirectory() as temp_dir: + state_home = Path(temp_dir) + module = load_module("ws_listener_security_symlink", state_home) + + module.OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + outside = state_home / "outside.jsonl" + outside.write_text("secret\n", encoding="utf-8") + module.EVENTS_FILE.symlink_to(outside) + + with self.assertRaises(OSError): + module.append_event({"channel": "demo"}) + + self.assertEqual(outside.read_text(encoding="utf-8"), "secret\n") + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/scripts/tests/workflow_contracts.test.js b/tools/scripts/tests/workflow_contracts.test.js index 033070aa..36bb08cd 100644 --- a/tools/scripts/tests/workflow_contracts.test.js +++ b/tools/scripts/tests/workflow_contracts.test.js @@ -32,10 +32,10 @@ assert.deepStrictEqual(docsOnly.categories, ["docs"]); assert.strictEqual(docsOnly.primaryCategory, "docs"); assert.strictEqual(requiresReferencesValidation(["README.md"], contract), true); -const infraChange = classifyChangedFiles([".github/workflows/ci.yml", "tools/scripts/pr_preflight.js"], contract); +const infraChange = classifyChangedFiles([".github/workflows/ci.yml", "tools/scripts/pr_preflight.cjs"], contract); assert.deepStrictEqual(infraChange.categories, ["infra"]); assert.strictEqual(infraChange.primaryCategory, "infra"); -assert.strictEqual(requiresReferencesValidation(["tools/scripts/pr_preflight.js"], contract), true); +assert.strictEqual(requiresReferencesValidation(["tools/scripts/pr_preflight.cjs"], contract), true); const mixedChange = classifyChangedFiles(["skills/example/SKILL.md", "README.md"], contract); assert.deepStrictEqual(mixedChange.categories, ["skill", "docs"]);