edit_acl_post.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. import json
  2. import sys
  3. from pathlib import Path
  4. from typing import Optional
  5. import requests
  6. from log_util import get_logger, log_request
  7. from session_state import get_sess_key
  8. ROOT = Path(__file__).resolve().parent
  9. CONFIG_PATH = ROOT / "config.json"
  10. def load_config():
  11. if not CONFIG_PATH.exists():
  12. raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
  13. with open(CONFIG_PATH, "r", encoding="utf-8") as f:
  14. return json.load(f)
  15. def edit_acl_rule(
  16. rule_id: int,
  17. dst_addr: str = "",
  18. comment: str = "",
  19. protocol: str = "any",
  20. action: str = "drop",
  21. dir: str = "forward",
  22. ctdir: int = 0,
  23. iinterface: str = "any",
  24. ointerface: str = "any",
  25. src_addr: str = "",
  26. src_port: str = "",
  27. dst_port: str = "",
  28. enabled: str = "yes",
  29. week: str = "1234567",
  30. time: str = "00:00-23:59",
  31. ip_type: str = "4",
  32. src6_addr: str = "",
  33. dst6_addr: str = "",
  34. src6_mode: int = 0,
  35. dst6_mode: int = 0,
  36. src6_suffix: str = "",
  37. dst6_suffix: str = "",
  38. src6_mac: str = "",
  39. dst6_mac: str = "",
  40. timeout: int = 10
  41. ):
  42. """Send ACL edit POST. Returns requests.Response and parsed JSON data.
  43. rule_id: The ID of the ACL rule to edit (required).
  44. Other parameters are optional and will update the rule if provided.
  45. """
  46. cfg = load_config()
  47. base = cfg.get("base_url", "").rstrip("/")
  48. url = f"{base}/Action/call"
  49. data = {
  50. "func_name": "acl",
  51. "action": "edit",
  52. "param": {
  53. "id": rule_id,
  54. "protocol": protocol,
  55. "action": action,
  56. "dir": dir,
  57. "ctdir": ctdir,
  58. "iinterface": iinterface,
  59. "ointerface": ointerface,
  60. "src_addr": src_addr,
  61. "dst_addr": dst_addr,
  62. "src_port": src_port,
  63. "dst_port": dst_port,
  64. "comment": comment,
  65. "enabled": enabled,
  66. "week": week,
  67. "time": time,
  68. "ip_type": ip_type,
  69. "src6_addr": src6_addr,
  70. "dst6_addr": dst6_addr,
  71. "src6_mode": src6_mode,
  72. "dst6_mode": dst6_mode,
  73. "src6_suffix": src6_suffix,
  74. "dst6_suffix": dst6_suffix,
  75. "src6_mac": src6_mac,
  76. "dst6_mac": dst6_mac
  77. }
  78. }
  79. logger = get_logger("edit_acl_post")
  80. # Get sess_key from global state
  81. sess_key = get_sess_key()
  82. if not sess_key:
  83. raise ValueError("未找到 sess_key,请先登录")
  84. cookies = {"sess_key": sess_key.split("=")[1].rstrip(";")} # Extract value from "sess_key=value;"
  85. logger.debug(f"准备发送请求,URL: {url}")
  86. resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
  87. # 记录请求/响应
  88. try:
  89. log_request(logger, "edit_acl_rule", url, data, resp)
  90. except Exception:
  91. logger.exception("记录请求/响应失败")
  92. # Parse JSON response
  93. try:
  94. json_data = resp.json()
  95. except ValueError:
  96. json_data = None
  97. return resp, json_data
  98. def main():
  99. logger = get_logger("main")
  100. try:
  101. # Example: Edit ACL rule with ID 14, change dst_addr to "1.2.3.4" and comment to "remark_kkkkkk"
  102. resp, data = edit_acl_rule(rule_id=14, dst_addr="1.2.3.4", comment="remark_kkkkkk")
  103. except FileNotFoundError as e:
  104. logger.error(f"配置错误: {e}")
  105. sys.exit(2)
  106. except ValueError as e:
  107. logger.error(f"会话错误: {e}")
  108. sys.exit(3)
  109. except requests.RequestException as e:
  110. logger.error(f"请求失败: {e}")
  111. sys.exit(1)
  112. # 控制台友好输出
  113. logger.info(f"状态: {resp.status_code}")
  114. if data:
  115. try:
  116. pretty = json.dumps(data, ensure_ascii=False, indent=2)
  117. logger.info(f"响应 JSON:\n{pretty}")
  118. print(pretty)
  119. except Exception:
  120. logger.info(f"响应文本: {resp.text}")
  121. print(resp.text)
  122. else:
  123. logger.info(f"响应文本: {resp.text}")
  124. print(resp.text)
  125. if __name__ == "__main__":
  126. main()