import json
import os
from enum import Enumclass LaneDirectionType(int, Enum):LaneDirectionType_Unknown = -1 # 类型未知OneWay = 1 # 单向TwoWay = 2 # 双向# 颜色类型
class ColorCombo(int, Enum):NOUSE = 0 # 默认值UNKNOWN = 1000 # 未定义WHITE = 1 # 白色(默认值)YELLOW = 2 # 黄色ORANGE = 3 # 橙色BLUE = 4 # 蓝色GREEN = 5 # 绿色LEFT_WHITE_RIGHT_YELLOW = 6 # 左白右黄LEFT_YELLOW_RIGHT_WHITE = 7 # 左黄右白RED = 8 # 红色class LaneTurnType(int, Enum):NOUSE = 0 # 默认值UNKNOWN = 1000 # 未定义AHEAD = 1 # 直行LEFT = 2 # 左转RIGHT = 3 # 右转U_TURN = 4 # 掉头class ObjectLaneType(int, Enum):NOUSE = 0 # 默认值UNKNOWN = 1000 # 未分类VIRTUAL_WIRE = 1 # 虚拟线THICK_DASHED_LINE_SEGMENT = 2 # 粗虚线段SINGLE_DASHED_LINE = 3 # 单虚线SINGLE_SOLID_LINE = 4 # 单实线DOUBLE_DASHED_LINE = 5 # 双虚线DOUBLE_SOLID_LINE = 6 # 双实线LEFT_SOLID_RIGHT_DASHED_LINE = 7 # 左实右虚线RIGHT_SOLID_LEFT_DASHED_LINE = 8 # 右实左虚线FOUR_SOLID_LINE = 9 # 四实线class LongitudinalType(int, Enum):COMMON = 0 # 常规标线DISTANCE_CONFIRM_LINE = 1 # 白色半圆状车距确认线LOW_SPEED_LINE = 2 # 车行道纵向减速标线GORE_AREA_LINE = 3 # 导流区边线NO_PARKING_LINE = 4 # 禁停区边线PARKING_LINE = 5 # 停车位边线VARIABLE_GUIDANCE_LINE = 6 # 可变导向车道线# 路边条带枚举定义
class ObjectFenceType(int, Enum):NOUSE = 0 # 默认值UNKNOWN = 1000 # 未分类CURB = 1 # 路缘石GUARDRAIL = 2 # 护栏WALL = 3 # 墙体GEOGRAPHICAL_BOUNDARTES = 4 # 地理边界GREENBELTS = 5 # 绿化带OTHER_HARD_ISOLATION = 6 # 其它硬隔离PARKING_POST = 7 # 停车场柱子def read_data(input_data):#./data3转换/output/semanticfeature = []for _file_name in os.scandir(input_data):#会遍历该目录下的所有文件和子目录with open(_file_name, encoding='utf-8') as fh:feature_collection = json.loads(fh.read())#fh.read()会读取文件的所有内容作为字符串feature.append(feature_collection)return featuredef transform_line_properties(id="", groupid="", color=ColorCombo.NOUSE, color_tf=100, type=ObjectLaneType.NOUSE,type_tf=100,longitudinal_type=str(LongitudinalType.COMMON.value), aggregation_count=1,taskid="0", update_time=0):properties = {"id": str(id),"groupid": groupid,"color": color,# 置信度默认赋值 100"color_tf": color_tf,"type": type,# 置信度默认赋值 100"type_tf": type_tf,"longitudinal_type": longitudinal_type,# 聚类次数默认赋值 1"aggregation_count": aggregation_count,"taskid": taskid,"update_time": update_time}return propertiesdef transform_boundary_properties(id="", type=6, type_tf=100, aggregation_count=1, taskid="0", update_time=0):properties = {"id": str(id),"type": type,# 置信度默认赋值 100"type_tf": type_tf,# 聚类次数默认赋值 1"aggregation_count": aggregation_count,"taskid": taskid,"update_time": update_time}return propertiesdef transform_trajectory_properties(id=0,lanenode_id_s=-1,lanenode_id_e=-1,speed=0,turn_type=LaneTurnType.NOUSE,collect_num=1,direction=LaneDirectionType.LaneDirectionType_Unknown,taskid="0",update_time=0):properties = {"id": id,"lanenode_id_s": lanenode_id_s,"lanenode_id_e": lanenode_id_e,"speed": speed,"turn_type": turn_type,"collect_num": collect_num,"direction": direction,"taskid": taskid,"update_time": update_time}return propertiesdef transform_line(data):# if isinstance(data["properties"]["longitudinal_type"], list):# longitudinal_type = ','.join([str(x) for x in data["properties"]["longitudinal_type"]])# elif isinstance(data["properties"]["longitudinal_type"], str):# longitudinal_type = data["properties"]["longitudinal_type"]# else:# longitudinal_type = ""new_properties = transform_line_properties(id=data["properties"]["id"],color=data["properties"]["color"],color_tf=data["properties"]["color_tf"],type=ObjectLaneType.SINGLE_DASHED_LINE)data["properties"] = new_propertiesdef transform_boundary(data):new_properties = transform_boundary_properties(id=data["properties"]["id"],type=ObjectFenceType.OTHER_HARD_ISOLATION)data["properties"] = new_properties#修改参数,可以改上层参数
def transform_trajectory(data):new_properties = transform_trajectory_properties(id=data["properties"]["id"])data["properties"] = new_propertiesdef save_data(data_list, out_data_path):directory = os.path.dirname(out_data_path)if not os.path.exists(directory):os.makedirs(directory)# out_data_path = out_data_path + save_type + ".geojson"out_data= {"type": "FeatureCollection","features":data_list}with open(out_data_path, 'w', encoding='utf-8') as fp:fp.write(json.dumps(out_data, ensure_ascii=False))def transform_format(in_data_path,out_path):# 读取semantic数据 转成字典保存在list中返回semantic_features = read_data(os.path.join(in_data_path, "semantic"))#./data3转换/output/semanticboundary_data = []#道路边线line_data = []#车道线for data_features in semantic_features:for data_feature in data_features["features"]:#遍历list 包括几何信息,属性信息# 云端建图后续可能会有字段枚举值if data_feature["properties"]["type"] == 6:transform_boundary(data_feature)#只更新对象里的属性值,其他的保留boundary_data.append(data_feature)if data_feature["properties"]["type"] == 1:transform_line(data_feature)line_data.append(data_feature)out_boundary_data_path = os.path.join(out_path, "semantic", "Boundary.geojson")out_line_data_path = os.path.join(out_path, "semantic", "Line.geojson")save_data(boundary_data, out_boundary_data_path)save_data(line_data, out_line_data_path)
if __name__ == '__main__':# 用于确保该代码块只在作为主程序运行时才执行,而在被导入为模块时不执行in_data_path = "/home/linux/下载/557040098/output"out_path = "/home/linux/下载/557040098/output"transform_format(in_data_path, out_path)# # 读取trajectory数据# trajectory_features = read_data(os.path.join(in_data_path,"trajectory"))## for data_features in trajectory_features.items():# for data_feature in data_features["features"]:# transform_trajectory(data_feature)# out_trajectory_data_path = os.path.join(out_path, "trajectory", "Lane")# save_data(trajectory_features, out_trajectory_data_path)
json.load()
和 json.loads()
都是用于读取 JSON 格式数据的函数,其中 json.load()
用于读取文件(File)对象,而 json.loads()
用于读取字符串(str)对象。
具体来说,json.load()
的参数应该是一个打开的文件对象,而 json.loads()
的参数应该是一个字符串对象。json.loads()
会将这个字符串解码为 Python 对象,而 json.load()
则会将文件中的 JSON 数据解码为 Python 对象。