Initial commit: The Ultimate Antigravity Skills Collection (58 Skills)

This commit is contained in:
sck_0
2026-01-14 18:48:08 +01:00
commit 7f46ed8ca1
447 changed files with 110829 additions and 0 deletions

View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""
NotebookLM Skill Scripts Package
Provides automatic environment management for all scripts
"""
import os
import sys
import subprocess
from pathlib import Path
def ensure_venv_and_run():
"""
Ensure virtual environment exists and run the requested script.
This is called when any script is imported or run directly.
"""
# Only do this if we're not already in the skill's venv
skill_dir = Path(__file__).parent.parent
venv_dir = skill_dir / ".venv"
# Check if we're in a venv
in_venv = hasattr(sys, 'real_prefix') or (
hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix
)
# Check if it's OUR venv
if in_venv:
venv_path = Path(sys.prefix)
if venv_path == venv_dir:
# We're already in the correct venv
return
# We need to set up or switch to our venv
if not venv_dir.exists():
print("🔧 First-time setup detected...")
print(" Creating isolated environment for NotebookLM skill...")
print(" This ensures clean dependency management...")
# Create venv
import venv
venv.create(venv_dir, with_pip=True)
# Install requirements
requirements_file = skill_dir / "requirements.txt"
if requirements_file.exists():
if os.name == 'nt': # Windows
pip_exe = venv_dir / "Scripts" / "pip.exe"
else:
pip_exe = venv_dir / "bin" / "pip"
print(" Installing dependencies in isolated environment...")
subprocess.run(
[str(pip_exe), "install", "-q", "-r", str(requirements_file)],
check=True
)
# Also install patchright's chromium
print(" Setting up browser automation...")
if os.name == 'nt':
python_exe = venv_dir / "Scripts" / "python.exe"
else:
python_exe = venv_dir / "bin" / "python"
subprocess.run(
[str(python_exe), "-m", "patchright", "install", "chromium"],
check=True,
capture_output=True
)
print("✅ Environment ready! All dependencies isolated in .venv/")
# If we're here and not in the venv, we should recommend using the venv
if not in_venv:
print("\n⚠️ Running outside virtual environment")
print(" Recommended: Use scripts/run.py to ensure clean execution")
print(" Or activate: source .venv/bin/activate")
# Check environment when module is imported
ensure_venv_and_run()

View File

@@ -0,0 +1,256 @@
#!/usr/bin/env python3
"""
Simple NotebookLM Question Interface
Based on MCP server implementation - simplified without sessions
Implements hybrid auth approach:
- Persistent browser profile (user_data_dir) for fingerprint consistency
- Manual cookie injection from state.json for session cookies (Playwright bug workaround)
See: https://github.com/microsoft/playwright/issues/36139
"""
import argparse
import sys
import time
import re
from pathlib import Path
from patchright.sync_api import sync_playwright
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
from auth_manager import AuthManager
from notebook_manager import NotebookLibrary
from config import QUERY_INPUT_SELECTORS, RESPONSE_SELECTORS
from browser_utils import BrowserFactory, StealthUtils
# Follow-up reminder (adapted from MCP server for stateless operation)
# Since we don't have persistent sessions, we encourage comprehensive questions
FOLLOW_UP_REMINDER = (
"\n\nEXTREMELY IMPORTANT: Is that ALL you need to know? "
"You can always ask another question! Think about it carefully: "
"before you reply to the user, review their original request and this answer. "
"If anything is still unclear or missing, ask me another comprehensive question "
"that includes all necessary context (since each question opens a new browser session)."
)
def ask_notebooklm(question: str, notebook_url: str, headless: bool = True) -> str:
"""
Ask a question to NotebookLM
Args:
question: Question to ask
notebook_url: NotebookLM notebook URL
headless: Run browser in headless mode
Returns:
Answer text from NotebookLM
"""
auth = AuthManager()
if not auth.is_authenticated():
print("⚠️ Not authenticated. Run: python auth_manager.py setup")
return None
print(f"💬 Asking: {question}")
print(f"📚 Notebook: {notebook_url}")
playwright = None
context = None
try:
# Start playwright
playwright = sync_playwright().start()
# Launch persistent browser context using factory
context = BrowserFactory.launch_persistent_context(
playwright,
headless=headless
)
# Navigate to notebook
page = context.new_page()
print(" 🌐 Opening notebook...")
page.goto(notebook_url, wait_until="domcontentloaded")
# Wait for NotebookLM
page.wait_for_url(re.compile(r"^https://notebooklm\.google\.com/"), timeout=10000)
# Wait for query input (MCP approach)
print(" ⏳ Waiting for query input...")
query_element = None
for selector in QUERY_INPUT_SELECTORS:
try:
query_element = page.wait_for_selector(
selector,
timeout=10000,
state="visible" # Only check visibility, not disabled!
)
if query_element:
print(f" ✓ Found input: {selector}")
break
except:
continue
if not query_element:
print(" ❌ Could not find query input")
return None
# Type question (human-like, fast)
print(" ⏳ Typing question...")
# Use primary selector for typing
input_selector = QUERY_INPUT_SELECTORS[0]
StealthUtils.human_type(page, input_selector, question)
# Submit
print(" 📤 Submitting...")
page.keyboard.press("Enter")
# Small pause
StealthUtils.random_delay(500, 1500)
# Wait for response (MCP approach: poll for stable text)
print(" ⏳ Waiting for answer...")
answer = None
stable_count = 0
last_text = None
deadline = time.time() + 120 # 2 minutes timeout
while time.time() < deadline:
# Check if NotebookLM is still thinking (most reliable indicator)
try:
thinking_element = page.query_selector('div.thinking-message')
if thinking_element and thinking_element.is_visible():
time.sleep(1)
continue
except:
pass
# Try to find response with MCP selectors
for selector in RESPONSE_SELECTORS:
try:
elements = page.query_selector_all(selector)
if elements:
# Get last (newest) response
latest = elements[-1]
text = latest.inner_text().strip()
if text:
if text == last_text:
stable_count += 1
if stable_count >= 3: # Stable for 3 polls
answer = text
break
else:
stable_count = 0
last_text = text
except:
continue
if answer:
break
time.sleep(1)
if not answer:
print(" ❌ Timeout waiting for answer")
return None
print(" ✅ Got answer!")
# Add follow-up reminder to encourage Claude to ask more questions
return answer + FOLLOW_UP_REMINDER
except Exception as e:
print(f" ❌ Error: {e}")
import traceback
traceback.print_exc()
return None
finally:
# Always clean up
if context:
try:
context.close()
except:
pass
if playwright:
try:
playwright.stop()
except:
pass
def main():
parser = argparse.ArgumentParser(description='Ask NotebookLM a question')
parser.add_argument('--question', required=True, help='Question to ask')
parser.add_argument('--notebook-url', help='NotebookLM notebook URL')
parser.add_argument('--notebook-id', help='Notebook ID from library')
parser.add_argument('--show-browser', action='store_true', help='Show browser')
args = parser.parse_args()
# Resolve notebook URL
notebook_url = args.notebook_url
if not notebook_url and args.notebook_id:
library = NotebookLibrary()
notebook = library.get_notebook(args.notebook_id)
if notebook:
notebook_url = notebook['url']
else:
print(f"❌ Notebook '{args.notebook_id}' not found")
return 1
if not notebook_url:
# Check for active notebook first
library = NotebookLibrary()
active = library.get_active_notebook()
if active:
notebook_url = active['url']
print(f"📚 Using active notebook: {active['name']}")
else:
# Show available notebooks
notebooks = library.list_notebooks()
if notebooks:
print("\n📚 Available notebooks:")
for nb in notebooks:
mark = " [ACTIVE]" if nb.get('id') == library.active_notebook_id else ""
print(f" {nb['id']}: {nb['name']}{mark}")
print("\nSpecify with --notebook-id or set active:")
print("python scripts/run.py notebook_manager.py activate --id ID")
else:
print("❌ No notebooks in library. Add one first:")
print("python scripts/run.py notebook_manager.py add --url URL --name NAME --description DESC --topics TOPICS")
return 1
# Ask the question
answer = ask_notebooklm(
question=args.question,
notebook_url=notebook_url,
headless=not args.show_browser
)
if answer:
print("\n" + "=" * 60)
print(f"Question: {args.question}")
print("=" * 60)
print()
print(answer)
print()
print("=" * 60)
return 0
else:
print("\n❌ Failed to get answer")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,358 @@
#!/usr/bin/env python3
"""
Authentication Manager for NotebookLM
Handles Google login and browser state persistence
Based on the MCP server implementation
Implements hybrid auth approach:
- Persistent browser profile (user_data_dir) for fingerprint consistency
- Manual cookie injection from state.json for session cookies (Playwright bug workaround)
See: https://github.com/microsoft/playwright/issues/36139
"""
import json
import time
import argparse
import shutil
import re
import sys
from pathlib import Path
from typing import Optional, Dict, Any
from patchright.sync_api import sync_playwright, BrowserContext
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
from config import BROWSER_STATE_DIR, STATE_FILE, AUTH_INFO_FILE, DATA_DIR
from browser_utils import BrowserFactory
class AuthManager:
"""
Manages authentication and browser state for NotebookLM
Features:
- Interactive Google login
- Browser state persistence
- Session restoration
- Account switching
"""
def __init__(self):
"""Initialize the authentication manager"""
# Ensure directories exist
DATA_DIR.mkdir(parents=True, exist_ok=True)
BROWSER_STATE_DIR.mkdir(parents=True, exist_ok=True)
self.state_file = STATE_FILE
self.auth_info_file = AUTH_INFO_FILE
self.browser_state_dir = BROWSER_STATE_DIR
def is_authenticated(self) -> bool:
"""Check if valid authentication exists"""
if not self.state_file.exists():
return False
# Check if state file is not too old (7 days)
age_days = (time.time() - self.state_file.stat().st_mtime) / 86400
if age_days > 7:
print(f"⚠️ Browser state is {age_days:.1f} days old, may need re-authentication")
return True
def get_auth_info(self) -> Dict[str, Any]:
"""Get authentication information"""
info = {
'authenticated': self.is_authenticated(),
'state_file': str(self.state_file),
'state_exists': self.state_file.exists()
}
if self.auth_info_file.exists():
try:
with open(self.auth_info_file, 'r') as f:
saved_info = json.load(f)
info.update(saved_info)
except Exception:
pass
if info['state_exists']:
age_hours = (time.time() - self.state_file.stat().st_mtime) / 3600
info['state_age_hours'] = age_hours
return info
def setup_auth(self, headless: bool = False, timeout_minutes: int = 10) -> bool:
"""
Perform interactive authentication setup
Args:
headless: Run browser in headless mode (False for login)
timeout_minutes: Maximum time to wait for login
Returns:
True if authentication successful
"""
print("🔐 Starting authentication setup...")
print(f" Timeout: {timeout_minutes} minutes")
playwright = None
context = None
try:
playwright = sync_playwright().start()
# Launch using factory
context = BrowserFactory.launch_persistent_context(
playwright,
headless=headless
)
# Navigate to NotebookLM
page = context.new_page()
page.goto("https://notebooklm.google.com", wait_until="domcontentloaded")
# Check if already authenticated
if "notebooklm.google.com" in page.url and "accounts.google.com" not in page.url:
print(" ✅ Already authenticated!")
self._save_browser_state(context)
return True
# Wait for manual login
print("\n ⏳ Please log in to your Google account...")
print(f" ⏱️ Waiting up to {timeout_minutes} minutes for login...")
try:
# Wait for URL to change to NotebookLM (regex ensures it's the actual domain, not a parameter)
timeout_ms = int(timeout_minutes * 60 * 1000)
page.wait_for_url(re.compile(r"^https://notebooklm\.google\.com/"), timeout=timeout_ms)
print(f" ✅ Login successful!")
# Save authentication state
self._save_browser_state(context)
self._save_auth_info()
return True
except Exception as e:
print(f" ❌ Authentication timeout: {e}")
return False
except Exception as e:
print(f" ❌ Error: {e}")
return False
finally:
# Clean up browser resources
if context:
try:
context.close()
except Exception:
pass
if playwright:
try:
playwright.stop()
except Exception:
pass
def _save_browser_state(self, context: BrowserContext):
"""Save browser state to disk"""
try:
# Save storage state (cookies, localStorage)
context.storage_state(path=str(self.state_file))
print(f" 💾 Saved browser state to: {self.state_file}")
except Exception as e:
print(f" ❌ Failed to save browser state: {e}")
raise
def _save_auth_info(self):
"""Save authentication metadata"""
try:
info = {
'authenticated_at': time.time(),
'authenticated_at_iso': time.strftime('%Y-%m-%d %H:%M:%S')
}
with open(self.auth_info_file, 'w') as f:
json.dump(info, f, indent=2)
except Exception:
pass # Non-critical
def clear_auth(self) -> bool:
"""
Clear all authentication data
Returns:
True if cleared successfully
"""
print("🗑️ Clearing authentication data...")
try:
# Remove browser state
if self.state_file.exists():
self.state_file.unlink()
print(" ✅ Removed browser state")
# Remove auth info
if self.auth_info_file.exists():
self.auth_info_file.unlink()
print(" ✅ Removed auth info")
# Clear entire browser state directory
if self.browser_state_dir.exists():
shutil.rmtree(self.browser_state_dir)
self.browser_state_dir.mkdir(parents=True, exist_ok=True)
print(" ✅ Cleared browser data")
return True
except Exception as e:
print(f" ❌ Error clearing auth: {e}")
return False
def re_auth(self, headless: bool = False, timeout_minutes: int = 10) -> bool:
"""
Perform re-authentication (clear and setup)
Args:
headless: Run browser in headless mode
timeout_minutes: Login timeout in minutes
Returns:
True if successful
"""
print("🔄 Starting re-authentication...")
# Clear existing auth
self.clear_auth()
# Setup new auth
return self.setup_auth(headless, timeout_minutes)
def validate_auth(self) -> bool:
"""
Validate that stored authentication works
Uses persistent context to match actual usage pattern
Returns:
True if authentication is valid
"""
if not self.is_authenticated():
return False
print("🔍 Validating authentication...")
playwright = None
context = None
try:
playwright = sync_playwright().start()
# Launch using factory
context = BrowserFactory.launch_persistent_context(
playwright,
headless=True
)
# Try to access NotebookLM
page = context.new_page()
page.goto("https://notebooklm.google.com", wait_until="domcontentloaded", timeout=30000)
# Check if we can access NotebookLM
if "notebooklm.google.com" in page.url and "accounts.google.com" not in page.url:
print(" ✅ Authentication is valid")
return True
else:
print(" ❌ Authentication is invalid (redirected to login)")
return False
except Exception as e:
print(f" ❌ Validation failed: {e}")
return False
finally:
if context:
try:
context.close()
except Exception:
pass
if playwright:
try:
playwright.stop()
except Exception:
pass
def main():
"""Command-line interface for authentication management"""
parser = argparse.ArgumentParser(description='Manage NotebookLM authentication')
subparsers = parser.add_subparsers(dest='command', help='Commands')
# Setup command
setup_parser = subparsers.add_parser('setup', help='Setup authentication')
setup_parser.add_argument('--headless', action='store_true', help='Run in headless mode')
setup_parser.add_argument('--timeout', type=float, default=10, help='Login timeout in minutes (default: 10)')
# Status command
subparsers.add_parser('status', help='Check authentication status')
# Validate command
subparsers.add_parser('validate', help='Validate authentication')
# Clear command
subparsers.add_parser('clear', help='Clear authentication')
# Re-auth command
reauth_parser = subparsers.add_parser('reauth', help='Re-authenticate (clear + setup)')
reauth_parser.add_argument('--timeout', type=float, default=10, help='Login timeout in minutes (default: 10)')
args = parser.parse_args()
# Initialize manager
auth = AuthManager()
# Execute command
if args.command == 'setup':
if auth.setup_auth(headless=args.headless, timeout_minutes=args.timeout):
print("\n✅ Authentication setup complete!")
print("You can now use ask_question.py to query NotebookLM")
else:
print("\n❌ Authentication setup failed")
exit(1)
elif args.command == 'status':
info = auth.get_auth_info()
print("\n🔐 Authentication Status:")
print(f" Authenticated: {'Yes' if info['authenticated'] else 'No'}")
if info.get('state_age_hours'):
print(f" State age: {info['state_age_hours']:.1f} hours")
if info.get('authenticated_at_iso'):
print(f" Last auth: {info['authenticated_at_iso']}")
print(f" State file: {info['state_file']}")
elif args.command == 'validate':
if auth.validate_auth():
print("Authentication is valid and working")
else:
print("Authentication is invalid or expired")
print("Run: auth_manager.py setup")
elif args.command == 'clear':
if auth.clear_auth():
print("Authentication cleared")
elif args.command == 'reauth':
if auth.re_auth(timeout_minutes=args.timeout):
print("\n✅ Re-authentication complete!")
else:
print("\n❌ Re-authentication failed")
exit(1)
else:
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,255 @@
#!/usr/bin/env python3
"""
Browser Session Management for NotebookLM
Individual browser session for persistent NotebookLM conversations
Based on the original NotebookLM API implementation
"""
import time
import sys
from typing import Any, Dict, Optional
from pathlib import Path
from patchright.sync_api import BrowserContext, Page
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
from browser_utils import StealthUtils
class BrowserSession:
"""
Represents a single persistent browser session for NotebookLM
Each session gets its own Page (tab) within a shared BrowserContext,
allowing for contextual conversations where NotebookLM remembers
previous messages.
"""
def __init__(self, session_id: str, context: BrowserContext, notebook_url: str):
"""
Initialize a new browser session
Args:
session_id: Unique identifier for this session
context: Browser context (shared or dedicated)
notebook_url: Target NotebookLM URL for this session
"""
self.id = session_id
self.created_at = time.time()
self.last_activity = time.time()
self.message_count = 0
self.notebook_url = notebook_url
self.context = context
self.page = None
self.stealth = StealthUtils()
# Initialize the session
self._initialize()
def _initialize(self):
"""Initialize the browser session and navigate to NotebookLM"""
print(f"🚀 Creating session {self.id}...")
# Create new page (tab) in context
self.page = self.context.new_page()
print(f" 🌐 Navigating to NotebookLM...")
try:
# Navigate to notebook
self.page.goto(self.notebook_url, wait_until="domcontentloaded", timeout=30000)
# Check if login is needed
if "accounts.google.com" in self.page.url:
raise RuntimeError("Authentication required. Please run auth_manager.py setup first.")
# Wait for page to be ready
self._wait_for_ready()
# Simulate human inspection
self.stealth.random_mouse_movement(self.page)
self.stealth.random_delay(300, 600)
print(f"✅ Session {self.id} ready!")
except Exception as e:
print(f"❌ Failed to initialize session: {e}")
if self.page:
self.page.close()
raise
def _wait_for_ready(self):
"""Wait for NotebookLM page to be ready"""
try:
# Wait for chat input
self.page.wait_for_selector("textarea.query-box-input", timeout=10000, state="visible")
except Exception:
# Try alternative selector
self.page.wait_for_selector('textarea[aria-label="Feld für Anfragen"]', timeout=5000, state="visible")
def ask(self, question: str) -> Dict[str, Any]:
"""
Ask a question in this session
Args:
question: The question to ask
Returns:
Dict with status, question, answer, session_id
"""
try:
self.last_activity = time.time()
self.message_count += 1
print(f"💬 [{self.id}] Asking: {question}")
# Snapshot current answer to detect new response
previous_answer = self._snapshot_latest_response()
# Find chat input
chat_input_selector = "textarea.query-box-input"
try:
self.page.wait_for_selector(chat_input_selector, timeout=5000, state="visible")
except Exception:
chat_input_selector = 'textarea[aria-label="Feld für Anfragen"]'
self.page.wait_for_selector(chat_input_selector, timeout=5000, state="visible")
# Click and type with human-like behavior
self.stealth.realistic_click(self.page, chat_input_selector)
self.stealth.human_type(self.page, chat_input_selector, question)
# Small pause before submit
self.stealth.random_delay(300, 800)
# Submit
self.page.keyboard.press("Enter")
# Wait for response
print(" ⏳ Waiting for response...")
self.stealth.random_delay(1500, 3000)
# Get new answer
answer = self._wait_for_latest_answer(previous_answer)
if not answer:
raise Exception("Empty response from NotebookLM")
print(f" ✅ Got response ({len(answer)} chars)")
return {
"status": "success",
"question": question,
"answer": answer,
"session_id": self.id,
"notebook_url": self.notebook_url
}
except Exception as e:
print(f" ❌ Error: {e}")
return {
"status": "error",
"question": question,
"error": str(e),
"session_id": self.id
}
def _snapshot_latest_response(self) -> Optional[str]:
"""Get the current latest response text"""
try:
# Use correct NotebookLM selector
responses = self.page.query_selector_all(".to-user-container .message-text-content")
if responses:
return responses[-1].inner_text()
except Exception:
pass
return None
def _wait_for_latest_answer(self, previous_answer: Optional[str], timeout: int = 120) -> str:
"""Wait for and extract the new answer"""
start_time = time.time()
last_candidate = None
stable_count = 0
while time.time() - start_time < timeout:
# Check if NotebookLM is still thinking (most reliable indicator)
try:
thinking_element = self.page.query_selector('div.thinking-message')
if thinking_element and thinking_element.is_visible():
time.sleep(0.5)
continue
except Exception:
pass
try:
# Use correct NotebookLM selector
responses = self.page.query_selector_all(".to-user-container .message-text-content")
if responses:
latest_text = responses[-1].inner_text().strip()
# Check if it's a new response
if latest_text and latest_text != previous_answer:
# Check if text is stable (3 consecutive polls)
if latest_text == last_candidate:
stable_count += 1
if stable_count >= 3:
return latest_text
else:
stable_count = 1
last_candidate = latest_text
except Exception:
pass
time.sleep(0.5)
raise TimeoutError(f"No response received within {timeout} seconds")
def reset(self):
"""Reset the chat by reloading the page"""
print(f"🔄 Resetting session {self.id}...")
self.page.reload(wait_until="domcontentloaded")
self._wait_for_ready()
previous_count = self.message_count
self.message_count = 0
self.last_activity = time.time()
print(f"✅ Session reset (cleared {previous_count} messages)")
return previous_count
def close(self):
"""Close this session and clean up resources"""
print(f"🛑 Closing session {self.id}...")
if self.page:
try:
self.page.close()
except Exception as e:
print(f" ⚠️ Error closing page: {e}")
print(f"✅ Session {self.id} closed")
def get_info(self) -> Dict[str, Any]:
"""Get information about this session"""
return {
"id": self.id,
"created_at": self.created_at,
"last_activity": self.last_activity,
"age_seconds": time.time() - self.created_at,
"inactive_seconds": time.time() - self.last_activity,
"message_count": self.message_count,
"notebook_url": self.notebook_url
}
def is_expired(self, timeout_seconds: int = 900) -> bool:
"""Check if session has expired (default: 15 minutes)"""
return (time.time() - self.last_activity) > timeout_seconds
if __name__ == "__main__":
# Example usage
print("Browser Session Module - Use ask_question.py for main interface")
print("This module provides low-level browser session management.")

View File

@@ -0,0 +1,107 @@
"""
Browser Utilities for NotebookLM Skill
Handles browser launching, stealth features, and common interactions
"""
import json
import time
import random
from typing import Optional, List
from patchright.sync_api import Playwright, BrowserContext, Page
from config import BROWSER_PROFILE_DIR, STATE_FILE, BROWSER_ARGS, USER_AGENT
class BrowserFactory:
"""Factory for creating configured browser contexts"""
@staticmethod
def launch_persistent_context(
playwright: Playwright,
headless: bool = True,
user_data_dir: str = str(BROWSER_PROFILE_DIR)
) -> BrowserContext:
"""
Launch a persistent browser context with anti-detection features
and cookie workaround.
"""
# Launch persistent context
context = playwright.chromium.launch_persistent_context(
user_data_dir=user_data_dir,
channel="chrome", # Use real Chrome
headless=headless,
no_viewport=True,
ignore_default_args=["--enable-automation"],
user_agent=USER_AGENT,
args=BROWSER_ARGS
)
# Cookie Workaround for Playwright bug #36139
# Session cookies (expires=-1) don't persist in user_data_dir automatically
BrowserFactory._inject_cookies(context)
return context
@staticmethod
def _inject_cookies(context: BrowserContext):
"""Inject cookies from state.json if available"""
if STATE_FILE.exists():
try:
with open(STATE_FILE, 'r') as f:
state = json.load(f)
if 'cookies' in state and len(state['cookies']) > 0:
context.add_cookies(state['cookies'])
# print(f" 🔧 Injected {len(state['cookies'])} cookies from state.json")
except Exception as e:
print(f" ⚠️ Could not load state.json: {e}")
class StealthUtils:
"""Human-like interaction utilities"""
@staticmethod
def random_delay(min_ms: int = 100, max_ms: int = 500):
"""Add random delay"""
time.sleep(random.uniform(min_ms / 1000, max_ms / 1000))
@staticmethod
def human_type(page: Page, selector: str, text: str, wpm_min: int = 320, wpm_max: int = 480):
"""Type with human-like speed"""
element = page.query_selector(selector)
if not element:
# Try waiting if not immediately found
try:
element = page.wait_for_selector(selector, timeout=2000)
except:
pass
if not element:
print(f"⚠️ Element not found for typing: {selector}")
return
# Click to focus
element.click()
# Type
for char in text:
element.type(char, delay=random.uniform(25, 75))
if random.random() < 0.05:
time.sleep(random.uniform(0.15, 0.4))
@staticmethod
def realistic_click(page: Page, selector: str):
"""Click with realistic movement"""
element = page.query_selector(selector)
if not element:
return
# Optional: Move mouse to element (simplified)
box = element.bounding_box()
if box:
x = box['x'] + box['width'] / 2
y = box['y'] + box['height'] / 2
page.mouse.move(x, y, steps=5)
StealthUtils.random_delay(100, 300)
element.click()
StealthUtils.random_delay(100, 300)

View File

@@ -0,0 +1,302 @@
#!/usr/bin/env python3
"""
Cleanup Manager for NotebookLM Skill
Manages cleanup of skill data and browser state
"""
import shutil
import argparse
from pathlib import Path
from typing import Dict, List, Any
class CleanupManager:
"""
Manages cleanup of NotebookLM skill data
Features:
- Preview what will be deleted
- Selective cleanup options
- Library preservation
- Safe deletion with confirmation
"""
def __init__(self):
"""Initialize the cleanup manager"""
# Skill directory paths
self.skill_dir = Path(__file__).parent.parent
self.data_dir = self.skill_dir / "data"
def get_cleanup_paths(self, preserve_library: bool = False) -> Dict[str, Any]:
"""
Get paths that would be cleaned up
Args:
preserve_library: Keep library.json if True
Returns:
Dict with paths and sizes
Note: .venv is NEVER deleted - it's part of the skill infrastructure
"""
paths = {
'browser_state': [],
'sessions': [],
'library': [],
'auth': [],
'other': []
}
total_size = 0
if self.data_dir.exists():
# Browser state
browser_state_dir = self.data_dir / "browser_state"
if browser_state_dir.exists():
for item in browser_state_dir.iterdir():
size = self._get_size(item)
paths['browser_state'].append({
'path': str(item),
'size': size,
'type': 'dir' if item.is_dir() else 'file'
})
total_size += size
# Sessions
sessions_file = self.data_dir / "sessions.json"
if sessions_file.exists():
size = sessions_file.stat().st_size
paths['sessions'].append({
'path': str(sessions_file),
'size': size,
'type': 'file'
})
total_size += size
# Library (unless preserved)
if not preserve_library:
library_file = self.data_dir / "library.json"
if library_file.exists():
size = library_file.stat().st_size
paths['library'].append({
'path': str(library_file),
'size': size,
'type': 'file'
})
total_size += size
# Auth info
auth_info = self.data_dir / "auth_info.json"
if auth_info.exists():
size = auth_info.stat().st_size
paths['auth'].append({
'path': str(auth_info),
'size': size,
'type': 'file'
})
total_size += size
# Other files in data dir (but NEVER .venv!)
for item in self.data_dir.iterdir():
if item.name not in ['browser_state', 'sessions.json', 'library.json', 'auth_info.json']:
size = self._get_size(item)
paths['other'].append({
'path': str(item),
'size': size,
'type': 'dir' if item.is_dir() else 'file'
})
total_size += size
return {
'categories': paths,
'total_size': total_size,
'total_items': sum(len(items) for items in paths.values())
}
def _get_size(self, path: Path) -> int:
"""Get size of file or directory in bytes"""
if path.is_file():
return path.stat().st_size
elif path.is_dir():
total = 0
try:
for item in path.rglob('*'):
if item.is_file():
total += item.stat().st_size
except Exception:
pass
return total
return 0
def _format_size(self, size: int) -> str:
"""Format size in human-readable form"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024:
return f"{size:.1f} {unit}"
size /= 1024
return f"{size:.1f} TB"
def perform_cleanup(
self,
preserve_library: bool = False,
dry_run: bool = False
) -> Dict[str, Any]:
"""
Perform the actual cleanup
Args:
preserve_library: Keep library.json if True
dry_run: Preview only, don't delete
Returns:
Dict with cleanup results
"""
cleanup_data = self.get_cleanup_paths(preserve_library)
deleted_items = []
failed_items = []
deleted_size = 0
if dry_run:
return {
'dry_run': True,
'would_delete': cleanup_data['total_items'],
'would_free': cleanup_data['total_size']
}
# Perform deletion
for category, items in cleanup_data['categories'].items():
for item_info in items:
path = Path(item_info['path'])
try:
if path.exists():
if path.is_dir():
shutil.rmtree(path)
else:
path.unlink()
deleted_items.append(str(path))
deleted_size += item_info['size']
print(f" ✅ Deleted: {path.name}")
except Exception as e:
failed_items.append({
'path': str(path),
'error': str(e)
})
print(f" ❌ Failed: {path.name} ({e})")
# Recreate browser_state dir if everything was deleted
if not preserve_library and not failed_items:
browser_state_dir = self.data_dir / "browser_state"
browser_state_dir.mkdir(parents=True, exist_ok=True)
return {
'deleted_items': deleted_items,
'failed_items': failed_items,
'deleted_size': deleted_size,
'deleted_count': len(deleted_items),
'failed_count': len(failed_items)
}
def print_cleanup_preview(self, preserve_library: bool = False):
"""Print a preview of what will be cleaned"""
data = self.get_cleanup_paths(preserve_library)
print("\n🔍 Cleanup Preview")
print("=" * 60)
for category, items in data['categories'].items():
if items:
print(f"\n📁 {category.replace('_', ' ').title()}:")
for item in items:
path = Path(item['path'])
size_str = self._format_size(item['size'])
type_icon = "📂" if item['type'] == 'dir' else "📄"
print(f" {type_icon} {path.name:<30} {size_str:>10}")
print("\n" + "=" * 60)
print(f"Total items: {data['total_items']}")
print(f"Total size: {self._format_size(data['total_size'])}")
if preserve_library:
print("\n📚 Library will be preserved")
print("\nThis preview shows what would be deleted.")
print("Use --confirm to actually perform the cleanup.")
def main():
"""Command-line interface for cleanup management"""
parser = argparse.ArgumentParser(
description='Clean up NotebookLM skill data',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Preview what will be deleted
python cleanup_manager.py
# Perform cleanup (delete everything)
python cleanup_manager.py --confirm
# Cleanup but keep library
python cleanup_manager.py --confirm --preserve-library
# Force cleanup without preview
python cleanup_manager.py --confirm --force
"""
)
parser.add_argument(
'--confirm',
action='store_true',
help='Actually perform the cleanup (without this, only preview)'
)
parser.add_argument(
'--preserve-library',
action='store_true',
help='Keep the notebook library (library.json)'
)
parser.add_argument(
'--force',
action='store_true',
help='Skip confirmation prompt'
)
args = parser.parse_args()
# Initialize manager
manager = CleanupManager()
if args.confirm:
# Show preview first unless forced
if not args.force:
manager.print_cleanup_preview(args.preserve_library)
print("\n⚠️ WARNING: This will delete the files shown above!")
print(" Note: .venv is preserved (part of skill infrastructure)")
response = input("Are you sure? (yes/no): ")
if response.lower() != 'yes':
print("Cleanup cancelled.")
return
# Perform cleanup
print("\n🗑️ Performing cleanup...")
result = manager.perform_cleanup(args.preserve_library, dry_run=False)
print(f"\n✅ Cleanup complete!")
print(f" Deleted: {result['deleted_count']} items")
print(f" Freed: {manager._format_size(result['deleted_size'])}")
if result['failed_count'] > 0:
print(f" ⚠️ Failed: {result['failed_count']} items")
else:
# Just show preview
manager.print_cleanup_preview(args.preserve_library)
print("\n💡 Note: Virtual environment (.venv) is never deleted")
print(" It's part of the skill infrastructure, not user data")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,44 @@
"""
Configuration for NotebookLM Skill
Centralizes constants, selectors, and paths
"""
from pathlib import Path
# Paths
SKILL_DIR = Path(__file__).parent.parent
DATA_DIR = SKILL_DIR / "data"
BROWSER_STATE_DIR = DATA_DIR / "browser_state"
BROWSER_PROFILE_DIR = BROWSER_STATE_DIR / "browser_profile"
STATE_FILE = BROWSER_STATE_DIR / "state.json"
AUTH_INFO_FILE = DATA_DIR / "auth_info.json"
LIBRARY_FILE = DATA_DIR / "library.json"
# NotebookLM Selectors
QUERY_INPUT_SELECTORS = [
"textarea.query-box-input", # Primary
'textarea[aria-label="Feld für Anfragen"]', # Fallback German
'textarea[aria-label="Input for queries"]', # Fallback English
]
RESPONSE_SELECTORS = [
".to-user-container .message-text-content", # Primary
"[data-message-author='bot']",
"[data-message-author='assistant']",
]
# Browser Configuration
BROWSER_ARGS = [
'--disable-blink-features=AutomationControlled', # Patches navigator.webdriver
'--disable-dev-shm-usage',
'--no-sandbox',
'--no-first-run',
'--no-default-browser-check'
]
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
# Timeouts
LOGIN_TIMEOUT_MINUTES = 10
QUERY_TIMEOUT_SECONDS = 120
PAGE_LOAD_TIMEOUT = 30000

View File

@@ -0,0 +1,410 @@
#!/usr/bin/env python3
"""
Notebook Library Management for NotebookLM
Manages a library of NotebookLM notebooks with metadata
Based on the MCP server implementation
"""
import json
import argparse
import uuid
import os
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime
class NotebookLibrary:
"""Manages a collection of NotebookLM notebooks with metadata"""
def __init__(self):
"""Initialize the notebook library"""
# Store data within the skill directory
skill_dir = Path(__file__).parent.parent
self.data_dir = skill_dir / "data"
self.data_dir.mkdir(parents=True, exist_ok=True)
self.library_file = self.data_dir / "library.json"
self.notebooks: Dict[str, Dict[str, Any]] = {}
self.active_notebook_id: Optional[str] = None
# Load existing library
self._load_library()
def _load_library(self):
"""Load library from disk"""
if self.library_file.exists():
try:
with open(self.library_file, 'r') as f:
data = json.load(f)
self.notebooks = data.get('notebooks', {})
self.active_notebook_id = data.get('active_notebook_id')
print(f"📚 Loaded library with {len(self.notebooks)} notebooks")
except Exception as e:
print(f"⚠️ Error loading library: {e}")
self.notebooks = {}
self.active_notebook_id = None
else:
self._save_library()
def _save_library(self):
"""Save library to disk"""
try:
data = {
'notebooks': self.notebooks,
'active_notebook_id': self.active_notebook_id,
'updated_at': datetime.now().isoformat()
}
with open(self.library_file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"❌ Error saving library: {e}")
def add_notebook(
self,
url: str,
name: str,
description: str,
topics: List[str],
content_types: Optional[List[str]] = None,
use_cases: Optional[List[str]] = None,
tags: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Add a new notebook to the library
Args:
url: NotebookLM notebook URL
name: Display name for the notebook
description: What's in this notebook
topics: Topics covered
content_types: Types of content (optional)
use_cases: When to use this notebook (optional)
tags: Additional tags for organization (optional)
Returns:
The created notebook object
"""
# Generate ID from name
notebook_id = name.lower().replace(' ', '-').replace('_', '-')
# Check for duplicates
if notebook_id in self.notebooks:
raise ValueError(f"Notebook with ID '{notebook_id}' already exists")
# Create notebook object
notebook = {
'id': notebook_id,
'url': url,
'name': name,
'description': description,
'topics': topics,
'content_types': content_types or [],
'use_cases': use_cases or [],
'tags': tags or [],
'created_at': datetime.now().isoformat(),
'updated_at': datetime.now().isoformat(),
'use_count': 0,
'last_used': None
}
# Add to library
self.notebooks[notebook_id] = notebook
# Set as active if it's the first notebook
if len(self.notebooks) == 1:
self.active_notebook_id = notebook_id
self._save_library()
print(f"✅ Added notebook: {name} ({notebook_id})")
return notebook
def remove_notebook(self, notebook_id: str) -> bool:
"""
Remove a notebook from the library
Args:
notebook_id: ID of notebook to remove
Returns:
True if removed, False if not found
"""
if notebook_id in self.notebooks:
del self.notebooks[notebook_id]
# Clear active if it was removed
if self.active_notebook_id == notebook_id:
self.active_notebook_id = None
# Set new active if there are other notebooks
if self.notebooks:
self.active_notebook_id = list(self.notebooks.keys())[0]
self._save_library()
print(f"✅ Removed notebook: {notebook_id}")
return True
print(f"⚠️ Notebook not found: {notebook_id}")
return False
def update_notebook(
self,
notebook_id: str,
name: Optional[str] = None,
description: Optional[str] = None,
topics: Optional[List[str]] = None,
content_types: Optional[List[str]] = None,
use_cases: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
url: Optional[str] = None
) -> Dict[str, Any]:
"""
Update notebook metadata
Args:
notebook_id: ID of notebook to update
Other args: Fields to update (None = keep existing)
Returns:
Updated notebook object
"""
if notebook_id not in self.notebooks:
raise ValueError(f"Notebook not found: {notebook_id}")
notebook = self.notebooks[notebook_id]
# Update fields if provided
if name is not None:
notebook['name'] = name
if description is not None:
notebook['description'] = description
if topics is not None:
notebook['topics'] = topics
if content_types is not None:
notebook['content_types'] = content_types
if use_cases is not None:
notebook['use_cases'] = use_cases
if tags is not None:
notebook['tags'] = tags
if url is not None:
notebook['url'] = url
notebook['updated_at'] = datetime.now().isoformat()
self._save_library()
print(f"✅ Updated notebook: {notebook['name']}")
return notebook
def get_notebook(self, notebook_id: str) -> Optional[Dict[str, Any]]:
"""Get a specific notebook by ID"""
return self.notebooks.get(notebook_id)
def list_notebooks(self) -> List[Dict[str, Any]]:
"""List all notebooks in the library"""
return list(self.notebooks.values())
def search_notebooks(self, query: str) -> List[Dict[str, Any]]:
"""
Search notebooks by query
Args:
query: Search query (searches name, description, topics, tags)
Returns:
List of matching notebooks
"""
query_lower = query.lower()
results = []
for notebook in self.notebooks.values():
# Search in various fields
searchable = [
notebook['name'].lower(),
notebook['description'].lower(),
' '.join(notebook['topics']).lower(),
' '.join(notebook['tags']).lower(),
' '.join(notebook.get('use_cases', [])).lower()
]
if any(query_lower in field for field in searchable):
results.append(notebook)
return results
def select_notebook(self, notebook_id: str) -> Dict[str, Any]:
"""
Set a notebook as active
Args:
notebook_id: ID of notebook to activate
Returns:
The activated notebook
"""
if notebook_id not in self.notebooks:
raise ValueError(f"Notebook not found: {notebook_id}")
self.active_notebook_id = notebook_id
self._save_library()
notebook = self.notebooks[notebook_id]
print(f"✅ Activated notebook: {notebook['name']}")
return notebook
def get_active_notebook(self) -> Optional[Dict[str, Any]]:
"""Get the currently active notebook"""
if self.active_notebook_id:
return self.notebooks.get(self.active_notebook_id)
return None
def increment_use_count(self, notebook_id: str) -> Dict[str, Any]:
"""
Increment usage counter for a notebook
Args:
notebook_id: ID of notebook that was used
Returns:
Updated notebook
"""
if notebook_id not in self.notebooks:
raise ValueError(f"Notebook not found: {notebook_id}")
notebook = self.notebooks[notebook_id]
notebook['use_count'] += 1
notebook['last_used'] = datetime.now().isoformat()
self._save_library()
return notebook
def get_stats(self) -> Dict[str, Any]:
"""Get library statistics"""
total_notebooks = len(self.notebooks)
total_topics = set()
total_use_count = 0
for notebook in self.notebooks.values():
total_topics.update(notebook['topics'])
total_use_count += notebook['use_count']
# Find most used
most_used = None
if self.notebooks:
most_used = max(
self.notebooks.values(),
key=lambda n: n['use_count']
)
return {
'total_notebooks': total_notebooks,
'total_topics': len(total_topics),
'total_use_count': total_use_count,
'active_notebook': self.get_active_notebook(),
'most_used_notebook': most_used,
'library_path': str(self.library_file)
}
def main():
"""Command-line interface for notebook management"""
parser = argparse.ArgumentParser(description='Manage NotebookLM library')
subparsers = parser.add_subparsers(dest='command', help='Commands')
# Add command
add_parser = subparsers.add_parser('add', help='Add a notebook')
add_parser.add_argument('--url', required=True, help='NotebookLM URL')
add_parser.add_argument('--name', required=True, help='Display name')
add_parser.add_argument('--description', required=True, help='Description')
add_parser.add_argument('--topics', required=True, help='Comma-separated topics')
add_parser.add_argument('--use-cases', help='Comma-separated use cases')
add_parser.add_argument('--tags', help='Comma-separated tags')
# List command
subparsers.add_parser('list', help='List all notebooks')
# Search command
search_parser = subparsers.add_parser('search', help='Search notebooks')
search_parser.add_argument('--query', required=True, help='Search query')
# Activate command
activate_parser = subparsers.add_parser('activate', help='Set active notebook')
activate_parser.add_argument('--id', required=True, help='Notebook ID')
# Remove command
remove_parser = subparsers.add_parser('remove', help='Remove a notebook')
remove_parser.add_argument('--id', required=True, help='Notebook ID')
# Stats command
subparsers.add_parser('stats', help='Show library statistics')
args = parser.parse_args()
# Initialize library
library = NotebookLibrary()
# Execute command
if args.command == 'add':
topics = [t.strip() for t in args.topics.split(',')]
use_cases = [u.strip() for u in args.use_cases.split(',')] if args.use_cases else None
tags = [t.strip() for t in args.tags.split(',')] if args.tags else None
notebook = library.add_notebook(
url=args.url,
name=args.name,
description=args.description,
topics=topics,
use_cases=use_cases,
tags=tags
)
print(json.dumps(notebook, indent=2))
elif args.command == 'list':
notebooks = library.list_notebooks()
if notebooks:
print("\n📚 Notebook Library:")
for notebook in notebooks:
active = " [ACTIVE]" if notebook['id'] == library.active_notebook_id else ""
print(f"\n 📓 {notebook['name']}{active}")
print(f" ID: {notebook['id']}")
print(f" Topics: {', '.join(notebook['topics'])}")
print(f" Uses: {notebook['use_count']}")
else:
print("📚 Library is empty. Add notebooks with: notebook_manager.py add")
elif args.command == 'search':
results = library.search_notebooks(args.query)
if results:
print(f"\n🔍 Found {len(results)} notebooks:")
for notebook in results:
print(f"\n 📓 {notebook['name']} ({notebook['id']})")
print(f" {notebook['description']}")
else:
print(f"🔍 No notebooks found for: {args.query}")
elif args.command == 'activate':
notebook = library.select_notebook(args.id)
print(f"Now using: {notebook['name']}")
elif args.command == 'remove':
if library.remove_notebook(args.id):
print("Notebook removed from library")
elif args.command == 'stats':
stats = library.get_stats()
print("\n📊 Library Statistics:")
print(f" Total notebooks: {stats['total_notebooks']}")
print(f" Total topics: {stats['total_topics']}")
print(f" Total uses: {stats['total_use_count']}")
if stats['active_notebook']:
print(f" Active: {stats['active_notebook']['name']}")
if stats['most_used_notebook']:
print(f" Most used: {stats['most_used_notebook']['name']} ({stats['most_used_notebook']['use_count']} uses)")
print(f" Library path: {stats['library_path']}")
else:
parser.print_help()
if __name__ == "__main__":
main()

102
skills/notebooklm/scripts/run.py Executable file
View File

@@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""
Universal runner for NotebookLM skill scripts
Ensures all scripts run with the correct virtual environment
"""
import os
import sys
import subprocess
from pathlib import Path
def get_venv_python():
"""Get the virtual environment Python executable"""
skill_dir = Path(__file__).parent.parent
venv_dir = skill_dir / ".venv"
if os.name == 'nt': # Windows
venv_python = venv_dir / "Scripts" / "python.exe"
else: # Unix/Linux/Mac
venv_python = venv_dir / "bin" / "python"
return venv_python
def ensure_venv():
"""Ensure virtual environment exists"""
skill_dir = Path(__file__).parent.parent
venv_dir = skill_dir / ".venv"
setup_script = skill_dir / "scripts" / "setup_environment.py"
# Check if venv exists
if not venv_dir.exists():
print("🔧 First-time setup: Creating virtual environment...")
print(" This may take a minute...")
# Run setup with system Python
result = subprocess.run([sys.executable, str(setup_script)])
if result.returncode != 0:
print("❌ Failed to set up environment")
sys.exit(1)
print("✅ Environment ready!")
return get_venv_python()
def main():
"""Main runner"""
if len(sys.argv) < 2:
print("Usage: python run.py <script_name> [args...]")
print("\nAvailable scripts:")
print(" ask_question.py - Query NotebookLM")
print(" notebook_manager.py - Manage notebook library")
print(" session_manager.py - Manage sessions")
print(" auth_manager.py - Handle authentication")
print(" cleanup_manager.py - Clean up skill data")
sys.exit(1)
script_name = sys.argv[1]
script_args = sys.argv[2:]
# Handle both "scripts/script.py" and "script.py" formats
if script_name.startswith('scripts/'):
# Remove the scripts/ prefix if provided
script_name = script_name[8:] # len('scripts/') = 8
# Ensure .py extension
if not script_name.endswith('.py'):
script_name += '.py'
# Get script path
skill_dir = Path(__file__).parent.parent
script_path = skill_dir / "scripts" / script_name
if not script_path.exists():
print(f"❌ Script not found: {script_name}")
print(f" Working directory: {Path.cwd()}")
print(f" Skill directory: {skill_dir}")
print(f" Looked for: {script_path}")
sys.exit(1)
# Ensure venv exists and get Python executable
venv_python = ensure_venv()
# Build command
cmd = [str(venv_python), str(script_path)] + script_args
# Run the script
try:
result = subprocess.run(cmd)
sys.exit(result.returncode)
except KeyboardInterrupt:
print("\n⚠️ Interrupted by user")
sys.exit(130)
except Exception as e:
print(f"❌ Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
Environment Setup for NotebookLM Skill
Manages virtual environment and dependencies automatically
"""
import os
import sys
import subprocess
import venv
from pathlib import Path
class SkillEnvironment:
"""Manages skill-specific virtual environment"""
def __init__(self):
# Skill directory paths
self.skill_dir = Path(__file__).parent.parent
self.venv_dir = self.skill_dir / ".venv"
self.requirements_file = self.skill_dir / "requirements.txt"
# Python executable in venv
if os.name == 'nt': # Windows
self.venv_python = self.venv_dir / "Scripts" / "python.exe"
self.venv_pip = self.venv_dir / "Scripts" / "pip.exe"
else: # Unix/Linux/Mac
self.venv_python = self.venv_dir / "bin" / "python"
self.venv_pip = self.venv_dir / "bin" / "pip"
def ensure_venv(self) -> bool:
"""Ensure virtual environment exists and is set up"""
# Check if we're already in the correct venv
if self.is_in_skill_venv():
print("✅ Already running in skill virtual environment")
return True
# Create venv if it doesn't exist
if not self.venv_dir.exists():
print(f"🔧 Creating virtual environment in {self.venv_dir.name}/")
try:
venv.create(self.venv_dir, with_pip=True)
print("✅ Virtual environment created")
except Exception as e:
print(f"❌ Failed to create venv: {e}")
return False
# Install/update dependencies
if self.requirements_file.exists():
print("📦 Installing dependencies...")
try:
# Upgrade pip first
subprocess.run(
[str(self.venv_pip), "install", "--upgrade", "pip"],
check=True,
capture_output=True,
text=True
)
# Install requirements
result = subprocess.run(
[str(self.venv_pip), "install", "-r", str(self.requirements_file)],
check=True,
capture_output=True,
text=True
)
print("✅ Dependencies installed")
# Install Chrome for Patchright (not Chromium!)
# Using real Chrome ensures cross-platform reliability and consistent browser fingerprinting
# See: https://github.com/Kaliiiiiiiiii-Vinyzu/patchright-python#anti-detection
print("🌐 Installing Google Chrome for Patchright...")
try:
subprocess.run(
[str(self.venv_python), "-m", "patchright", "install", "chrome"],
check=True,
capture_output=True,
text=True
)
print("✅ Chrome installed")
except subprocess.CalledProcessError as e:
print(f"⚠️ Warning: Failed to install Chrome: {e}")
print(" You may need to run manually: python -m patchright install chrome")
print(" Chrome is required (not Chromium) for reliability!")
return True
except subprocess.CalledProcessError as e:
print(f"❌ Failed to install dependencies: {e}")
print(f" Output: {e.output if hasattr(e, 'output') else 'No output'}")
return False
else:
print("⚠️ No requirements.txt found, skipping dependency installation")
return True
def is_in_skill_venv(self) -> bool:
"""Check if we're already running in the skill's venv"""
if hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix):
# We're in a venv, check if it's ours
venv_path = Path(sys.prefix)
return venv_path == self.venv_dir
return False
def get_python_executable(self) -> str:
"""Get the correct Python executable to use"""
if self.venv_python.exists():
return str(self.venv_python)
return sys.executable
def run_script(self, script_name: str, args: list = None) -> int:
"""Run a script with the virtual environment"""
script_path = self.skill_dir / "scripts" / script_name
if not script_path.exists():
print(f"❌ Script not found: {script_path}")
return 1
# Ensure venv is set up
if not self.ensure_venv():
print("❌ Failed to set up environment")
return 1
# Build command
cmd = [str(self.venv_python), str(script_path)]
if args:
cmd.extend(args)
print(f"🚀 Running: {script_name} with venv Python")
try:
# Run the script with venv Python
result = subprocess.run(cmd)
return result.returncode
except Exception as e:
print(f"❌ Failed to run script: {e}")
return 1
def activate_instructions(self) -> str:
"""Get instructions for manual activation"""
if os.name == 'nt':
activate = self.venv_dir / "Scripts" / "activate.bat"
return f"Run: {activate}"
else:
activate = self.venv_dir / "bin" / "activate"
return f"Run: source {activate}"
def main():
"""Main entry point for environment setup"""
import argparse
parser = argparse.ArgumentParser(
description='Setup NotebookLM skill environment'
)
parser.add_argument(
'--check',
action='store_true',
help='Check if environment is set up'
)
parser.add_argument(
'--run',
help='Run a script with the venv (e.g., --run ask_question.py)'
)
parser.add_argument(
'args',
nargs='*',
help='Arguments to pass to the script'
)
args = parser.parse_args()
env = SkillEnvironment()
if args.check:
if env.venv_dir.exists():
print(f"✅ Virtual environment exists: {env.venv_dir}")
print(f" Python: {env.get_python_executable()}")
print(f" To activate manually: {env.activate_instructions()}")
else:
print(f"❌ No virtual environment found")
print(f" Run setup_environment.py to create it")
return
if args.run:
# Run a script with venv
return env.run_script(args.run, args.args)
# Default: ensure environment is set up
if env.ensure_venv():
print("\n✅ Environment ready!")
print(f" Virtual env: {env.venv_dir}")
print(f" Python: {env.get_python_executable()}")
print(f"\nTo activate manually: {env.activate_instructions()}")
print(f"Or run scripts directly: python setup_environment.py --run script_name.py")
else:
print("\n❌ Environment setup failed")
return 1
if __name__ == "__main__":
sys.exit(main() or 0)