import base64
import time
from collections.abc import Generator
from typing import Any, Dict, List, Optional
from urllib.parse import quote, urlparse

import certifi
import markdown
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from dify_plugin.entities.datasource import (
    DatasourceGetPagesResponse,
    DatasourceMessage,
    GetOnlineDocumentPageContentRequest,
    OnlineDocumentInfo,
)
from dify_plugin.interfaces.datasource.online_document import OnlineDocumentDatasource


class GitLabDataSource(OnlineDocumentDatasource):
    """Dify online-document datasource backed by the GitLab REST API (v4).

    Lists the authenticated user's projects, each project's README.md, and the
    most recently updated issues and merge requests as pages, and renders any
    of those pages as text content.
    """

    # Used when the credentials carry no (or a blank) ``gitlab_url``.
    _DEFAULT_GITLAB_URL = "https://gitlab.com"

    # Query parameters for issue/MR notes: oldest first, as many as one page allows.
    # Only the first page (up to 100 notes) is fetched.
    _NOTES_PARAMS = {"sort": "asc", "order_by": "created_at", "per_page": 100}

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Both are resolved lazily from the runtime credentials by _get_gitlab_url().
        self.gitlab_url = None
        self.base_url = None

    # ------------------------------------------------------------------ HTTP

    def _get_requests_session(self) -> requests.Session:
        """Create a requests session with a retry strategy and certifi SSL verification.

        The caller owns the returned session and must close it; ``Session`` is a
        context manager, so ``with self._get_requests_session() as s:`` works.
        """
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            # Only idempotent methods are retried; this datasource issues GETs only.
            allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"],
            # Hand back the final response once retries are exhausted instead of
            # raising RetryError, so _handle_rate_limit can report 429/5xx clearly.
            raise_on_status=False,
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        session.verify = certifi.where()
        return session

    def _safe_json_response(self, response: requests.Response) -> Any:
        """Parse a GitLab API response body as JSON.

        Returns:
            The decoded JSON object or array (``dict`` or ``list``).

        Raises:
            ValueError: For HTTP error statuses (via _handle_rate_limit), for a
                body that is not valid JSON, or for JSON that is neither an
                object nor an array.
        """
        if response.status_code >= 400:
            self._handle_rate_limit(response)
        try:
            data = response.json()
        except ValueError as e:
            raise ValueError(f"Invalid JSON response: {str(e)}") from e
        except Exception as e:
            raise ValueError(f"Failed to parse response: {str(e)}") from e
        # Checked outside the try so this error is not re-wrapped as "Invalid JSON".
        if not isinstance(data, (dict, list)):
            raise ValueError(f"Invalid response format: expected dict or list, got {type(data)}")
        return data

    def _get_headers(self) -> Dict[str, str]:
        """Build the API request headers from the credentials' access token.

        Raises:
            ValueError: If the access token is missing, not a string, or blank.
        """
        access_token = self.runtime.credentials.get("access_token")
        if not access_token:
            raise ValueError("Access token not found in credentials")

        # Basic format check only; the token itself is validated by GitLab.
        if not isinstance(access_token, str) or len(access_token.strip()) == 0:
            raise ValueError("Invalid access token format")

        return {
            "Authorization": f"Bearer {access_token.strip()}",
            "User-Agent": "Dify-GitLab-Datasource",
        }

    def _get_gitlab_url(self) -> str:
        """Resolve, validate and cache the GitLab base URL from the credentials.

        Falls back to ``_DEFAULT_GITLAB_URL`` when ``gitlab_url`` is absent,
        None or blank. Also sets ``self.base_url`` to the REST API v4 root.

        Raises:
            ValueError: If the configured URL is not a string or lacks an
                http(s) scheme.
        """
        if self.gitlab_url is None:
            gitlab_url = self.runtime.credentials.get("gitlab_url")
            # Treat None / blank the same as "not configured".
            if gitlab_url is None or (isinstance(gitlab_url, str) and not gitlab_url.strip()):
                gitlab_url = self._DEFAULT_GITLAB_URL

            if not isinstance(gitlab_url, str):
                raise ValueError("Invalid GitLab URL: must be a string")

            gitlab_url = gitlab_url.strip().rstrip("/")
            if not gitlab_url.startswith(("http://", "https://")):
                raise ValueError(f"Invalid GitLab URL format: {gitlab_url}")

            self.gitlab_url = gitlab_url
            self.base_url = f"{self.gitlab_url}/api/v4"
        return self.gitlab_url

    def _handle_rate_limit(self, response: requests.Response) -> None:
        """Raise a descriptive ValueError for an HTTP error response.

        Handles 429 (rate limit), 401, 403 and any other status >= 400; returns
        None for non-error statuses. The Retry-After value is only reported in
        the message, this method does not sleep.
        """
        if response.status_code == 429:
            retry_after = response.headers.get("Retry-After", "60")
            try:
                sleep_time = int(retry_after)
            except ValueError:
                sleep_time = 60
            raise ValueError(f"GitLab API rate limit exceeded. Please wait {sleep_time} seconds.")
        elif response.status_code == 401:
            raise ValueError("Invalid GitLab access token. Please check your credentials.")
        elif response.status_code == 403:
            raise ValueError("Access forbidden. Check your GitLab permissions.")
        elif response.status_code >= 400:
            raise ValueError(f"GitLab API error: {response.status_code} - {response.text}")

    @staticmethod
    def _encode_path(path: str) -> str:
        """Percent-encode a value for use as a single URL path segment.

        Used for namespaced project paths (``group/project`` -> ``group%2Fproject``),
        repository file paths and IIDs taken from page ids.
        """
        return quote(path, safe="")

    def _make_request(self, url: str, params: Optional[Dict] = None) -> Any:
        """GET ``url`` with auth headers and return the decoded JSON (dict or list).

        Raises:
            ValueError: On invalid URL scheme, network failure, HTTP error
                status, or malformed JSON.
        """
        # Make sure base_url has been resolved before callers build URLs from it.
        self._get_gitlab_url()
        headers = self._get_headers()

        # Scheme check only; it does not restrict which host is contacted.
        if not url.startswith(("http://", "https://")):
            raise ValueError(f"Invalid URL format: {url}")

        try:
            # Close the session (and its pooled connections) after each request.
            with self._get_requests_session() as session:
                response = session.get(url, headers=headers, params=params, timeout=30)
                return self._safe_json_response(response)
        except requests.exceptions.RequestException as e:
            raise ValueError(f"Network error when accessing GitLab API: {str(e)}") from e

    # ---------------------------------------------------------- page listing

    def _get_pages(self, datasource_parameters: dict[str, Any]) -> DatasourceGetPagesResponse:
        """List GitLab pages (projects, READMEs, issues, merge requests) for the user.

        For each project returned by _get_projects this emits a "project" page,
        a "file" page for README.md when it can be fetched, and up to five
        recently updated "issue" and "merge_request" pages. Failures fetching a
        project's README, issues or merge requests are skipped.

        Args:
            datasource_parameters: Not used by this datasource.

        Returns:
            A response holding one workspace entry for the authenticated user.

        Raises:
            ValueError: If the access token is missing or the user/project
                requests fail.
        """
        access_token = self.runtime.credentials.get("access_token")
        if not access_token:
            raise ValueError("Access token not found in credentials")

        self._get_gitlab_url()

        user_info = self._make_request(f"{self.base_url}/user")
        workspace_name = f"{user_info.get('name', user_info.get('username'))}'s GitLab"
        workspace_icon = user_info.get('avatar_url', '')
        workspace_id = str(user_info.get('id', ''))

        pages: List[Dict[str, Any]] = []
        for project in self._get_projects():
            pages.append(self._build_project_page(project))
            pages.extend(self._build_readme_pages(project))
            pages.extend(self._build_issue_pages(project))
            pages.extend(self._build_mr_pages(project))

        online_document_info = OnlineDocumentInfo(
            workspace_name=workspace_name,
            workspace_icon=workspace_icon,
            workspace_id=workspace_id,
            pages=pages,
            total=len(pages),
        )
        return DatasourceGetPagesResponse(result=[online_document_info])

    @staticmethod
    def _build_project_page(project: Dict[str, Any]) -> Dict[str, Any]:
        """Build the "project" page entry for one item of the /projects listing."""
        return {
            "page_id": f"project:{project['path_with_namespace']}",
            "page_name": project['name'],
            "last_edited_time": project.get("last_activity_at", ""),
            "type": "project",
            "url": project['web_url'],
            "metadata": {
                "description": project.get("description", ""),
                # NOTE(review): despite the key name this holds the default branch,
                # not a programming language; kept unchanged for compatibility.
                "language": project.get("default_branch", ""),
                "stars": project.get("star_count", 0),
                "updated_at": project.get("last_activity_at", ""),
                # Any visibility other than "public" (including missing) counts as private.
                "private": project.get("visibility", "private") != "public",
            },
        }

    def _build_readme_pages(self, project: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return ``[readme_page]`` for the project's README.md, or ``[]`` if unavailable."""
        default_branch = project.get("default_branch")
        if not default_branch:
            # The files API needs a ref; without a branch there is nothing to read.
            return []
        try:
            readme_info = self._make_request(
                f"{self.base_url}/projects/{project['id']}/repository/files/README.md",
                params={"ref": default_branch},
            )
        except ValueError:
            return []  # README.md missing or not accessible
        return [{
            "page_id": f"file:{project['path_with_namespace']}:README.md",
            "page_name": f"{project['name']} - README",
            "last_edited_time": project.get("last_activity_at", ""),
            "type": "file",
            "url": f"{project['web_url']}/-/blob/{default_branch}/README.md",
            "metadata": {
                "project": project['path_with_namespace'],
                "file_path": "README.md",
                "size": readme_info.get('size', 0),
            },
        }]

    def _build_issue_pages(self, project: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return pages for the project's 5 most recently updated issues ([] if not listable)."""
        try:
            issues = self._make_request(
                f"{self.base_url}/projects/{project['id']}/issues",
                params={"state": "all", "per_page": 5, "order_by": "updated_at"},
            )
        except ValueError:
            return []  # issues not accessible for this project
        return [
            {
                "page_id": f"issue:{project['path_with_namespace']}:{issue['iid']}",
                "page_name": f"Issue #{issue['iid']}: {issue['title']}",
                "last_edited_time": issue.get('updated_at', ''),
                "type": "issue",
                "url": issue['web_url'],
                "metadata": {
                    "project": project['path_with_namespace'],
                    "issue_number": issue['iid'],
                    "state": issue['state'],
                    "author": issue['author']['username'],
                    "created_at": issue['created_at'],
                },
            }
            for issue in issues
        ]

    def _build_mr_pages(self, project: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return pages for the project's 5 most recently updated merge requests ([] if not listable)."""
        try:
            merge_requests = self._make_request(
                f"{self.base_url}/projects/{project['id']}/merge_requests",
                params={"state": "all", "per_page": 5, "order_by": "updated_at"},
            )
        except ValueError:
            return []  # merge requests not accessible for this project
        return [
            {
                "page_id": f"mr:{project['path_with_namespace']}:{mr['iid']}",
                "page_name": f"MR #{mr['iid']}: {mr['title']}",
                "last_edited_time": mr.get('updated_at', ''),
                "type": "merge_request",
                "url": mr['web_url'],
                "metadata": {
                    "project": project['path_with_namespace'],
                    "mr_number": mr['iid'],
                    "state": mr['state'],
                    "author": mr['author']['username'],
                    "target_branch": mr['target_branch'],
                    "source_branch": mr['source_branch'],
                },
            }
            for mr in merge_requests
        ]

    def _get_projects(self, max_projects: int = 20) -> List[Dict]:
        """Return up to ``max_projects`` projects the user is a member of, most recently active first.

        ``max_projects`` is sent as ``per_page``; only the first page is fetched.
        """
        params = {
            "per_page": max_projects,
            "order_by": "last_activity_at",
            "sort": "desc",
            # Only projects the user is a member of; sent as the literal "true".
            "membership": "true",
        }
        return self._make_request(f"{self.base_url}/projects", params)

    # ---------------------------------------------------------- page content

    def _get_content(self, page: GetOnlineDocumentPageContentRequest) -> Generator[DatasourceMessage, None, None]:
        """Yield the variable messages describing one page, dispatching on its page_id prefix.

        Raises:
            ValueError: If the access token is missing or the page type is unknown.
        """
        access_token = self.runtime.credentials.get("access_token")
        if not access_token:
            raise ValueError("Access token not found in credentials")

        self._get_gitlab_url()

        page_id = page.page_id
        if page_id.startswith("project:"):
            yield from self._get_project_content(page_id)
        elif page_id.startswith("file:"):
            yield from self._get_file_content(page_id)
        elif page_id.startswith("issue:"):
            yield from self._get_issue_content(page_id)
        elif page_id.startswith("mr:"):
            yield from self._get_mr_content(page_id)
        else:
            raise ValueError(f"Unsupported page type: {page_id}")

    @staticmethod
    def _split_page_id(page_id: str, kind: str, item_label: str) -> tuple[str, str]:
        """Split ``"<prefix>:<project path>:<item>"`` into ``(project_path, item)``.

        ``kind`` and ``item_label`` only shape the error messages.

        Raises:
            ValueError: If the id does not have three parts or a part is empty.
        """
        parts = page_id.split(":", 2)
        if len(parts) != 3:
            raise ValueError(f"Invalid {kind} page_id format: {page_id}")
        project_path, item = parts[1], parts[2]
        if not project_path or not item:
            raise ValueError(f"Invalid project path or {item_label} in page_id: {page_id}")
        return project_path, item

    def _render_comments(self, notes_url: str) -> str:
        """Render the non-system notes at ``notes_url`` as a "## Comments" section.

        Returns "" when the notes cannot be fetched or there are none.
        """
        try:
            comments = self._make_request(notes_url, params=self._NOTES_PARAMS)
        except ValueError:
            return ""
        if not comments:
            return ""
        section = "## Comments\n\n"
        for comment in comments:
            if comment.get('system', False):
                continue  # skip system-generated notes
            section += f"### {comment['author']['username']} - {comment['created_at']}\n\n"
            section += comment['body'] + "\n\n"
        return section

    def _get_project_content(self, page_id: str) -> Generator[DatasourceMessage, None, None]:
        """Yield a project summary (with README.md when available) for a "project:" page."""
        project_path = page_id[len("project:"):]

        project_info = self._make_request(f"{self.base_url}/projects/{self._encode_path(project_path)}")

        content = f"# {project_info['name']}\n\n"
        content += f"**Project:** {project_info['path_with_namespace']}\n"
        # GitLab may return null here; `or` avoids rendering "None".
        content += f"**Description:** {project_info.get('description') or 'No description'}\n"
        content += f"**Default Branch:** {project_info.get('default_branch', 'main')}\n"
        content += f"**Stars:** {project_info.get('star_count', 0)}\n"
        content += f"**Forks:** {project_info.get('forks_count', 0)}\n"
        content += f"**Created:** {project_info.get('created_at', '')}\n"
        content += f"**Last Activity:** {project_info.get('last_activity_at', '')}\n"
        content += f"**URL:** {project_info.get('web_url', '')}\n\n"

        if project_info.get('topics'):
            topics = ", ".join(project_info['topics'])
            content += f"**Topics:** {topics}\n\n"

        default_branch = project_info.get("default_branch")
        try:
            if not default_branch:
                raise ValueError("No default branch to read README.md from")
            readme_info = self._make_request(
                f"{self.base_url}/projects/{project_info['id']}/repository/files/README.md",
                params={"ref": default_branch},
            )
        except ValueError:
            content += "## README\n\nNo README file found."
        else:
            if readme_info.get("encoding") == "base64":
                try:
                    readme_content = base64.b64decode(readme_info["content"]).decode("utf-8")
                    content += "## README\n\n" + readme_content
                except (ValueError, UnicodeDecodeError):
                    content += "## README\n\nError decoding README content."

        yield self.create_variable_message("content", content)
        yield self.create_variable_message("page_id", page_id)
        yield self.create_variable_message("title", project_info['name'])
        yield self.create_variable_message("project", project_path)
        yield self.create_variable_message("type", "project")

    def _get_file_content(self, page_id: str) -> Generator[DatasourceMessage, None, None]:
        """Yield a repository file's text for a ``"file:<project path>:<file path>"`` page.

        The file is read from the project's default branch (ref ``HEAD``).

        Raises:
            ValueError: On a malformed page id, request failure, or undecodable content.
        """
        project_path, file_path = self._split_page_id(page_id, "file", "file path")

        file_info = self._make_request(
            f"{self.base_url}/projects/{self._encode_path(project_path)}"
            f"/repository/files/{self._encode_path(file_path)}",
            # `ref` is required by the files API; HEAD selects the default branch.
            params={"ref": "HEAD"},
        )

        if file_info.get("encoding") == "base64":
            try:
                content = base64.b64decode(file_info["content"]).decode("utf-8")
            except (ValueError, UnicodeDecodeError) as e:
                raise ValueError(f"Failed to decode file content: {str(e)}") from e
        else:
            content = file_info.get("content", "")

        if not isinstance(content, str):
            content = str(content)

        # Markdown files get their file name as a top-level heading.
        file_name = file_path.split('/')[-1]
        if file_name.lower().endswith(('.md', '.markdown')):
            content = f"# {file_name}\n\n{content}"

        yield self.create_variable_message("content", content)
        yield self.create_variable_message("page_id", page_id)
        yield self.create_variable_message("title", file_name)
        yield self.create_variable_message("project", project_path)
        yield self.create_variable_message("file_path", file_path)
        yield self.create_variable_message("type", "file")

    def _get_issue_content(self, page_id: str) -> Generator[DatasourceMessage, None, None]:
        """Yield an issue (details, description, comments) for an ``"issue:<project path>:<iid>"`` page."""
        project_path, issue_iid = self._split_page_id(page_id, "issue", "issue IID")
        issue_url = (
            f"{self.base_url}/projects/{self._encode_path(project_path)}"
            f"/issues/{self._encode_path(issue_iid)}"
        )

        issue = self._make_request(issue_url)

        content = f"# Issue #{issue['iid']}: {issue['title']}\n\n"
        content += f"**Project:** {project_path}\n"
        content += f"**Author:** {issue['author']['username']}\n"
        content += f"**State:** {issue['state']}\n"
        content += f"**Created:** {issue['created_at']}\n"
        content += f"**Updated:** {issue['updated_at']}\n"
        content += f"**URL:** {issue['web_url']}\n\n"

        if issue.get('labels'):
            labels = ", ".join(issue['labels'])
            content += f"**Labels:** {labels}\n\n"

        if issue.get('description'):
            content += "## Description\n\n"
            content += issue['description'] + "\n\n"

        content += self._render_comments(f"{issue_url}/notes")

        yield self.create_variable_message("content", content)
        yield self.create_variable_message("page_id", page_id)
        yield self.create_variable_message("title", f"Issue #{issue['iid']}: {issue['title']}")
        yield self.create_variable_message("project", project_path)
        yield self.create_variable_message("issue_number", issue_iid)
        yield self.create_variable_message("type", "issue")

    def _get_mr_content(self, page_id: str) -> Generator[DatasourceMessage, None, None]:
        """Yield a merge request (details, description, comments) for an ``"mr:<project path>:<iid>"`` page."""
        project_path, mr_iid = self._split_page_id(page_id, "merge request", "MR IID")
        mr_url = (
            f"{self.base_url}/projects/{self._encode_path(project_path)}"
            f"/merge_requests/{self._encode_path(mr_iid)}"
        )

        mr = self._make_request(mr_url)

        content = f"# Merge Request #{mr['iid']}: {mr['title']}\n\n"
        content += f"**Project:** {project_path}\n"
        content += f"**Author:** {mr['author']['username']}\n"
        content += f"**State:** {mr['state']}\n"
        content += f"**Target Branch:** {mr['target_branch']}\n"
        content += f"**Source Branch:** {mr['source_branch']}\n"
        content += f"**Created:** {mr['created_at']}\n"
        content += f"**Updated:** {mr['updated_at']}\n"
        content += f"**URL:** {mr['web_url']}\n\n"

        if mr.get('description'):
            content += "## Description\n\n"
            content += mr['description'] + "\n\n"

        content += self._render_comments(f"{mr_url}/notes")

        yield self.create_variable_message("content", content)
        yield self.create_variable_message("page_id", page_id)
        yield self.create_variable_message("title", f"MR #{mr['iid']}: {mr['title']}")
        yield self.create_variable_message("project", project_path)
        yield self.create_variable_message("mr_number", mr_iid)
        yield self.create_variable_message("type", "merge_request")