add_acl_post.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. import json
  2. import sys
  3. from pathlib import Path
  4. from typing import Optional
  5. import requests
  6. from log_util import get_logger, log_request
  7. from session_state import get_sess_key
  8. ROOT = Path(__file__).resolve().parent
  9. CONFIG_PATH = ROOT / "config.json"
  10. def load_config():
  11. if not CONFIG_PATH.exists():
  12. raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
  13. with open(CONFIG_PATH, "r", encoding="utf-8") as f:
  14. return json.load(f)
  15. def add_acl_rule(
  16. dst_addr: str,
  17. comment: str,
  18. protocol: str = "any",
  19. action: str = "drop",
  20. dir: str = "forward",
  21. ctdir: str = "0",
  22. iinterface: str = "any",
  23. ointerface: str = "any",
  24. src_addr: str = "",
  25. src_port: str = "",
  26. dst_port: str = "",
  27. enabled: str = "yes",
  28. week: str = "1234567",
  29. time: str = "00:00-23:59",
  30. ip_type: str = "4",
  31. src6_addr: str = "",
  32. dst6_addr: str = "",
  33. src6_mode: str = "",
  34. dst6_mode: str = "",
  35. src6_suffix: str = "",
  36. dst6_suffix: str = "",
  37. timeout: int = 10
  38. ):
  39. """Send ACL add POST. Returns requests.Response and parsed JSON data.
  40. dst_addr: Comma-separated destination addresses, e.g., "1.2.3.4,2.4.5.6"
  41. comment: Comment for the rule, e.g., "remark_kkkkkk"
  42. Other parameters have defaults based on the example.
  43. """
  44. cfg = load_config()
  45. base = cfg.get("base_url", "").rstrip("/")
  46. url = f"{base}/Action/call"
  47. data = {
  48. "func_name": "acl",
  49. "action": "add",
  50. "param": {
  51. "protocol": protocol,
  52. "action": action,
  53. "dir": dir,
  54. "ctdir": ctdir,
  55. "iinterface": iinterface,
  56. "ointerface": ointerface,
  57. "src_addr": src_addr,
  58. "dst_addr": dst_addr,
  59. "src_port": src_port,
  60. "dst_port": dst_port,
  61. "comment": comment,
  62. "enabled": enabled,
  63. "week": week,
  64. "time": time,
  65. "ip_type": ip_type,
  66. "src6_addr": src6_addr,
  67. "dst6_addr": dst6_addr,
  68. "src6_mode": src6_mode,
  69. "dst6_mode": dst6_mode,
  70. "src6_suffix": src6_suffix,
  71. "dst6_suffix": dst6_suffix
  72. }
  73. }
  74. logger = get_logger("add_acl_post")
  75. # Get sess_key from global state
  76. sess_key = get_sess_key()
  77. if not sess_key:
  78. raise ValueError("未找到 sess_key,请先登录")
  79. cookies = {"sess_key": sess_key.split("=")[1].rstrip(";")} # Extract value from "sess_key=value;"
  80. logger.debug(f"准备发送请求,URL: {url}")
  81. resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
  82. # 记录请求/响应
  83. try:
  84. log_request(logger, "add_acl_rule", url, data, resp)
  85. except Exception:
  86. logger.exception("记录请求/响应失败")
  87. # Parse JSON response
  88. try:
  89. json_data = resp.json()
  90. except ValueError:
  91. json_data = None
  92. return resp, json_data
  93. def main():
  94. logger = get_logger("main")
  95. try:
  96. # Example: Add ACL rule to drop traffic to 1.2.3.4 and 2.4.5.6 with comment "remark_kkkkkk"
  97. resp, data = add_acl_rule(dst_addr="1.2.3.4,2.4.5.6", comment="remark_kkkkkk")
  98. except FileNotFoundError as e:
  99. logger.error(f"配置错误: {e}")
  100. sys.exit(2)
  101. except ValueError as e:
  102. logger.error(f"会话错误: {e}")
  103. sys.exit(3)
  104. except requests.RequestException as e:
  105. logger.error(f"请求失败: {e}")
  106. sys.exit(1)
  107. # 控制台友好输出
  108. logger.info(f"状态: {resp.status_code}")
  109. if data:
  110. try:
  111. pretty = json.dumps(data, ensure_ascii=False, indent=2)
  112. logger.info(f"响应 JSON:\n{pretty}")
  113. print(pretty)
  114. except Exception:
  115. logger.info(f"响应文本: {resp.text}")
  116. print(resp.text)
  117. else:
  118. logger.info(f"响应文本: {resp.text}")
  119. print(resp.text)
  120. if __name__ == "__main__":
  121. main()