1. influxDB连接
首先用InfluxDBStudio软件连接influxDB数据库来查看所有表:
2. 写sql语句来查询数据
然后和平时写sql查询语句一样,先创建连接client,然后调用其query函数来查询获取数据
self.client = influxdb.InfluxDBClient(host=influx_host, port=influx_port, username=influx_username, password=influx_password, database=influx_database, timeout=influx_timeout)
query_sql = f"""select * from {table_name} \
where point = '{temperature_point_id}'\
and nowTime >= {begin_time} and nowTime < {end_time}"""
tables = self.client.query(query_sql)
3. 向influxDB数据库插入数据
插入数据时无需提前新建表,只需要指定表名直接插入即可:
self.influx_conn()
if data:
for i in range(len(data)):
data[i].update({"measurement": table_name}) # 加上表名
data[i].update({"time": now_time}) # 加上时间
self.client.write_points(data)
else:
print("data is None!")
4. 完整代码(code)
import influxdb
import pandas as pd
import datetime
import timeinflux_host = "132.12.29.18"
influx_port = 8086
influx_username = "admin"
influx_password = "abc32100"
influx_database = "dqcloud"
influx_timeout = 10def get_time_str(time_stamp): # 时间戳转字符串time_array = time.localtime(int(time_stamp) / 1000)time_str = time.strftime("%Y-%m-%d %H:%M:%S", time_array)return time_strclass InfluxInfo(object):def __init__(self):passdef influx_conn(self):self.client = influxdb.InfluxDBClient(host=influx_host, port=influx_port, username=influx_username,password=influx_password, database=influx_database, timeout=influx_timeout)def query_data(self, table_name, temperature_point_id, begin_time, end_time):self.influx_conn()if type == "query":# point:测温点id,num:测温点编号,lastTime:上次测温时间,nowTime:巡检时间,v:温度值,tr:温升值query_sql = f"""select * from {table_name} \where point = '{temperature_point_id}'\and nowTime >= {begin_time} and nowTime < {end_time}"""tables = self.client.query(query_sql)if len(tables) > 0:return tables._get_series()else:return []def insert_data(self, table_name, now_time, data=None):self.influx_conn()if data:for i in range(len(data)):data[i].update({"measurement": table_name}) # 加上表名data[i].update({"time": now_time}) # 加上时间self.client.write_points(data)else:print("data is None!")def get_data_by_tum(query_rslt):""""time": 数据插入时间(时序库自带的,不是自己插入的)"log": "巡检记录id""point": "测温点id""num": "测温点编号""lastTime": "上次测温时间""nowTime": "巡检时间""v": "温度值""tr": "温升值""overTempAlert": "超温告警:0-否,1-是""overTempWarning": "超温预警:0-否,1-是""tempRiseAlert": "温升告警:0-否,1-是""""timelist = [] #nowTime_list = []lastTime_list = []tr_list = []valuelist = []if len(query_rslt) > 0:result = query_rslt[0]['values']for r in result:if len(r) > 4 and r[-1]:timestr = r[0].split(".")[0]timestr = timestr.replace("T", " ")timelist.append(timestr)lastTime_list.append(get_time_str(r[1]))nowTime_list.append(get_time_str(r[3]))tr_list.append(float(r[-2]))valuelist.append(float(r[-1]))if len(timelist) > 0:returndata = pd.DataFrame({'time': timelist, 'lastTime': lastTime_list, 'nowTime': nowTime_list, 'tr': tr_list, 'value': valuelist})else:returndata = pd.DataFrame()return returndataif __name__ == '__main__':info = InfluxInfo()# "point": "测温点id"# "nowTime": "预测时间"# "v": "温度值"# "tr": "温升值"# "overTempWarning": "超温预警:0-否,1-是"# "tempRiseWarning": "温升预警:0-否,1-是"data = [{"tags": { # 标签,元数据信息和标识数据的键值对"point": "85231",},"fields": { # 字段,实际的数值数据"predictTime": 1698289692244,"v": 44,"tr": 0.64,"overTempWarning": 0,"tempRiseWarning": 0}},{"tags": {"point": "85232",},"fields": {"predictTime": 1698289692292,"v": 45,"tr": 1.69,"overTempWarning": 0,"tempRiseWarning": 1}}]now_time = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%dT%H:%M:%SZ")info.insert_data("point_predict_data", now_time, data)