| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- """Combined authentication and in-memory session state.
- This module merges `session_state.py` and `login_post.py` into a single
- convenient file. It exposes the session-state helpers:
- - set_sess_key(key: str) -> None
- - get_sess_key() -> Optional[str]
- - clear_sess_key() -> None
- and the `login` function which performs the POST request and stores the
- `sess_key` (if found) into the in-memory session state.
- It also provides a `main()` function intended for CLI use.
- """
- from __future__ import annotations
- import json
- import re
- import sys
- from pathlib import Path
- import os
- from threading import Lock
- from typing import Optional, Tuple
- import requests
- from log_util import get_logger, log_request
- def get_app_dir() -> Path:
- """Return the directory where runtime files should be read/written.
- Logic:
- - If running as a PyInstaller one-file bundle, prefer the directory of the
- executable (Path(sys.executable).parent).
- - Otherwise prefer the current working directory (Path.cwd()).
- - This makes writes (logs, config, protector_list) appear next to the exe
- when distributed.
- """
- try:
- if getattr(sys, "frozen", False):
- return Path(sys.executable).resolve().parent
- except Exception:
- pass
- try:
- return Path.cwd()
- except Exception:
- return Path(__file__).resolve().parent
- ROOT = Path(__file__).resolve().parent
- CONFIG_PATH = get_app_dir() / "config.json"
- DEFAULT_PAYLOAD = {
- "username": "xiaobai",
- "passwd": "dc81b4427df07fd6b3ebcb05a7b34daf",
- "pass": "c2FsdF8xMXhpYW9iYWku",
- "remember_password": "",
- }
- # --- simple thread-safe in-memory session state ---
- _lock = Lock()
- _sess_key: Optional[str] = None
- def set_sess_key(key: str) -> None:
- """Set global sess_key (thread-safe)."""
- global _sess_key
- with _lock:
- _sess_key = key
- def get_sess_key() -> Optional[str]:
- """Get current sess_key, or None if not set (thread-safe)."""
- with _lock:
- return _sess_key
- def clear_sess_key() -> None:
- """Clear current sess_key (thread-safe)."""
- global _sess_key
- with _lock:
- _sess_key = None
- # --- login/request helpers ---
- def load_config() -> dict:
- """Load configuration.
- If the runtime config (CONFIG_PATH) does not exist, attempt to create it by
- copying the project's default `config.json` (ROOT / 'config.json') or using
- reasonable defaults. The created file will be written next to the exe or in
- the current working directory so that packaged exe creates files in its
- directory.
- """
- # If config exists in runtime location, load it.
- if CONFIG_PATH.exists():
- with open(CONFIG_PATH, "r", encoding="utf-8") as f:
- return json.load(f)
- # Otherwise try to obtain a project default from source tree (useful during
- # development and when shipping defaults inside the package).
- project_default = ROOT / "config.json"
- default_cfg = {
- "base_url": "http://ip:port/",
- "conn_threshold": 20,
- "scan_interval": 60,
- "test_prefix": "Test_",
- "rule_ip_limit": 1000,
- }
- if project_default.exists():
- try:
- with open(project_default, "r", encoding="utf-8") as f:
- default_cfg = json.load(f)
- except Exception:
- # ignore and fall back to embedded defaults
- pass
- # Ensure runtime directory exists and write config atomically
- try:
- CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
- tmp = CONFIG_PATH.with_suffix(".tmp")
- with open(tmp, "w", encoding="utf-8") as f:
- json.dump(default_cfg, f, ensure_ascii=False, indent=2)
- try:
- tmp.replace(CONFIG_PATH)
- except Exception:
- # fallback rename
- tmp.rename(CONFIG_PATH)
- except Exception as e:
- # If we cannot write, surface a FileNotFoundError to preserve original
- # callers' behavior.
- raise FileNotFoundError(f"无法创建配置文件 {CONFIG_PATH}: {e}")
- return default_cfg
- def get_base_url() -> str:
- """Return base_url from config (ensures no trailing slash)."""
- cfg = load_config()
- return cfg.get("base_url", "").rstrip("/")
- def get_host() -> str:
- """Return host:port (netloc) parsed from base_url.
- """
- from urllib.parse import urlparse
- base = get_base_url()
- if not base:
- return ""
- parsed = urlparse(base)
- return parsed.netloc
- def _extract_sess_cookie_from_response(resp: requests.Response) -> Optional[str]:
- """Try to extract sess_key cookie string from response.
- Returns a cookie string like 'sess_key=...;' if found, else None.
- """
- try:
- sess_val = resp.cookies.get("sess_key")
- if sess_val:
- return f"sess_key={sess_val};"
- set_cookie = resp.headers.get("Set-Cookie", "")
- m = re.search(r"(sess_key=[^;]+;?)", set_cookie)
- if m:
- return m.group(1)
- except Exception:
- return None
- return None
- def login(payload: Optional[dict] = None, timeout: int = 10) -> Tuple[requests.Response, Optional[str]]:
- """Send login POST. Returns (response, sess_cookie_or_none).
- The function will also set the in-memory sess_key if it can be extracted.
- """
- cfg = load_config()
- base = cfg.get("base_url", "").rstrip("/")
- # Validate base URL to avoid passing placeholders like "http://ip:port/" to requests
- from urllib.parse import urlparse
- parsed = urlparse(base)
- # If base_url is a placeholder (intentionally invalid), do not raise an
- # exception here. Return a lightweight dummy response so the app can
- # continue (GUI can prompt the user to fix config.json on first run).
- if not base or not parsed.scheme or not parsed.netloc or "ip:port" in base:
- logger = get_logger("login_post")
- logger.warning(
- "base_url appears to be a placeholder; skipping network login. GUI can be used to set a real base_url."
- )
- class _DummyResp:
- def __init__(self):
- self.status_code = 0
- self.text = "base_url placeholder - no network request performed"
- def json(self):
- raise ValueError("No JSON available")
- return _DummyResp(), None
- url = f"{base}/Action/login"
- data = payload or DEFAULT_PAYLOAD
- logger = get_logger("login_post")
- logger.debug(f"准备发送请求,URL: {url}")
- resp = requests.post(url, json=data, timeout=timeout)
- sess_cookie = _extract_sess_cookie_from_response(resp)
- if sess_cookie:
- # Write sess_key into in-memory session state (strip trailing semicolon)
- cookie_val = sess_cookie.split("=", 1)[1].rstrip(";")
- try:
- set_sess_key(cookie_val)
- logger.info("sess_key 已保存到内存状态")
- except Exception:
- logger.exception("保存 sess_key 失败")
- # Log request/response but do not fail on logging errors
- try:
- log_request(logger, "login", url, data, resp)
- except Exception:
- logger.exception("记录请求/响应失败")
- return resp, sess_cookie
- def main() -> None:
- logger = get_logger("main")
- try:
- resp, sess_cookie = login()
- except FileNotFoundError as e:
- logger.error(f"配置错误: {e}")
- sys.exit(2)
- except requests.RequestException as e:
- logger.error(f"请求失败: {e}")
- sys.exit(1)
- logger.info(f"状态: {resp.status_code}")
- content_type = resp.headers.get("Content-Type", "")
- if "application/json" in content_type:
- try:
- pretty = json.dumps(resp.json(), ensure_ascii=False, indent=2)
- logger.info(f"响应 JSON:\n{pretty}")
- print(pretty)
- except ValueError:
- logger.info(f"响应文本: {resp.text}")
- print(resp.text)
- else:
- logger.info(f"响应文本: {resp.text}")
- print(resp.text)
- if sess_cookie:
- logger.info(f"提取到 sess_key: {sess_cookie}")
- print(f"sess_key: {sess_cookie}")
- if __name__ == "__main__":
- main()
|