diff --git a/services/trinity-core/index.js b/services/trinity-core/index.js index f36ae60..da13767 100644 --- a/services/trinity-core/index.js +++ b/services/trinity-core/index.js @@ -1,10 +1,12 @@ import express from 'express'; +import { randomUUID } from 'node:crypto'; import { spawn } from 'child_process'; import fs from 'fs'; import cors from 'cors'; import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; -import { ListToolsRequestSchema, CallToolRequestSchema } from '@modelcontextprotocol/sdk/types.js'; +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; +import { isInitializeRequest, ListToolsRequestSchema, CallToolRequestSchema } from '@modelcontextprotocol/sdk/types.js'; const API_TOKEN = 'FFG-Trinity-2026-Core-Access'; const LOG_FILE = '/home/claude_executor/mcp-server/command.log'; @@ -149,8 +151,19 @@ function setupToolHandlers(mcpServer) { }); } +// Track all active transports (both SSE and Streamable HTTP) const activeSessions = new Map(); +// Create a fresh MCP server instance with tool handlers +function createMcpServer() { + const mcpServer = new Server( + { name: "trinity-core", version: "2.4.0" }, + { capabilities: { tools: {} } } + ); + setupToolHandlers(mcpServer); + return mcpServer; +} + // ─── REST API (for Arbiter / internal services) ─── app.get('/servers', auth, (req, res) => { @@ -182,46 +195,102 @@ app.post('/exec', auth, async (req, res) => { }); }); -// ─── MCP Protocol (for Claude.ai connector) ─── +// ─── Streamable HTTP Transport (protocol 2025-11-25) ─── -app.get('/mcp', auth, async (req, res) => { - log(`SSE connection from ${req.ip}`); +app.all('/mcp', auth, async (req, res) => { + const sessionId = req.headers['mcp-session-id']; + log(`StreamableHTTP ${req.method} from ${req.ip} session=${sessionId || 'none'}`); - const mcpServer = new Server({ name: "trinity-core", version: "2.3.0" }, { capabilities: { tools: {} } }); - setupToolHandlers(mcpServer); + try { + let transport; - mcpServer.onmessage = (msg) => log(`SERVER MSG: ${JSON.stringify(msg)}`); + if (sessionId && activeSessions.has(sessionId)) { + const existing = activeSessions.get(sessionId); + if (existing instanceof StreamableHTTPServerTransport) { + transport = existing; + } else { + return res.status(400).json({ + jsonrpc: '2.0', + error: { code: -32000, message: 'Session uses different transport' }, + id: null + }); + } + } else if (!sessionId && req.method === 'POST' && isInitializeRequest(req.body)) { + log(`StreamableHTTP new session (initialize)`); + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: (sid) => { + log(`StreamableHTTP session ready: ${sid}`); + activeSessions.set(sid, transport); + } + }); + transport.onclose = () => { + const sid = transport.sessionId; + if (sid && activeSessions.has(sid)) { + log(`StreamableHTTP closed: ${sid}`); + activeSessions.delete(sid); + } + }; + const mcpServer = createMcpServer(); + await mcpServer.connect(transport); + } else if (!sessionId && req.method === 'GET') { + // Legacy SSE client connecting via GET /mcp + return legacySSE(req, res); + } else { + return res.status(400).json({ + jsonrpc: '2.0', + error: { code: -32000, message: 'Bad request: no valid session' }, + id: null + }); + } + await transport.handleRequest(req, res, req.body); + } catch (err) { + log(`StreamableHTTP ERROR: ${err.message}`); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { code: -32603, message: 'Internal server error' }, + id: null + }); + } + } +}); + +// ─── Legacy SSE Transport (protocol 2024-11-05) ─── + +async function legacySSE(req, res) { + log(`Legacy SSE connection from ${req.ip}`); + const mcpServer = createMcpServer(); const transport = new SSEServerTransport(`${BASE_URL}/mcp/messages`, res); - await mcpServer.connect(transport); activeSessions.set(transport.sessionId, transport); - log(`Session ${transport.sessionId} ready`); + log(`Legacy SSE session ${transport.sessionId} ready`); res.on('close', () => { - log(`SSE closed: ${transport.sessionId}`); + log(`Legacy SSE closed: ${transport.sessionId}`); activeSessions.delete(transport.sessionId); }); -}); +} app.post('/mcp/messages', auth, async (req, res) => { const sessionId = req.query.sessionId; const method = req.body?.method || 'unknown'; - log(`POST ${method} for ${sessionId}`); + log(`Legacy POST ${method} for ${sessionId}`); const transport = activeSessions.get(sessionId); - if (!transport) { - log(`Session not found: ${sessionId}`); + if (!transport || !(transport instanceof SSEServerTransport)) { + log(`Legacy session not found: ${sessionId}`); return res.status(404).json({ error: "Session not found" }); } try { await transport.handlePostMessage(req, res, req.body); - log(`POST ${method} handled OK`); + log(`Legacy POST ${method} handled OK`); } catch (err) { - log(`POST ${method} ERROR: ${err.message}`); + log(`Legacy POST ${method} ERROR: ${err.message}`); console.error(err); } }); -app.listen(PORT, () => log(`Trinity Core MCP v2.3.0 started on port ${PORT}`)); +app.listen(PORT, () => log(`Trinity Core MCP v2.4.0 started on port ${PORT}`));