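1. System Architecture and Core Component Selection
An archive management system has to handle large volumes of unstructured data. We use a microservice architecture with the following core components:
1.1 Storage Layer
Archive files are stored in MinIO object storage, configured as follows: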
```yaml
# docker-compose.yml
version: '3.8'
services:
  minio:
    image: minio/minio:latest
    container_name: minio
    ports:
      - "9000:9000"   # S3 API
      - "9001:9001"   # web console
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: your_strong_password
    volumes:
      - ./minio-data:/data
    command: server /data --console-address ":9001"
```
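The placeholder `your_strong_password` should be replaced before the container is started. As a minimal sketch, Python's `secrets` module can generate a random value. The helper name `generate_minio_credentials` and the env-style output are assumptions made for illustration, not part of the original setup:

```python
import secrets


def generate_minio_credentials(user="admin", nbytes=24):
    """Return env-style lines with a random root password (hypothetical helper)."""
    # 24 random bytes encode to a 32-character URL-safe string
    password = secrets.token_urlsafe(nbytes)
    return [f"MINIO_ROOT_USER={user}", f"MINIO_ROOT_PASSWORD={password}"]


if __name__ == "__main__":
    print("\n".join(generate_minio_credentials()))
```

The printed lines could be stored in an `.env` file and referenced from the compose file instead of hard-coding the password.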
1.2 Database Design
Metadata is stored in PostgreSQL. Create the core tables:
```sql
-- Archive category table
CREATE TABLE archive_category (
    id SERIAL PRIMARY KEY,
    category_code VARCHAR(50) UNIQUE NOT NULL,
    category_name VARCHAR(100) NOT NULL,
    parent_id INTEGER REFERENCES archive_category(id),
    retention_years INTEGER NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Main archive table
-- (gen_random_uuid() is built in from PostgreSQL 13; earlier versions need pgcrypto)
CREATE TABLE archive (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    archive_no VARCHAR(100) UNIQUE NOT NULL,
    title VARCHAR(500) NOT NULL,
    category_id INTEGER REFERENCES archive_category(id),
    storage_path VARCHAR(1000) NOT NULL,
    file_size BIGINT NOT NULL,
    file_type VARCHAR(50),
    upload_user_id INTEGER NOT NULL,
    upload_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status VARCHAR(20) DEFAULT 'active'
);

-- Indexes for the most common filters
CREATE INDEX idx_archive_category ON archive(category_id);
CREATE INDEX idx_archive_status ON archive(status);
```
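The `retention_years` column on `archive_category` and the `upload_time` column on `archive` together determine when a record's retention period ends. The sketch below shows one way to compute that date in application code. The function name `retention_expiry` and the choice to map 29 February to 1 March are assumptions, not rules stated in the schema:

```python
from datetime import date


def retention_expiry(upload_date, retention_years):
    """Return the first date on which the retention period has elapsed."""
    try:
        return upload_date.replace(year=upload_date.year + retention_years)
    except ValueError:
        # 29 February has no counterpart in a non-leap year; fall back to 1 March
        return date(upload_date.year + retention_years, 3, 1)


if __name__ == "__main__":
    print(retention_expiry(date(2024, 5, 1), 30))
```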
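2. Archive Scanning and Digitization
2.1 Scanner Configuration
Connect the scanner over the TWAIN protocol. Once the driver is installed, configure the scan parameters: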
```python
# scan_config.py
SCAN_CONFIG = {
    'resolution': 300,        # DPI
    'color_mode': 'color',    # color / gray / lineart
    'paper_source': 'adf',    # automatic document feeder
    'duplex': True,           # scan both sides
    'format': 'tiff',         # raw output format
    'compression': 'lzw',
}
```
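Resolution and color mode drive storage requirements, so it helps to estimate the raw size of a page before choosing them. The following sketch is a back-of-the-envelope calculation. The function name `raw_page_bytes` is hypothetical, and the figures ignore TIFF headers, row padding, and LZW compression:

```python
BYTES_PER_PIXEL = {"color": 3, "gray": 1}  # 24-bit RGB, 8-bit grayscale


def raw_page_bytes(width_in, height_in, dpi, color_mode):
    """Estimate the uncompressed size of one scanned side in bytes."""
    width_px = round(width_in * dpi)
    height_px = round(height_in * dpi)
    pixels = width_px * height_px
    if color_mode == "lineart":
        return (pixels + 7) // 8  # 1 bit per pixel
    return pixels * BYTES_PER_PIXEL[color_mode]


if __name__ == "__main__":
    # A4 is roughly 8.27 x 11.69 inches
    size = raw_page_bytes(8.27, 11.69, 300, "color")
    print(f"{size / 1_000_000:.1f} MB per side before compression")
```

For an A4 page at 300 DPI in color this works out to about 26 MB per side, which is why a lossless compression setting such as LZW matters for bulk scanning.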
2.2 Image Preprocessing Pipeline
Use OpenCV to enhance the image:
```python
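# image_processor.py
import cv2
import numpy as np


def preprocess_image(image_path):
    # Read the image as grayscale
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise FileNotFoundError(f"Cannot read image: {image_path}")

    # Binarize with an automatically chosen (Otsu) threshold
    _, binary = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Remove salt-and-pepper noise
    denoised = cv2.medianBlur(binary, 3)

    # Deskew: fit a minimum-area rectangle around the dark (text) pixels
    coords = np.column_stack(np.where(denoised == 0)).astype(np.float32)
    if len(coords) == 0:
        return denoised  # blank page, nothing to deskew
    angle = cv2.minAreaRect(coords)[-1]
    # Fold the angle into [-45, 45]. The range returned by minAreaRect differs
    # between OpenCV versions, so check the rotation direction on a sample page.
    if angle < -45:
        angle = 90 + angle
    elif angle > 45:
        angle = angle - 90

    (h, w) = denoised.shape[:2]
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, angle, 1.0)
    rotated = cv2.warpAffine(denoised, M, (w, h),
                             flags=cv2.INTER_CUBIC,
                             borderMode=cv2.BORDER_REPLICATE)
    return rotated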
```
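The `cv2.THRESH_OTSU` flag tells OpenCV to pick the binarization threshold automatically. To illustrate the idea behind it, here is a pure-Python sketch of Otsu's method, which chooses the threshold that maximizes the variance between the dark and bright classes. It is an illustration only, not OpenCV's implementation, and the name `otsu_threshold` is hypothetical:

```python
def otsu_threshold(pixels):
    """Return the threshold t (0-255) that maximizes between-class variance.

    Pixels with value <= t form the dark class, the rest the bright class.
    """
    hist = [0] * 256
    for p in pixels:
        hist[p] += 1
    total = len(pixels)
    sum_all = sum(i * h for i, h in enumerate(hist))

    best_t, best_var = 0, -1.0
    w_dark, sum_dark = 0, 0
    for t in range(256):
        w_dark += hist[t]
        sum_dark += t * hist[t]
        w_bright = total - w_dark
        if w_dark == 0:
            continue  # no dark pixels yet
        if w_bright == 0:
            break     # every pixel is already in the dark class
        mean_dark = sum_dark / w_dark
        mean_bright = (sum_all - sum_dark) / w_bright
        between = w_dark * w_bright * (mean_dark - mean_bright) ** 2
        if between > best_var:
            best_t, best_var = t, between
    return best_t


if __name__ == "__main__":
    sample = [10] * 50 + [200] * 50  # dark text pixels and bright paper pixels
    print(otsu_threshold(sample))
```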
3. OCR Recognition and Metadata Extraction
3.1 Tesseract OCR Configuration

Install Tesseract and add the Simplified Chinese language data:
```bash
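# Install on Ubuntu
sudo apt-get install tesseract-ocr
sudo apt-get install tesseract-ocr-chi-sim
# Download the trained data manually
wget https://github.com/tesseract-ocr/tessdata/raw/main/chi_sim.traineddata
# The tessdata directory depends on the installed Tesseract version (4.00 here)
sudo mv chi_sim.traineddata /usr/share/tesseract-ocr/4.00/tessdata/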
```
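After installation it is worth confirming that both `chi_sim` and `eng` are visible to Tesseract before starting a batch job. The sketch below runs `tesseract --list-langs` and reports which required languages are missing. The helper names are hypothetical, and the parsing assumes the usual output layout of one header line followed by one language code per line:

```python
import shutil
import subprocess


def parse_list_langs(output):
    """Extract language codes from the text printed by `tesseract --list-langs`."""
    lines = [line.strip() for line in output.splitlines() if line.strip()]
    # Skip the header line, which starts with "List of available languages"
    return [line for line in lines if not line.lower().startswith("list of")]


def missing_languages(required=("chi_sim", "eng")):
    """Return the required language codes that Tesseract does not report."""
    if shutil.which("tesseract") is None:
        return list(required)
    result = subprocess.run(["tesseract", "--list-langs"],
                            capture_output=True, text=True, check=False)
    # Some releases print the list to stderr, so read both streams
    installed = parse_list_langs(result.stdout + "\n" + result.stderr)
    return [lang for lang in required if lang not in installed]


if __name__ == "__main__":
    print("Missing languages:", missing_languages())
```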
3.2 Batch OCR Script
```python
# ocr_processor.py
import os
import re

import pytesseract
from PIL import Image


def batch_ocr_processing(image_folder, output_folder):
    supported_formats = ('.tiff', '.tif', '.jpg', '.png', '.bmp')
    os.makedirs(output_folder, exist_ok=True)
    for filename in os.listdir(image_folder):
        if not filename.lower().endswith(supported_formats):
            continue
        image_path = os.path.join(image_folder, filename)

        # Run OCR (Simplified Chinese plus English)
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(
                image,
                lang='chi_sim+eng',
                config='--psm 3 --oem 3'
            )

        # Save the recognized text
        txt_filename = os.path.splitext(filename)[0] + '.txt'
        txt_path = os.path.join(output_folder, txt_filename)
        with open(txt_path, 'w', encoding='utf-8') as f:
            f.write(text)

        # Extract key fields
        metadata = extract_metadata(text)
        # save_metadata is not defined in this script; it is assumed to
        # persist the metadata elsewhere (for example in PostgreSQL)
        save_metadata(metadata, filename)


def extract_metadata(text):
    """Extract key metadata from OCR text."""
    metadata = {
        'document_type': '',
        'date': '',
        'document_no': '',
        'keywords': []
    }
    # Date patterns, tried in order; the first match wins
    date_patterns = [
        r'\d{4}年\d{1,2}月\d{1,2}日',
        r'\d{4}-\d{2}-\d{2}',
        r'\d{4}/\d{2}/\d{2}'
    ]
    for pattern in date_patterns:
        dates = re.findall(pattern, text)
        if dates:
            metadata['date'] = dates[0]
            break
    return metadata
```
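The three date patterns in `extract_metadata` return the date in whatever form it appeared in the document, so downstream storage sees mixed formats. One possible follow-up step is to normalize the match to ISO `YYYY-MM-DD`, which also rejects impossible dates produced by OCR misreads. The helper `normalize_date` is a hypothetical addition, sketched here with the standard library only:

```python
import re
from datetime import date

_DATE_RE = re.compile(r"(\d{4})[年/-](\d{1,2})[月/-](\d{1,2})日?")


def normalize_date(raw):
    """Convert '2023年5月1日', '2023-05-01' or '2023/05/01' to 'YYYY-MM-DD'.

    Returns None when the text is not a valid calendar date.
    """
    match = _DATE_RE.fullmatch(raw.strip())
    if not match:
        return None
    year, month, day = (int(part) for part in match.groups())
    try:
        return date(year, month, day).isoformat()
    except ValueError:
        return None  # for example a misread month of 13


if __name__ == "__main__":
    print(normalize_date("2023年5月1日"))
```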
4. Archive Storage and Retrieval
4.1 MinIO File Upload Interface
```python
# storage_service.py
import hashlib
import os

from minio import Minio
from minio.error import S3Error


class ArchiveStorage:
    def __init__(self):
        # Credentials must match MINIO_ROOT_USER / MINIO_ROOT_PASSWORD
        self.client = Minio(
            'localhost:9000',
            access_key='admin',
            secret_key='your_strong_password',
            secure=False
        )
        # Create the bucket if it does not exist yet
        self.bucket_name = 'archives'
        if not self.client.bucket_exists(self.bucket_name):
            self.client.make_bucket(self.bucket_name)

    def upload_file(self, file_path, category):
        """Upload a file to MinIO."""
        try:
            # Build a unique object name from the content hash
            with open(file_path, 'rb') as file:
                file_hash = hashlib.md5(file.read()).hexdigest()
            object_name = f"{category}/{file_hash}_{os.path.basename(file_path)}"
            # Upload the file
            self.client.fput_object(
                self.bucket_name,
                object_name,
                file_path
            )
            return object_name
        except S3Error as exc:
            print("Upload failed:", exc)
            return None
```
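The upload method reads the whole file into memory to compute its MD5 hash, which can be costly for large scans. A possible alternative, shown here as a sketch rather than a drop-in replacement, hashes the file in fixed-size chunks and uses SHA-256. The function name `file_sha256` and the 1 MiB chunk size are assumptions:

```python
import hashlib


def file_sha256(file_path, chunk_size=1024 * 1024):
    """Return the hex SHA-256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(file_path, "rb") as handle:
        # iter() with a sentinel stops when read() returns an empty bytes object
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

The resulting digest could take the place of `file_hash` when building `object_name`, and storing it alongside the metadata would allow later integrity checks.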
4.2 Full-Text Search
Build a full-text index with Elasticsearch:
```python
# search_service.py
from elasticsearch import Elasticsearch


class ArchiveSearch:
    def __init__(self):
        self.es = Elasticsearch(['http://localhost:9200'])
        self.index_name = 'archives'
        # Create the index and its mapping if needed
        self.create_index()

    def create_index(self):
        """Create the Elasticsearch index."""
        # ik_max_word and ik_smart come from the IK analysis plugin,
        # which must be installed on the Elasticsearch node
        mapping = {
            "mappings": {
                "properties": {
                    "archive_no": {"type": "keyword"},
                    "title": {
                        "type": "text",
                        "analyzer": "ik_max_word",
                        "search_analyzer": "ik_smart"
                    },
                    "content": {
                        "type": "text",
                        "analyzer": "ik_max_word",
                        "search_analyzer": "ik_smart"
                    },
                    "category": {"type": "keyword"},
                    "upload_date": {"type": "date"},
                    "keywords": {"type": "keyword"}
                }
            }
        }
        # body= matches the 7.x Python client used with Elasticsearch 7.17
        if not self.es.indices.exists(index=self.index_name):
            self.es.indices.create(index=self.index_name, body=mapping)

    def index_document(self, archive_data):
        """Index one document, keyed by its archive number."""
        doc = {
            'archive_no': archive_data['archive_no'],
            'title': archive_data['title'],
            'content': archive_data['content'],
            'category': archive_data['category'],
            'upload_date': archive_data['upload_date'],
            'keywords': archive_data.get('keywords', [])
        }
        self.es.index(index=self.index_name, id=archive_data['archive_no'], body=doc)
```
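The class above covers index creation and document indexing but not querying. As a sketch of what a search request against this mapping might look like, the function below builds a query body that matches `title` and `content`, optionally filters by `category`, and paginates the results. The function name `build_search_query`, the title boost of 2, and the page size are assumptions:

```python
def build_search_query(text, category=None, page=1, page_size=10):
    """Build an Elasticsearch query body for the archive index (sketch)."""
    query = {
        "bool": {
            "must": [{
                "multi_match": {
                    "query": text,
                    "fields": ["title^2", "content"],  # weight title matches higher
                }
            }]
        }
    }
    if category:
        # category is a keyword field, so an exact term filter is appropriate
        query["bool"]["filter"] = [{"term": {"category": category}}]
    return {
        "query": query,
        "from": (page - 1) * page_size,
        "size": page_size,
        "highlight": {"fields": {"content": {}}},
    }


if __name__ == "__main__":
    print(build_search_query("contract", category="HR", page=2))
```

With the 7.x client, the body could be passed as `self.es.search(index=self.index_name, body=build_search_query("contract"))`.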
5. Access Control and Audit Logging
5.1 RBAC Permission Model
```sql
-- Tables for the permission model
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    full_name VARCHAR(100),
    department_id INTEGER,
    is_active BOOLEAN DEFAULT true
);

CREATE TABLE roles (
    id SERIAL PRIMARY KEY,
    role_name VARCHAR(50) UNIQUE NOT NULL,
    description TEXT
);

CREATE TABLE permissions (
    id SERIAL PRIMARY KEY,
    permission_code VARCHAR(50) UNIQUE NOT NULL,
    description TEXT
);

CREATE TABLE user_roles (
    user_id INTEGER REFERENCES users(id),
    role_id INTEGER REFERENCES roles(id),
    PRIMARY KEY (user_id, role_id)
);

CREATE TABLE role_permissions (
    role_id INTEGER REFERENCES roles(id),
    permission_id INTEGER REFERENCES permissions(id),
    PRIMARY KEY (role_id, permission_id)
);

-- Permission check: true if any role of an active user grants the permission
CREATE OR REPLACE FUNCTION check_permission(
    p_user_id INTEGER,
    p_permission_code VARCHAR(50)
) RETURNS BOOLEAN AS $$
BEGIN
    RETURN EXISTS (
        SELECT 1
        FROM users u
        JOIN user_roles ur ON u.id = ur.user_id
        JOIN role_permissions rp ON ur.role_id = rp.role_id
        JOIN permissions p ON rp.permission_id = p.id
        WHERE u.id = p_user_id
          AND p.permission_code = p_permission_code
          AND u.is_active = true
    );
END;
$$ LANGUAGE plpgsql;
```
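To see how the join in `check_permission` behaves, the same query can be tried against an in-memory SQLite database. This is a sketch for experimentation only: SQLite stands in for PostgreSQL, the column types are simplified, and the sample users, role, and permission codes are made up:

```python
import sqlite3

SCHEMA = """
CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT UNIQUE NOT NULL, is_active INTEGER DEFAULT 1);
CREATE TABLE roles (id INTEGER PRIMARY KEY, role_name TEXT UNIQUE NOT NULL);
CREATE TABLE permissions (id INTEGER PRIMARY KEY, permission_code TEXT UNIQUE NOT NULL);
CREATE TABLE user_roles (user_id INTEGER, role_id INTEGER, PRIMARY KEY (user_id, role_id));
CREATE TABLE role_permissions (role_id INTEGER, permission_id INTEGER, PRIMARY KEY (role_id, permission_id));
"""

CHECK_SQL = """
SELECT EXISTS (
    SELECT 1
    FROM users u
    JOIN user_roles ur ON u.id = ur.user_id
    JOIN role_permissions rp ON ur.role_id = rp.role_id
    JOIN permissions p ON rp.permission_id = p.id
    WHERE u.id = ? AND p.permission_code = ? AND u.is_active = 1
)
"""


def check_permission(conn, user_id, permission_code):
    """Return True if an active user holds the permission through any role."""
    row = conn.execute(CHECK_SQL, (user_id, permission_code)).fetchone()
    return bool(row[0])


def demo_connection():
    """Create an in-memory database with made-up sample data."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executescript("""
        INSERT INTO users VALUES (1, 'alice', 1), (2, 'bob', 0);
        INSERT INTO roles VALUES (1, 'archivist');
        INSERT INTO permissions VALUES (1, 'archive:upload'), (2, 'archive:delete');
        INSERT INTO user_roles VALUES (1, 1), (2, 1);
        INSERT INTO role_permissions VALUES (1, 1);
    """)
    return conn


if __name__ == "__main__":
    conn = demo_connection()
    print(check_permission(conn, 1, "archive:upload"))
```

In the sample data, the inactive user holds the same role as the active one but is still denied, which is the effect of the `is_active` condition.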
5.2 Operation Audit Log
```python
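# audit_logger.py
import json
import logging
from datetime import datetime, timezone

from flask import has_request_context, request


class AuditLogger:
    def __init__(self):
        # Configure the audit logger
        self.logger = logging.getLogger('audit')
        self.logger.setLevel(logging.INFO)
        # Attach the file handler only once, even if AuditLogger is created repeatedly
        if not self.logger.handlers:
            file_handler = logging.FileHandler('audit.log', encoding='utf-8')
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

    def get_client_ip(self):
        """Return the caller's IP inside a Flask request, else None.

        Assumption: the original does not define this method; Flask is used
        here because the metrics endpoint in section 6.2 is a Flask route.
        """
        return request.remote_addr if has_request_context() else None

    def log_operation(self, user_id, operation, target, details):
        """Record one operation as a JSON log line."""
        log_entry = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'user_id': user_id,
            'operation': operation,
            'target': target,
            'details': details,
            'ip_address': self.get_client_ip()
        }
        self.logger.info(json.dumps(log_entry, ensure_ascii=False))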
```
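Because each audit line follows the `%(asctime)s - %(levelname)s - %(message)s` format with a JSON message, the log can be read back for review or reporting. A minimal parsing sketch, with the hypothetical helper name `parse_audit_line`, might look like this:

```python
import json


def parse_audit_line(line):
    """Split one audit.log line into (timestamp, level, entry dict)."""
    # maxsplit=2 keeps any ' - ' inside the JSON payload intact
    asctime, level, payload = line.rstrip("\n").split(" - ", 2)
    return asctime, level, json.loads(payload)


if __name__ == "__main__":
    sample = '2024-01-01 12:00:00,123 - INFO - {"user_id": 7, "operation": "download"}'
    print(parse_audit_line(sample))
```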
6. Deployment and Monitoring
6.1 Docker Compose Deployment
```yaml
# docker-compose.prod.yml
version: '3.8'
services:
  postgres:
    image: postgres:14-alpine
    environment:
      POSTGRES_DB: archive_db
      POSTGRES_USER: archive_user
      POSTGRES_PASSWORD: your_db_password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: your_minio_password
    volumes:
      - minio_data:/data
    ports:
      - "9000:9000"
      - "9001:9001"

  elasticsearch:
    image: elasticsearch:7.17.0
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"

  app:
    build: .
    depends_on:
      - postgres
      - minio
      - elasticsearch
    environment:
      DATABASE_URL: postgresql://archive_user:your_db_password@postgres:5432/archive_db
      MINIO_ENDPOINT: minio:9000
      ES_HOST: elasticsearch:9200
    ports:
      - "8000:8000"
    volumes:
      - ./app:/app

volumes:
  postgres_data:
  minio_data:
  es_data:
```
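The list form of `depends_on` used for the `app` service only controls start order; it does not wait until PostgreSQL, MinIO, or Elasticsearch accept connections. One common workaround is for the application to poll the ports at startup. The sketch below does this with plain TCP connects. The function name `wait_for_port` and the timeout values are assumptions, and an open port does not guarantee that the service has finished initializing:

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection succeeds, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    # Host names and ports as used inside the compose network
    for host, port in [("postgres", 5432), ("minio", 9000), ("elasticsearch", 9200)]:
        print(host, port, wait_for_port(host, port, timeout=5))
```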
6.2 Monitoring
Monitor system status with Prometheus:
```yaml
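# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'archive_app'
    static_configs:
      - targets: ['app:8000']

  # Requires a postgres-exporter service, which the compose file does not define
  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']

  # MinIO serves Prometheus metrics under /minio/v2/metrics/cluster, not /metrics;
  # the endpoint needs a bearer token unless MINIO_PROMETHEUS_AUTH_TYPE=public is set
  - job_name: 'minio'
    metrics_path: /minio/v2/metrics/cluster
    static_configs:
      - targets: ['minio:9000']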
```
Add a metrics endpoint to the application:
```python
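# metrics.py
from flask import Flask, Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest

# In a real project, import the existing Flask app instead of creating one here
app = Flask(__name__)

# Define the metrics
UPLOAD_COUNTER = Counter('archive_uploads_total', 'Total archive uploads')
SEARCH_REQUESTS = Counter('search_requests_total', 'Total search requests')
REQUEST_DURATION = Histogram('request_duration_seconds', 'Request duration')


@app.route('/metrics')
def metrics():
    # Serve all registered metrics in the Prometheus text format
    return Response(generate_latest(), content_type=CONTENT_TYPE_LATEST)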
```
7. Backup and Restore
7.1 Automated Backup Script
```bash
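#!/bin/bash
# backup.sh
set -euo pipefail

BACKUP_DIR="/backup/archives"
DATE=$(date +%Y%m%d_%H%M%S)
mkdir -p "$BACKUP_DIR"

# Back up the database (the password is read from PGPASSWORD or ~/.pgpass)
pg_dump -h localhost -U archive_user archive_db > \
    "$BACKUP_DIR/db_backup_$DATE.sql"

# Back up MinIO data (the "minio" alias must already exist, see `mc alias set`)
mc mirror --overwrite minio/archives "$BACKUP_DIR/minio_backup_$DATE/"

# Snapshot the Elasticsearch indices (backup_repository must be registered beforehand)
curl -X POST "localhost:9200/_snapshot/backup_repository/snapshot_$DATE?wait_for_completion=true"

# Keep only the last 30 days of backups
find "$BACKUP_DIR" -type f -mtime +30 -delete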
```
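The final `find ... -mtime +30 -delete` step can also be expressed in Python, which is convenient if the backup job is driven from the application and needs to report what was removed. The sketch below roughly mirrors that step. The function name `prune_old_backups` is hypothetical, and the cutoff is exactly 30 × 24 hours, which differs slightly from how `find` rounds file ages:

```python
import os
import time


def prune_old_backups(backup_dir, max_age_days=30, now=None):
    """Delete regular files older than `max_age_days`; return the removed paths."""
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400
    removed = []
    for root, _dirs, files in os.walk(backup_dir):
        for name in files:
            path = os.path.join(root, name)
            if os.path.getmtime(path) < cutoff:
                os.remove(path)
                removed.append(path)
    return removed
```

As with the shell version, directories are left in place and only files are deleted.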
7.2 Restore Procedure
Restore the database and the search index:
```bash
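# Stop the application
docker-compose stop app
# Restore the database
psql -h localhost -U archive_user archive_db < backup_file.sql
# Restore the Elasticsearch indices from the snapshot
# (an open index with the same name must be closed or deleted first)
curl -X POST "localhost:9200/_snapshot/backup_repository/snapshot_20240101/_restore"
# Start the application
docker-compose start app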
```
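The restore commands use the placeholder `backup_file.sql`. Since the backup script names its dumps `db_backup_<date>_<time>.sql`, the most recent dump can be selected by sorting on the embedded timestamp. A small sketch, with the hypothetical helper name `latest_db_backup`:

```python
import re

_BACKUP_RE = re.compile(r"db_backup_(\d{8}_\d{6})\.sql$")


def latest_db_backup(filenames):
    """Return the newest db_backup_<timestamp>.sql name, or None if there is none."""
    # The timestamp format sorts chronologically as a plain string
    dated = [(m.group(1), name) for name in filenames
             if (m := _BACKUP_RE.search(name))]
    return max(dated)[1] if dated else None


if __name__ == "__main__":
    import os
    print(latest_db_backup(os.listdir(".")))
```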
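Following the steps above, you can build a complete archive digitization and management system covering scan processing, OCR, storage and retrieval, and access control. The configurations can be copied as a starting point once the placeholder passwords are replaced. Every component is deployed in a container, which keeps the system easy to maintain and extend.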