fix(security): Harden skill tooling file handling

Guard metadata repair and doc sync scripts against symlink targets so
repo maintenance tasks cannot overwrite arbitrary local files.

Replace recursive skill discovery with an iterative walk that skips
symlinked directories, and harden the VideoDB listener to write only
private regular files in the user-owned state directory.

Also fix the broken pr:preflight script entry and make the last30days
skill stop embedding raw user arguments directly in the shell command.
This commit is contained in:
sickn33
2026-03-21 11:34:38 +01:00
parent a0e8381775
commit 3efff111d2
19 changed files with 453 additions and 31 deletions

View File

@@ -32,7 +32,7 @@
"audit:consistency:github": "node tools/scripts/run-python.js tools/scripts/audit_consistency.py --check-github-about", "audit:consistency:github": "node tools/scripts/run-python.js tools/scripts/audit_consistency.py --check-github-about",
"audit:maintainer": "node tools/scripts/run-python.js tools/scripts/maintainer_audit.py", "audit:maintainer": "node tools/scripts/run-python.js tools/scripts/maintainer_audit.py",
"security:docs": "node tools/scripts/tests/docs_security_content.test.js", "security:docs": "node tools/scripts/tests/docs_security_content.test.js",
"pr:preflight": "node tools/scripts/pr_preflight.js", "pr:preflight": "node tools/scripts/pr_preflight.cjs",
"release:preflight": "node tools/scripts/release_workflow.js preflight", "release:preflight": "node tools/scripts/release_workflow.js preflight",
"release:prepare": "node tools/scripts/release_workflow.js prepare", "release:prepare": "node tools/scripts/release_workflow.js prepare",
"release:publish": "node tools/scripts/release_workflow.js publish", "release:publish": "node tools/scripts/release_workflow.js publish",

View File

@@ -93,7 +93,12 @@ echo "Edit to add your API keys for enhanced research."
**Step 1: Run the research script** **Step 1: Run the research script**
```bash ```bash
python3 ~/.claude/skills/last30days/scripts/last30days.py "$ARGUMENTS" --emit=compact 2>&1 TOPIC_FILE="$(mktemp)"
trap 'rm -f "$TOPIC_FILE"' EXIT
cat <<'LAST30DAYS_TOPIC' > "$TOPIC_FILE"
$ARGUMENTS
LAST30DAYS_TOPIC
python3 ~/.claude/skills/last30days/scripts/last30days.py "$(cat "$TOPIC_FILE")" --emit=compact 2>&1
``` ```
The script will automatically: The script will automatically:

View File

@@ -6,7 +6,7 @@ Usage:
python scripts/ws_listener.py [OPTIONS] [output_dir] python scripts/ws_listener.py [OPTIONS] [output_dir]
Arguments: Arguments:
output_dir Directory for output files (default: /tmp or VIDEODB_EVENTS_DIR env var) output_dir Directory for output files (default: XDG_STATE_HOME/videodb-events or VIDEODB_EVENTS_DIR env var)
Options: Options:
--clear Clear the events file before starting (use when starting a new session) --clear Clear the events file before starting (use when starting a new session)
@@ -20,14 +20,15 @@ Output (first line, for parsing):
WS_ID=<connection_id> WS_ID=<connection_id>
Examples: Examples:
python scripts/ws_listener.py & # Run in background python scripts/ws_listener.py & # Run in background
python scripts/ws_listener.py --clear # Clear events and start fresh python scripts/ws_listener.py --clear # Clear events and start fresh
python scripts/ws_listener.py --clear /tmp/mydir # Custom dir with clear python scripts/ws_listener.py --clear /path/mydir # Custom dir with clear
kill $(cat /tmp/videodb_ws_pid) # Stop the listener kill $(cat ~/.local/state/videodb-events/videodb_ws_pid) # Stop the listener
""" """
import os import os
import sys import sys
import json import json
import stat
import signal import signal
import asyncio import asyncio
from datetime import datetime, timezone from datetime import datetime, timezone
@@ -42,6 +43,8 @@ import videodb
MAX_RETRIES = 10 MAX_RETRIES = 10
INITIAL_BACKOFF = 1 # seconds INITIAL_BACKOFF = 1 # seconds
MAX_BACKOFF = 60 # seconds MAX_BACKOFF = 60 # seconds
FILE_MODE = 0o600
DIR_MODE = 0o700
# Parse arguments # Parse arguments
def parse_args(): def parse_args():
@@ -78,18 +81,62 @@ def log(msg: str):
print(f"[{ts}] {msg}", flush=True) print(f"[{ts}] {msg}", flush=True)
def ensure_output_dir(directory=None, mode=None):
    """Create the output directory if needed and reject symlinked paths.

    Args:
        directory: Directory to validate/create. Defaults to the module-level
            OUTPUT_DIR so existing no-argument call sites keep working.
        mode: Permission bits applied when the directory is created.
            Defaults to the module-level DIR_MODE (private, 0o700).

    Raises:
        OSError: if the path is a symlink, or exists but is not a directory.
    """
    target = OUTPUT_DIR if directory is None else Path(directory)
    if mode is None:
        mode = DIR_MODE
    if target.exists():
        # Refuse symlinks outright: following one could redirect all event
        # and PID writes to an attacker-chosen location.
        if target.is_symlink():
            raise OSError(f"Refusing to use symlinked output directory: {target}")
        if not target.is_dir():
            raise OSError(f"Output path is not a directory: {target}")
        return
    # mode applies to the leaf; 0o700 keeps it private regardless of umask
    # since no group/other bits are requested.
    target.mkdir(parents=True, mode=mode, exist_ok=True)
def secure_open(path: Path, *, append: bool):
    """Open *path* write-only as a private regular file, refusing symlinks.

    The file is created with FILE_MODE when missing. After opening, the
    descriptor is verified via fstat to be a regular file with a single
    hard link; otherwise the descriptor is closed and OSError is raised.
    Returns the raw OS file descriptor.
    """
    ensure_output_dir()
    mode_flag = os.O_APPEND if append else os.O_TRUNC
    # O_NOFOLLOW is not available everywhere; the fstat checks below still
    # reject non-regular targets on platforms without it.
    flags = os.O_WRONLY | os.O_CREAT | mode_flag | getattr(os, "O_NOFOLLOW", 0)
    fd = os.open(path, flags, FILE_MODE)
    try:
        info = os.fstat(fd)
        if not stat.S_ISREG(info.st_mode):
            raise OSError(f"Refusing to write non-regular file: {path}")
        # A second hard link would let writes land in another path's file.
        if info.st_nlink != 1:
            raise OSError(f"Refusing to write multiply linked file: {path}")
    except Exception:
        os.close(fd)
        raise
    return fd
def secure_write_text(path: Path, content: str):
    """Overwrite *path* with *content*, enforcing the private regular-file checks."""
    with os.fdopen(secure_open(path, append=False), "w", encoding="utf-8") as handle:
        handle.write(content)
def secure_append_text(path: Path, content: str):
    """Append *content* to *path*, enforcing the private regular-file checks."""
    with os.fdopen(secure_open(path, append=True), "a", encoding="utf-8") as handle:
        handle.write(content)
def append_event(event: dict): def append_event(event: dict):
"""Append event to JSONL file with timestamps.""" """Append event to JSONL file with timestamps."""
event["ts"] = datetime.now(timezone.utc).isoformat() event["ts"] = datetime.now(timezone.utc).isoformat()
event["unix_ts"] = datetime.now(timezone.utc).timestamp() event["unix_ts"] = datetime.now(timezone.utc).timestamp()
with open(EVENTS_FILE, "a") as f: secure_append_text(EVENTS_FILE, json.dumps(event) + "\n")
f.write(json.dumps(event) + "\n")
def write_pid(): def write_pid():
"""Write PID file for easy process management.""" """Write PID file for easy process management."""
OUTPUT_DIR.mkdir(parents=True, exist_ok=True) secure_write_text(PID_FILE, str(os.getpid()))
PID_FILE.write_text(str(os.getpid()))
def cleanup_pid(): def cleanup_pid():
@@ -115,7 +162,7 @@ async def listen_with_retry():
ws_id = ws.connection_id ws_id = ws.connection_id
# Ensure output directory exists # Ensure output directory exists
OUTPUT_DIR.mkdir(parents=True, exist_ok=True) ensure_output_dir()
# Clear events file only on first connection if --clear flag is set # Clear events file only on first connection if --clear flag is set
if _first_connection and CLEAR_EVENTS: if _first_connection and CLEAR_EVENTS:
@@ -124,7 +171,7 @@ async def listen_with_retry():
_first_connection = False _first_connection = False
# Write ws_id to file for easy retrieval # Write ws_id to file for easy retrieval
WS_ID_FILE.write_text(ws_id) secure_write_text(WS_ID_FILE, ws_id)
# Print ws_id (parseable format for LLM) # Print ws_id (parseable format for LLM)
if retry_count == 0: if retry_count == 0:

View File

@@ -2,6 +2,24 @@ const fs = require('fs');
const path = require('path'); const path = require('path');
const yaml = require('yaml'); const yaml = require('yaml');
function isSafeDirectory(dirPath) {
try {
const stats = fs.lstatSync(dirPath);
return stats.isDirectory() && !stats.isSymbolicLink();
} catch {
return false;
}
}
function isSafeSkillFile(skillPath) {
try {
const stats = fs.lstatSync(skillPath);
return stats.isFile() && !stats.isSymbolicLink();
} catch {
return false;
}
}
function stripQuotes(value) { function stripQuotes(value) {
if (typeof value !== 'string') return value; if (typeof value !== 'string') return value;
if (value.length < 2) return value.trim(); if (value.length < 2) return value.trim();
@@ -152,11 +170,9 @@ function listSkillIds(skillsDir) {
.filter(entry => { .filter(entry => {
if (entry.startsWith('.')) return false; if (entry.startsWith('.')) return false;
const dirPath = path.join(skillsDir, entry); const dirPath = path.join(skillsDir, entry);
const entryStats = fs.lstatSync(dirPath); if (!isSafeDirectory(dirPath)) return false;
if (entryStats.isSymbolicLink() || !entryStats.isDirectory()) return false;
const skillPath = path.join(dirPath, 'SKILL.md'); const skillPath = path.join(dirPath, 'SKILL.md');
if (!fs.existsSync(skillPath)) return false; return isSafeSkillFile(skillPath);
return !fs.lstatSync(skillPath).isSymbolicLink();
}) })
.sort(); .sort();
} }
@@ -166,17 +182,28 @@ function listSkillIds(skillsDir) {
* Matches generate_index.py behavior so catalog includes nested skills (e.g. game-development/2d-games). * Matches generate_index.py behavior so catalog includes nested skills (e.g. game-development/2d-games).
*/ */
function listSkillIdsRecursive(skillsDir, baseDir = skillsDir, acc = []) { function listSkillIdsRecursive(skillsDir, baseDir = skillsDir, acc = []) {
const entries = fs.readdirSync(baseDir, { withFileTypes: true }); const stack = [baseDir];
for (const entry of entries) { const visited = new Set();
if (entry.name.startsWith('.')) continue;
if (!entry.isDirectory()) continue; while (stack.length > 0) {
const dirPath = path.join(baseDir, entry.name); const currentDir = stack.pop();
const skillPath = path.join(dirPath, 'SKILL.md'); const resolvedDir = path.resolve(currentDir);
const relPath = path.relative(skillsDir, dirPath); if (visited.has(resolvedDir)) continue;
if (fs.existsSync(skillPath) && !fs.lstatSync(skillPath).isSymbolicLink()) { visited.add(resolvedDir);
acc.push(relPath);
const entries = fs.readdirSync(currentDir, { withFileTypes: true });
for (const entry of entries) {
if (entry.name.startsWith('.')) continue;
const dirPath = path.join(currentDir, entry.name);
if (!isSafeDirectory(dirPath)) continue;
const skillPath = path.join(dirPath, 'SKILL.md');
const relPath = path.relative(skillsDir, dirPath);
if (isSafeSkillFile(skillPath)) {
acc.push(relPath);
}
stack.push(dirPath);
} }
listSkillIdsRecursive(skillsDir, dirPath, acc);
} }
return acc.sort(); return acc.sort();
} }

View File

@@ -0,0 +1,11 @@
from __future__ import annotations

import os
import stat

from pathlib import Path
def is_safe_regular_file(path: str | Path) -> bool:
candidate = Path(path)
try:
return candidate.is_file() and not candidate.is_symlink()
except OSError:
return False

View File

@@ -8,6 +8,7 @@ import subprocess
import sys import sys
from pathlib import Path from pathlib import Path
from _safe_files import is_safe_regular_file
from _project_paths import find_repo_root from _project_paths import find_repo_root
from fix_missing_skill_sections import ( from fix_missing_skill_sections import (
build_examples_section, build_examples_section,
@@ -47,6 +48,9 @@ def remove_exact_section(content: str, section_text: str) -> str:
def cleanup_skill_file(repo_root: Path, skill_path: Path) -> tuple[bool, list[str]]: def cleanup_skill_file(repo_root: Path, skill_path: Path) -> tuple[bool, list[str]]:
if not is_safe_regular_file(skill_path):
return False, []
current_content = skill_path.read_text(encoding="utf-8") current_content = skill_path.read_text(encoding="utf-8")
metadata, _ = parse_frontmatter(current_content, skill_path.as_posix()) metadata, _ = parse_frontmatter(current_content, skill_path.as_posix())
if not metadata: if not metadata:
@@ -100,6 +104,9 @@ def main() -> int:
continue continue
skill_path = Path(root) / "SKILL.md" skill_path = Path(root) / "SKILL.md"
if not is_safe_regular_file(skill_path):
print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]")
continue
current_content = skill_path.read_text(encoding="utf-8") current_content = skill_path.read_text(encoding="utf-8")
metadata, _ = parse_frontmatter(current_content, skill_path.as_posix()) metadata, _ = parse_frontmatter(current_content, skill_path.as_posix())
if not metadata or not isinstance(metadata.get("description"), str): if not metadata or not isinstance(metadata.get("description"), str):

View File

@@ -7,6 +7,7 @@ import re
import sys import sys
from pathlib import Path from pathlib import Path
from _safe_files import is_safe_regular_file
from _project_paths import find_repo_root from _project_paths import find_repo_root
from validate_skills import configure_utf8_output, parse_frontmatter from validate_skills import configure_utf8_output, parse_frontmatter
@@ -141,6 +142,9 @@ def insert_metadata_keys(frontmatter_text: str, additions: dict[str, str]) -> st
def update_skill_file(skill_path: Path) -> tuple[bool, list[str]]: def update_skill_file(skill_path: Path) -> tuple[bool, list[str]]:
if not is_safe_regular_file(skill_path):
return False, []
content = skill_path.read_text(encoding="utf-8") content = skill_path.read_text(encoding="utf-8")
repaired_content = repair_malformed_injected_metadata(content) repaired_content = repair_malformed_injected_metadata(content)
if repaired_content != content: if repaired_content != content:
@@ -197,6 +201,9 @@ def main() -> int:
continue continue
skill_path = Path(root) / "SKILL.md" skill_path = Path(root) / "SKILL.md"
if not is_safe_regular_file(skill_path):
print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]")
continue
content = skill_path.read_text(encoding="utf-8") content = skill_path.read_text(encoding="utf-8")
repaired_content = repair_malformed_injected_metadata(content) repaired_content = repair_malformed_injected_metadata(content)
if repaired_content != content: if repaired_content != content:

View File

@@ -7,6 +7,7 @@ import re
import sys import sys
from pathlib import Path from pathlib import Path
from _safe_files import is_safe_regular_file
from _project_paths import find_repo_root from _project_paths import find_repo_root
from validate_skills import configure_utf8_output, has_when_to_use_section, parse_frontmatter from validate_skills import configure_utf8_output, has_when_to_use_section, parse_frontmatter
@@ -115,6 +116,9 @@ def append_section(content: str, section_text: str) -> str:
def update_skill_file(skill_path: Path, *, add_missing: bool = False) -> tuple[bool, list[str]]: def update_skill_file(skill_path: Path, *, add_missing: bool = False) -> tuple[bool, list[str]]:
if not is_safe_regular_file(skill_path):
return False, []
content = skill_path.read_text(encoding="utf-8") content = skill_path.read_text(encoding="utf-8")
metadata, _ = parse_frontmatter(content, skill_path.as_posix()) metadata, _ = parse_frontmatter(content, skill_path.as_posix())
if not metadata: if not metadata:
@@ -166,6 +170,9 @@ def main() -> int:
continue continue
skill_path = Path(root) / "SKILL.md" skill_path = Path(root) / "SKILL.md"
if not is_safe_regular_file(skill_path):
print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]")
continue
content = skill_path.read_text(encoding="utf-8") content = skill_path.read_text(encoding="utf-8")
metadata, _ = parse_frontmatter(content, skill_path.as_posix()) metadata, _ = parse_frontmatter(content, skill_path.as_posix())
if not metadata or not isinstance(metadata.get("description"), str): if not metadata or not isinstance(metadata.get("description"), str):

View File

@@ -7,6 +7,7 @@ import re
import sys import sys
from pathlib import Path from pathlib import Path
from _safe_files import is_safe_regular_file
from _project_paths import find_repo_root from _project_paths import find_repo_root
from validate_skills import configure_utf8_output, parse_frontmatter from validate_skills import configure_utf8_output, parse_frontmatter
@@ -167,6 +168,9 @@ def replace_description(frontmatter_text: str, new_description: str) -> str:
def update_skill_file(skill_path: Path) -> tuple[bool, str | None]: def update_skill_file(skill_path: Path) -> tuple[bool, str | None]:
if not is_safe_regular_file(skill_path):
return False, None
content = skill_path.read_text(encoding="utf-8") content = skill_path.read_text(encoding="utf-8")
match = FRONTMATTER_PATTERN.search(content) match = FRONTMATTER_PATTERN.search(content)
if not match: if not match:
@@ -215,6 +219,9 @@ def main() -> int:
continue continue
skill_path = Path(root) / "SKILL.md" skill_path = Path(root) / "SKILL.md"
if not is_safe_regular_file(skill_path):
print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]")
continue
content = skill_path.read_text(encoding="utf-8") content = skill_path.read_text(encoding="utf-8")
metadata, _ = parse_frontmatter(content, skill_path.as_posix()) metadata, _ = parse_frontmatter(content, skill_path.as_posix())
description = metadata.get("description") if metadata else None description = metadata.get("description") if metadata else None

View File

@@ -218,7 +218,7 @@ def sync_regex_text(content: str, replacements: list[tuple[str, str]]) -> str:
def update_text_file(path: Path, transform, metadata: dict, dry_run: bool) -> bool: def update_text_file(path: Path, transform, metadata: dict, dry_run: bool) -> bool:
if not path.is_file(): if not path.is_file() or path.is_symlink():
return False return False
original = path.read_text(encoding="utf-8") original = path.read_text(encoding="utf-8")

View File

@@ -3,7 +3,7 @@ const fs = require("fs");
const os = require("os"); const os = require("os");
const path = require("path"); const path = require("path");
const { listSkillIds } = require("../../lib/skill-utils"); const { listSkillIds, listSkillIdsRecursive } = require("../../lib/skill-utils");
function withTempDir(fn) { function withTempDir(fn) {
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "skill-utils-security-")); const dir = fs.mkdtempSync(path.join(os.tmpdir(), "skill-utils-security-"));
@@ -35,3 +35,78 @@ withTempDir((root) => {
"symlinked skill directories must not be treated as local skills", "symlinked skill directories must not be treated as local skills",
); );
}); });
// Regression: listSkillIdsRecursive must skip symlinked directories so a
// planted link inside skills/ cannot pull external paths into the catalog.
withTempDir((root) => {
  const skillsDir = path.join(root, "skills");
  const outsideDir = path.join(root, "outside-secret");
  fs.mkdirSync(skillsDir, { recursive: true });
  fs.mkdirSync(outsideDir, { recursive: true });
  // One legitimate nested skill...
  fs.mkdirSync(path.join(skillsDir, "nested", "safe-skill"), { recursive: true });
  fs.writeFileSync(path.join(skillsDir, "nested", "safe-skill", "SKILL.md"), "# safe\n");
  // ...and a symlink escaping the skills tree.
  fs.mkdirSync(path.join(outsideDir, "loop-target"), { recursive: true });
  fs.symlinkSync(outsideDir, path.join(skillsDir, "nested", "linked-secret"));
  const skillIds = listSkillIdsRecursive(skillsDir);
  assert.deepStrictEqual(
    skillIds,
    ["nested/safe-skill"],
    "recursive skill listing must ignore symlinked directories",
  );
});
// Regression: the iterative walk must survive directory trees far deeper
// than the recursion limit. fs.readdirSync/lstatSync are monkeypatched to
// simulate a 1500-level chain without touching the real filesystem.
withTempDir((root) => {
  const skillsDir = path.join(root, "skills");
  let currentDir = skillsDir;
  fs.mkdirSync(skillsDir, { recursive: true });
  // NOTE(review): duplicate reassignment — currentDir is already skillsDir.
  currentDir = skillsDir;
  const originalReaddirSync = fs.readdirSync;
  const originalLstatSync = fs.lstatSync;
  try {
    const depth = 1500;
    const directorySet = new Set([skillsDir]);
    for (let i = 0; i < depth; i += 1) {
      currentDir = path.join(currentDir, `d${i}`);
      directorySet.add(currentDir);
    }
    // Only the deepest directory carries a SKILL.md.
    const deepestSkill = path.join(currentDir, "SKILL.md");
    fs.readdirSync = (targetPath, options) => {
      if (targetPath === skillsDir) {
        return [{ name: "d0", isDirectory: () => true }];
      }
      const match = targetPath.match(/\/d(\d+)$/);
      if (!match) return originalReaddirSync(targetPath, options);
      const index = Number(match[1]);
      if (index >= depth - 1) {
        return [];
      }
      // Each simulated level exposes exactly one child directory.
      return [{ name: `d${index + 1}`, isDirectory: () => true }];
    };
    fs.lstatSync = (targetPath) => {
      if (directorySet.has(targetPath)) {
        return { isDirectory: () => true, isSymbolicLink: () => false, isFile: () => false };
      }
      if (targetPath === deepestSkill) {
        return { isDirectory: () => false, isSymbolicLink: () => false, isFile: () => true };
      }
      return originalLstatSync(targetPath);
    };
    const skillIds = listSkillIdsRecursive(skillsDir);
    assert.strictEqual(skillIds.length, 1, "deep trees should still produce exactly one skill");
    assert.match(skillIds[0], /d1499$/, "deepest nested skill should be discovered without stack overflow");
  } finally {
    // Always restore the patched fs functions.
    fs.readdirSync = originalReaddirSync;
    fs.lstatSync = originalLstatSync;
  }
});

View File

@@ -157,6 +157,32 @@ description: {description}
self.assertEqual(changes, []) self.assertEqual(changes, [])
self.assertEqual(updated, current_content) self.assertEqual(updated, current_content)
def test_cleanup_skill_file_skips_symlinked_skill_markdown(self):
    """cleanup_skill_file must refuse a symlinked SKILL.md and leave its target untouched."""
    with tempfile.TemporaryDirectory() as temp_dir:
        repo_root = Path(temp_dir)
        skill_dir = repo_root / "skills" / "demo"
        outside_dir = repo_root / "outside"
        skill_dir.mkdir(parents=True)
        outside_dir.mkdir()
        # The real file lives outside the skills tree.
        target = outside_dir / "SKILL.md"
        original = """---
name: demo
description: Build and distribute Expo development clients locally or via TestFlight.
---
# Demo
"""
        target.write_text(original, encoding="utf-8")
        # The in-tree SKILL.md is only a symlink to it.
        skill_path = skill_dir / "SKILL.md"
        skill_path.symlink_to(target)
        changed, changes = cleanup_synthetic_skill_sections.cleanup_skill_file(repo_root, skill_path)
        self.assertFalse(changed)
        self.assertEqual(changes, [])
        # The symlink target must remain byte-identical.
        self.assertEqual(target.read_text(encoding="utf-8"), original)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -137,6 +137,32 @@ source: community
repaired = fix_missing_skill_metadata.repair_malformed_injected_metadata(content) repaired = fix_missing_skill_metadata.repair_malformed_injected_metadata(content)
self.assertIn("risk: unknown\nsource: community\nmetadata:\n author: sanjay3290", repaired) self.assertIn("risk: unknown\nsource: community\nmetadata:\n author: sanjay3290", repaired)
def test_update_skill_file_skips_symlinked_skill_markdown(self):
    """update_skill_file must not repair metadata through a symlinked SKILL.md."""
    with tempfile.TemporaryDirectory() as temp_dir:
        root = Path(temp_dir)
        skill_dir = root / "skill"
        outside_dir = root / "outside"
        skill_dir.mkdir()
        outside_dir.mkdir()
        # Real file outside the skill directory; in-tree path is a symlink.
        target = outside_dir / "SKILL.md"
        original = """---
name: outside
description: "External file."
---
# Outside
"""
        target.write_text(original, encoding="utf-8")
        skill_path = skill_dir / "SKILL.md"
        skill_path.symlink_to(target)
        changed, changes = fix_missing_skill_metadata.update_skill_file(skill_path)
        self.assertFalse(changed)
        self.assertEqual(changes, [])
        # Symlink target must be left byte-identical.
        self.assertEqual(target.read_text(encoding="utf-8"), original)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -122,6 +122,32 @@ Activate this skill when:
self.assertIn("## When to Use", updated) self.assertIn("## When to Use", updated)
self.assertNotIn("## Examples", updated) self.assertNotIn("## Examples", updated)
def test_update_skill_file_skips_symlinked_skill_markdown(self):
    """update_skill_file must not add sections through a symlinked SKILL.md."""
    with tempfile.TemporaryDirectory() as temp_dir:
        root = Path(temp_dir)
        skill_dir = root / "skill"
        outside_dir = root / "outside"
        skill_dir.mkdir()
        outside_dir.mkdir()
        # Real file outside the skill directory; in-tree path is a symlink.
        target = outside_dir / "SKILL.md"
        original = """---
name: outside
description: Demo description.
---
# Outside
"""
        target.write_text(original, encoding="utf-8")
        skill_path = skill_dir / "SKILL.md"
        skill_path.symlink_to(target)
        changed, changes = fix_missing_skill_sections.update_skill_file(skill_path, add_missing=True)
        self.assertFalse(changed)
        self.assertEqual(changes, [])
        # Symlink target must be left byte-identical.
        self.assertEqual(target.read_text(encoding="utf-8"), original)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -107,6 +107,34 @@ Lightweight calendar automation with standalone OAuth authentication and event m
updated, updated,
) )
def test_update_skill_file_skips_symlinked_skill_markdown(self):
    """Truncated-description repair must never write back through a symlink."""
    with tempfile.TemporaryDirectory() as temp_dir:
        root = Path(temp_dir)
        skill_dir = root / "skill"
        outside_dir = root / "outside"
        skill_dir.mkdir()
        outside_dir.mkdir()
        # Real file outside the skill directory; in-tree path is a symlink.
        target = outside_dir / "SKILL.md"
        original = """---
name: outside
description: "This description is truncated..."
---
# Outside
This paragraph should never be written back through a symlink.
"""
        target.write_text(original, encoding="utf-8")
        skill_path = skill_dir / "SKILL.md"
        skill_path.symlink_to(target)
        changed, new_description = fix_truncated_descriptions.update_skill_file(skill_path)
        self.assertFalse(changed)
        self.assertIsNone(new_description)
        # Symlink target must be left byte-identical.
        self.assertEqual(target.read_text(encoding="utf-8"), original)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -0,0 +1,18 @@
import unittest
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[3]
SKILL_PATH = REPO_ROOT / "skills" / "last30days" / "SKILL.md"
class Last30DaysSkillSecurityTests(unittest.TestCase):
    """Guards against reintroducing direct shell interpolation of user arguments."""

    def test_skill_does_not_embed_user_arguments_directly_in_shell_command(self):
        # The skill document must route the topic through a quoted heredoc
        # (LAST30DAYS_TOPIC) instead of splicing "$ARGUMENTS" into the
        # python3 command line.
        content = SKILL_PATH.read_text(encoding="utf-8")
        self.assertNotIn('last30days.py "$ARGUMENTS"', content)
        self.assertIn("cat <<'LAST30DAYS_TOPIC'", content)
if __name__ == "__main__":
unittest.main()

View File

@@ -151,6 +151,26 @@ If you want a faster answer than "browse all 1,273+ skills", start with a tool-s
self.assertIn("names[]=claude-code", topics_args) self.assertIn("names[]=claude-code", topics_args)
self.assertIn("names[]=skill-library", topics_args) self.assertIn("names[]=skill-library", topics_args)
def test_update_text_file_skips_symlinked_targets(self):
    """update_text_file must decline symlinked paths and leave the target intact."""
    metadata = {"version": "8.4.0"}
    with tempfile.TemporaryDirectory() as temp_dir:
        root = Path(temp_dir)
        # Real file, plus a symlinked README pointing at it.
        outside = root / "outside.md"
        outside.write_text("original", encoding="utf-8")
        linked = root / "README.md"
        linked.symlink_to(outside)
        changed = sync_repo_metadata.update_text_file(
            linked,
            # Transform that would rewrite the file if it were applied.
            lambda content, current_metadata: "rewritten",
            metadata,
            dry_run=False,
        )
        self.assertFalse(changed)
        self.assertEqual(outside.read_text(encoding="utf-8"), "original")
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -0,0 +1,85 @@
import importlib.util
import os
import stat
import sys
import tempfile
import types
import unittest
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[3]
MODULE_PATH = REPO_ROOT / "skills" / "videodb" / "scripts" / "ws_listener.py"
def load_module(module_name: str, state_home: Path):
    """Import ws_listener.py as *module_name* with XDG_STATE_HOME set to *state_home*.

    The third-party imports (videodb, dotenv) are replaced with in-memory
    stubs so the module loads without credentials or network access.
    sys.argv, XDG_STATE_HOME, and the stubbed sys.modules entries are all
    restored afterwards so repeated loads stay isolated and any real
    videodb/dotenv modules are not clobbered for later tests.
    """
    fake_videodb = types.ModuleType("videodb")
    fake_dotenv = types.ModuleType("dotenv")
    fake_dotenv.load_dotenv = lambda: None

    old_argv = sys.argv[:]
    old_xdg = os.environ.get("XDG_STATE_HOME")
    # Remember any previously imported modules so they can be restored.
    old_videodb = sys.modules.get("videodb")
    old_dotenv = sys.modules.get("dotenv")

    sys.argv = [str(MODULE_PATH)]
    os.environ["XDG_STATE_HOME"] = str(state_home)
    sys.modules["videodb"] = fake_videodb
    sys.modules["dotenv"] = fake_dotenv
    try:
        spec = importlib.util.spec_from_file_location(module_name, MODULE_PATH)
        module = importlib.util.module_from_spec(spec)
        assert spec.loader is not None
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        return module
    finally:
        sys.argv = old_argv
        if old_xdg is None:
            os.environ.pop("XDG_STATE_HOME", None)
        else:
            os.environ["XDG_STATE_HOME"] = old_xdg
        # Undo the stubbing; the loaded module keeps its own references.
        for name, previous in (("videodb", old_videodb), ("dotenv", old_dotenv)):
            if previous is None:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = previous
class WsListenerSecurityTests(unittest.TestCase):
    """Security regressions for ws_listener's private state-file handling."""

    def test_write_pid_rejects_symlink_target(self):
        # A symlinked PID file must raise instead of overwriting its target.
        with tempfile.TemporaryDirectory() as temp_dir:
            state_home = Path(temp_dir)
            module = load_module("ws_listener_security_pid", state_home)
            module.OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
            outside = state_home / "outside.txt"
            outside.write_text("secret", encoding="utf-8")
            module.PID_FILE.symlink_to(outside)
            with self.assertRaises(OSError):
                module.write_pid()
            # The symlink target must remain untouched.
            self.assertEqual(outside.read_text(encoding="utf-8"), "secret")

    def test_append_event_creates_private_regular_file(self):
        # Events file must be created as a regular file with 0o600 perms.
        with tempfile.TemporaryDirectory() as temp_dir:
            state_home = Path(temp_dir)
            module = load_module("ws_listener_security_append", state_home)
            module.append_event({"channel": "demo"})
            self.assertTrue(module.EVENTS_FILE.is_file())
            file_mode = stat.S_IMODE(module.EVENTS_FILE.stat().st_mode)
            self.assertEqual(file_mode, 0o600)

    def test_append_event_rejects_symlink_target(self):
        # A symlinked events file must raise instead of appending to its target.
        with tempfile.TemporaryDirectory() as temp_dir:
            state_home = Path(temp_dir)
            module = load_module("ws_listener_security_symlink", state_home)
            module.OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
            outside = state_home / "outside.jsonl"
            outside.write_text("secret\n", encoding="utf-8")
            module.EVENTS_FILE.symlink_to(outside)
            with self.assertRaises(OSError):
                module.append_event({"channel": "demo"})
            # The symlink target must remain untouched.
            self.assertEqual(outside.read_text(encoding="utf-8"), "secret\n")
if __name__ == "__main__":
unittest.main()

View File

@@ -32,10 +32,10 @@ assert.deepStrictEqual(docsOnly.categories, ["docs"]);
assert.strictEqual(docsOnly.primaryCategory, "docs"); assert.strictEqual(docsOnly.primaryCategory, "docs");
assert.strictEqual(requiresReferencesValidation(["README.md"], contract), true); assert.strictEqual(requiresReferencesValidation(["README.md"], contract), true);
const infraChange = classifyChangedFiles([".github/workflows/ci.yml", "tools/scripts/pr_preflight.js"], contract); const infraChange = classifyChangedFiles([".github/workflows/ci.yml", "tools/scripts/pr_preflight.cjs"], contract);
assert.deepStrictEqual(infraChange.categories, ["infra"]); assert.deepStrictEqual(infraChange.categories, ["infra"]);
assert.strictEqual(infraChange.primaryCategory, "infra"); assert.strictEqual(infraChange.primaryCategory, "infra");
assert.strictEqual(requiresReferencesValidation(["tools/scripts/pr_preflight.js"], contract), true); assert.strictEqual(requiresReferencesValidation(["tools/scripts/pr_preflight.cjs"], contract), true);
const mixedChange = classifyChangedFiles(["skills/example/SKILL.md", "README.md"], contract); const mixedChange = classifyChangedFiles(["skills/example/SKILL.md", "README.md"], contract);
assert.deepStrictEqual(mixedChange.categories, ["skill", "docs"]); assert.deepStrictEqual(mixedChange.categories, ["skill", "docs"]);