main.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. """Entry point to call various API actions."""
  2. import sys
  3. from auth_session import login, set_sess_key
  4. from log_util import get_logger
  5. from acl_actions import get_acl_rules, add_acl_rule, edit_acl_rule, del_acl_rule
  6. import json
  7. from advanced_acl import get_all_test_ips, add_ip, del_ip
  8. from analysis_connections import monitor_lanip
  9. import gui
  10. from protector import ProtectorRunner
  11. def main():
  12. logger = get_logger("main")
  13. logger.info("开始 main()")
  14. # If --gui passed, launch GUI and start protector runner
  15. if "--gui" in sys.argv:
  16. # start background protector
  17. pr = ProtectorRunner()
  18. pr.start()
  19. # launch GUI (blocking) and pass protector runner so GUI can display countdown
  20. gui.run_gui(protector_runner=pr)
  21. # when GUI exits, stop protector
  22. pr.stop()
  23. return
  24. try:
  25. resp, sess_cookie = login()
  26. except FileNotFoundError as e:
  27. logger.error(f"配置错误: {e}")
  28. return
  29. except Exception as e:
  30. logger.exception(f"请求或其他错误: {e}")
  31. return
  32. logger.info(f"已调用: login() | 参数: 默认 payload | 状态: {resp.status_code}")
  33. if sess_cookie:
  34. logger.info(f"返回的 sess_key: {sess_cookie}")
  35. # 保存为全局会话状态,供其他接口调用时使用
  36. set_sess_key(sess_cookie)
  37. print(f"sess_key: {sess_cookie}")
  38. # # 集成测试:演示高级接口的使用流程
  39. # test_ip = "1.5.6.7"
  40. # # 1) 列出当前集合
  41. # try:
  42. # ips = get_all_test_ips()
  43. # logger.info(f"当前 Test_* IP 数量: {len(ips)}")
  44. # print(f"当前 Test_* IP 数量: {len(ips)}")
  45. # except Exception as e:
  46. # logger.exception(f"获取 Test IP 列表失败: {e}")
  47. # print(f"获取 Test IP 列表失败: {e}")
  48. # # 2) 添加测试 IP
  49. # try:
  50. # res = add_ip(test_ip)
  51. # logger.info(f"add_ip({test_ip}) -> {res}")
  52. # print(f"add_ip -> {res}")
  53. # except Exception as e:
  54. # logger.exception(f"添加测试 IP 失败: {e}")
  55. # print(f"添加测试 IP 失败: {e}")
  56. # # 3) 再次列出以验证添加
  57. # try:
  58. # ips_after = get_all_test_ips()
  59. # logger.info(f"添加后 Test_* IP 数量: {len(ips_after)}")
  60. # print(f"添加后 Test_* IP 数量: {len(ips_after)}")
  61. # except Exception as e:
  62. # logger.exception(f"获取添加后列表失败: {e}")
  63. # print(f"获取添加后列表失败: {e}")
  64. # # 4) 删除测试 IP
  65. # try:
  66. # res_del = del_ip(test_ip)
  67. # logger.info(f"del_ip({test_ip}) -> {res_del}")
  68. # print(f"del_ip -> {res_del}")
  69. # except Exception as e:
  70. # logger.exception(f"删除测试 IP 失败: {e}")
  71. # print(f"删除测试 IP 失败: {e}")
  72. # # 5) 最终验证
  73. # try:
  74. # ips_final = get_all_test_ips()
  75. # logger.info(f"最终 Test_* IP 数量: {len(ips_final)}")
  76. # print(f"最终 Test_* IP 数量: {len(ips_final)}")
  77. # except Exception as e:
  78. # logger.exception(f"获取最终列表失败: {e}")
  79. # print(f"获取最终列表失败: {e}")
  80. # 旧的低级测试保留(按需)
  81. # 测试ACL接口
  82. # try:
  83. # acl_resp, acl_data = get_acl_rules()
  84. # logger.info(f"已调用: get_acl_rules() | 状态: {acl_resp.status_code}")
  85. # if acl_data:
  86. # logger.info(f"ACL规则总数: {acl_data.get('Data', {}).get('total', '未知')}")
  87. # print(f"ACL规则总数: {acl_data.get('Data', {}).get('total', '未知')}")
  88. # else:
  89. # logger.warning("未获取到ACL数据")
  90. # print("未获取到ACL数据")
  91. # except Exception as e:
  92. # logger.exception(f"测试ACL接口失败: {e}")
  93. # print(f"测试ACL接口失败: {e}")
  94. # # 测试添加ACL规则
  95. # try:
  96. # add_resp, add_data = add_acl_rule(dst_addr="1.2.3.4,2.4.5.6", comment="remark_kkkkkk")
  97. # logger.info(f"已调用: add_acl_rule() | 状态: {add_resp.status_code}")
  98. # if add_data:
  99. # row_id = add_data.get('RowId')
  100. # logger.info(f"新增ACL规则ID: {row_id}")
  101. # print(f"新增ACL规则ID: {row_id}")
  102. # else:
  103. # logger.warning("未获取到添加结果")
  104. # print("未获取到添加结果")
  105. # except Exception as e:
  106. # logger.exception(f"测试添加ACL规则失败: {e}")
  107. # print(f"测试添加ACL规则失败: {e}")
  108. # # 测试编辑ACL规则
  109. # try:
  110. # edit_resp, edit_data = edit_acl_rule(rule_id=14, dst_addr="1.2.3.4", comment="remark_kkkkkk")
  111. # logger.info(f"已调用: edit_acl_rule() | 状态: {edit_resp.status_code}")
  112. # if edit_data:
  113. # logger.info("ACL规则编辑成功")
  114. # print("ACL规则编辑成功")
  115. # else:
  116. # logger.warning("未获取到编辑结果")
  117. # print("未获取到编辑结果")
  118. # except Exception as e:
  119. # logger.exception(f"测试编辑ACL规则失败: {e}")
  120. # print(f"测试编辑ACL规则失败: {e}")
  121. # # 测试删除ACL规则
  122. # try:
  123. # del_resp, del_data = del_acl_rule(rule_id=14)
  124. # logger.info(f"已调用: del_acl_rule() | 状态: {del_resp.status_code}")
  125. # if del_data:
  126. # logger.info("ACL规则删除成功")
  127. # print("ACL规则删除成功")
  128. # else:
  129. # logger.warning("未获取到删除结果")
  130. # print("未获取到删除结果")
  131. # except Exception as e:
  132. # logger.exception(f"测试删除ACL规则失败: {e}")
  133. # print(f"测试删除ACL规则失败: {e}")
  134. # 6) 演示 monitor_lanip 接口
  135. try:
  136. conn_ip = "10.8.7.2"
  137. resp_conn, data_conn = monitor_lanip(conn_ip)
  138. logger.info(f"已调用: monitor_lanip({conn_ip}) | 状态: {resp_conn.status_code}")
  139. if data_conn:
  140. print(json.dumps(data_conn, ensure_ascii=False, indent=2))
  141. else:
  142. print(resp_conn.text)
  143. except Exception as e:
  144. logger.exception(f"测试 monitor_lanip 失败: {e}")
  145. print(f"测试 monitor_lanip 失败: {e}")
  146. else:
  147. logger.warning("未在响应中找到 sess_key")
  148. print("未在响应中找到 sess_key")
  149. if __name__ == "__main__":
  150. main()