auth_session.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. """Combined authentication and in-memory session state.
  2. This module merges `session_state.py` and `login_post.py` into a single
  3. convenient file. It exposes the session-state helpers:
  4. - set_sess_key(key: str) -> None
  5. - get_sess_key() -> Optional[str]
  6. - clear_sess_key() -> None
  7. and the `login` function which performs the POST request and stores the
  8. `sess_key` (if found) into the in-memory session state.
  9. It also provides a `main()` function intended for CLI use.
  10. """
  11. from __future__ import annotations
  12. import json
  13. import re
  14. import sys
  15. from pathlib import Path
  16. import os
  17. from threading import Lock
  18. from typing import Optional, Tuple
  19. import requests
  20. from log_util import get_logger, log_request
  21. def get_app_dir() -> Path:
  22. """Return the directory where runtime files should be read/written.
  23. Logic:
  24. - If running as a PyInstaller one-file bundle, prefer the directory of the
  25. executable (Path(sys.executable).parent).
  26. - Otherwise prefer the current working directory (Path.cwd()).
  27. - This makes writes (logs, config, protector_list) appear next to the exe
  28. when distributed.
  29. """
  30. try:
  31. if getattr(sys, "frozen", False):
  32. return Path(sys.executable).resolve().parent
  33. except Exception:
  34. pass
  35. try:
  36. return Path.cwd()
  37. except Exception:
  38. return Path(__file__).resolve().parent
  39. ROOT = Path(__file__).resolve().parent
  40. CONFIG_PATH = get_app_dir() / "config.json"
  41. DEFAULT_PAYLOAD = {
  42. "username": "xiaobai",
  43. "passwd": "dc81b4427df07fd6b3ebcb05a7b34daf",
  44. "pass": "c2FsdF8xMXhpYW9iYWku",
  45. "remember_password": "",
  46. }
  47. # --- simple thread-safe in-memory session state ---
  48. _lock = Lock()
  49. _sess_key: Optional[str] = None
  50. def set_sess_key(key: str) -> None:
  51. """Set global sess_key (thread-safe)."""
  52. global _sess_key
  53. with _lock:
  54. _sess_key = key
  55. def get_sess_key() -> Optional[str]:
  56. """Get current sess_key, or None if not set (thread-safe)."""
  57. with _lock:
  58. return _sess_key
  59. def clear_sess_key() -> None:
  60. """Clear current sess_key (thread-safe)."""
  61. global _sess_key
  62. with _lock:
  63. _sess_key = None
  64. # --- login/request helpers ---
  65. def load_config() -> dict:
  66. """Load configuration.
  67. If the runtime config (CONFIG_PATH) does not exist, attempt to create it by
  68. copying the project's default `config.json` (ROOT / 'config.json') or using
  69. reasonable defaults. The created file will be written next to the exe or in
  70. the current working directory so that packaged exe creates files in its
  71. directory.
  72. """
  73. # If config exists in runtime location, load it.
  74. if CONFIG_PATH.exists():
  75. with open(CONFIG_PATH, "r", encoding="utf-8") as f:
  76. return json.load(f)
  77. # Otherwise try to obtain a project default from source tree (useful during
  78. # development and when shipping defaults inside the package).
  79. project_default = ROOT / "config.json"
  80. default_cfg = {
  81. "base_url": "http://ip:port/",
  82. "conn_threshold": 20,
  83. "scan_interval": 60,
  84. "test_prefix": "Test_",
  85. "rule_ip_limit": 1000,
  86. }
  87. if project_default.exists():
  88. try:
  89. with open(project_default, "r", encoding="utf-8") as f:
  90. default_cfg = json.load(f)
  91. except Exception:
  92. # ignore and fall back to embedded defaults
  93. pass
  94. # Ensure runtime directory exists and write config atomically
  95. try:
  96. CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
  97. tmp = CONFIG_PATH.with_suffix(".tmp")
  98. with open(tmp, "w", encoding="utf-8") as f:
  99. json.dump(default_cfg, f, ensure_ascii=False, indent=2)
  100. try:
  101. tmp.replace(CONFIG_PATH)
  102. except Exception:
  103. # fallback rename
  104. tmp.rename(CONFIG_PATH)
  105. except Exception as e:
  106. # If we cannot write, surface a FileNotFoundError to preserve original
  107. # callers' behavior.
  108. raise FileNotFoundError(f"无法创建配置文件 {CONFIG_PATH}: {e}")
  109. return default_cfg
  110. def get_base_url() -> str:
  111. """Return base_url from config (ensures no trailing slash)."""
  112. cfg = load_config()
  113. return cfg.get("base_url", "").rstrip("/")
  114. def get_host() -> str:
  115. """Return host:port (netloc) parsed from base_url.
  116. """
  117. from urllib.parse import urlparse
  118. base = get_base_url()
  119. if not base:
  120. return ""
  121. parsed = urlparse(base)
  122. return parsed.netloc
  123. def _extract_sess_cookie_from_response(resp: requests.Response) -> Optional[str]:
  124. """Try to extract sess_key cookie string from response.
  125. Returns a cookie string like 'sess_key=...;' if found, else None.
  126. """
  127. try:
  128. sess_val = resp.cookies.get("sess_key")
  129. if sess_val:
  130. return f"sess_key={sess_val};"
  131. set_cookie = resp.headers.get("Set-Cookie", "")
  132. m = re.search(r"(sess_key=[^;]+;?)", set_cookie)
  133. if m:
  134. return m.group(1)
  135. except Exception:
  136. return None
  137. return None
  138. def login(payload: Optional[dict] = None, timeout: int = 10) -> Tuple[requests.Response, Optional[str]]:
  139. """Send login POST. Returns (response, sess_cookie_or_none).
  140. The function will also set the in-memory sess_key if it can be extracted.
  141. """
  142. cfg = load_config()
  143. base = cfg.get("base_url", "").rstrip("/")
  144. # Validate base URL to avoid passing placeholders like "http://ip:port/" to requests
  145. from urllib.parse import urlparse
  146. parsed = urlparse(base)
  147. # If base_url is a placeholder (intentionally invalid), do not raise an
  148. # exception here. Return a lightweight dummy response so the app can
  149. # continue (GUI can prompt the user to fix config.json on first run).
  150. if not base or not parsed.scheme or not parsed.netloc or "ip:port" in base:
  151. logger = get_logger("login_post")
  152. logger.warning(
  153. "base_url appears to be a placeholder; skipping network login. GUI can be used to set a real base_url."
  154. )
  155. class _DummyResp:
  156. def __init__(self):
  157. self.status_code = 0
  158. self.text = "base_url placeholder - no network request performed"
  159. def json(self):
  160. raise ValueError("No JSON available")
  161. return _DummyResp(), None
  162. url = f"{base}/Action/login"
  163. data = payload or DEFAULT_PAYLOAD
  164. logger = get_logger("login_post")
  165. logger.debug(f"准备发送请求,URL: {url}")
  166. resp = requests.post(url, json=data, timeout=timeout)
  167. sess_cookie = _extract_sess_cookie_from_response(resp)
  168. if sess_cookie:
  169. # Write sess_key into in-memory session state (strip trailing semicolon)
  170. cookie_val = sess_cookie.split("=", 1)[1].rstrip(";")
  171. try:
  172. set_sess_key(cookie_val)
  173. logger.info("sess_key 已保存到内存状态")
  174. except Exception:
  175. logger.exception("保存 sess_key 失败")
  176. # Log request/response but do not fail on logging errors
  177. try:
  178. log_request(logger, "login", url, data, resp)
  179. except Exception:
  180. logger.exception("记录请求/响应失败")
  181. return resp, sess_cookie
  182. def main() -> None:
  183. logger = get_logger("main")
  184. try:
  185. resp, sess_cookie = login()
  186. except FileNotFoundError as e:
  187. logger.error(f"配置错误: {e}")
  188. sys.exit(2)
  189. except requests.RequestException as e:
  190. logger.error(f"请求失败: {e}")
  191. sys.exit(1)
  192. logger.info(f"状态: {resp.status_code}")
  193. content_type = resp.headers.get("Content-Type", "")
  194. if "application/json" in content_type:
  195. try:
  196. pretty = json.dumps(resp.json(), ensure_ascii=False, indent=2)
  197. logger.info(f"响应 JSON:\n{pretty}")
  198. print(pretty)
  199. except ValueError:
  200. logger.info(f"响应文本: {resp.text}")
  201. print(resp.text)
  202. else:
  203. logger.info(f"响应文本: {resp.text}")
  204. print(resp.text)
  205. if sess_cookie:
  206. logger.info(f"提取到 sess_key: {sess_cookie}")
  207. print(f"sess_key: {sess_cookie}")
  208. if __name__ == "__main__":
  209. main()