| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- import json
- import sys
- from pathlib import Path
- from typing import Optional, Tuple
- import requests
- from log_util import get_logger, log_request
- from auth_session import load_config, get_sess_key, get_base_url, get_host
- ROOT = Path(__file__).resolve().parent
- CONFIG_PATH = ROOT / "config.json"
def monitor_lanip(ip: str, interface: str = "all", proto: str = "all", maxnum: int = 100000, limit: str = "0,100000", timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
    """POST a ``monitor_lanip`` / ``show`` request and return ``(resp, json_data)``.

    The request is sent to ``<base_url>/Action/call``, where ``base_url`` is
    read from ``load_config()`` with trailing slashes stripped. The ``param``
    object carries ``ip``, ``interface``, ``proto``, ``maxnum`` and ``limit``
    exactly as passed in, a fixed ``TYPE`` of ``"conn,conn_num"``, and empty
    ``ORDER_BY`` / ``ORDER`` values.

    Sample request body:
        {"func_name":"monitor_lanip","action":"show","param":{"TYPE":"conn,conn_num","ip":"10.8.7.2","interface":"all","proto":"all","maxnum":500,"limit":"0,20","ORDER_BY":"","ORDER":""}}

    Args:
        ip: Value placed in the ``ip`` request parameter.
        interface: Value placed in the ``interface`` request parameter.
        proto: Value placed in the ``proto`` request parameter.
        maxnum: Value placed in the ``maxnum`` request parameter.
        limit: Value placed in the ``limit`` request parameter.
        timeout: Timeout handed to ``requests.post``.

    Returns:
        A 2-tuple of the ``requests.Response`` and its decoded JSON body; the
        second item is ``None`` when the body cannot be decoded as JSON.

    Raises:
        ValueError: If ``get_sess_key()`` returns a falsy value.
    """
    config_base = load_config().get("base_url", "").rstrip("/")
    endpoint = f"{config_base}/Action/call"

    query = {
        "TYPE": "conn,conn_num",
        "ip": ip,
        "interface": interface,
        "proto": proto,
        "maxnum": maxnum,
        "limit": limit,
        "ORDER_BY": "",
        "ORDER": "",
    }
    payload = {"func_name": "monitor_lanip", "action": "show", "param": query}

    logger = get_logger("analysis_connections")

    raw_key = get_sess_key()
    if not raw_key:
        raise ValueError("未找到 sess_key,请先登录")
    # Keep only what follows the first "=" (the whole string when there is no
    # "="), then drop any trailing ";" characters.
    _, separator, tail = raw_key.partition("=")
    token = (tail if separator else raw_key).rstrip(";")

    # Origin/Referer/Host come from the auth_session helpers rather than from
    # the config value used for the URL above.
    site = get_base_url()
    host_name = get_host()
    origin = site + "/"
    request_headers = {
        "Origin": origin,
        "Referer": origin,
        "Host": host_name,
        "User-Agent": "python-requests/advanced-client",
        "Accept": "application/json, text/plain, */*",
        "Content-Type": "application/json;charset=UTF-8",
    }

    logger.debug(f"准备发送请求,URL: {endpoint} payload: {payload} headers: {request_headers}")
    resp = requests.post(
        endpoint,
        json=payload,
        cookies={"sess_key": token},
        headers=request_headers,
        timeout=timeout,
    )

    # Request/response logging is best effort: a failure here is logged and
    # must not prevent the response from being returned.
    try:
        log_request(logger, "monitor_lanip", endpoint, payload, resp)
    except Exception:
        logger.exception("记录请求/响应失败")

    try:
        return resp, resp.json()
    except ValueError:
        return resp, None
def main():
    """Command-line entry point: query one IP and print the response.

    The IP is taken from the first command-line argument. The response is
    printed (and logged) as indented JSON when a truthy JSON body was decoded,
    otherwise as the raw response text.

    Exit codes:
        2 - no IP argument given, or ``monitor_lanip`` raised FileNotFoundError
        3 - ``monitor_lanip`` raised ValueError
        1 - ``monitor_lanip`` raised ``requests.RequestException``
    """
    logger = get_logger("main")

    cli_args = sys.argv[1:]
    if not cli_args:
        print("用法: python analysis_connections.py <ip>")
        sys.exit(2)
    target_ip = cli_args[0]

    try:
        resp, data = monitor_lanip(target_ip)
    except FileNotFoundError as err:
        logger.error(f"配置错误: {err}")
        sys.exit(2)
    except ValueError as err:
        logger.error(f"会话错误: {err}")
        sys.exit(3)
    except requests.RequestException as err:
        logger.error(f"请求失败: {err}")
        sys.exit(1)

    def emit_raw_body():
        # Shared fallback: log and print the undecoded response text.
        logger.info(f"响应文本: {resp.text}")
        print(resp.text)

    logger.info(f"状态: {resp.status_code}")

    if not data:
        emit_raw_body()
        return

    try:
        rendered = json.dumps(data, ensure_ascii=False, indent=2)
        logger.info(f"响应 JSON:\n{rendered}")
        print(rendered)
    except Exception:
        # Any failure while rendering or emitting the JSON falls back to raw text.
        emit_raw_body()
# Run main() only when this file is executed directly, not when it is imported.
# NOTE(review): this guard sits above the function definitions that follow it
# in the file, so when run as a script main() executes before those later
# functions are bound; main() as written only calls monitor_lanip, which is
# defined above.
if __name__ == "__main__":
    main()
def find_high_connection_pairs(ip: str, threshold: int | None = None, limit: str = "0,100000") -> dict:
    """Report (src_port, dst_addr) pairs of `ip` that occur more than `threshold` times.

    Connections are fetched with ``monitor_lanip(ip, limit=limit)`` and read
    from ``Data.conn`` of the JSON response; ``Data.conn_num`` is passed
    through as the reported total.

    Args:
        ip: Address passed to ``monitor_lanip``.
        threshold: A pair is reported only when its count is strictly greater
            than this value. When None, ``conn_threshold`` from
            ``load_config()`` is used (default 20).
        limit: ``limit`` value forwarded to ``monitor_lanip``.

    Returns:
        A dict with keys:
            threshold: the threshold actually applied.
            total_reported: ``Data.conn_num`` from the response, or None.
            fetched: number of connection entries received.
            matches: list of ``{"src_port", "dst_addr", "count", "samples"}``
                dicts sorted by ``count`` descending; ``samples`` holds up to
                5 of the matching connection entries.
        When the response has no usable JSON object, the dict is returned with
        ``total_reported`` None, ``fetched`` 0 and an empty ``matches`` list.
    """
    from collections import Counter

    max_samples = 5

    if threshold is None:
        # Only consult the config when the caller did not choose a threshold.
        threshold = int(load_config().get("conn_threshold", 20))

    # fetch with large limit (caller may specify)
    _, data = monitor_lanip(ip, limit=limit)

    result = {"threshold": threshold, "total_reported": None, "fetched": 0, "matches": []}
    if not isinstance(data, dict):
        # No JSON body, or a body that is not an object: nothing to analyse.
        return result

    # "Data" may be missing or null; treat both as an empty payload.
    body = data.get("Data") or {}
    if not isinstance(body, dict):
        return result

    conns = body.get("conn") or []
    result["total_reported"] = body.get("conn_num")
    result["fetched"] = len(conns)

    # Count (src_port, dst_addr) pairs, keeping only the first few entries of
    # each pair as samples instead of accumulating every entry.
    pair_counts: Counter = Counter()
    samples: dict[tuple, list] = {}
    for entry in conns:
        key = (entry.get("src_port"), entry.get("dst_addr"))
        pair_counts[key] += 1
        kept = samples.setdefault(key, [])
        if len(kept) < max_samples:
            kept.append(entry)

    # most_common() yields pairs by count descending (ties keep first-seen
    # order), so iteration can stop at the first pair at or below the threshold.
    for (src_port, dst), count in pair_counts.most_common():
        if count <= threshold:
            break
        result["matches"].append({
            "src_port": src_port,
            "dst_addr": dst,
            "count": count,
            "samples": samples[(src_port, dst)],
        })

    return result
def find_high_connections_for_src_ports(ip: str, src_ports, threshold: int | None = None, limit: str = "0,100000") -> dict:
    """Analyze only the specified source ports of `ip` for busy destinations.

    Connections are fetched with ``monitor_lanip(ip, limit=limit)`` and read
    from ``Data.conn`` of the JSON response. Entries whose ``src_port`` cannot
    be converted to an int, or is not one of the requested ports, are ignored.

    Args:
        ip: Address passed to ``monitor_lanip``.
        src_ports: A single port (int or str) or an iterable of ports; every
            value is converted with ``int()``.
        threshold: A destination is reported only when its count is strictly
            greater than this value. When None, ``conn_threshold`` from
            ``load_config()`` is used (default 20).
        limit: ``limit`` value forwarded to ``monitor_lanip``.

    Returns:
        A dict with keys:
            threshold: the threshold actually applied.
            total_reported: ``Data.conn_num`` from the response, or None.
            fetched: number of connection entries received.
            by_src_port: ``{port: [{"dst_addr", "count", "samples"}, ...]}``
                with one key per requested port (ascending), each list sorted
                by ``count`` descending; ``samples`` holds up to 5 entries.
        When the response has no usable JSON object, ``by_src_port`` is empty.

    Raises:
        ValueError / TypeError: If a value in `src_ports` cannot be converted
            to int.
    """
    from collections import Counter, defaultdict

    max_samples = 5

    # A lone int/str is a single port; anything else is iterated as a
    # collection of ports.
    if isinstance(src_ports, (int, str)):
        ports_set = {int(src_ports)}
    else:
        ports_set = {int(p) for p in src_ports}

    if threshold is None:
        # Only consult the config when the caller did not choose a threshold.
        threshold = int(load_config().get("conn_threshold", 20))

    _, data = monitor_lanip(ip, limit=limit)

    result = {"threshold": threshold, "total_reported": None, "fetched": 0, "by_src_port": {}}
    if not isinstance(data, dict):
        # No JSON body, or a body that is not an object: nothing to analyse.
        return result

    # "Data" may be missing or null; treat both as an empty payload.
    body = data.get("Data") or {}
    if not isinstance(body, dict):
        return result

    conns = body.get("conn") or []
    result["total_reported"] = body.get("conn_num")
    result["fetched"] = len(conns)

    # group by src_port then count dst_addr
    grouped: dict[int, Counter] = defaultdict(Counter)
    samples: dict[tuple, list] = {}
    for entry in conns:
        try:
            src_port = int(entry.get("src_port"))
        except (AttributeError, TypeError, ValueError):
            # Entry is not a mapping, or its src_port is missing/non-numeric.
            continue
        if src_port not in ports_set:
            continue
        dst = entry.get("dst_addr")
        grouped[src_port][dst] += 1
        kept = samples.setdefault((src_port, dst), [])
        if len(kept) < max_samples:
            kept.append(entry)

    # Emit ports in ascending order so the output does not depend on set
    # iteration order.
    for port in sorted(ports_set):
        matches = []
        # most_common() yields destinations by count descending, so iteration
        # can stop at the first one at or below the threshold.
        for dst, count in grouped[port].most_common():
            if count <= threshold:
                break
            matches.append({"dst_addr": dst, "count": count, "samples": samples[(port, dst)]})
        result["by_src_port"][port] = matches

    return result
|