Просмотр исходного кода

feat(analysis_connections): 新增高连接对分析功能

- 添加 find_high_connection_pairs 函数用于查找超过阈值的连接对
- 支持通过配置文件设置默认连接阈值
- 实现按源端口分组统计目标地址连接次数
- 提供样本数据以供调试和验证
- 支持限制返回结果数量及自定义查询范围
- 结果按连接次数降序排列便于识别异常连接
mcbaiyun 2 месяца назад
Родитель
Коммит
952d184ecf
1 изменённый файл: 110 добавлений и 0 удалений
  1. 110 0
      analysis_connections.py

+ 110 - 0
analysis_connections.py

@@ -110,3 +110,113 @@ def main():
 if __name__ == "__main__":
 	main()
 
+
+def find_high_connection_pairs(ip: str, threshold: int | None = None, limit: str = "0,100000") -> dict:
+	"""Fetch connections for `ip` and return pairs (src_port, dst_addr) whose count > threshold.
+
+	Returns a dict with keys: threshold, total_reported, fetched, matches (list of dicts).
+	Each match: {"src_port": ..., "dst_addr": ..., "count": ..., "samples": [..sample entries..]}
+
+	If threshold is None, will read `conn_threshold` from config.json (default 20).
+	"""
+	from collections import Counter
+
+	cfg = load_config()
+	cfg_threshold = cfg.get("conn_threshold", 20)
+	if threshold is None:
+		threshold = int(cfg_threshold)
+
+	# fetch with large limit (caller may specify)
+	resp, data = monitor_lanip(ip, limit=limit)
+	result = {"threshold": threshold, "total_reported": None, "fetched": 0, "matches": []}
+
+	if not data:
+		return result
+
+
+	def find_high_connections_for_src_ports(ip: str, src_ports, threshold: int | None = None, limit: str = "0,100000") -> dict:
+		"""Analyze only the specified src_ports (single int or iterable) for given ip.
+
+		Returns dict: {"threshold": n, "total_reported": x, "fetched": y, "by_src_port": {port: [{dst_addr, count, samples}, ...]}}
+		"""
+		from collections import Counter
+
+		# normalize src_ports to a set of ints/strings
+		if isinstance(src_ports, (int, str)):
+			ports_set = {int(src_ports)}
+		else:
+			ports_set = {int(p) for p in src_ports}
+
+		cfg = load_config()
+		cfg_threshold = cfg.get("conn_threshold", 20)
+		if threshold is None:
+			threshold = int(cfg_threshold)
+
+		resp, data = monitor_lanip(ip, limit=limit)
+		result = {"threshold": threshold, "total_reported": None, "fetched": 0, "by_src_port": {}}
+
+		if not data:
+			return result
+
+		total_reported = data.get("Data", {}).get("conn_num")
+		conns = data.get("Data", {}).get("conn", []) or []
+		result["total_reported"] = total_reported
+		result["fetched"] = len(conns)
+
+		# group by src_port then count dst_addr
+		grouped: dict[int, Counter] = {}
+		samples: dict[tuple, list] = {}
+		for entry in conns:
+			try:
+				src_port = int(entry.get("src_port"))
+			except Exception:
+				continue
+			if src_port not in ports_set:
+				continue
+			dst = entry.get("dst_addr")
+			key = (src_port, dst)
+			grouped.setdefault(src_port, Counter())[dst] += 1
+			samples.setdefault(key, []).append(entry)
+
+		for port in ports_set:
+			counter = grouped.get(port, Counter())
+			matches = []
+			for dst, cnt in counter.items():
+				if cnt > threshold:
+					sample_list = samples.get((port, dst), [])[:5]
+					matches.append({"dst_addr": dst, "count": cnt, "samples": sample_list})
+			# sort
+			matches.sort(key=lambda x: x["count"], reverse=True)
+			result["by_src_port"][port] = matches
+
+		return result
+
+	total_reported = data.get("Data", {}).get("conn_num")
+	conns = data.get("Data", {}).get("conn", []) or []
+	result["total_reported"] = total_reported
+	result["fetched"] = len(conns)
+
+	# Count (src_port, dst_addr) pairs
+	pairs = Counter()
+	samples: dict[tuple, list] = {}
+	for entry in conns:
+		src_port = entry.get("src_port")
+		dst = entry.get("dst_addr")
+		key = (src_port, dst)
+		pairs[key] += 1
+		samples.setdefault(key, []).append(entry)
+
+	for (src_port, dst), cnt in pairs.items():
+		if cnt > threshold:
+			sample_list = samples.get((src_port, dst), [])[:5]
+			result["matches"].append({
+				"src_port": src_port,
+				"dst_addr": dst,
+				"count": cnt,
+				"samples": sample_list,
+			})
+
+	# sort matches by count desc
+	result["matches"].sort(key=lambda x: x["count"], reverse=True)
+	return result
+