| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- import json
- import sys
- from pathlib import Path
- from typing import Optional, Tuple
- import requests
- from log_util import get_logger, log_request
- from auth_session import get_sess_key, get_base_url
- ROOT = Path(__file__).resolve().parent
- CONFIG_PATH = ROOT / "config.json"
- DEFAULT_SHOW_PAYLOAD = {
- "func_name": "acl",
- "action": "show",
- "param": {
- "TYPE": "total,data",
- "limit": "0,20",
- "ORDER_BY": "",
- "ORDER": ""
- }
- }
- def _make_cookies_from_sess_key(sess_key: str) -> dict:
- # sess_key may be stored as value (no "sess_key=...;") or as "sess_key=...;".
- val = sess_key.split("=", 1)[-1].rstrip(";")
- return {"sess_key": val}
- def get_acl_rules(payload: dict | None = None, timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
- base = get_base_url()
- url = f"{base}/Action/call"
- data = payload or DEFAULT_SHOW_PAYLOAD
- logger = get_logger("acl_post")
- sess_key = get_sess_key()
- if not sess_key:
- raise ValueError("未找到 sess_key,请先登录")
- cookies = _make_cookies_from_sess_key(sess_key)
- logger.debug(f"准备发送请求,URL: {url}")
- resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
- try:
- log_request(logger, "get_acl_rules", url, data, resp)
- except Exception:
- logger.exception("记录请求/响应失败")
- try:
- json_data = resp.json()
- except ValueError:
- json_data = None
- return resp, json_data
- def add_acl_rule(
- dst_addr: str,
- comment: str,
- protocol: str = "any",
- action: str = "drop",
- dir: str = "forward",
- ctdir: str = "0",
- iinterface: str = "any",
- ointerface: str = "any",
- src_addr: str = "",
- src_port: str = "",
- dst_port: str = "",
- enabled: str = "yes",
- week: str = "1234567",
- time: str = "00:00-23:59",
- ip_type: str = "4",
- src6_addr: str = "",
- dst6_addr: str = "",
- src6_mode: str = "",
- dst6_mode: str = "",
- src6_suffix: str = "",
- dst6_suffix: str = "",
- timeout: int = 10,
- ) -> Tuple[requests.Response, Optional[dict]]:
- base = get_base_url()
- url = f"{base}/Action/call"
- data = {
- "func_name": "acl",
- "action": "add",
- "param": {
- "protocol": protocol,
- "action": action,
- "dir": dir,
- "ctdir": ctdir,
- "iinterface": iinterface,
- "ointerface": ointerface,
- "src_addr": src_addr,
- "dst_addr": dst_addr,
- "src_port": src_port,
- "dst_port": dst_port,
- "comment": comment,
- "enabled": enabled,
- "week": week,
- "time": time,
- "ip_type": ip_type,
- "src6_addr": src6_addr,
- "dst6_addr": dst6_addr,
- "src6_mode": src6_mode,
- "dst6_mode": dst6_mode,
- "src6_suffix": src6_suffix,
- "dst6_suffix": dst6_suffix,
- },
- }
- logger = get_logger("add_acl_post")
- sess_key = get_sess_key()
- if not sess_key:
- raise ValueError("未找到 sess_key,请先登录")
- cookies = _make_cookies_from_sess_key(sess_key)
- logger.debug(f"准备发送请求,URL: {url}")
- resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
- try:
- log_request(logger, "add_acl_rule", url, data, resp)
- except Exception:
- logger.exception("记录请求/响应失败")
- try:
- json_data = resp.json()
- except ValueError:
- json_data = None
- return resp, json_data
- def del_acl_rule(rule_id: int, timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
- base = get_base_url()
- url = f"{base}/Action/call"
- data = {"func_name": "acl", "action": "del", "param": {"id": rule_id}}
- logger = get_logger("del_acl_post")
- sess_key = get_sess_key()
- if not sess_key:
- raise ValueError("未找到 sess_key,请先登录")
- cookies = _make_cookies_from_sess_key(sess_key)
- logger.debug(f"准备发送请求,URL: {url}")
- resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
- try:
- log_request(logger, "del_acl_rule", url, data, resp)
- except Exception:
- logger.exception("记录请求/响应失败")
- try:
- json_data = resp.json()
- except ValueError:
- json_data = None
- return resp, json_data
- def edit_acl_rule(
- rule_id: int,
- dst_addr: str = "",
- comment: str = "",
- protocol: str = "any",
- action: str = "drop",
- dir: str = "forward",
- ctdir: int = 0,
- iinterface: str = "any",
- ointerface: str = "any",
- src_addr: str = "",
- src_port: str = "",
- dst_port: str = "",
- enabled: str = "yes",
- week: str = "1234567",
- time: str = "00:00-23:59",
- ip_type: str = "4",
- src6_addr: str = "",
- dst6_addr: str = "",
- src6_mode: int = 0,
- dst6_mode: int = 0,
- src6_suffix: str = "",
- dst6_suffix: str = "",
- src6_mac: str = "",
- dst6_mac: str = "",
- timeout: int = 10,
- ) -> Tuple[requests.Response, Optional[dict]]:
- base = get_base_url()
- url = f"{base}/Action/call"
- data = {
- "func_name": "acl",
- "action": "edit",
- "param": {
- "id": rule_id,
- "protocol": protocol,
- "action": action,
- "dir": dir,
- "ctdir": ctdir,
- "iinterface": iinterface,
- "ointerface": ointerface,
- "src_addr": src_addr,
- "dst_addr": dst_addr,
- "src_port": src_port,
- "dst_port": dst_port,
- "comment": comment,
- "enabled": enabled,
- "week": week,
- "time": time,
- "ip_type": ip_type,
- "src6_addr": src6_addr,
- "dst6_addr": dst6_addr,
- "src6_mode": src6_mode,
- "dst6_mode": dst6_mode,
- "src6_suffix": src6_suffix,
- "dst6_suffix": dst6_suffix,
- "src6_mac": src6_mac,
- "dst6_mac": dst6_mac,
- },
- }
- logger = get_logger("edit_acl_post")
- sess_key = get_sess_key()
- if not sess_key:
- raise ValueError("未找到 sess_key,请先登录")
- cookies = _make_cookies_from_sess_key(sess_key)
- logger.debug(f"准备发送请求,URL: {url}")
- resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
- try:
- log_request(logger, "edit_acl_rule", url, data, resp)
- except Exception:
- logger.exception("记录请求/响应失败")
- try:
- json_data = resp.json()
- except ValueError:
- json_data = None
- return resp, json_data
- def _print_response(resp: requests.Response, data: Optional[dict], logger_name: str = "main") -> None:
- logger = get_logger(logger_name)
- logger.info(f"状态: {resp.status_code}")
- if data:
- try:
- pretty = json.dumps(data, ensure_ascii=False, indent=2)
- logger.info(f"响应 JSON:\n{pretty}")
- print(pretty)
- except Exception:
- logger.info(f"响应文本: {resp.text}")
- print(resp.text)
- else:
- logger.info(f"响应文本: {resp.text}")
- print(resp.text)
- if __name__ == "__main__":
- # 示例:调用 get_acl_rules
- try:
- resp, data = get_acl_rules()
- _print_response(resp, data)
- except FileNotFoundError as e:
- get_logger("main").error(f"配置错误: {e}")
- sys.exit(2)
- except ValueError as e:
- get_logger("main").error(f"会话错误: {e}")
- sys.exit(3)
- except requests.RequestException as e:
- get_logger("main").error(f"请求失败: {e}")
- sys.exit(1)
|