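This article does not cover installation; it only covers how to work with databases and tables from Java code once InfluxDB is installed.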
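- 1. Create a database
- 2. Add a Telegraf configuration to the bucket
- 3. Telegraf configuration parameters
- 4. Configure database access permissions (API Tokens)
- 5. Operate on databases and tables from Java
- 5.1 yaml
- 5.2 pom dependency
- 5.3 config
- 5.4 controller
- 5.5 Query methods and result-set extraction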
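1. Create a database
InfluxDB 2.x ships with a web management UI. Taking a local install as an example, open http://127.0.0.1:8086 in a browser and log in to reach this page, then follow the steps in the screenshots in order.
Here a bucket is the database.
Choose (configure) the data retention policy for the database.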
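For anyone who would rather script this step than click through the UI, here is a minimal sketch of the equivalent HTTP request against the InfluxDB v2 API (POST /api/v2/buckets). It only builds the request and does not send it. The class name BucketRequestSketch and the sample values are my own assumptions, not something the UI generates. Note that the API expects the organization ID (orgID), not the organization name, and that this sketch does not JSON-escape its inputs.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper: builds (but does not send) a "create bucket" request. */
final class BucketRequestSketch {

    /** JSON body with one "expire" retention rule; inputs are assumed to need no JSON escaping. */
    static String body(String orgId, String bucketName, long retentionSeconds) {
        return "{\"orgID\":\"" + orgId + "\",\"name\":\"" + bucketName + "\","
                + "\"retentionRules\":[{\"type\":\"expire\",\"everySeconds\":" + retentionSeconds + "}]}";
    }

    /** Request for POST {baseUrl}/api/v2/buckets, authenticated with an API token. */
    static HttpRequest createBucket(String baseUrl, String token, String orgId,
                                    String bucketName, long retentionSeconds) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/v2/buckets"))
                .header("Authorization", "Token " + token)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(orgId, bucketName, retentionSeconds)))
                .build();
    }
}
```

To actually send it, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). The token used must be allowed to create buckets.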
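2. Add a Telegraf configuration to the bucket
Select the database created in step 1 here.
In the source list, typing sys filters the entries automatically; click the matching one.
After clicking, the Create button in the lower right becomes enabled; click it to move on to the configuration.
Enter a name for the configuration file, then click Save and Test. The configuration content is generated automatically, so you do not need to fill it in yourself.
When this page appears after saving, the configuration has been created. It returns two pieces of configuration information: export INFLUX_TOKEN and telegraf --config
After clicking, close the dialog.
Clicking the configuration file name opens the contents of the file.
The configuration content is as follows: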
# Configuration for telegraf agent
[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true

  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics.
  ## This controls the size of writes that Telegraf sends to output plugins.
  metric_batch_size = 1000

  ## Maximum number of unwritten metrics per output. Increasing this value
  ## allows for longer periods of output downtime without dropping metrics at the
  ## cost of higher maximum memory usage.
  metric_buffer_limit = 10000

  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"

  ## Default flushing interval for all outputs. Maximum flush_interval will be
  ## flush_interval + flush_jitter
  flush_interval = "10s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"

  ## By default or when set to "0s", precision will be set to the same
  ## timestamp order as the collection interval, with the maximum being 1s.
  ## ie, when interval = "10s", precision will be "1s"
  ## when interval = "250ms", precision will be "1ms"
  ## Precision will NOT be used for service inputs. It is up to each individual
  ## service input to set the timestamp at the appropriate precision.
  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
  precision = ""

  ## Log at debug level.
  # debug = false
  ## Log only error level messages.
  # quiet = false

  ## Log target controls the destination for logs and can be one of "file",
  ## "stderr" or, on Windows, "eventlog". When set to "file", the output file
  ## is determined by the "logfile" setting.
  # logtarget = "file"

  ## Name of the file to be logged to when using the "file" logtarget. If set to
  ## the empty string then logs are written to stderr.
  # logfile = ""

  ## The logfile will be rotated after the time interval specified. When set
  ## to 0 no time based rotation is performed. Logs are rotated only when
  ## written to, if there is no log activity rotation may be delayed.
  # logfile_rotation_interval = "0d"

  ## The logfile will be rotated when it becomes larger than the specified
  ## size. When set to 0 no size based rotation is performed.
  # logfile_rotation_max_size = "0MB"

  ## Maximum number of rotated archives to keep, any older logs are deleted.
  ## If set to -1, no archives are removed.
  # logfile_rotation_max_archives = 5

  ## Pick a timezone to use when logging or type 'local' for local time.
  ## Example: America/Chicago
  # log_with_timezone = ""

  ## Override default hostname, if empty use os.Hostname()
  hostname = ""
  ## If set to true, do no set the "host" tag in the telegraf agent.
  omit_hostname = false

[[outputs.influxdb_v2]]
  ## The URLs of the InfluxDB cluster nodes.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  ## ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
  urls = ["http://127.0.0.1:8086"]

  ## Token for authentication.
  token = "$INFLUX_TOKEN"

  ## Organization is the name of the organization you wish to write to; must exist.
  organization = "org"

  ## Destination bucket to write into.
  bucket = "db2"

  ## The value of this tag will be used to determine the bucket. If this
  ## tag is not set the 'bucket' option is used as the default.
  # bucket_tag = ""

  ## If true, the bucket tag will not be added to the metric.
  # exclude_bucket_tag = false

  ## Timeout for HTTP messages.
  # timeout = "5s"

  ## Additional HTTP headers
  # http_headers = {"X-Special-Header" = "Special-Value"}

  ## HTTP Proxy override, if unset values the standard proxy environment
  ## variables are consulted to determine which proxy, if any, should be used.
  # http_proxy = "http://corporate.proxy:3128"

  ## HTTP User-Agent
  # user_agent = "telegraf"

  ## Content-Encoding for write request body, can be set to "gzip" to
  ## compress body or "identity" to apply no encoding.
  # content_encoding = "gzip"

  ## Enable or disable uint support for writing uints influxdb 2.0.
  # influx_uint_support = false

  ## Optional TLS Config for use on HTTP connections.
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

# Read metrics about system load & uptime
[[inputs.system]]
  # no configuration
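3. Telegraf configuration parameters
The values of these four keys (urls, token, organization, and bucket in the configuration above) correspond to the four properties configured in the Java application's yaml: url, token, org, and bucket.
Note: 2.x is accessed through these four properties, no longer through a username and password.
The token deserves a mention: its value is the export INFLUX_TOKEN among the two pieces of configuration information returned after creating the configuration file in step 2.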
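To make the mapping concrete, here is a small sketch that pulls the four uncommented keys out of a Telegraf configuration text and renames them to the yaml property names. The class TelegrafOutputKeys and its regular expression are my own illustration, not part of Telegraf or of the InfluxDB client; it assumes each key sits on its own line with a double-quoted value, as in the generated file above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: maps Telegraf output keys to the yaml property names. */
final class TelegrafOutputKeys {

    // Matches uncommented lines such as: token = "$INFLUX_TOKEN" or urls = ["http://127.0.0.1:8086"]
    // Lines starting with '#' are skipped because the key must be the first non-blank text on the line.
    private static final Pattern KEY_LINE = Pattern.compile(
            "^\\s*(urls|token|organization|bucket)\\s*=\\s*\\[?\\s*\"([^\"]*)\"",
            Pattern.MULTILINE);

    /** Returns url, token, org and bucket (first occurrence of each) found in the config text. */
    static Map<String, String> extract(String telegrafConf) {
        Map<String, String> out = new LinkedHashMap<>();
        Matcher m = KEY_LINE.matcher(telegrafConf);
        while (m.find()) {
            String key = m.group(1);
            String yamlKey = key.equals("urls") ? "url"
                    : key.equals("organization") ? "org"
                    : key;
            out.putIfAbsent(yamlKey, m.group(2));
        }
        return out;
    }
}
```

Keep in mind that the token extracted this way is only the placeholder $INFLUX_TOKEN; the real value is the one shown after export INFLUX_TOKEN.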
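With these four properties in hand, can you operate on the database now?
Not yet. The material online is rather messy and many articles never mention this step. This is where I ran into trouble, so read on.
Testing revealed the problem. Note that this token is the one generated for the database's Telegraf configuration. It can connect to the database and insert data successfully, but it has no read permission; in other words, it can only save and cannot perform other operations. Queries fail with: HTTP status code: 404; Message: failed to initialize execute state: could not find bucket "XX"
The application accesses the database through the API provided by the dependency, and the error is caused by the missing API access-permission configuration, which is the most important piece. The material online does not cover this, which makes it a real trap.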
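4. Configure database access permissions (API Tokens)
Tick the databases and the database configuration files that need to be accessed through the API; set the other permissions according to your own needs.
After clicking Create, the generated API token is displayed. Simply use it to replace the token in the yaml configuration file.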
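One way to check whether a token can actually read a bucket, independent of the Java client, is to send a tiny Flux query to the v2 HTTP API (POST /api/v2/query). The following is only a sketch that builds such a request; the class name TokenCheckSketch and the probe query are my own assumptions. In my case the Telegraf token produced the 404 "could not find bucket" error on queries, while the API token created in this step did not.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: builds (but does not send) a small read probe for a token. */
final class TokenCheckSketch {

    /** A cheap Flux query: at most one record per table from the last minute of the bucket. */
    static String probeFlux(String bucket) {
        return "from(bucket: \"" + bucket + "\") |> range(start: -1m) |> limit(n: 1)";
    }

    /** Request for POST {baseUrl}/api/v2/query?org=..., with the Flux text as the body. */
    static HttpRequest queryRequest(String baseUrl, String token, String org, String bucket) {
        String orgParam = URLEncoder.encode(org, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/v2/query?org=" + orgParam))
                .header("Authorization", "Token " + token)
                .header("Content-Type", "application/vnd.flux")
                .header("Accept", "application/csv")
                .POST(HttpRequest.BodyPublishers.ofString(probeFlux(bucket)))
                .build();
    }
}
```

Send it with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and look at the status code: a 200 suggests the token can read the bucket, anything else means the permissions are worth a second look.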
5. Operate on databases and tables from Java
5.1 yaml
# influx configuration
influx2:
  url: http://127.0.0.1:8086
  token: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX== # replace with your own token
  org: org
  bucket: db2
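5.2 pom dependency
I did not pick a newer version here because it conflicted with other dependencies in my project. The newer versions provide APIs compatible with 2.x and later.
Both the newer and the older versions can work with 2.x, so choose according to your own situation.
<!--InfluxDB-->
<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>influxdb-client-java</artifactId>
    <!--<version>6.9.0</version>-->
    <version>3.0.1</version>
</dependency>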
5.3 config
package net.influx.com.config;

import com.influxdb.LogLevel;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author luo zhuo tao
 * @create 2023/8/29
 */
@Configuration
@ConfigurationProperties(prefix = "influx2")
public class InfluxdbConfig {

    private static final Logger logger = LoggerFactory.getLogger(InfluxdbConfig.class);

    // Bound from the influx2.* properties in the yaml through the setters below
    private String url;
    private String token;
    private String org;
    private String bucket;

    @Bean
    public InfluxDBClient influxDBClient() {
        InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);
        // Setting the log level is optional
        influxDBClient.setLogLevel(LogLevel.BASIC);
        if (influxDBClient.ping()) {
            logger.info("InfluxDB time-series database 2.x: connection succeeded!");
        } else {
            logger.error("InfluxDB time-series database 2.x: connection failed!");
        }
        return influxDBClient;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setToken(String token) {
        this.token = token;
    }

    public void setOrg(String org) {
        this.org = org;
    }

    public void setBucket(String bucket) {
        this.bucket = bucket;
    }
}
5.4 controller
package net.influx.com.controller;

import com.alibaba.fastjson.JSON;
import com.influxdb.client.*;
import com.influxdb.client.domain.InfluxQLQuery;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxTable;
import com.influxdb.query.InfluxQLQueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;

/**
 * @author luo zhuo tao
 * @create 2023/8/29
 */
@RestController
@RequestMapping("influxdb")
public class InfluxdbController {

    private static final Logger logger = LoggerFactory.getLogger(InfluxdbController.class);

    @Resource
    private InfluxDBClient influxDBClient;

    @Value("${influx2.org:''}")
    private String org;

    @Value("${influx2.bucket:''}")
    private String bucket;

    private String table = "test1";

    @GetMapping("test")
    public String test() {
        /**
         * Write: WriteApiBlocking is the synchronous write API, WriteApi the asynchronous write API
         */
        if (false) {
            WriteApiBlocking writeApiBlocking = influxDBClient.getWriteApiBlocking();
            Point point = Point.measurement(table)
                    .addField(String.valueOf(System.currentTimeMillis()), UUID.randomUUID().toString())
                    .time(Instant.now(), WritePrecision.NS);
            writeApiBlocking.writePoint(point);
        }

        /**
         * Query: QueryApi is the synchronous query API, InfluxQLQueryApi the SQL-style query API
         */
        if (false) {
            InfluxQLQueryApi influxQLQueryApi = influxDBClient.getInfluxQLQueryApi();
            InfluxQLQuery influxQLQuery = new InfluxQLQuery("SELECT * FROM test1", bucket);
            InfluxQLQueryResult query = influxQLQueryApi.query(influxQLQuery);
            logger.info("query:{}", JSON.toJSONString(query));
            findAll();
        }

        /**
         * Delete
         * Start and stop are both "now" and the predicate is empty here;
         * adjust the time range and predicate to select the data to delete.
         */
        DeleteApi deleteApi = influxDBClient.getDeleteApi();
        deleteApi.delete(OffsetDateTime.now(), OffsetDateTime.now(), "", bucket, org);
        return "query succeeded";
    }

    /**
     * @param measurement table name
     */
    public void save(String measurement) {
        WriteOptions writeOptions = WriteOptions.builder()
                .batchSize(5000)
                .flushInterval(1000)
                .bufferLimit(10000)
                .jitterInterval(1000)
                .retryInterval(5000)
                .build();
        // WriteApi is closed by try-with-resources, which flushes the pending batch
        try (WriteApi writeApi = influxDBClient.getWriteApi(writeOptions)) {
            Point point = Point.measurement(measurement)
                    .addField("MMSI".concat(UUID.randomUUID().toString()), UUID.randomUUID().toString())
                    .time(Instant.now(), WritePrecision.NS);
            writeApi.writePoint(bucket, org, point);
        }
    }

    public List<FluxTable> findAll() {
        // Note: the bucket name is hard-coded as db3 here, while the yaml above configures db2
        String flux = "from(bucket: \"db3\")\n" +
                " |> range(start:0)\n" +
                " |> filter(fn: (r) => r[\"_measurement\"] == \"test1\")\n" +
                " |> yield(name: \"mean\")";
        QueryApi queryApi = influxDBClient.getQueryApi();
        List<FluxTable> tables = queryApi.query(flux, org);
        logger.info("tables:{}", JSON.toJSONString(tables));
        return tables;
    }
}
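It can help to see what a Point like the ones above turns into on the wire. InfluxDB accepts writes as line protocol text, roughly "measurement field=value timestamp". The sketch below builds such a line by hand for a single string field with a nanosecond timestamp. LineProtocolSketch is my own illustrative class, not part of the client library, and it leaves out tags and non-string field types.

```java
import java.time.Instant;

/** Hypothetical helper: renders one string field as an InfluxDB line protocol line. */
final class LineProtocolSketch {

    /** Escapes commas and spaces in a measurement name. */
    static String escapeMeasurement(String s) {
        return s.replace(",", "\\,").replace(" ", "\\ ");
    }

    /** Escapes commas, equals signs and spaces in a field key. */
    static String escapeKey(String s) {
        return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    /** Escapes backslashes and double quotes in a string field value. */
    static String escapeStringValue(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** measurement fieldKey="fieldValue" timestampInNanoseconds */
    static String line(String measurement, String fieldKey, String fieldValue, Instant time) {
        long nanos = time.getEpochSecond() * 1_000_000_000L + time.getNano();
        return escapeMeasurement(measurement) + " "
                + escapeKey(fieldKey) + "=\"" + escapeStringValue(fieldValue) + "\" " + nanos;
    }
}
```

For example, LineProtocolSketch.line("test1", "412000001", "a,b", Instant.ofEpochSecond(1, 5)) gives test1 412000001="a,b" 1000000005. The field key 412000001 is just a made-up MMSI-like value.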
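5.5 Query methods and result-set extraction
Two query styles are used here: one looks up a value directly by key, the other queries by time range. The Flux syntax itself is not covered in detail here; study it on your own.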
package net.superlucy.departure.monitor.app.service.impl;

import cn.hutool.core.collection.CollectionUtil;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteOptions;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import net.superlucy.departure.monitor.app.service.InfluxdbService;
import net.superlucy.departure.monitor.app.util.CommonUtil;
import net.superlucy.departure.monitor.dto.enums.InfluxdbEnum;
import net.superlucy.departure.monitor.dto.model.DepartureShipPosition;
import org.apache.commons.compress.utils.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author luo zhuo tao
 * @create 2023/9/4
 */
@Service
public class InfluxdbServiceImpl implements InfluxdbService {

    private static final Logger logger = LoggerFactory.getLogger(InfluxdbServiceImpl.class);

    /**
     * Flux query by MMSI number: returns a single record
     */
    private String queryValueFluxOne = "from(bucket: \"%s\") " +
            "|> range(start: %s) " +
            "|> filter(fn: (r) => r._measurement == \"%s\" and r._field == \"%s\")";

    /**
     * Flux query by time range: returns multiple records
     */
    private String queryValueFluxTwo = "from(bucket: \"%s\") " +
            "|> range(start: %s) " +
            "|> filter(fn: (r) => r._measurement == \"%s\")";

    @Resource
    private InfluxDBClient influxDBClient;

    @Value("${influx2.org:''}")
    private String org;

    @Value("${influx2.bucket:''}")
    private String bucket;

    @Override
    public Map<String, Object> findOne(InfluxdbEnum influxdbEnum, String mmsi) {
        String flux = String.format(queryValueFluxOne, bucket, 0, influxdbEnum.getValue(), mmsi);
        QueryApi queryApi = influxDBClient.getQueryApi();
        List<FluxTable> tables = queryApi.query(flux, org);
        return qryVal(tables);
    }

    /**
     * Extracts value, field and time from the records; later records overwrite earlier ones
     */
    public Map<String, Object> qryVal(List<FluxTable> tables) {
        Map<String, Object> map = new HashMap<>();
        if (CollectionUtil.isNotEmpty(tables)) {
            for (FluxTable table : tables) {
                List<FluxRecord> records = table.getRecords();
                for (FluxRecord fluxRecord : records) {
                    map.put("value", fluxRecord.getValue().toString());
                    map.put("field", fluxRecord.getField());
                    map.put("valueTime", Date.from(fluxRecord.getTime()));
                }
            }
        }
        return map;
    }

    @Override
    public List<Map<String, Object>> findList(InfluxdbEnum influxdbEnum, String date) {
        String flux = String.format(queryValueFluxTwo, bucket, date, influxdbEnum.getValue());
        QueryApi queryApi = influxDBClient.getQueryApi();
        List<FluxTable> tables = queryApi.query(flux, org);
        return qryValList(tables);
    }

    @Override
    public Map<String, DepartureShipPosition> getDynamicList(InfluxdbEnum influxdbEnum, String date) {
        String flux = String.format(queryValueFluxTwo, bucket, date, influxdbEnum.getValue());
        QueryApi queryApi = influxDBClient.getQueryApi();
        List<FluxTable> tables = queryApi.query(flux, org);
        return dynamicList(tables);
    }

    /**
     * Returns the latest position of every ship
     * @param tables
     * @return
     */
    private Map<String, DepartureShipPosition> dynamicList(List<FluxTable> tables) {
        Map<String, DepartureShipPosition> map = new HashMap<>();
        if (CollectionUtil.isNotEmpty(tables)) {
            for (FluxTable table : tables) {
                List<FluxRecord> records = table.getRecords();
                // Querying by time range returns several records for the same field. Only the latest
                // one is needed here, and records are ordered from oldest to newest, so take the last one.
                FluxRecord fluxRecord = records.get(records.size() - 1);
                DepartureShipPosition position = new DepartureShipPosition();
                String mmsi = fluxRecord.getField();
                String value = fluxRecord.getValue().toString();
                /**
                 * The dynamic format conversion below is a method from my own business code; ignore it.
                 * String mmsi = fluxRecord.getField();
                 * String value = fluxRecord.getValue().toString();
                 * These two getters already return the stored data; process it further
                 * according to your own business needs.
                 */
                // Dynamic format conversion
                DepartureShipPosition dynamic = CommonUtil.dynamic(position, value);
                map.put(mmsi, dynamic);
            }
        }
        return map;
    }

    /**
     * Extracts the latest record of each table into a list of maps
     * @param tables
     * @return
     */
    public List<Map<String, Object>> qryValList(List<FluxTable> tables) {
        List<Map<String, Object>> mapList = Lists.newArrayList();
        if (CollectionUtil.isNotEmpty(tables)) {
            for (FluxTable table : tables) {
                List<FluxRecord> records = table.getRecords();
                // Querying by time range returns several records for the same field. Only the latest
                // one is needed here, and records are ordered from oldest to newest, so take the last one.
                FluxRecord fluxRecord = records.get(records.size() - 1);
                Map<String, Object> map = new HashMap<>(1);
                map.put("value", fluxRecord.getValue().toString());
                map.put("field", fluxRecord.getField());
                map.put("valueTime", Date.from(fluxRecord.getTime()));
                mapList.add(map);
            }
        }
        return mapList;
    }

    /**
     * @param measurement table name
     * @param k MMSI number
     * @param v AIS data
     */
    @Override
    public void save(String measurement, String k, String v) {
        WriteOptions writeOptions = WriteOptions.builder()
                .batchSize(5000)
                .flushInterval(1000)
                .bufferLimit(10000)
                .jitterInterval(1000)
                .retryInterval(5000)
                .build();
        try (WriteApi writeApi = influxDBClient.getWriteApi(writeOptions)) {
            Point point = Point.measurement(measurement)
                    .addField(k, v)
                    .time(Instant.now(), WritePrecision.NS);
            writeApi.writePoint(bucket, org, point);
        }
    }
}
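The service above fetches every record in the time range and then keeps only the last one per table in Java. As an alternative, Flux has a last() function that keeps the last record of each table, so the filtering could probably be pushed into the query itself. The sketch below only builds such a query string and also escapes quotes in the values it inserts, which String.format with %s does not do. FluxQuerySketch is my own hypothetical helper, and I have not benchmarked this against the approach above.

```java
/** Hypothetical helper: builds a Flux query that keeps only the latest record per table. */
final class FluxQuerySketch {

    /** Wraps a value as a Flux string literal, escaping backslashes and double quotes. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /**
     * @param bucket      bucket name, for example the influx2.bucket property
     * @param rangeStart  Flux range start written as-is, for example -1h or 0
     * @param measurement measurement (table) name
     */
    static String latestPerField(String bucket, String rangeStart, String measurement) {
        return "from(bucket: " + quote(bucket) + ")\n"
                + "  |> range(start: " + rangeStart + ")\n"
                + "  |> filter(fn: (r) => r._measurement == " + quote(measurement) + ")\n"
                + "  |> last()";
    }
}
```

The resulting string can be passed to queryApi.query(flux, org) in place of the String.format result; each returned table should then hold a single record, so records.get(records.size() - 1) keeps working unchanged.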