可定制性建立在清晰的架构分层上。我们采用四层架构:数据层、服务层、API层和表现层。
使用PostgreSQL作为主数据库,创建核心表结构:
```sql
-- 文档元数据表
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    title VARCHAR(500) NOT NULL,
    document_type VARCHAR(100) NOT NULL,
    status VARCHAR(50) DEFAULT 'draft',
    metadata JSONB NOT NULL DEFAULT '{}',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 自定义字段定义表
CREATE TABLE custom_fields (
    id SERIAL PRIMARY KEY,
    field_name VARCHAR(100) NOT NULL,
    field_type VARCHAR(50) NOT NULL,
    field_config JSONB NOT NULL DEFAULT '{}',
    document_type VARCHAR(100) NOT NULL,
    is_required BOOLEAN DEFAULT false,
    sort_order INTEGER DEFAULT 0
);

-- 文档版本表
CREATE TABLE document_versions (
    id SERIAL PRIMARY KEY,
    document_id UUID REFERENCES documents(id),
    version_number INTEGER NOT NULL,
    file_path VARCHAR(500) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```

创建三个核心服务类,使用Python实现:
```python
# document_service.py
class DocumentService:
    def __init__(self, db_session):
        self.db = db_session

    def create_document(self, title, doc_type, custom_data):
        # 验证自定义字段
        custom_fields = self.get_custom_fields(doc_type)
        self._validate_custom_data(custom_fields, custom_data)

        # 创建文档记录
        document = Document(
            title=title,
            document_type=doc_type,
            metadata=custom_data
        )
        self.db.add(document)
        self.db.commit()
        return document.id

    def _validate_custom_data(self, field_definitions, data):
        for field in field_definitions:
            if field.is_required and field.field_name not in data:
                raise ValidationError(f"缺少必填字段: {field.field_name}")
```

这是实现可定制性的核心模块。
创建字段类型枚举和配置类:
```python
# field_types.py
from enum import Enum
from pydantic import BaseModel

class FieldType(str, Enum):
    TEXT = "text"
    NUMBER = "number"
    DATE = "date"
    SELECT = "select"
    FILE = "file"
    CHECKBOX = "checkbox"

class FieldConfig(BaseModel):
    field_name: str
    field_type: FieldType
    label: str
    placeholder: str = ""
    default_value: any = None
    options: list = []  # 用于SELECT类型
    validation_rules: dict = {}
    is_visible: bool = True
    is_editable: bool = True
```

根据字段定义生成前端表单:
```javascript
// form-generator.js
class FormGenerator {
  generateForm(fieldDefinitions) {
    return fieldDefinitions.map(field => {
      const baseConfig = {
        name: field.field_name,
        label: field.label,
        required: field.is_required,
        placeholder: field.placeholder
      };

      switch(field.field_type) {
        case 'text':
          return { ...baseConfig, type: 'text', component: 'Input' };
        case 'select':
          return {
            ...baseConfig,
            type: 'select',
            component: 'Select',
            options: field.options
          };
        case 'date':
          return {
            ...baseConfig,
            type: 'date',
            component: 'DatePicker',
            format: 'YYYY-MM-DD'
          };
        // 其他字段类型...
      }
    });
  }
}
```

通过配置文件实现系统行为的可定制。
创建config.yaml文件:
```yaml
# config/config.yaml
system:
  name: "文书档案管理系统"
  version: "1.0.0"

storage:
  local_path: "/var/data/documents"
  max_file_size: "100MB"
  allowed_extensions: [".pdf", ".doc", ".docx", ".xls", ".xlsx", ".jpg", ".png"]

workflow:
  default_statuses: ["草稿", "审核中", "已批准", "已归档", "已作废"]
  auto_numbering:
    enabled: true
    pattern: "DOC-{YYYY}{MM}{DD}-{SEQ:6}"

security:
  password_policy:
    min_length: 8
    require_uppercase: true
    require_lowercase: true
    require_numbers: true
    require_special_chars: true
  session_timeout: 3600
```

实现配置热加载功能:
```python
# config_loader.py
import yaml
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class ConfigLoader:
    def __init__(self, config_path):
        self.config_path = config_path
        self.config = self.load_config()
        self.setup_watcher()

    def load_config(self):
        with open(self.config_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def setup_watcher(self):
        event_handler = FileSystemEventHandler()
        event_handler.on_modified = lambda event: self.reload_config()

        observer = Observer()
        observer.schedule(event_handler, os.path.dirname(self.config_path), recursive=False)
        observer.start()

    def reload_config(self):
        self.config = self.load_config()
        print("配置已重新加载")
```

支持自定义审批流程。
创建工作流定义表:
```sql
CREATE TABLE workflows (
    id SERIAL PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    document_type VARCHAR(100) NOT NULL,
    definition JSONB NOT NULL,
    is_active BOOLEAN DEFAULT true
);

-- 工作流定义示例
INSERT INTO workflows (name, document_type, definition) VALUES (
    '合同审批流程',
    'contract',
    '{
        "start": "draft",
        "states": [
            {"name": "draft", "actions": ["submit"]},
            {"name": "department_review", "actions": ["approve", "reject"]},
            {"name": "legal_review", "actions": ["approve", "reject"]},
            {"name": "final_approval", "actions": ["approve", "reject"]},
            {"name": "archived", "actions": []}
        ],
        "transitions": [
            {"from": "draft", "to": "department_review", "action": "submit"},
            {"from": "department_review", "to": "legal_review", "action": "approve"},
            {"from": "legal_review", "to": "final_approval", "action": "approve"},
            {"from": "final_approval", "to": "archived", "action": "approve"},
            {"from": "", "to": "draft", "action": "reject"}
        ]
    }'
);
```
实现状态机引擎:
```python
# workflow_engine.py
class WorkflowEngine:
    def __init__(self, workflow_definition):
        self.definition = workflow_definition
        self.current_state = workflow_definition['start']

    def get_available_actions(self):
        current_state_def = next(
            s for s in self.definition['states']
            if s['name'] == self.current_state
        )
        return current_state_def.get('actions', [])

    def execute_action(self, action, document_id, user_id):
        # 查找对应的转移规则
        transition = next(
            t for t in self.definition['transitions']
            if t['from'] in [self.current_state, ''] and t['action'] == action
        )

        # 记录操作日志
        self.log_transition(document_id, user_id, self.current_state, transition['to'], action)

        # 更新状态
        self.current_state = transition['to']

        # 触发状态变更事件
        self.trigger_state_change(document_id, transition['to'])

        return self.current_state

    def log_transition(self, document_id, user_id, from_state, to_state, action):
        # 记录到操作日志表
        pass
```

基于角色的访问控制(RBAC)实现。
在API层实现权限验证:
```python
# permission_middleware.py
from functools import wraps
from flask import request, jsonify

def require_permission(resource, action):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            user_id = get_current_user_id()
            user_roles = get_user_roles(user_id)

            # 检查所有角色权限
            has_permission = False
            for role in user_roles:
                if role.permissions.get(resource, {}).get(action, False):
                    has_permission = True
                    break

            if not has_permission:
                return jsonify({"error": "权限不足"}), 403

            return f(*args, **kwargs)
        return decorated_function
    return decorator

# 使用示例(原文在路由路径处被截断,以下路径参数与视图函数为补全的示意)
@app.route('/api/documents/<document_id>')
@require_permission('documents', 'read')
def get_document(document_id):
    ...
```

使用Docker进行容器化部署。
创建Dockerfile:
```dockerfile
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 创建数据目录
RUN mkdir -p /var/data/documents

# 设置环境变量
ENV PYTHONPATH=/app
ENV CONFIG_PATH=/app/config/config.yaml

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["gunicorn", "app:app", "-b", "0.0.0.0:8000", "--workers", "4"]
```

创建docker-compose.yml:
```yaml
# docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:14
    environment:
      POSTGRES_DB: document_db
      POSTGRES_USER: document_user
      POSTGRES_PASSWORD: your_secure_password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  app:
    build: .
    depends_on:
      - postgres
      - redis
    environment:
      DATABASE_URL: postgresql://document_user:your_secure_password@postgres:5432/document_db
      REDIS_URL: redis://redis:6379/0
    volumes:
      - document_data:/var/data/documents
      - ./config:/app/config
    ports:
      - "8000:8000"

volumes:
  postgres_data:
  document_data:
```

创建数据库初始化脚本:
```bash
#!/bin/bash
# init-db.sh

echo "等待PostgreSQL启动..."
sleep 10

echo "创建数据库表..."
psql $DATABASE_URL -f /app/sql/schema.sql

echo "初始化基础数据..."
psql $DATABASE_URL -f /app/sql/seed_data.sql

echo "数据库初始化完成"
```

预留可扩展接口,方便后续定制。
创建插件接口:
```python
# plugin_interface.py
from abc import ABC, abstractmethod

class DocumentPlugin(ABC):
    @abstractmethod
    def before_save(self, document):
        """文档保存前的处理"""
        pass

    @abstractmethod
    def after_save(self, document):
        """文档保存后的处理"""
        pass

class WorkflowPlugin(ABC):
    @abstractmethod
    def before_transition(self, document, from_state, to_state):
        """状态转移前的处理"""
        pass

    @abstractmethod
    def after_transition(self, document, from_state, to_state):
        """状态转移后的处理"""
        pass

# 插件管理器
class PluginManager:
    def __init__(self):
        self.plugins = []

    def register_plugin(self, plugin):
        self.plugins.append(plugin)

    def execute_before_save(self, document):
        for plugin in self.plugins:
            if isinstance(plugin, DocumentPlugin):
                plugin.before_save(document)
```

实现事件驱动的Webhook系统:
```python
# webhook_service.py
import requests
import json
from threading import Thread

class WebhookService:
    def __init__(self):
        self.webhooks = self.load_webhooks()

    def trigger_event(self, event_type, data):
        for webhook in self.webhooks.get(event_type, []):
            # 异步发送Webhook
            thread = Thread(target=self.send_webhook, args=(webhook, data))
            thread.start()

    def send_webhook(self, webhook_url, data):
        try:
            response = requests.post(
                webhook_url,
                json=data,
                headers={'Content-Type': 'application/json'},
                timeout=5
            )
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"Webhook发送失败: {e}")
```

按照以上步骤实现,你将获得一个完全可定制的文书档案管理系统。系统支持字段自定义、工作流自定义、权限自定义,并且可以通过插件系统进行功能扩展。所有配置都支持热加载,无需重启服务即可生效。