| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- """Entry point to call various API actions."""
- import sys
- from auth_session import login, set_sess_key
- from log_util import get_logger
- from acl_actions import get_acl_rules, add_acl_rule, edit_acl_rule, del_acl_rule
- import json
- from advanced_acl import get_all_test_ips, add_ip, del_ip
- from analysis_connections import monitor_lanip
- import gui
- from protector import ProtectorRunner
- def main():
- logger = get_logger("main")
- logger.info("开始 main()")
- # If --gui passed, launch GUI and start protector runner
- if "--gui" in sys.argv:
- # start background protector
- pr = ProtectorRunner()
- pr.start()
- # launch GUI (blocking) and pass protector runner so GUI can display countdown
- gui.run_gui(protector_runner=pr)
- # when GUI exits, stop protector
- pr.stop()
- return
- try:
- resp, sess_cookie = login()
- except FileNotFoundError as e:
- logger.error(f"配置错误: {e}")
- return
- except Exception as e:
- logger.exception(f"请求或其他错误: {e}")
- return
- logger.info(f"已调用: login() | 参数: 默认 payload | 状态: {resp.status_code}")
- if sess_cookie:
- logger.info(f"返回的 sess_key: {sess_cookie}")
- # 保存为全局会话状态,供其他接口调用时使用
- set_sess_key(sess_cookie)
- print(f"sess_key: {sess_cookie}")
- # # 集成测试:演示高级接口的使用流程
- # test_ip = "1.5.6.7"
- # # 1) 列出当前集合
- # try:
- # ips = get_all_test_ips()
- # logger.info(f"当前 Test_* IP 数量: {len(ips)}")
- # print(f"当前 Test_* IP 数量: {len(ips)}")
- # except Exception as e:
- # logger.exception(f"获取 Test IP 列表失败: {e}")
- # print(f"获取 Test IP 列表失败: {e}")
- # # 2) 添加测试 IP
- # try:
- # res = add_ip(test_ip)
- # logger.info(f"add_ip({test_ip}) -> {res}")
- # print(f"add_ip -> {res}")
- # except Exception as e:
- # logger.exception(f"添加测试 IP 失败: {e}")
- # print(f"添加测试 IP 失败: {e}")
- # # 3) 再次列出以验证添加
- # try:
- # ips_after = get_all_test_ips()
- # logger.info(f"添加后 Test_* IP 数量: {len(ips_after)}")
- # print(f"添加后 Test_* IP 数量: {len(ips_after)}")
- # except Exception as e:
- # logger.exception(f"获取添加后列表失败: {e}")
- # print(f"获取添加后列表失败: {e}")
- # # 4) 删除测试 IP
- # try:
- # res_del = del_ip(test_ip)
- # logger.info(f"del_ip({test_ip}) -> {res_del}")
- # print(f"del_ip -> {res_del}")
- # except Exception as e:
- # logger.exception(f"删除测试 IP 失败: {e}")
- # print(f"删除测试 IP 失败: {e}")
- # # 5) 最终验证
- # try:
- # ips_final = get_all_test_ips()
- # logger.info(f"最终 Test_* IP 数量: {len(ips_final)}")
- # print(f"最终 Test_* IP 数量: {len(ips_final)}")
- # except Exception as e:
- # logger.exception(f"获取最终列表失败: {e}")
- # print(f"获取最终列表失败: {e}")
- # 旧的低级测试保留(按需)
- # 测试ACL接口
- # try:
- # acl_resp, acl_data = get_acl_rules()
- # logger.info(f"已调用: get_acl_rules() | 状态: {acl_resp.status_code}")
- # if acl_data:
- # logger.info(f"ACL规则总数: {acl_data.get('Data', {}).get('total', '未知')}")
- # print(f"ACL规则总数: {acl_data.get('Data', {}).get('total', '未知')}")
- # else:
- # logger.warning("未获取到ACL数据")
- # print("未获取到ACL数据")
- # except Exception as e:
- # logger.exception(f"测试ACL接口失败: {e}")
- # print(f"测试ACL接口失败: {e}")
- # # 测试添加ACL规则
- # try:
- # add_resp, add_data = add_acl_rule(dst_addr="1.2.3.4,2.4.5.6", comment="remark_kkkkkk")
- # logger.info(f"已调用: add_acl_rule() | 状态: {add_resp.status_code}")
- # if add_data:
- # row_id = add_data.get('RowId')
- # logger.info(f"新增ACL规则ID: {row_id}")
- # print(f"新增ACL规则ID: {row_id}")
- # else:
- # logger.warning("未获取到添加结果")
- # print("未获取到添加结果")
- # except Exception as e:
- # logger.exception(f"测试添加ACL规则失败: {e}")
- # print(f"测试添加ACL规则失败: {e}")
- # # 测试编辑ACL规则
- # try:
- # edit_resp, edit_data = edit_acl_rule(rule_id=14, dst_addr="1.2.3.4", comment="remark_kkkkkk")
- # logger.info(f"已调用: edit_acl_rule() | 状态: {edit_resp.status_code}")
- # if edit_data:
- # logger.info("ACL规则编辑成功")
- # print("ACL规则编辑成功")
- # else:
- # logger.warning("未获取到编辑结果")
- # print("未获取到编辑结果")
- # except Exception as e:
- # logger.exception(f"测试编辑ACL规则失败: {e}")
- # print(f"测试编辑ACL规则失败: {e}")
- # # 测试删除ACL规则
- # try:
- # del_resp, del_data = del_acl_rule(rule_id=14)
- # logger.info(f"已调用: del_acl_rule() | 状态: {del_resp.status_code}")
- # if del_data:
- # logger.info("ACL规则删除成功")
- # print("ACL规则删除成功")
- # else:
- # logger.warning("未获取到删除结果")
- # print("未获取到删除结果")
- # except Exception as e:
- # logger.exception(f"测试删除ACL规则失败: {e}")
- # print(f"测试删除ACL规则失败: {e}")
- # 6) 演示 monitor_lanip 接口
- try:
- conn_ip = "10.8.7.2"
- resp_conn, data_conn = monitor_lanip(conn_ip)
- logger.info(f"已调用: monitor_lanip({conn_ip}) | 状态: {resp_conn.status_code}")
- if data_conn:
- print(json.dumps(data_conn, ensure_ascii=False, indent=2))
- else:
- print(resp_conn.text)
- except Exception as e:
- logger.exception(f"测试 monitor_lanip 失败: {e}")
- print(f"测试 monitor_lanip 失败: {e}")
- else:
- logger.warning("未在响应中找到 sess_key")
- print("未在响应中找到 sess_key")
- if __name__ == "__main__":
- main()
|