main.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. """Entry point to call various API actions."""
  2. from auth_session import login, set_sess_key
  3. from log_util import get_logger
  4. from acl_actions import get_acl_rules, add_acl_rule, edit_acl_rule, del_acl_rule
  5. import json
  6. from advanced_acl import get_all_test_ips, add_ip, del_ip
  7. from analysis_connections import monitor_lanip
  8. def main():
  9. logger = get_logger("main")
  10. logger.info("开始 main()")
  11. try:
  12. resp, sess_cookie = login()
  13. except FileNotFoundError as e:
  14. logger.error(f"配置错误: {e}")
  15. return
  16. except Exception as e:
  17. logger.exception(f"请求或其他错误: {e}")
  18. return
  19. logger.info(f"已调用: login() | 参数: 默认 payload | 状态: {resp.status_code}")
  20. if sess_cookie:
  21. logger.info(f"返回的 sess_key: {sess_cookie}")
  22. # 保存为全局会话状态,供其他接口调用时使用
  23. set_sess_key(sess_cookie)
  24. print(f"sess_key: {sess_cookie}")
  25. # # 集成测试:演示高级接口的使用流程
  26. # test_ip = "1.5.6.7"
  27. # # 1) 列出当前集合
  28. # try:
  29. # ips = get_all_test_ips()
  30. # logger.info(f"当前 Test_* IP 数量: {len(ips)}")
  31. # print(f"当前 Test_* IP 数量: {len(ips)}")
  32. # except Exception as e:
  33. # logger.exception(f"获取 Test IP 列表失败: {e}")
  34. # print(f"获取 Test IP 列表失败: {e}")
  35. # # 2) 添加测试 IP
  36. # try:
  37. # res = add_ip(test_ip)
  38. # logger.info(f"add_ip({test_ip}) -> {res}")
  39. # print(f"add_ip -> {res}")
  40. # except Exception as e:
  41. # logger.exception(f"添加测试 IP 失败: {e}")
  42. # print(f"添加测试 IP 失败: {e}")
  43. # # 3) 再次列出以验证添加
  44. # try:
  45. # ips_after = get_all_test_ips()
  46. # logger.info(f"添加后 Test_* IP 数量: {len(ips_after)}")
  47. # print(f"添加后 Test_* IP 数量: {len(ips_after)}")
  48. # except Exception as e:
  49. # logger.exception(f"获取添加后列表失败: {e}")
  50. # print(f"获取添加后列表失败: {e}")
  51. # # 4) 删除测试 IP
  52. # try:
  53. # res_del = del_ip(test_ip)
  54. # logger.info(f"del_ip({test_ip}) -> {res_del}")
  55. # print(f"del_ip -> {res_del}")
  56. # except Exception as e:
  57. # logger.exception(f"删除测试 IP 失败: {e}")
  58. # print(f"删除测试 IP 失败: {e}")
  59. # # 5) 最终验证
  60. # try:
  61. # ips_final = get_all_test_ips()
  62. # logger.info(f"最终 Test_* IP 数量: {len(ips_final)}")
  63. # print(f"最终 Test_* IP 数量: {len(ips_final)}")
  64. # except Exception as e:
  65. # logger.exception(f"获取最终列表失败: {e}")
  66. # print(f"获取最终列表失败: {e}")
  67. # 旧的低级测试保留(按需)
  68. # 测试ACL接口
  69. # try:
  70. # acl_resp, acl_data = get_acl_rules()
  71. # logger.info(f"已调用: get_acl_rules() | 状态: {acl_resp.status_code}")
  72. # if acl_data:
  73. # logger.info(f"ACL规则总数: {acl_data.get('Data', {}).get('total', '未知')}")
  74. # print(f"ACL规则总数: {acl_data.get('Data', {}).get('total', '未知')}")
  75. # else:
  76. # logger.warning("未获取到ACL数据")
  77. # print("未获取到ACL数据")
  78. # except Exception as e:
  79. # logger.exception(f"测试ACL接口失败: {e}")
  80. # print(f"测试ACL接口失败: {e}")
  81. # # 测试添加ACL规则
  82. # try:
  83. # add_resp, add_data = add_acl_rule(dst_addr="1.2.3.4,2.4.5.6", comment="remark_kkkkkk")
  84. # logger.info(f"已调用: add_acl_rule() | 状态: {add_resp.status_code}")
  85. # if add_data:
  86. # row_id = add_data.get('RowId')
  87. # logger.info(f"新增ACL规则ID: {row_id}")
  88. # print(f"新增ACL规则ID: {row_id}")
  89. # else:
  90. # logger.warning("未获取到添加结果")
  91. # print("未获取到添加结果")
  92. # except Exception as e:
  93. # logger.exception(f"测试添加ACL规则失败: {e}")
  94. # print(f"测试添加ACL规则失败: {e}")
  95. # # 测试编辑ACL规则
  96. # try:
  97. # edit_resp, edit_data = edit_acl_rule(rule_id=14, dst_addr="1.2.3.4", comment="remark_kkkkkk")
  98. # logger.info(f"已调用: edit_acl_rule() | 状态: {edit_resp.status_code}")
  99. # if edit_data:
  100. # logger.info("ACL规则编辑成功")
  101. # print("ACL规则编辑成功")
  102. # else:
  103. # logger.warning("未获取到编辑结果")
  104. # print("未获取到编辑结果")
  105. # except Exception as e:
  106. # logger.exception(f"测试编辑ACL规则失败: {e}")
  107. # print(f"测试编辑ACL规则失败: {e}")
  108. # # 测试删除ACL规则
  109. # try:
  110. # del_resp, del_data = del_acl_rule(rule_id=14)
  111. # logger.info(f"已调用: del_acl_rule() | 状态: {del_resp.status_code}")
  112. # if del_data:
  113. # logger.info("ACL规则删除成功")
  114. # print("ACL规则删除成功")
  115. # else:
  116. # logger.warning("未获取到删除结果")
  117. # print("未获取到删除结果")
  118. # except Exception as e:
  119. # logger.exception(f"测试删除ACL规则失败: {e}")
  120. # print(f"测试删除ACL规则失败: {e}")
  121. # # 6) 演示 monitor_lanip 接口
  122. # try:
  123. # conn_ip = "10.8.7.2"
  124. # resp_conn, data_conn = monitor_lanip(conn_ip)
  125. # logger.info(f"已调用: monitor_lanip({conn_ip}) | 状态: {resp_conn.status_code}")
  126. # if data_conn:
  127. # print(json.dumps(data_conn, ensure_ascii=False, indent=2))
  128. # else:
  129. # print(resp_conn.text)
  130. # except Exception as e:
  131. # logger.exception(f"测试 monitor_lanip 失败: {e}")
  132. # print(f"测试 monitor_lanip 失败: {e}")
  133. else:
  134. logger.warning("未在响应中找到 sess_key")
  135. print("未在响应中找到 sess_key")
  136. if __name__ == "__main__":
  137. main()