acl_actions.py 7.5 KB


  1. import json
  2. import sys
  3. from pathlib import Path
  4. from typing import Optional, Tuple
  5. import requests
  6. from log_util import get_logger, log_request
  7. from auth_session import get_sess_key, get_base_url
  8. ROOT = Path(__file__).resolve().parent
  9. CONFIG_PATH = ROOT / "config.json"
  10. DEFAULT_SHOW_PAYLOAD = {
  11. "func_name": "acl",
  12. "action": "show",
  13. "param": {
  14. "TYPE": "total,data",
  15. "limit": "0,20",
  16. "ORDER_BY": "",
  17. "ORDER": ""
  18. }
  19. }
  20. def _make_cookies_from_sess_key(sess_key: str) -> dict:
  21. # sess_key may be stored as value (no "sess_key=...;") or as "sess_key=...;".
  22. val = sess_key.split("=", 1)[-1].rstrip(";")
  23. return {"sess_key": val}
  24. def get_acl_rules(payload: dict | None = None, timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
  25. base = get_base_url()
  26. url = f"{base}/Action/call"
  27. data = payload or DEFAULT_SHOW_PAYLOAD
  28. logger = get_logger("acl_post")
  29. sess_key = get_sess_key()
  30. if not sess_key:
  31. raise ValueError("未找到 sess_key,请先登录")
  32. cookies = _make_cookies_from_sess_key(sess_key)
  33. logger.debug(f"准备发送请求,URL: {url}")
  34. resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
  35. try:
  36. log_request(logger, "get_acl_rules", url, data, resp)
  37. except Exception:
  38. logger.exception("记录请求/响应失败")
  39. try:
  40. json_data = resp.json()
  41. except ValueError:
  42. json_data = None
  43. return resp, json_data
  44. def add_acl_rule(
  45. dst_addr: str,
  46. comment: str,
  47. protocol: str = "any",
  48. action: str = "drop",
  49. dir: str = "forward",
  50. ctdir: str = "0",
  51. iinterface: str = "any",
  52. ointerface: str = "any",
  53. src_addr: str = "",
  54. src_port: str = "",
  55. dst_port: str = "",
  56. enabled: str = "yes",
  57. week: str = "1234567",
  58. time: str = "00:00-23:59",
  59. ip_type: str = "4",
  60. src6_addr: str = "",
  61. dst6_addr: str = "",
  62. src6_mode: str = "",
  63. dst6_mode: str = "",
  64. src6_suffix: str = "",
  65. dst6_suffix: str = "",
  66. timeout: int = 10,
  67. ) -> Tuple[requests.Response, Optional[dict]]:
  68. base = get_base_url()
  69. url = f"{base}/Action/call"
  70. data = {
  71. "func_name": "acl",
  72. "action": "add",
  73. "param": {
  74. "protocol": protocol,
  75. "action": action,
  76. "dir": dir,
  77. "ctdir": ctdir,
  78. "iinterface": iinterface,
  79. "ointerface": ointerface,
  80. "src_addr": src_addr,
  81. "dst_addr": dst_addr,
  82. "src_port": src_port,
  83. "dst_port": dst_port,
  84. "comment": comment,
  85. "enabled": enabled,
  86. "week": week,
  87. "time": time,
  88. "ip_type": ip_type,
  89. "src6_addr": src6_addr,
  90. "dst6_addr": dst6_addr,
  91. "src6_mode": src6_mode,
  92. "dst6_mode": dst6_mode,
  93. "src6_suffix": src6_suffix,
  94. "dst6_suffix": dst6_suffix,
  95. },
  96. }
  97. logger = get_logger("add_acl_post")
  98. sess_key = get_sess_key()
  99. if not sess_key:
  100. raise ValueError("未找到 sess_key,请先登录")
  101. cookies = _make_cookies_from_sess_key(sess_key)
  102. logger.debug(f"准备发送请求,URL: {url}")
  103. resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
  104. try:
  105. log_request(logger, "add_acl_rule", url, data, resp)
  106. except Exception:
  107. logger.exception("记录请求/响应失败")
  108. try:
  109. json_data = resp.json()
  110. except ValueError:
  111. json_data = None
  112. return resp, json_data
  113. def del_acl_rule(rule_id: int, timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
  114. base = get_base_url()
  115. url = f"{base}/Action/call"
  116. data = {"func_name": "acl", "action": "del", "param": {"id": rule_id}}
  117. logger = get_logger("del_acl_post")
  118. sess_key = get_sess_key()
  119. if not sess_key:
  120. raise ValueError("未找到 sess_key,请先登录")
  121. cookies = _make_cookies_from_sess_key(sess_key)
  122. logger.debug(f"准备发送请求,URL: {url}")
  123. resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
  124. try:
  125. log_request(logger, "del_acl_rule", url, data, resp)
  126. except Exception:
  127. logger.exception("记录请求/响应失败")
  128. try:
  129. json_data = resp.json()
  130. except ValueError:
  131. json_data = None
  132. return resp, json_data
  133. def edit_acl_rule(
  134. rule_id: int,
  135. dst_addr: str = "",
  136. comment: str = "",
  137. protocol: str = "any",
  138. action: str = "drop",
  139. dir: str = "forward",
  140. ctdir: int = 0,
  141. iinterface: str = "any",
  142. ointerface: str = "any",
  143. src_addr: str = "",
  144. src_port: str = "",
  145. dst_port: str = "",
  146. enabled: str = "yes",
  147. week: str = "1234567",
  148. time: str = "00:00-23:59",
  149. ip_type: str = "4",
  150. src6_addr: str = "",
  151. dst6_addr: str = "",
  152. src6_mode: int = 0,
  153. dst6_mode: int = 0,
  154. src6_suffix: str = "",
  155. dst6_suffix: str = "",
  156. src6_mac: str = "",
  157. dst6_mac: str = "",
  158. timeout: int = 10,
  159. ) -> Tuple[requests.Response, Optional[dict]]:
  160. base = get_base_url()
  161. url = f"{base}/Action/call"
  162. data = {
  163. "func_name": "acl",
  164. "action": "edit",
  165. "param": {
  166. "id": rule_id,
  167. "protocol": protocol,
  168. "action": action,
  169. "dir": dir,
  170. "ctdir": ctdir,
  171. "iinterface": iinterface,
  172. "ointerface": ointerface,
  173. "src_addr": src_addr,
  174. "dst_addr": dst_addr,
  175. "src_port": src_port,
  176. "dst_port": dst_port,
  177. "comment": comment,
  178. "enabled": enabled,
  179. "week": week,
  180. "time": time,
  181. "ip_type": ip_type,
  182. "src6_addr": src6_addr,
  183. "dst6_addr": dst6_addr,
  184. "src6_mode": src6_mode,
  185. "dst6_mode": dst6_mode,
  186. "src6_suffix": src6_suffix,
  187. "dst6_suffix": dst6_suffix,
  188. "src6_mac": src6_mac,
  189. "dst6_mac": dst6_mac,
  190. },
  191. }
  192. logger = get_logger("edit_acl_post")
  193. sess_key = get_sess_key()
  194. if not sess_key:
  195. raise ValueError("未找到 sess_key,请先登录")
  196. cookies = _make_cookies_from_sess_key(sess_key)
  197. logger.debug(f"准备发送请求,URL: {url}")
  198. resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
  199. try:
  200. log_request(logger, "edit_acl_rule", url, data, resp)
  201. except Exception:
  202. logger.exception("记录请求/响应失败")
  203. try:
  204. json_data = resp.json()
  205. except ValueError:
  206. json_data = None
  207. return resp, json_data
  208. def _print_response(resp: requests.Response, data: Optional[dict], logger_name: str = "main") -> None:
  209. logger = get_logger(logger_name)
  210. logger.info(f"状态: {resp.status_code}")
  211. if data:
  212. try:
  213. pretty = json.dumps(data, ensure_ascii=False, indent=2)
  214. logger.info(f"响应 JSON:\n{pretty}")
  215. print(pretty)
  216. except Exception:
  217. logger.info(f"响应文本: {resp.text}")
  218. print(resp.text)
  219. else:
  220. logger.info(f"响应文本: {resp.text}")
  221. print(resp.text)
  222. if __name__ == "__main__":
  223. # 示例:调用 get_acl_rules
  224. try:
  225. resp, data = get_acl_rules()
  226. _print_response(resp, data)
  227. except FileNotFoundError as e:
  228. get_logger("main").error(f"配置错误: {e}")
  229. sys.exit(2)
  230. except ValueError as e:
  231. get_logger("main").error(f"会话错误: {e}")
  232. sys.exit(3)
  233. except requests.RequestException as e:
  234. get_logger("main").error(f"请求失败: {e}")
  235. sys.exit(1)