# InfluxDB Explained

InfluxDB is an open-source database purpose-built for time-series data, developed by InfluxData. It is optimized for storing, querying, and analyzing time-series data, and is widely used for monitoring, IoT, and real-time analytics.
## Overview

### Core Concepts
- Time series: a sequence of data points ordered by time
- Point: a single data record consisting of a timestamp, fields, and tags
- Timestamp: the time identifier of a point, with up to nanosecond precision
- Field: the actual measured values; multiple data types are supported
- Tag: key-value pairs used for indexing and grouping; tag values are always strings
- Measurement: analogous to a table in a relational database
- Database: a container for measurements (InfluxDB 1.x; 2.x uses buckets)
- Retention policy: defines how long data is kept and how many replicas are stored
- Shard: a time-based partition of the data
- Series: the set of points sharing the same measurement, tag set, and field key (see the sketch below)
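To make the vocabulary concrete, here is a minimal sketch using the official Python client (the `influxdb-client` package); the measurement, tag, and field names are illustrative, and `to_line_protocol()` just renders the point without writing it anywhere:

```python
from influxdb_client import Point, WritePrecision

# One point in the "temperature" measurement. The measurement, the tag set
# ("location"), and the field key ("value") together identify the series
# this point belongs to; the field carries the actual data.
point = (
    Point("temperature")
    .tag("location", "room1")                      # tag: indexed, string-valued
    .field("value", 23.5)                          # field: the measured value
    .time(1640995200000000000, WritePrecision.NS)  # nanosecond timestamp
)

# Prints: temperature,location=room1 value=23.5 1640995200000000000
print(point.to_line_protocol())
```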
### Key Features

- High write throughput: a storage engine optimized for time-series workloads
- Compressed storage: efficient data-compression algorithms
- Query languages: the SQL-like InfluxQL and the functional Flux
- Automatic data expiry: time-based retention policies
- Continuous queries: periodic queries that run automatically
- Downsampling: automatic aggregation of historical data
- Clustering: horizontal scaling (Enterprise edition)
- Rich client libraries: support for many programming languages
### Version Comparison

| Feature | InfluxDB 1.x | InfluxDB 2.x |
|---|---|---|
| Query language | InfluxQL | Flux |
| Storage engine | TSM | TSM |
| User interface | Chronograf | Built-in UI |
| Authentication | Basic auth | Token-based |
| Organization | Databases | Organizations/buckets |
| API | HTTP API | HTTP API v2 |
## Installation and Configuration

### Installation Methods

#### 1. Official packages
```bash
# macOS
brew install influxdb
brew install influxdb-cli

# Ubuntu/Debian
wget -qO- https://repos.influxdata.com/influxdb.key | sudo apt-key add -
echo "deb https://repos.influxdata.com/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/influxdb.list
sudo apt-get update
sudo apt-get install influxdb2

# CentOS/RHEL
cat <<EOF | sudo tee /etc/yum.repos.d/influxdb.repo
[influxdb]
name = InfluxDB Repository - RHEL
baseurl = https://repos.influxdata.com/rhel/\$releasever/\$basearch/stable
enabled = 1
gpgcheck = 1
gpgkey = https://repos.influxdata.com/influxdb.key
EOF
sudo yum install influxdb2
```
#### 2. Docker
```bash
# InfluxDB 2.x
docker run --name influxdb \
  -p 8086:8086 \
  -v influxdb-data:/var/lib/influxdb2 \
  -v influxdb-config:/etc/influxdb2 \
  influxdb:2.7

# With initial setup baked in
docker run --name influxdb \
  -p 8086:8086 \
  -e DOCKER_INFLUXDB_INIT_MODE=setup \
  -e DOCKER_INFLUXDB_INIT_USERNAME=admin \
  -e DOCKER_INFLUXDB_INIT_PASSWORD=password123 \
  -e DOCKER_INFLUXDB_INIT_ORG=myorg \
  -e DOCKER_INFLUXDB_INIT_BUCKET=mybucket \
  -v influxdb-data:/var/lib/influxdb2 \
  influxdb:2.7

# InfluxDB 1.x
docker run --name influxdb1 \
  -p 8086:8086 \
  -v influxdb1-data:/var/lib/influxdb \
  influxdb:1.8
```
#### 3. Docker Compose
```yaml
# docker-compose.yml
version: '3.8'

services:
  influxdb:
    image: influxdb:2.7
    container_name: influxdb
    ports:
      - "8086:8086"
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=password123
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=mybucket
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token
    volumes:
      - influxdb-data:/var/lib/influxdb2
      - influxdb-config:/etc/influxdb2
    restart: unless-stopped

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-data:/var/lib/grafana
    restart: unless-stopped
    depends_on:
      - influxdb

volumes:
  influxdb-data:
  influxdb-config:
  grafana-data:
```
### Basic Configuration

#### InfluxDB 2.x configuration

```toml
# /etc/influxdb2/config.toml

# HTTP service
http-bind-address = ":8086"
http-read-header-timeout = "10s"
http-read-timeout = "0"
http-write-timeout = "0"
http-idle-timeout = "3m"
http-max-body-size = 25000000

# Storage paths
engine-path = "/var/lib/influxdb2/engine"
bolt-path = "/var/lib/influxdb2/influxd.bolt"

# Logging
log-level = "info"

# Query limits
query-concurrency = 1024
query-queue-size = 1024
query-memory-bytes = 0

# Storage engine tuning
storage-cache-max-memory-size = 1073741824
storage-cache-snapshot-memory-size = 26214400
storage-cache-snapshot-write-cold-duration = "10m"
storage-compact-full-write-cold-duration = "4h"
storage-compact-throughput-burst = 50331648
storage-max-concurrent-compactions = 0
storage-max-index-log-file-size = 1048576
storage-series-id-set-cache-size = 100

# TLS
# tls-cert = "/path/to/cert.pem"
# tls-key = "/path/to/key.pem"
```
#### InfluxDB 1.x configuration

```toml
# /etc/influxdb/influxdb.conf
[meta]
dir = "/var/lib/influxdb/meta"
retention-autocreate = true
logging-enabled = true
[data]
dir = "/var/lib/influxdb/data"
wal-dir = "/var/lib/influxdb/wal"
query-log-enabled = true
cache-max-memory-size = 1073741824
cache-snapshot-memory-size = 26214400
cache-snapshot-write-cold-duration = "10m"
compact-full-write-cold-duration = "4h"
max-series-per-database = 1000000
max-values-per-tag = 100000
[coordinator]
write-timeout = "10s"
max-concurrent-queries = 0
query-timeout = "0s"
log-queries-after = "0s"
max-select-point = 0
max-select-series = 0
max-select-buckets = 0
[retention]
enabled = true
check-interval = "30m"
[shard-precreation]
enabled = true
check-interval = "10m"
advance-period = "30m"
[monitor]
store-enabled = true
store-database = "_internal"
store-interval = "10s"
[subscriber]
enabled = true
http-timeout = "30s"
insecure-skip-verify = false
ca-certs = ""
write-concurrency = 40
write-buffer-size = 1000
[http]
enabled = true
bind-address = ":8086"
auth-enabled = false
log-enabled = true
write-tracing = false
pprof-enabled = true
https-enabled = false
https-certificate = "/etc/ssl/influxdb.pem"
https-private-key = "/etc/ssl/influxdb-key.pem"
max-row-limit = 0
max-connection-limit = 0
shared-secret = ""
realm = "InfluxDB"
unix-socket-enabled = false
bind-socket = "/var/run/influxdb.sock"
```
## Basic Operations

### InfluxDB 2.x operations

#### Initial setup
```bash
# Initialize from the command line
influx setup \
  --username admin \
  --password password123 \
  --org myorg \
  --bucket mybucket \
  --force

# Create an auth token
influx auth create \
  --org myorg \
  --all-access \
  --description "Admin token"

# List organizations
influx org list

# Create an organization
influx org create --name neworg

# List buckets
influx bucket list

# Create a bucket with a 30-day retention period
influx bucket create \
  --name sensors \
  --org myorg \
  --retention 30d
```
#### Writing data
```bash
# Write with the influx CLI
influx write \
  --bucket mybucket \
  --org myorg \
  --token $INFLUX_TOKEN \
  'temperature,location=room1 value=23.5 1640995200000000000'

# Write from stdin / a file
echo 'temperature,location=room1 value=24.0
temperature,location=room2 value=22.5' | \
influx write \
  --bucket mybucket \
  --org myorg \
  --token $INFLUX_TOKEN

# Write via the HTTP API
curl -XPOST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
  -H "Authorization: Token $INFLUX_TOKEN" \
  -H "Content-Type: text/plain; charset=utf-8" \
  --data-binary '
temperature,location=room1 value=23.5
humidity,location=room1 value=65.2
temperature,location=room2 value=22.8
humidity,location=room2 value=68.1
'
```
#### Querying data
```bash
# Query with Flux via the CLI
influx query \
  --org myorg \
  --token $INFLUX_TOKEN \
  'from(bucket: "mybucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "temperature")
    |> mean()'

# Query via the HTTP API
curl -XPOST "http://localhost:8086/api/v2/query?org=myorg" \
  -H "Authorization: Token $INFLUX_TOKEN" \
  -H "Content-Type: application/vnd.flux" \
  --data 'from(bucket: "mybucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "temperature")'
```
### InfluxDB 1.x operations

#### Database management
```sql
-- Connect with the CLI
influx -precision rfc3339

-- Create a database
CREATE DATABASE sensors

-- List databases
SHOW DATABASES

-- Switch database
USE sensors

-- Create a retention policy
CREATE RETENTION POLICY "one_week" ON "sensors" DURATION 7d REPLICATION 1 DEFAULT

-- List retention policies
SHOW RETENTION POLICIES ON sensors

-- Drop a database
DROP DATABASE sensors
```
#### Writing data
```sql
-- Insert a single point
INSERT temperature,location=room1 value=23.5

-- Insert several points with explicit timestamps
INSERT temperature,location=room1 value=23.5 1640995200000000000
INSERT temperature,location=room2 value=22.8 1640995200000000000
INSERT humidity,location=room1 value=65.2 1640995200000000000
```

```bash
# Write via the HTTP API
curl -i -XPOST 'http://localhost:8086/write?db=sensors' \
  --data-binary '
temperature,location=room1 value=23.5
humidity,location=room1 value=65.2
temperature,location=room2 value=22.8
humidity,location=room2 value=68.1
'
```
#### Querying data
```sql
-- Select everything
SELECT * FROM temperature

-- Time-range query
SELECT * FROM temperature WHERE time >= now() - 1h

-- Conditional query
SELECT * FROM temperature WHERE location = 'room1' AND time >= now() - 1h

-- Aggregation
SELECT mean(value) FROM temperature WHERE time >= now() - 1h GROUP BY time(10m)

-- Query several measurements at once (InfluxQL returns one result series
-- per measurement; fields cannot be qualified by measurement name)
SELECT mean(value)
FROM temperature, humidity
WHERE time >= now() - 1h
GROUP BY time(10m)
```
## Data Model and Line Protocol

### Line Protocol Format

```txt
measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp
```

### Examples
```bash
# Basic format
temperature,location=room1 value=23.5

# With an explicit timestamp
temperature,location=room1 value=23.5 1640995200000000000

# Multiple tags and fields
weather,location=beijing,station=airport temperature=25.3,humidity=60.2,pressure=1013.25

# String fields must be quoted
log,level=error,service=api message="Connection failed",count=1i

# Mixed data types
sensor,device=sensor01 temperature=23.5,humidity=65i,active=true,location="room1"
```
### Data Types

| Type | Example | Notes |
|---|---|---|
| Float | `23.5` | The default numeric type |
| Integer | `65i` | Requires the `i` suffix |
| String | `"room1"` | Must be quoted |
| Boolean | `true` | `true` / `false` |
| Timestamp | `1640995200000000000` | Unix timestamp in nanoseconds |
## The Flux Query Language

### Basic Syntax
```txt
// Basic query structure
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> yield()

// Absolute time range
from(bucket: "mybucket")
  |> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-02T00:00:00Z)

// Relative time range
from(bucket: "mybucket")
  |> range(start: -24h, stop: -1h)
```
### Filtering and Transformation
```txt
// Filters
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "room1")
  |> filter(fn: (r) => r._value > 20.0)

// Column selection
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> keep(columns: ["_time", "_value", "location"])

// Column renaming
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> rename(columns: {_value: "temperature"})

// Transformation
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> map(fn: (r) => ({ r with celsius: (r._value - 32.0) * 5.0 / 9.0 }))
```
### Aggregation Functions
```txt
// Simple aggregation
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> mean()

// Windowed aggregation
from(bucket: "mybucket")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)

// Grouped aggregation
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> group(columns: ["location"])
  |> mean()

// Multiple aggregations of the same data
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> group(columns: ["location"])
  |> aggregateWindow(every: 10m, fn: mean)
  |> yield(name: "mean")

from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> group(columns: ["location"])
  |> aggregateWindow(every: 10m, fn: max)
  |> yield(name: "max")
```
### Advanced Queries
```txt
// Join two streams
temp = from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> aggregateWindow(every: 5m, fn: mean)

humidity = from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "humidity")
  |> aggregateWindow(every: 5m, fn: mean)

join(tables: {temp: temp, humidity: humidity}, on: ["_time", "location"])

// Math on values
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> map(fn: (r) => ({ r with
      fahrenheit: r._value * 9.0 / 5.0 + 32.0,
      kelvin: r._value + 273.15
  }))

// Conditional logic
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> map(fn: (r) => ({ r with
      status: if r._value > 25.0 then "hot"
              else if r._value < 15.0 then "cold"
              else "normal"
  }))

// Sorting and limiting
from(bucket: "mybucket")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> sort(columns: ["_value"], desc: true)
  |> limit(n: 10)
```
## The InfluxQL Query Language (1.x)

### Basic Queries
```sql
-- All fields
SELECT * FROM temperature

-- A specific field
SELECT value FROM temperature

-- With a condition
SELECT * FROM temperature WHERE location = 'room1'

-- Time-range query
SELECT * FROM temperature WHERE time >= now() - 1h

-- Compound conditions
SELECT * FROM temperature
WHERE location = 'room1' AND time >= now() - 1h AND value > 20
```
### Aggregation Functions
```sql
-- Simple aggregation
SELECT mean(value) FROM temperature WHERE time >= now() - 1h

-- Several aggregations at once
SELECT mean(value), max(value), min(value), count(value)
FROM temperature
WHERE time >= now() - 1h

-- Group by time
SELECT mean(value) FROM temperature
WHERE time >= now() - 1h
GROUP BY time(10m)

-- Group by tag
SELECT mean(value) FROM temperature
WHERE time >= now() - 1h
GROUP BY location

-- Group by time and tag
SELECT mean(value) FROM temperature
WHERE time >= now() - 1h
GROUP BY time(10m), location
```
### Advanced Queries
```sql
-- Subquery
SELECT mean(max_temp) FROM (
  SELECT max(value) AS max_temp FROM temperature
  WHERE time >= now() - 1h
  GROUP BY time(10m)
)

-- Arithmetic on fields
SELECT value * 9 / 5 + 32 AS fahrenheit FROM temperature

-- Note: InfluxQL has no CASE expression. Conditional classification such as
-- the hot/cold/normal example shown in the Flux section must be done in Flux
-- or on the client side.

-- Regular expressions
SELECT * FROM temperature WHERE location =~ /room[0-9]+/

-- Sorting and limiting
SELECT * FROM temperature
WHERE time >= now() - 1h
ORDER BY time DESC
LIMIT 10
```
### Continuous Queries
```sql
-- Create a continuous query
CREATE CONTINUOUS QUERY "cq_mean_temp" ON "sensors"
BEGIN
  SELECT mean(value) INTO "average_temperature"
  FROM "temperature"
  GROUP BY time(1h), location
END

-- List continuous queries
SHOW CONTINUOUS QUERIES

-- Drop a continuous query
DROP CONTINUOUS QUERY "cq_mean_temp" ON "sensors"

-- Downsample into another retention policy
CREATE CONTINUOUS QUERY "cq_downsample" ON "sensors"
BEGIN
  SELECT mean(value) INTO "sensors"."one_year"."temperature_hourly"
  FROM "sensors"."one_week"."temperature"
  GROUP BY time(1h), *
END
```
## Performance Optimization

### Data Model Optimization
```bash
# Design tags and fields deliberately
# Good: bounded tag values
temperature,sensor_id=001,location=room1,building=A value=23.5

# Avoid high-cardinality tags
# Bad: a timestamp used as a tag
temperature,sensor_id=001,timestamp=1640995200 value=23.5

# Tag values should come from a finite set
# Good
temperature,location=room1,status=normal value=23.5

# Bad: an unbounded tag value (creates one series per request)
temperature,location=room1,request_id=uuid-12345 value=23.5
```
### Write Optimization
```bash
# Batch writes
curl -XPOST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
  -H "Authorization: Token $INFLUX_TOKEN" \
  --data-binary @data.txt

# gzip-compressed payloads
curl -XPOST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
  -H "Authorization: Token $INFLUX_TOKEN" \
  -H "Content-Encoding: gzip" \
  --data-binary @data.txt.gz

# Concurrent writes (keep the concurrency bounded)
for i in {1..10}; do
  (
    curl -XPOST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
      -H "Authorization: Token $INFLUX_TOKEN" \
      --data-binary @data_${i}.txt
  ) &
done
wait
```
### Query Optimization
```txt
// Always bound the time range
from(bucket: "mybucket")
  |> range(start: -1h)                                    // always specify a range
  |> filter(fn: (r) => r._measurement == "temperature")

// Filter as early as possible
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")   // measurement first
  |> filter(fn: (r) => r.location == "room1")             // then tags
  |> filter(fn: (r) => r._value > 20.0)                   // values last

// Avoid unnecessary grouping
from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> mean()                                               // aggregate directly, no group() needed

// Prefer aggregateWindow over window + aggregate
from(bucket: "mybucket")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> aggregateWindow(every: 1h, fn: mean)                 // recommended
```
### Storage Optimization

```toml
# Configuration tuning (InfluxDB 1.x [data] section)
[data]
  # Cache size
  cache-max-memory-size = 1073741824  # 1 GB

  # Compaction
  compact-full-write-cold-duration = "4h"

  # WAL
  wal-fsync-delay = "0s"

  # Index limits
  max-series-per-database = 1000000
  max-values-per-tag = 100000
```
## Monitoring and Operations

### System Monitoring
```txt
// Series counts
from(bucket: "_monitoring")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "influxdb_database")
  |> filter(fn: (r) => r._field == "numSeries")

// Write rate
from(bucket: "_monitoring")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "influxdb_write")
  |> filter(fn: (r) => r._field == "pointReq")
  |> derivative(unit: 1s, nonNegative: true)

// Query performance
from(bucket: "_monitoring")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "influxdb_queryExecutor")
  |> filter(fn: (r) => r._field == "queriesFinished")
```
### Health Checks
```bash
# InfluxDB 2.x health check
curl -f http://localhost:8086/health

# Readiness
curl -f http://localhost:8086/ready

# Prometheus-style metrics
curl http://localhost:8086/metrics

# InfluxDB 1.x health check
curl -f http://localhost:8086/ping

# Runtime statistics
curl http://localhost:8086/debug/vars
```
### Backup and Restore
```bash
# InfluxDB 2.x backup
influx backup /path/to/backup --org myorg

# Restore
influx restore /path/to/backup --org myorg --new-org neworg

# InfluxDB 1.x portable backup
influxd backup -portable /path/to/backup

# Restore
influxd restore -portable /path/to/backup

# Online backup of a single database (1.x)
influxd backup -database sensors /path/to/backup

# Incremental backup
influxd backup -database sensors -since 2024-01-01T00:00:00Z /path/to/backup
```
## Client Libraries

### Python

```python
# InfluxDB 2.x Python client
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime

# Connection settings
token = "your-token"
org = "myorg"
bucket = "mybucket"
url = "http://localhost:8086"

client = InfluxDBClient(url=url, token=token, org=org)

# Synchronous write API
write_api = client.write_api(write_options=SYNCHRONOUS)

# Write a Point object
point = Point("temperature") \
    .tag("location", "room1") \
    .field("value", 23.5) \
    .time(datetime.datetime.utcnow(), WritePrecision.NS)
write_api.write(bucket=bucket, org=org, record=point)

# Write a dict
data = {
    "measurement": "temperature",
    "tags": {"location": "room1"},
    "fields": {"value": 23.5},
    "time": datetime.datetime.utcnow()
}
write_api.write(bucket=bucket, org=org, record=data)

# Batch write
points = []
for i in range(100):
    point = Point("temperature") \
        .tag("location", f"room{i % 5}") \
        .field("value", 20 + i * 0.1) \
        .time(datetime.datetime.utcnow() + datetime.timedelta(seconds=i))
    points.append(point)
write_api.write(bucket=bucket, org=org, record=points)

# Query data
query_api = client.query_api()
query = f'''
from(bucket: "{bucket}")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "room1")
'''
result = query_api.query(org=org, query=query)
for table in result:
    for record in table.records:
        print(f"Time: {record.get_time()}, Value: {record.get_value()}")

# Close the client
client.close()
```
### Go

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
	// Connection settings
	token := "your-token"
	url := "http://localhost:8086"
	org := "myorg"
	bucket := "mybucket"

	// Create the client
	client := influxdb2.NewClient(url, token)
	defer client.Close()

	// Blocking write API
	writeAPI := client.WriteAPIBlocking(org, bucket)

	// Create a point
	p := influxdb2.NewPoint("temperature",
		map[string]string{"location": "room1"},
		map[string]interface{}{"value": 23.5},
		time.Now())

	// Write it
	err := writeAPI.WritePoint(context.Background(), p)
	if err != nil {
		log.Fatal(err)
	}

	// Batch write
	points := make([]*influxdb2.Point, 0)
	for i := 0; i < 100; i++ {
		p := influxdb2.NewPoint("temperature",
			map[string]string{"location": fmt.Sprintf("room%d", i%5)},
			map[string]interface{}{"value": 20.0 + float64(i)*0.1},
			time.Now().Add(time.Duration(i)*time.Second))
		points = append(points, p)
	}
	err = writeAPI.WritePoint(context.Background(), points...)
	if err != nil {
		log.Fatal(err)
	}

	// Query data
	queryAPI := client.QueryAPI(org)
	query := fmt.Sprintf(`
		from(bucket: "%s")
		  |> range(start: -1h)
		  |> filter(fn: (r) => r._measurement == "temperature")
		  |> filter(fn: (r) => r.location == "room1")
	`, bucket)

	result, err := queryAPI.Query(context.Background(), query)
	if err != nil {
		log.Fatal(err)
	}
	for result.Next() {
		fmt.Printf("Time: %v, Value: %v\n",
			result.Record().Time(),
			result.Record().Value())
	}
	if result.Err() != nil {
		log.Fatal(result.Err())
	}
}
```
### Node.js

```javascript
const { InfluxDB, Point } = require('@influxdata/influxdb-client');

// Connection settings
const token = 'your-token';
const url = 'http://localhost:8086';
const org = 'myorg';
const bucket = 'mybucket';

const client = new InfluxDB({ url, token });

// Write data
const writeApi = client.getWriteApi(org, bucket);
writeApi.useDefaultTags({ host: 'server01' });

// Create a point
const point = new Point('temperature')
  .tag('location', 'room1')
  .floatField('value', 23.5)
  .timestamp(new Date());
writeApi.writePoint(point);

// Batch write
for (let i = 0; i < 100; i++) {
  const point = new Point('temperature')
    .tag('location', `room${i % 5}`)
    .floatField('value', 20 + i * 0.1)
    .timestamp(new Date(Date.now() + i * 1000));
  writeApi.writePoint(point);
}

// Flush and close
writeApi.close().then(() => {
  console.log('Write completed');
}).catch(err => {
  console.error('Write failed', err);
});

// Query data
const queryApi = client.getQueryApi(org);
const query = `
  from(bucket: "${bucket}")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "temperature")
    |> filter(fn: (r) => r.location == "room1")
`;
queryApi.queryRows(query, {
  next(row, tableMeta) {
    const o = tableMeta.toObject(row);
    console.log(`Time: ${o._time}, Value: ${o._value}`);
  },
  error(error) {
    console.error('Query failed', error);
  },
  complete() {
    console.log('Query completed');
  },
});
```
## Practical Examples

### 1. IoT Sensor Data Collection

```python
# IoT data collection system
import random
import time
from datetime import datetime

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS


class IoTDataCollector:
    def __init__(self, url, token, org, bucket):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.org = org
        self.bucket = bucket

    def collect_sensor_data(self, sensor_id, location):
        """Simulate continuous sensor data collection."""
        while True:
            # Simulated readings
            temperature = random.uniform(18.0, 30.0)
            humidity = random.uniform(40.0, 80.0)
            pressure = random.uniform(990.0, 1030.0)

            # Build one point per metric
            points = [
                Point("temperature")
                .tag("sensor_id", sensor_id)
                .tag("location", location)
                .field("value", temperature)
                .time(datetime.utcnow(), WritePrecision.NS),
                Point("humidity")
                .tag("sensor_id", sensor_id)
                .tag("location", location)
                .field("value", humidity)
                .time(datetime.utcnow(), WritePrecision.NS),
                Point("pressure")
                .tag("sensor_id", sensor_id)
                .tag("location", location)
                .field("value", pressure)
                .time(datetime.utcnow(), WritePrecision.NS),
            ]

            # Write the batch
            try:
                self.write_api.write(bucket=self.bucket, org=self.org, record=points)
                print(f"Data written for sensor {sensor_id}: "
                      f"T={temperature:.1f}°C, H={humidity:.1f}%, P={pressure:.1f}hPa")
            except Exception as e:
                print(f"Error writing data: {e}")

            time.sleep(10)  # sample every 10 seconds

    def get_sensor_stats(self, sensor_id, hours=24):
        """Fetch hourly statistics for one sensor."""
        query_api = self.client.query_api()
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -{hours}h)
          |> filter(fn: (r) => r.sensor_id == "{sensor_id}")
          |> group(columns: ["_measurement"])
          |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
        '''
        result = query_api.query(org=self.org, query=query)

        stats = {}
        for table in result:
            measurement = table.records[0].get_measurement()
            stats[measurement] = []
            for record in table.records:
                stats[measurement].append({
                    'time': record.get_time(),
                    'value': record.get_value()
                })
        return stats


# Usage
if __name__ == "__main__":
    collector = IoTDataCollector(
        url="http://localhost:8086",
        token="your-token",
        org="myorg",
        bucket="iot_sensors"
    )
    # Start collecting data
    collector.collect_sensor_data("sensor_001", "warehouse_a")
```
### 2. Application Performance Monitoring (APM)

```go
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api"
)

type APMCollector struct {
	client   influxdb2.Client
	writeAPI api.WriteAPIBlocking
	org      string
	bucket   string
}

func NewAPMCollector(url, token, org, bucket string) *APMCollector {
	client := influxdb2.NewClient(url, token)
	writeAPI := client.WriteAPIBlocking(org, bucket)
	return &APMCollector{
		client:   client,
		writeAPI: writeAPI,
		org:      org,
		bucket:   bucket,
	}
}

func (apm *APMCollector) RecordHTTPRequest(method, endpoint, status string, duration time.Duration) {
	point := influxdb2.NewPoint("http_requests",
		map[string]string{
			"method":   method,
			"endpoint": endpoint,
			"status":   status,
		},
		map[string]interface{}{
			"duration_ms": float64(duration.Nanoseconds()) / 1e6,
			"count":       1,
		},
		time.Now())

	err := apm.writeAPI.WritePoint(context.Background(), point)
	if err != nil {
		log.Printf("Error writing HTTP request metric: %v", err)
	}
}

func (apm *APMCollector) RecordDatabaseQuery(query, database string, duration time.Duration, rowsAffected int) {
	point := influxdb2.NewPoint("database_queries",
		map[string]string{
			"database": database,
			"query":    query,
		},
		map[string]interface{}{
			"duration_ms":   float64(duration.Nanoseconds()) / 1e6,
			"rows_affected": rowsAffected,
		},
		time.Now())

	err := apm.writeAPI.WritePoint(context.Background(), point)
	if err != nil {
		log.Printf("Error writing database query metric: %v", err)
	}
}

func (apm *APMCollector) RecordSystemMetrics() {
	// Simulated system metrics
	cpuUsage := rand.Float64() * 100
	memoryUsage := rand.Float64() * 100
	diskUsage := rand.Float64() * 100

	points := []*influxdb2.Point{
		influxdb2.NewPoint("system_metrics",
			map[string]string{"metric": "cpu_usage"},
			map[string]interface{}{"value": cpuUsage},
			time.Now()),
		influxdb2.NewPoint("system_metrics",
			map[string]string{"metric": "memory_usage"},
			map[string]interface{}{"value": memoryUsage},
			time.Now()),
		influxdb2.NewPoint("system_metrics",
			map[string]string{"metric": "disk_usage"},
			map[string]interface{}{"value": diskUsage},
			time.Now()),
	}

	err := apm.writeAPI.WritePoint(context.Background(), points...)
	if err != nil {
		log.Printf("Error writing system metrics: %v", err)
	}
}

func (apm *APMCollector) GetPerformanceReport(hours int) {
	queryAPI := apm.client.QueryAPI(apm.org)

	// HTTP request statistics (average duration per endpoint/status)
	httpQuery := fmt.Sprintf(`
		from(bucket: "%s")
		  |> range(start: -%dh)
		  |> filter(fn: (r) => r._measurement == "http_requests")
		  |> filter(fn: (r) => r._field == "duration_ms")
		  |> group(columns: ["endpoint", "status"])
		  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
	`, apm.bucket, hours)

	result, err := queryAPI.Query(context.Background(), httpQuery)
	if err != nil {
		log.Printf("Error querying HTTP metrics: %v", err)
		return
	}

	fmt.Println("HTTP Request Performance:")
	for result.Next() {
		record := result.Record()
		fmt.Printf("Endpoint: %s, Status: %s, Avg Duration: %.2f ms\n",
			record.ValueByKey("endpoint"),
			record.ValueByKey("status"),
			record.Value())
	}
}

func main() {
	apm := NewAPMCollector(
		"http://localhost:8086",
		"your-token",
		"myorg",
		"apm_metrics",
	)

	// Simulate application metric collection
	go func() {
		for {
			// Simulated HTTP requests
			endpoints := []string{"/api/users", "/api/orders", "/api/products"}
			methods := []string{"GET", "POST", "PUT", "DELETE"}
			statuses := []string{"200", "404", "500"}

			endpoint := endpoints[rand.Intn(len(endpoints))]
			method := methods[rand.Intn(len(methods))]
			status := statuses[rand.Intn(len(statuses))]
			duration := time.Duration(rand.Intn(1000)) * time.Millisecond
			apm.RecordHTTPRequest(method, endpoint, status, duration)

			// Simulated database queries
			queries := []string{"SELECT", "INSERT", "UPDATE", "DELETE"}
			query := queries[rand.Intn(len(queries))]
			dbDuration := time.Duration(rand.Intn(500)) * time.Millisecond
			rowsAffected := rand.Intn(100)
			apm.RecordDatabaseQuery(query, "main_db", dbDuration, rowsAffected)

			time.Sleep(1 * time.Second)
		}
	}()

	// Collect system metrics periodically
	go func() {
		for {
			apm.RecordSystemMetrics()
			time.Sleep(30 * time.Second)
		}
	}()

	// Generate a report periodically
	ticker := time.NewTicker(5 * time.Minute)
	for range ticker.C {
		apm.GetPerformanceReport(1)
	}
}
```
### 3. Financial Data Analysis

```txt
// Stock price analysis
import "math"

stock_data = from(bucket: "financial_data")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "stock_price")
  |> filter(fn: (r) => r.symbol == "AAPL")

// 5-day and 20-day moving averages
ma_5 = stock_data
  |> filter(fn: (r) => r._field == "close")
  |> aggregateWindow(every: 1d, fn: mean)
  |> timedMovingAverage(every: 1d, period: 5d)
  |> set(key: "_field", value: "ma_5")

ma_20 = stock_data
  |> filter(fn: (r) => r._field == "close")
  |> aggregateWindow(every: 1d, fn: mean)
  |> timedMovingAverage(every: 1d, period: 20d)
  |> set(key: "_field", value: "ma_20")

// 14-day RSI
rsi = stock_data
  |> filter(fn: (r) => r._field == "close")
  |> aggregateWindow(every: 1d, fn: last)
  |> difference(nonNegative: false)
  |> map(fn: (r) => ({
      r with
      gain: if r._value > 0.0 then r._value else 0.0,
      loss: if r._value < 0.0 then math.abs(x: r._value) else 0.0
  }))
  |> timedMovingAverage(every: 1d, period: 14d, column: "gain")
  |> timedMovingAverage(every: 1d, period: 14d, column: "loss")
  |> map(fn: (r) => ({
      r with
      rsi: 100.0 - (100.0 / (1.0 + (r.gain / r.loss)))
  }))
  |> keep(columns: ["_time", "symbol", "rsi"])
  |> set(key: "_field", value: "rsi")

// Merge all indicators
union(tables: [stock_data, ma_5, ma_20, rsi])
  |> sort(columns: ["_time"])
  |> yield()

// Trading-signal detection; note this assumes ma_20 and rsi have already
// been joined onto each row (e.g. with join() or pivot())
from(bucket: "financial_data")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "stock_price")
  |> filter(fn: (r) => r._field == "close")
  |> aggregateWindow(every: 1h, fn: last)
  |> map(fn: (r) => ({
      r with
      signal: if r._value > r.ma_20 and r.rsi < 30.0 then "BUY"
              else if r._value < r.ma_20 and r.rsi > 70.0 then "SELL"
              else "HOLD"
  }))
  |> filter(fn: (r) => r.signal != "HOLD")
```
## Best Practices

### 1. Data Model Design

- Tag design: use values drawn from a bounded set; avoid high cardinality (see the sketch below)
- Field design: put numeric data in fields and descriptive metadata in tags
- Measurement naming: use descriptive measurement names
- Time precision: pick the precision that matches your actual needs
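As a quick illustration of the tag/field split, a hedged Python sketch (all names made up): bounded values such as the room and a status code belong in tags, while the reading itself and any unbounded identifier belong in fields.

```python
from influxdb_client import Point

# Good: tags come from small, bounded sets; unbounded values go in fields.
point = (
    Point("temperature")
    .tag("location", "room1")            # one of a handful of rooms
    .tag("status", "normal")             # one of a few states
    .field("value", 23.5)                # the actual reading
    .field("request_id", "uuid-12345")   # unbounded -> field, never a tag
)
print(point.to_line_protocol())
```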
### 2. Performance Optimization

- Batch writes: write points in batches rather than one at a time (see the sketch below)
- Time ranges: always bound queries with a time range
- Index usage: design tags so queries can use the index
- Data retention: set retention policies that match how long data stays useful
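For the batch-write point, the Python client's batching write API is one way to get there; a minimal sketch, assuming a local 2.x instance and the placeholder credentials used throughout this article:

```python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import WriteOptions

client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="myorg")

# Buffer points in memory and flush them in batches instead of issuing
# one HTTP request per point.
write_api = client.write_api(write_options=WriteOptions(
    batch_size=5_000,        # points per request
    flush_interval=10_000,   # ms before a partial batch is flushed
    jitter_interval=2_000,   # random delay (ms) to spread out flushes
))

for i in range(10_000):
    write_api.write(bucket="mybucket", record=Point("temperature")
                    .tag("location", f"room{i % 5}")
                    .field("value", 20 + (i % 100) * 0.1))

write_api.close()   # flush anything still buffered
client.close()
```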
### 3. Operations

- Monitoring and alerting: watch key health and performance metrics (a health-check sketch follows this list)
- Backups: back up important data on a schedule
- Capacity planning: track storage usage over time
- Upgrades: plan and test version upgrades before rolling them out
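For the monitoring bullet, a liveness probe can be as small as polling the 2.x `/health` endpoint shown earlier; a standard-library-only sketch:

```python
import json
import urllib.request

# InfluxDB 2.x exposes /health; on 1.x you would probe /ping instead.
with urllib.request.urlopen("http://localhost:8086/health") as resp:
    health = json.load(resp)

print(health["status"])   # "pass" when the instance is healthy
```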
### 4. Security

- Authentication and authorization: enable auth and scope permissions appropriately (see the sketch below)
- Network security: use HTTPS and firewall rules
- Encryption: consider encrypting data in transit and at rest
- Audit logging: enable audit logs
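To ground the first two bullets, a hedged connection sketch: the token comes from the environment rather than source code, and the client uses HTTPS with certificate verification left on (the URL and environment-variable name are placeholders):

```python
import os

from influxdb_client import InfluxDBClient

# Read the token from the environment (or a secret store); never hard-code it.
token = os.environ["INFLUX_TOKEN"]

client = InfluxDBClient(
    url="https://influxdb.example.com:8086",  # TLS endpoint
    token=token,
    org="myorg",
    verify_ssl=True,                          # keep certificate checks on
)
print(client.ping())   # True when the server is reachable
client.close()
```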
## Summary

InfluxDB is a high-performance database purpose-built for time-series data, making it a strong fit for monitoring, IoT, and financial-analysis workloads. With a well-designed data model, tuned queries, and sound operational practices, it provides powerful storage and analysis capabilities for time-series applications.
### Strengths

- Purpose-built for time-series data
- High-performance writes and queries
- Strong data compression
- Rich query and analysis features
- Good ecosystem integration
### Typical Use Cases

- Systems monitoring and operations
- IoT data collection and analysis
- Application performance monitoring (APM)
- Financial data analysis
- Real-time data processing
A solid grasp of InfluxDB's core concepts and best practices is what lets you get full value from it in time-series workloads.