advanced_acl.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. """高级 ACL 操作:按备注前缀管理分组的 dst_addr 集合。
  2. 实现 get_all_test_ips, add_ip, del_ip 三个高级接口,依赖低级接口
  3. 在 `acl_actions` 中提供的函数,以及 `auth_session.load_config`。
  4. 实现注意点:
  5. - 使用线程锁串行化写操作以降低竞态;写回失败则返回错误信息。
  6. - 不对 IP 段展开,直接以字符串方式处理 dst_addr 的逗号分隔项。
  7. """
  8. from __future__ import annotations
  9. import threading
  10. from typing import List, Dict, Tuple, Optional, Set
  11. from log_util import get_logger
  12. from auth_session import load_config
  13. import acl_actions
  14. logger = get_logger("advanced_acl")
  15. # Module-level lock: serializes ACL modification requests to reduce race conditions.
  16. _lock = threading.Lock()
  17. def _parse_dst_addr(dst: str) -> List[str]:
  18. if not dst:
  19. return []
  20. parts = [p.strip() for p in dst.split(",") if p.strip()]
  21. return parts
  22. def _join_dst_addr(parts: List[str]) -> str:
  23. return ",".join(parts)
  24. def _load_settings() -> Tuple[str, int]:
  25. cfg = load_config()
  26. prefix = cfg.get("test_prefix", "Test_")
  27. limit = int(cfg.get("rule_ip_limit", 1000))
  28. return prefix, limit
  29. def get_all_test_ips() -> List[str]:
  30. """返回所有以 test_prefix 前缀命名规则的合并去重 IP 列表(字符串形式)。"""
  31. prefix, _ = _load_settings()
  32. resp, data = acl_actions.get_acl_rules()
  33. ips: Set[str] = set()
  34. if not data:
  35. return []
  36. # Data.data is expected to be a list of rules; support a couple of shapes
  37. rules = data.get("Data", {}).get("data") if isinstance(data.get("Data"), dict) else data.get("Data")
  38. if rules is None:
  39. # try common structure
  40. rules = data.get("data") or []
  41. for rule in rules:
  42. comment = rule.get("comment", "")
  43. if not comment or not comment.startswith(prefix):
  44. continue
  45. dst = rule.get("dst_addr", "")
  46. for ip in _parse_dst_addr(dst):
  47. ips.add(ip)
  48. return sorted(ips)
  49. def _collect_prefixed_rules() -> List[Dict]:
  50. """Return list of rule dicts that have comment starting with prefix, sorted by numeric suffix."""
  51. prefix, _ = _load_settings()
  52. resp, data = acl_actions.get_acl_rules()
  53. if not data:
  54. return []
  55. rules = data.get("Data", {}).get("data") if isinstance(data.get("Data"), dict) else data.get("Data")
  56. if rules is None:
  57. rules = data.get("data") or []
  58. matched = []
  59. for rule in rules:
  60. comment = rule.get("comment", "")
  61. if comment and comment.startswith(prefix):
  62. matched.append(rule)
  63. # sort by numeric suffix if present, else lexicographically
  64. def _key(r: Dict):
  65. c = r.get("comment", "")
  66. s = c[len(prefix):]
  67. try:
  68. return int(s)
  69. except Exception:
  70. return float("inf")
  71. matched.sort(key=_key)
  72. return matched
  73. def add_ip(ip: str) -> Dict:
  74. """Add a single IP to the Test_* grouped rules.
  75. 返回 dict: {"added": bool, "rule": str|None, "row_id": int|None, "message": str}
  76. """
  77. prefix, limit = _load_settings()
  78. # 快速存在性检查
  79. current = set(get_all_test_ips())
  80. if ip in current:
  81. return {"added": False, "message": "already exists", "rule": None, "row_id": None}
  82. # 串行化修改操作
  83. with _lock:
  84. # 重新获取并解析规则,避免竞态
  85. rules = _collect_prefixed_rules()
  86. # 尝试放入已有规则
  87. for rule in rules:
  88. rid = rule.get("id") or rule.get("ID") or rule.get("RowId")
  89. comment = rule.get("comment", "")
  90. dst = rule.get("dst_addr", "")
  91. parts = _parse_dst_addr(dst)
  92. if len(parts) + 1 <= limit:
  93. # 将 IP 附加并编辑
  94. if ip in parts:
  95. return {"added": False, "message": "already exists after recheck", "rule": comment, "row_id": rid}
  96. parts.append(ip)
  97. new_dst = _join_dst_addr(parts)
  98. try:
  99. resp, data = acl_actions.edit_acl_rule(rule_id=rid, dst_addr=new_dst, comment=comment)
  100. except Exception as e:
  101. logger.exception("edit_acl_rule 调用失败")
  102. return {"added": False, "message": f"edit failed: {e}", "rule": comment, "row_id": rid}
  103. # 检查响应是否成功(保守判断:HTTP 200 且返回非空 JSON)
  104. if resp.status_code == 200:
  105. return {"added": True, "rule": comment, "row_id": (data.get("RowId") if isinstance(data, dict) else None), "message": "ok"}
  106. else:
  107. return {"added": False, "message": f"edit returned {resp.status_code}", "rule": comment, "row_id": rid}
  108. # 如果没有可放的规则,则创建新的规则
  109. # 确定新规则编号
  110. if not rules:
  111. new_idx = 1
  112. else:
  113. # 尝试解析最大编号
  114. last = rules[-1].get("comment", "")
  115. try:
  116. last_idx = int(last[len(prefix):])
  117. new_idx = last_idx + 1
  118. except Exception:
  119. new_idx = len(rules) + 1
  120. new_comment = f"{prefix}{new_idx}"
  121. try:
  122. resp, data = acl_actions.add_acl_rule(dst_addr=ip, comment=new_comment)
  123. except Exception as e:
  124. logger.exception("add_acl_rule 调用失败")
  125. return {"added": False, "message": f"add failed: {e}", "rule": new_comment, "row_id": None}
  126. if resp.status_code == 200:
  127. row_id = data.get("RowId") if isinstance(data, dict) else None
  128. return {"added": True, "rule": new_comment, "row_id": row_id, "message": "ok"}
  129. else:
  130. return {"added": False, "message": f"add returned {resp.status_code}", "rule": new_comment, "row_id": None}
  131. def del_ip(ip: str) -> Dict:
  132. """Delete IP from all Test_* rules.
  133. 返回 dict: {"deleted": bool, "affected": [ {"rule_id": id, "comment": str, "action": "edited"|"deleted"} ], "message": str}
  134. """
  135. prefix, _ = _load_settings()
  136. affected = []
  137. with _lock:
  138. rules = _collect_prefixed_rules()
  139. found = False
  140. for rule in rules:
  141. rid = rule.get("id") or rule.get("ID") or rule.get("RowId")
  142. comment = rule.get("comment", "")
  143. dst = rule.get("dst_addr", "")
  144. parts = _parse_dst_addr(dst)
  145. if ip in parts:
  146. found = True
  147. parts = [p for p in parts if p != ip]
  148. if parts:
  149. new_dst = _join_dst_addr(parts)
  150. try:
  151. resp, data = acl_actions.edit_acl_rule(rule_id=rid, dst_addr=new_dst, comment=comment)
  152. except Exception as e:
  153. logger.exception("edit_acl_rule 调用失败")
  154. affected.append({"rule_id": rid, "comment": comment, "action": "error", "message": str(e)})
  155. continue
  156. if resp.status_code == 200:
  157. affected.append({"rule_id": rid, "comment": comment, "action": "edited"})
  158. else:
  159. affected.append({"rule_id": rid, "comment": comment, "action": "error", "message": f"edit returned {resp.status_code}"})
  160. else:
  161. # 删除该规则
  162. try:
  163. resp, data = acl_actions.del_acl_rule(rule_id=rid)
  164. except Exception as e:
  165. logger.exception("del_acl_rule 调用失败")
  166. affected.append({"rule_id": rid, "comment": comment, "action": "error", "message": str(e)})
  167. continue
  168. if resp.status_code == 200:
  169. affected.append({"rule_id": rid, "comment": comment, "action": "deleted"})
  170. else:
  171. affected.append({"rule_id": rid, "comment": comment, "action": "error", "message": f"del returned {resp.status_code}"})
  172. if not found:
  173. return {"deleted": False, "affected": [], "message": "not found"}
  174. return {"deleted": True, "affected": affected, "message": "ok"}
  175. __all__ = ["get_all_test_ips", "add_ip", "del_ip"]  # names exported by `from advanced_acl import *`