1. 安装Python 3.8+(官网下载地址:https://www.python.org/downloads/,安装时勾选Add Python to PATH)
2. 安装MySQL 5.7+(官网下载地址:https://dev.mysql.com/downloads/mysql/),安装时设置root密码为123456
3. 安装依赖库,复制命令在终端执行:
pip install pandas pymysql python-dotenv
打开MySQL命令行,执行以下完整SQL:
CREATE DATABASE archive_system DEFAULT CHARACTER SET utf8mb4;
USE archive_system;
CREATE TABLE archive_records (
id INT AUTO_INCREMENT PRIMARY KEY,
archive_no VARCHAR(20) NOT NULL COMMENT '档案编号',
category VARCHAR(50) NOT NULL COMMENT '档案类别',
store_date DATE NOT NULL COMMENT '存储日期',
expire_date DATE NOT NULL COMMENT '到期日期',
location VARCHAR(100) NOT NULL COMMENT '存储位置',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
上述SQL会创建名为archive_system的数据库,以及存储档案核心信息的archive_records表。
1. 新建test_archive.csv文件,内容如下(直接复制保存):
档案编号,类别,存储日期,到期日期,存储位置
DA2024001,人事档案,2024-01-10,2034-01-09,档案室A区1柜
DA2024002,合同档案,2024-02-15,2044-02-14,档案室B区3柜
DA2024003,财务凭证,2024-03-20,2034-03-19,档案室C区2柜
DA2024004,人事档案,2024-04-05,2034-04-04,档案室A区2柜
DA2024005,合同档案,2024-05-12,2044-05-11,档案室B区4柜
2. 新建.env文件,配置MySQL连接信息(和Python代码同目录):
DB_HOST=localhost
DB_USER=root
DB_PASS=123456
DB_NAME=archive_system
3. 新建import_archive.py,执行数据导入:

import os
import pandas as pd
import pymysql
from dotenv import load_dotenv
load_dotenv()
读取CSV数据
df = pd.read_csv('test_archive.csv')
连接MySQL
conn = pymysql.connect(
host=os.getenv('DB_HOST'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASS'),
database=os.getenv('DB_NAME'),
charset='utf8mb4'
)
写入数据库
df.to_sql('archive_records', conn, if_exists='append', index=False)
conn.close()
print("测试数据导入完成")
4. 执行导入命令:python import_archive.py,提示导入完成即成功。
新建analysis_category.py,代码如下:
import os
import pandas as pd
import pymysql
from dotenv import load_dotenv
load_dotenv()
conn = pymysql.connect(
host=os.getenv('DB_HOST'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASS'),
database=os.getenv('DB_NAME'),
charset='utf8mb4'
)
查询类别统计
query = "SELECT category, COUNT() AS count FROM archive_records GROUP BY category;"
result = pd.read_sql(query, conn)
print("档案类别统计结果:")
print(result)
conn.close()
执行命令:python analysis_category.py,会输出每个类别的档案数量。
新建analysis_recent.py,代码如下:
import os
import pandas as pd
import pymysql
from datetime import datetime, timedelta
from dotenv import load_dotenv
load_dotenv()
conn = pymysql.connect(
host=os.getenv('DB_HOST'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASS'),
database=os.getenv('DB_NAME'),
charset='utf8mb4'
)
计算30天前的日期
recent_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
query = f"SELECT store_date, COUNT() AS count FROM archive_records WHERE store_date >= '{recent_date}' GROUP BY store_date ORDER BY store_date;"
result = pd.read_sql(query, conn)
print(f"近30天新增档案统计(从{recent_date}起):")
print(result)
conn.close()
执行命令:python analysis_recent.py,输出近1个月每天的新增档案数量。
新建analysis_expire.py,代码如下:
import os
import pandas as pd
import pymysql
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
conn = pymysql.connect(
host=os.getenv('DB_HOST'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASS'),
database=os.getenv('DB_NAME'),
charset='utf8mb4'
)
查询已到期(今天及之前)的档案
today = datetime.now().strftime('%Y-%m-%d')
query = f"SELECT archive_no, category, expire_date, location FROM archive_records WHERE expire_date <= '{today}';"
result = pd.read_sql(query, conn)
print(f"已到期档案预警(截至{today}):")
print(result)
conn.close()
执行命令:python analysis_expire.py,输出所有已到期的档案信息。
以上步骤完成后,可根据业务需求扩展,比如增加月度统计、按存储位置分类、到期预警通知等,核心逻辑都是通过SQL从archive_records表中提取数据,用pandas做聚合分析,无需额外修改核心代码框架。