Prechádzať zdrojové kódy

feat(login): 实现登录功能并管理会话状态

- 新增 login_post.py,实现登录请求发送与响应处理
- 添加从响应中提取 sess_key 的逻辑
- 新增 session_state.py,提供线程安全的会话状态管理
- 在 main.py 中集成登录调用与 sess_key 存储
- 添加配置文件加载与日志记录功能
- 实现控制台友好的响应输出与错误处理
mcbaiyun 2 mesiacov pred
rodič
commit
2b3eced3a8
3 zmenil súbory, kde vykonal 165 pridanie a 0 odobranie
  1. 103 0
      login_post.py
  2. 33 0
      main.py
  3. 29 0
      session_state.py

+ 103 - 0
login_post.py

@@ -0,0 +1,103 @@
+import json
+import sys
+from pathlib import Path
+
+import requests
+
+from log_util import get_logger, log_request
+
+
+ROOT = Path(__file__).resolve().parent
+CONFIG_PATH = ROOT / "config.json"
+
+
+def load_config():
+    if not CONFIG_PATH.exists():
+        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
+    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+DEFAULT_PAYLOAD = {
+    "username": "xiaobai",
+    "passwd": "dc81b4427df07fd6b3ebcb05a7b34daf",
+    "pass": "c2FsdF8xMXhpYW9iYWku",
+    "remember_password": ""
+}
+
+
+def login(payload: dict | None = None, timeout: int = 10):
+    """Send login POST. Returns requests.Response.
+
+    payload: JSON payload to send. If None, uses DEFAULT_PAYLOAD.
+    timeout: request timeout in seconds.
+    """
+    cfg = load_config()
+    base = cfg.get("base_url", "").rstrip("/")
+    url = f"{base}/Action/login"
+    data = payload or DEFAULT_PAYLOAD
+    logger = get_logger("login_post")
+    logger.debug(f"准备发送请求,URL: {url}")
+    resp = requests.post(url, json=data, timeout=timeout)
+
+    # Try to extract sess_key from cookies or Set-Cookie header.
+    sess_cookie = None
+    try:
+        # requests exposes cookies in resp.cookies
+        sess_val = resp.cookies.get("sess_key")
+        if sess_val:
+            sess_cookie = f"sess_key={sess_val};"
+        else:
+            set_cookie = resp.headers.get("Set-Cookie", "")
+            import re
+
+            m = re.search(r"(sess_key=[^;]+;?)", set_cookie)
+            if m:
+                sess_cookie = m.group(1)
+    except Exception:
+        # Non-fatal: if extraction fails, return None for sess_cookie
+        sess_cookie = None
+
+    # 记录请求/响应
+    try:
+        log_request(logger, "login", url, data, resp)
+    except Exception:
+        logger.exception("记录请求/响应失败")
+
+    return resp, sess_cookie
+
+
+def main():
+    logger = get_logger("main")
+    try:
+        resp, sess_cookie = login()
+    except FileNotFoundError as e:
+        logger.error(f"配置错误: {e}")
+        sys.exit(2)
+    except requests.RequestException as e:
+        logger.error(f"请求失败: {e}")
+        sys.exit(1)
+    # 控制台友好输出
+    logger.info(f"状态: {resp.status_code}")
+    # Try to pretty-print JSON if possible
+    content_type = resp.headers.get("Content-Type", "")
+    if "application/json" in content_type:
+        try:
+            pretty = json.dumps(resp.json(), ensure_ascii=False, indent=2)
+            logger.info(f"响应 JSON:\n{pretty}")
+            print(pretty)
+        except ValueError:
+            logger.info(f"响应文本: {resp.text}")
+            print(resp.text)
+    else:
+        logger.info(f"响应文本: {resp.text}")
+        print(resp.text)
+
+    if sess_cookie:
+        # Print the raw sess_key cookie string
+        logger.info(f"提取到 sess_key: {sess_cookie}")
+        print(f"sess_key: {sess_cookie}")
+
+
+if __name__ == "__main__":
+    main()

+ 33 - 0
main.py

@@ -0,0 +1,33 @@
+"""Entry point to call various API actions."""
+
+from login_post import login
+from log_util import get_logger
+from session_state import set_sess_key
+
+
+def main():
+	logger = get_logger("main")
+	logger.info("开始 main()")
+
+	try:
+		resp, sess_cookie = login()
+	except FileNotFoundError as e:
+		logger.error(f"配置错误: {e}")
+		return
+	except Exception as e:
+		logger.exception(f"请求或其他错误: {e}")
+		return
+
+	logger.info(f"已调用: login() | 参数: 默认 payload | 状态: {resp.status_code}")
+	if sess_cookie:
+		logger.info(f"返回的 sess_key: {sess_cookie}")
+		# 保存为全局会话状态,供其他接口调用时使用
+		set_sess_key(sess_cookie)
+		print(f"sess_key: {sess_cookie}")
+	else:
+		logger.warning("未在响应中找到 sess_key")
+		print("未在响应中找到 sess_key")
+
+
+if __name__ == "__main__":
+	main()

+ 29 - 0
session_state.py

@@ -0,0 +1,29 @@
+"""Simple thread-safe in-memory session state for storing sess_key.
+
+提供 set_sess_key/get_sess_key/clear_sess_key 接口,供其他模块在调用 API 时获取会话 key。
+"""
+from threading import Lock
+from typing import Optional
+
+_lock = Lock()
+_sess_key: Optional[str] = None
+
+
+def set_sess_key(key: str) -> None:
+    """设置全局 sess_key(线程安全)。"""
+    global _sess_key
+    with _lock:
+        _sess_key = key
+
+
+def get_sess_key() -> Optional[str]:
+    """获取当前的 sess_key,如果没有则返回 None(线程安全)。"""
+    with _lock:
+        return _sess_key
+
+
+def clear_sess_key() -> None:
+    """清除当前 sess_key(线程安全)。"""
+    global _sess_key
+    with _lock:
+        _sess_key = None