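At its core, a usable digital weather archive must store and query historical and real-time meteorological data in an automated, structured way. We use a microservice architecture so that the system stays scalable and easy to maintain.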
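The stack used in this guide has been validated in production. It consists of PostgreSQL (station metadata and file index), InfluxDB (time-series observations), MinIO (raw file storage), Kafka with ZooKeeper (message ingestion), Grafana (dashboards), and a Python processing service.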
Before you start, make sure Docker and docker-compose are installed on your (Linux) server.
Run the following commands to install them:
```
# Install Docker
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
sudo usermod -aG docker $USER
newgrp docker

# Install Docker Compose (v2 release assets use a lowercase OS name, hence the tr call)
sudo curl -L "https://github.com/docker/compose/releases/download/v2.17.2/docker-compose-$(uname -s | tr '[:upper:]' '[:lower:]')-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
```

Create the following directories on the server to hold configuration, data, and code.
```
mkdir -p ~/weather-archive/{influxdb,postgres,minio,grafana,kafka,processor}
cd ~/weather-archive
```

We will start all core services from a single `docker-compose.yml` file.
Create `docker-compose.yml` in the `~/weather-archive` directory with the following content:
```
version: '3.8'

services:
  postgres:
    image: postgres:14-alpine
    container_name: weather-postgres
    environment:
      POSTGRES_DB: weather_meta
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: StrongPass123!
    volumes:
      - ./postgres/data:/var/lib/postgresql/data
      - ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    restart: unless-stopped

  influxdb:
    image: influxdb:2.6
    container_name: weather-influxdb
    environment:
      DOCKER_INFLUXDB_INIT_MODE: setup
      DOCKER_INFLUXDB_INIT_USERNAME: admin
      DOCKER_INFLUXDB_INIT_PASSWORD: InfluxPass123!
      DOCKER_INFLUXDB_INIT_ORG: weather_org
      DOCKER_INFLUXDB_INIT_BUCKET: raw_data
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: my-super-secret-auth-token
    volumes:
      - ./influxdb/data:/var/lib/influxdb2
    ports:
      - "8086:8086"
    restart: unless-stopped

  minio:
    image: minio/minio
    container_name: weather-minio
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin123
    volumes:
      - ./minio/data:/data
    ports:
      - "9000:9000"
      - "9001:9001"
    restart: unless-stopped

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: weather-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    restart: unless-stopped

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: weather-kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
      - "29092:29092"
    restart: unless-stopped

  grafana:
    image: grafana/grafana:9.3.2
    container_name: weather-grafana
    environment:
      GF_SECURITY_ADMIN_PASSWORD: GrafanaPass123!
    volumes:
      - ./grafana/data:/var/lib/grafana
    ports:
      - "3000:3000"
    restart: unless-stopped
```

Before the first start, create `./postgres/init.sql` as described in the next step. The postgres image only runs scripts in `/docker-entrypoint-initdb.d` when its data directory is empty, and Docker creates a directory at the mount path if the file is missing. After saving both files, run `docker-compose up -d` in a terminal to start all services. Wait a few minutes, then use `docker-compose ps` to confirm that every container's status is "Up".
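A container reported as "Up" is not necessarily accepting connections yet. As a rough complement to `docker-compose ps`, the sketch below tries a TCP connection to each host port published in the compose file above. It assumes the services are reached via `localhost`; the helper names `port_is_open` and `check_services` are illustrative and not part of the original setup.

```python
import socket

# Host ports published in docker-compose.yml above.
SERVICE_PORTS = {
    "postgres": 5432,
    "influxdb": 8086,
    "minio": 9000,
    "kafka": 29092,
    "grafana": 3000,
}


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(host: str = "localhost") -> dict:
    """Map each service name to whether its published port accepts connections."""
    return {name: port_is_open(host, port) for name, port in SERVICE_PORTS.items()}


if __name__ == "__main__":
    for name, ok in check_services().items():
        print(f"{name:10s} {'reachable' if ok else 'NOT reachable'}")
```

An open port only shows that something is listening; it does not prove the service has finished initializing, so treat this as a first-pass check.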

Create `init.sql` in the `~/weather-archive/postgres` directory. It creates the tables that store station information and the data file index.
```
CREATE TABLE IF NOT EXISTS weather_stations (
    station_id   VARCHAR(20) PRIMARY KEY,
    station_name VARCHAR(100) NOT NULL,
    longitude    DECIMAL(9,6) NOT NULL,
    latitude     DECIMAL(8,6) NOT NULL,
    altitude     DECIMAL(6,2),
    created_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS data_file_index (
    file_id          SERIAL PRIMARY KEY,
    station_id       VARCHAR(20) REFERENCES weather_stations(station_id),
    file_name        VARCHAR(255) NOT NULL,
    file_path        VARCHAR(500) NOT NULL, -- e.g. s3://bucket-name/path/to/file
    data_type        VARCHAR(50),           -- e.g. 'surface_obs', 'radar_base', 'satellite'
    time_range_start TIMESTAMP NOT NULL,
    time_range_end   TIMESTAMP NOT NULL,
    file_size        BIGINT,
    ingested_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Insert a sample weather station
INSERT INTO weather_stations (station_id, station_name, longitude, latitude, altitude)
VALUES ('ZSSS', 'Shanghai Hongqiao', 121.336317, 31.197689, 3.0)
ON CONFLICT (station_id) DO NOTHING;
```

Next, we will write a Python service that consumes raw weather data from Kafka, processes it, and stores the results in InfluxDB and MinIO.
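To have something for that service to consume during development, a test observation can be published to Kafka. The sketch below is an assumption about the message layout: the field names mirror the `required_fields` list that `main.py` checks later in this guide, while the sample values and the helper names `build_observation` and `publish_observation` are purely illustrative. It also assumes the broker is reachable at `localhost:29092` and the topic is `weather_raw_data`, matching `config.py`.

```python
import json
from datetime import datetime, timezone


def build_observation(station_id: str, obs_time: datetime, temperature: float,
                      pressure: float, humidity: float, wind_speed: float,
                      wind_dir: int) -> str:
    """Serialize one observation as a JSON string with the fields main.py requires."""
    return json.dumps({
        "station_id": station_id,
        "obs_time": obs_time.isoformat(),  # main.py parses this with datetime.fromisoformat()
        "temperature": temperature,
        "pressure": pressure,
        "humidity": humidity,
        "wind_speed": wind_speed,
        "wind_dir": wind_dir,
    })


def publish_observation(payload: str, topic: str = "weather_raw_data",
                        servers: str = "localhost:29092") -> None:
    """Queue one message for Kafka and wait up to 10 seconds for delivery."""
    # Imported here so build_observation() works without confluent-kafka installed.
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": servers})
    producer.produce(topic, value=payload.encode("utf-8"))
    producer.flush(10)


# Illustrative sample values for the station inserted by init.sql.
sample = build_observation(
    "ZSSS", datetime(2023, 5, 1, 8, 0, tzinfo=timezone.utc),
    18.4, 1012.6, 72.0, 3.2, 135,
)
print(sample)
```

Once the Kafka container is running, calling `publish_observation(sample)` should place one message on the topic.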
Go to the `~/weather-archive/processor` directory and create the project structure.
```
cd ~/weather-archive/processor
touch requirements.txt main.py config.py
```

Edit `requirements.txt`:
```
fastapi==0.95.0
uvicorn[standard]==0.21.1
influxdb-client==1.36.1
confluent-kafka==2.1.1
psycopg2-binary==2.9.6
boto3==1.26.137
pandas==2.0.1
```

Edit `config.py` to hold the connection settings for every service:
```
# Configuration file: config.py
# Values match the credentials and ports defined in docker-compose.yml.


class Config:
    # Kafka
    KAFKA_BOOTSTRAP_SERVERS = 'localhost:29092'
    KAFKA_TOPIC_RAW_DATA = 'weather_raw_data'

    # InfluxDB
    INFLUXDB_URL = 'http://localhost:8086'
    INFLUXDB_TOKEN = 'my-super-secret-auth-token'
    INFLUXDB_ORG = 'weather_org'
    INFLUXDB_BUCKET = 'raw_data'

    # PostgreSQL
    PG_HOST = 'localhost'
    PG_PORT = 5432
    PG_DATABASE = 'weather_meta'
    PG_USER = 'admin'
    PG_PASSWORD = 'StrongPass123!'

    # MinIO
    MINIO_ENDPOINT = 'localhost:9000'
    MINIO_ACCESS_KEY = 'minioadmin'
    MINIO_SECRET_KEY = 'minioadmin123'
    MINIO_SECURE = False
    RAW_DATA_BUCKET = 'weather-raw-files'
```

Edit `main.py` to implement the full flow of consuming, parsing, and storing the data.
```
import json
import logging
from datetime import datetime

import boto3
import psycopg2
from botocore.client import Config as BotoConfig
from botocore.exceptions import ClientError
from confluent_kafka import Consumer, KafkaError
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

# The settings live on the Config class, not at module level.
from config import Config

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class WeatherDataProcessor:
    def __init__(self):
        # Initialize the InfluxDB client
        self.influx_client = InfluxDBClient(
            url=Config.INFLUXDB_URL,
            token=Config.INFLUXDB_TOKEN,
            org=Config.INFLUXDB_ORG,
        )
        self.write_api = self.influx_client.write_api(write_options=SYNCHRONOUS)

        # Initialize the PostgreSQL connection
        self.pg_conn = psycopg2.connect(
            host=Config.PG_HOST,
            port=Config.PG_PORT,
            database=Config.PG_DATABASE,
            user=Config.PG_USER,
            password=Config.PG_PASSWORD,
        )

        # Initialize the MinIO client through its S3-compatible API
        scheme = "https" if Config.MINIO_SECURE else "http"
        self.s3_client = boto3.client(
            's3',
            endpoint_url=f"{scheme}://{Config.MINIO_ENDPOINT}",
            aws_access_key_id=Config.MINIO_ACCESS_KEY,
            aws_secret_access_key=Config.MINIO_SECRET_KEY,
            config=BotoConfig(signature_version='s3v4'),
        )
        # Make sure the bucket exists; an "already exists" error is expected on restart
        try:
            self.s3_client.create_bucket(Bucket=Config.RAW_DATA_BUCKET)
        except ClientError as e:
            logger.info(f"Bucket not created (it may already exist): {e}")

        # Initialize the Kafka consumer
        self.consumer = Consumer({
            'bootstrap.servers': Config.KAFKA_BOOTSTRAP_SERVERS,
            'group.id': 'weather_processor_group',
            'auto.offset.reset': 'earliest',
        })
        self.consumer.subscribe([Config.KAFKA_TOPIC_RAW_DATA])

    def parse_observation_data(self, raw_message):
        """Parse one weather observation; JSON is used here as an example format."""
        try:
            data = json.loads(raw_message)
            # Assumed fields: station ID, time, temperature, pressure,
            # humidity, wind speed, wind direction
            required_fields = ['station_id', 'obs_time', 'temperature', 'pressure',
                               'humidity', 'wind_speed', 'wind_dir']
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"Missing required field: {field}")
            return data
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON: {e}")
            return None
        except ValueError as e:
            logger.error(f"Data validation failed: {e}")
            return None

    def save_to_influxdb(self, data_point):
        """Write one data point to InfluxDB."""
        point = (
            Point("weather_observation")
            .tag("station_id", data_point['station_id'])
            .field("temperature", float(data_point['temperature']))
            .field("pressure", float(data_point['pressure']))
            .field("humidity", float(data_point['humidity']))
            .field("wind_speed", float(data_point['wind_speed']))
            .field("wind_dir", int(data_point['wind_dir']))
            .time(datetime.fromisoformat(data_point['obs_time']), WritePrecision.NS)
        )
        self.write_api.write(bucket=Config.INFLUXDB_BUCKET, org=Config.INFLUXDB_ORG, record=point)
        logger.info(f"Data written to InfluxDB for station {data_point['station_id']} at {data_point['obs_time']}")

    def save_raw_file_to_minio(self, file_name, file_content):
        """Save a raw data file (for example, a bulletin) to MinIO."""
        try:
            self.s3_client.put_object(
                Bucket=Config.RAW_DATA_BUCKET,
                Key=file_name,
                Body=file_content,
            )
            logger.info(f"Raw file saved to MinIO: {file_name}")
            return f"s3://{Config.RAW_DATA_BUCKET}/{file_name}"
        except Exception as e:
            logger.error(f"Failed to save file to MinIO: {e}")
            return None

    def update_file_index(self, station_id, file_name, s3_path, data_type,
                          start_time, end_time, file_size):
        """Insert a row into the file index in PostgreSQL."""
        query = """
            INSERT INTO data_file_index
                (station_id, file_name, file_path, data_type,
                 time_range_start, time_range_end, file_size)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
        """
        try:
            with self.pg_conn.cursor() as cur:
                cur.execute(query, (station_id, file_name, s3_path, data_type,
                                    start_time, end_time, file_size))
            self.pg_conn.commit()
        except psycopg2.Error:
            # Roll back so the connection stays usable for later inserts
            self.pg_conn.rollback()
            raise

    def run(self):
        """Main loop: consume Kafka messages and process them."""
        logger.info("Starting Weather Data Processor...")
        try:
            while True:
                msg = self.consumer.poll(1.0)  # 1-second timeout
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    logger.error(f"Kafka error: {msg.error()}")
                    break

                # Parse the message
                raw_data = msg.value().decode('utf-8')
                parsed_data = self.parse_observation_data(raw_data)
                if parsed_data:
                    # 1. Write to the time-series database
                    self.save_to_influxdb(parsed_data)
                    # 2. Example: if the message contains a file chunk, save it to
                    #    MinIO and update the index. This assumes every 10 observations
                    #    are packed into one file for archiving; adjust the logic to
                    #    your data source. Placeholder, left commented out so the
                    #    service runs as-is:
                    # if some_condition:
                    #     s3_path = self.save_raw_file_to_minio(...)
                    #     self.update_file_index(...)
        except KeyboardInterrupt:
            logger.info("Shutting down processor...")
        finally:
            self.consumer.close()
            self.influx_client.close()
            self.pg_conn.close()


if __name__ == "__main__":
    processor = WeatherDataProcessor()
    processor.run()
```

In the `~/weather-archive/processor` directory, install the dependencies and run the service.
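```
# Create a virtual environment (optional but recommended)
python3 -m venv venv
source venv/bin/activate

# Install dependencies
pip install -r requirements.txt

# Run the processing service
python main.py
```

Once started, the service continuously listens to the `weather_raw_data` Kafka topic.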
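The archiving step in `main.py` is only a placeholder. One possible way to fill it in, following the comment's assumption that every 10 observations are packed into one file, is sketched below. The `ObservationBatcher` class, the per-station buffering, the JSON Lines format, and the file naming scheme are all assumptions made for illustration; they are not part of the original service.

```python
import json
from collections import defaultdict
from datetime import datetime
from typing import Optional


class ObservationBatcher:
    """Buffer parsed observations per station and emit an archive every batch_size items."""

    def __init__(self, batch_size: int = 10):
        self.batch_size = batch_size
        self._buffers = defaultdict(list)  # station_id -> list of observations

    def add(self, observation: dict) -> Optional[dict]:
        """Buffer one observation; return an archive description once a batch is full."""
        station_id = observation["station_id"]
        self._buffers[station_id].append(observation)
        if len(self._buffers[station_id]) < self.batch_size:
            return None

        # Batch is full: remove it from the buffer and describe the archive file.
        batch = self._buffers.pop(station_id)
        times = [datetime.fromisoformat(o["obs_time"]) for o in batch]
        start, end = min(times), max(times)
        # One JSON document per line (JSON Lines), encoded as bytes for upload.
        content = "\n".join(json.dumps(o, sort_keys=True) for o in batch).encode("utf-8")
        return {
            "station_id": station_id,
            # Hypothetical naming scheme: <station>/<start>_<end>.jsonl
            "file_name": f"{station_id}/{start:%Y%m%dT%H%M%S}_{end:%Y%m%dT%H%M%S}.jsonl",
            "content": content,
            "start_time": start,
            "end_time": end,
            "file_size": len(content),
        }
```

Inside `run()`, a non-`None` result could be passed to the existing methods: `save_raw_file_to_minio(archive["file_name"], archive["content"])`, then `update_file_index(...)` with the returned path, the time range, the size, and a `data_type` such as `'surface_obs'`. Because `data_file_index.station_id` references `weather_stations`, the station has to exist in that table first.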
Connect Grafana to InfluxDB and PostgreSQL to build monitoring dashboards.
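Before building panels, it can help to confirm that the stored data is queryable. The sketch below builds a Flux query for the hourly mean of one field at one station and can optionally run it with `influxdb-client`. The measurement, tag, bucket, org, and token values come from the code and configuration above; the helper names `hourly_mean_flux` and `run_query` and the 24-hour window are assumptions. The inputs are interpolated directly into the query text, so this is only suitable for trusted values.

```python
def hourly_mean_flux(bucket: str, station_id: str, field: str, hours: int = 24) -> str:
    """Build a Flux query for the hourly mean of one field at one station."""
    return (
        f'from(bucket: "{bucket}")\n'
        f'  |> range(start: -{hours}h)\n'
        f'  |> filter(fn: (r) => r._measurement == "weather_observation")\n'
        f'  |> filter(fn: (r) => r.station_id == "{station_id}")\n'
        f'  |> filter(fn: (r) => r._field == "{field}")\n'
        f'  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)'
    )


def run_query(flux: str, url: str = "http://localhost:8086",
              token: str = "my-super-secret-auth-token",
              org: str = "weather_org") -> list:
    """Run a Flux query and return (time, value) pairs from all result tables."""
    # Imported here so the query builder works without influxdb-client installed.
    from influxdb_client import InfluxDBClient

    with InfluxDBClient(url=url, token=token, org=org) as client:
        tables = client.query_api().query(flux)
        return [(rec.get_time(), rec.get_value())
                for table in tables for rec in table.records]


query = hourly_mean_flux("raw_data", "ZSSS", "temperature")
print(query)
```

The printed Flux text can also be pasted into a Grafana panel whose InfluxDB data source is configured to use Flux as its query language.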