"""Entry point to call various API actions.""" import sys from auth_session import login, set_sess_key from log_util import get_logger from acl_actions import get_acl_rules, add_acl_rule, edit_acl_rule, del_acl_rule import json from advanced_acl import get_all_test_ips, add_ip, del_ip from analysis_connections import monitor_lanip import gui from protector import ProtectorRunner def main(): logger = get_logger("main") logger.info("开始 main()") # If --debug passed, allocate a console so logs and prints show up. # Default behavior: start GUI. If user passes --nogui, run headless flow. if "--debug" in sys.argv: # allocate a fresh console on Windows try: import ctypes ctypes.windll.kernel32.AllocConsole() # reopen std streams to the new console import sys as _sys _sys.stdout = open("CONOUT$", "w", encoding="utf-8", buffering=1) _sys.stderr = open("CONOUT$", "w", encoding="utf-8", buffering=1) except Exception: # best-effort: continue if console allocation fails pass # Default: launch GUI unless explicitly requested not to if "--nogui" not in sys.argv: pr = ProtectorRunner() gui.run_gui(protector_runner=pr) pr.stop() return try: resp, sess_cookie = login() except FileNotFoundError as e: logger.error(f"配置错误: {e}") return except Exception as e: logger.exception(f"请求或其他错误: {e}") return logger.info(f"已调用: login() | 参数: 默认 payload | 状态: {resp.status_code}") if sess_cookie: logger.info(f"返回的 sess_key: {sess_cookie}") # 保存为全局会话状态,供其他接口调用时使用 set_sess_key(sess_cookie) print(f"sess_key: {sess_cookie}") # # 集成测试:演示高级接口的使用流程 # test_ip = "1.5.6.7" # # 1) 列出当前集合 # try: # ips = get_all_test_ips() # logger.info(f"当前 Test_* IP 数量: {len(ips)}") # print(f"当前 Test_* IP 数量: {len(ips)}") # except Exception as e: # logger.exception(f"获取 Test IP 列表失败: {e}") # print(f"获取 Test IP 列表失败: {e}") # # 2) 添加测试 IP # try: # res = add_ip(test_ip) # logger.info(f"add_ip({test_ip}) -> {res}") # print(f"add_ip -> {res}") # except Exception as e: # logger.exception(f"添加测试 IP 失败: {e}") # print(f"添加测试 IP 失败: {e}") # # 3) 再次列出以验证添加 # try: # ips_after = get_all_test_ips() # logger.info(f"添加后 Test_* IP 数量: {len(ips_after)}") # print(f"添加后 Test_* IP 数量: {len(ips_after)}") # except Exception as e: # logger.exception(f"获取添加后列表失败: {e}") # print(f"获取添加后列表失败: {e}") # # 4) 删除测试 IP # try: # res_del = del_ip(test_ip) # logger.info(f"del_ip({test_ip}) -> {res_del}") # print(f"del_ip -> {res_del}") # except Exception as e: # logger.exception(f"删除测试 IP 失败: {e}") # print(f"删除测试 IP 失败: {e}") # # 5) 最终验证 # try: # ips_final = get_all_test_ips() # logger.info(f"最终 Test_* IP 数量: {len(ips_final)}") # print(f"最终 Test_* IP 数量: {len(ips_final)}") # except Exception as e: # logger.exception(f"获取最终列表失败: {e}") # print(f"获取最终列表失败: {e}") # 旧的低级测试保留(按需) # 测试ACL接口 # try: # acl_resp, acl_data = get_acl_rules() # logger.info(f"已调用: get_acl_rules() | 状态: {acl_resp.status_code}") # if acl_data: # logger.info(f"ACL规则总数: {acl_data.get('Data', {}).get('total', '未知')}") # print(f"ACL规则总数: {acl_data.get('Data', {}).get('total', '未知')}") # else: # logger.warning("未获取到ACL数据") # print("未获取到ACL数据") # except Exception as e: # logger.exception(f"测试ACL接口失败: {e}") # print(f"测试ACL接口失败: {e}") # # 测试添加ACL规则 # try: # add_resp, add_data = add_acl_rule(dst_addr="1.2.3.4,2.4.5.6", comment="remark_kkkkkk") # logger.info(f"已调用: add_acl_rule() | 状态: {add_resp.status_code}") # if add_data: # row_id = add_data.get('RowId') # logger.info(f"新增ACL规则ID: {row_id}") # print(f"新增ACL规则ID: {row_id}") # else: # logger.warning("未获取到添加结果") # print("未获取到添加结果") # except Exception as e: # logger.exception(f"测试添加ACL规则失败: {e}") # print(f"测试添加ACL规则失败: {e}") # # 测试编辑ACL规则 # try: # edit_resp, edit_data = edit_acl_rule(rule_id=14, dst_addr="1.2.3.4", comment="remark_kkkkkk") # logger.info(f"已调用: edit_acl_rule() | 状态: {edit_resp.status_code}") # if edit_data: # logger.info("ACL规则编辑成功") # print("ACL规则编辑成功") # else: # logger.warning("未获取到编辑结果") # print("未获取到编辑结果") # except Exception as e: # logger.exception(f"测试编辑ACL规则失败: {e}") # print(f"测试编辑ACL规则失败: {e}") # # 测试删除ACL规则 # try: # del_resp, del_data = del_acl_rule(rule_id=14) # logger.info(f"已调用: del_acl_rule() | 状态: {del_resp.status_code}") # if del_data: # logger.info("ACL规则删除成功") # print("ACL规则删除成功") # else: # logger.warning("未获取到删除结果") # print("未获取到删除结果") # except Exception as e: # logger.exception(f"测试删除ACL规则失败: {e}") # print(f"测试删除ACL规则失败: {e}") # 6) 演示 monitor_lanip 接口 try: conn_ip = "10.8.7.2" resp_conn, data_conn = monitor_lanip(conn_ip) logger.info(f"已调用: monitor_lanip({conn_ip}) | 状态: {resp_conn.status_code}") if data_conn: print(json.dumps(data_conn, ensure_ascii=False, indent=2)) else: print(resp_conn.text) except Exception as e: logger.exception(f"测试 monitor_lanip 失败: {e}") print(f"测试 monitor_lanip 失败: {e}") else: logger.warning("未在响应中找到 sess_key") print("未在响应中找到 sess_key") if __name__ == "__main__": main()