在进行档案软件合规审计系统搭建前,我们需要准备一个干净的Linux服务器环境(推荐CentOS 7.9或Ubuntu 20.04)。本方案基于Python 3.8+和MySQL 5.7+构建,核心思路是通过MySQL审计插件获取数据库层操作,结合Python脚本实时分析应用日志,实现双重审计。
执行以下命令安装基础依赖:
yum install -y python3 python3-pip mysql-server mysql-devel gcc git
pip3 install watchdog pymysql jinja2
注意:如果系统是Ubuntu,请将yum替换为apt,并确保MySQL服务已启动。
档案软件的核心数据存储在数据库中,我们需要开启MySQL的审计日志。这里使用MariaDB Audit Plugin(兼容MySQL)来实现。
1. 下载并安装审计插件:
cd /tmp
wget https://downloads.mariadb.com/enterprise/a8j6-k5t6/server_audit-1.1.1.tar.gz
tar -xzf server_audit-1.1.1.tar.gz
将插件拷贝到MySQL插件目录
cp server_audit-1.1.1/server_audit.so /usr/lib64/mysql/plugin/
2. 修改MySQL配置文件/etc/my.cnf,在[mysqld]模块下添加以下配置:
[mysqld]
plugin-load = server_audit=server_audit.so
server_audit_logging = ON
server_audit_events = CONNECT,QUERY,TABLE
server_audit_file_path = /var/log/mysql/
server_audit_file_rotate_size = 100M
server_audit_file_rotations = 9
server_audit_incl_users = archive_user,admin
配置详解:
3. 重启MySQL服务并验证:
systemctl restart mysqld
mysql -u root -p -e "SHOW VARIABLES LIKE 'server_audit_logging';"
输出结果中Value为ON即表示配置成功。此时,所有档案软件的数据库操作都会被记录到/var/log/mysql/server_audit.log中。
数据库审计解决了“数据改了没”的问题,应用层日志审计则解决“谁在什么时间点了什么功能”的问题。我们编写一个Python脚本audit_agent.py来实时监控档案软件的日志文件。

假设档案软件日志路径为/opt/archive_app/logs/app.log,日志格式为:[2023-10-27 10:00:00] [USER:admin] [ACTION:download] [ID:1001]。
创建文件/opt/audit/audit_agent.py:
import time
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import re
LOG_FILE = "/opt/archive_app/logs/app.log"
AUDIT_OUTPUT = "/var/log/audit/audit_results.log"
确保输出目录存在
os.makedirs(os.path.dirname(AUDIT_OUTPUT), exist_ok=True)
class LogHandler(FileSystemEventHandler):
def on_modified(self, event):
if event.src_path == LOG_FILE:
self.parse_new_logs()
def parse_new_logs(self):
with open(LOG_FILE, 'r', encoding='utf-8') as f:
这里简化处理,实际生产中应记录文件偏移量
lines = f.readlines()
for line in lines:
self.analyze_line(line.strip())
def analyze_line(self, line):
定义正则匹配规则:提取时间、用户、操作、文件ID
pattern = r'\[(.?)\] \[USER:(.?)\] \[ACTION:(.?)\] \[ID:(.?)\]'
match = re.search(pattern, line)
if match:
timestamp, user, action, doc_id = match.groups()
合规检查逻辑:例如检查非工作时间下载
hour = int(timestamp.split(' ')[1].split(':')[0])
if action == 'download' and (hour < 9 or hour > 18):
alert_msg = f"[ALERT] 非工作时间违规下载: 用户 {user} 在 {timestamp} 下载了文档 {doc_id}\n"
with open(AUDIT_OUTPUT, 'a', encoding='utf-8') as af:
af.write(alert_msg)
print(alert_msg)
if __name__ == "__main__":
event_handler = LogHandler()
observer = Observer()
observer.schedule(event_handler, path=os.path.dirname(LOG_FILE), recursive=False)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
运行此脚本前,请确保已安装watchdog库。该脚本会监控日志文件的变动,一旦检测到“非工作时间下载”等敏感操作,立即写入审计结果日志。
单纯收集日志不够,我们需要定期生成可视化的合规报告。这里我们使用Python的jinja2模板引擎生成HTML报告。
创建文件/opt/audit/generate_report.py:
import os
import re
from datetime import datetime
from jinja2 import Template
AUDIT_LOG = "/var/log/audit/audit_results.log"
DB_AUDIT_LOG = "/var/log/mysql/server_audit.log"
REPORT_PATH = "/var/www/html/audit_report.html"
定义HTML模板
HTML_TEMPLATE = """
档案软件合规审计报告
审计报告生成时间: {{ timestamp }}
应用层高危操作记录
时间
级别
详情
{% for item in app_logs %}
{{ item.time }}
ALERT
{{ item.content }}
{% endfor %}
数据库层敏感操作记录 (最近50条)
时间戳
用户
SQL语句
{% for item in db_logs %}
{{ item.timestamp }}
{{ item.user }}
{{ item.query }}
{% endfor %}
"""
def parse_app_logs():
logs = []
if not os.path.exists(AUDIT_LOG):
return logs
with open(AUDIT_LOG, 'r', encoding='utf-8') as f:
for line in f:
简单解析示例
logs.append({'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'content': line.strip()})
return logs
def parse_db_logs():
logs = []
if not os.path.exists(DB_AUDIT_LOG):
return logs
读取最后N行,这里简化处理
with open(DB_AUDIT_LOG, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
只取最后50条包含QUERY的记录
query_lines = [l for l in lines if 'QUERY' in l][-50:]
for line in query_lines:
解析格式:timestamp,host,user,connection_id,query_id,operation,database,object,ret
parts = line.strip().split(',')
if len(parts) > 6:
logs.append({
'timestamp': parts[0],
'user': parts[2],
'query': parts[6]
})
return logs
def main():
app_logs = parse_app_logs()
db_logs = parse_db_logs()
template = Template(HTML_TEMPLATE)
html_content = template.render(
timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
app_logs=app_logs,
db_logs=db_logs
)
os.makedirs(os.path.dirname(REPORT_PATH), exist_ok=True)
with open(REPORT_PATH, 'w', encoding='utf-8') as f:
f.write(html_content)
print(f"报告已生成: {REPORT_PATH}")
if __name__ == "__main__":
main()
执行python3 /opt/audit/generate_report.py即可生成报告。该脚本会汇总应用层的违规告警和数据库层的敏感SQL操作,生成一个静态HTML页面,方便管理人员查阅。
为了确审计7x24小时运行,我们需要将采集Agent注册为系统服务,并将报告生成加入定时任务。
1. 创建Systemd服务文件/etc/systemd/system/audit-agent.service:
[Unit]
Description=Archive Audit Agent
After=network.target mysql.service
[Service]
Type=simple
User=root
ExecStart=/usr/bin/python3 /opt/audit/audit_agent.py
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
2. 启动服务并设置开机自启:
systemctl daemon-reload
systemctl enable audit-agent
systemctl start audit-agent
systemctl status audit-agent
3. 配置Crontab每天早上8点生成审计报告:
crontab -e
添加以下内容:
0 8 /usr/bin/python3 /opt/audit/generate_report.py
至此,一套完整的档案软件合规审计系统已搭建完成。系统会自动监控数据库操作和应用日志,检测非工作时间访问等违规行为,并每日生成HTML合规报告。所有操作均基于开源组件,无需额外采购商业软件,且代码完全可控,可直接用于生产环境。