InfluxDB Explained

InfluxDB is an open-source database purpose-built for time series data, developed by InfluxData. It is optimized for storing, querying, and analyzing time series data and is widely used for monitoring, IoT, and real-time analytics.

Overview

Core Concepts

  • Time Series: a sequence of data points ordered by time
  • Point: a single data record consisting of a timestamp, fields, and tags (see the sketch after this list)
  • Timestamp: the point's time identifier, with up to nanosecond precision
  • Field: the actual measured data; several data types are supported
  • Tag: key-value pairs used for indexing and grouping; values are always strings
  • Measurement: roughly the equivalent of a table in a relational database
  • Database: a container for measurements (1.x; 2.x uses buckets instead)
  • Retention Policy: defines how long data is kept and how many replicas are stored
  • Shard: a time-based partition of the data
  • Series: the set of points sharing a measurement, tag set, and field key
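
To make these concepts concrete, here is a minimal sketch using the official influxdb-client Python package (names such as "room1" are placeholders): it builds one point and prints its line protocol, showing where the measurement, tags, fields, and timestamp each land.

python
# A point = measurement + tags + fields + timestamp.
# Requires: pip install influxdb-client
from datetime import datetime, timezone

from influxdb_client import Point, WritePrecision

point = (
    Point("temperature")                     # measurement (roughly: a table)
    .tag("location", "room1")                # tag: indexed, always a string
    .field("value", 23.5)                    # field: the actual data
    .time(datetime.now(timezone.utc), WritePrecision.NS)  # nanosecond timestamp
)

# Measurement + tag set + field key together identify one series.
print(point.to_line_protocol())
# -> temperature,location=room1 value=23.5 <ns-timestamp>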

Key Features

  1. High-performance writes: a storage engine optimized for time series data
  2. Compressed storage: efficient data compression algorithms
  3. SQL-like query languages: InfluxQL and Flux
  4. Automatic data expiry: time-based retention policies
  5. Continuous queries: periodic queries that run automatically
  6. Downsampling: automatic aggregation of historical data
  7. Clustering: horizontal scaling (Enterprise edition)
  8. Rich client libraries: support for many programming languages

Version Comparison

| Feature         | InfluxDB 1.x | InfluxDB 2.x          |
|-----------------|--------------|-----------------------|
| Query language  | InfluxQL     | Flux                  |
| Storage engine  | TSM          | TSM                   |
| User interface  | Chronograf   | Built-in UI           |
| Authentication  | Basic auth   | Token-based auth      |
| Organization    | Databases    | Organizations/buckets |
| API             | HTTP API     | HTTP API v2           |
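
The split also shows up in client code. The 2.x Python client authenticates with a token against an organization and bucket; the same library can talk to a 1.8+ server in compatibility mode by packing "username:password" into the token and using "database/retention_policy" in place of a bucket. A hedged sketch (URL and credentials are placeholders):

python
# Connecting the influxdb-client Python library to 2.x vs. 1.8+.
from influxdb_client import InfluxDBClient

# InfluxDB 2.x: token + organization
client_v2 = InfluxDBClient(url="http://localhost:8086", token="my-token", org="myorg")

# InfluxDB 1.8+ compatibility mode: "username:password" as the token,
# org is ignored ("-"), and writes/queries use "database/retention_policy"
# where a bucket name would normally go.
client_v1 = InfluxDBClient(url="http://localhost:8086", token="admin:password123", org="-")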

Installation and Configuration

Installation Methods

1. Official Packages

bash
# macOS
brew install influxdb
brew install influxdb-cli

# Ubuntu/Debian
wget -qO- https://repos.influxdata.com/influxdb.key | sudo apt-key add -
echo "deb https://repos.influxdata.com/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/influxdb.list
sudo apt-get update
sudo apt-get install influxdb2

# CentOS/RHEL
cat <<EOF | sudo tee /etc/yum.repos.d/influxdb.repo
[influxdb]
name = InfluxDB Repository - RHEL
baseurl = https://repos.influxdata.com/rhel/\$releasever/\$basearch/stable
enabled = 1
gpgcheck = 1
gpgkey = https://repos.influxdata.com/influxdb.key
EOF
sudo yum install influxdb2

2. Docker Installation

bash
# InfluxDB 2.x
docker run --name influxdb \
  -p 8086:8086 \
  -v influxdb-data:/var/lib/influxdb2 \
  -v influxdb-config:/etc/influxdb2 \
  influxdb:2.7

# With initial setup via environment variables
docker run --name influxdb \
  -p 8086:8086 \
  -e DOCKER_INFLUXDB_INIT_MODE=setup \
  -e DOCKER_INFLUXDB_INIT_USERNAME=admin \
  -e DOCKER_INFLUXDB_INIT_PASSWORD=password123 \
  -e DOCKER_INFLUXDB_INIT_ORG=myorg \
  -e DOCKER_INFLUXDB_INIT_BUCKET=mybucket \
  -v influxdb-data:/var/lib/influxdb2 \
  influxdb:2.7

# InfluxDB 1.x
docker run --name influxdb1 \
  -p 8086:8086 \
  -v influxdb1-data:/var/lib/influxdb \
  influxdb:1.8

3. Docker Compose

yaml
# docker-compose.yml
version: '3.8'
services:
  influxdb:
    image: influxdb:2.7
    container_name: influxdb
    ports:
      - "8086:8086"
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=password123
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=mybucket
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token
    volumes:
      - influxdb-data:/var/lib/influxdb2
      - influxdb-config:/etc/influxdb2
    restart: unless-stopped

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-data:/var/lib/grafana
    restart: unless-stopped
    depends_on:
      - influxdb

volumes:
  influxdb-data:
  influxdb-config:
  grafana-data:

Basic Configuration

InfluxDB 2.x Configuration

toml
# /etc/influxdb2/config.toml

# HTTP server settings
http-bind-address = ":8086"
http-read-header-timeout = "10s"
http-read-timeout = "0"
http-write-timeout = "0"
http-idle-timeout = "3m"
http-max-body-size = 25000000

# Storage paths
engine-path = "/var/lib/influxdb2/engine"
bolt-path = "/var/lib/influxdb2/influxd.bolt"

# Logging
log-level = "info"

# Query settings
query-concurrency = 1024
query-queue-size = 1024
query-memory-bytes = 0

# Storage engine settings
storage-cache-max-memory-size = 1073741824
storage-cache-snapshot-memory-size = 26214400
storage-cache-snapshot-write-cold-duration = "10m"
storage-compact-full-write-cold-duration = "4h"
storage-compact-throughput-burst = 50331648
storage-max-concurrent-compactions = 0
storage-max-index-log-file-size = 1048576
storage-series-id-set-cache-size = 100

# TLS settings
# tls-cert = "/path/to/cert.pem"
# tls-key = "/path/to/key.pem"

InfluxDB 1.x Configuration

toml
# /etc/influxdb/influxdb.conf

[meta]
  dir = "/var/lib/influxdb/meta"
  retention-autocreate = true
  logging-enabled = true

[data]
  dir = "/var/lib/influxdb/data"
  wal-dir = "/var/lib/influxdb/wal"
  query-log-enabled = true
  cache-max-memory-size = 1073741824
  cache-snapshot-memory-size = 26214400
  cache-snapshot-write-cold-duration = "10m"
  compact-full-write-cold-duration = "4h"
  max-series-per-database = 1000000
  max-values-per-tag = 100000

[coordinator]
  write-timeout = "10s"
  max-concurrent-queries = 0
  query-timeout = "0s"
  log-queries-after = "0s"
  max-select-point = 0
  max-select-series = 0
  max-select-buckets = 0

[retention]
  enabled = true
  check-interval = "30m"

[shard-precreation]
  enabled = true
  check-interval = "10m"
  advance-period = "30m"

[monitor]
  store-enabled = true
  store-database = "_internal"
  store-interval = "10s"

[subscriber]
  enabled = true
  http-timeout = "30s"
  insecure-skip-verify = false
  ca-certs = ""
  write-concurrency = 40
  write-buffer-size = 1000

[http]
  enabled = true
  bind-address = ":8086"
  auth-enabled = false
  log-enabled = true
  write-tracing = false
  pprof-enabled = true
  https-enabled = false
  https-certificate = "/etc/ssl/influxdb.pem"
  https-private-key = "/etc/ssl/influxdb-key.pem"
  max-row-limit = 0
  max-connection-limit = 0
  shared-secret = ""
  realm = "InfluxDB"
  unix-socket-enabled = false
  bind-socket = "/var/run/influxdb.sock"

Basic Operations

InfluxDB 2.x Operations

Initial Setup

bash
# Initialize from the command line
influx setup \
  --username admin \
  --password password123 \
  --org myorg \
  --bucket mybucket \
  --force

# Create an auth token
influx auth create \
  --org myorg \
  --all-access \
  --description "Admin token"

# List organizations
influx org list

# Create an organization
influx org create --name neworg

# List buckets
influx bucket list

# Create a bucket
influx bucket create \
  --name sensors \
  --org myorg \
  --retention 30d

Writing Data

bash
# Write with the influx CLI
influx write \
  --bucket mybucket \
  --org myorg \
  --token $INFLUX_TOKEN \
  'temperature,location=room1 value=23.5 1640995200000000000'

# Write from a file or stdin
echo 'temperature,location=room1 value=24.0
temperature,location=room2 value=22.5' | \
influx write \
  --bucket mybucket \
  --org myorg \
  --token $INFLUX_TOKEN

# Write via the HTTP API
curl -XPOST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
  -H "Authorization: Token $INFLUX_TOKEN" \
  -H "Content-Type: text/plain; charset=utf-8" \
  --data-binary '
temperature,location=room1 value=23.5
humidity,location=room1 value=65.2
temperature,location=room2 value=22.8
humidity,location=room2 value=68.1
'

Querying Data

bash
# Query with Flux
influx query \
  --org myorg \
  --token $INFLUX_TOKEN \
  'from(bucket: "mybucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "temperature")
    |> mean()'

# Query via the HTTP API
curl -XPOST "http://localhost:8086/api/v2/query?org=myorg" \
  -H "Authorization: Token $INFLUX_TOKEN" \
  -H "Content-Type: application/vnd.flux" \
  --data 'from(bucket: "mybucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "temperature")'

InfluxDB 1.x Operations

Database Management

sql
-- Connect to InfluxDB
influx -precision rfc3339

-- Create a database
CREATE DATABASE sensors

-- List databases
SHOW DATABASES

-- Switch to a database
USE sensors

-- Create a retention policy
CREATE RETENTION POLICY "one_week" ON "sensors" DURATION 7d REPLICATION 1 DEFAULT

-- List retention policies
SHOW RETENTION POLICIES ON sensors

-- Drop a database
DROP DATABASE sensors

Writing Data

sql
-- Insert a single point
INSERT temperature,location=room1 value=23.5

-- Insert multiple points
INSERT temperature,location=room1 value=23.5 1640995200000000000
INSERT temperature,location=room2 value=22.8 1640995200000000000
INSERT humidity,location=room1 value=65.2 1640995200000000000

-- Write via the HTTP API
curl -i -XPOST 'http://localhost:8086/write?db=sensors' \
  --data-binary '
temperature,location=room1 value=23.5
humidity,location=room1 value=65.2
temperature,location=room2 value=22.8
humidity,location=room2 value=68.1
'

Querying Data

sql
-- Select everything
SELECT * FROM temperature

-- Time-range query
SELECT * FROM temperature WHERE time >= now() - 1h

-- Conditional query
SELECT * FROM temperature WHERE location = 'room1' AND time >= now() - 1h

-- Aggregation query
SELECT mean(value) FROM temperature WHERE time >= now() - 1h GROUP BY time(10m)

-- Query multiple measurements (InfluxQL has no joins; each measurement
-- is aggregated separately within one statement)
SELECT mean(value)
FROM temperature, humidity
WHERE time >= now() - 1h
GROUP BY time(10m)

Data Model and Line Protocol

Line Protocol Format

measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp

Example Data

bash
# Basic format
temperature,location=room1 value=23.5

# With an explicit timestamp
temperature,location=room1 value=23.5 1640995200000000000

# Multiple tags and fields
weather,location=beijing,station=airport temperature=25.3,humidity=60.2,pressure=1013.25

# String fields (must be quoted)
log,level=error,service=api message="Connection failed",count=1i

# Mixed data types
sensor,device=sensor01 temperature=23.5,humidity=65i,active=true,location="room1"

Data Types

| Type      | Example             | Description                     |
|-----------|---------------------|---------------------------------|
| Float     | 23.5                | Default numeric type            |
| Integer   | 65i                 | Integer; requires the i suffix  |
| String    | "room1"             | String; must be quoted          |
| Boolean   | true                | Boolean value                   |
| Timestamp | 1640995200000000000 | Nanosecond timestamp            |
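
Client libraries map native types onto these line protocol types; a quick way to check what will actually be sent is to serialize a point and inspect the suffixes and quoting. A small sketch with the Python client:

python
# How Python types serialize to line protocol types.
from influxdb_client import Point

p = (
    Point("sensor")
    .tag("device", "sensor01")
    .field("temperature", 23.5)  # float -> 23.5
    .field("humidity", 65)       # int   -> 65i
    .field("active", True)       # bool  -> true
    .field("location", "room1")  # str   -> "room1"
)
print(p.to_line_protocol())
# sensor,device=sensor01 temperature=23.5,humidity=65i,active=true,location="room1"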

Flux Query Language

Basic Syntax

txt
// Basic query structure
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> yield()

// Absolute time range
from(bucket: "mybucket")
  |> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-02T00:00:00Z)

// Relative time range
from(bucket: "mybucket")
  |> range(start: -24h, stop: -1h)

Filtering and Transformation

txt
// Filter predicates
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "room1")
  |> filter(fn: (r) => r._value > 20.0)

// Column selection
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> keep(columns: ["_time", "_value", "location"])

// Column renaming
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> rename(columns: {_value: "temperature"})

// Transforming values with map()
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> map(fn: (r) => ({ r with celsius: (r._value - 32.0) * 5.0 / 9.0 }))

Aggregation Functions

txt
// Basic aggregation
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> mean()

// Windowed aggregation
from(bucket: "mybucket")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)

// Grouped aggregation
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> group(columns: ["location"])
  |> mean()

// Multiple aggregations as separate result streams
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> group(columns: ["location"])
  |> aggregateWindow(every: 10m, fn: mean)
  |> yield(name: "mean")

from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> group(columns: ["location"])
  |> aggregateWindow(every: 10m, fn: max)
  |> yield(name: "max")

Advanced Queries

txt
// Joining two streams
temp = from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> aggregateWindow(every: 5m, fn: mean)

humidity = from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "humidity")
  |> aggregateWindow(every: 5m, fn: mean)

join(tables: {temp: temp, humidity: humidity}, on: ["_time", "location"])

// Arithmetic
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> map(fn: (r) => ({ r with 
      fahrenheit: r._value * 9.0 / 5.0 + 32.0,
      kelvin: r._value + 273.15
  }))

// Conditional logic
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> map(fn: (r) => ({ r with 
      status: if r._value > 25.0 then "hot" 
              else if r._value < 15.0 then "cold" 
              else "normal"
  }))

// Sorting and limiting
from(bucket: "mybucket")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> sort(columns: ["_value"], desc: true)
  |> limit(n: 10)

InfluxQL Query Language (1.x)

Basic Queries

sql
-- Select all fields
SELECT * FROM temperature

-- Select a specific field
SELECT value FROM temperature

-- With a condition
SELECT * FROM temperature WHERE location = 'room1'

-- Time-range query
SELECT * FROM temperature WHERE time >= now() - 1h

-- Compound conditions
SELECT * FROM temperature 
WHERE location = 'room1' AND time >= now() - 1h AND value > 20

Aggregation Functions

sql
-- Basic aggregation
SELECT mean(value) FROM temperature WHERE time >= now() - 1h

-- Multiple aggregates
SELECT mean(value), max(value), min(value), count(value) 
FROM temperature 
WHERE time >= now() - 1h

-- Group by time
SELECT mean(value) FROM temperature 
WHERE time >= now() - 1h 
GROUP BY time(10m)

-- Group by tag
SELECT mean(value) FROM temperature 
WHERE time >= now() - 1h 
GROUP BY location

-- Group by time and tag
SELECT mean(value) FROM temperature 
WHERE time >= now() - 1h 
GROUP BY time(10m), location

Advanced Queries

sql
-- Subquery
SELECT mean(max_temp) FROM (
  SELECT max(value) as max_temp FROM temperature 
  WHERE time >= now() - 1h 
  GROUP BY time(10m)
)

-- Arithmetic
SELECT value * 9 / 5 + 32 as fahrenheit FROM temperature

-- Conditional logic: InfluxQL has no CASE expression; classify values
-- client-side or with Flux map() instead (see the Flux "Conditional logic"
-- example above)

-- Regular expressions
SELECT * FROM temperature WHERE location =~ /room[0-9]+/

-- Sorting and limiting
SELECT * FROM temperature 
WHERE time >= now() - 1h 
ORDER BY time DESC 
LIMIT 10

Continuous Queries

sql
-- Create a continuous query
CREATE CONTINUOUS QUERY "cq_mean_temp" ON "sensors"
BEGIN
  SELECT mean(value) INTO "average_temperature" 
  FROM "temperature" 
  GROUP BY time(1h), location
END

-- List continuous queries
SHOW CONTINUOUS QUERIES

-- Drop a continuous query
DROP CONTINUOUS QUERY "cq_mean_temp" ON "sensors"

-- Continuous query that downsamples into another retention policy
CREATE CONTINUOUS QUERY "cq_downsample" ON "sensors"
BEGIN
  SELECT mean(value) INTO "sensors"."one_year"."temperature_hourly" 
  FROM "sensors"."one_week"."temperature" 
  GROUP BY time(1h), *
END

Performance Optimization

Data Model Optimization

bash
# Design tags and fields deliberately
# Good design
temperature,sensor_id=001,location=room1,building=A value=23.5

# Avoid high-cardinality tags
# Bad design (timestamp as a tag)
temperature,sensor_id=001,timestamp=1640995200 value=23.5

# Tag values should come from a bounded set
# Good design
temperature,location=room1,status=normal value=23.5

# Bad design (tag values grow without bound)
temperature,location=room1,request_id=uuid-12345 value=23.5

Write Optimization

bash
# Batch writes
curl -XPOST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
  -H "Authorization: Token $INFLUX_TOKEN" \
  --data-binary @data.txt

# Use gzip compression
curl -XPOST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
  -H "Authorization: Token $INFLUX_TOKEN" \
  -H "Content-Encoding: gzip" \
  --data-binary @data.txt.gz

# Concurrent writes (keep the concurrency bounded)
for i in {1..10}; do
  (
    curl -XPOST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
      -H "Authorization: Token $INFLUX_TOKEN" \
      --data-binary @data_${i}.txt
  ) &
done
wait

Query Optimization

txt
// Always constrain the time range
from(bucket: "mybucket")
  |> range(start: -1h)  // always specify a time range
  |> filter(fn: (r) => r._measurement == "temperature")

// Filter early
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")  // filter on the measurement first
  |> filter(fn: (r) => r.location == "room1")            // then on tags
  |> filter(fn: (r) => r._value > 20.0)                  // and on field values last

// Avoid unnecessary grouping
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> mean()  // aggregate directly; no group() needed

// Use aggregateWindow rather than window plus a separate aggregate
from(bucket: "mybucket")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> aggregateWindow(every: 1h, fn: mean)  // recommended

Storage Optimization

toml
# Configuration tuning (InfluxDB 1.x)
[data]
  # Cache size
  cache-max-memory-size = 1073741824  # 1GB

  # Compaction
  compact-full-write-cold-duration = "4h"

  # WAL
  wal-fsync-delay = "0s"

  # Index limits
  max-series-per-database = 1000000
  max-values-per-tag = 100000

Monitoring and Operations

System Monitoring

txt
// View system metrics (series counts)
from(bucket: "_monitoring")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "influxdb_database")
  |> filter(fn: (r) => r._field == "numSeries")

// View the write rate
from(bucket: "_monitoring")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "influxdb_write")
  |> filter(fn: (r) => r._field == "pointReq")
  |> derivative(unit: 1s, nonNegative: true)

// View query performance
from(bucket: "_monitoring")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "influxdb_queryExecutor")
  |> filter(fn: (r) => r._field == "queriesFinished")

Health Checks

bash
# InfluxDB 2.x health check
curl -f http://localhost:8086/health

# Readiness check
curl -f http://localhost:8086/ready

# View metrics
curl http://localhost:8086/metrics

# InfluxDB 1.x health check
curl -f http://localhost:8086/ping

# View runtime statistics
curl http://localhost:8086/debug/vars

Backup and Restore

bash
# InfluxDB 2.x backup
influx backup /path/to/backup --org myorg

# Restore
influx restore /path/to/backup --org myorg --new-org neworg

# InfluxDB 1.x backup
influxd backup -portable /path/to/backup

# Restore
influxd restore -portable /path/to/backup

# Online backup of a single database (1.x)
influxd backup -database sensors /path/to/backup

# Incremental backup (data since the given time)
influxd backup -database sensors -since 2024-01-01T00:00:00Z /path/to/backup

Client Libraries

Python Client

python
# InfluxDB 2.x Python client
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime

# Connection settings
token = "your-token"
org = "myorg"
bucket = "mybucket"
url = "http://localhost:8086"

client = InfluxDBClient(url=url, token=token, org=org)

# Write data
write_api = client.write_api(write_options=SYNCHRONOUS)

# Using a Point object
point = Point("temperature") \
    .tag("location", "room1") \
    .field("value", 23.5) \
    .time(datetime.datetime.utcnow(), WritePrecision.NS)

write_api.write(bucket=bucket, org=org, record=point)

# Using a dict
data = {
    "measurement": "temperature",
    "tags": {"location": "room1"},
    "fields": {"value": 23.5},
    "time": datetime.datetime.utcnow()
}

write_api.write(bucket=bucket, org=org, record=data)

# Batch write
points = []
for i in range(100):
    point = Point("temperature") \
        .tag("location", f"room{i%5}") \
        .field("value", 20 + i * 0.1) \
        .time(datetime.datetime.utcnow() + datetime.timedelta(seconds=i))
    points.append(point)

write_api.write(bucket=bucket, org=org, record=points)

# Query data
query_api = client.query_api()

query = f'''
from(bucket: "{bucket}")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "room1")
'''

result = query_api.query(org=org, query=query)

for table in result:
    for record in table.records:
        print(f"Time: {record.get_time()}, Value: {record.get_value()}")

# Close the client
client.close()

Go Client

go
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
    "github.com/influxdata/influxdb-client-go/v2/api"
)

func main() {
    // Connection settings
    token := "your-token"
    url := "http://localhost:8086"
    org := "myorg"
    bucket := "mybucket"

    // Create the client
    client := influxdb2.NewClient(url, token)
    defer client.Close()

    // Write data
    writeAPI := client.WriteAPIBlocking(org, bucket)

    // Create a point
    p := influxdb2.NewPoint("temperature",
        map[string]string{"location": "room1"},
        map[string]interface{}{"value": 23.5},
        time.Now())

    // Write the point
    err := writeAPI.WritePoint(context.Background(), p)
    if err != nil {
        log.Fatal(err)
    }

    // Batch write
    points := make([]*influxdb2.Point, 0)
    for i := 0; i < 100; i++ {
        p := influxdb2.NewPoint("temperature",
            map[string]string{"location": fmt.Sprintf("room%d", i%5)},
            map[string]interface{}{"value": 20.0 + float64(i)*0.1},
            time.Now().Add(time.Duration(i)*time.Second))
        points = append(points, p)
    }

    err = writeAPI.WritePoint(context.Background(), points...)
    if err != nil {
        log.Fatal(err)
    }

    // Query data
    queryAPI := client.QueryAPI(org)

    query := fmt.Sprintf(`
        from(bucket: "%s")
          |> range(start: -1h)
          |> filter(fn: (r) => r._measurement == "temperature")
          |> filter(fn: (r) => r.location == "room1")
    `, bucket)

    result, err := queryAPI.Query(context.Background(), query)
    if err != nil {
        log.Fatal(err)
    }

    for result.Next() {
        fmt.Printf("Time: %v, Value: %v\n", 
            result.Record().Time(), 
            result.Record().Value())
    }

    if result.Err() != nil {
        log.Fatal(result.Err())
    }
}

Node.js Client

javascript
const { InfluxDB, Point } = require('@influxdata/influxdb-client');

// Connection settings
const token = 'your-token';
const url = 'http://localhost:8086';
const org = 'myorg';
const bucket = 'mybucket';

const client = new InfluxDB({ url, token });

// Write data
const writeApi = client.getWriteApi(org, bucket);
writeApi.useDefaultTags({ host: 'server01' });

// Create a point
const point = new Point('temperature')
  .tag('location', 'room1')
  .floatField('value', 23.5)
  .timestamp(new Date());

writeApi.writePoint(point);

// Batch write
for (let i = 0; i < 100; i++) {
  const point = new Point('temperature')
    .tag('location', `room${i % 5}`)
    .floatField('value', 20 + i * 0.1)
    .timestamp(new Date(Date.now() + i * 1000));
  
  writeApi.writePoint(point);
}

// Flush and close the write API
writeApi.close().then(() => {
  console.log('Write completed');
}).catch(err => {
  console.error('Write failed', err);
});

// Query data
const queryApi = client.getQueryApi(org);

const query = `
  from(bucket: "${bucket}")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "temperature")
    |> filter(fn: (r) => r.location == "room1")
`;

queryApi.queryRows(query, {
  next(row, tableMeta) {
    const o = tableMeta.toObject(row);
    console.log(`Time: ${o._time}, Value: ${o._value}`);
  },
  error(error) {
    console.error('Query failed', error);
  },
  complete() {
    console.log('Query completed');
  },
});

Practical Use Cases

1. IoT Sensor Data Collection

python
# IoT data collection system
import random
import time
from datetime import datetime
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

class IoTDataCollector:
    def __init__(self, url, token, org, bucket):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.org = org
        self.bucket = bucket
    
    def collect_sensor_data(self, sensor_id, location):
        """模拟传感器数据收集"""
        while True:
            # Simulated sensor readings
            temperature = random.uniform(18.0, 30.0)
            humidity = random.uniform(40.0, 80.0)
            pressure = random.uniform(990.0, 1030.0)
            
            # Build the data points
            points = [
                Point("temperature")
                    .tag("sensor_id", sensor_id)
                    .tag("location", location)
                    .field("value", temperature)
                    .time(datetime.utcnow(), WritePrecision.NS),
                
                Point("humidity")
                    .tag("sensor_id", sensor_id)
                    .tag("location", location)
                    .field("value", humidity)
                    .time(datetime.utcnow(), WritePrecision.NS),
                
                Point("pressure")
                    .tag("sensor_id", sensor_id)
                    .tag("location", location)
                    .field("value", pressure)
                    .time(datetime.utcnow(), WritePrecision.NS)
            ]
            
            # Write the points
            try:
                self.write_api.write(bucket=self.bucket, org=self.org, record=points)
                print(f"Data written for sensor {sensor_id}: T={temperature:.1f}°C, H={humidity:.1f}%, P={pressure:.1f}hPa")
            except Exception as e:
                print(f"Error writing data: {e}")
            
            time.sleep(10)  # collect once every 10 seconds
    
    def get_sensor_stats(self, sensor_id, hours=24):
        """获取传感器统计信息"""
        query_api = self.client.query_api()
        
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -{hours}h)
          |> filter(fn: (r) => r.sensor_id == "{sensor_id}")
          |> group(columns: ["_measurement"])
          |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
        '''
        
        result = query_api.query(org=self.org, query=query)
        
        stats = {}
        for table in result:
            measurement = table.records[0].get_measurement()
            stats[measurement] = []
            for record in table.records:
                stats[measurement].append({
                    'time': record.get_time(),
                    'value': record.get_value()
                })
        
        return stats

# Usage example
if __name__ == "__main__":
    collector = IoTDataCollector(
        url="http://localhost:8086",
        token="your-token",
        org="myorg",
        bucket="iot_sensors"
    )
    
    # Start collecting data
    collector.collect_sensor_data("sensor_001", "warehouse_a")

2. Application Performance Monitoring

go
package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

type APMCollector struct {
    client   influxdb2.Client
    writeAPI api.WriteAPIBlocking
    org      string
    bucket   string
}

func NewAPMCollector(url, token, org, bucket string) *APMCollector {
    client := influxdb2.NewClient(url, token)
    writeAPI := client.WriteAPIBlocking(org, bucket)
    
    return &APMCollector{
        client:   client,
        writeAPI: writeAPI,
        org:      org,
        bucket:   bucket,
    }
}

func (apm *APMCollector) RecordHTTPRequest(method, endpoint, status string, duration time.Duration) {
    point := influxdb2.NewPoint("http_requests",
        map[string]string{
            "method":   method,
            "endpoint": endpoint,
            "status":   status,
        },
        map[string]interface{}{
            "duration_ms": float64(duration.Nanoseconds()) / 1e6,
            "count":       1,
        },
        time.Now())
    
    err := apm.writeAPI.WritePoint(context.Background(), point)
    if err != nil {
        log.Printf("Error writing HTTP request metric: %v", err)
    }
}

func (apm *APMCollector) RecordDatabaseQuery(query, database string, duration time.Duration, rowsAffected int) {
    point := influxdb2.NewPoint("database_queries",
        map[string]string{
            "database": database,
            "query":    query,
        },
        map[string]interface{}{
            "duration_ms":    float64(duration.Nanoseconds()) / 1e6,
            "rows_affected": rowsAffected,
        },
        time.Now())
    
    err := apm.writeAPI.WritePoint(context.Background(), point)
    if err != nil {
        log.Printf("Error writing database query metric: %v", err)
    }
}

func (apm *APMCollector) RecordSystemMetrics() {
    // Simulated system metrics
    cpuUsage := rand.Float64() * 100
    memoryUsage := rand.Float64() * 100
    diskUsage := rand.Float64() * 100
    
    points := []*influxdb2.Point{
        influxdb2.NewPoint("system_metrics",
            map[string]string{"metric": "cpu_usage"},
            map[string]interface{}{"value": cpuUsage},
            time.Now()),
        
        influxdb2.NewPoint("system_metrics",
            map[string]string{"metric": "memory_usage"},
            map[string]interface{}{"value": memoryUsage},
            time.Now()),
        
        influxdb2.NewPoint("system_metrics",
            map[string]string{"metric": "disk_usage"},
            map[string]interface{}{"value": diskUsage},
            time.Now()),
    }
    
    err := apm.writeAPI.WritePoint(context.Background(), points...)
    if err != nil {
        log.Printf("Error writing system metrics: %v", err)
    }
}

func (apm *APMCollector) GetPerformanceReport(hours int) {
    queryAPI := apm.client.QueryAPI(apm.org)
    
    // HTTP request statistics
    httpQuery := fmt.Sprintf(`
        from(bucket: "%s")
          |> range(start: -%dh)
          |> filter(fn: (r) => r._measurement == "http_requests")
          |> group(columns: ["endpoint", "status"])
          |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    `, apm.bucket, hours)
    
    result, err := queryAPI.Query(context.Background(), httpQuery)
    if err != nil {
        log.Printf("Error querying HTTP metrics: %v", err)
        return
    }
    
    fmt.Println("HTTP Request Performance:")
    for result.Next() {
        record := result.Record()
        fmt.Printf("Endpoint: %s, Status: %s, Avg Duration: %.2f ms\n",
            record.ValueByKey("endpoint"),
            record.ValueByKey("status"),
            record.Value())
    }
}

func main() {
    apm := NewAPMCollector(
        "http://localhost:8086",
        "your-token",
        "myorg",
        "apm_metrics",
    )
    
    // Simulate application metric collection
    go func() {
        for {
            // Simulated HTTP requests
            endpoints := []string{"/api/users", "/api/orders", "/api/products"}
            methods := []string{"GET", "POST", "PUT", "DELETE"}
            statuses := []string{"200", "404", "500"}
            
            endpoint := endpoints[rand.Intn(len(endpoints))]
            method := methods[rand.Intn(len(methods))]
            status := statuses[rand.Intn(len(statuses))]
            duration := time.Duration(rand.Intn(1000)) * time.Millisecond
            
            apm.RecordHTTPRequest(method, endpoint, status, duration)
            
            // Simulated database queries
            queries := []string{"SELECT", "INSERT", "UPDATE", "DELETE"}
            query := queries[rand.Intn(len(queries))]
            dbDuration := time.Duration(rand.Intn(500)) * time.Millisecond
            rowsAffected := rand.Intn(100)
            
            apm.RecordDatabaseQuery(query, "main_db", dbDuration, rowsAffected)
            
            time.Sleep(1 * time.Second)
        }
    }()
    
    // Collect system metrics periodically
    go func() {
        for {
            apm.RecordSystemMetrics()
            time.Sleep(30 * time.Second)
        }
    }()
    
    // Generate reports periodically
    ticker := time.NewTicker(5 * time.Minute)
    for range ticker.C {
        apm.GetPerformanceReport(1)
    }
}

3. Financial Data Analysis

txt
// Stock price analysis (math.abs below requires the math package)
import "math"

stock_data = from(bucket: "financial_data")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "stock_price")
  |> filter(fn: (r) => r.symbol == "AAPL")

// Moving averages
ma_5 = stock_data
  |> filter(fn: (r) => r._field == "close")
  |> aggregateWindow(every: 1d, fn: mean)
  |> timedMovingAverage(every: 1d, period: 5d)
  |> set(key: "_field", value: "ma_5")

ma_20 = stock_data
  |> filter(fn: (r) => r._field == "close")
  |> aggregateWindow(every: 1d, fn: mean)
  |> timedMovingAverage(every: 1d, period: 20d)
  |> set(key: "_field", value: "ma_20")

// RSI (Relative Strength Index)
rsi = stock_data
  |> filter(fn: (r) => r._field == "close")
  |> aggregateWindow(every: 1d, fn: last)
  |> difference(nonNegative: false)
  |> map(fn: (r) => ({
      r with
      gain: if r._value > 0.0 then r._value else 0.0,
      loss: if r._value < 0.0 then math.abs(x: r._value) else 0.0
  }))
  |> timedMovingAverage(every: 1d, period: 14d, column: "gain")
  |> timedMovingAverage(every: 1d, period: 14d, column: "loss")
  |> map(fn: (r) => ({
      r with
      rsi: 100.0 - (100.0 / (1.0 + (r.gain / r.loss)))
  }))
  |> keep(columns: ["_time", "symbol", "rsi"])
  |> set(key: "_field", value: "rsi")

// Merge all the indicators
union(tables: [stock_data, ma_5, ma_20, rsi])
  |> sort(columns: ["_time"])
  |> yield()

// Trade-signal detection (illustrative: assumes ma_20 and rsi have already been joined into the stream as columns)
from(bucket: "financial_data")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "stock_price")
  |> filter(fn: (r) => r._field == "close")
  |> aggregateWindow(every: 1h, fn: last)
  |> map(fn: (r) => ({
      r with
      signal: if r._value > r.ma_20 and r.rsi < 30.0 then "BUY"
              else if r._value < r.ma_20 and r.rsi > 70.0 then "SELL"
              else "HOLD"
  }))
  |> filter(fn: (r) => r.signal != "HOLD")

Best Practices

1. Data Model Design

  • Tag design: draw tag values from a bounded set and avoid high cardinality (see the estimation sketch after this list)
  • Field design: put numeric data in fields and metadata in tags
  • Measurement naming: use descriptive measurement names
  • Time precision: pick the precision your use case actually needs
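
Since the number of series is roughly the product of the distinct values of each tag (per measurement and field key), cardinality is worth estimating before committing to a schema. A back-of-the-envelope sketch with made-up counts:

python
# Rough series-cardinality estimate: the product of distinct tag values.
# The counts below are hypothetical; substitute your own.
from math import prod

distinct_tag_values = {
    "location": 50,       # bounded set: fine
    "sensor_id": 1_000,   # bounded set: fine
    # "request_id": 10_000_000,  # unbounded (UUIDs): cardinality explosion
}

series_per_field = prod(distinct_tag_values.values())
print(f"~{series_per_field:,} series per measurement/field key")  # ~50,000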

2. Performance Optimization

  • Batch writes: write points in batches for throughput (see the sketch after this list)
  • Time ranges: always constrain queries with a time range
  • Index usage: design tags so queries can exploit the index
  • Data retention: set retention policies that match your needs
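
For the batching recommendation, the Python client's buffered write API can do the batching for you; a sketch with illustrative (not tuned) parameter values:

python
# Buffered, batched writes with the Python client's WriteOptions.
from influxdb_client import InfluxDBClient, Point, WriteOptions

client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="myorg")

# Buffer points; flush every 500 points or every 10 s, whichever comes first.
write_api = client.write_api(write_options=WriteOptions(
    batch_size=500,
    flush_interval=10_000,  # ms
    jitter_interval=2_000,  # ms
    retry_interval=5_000,   # ms
))

for i in range(2_000):
    write_api.write(bucket="mybucket", record=Point("temperature")
                    .tag("location", f"room{i % 5}")
                    .field("value", 20 + i * 0.01))

write_api.close()  # flush anything still buffered
client.close()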

3. Operations

  • Monitoring and alerting: watch key metrics and performance
  • Backup strategy: back up important data regularly
  • Capacity planning: track storage usage over time
  • Version upgrades: plan upgrades and test them first

4. Security

  • Authentication and authorization: enable auth and scope permissions tightly (see the token sketch after this list)
  • Network security: use HTTPS and a firewall
  • Data encryption: consider encryption in transit and at rest
  • Audit logging: enable audit logs
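
For the authorization point, scoped tokens can be created programmatically instead of handing out all-access tokens. A sketch with the Python client (bucket name and credentials are placeholders; the calls follow the influxdb-client examples):

python
# Create a read-only token scoped to a single bucket (least privilege).
from influxdb_client import InfluxDBClient, Permission, PermissionResource

client = InfluxDBClient(url="http://localhost:8086", token="admin-token", org="myorg")

bucket = client.buckets_api().find_bucket_by_name("mybucket")
read_only = Permission(
    action="read",
    resource=PermissionResource(type="buckets", id=bucket.id, org_id=bucket.org_id),
)

auth = client.authorizations_api().create_authorization(
    org_id=bucket.org_id, permissions=[read_only],
)
print(auth.token)
client.close()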

Summary

InfluxDB is a high-performance database purpose-built for time series data, making it a strong fit for monitoring, IoT, and financial analytics. With careful data modeling, query optimization, and operational discipline, it provides robust storage and analysis for time series applications.

Strengths

  • Purpose-built and optimized for time series data
  • High-performance writes and queries
  • Strong data compression
  • Rich query and analysis capabilities
  • Good ecosystem integration

Typical Use Cases

  • Systems monitoring and operations
  • IoT data collection and analysis
  • Application performance monitoring (APM)
  • Financial data analysis
  • Real-time data processing

A solid grasp of InfluxDB's core concepts and best practices will let you get the most out of it wherever time series data needs to be processed.