"""Query the ``monitor_lanip`` action and analyse per-IP connection counts.

Run as a script with one argument (the IP to inspect) to print the raw
response, or import the ``find_high_*`` functions for threshold analysis.
"""

import json
import sys
from collections import Counter, defaultdict
from pathlib import Path
from typing import Optional, Tuple

import requests

from auth_session import load_config, get_sess_key, get_base_url, get_host
from log_util import get_logger, log_request

ROOT = Path(__file__).resolve().parent
CONFIG_PATH = ROOT / "config.json"

# Maximum number of raw connection entries attached to each reported match.
_MAX_SAMPLES = 5


def monitor_lanip(
    ip: str,
    interface: str = "all",
    proto: str = "all",
    maxnum: int = 100000,
    limit: str = "0,100000",
    timeout: int = 10,
) -> Tuple[requests.Response, Optional[dict]]:
    """Call the monitor_lanip action and return (resp, json_data).

    Example payload:
    {"func_name":"monitor_lanip","action":"show","param":{"TYPE":"conn,conn_num","ip":"10.8.7.2","interface":"all","proto":"all","maxnum":500,"limit":"0,20","ORDER_BY":"","ORDER":""}}

    Args:
        ip: Value sent as ``param.ip``.
        interface: Value sent as ``param.interface``.
        proto: Value sent as ``param.proto``.
        maxnum: Value sent as ``param.maxnum``.
        limit: Value sent as ``param.limit`` (the example above uses "0,20").
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        ``(resp, json_data)`` where ``json_data`` is the decoded body, or
        ``None`` when the body is not valid JSON.

    Raises:
        ValueError: If no session key is available.
        requests.RequestException: Propagated from ``requests.post``.
    """
    cfg = load_config()
    url = f"{cfg.get('base_url', '').rstrip('/')}/Action/call"
    data = {
        "func_name": "monitor_lanip",
        "action": "show",
        "param": {
            "TYPE": "conn,conn_num",
            "ip": ip,
            "interface": interface,
            "proto": proto,
            "maxnum": maxnum,
            "limit": limit,
            "ORDER_BY": "",
            "ORDER": "",
        },
    }

    logger = get_logger("analysis_connections")
    sess_key = get_sess_key()
    if not sess_key:
        raise ValueError("未找到 sess_key,请先登录")
    # Accept either a bare key or a "name=value;" cookie fragment.
    cookies = {"sess_key": sess_key.split("=", 1)[-1].rstrip(";")}

    # Origin/Referer/Host come from the auth_session helpers; kept under a
    # separate name so the URL's base is not silently rebound.
    origin_base = get_base_url()
    headers = {
        "Origin": origin_base + "/",
        "Referer": origin_base + "/",
        "Host": get_host(),
        "User-Agent": "python-requests/advanced-client",
        "Accept": "application/json, text/plain, */*",
        "Content-Type": "application/json;charset=UTF-8",
    }

    logger.debug(f"准备发送请求,URL: {url} payload: {data} headers: {headers}")
    resp = requests.post(url, json=data, cookies=cookies, headers=headers, timeout=timeout)

    # Request logging is best-effort: a logging failure must not lose the response.
    try:
        log_request(logger, "monitor_lanip", url, data, resp)
    except Exception:
        logger.exception("记录请求/响应失败")

    try:
        json_data = resp.json()
    except ValueError:
        json_data = None
    return resp, json_data


def main():
    """Command-line entry point: query one IP and print the response.

    Exit codes: 2 for missing argument or missing configuration file,
    3 for a session (``ValueError``) problem, 1 for a failed HTTP request.
    """
    logger = get_logger("main")
    if len(sys.argv) < 2:
        # The single positional argument is the IP read from sys.argv[1] below.
        print("用法: python analysis_connections.py <ip>")
        sys.exit(2)

    ip = sys.argv[1]
    try:
        resp, data = monitor_lanip(ip)
    except FileNotFoundError as e:
        logger.error(f"配置错误: {e}")
        sys.exit(2)
    except ValueError as e:
        logger.error(f"会话错误: {e}")
        sys.exit(3)
    except requests.RequestException as e:
        logger.error(f"请求失败: {e}")
        sys.exit(1)

    logger.info(f"状态: {resp.status_code}")
    if data:
        try:
            pretty = json.dumps(data, ensure_ascii=False, indent=2)
            logger.info(f"响应 JSON:\n{pretty}")
            print(pretty)
        except Exception:
            # Top-level boundary: fall back to the raw body rather than crash.
            logger.info(f"响应文本: {resp.text}")
            print(resp.text)
    else:
        logger.info(f"响应文本: {resp.text}")
        print(resp.text)


def _resolve_threshold(threshold: Optional[int]) -> int:
    """Return *threshold*, or ``conn_threshold`` from the config (default 20) when it is None."""
    if threshold is not None:
        return threshold
    return int(load_config().get("conn_threshold", 20))


def _fetch_connections(ip: str, limit: str) -> Optional[Tuple[object, list]]:
    """Fetch connections for *ip* and return ``(conn_num, conn_entries)``.

    Returns ``None`` when the response carried no (or empty) JSON. A missing,
    null, or non-object ``Data`` field, or a ``conn`` field that is not a list,
    yields an empty entry list instead of raising.
    """
    _resp, data = monitor_lanip(ip, limit=limit)
    if not data:
        return None
    payload = data.get("Data") if isinstance(data, dict) else None
    if not isinstance(payload, dict):
        payload = {}
    conns = payload.get("conn")
    if not isinstance(conns, list):
        conns = []
    return payload.get("conn_num"), conns


def find_high_connection_pairs(ip: str, threshold: Optional[int] = None, limit: str = "0,100000") -> dict:
    """Fetch connections for `ip` and return pairs (src_port, dst_addr) whose count > threshold.

    Returns a dict with keys: threshold, total_reported, fetched, matches (list of dicts).
    Each match: {"src_port": ..., "dst_addr": ..., "count": ..., "samples": [..sample entries..]}
    Matches are sorted by count, highest first; each carries at most 5 samples.

    If threshold is None, will read `conn_threshold` from config.json (default 20).
    """
    threshold = _resolve_threshold(threshold)
    result = {"threshold": threshold, "total_reported": None, "fetched": 0, "matches": []}

    fetched = _fetch_connections(ip, limit)
    if fetched is None:
        return result
    total_reported, conns = fetched
    result["total_reported"] = total_reported
    result["fetched"] = len(conns)

    # Count (src_port, dst_addr) pairs, keeping only the first few entries of each.
    pairs = Counter()
    samples = defaultdict(list)
    for entry in conns:
        if not isinstance(entry, dict):
            continue  # malformed entry: nothing to read from it
        key = (entry.get("src_port"), entry.get("dst_addr"))
        pairs[key] += 1
        if len(samples[key]) < _MAX_SAMPLES:
            samples[key].append(entry)

    # most_common() yields count-descending order, ties in first-seen order.
    result["matches"] = [
        {
            "src_port": src_port,
            "dst_addr": dst,
            "count": cnt,
            "samples": samples[(src_port, dst)],
        }
        for (src_port, dst), cnt in pairs.most_common()
        if cnt > threshold
    ]
    return result


def find_high_connections_for_src_ports(ip: str, src_ports, threshold: Optional[int] = None, limit: str = "0,100000") -> dict:
    """Analyze only the specified src_ports (single int or iterable) for given ip.

    Returns dict: {"threshold": n, "total_reported": x, "fetched": y, "by_src_port": {port: [{dst_addr, count, samples}, ...]}}

    Every requested port appears in ``by_src_port`` (possibly with an empty
    list) unless the response had no JSON data, in which case ``by_src_port``
    is empty. Entries whose ``src_port`` cannot be converted to int are skipped.
    If threshold is None, `conn_threshold` is read from the config (default 20).
    """
    # Normalize src_ports to a set of ints (a lone int/str counts as one port).
    if isinstance(src_ports, (int, str)):
        ports_set = {int(src_ports)}
    else:
        ports_set = {int(p) for p in src_ports}

    threshold = _resolve_threshold(threshold)
    result = {"threshold": threshold, "total_reported": None, "fetched": 0, "by_src_port": {}}

    fetched = _fetch_connections(ip, limit)
    if fetched is None:
        return result
    total_reported, conns = fetched
    result["total_reported"] = total_reported
    result["fetched"] = len(conns)

    # Group by src_port, then count dst_addr; sorted ports give a stable key order.
    grouped = {port: Counter() for port in sorted(ports_set)}
    samples = defaultdict(list)
    for entry in conns:
        try:
            src_port = int(entry.get("src_port"))
        except (AttributeError, TypeError, ValueError):
            continue  # non-dict entry or missing/non-numeric port
        if src_port not in ports_set:
            continue
        dst = entry.get("dst_addr")
        grouped[src_port][dst] += 1
        bucket = samples[(src_port, dst)]
        if len(bucket) < _MAX_SAMPLES:
            bucket.append(entry)

    for port, counter in grouped.items():
        result["by_src_port"][port] = [
            {"dst_addr": dst, "count": cnt, "samples": samples[(port, dst)]}
            for dst, cnt in counter.most_common()
            if cnt > threshold
        ]
    return result


if __name__ == "__main__":
    main()