文章目录
- 1 摘要
- 2 背景
- 2.1 问题一:针对Influx V2.0工具
- 2.2 问题二:针对Influx查询语言
- 3 需求分析
- 4 快速入门
- 4.1 客户端驱动版本选择
- 4.2 连接influx
- 4.2.1 influx配置信息
- 4.2.2 influx连接配置
- 4.2.3 测试连通情况
- 5 Influx工具类
- 5.1 InfluxQL工具类
- 5.1.1 出现背景
- 5.1.2 InfluxQL工具类
- 5.2 插入数据工具类
1 摘要
摘要:InfluxQL;InfluxQL工具类;influxdb.java客户端
2 背景
2.1 问题一:针对Influx V2.0工具
针对新版Influx V2.0 版本数据库:
- 其一,influx支持两种查询语言,flux和InfluxQL,然后InfluxQL在高版本中已没有得到较好维护,因而,在后续开发中,笔者采用Influx V1.x 版本来进行开发。
2.2 问题二:针对Influx查询语言
针对使用新版Influx数据库,目前在其上做操做有两种方法,
- 其一,使用官方的UI工具,缺点:由于是可视化拼购操作,对操作有所限制。
- 其二,使用flux语言,flux是一种查询语言,其语法格式类似于R语言,具有管道符这些的形式,其也是官方所推荐的,然而由于没太使用过,此处不做展开,如有兴趣自己查询。
- 最后,也就是笔者所推荐的,InfluxQL其语法格式,高度切合于SQL语言,因而作为influx快速使用所推荐。
3 需求分析
正如,上述所描述问题,此处选择环境:
- 版本:influx v1.x
- 查询实现语言:InfluxQL
4 快速入门
4.1 客户端驱动版本选择
<!--influxDB--><dependency><groupId>org.influxdb</groupId><artifactId>influxdb-java</artifactId><version>2.19</version></dependency>
4.2 连接influx
4.2.1 influx配置信息
示例如下,可以优化:
spring:influxdb:url: yourURLdatabaseApply: databaseName1databaseTemp: databaseName2
4.2.2 influx连接配置
/*** influx配置读取*/
@Data
@Component
@ConfigurationProperties(prefix = "spring.influxdb")
public class InfluxProperties {private String url; //influx访问URL
// private String user; //用户名
// private String password; //密码private String databaseApply; //应用数据库private String databaseTemp; //备份用数据库public InfluxDB getConnectionDatabaseApply() {return InfluxDBFactory.connect(url).setDatabase(databaseApply);}public InfluxDB getConnectionDatabaseTemp() {return InfluxDBFactory.connect(url).setDatabase(databaseTemp);}
}
4.2.3 测试连通情况
@Testpublic void testInfluxQLWithMoreTerm() {InfluxDB influxDB = influxProperties.getConnectionDatabaseTemp();System.out.println("influxDB.ping() = " + influxDB.ping());}
}
5 Influx工具类
5.1 InfluxQL工具类
5.1.1 出现背景
目前针对InfluxQL暂无诸如mybatis样的,持久层框架,因而此处提供一种工具类的解决方案。
核心思想:InfluxQL高度类似于SQL,因而我们把诸如select、group等着关键字,封装成工具类方法,以实现链式InfluxQL,便于后续开发于维护。
5.1.2 InfluxQL工具类
/*** InfluxQL构造器** @author jx* @date 2023/7/23 21:24*/public class InfluxQLQuery {private final StringBuilder queryBuilder;private String database;private InfluxQLQuery(String query) {this.queryBuilder = new StringBuilder(query);}public static InfluxQLQuery select(String... fields) {StringJoiner joiner = new StringJoiner(", ");for (String field : fields) {if (!field.isEmpty()) {joiner.add(field);}}return new InfluxQLQuery("SELECT " + joiner);}// SELECT max("A_Cond_Temp"), min("A_Feed_Pump_Power_Cons_Rate") FROM "JJjTEStljx" GROUP BY "A_Cond_Temp", "A_Feed_Pump_Power_Cons_Rate"public InfluxQLQuery select(List<AggregationType> aggregationTypes, String... fields) {//得到类型List<String> list = new ArrayList<>();for (AggregationType aggregationType : aggregationTypes) {list.add(aggregationType +"(\""+ fields +"\"");}return new InfluxQLQuery("SELECT " + list);}public InfluxQLQuery from(String measurement) {queryBuilder.append(" FROM ").append(measurement);return this;}public InfluxQLQuery where(String condition) {queryBuilder.append(" WHERE ").append(condition);return this;}public InfluxQLQuery groupBy(String grouping) {queryBuilder.append(" GROUP BY ").append(grouping);return this;}/*** 设置查询时,时间戳为上海时间** @return 上海时区*/public InfluxQLQuery setShanghaiTimeZone() {queryBuilder.append(" tz('Asia/Shanghai') ");return this;}public QueryResult execute(InfluxDB influxDB, String database) {Query queryObject = new Query(queryBuilder.toString(), database);return influxDB.query(queryObject);}public InfluxQLQuery selectMean(String field) {queryBuilder.append("MEAN(").append(field).append(")");return this;}public InfluxQLQuery selectMin(String field) {queryBuilder.append("min(").append(field).append(")");return this;}public InfluxQLQuery selectMax(String field) {queryBuilder.append("max(").append(field).append(")");return this;}public InfluxQLQuery selectSum(String field) {queryBuilder.append(" sum(").append(field).append(")");return this;}public InfluxQLQuery selectCount(String field) {queryBuilder.append(" count(").append(field).append(")");return this;}/*** 查询结果的最大数量** @param limit* @return*/public InfluxQLQuery limit(int limit) {queryBuilder.append(" LIMIT ").append(limit);return this;}/*** 设置当前操作的值。** @param interval* @return*/public InfluxQLQuery interval(String interval) {queryBuilder.append(" INTERVAL ").append(interval);return this;}/*** 设置查询条件中的标签。** @param tags* @return*/public InfluxQLQuery whereTags(Map<String, String> tags) {StringBuilder tagsBuilder = new StringBuilder();for (Map.Entry<String, String> entry : tags.entrySet()) {String tagKey = entry.getKey();String tagValue = entry.getValue();tagsBuilder.append(" \"").append(tagKey).append("\"='").append(tagValue).append("' AND");}// 删除最后的 ANDif (tagsBuilder.length() > 0) {tagsBuilder.setLength(tagsBuilder.length() - 4);}queryBuilder.append(" WHERE").append(tagsBuilder);return this;}/*** 设置填充策略。** @param value* @return*/public InfluxQLQuery fill(String value) {queryBuilder.append(" fill(").append(value).append(")");return this;}/*** 查询结果的排序方式。** @param field* @param direction* @return*/public InfluxQLQuery orderBy(String field, String direction) {queryBuilder.append(" ORDER BY ").append(field).append(" ").append(direction);return this;}/*** 设置结果集的返回数量上限。** @param limit* @return*/public InfluxQLQuery sLimit(int limit) {queryBuilder.append(" SLIMIT ").append(limit);return this;}/*** 设置结果集的偏移量。** @param offset* @return*/public InfluxQLQuery sOffset(int offset) {queryBuilder.append(" SOFFSET ").append(offset);return this;}/*** 指定写入的目标measurement** @param targetMeasurement* @return*/public InfluxQLQuery into(String targetMeasurement) {queryBuilder.append(" INTO ").append(targetMeasurement);return this;}/*** 指定写入数据时的标签** @param tagKey* @param tagValue* @return*/public InfluxQLQuery withTag(String tagKey, String tagValue) {queryBuilder.append(" WITH ").append(tagKey).append("=").append(tagValue);return this;}/*** 创建数据保留策略** @param policyName* @param duration* @param replication* @param isDefault* @return*/public InfluxQLQuery createRetentionPolicy(String policyName, String duration, String replication,boolean isDefault) {queryBuilder.append(" CREATE RETENTION POLICY ").append("\"").append(policyName).append("\"").append(" ON ").append(database).append(" DURATION ").append(duration).append(" REPLICATION ").append(replication);if (isDefault) {queryBuilder.append(" DEFAULT");}return this;}/*** 展示当前数据库的所有保留策略。** @return*/public InfluxQLQuery showRetentionPolicies() {queryBuilder.append(" SHOW RETENTION POLICIES").append(" ON ").append(database);return this;}/*** 删除指定的数据保留策略。** @param policyName* @return*/public InfluxQLQuery dropRetentionPolicy(String policyName) {queryBuilder.append(" DROP RETENTION POLICY ").append("\"").append(policyName).append("\"").append(" ON ").append(database);return this;}/*** 创建用户** @param username* @param password* @return*/public InfluxQLQuery createUser(String username, String password) {queryBuilder.append(" CREATE USER ").append("\"").append(username).append("\"").append(" WITH PASSWORD ").append("'").append(password).append("'");return this;}/*** 设置用户密码** @param username* @param password* @return*/public InfluxQLQuery setUserPassword(String username, String password) {queryBuilder.append(" SET PASSWORD FOR ").append("\"").append(username).append("\"").append(" = ").append("'").append(password).append("'");return this;}/*** 设置当前操作的数据库。** @param database* @return*/public InfluxQLQuery setDatabase(String database) {this.database = database;return this;}/*** 设置当前操作的measurement。** @param measurement* @return*/public InfluxQLQuery setMeasurement(String measurement) {queryBuilder.append(" ").append(measurement);return this;}/*** 设置当前操作的字段。** @param field* @return*/public InfluxQLQuery setField(String field) {queryBuilder.append(" ").append(field);return this;}/*** 设置当前操作的值。** @param value* @return*/public InfluxQLQuery setValue(String value) {queryBuilder.append(" = ").append(value);return this;}public InfluxQLQuery aggFunction(InfluxQLQuery influxQLQuery, TimeType timeType, String field) {if (influxQLQuery == null || timeType == null || field == null) {return null;}switch (timeType) {case MAX:influxQLQuery.selectMax(field);break;case AVG:influxQLQuery.selectMean(field);break;case MIN:influxQLQuery.selectMin(field);break;case SUM:influxQLQuery.selectSum(field);break;default:return null;}return influxQLQuery;}public static InfluxQLQuery groupBy(String... fields) {StringJoiner joiner = new StringJoiner(", ");for (String field : fields) {if (!field.isEmpty()) {joiner.add(field);}}return new InfluxQLQuery("SELECT " + joiner);}
}
5.2 插入数据工具类
注意:此处相关逻辑,需要依据入库实际需求而适应!
@Resourceprivate InfluxProperties influxProperties;/*** 插入influx应用数据库** @param assetId 表名* @param pointId 测点名* @param pointValue 测点值* @param time 消息时间戳*/public void intoInfluxApply(String assetId, String pointId, Double pointValue, Long time) {InfluxDB influxDBApply = influxProperties.getConnectionDatabaseApply();//创建要写入的数据点Map<String, Object> fields = new HashMap<>();fields.put(pointId, pointValue); //测单标识符,值// 写入数据Point point = Point.measurement(assetId)//表名.time(time, TimeUnit.MILLISECONDS)//时间戳.fields(fields)//添加一个字段的多个属性值.build();influxDBApply.write(influxProperties.getDatabaseApply(), "autogen", point);}/*** 插入influx备份数据库** @param assetId 表名* @param pointId 测点名* @param pointValue 测点值* @param time 消息时间戳*/public void intoInfluxTemp(String assetId, String pointId, Double pointValue, Long time) {InfluxDB influxDBApply = influxProperties.getConnectionDatabaseTemp();//创建要写入的数据点Map<String, Object> fields = new HashMap<>();fields.put(pointId, pointValue); //测单标识符,值// 写入数据Point point = Point.measurement(assetId)//表名.time(time, TimeUnit.MILLISECONDS)//时间戳.fields(fields)//添加一个字段的多个属性值.build();try {influxDBApply.write(influxProperties.getDatabaseTemp(), "autogen", point);} catch (Exception e) {log.info("{}", "写入influx临时数据库出现错误" + e.getMessage());}}