1、引入依赖
<!-- IoTDB native session client -->
<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>iotdb-session</artifactId>
    <version>0.14.0-preview1</version>
</dependency>
<!-- General-purpose utility libraries -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.6.3</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<!-- Spring Boot web stack and Lombok -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!-- Test support (JUnit vintage engine excluded) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
    </exclusions>
</dependency>
2、IoTDB 配置工具类
@Log4j2
@Component
@Configuration
public class IotDBSessionConfig {private static Session session;private static final String LOCAL_HOST = "127.0.0.1"; //改为自己服务器ip@Beanpublic Session getSession() throws IoTDBConnectionException, StatementExecutionException {if (session == null) {log.info("正在连接iotdb.......");session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build();session.open(false);session.setFetchSize(100);log.info("iotdb连接成功~");// 设置时区session.setTimeZone("+08:00");}return session;}
}
3、入参
/**
 * Request payload shared by the insert and query endpoints.
 */
@Data
public class IotDbParam {

    /** Product PK. */
    private String pk;

    /** Device number. */
    private String sn;

    /** Time. */
    private Long time;

    /** Real-time breath reading. */
    private String breath;

    /** Real-time heart-rate reading. */
    private String heart;

    /** Query start time. */
    private String startTime;

    /** Query end time. */
    private String endTime;
}
4、响应
/**
 * One row returned by the query endpoint.
 */
@Data
public class IotDbResult {

    /** Time. */
    private String time;

    /** Product PK. */
    private String pk;

    /** Device number. */
    private String sn;

    /** Real-time breath reading. */
    private String breath;

    /** Real-time heart-rate reading. */
    private String heart;
}
5、控制层
/**
 * REST endpoints for inserting device data, querying it, and deleting storage groups.
 */
@Log4j2
@RestController
public class IotDbController {

    @Resource
    private IotDbServer iotDbServer;

    @Resource
    private IotDBSessionConfig iotDBSessionConfig;

    /**
     * Inserts the data carried by the request body.
     *
     * @param iotDbParam the data to insert
     * @return a success response
     */
    @PostMapping("/api/device/insert")
    public ResponseData insert(@RequestBody IotDbParam iotDbParam)
            throws StatementExecutionException, ServerException, IoTDBConnectionException {
        iotDbServer.insertData(iotDbParam);
        return ResponseData.success();
    }

    /**
     * Queries data using the criteria carried by the request body.
     *
     * @param iotDbParam the query criteria
     * @return a success response wrapping whatever the service returned
     */
    @PostMapping("/api/device/queryData")
    public ResponseData queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception {
        Object queried = iotDbServer.queryDataFromIotDb(iotDbParam);
        return ResponseData.success(queried);
    }

    /**
     * Deletes the storage groups {@code root.a1eaKSRpRty} and {@code root.smartretirement},
     * in that order.
     *
     * @return a success response
     */
    @PostMapping("/api/device/deleteGroup")
    public ResponseData deleteGroup() throws StatementExecutionException, IoTDBConnectionException {
        String[] storageGroups = {"root.a1eaKSRpRty", "root.smartretirement"};
        for (String storageGroup : storageGroups) {
            iotDBSessionConfig.deleteStorageGroup(storageGroup);
        }
        return ResponseData.success();
    }
}
6、接口
/**
 * Service contract for writing device data to IoTDB and reading it back.
 */
public interface IotDbServer {

    /**
     * Adds data.
     *
     * @param iotDbParam the data to add
     */
    void insertData(IotDbParam iotDbParam)
            throws StatementExecutionException, ServerException, IoTDBConnectionException;

    /**
     * Queries data.
     *
     * @param iotDbParam the query criteria
     * @return the query result
     */
    Object queryDataFromIotDb(IotDbParam iotDbParam) throws Exception;
}
7、实现层
@Log4j2
@Service
public class IotDbServerImpl implements IotDbServer {@Resourceprivate IotDBSessionConfig iotDBSessionConfig;@Overridepublic void insertData(IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {// iotDbParam: 模拟设备上报消息// bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码String deviceId = "root.bizkey."+ iotDbParam.getPk() + "." + iotDbParam.getSn();// 将设备上报的数据存入数据库(时序数据库)List<String> measurementsList = new ArrayList<>();measurementsList.add("heart");measurementsList.add("breath");List<String> valuesList = new ArrayList<>();valuesList.add(String.valueOf(iotDbParam.getHeart()));valuesList.add(String.valueOf(iotDbParam.getBreath()));iotDBSessionConfig.insertRecord(deviceId, iotDbParam.getTime(), measurementsList, valuesList);}@Overridepublic List<IotDbResult> queryDataFromIotDb(IotDbParam iotDbParam) throws Exception {List<IotDbResult> iotDbResultList = new ArrayList<>();if (null != iotDbParam.getPk() && null != iotDbParam.getSn()) {String sql = "select * from root.bizkey."+ iotDbParam.getPk() +"." 
+ iotDbParam.getSn() + " where time >= "+ iotDbParam.getStartTime() + " and time < " + iotDbParam.getEndTime();SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);List<String> columnNames = sessionDataSet.getColumnNames();List<String> titleList = new ArrayList<>();// 排除Time字段 -- 方便后面后面拼装数据for (int i = 1; i < columnNames.size(); i++) {String[] temp = columnNames.get(i).split("\\.");titleList.add(temp[temp.length - 1]);}// 封装处理数据packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList);} else {log.info("PK或者SN不能为空!!");}return iotDbResultList;}/*** 封装处理数据* @param iotDbParam* @param iotDbResultList* @param sessionDataSet* @param titleList* @throws StatementExecutionException* @throws IoTDBConnectionException*/private void packagingData(IotDbParam iotDbParam, List<IotDbResult> iotDbResultList, SessionDataSet sessionDataSet, List<String> titleList)throws StatementExecutionException, IoTDBConnectionException {int fetchSize = sessionDataSet.getFetchSize();if (fetchSize > 0) {while (sessionDataSet.hasNext()) {IotDbResult iotDbResult = new IotDbResult();RowRecord next = sessionDataSet.next();List<Field> fields = next.getFields();String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());iotDbResult.setTime(timeString);Map<String, String> map = new HashMap<>();for (int i = 0; i < fields.size(); i++) {Field field = fields.get(i);// 这里的需要按照类型获取map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());}iotDbResult.setTime(timeString);iotDbResult.setPk(iotDbParam.getPk());iotDbResult.setSn(iotDbParam.getSn());iotDbResult.setHeart(map.get("heart"));iotDbResult.setBreath(map.get("breath"));iotDbResultList.add(iotDbResult);}}}
}
8、测试api
1.添加一条记录
接口:POST localhost:8080/api/device/insert
入参:
{"time":1660573444672,"pk":"a1TTQK9TbKT","sn":"SN202208120945QGJLD","breath":"17","heart":"68"
}
2.根据SQL查询时间区间记录
接口:POST localhost:8080/api/device/queryData
入参:
{"pk":"a1TTQK9TbKT","sn":"SN202208120945QGJLD","startTime":"2022-08-14 00:00:00","endTime":"2022-08-16 00:00:00"
}
结果