analysis_connections.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. import json
  2. import sys
  3. from pathlib import Path
  4. from typing import Optional, Tuple
  5. import requests
  6. from log_util import get_logger, log_request
  7. from auth_session import load_config, get_sess_key, get_base_url, get_host
  8. ROOT = Path(__file__).resolve().parent
  9. CONFIG_PATH = ROOT / "config.json"
  10. def monitor_lanip(ip: str, interface: str = "all", proto: str = "all", maxnum: int = 100000, limit: str = "0,100000", timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
  11. """Call the monitor_lanip action and return (resp, json_data).
  12. Example payload:
  13. {"func_name":"monitor_lanip","action":"show","param":{"TYPE":"conn,conn_num","ip":"10.8.7.2","interface":"all","proto":"all","maxnum":500,"limit":"0,20","ORDER_BY":"","ORDER":""}}
  14. """
  15. cfg = load_config()
  16. base = cfg.get("base_url", "").rstrip("/")
  17. url = f"{base}/Action/call"
  18. data = {
  19. "func_name": "monitor_lanip",
  20. "action": "show",
  21. "param": {
  22. "TYPE": "conn,conn_num",
  23. "ip": ip,
  24. "interface": interface,
  25. "proto": proto,
  26. "maxnum": maxnum,
  27. "limit": limit,
  28. "ORDER_BY": "",
  29. "ORDER": "",
  30. },
  31. }
  32. logger = get_logger("analysis_connections")
  33. sess_key = get_sess_key()
  34. if not sess_key:
  35. raise ValueError("未找到 sess_key,请先登录")
  36. cookies = {"sess_key": sess_key.split("=", 1)[-1].rstrip(";")}
  37. # Prepare headers including Origin/Referer/Host derived from config
  38. base = get_base_url()
  39. host = get_host()
  40. headers = {
  41. "Origin": base + "/",
  42. "Referer": base + "/",
  43. "Host": host,
  44. "User-Agent": "python-requests/advanced-client",
  45. "Accept": "application/json, text/plain, */*",
  46. "Content-Type": "application/json;charset=UTF-8",
  47. }
  48. logger.debug(f"准备发送请求,URL: {url} payload: {data} headers: {headers}")
  49. resp = requests.post(url, json=data, cookies=cookies, headers=headers, timeout=timeout)
  50. try:
  51. log_request(logger, "monitor_lanip", url, data, resp)
  52. except Exception:
  53. logger.exception("记录请求/响应失败")
  54. try:
  55. json_data = resp.json()
  56. except ValueError:
  57. json_data = None
  58. return resp, json_data
  59. def main():
  60. logger = get_logger("main")
  61. if len(sys.argv) < 2:
  62. print("用法: python analysis_connections.py <ip>")
  63. sys.exit(2)
  64. ip = sys.argv[1]
  65. try:
  66. resp, data = monitor_lanip(ip)
  67. except FileNotFoundError as e:
  68. logger.error(f"配置错误: {e}")
  69. sys.exit(2)
  70. except ValueError as e:
  71. logger.error(f"会话错误: {e}")
  72. sys.exit(3)
  73. except requests.RequestException as e:
  74. logger.error(f"请求失败: {e}")
  75. sys.exit(1)
  76. logger.info(f"状态: {resp.status_code}")
  77. if data:
  78. try:
  79. pretty = json.dumps(data, ensure_ascii=False, indent=2)
  80. logger.info(f"响应 JSON:\n{pretty}")
  81. print(pretty)
  82. except Exception:
  83. logger.info(f"响应文本: {resp.text}")
  84. print(resp.text)
  85. else:
  86. logger.info(f"响应文本: {resp.text}")
  87. print(resp.text)
  88. if __name__ == "__main__":
  89. main()
  90. def find_high_connection_pairs(ip: str, threshold: int | None = None, limit: str = "0,100000") -> dict:
  91. """Fetch connections for `ip` and return pairs (src_port, dst_addr) whose count > threshold.
  92. Returns a dict with keys: threshold, total_reported, fetched, matches (list of dicts).
  93. Each match: {"src_port": ..., "dst_addr": ..., "count": ..., "samples": [..sample entries..]}
  94. If threshold is None, will read `conn_threshold` from config.json (default 20).
  95. """
  96. from collections import Counter
  97. cfg = load_config()
  98. cfg_threshold = cfg.get("conn_threshold", 20)
  99. if threshold is None:
  100. threshold = int(cfg_threshold)
  101. # fetch with large limit (caller may specify)
  102. resp, data = monitor_lanip(ip, limit=limit)
  103. result = {"threshold": threshold, "total_reported": None, "fetched": 0, "matches": []}
  104. if not data:
  105. return result
  106. def find_high_connections_for_src_ports(ip: str, src_ports, threshold: int | None = None, limit: str = "0,100000") -> dict:
  107. """Analyze only the specified src_ports (single int or iterable) for given ip.
  108. Returns dict: {"threshold": n, "total_reported": x, "fetched": y, "by_src_port": {port: [{dst_addr, count, samples}, ...]}}
  109. """
  110. from collections import Counter
  111. # normalize src_ports to a set of ints/strings
  112. if isinstance(src_ports, (int, str)):
  113. ports_set = {int(src_ports)}
  114. else:
  115. ports_set = {int(p) for p in src_ports}
  116. cfg = load_config()
  117. cfg_threshold = cfg.get("conn_threshold", 20)
  118. if threshold is None:
  119. threshold = int(cfg_threshold)
  120. resp, data = monitor_lanip(ip, limit=limit)
  121. result = {"threshold": threshold, "total_reported": None, "fetched": 0, "by_src_port": {}}
  122. if not data:
  123. return result
  124. total_reported = data.get("Data", {}).get("conn_num")
  125. conns = data.get("Data", {}).get("conn", []) or []
  126. result["total_reported"] = total_reported
  127. result["fetched"] = len(conns)
  128. # group by src_port then count dst_addr
  129. grouped: dict[int, Counter] = {}
  130. samples: dict[tuple, list] = {}
  131. for entry in conns:
  132. try:
  133. src_port = int(entry.get("src_port"))
  134. except Exception:
  135. continue
  136. if src_port not in ports_set:
  137. continue
  138. dst = entry.get("dst_addr")
  139. key = (src_port, dst)
  140. grouped.setdefault(src_port, Counter())[dst] += 1
  141. samples.setdefault(key, []).append(entry)
  142. for port in ports_set:
  143. counter = grouped.get(port, Counter())
  144. matches = []
  145. for dst, cnt in counter.items():
  146. if cnt > threshold:
  147. sample_list = samples.get((port, dst), [])[:5]
  148. matches.append({"dst_addr": dst, "count": cnt, "samples": sample_list})
  149. # sort
  150. matches.sort(key=lambda x: x["count"], reverse=True)
  151. result["by_src_port"][port] = matches
  152. return result
  153. total_reported = data.get("Data", {}).get("conn_num")
  154. conns = data.get("Data", {}).get("conn", []) or []
  155. result["total_reported"] = total_reported
  156. result["fetched"] = len(conns)
  157. # Count (src_port, dst_addr) pairs
  158. pairs = Counter()
  159. samples: dict[tuple, list] = {}
  160. for entry in conns:
  161. src_port = entry.get("src_port")
  162. dst = entry.get("dst_addr")
  163. key = (src_port, dst)
  164. pairs[key] += 1
  165. samples.setdefault(key, []).append(entry)
  166. for (src_port, dst), cnt in pairs.items():
  167. if cnt > threshold:
  168. sample_list = samples.get((src_port, dst), [])[:5]
  169. result["matches"].append({
  170. "src_port": src_port,
  171. "dst_addr": dst,
  172. "count": cnt,
  173. "samples": sample_list,
  174. })
  175. # sort matches by count desc
  176. result["matches"].sort(key=lambda x: x["count"], reverse=True)
  177. return result