"""Combined authentication and in-memory session state. This module merges `session_state.py` and `login_post.py` into a single convenient file. It exposes the session-state helpers: - set_sess_key(key: str) -> None - get_sess_key() -> Optional[str] - clear_sess_key() -> None and the `login` function which performs the POST request and stores the `sess_key` (if found) into the in-memory session state. It also provides a `main()` function intended for CLI use. """ from __future__ import annotations import json import re import sys from pathlib import Path import os from threading import Lock from typing import Optional, Tuple import requests from log_util import get_logger, log_request def get_app_dir() -> Path: """Return the directory where runtime files should be read/written. Logic: - If running as a PyInstaller one-file bundle, prefer the directory of the executable (Path(sys.executable).parent). - Otherwise prefer the current working directory (Path.cwd()). - This makes writes (logs, config, protector_list) appear next to the exe when distributed. """ try: if getattr(sys, "frozen", False): # When running as a PyInstaller bundle we want a stable location # for runtime-writable files. For onefile mode PyInstaller # extracts into a temporary folder and sets `sys.executable` to # the extracted binary path. However `sys.argv[0]` normally # contains the original path to the launched EXE (the file in # the dist folder). Prefer using sys.argv[0] when available so # writes go next to the original executable (e.g. dist\main.exe) # rather than into the ephemeral temp extraction directory. try: return Path(sys.argv[0]).resolve().parent except Exception: return Path(sys.executable).resolve().parent except Exception: pass try: return Path.cwd() except Exception: return Path(__file__).resolve().parent ROOT = Path(__file__).resolve().parent CONFIG_PATH = get_app_dir() / "config.json" DEFAULT_PAYLOAD = { "username": "xiaobai", "passwd": "dc81b4427df07fd6b3ebcb05a7b34daf", "pass": "c2FsdF8xMXhpYW9iYWku", "remember_password": "", } # --- simple thread-safe in-memory session state --- _lock = Lock() _sess_key: Optional[str] = None def set_sess_key(key: str) -> None: """Set global sess_key (thread-safe).""" global _sess_key with _lock: _sess_key = key def get_sess_key() -> Optional[str]: """Get current sess_key, or None if not set (thread-safe).""" with _lock: return _sess_key def clear_sess_key() -> None: """Clear current sess_key (thread-safe).""" global _sess_key with _lock: _sess_key = None # --- login/request helpers --- def load_config() -> dict: """Load configuration. If the runtime config (CONFIG_PATH) does not exist, attempt to create it by copying the project's default `config.json` (ROOT / 'config.json') or using reasonable defaults. The created file will be written next to the exe or in the current working directory so that packaged exe creates files in its directory. """ # If config exists in runtime location, load it. if CONFIG_PATH.exists(): with open(CONFIG_PATH, "r", encoding="utf-8") as f: return json.load(f) # Otherwise try to obtain a project default from source tree (useful during # development and when shipping defaults inside the package). project_default = ROOT / "config.json" default_cfg = { "base_url": "http://ip:port/", "conn_threshold": 20, "scan_interval": 60, "test_prefix": "Test_", "rule_ip_limit": 1000, } if project_default.exists(): try: with open(project_default, "r", encoding="utf-8") as f: default_cfg = json.load(f) except Exception: # ignore and fall back to embedded defaults pass # Ensure runtime directory exists and write config atomically try: CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) tmp = CONFIG_PATH.with_suffix(".tmp") with open(tmp, "w", encoding="utf-8") as f: json.dump(default_cfg, f, ensure_ascii=False, indent=2) try: tmp.replace(CONFIG_PATH) except Exception: # fallback rename tmp.rename(CONFIG_PATH) except Exception as e: # If we cannot write, surface a FileNotFoundError to preserve original # callers' behavior. raise FileNotFoundError(f"无法创建配置文件 {CONFIG_PATH}: {e}") return default_cfg def get_base_url() -> str: """Return base_url from config (ensures no trailing slash).""" cfg = load_config() return cfg.get("base_url", "").rstrip("/") def get_host() -> str: """Return host:port (netloc) parsed from base_url. """ from urllib.parse import urlparse base = get_base_url() if not base: return "" parsed = urlparse(base) return parsed.netloc def _extract_sess_cookie_from_response(resp: requests.Response) -> Optional[str]: """Try to extract sess_key cookie string from response. Returns a cookie string like 'sess_key=...;' if found, else None. """ try: sess_val = resp.cookies.get("sess_key") if sess_val: return f"sess_key={sess_val};" set_cookie = resp.headers.get("Set-Cookie", "") m = re.search(r"(sess_key=[^;]+;?)", set_cookie) if m: return m.group(1) except Exception: return None return None def login(payload: Optional[dict] = None, timeout: int = 10) -> Tuple[requests.Response, Optional[str]]: """Send login POST. Returns (response, sess_cookie_or_none). The function will also set the in-memory sess_key if it can be extracted. """ cfg = load_config() base = cfg.get("base_url", "").rstrip("/") # Validate base URL to avoid passing placeholders like "http://ip:port/" to requests from urllib.parse import urlparse parsed = urlparse(base) # If base_url is a placeholder (intentionally invalid), do not raise an # exception here. Return a lightweight dummy response so the app can # continue (GUI can prompt the user to fix config.json on first run). if not base or not parsed.scheme or not parsed.netloc or "ip:port" in base: logger = get_logger("login_post") logger.warning( "base_url appears to be a placeholder; skipping network login. GUI can be used to set a real base_url." ) class _DummyResp: def __init__(self): self.status_code = 0 self.text = "base_url placeholder - no network request performed" def json(self): raise ValueError("No JSON available") return _DummyResp(), None url = f"{base}/Action/login" data = payload or DEFAULT_PAYLOAD logger = get_logger("login_post") logger.debug(f"准备发送请求,URL: {url}") resp = requests.post(url, json=data, timeout=timeout) sess_cookie = _extract_sess_cookie_from_response(resp) if sess_cookie: # Write sess_key into in-memory session state (strip trailing semicolon) cookie_val = sess_cookie.split("=", 1)[1].rstrip(";") try: set_sess_key(cookie_val) logger.info("sess_key 已保存到内存状态") except Exception: logger.exception("保存 sess_key 失败") # Log request/response but do not fail on logging errors try: log_request(logger, "login", url, data, resp) except Exception: logger.exception("记录请求/响应失败") return resp, sess_cookie def main() -> None: logger = get_logger("main") try: resp, sess_cookie = login() except FileNotFoundError as e: logger.error(f"配置错误: {e}") sys.exit(2) except requests.RequestException as e: logger.error(f"请求失败: {e}") sys.exit(1) logger.info(f"状态: {resp.status_code}") content_type = resp.headers.get("Content-Type", "") if "application/json" in content_type: try: pretty = json.dumps(resp.json(), ensure_ascii=False, indent=2) logger.info(f"响应 JSON:\n{pretty}") print(pretty) except ValueError: logger.info(f"响应文本: {resp.text}") print(resp.text) else: logger.info(f"响应文本: {resp.text}") print(resp.text) if sess_cookie: logger.info(f"提取到 sess_key: {sess_cookie}") print(f"sess_key: {sess_cookie}") if __name__ == "__main__": main()