advanced_acl.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. """高级 ACL 操作:按备注前缀管理分组的 dst_addr 集合。
  2. 实现 get_all_test_ips, add_ip, del_ip 三个高级接口,依赖低级接口
  3. 在 `acl_actions` 中提供的函数,以及 `auth_session.load_config`。
  4. 实现注意点:
  5. - 使用线程锁串行化写操作以降低竞态;写回失败则返回错误信息。
  6. - 不对 IP 段展开,直接以字符串方式处理 dst_addr 的逗号分隔项。
  7. """
  8. from __future__ import annotations
  9. import threading
  10. from typing import List, Dict, Tuple, Optional, Set
  11. from log_util import get_logger
  12. from auth_session import load_config
  13. import acl_actions
  14. logger = get_logger("advanced_acl")
  15. # 本地锁:序列化对 ACL 的修改请求以降低竞态
  16. _lock = threading.Lock()
  17. def _parse_dst_addr(dst: str) -> List[str]:
  18. if not dst:
  19. return []
  20. parts = [p.strip() for p in dst.split(",") if p.strip()]
  21. return parts
  22. def _join_dst_addr(parts: List[str]) -> str:
  23. return ",".join(parts)
  24. def _load_settings() -> Tuple[str, int]:
  25. cfg = load_config()
  26. prefix = cfg.get("test_prefix", "Test_")
  27. limit = int(cfg.get("rule_ip_limit", 1000))
  28. return prefix, limit
  29. def get_all_test_ips() -> List[str]:
  30. """返回所有以 test_prefix 前缀命名规则的合并去重 IP 列表(字符串形式)。"""
  31. prefix, _ = _load_settings()
  32. resp, data = acl_actions.get_acl_rules()
  33. ips: Set[str] = set()
  34. if not data:
  35. return []
  36. # Data.data is expected to be a list of rules; support a couple of shapes
  37. rules = data.get("Data", {}).get("data") if isinstance(data.get("Data"), dict) else data.get("Data")
  38. if rules is None:
  39. # try common structure
  40. rules = data.get("data") or []
  41. for rule in rules:
  42. comment = rule.get("comment", "")
  43. if not comment or not comment.startswith(prefix):
  44. continue
  45. dst = rule.get("dst_addr", "")
  46. for ip in _parse_dst_addr(dst):
  47. ips.add(ip)
  48. return sorted(ips)
  49. def _collect_prefixed_rules() -> List[Dict]:
  50. """Return list of rule dicts that have comment starting with prefix, sorted by numeric suffix."""
  51. prefix, _ = _load_settings()
  52. resp, data = acl_actions.get_acl_rules()
  53. if not data:
  54. return []
  55. rules = data.get("Data", {}).get("data") if isinstance(data.get("Data"), dict) else data.get("Data")
  56. if rules is None:
  57. rules = data.get("data") or []
  58. matched = []
  59. for rule in rules:
  60. comment = rule.get("comment", "")
  61. if comment and comment.startswith(prefix):
  62. matched.append(rule)
  63. # sort by numeric suffix if present, else lexicographically
  64. def _key(r: Dict):
  65. c = r.get("comment", "")
  66. s = c[len(prefix):]
  67. try:
  68. return int(s)
  69. except Exception:
  70. return float("inf")
  71. matched.sort(key=_key)
  72. return matched
  73. def add_ip(ip: str, comment: Optional[str] = None) -> Dict:
  74. """Add a single IP to the Test_* grouped rules.
  75. If `comment` is provided it will be used as the comment when creating a new rule.
  76. When adding to an existing Test_* rule, the existing rule's comment is preserved.
  77. 返回 dict: {"added": bool, "rule": str|None, "row_id": int|None, "message": str}
  78. """
  79. prefix, limit = _load_settings()
  80. # 快速存在性检查
  81. current = set(get_all_test_ips())
  82. if ip in current:
  83. return {"added": False, "message": "already exists", "rule": None, "row_id": None}
  84. # 串行化修改操作
  85. with _lock:
  86. # 重新获取并解析规则,避免竞态
  87. rules = _collect_prefixed_rules()
  88. # 尝试放入已有规则
  89. for rule in rules:
  90. rid = rule.get("id") or rule.get("ID") or rule.get("RowId")
  91. comment = rule.get("comment", "")
  92. dst = rule.get("dst_addr", "")
  93. parts = _parse_dst_addr(dst)
  94. if len(parts) + 1 <= limit:
  95. # 将 IP 附加并编辑
  96. if ip in parts:
  97. return {"added": False, "message": "already exists after recheck", "rule": comment, "row_id": rid}
  98. parts.append(ip)
  99. new_dst = _join_dst_addr(parts)
  100. try:
  101. resp, data = acl_actions.edit_acl_rule(rule_id=rid, dst_addr=new_dst, comment=comment)
  102. except Exception as e:
  103. logger.exception("edit_acl_rule 调用失败")
  104. return {"added": False, "message": f"edit failed: {e}", "rule": comment, "row_id": rid}
  105. # 检查响应是否成功(保守判断:HTTP 200 且返回非空 JSON)
  106. if resp.status_code == 200:
  107. return {"added": True, "rule": comment, "row_id": (data.get("RowId") if isinstance(data, dict) else None), "message": "ok"}
  108. else:
  109. return {"added": False, "message": f"edit returned {resp.status_code}", "rule": comment, "row_id": rid}
  110. # 如果没有可放的规则,则创建新的规则
  111. # 确定新规则编号
  112. if not rules:
  113. new_idx = 1
  114. else:
  115. # 尝试解析最大编号
  116. last = rules[-1].get("comment", "")
  117. try:
  118. last_idx = int(last[len(prefix):])
  119. new_idx = last_idx + 1
  120. except Exception:
  121. new_idx = len(rules) + 1
  122. # new_comment: by default use prefix-based Test_n, but allow caller to override by passing comment
  123. new_comment = comment if comment else f"{prefix}{new_idx}"
  124. try:
  125. resp, data = acl_actions.add_acl_rule(dst_addr=ip, comment=new_comment)
  126. except Exception as e:
  127. logger.exception("add_acl_rule 调用失败")
  128. return {"added": False, "message": f"add failed: {e}", "rule": new_comment, "row_id": None}
  129. if resp.status_code == 200:
  130. row_id = data.get("RowId") if isinstance(data, dict) else None
  131. return {"added": True, "rule": new_comment, "row_id": row_id, "message": "ok"}
  132. else:
  133. return {"added": False, "message": f"add returned {resp.status_code}", "rule": new_comment, "row_id": None}
  134. def del_ip(ip: str) -> Dict:
  135. """Delete IP from all Test_* rules.
  136. 返回 dict: {"deleted": bool, "affected": [ {"rule_id": id, "comment": str, "action": "edited"|"deleted"} ], "message": str}
  137. """
  138. prefix, _ = _load_settings()
  139. affected = []
  140. with _lock:
  141. rules = _collect_prefixed_rules()
  142. found = False
  143. for rule in rules:
  144. rid = rule.get("id") or rule.get("ID") or rule.get("RowId")
  145. comment = rule.get("comment", "")
  146. dst = rule.get("dst_addr", "")
  147. parts = _parse_dst_addr(dst)
  148. if ip in parts:
  149. found = True
  150. parts = [p for p in parts if p != ip]
  151. if parts:
  152. new_dst = _join_dst_addr(parts)
  153. try:
  154. resp, data = acl_actions.edit_acl_rule(rule_id=rid, dst_addr=new_dst, comment=comment)
  155. except Exception as e:
  156. logger.exception("edit_acl_rule 调用失败")
  157. affected.append({"rule_id": rid, "comment": comment, "action": "error", "message": str(e)})
  158. continue
  159. if resp.status_code == 200:
  160. affected.append({"rule_id": rid, "comment": comment, "action": "edited"})
  161. else:
  162. affected.append({"rule_id": rid, "comment": comment, "action": "error", "message": f"edit returned {resp.status_code}"})
  163. else:
  164. # 删除该规则
  165. try:
  166. resp, data = acl_actions.del_acl_rule(rule_id=rid)
  167. except Exception as e:
  168. logger.exception("del_acl_rule 调用失败")
  169. affected.append({"rule_id": rid, "comment": comment, "action": "error", "message": str(e)})
  170. continue
  171. if resp.status_code == 200:
  172. affected.append({"rule_id": rid, "comment": comment, "action": "deleted"})
  173. else:
  174. affected.append({"rule_id": rid, "comment": comment, "action": "error", "message": f"del returned {resp.status_code}"})
  175. if not found:
  176. return {"deleted": False, "affected": [], "message": "not found"}
  177. return {"deleted": True, "affected": affected, "message": "ok"}
  178. __all__ = ["get_all_test_ips", "add_ip", "del_ip"]