From e874770c0d2a9e45f2d48b9becf84c0d2ddb492e Mon Sep 17 00:00:00 2001 From: sickn33 Date: Sat, 28 Mar 2026 23:16:17 +0100 Subject: [PATCH] fix(whatsapp): Stop logging sensitive config data Sanitize WhatsApp Cloud API validator output across the root skill and plugin copies so code scanning no longer flags clear-text exposure. Add a regression test that verifies successful and failed validation runs do not print sensitive response fields or API error details. --- .../scripts/validate_config.py | 78 +++------- .../scripts/validate_config.py | 78 +++------- .../scripts/validate_config.py | 78 +++------- tools/scripts/tests/run-test-suite.js | 1 + .../test_whatsapp_config_logging_security.py | 145 ++++++++++++++++++ 5 files changed, 221 insertions(+), 159 deletions(-) create mode 100644 tools/scripts/tests/test_whatsapp_config_logging_security.py diff --git a/plugins/antigravity-awesome-skills-claude/skills/whatsapp-cloud-api/scripts/validate_config.py b/plugins/antigravity-awesome-skills-claude/skills/whatsapp-cloud-api/scripts/validate_config.py index 5f0cbe76..569d5ff2 100644 --- a/plugins/antigravity-awesome-skills-claude/skills/whatsapp-cloud-api/scripts/validate_config.py +++ b/plugins/antigravity-awesome-skills-claude/skills/whatsapp-cloud-api/scripts/validate_config.py @@ -47,19 +47,17 @@ def check_env_vars() -> tuple[bool, list[str]]: return len(missing) == 0, missing -def _format_api_failure(response: httpx.Response) -> dict[str, str]: - """Return sanitized API failure details without echoing sensitive payloads.""" - try: - error = response.json().get("error", {}) - except ValueError: - error = {} - return { - "status_code": str(response.status_code), - "error_code": str(error.get("code", "?")), - } +def _phone_lookup_failure_message() -> str: + """Return a static failure summary for the phone lookup.""" + return "Graph API rejected the phone-number lookup. Review your token, phone number ID, and app permissions." -def test_api_connection() -> tuple[bool, dict[str, str]]: +def _waba_lookup_failure_message() -> str: + """Return a static failure summary for the WABA lookup.""" + return "Graph API rejected the WABA lookup. Review your WABA ID, token, and assigned assets." + + +def test_api_connection() -> tuple[bool, str]: """Test connection to WhatsApp Cloud API.""" token = os.environ.get("WHATSAPP_TOKEN", "") phone_id = os.environ.get("PHONE_NUMBER_ID", "") @@ -73,27 +71,19 @@ def test_api_connection() -> tuple[bool, dict[str, str]]: ) if response.status_code == 200: - data = response.json() - return True, { - "phone": str(data.get("display_phone_number", "N/A")), - "name": str(data.get("verified_name", "N/A")), - "status": str(data.get("code_verification_status", "N/A")), - "quality": str(data.get("quality_rating", "N/A")), - } + return True, "Phone-number endpoint reachable." - return False, _format_api_failure(response) + return False, _phone_lookup_failure_message() except httpx.ConnectError: - return False, {"reason": "Connection failed. Check your internet connection."} + return False, "Connection failed. Check your internet connection." except httpx.TimeoutException: - return False, {"reason": "Request timed out after 10 seconds."} + return False, "Request timed out after 10 seconds." except Exception as exc: - return False, { - "reason": f"Unexpected {exc.__class__.__name__} while contacting the Graph API." - } + return False, f"Unexpected {exc.__class__.__name__} while contacting the Graph API." -def test_waba_access() -> tuple[bool, dict[str, str]]: +def test_waba_access() -> tuple[bool, str]: """Test access to WhatsApp Business Account.""" token = os.environ.get("WHATSAPP_TOKEN", "") waba_id = os.environ.get("WABA_ID", "") @@ -106,16 +96,12 @@ def test_waba_access() -> tuple[bool, dict[str, str]]: ) if response.status_code == 200: - data = response.json() - count = len(data.get("data", [])) - return True, {"count": str(count)} + return True, "WABA phone-numbers endpoint reachable." - return False, _format_api_failure(response) + return False, _waba_lookup_failure_message() except Exception as exc: - return False, { - "reason": f"Unexpected {exc.__class__.__name__} while checking WABA access." - } + return False, f"Unexpected {exc.__class__.__name__} while checking WABA access." def main(): @@ -134,6 +120,7 @@ def main(): print("=" * 50) print("WhatsApp Cloud API - Configuration Validator") print("=" * 50) + print("Detailed API payloads are intentionally omitted to protect sensitive configuration data.") print() all_ok = True @@ -157,37 +144,22 @@ def main(): # Check 2: API connection print("[2/3] Testing API connection (Phone Number)...") - api_ok, api_details = test_api_connection() + api_ok, api_message = test_api_connection() if api_ok: - print(" OK - Connected successfully") - print(f" Phone: {api_details['phone']}") - print(f" Name: {api_details['name']}") - print(f" Status: {api_details['status']}") - print(f" Quality: {api_details['quality']}") + print(f" OK - {api_message}") else: - if "reason" in api_details: - print(f" FAIL - {api_details['reason']}") - else: - print(" FAIL - API request failed.") - print(f" HTTP Status: {api_details['status_code']}") - print(f" Error Code: {api_details['error_code']}") + print(f" FAIL - {api_message}") all_ok = False print() # Check 3: WABA access print("[3/3] Testing WABA access...") - waba_ok, waba_details = test_waba_access() + waba_ok, waba_message = test_waba_access() if waba_ok: - print(" OK - WABA accessible") - print(f" Phone Numbers Found: {waba_details['count']}") + print(f" OK - {waba_message}") else: - if "reason" in waba_details: - print(f" FAIL - {waba_details['reason']}") - else: - print(" FAIL - API request failed.") - print(f" HTTP Status: {waba_details['status_code']}") - print(f" Error Code: {waba_details['error_code']}") + print(f" FAIL - {waba_message}") all_ok = False print() diff --git a/plugins/antigravity-awesome-skills/skills/whatsapp-cloud-api/scripts/validate_config.py b/plugins/antigravity-awesome-skills/skills/whatsapp-cloud-api/scripts/validate_config.py index 5f0cbe76..569d5ff2 100644 --- a/plugins/antigravity-awesome-skills/skills/whatsapp-cloud-api/scripts/validate_config.py +++ b/plugins/antigravity-awesome-skills/skills/whatsapp-cloud-api/scripts/validate_config.py @@ -47,19 +47,17 @@ def check_env_vars() -> tuple[bool, list[str]]: return len(missing) == 0, missing -def _format_api_failure(response: httpx.Response) -> dict[str, str]: - """Return sanitized API failure details without echoing sensitive payloads.""" - try: - error = response.json().get("error", {}) - except ValueError: - error = {} - return { - "status_code": str(response.status_code), - "error_code": str(error.get("code", "?")), - } +def _phone_lookup_failure_message() -> str: + """Return a static failure summary for the phone lookup.""" + return "Graph API rejected the phone-number lookup. Review your token, phone number ID, and app permissions." -def test_api_connection() -> tuple[bool, dict[str, str]]: +def _waba_lookup_failure_message() -> str: + """Return a static failure summary for the WABA lookup.""" + return "Graph API rejected the WABA lookup. Review your WABA ID, token, and assigned assets." + + +def test_api_connection() -> tuple[bool, str]: """Test connection to WhatsApp Cloud API.""" token = os.environ.get("WHATSAPP_TOKEN", "") phone_id = os.environ.get("PHONE_NUMBER_ID", "") @@ -73,27 +71,19 @@ def test_api_connection() -> tuple[bool, dict[str, str]]: ) if response.status_code == 200: - data = response.json() - return True, { - "phone": str(data.get("display_phone_number", "N/A")), - "name": str(data.get("verified_name", "N/A")), - "status": str(data.get("code_verification_status", "N/A")), - "quality": str(data.get("quality_rating", "N/A")), - } + return True, "Phone-number endpoint reachable." - return False, _format_api_failure(response) + return False, _phone_lookup_failure_message() except httpx.ConnectError: - return False, {"reason": "Connection failed. Check your internet connection."} + return False, "Connection failed. Check your internet connection." except httpx.TimeoutException: - return False, {"reason": "Request timed out after 10 seconds."} + return False, "Request timed out after 10 seconds." except Exception as exc: - return False, { - "reason": f"Unexpected {exc.__class__.__name__} while contacting the Graph API." - } + return False, f"Unexpected {exc.__class__.__name__} while contacting the Graph API." -def test_waba_access() -> tuple[bool, dict[str, str]]: +def test_waba_access() -> tuple[bool, str]: """Test access to WhatsApp Business Account.""" token = os.environ.get("WHATSAPP_TOKEN", "") waba_id = os.environ.get("WABA_ID", "") @@ -106,16 +96,12 @@ def test_waba_access() -> tuple[bool, dict[str, str]]: ) if response.status_code == 200: - data = response.json() - count = len(data.get("data", [])) - return True, {"count": str(count)} + return True, "WABA phone-numbers endpoint reachable." - return False, _format_api_failure(response) + return False, _waba_lookup_failure_message() except Exception as exc: - return False, { - "reason": f"Unexpected {exc.__class__.__name__} while checking WABA access." - } + return False, f"Unexpected {exc.__class__.__name__} while checking WABA access." def main(): @@ -134,6 +120,7 @@ def main(): print("=" * 50) print("WhatsApp Cloud API - Configuration Validator") print("=" * 50) + print("Detailed API payloads are intentionally omitted to protect sensitive configuration data.") print() all_ok = True @@ -157,37 +144,22 @@ def main(): # Check 2: API connection print("[2/3] Testing API connection (Phone Number)...") - api_ok, api_details = test_api_connection() + api_ok, api_message = test_api_connection() if api_ok: - print(" OK - Connected successfully") - print(f" Phone: {api_details['phone']}") - print(f" Name: {api_details['name']}") - print(f" Status: {api_details['status']}") - print(f" Quality: {api_details['quality']}") + print(f" OK - {api_message}") else: - if "reason" in api_details: - print(f" FAIL - {api_details['reason']}") - else: - print(" FAIL - API request failed.") - print(f" HTTP Status: {api_details['status_code']}") - print(f" Error Code: {api_details['error_code']}") + print(f" FAIL - {api_message}") all_ok = False print() # Check 3: WABA access print("[3/3] Testing WABA access...") - waba_ok, waba_details = test_waba_access() + waba_ok, waba_message = test_waba_access() if waba_ok: - print(" OK - WABA accessible") - print(f" Phone Numbers Found: {waba_details['count']}") + print(f" OK - {waba_message}") else: - if "reason" in waba_details: - print(f" FAIL - {waba_details['reason']}") - else: - print(" FAIL - API request failed.") - print(f" HTTP Status: {waba_details['status_code']}") - print(f" Error Code: {waba_details['error_code']}") + print(f" FAIL - {waba_message}") all_ok = False print() diff --git a/skills/whatsapp-cloud-api/scripts/validate_config.py b/skills/whatsapp-cloud-api/scripts/validate_config.py index 5f0cbe76..569d5ff2 100644 --- a/skills/whatsapp-cloud-api/scripts/validate_config.py +++ b/skills/whatsapp-cloud-api/scripts/validate_config.py @@ -47,19 +47,17 @@ def check_env_vars() -> tuple[bool, list[str]]: return len(missing) == 0, missing -def _format_api_failure(response: httpx.Response) -> dict[str, str]: - """Return sanitized API failure details without echoing sensitive payloads.""" - try: - error = response.json().get("error", {}) - except ValueError: - error = {} - return { - "status_code": str(response.status_code), - "error_code": str(error.get("code", "?")), - } +def _phone_lookup_failure_message() -> str: + """Return a static failure summary for the phone lookup.""" + return "Graph API rejected the phone-number lookup. Review your token, phone number ID, and app permissions." -def test_api_connection() -> tuple[bool, dict[str, str]]: +def _waba_lookup_failure_message() -> str: + """Return a static failure summary for the WABA lookup.""" + return "Graph API rejected the WABA lookup. Review your WABA ID, token, and assigned assets." + + +def test_api_connection() -> tuple[bool, str]: """Test connection to WhatsApp Cloud API.""" token = os.environ.get("WHATSAPP_TOKEN", "") phone_id = os.environ.get("PHONE_NUMBER_ID", "") @@ -73,27 +71,19 @@ def test_api_connection() -> tuple[bool, dict[str, str]]: ) if response.status_code == 200: - data = response.json() - return True, { - "phone": str(data.get("display_phone_number", "N/A")), - "name": str(data.get("verified_name", "N/A")), - "status": str(data.get("code_verification_status", "N/A")), - "quality": str(data.get("quality_rating", "N/A")), - } + return True, "Phone-number endpoint reachable." - return False, _format_api_failure(response) + return False, _phone_lookup_failure_message() except httpx.ConnectError: - return False, {"reason": "Connection failed. Check your internet connection."} + return False, "Connection failed. Check your internet connection." except httpx.TimeoutException: - return False, {"reason": "Request timed out after 10 seconds."} + return False, "Request timed out after 10 seconds." except Exception as exc: - return False, { - "reason": f"Unexpected {exc.__class__.__name__} while contacting the Graph API." - } + return False, f"Unexpected {exc.__class__.__name__} while contacting the Graph API." -def test_waba_access() -> tuple[bool, dict[str, str]]: +def test_waba_access() -> tuple[bool, str]: """Test access to WhatsApp Business Account.""" token = os.environ.get("WHATSAPP_TOKEN", "") waba_id = os.environ.get("WABA_ID", "") @@ -106,16 +96,12 @@ def test_waba_access() -> tuple[bool, dict[str, str]]: ) if response.status_code == 200: - data = response.json() - count = len(data.get("data", [])) - return True, {"count": str(count)} + return True, "WABA phone-numbers endpoint reachable." - return False, _format_api_failure(response) + return False, _waba_lookup_failure_message() except Exception as exc: - return False, { - "reason": f"Unexpected {exc.__class__.__name__} while checking WABA access." - } + return False, f"Unexpected {exc.__class__.__name__} while checking WABA access." def main(): @@ -134,6 +120,7 @@ def main(): print("=" * 50) print("WhatsApp Cloud API - Configuration Validator") print("=" * 50) + print("Detailed API payloads are intentionally omitted to protect sensitive configuration data.") print() all_ok = True @@ -157,37 +144,22 @@ def main(): # Check 2: API connection print("[2/3] Testing API connection (Phone Number)...") - api_ok, api_details = test_api_connection() + api_ok, api_message = test_api_connection() if api_ok: - print(" OK - Connected successfully") - print(f" Phone: {api_details['phone']}") - print(f" Name: {api_details['name']}") - print(f" Status: {api_details['status']}") - print(f" Quality: {api_details['quality']}") + print(f" OK - {api_message}") else: - if "reason" in api_details: - print(f" FAIL - {api_details['reason']}") - else: - print(" FAIL - API request failed.") - print(f" HTTP Status: {api_details['status_code']}") - print(f" Error Code: {api_details['error_code']}") + print(f" FAIL - {api_message}") all_ok = False print() # Check 3: WABA access print("[3/3] Testing WABA access...") - waba_ok, waba_details = test_waba_access() + waba_ok, waba_message = test_waba_access() if waba_ok: - print(" OK - WABA accessible") - print(f" Phone Numbers Found: {waba_details['count']}") + print(f" OK - {waba_message}") else: - if "reason" in waba_details: - print(f" FAIL - {waba_details['reason']}") - else: - print(" FAIL - API request failed.") - print(f" HTTP Status: {waba_details['status_code']}") - print(f" Error Code: {waba_details['error_code']}") + print(f" FAIL - {waba_message}") all_ok = False print() diff --git a/tools/scripts/tests/run-test-suite.js b/tools/scripts/tests/run-test-suite.js index 50ffa433..44668c75 100644 --- a/tools/scripts/tests/run-test-suite.js +++ b/tools/scripts/tests/run-test-suite.js @@ -40,6 +40,7 @@ const LOCAL_TEST_COMMANDS = [ [path.join(TOOL_SCRIPTS, "run-python.js"), path.join(TOOL_TESTS, "test_sync_repo_metadata.py")], [path.join(TOOL_SCRIPTS, "run-python.js"), path.join(TOOL_TESTS, "test_sync_contributors.py")], [path.join(TOOL_SCRIPTS, "run-python.js"), path.join(TOOL_TESTS, "test_validation_warning_budget.py")], + [path.join(TOOL_SCRIPTS, "run-python.js"), path.join(TOOL_TESTS, "test_whatsapp_config_logging_security.py")], [path.join(TOOL_SCRIPTS, "run-python.js"), path.join(TOOL_TESTS, "test_maintainer_audit.py")], [path.join(TOOL_SCRIPTS, "run-python.js"), path.join(TOOL_TESTS, "test_validate_skills_headings.py")], ]; diff --git a/tools/scripts/tests/test_whatsapp_config_logging_security.py b/tools/scripts/tests/test_whatsapp_config_logging_security.py new file mode 100644 index 00000000..1231f025 --- /dev/null +++ b/tools/scripts/tests/test_whatsapp_config_logging_security.py @@ -0,0 +1,145 @@ +import contextlib +import importlib.util +import io +import os +import sys +import types +import unittest +from pathlib import Path +from unittest.mock import patch + + +REPO_ROOT = Path(__file__).resolve().parents[3] +TOOLS_SCRIPTS_DIR = REPO_ROOT / "tools" / "scripts" +if str(TOOLS_SCRIPTS_DIR) not in sys.path: + sys.path.insert(0, str(TOOLS_SCRIPTS_DIR)) + + +def load_module(relative_path: str, module_name: str): + module_path = REPO_ROOT / relative_path + spec = importlib.util.spec_from_file_location(module_name, module_path) + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + fake_httpx = types.ModuleType("httpx") + fake_httpx.ConnectError = type("ConnectError", (Exception,), {}) + fake_httpx.TimeoutException = type("TimeoutException", (Exception,), {}) + fake_httpx.Response = FakeResponse + fake_httpx.get = lambda *args, **kwargs: None + + fake_dotenv = types.ModuleType("dotenv") + fake_dotenv.load_dotenv = lambda *args, **kwargs: None + + with patch.dict(sys.modules, {"httpx": fake_httpx, "dotenv": fake_dotenv}): + spec.loader.exec_module(module) + return module + + +class FakeResponse: + def __init__(self, status_code: int, payload: dict): + self.status_code = status_code + self._payload = payload + + def json(self): + return self._payload + + +class WhatsAppConfigLoggingSecurityTests(unittest.TestCase): + MODULE_PATHS = [ + ("skills/whatsapp-cloud-api/scripts/validate_config.py", "whatsapp_validate_root"), + ( + "plugins/antigravity-awesome-skills/skills/whatsapp-cloud-api/scripts/validate_config.py", + "whatsapp_validate_codex_plugin", + ), + ( + "plugins/antigravity-awesome-skills-claude/skills/whatsapp-cloud-api/scripts/validate_config.py", + "whatsapp_validate_claude_plugin", + ), + ] + + REQUIRED_ENV = { + "WHATSAPP_TOKEN": "token-secret-123", + "PHONE_NUMBER_ID": "phone-id-456", + "WABA_ID": "waba-id-789", + "APP_SECRET": "app-secret-000", + "VERIFY_TOKEN": "verify-token-999", + } + + def _run_main(self, module, responses): + stdout = io.StringIO() + with patch.dict(os.environ, self.REQUIRED_ENV, clear=False): + with patch.object(module.httpx, "get", side_effect=responses): + with patch.object(module.os.path, "exists", return_value=False): + with patch.object(sys, "argv", ["validate_config.py"]): + with contextlib.redirect_stdout(stdout): + with self.assertRaises(SystemExit) as exit_context: + module.main() + return exit_context.exception.code, stdout.getvalue() + + def test_success_output_omits_sensitive_api_values(self): + for relative_path, module_name in self.MODULE_PATHS: + with self.subTest(relative_path=relative_path): + module = load_module(relative_path, module_name) + exit_code, output = self._run_main( + module, + [ + FakeResponse( + 200, + { + "display_phone_number": "+39 333 123 4567", + "verified_name": "Top Secret Brand", + "code_verification_status": "VERIFIED", + "quality_rating": "GREEN", + }, + ), + FakeResponse(200, {"data": [{"id": "123"}, {"id": "456"}]}), + ], + ) + + self.assertEqual(exit_code, 0) + self.assertIn("Detailed API payloads are intentionally omitted", output) + self.assertIn("OK - Phone-number endpoint reachable.", output) + self.assertIn("OK - WABA phone-numbers endpoint reachable.", output) + self.assertNotIn("+39 333 123 4567", output) + self.assertNotIn("Top Secret Brand", output) + self.assertNotIn("VERIFIED", output) + self.assertNotIn("GREEN", output) + + def test_failure_output_omits_error_payload_details(self): + for relative_path, module_name in self.MODULE_PATHS: + with self.subTest(relative_path=relative_path): + module = load_module(relative_path, module_name) + exit_code, output = self._run_main( + module, + [ + FakeResponse( + 401, + { + "error": { + "message": "Invalid OAuth access token.", + "code": 190, + } + }, + ), + FakeResponse( + 403, + { + "error": { + "message": "User does not have access to this WABA.", + "code": 10, + } + }, + ), + ], + ) + + self.assertEqual(exit_code, 1) + self.assertIn("FAIL - Graph API rejected the phone-number lookup.", output) + self.assertIn("FAIL - Graph API rejected the WABA lookup.", output) + self.assertNotIn("Invalid OAuth access token.", output) + self.assertNotIn("User does not have access to this WABA.", output) + self.assertNotIn("190", output) + self.assertNotIn("10", output) + + +if __name__ == "__main__": + unittest.main()