diff --git a/package.json b/package.json index 42ad3237..b0881081 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ "audit:consistency:github": "node tools/scripts/run-python.js tools/scripts/audit_consistency.py --check-github-about", "audit:maintainer": "node tools/scripts/run-python.js tools/scripts/maintainer_audit.py", "security:docs": "node tools/scripts/tests/docs_security_content.test.js", - "pr:preflight": "node tools/scripts/pr_preflight.js", + "pr:preflight": "node tools/scripts/pr_preflight.cjs", "release:preflight": "node tools/scripts/release_workflow.js preflight", "release:prepare": "node tools/scripts/release_workflow.js prepare", "release:publish": "node tools/scripts/release_workflow.js publish", diff --git a/skills/last30days/SKILL.md b/skills/last30days/SKILL.md index 44a52460..8b7e83f1 100644 --- a/skills/last30days/SKILL.md +++ b/skills/last30days/SKILL.md @@ -93,7 +93,12 @@ echo "Edit to add your API keys for enhanced research." **Step 1: Run the research script** ```bash -python3 ~/.claude/skills/last30days/scripts/last30days.py "$ARGUMENTS" --emit=compact 2>&1 +TOPIC_FILE="$(mktemp)" +trap 'rm -f "$TOPIC_FILE"' EXIT +cat <<'LAST30DAYS_TOPIC' > "$TOPIC_FILE" +$ARGUMENTS +LAST30DAYS_TOPIC +python3 ~/.claude/skills/last30days/scripts/last30days.py "$(cat "$TOPIC_FILE")" --emit=compact 2>&1 ``` The script will automatically: diff --git a/skills/videodb/scripts/ws_listener.py b/skills/videodb/scripts/ws_listener.py index aac8821c..3316de4a 100644 --- a/skills/videodb/scripts/ws_listener.py +++ b/skills/videodb/scripts/ws_listener.py @@ -6,7 +6,7 @@ Usage: python scripts/ws_listener.py [OPTIONS] [output_dir] Arguments: - output_dir Directory for output files (default: /tmp or VIDEODB_EVENTS_DIR env var) + output_dir Directory for output files (default: XDG_STATE_HOME/videodb-events or VIDEODB_EVENTS_DIR env var) Options: --clear Clear the events file before starting (use when starting a new session) @@ -20,14 +20,15 @@ Output (first line, for parsing): WS_ID= Examples: - python scripts/ws_listener.py & # Run in background - python scripts/ws_listener.py --clear # Clear events and start fresh - python scripts/ws_listener.py --clear /tmp/mydir # Custom dir with clear - kill $(cat /tmp/videodb_ws_pid) # Stop the listener + python scripts/ws_listener.py & # Run in background + python scripts/ws_listener.py --clear # Clear events and start fresh + python scripts/ws_listener.py --clear /path/mydir # Custom dir with clear + kill $(cat ~/.local/state/videodb-events/videodb_ws_pid) # Stop the listener """ import os import sys import json +import stat import signal import asyncio from datetime import datetime, timezone @@ -42,6 +43,8 @@ import videodb MAX_RETRIES = 10 INITIAL_BACKOFF = 1 # seconds MAX_BACKOFF = 60 # seconds +FILE_MODE = 0o600 +DIR_MODE = 0o700 # Parse arguments def parse_args(): @@ -78,18 +81,62 @@ def log(msg: str): print(f"[{ts}] {msg}", flush=True) +def ensure_output_dir(): + """Create the output directory if needed and reject symlinked paths.""" + if OUTPUT_DIR.exists(): + if OUTPUT_DIR.is_symlink(): + raise OSError(f"Refusing to use symlinked output directory: {OUTPUT_DIR}") + if not OUTPUT_DIR.is_dir(): + raise OSError(f"Output path is not a directory: {OUTPUT_DIR}") + return + + OUTPUT_DIR.mkdir(parents=True, mode=DIR_MODE, exist_ok=True) + + +def secure_open(path: Path, *, append: bool): + """Open a regular file without following symlinks.""" + ensure_output_dir() + flags = os.O_WRONLY | os.O_CREAT + flags |= os.O_APPEND if append else os.O_TRUNC + flags |= getattr(os, "O_NOFOLLOW", 0) + + fd = os.open(path, flags, FILE_MODE) + try: + file_stat = os.fstat(fd) + if not stat.S_ISREG(file_stat.st_mode): + raise OSError(f"Refusing to write non-regular file: {path}") + if file_stat.st_nlink != 1: + raise OSError(f"Refusing to write multiply linked file: {path}") + return fd + except Exception: + os.close(fd) + raise + + +def secure_write_text(path: Path, content: str): + """Write text to a regular file with private permissions.""" + fd = secure_open(path, append=False) + with os.fdopen(fd, "w", encoding="utf-8") as handle: + handle.write(content) + + +def secure_append_text(path: Path, content: str): + """Append text to a regular file with private permissions.""" + fd = secure_open(path, append=True) + with os.fdopen(fd, "a", encoding="utf-8") as handle: + handle.write(content) + + def append_event(event: dict): """Append event to JSONL file with timestamps.""" event["ts"] = datetime.now(timezone.utc).isoformat() event["unix_ts"] = datetime.now(timezone.utc).timestamp() - with open(EVENTS_FILE, "a") as f: - f.write(json.dumps(event) + "\n") + secure_append_text(EVENTS_FILE, json.dumps(event) + "\n") def write_pid(): """Write PID file for easy process management.""" - OUTPUT_DIR.mkdir(parents=True, exist_ok=True) - PID_FILE.write_text(str(os.getpid())) + secure_write_text(PID_FILE, str(os.getpid())) def cleanup_pid(): @@ -115,7 +162,7 @@ async def listen_with_retry(): ws_id = ws.connection_id # Ensure output directory exists - OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + ensure_output_dir() # Clear events file only on first connection if --clear flag is set if _first_connection and CLEAR_EVENTS: @@ -124,7 +171,7 @@ async def listen_with_retry(): _first_connection = False # Write ws_id to file for easy retrieval - WS_ID_FILE.write_text(ws_id) + secure_write_text(WS_ID_FILE, ws_id) # Print ws_id (parseable format for LLM) if retry_count == 0: diff --git a/tools/lib/skill-utils.js b/tools/lib/skill-utils.js index 42ebeca0..9a24c249 100644 --- a/tools/lib/skill-utils.js +++ b/tools/lib/skill-utils.js @@ -2,6 +2,24 @@ const fs = require('fs'); const path = require('path'); const yaml = require('yaml'); +function isSafeDirectory(dirPath) { + try { + const stats = fs.lstatSync(dirPath); + return stats.isDirectory() && !stats.isSymbolicLink(); + } catch { + return false; + } +} + +function isSafeSkillFile(skillPath) { + try { + const stats = fs.lstatSync(skillPath); + return stats.isFile() && !stats.isSymbolicLink(); + } catch { + return false; + } +} + function stripQuotes(value) { if (typeof value !== 'string') return value; if (value.length < 2) return value.trim(); @@ -152,11 +170,9 @@ function listSkillIds(skillsDir) { .filter(entry => { if (entry.startsWith('.')) return false; const dirPath = path.join(skillsDir, entry); - const entryStats = fs.lstatSync(dirPath); - if (entryStats.isSymbolicLink() || !entryStats.isDirectory()) return false; + if (!isSafeDirectory(dirPath)) return false; const skillPath = path.join(dirPath, 'SKILL.md'); - if (!fs.existsSync(skillPath)) return false; - return !fs.lstatSync(skillPath).isSymbolicLink(); + return isSafeSkillFile(skillPath); }) .sort(); } @@ -166,17 +182,28 @@ function listSkillIds(skillsDir) { * Matches generate_index.py behavior so catalog includes nested skills (e.g. game-development/2d-games). */ function listSkillIdsRecursive(skillsDir, baseDir = skillsDir, acc = []) { - const entries = fs.readdirSync(baseDir, { withFileTypes: true }); - for (const entry of entries) { - if (entry.name.startsWith('.')) continue; - if (!entry.isDirectory()) continue; - const dirPath = path.join(baseDir, entry.name); - const skillPath = path.join(dirPath, 'SKILL.md'); - const relPath = path.relative(skillsDir, dirPath); - if (fs.existsSync(skillPath) && !fs.lstatSync(skillPath).isSymbolicLink()) { - acc.push(relPath); + const stack = [baseDir]; + const visited = new Set(); + + while (stack.length > 0) { + const currentDir = stack.pop(); + const resolvedDir = path.resolve(currentDir); + if (visited.has(resolvedDir)) continue; + visited.add(resolvedDir); + + const entries = fs.readdirSync(currentDir, { withFileTypes: true }); + for (const entry of entries) { + if (entry.name.startsWith('.')) continue; + const dirPath = path.join(currentDir, entry.name); + if (!isSafeDirectory(dirPath)) continue; + + const skillPath = path.join(dirPath, 'SKILL.md'); + const relPath = path.relative(skillsDir, dirPath); + if (isSafeSkillFile(skillPath)) { + acc.push(relPath); + } + stack.push(dirPath); } - listSkillIdsRecursive(skillsDir, dirPath, acc); } return acc.sort(); } diff --git a/tools/scripts/_safe_files.py b/tools/scripts/_safe_files.py new file mode 100644 index 00000000..f273b628 --- /dev/null +++ b/tools/scripts/_safe_files.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +from pathlib import Path + + +def is_safe_regular_file(path: str | Path) -> bool: + candidate = Path(path) + try: + return candidate.is_file() and not candidate.is_symlink() + except OSError: + return False diff --git a/tools/scripts/cleanup_synthetic_skill_sections.py b/tools/scripts/cleanup_synthetic_skill_sections.py index 546d4c12..39d19c82 100644 --- a/tools/scripts/cleanup_synthetic_skill_sections.py +++ b/tools/scripts/cleanup_synthetic_skill_sections.py @@ -8,6 +8,7 @@ import subprocess import sys from pathlib import Path +from _safe_files import is_safe_regular_file from _project_paths import find_repo_root from fix_missing_skill_sections import ( build_examples_section, @@ -47,6 +48,9 @@ def remove_exact_section(content: str, section_text: str) -> str: def cleanup_skill_file(repo_root: Path, skill_path: Path) -> tuple[bool, list[str]]: + if not is_safe_regular_file(skill_path): + return False, [] + current_content = skill_path.read_text(encoding="utf-8") metadata, _ = parse_frontmatter(current_content, skill_path.as_posix()) if not metadata: @@ -100,6 +104,9 @@ def main() -> int: continue skill_path = Path(root) / "SKILL.md" + if not is_safe_regular_file(skill_path): + print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]") + continue current_content = skill_path.read_text(encoding="utf-8") metadata, _ = parse_frontmatter(current_content, skill_path.as_posix()) if not metadata or not isinstance(metadata.get("description"), str): diff --git a/tools/scripts/fix_missing_skill_metadata.py b/tools/scripts/fix_missing_skill_metadata.py index 39dca512..0c88b988 100644 --- a/tools/scripts/fix_missing_skill_metadata.py +++ b/tools/scripts/fix_missing_skill_metadata.py @@ -7,6 +7,7 @@ import re import sys from pathlib import Path +from _safe_files import is_safe_regular_file from _project_paths import find_repo_root from validate_skills import configure_utf8_output, parse_frontmatter @@ -141,6 +142,9 @@ def insert_metadata_keys(frontmatter_text: str, additions: dict[str, str]) -> st def update_skill_file(skill_path: Path) -> tuple[bool, list[str]]: + if not is_safe_regular_file(skill_path): + return False, [] + content = skill_path.read_text(encoding="utf-8") repaired_content = repair_malformed_injected_metadata(content) if repaired_content != content: @@ -197,6 +201,9 @@ def main() -> int: continue skill_path = Path(root) / "SKILL.md" + if not is_safe_regular_file(skill_path): + print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]") + continue content = skill_path.read_text(encoding="utf-8") repaired_content = repair_malformed_injected_metadata(content) if repaired_content != content: diff --git a/tools/scripts/fix_missing_skill_sections.py b/tools/scripts/fix_missing_skill_sections.py index e315faba..e53de856 100644 --- a/tools/scripts/fix_missing_skill_sections.py +++ b/tools/scripts/fix_missing_skill_sections.py @@ -7,6 +7,7 @@ import re import sys from pathlib import Path +from _safe_files import is_safe_regular_file from _project_paths import find_repo_root from validate_skills import configure_utf8_output, has_when_to_use_section, parse_frontmatter @@ -115,6 +116,9 @@ def append_section(content: str, section_text: str) -> str: def update_skill_file(skill_path: Path, *, add_missing: bool = False) -> tuple[bool, list[str]]: + if not is_safe_regular_file(skill_path): + return False, [] + content = skill_path.read_text(encoding="utf-8") metadata, _ = parse_frontmatter(content, skill_path.as_posix()) if not metadata: @@ -166,6 +170,9 @@ def main() -> int: continue skill_path = Path(root) / "SKILL.md" + if not is_safe_regular_file(skill_path): + print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]") + continue content = skill_path.read_text(encoding="utf-8") metadata, _ = parse_frontmatter(content, skill_path.as_posix()) if not metadata or not isinstance(metadata.get("description"), str): diff --git a/tools/scripts/fix_truncated_descriptions.py b/tools/scripts/fix_truncated_descriptions.py index 00a2c00f..5ca9a281 100644 --- a/tools/scripts/fix_truncated_descriptions.py +++ b/tools/scripts/fix_truncated_descriptions.py @@ -7,6 +7,7 @@ import re import sys from pathlib import Path +from _safe_files import is_safe_regular_file from _project_paths import find_repo_root from validate_skills import configure_utf8_output, parse_frontmatter @@ -167,6 +168,9 @@ def replace_description(frontmatter_text: str, new_description: str) -> str: def update_skill_file(skill_path: Path) -> tuple[bool, str | None]: + if not is_safe_regular_file(skill_path): + return False, None + content = skill_path.read_text(encoding="utf-8") match = FRONTMATTER_PATTERN.search(content) if not match: @@ -215,6 +219,9 @@ def main() -> int: continue skill_path = Path(root) / "SKILL.md" + if not is_safe_regular_file(skill_path): + print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]") + continue content = skill_path.read_text(encoding="utf-8") metadata, _ = parse_frontmatter(content, skill_path.as_posix()) description = metadata.get("description") if metadata else None diff --git a/tools/scripts/sync_repo_metadata.py b/tools/scripts/sync_repo_metadata.py index e34a4bb8..16c4c567 100644 --- a/tools/scripts/sync_repo_metadata.py +++ b/tools/scripts/sync_repo_metadata.py @@ -218,7 +218,7 @@ def sync_regex_text(content: str, replacements: list[tuple[str, str]]) -> str: def update_text_file(path: Path, transform, metadata: dict, dry_run: bool) -> bool: - if not path.is_file(): + if not path.is_file() or path.is_symlink(): return False original = path.read_text(encoding="utf-8") diff --git a/tools/scripts/tests/skill_utils_security.test.js b/tools/scripts/tests/skill_utils_security.test.js index f71df8fb..3903e239 100644 --- a/tools/scripts/tests/skill_utils_security.test.js +++ b/tools/scripts/tests/skill_utils_security.test.js @@ -3,7 +3,7 @@ const fs = require("fs"); const os = require("os"); const path = require("path"); -const { listSkillIds } = require("../../lib/skill-utils"); +const { listSkillIds, listSkillIdsRecursive } = require("../../lib/skill-utils"); function withTempDir(fn) { const dir = fs.mkdtempSync(path.join(os.tmpdir(), "skill-utils-security-")); @@ -35,3 +35,78 @@ withTempDir((root) => { "symlinked skill directories must not be treated as local skills", ); }); + +withTempDir((root) => { + const skillsDir = path.join(root, "skills"); + const outsideDir = path.join(root, "outside-secret"); + + fs.mkdirSync(skillsDir, { recursive: true }); + fs.mkdirSync(outsideDir, { recursive: true }); + + fs.mkdirSync(path.join(skillsDir, "nested", "safe-skill"), { recursive: true }); + fs.writeFileSync(path.join(skillsDir, "nested", "safe-skill", "SKILL.md"), "# safe\n"); + + fs.mkdirSync(path.join(outsideDir, "loop-target"), { recursive: true }); + fs.symlinkSync(outsideDir, path.join(skillsDir, "nested", "linked-secret")); + + const skillIds = listSkillIdsRecursive(skillsDir); + + assert.deepStrictEqual( + skillIds, + ["nested/safe-skill"], + "recursive skill listing must ignore symlinked directories", + ); +}); + +withTempDir((root) => { + const skillsDir = path.join(root, "skills"); + let currentDir = skillsDir; + + fs.mkdirSync(skillsDir, { recursive: true }); + currentDir = skillsDir; + const originalReaddirSync = fs.readdirSync; + const originalLstatSync = fs.lstatSync; + + try { + const depth = 1500; + const directorySet = new Set([skillsDir]); + for (let i = 0; i < depth; i += 1) { + currentDir = path.join(currentDir, `d${i}`); + directorySet.add(currentDir); + } + const deepestSkill = path.join(currentDir, "SKILL.md"); + + fs.readdirSync = (targetPath, options) => { + if (targetPath === skillsDir) { + return [{ name: "d0", isDirectory: () => true }]; + } + + const match = targetPath.match(/\/d(\d+)$/); + if (!match) return originalReaddirSync(targetPath, options); + + const index = Number(match[1]); + if (index >= depth - 1) { + return []; + } + return [{ name: `d${index + 1}`, isDirectory: () => true }]; + }; + + fs.lstatSync = (targetPath) => { + if (directorySet.has(targetPath)) { + return { isDirectory: () => true, isSymbolicLink: () => false, isFile: () => false }; + } + if (targetPath === deepestSkill) { + return { isDirectory: () => false, isSymbolicLink: () => false, isFile: () => true }; + } + return originalLstatSync(targetPath); + }; + + const skillIds = listSkillIdsRecursive(skillsDir); + + assert.strictEqual(skillIds.length, 1, "deep trees should still produce exactly one skill"); + assert.match(skillIds[0], /d1499$/, "deepest nested skill should be discovered without stack overflow"); + } finally { + fs.readdirSync = originalReaddirSync; + fs.lstatSync = originalLstatSync; + } +}); diff --git a/tools/scripts/tests/test_cleanup_synthetic_skill_sections.py b/tools/scripts/tests/test_cleanup_synthetic_skill_sections.py index 2eba4a05..b1ec5ac6 100644 --- a/tools/scripts/tests/test_cleanup_synthetic_skill_sections.py +++ b/tools/scripts/tests/test_cleanup_synthetic_skill_sections.py @@ -157,6 +157,32 @@ description: {description} self.assertEqual(changes, []) self.assertEqual(updated, current_content) + def test_cleanup_skill_file_skips_symlinked_skill_markdown(self): + with tempfile.TemporaryDirectory() as temp_dir: + repo_root = Path(temp_dir) + skill_dir = repo_root / "skills" / "demo" + outside_dir = repo_root / "outside" + skill_dir.mkdir(parents=True) + outside_dir.mkdir() + + target = outside_dir / "SKILL.md" + original = """--- +name: demo +description: Build and distribute Expo development clients locally or via TestFlight. +--- + +# Demo +""" + target.write_text(original, encoding="utf-8") + skill_path = skill_dir / "SKILL.md" + skill_path.symlink_to(target) + + changed, changes = cleanup_synthetic_skill_sections.cleanup_skill_file(repo_root, skill_path) + + self.assertFalse(changed) + self.assertEqual(changes, []) + self.assertEqual(target.read_text(encoding="utf-8"), original) + if __name__ == "__main__": unittest.main() diff --git a/tools/scripts/tests/test_fix_missing_skill_metadata.py b/tools/scripts/tests/test_fix_missing_skill_metadata.py index 4d16cf52..64ea12df 100644 --- a/tools/scripts/tests/test_fix_missing_skill_metadata.py +++ b/tools/scripts/tests/test_fix_missing_skill_metadata.py @@ -137,6 +137,32 @@ source: community repaired = fix_missing_skill_metadata.repair_malformed_injected_metadata(content) self.assertIn("risk: unknown\nsource: community\nmetadata:\n author: sanjay3290", repaired) + def test_update_skill_file_skips_symlinked_skill_markdown(self): + with tempfile.TemporaryDirectory() as temp_dir: + root = Path(temp_dir) + skill_dir = root / "skill" + outside_dir = root / "outside" + skill_dir.mkdir() + outside_dir.mkdir() + + target = outside_dir / "SKILL.md" + original = """--- +name: outside +description: "External file." +--- + +# Outside +""" + target.write_text(original, encoding="utf-8") + skill_path = skill_dir / "SKILL.md" + skill_path.symlink_to(target) + + changed, changes = fix_missing_skill_metadata.update_skill_file(skill_path) + + self.assertFalse(changed) + self.assertEqual(changes, []) + self.assertEqual(target.read_text(encoding="utf-8"), original) + if __name__ == "__main__": unittest.main() diff --git a/tools/scripts/tests/test_fix_missing_skill_sections.py b/tools/scripts/tests/test_fix_missing_skill_sections.py index 96117163..dcdd9b3c 100644 --- a/tools/scripts/tests/test_fix_missing_skill_sections.py +++ b/tools/scripts/tests/test_fix_missing_skill_sections.py @@ -122,6 +122,32 @@ Activate this skill when: self.assertIn("## When to Use", updated) self.assertNotIn("## Examples", updated) + def test_update_skill_file_skips_symlinked_skill_markdown(self): + with tempfile.TemporaryDirectory() as temp_dir: + root = Path(temp_dir) + skill_dir = root / "skill" + outside_dir = root / "outside" + skill_dir.mkdir() + outside_dir.mkdir() + + target = outside_dir / "SKILL.md" + original = """--- +name: outside +description: Demo description. +--- + +# Outside +""" + target.write_text(original, encoding="utf-8") + skill_path = skill_dir / "SKILL.md" + skill_path.symlink_to(target) + + changed, changes = fix_missing_skill_sections.update_skill_file(skill_path, add_missing=True) + + self.assertFalse(changed) + self.assertEqual(changes, []) + self.assertEqual(target.read_text(encoding="utf-8"), original) + if __name__ == "__main__": unittest.main() diff --git a/tools/scripts/tests/test_fix_truncated_descriptions.py b/tools/scripts/tests/test_fix_truncated_descriptions.py index fdf68a98..5ed20db3 100644 --- a/tools/scripts/tests/test_fix_truncated_descriptions.py +++ b/tools/scripts/tests/test_fix_truncated_descriptions.py @@ -107,6 +107,34 @@ Lightweight calendar automation with standalone OAuth authentication and event m updated, ) + def test_update_skill_file_skips_symlinked_skill_markdown(self): + with tempfile.TemporaryDirectory() as temp_dir: + root = Path(temp_dir) + skill_dir = root / "skill" + outside_dir = root / "outside" + skill_dir.mkdir() + outside_dir.mkdir() + + target = outside_dir / "SKILL.md" + original = """--- +name: outside +description: "This description is truncated..." +--- + +# Outside + +This paragraph should never be written back through a symlink. +""" + target.write_text(original, encoding="utf-8") + skill_path = skill_dir / "SKILL.md" + skill_path.symlink_to(target) + + changed, new_description = fix_truncated_descriptions.update_skill_file(skill_path) + + self.assertFalse(changed) + self.assertIsNone(new_description) + self.assertEqual(target.read_text(encoding="utf-8"), original) + if __name__ == "__main__": unittest.main() diff --git a/tools/scripts/tests/test_last30days_skill_security.py b/tools/scripts/tests/test_last30days_skill_security.py new file mode 100644 index 00000000..7d1286b4 --- /dev/null +++ b/tools/scripts/tests/test_last30days_skill_security.py @@ -0,0 +1,18 @@ +import unittest +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[3] +SKILL_PATH = REPO_ROOT / "skills" / "last30days" / "SKILL.md" + + +class Last30DaysSkillSecurityTests(unittest.TestCase): + def test_skill_does_not_embed_user_arguments_directly_in_shell_command(self): + content = SKILL_PATH.read_text(encoding="utf-8") + + self.assertNotIn('last30days.py "$ARGUMENTS"', content) + self.assertIn("cat <<'LAST30DAYS_TOPIC'", content) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/scripts/tests/test_sync_repo_metadata.py b/tools/scripts/tests/test_sync_repo_metadata.py index f9ad61f7..1b40f0d2 100644 --- a/tools/scripts/tests/test_sync_repo_metadata.py +++ b/tools/scripts/tests/test_sync_repo_metadata.py @@ -151,6 +151,26 @@ If you want a faster answer than "browse all 1,273+ skills", start with a tool-s self.assertIn("names[]=claude-code", topics_args) self.assertIn("names[]=skill-library", topics_args) + def test_update_text_file_skips_symlinked_targets(self): + metadata = {"version": "8.4.0"} + + with tempfile.TemporaryDirectory() as temp_dir: + root = Path(temp_dir) + outside = root / "outside.md" + outside.write_text("original", encoding="utf-8") + linked = root / "README.md" + linked.symlink_to(outside) + + changed = sync_repo_metadata.update_text_file( + linked, + lambda content, current_metadata: "rewritten", + metadata, + dry_run=False, + ) + + self.assertFalse(changed) + self.assertEqual(outside.read_text(encoding="utf-8"), "original") + if __name__ == "__main__": unittest.main() diff --git a/tools/scripts/tests/test_ws_listener_security.py b/tools/scripts/tests/test_ws_listener_security.py new file mode 100644 index 00000000..26603d95 --- /dev/null +++ b/tools/scripts/tests/test_ws_listener_security.py @@ -0,0 +1,85 @@ +import importlib.util +import os +import stat +import sys +import tempfile +import types +import unittest +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[3] +MODULE_PATH = REPO_ROOT / "skills" / "videodb" / "scripts" / "ws_listener.py" + + +def load_module(module_name: str, state_home: Path): + fake_videodb = types.ModuleType("videodb") + fake_dotenv = types.ModuleType("dotenv") + fake_dotenv.load_dotenv = lambda: None + + old_argv = sys.argv[:] + old_xdg = os.environ.get("XDG_STATE_HOME") + sys.argv = [str(MODULE_PATH)] + os.environ["XDG_STATE_HOME"] = str(state_home) + sys.modules["videodb"] = fake_videodb + sys.modules["dotenv"] = fake_dotenv + try: + spec = importlib.util.spec_from_file_location(module_name, MODULE_PATH) + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + sys.modules[module_name] = module + spec.loader.exec_module(module) + return module + finally: + sys.argv = old_argv + if old_xdg is None: + os.environ.pop("XDG_STATE_HOME", None) + else: + os.environ["XDG_STATE_HOME"] = old_xdg + + +class WsListenerSecurityTests(unittest.TestCase): + def test_write_pid_rejects_symlink_target(self): + with tempfile.TemporaryDirectory() as temp_dir: + state_home = Path(temp_dir) + module = load_module("ws_listener_security_pid", state_home) + + module.OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + outside = state_home / "outside.txt" + outside.write_text("secret", encoding="utf-8") + module.PID_FILE.symlink_to(outside) + + with self.assertRaises(OSError): + module.write_pid() + + self.assertEqual(outside.read_text(encoding="utf-8"), "secret") + + def test_append_event_creates_private_regular_file(self): + with tempfile.TemporaryDirectory() as temp_dir: + state_home = Path(temp_dir) + module = load_module("ws_listener_security_append", state_home) + + module.append_event({"channel": "demo"}) + + self.assertTrue(module.EVENTS_FILE.is_file()) + file_mode = stat.S_IMODE(module.EVENTS_FILE.stat().st_mode) + self.assertEqual(file_mode, 0o600) + + def test_append_event_rejects_symlink_target(self): + with tempfile.TemporaryDirectory() as temp_dir: + state_home = Path(temp_dir) + module = load_module("ws_listener_security_symlink", state_home) + + module.OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + outside = state_home / "outside.jsonl" + outside.write_text("secret\n", encoding="utf-8") + module.EVENTS_FILE.symlink_to(outside) + + with self.assertRaises(OSError): + module.append_event({"channel": "demo"}) + + self.assertEqual(outside.read_text(encoding="utf-8"), "secret\n") + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/scripts/tests/workflow_contracts.test.js b/tools/scripts/tests/workflow_contracts.test.js index 033070aa..36bb08cd 100644 --- a/tools/scripts/tests/workflow_contracts.test.js +++ b/tools/scripts/tests/workflow_contracts.test.js @@ -32,10 +32,10 @@ assert.deepStrictEqual(docsOnly.categories, ["docs"]); assert.strictEqual(docsOnly.primaryCategory, "docs"); assert.strictEqual(requiresReferencesValidation(["README.md"], contract), true); -const infraChange = classifyChangedFiles([".github/workflows/ci.yml", "tools/scripts/pr_preflight.js"], contract); +const infraChange = classifyChangedFiles([".github/workflows/ci.yml", "tools/scripts/pr_preflight.cjs"], contract); assert.deepStrictEqual(infraChange.categories, ["infra"]); assert.strictEqual(infraChange.primaryCategory, "infra"); -assert.strictEqual(requiresReferencesValidation(["tools/scripts/pr_preflight.js"], contract), true); +assert.strictEqual(requiresReferencesValidation(["tools/scripts/pr_preflight.cjs"], contract), true); const mixedChange = classifyChangedFiles(["skills/example/SKILL.md", "README.md"], contract); assert.deepStrictEqual(mixedChange.categories, ["skill", "docs"]);