auth_session.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. """Combined authentication and in-memory session state.
  2. This module merges `session_state.py` and `login_post.py` into a single
  3. convenient file. It exposes the session-state helpers:
  4. - set_sess_key(key: str) -> None
  5. - get_sess_key() -> Optional[str]
  6. - clear_sess_key() -> None
  7. and the `login` function which performs the POST request and stores the
  8. `sess_key` (if found) into the in-memory session state.
  9. It also provides a `main()` function intended for CLI use.
  10. """
  11. from __future__ import annotations
  12. import json
  13. import re
  14. import sys
  15. from pathlib import Path
  16. from threading import Lock
  17. from typing import Optional, Tuple
  18. import requests
  19. from log_util import get_logger, log_request
  20. ROOT = Path(__file__).resolve().parent
  21. CONFIG_PATH = ROOT / "config.json"
  22. # --- simple thread-safe in-memory session state ---
  23. _lock = Lock()
  24. _sess_key: Optional[str] = None
  25. def set_sess_key(key: str) -> None:
  26. """Set global sess_key (thread-safe)."""
  27. global _sess_key
  28. with _lock:
  29. _sess_key = key
  30. def get_sess_key() -> Optional[str]:
  31. """Get current sess_key, or None if not set (thread-safe)."""
  32. with _lock:
  33. return _sess_key
  34. def clear_sess_key() -> None:
  35. """Clear current sess_key (thread-safe)."""
  36. global _sess_key
  37. with _lock:
  38. _sess_key = None
  39. # --- login/request helpers ---
  40. def load_config() -> dict:
  41. if not CONFIG_PATH.exists():
  42. raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
  43. with open(CONFIG_PATH, "r", encoding="utf-8") as f:
  44. return json.load(f)
  45. def get_base_url() -> str:
  46. """Return base_url from config (ensures no trailing slash)."""
  47. cfg = load_config()
  48. return cfg.get("base_url", "").rstrip("/")
  49. def get_host() -> str:
  50. """Return host:port (netloc) parsed from base_url.
  51. Example: 'http://8.210.76.176:65531/' -> '8.210.76.176:65531'
  52. """
  53. from urllib.parse import urlparse
  54. base = get_base_url()
  55. if not base:
  56. return ""
  57. parsed = urlparse(base)
  58. return parsed.netloc
  59. DEFAULT_PAYLOAD = {
  60. "username": "xiaobai",
  61. "passwd": "dc81b4427df07fd6b3ebcb05a7b34daf",
  62. "pass": "c2FsdF8xMXhpYW9iYWku",
  63. "remember_password": "",
  64. }
  65. def _extract_sess_cookie_from_response(resp: requests.Response) -> Optional[str]:
  66. """Try to extract sess_key cookie string from response.
  67. Returns a cookie string like 'sess_key=...;' if found, else None.
  68. """
  69. try:
  70. sess_val = resp.cookies.get("sess_key")
  71. if sess_val:
  72. return f"sess_key={sess_val};"
  73. set_cookie = resp.headers.get("Set-Cookie", "")
  74. m = re.search(r"(sess_key=[^;]+;?)", set_cookie)
  75. if m:
  76. return m.group(1)
  77. except Exception:
  78. return None
  79. return None
  80. def login(payload: dict | None = None, timeout: int = 10) -> Tuple[requests.Response, Optional[str]]:
  81. """Send login POST. Returns (response, sess_cookie_or_none).
  82. The function will also set the in-memory sess_key if it can be extracted.
  83. """
  84. cfg = load_config()
  85. base = cfg.get("base_url", "").rstrip("/")
  86. url = f"{base}/Action/login"
  87. data = payload or DEFAULT_PAYLOAD
  88. logger = get_logger("login_post")
  89. logger.debug(f"准备发送请求,URL: {url}")
  90. resp = requests.post(url, json=data, timeout=timeout)
  91. sess_cookie = _extract_sess_cookie_from_response(resp)
  92. if sess_cookie:
  93. # Write sess_key into in-memory session state (strip trailing semicolon)
  94. cookie_val = sess_cookie.split("=", 1)[1].rstrip(";")
  95. try:
  96. set_sess_key(cookie_val)
  97. logger.info("sess_key 已保存到内存状态")
  98. except Exception:
  99. logger.exception("保存 sess_key 失败")
  100. # Log request/response but do not fail on logging errors
  101. try:
  102. log_request(logger, "login", url, data, resp)
  103. except Exception:
  104. logger.exception("记录请求/响应失败")
  105. return resp, sess_cookie
  106. def main() -> None:
  107. logger = get_logger("main")
  108. try:
  109. resp, sess_cookie = login()
  110. except FileNotFoundError as e:
  111. logger.error(f"配置错误: {e}")
  112. sys.exit(2)
  113. except requests.RequestException as e:
  114. logger.error(f"请求失败: {e}")
  115. sys.exit(1)
  116. logger.info(f"状态: {resp.status_code}")
  117. content_type = resp.headers.get("Content-Type", "")
  118. if "application/json" in content_type:
  119. try:
  120. pretty = json.dumps(resp.json(), ensure_ascii=False, indent=2)
  121. logger.info(f"响应 JSON:\n{pretty}")
  122. print(pretty)
  123. except ValueError:
  124. logger.info(f"响应文本: {resp.text}")
  125. print(resp.text)
  126. else:
  127. logger.info(f"响应文本: {resp.text}")
  128. print(resp.text)
  129. if sess_cookie:
  130. logger.info(f"提取到 sess_key: {sess_cookie}")
  131. print(f"sess_key: {sess_cookie}")
  132. if __name__ == "__main__":
  133. main()