一、前置环境准备(零门槛落地)
需安装的核心工具及命令如下,直接复制执行即可:
- Python3.9及以上(用于数据处理):
sudo apt update && sudo apt install python3.9 python3-pip -y
- MySQL5.7及以上(档案馆核心数据库):
sudo apt install mysql-server-5.7 -y
- 依赖包安装:
pip install pymysql pandas requests configparser
关键提示:安装完成后,执行mysql_secure_installation配置MySQL root密码,后续数据库操作需用到该密码。
二、数据对接核心配置(实操步骤)
1. 建立数据库连接配置
在项目目录下新建config.ini文件,内容完整复制如下(需替换为实际的数据库及API信息):
```ini
[database]
host = 127.0.0.1
port = 3306
user = arch_admin
password = ArchPass@2024
db = digital_archive
charset = utf8mb4
[api]
endpoint = http://127.0.0.1:8080/archive/api
token = abc123xyz
```
操作要求:该文件必须与Python脚本放在同一目录,避免路径读取失败。
编写Python读取配置文件的代码(保存为config_helper.py):
```python
import configparser
def load_config(config_path='config.ini'):
config = configparser.ConfigParser()
config.read(config_path, encoding='utf-8')
验证配置完整性
required_sections = ['database', 'api']
for sec in required_sections:
if sec not in config:
raise ValueError(f"配置文件缺少[{sec}]模块")
return config
```
2. 元数据映射配置(档案馆核心规则)
按照国家档案局《数字档案元数据规范》建立映射关系,映射表结构如下(直接在MySQL中执行创建语句):
```sql
CREATE TABLE IF NOT EXISTS metadata_mapping (
id INT AUTO_INCREMENT PRIMARY KEY,
original_field VARCHAR(64) NOT NULL COMMENT '档案馆原始字段名',
system_field VARCHAR(64) NOT NULL COMMENT '系统映射字段名',
field_type VARCHAR(32) NOT NULL COMMENT '字段类型',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```

插入映射数据(直接执行以下SQL):
```sql
INSERT INTO metadata_mapping (original_field, system_field, field_type)
VALUES
('档案编号', 'archive_code', 'VARCHAR(32)'),
('题名', 'title', 'VARCHAR(255)'),
('立卷单位', 'create_unit', 'VARCHAR(128)'),
('归档日期', 'archive_date', 'DATE'),
('档案类别', 'archive_type', 'VARCHAR(64)');
```
注意:禁止修改映射字段名,否则会导致后续数据同步失败。
三、系统部署与结果验证
1. 部署同步脚本
编写同步脚本(保存为archive_sync.py),代码如下:
```python
import pymysql
import requests
from config_helper import load_config
def sync_archive_data():
加载配置
config = load_config()
db_conf = config['database']
api_conf = config['api']
连接数据库
conn = pymysql.connect(
host=db_conf['host'],
port=int(db_conf['port']),
user=db_conf['user'],
password=db_conf['password'],
db=db_conf['db'],
charset=db_conf['charset']
)
cursor = conn.cursor()
查询映射关系
sync_sql = "SELECT original_field, system_field, field_type FROM metadata_mapping"
cursor.execute(sync_sql)
mappings = cursor.fetchall()
调用API同步档案数据
api_headers = {'Authorization': f"Bearer {api_conf['token']}"}
替换为实际的源档案查询SQL
archive_sql = "SELECT archive_code, title, create_unit, archive_date FROM source_archive LIMIT 100"
cursor.execute(archive_sql)
archives = cursor.fetchall()
组装数据并推送
for arch in archives:
data = {}
for idx, map_item in enumerate(mappings):
data[map_item[1]] = arch[idx]
response = requests.post(api_conf['endpoint'], json=data, headers=api_headers)
if response.status_code != 200:
print(f"同步失败:{data['archive_code']},错误:{response.text}")
关闭连接
cursor.close()
conn.close()
print("同步完成")
if __name__ == "__main__":
sync_archive_data()
```
执行同步脚本的命令:python3 archive_sync.py
关键操作:执行前必须停止档案馆数据库的写入操作,避免数据冲突。
2. 结果验证
验证步骤如下:
- 检查脚本输出,确认显示“同步完成”;
- 执行MySQL查询,查看同步后的档案数量:
SELECT COUNT() FROM archive_items WHERE status = 'synced';
- 登录数字档案馆系统,查看10条以上同步的档案,确认字段无缺失、乱码。
异常排查:若同步失败,优先检查:
- config.ini文件中的密码、端口是否正确;
- API token是否过期;
- 数据库账号是否有读写权限。