From b31b2e95126518b8044fe8545238a09dca9dde6b Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Mon, 20 Oct 2025 20:41:44 -0400 Subject: [PATCH 1/2] update to 1.6.6 0.6.6 --- pyproject.toml | 4 ++-- src/backend/base/pyproject.toml | 2 +- src/frontend/package-lock.json | 4 ++-- src/frontend/package.json | 2 +- uv.lock | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7e3446e4819c..eaa7caf1e3e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "langflow" -version = "1.6.5" +version = "1.6.6" description = "A Python package with a built-in web application" requires-python = ">=3.10,<3.14" license = "MIT" @@ -17,7 +17,7 @@ maintainers = [ ] # Define your main dependencies here dependencies = [ - "langflow-base~=0.6.5", + "langflow-base~=0.6.6", "beautifulsoup4==4.12.3", "google-search-results>=2.4.1,<3.0.0", "google-api-python-client==2.154.0", diff --git a/src/backend/base/pyproject.toml b/src/backend/base/pyproject.toml index 89cf5886d925..98ba809d974c 100644 --- a/src/backend/base/pyproject.toml +++ b/src/backend/base/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "langflow-base" -version = "0.6.5" +version = "0.6.6" description = "A Python package with a built-in web application" requires-python = ">=3.10,<3.14" license = "MIT" diff --git a/src/frontend/package-lock.json b/src/frontend/package-lock.json index c1c40ae695d6..9b19b779040d 100644 --- a/src/frontend/package-lock.json +++ b/src/frontend/package-lock.json @@ -1,12 +1,12 @@ { "name": "langflow", - "version": "1.6.5", + "version": "1.6.6", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "langflow", - "version": "1.6.5", + "version": "1.6.6", "dependencies": { "@chakra-ui/number-input": "^2.1.2", "@headlessui/react": "^2.0.4", diff --git a/src/frontend/package.json b/src/frontend/package.json index 96c40a4d7634..a1e7b1267048 100644 --- a/src/frontend/package.json +++ b/src/frontend/package.json @@ -1,6 +1,6 @@ { "name": "langflow", - "version": "1.6.5", + "version": "1.6.6", "private": true, "dependencies": { "@chakra-ui/number-input": "^2.1.2", diff --git a/uv.lock b/uv.lock index af3307573184..0d180c222cd3 100644 --- a/uv.lock +++ b/uv.lock @@ -4929,7 +4929,7 @@ wheels = [ [[package]] name = "langflow" -version = "1.6.5" +version = "1.6.6" source = { editable = "." } dependencies = [ { name = "aiofile" }, @@ -5321,7 +5321,7 @@ dev = [ [[package]] name = "langflow-base" -version = "0.6.5" +version = "0.6.6" source = { editable = "src/backend/base" } dependencies = [ { name = "aiofile" }, From fb1026105ee5f68de3c47a8644d819577175f4ae Mon Sep 17 00:00:00 2001 From: Cristhian Zanforlin Lousa Date: Mon, 3 Nov 2025 10:19:50 -0300 Subject: [PATCH 2/2] feat: Add robust startup retry logic and port conflict resolution (#10347) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix mcp timeout and settings * ✨ (service.py): add new methods to read process output, ensure port availability, log startup error details, and normalize config values 🐛 (service.py): fix handling of process output, error messages, and port availability to improve error handling and logging during startup * ✨ (service.py): add tracking of which project is using which port to prevent conflicts 🐛 (service.py): check if another project is using the same port before starting a new project to avoid conflicts 🐛 (service.py): handle releasing port and cleaning up port tracking when stopping a project to prevent memory leaks 🐛 (service.py): re-check port availability before each retry when starting a project to prevent race conditions 🐛 (service.py): register the port used by a project to prevent other projects from using the same port 🐛 (McpServerTab.tsx): invalidate MCP project data and composer URL queries to refresh auth settings and OAuth server info 🐛 (McpServerTab.tsx): clear waiting state if the auth type is not OAuth * fix killing port on tab changes * add not persist oauth on failure start * add oath check on available port * add improvements mcp service * fix: suppress SSE streaming warnings and handle connection closed errors - Add warning filters for ResourceWarning from anyio streams - Handle SSE connection closed errors gracefully - Prevent log spam from normal SSE lifecycle events 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * add host and port to uvx command, safe redact value * fix error messages state not been displayed * fix mypy errors * [autofix.ci] apply automated fixes * add the correct auth test --------- Co-authored-by: Claude Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../base/langflow/api/v1/mcp_projects.py | 54 +- src/backend/base/langflow/main.py | 15 + .../langflow/services/mcp_composer/service.py | 818 ++++++++++++++---- .../services/settings/test_mcp_composer.py | 322 +++++++ .../API/queries/mcp/use-patch-flows-mcp.ts | 9 +- .../API/queries/mcp/use-patch-install-mcp.ts | 2 +- .../authModal/__tests__/AuthModal.test.tsx | 275 ++++++ src/frontend/src/modals/authModal/index.tsx | 24 +- .../homePage/components/McpServerTab.tsx | 116 ++- .../__tests__/McpServerTab.test.tsx | 794 +++++++++++++++++ 10 files changed, 2230 insertions(+), 199 deletions(-) create mode 100644 src/backend/tests/unit/services/settings/test_mcp_composer.py create mode 100644 src/frontend/src/modals/authModal/__tests__/AuthModal.test.tsx create mode 100644 src/frontend/src/pages/MainPage/pages/homePage/components/__tests__/McpServerTab.test.tsx diff --git a/src/backend/base/langflow/api/v1/mcp_projects.py b/src/backend/base/langflow/api/v1/mcp_projects.py index f8eb7d08478f..2b339b282107 100644 --- a/src/backend/base/langflow/api/v1/mcp_projects.py +++ b/src/backend/base/langflow/api/v1/mcp_projects.py @@ -385,6 +385,9 @@ async def update_project_mcp_settings( should_start_composer = False should_stop_composer = False + # Store original auth settings in case we need to rollback + original_auth_settings = project.auth_settings + # Update project-level auth settings with encryption if "auth_settings" in request.model_fields_set and request.auth_settings is not None: auth_result = handle_auth_settings_update( @@ -396,8 +399,6 @@ async def update_project_mcp_settings( should_start_composer = auth_result["should_start_composer"] should_stop_composer = auth_result["should_stop_composer"] - session.add(project) - # Query flows in the project flows = (await session.exec(select(Flow).where(Flow.folder_id == project_id))).all() flows_to_update = {x.id: x for x in request.settings} @@ -416,13 +417,17 @@ async def update_project_mcp_settings( session.add(flow) updated_flows.append(flow) - await session.commit() - response: dict[str, Any] = { "message": f"Updated MCP settings for {len(updated_flows)} flows and project auth settings" } + # Handle MCP Composer start/stop before committing auth settings if should_handle_mcp_composer: + # Get MCP Composer service once for all branches + mcp_composer_service: MCPComposerService = cast( + MCPComposerService, get_service(ServiceType.MCP_COMPOSER_SERVICE) + ) + if should_start_composer: await logger.adebug( f"Auth settings changed to OAuth for project {project.name} ({project_id}), " @@ -434,26 +439,37 @@ async def update_project_mcp_settings( auth_config = await _get_mcp_composer_auth_config(project) await get_or_start_mcp_composer(auth_config, project.name, project_id) composer_sse_url = await get_composer_sse_url(project) + # Clear any previous error on success + mcp_composer_service.clear_last_error(str(project_id)) response["result"] = { "project_id": str(project_id), "sse_url": composer_sse_url, "uses_composer": True, } except MCPComposerError as e: + # Don't rollback auth settings - persist them so UI can show the error + await logger.awarning(f"MCP Composer failed to start for project {project_id}: {e.message}") + # Store the error message so it can be retrieved via composer-url endpoint + mcp_composer_service.set_last_error(str(project_id), e.message) response["result"] = { "project_id": str(project_id), "uses_composer": True, "error_message": e.message, } except Exception as e: - # Unexpected errors - await logger.aerror(f"Failed to get mcp composer URL for project {project_id}: {e}") + # Rollback auth settings on unexpected errors + await logger.aerror( + f"Unexpected error starting MCP Composer for project {project_id}, " + f"rolling back auth settings: {e}" + ) + project.auth_settings = original_auth_settings raise HTTPException(status_code=500, detail=str(e)) from e else: - # This shouldn't happen - we determined we should start composer but now we can't use it + # OAuth is set but MCP Composer is disabled - save settings but return error await logger.aerror( f"PATCH: OAuth set but MCP Composer is disabled in settings for project {project_id}" ) + # Don't rollback - keep the auth settings so they can be used when composer is enabled response["result"] = { "project_id": str(project_id), "uses_composer": False, @@ -464,10 +480,9 @@ async def update_project_mcp_settings( f"Auth settings changed from OAuth for project {project.name} ({project_id}), " "stopping MCP Composer" ) - mcp_composer_service: MCPComposerService = cast( - MCPComposerService, get_service(ServiceType.MCP_COMPOSER_SERVICE) - ) await mcp_composer_service.stop_project_composer(str(project_id)) + # Clear any error when user explicitly disables OAuth + mcp_composer_service.clear_last_error(str(project_id)) # Provide the direct SSE URL since we're no longer using composer sse_url = await get_project_sse_url(project_id) @@ -480,6 +495,10 @@ async def update_project_mcp_settings( "uses_composer": False, } + # Only commit if composer started successfully (or wasn't needed) + session.add(project) + await session.commit() + return response except Exception as e: @@ -731,7 +750,22 @@ async def get_project_composer_url( """ try: project = await verify_project_access(project_id, current_user) + + # Check if there's a recent error from a failed OAuth attempt + mcp_composer_service: MCPComposerService = cast( + MCPComposerService, get_service(ServiceType.MCP_COMPOSER_SERVICE) + ) + last_error = mcp_composer_service.get_last_error(str(project_id)) + if not should_use_mcp_composer(project): + # If there's a recent error, return it even though OAuth is not currently active + # This happens when OAuth was attempted but rolled back due to an error + if last_error: + return { + "project_id": str(project_id), + "uses_composer": False, + "error_message": last_error, + } return { "project_id": str(project_id), "uses_composer": False, diff --git a/src/backend/base/langflow/main.py b/src/backend/base/langflow/main.py index 9c24ae920126..d74ef3ea369b 100644 --- a/src/backend/base/langflow/main.py +++ b/src/backend/base/langflow/main.py @@ -47,6 +47,10 @@ # Ignore Pydantic deprecation warnings from Langchain warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20) +# Suppress ResourceWarning from anyio streams (SSE connections) +warnings.filterwarnings("ignore", category=ResourceWarning, message=".*MemoryObjectReceiveStream.*") +warnings.filterwarnings("ignore", category=ResourceWarning, message=".*MemoryObjectSendStream.*") + _tasks: list[asyncio.Task] = [] MAX_PORT = 65535 @@ -458,6 +462,17 @@ async def exception_handler(_request: Request, exc: Exception): status_code=exc.status_code, content={"message": str(exc.detail)}, ) + + # Suppress known SSE streaming errors that are harmless + exc_str = str(exc) + if "Unexpected message" in exc_str and "http.response.start" in exc_str: + # This is a known issue with SSE connections being closed + await logger.adebug(f"SSE connection closed: {exc_str}") + return JSONResponse( + status_code=200, + content={"message": "Connection closed"}, + ) + await logger.aerror(f"unhandled error: {exc}", exc_info=exc) await log_exception_to_telemetry(exc, "handler") diff --git a/src/backend/base/langflow/services/mcp_composer/service.py b/src/backend/base/langflow/services/mcp_composer/service.py index 15944abcfb14..60520beaff2e 100644 --- a/src/backend/base/langflow/services/mcp_composer/service.py +++ b/src/backend/base/langflow/services/mcp_composer/service.py @@ -2,6 +2,7 @@ import asyncio import os +import platform import re import select import socket @@ -10,7 +11,7 @@ from functools import wraps from typing import Any -from langflow.logging.logger import logger +from langflow.logging import logger from langflow.services.base import Service from langflow.services.deps import get_settings_service @@ -72,14 +73,176 @@ def __init__(self): self._start_locks: dict[ str, asyncio.Lock ] = {} # Lock to prevent concurrent start operations for the same project + self._active_start_tasks: dict[ + str, asyncio.Task + ] = {} # Track active start tasks to cancel them when new request arrives + self._port_to_project: dict[int, str] = {} # Track which project is using which port + self._pid_to_project: dict[int, str] = {} # Track which PID belongs to which project + self._last_errors: dict[str, str] = {} # Track last error message per project for UI display - def _is_port_available(self, port: int) -> bool: - """Check if a port is available (not in use).""" + def get_last_error(self, project_id: str) -> str | None: + """Get the last error message for a project, if any.""" + return self._last_errors.get(project_id) + + def set_last_error(self, project_id: str, error_message: str) -> None: + """Set the last error message for a project.""" + self._last_errors[project_id] = error_message + + def clear_last_error(self, project_id: str) -> None: + """Clear the last error message for a project.""" + self._last_errors.pop(project_id, None) + + def _is_port_available(self, port: int, host: str = "localhost") -> bool: + """Check if a port is available by trying to bind to it. + + Args: + port: Port number to check + host: Host to check (default: localhost) + + Returns: + True if port is available (not in use), False if in use + """ + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + # Don't use SO_REUSEADDR here as it can give false positives + sock.bind((host, port)) + return True # Port is available + except OSError: + return False # Port is in use/bound + + async def _kill_process_on_port(self, port: int) -> bool: + """Kill the process using the specified port. + + Cross-platform implementation supporting Windows, macOS, and Linux. + + Args: + port: The port number to check + + Returns: + True if a process was found and killed, False otherwise + """ try: - with socket.create_connection(("localhost", port), timeout=1.0): - return False # Port is in use - except (OSError, ConnectionRefusedError): - return True # Port is available + await logger.adebug(f"Checking for processes using port {port}...") + os_type = platform.system() + + # Platform-specific command to find PID + if os_type == "Windows": + # Use netstat on Windows + result = await asyncio.to_thread( + subprocess.run, + ["netstat", "-ano"], + capture_output=True, + text=True, + check=False, + ) + + if result.returncode == 0: + # Parse netstat output to find PID + # Format: TCP 0.0.0.0:PORT 0.0.0.0:0 LISTENING PID + windows_pids: list[int] = [] + for line in result.stdout.split("\n"): + if f":{port}" in line and "LISTENING" in line: + parts = line.split() + if parts: + try: + pid = int(parts[-1]) + windows_pids.append(pid) + except (ValueError, IndexError): + continue + + await logger.adebug(f"Found {len(windows_pids)} process(es) using port {port}: {windows_pids}") + + for pid in windows_pids: + try: + await logger.adebug(f"Attempting to kill process {pid} on port {port}...") + # Use taskkill on Windows + kill_result = await asyncio.to_thread( + subprocess.run, + ["taskkill", "/F", "/PID", str(pid)], + capture_output=True, + check=False, + ) + + if kill_result.returncode == 0: + await logger.adebug(f"Successfully killed process {pid} on port {port}") + return True + await logger.awarning( + f"taskkill returned {kill_result.returncode} for process {pid} on port {port}" + ) + except Exception as e: # noqa: BLE001 + await logger.aerror(f"Error killing PID {pid}: {e}") + + return False + else: + # Use lsof on Unix-like systems (macOS, Linux) + result = await asyncio.to_thread( + subprocess.run, + ["lsof", "-ti", f":{port}"], + capture_output=True, + text=True, + check=False, + ) + + await logger.adebug(f"lsof returned code {result.returncode} for port {port}") + + # Extract PIDs from lsof output + lsof_output = result.stdout.strip() + lsof_errors = result.stderr.strip() + + if lsof_output: + await logger.adebug(f"lsof stdout: {lsof_output}") + if lsof_errors: + await logger.adebug(f"lsof stderr: {lsof_errors}") + + if result.returncode == 0 and lsof_output: + unix_pids = lsof_output.split("\n") + await logger.adebug(f"Found {len(unix_pids)} process(es) using port {port}: {unix_pids}") + + for pid_str in unix_pids: + try: + pid = int(pid_str.strip()) + await logger.adebug(f"Attempting to kill process {pid} on port {port}...") + + # Try to kill the process + kill_result = await asyncio.to_thread( + subprocess.run, + ["kill", "-9", str(pid)], + capture_output=True, + check=False, + ) + + if kill_result.returncode == 0: + await logger.adebug(f"Successfully sent kill signal to process {pid} on port {port}") + return True + await logger.awarning( + f"kill command returned {kill_result.returncode} for process {pid} on port {port}" + ) + except (ValueError, ProcessLookupError) as e: + await logger.aerror(f"Error processing PID {pid_str}: {e}") + + # If we get here, we found processes but couldn't kill any + return False + await logger.adebug(f"No process found using port {port}") + return False + except Exception as e: # noqa: BLE001 + await logger.aerror(f"Error finding/killing process on port {port}: {e}") + return False + return False + + def _is_port_used_by_another_project(self, port: int, current_project_id: str) -> tuple[bool, str | None]: + """Check if a port is being used by another project. + + Args: + port: The port to check + current_project_id: The current project ID + + Returns: + Tuple of (is_used_by_other, other_project_id) + """ + other_project_id = self._port_to_project.get(port) + if other_project_id and other_project_id != current_project_id: + return True, other_project_id + return False, None async def start(self): """Check if the MCP Composer service is enabled.""" @@ -123,47 +286,224 @@ async def _do_stop_project_composer(self, project_id: str): composer_info = self.project_composers[project_id] process = composer_info.get("process") - if process: - try: - # Check if process is still running before trying to terminate - if process.poll() is None: - await logger.adebug(f"Terminating MCP Composer process {process.pid} for project {project_id}") - process.terminate() + try: + if process: + try: + # Check if process is still running before trying to terminate + if process.poll() is None: + await logger.adebug(f"Terminating MCP Composer process {process.pid} for project {project_id}") + process.terminate() - # Wait longer for graceful shutdown - try: - await asyncio.wait_for(self._wait_for_process_exit(process), timeout=3.0) - await logger.adebug(f"MCP Composer for project {project_id} terminated gracefully") - except asyncio.TimeoutError: - await logger.aerror( - f"MCP Composer for project {project_id} did not terminate gracefully, force killing" - ) - process.kill() - # Wait a bit more for force kill to complete + # Wait longer for graceful shutdown try: - await asyncio.wait_for(self._wait_for_process_exit(process), timeout=2.0) + await asyncio.wait_for(asyncio.to_thread(process.wait), timeout=2.0) + await logger.adebug(f"MCP Composer for project {project_id} terminated gracefully") except asyncio.TimeoutError: await logger.aerror( - f"Failed to kill MCP Composer process {process.pid} for project {project_id}" + f"MCP Composer for project {project_id} did not terminate gracefully, force killing" ) - else: - await logger.adebug(f"MCP Composer process for project {project_id} was already terminated") - - await logger.adebug(f"MCP Composer stopped for project {project_id}") + await asyncio.to_thread(process.kill) + await asyncio.to_thread(process.wait) + else: + await logger.adebug(f"MCP Composer process for project {project_id} was already terminated") - except ProcessLookupError: - # Process already terminated - await logger.adebug(f"MCP Composer process for project {project_id} was already terminated") - except Exception as e: # noqa: BLE001 - await logger.aerror(f"Error stopping MCP Composer for project {project_id}: {e}") + await logger.adebug(f"MCP Composer stopped for project {project_id}") - # Remove from tracking - del self.project_composers[project_id] + except ProcessLookupError: + # Process already terminated + await logger.adebug(f"MCP Composer process for project {project_id} was already terminated") + except Exception as e: # noqa: BLE001 + await logger.aerror(f"Error stopping MCP Composer for project {project_id}: {e}") + finally: + # Always clean up tracking, even if stopping failed + port = composer_info.get("port") + if port and self._port_to_project.get(port) == project_id: + self._port_to_project.pop(port, None) + await logger.adebug(f"Released port {port} from project {project_id}") + + # Clean up PID tracking + if process and process.pid: + self._pid_to_project.pop(process.pid, None) + await logger.adebug(f"Released PID {process.pid} tracking for project {project_id}") + + # Remove from tracking + self.project_composers.pop(project_id, None) + await logger.adebug(f"Removed tracking for project {project_id}") async def _wait_for_process_exit(self, process): """Wait for a process to exit.""" await asyncio.to_thread(process.wait) + async def _read_process_output_and_extract_error( + self, process: subprocess.Popen, oauth_server_url: str | None, timeout: float = 2.0 + ) -> tuple[str, str, str]: + """Read process output and extract user-friendly error message. + + Args: + process: The subprocess to read from + oauth_server_url: OAuth server URL for error messages + timeout: Timeout for reading output + + Returns: + Tuple of (stdout, stderr, error_message) + """ + try: + # Use asyncio.to_thread to avoid blocking the event loop + # Process returns bytes, decode with error handling + stdout_bytes, stderr_bytes = await asyncio.to_thread(process.communicate, timeout=timeout) + stdout_content = stdout_bytes.decode("utf-8", errors="replace") if stdout_bytes else "" + stderr_content = stderr_bytes.decode("utf-8", errors="replace") if stderr_bytes else "" + except subprocess.TimeoutExpired: + process.kill() + error_msg = self._extract_error_message("", "", oauth_server_url) + return "", "", error_msg + else: + error_msg = self._extract_error_message(stdout_content, stderr_content, oauth_server_url) + return stdout_content, stderr_content, error_msg + + async def _read_stream_non_blocking(self, stream, stream_name: str) -> None: + """Read from a stream without blocking and log the content. + + Args: + stream: The stream to read from (stdout or stderr) + stream_name: Name of the stream for logging ("stdout" or "stderr") + """ + if stream and select.select([stream], [], [], 0)[0]: + try: + line_bytes = stream.readline() + if line_bytes: + # Decode bytes with error handling + line = line_bytes.decode("utf-8", errors="replace") if isinstance(line_bytes, bytes) else line_bytes + stripped = line.strip() + if stripped: + # Log errors at error level, everything else at debug + if stream_name == "stderr" and ("ERROR" in stripped or "error" in stripped): + await logger.aerror(f"MCP Composer {stream_name}: {stripped}") + else: + await logger.adebug(f"MCP Composer {stream_name}: {stripped}") + except Exception as e: # noqa: BLE001 + await logger.adebug(f"Error reading {stream_name}: {e}") + + async def _ensure_port_available(self, port: int, current_project_id: str) -> None: + """Ensure a port is available, only killing untracked processes. + + Args: + port: The port number to ensure is available + current_project_id: The project ID requesting the port + + Raises: + MCPComposerPortError: If port cannot be made available + """ + is_port_available = self._is_port_available(port) + await logger.adebug(f"Port {port} availability check: {is_port_available}") + + if not is_port_available: + # Check if the port is being used by a tracked project + is_used_by_other, other_project_id = self._is_port_used_by_another_project(port, current_project_id) + + if is_used_by_other and other_project_id: + # Port is being used by another tracked project + # Check if we can take ownership (e.g., the other project is failing) + other_composer = self.project_composers.get(other_project_id) + if other_composer and other_composer.get("process"): + other_process = other_composer["process"] + # If the other process is still running and healthy, don't kill it + if other_process.poll() is None: + await logger.aerror( + f"Port {port} requested by project {current_project_id} is already in use by " + f"project {other_project_id}. Will not kill active MCP Composer process." + ) + port_error_msg = ( + f"Port {port} is already in use by another project. " + f"Please choose a different port (e.g., {port + 1}) " + f"or disable OAuth on the other project first." + ) + raise MCPComposerPortError(port_error_msg, current_project_id) + + # Process died but port tracking wasn't cleaned up - allow takeover + await logger.adebug( + f"Port {port} was tracked to project {other_project_id} but process died. " + f"Allowing project {current_project_id} to take ownership." + ) + # Clean up the old tracking + await self._do_stop_project_composer(other_project_id) + + # Check if port is used by a process owned by the current project (e.g., stuck in startup loop) + port_owner_project = self._port_to_project.get(port) + if port_owner_project == current_project_id: + # Port is owned by current project - safe to kill + await logger.adebug( + f"Port {port} is in use by current project {current_project_id} (likely stuck in startup). " + f"Killing process to retry." + ) + killed = await self._kill_process_on_port(port) + if killed: + await logger.adebug( + f"Successfully killed own process on port {port}. Waiting for port to be released..." + ) + await asyncio.sleep(2) + is_port_available = self._is_port_available(port) + if not is_port_available: + await logger.aerror(f"Port {port} is still in use after killing own process.") + port_error_msg = f"Port {port} is still in use after killing process" + raise MCPComposerPortError(port_error_msg) + else: + # Port is in use by unknown process - don't kill it (security concern) + await logger.aerror( + f"Port {port} is in use by an unknown process (not owned by Langflow). " + f"Will not kill external application for security reasons." + ) + port_error_msg = ( + f"Port {port} is already in use by another application. " + f"Please choose a different port (e.g., {port + 1}) or free up the port manually." + ) + raise MCPComposerPortError(port_error_msg, current_project_id) + + await logger.adebug(f"Port {port} is available, proceeding with MCP Composer startup") + + async def _log_startup_error_details( + self, + project_id: str, + cmd: list[str], + host: str, + port: int, + stdout: str = "", + stderr: str = "", + error_msg: str = "", + exit_code: int | None = None, + pid: int | None = None, + ) -> None: + """Log detailed startup error information. + + Args: + project_id: The project ID + cmd: The command that was executed + host: Target host + port: Target port + stdout: Standard output from the process + stderr: Standard error from the process + error_msg: User-friendly error message + exit_code: Process exit code (if terminated) + pid: Process ID (if still running) + """ + await logger.aerror(f"MCP Composer startup failed for project {project_id}:") + if exit_code is not None: + await logger.aerror(f" - Process died with exit code: {exit_code}") + if pid is not None: + await logger.aerror(f" - Process is running (PID: {pid}) but failed to bind to port {port}") + await logger.aerror(f" - Target: {host}:{port}") + + # Obfuscate secrets in command before logging + safe_cmd = self._obfuscate_command_secrets(cmd) + await logger.aerror(f" - Command: {' '.join(safe_cmd)}") + + if stderr.strip(): + await logger.aerror(f" - Error output: {stderr.strip()}") + if stdout.strip(): + await logger.aerror(f" - Standard output: {stdout.strip()}") + if error_msg: + await logger.aerror(f" - Error message: {error_msg}") + def _validate_oauth_settings(self, auth_config: dict[str, Any]) -> None: """Validate that all required OAuth settings are present and non-empty. @@ -203,6 +543,18 @@ def _validate_oauth_settings(self, auth_config: dict[str, Any]) -> None: config_error_msg = f"Invalid OAuth configuration: {'; '.join(error_parts)}" raise MCPComposerConfigError(config_error_msg) + @staticmethod + def _normalize_config_value(value: Any) -> Any: + """Normalize a config value (None or empty string becomes None). + + Args: + value: The value to normalize + + Returns: + None if value is None or empty string, otherwise the value + """ + return None if (value is None or value == "") else value + def _has_auth_config_changed(self, existing_auth: dict[str, Any] | None, new_auth: dict[str, Any] | None) -> bool: """Check if auth configuration has changed in a way that requires restart.""" if not existing_auth and not new_auth: @@ -228,12 +580,8 @@ def _has_auth_config_changed(self, existing_auth: dict[str, Any] | None, new_aut # Compare relevant fields for field in fields_to_check: - old_val = existing_auth.get(field) - new_val = new_auth.get(field) - - # Convert None and empty string to None for comparison - old_normalized = None if (old_val is None or old_val == "") else old_val - new_normalized = None if (new_val is None or new_val == "") else new_val + old_normalized = self._normalize_config_value(existing_auth.get(field)) + new_normalized = self._normalize_config_value(new_auth.get(field)) if old_normalized != new_normalized: return True @@ -250,23 +598,30 @@ def _obfuscate_command_secrets(self, cmd: list[str]) -> list[str]: List of command arguments with secrets replaced with ***REDACTED*** """ safe_cmd = [] - skip_next = False + i = 0 - for i, arg in enumerate(cmd): - if skip_next: - skip_next = False - safe_cmd.append("***REDACTED***") - continue + while i < len(cmd): + arg = cmd[i] + # Check if this is --env followed by a secret key if arg == "--env" and i + 2 < len(cmd): - # Check if next env var is a secret env_key = cmd[i + 1] + env_value = cmd[i + 2] + if any(secret in env_key.lower() for secret in ["secret", "key", "token"]): - safe_cmd.extend([arg, env_key]) # Keep env key, redact value - skip_next = True + # Redact the value + safe_cmd.extend([arg, env_key, "***REDACTED***"]) + i += 3 # Skip all three: --env, key, and value continue + # Not a secret, keep as-is + safe_cmd.extend([arg, env_key, env_value]) + i += 3 + continue + + # Regular argument safe_cmd.append(arg) + i += 1 return safe_cmd @@ -319,11 +674,75 @@ async def start_project_composer( project_id: str, sse_url: str, auth_config: dict[str, Any] | None, - max_startup_checks: int = 3, - startup_delay: float = 2.0, + max_retries: int = 3, + max_startup_checks: int = 20, + startup_delay: float = 1.5, ) -> None: """Start an MCP Composer instance for a specific project. + Args: + project_id: The project ID + sse_url: The SSE URL to connect to + auth_config: Authentication configuration + max_retries: Maximum number of retry attempts (default: 3) + max_startup_checks: Number of checks per retry attempt (default: 60) + startup_delay: Delay between checks in seconds (default: 3.0) + + Raises: + MCPComposerError: Various specific errors if startup fails + """ + # Cancel any active start operation for this project + if project_id in self._active_start_tasks: + active_task = self._active_start_tasks[project_id] + if not active_task.done(): + await logger.adebug(f"Cancelling previous MCP Composer start operation for project {project_id}") + active_task.cancel() + try: + await active_task + except asyncio.CancelledError: + await logger.adebug(f"Previous start operation for project {project_id} cancelled successfully") + finally: + # Clean up the cancelled task from tracking + del self._active_start_tasks[project_id] + + # Create and track the current task + current_task = asyncio.current_task() + if not current_task: + await logger.awarning( + f"Could not get current task for project {project_id}. " + f"Concurrent start operations may not be properly cancelled." + ) + else: + self._active_start_tasks[project_id] = current_task + + try: + await self._do_start_project_composer( + project_id, sse_url, auth_config, max_retries, max_startup_checks, startup_delay + ) + finally: + # Clean up the task reference when done + if project_id in self._active_start_tasks and self._active_start_tasks[project_id] == current_task: + del self._active_start_tasks[project_id] + + async def _do_start_project_composer( + self, + project_id: str, + sse_url: str, + auth_config: dict[str, Any] | None, + max_retries: int = 3, + max_startup_checks: int = 20, + startup_delay: float = 1.5, + ) -> None: + """Internal method to start an MCP Composer instance. + + Args: + project_id: The project ID + sse_url: The SSE URL to connect to + auth_config: Authentication configuration + max_retries: Maximum number of retry attempts (default: 3) + max_startup_checks: Number of checks per retry attempt (default: 60) + startup_delay: Delay between checks in seconds (default: 3.0) + Raises: MCPComposerError: Various specific errors if startup fails """ @@ -344,11 +763,17 @@ async def start_project_composer( async with self._start_locks[project_id]: # Check if already running (double-check after acquiring lock) - project_port = auth_config.get("oauth_port") - if not project_port: + project_port_str = auth_config.get("oauth_port") + if not project_port_str: no_port_error_msg = "No OAuth port provided" raise MCPComposerConfigError(no_port_error_msg, project_id) + try: + project_port = int(project_port_str) + except (ValueError, TypeError) as e: + port_error_msg = f"Invalid OAuth port: {project_port_str}" + raise MCPComposerConfigError(port_error_msg, project_id) from e + project_host = auth_config.get("oauth_host") if not project_host: no_host_error_msg = "No OAuth host provided" @@ -358,11 +783,13 @@ async def start_project_composer( composer_info = self.project_composers[project_id] process = composer_info.get("process") existing_auth = composer_info.get("auth_config", {}) + existing_port = composer_info.get("port") # Check if process is still running if process and process.poll() is None: # Process is running - only restart if config changed auth_changed = self._has_auth_config_changed(existing_auth, auth_config) + if auth_changed: await logger.adebug(f"Config changed for project {project_id}, restarting MCP Composer") await self._do_stop_project_composer(project_id) @@ -375,28 +802,95 @@ async def start_project_composer( # Process died or never started properly, restart it await logger.adebug(f"MCP Composer process died for project {project_id}, restarting") await self._do_stop_project_composer(project_id) + # Also kill any process that might be using the old port + if existing_port: + try: + await asyncio.wait_for(self._kill_process_on_port(existing_port), timeout=5.0) + except asyncio.TimeoutError: + await logger.aerror(f"Timeout while killing process on port {existing_port}") - is_port_available = self._is_port_available(project_port) - if not is_port_available: - await logger.awarning(f"Port {project_port} is already in use.") - port_error_msg = f"Port {project_port} is already in use" - raise MCPComposerPortError(port_error_msg) + # Retry loop: try starting the process multiple times + last_error = None + try: + # Ensure port is available (only kill untracked processes) + try: + await self._ensure_port_available(project_port, project_id) + except MCPComposerPortError as e: + # Port error before starting - store and raise immediately + self._last_errors[project_id] = e.message + raise + for retry_attempt in range(1, max_retries + 1): + try: + await logger.adebug( + f"Starting MCP Composer for project {project_id} (attempt {retry_attempt}/{max_retries})" + ) - # Start the MCP Composer process (single attempt, no outer retry loop) - process = await self._start_project_composer_process( - project_id, project_host, project_port, sse_url, auth_config, max_startup_checks, startup_delay - ) - self.project_composers[project_id] = { - "process": process, - "host": project_host, - "port": project_port, - "sse_url": sse_url, - "auth_config": auth_config, - } + # Re-check port availability before each attempt to prevent race conditions + if retry_attempt > 1: + await logger.adebug(f"Re-checking port {project_port} availability before retry...") + await self._ensure_port_available(project_port, project_id) + + process = await self._start_project_composer_process( + project_id, + project_host, + project_port, + sse_url, + auth_config, + max_startup_checks, + startup_delay, + ) - await logger.adebug( - f"MCP Composer started for project {project_id} on port {project_port} (PID: {process.pid})" - ) + except MCPComposerError as e: + last_error = e + await logger.aerror( + f"MCP Composer startup attempt {retry_attempt}/{max_retries} failed " + f"for project {project_id}: {e.message}" + ) + + # Clean up any partially started process before retrying + if project_id in self.project_composers: + await self._do_stop_project_composer(project_id) + + # If not the last attempt, wait a bit before retrying + if retry_attempt < max_retries: + await logger.adebug(f"Waiting 2 seconds before retry attempt {retry_attempt + 1}...") + await asyncio.sleep(2) + + else: + # Success! Store the composer info and register the port and PID + self.project_composers[project_id] = { + "process": process, + "host": project_host, + "port": project_port, + "sse_url": sse_url, + "auth_config": auth_config, + } + self._port_to_project[project_port] = project_id + self._pid_to_project[process.pid] = project_id + # Clear any previous error on success + self.clear_last_error(project_id) + + await logger.adebug( + f"MCP Composer started for project {project_id} on port {project_port} " + f"(PID: {process.pid}) after {retry_attempt} attempt(s)" + ) + return # Success! + + # All retries failed, raise the last error + if last_error: + await logger.aerror( + f"MCP Composer failed to start for project {project_id} after {max_retries} attempts" + ) + # Store the error message for later retrieval + self._last_errors[project_id] = last_error.message + raise last_error + + except asyncio.CancelledError: + # Operation was cancelled, clean up any started process + await logger.adebug(f"MCP Composer start operation for project {project_id} was cancelled") + if project_id in self.project_composers: + await self._do_stop_project_composer(project_id) + raise # Re-raise to propagate cancellation async def _start_project_composer_process( self, @@ -405,14 +899,34 @@ async def _start_project_composer_process( port: int, sse_url: str, auth_config: dict[str, Any] | None = None, - max_startup_checks: int = 3, - startup_delay: float = 1.0, + max_startup_checks: int = 60, + startup_delay: float = 3.0, ) -> subprocess.Popen: - """Start the MCP Composer subprocess for a specific project.""" + """Start the MCP Composer subprocess for a specific project. + + Args: + project_id: The project ID + host: Host to bind to + port: Port to bind to + sse_url: SSE URL to connect to + auth_config: Authentication configuration + max_startup_checks: Number of port binding checks (default: 60) + startup_delay: Delay between checks in seconds (default: 3.0) + + Returns: + The started subprocess + + Raises: + MCPComposerStartupError: If startup fails + """ settings = get_settings_service().settings cmd = [ "uvx", f"mcp-composer{settings.mcp_composer_version}", + "--port", + str(port), + "--host", + host, "--mode", "sse", "--sse-url", @@ -434,6 +948,8 @@ async def _start_project_composer_process( cmd.extend(["--env", "ENABLE_OAUTH", "True"]) # Map auth config to environment variables for OAuth + # Note: oauth_host and oauth_port are passed both via --host/--port CLI args + # (for server binding) and as environment variables (for OAuth flow) oauth_env_mapping = { "oauth_host": "OAUTH_HOST", "oauth_port": "OAUTH_PORT", @@ -454,68 +970,79 @@ async def _start_project_composer_process( if value is not None and str(value).strip(): cmd.extend(["--env", env_key, str(value)]) + # Log the command being executed (with secrets obfuscated) + safe_cmd = self._obfuscate_command_secrets(cmd) + await logger.adebug(f"Starting MCP Composer with command: {' '.join(safe_cmd)}") + # Start the subprocess with both stdout and stderr captured - process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # noqa: ASYNC220, S603 + # Use binary mode and decode manually to handle encoding errors gracefully + process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # noqa: ASYNC220, S603 # Monitor the process startup with multiple checks process_running = False port_bound = False - await logger.adebug(f"Monitoring MCP Composer startup for project {project_id} (PID: {process.pid})") + await logger.adebug( + f"MCP Composer process started with PID {process.pid}, monitoring startup for project {project_id}..." + ) - for check in range(max_startup_checks): - await asyncio.sleep(startup_delay) + try: + for check in range(max_startup_checks): + await asyncio.sleep(startup_delay) - # Check if process is still running - poll_result = process.poll() + # Check if process is still running + poll_result = process.poll() - startup_error_msg = None - if poll_result is not None: - # Process terminated, get the error output - await logger.aerror(f"MCP Composer process {process.pid} terminated with exit code: {poll_result}") - try: - stdout_content, stderr_content = process.communicate(timeout=2) - # Log the full error details for debugging - await logger.aerror(f"MCP Composer startup failed for project {project_id}") - await logger.aerror(f"MCP Composer stdout:\n{stdout_content}") - await logger.aerror(f"MCP Composer stderr:\n{stderr_content}") - safe_cmd = self._obfuscate_command_secrets(cmd) - await logger.aerror(f"Command that failed: {' '.join(safe_cmd)}") - - # Extract meaningful error message - startup_error_msg = self._extract_error_message(stdout_content, stderr_content, oauth_server_url) - raise MCPComposerStartupError(startup_error_msg, project_id) - except subprocess.TimeoutExpired: - process.kill() - await logger.aerror( - f"MCP Composer process {process.pid} terminated unexpectedly for project {project_id}" + startup_error_msg = None + if poll_result is not None: + # Process terminated, get the error output + ( + stdout_content, + stderr_content, + startup_error_msg, + ) = await self._read_process_output_and_extract_error(process, oauth_server_url) + await self._log_startup_error_details( + project_id, cmd, host, port, stdout_content, stderr_content, startup_error_msg, poll_result ) - startup_error_msg = self._extract_error_message("", "", oauth_server_url) - raise MCPComposerStartupError(startup_error_msg, project_id) from None + raise MCPComposerStartupError(startup_error_msg, project_id) - # Process is still running, check if port is bound - port_bound = not self._is_port_available(port) + # Process is still running, check if port is bound + port_bound = not self._is_port_available(port) - if port_bound: + if port_bound: + await logger.adebug( + f"MCP Composer for project {project_id} bound to port {port} " + f"(check {check + 1}/{max_startup_checks})" + ) + process_running = True + break await logger.adebug( - f"MCP Composer for project {project_id} bound to port {port} " + f"MCP Composer for project {project_id} not yet bound to port {port} " f"(check {check + 1}/{max_startup_checks})" ) - process_running = True - break + + # Try to read any available stderr/stdout without blocking to see what's happening + await self._read_stream_non_blocking(process.stderr, "stderr") + await self._read_stream_non_blocking(process.stdout, "stdout") + + except asyncio.CancelledError: + # Operation was cancelled, kill the process and cleanup await logger.adebug( - f"MCP Composer for project {project_id} not yet bound to port {port} " - f"(check {check + 1}/{max_startup_checks})" + f"MCP Composer process startup cancelled for project {project_id}, terminating process {process.pid}" ) - - # Try to read any available stderr without blocking (only log if there's an error) - if process.stderr and select.select([process.stderr], [], [], 0)[0]: + try: + process.terminate() + # Wait for graceful termination with timeout try: - stderr_line = process.stderr.readline() - if stderr_line and "ERROR" in stderr_line: - await logger.aerror(f"MCP Composer error: {stderr_line.strip()}") - except Exception: # noqa: S110, BLE001 - pass + await asyncio.wait_for(asyncio.to_thread(process.wait), timeout=2.0) + except asyncio.TimeoutError: + # Force kill if graceful termination times out + await logger.adebug(f"Process {process.pid} did not terminate gracefully, force killing") + await asyncio.to_thread(process.kill) + await asyncio.to_thread(process.wait) + except Exception as e: # noqa: BLE001 + await logger.adebug(f"Error terminating process during cancellation: {e}") + raise # Re-raise to propagate cancellation # After all checks if not process_running or not port_bound: @@ -524,49 +1051,26 @@ async def _start_project_composer_process( if poll_result is not None: # Process died - startup_error_msg = None - try: - stdout_content, stderr_content = process.communicate(timeout=2) - # Extract meaningful error message - startup_error_msg = self._extract_error_message(stdout_content, stderr_content, oauth_server_url) - await logger.aerror(f"MCP Composer startup failed for project {project_id}:") - await logger.aerror(f" - Process died with exit code: {poll_result}") - await logger.aerror(f" - Target: {host}:{port}") - # Obfuscate secrets in command before logging - safe_cmd = self._obfuscate_command_secrets(cmd) - await logger.aerror(f" - Command: {' '.join(safe_cmd)}") - if stderr_content.strip(): - await logger.aerror(f" - Error output: {stderr_content.strip()}") - if stdout_content.strip(): - await logger.aerror(f" - Standard output: {stdout_content.strip()}") - await logger.aerror(f" - Error message: {startup_error_msg}") - except subprocess.TimeoutExpired: - await logger.aerror(f"MCP Composer for project {project_id} died but couldn't read output") - process.kill() - + stdout_content, stderr_content, startup_error_msg = await self._read_process_output_and_extract_error( + process, oauth_server_url + ) + await self._log_startup_error_details( + project_id, cmd, host, port, stdout_content, stderr_content, startup_error_msg, poll_result + ) raise MCPComposerStartupError(startup_error_msg, project_id) # Process running but port not bound - await logger.aerror(f"MCP Composer startup failed for project {project_id}:") - await logger.aerror(f" - Process is running (PID: {process.pid}) but failed to bind to port {port}") await logger.aerror( f" - Checked {max_startup_checks} times over {max_startup_checks * startup_delay} seconds" ) - await logger.aerror(f" - Target: {host}:{port}") # Get any available output before terminating - startup_error_msg = None - try: - process.terminate() - stdout_content, stderr_content = process.communicate(timeout=2) - startup_error_msg = self._extract_error_message(stdout_content, stderr_content, oauth_server_url) - if stderr_content.strip(): - await logger.aerror(f" - Process stderr: {stderr_content.strip()}") - if stdout_content.strip(): - await logger.aerror(f" - Process stdout: {stdout_content.strip()}") - except Exception: # noqa: BLE001 - process.kill() - await logger.aerror(" - Could not retrieve process output before termination") - + process.terminate() + stdout_content, stderr_content, startup_error_msg = await self._read_process_output_and_extract_error( + process, oauth_server_url + ) + await self._log_startup_error_details( + project_id, cmd, host, port, stdout_content, stderr_content, startup_error_msg, pid=process.pid + ) raise MCPComposerStartupError(startup_error_msg, project_id) # Close the pipes if everything is successful diff --git a/src/backend/tests/unit/services/settings/test_mcp_composer.py b/src/backend/tests/unit/services/settings/test_mcp_composer.py new file mode 100644 index 000000000000..e6c54d427a6c --- /dev/null +++ b/src/backend/tests/unit/services/settings/test_mcp_composer.py @@ -0,0 +1,322 @@ +"""Unit tests for MCP Composer Service port management and process killing.""" + +import asyncio +import contextlib +import socket +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from langflow.services.mcp_composer.service import MCPComposerPortError, MCPComposerService + + +@pytest.fixture +def mcp_service(): + """Create an MCP Composer service instance for testing.""" + return MCPComposerService() + + +class TestPortAvailability: + """Test port availability checking.""" + + def test_is_port_available_when_free(self, mcp_service): + """Test that is_port_available returns True for an available port.""" + # Use a very high port number that's likely to be free + test_port = 59999 + assert mcp_service._is_port_available(test_port) is True + + def test_is_port_available_when_in_use(self, mcp_service): + """Test that is_port_available returns False when port is in use.""" + # Create a socket that binds to a port + test_port = 59998 + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.bind(("0.0.0.0", test_port)) # noqa: S104 + sock.listen(1) + # Port should now be unavailable + assert mcp_service._is_port_available(test_port) is False + finally: + sock.close() + + +class TestKillProcessOnPort: + """Test process killing functionality.""" + + @pytest.mark.asyncio + async def test_kill_process_on_port_no_process(self, mcp_service): + """Test that _kill_process_on_port returns False when no process is found.""" + with patch("asyncio.to_thread") as mock_to_thread: + # Mock lsof returning no processes + mock_result = MagicMock() + mock_result.returncode = 1 # lsof returns 1 when no matches + mock_result.stdout = "" + mock_result.stderr = "" + mock_to_thread.return_value = mock_result + + result = await mcp_service._kill_process_on_port(9999) + assert result is False + + @pytest.mark.asyncio + async def test_kill_process_on_port_success(self, mcp_service): + """Test that _kill_process_on_port successfully kills a process.""" + with patch("asyncio.to_thread") as mock_to_thread: + # Mock lsof returning a PID + mock_lsof_result = MagicMock() + mock_lsof_result.returncode = 0 + mock_lsof_result.stdout = "12345\n" + mock_lsof_result.stderr = "" + + # Mock kill command succeeding + mock_kill_result = MagicMock() + mock_kill_result.returncode = 0 + mock_kill_result.stdout = "" + mock_kill_result.stderr = "" + + # Set up side effects for two calls: lsof, then kill + mock_to_thread.side_effect = [mock_lsof_result, mock_kill_result] + + result = await mcp_service._kill_process_on_port(9000) + assert result is True + assert mock_to_thread.call_count == 2 + + @pytest.mark.asyncio + async def test_kill_process_on_port_multiple_pids(self, mcp_service): + """Test that _kill_process_on_port handles multiple PIDs.""" + with patch("asyncio.to_thread") as mock_to_thread: + # Mock lsof returning multiple PIDs + mock_lsof_result = MagicMock() + mock_lsof_result.returncode = 0 + mock_lsof_result.stdout = "12345\n67890\n" + mock_lsof_result.stderr = "" + + # Mock kill command succeeding for first PID + mock_kill_result = MagicMock() + mock_kill_result.returncode = 0 + + mock_to_thread.side_effect = [mock_lsof_result, mock_kill_result] + + result = await mcp_service._kill_process_on_port(9000) + assert result is True + + @pytest.mark.asyncio + async def test_kill_process_on_port_kill_fails(self, mcp_service): + """Test that _kill_process_on_port handles kill command failure.""" + with patch("asyncio.to_thread") as mock_to_thread: + # Mock lsof returning a PID + mock_lsof_result = MagicMock() + mock_lsof_result.returncode = 0 + mock_lsof_result.stdout = "12345\n" + mock_lsof_result.stderr = "" + + # Mock kill command failing + mock_kill_result = MagicMock() + mock_kill_result.returncode = 1 + + mock_to_thread.side_effect = [mock_lsof_result, mock_kill_result] + + result = await mcp_service._kill_process_on_port(9000) + assert result is False + + @pytest.mark.asyncio + async def test_kill_process_on_port_exception_handling(self, mcp_service): + """Test that _kill_process_on_port handles exceptions gracefully.""" + with patch("asyncio.to_thread", side_effect=Exception("Test error")): + result = await mcp_service._kill_process_on_port(9000) + assert result is False + + +class TestAuthConfigChanges: + """Test authentication configuration change detection.""" + + def test_has_auth_config_changed_port_changed(self, mcp_service): + """Test that port change is detected.""" + existing_auth = { + "auth_type": "oauth", + "oauth_host": "localhost", + "oauth_port": "9000", + "oauth_server_url": "http://localhost:9000", + } + new_auth = { + "auth_type": "oauth", + "oauth_host": "localhost", + "oauth_port": "9001", + "oauth_server_url": "http://localhost:9001", + } + + assert mcp_service._has_auth_config_changed(existing_auth, new_auth) is True + + def test_has_auth_config_changed_no_change(self, mcp_service): + """Test that identical configs are not detected as changed.""" + existing_auth = { + "auth_type": "oauth", + "oauth_host": "localhost", + "oauth_port": "9000", + "oauth_server_url": "http://localhost:9000", + } + new_auth = existing_auth.copy() + + assert mcp_service._has_auth_config_changed(existing_auth, new_auth) is False + + def test_has_auth_config_changed_auth_type_changed(self, mcp_service): + """Test that auth type change is detected.""" + existing_auth = {"auth_type": "oauth", "oauth_port": "9000"} + new_auth = {"auth_type": "apikey", "api_key": "test_key"} + + assert mcp_service._has_auth_config_changed(existing_auth, new_auth) is True + + def test_has_auth_config_changed_both_none(self, mcp_service): + """Test that two None configs are not detected as changed.""" + assert mcp_service._has_auth_config_changed(None, None) is False + + def test_has_auth_config_changed_one_none(self, mcp_service): + """Test that changing from None to config is detected.""" + existing_auth = None + new_auth = {"auth_type": "oauth", "oauth_port": "9000"} + + assert mcp_service._has_auth_config_changed(existing_auth, new_auth) is True + + +class TestPortChangeHandling: + """Test handling of port changes in composer restart.""" + + @pytest.mark.asyncio + async def test_port_change_triggers_restart(self, mcp_service): + """Test that changing ports triggers a restart via auth config change detection.""" + project_id = "test-project" + old_port = 9000 + new_port = 9001 + + # Set up existing composer + mock_process = MagicMock(poll=MagicMock(return_value=None), pid=12345) + mcp_service.project_composers[project_id] = { + "process": mock_process, + "host": "localhost", + "port": old_port, + "sse_url": "http://test", + "auth_config": { + "auth_type": "oauth", + "oauth_host": "localhost", + "oauth_port": str(old_port), + "oauth_server_url": f"http://localhost:{old_port}", + "oauth_client_id": "test", + "oauth_client_secret": "test", + "oauth_auth_url": "http://test", + "oauth_token_url": "http://test", + }, + } + mcp_service._port_to_project[old_port] = project_id + mcp_service._pid_to_project[12345] = project_id + + new_auth_config = { + "auth_type": "oauth", + "oauth_host": "localhost", + "oauth_port": str(new_port), + "oauth_server_url": f"http://localhost:{new_port}", + "oauth_client_id": "test", + "oauth_client_secret": "test", + "oauth_auth_url": "http://test", + "oauth_token_url": "http://test", + } + + with ( + patch.object(mcp_service, "_do_stop_project_composer", new=AsyncMock()) as mock_stop, + patch.object(mcp_service, "_is_port_available", return_value=True), + patch.object(mcp_service, "_start_project_composer_process", new=AsyncMock()), + ): + # Initialize locks + mcp_service._start_locks[project_id] = asyncio.Lock() + + with contextlib.suppress(Exception): + await mcp_service._do_start_project_composer( + project_id=project_id, + sse_url="http://test", + auth_config=new_auth_config, + max_retries=1, + max_startup_checks=1, + startup_delay=0.1, + ) + + # Verify composer was stopped (because config changed) + mock_stop.assert_called_once_with(project_id) + + @pytest.mark.asyncio + async def test_port_in_use_by_own_project_triggers_kill(self, mcp_service): + """Test that when port is in use by the current project, it kills the process.""" + project_id = "test-project" + test_port = 9001 + + # Register the port as owned by this project + mcp_service._port_to_project[test_port] = project_id + + auth_config = { + "auth_type": "oauth", + "oauth_host": "localhost", + "oauth_port": str(test_port), + "oauth_server_url": f"http://localhost:{test_port}", + "oauth_client_id": "test", + "oauth_client_secret": "test", + "oauth_auth_url": "http://test", + "oauth_token_url": "http://test", + } + + with ( + patch.object(mcp_service, "_is_port_available") as mock_port_check, + patch.object(mcp_service, "_kill_process_on_port", new=AsyncMock(return_value=True)) as mock_kill, + ): + # First check: port is in use, second check after kill: port is free + mock_port_check.side_effect = [False, True] + + # Initialize locks + mcp_service._start_locks[project_id] = asyncio.Lock() + + with ( + patch.object(mcp_service, "_start_project_composer_process", new=AsyncMock()), + contextlib.suppress(Exception), + ): + await mcp_service._do_start_project_composer( + project_id=project_id, + sse_url="http://test", + auth_config=auth_config, + max_retries=1, + max_startup_checks=1, + startup_delay=0.1, + ) + + # Verify kill was attempted on own project's port + mock_kill.assert_called_with(test_port) + + @pytest.mark.asyncio + async def test_port_in_use_by_unknown_process_raises_error(self, mcp_service): + """Test that error is raised when port is in use by unknown process (security).""" + project_id = "test-project" + test_port = 9001 + + # Port is NOT tracked (unknown process) + # mcp_service._port_to_project does NOT contain test_port + + auth_config = { + "auth_type": "oauth", + "oauth_host": "localhost", + "oauth_port": str(test_port), + "oauth_server_url": f"http://localhost:{test_port}", + "oauth_client_id": "test", + "oauth_client_secret": "test", + "oauth_auth_url": "http://test", + "oauth_token_url": "http://test", + } + + with patch.object(mcp_service, "_is_port_available", return_value=False): # Port in use + # Initialize locks + mcp_service._start_locks[project_id] = asyncio.Lock() + + with pytest.raises(MCPComposerPortError) as exc_info: + await mcp_service._do_start_project_composer( + project_id=project_id, + sse_url="http://test", + auth_config=auth_config, + max_retries=1, + max_startup_checks=1, + startup_delay=0.1, + ) + + # New security message: won't kill unknown processes + assert "already in use by another application" in str(exc_info.value) diff --git a/src/frontend/src/controllers/API/queries/mcp/use-patch-flows-mcp.ts b/src/frontend/src/controllers/API/queries/mcp/use-patch-flows-mcp.ts index def36e79681f..79b4e7b02ba0 100644 --- a/src/frontend/src/controllers/API/queries/mcp/use-patch-flows-mcp.ts +++ b/src/frontend/src/controllers/API/queries/mcp/use-patch-flows-mcp.ts @@ -45,7 +45,7 @@ export const usePatchFlowsMCP: useMutationFunctionType< PatchFlowMCPResponse, any, PatchFlowMCPRequest - > = mutate(["usePatchFlowsMCP"], patchFlowMCP, { + > = mutate(["usePatchFlowsMCP", params.project_id], patchFlowMCP, { onSuccess: (data, variables, context) => { const authSettings = (variables as unknown as PatchFlowMCPRequest) .auth_settings; @@ -73,9 +73,10 @@ export const usePatchFlowsMCP: useMutationFunctionType< } }, onSettled: () => { - // Use invalidateQueries instead of refetchQueries to avoid race conditions - // This marks the queries as stale but doesn't immediately refetch them - queryClient.invalidateQueries({ queryKey: ["useGetFlowsMCP"] }); + // Invalidate only this specific project's queries to avoid affecting other projects + queryClient.invalidateQueries({ + queryKey: ["useGetFlowsMCP", params.project_id], + }); }, ...options, }); diff --git a/src/frontend/src/controllers/API/queries/mcp/use-patch-install-mcp.ts b/src/frontend/src/controllers/API/queries/mcp/use-patch-install-mcp.ts index da5b217867d2..d9a01b365ebb 100644 --- a/src/frontend/src/controllers/API/queries/mcp/use-patch-install-mcp.ts +++ b/src/frontend/src/controllers/API/queries/mcp/use-patch-install-mcp.ts @@ -47,7 +47,7 @@ export const usePatchInstallMCP: useMutationFunctionType< PatchInstallMCPResponse, any, PatchInstallMCPBody - > = mutate(["usePatchInstallMCP"], patchInstallMCP, { + > = mutate(["usePatchInstallMCP", params.project_id], patchInstallMCP, { ...options, onSuccess: (data, variables, context) => { queryClient.invalidateQueries({ diff --git a/src/frontend/src/modals/authModal/__tests__/AuthModal.test.tsx b/src/frontend/src/modals/authModal/__tests__/AuthModal.test.tsx new file mode 100644 index 000000000000..59c1eaff2ac5 --- /dev/null +++ b/src/frontend/src/modals/authModal/__tests__/AuthModal.test.tsx @@ -0,0 +1,275 @@ +import { render, screen } from "@testing-library/react"; +import userEvent from "@testing-library/user-event"; +import { TooltipProvider } from "@/components/ui/tooltip"; +import AuthModal from "../index"; + +// Mock utilities +jest.mock("@/utils/mcpUtils", () => ({ + AUTH_METHODS_ARRAY: [ + { id: "none", label: "None" }, + { id: "apikey", label: "API Key" }, + { id: "oauth", label: "OAuth" }, + ], +})); + +jest.mock("@/utils/stringManipulation", () => ({ + toSpaceCase: (str: string) => str.replace(/_/g, " "), +})); + +// Mock UI components +jest.mock("@/components/common/genericIconComponent", () => ({ + __esModule: true, + default: ({ name, className, ...props }: any) => ( + + {name} + + ), +})); + +// Mock custom link +jest.mock("@/customization/components/custom-link", () => ({ + CustomLink: ({ children, to, className }: any) => ( + + {children} + + ), +})); + +// Custom render function with TooltipProvider +const renderWithTooltip = (ui: React.ReactElement) => { + return render({ui}); +}; + +describe("AuthModal OAuth Port Synchronization", () => { + const mockOnSave = jest.fn(); + const mockSetOpen = jest.fn(); + + const defaultProps = { + open: true, + setOpen: mockSetOpen, + onSave: mockOnSave, + installedClients: [], + autoInstall: false, + }; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it("should auto-sync Server URL when Port is changed", async () => { + const user = userEvent.setup(); + + renderWithTooltip(); + + // Select OAuth auth type + const oauthRadio = screen.getByLabelText(/OAuth/i); + await user.click(oauthRadio); + + // Fill in the Port field + const portInput = screen.getByLabelText(/^Port$/i); + await user.clear(portInput); + await user.type(portInput, "9001"); + + // Check that Server URL was auto-updated + const serverUrlInput = screen.getByLabelText(/Server URL/i); + expect(serverUrlInput).toHaveValue("http://localhost:9001"); + }); + + it("should auto-sync Server URL when Host is changed", async () => { + const user = userEvent.setup(); + + renderWithTooltip(); + + // Select OAuth auth type + const oauthRadio = screen.getByLabelText(/OAuth/i); + await user.click(oauthRadio); + + // Fill in the Host first + const hostInput = screen.getByLabelText(/^Host$/i); + await user.clear(hostInput); + await user.type(hostInput, "example.com"); + + // Then fill in the Port + const portInput = screen.getByLabelText(/^Port$/i); + await user.clear(portInput); + await user.type(portInput, "8080"); + + // Check that Server URL was auto-updated with both host and port + const serverUrlInput = screen.getByLabelText(/Server URL/i); + expect(serverUrlInput).toHaveValue("http://example.com:8080"); + }); + + it("should auto-sync Callback Path when Port is changed", async () => { + const user = userEvent.setup(); + + renderWithTooltip(); + + // Select OAuth auth type + const oauthRadio = screen.getByLabelText(/OAuth/i); + await user.click(oauthRadio); + + // Fill in the Port field + const portInput = screen.getByLabelText(/^Port$/i); + await user.clear(portInput); + await user.type(portInput, "9001"); + + // Check that Callback Path was auto-updated + const callbackPathInput = screen.getByLabelText(/Callback Path/i); + expect(callbackPathInput).toHaveValue( + "http://localhost:9001/auth/idaas/callback", + ); + }); + + it("should auto-sync both Server URL and Callback Path when Host changes", async () => { + const user = userEvent.setup(); + + renderWithTooltip(); + + // Select OAuth auth type + const oauthRadio = screen.getByLabelText(/OAuth/i); + await user.click(oauthRadio); + + // Fill in Host and Port + const hostInput = screen.getByLabelText(/^Host$/i); + await user.clear(hostInput); + await user.type(hostInput, "192.168.1.100"); + + const portInput = screen.getByLabelText(/^Port$/i); + await user.clear(portInput); + await user.type(portInput, "9002"); + + // Verify Server URL + const serverUrlInput = screen.getByLabelText(/Server URL/i); + expect(serverUrlInput).toHaveValue("http://192.168.1.100:9002"); + + // Verify Callback Path + const callbackPathInput = screen.getByLabelText(/Callback Path/i); + expect(callbackPathInput).toHaveValue( + "http://192.168.1.100:9002/auth/idaas/callback", + ); + }); + + it("should save correctly synced OAuth settings", async () => { + const user = userEvent.setup(); + + renderWithTooltip(); + + // Select OAuth auth type + const oauthRadio = screen.getByLabelText(/OAuth/i); + await user.click(oauthRadio); + + // Fill in OAuth fields + const hostInput = screen.getByLabelText(/^Host$/i); + await user.clear(hostInput); + await user.type(hostInput, "localhost"); + + const portInput = screen.getByLabelText(/^Port$/i); + await user.clear(portInput); + await user.type(portInput, "9001"); + + const clientIdInput = screen.getByLabelText(/Client ID/i); + await user.type(clientIdInput, "test-client-id"); + + const clientSecretInput = screen.getByLabelText(/Client Secret/i); + await user.type(clientSecretInput, "test-secret"); + + const authUrlInput = screen.getByLabelText(/Authorization URL/i); + await user.type(authUrlInput, "http://localhost:9001/auth/authorize"); + + const tokenUrlInput = screen.getByLabelText(/Token URL/i); + await user.type(tokenUrlInput, "http://localhost:9001/auth/token"); + + // Click Save + const saveButton = screen.getByRole("button", { name: /Save/i }); + await user.click(saveButton); + + // Verify onSave was called with correct synced data + expect(mockOnSave).toHaveBeenCalledWith( + expect.objectContaining({ + auth_type: "oauth", + oauth_host: "localhost", + oauth_port: "9001", + oauth_server_url: "http://localhost:9001", + oauth_callback_path: "http://localhost:9001/auth/idaas/callback", + oauth_client_id: "test-client-id", + oauth_client_secret: "test-secret", + oauth_auth_url: "http://localhost:9001/auth/authorize", + oauth_token_url: "http://localhost:9001/auth/token", + }), + ); + }); + + it("should preserve existing Server URL if manually edited after port change", async () => { + const user = userEvent.setup(); + + renderWithTooltip(); + + // Select OAuth auth type + const oauthRadio = screen.getByLabelText(/OAuth/i); + await user.click(oauthRadio); + + // Set port (auto-syncs Server URL) + const portInput = screen.getByLabelText(/^Port$/i); + await user.clear(portInput); + await user.type(portInput, "9001"); + + // Manually edit Server URL to something different + const serverUrlInput = screen.getByLabelText(/Server URL/i); + await user.clear(serverUrlInput); + await user.type(serverUrlInput, "http://custom.example.com:9001"); + + // Verify the manual edit is preserved + expect(serverUrlInput).toHaveValue("http://custom.example.com:9001"); + + // Change port again + await user.clear(portInput); + await user.type(portInput, "9002"); + + // Server URL should now be auto-synced again + expect(serverUrlInput).toHaveValue("http://localhost:9002"); + }); + + it("should load existing auth settings correctly", () => { + const existingAuthSettings = { + auth_type: "oauth", + oauth_host: "existing.host.com", + oauth_port: "8080", + oauth_server_url: "http://existing.host.com:8080", + oauth_callback_path: "http://existing.host.com:8080/auth/idaas/callback", + oauth_client_id: "existing-client", + oauth_client_secret: "existing-secret", + oauth_auth_url: "http://existing.host.com:8080/auth", + oauth_token_url: "http://existing.host.com:8080/token", + oauth_mcp_scope: "user", + oauth_provider_scope: "openid", + }; + + renderWithTooltip( + , + ); + + // Verify fields are populated + expect(screen.getByLabelText(/^Host$/i)).toHaveValue("existing.host.com"); + expect(screen.getByLabelText(/^Port$/i)).toHaveValue("8080"); + expect(screen.getByLabelText(/Server URL/i)).toHaveValue( + "http://existing.host.com:8080", + ); + expect(screen.getByLabelText(/Callback Path/i)).toHaveValue( + "http://existing.host.com:8080/auth/idaas/callback", + ); + }); + + it("should not auto-sync if auth type is not OAuth", async () => { + const user = userEvent.setup(); + + renderWithTooltip(); + + // Select API Key auth type + const apikeyRadio = screen.getByLabelText(/API Key/i); + await user.click(apikeyRadio); + + // OAuth fields should not be visible + expect(screen.queryByLabelText(/^Port$/i)).not.toBeInTheDocument(); + expect(screen.queryByLabelText(/Server URL/i)).not.toBeInTheDocument(); + }); +}); diff --git a/src/frontend/src/modals/authModal/index.tsx b/src/frontend/src/modals/authModal/index.tsx index 5446a2c55547..58b69fe1af7f 100644 --- a/src/frontend/src/modals/authModal/index.tsx +++ b/src/frontend/src/modals/authModal/index.tsx @@ -79,10 +79,26 @@ const AuthModal = ({ }; const handleAuthFieldChange = (field: string, value: string) => { - setAuthFields((prev) => ({ - ...prev, - [field]: value, - })); + setAuthFields((prev) => { + const newFields = { + ...prev, + [field]: value, + }; + + // Auto-sync Server URL and Callback Path when Port or Host changes + if (field === "oauthPort" || field === "oauthHost") { + const host = + field === "oauthHost" ? value : prev.oauthHost || "localhost"; + const port = field === "oauthPort" ? value : prev.oauthPort || ""; + + if (port) { + newFields.oauthServerUrl = `http://${host}:${port}`; + newFields.oauthCallbackPath = `http://${host}:${port}/auth/idaas/callback`; + } + } + + return newFields; + }); }; const handleSave = () => { diff --git a/src/frontend/src/pages/MainPage/pages/homePage/components/McpServerTab.tsx b/src/frontend/src/pages/MainPage/pages/homePage/components/McpServerTab.tsx index 242f8a5a2f57..43f017440b11 100644 --- a/src/frontend/src/pages/MainPage/pages/homePage/components/McpServerTab.tsx +++ b/src/frontend/src/pages/MainPage/pages/homePage/components/McpServerTab.tsx @@ -1,5 +1,5 @@ import { useQueryClient } from "@tanstack/react-query"; -import { memo, type ReactNode, useCallback, useState } from "react"; +import { memo, type ReactNode, useCallback, useEffect, useState } from "react"; import { useParams } from "react-router-dom"; import { Light as SyntaxHighlighter } from "react-syntax-highlighter"; import { ForwardedIconComponent } from "@/components/common/genericIconComponent"; @@ -156,30 +156,37 @@ const McpServerTab = ({ folderName }: { folderName: string }) => { const [apiKey, setApiKey] = useState(""); const [isGeneratingApiKey, setIsGeneratingApiKey] = useState(false); const [authModalOpen, setAuthModalOpen] = useState(false); + const [isWaitingForComposer, setIsWaitingForComposer] = useState(false); + const [showSlowWarning, setShowSlowWarning] = useState(false); const setSuccessData = useAlertStore((state) => state.setSuccessData); const setErrorData = useAlertStore((state) => state.setErrorData); + const queryClient = useQueryClient(); const { data: mcpProjectData, isLoading: isLoadingMCPProjectData } = useGetFlowsMCP({ projectId }); const { mutate: patchFlowsMCP, isPending: isPatchingFlowsMCP } = usePatchFlowsMCP({ project_id: projectId }); - // Extract tools and auth_settings from the response const flowsMCP = mcpProjectData?.tools || []; const currentAuthSettings = mcpProjectData?.auth_settings; - // Only get composer URL for OAuth projects - // Disable the query during mutations to prevent stale auth state issues const isOAuthProject = currentAuthSettings?.auth_type === "oauth" && ENABLE_MCP_COMPOSER; const shouldQueryComposerUrl = isOAuthProject && !isPatchingFlowsMCP; - const { data: composerUrlData } = useGetProjectComposerUrl( - { - projectId, - }, - { enabled: !!projectId && shouldQueryComposerUrl }, - ); + const { data: composerUrlData, isLoading: isLoadingComposerUrl } = + useGetProjectComposerUrl( + { + projectId, + }, + { enabled: !!projectId && shouldQueryComposerUrl }, + ); + + useEffect(() => { + if (isWaitingForComposer && !isLoadingComposerUrl && composerUrlData) { + setIsWaitingForComposer(false); + } + }, [isWaitingForComposer, isLoadingComposerUrl, composerUrlData]); const { mutate: patchInstallMCP } = usePatchInstallMCP({ project_id: projectId, @@ -203,7 +210,6 @@ const McpServerTab = ({ folderName }: { folderName: string }) => { ? currentAuthSettings?.auth_type === "apikey" : !isAutoLogin; - // Check if the current connection is local const isLocalConnection = useCustomIsLocalConnection(); const [selectedMode, setSelectedMode] = useState( @@ -218,8 +224,6 @@ const McpServerTab = ({ folderName }: { folderName: string }) => { mcp_enabled: flow.status, })); - // Prepare the request with both settings and auth_settings - // If ENABLE_MCP_COMPOSER is false, always use "none" for auth_type const finalAuthSettings = ENABLE_MCP_COMPOSER ? currentAuthSettings : { auth_type: "none" }; @@ -247,7 +251,37 @@ const McpServerTab = ({ folderName }: { folderName: string }) => { auth_settings: authSettings, }; - patchFlowsMCP(requestData); + // Clear cached composer URL data BEFORE making the request to ensure fresh errors are fetched + queryClient.removeQueries({ + queryKey: ["project-composer-url", projectId], + }); + + // Set waiting state for OAuth before making the request + if (authSettings.auth_type === "oauth") { + setIsWaitingForComposer(true); + } + + patchFlowsMCP(requestData, { + onSuccess: () => { + // Invalidate the MCP project data to refresh auth settings + queryClient.invalidateQueries({ + queryKey: ["flows-mcp", { projectId }], + }); + + if (authSettings.auth_type === "oauth") { + // Also invalidate composer URL to fetch new OAuth server info + queryClient.invalidateQueries({ + queryKey: ["project-composer-url", projectId], + }); + } else { + // Clear waiting state if not OAuth + setIsWaitingForComposer(false); + } + }, + onError: () => { + setIsWaitingForComposer(false); + }, + }); }; const flowsMCPData = flowsMCP?.map((flow) => ({ @@ -385,11 +419,37 @@ const McpServerTab = ({ folderName }: { folderName: string }) => { const [loadingMCP, setLoadingMCP] = useState([]); - // Check if authentication is configured (not "none") const hasAuthentication = - currentAuthSettings?.auth_type && currentAuthSettings.auth_type !== "none"; + (currentAuthSettings?.auth_type && + currentAuthSettings.auth_type !== "none") || + isWaitingForComposer; + + const isLoadingMCPProjectAuth = + isLoadingMCPProjectData || + isPatchingFlowsMCP || + isWaitingForComposer || + (isOAuthProject && isLoadingComposerUrl); + + // Monitor loading time and show warning after 30s + useEffect(() => { + let timer: NodeJS.Timeout | null = null; + + if (isLoadingMCPProjectAuth) { + // Start timer when loading begins + timer = setTimeout(() => { + setShowSlowWarning(true); + }, 30000); // 30 seconds + } else { + // Reset warning when loading completes + setShowSlowWarning(false); + } - const isLoadingMCPProjectAuth = isLoadingMCPProjectData || isPatchingFlowsMCP; + return () => { + if (timer) { + clearTimeout(timer); + } + }; + }, [isLoadingMCPProjectAuth]); return (
@@ -460,21 +520,31 @@ const McpServerTab = ({ folderName }: { folderName: string }) => { ) : ( + {isLoadingMCPProjectAuth && showSlowWarning && ( + + )} { {isLoadingMCPProjectAuth ? "Loading..." : AUTH_METHODS[ - currentAuthSettings.auth_type as keyof typeof AUTH_METHODS - ]?.label || currentAuthSettings.auth_type} + currentAuthSettings?.auth_type as keyof typeof AUTH_METHODS + ]?.label || currentAuthSettings?.auth_type} )} diff --git a/src/frontend/src/pages/MainPage/pages/homePage/components/__tests__/McpServerTab.test.tsx b/src/frontend/src/pages/MainPage/pages/homePage/components/__tests__/McpServerTab.test.tsx new file mode 100644 index 000000000000..6b1f55ef22ff --- /dev/null +++ b/src/frontend/src/pages/MainPage/pages/homePage/components/__tests__/McpServerTab.test.tsx @@ -0,0 +1,794 @@ +import { QueryClient, QueryClientProvider } from "@tanstack/react-query"; +import { fireEvent, render, screen, waitFor } from "@testing-library/react"; +import userEvent from "@testing-library/user-event"; +import type React from "react"; +import { MemoryRouter } from "react-router-dom"; +import McpServerTab from "../McpServerTab"; + +// Mock react-router-dom +jest.mock("react-router-dom", () => ({ + ...jest.requireActual("react-router-dom"), + useParams: jest.fn(), +})); + +// Mock constants +jest.mock("@/constants/constants", () => ({ + MAX_MCP_SERVER_NAME_LENGTH: 50, +})); + +// Mock API controller +jest.mock("@/controllers/API", () => ({ + createApiKey: jest.fn(), +})); + +// Mock API hooks +jest.mock("@/controllers/API/queries/mcp", () => ({ + useGetFlowsMCP: jest.fn(), + usePatchFlowsMCP: jest.fn(), +})); + +jest.mock("@/controllers/API/queries/mcp/use-get-composer-url", () => ({ + useGetProjectComposerUrl: jest.fn(), +})); + +jest.mock("@/controllers/API/queries/mcp/use-get-installed-mcp", () => ({ + useGetInstalledMCP: jest.fn(), +})); + +jest.mock("@/controllers/API/queries/mcp/use-patch-install-mcp", () => ({ + usePatchInstallMCP: jest.fn(), +})); + +// Mock feature flags +jest.mock("@/customization/feature-flags", () => ({ + ENABLE_MCP_COMPOSER: true, +})); + +// Mock custom hooks +jest.mock("@/customization/hooks/use-custom-is-local-connection", () => ({ + useCustomIsLocalConnection: jest.fn(), +})); + +jest.mock("@/customization/hooks/use-custom-theme", () => ({ + __esModule: true, + default: jest.fn(), +})); + +jest.mock("@/customization/utils/custom-mcp-url", () => ({ + customGetMCPUrl: jest.fn(), +})); + +// Mock stores +jest.mock("@/stores/alertStore", () => ({ + __esModule: true, + default: jest.fn((selector) => + selector({ + setSuccessData: jest.fn(), + setErrorData: jest.fn(), + }), + ), +})); + +jest.mock("@/stores/authStore", () => ({ + __esModule: true, + default: jest.fn((selector) => + selector({ + autoLogin: false, + }), + ), +})); + +jest.mock("@/stores/foldersStore", () => ({ + useFolderStore: jest.fn((selector) => + selector({ + myCollectionId: "test-collection-id", + }), + ), +})); + +// Mock utils +jest.mock("@/utils/mcpUtils", () => ({ + AUTH_METHODS: { + apikey: { label: "API Key" }, + oauth: { label: "OAuth" }, + none: { label: "None" }, + }, +})); + +jest.mock("@/utils/stringManipulation", () => ({ + parseString: jest.fn((str: string) => str.toLowerCase().replace(/\s+/g, "_")), + toSpaceCase: jest.fn((str: string) => str.replace(/_/g, " ")), +})); + +jest.mock("@/utils/utils", () => ({ + cn: (...args: any[]) => args.filter(Boolean).join(" "), + getOS: jest.fn(() => "macoslinux"), +})); + +// Mock UI components +jest.mock("@/components/common/genericIconComponent", () => ({ + ForwardedIconComponent: ({ name, className, ...props }: any) => ( + + {name} + + ), +})); + +jest.mock("@/components/common/shadTooltipComponent", () => ({ + __esModule: true, + default: ({ children, content, side }: any) => ( +
+ {children} +
+ ), +})); + +jest.mock( + "@/components/core/parameterRenderComponent/components/ToolsComponent", + () => ({ + __esModule: true, + default: ({ value, handleOnNewValue, button_description }: any) => ( +
+ +
+ ), + }), +); + +jest.mock("@/components/ui/button", () => ({ + Button: ({ + children, + onClick, + disabled, + loading, + unstyled, + ...props + }: any) => ( + + ), +})); + +jest.mock("@/components/ui/tabs-button", () => ({ + Tabs: ({ children, value, onValueChange }: any) => ( +
+ {children} +
+ ), + TabsList: ({ children }: any) => ( +
{children}
+ ), + TabsTrigger: ({ children, value, ...props }: any) => ( + + ), +})); + +// Mock react-syntax-highlighter +jest.mock("react-syntax-highlighter", () => ({ + Light: ({ children, CodeTag }: any) => ( +
+ {CodeTag ? {children} : children} +
+ ), +})); + +// Mock AuthModal +jest.mock("@/modals/authModal", () => ({ + __esModule: true, + default: ({ open, setOpen, onSave, authSettings }: any) => ( +
+ + +
+ ), +})); + +const createTestWrapper = () => { + const queryClient = new QueryClient({ + defaultOptions: { + queries: { retry: false }, + mutations: { retry: false }, + }, + }); + + return ({ children }: { children: React.ReactNode }) => ( + + {children} + + ); +}; + +describe("McpServerTab", () => { + const mockUseParams = require("react-router-dom").useParams; + const mockCreateApiKey = require("@/controllers/API").createApiKey; + const mockUseGetFlowsMCP = + require("@/controllers/API/queries/mcp").useGetFlowsMCP; + const mockUsePatchFlowsMCP = + require("@/controllers/API/queries/mcp").usePatchFlowsMCP; + const mockUseGetProjectComposerUrl = + require("@/controllers/API/queries/mcp/use-get-composer-url").useGetProjectComposerUrl; + const mockUseGetInstalledMCP = + require("@/controllers/API/queries/mcp/use-get-installed-mcp").useGetInstalledMCP; + const mockUsePatchInstallMCP = + require("@/controllers/API/queries/mcp/use-patch-install-mcp").usePatchInstallMCP; + const mockUseCustomIsLocalConnection = + require("@/customization/hooks/use-custom-is-local-connection").useCustomIsLocalConnection; + const mockUseTheme = + require("@/customization/hooks/use-custom-theme").default; + const mockCustomGetMCPUrl = + require("@/customization/utils/custom-mcp-url").customGetMCPUrl; + + const defaultFlowsMCPData = { + tools: [ + { + id: "flow-1", + action_name: "Action 1", + action_description: "Description 1", + name: "Flow 1", + description: "Flow Description 1", + mcp_enabled: true, + }, + ], + auth_settings: { + auth_type: "none", + }, + }; + + beforeEach(() => { + jest.clearAllMocks(); + mockUseParams.mockReturnValue({ folderId: "test-folder-id" }); + mockUseTheme.mockReturnValue({ dark: false }); + mockUseCustomIsLocalConnection.mockReturnValue(true); + mockCustomGetMCPUrl.mockReturnValue("http://localhost:7860/api/v1/mcp"); + + mockUseGetFlowsMCP.mockReturnValue({ + data: defaultFlowsMCPData, + isLoading: false, + }); + + mockUsePatchFlowsMCP.mockReturnValue({ + mutate: jest.fn(), + isPending: false, + }); + + mockUseGetProjectComposerUrl.mockReturnValue({ + data: null, + isLoading: false, + }); + + mockUseGetInstalledMCP.mockReturnValue({ + data: [ + { name: "cursor", installed: false, available: true }, + { name: "claude", installed: false, available: true }, + { name: "windsurf", installed: false, available: true }, + ], + }); + + mockUsePatchInstallMCP.mockReturnValue({ + mutate: jest.fn(), + }); + }); + + describe("Rendering", () => { + it("renders MCP Server title", () => { + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + expect(screen.getByTestId("mcp-server-title")).toHaveTextContent( + "MCP Server", + ); + }); + + it("renders description with documentation link", () => { + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + const link = screen.getByRole("link", { + name: /Projects as MCP Servers guide/i, + }); + expect(link).toHaveAttribute( + "href", + "https://docs.langflow.org/mcp-server", + ); + expect(link).toHaveAttribute("target", "_blank"); + }); + + it("renders ToolsComponent with correct props", () => { + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + expect(screen.getByTestId("tools-component")).toBeInTheDocument(); + expect(screen.getByTestId("edit-tools-button")).toHaveTextContent( + "Edit Tools", + ); + }); + + it("renders JSON and Auto install mode tabs", () => { + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + expect(screen.getByText("Auto install")).toBeInTheDocument(); + expect(screen.getByText("JSON")).toBeInTheDocument(); + }); + }); + + describe("Authentication", () => { + it("displays 'None (public)' when no authentication is configured", () => { + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + expect(screen.getByText("None (public)")).toBeInTheDocument(); + }); + + it("displays API Key auth when configured", () => { + mockUseGetFlowsMCP.mockReturnValue({ + data: { + ...defaultFlowsMCPData, + auth_settings: { auth_type: "apikey" }, + }, + isLoading: false, + }); + + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + expect(screen.getByText("API Key")).toBeInTheDocument(); + }); + + it("displays OAuth auth when configured", () => { + mockUseGetFlowsMCP.mockReturnValue({ + data: { + ...defaultFlowsMCPData, + auth_settings: { auth_type: "oauth" }, + }, + isLoading: false, + }); + + mockUseGetProjectComposerUrl.mockReturnValue({ + data: { sse_url: "http://composer-url", uses_composer: true }, + isLoading: false, + }); + + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + expect(screen.getByText("OAuth")).toBeInTheDocument(); + }); + + it("opens auth modal when Add Auth button is clicked", async () => { + const user = userEvent.setup(); + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + const addAuthButton = screen.getByRole("button", { name: /Add Auth/i }); + await user.click(addAuthButton); + + expect(screen.getByTestId("auth-modal")).toHaveAttribute( + "data-open", + "true", + ); + }); + + it("opens auth modal when Edit Auth button is clicked", async () => { + const user = userEvent.setup(); + mockUseGetFlowsMCP.mockReturnValue({ + data: { + ...defaultFlowsMCPData, + auth_settings: { auth_type: "apikey" }, + }, + isLoading: false, + }); + + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + const editAuthButton = screen.getByRole("button", { name: /Edit Auth/i }); + await user.click(editAuthButton); + + expect(screen.getByTestId("auth-modal")).toHaveAttribute( + "data-open", + "true", + ); + }); + + it("calls patchFlowsMCP when auth is saved", async () => { + const user = userEvent.setup(); + const mockPatchFlowsMCP = jest.fn(); + mockUsePatchFlowsMCP.mockReturnValue({ + mutate: mockPatchFlowsMCP, + isPending: false, + }); + + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + const addAuthButton = screen.getByRole("button", { name: /Add Auth/i }); + await user.click(addAuthButton); + + const saveButton = screen.getByTestId("save-auth-modal"); + await user.click(saveButton); + + expect(mockPatchFlowsMCP).toHaveBeenCalledWith( + expect.objectContaining({ + auth_settings: { auth_type: "apikey" }, + }), + expect.any(Object), + ); + }); + }); + + describe("API Key Generation", () => { + it("generates API key when button is clicked", async () => { + const user = userEvent.setup(); + mockCreateApiKey.mockResolvedValue({ api_key: "test-api-key-123" }); + mockUseGetFlowsMCP.mockReturnValue({ + data: { + ...defaultFlowsMCPData, + auth_settings: { auth_type: "apikey" }, + }, + isLoading: false, + }); + + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + // Switch to JSON mode + const jsonTab = screen.getByText("JSON"); + await user.click(jsonTab); + + // Find and click the Generate API key button + const generateButton = screen.getByText("Generate API key"); + await user.click(generateButton); + + await waitFor(() => { + expect(mockCreateApiKey).toHaveBeenCalledWith( + "MCP Server Test Project", + ); + }); + }); + }); + + describe("Mode Switching", () => { + it("switches to JSON mode when clicked", async () => { + const user = userEvent.setup(); + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + const jsonTab = screen.getByText("JSON"); + await user.click(jsonTab); + + expect(screen.getByTestId("syntax-highlighter")).toBeInTheDocument(); + }); + + it("displays auto install mode by default for local connections", () => { + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + expect( + screen.getByRole("button", { name: /Cursor/i }), + ).toBeInTheDocument(); + expect( + screen.getByRole("button", { name: /Claude/i }), + ).toBeInTheDocument(); + expect( + screen.getByRole("button", { name: /Windsurf/i }), + ).toBeInTheDocument(); + }); + + it("displays JSON mode by default for non-local connections", () => { + mockUseCustomIsLocalConnection.mockReturnValue(false); + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + expect(screen.getByTestId("syntax-highlighter")).toBeInTheDocument(); + }); + }); + + describe("Auto Install", () => { + it("installs MCP server when installer button is clicked", async () => { + const user = userEvent.setup(); + const mockPatchInstallMCP = jest.fn((_, callbacks) => { + callbacks?.onSuccess?.(); + }); + mockUsePatchInstallMCP.mockReturnValue({ + mutate: mockPatchInstallMCP, + }); + + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + const cursorButton = screen.getByRole("button", { name: /Cursor/i }); + await user.click(cursorButton); + + expect(mockPatchInstallMCP).toHaveBeenCalledWith( + { client: "cursor" }, + expect.objectContaining({ + onSuccess: expect.any(Function), + onError: expect.any(Function), + }), + ); + }); + + it("disables auto install buttons when not local connection", () => { + mockUseCustomIsLocalConnection.mockReturnValue(false); + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + // Switch to Auto install mode + const autoInstallTab = screen.getByText("Auto install"); + fireEvent.click(autoInstallTab); + + const cursorButton = screen.getByRole("button", { name: /Cursor/i }); + expect(cursorButton).toBeDisabled(); + }); + + it("shows warning when auto install is disabled for non-local connections", async () => { + const user = userEvent.setup(); + mockUseCustomIsLocalConnection.mockReturnValue(false); + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + // Switch to Auto install mode + const autoInstallTab = screen.getByText("Auto install"); + await user.click(autoInstallTab); + + expect( + screen.getByText( + /One-click install is disabled because the Langflow server is not running on your local machine/i, + ), + ).toBeInTheDocument(); + }); + + it("displays check icon for installed clients", () => { + mockUseGetInstalledMCP.mockReturnValue({ + data: [ + { name: "cursor", installed: true, available: true }, + { name: "claude", installed: false, available: true }, + { name: "windsurf", installed: false, available: true }, + ], + }); + + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + expect(screen.getByTestId("icon-Check")).toBeInTheDocument(); + }); + }); + + describe("Platform Selection", () => { + it("renders platform tabs", async () => { + const user = userEvent.setup(); + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + // Switch to JSON mode + const jsonTab = screen.getByText("JSON"); + await user.click(jsonTab); + + expect(screen.getByTestId("tab-trigger-macoslinux")).toBeInTheDocument(); + expect(screen.getByTestId("tab-trigger-windows")).toBeInTheDocument(); + expect(screen.getByTestId("tab-trigger-wsl")).toBeInTheDocument(); + }); + }); + + describe("Tools Management", () => { + it("updates flows when tools are changed", async () => { + const user = userEvent.setup(); + const mockPatchFlowsMCP = jest.fn(); + mockUsePatchFlowsMCP.mockReturnValue({ + mutate: mockPatchFlowsMCP, + isPending: false, + }); + + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + const editButton = screen.getByTestId("edit-tools-button"); + await user.click(editButton); + + expect(mockPatchFlowsMCP).toHaveBeenCalled(); + }); + }); + + describe("OAuth Error Handling", () => { + it("displays error message when OAuth configuration has errors", async () => { + const user = userEvent.setup(); + mockUseGetFlowsMCP.mockReturnValue({ + data: { + ...defaultFlowsMCPData, + auth_settings: { auth_type: "oauth" }, + }, + isLoading: false, + }); + + mockUseGetProjectComposerUrl.mockReturnValue({ + data: { + error_message: "OAuth configuration error", + uses_composer: true, + }, + isLoading: false, + }); + + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + // Switch to JSON mode + const jsonTab = screen.getByText("JSON"); + await user.click(jsonTab); + + expect( + screen.getByText("MCP Server Configuration Error"), + ).toBeInTheDocument(); + expect(screen.getByText("OAuth configuration error")).toBeInTheDocument(); + }); + }); + + describe("Loading States", () => { + it("shows loading state for auth when patching", () => { + mockUseGetFlowsMCP.mockReturnValue({ + data: { + ...defaultFlowsMCPData, + auth_settings: { auth_type: "apikey" }, + }, + isLoading: false, + }); + + mockUsePatchFlowsMCP.mockReturnValue({ + mutate: jest.fn(), + isPending: true, + }); + + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + expect(screen.getByText("Loading...")).toBeInTheDocument(); + expect(screen.getByTestId("icon-Loader2")).toBeInTheDocument(); + }); + + it("shows loading state when waiting for composer", () => { + mockUseGetFlowsMCP.mockReturnValue({ + data: { + ...defaultFlowsMCPData, + auth_settings: { auth_type: "oauth" }, + }, + isLoading: false, + }); + + mockUseGetProjectComposerUrl.mockReturnValue({ + data: null, + isLoading: true, + }); + + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + expect(screen.getByText("Loading...")).toBeInTheDocument(); + }); + }); + + describe("Dark Mode", () => { + it("applies dark mode styles when dark theme is active", () => { + mockUseTheme.mockReturnValue({ dark: true }); + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + expect(screen.getByTestId("mcp-server-title")).toBeInTheDocument(); + }); + }); + + describe("Copy to Clipboard", () => { + it("copies JSON configuration to clipboard", async () => { + const user = userEvent.setup(); + const mockWriteText = jest.fn().mockResolvedValue(undefined); + + Object.defineProperty(navigator, "clipboard", { + value: { + writeText: mockWriteText, + }, + writable: true, + configurable: true, + }); + + const TestWrapper = createTestWrapper(); + render(, { + wrapper: TestWrapper, + }); + + // Switch to JSON mode + const jsonTab = screen.getByText("JSON"); + await user.click(jsonTab); + + const copyButton = screen.getByTestId("icon-copy"); + await user.click(copyButton); + + await waitFor(() => { + expect(mockWriteText).toHaveBeenCalled(); + }); + }); + }); +});