IMPLEMENTED: Label-based project board sync workflow Changes: - Created needs-board-sync label (ID: 34, orange #FFA500) - Modified sync script to auto-add label to new non-complete issues - Created manual workflow documentation Why this approach: - Gitea Projects REST API does NOT exist even in 1.25.5 - Gemini was incorrect about API availability in 1.22+ - Projects API still in development (PR #36824, targeting 1.26.0+) - Confirmed via swagger spec: zero /projects endpoints exist How it works: 1. Sync script creates issues with needs-board-sync label 2. Filter by label in Gitea UI 3. Drag to project board (Backlog column) 4. Remove label after syncing 5. Takes 30-60 seconds per sync session Future automation: When Gitea 1.26.0+ releases with Projects API, we'll modify the sync script to use /projects/ endpoints and remove this manual workflow. Related: Gitea successfully upgraded to 1.25.5 earlier this session Signed-off-by: The Chronicler <claude@firefrostgaming.com>
357 lines
12 KiB
Python
Executable File
357 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Firefrost Gaming - Task to Gitea Issue Synchronization
|
|
Automatically creates/updates Gitea issues based on docs/core/tasks.md
|
|
|
|
This script:
|
|
1. Parses docs/core/tasks.md to extract all tasks
|
|
2. Checks which tasks have corresponding Gitea issues
|
|
3. Creates missing issues with appropriate labels
|
|
4. Updates existing issues if task details changed
|
|
|
|
Usage:
|
|
python3 scripts/sync-tasks-to-issues.py [--dry-run] [--force]
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
import re
|
|
import sys
|
|
import argparse
|
|
from typing import Dict, List, Optional, Set
|
|
|
|
# Configuration
|
|
GITEA_URL = "https://git.firefrostgaming.com"
|
|
GITEA_TOKEN = "e0e330cba1749b01ab505093a160e4423ebbbe36"
|
|
REPO_OWNER = "firefrost-gaming"
|
|
REPO_NAME = "firefrost-operations-manual"
|
|
TASKS_FILE = "docs/core/tasks.md"
|
|
|
|
# Label IDs (from Gitea API)
|
|
LABEL_IDS = {
|
|
'area/automation': 23,
|
|
'area/billing': 20,
|
|
'area/email': 21,
|
|
'area/game-servers': 25,
|
|
'area/networking': 24,
|
|
'area/operations': 26,
|
|
'area/panel': 18,
|
|
'area/website': 22,
|
|
'area/wings': 19,
|
|
'for/holly': 27,
|
|
'for/meg': 28,
|
|
'for/michael': 29,
|
|
'needs-board-sync': 34,
|
|
'priority/critical': 8,
|
|
'priority/high': 9,
|
|
'priority/low': 11,
|
|
'priority/medium': 10,
|
|
'status/backlog': 2,
|
|
'status/blocked': 6,
|
|
'status/done': 7,
|
|
'status/in-progress': 4,
|
|
'status/review': 5,
|
|
'status/to-do': 3,
|
|
'type/bug': 12,
|
|
'type/docs': 15,
|
|
'type/feature': 13,
|
|
'type/infrastructure': 16,
|
|
'type/refactor': 17,
|
|
'type/task': 14,
|
|
}
|
|
|
|
headers = {
|
|
"Authorization": f"token {GITEA_TOKEN}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
|
|
class Task:
|
|
"""Represents a task from tasks.md"""
|
|
|
|
def __init__(self, number: int, title: str, content: str):
|
|
self.number = number
|
|
self.title = title
|
|
self.content = content
|
|
self.status = self._parse_status()
|
|
self.priority = self._parse_priority()
|
|
self.time_estimate = self._parse_time()
|
|
self.assignees = self._parse_assignees()
|
|
self.areas = self._parse_areas()
|
|
self.task_type = self._parse_type()
|
|
|
|
def _parse_status(self) -> str:
|
|
"""Extract status from task content"""
|
|
if '✅ COMPLETE' in self.content:
|
|
return 'status/done'
|
|
if '**Status:** BLOCKED' in self.content or 'blocked' in self.content.lower():
|
|
return 'status/blocked'
|
|
if '**Status:** IN PROGRESS' in self.content:
|
|
return 'status/in-progress'
|
|
# Default to backlog for new tasks
|
|
return 'status/backlog'
|
|
|
|
def _parse_priority(self) -> str:
|
|
"""Extract priority from task content"""
|
|
content_lower = self.content.lower()
|
|
if 'priority: top' in content_lower or 'tier 0' in content_lower or 'critical' in content_lower:
|
|
return 'priority/critical'
|
|
if 'priority: high' in content_lower or 'high priority' in content_lower:
|
|
return 'priority/high'
|
|
if 'priority: medium' in content_lower or 'tier 2' in content_lower:
|
|
return 'priority/medium'
|
|
if 'priority: low' in content_lower or 'tier 3' in content_lower:
|
|
return 'priority/low'
|
|
# Default to medium
|
|
return 'priority/medium'
|
|
|
|
def _parse_time(self) -> Optional[str]:
|
|
"""Extract time estimate"""
|
|
match = re.search(r'\*\*Time:\*\*\s*([^\n]+)', self.content)
|
|
return match.group(1).strip() if match else None
|
|
|
|
def _parse_assignees(self) -> List[str]:
|
|
"""Extract assignees from task content"""
|
|
assignees = []
|
|
content_lower = self.content.lower()
|
|
|
|
if 'michael' in content_lower or 'frostystyle' in content_lower:
|
|
assignees.append('for/michael')
|
|
if 'meg' in content_lower or 'gingerfury' in content_lower or 'emissary' in content_lower:
|
|
assignees.append('for/meg')
|
|
if 'holly' in content_lower or 'unicorn' in content_lower or 'builder' in content_lower:
|
|
assignees.append('for/holly')
|
|
|
|
# Default to Michael if no assignee found
|
|
if not assignees:
|
|
assignees.append('for/michael')
|
|
|
|
return assignees
|
|
|
|
def _parse_areas(self) -> List[str]:
|
|
"""Extract area labels from task content"""
|
|
areas = []
|
|
content_lower = self.content.lower()
|
|
|
|
if 'ghost' in content_lower or 'website' in content_lower or 'homepage' in content_lower:
|
|
areas.append('area/website')
|
|
if 'pterodactyl' in content_lower or 'panel' in content_lower:
|
|
areas.append('area/panel')
|
|
if 'wings' in content_lower or 'game server' in content_lower:
|
|
areas.append('area/wings')
|
|
if 'mailcow' in content_lower or 'email' in content_lower:
|
|
areas.append('area/email')
|
|
if 'paymenter' in content_lower or 'billing' in content_lower:
|
|
areas.append('area/billing')
|
|
if 'n8n' in content_lower or 'automation' in content_lower or 'workflow' in content_lower:
|
|
areas.append('area/automation')
|
|
if 'network' in content_lower or 'firewall' in content_lower or 'tunnel' in content_lower:
|
|
areas.append('area/networking')
|
|
|
|
# Default to operations if no specific area
|
|
if not areas:
|
|
areas.append('area/operations')
|
|
|
|
return areas
|
|
|
|
def _parse_type(self) -> str:
|
|
"""Extract task type"""
|
|
content_lower = self.content.lower()
|
|
|
|
if 'bug' in content_lower or 'fix' in content_lower:
|
|
return 'type/bug'
|
|
if 'feature' in content_lower or 'new' in content_lower:
|
|
return 'type/feature'
|
|
if 'infrastructure' in content_lower or 'deploy' in content_lower or 'server' in content_lower:
|
|
return 'type/infrastructure'
|
|
if 'documentation' in content_lower or 'docs' in content_lower or 'guide' in content_lower:
|
|
return 'type/docs'
|
|
|
|
# Default to task
|
|
return 'type/task'
|
|
|
|
def get_label_ids(self) -> List[int]:
|
|
"""Get all label IDs for this task"""
|
|
label_names = [self.status, self.priority, self.task_type] + self.assignees + self.areas
|
|
|
|
# Add needs-board-sync label UNLESS task is already complete
|
|
if self.status != 'status/done':
|
|
label_names.append('needs-board-sync')
|
|
|
|
return [LABEL_IDS[name] for name in label_names if name in LABEL_IDS]
|
|
|
|
def to_issue_body(self) -> str:
|
|
"""Convert task content to issue body"""
|
|
body = f"### Task #{self.number}: {self.title}\n\n"
|
|
|
|
if self.time_estimate:
|
|
body += f"**Time Estimate:** {self.time_estimate}\n\n"
|
|
|
|
body += f"**Documentation:** `docs/tasks/` (see operations manual)\n\n"
|
|
body += "---\n\n"
|
|
body += self.content.strip()
|
|
body += f"\n\n---\n\n**Source:** `docs/core/tasks.md` (Task #{self.number})"
|
|
|
|
return body
|
|
|
|
|
|
def parse_tasks_file() -> List[Task]:
|
|
"""Parse docs/core/tasks.md and extract all tasks"""
|
|
tasks = []
|
|
|
|
try:
|
|
with open(TASKS_FILE, 'r') as f:
|
|
content = f.read()
|
|
except FileNotFoundError:
|
|
print(f"❌ Error: {TASKS_FILE} not found")
|
|
return []
|
|
|
|
# Split by task headers (### N. Title)
|
|
task_pattern = r'###\s+(\d+)\.\s+([^\n]+)\n(.*?)(?=\n###\s+\d+\.|\Z)'
|
|
matches = re.findall(task_pattern, content, re.DOTALL)
|
|
|
|
for match in matches:
|
|
number = int(match[0])
|
|
title = match[1].strip()
|
|
task_content = match[2].strip()
|
|
|
|
tasks.append(Task(number, title, task_content))
|
|
|
|
print(f"📋 Parsed {len(tasks)} tasks from {TASKS_FILE}")
|
|
return tasks
|
|
|
|
|
|
def get_existing_issues() -> Dict[int, dict]:
|
|
"""Fetch all existing issues and map them by task number"""
|
|
url = f"{GITEA_URL}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}/issues?state=all&limit=200"
|
|
|
|
try:
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
issues = response.json()
|
|
except Exception as e:
|
|
print(f"❌ Error fetching issues: {e}")
|
|
return {}
|
|
|
|
# Map issues by task number extracted from title
|
|
issue_map = {}
|
|
for issue in issues:
|
|
title = issue['title']
|
|
match = re.search(r'Task #(\d+):', title)
|
|
if match:
|
|
task_num = int(match.group(1))
|
|
issue_map[task_num] = issue
|
|
|
|
print(f"🔍 Found {len(issue_map)} existing task issues")
|
|
return issue_map
|
|
|
|
|
|
def create_issue(task: Task, dry_run: bool = False) -> bool:
|
|
"""Create a Gitea issue for the task"""
|
|
url = f"{GITEA_URL}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}/issues"
|
|
|
|
data = {
|
|
"title": f"Task #{task.number}: {task.title}",
|
|
"body": task.to_issue_body(),
|
|
}
|
|
|
|
if dry_run:
|
|
print(f" [DRY RUN] Would create issue: Task #{task.number}")
|
|
return True
|
|
|
|
try:
|
|
response = requests.post(url, headers=headers, data=json.dumps(data))
|
|
response.raise_for_status()
|
|
issue = response.json()
|
|
|
|
# Add labels
|
|
label_url = f"{GITEA_URL}/api/v1/repos/{REPO_OWNER}/{REPO_NAME}/issues/{issue['number']}/labels"
|
|
label_data = {"labels": task.get_label_ids()}
|
|
label_response = requests.put(label_url, headers=headers, data=json.dumps(label_data))
|
|
|
|
if label_response.status_code == 200:
|
|
print(f" ✅ Created issue #{issue['number']}: Task #{task.number} - {task.title}")
|
|
return True
|
|
else:
|
|
print(f" ⚠️ Created issue #{issue['number']} but failed to add labels")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f" ❌ Failed to create issue for Task #{task.number}: {e}")
|
|
return False
|
|
|
|
|
|
def sync_tasks_to_issues(dry_run: bool = False, force: bool = False):
|
|
"""Main synchronization function"""
|
|
print("🔥❄️ Firefrost Gaming - Task to Issue Sync\n")
|
|
|
|
# Parse tasks from tasks.md
|
|
tasks = parse_tasks_file()
|
|
if not tasks:
|
|
print("❌ No tasks found")
|
|
return
|
|
|
|
# Get existing issues
|
|
existing_issues = get_existing_issues()
|
|
|
|
# Find tasks that need issues created
|
|
missing_tasks = [task for task in tasks if task.number not in existing_issues]
|
|
existing_tasks = [task for task in tasks if task.number in existing_issues]
|
|
|
|
print(f"\n📊 Status:")
|
|
print(f" - Total tasks: {len(tasks)}")
|
|
print(f" - Already have issues: {len(existing_tasks)}")
|
|
print(f" - Need issues created: {len(missing_tasks)}")
|
|
|
|
if not missing_tasks:
|
|
print("\n✅ All tasks already have Gitea issues!")
|
|
return
|
|
|
|
# Create missing issues
|
|
print(f"\n🚀 Creating {len(missing_tasks)} missing issues...\n")
|
|
|
|
created = 0
|
|
failed = 0
|
|
|
|
for task in missing_tasks:
|
|
if create_issue(task, dry_run):
|
|
created += 1
|
|
else:
|
|
failed += 1
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f"✅ Successfully created {created} issues")
|
|
if failed:
|
|
print(f"❌ Failed to create {failed} issues")
|
|
print(f"{'='*60}")
|
|
|
|
if not dry_run:
|
|
print("\n💡 Next steps:")
|
|
print(" 1. Issues created with status/backlog label")
|
|
print(" 2. Manually add issues to 'Firefrost Operations' project board via web UI")
|
|
print(" 3. Or wait for project board automation (future enhancement)")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Sync tasks from docs/core/tasks.md to Gitea issues"
|
|
)
|
|
parser.add_argument(
|
|
'--dry-run',
|
|
action='store_true',
|
|
help="Show what would be created without actually creating"
|
|
)
|
|
parser.add_argument(
|
|
'--force',
|
|
action='store_true',
|
|
help="Force update of existing issues (not implemented yet)"
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
sync_tasks_to_issues(dry_run=args.dry_run, force=args.force)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|