fix(ci): Sync canonical security artifacts
This commit is contained in:
@@ -34,6 +34,17 @@ ENDPOINTS = [
|
||||
]
|
||||
|
||||
|
||||
def create_tls_context():
|
||||
"""Cria contexto TLS restringindo conexoes a TLS 1.2+."""
|
||||
context = ssl.create_default_context()
|
||||
if hasattr(ssl, "TLSVersion"):
|
||||
context.minimum_version = ssl.TLSVersion.TLSv1_2
|
||||
else:
|
||||
context.options |= getattr(ssl, "OP_NO_TLSv1", 0)
|
||||
context.options |= getattr(ssl, "OP_NO_TLSv1_1", 0)
|
||||
return context
|
||||
|
||||
|
||||
def test_tcp_latency(host, port, timeout=5):
|
||||
"""Testa latência TCP para um host:port."""
|
||||
try:
|
||||
@@ -49,7 +60,7 @@ def test_tcp_latency(host, port, timeout=5):
|
||||
def test_tls_handshake(host, port=443, timeout=5):
|
||||
"""Testa tempo do handshake TLS."""
|
||||
try:
|
||||
context = ssl.create_default_context()
|
||||
context = create_tls_context()
|
||||
start = time.time()
|
||||
with socket.create_connection((host, port), timeout=timeout) as sock:
|
||||
with context.wrap_socket(sock, server_hostname=host) as ssock:
|
||||
|
||||
@@ -28,11 +28,21 @@ except ImportError:
|
||||
GRAPH_API = "https://graph.facebook.com/v21.0"
|
||||
|
||||
|
||||
def _mask_secret(value: str) -> str:
|
||||
"""Return a masked version of a secret for safe logging."""
|
||||
if not value or len(value) < 8:
|
||||
return "***masked***"
|
||||
return f"{value[:6]}...masked"
|
||||
def _redact_json(value):
|
||||
"""Recursively redact common secret-bearing keys before logging JSON."""
|
||||
sensitive_keys = {"authorization", "token", "access_token", "app_secret", "secret"}
|
||||
|
||||
if isinstance(value, dict):
|
||||
redacted = {}
|
||||
for key, item in value.items():
|
||||
if key.lower() in sensitive_keys:
|
||||
redacted[key] = "***redacted***"
|
||||
else:
|
||||
redacted[key] = _redact_json(item)
|
||||
return redacted
|
||||
if isinstance(value, list):
|
||||
return [_redact_json(item) for item in value]
|
||||
return value
|
||||
|
||||
|
||||
def send_test(to: str, message: str) -> None:
|
||||
@@ -84,11 +94,7 @@ def send_test(to: str, message: str) -> None:
|
||||
|
||||
print()
|
||||
print("Full response:")
|
||||
# Mask token in response output to prevent credential leakage
|
||||
response_str = json.dumps(data, indent=2)
|
||||
if token and token in response_str:
|
||||
response_str = response_str.replace(token, _mask_secret(token))
|
||||
print(response_str)
|
||||
print(json.dumps(_redact_json(data), indent=2))
|
||||
|
||||
except httpx.ConnectError:
|
||||
print("Error: Connection failed. Check your internet connection.")
|
||||
@@ -96,10 +102,8 @@ def send_test(to: str, message: str) -> None:
|
||||
except httpx.TimeoutException:
|
||||
print("Error: Request timed out.")
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
# Mask token in error output to prevent credential leakage
|
||||
safe_err = str(e).replace(token, _mask_secret(token)) if token else str(e)
|
||||
print(f"Error: {safe_err}")
|
||||
except Exception as exc:
|
||||
print(f"Error: unexpected {exc.__class__.__name__} while sending the test message.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
|
||||
@@ -47,11 +47,14 @@ def check_env_vars() -> tuple[bool, list[str]]:
|
||||
return len(missing) == 0, missing
|
||||
|
||||
|
||||
def _mask_secret(value: str) -> str:
|
||||
"""Return a masked version of a secret for safe logging."""
|
||||
if not value or len(value) < 8:
|
||||
return "***masked***"
|
||||
return f"{value[:6]}...masked"
|
||||
def _format_api_failure(response: httpx.Response) -> str:
|
||||
"""Return a sanitized API failure message without echoing sensitive payloads."""
|
||||
try:
|
||||
error = response.json().get("error", {})
|
||||
except ValueError:
|
||||
error = {}
|
||||
error_code = error.get("code", "?")
|
||||
return f"API request failed (status {response.status_code}, code {error_code})."
|
||||
|
||||
|
||||
def test_api_connection() -> tuple[bool, str]:
|
||||
@@ -76,17 +79,14 @@ def test_api_connection() -> tuple[bool, str]:
|
||||
f" Quality: {data.get('quality_rating', 'N/A')}"
|
||||
)
|
||||
else:
|
||||
error = response.json().get("error", {})
|
||||
return False, f"API Error {error.get('code', '?')}: {error.get('message', 'Unknown')}"
|
||||
return False, _format_api_failure(response)
|
||||
|
||||
except httpx.ConnectError:
|
||||
return False, "Connection failed. Check your internet connection."
|
||||
except httpx.TimeoutException:
|
||||
return False, "Request timed out after 10 seconds."
|
||||
except Exception as e:
|
||||
# Mask token in error output to prevent credential leakage
|
||||
safe_err = str(e).replace(token, _mask_secret(token)) if token else str(e)
|
||||
return False, f"Unexpected error: {safe_err}"
|
||||
except Exception as exc:
|
||||
return False, f"Unexpected {exc.__class__.__name__} while contacting the Graph API."
|
||||
|
||||
|
||||
def test_waba_access() -> tuple[bool, str]:
|
||||
@@ -106,13 +106,10 @@ def test_waba_access() -> tuple[bool, str]:
|
||||
count = len(data.get("data", []))
|
||||
return True, f"WABA accessible. {count} phone number(s) found."
|
||||
else:
|
||||
error = response.json().get("error", {})
|
||||
return False, f"API Error {error.get('code', '?')}: {error.get('message', 'Unknown')}"
|
||||
return False, _format_api_failure(response)
|
||||
|
||||
except Exception as e:
|
||||
# Mask token in error output to prevent credential leakage
|
||||
safe_err = str(e).replace(token, _mask_secret(token)) if token else str(e)
|
||||
return False, f"Error: {safe_err}"
|
||||
except Exception as exc:
|
||||
return False, f"Unexpected {exc.__class__.__name__} while checking WABA access."
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
Reference in New Issue
Block a user