AI应急响应处置实战:从日志告警到自动封堵,新手也能搭建
适用场景与准备工作
AI应急响应处置的核心思路是:当服务器出现异常(如大量登录失败、CPU飙升或恶意请求)时,让AI自动分析日志并执行预设的处置动作,例如封禁IP、重启服务或发送通知。
这套方案适合所有需要快速响应又不希望人工24小时盯屏的运维场景。
在开始之前,你需要准备以下条件:
- 一台Linux服务器(CentOS 7+ 或 Ubuntu 20.04+),并拥有root或sudo权限。
- Python 3.6+ 环境(一般系统自带,若没有请运行
sudo apt install python3 python3-pip或sudo yum install python3 python3-pip)。 - 一个AI大模型API的访问密钥(例如OpenAI API key,或兼容OpenAI接口的本地模型地址)。如果使用本地模型,需先部署如Ollama等推理服务。
- 基本的日志文件路径,比如
/var/log/auth.log用于SSH登录日志,或Nginx的访问日志/var/log/nginx/access.log。
关键提示:本文示例将使用OpenAI兼容的API接口,你可以在代码中替换成任何兼容的端点(如本地Ollama地址 http://192.168.1.100:11434/v1)。
搭建AI应急响应脚本
我们将编写一个Python脚本 ai_responder.py,它的工作流程如下:
- 读取指定日志文件的最新若干条(例如最近100行)。
- 将日志内容拼接成提示词,发送给AI API,要求AI判断是否存在异常行为,并给出处置建议(如封禁IP
192.168.1.2)。 - 解析AI返回的JSON结果,提取需要执行的命令(如
iptables -A INPUT -s IP -j DROP)。 - 执行命令并记录结果。
完整脚本示例:
# 创建脚本文件
nano ai_responder.py
将以下代码粘贴进去(请替换 your_api_key 和 api_url):
import os
import subprocess
import json
import requests
# 配置
API_KEY = "sk-your-api-key"
API_URL = "https://api.openai.com/v1/chat/completions" # 或本地地址
LOG_FILE = "/var/log/auth.log"
TAIL_LINES = 100
# 读取最近日志
def get_recent_logs():
result = subprocess.run(["tail", "-" + str(TAIL_LINES), LOG_FILE],
capture_output=True, text=True)
return result.stdout
# 调用AI获取处置建议
def ask_ai(log_text):
prompt = f"""你是一个服务器安全分析师。以下是最近一段日志内容:
{log_text}
请分析是否存在异常行为(如暴力破解、异常请求等)。如果存在异常,请给出处置命令(如封禁IP的iptables命令),并以JSON格式输出:
{{"judgment": "normal" | "abnormal", "commands": ["cmd1", "cmd2"]}}
如果无异常,commands为空数组。只输出JSON。"""
headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
data = {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1
}
resp = requests.post(API_URL, headers=headers, json=data, timeout=30)
return resp.json()["choices"][0]["message"]["content"]
# 执行命令
def execute_commands(commands):
for cmd in commands:
print(f"执行命令: {cmd}")
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
print(result.stdout)
if result.stderr:
print(f"错误: {result.stderr}")
if __name__ == "__main__":
logs = get_recent_logs()
print("日志已读取,正在请求AI分析...")
try:
ai_text = ask_ai(logs)
decision = json.loads(ai_text)
if decision["judgment"] == "abnormal":
print("AI判断为异常,开始执行处置...")
execute_commands(decision["commands"])
else:
print("AI判断为正常,不执行任何操作。")
except Exception as e:
print(f"AI调用或解析失败: {e}")
保存后赋予执行权限:
chmod +x ai_responder.py
配置触发与执行机制
为了让脚本自动运行,我们需要将其接入告警触发。
两种常见方式:
- 定时任务(cron):每5分钟执行一次检查。运行
crontab -e,添加一行:
*/5 * * * * /usr/bin/python3 /path/to/ai_responder.py >> /var/log/ai_responder.log 2>&1
- 日志监控触发:使用
systemd的路径单元或incrond监控日志文件变化后再执行脚本。这里以简单的cron方式为例,新手更容易上手。
重要注意:如果日志文件写入频繁(如Nginx访问日志),建议cron间隔设为1-2分钟,避免漏掉突发异常。
避坑与常见问题
- API Key安全:请勿将API Key硬编码在脚本中并上传公开仓库。建议使用环境变量(
os.environ["OPENAI_API_KEY"])或系统秘钥管理服务。 - 解析错误:AI返回的文本可能不是严格的JSON,可以在
ask_ai函数中添加异常重试逻辑,或要求AI只输出固定格式(已在prompt中强调)。 - 权限不足:执行
iptables或systemctl restart等命令需要root权限。cron默认以普通用户运行,建议使用root的crontab。 - 日志轮转:如果日志被logrotate切分,脚本读取的
tail可能获取到旧内容,建议结合inotifywait只处理新增行,但对于新手先用cron轮询即可。 - AI误判:AI可能将正常高并发请求误判为攻击。建议在prompt中加入“若不确定则判为normal”,或让AI输出置信度,仅当置信度高时才执行。
验证效果
- 手动测试:先手动向
auth.log写入几条模拟的非法登录记录(但不要实际攻击)。例如:
echo "Jan 1 12:00:00 server sshd[1234]: Failed password for root from 192.168.1.100 port 22 ssh2" >> /var/log/auth.log
- 手动运行脚本:
python3 ai_responder.py
观察输出,如果AI正确识别并返回封禁命令,则系统会执行 iptables -A INPUT -s 192.168.1.100 -j DROP。
- 检查iptables规则:
iptables -L -n | grep 192.168.1.100
应看到DROP规则。
- 清除测试规则:
iptables -D INPUT -s 192.168.1.100 -j DROP
- 让cron运行一阵子,再查看
/var/log/ai_responder.log确认脚本按预期执行。
如果以上步骤均通过,说明你的AI应急响应处置已初步搭建完成。
后续可以根据实际需要扩展脚本,比如对接企业微信通知、处理更多日志类型等。
如果你在配置过程中遇到报错,
请先检查API密钥是否有效、
网络能否访问外网或本地模型地址,
以及Python依赖包 requests 是否安装(pip3 install requests)。