acl_actions.py 7.7 KB


  1. import json
  2. import sys
  3. from pathlib import Path
  4. from typing import Optional, Tuple
  5. import requests
  6. from log_util import get_logger, log_request
  7. from auth_session import get_sess_key, load_config
  8. ROOT = Path(__file__).resolve().parent
  9. CONFIG_PATH = ROOT / "config.json"
  10. DEFAULT_SHOW_PAYLOAD = {
  11. "func_name": "acl",
  12. "action": "show",
  13. "param": {
  14. "TYPE": "total,data",
  15. "limit": "0,20",
  16. "ORDER_BY": "",
  17. "ORDER": ""
  18. }
  19. }
  20. def _make_cookies_from_sess_key(sess_key: str) -> dict:
  21. # sess_key may be stored as value (no "sess_key=...;") or as "sess_key=...;".
  22. val = sess_key.split("=", 1)[-1].rstrip(";")
  23. return {"sess_key": val}
  24. def get_acl_rules(payload: dict | None = None, timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
  25. cfg = load_config()
  26. base = cfg.get("base_url", "").rstrip("/")
  27. url = f"{base}/Action/call"
  28. data = payload or DEFAULT_SHOW_PAYLOAD
  29. logger = get_logger("acl_post")
  30. sess_key = get_sess_key()
  31. if not sess_key:
  32. raise ValueError("未找到 sess_key,请先登录")
  33. cookies = _make_cookies_from_sess_key(sess_key)
  34. logger.debug(f"准备发送请求,URL: {url}")
  35. resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
  36. try:
  37. log_request(logger, "get_acl_rules", url, data, resp)
  38. except Exception:
  39. logger.exception("记录请求/响应失败")
  40. try:
  41. json_data = resp.json()
  42. except ValueError:
  43. json_data = None
  44. return resp, json_data
  45. def add_acl_rule(
  46. dst_addr: str,
  47. comment: str,
  48. protocol: str = "any",
  49. action: str = "drop",
  50. dir: str = "forward",
  51. ctdir: str = "0",
  52. iinterface: str = "any",
  53. ointerface: str = "any",
  54. src_addr: str = "",
  55. src_port: str = "",
  56. dst_port: str = "",
  57. enabled: str = "yes",
  58. week: str = "1234567",
  59. time: str = "00:00-23:59",
  60. ip_type: str = "4",
  61. src6_addr: str = "",
  62. dst6_addr: str = "",
  63. src6_mode: str = "",
  64. dst6_mode: str = "",
  65. src6_suffix: str = "",
  66. dst6_suffix: str = "",
  67. timeout: int = 10,
  68. ) -> Tuple[requests.Response, Optional[dict]]:
  69. cfg = load_config()
  70. base = cfg.get("base_url", "").rstrip("/")
  71. url = f"{base}/Action/call"
  72. data = {
  73. "func_name": "acl",
  74. "action": "add",
  75. "param": {
  76. "protocol": protocol,
  77. "action": action,
  78. "dir": dir,
  79. "ctdir": ctdir,
  80. "iinterface": iinterface,
  81. "ointerface": ointerface,
  82. "src_addr": src_addr,
  83. "dst_addr": dst_addr,
  84. "src_port": src_port,
  85. "dst_port": dst_port,
  86. "comment": comment,
  87. "enabled": enabled,
  88. "week": week,
  89. "time": time,
  90. "ip_type": ip_type,
  91. "src6_addr": src6_addr,
  92. "dst6_addr": dst6_addr,
  93. "src6_mode": src6_mode,
  94. "dst6_mode": dst6_mode,
  95. "src6_suffix": src6_suffix,
  96. "dst6_suffix": dst6_suffix,
  97. },
  98. }
  99. logger = get_logger("add_acl_post")
  100. sess_key = get_sess_key()
  101. if not sess_key:
  102. raise ValueError("未找到 sess_key,请先登录")
  103. cookies = _make_cookies_from_sess_key(sess_key)
  104. logger.debug(f"准备发送请求,URL: {url}")
  105. resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
  106. try:
  107. log_request(logger, "add_acl_rule", url, data, resp)
  108. except Exception:
  109. logger.exception("记录请求/响应失败")
  110. try:
  111. json_data = resp.json()
  112. except ValueError:
  113. json_data = None
  114. return resp, json_data
  115. def del_acl_rule(rule_id: int, timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
  116. cfg = load_config()
  117. base = cfg.get("base_url", "").rstrip("/")
  118. url = f"{base}/Action/call"
  119. data = {"func_name": "acl", "action": "del", "param": {"id": rule_id}}
  120. logger = get_logger("del_acl_post")
  121. sess_key = get_sess_key()
  122. if not sess_key:
  123. raise ValueError("未找到 sess_key,请先登录")
  124. cookies = _make_cookies_from_sess_key(sess_key)
  125. logger.debug(f"准备发送请求,URL: {url}")
  126. resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
  127. try:
  128. log_request(logger, "del_acl_rule", url, data, resp)
  129. except Exception:
  130. logger.exception("记录请求/响应失败")
  131. try:
  132. json_data = resp.json()
  133. except ValueError:
  134. json_data = None
  135. return resp, json_data
  136. def edit_acl_rule(
  137. rule_id: int,
  138. dst_addr: str = "",
  139. comment: str = "",
  140. protocol: str = "any",
  141. action: str = "drop",
  142. dir: str = "forward",
  143. ctdir: int = 0,
  144. iinterface: str = "any",
  145. ointerface: str = "any",
  146. src_addr: str = "",
  147. src_port: str = "",
  148. dst_port: str = "",
  149. enabled: str = "yes",
  150. week: str = "1234567",
  151. time: str = "00:00-23:59",
  152. ip_type: str = "4",
  153. src6_addr: str = "",
  154. dst6_addr: str = "",
  155. src6_mode: int = 0,
  156. dst6_mode: int = 0,
  157. src6_suffix: str = "",
  158. dst6_suffix: str = "",
  159. src6_mac: str = "",
  160. dst6_mac: str = "",
  161. timeout: int = 10,
  162. ) -> Tuple[requests.Response, Optional[dict]]:
  163. cfg = load_config()
  164. base = cfg.get("base_url", "").rstrip("/")
  165. url = f"{base}/Action/call"
  166. data = {
  167. "func_name": "acl",
  168. "action": "edit",
  169. "param": {
  170. "id": rule_id,
  171. "protocol": protocol,
  172. "action": action,
  173. "dir": dir,
  174. "ctdir": ctdir,
  175. "iinterface": iinterface,
  176. "ointerface": ointerface,
  177. "src_addr": src_addr,
  178. "dst_addr": dst_addr,
  179. "src_port": src_port,
  180. "dst_port": dst_port,
  181. "comment": comment,
  182. "enabled": enabled,
  183. "week": week,
  184. "time": time,
  185. "ip_type": ip_type,
  186. "src6_addr": src6_addr,
  187. "dst6_addr": dst6_addr,
  188. "src6_mode": src6_mode,
  189. "dst6_mode": dst6_mode,
  190. "src6_suffix": src6_suffix,
  191. "dst6_suffix": dst6_suffix,
  192. "src6_mac": src6_mac,
  193. "dst6_mac": dst6_mac,
  194. },
  195. }
  196. logger = get_logger("edit_acl_post")
  197. sess_key = get_sess_key()
  198. if not sess_key:
  199. raise ValueError("未找到 sess_key,请先登录")
  200. cookies = _make_cookies_from_sess_key(sess_key)
  201. logger.debug(f"准备发送请求,URL: {url}")
  202. resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
  203. try:
  204. log_request(logger, "edit_acl_rule", url, data, resp)
  205. except Exception:
  206. logger.exception("记录请求/响应失败")
  207. try:
  208. json_data = resp.json()
  209. except ValueError:
  210. json_data = None
  211. return resp, json_data
  212. def _print_response(resp: requests.Response, data: Optional[dict], logger_name: str = "main") -> None:
  213. logger = get_logger(logger_name)
  214. logger.info(f"状态: {resp.status_code}")
  215. if data:
  216. try:
  217. pretty = json.dumps(data, ensure_ascii=False, indent=2)
  218. logger.info(f"响应 JSON:\n{pretty}")
  219. print(pretty)
  220. except Exception:
  221. logger.info(f"响应文本: {resp.text}")
  222. print(resp.text)
  223. else:
  224. logger.info(f"响应文本: {resp.text}")
  225. print(resp.text)
  226. if __name__ == "__main__":
  227. # 示例:调用 get_acl_rules
  228. try:
  229. resp, data = get_acl_rules()
  230. _print_response(resp, data)
  231. except FileNotFoundError as e:
  232. get_logger("main").error(f"配置错误: {e}")
  233. sys.exit(2)
  234. except ValueError as e:
  235. get_logger("main").error(f"会话错误: {e}")
  236. sys.exit(3)
  237. except requests.RequestException as e:
  238. get_logger("main").error(f"请求失败: {e}")
  239. sys.exit(1)