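Digital Archive System Platform Operations: Keeping Archive Data as Steady as a Laying Hen
Before you start collecting data, make sure the following tools are installed on your operating system. We will use Python as the main programming language because it has a rich, mature set of web scraping libraries.
First, install Python 3.8 or later. Go to the Python downloads page at https://www.python.org/downloads/ and choose the installer for your operating system. On Windows, be sure to tick the "Add Python to PATH" option during installation.
Once installation finishes, open a command-line tool (CMD or PowerShell on Windows, Terminal on Mac/Linux) and verify that it succeeded: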
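```
python --version
```
If this prints Python 3.x.x with a version of 3.8 or later, the installation succeeded. On some Mac/Linux systems the command is python3 rather than python.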
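The same check can be made from inside Python, which is handy at the top of a script. The following is a minimal sketch; `python_is_supported` and `MIN_VERSION` are hypothetical names introduced here for illustration, and the 3.8 minimum is the one stated above.

```python
import sys

MIN_VERSION = (3, 8)  # minimum version stated in this guide


def python_is_supported(version_info=None, minimum=MIN_VERSION):
    """Return True when the (major, minor) version meets the minimum."""
    if version_info is None:
        version_info = sys.version_info
    return tuple(version_info[:2]) >= minimum


if __name__ == "__main__":
    status = "OK" if python_is_supported() else "is too old"
    print("Python", sys.version.split()[0], status)
```

Comparing tuples rather than version strings avoids mistakes such as treating "3.10" as older than "3.8".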
Run the following command to install all the required Python libraries:
```
pip install requests beautifulsoup4 selenium scrapy pandas lxml
```
We will start with the simplest case, scraping static web pages, and work up to more complex scenarios.
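Before moving on, it may be worth confirming that the libraries from the pip command are actually importable. The sketch below uses only the standard library; `REQUIRED` and `find_missing` are hypothetical names, and the mapping reflects the fact that the beautifulsoup4 package is imported as bs4.

```python
import importlib.util

# pip package name -> importable module name
REQUIRED = {
    "requests": "requests",
    "beautifulsoup4": "bs4",
    "selenium": "selenium",
    "scrapy": "scrapy",
    "pandas": "pandas",
    "lxml": "lxml",
}


def find_missing(packages=REQUIRED):
    """Return the pip names whose module cannot be located."""
    return [
        pip_name
        for pip_name, module in packages.items()
        if importlib.util.find_spec(module) is None
    ]


if __name__ == "__main__":
    missing = find_missing()
    if missing:
        print("Missing: " + ", ".join(missing))
    else:
        print("All libraries found")
```

`find_spec` only locates a module without importing it, so the check stays fast even for heavy packages such as pandas.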
Create a file named basic_scraper.py with the following code:
```
import requests
from bs4 import BeautifulSoup

# Set request headers to mimic a real browser
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}


def fetch_webpage(url):
    """Fetch a page and return its HTML text, or None on failure."""
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # Raise on HTTP error status codes
        response.encoding = response.apparent_encoding  # Auto-detect the encoding
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        return None


# Example: scrape the first page of the Douban Movie Top 250.
# The guard lets other files import fetch_webpage without running the example.
if __name__ == "__main__":
    url = "https://movie.douban.com/top250"
    html_content = fetch_webpage(url)
    if html_content:
        soup = BeautifulSoup(html_content, 'lxml')
        # Extract all movie titles
        movie_titles = soup.select('.title')
        for title in movie_titles[:10]:  # Show only the first 10
            print(title.get_text(strip=True))
```
Create data_extractor.py to extract structured data:
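```
from bs4 import BeautifulSoup


def extract_movie_data(html_content):
    """Parse a Top 250 page into a list of movie dicts."""
    soup = BeautifulSoup(html_content, 'lxml')
    movies = []
    # Find every movie entry
    items = soup.select('.item')
    for item in items:
        movie = {}
        # Title
        title_elem = item.select_one('.title')
        if title_elem:
            movie['title'] = title_elem.get_text(strip=True)
        # Rating
        rating_elem = item.select_one('.rating_num')
        if rating_elem:
            movie['rating'] = rating_elem.get_text(strip=True)
        # Number of votes ('人评价' is the "people rated" suffix in the page text)
        votes_elem = item.select_one('.star span:last-child')
        if votes_elem:
            movie['votes'] = votes_elem.get_text(strip=True).replace('人评价', '')
        # One-line quote
        quote_elem = item.select_one('.quote .inq')
        if quote_elem:
            movie['quote'] = quote_elem.get_text(strip=True)
        if movie:  # Only keep entries that yielded data
            movies.append(movie)
    return movies


# Usage example: reuse fetch_webpage from basic_scraper.py
if __name__ == "__main__":
    from basic_scraper import fetch_webpage

    html_content = fetch_webpage("https://movie.douban.com/top250")
    if html_content:
        for movie in extract_movie_data(html_content)[:5]:
            print(movie)
```
Sites that load their data dynamically with JavaScript require Selenium.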
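One rough way to tell whether a page falls into this category is to check whether the elements you want are already present in the HTML that a plain request returns. The sketch below is a heuristic only, written with the standard-library HTML parser; `ClassCounter` and `needs_browser_rendering` are hypothetical names, and a missing class can also simply mean the selector is wrong.

```python
from html.parser import HTMLParser


class ClassCounter(HTMLParser):
    """Count elements whose class attribute contains a given class name."""

    def __init__(self, class_name):
        super().__init__()
        self.class_name = class_name
        self.count = 0

    def handle_starttag(self, tag, attrs):
        # attrs is a list of (name, value) pairs; value may be None
        classes = (dict(attrs).get("class") or "").split()
        if self.class_name in classes:
            self.count += 1


def needs_browser_rendering(html, class_name):
    """Return True when the static HTML has no element with class_name."""
    counter = ClassCounter(class_name)
    counter.feed(html)
    return counter.count == 0
```

If `needs_browser_rendering(html, "item")` returns True for HTML fetched without a browser, the content is probably injected by JavaScript and a browser-driven approach is the likely next step.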
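First, download the WebDriver that matches your browser. Then place the WebDriver executable in a directory on your system PATH, or in the same directory as your Python script.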
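A quick way to confirm the driver is where Python can find it is to search both locations mentioned above. This is a minimal sketch; `locate_webdriver` is a hypothetical helper, and the default name `chromedriver` is an assumption based on the Chrome examples in this guide.

```python
import os
import shutil


def locate_webdriver(name="chromedriver", script_dir="."):
    """Return the driver path from PATH or script_dir, or None if absent."""
    found = shutil.which(name)  # search the system PATH first
    if found:
        return found
    # shutil.which also accepts an explicit directory to search
    return shutil.which(name, path=os.path.abspath(script_dir))


if __name__ == "__main__":
    path = locate_webdriver()
    print(path if path else "WebDriver not found on PATH or next to the script")
```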

Create dynamic_scraper.py:
```
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

# Configure Chrome options
options = webdriver.ChromeOptions()
options.add_argument('--headless')  # Headless mode: no browser window is shown
options.add_argument('--disable-gpu')
options.add_argument('--no-sandbox')


def scrape_dynamic_page(url, wait_element_selector, max_wait=10):
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)
        # Wait for the target element to load
        wait = WebDriverWait(driver, max_wait)
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, wait_element_selector)))
        # Scroll to load more content (if needed)
        for _ in range(3):  # Scroll 3 times
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)  # Give new content time to load
        # Grab the rendered page source
        page_source = driver.page_source
        return page_source
    finally:
        driver.quit()


# Example: scrape a page that loads content on scroll
url = "https://www.example.com/infinite-scroll-page"
html = scrape_dynamic_page(url, ".product-item", max_wait=15)
```
Now we will combine all of these components into a complete system.
Create config.py:
```
# Scraping configuration
CONFIG = {
    'start_urls': [
        'https://movie.douban.com/top250',
        # Add more start URLs here
    ],
    'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'timeout': 30,
    'retry_times': 3,
    'delay_between_requests': 1,  # Seconds between requests, to avoid being blocked
    'output_format': 'csv',  # Output format: csv, json, excel
    'output_file': 'collected_data.csv'
}

# Proxy settings (if needed). These are placeholder addresses;
# set PROXIES = {} to send requests without a proxy.
PROXIES = {
    'http': 'http://proxy.example.com:8080',
    'https': 'https://proxy.example.com:8080',
}
```
Create main_scraper.py:
```
import requests
from bs4 import BeautifulSoup
import time
import json
import csv
from config import CONFIG, PROXIES


class WebScraper:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': CONFIG['user_agent']})

    def fetch_with_retry(self, url, retry_times=CONFIG['retry_times']):
        for attempt in range(retry_times):
            try:
                response = self.session.get(
                    url,
                    timeout=CONFIG['timeout'],
                    proxies=PROXIES if PROXIES else None
                )
                response.raise_for_status()
                return response.text
            except Exception as e:
                if attempt == retry_times - 1:
                    print(f"Request failed: {url}, error: {e}")
                    return None
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...
        return None

    def parse_page(self, html_content):
        # Write the parsing logic to match the actual site structure
        soup = BeautifulSoup(html_content, 'lxml')
        data_list = []
        # Example: parse a product list
        products = soup.select('.product-item')
        for product in products:
            data = {}
            # Product name
            name_elem = product.select_one('.product-name')
            if name_elem:
                data['name'] = name_elem.get_text(strip=True)
            # Price
            price_elem = product.select_one('.price')
            if price_elem:
                data['price'] = price_elem.get_text(strip=True)
            if data:
                data_list.append(data)
        return data_list

    def save_data(self, data_list, filename=CONFIG['output_file']):
        if CONFIG['output_format'] == 'csv':
            with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
                if data_list:
                    # Use the union of keys so rows with extra fields do not raise
                    fieldnames = list(dict.fromkeys(k for row in data_list for k in row))
                    writer = csv.DictWriter(f, fieldnames=fieldnames)
                    writer.writeheader()
                    writer.writerows(data_list)
        elif CONFIG['output_format'] == 'json':
            filename = filename.replace('.csv', '.json')
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data_list, f, ensure_ascii=False, indent=2)
        print(f"Data saved to: {filename}")

    def run(self):
        all_data = []
        for url in CONFIG['start_urls']:
            print(f"Scraping: {url}")
            html = self.fetch_with_retry(url)
            if html:
                page_data = self.parse_page(html)
                all_data.extend(page_data)
            # Pause between requests (this code does not itself check robots.txt)
            time.sleep(CONFIG['delay_between_requests'])
        if all_data:
            self.save_data(all_data)
            print(f"Done, collected {len(all_data)} records")
        else:
            print("No data collected")


if __name__ == "__main__":
    scraper = WebScraper()
    scraper.run()
```
Create pagination_handler.py:
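```
import time

from basic_scraper import fetch_webpage
from data_extractor import extract_movie_data


def handle_pagination(base_url, max_pages=10):
    all_data = []
    for page in range(1, max_pages + 1):
        # Build the URL according to the site's pagination scheme
        # (the page parameter is a generic example; adjust it per site)
        if '?' in base_url:
            page_url = f"{base_url}&page={page}"
        else:
            page_url = f"{base_url}?page={page}"
        print(f"Scraping page {page}: {page_url}")
        html = fetch_webpage(page_url)
        if not html:
            print(f"Page {page} failed, stopping pagination")
            break
        page_data = extract_movie_data(html)  # Reuse the extraction function from earlier
        if not page_data:  # No data probably means we are past the last page
            break
        all_data.extend(page_data)
        time.sleep(1)  # Delay between page requests
    return all_data
```
Create logger.py: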
```
import logging
from datetime import datetime


def setup_logger():
    logger = logging.getLogger('web_scraper')
    logger.setLevel(logging.INFO)
    if logger.handlers:  # Already configured; avoid adding duplicate handlers
        return logger
    # File handler
    file_handler = logging.FileHandler(
        f'scraper_log_{datetime.now().strftime("%Y%m%d")}.log', encoding='utf-8'
    )
    file_handler.setLevel(logging.INFO)
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.WARNING)
    # Formatting
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger


# Usage example
logger = setup_logger()
logger.info("Scraping task started")
url = "https://movie.douban.com/top250"  # Example URL for the log message
logger.error("Scraping failed, URL: %s", url)
```
Add the following settings to config.py:
```
# Anti-scraping countermeasure settings
ANTI_SCRAPING = {
    'use_random_user_agent': True,
    'user_agents': [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
    ],
    'use_proxy_rotation': False,  # Set to True if needed
    'request_delay_range': (1, 3),  # Random delay of 1 to 3 seconds
    'enable_javascript': False,  # Whether to enable JavaScript rendering
}
```
On Linux/Mac, use cron to create a scheduled task:
```
# Edit the crontab
crontab -e

# Add the following line to run the scraper every day at 2:00 a.m.
0 2 * * * cd /path/to/your/script && /usr/bin/python3 main_scraper.py >> /var/log/scraper.log 2>&1
```
On Windows, use Task Scheduler.
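To make the daily 2:00 a.m. schedule concrete, the sketch below computes when the next run would fall, which a long-running Python process could use where neither cron nor Task Scheduler is convenient. It is an illustration rather than a replacement for those tools; `next_daily_run` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def next_daily_run(now, hour=2, minute=0):
    """Return the next datetime at hour:minute strictly after now."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so move to tomorrow
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    now = datetime.now()
    wait_seconds = (next_daily_run(now) - now).total_seconds()
    print(f"Next run in {wait_seconds:.0f} seconds")
```

A process could sleep for `wait_seconds` and then call the scraper, though a system scheduler remains the more robust option because it survives restarts.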
Create data_validator.py to check data quality:
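```
import pandas as pd


def validate_data(file_path):
    try:
        df = pd.read_csv(file_path)
        # Check completeness
        total_rows = len(df)
        missing_values = int(df.isnull().sum().sum())
        print(f"Total rows: {total_rows}")
        print(f"Total missing values: {missing_values}")
        # Check for duplicate rows
        duplicates = int(df.duplicated().sum())
        print(f"Duplicate rows: {duplicates}")
        if total_rows == 0:
            print("Warning: the file contains no data rows")
            return False
        # Data quality report (the missing share is measured over all cells)
        if missing_values / df.size > 0.1:
            print("Warning: more than 10% of values are missing")
        if duplicates / total_rows > 0.05:
            print("Warning: more than 5% of rows are duplicates")
        return True
    except Exception as e:
        print(f"Data validation failed: {e}")
        return False
```
By following the steps above, you now have a complete digital resource collection system. From basic scraping to the advanced features, each module can be used as-is or adapted to your specific needs. Always follow the target site's robots.txt rules and keep your request rate reasonable so that you do not put excessive load on the site.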
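The robots.txt and request-rate advice can be sketched with the standard library's `urllib.robotparser`. In the example below, `SAMPLE_ROBOTS` is made-up content used so the code runs offline, and `build_parser` and `polite_delay` are hypothetical helpers; the default delay range mirrors `request_delay_range` from the configuration in this guide. A real run would load the site's own file, for example with `RobotFileParser.set_url(...)` followed by `read()`.

```python
import random
from urllib.robotparser import RobotFileParser

# Made-up robots.txt content for illustration only
SAMPLE_ROBOTS = """\
User-agent: *
Disallow: /private/
Crawl-delay: 2
"""


def build_parser(robots_text):
    """Build a parser from robots.txt text that is already in memory."""
    parser = RobotFileParser()
    parser.parse(robots_text.splitlines())
    return parser


def polite_delay(parser, user_agent="*", delay_range=(1, 3)):
    """Pick a random delay, never shorter than the site's Crawl-delay."""
    delay = random.uniform(*delay_range)
    crawl_delay = parser.crawl_delay(user_agent)
    if crawl_delay is not None:
        delay = max(delay, float(crawl_delay))
    return delay


if __name__ == "__main__":
    parser = build_parser(SAMPLE_ROBOTS)
    for url in ("https://example.com/list", "https://example.com/private/x"):
        allowed = parser.can_fetch("*", url)
        print(url, "allowed" if allowed else "disallowed")
    print(f"Sleep for {polite_delay(parser):.1f}s between requests")
```

Calling `can_fetch` before each request and sleeping for `polite_delay(...)` afterwards is one simple way to apply both pieces of advice in the scraper's main loop.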