auth_session.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. """Combined authentication and in-memory session state.
  2. This module merges `session_state.py` and `login_post.py` into a single
  3. convenient file. It exposes the session-state helpers:
  4. - set_sess_key(key: str) -> None
  5. - get_sess_key() -> Optional[str]
  6. - clear_sess_key() -> None
  7. and the `login` function which performs the POST request and stores the
  8. `sess_key` (if found) into the in-memory session state.
  9. It also provides a `main()` function intended for CLI use.
  10. """
  11. from __future__ import annotations
  12. import json
  13. import re
  14. import sys
  15. from pathlib import Path
  16. import os
  17. from threading import Lock
  18. from typing import Optional, Tuple
  19. import requests
  20. from log_util import get_logger, log_request
  def get_app_dir() -> Path:
      """Return the directory where runtime files should be read and written.

      Resolution order:

      1. Frozen build (``sys.frozen`` is truthy): the directory containing the
         launched executable. ``sys.executable`` is tried first because in a
         frozen application it is the full path of the executable the user
         started, whereas ``sys.argv[0]`` is only the name or relative path
         typed on the command line and would be resolved against the current
         working directory. ``sys.argv[0]`` is kept as a fallback.
      2. Otherwise the current working directory.
      3. If the working directory cannot be determined, the directory that
         contains this source file.

      Returns:
          The chosen directory as a ``Path``.
      """
      if getattr(sys, "frozen", False):
          argv = getattr(sys, "argv", None) or [""]
          for candidate in (sys.executable, argv[0]):
              # An empty value would resolve to the working directory, and
              # ``.parent`` would then point one level *above* it.
              if not candidate:
                  continue
              try:
                  return Path(candidate).resolve().parent
              except (OSError, RuntimeError, TypeError, ValueError):
                  # Unusable path; try the next candidate.
                  continue

      try:
          return Path.cwd()
      except OSError:
          # The working directory may have been removed underneath the process.
          return Path(__file__).resolve().parent
  # Directory that contains this source file; used to look up the default
  # ``config.json`` that ships with the project.
  ROOT: Path = Path(__file__).resolve().parent

  # Runtime configuration file, located in the directory chosen by
  # ``get_app_dir()`` when this module is imported.
  CONFIG_PATH: Path = get_app_dir() / "config.json"

  # Request body used by ``login()`` when the caller supplies no payload.
  # NOTE(review): these look like real account credentials committed to the
  # source tree; consider loading them from configuration - confirm with owner.
  DEFAULT_PAYLOAD: dict = {
      "username": "xiaobai",
      "passwd": "dc81b4427df07fd6b3ebcb05a7b34daf",
      "pass": "c2FsdF8xMXhpYW9iYWku",
      "remember_password": "",
  }

  # --- simple thread-safe in-memory session state ---
  # ``_lock`` is held around every read and write of ``_sess_key``.
  _lock = Lock()
  _sess_key: Optional[str] = None
  def set_sess_key(key: str) -> None:
      """Store *key* as the module-wide session key.

      The assignment is performed while holding the module lock.
      """
      global _sess_key

      with _lock:
          _sess_key = key
  def get_sess_key() -> Optional[str]:
      """Return the stored session key, or ``None`` when none is set.

      The value is read while holding the module lock.
      """
      with _lock:
          current = _sess_key
      return current
  def clear_sess_key() -> None:
      """Forget the stored session key by resetting it to ``None``.

      The reset is performed while holding the module lock.
      """
      global _sess_key

      with _lock:
          _sess_key = None
  75. # --- login/request helpers ---
  def load_config() -> dict:
      """Load the runtime configuration, creating the file on first use.

      If ``CONFIG_PATH`` exists, its parsed JSON content is returned. Otherwise
      a default configuration is chosen - the content of ``ROOT / "config.json"``
      when that file holds a readable JSON object, else the embedded defaults
      below - written to ``CONFIG_PATH`` and returned.

      Returns:
          The parsed configuration.

      Raises:
          FileNotFoundError: If the configuration file does not exist and
              cannot be created. The underlying error is chained as the cause.
          json.JSONDecodeError: If an existing ``CONFIG_PATH`` is not valid JSON.
      """
      # If the config exists in the runtime location, load it.
      if CONFIG_PATH.exists():
          with open(CONFIG_PATH, "r", encoding="utf-8") as f:
              return json.load(f)

      default_cfg = {
          "base_url": "http://ip:port/",
          "conn_threshold": 20,
          "scan_interval": 60,
          "test_prefix": "Test_",
          "rule_ip_limit": 1000,
      }

      # Prefer a project default from the source tree when it is usable.
      project_default = ROOT / "config.json"
      if project_default.exists():
          try:
              with open(project_default, "r", encoding="utf-8") as f:
                  loaded = json.load(f)
          except (OSError, ValueError):
              # Unreadable or malformed: keep the embedded defaults.
              loaded = None
          # Callers use ``.get`` on the result, so only accept a JSON object.
          if isinstance(loaded, dict):
              default_cfg = loaded

      # Write to a temporary file first and move it into place, so that a
      # partially written config is never left at CONFIG_PATH.
      tmp = CONFIG_PATH.with_suffix(".tmp")
      try:
          CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
          with open(tmp, "w", encoding="utf-8") as f:
              json.dump(default_cfg, f, ensure_ascii=False, indent=2)
          tmp.replace(CONFIG_PATH)
      except Exception as e:
          # Do not leave the temporary file behind after a failed write.
          try:
              tmp.unlink()
          except OSError:
              pass
          # Surface a FileNotFoundError to preserve the behavior callers
          # already handle, keeping the real cause attached.
          raise FileNotFoundError(f"无法创建配置文件 {CONFIG_PATH}: {e}") from e

      return default_cfg
  def get_base_url() -> str:
      """Return ``base_url`` from the configuration without trailing slashes.

      Returns:
          The configured base URL with any trailing ``/`` removed, or an empty
          string when ``base_url`` is missing, ``null`` or not a string.
      """
      base_url = load_config().get("base_url")
      # A JSON ``null`` or a non-string value has no ``rstrip``; treat it the
      # same as a missing entry instead of raising AttributeError.
      if not isinstance(base_url, str):
          return ""
      return base_url.rstrip("/")
  def get_host() -> str:
      """Return the network location (``host:port``) of the configured base URL.

      An empty string is returned when no base URL is configured.
      """
      from urllib.parse import urlparse

      base_url = get_base_url()
      return urlparse(base_url).netloc if base_url else ""
  def _extract_sess_cookie_from_response(resp: requests.Response) -> Optional[str]:
      """Extract the ``sess_key`` cookie from a response.

      The parsed cookie jar (``resp.cookies``) is consulted first; if it has no
      ``sess_key`` entry, the raw ``Set-Cookie`` header is searched instead.

      Args:
          resp: Any object exposing ``cookies.get`` and ``headers.get``.

      Returns:
          A cookie string of the form ``'sess_key=<value>;'`` (always with the
          trailing semicolon), or ``None`` if no session key is found or the
          response cannot be inspected.
      """
      try:
          sess_val = resp.cookies.get("sess_key")
          if not sess_val:
              set_cookie = resp.headers.get("Set-Cookie", "") or ""
              # Require a delimiter (or start of string) before the name so
              # that cookies such as ``old_sess_key`` do not match, and stop
              # the value at ';', ',' or whitespace so that several cookies
              # joined into one header value are not swallowed.
              m = re.search(r"(?:^|[;,\s])sess_key=([^;,\s]+)", set_cookie)
              sess_val = m.group(1) if m else None
      except Exception:
          # Best effort: a response that cannot be inspected yields no cookie.
          return None
      return f"sess_key={sess_val};" if sess_val else None
  def login(payload: Optional[dict] = None, timeout: int = 10) -> Tuple[requests.Response, Optional[str]]:
      """Send the login POST request and remember the returned session key.

      Args:
          payload: JSON body to send. ``DEFAULT_PAYLOAD`` is used when this is
              ``None`` or empty.
          timeout: Timeout in seconds passed to ``requests.post``.

      Returns:
          ``(response, sess_cookie)`` where ``sess_cookie`` is the string
          produced by ``_extract_sess_cookie_from_response`` or ``None``. When
          a session key is found it is also stored via ``set_sess_key``.
          If the configured ``base_url`` is missing or still a placeholder, no
          request is made and ``response`` is a stand-in object with
          ``status_code == 0``.

      Raises:
          FileNotFoundError: Propagated from ``load_config``.
          requests.RequestException: Propagated from ``requests.post``.
      """
      from urllib.parse import urlparse

      cfg = load_config()
      base = cfg.get("base_url")
      # A JSON ``null`` or non-string value is treated like a missing base_url.
      base = base.rstrip("/") if isinstance(base, str) else ""
      logger = get_logger("login_post")

      # Validate the base URL so placeholders like "http://ip:port/" are never
      # passed to requests. This is not an error: a stand-in response is
      # returned so the application can continue and the user can fix
      # config.json afterwards.
      parsed = urlparse(base)
      if not base or not parsed.scheme or not parsed.netloc or "ip:port" in base:
          logger.warning(
              "base_url appears to be a placeholder; skipping network login. GUI can be used to set a real base_url."
          )

          class _DummyResp:
              """Minimal stand-in exposing the response attributes callers read."""

              def __init__(self):
                  self.status_code = 0
                  self.text = "base_url placeholder - no network request performed"
                  # ``main()`` reads ``headers``; without it the stand-in
                  # raised AttributeError on the first run.
                  self.headers = {}
                  self.cookies = {}
                  self.ok = False

              def json(self):
                  raise ValueError("No JSON available")

          return _DummyResp(), None

      url = f"{base}/Action/login"
      data = payload or DEFAULT_PAYLOAD
      logger.debug(f"准备发送请求,URL: {url}")
      resp = requests.post(url, json=data, timeout=timeout)

      sess_cookie = _extract_sess_cookie_from_response(resp)
      if sess_cookie:
          # Keep only the value: drop the "sess_key=" prefix and trailing ';'.
          cookie_val = sess_cookie.split("=", 1)[1].rstrip(";")
          try:
              set_sess_key(cookie_val)
              logger.info("sess_key 已保存到内存状态")
          except Exception:
              logger.exception("保存 sess_key 失败")

      # Log the request/response, but never fail the login on logging errors.
      try:
          log_request(logger, "login", url, data, resp)
      except Exception:
          logger.exception("记录请求/响应失败")

      return resp, sess_cookie
  def main() -> None:
      """Command-line entry point: log in and print the server response.

      The response body is pretty-printed as JSON when the response declares a
      JSON content type and the body parses; otherwise the raw text is printed.
      A session cookie, when one was extracted, is printed last.

      Exit codes:
          2 - the configuration could not be created or parsed.
          1 - the HTTP request failed.
      """
      logger = get_logger("main")
      try:
          resp, sess_cookie = login()
      except FileNotFoundError as e:
          logger.error(f"配置错误: {e}")
          sys.exit(2)
      except requests.RequestException as e:
          logger.error(f"请求失败: {e}")
          sys.exit(1)
      except json.JSONDecodeError as e:
          # An existing but malformed config.json is a configuration problem
          # too; report it instead of ending with a traceback.
          logger.error(f"配置错误: {e}")
          sys.exit(2)

      logger.info(f"状态: {resp.status_code}")

      # ``login`` may return a lightweight stand-in response when base_url is a
      # placeholder; do not assume it carries headers.
      headers = getattr(resp, "headers", None) or {}
      content_type = headers.get("Content-Type", "")

      pretty = None
      if "application/json" in content_type:
          try:
              pretty = json.dumps(resp.json(), ensure_ascii=False, indent=2)
          except ValueError:
              # The body is not valid JSON despite the header: print raw text.
              pretty = None

      if pretty is not None:
          logger.info(f"响应 JSON:\n{pretty}")
          print(pretty)
      else:
          logger.info(f"响应文本: {resp.text}")
          print(resp.text)

      if sess_cookie:
          logger.info(f"提取到 sess_key: {sess_cookie}")
          print(f"sess_key: {sess_cookie}")


  if __name__ == "__main__":
      main()