# wget https://dl.influxdata.com/influxdb/releases/influxdb-1.7.8.x86_64.rpm
# sudo yum localinstall influxdb-1.7.8.x86_64.rpm
#sudo systemctl start influxdb
配置开机启动
#sudo systemctl enable influxdb
#开放端口
# firewall-cmd --add-port=8086/tcp --permanent
通过systemctl status firewalld查看firewalld状态。
通过systemctl start firewalld开启防火墙,没有任何提示即开启成功。
再次通过systemctl status firewalld查看firewalld状态。
如果要关闭防火墙设置,过systemctl stop firewalld这条指令来关闭该功能。
# influx
show databases
create database tableName
drop databese tableName
use tableName
show measurements
drop measurement 表名
create user influx with password ‘influxdb’ (创建一个普通用户,密码为‘influxdb’)
create user “admin” with password ‘admin’ with all privileges(创建一个管理员用户,给他所有权限)
show users
set password for influx = ‘influx’
drop user admin
grant all privileges to influx(授权管理员权限)
revoke all privileges from influx
返回一个(field)字段中的非空值的数量。
返回一个字段(field)的唯一值。
返回一个字段(field)中的值的算术平均值(平均值)。字段类型必须是长整型或float64
从单个字段(field)中的排序值返回中间值(中位数)。字段值的类型必须是长整型或float64格式。
返回字段的最小值和最大值之间的差值。数据的类型必须是长整型或float64。
返回一个字段中的所有值的和。字段的类型必须是长整型或float64。
作用:返回一个字段中最小的N个值。字段类型必须是长整型或float64类型。
作用:返回一个字段中最老的取值。
作用:返回一个字段中最新的取值。
作用:返回一个字段中的最大值。该字段类型必须是长整型,float64,或布尔类型。
作用:返回一个字段中的最小值。该字段类型必须是长整型,float64,或布尔类型。
作用:返回排序值排位为N的百分值。字段的类型必须是长整型或float64。
作用:返回一个字段在一个series中的变化率。(这个函数还是看官网舒服点,反正我没怎么看懂)
作用:返回一个字段中连续的时间值之间的差异。字段类型必须是长整型或float64。
作用:返回一个字段在连续的时间间隔间的差异,间隔单位可选,默认为1纳秒。
作用:返回一个连续字段值的移动平均值,字段类型必须是长整形或者float64类型。
作用:返回在一个series中的一个字段中值的变化的非负速率。
作用:返回一个字段中的值的标准偏差。值的类型必须是长整型或float64类型。
wget https://dl.grafana.com/enterprise/release/grafana-enterprise-8.1.3-1.x86_64.rpm
sudo yum install grafana-enterprise-8.1.3-1.x86_64.rpm
sudo systemctl daemon-reload
sudo systemctl start grafana-server
sudo systemctl status grafana-server
sudo systemctl enable grafana-server
1、pom.xml引入相关jar文件,如下: <!-- 引入influxdb依赖 --> <dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.8</version> </dependency> 2、influxDB工具类封装: package com.mt.core.util; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.influxdb.InfluxDB; import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.InfluxDBFactory; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Point.Builder; import org.influxdb.dto.Pong; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; import lombok.Data; ? /** * InfluxDB数据库连接操作类 * * @author Simon */ ? public class InfluxDBConnection { ? // 用户名 private String username; // 密码 private String password; // 连接地址 private String openurl; // 数据库 private String database; // 保留策略 private String retentionPolicy; ? private InfluxDB influxDB; ? public InfluxDBConnection(String username, String password, String openurl, String database, String retentionPolicy) { this.username = username; this.password = password; this.openurl = openurl; this.database = database; this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy; influxDbBuild(); } /** * 创建数据库 * * @param dbName */ @SuppressWarnings("deprecation") public void createDB(String dbName) { influxDB.createDatabase(dbName); } /** * 删除数据库 * * @param dbName */ @SuppressWarnings("deprecation") public void deleteDB(String dbName) { influxDB.deleteDatabase(dbName); } /** * 测试连接是否正常 * * @return true 正常 */ public boolean ping() { boolean isConnected = false; Pong pong; try { pong = influxDB.ping(); if (pong != null) { isConnected = true; } } catch (Exception e) { e.printStackTrace(); } return isConnected; } ? /** * 连接时序数据库 ,若不存在则创建 * * @return */ public InfluxDB influxDbBuild() { if (influxDB == null) { influxDB = InfluxDBFactory.connect(openurl, username, password); } try { // if (!influxDB.databaseExists(database)) { // influxDB.createDatabase(database); // } } catch (Exception e) { // 该数据库可能设置动态代理,不支持创建数据库 // e.printStackTrace(); } finally { influxDB.setRetentionPolicy(retentionPolicy); } influxDB.setLogLevel(InfluxDB.LogLevel.NONE); return influxDB; } ? /** * 创建自定义保留策略 * * @param policyName * 策略名 * @param duration * 保存天数 * @param replication * 保存副本数量 * @param isDefault * 是否设为默认保留策略 */ public void createRetentionPolicy(String policyName, String duration, int replication, Boolean isDefault) { String sql = String.format("CREATE RETENTION POLICY "%s" ON "%s" DURATION %s REPLICATION %s ", policyName, database, duration, replication); if (isDefault) { sql = sql + " DEFAULT"; } this.query(sql); } /** * 创建默认的保留策略 * * @param 策略名:default,保存天数:30天,保存副本数量:1 * 设为默认保留策略 */ public void createDefaultRetentionPolicy() { String command = String.format("CREATE RETENTION POLICY "%s" ON "%s" DURATION %s REPLICATION %s DEFAULT", "default", database, "30d", 1); this.query(command); } ? /** * 查询 * * @param command * 查询语句 * @return */ public QueryResult query(String command) { return influxDB.query(new Query(command, database)); } ? /** * 插入 * * @param measurement * 表 * @param tags * 标签 * @param fields * 字段 */ public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time, TimeUnit timeUnit) { Builder builder = Point.measurement(measurement); builder.tag(tags); builder.fields(fields); if (0 != time) { builder.time(time, timeUnit); } influxDB.write(database, retentionPolicy, builder.build()); } ? /** * 批量写入测点 * * @param batchPoints */ public void batchInsert(BatchPoints batchPoints) { influxDB.write(batchPoints); // influxDB.enableGzip(); // influxDB.enableBatch(2000,100,TimeUnit.MILLISECONDS); // influxDB.disableGzip(); // influxDB.disableBatch(); } /** * 批量写入数据 * * @param database * 数据库 * @param retentionPolicy * 保存策略 * @param consistency * 一致性 * @param records * 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record) */ public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final List<String> records) { influxDB.write(database, retentionPolicy, consistency, records); } /** * 删除 * * @param command * 删除语句 * @return 返回错误信息 */ public String deleteMeasurementData(String command) { QueryResult result = influxDB.query(new Query(command, database)); return result.getError(); } ? /** * 关闭数据库 */ public void close() { influxDB.close(); } ? /** * 构建Point * * @param measurement * @param time * @param fields * @return */ public Point pointBuilder(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) { Point point = Point.measurement(measurement).time(time, TimeUnit.MILLISECONDS).tag(tags).fields(fields).build(); return point; } ? } 3、查询数据 InfluxDB支持一次查询多个SQL,SQL之间用逗号隔开即可。 public static void main(String[] args) { InfluxDBConnection influxDBConnection = new InfluxDBConnection("root", "Password01", "localhost", "devops", "tk_test"); QueryResult results = influxDBConnection .query("SELECT * FROM mt order by time desc limit 1000"); //results.getResults()是同时查询多条SQL语句的返回值,此处我们只有一条SQL,所以只取第一个结果集即可。 Result oneResult = results.getResults().get(0); if (oneResult.getSeries() != null) { List<List<Object>> valueList = oneResult.getSeries().stream().map(Series::getValues) .collect(Collectors.toList()).get(0); if (valueList != null && valueList.size() > 0) { for (List<Object> value : valueList) { Map<String, String> map = new HashMap<String, String>(); // 数据库中字段1取值 String field1 = value.get(0) == null ? null : value.get(0).toString(); // 数据库中字段2取值 String field2 = value.get(1) == null ? null : value.get(1).toString(); // TODO 用取出的字段做你自己的业务逻辑…… } } } } 4、插入数据 InfluxDB的字段类型,由第一条插入的值得类型决定;tags的类型只能是String型,可以作为索引,提高检索速度。 public static void main(String[] args) { InfluxDBConnection influxDBConnection = new InfluxDBConnection("root", "Password01", "localhost", "devops", "tk_test"); Map<String, String> tags = new HashMap<String, String>(); tags.put("tag1", "标签值"); Map<String, Object> fields = new HashMap<String, Object>(); fields.put("field1", "String类型"); // 数值型,InfluxDB的字段类型,由第一天插入的值得类型决定 fields.put("field2", 3.141592657); // 时间使用毫秒为单位 influxDBConnection.insert("表名", tags, fields, System.currentTimeMillis(), TimeUnit.MILLISECONDS); }
Linux下安装Influxdb1.7.8并整合Grafana与Java
原文:https://www.cnblogs.com/Ldengfeng/p/15250299.html