本篇文章是对我们从高德拿到的公交/地铁的json文件的精细化处理的一个深入解析,通过对这些原始数据进行详细的清洗、转换和分析,我们通过对数据的质量和可用性的提升,来为后续的数据精细化处理和研究做基础数据的支撑,从而为后续的交通规划、调度优化以及用户服务提升提供了坚实的数据支持。
上篇的处理逻辑指路:利用高德API获取整个城市的公交路线并可视化(六)_高德地图公交线路的geojson文件-CSDN博客
第一部分:公交/地铁双线路径处理
相对于之前的处理python脚本,增加字段,dir(上下行,环线无上下行),direc(线路编码),这里的dir字段,原始数据没有,是自定义的规则;
dir 值的确定规则
- 如果该线路只有一个方向,dir 为 '0'
- 如果有两个方向,direc 值小的为 '0',大的为 '1'
文件路径修改成自己的路径,然后运行下面的脚本;
# 指定目录路径directory = 'D://data//高德json'csv_file_path = 'D://data//高德json//bus_route.csv'shp_file_path = 'D://data//高德json//bus_route.shp'
完整代码#运行环境 Python 3.11
import os
import json
import csv
import math
import geopandas as gpd
from shapely.geometry import LineString, Point
from collections import defaultdict# 高德GCJ02(火星坐标系)转WGS84
x_pi = 3.14159265358979324 * 3000.0 / 180.0
pi = 3.1415926535897932384626 # π
a = 6378245.0 # 长半轴
ee = 0.00669342162296594323 # 扁率def gcj02towgs84(lng, lat):"""GCJ02(火星坐标系)转WGS84:param lng: 火星坐标系的经度:param lat: 火星坐标系纬度:return: WGS84 经度, 纬度"""if out_of_china(lng, lat):return lng, latdlat = transformlat(lng - 105.0, lat - 35.0)dlng = transformlng(lng - 105.0, lat - 35.0)radlat = lat / 180.0 * pimagic = math.sin(radlat)magic = 1 - ee * magic * magicsqrtmagic = math.sqrt(magic)dlat = (dlat * 180.0) / ((a * (1 - ee)) / (magic * sqrtmagic) * pi)dlng = (dlng * 180.0) / (a / sqrtmagic * math.cos(radlat) * pi)mglat = lat + dlatmglng = lng + dlngreturn [lng * 2 - mglng, lat * 2 - mglat]def transformlat(lng, lat):"""计算纬度偏移量:param lng: 经度:param lat: 纬度:return: 纬度偏移量"""ret = -100.0 + 2.0 * lng + 3.0 * lat + 0.2 * lat * lat + 0.1 * lng * lat + 0.2 * math.sqrt(math.fabs(lng))ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0ret += (20.0 * math.sin(lat * pi) + 40.0 * math.sin(lat / 3.0 * pi)) * 2.0 / 3.0ret += (160.0 * math.sin(lat / 12.0 * pi) + 320 * math.sin(lat * pi / 30.0)) * 2.0 / 3.0return retdef transformlng(lng, lat):"""计算经度偏移量:param lng: 经度:param lat: 纬度:return: 经度偏移量"""ret = 300.0 + lng + 2.0 * lat + 0.1 * lng * lng + 0.1 * lng * lat + 0.1 * math.sqrt(math.fabs(lng))ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0ret += (20.0 * math.sin(lng * pi) + 40.0 * math.sin(lng / 3.0 * pi)) * 2.0 / 3.0ret += (150.0 * math.sin(lng / 12.0 * pi) + 300.0 * math.sin(lng / 30.0 * pi)) * 2.0 / 3.0return retdef out_of_china(lng, lat):"""判断是否在国内,不在国内不做偏移:param lng: 经度:param lat: 纬度:return: 是否在国内"""if lng < 72.004 or lng > 137.8347:return Trueif lat < 0.8293 or lat > 55.8271:return Truereturn Falsedef process_json_files(directory, csv_file_path, shp_file_path):"""处理JSON文件并生成CSV和SHP文件"""# 首先收集所有线路的 direc 值route_direcs = defaultdict(list)# 第一次遍历 - 收集所有线路的 direc 值print("第一次遍历 - 收集线路方向信息...")for json_file in os.listdir(directory):if not json_file.endswith('.json'):continuejson_file_path = os.path.join(directory, json_file)try:with open(json_file_path, 'r', encoding='utf-8') as file:data = json.load(file)if isinstance(data, list):for item in data:name = item.get('name', '').split('(')[0] if item.get('name') else ''direc = str(item.get('direc', ''))if name and direc:route_direcs[name].append(direc)else:name = data.get('name', '').split('(')[0] if data.get('name') else ''direc = str(data.get('direc', ''))if name and direc:route_direcs[name].append(direc)except Exception as e:print(f"Error in first pass for {json_file}: {str(e)}")# 对每个线路的 direc 值进行排序for name in route_direcs:route_direcs[name] = sorted(route_direcs[name])# 初始化数据列表all_lines = []all_attributes = []# 第二次遍历 - 处理数据并写入文件print("第二次遍历 - 处理数据并生成文件...")# 如果CSV文件已存在,则删除if os.path.exists(csv_file_path):os.remove(csv_file_path)for json_file in os.listdir(directory):if not json_file.endswith('.json'):continuejson_file_path = os.path.join(directory, json_file)print(f"处理文件: {json_file}")try:with open(json_file_path, 'r', encoding='utf-8') as file:data = json.load(file)if isinstance(data, list):for item in data:if "name" in item and "path" in item:# 确定 dir 值name = item['name'].split('(')[0]direc = str(item.get('direc', ''))dir_value = '0' if direc == route_direcs[name][0] else '1' if len(route_direcs[name]) > 1 else '0'# 写入CSVheader = ["type", "name", "lng", "lat", "dir", "direc"]with open(csv_file_path, 'a', newline='', encoding='utf-8-sig') as file:writer = csv.writer(file)if file.tell() == 0:writer.writerow(header)for point in item["path"]:wgs84_lng, wgs84_lat = gcj02towgs84(point["lng"], point["lat"])row = [item["type"], item["name"], wgs84_lng, wgs84_lat, dir_value, direc]writer.writerow(row)# 处理 Shapefile 数据wgs84_points = [Point(*gcj02towgs84(point["lng"], point["lat"])) for point in item["path"]]line = LineString(wgs84_points)all_lines.append(line)all_attributes.append({"type": item["type"],"name": item["name"],"dir": dir_value,"direc": direc})else:# 确定 dir 值name = data['name'].split('(')[0]direc = str(data.get('direc', ''))dir_value = '0' if direc == route_direcs[name][0] else '1' if len(route_direcs[name]) > 1 else '0'# 写入CSVheader = ["type", "name", "lng", "lat", "dir", "direc"]with open(csv_file_path, 'a', newline='', encoding='utf-8-sig') as file:writer = csv.writer(file)if file.tell() == 0:writer.writerow(header)for point in data["path"]:wgs84_lng, wgs84_lat = gcj02towgs84(point["lng"], point["lat"])row = [data["type"], data["name"], wgs84_lng, wgs84_lat, dir_value, direc]writer.writerow(row)# 处理 Shapefile 数据wgs84_points = [Point(*gcj02towgs84(point["lng"], point["lat"])) for point in data["path"]]line = LineString(wgs84_points)all_lines.append(line)all_attributes.append({"type": data["type"],"name": data["name"],"dir": dir_value,"direc": direc})except Exception as e:print(f"Error processing {json_file}: {str(e)}")# 创建GeoDataFrame并保存print("创建和保存Shapefile...")gdf = gpd.GeoDataFrame(all_attributes, geometry=all_lines, crs="EPSG:4326")gdf.to_file(shp_file_path, driver='ESRI Shapefile', encoding='utf-8')def main():"""主函数"""# 指定目录路径directory = 'D://data//高德json'csv_file_path = 'D://data//高德json//bus_route.csv'shp_file_path = 'D://data//高德json//bus_route.shp'# 确保输出目录存在os.makedirs(os.path.dirname(csv_file_path), exist_ok=True)# 处理文件process_json_files(directory, csv_file_path, shp_file_path)print("CSV文件和SHP文件已成功创建")if __name__ == "__main__":main()
脚本执行结束,我们会得到一个文件名为bus_route的csv文件,标签包含线路direc、类型、站点名称、xy坐标(wgs84)这些信息;
和一个文件名为bus_route的shp文件,包含了该线路两个方向的线路路径信息;
第二部分:公交/地铁双线站点处理
相对于之前的处理python脚本,增加字段,dir(上下行,环线无上下行),direc(线路编码)loop(是否为环线),这里的dir字段,原始数据没有,是自定义的规则(规则参考上面说明);
文件路径修改成自己的路径,然后运行下面的脚本;
folder_path = r'D://data//高德json' # 你的JSON文件夹路径output_csv = r'D://data//高德json//output.csv' # 输出的CSV文件路径
完整代码#运行环境 Python 3.11
import json
import csv
import os
import math# 坐标转换相关常量
x_pi = 3.14159265358979324 * 3000.0 / 180.0
pi = 3.1415926535897932384626 # π
a = 6378245.0 # 长半轴
ee = 0.00669342162296594323 # 扁率def gcj02towgs84(lng, lat):"""GCJ02(火星坐标系)转WGS84:param lng: 火星坐标系的经度:param lat: 火星坐标系纬度:return: WGS84 经度, 纬度"""if out_of_china(lng, lat):return lng, latdlat = transformlat(lng - 105.0, lat - 35.0)dlng = transformlng(lng - 105.0, lat - 35.0)radlat = lat / 180.0 * pimagic = math.sin(radlat)magic = 1 - ee * magic * magicsqrtmagic = math.sqrt(magic)dlat = (dlat * 180.0) / ((a * (1 - ee)) / (magic * sqrtmagic) * pi)dlng = (dlng * 180.0) / (a / sqrtmagic * math.cos(radlat) * pi)mglat = lat + dlatmglng = lng + dlngreturn [lng * 2 - mglng, lat * 2 - mglat]def transformlat(lng, lat):"""计算纬度偏移量:param lng: 经度:param lat: 纬度:return: 纬度偏移量"""ret = -100.0 + 2.0 * lng + 3.0 * lat + 0.2 * lat * lat + 0.1 * lng * lat + 0.2 * math.sqrt(math.fabs(lng))ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0ret += (20.0 * math.sin(lat * pi) + 40.0 * math.sin(lat / 3.0 * pi)) * 2.0 / 3.0ret += (160.0 * math.sin(lat / 12.0 * pi) + 320 * math.sin(lat * pi / 30.0)) * 2.0 / 3.0return retdef transformlng(lng, lat):"""计算经度偏移量:param lng: 经度:param lat: 纬度:return: 经度偏移量"""ret = 300.0 + lng + 2.0 * lat + 0.1 * lng * lng + 0.1 * lng * lat + 0.1 * math.sqrt(math.fabs(lng))ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0ret += (20.0 * math.sin(lng * pi) + 40.0 * math.sin(lng / 3.0 * pi)) * 2.0 / 3.0ret += (150.0 * math.sin(lng / 12.0 * pi) + 300.0 * math.sin(lng / 30.0 * pi)) * 2.0 / 3.0return retdef out_of_china(lng, lat):"""判断是否在国内,不在国内不做偏移:param lng: 经度:param lat: 纬度:return: 是否在国内"""if lng < 72.004 or lng > 137.8347:return Trueif lat < 0.8293 or lat > 55.8271:return Truereturn Falsedef process_json_files(folder_path, output_csv):"""Process all JSON files in a folder, extract route names and stop information, and save to CSV:param folder_path: Path to folder containing JSON files:param output_csv: Path to output CSV file"""# 创建一个字典来存储每个线路名称对应的 direc 值route_direcs = {}# 第一次遍历 - 收集所有线路的 direc 值for filename in os.listdir(folder_path):if not filename.endswith('.json'):continuefile_path = os.path.join(folder_path, filename)try:with open(file_path, 'r', encoding='utf-8') as file:json_data = json.load(file)routes = json_data if isinstance(json_data, list) else [json_data]for data in routes:name = data.get('name', '').split('(')[0] if data.get('name') else '' # 获取括号前的名称direc = str(data.get('direc', ''))if name:if name not in route_direcs:route_direcs[name] = []route_direcs[name].append(direc)except Exception as e:print(f"Error in first pass for {filename}: {str(e)}")# 处理 route_direcs 确保每个线路的 direc 值被排序for name in route_direcs:route_direcs[name] = sorted(route_direcs[name])try:with open(output_csv, 'w', newline='', encoding='utf-8-sig') as csvfile:fieldnames = ['line', 'ID', 'name', 'lng', 'lat', 'seq', 'loop', 'direc', 'dir']writer = csv.DictWriter(csvfile, fieldnames=fieldnames)writer.writeheader()# 第二次遍历 - 写入数据for filename in os.listdir(folder_path):if not filename.endswith('.json'):continuefile_path = os.path.join(folder_path, filename)try:with open(file_path, 'r', encoding='utf-8') as file:json_data = json.load(file)routes = json_data if isinstance(json_data, list) else [json_data]for data in routes:full_name = data.get('name', '')name = full_name.split('(')[0] if full_name else '' # 获取括号前的名称via_stops = data.get('via_stops', [])loop = data.get('loop', '')direc = str(data.get('direc', ''))if not (name and via_stops):print(f"Warning: Missing required data in {filename}")continue# 确定 dir 值dir_value = '0' if direc == route_direcs[name][0] else '1' if len(route_direcs[name]) > 1 else '0'for stop in via_stops:location = stop.get('location', {})gcj_lng = location.get('lng')gcj_lat = location.get('lat')if not (isinstance(gcj_lng, (int, float)) and isinstance(gcj_lat, (int, float))):print(f"Warning: Invalid coordinates in {filename} for stop {stop.get('name', '')}")continuewgs84_lng, wgs84_lat = gcj02towgs84(gcj_lng, gcj_lat)row = {'line': full_name, # 使用完整名称'ID': f"'{str(stop.get('id', ''))}",'name': stop.get('name', ''),'lng': f"{wgs84_lng:.6f}",'lat': f"{wgs84_lat:.6f}",'seq': stop.get('sequence', ''),'loop': loop,'direc': f"'{direc}",'dir': dir_value}writer.writerow(row)print(f"Successfully processed: {filename}")except (json.JSONDecodeError, UnicodeDecodeError) as e:print(f"Error processing {filename}: {str(e)}")except Exception as e:print(f"Unexpected error processing {filename}: {str(e)}")except Exception as e:print(f"Error writing to CSV file: {str(e)}")def main():"""主函数,指定文件夹路径和输出 CSV 文件路径,并调用处理函数"""folder_path = r'D://data//高德json' # 你的JSON文件夹路径output_csv = r'D://data//高德json//output.csv' # 输出的CSV文件路径process_json_files(folder_path, output_csv)print(f"数据已成功保存到 CSV 文件: {output_csv}")if __name__ == "__main__":main()
脚本执行结束,我们会得到一个文件名为output的csv文件,标签包含line(线路名称)、id、name (站点)、xy坐标(wgs84)、seq(站点顺序编号),loop(是否为环线)这些信息,我们可以把这个数据在arcgis进行【显示xy数据】的操作;
第三部分:公交/地铁双线线段化处理(新增)
线路本身是一条完整的线路,我们根据站点位置,对线路进行截断,并提取这两点之间的线段,这样,站点之间就是一个个线段,从而可以满足后续对线段客流量进行赋值可视化等需求;
这里需要把前面xy坐标转点的图层另存为新的bus_station图层,文件路径修改成自己的路径,然后运行下面的脚本;
# 读取线路和站点 shapefile 文件
lines_path = r'D:\data\高德json\bus_route.shp'
stations_path = r'D:\data\高德json\bus_station.shp'
完整代码#运行环境 Python 3.11
import geopandas as gpd
from shapely.geometry import LineString, Point
from shapely.ops import nearest_points
import pandas as pd
import re# 读取线路和站点 shapefile 文件
lines_path = r'D:\data\高德json\bus_route.shp'
stations_path = r'D:\data\高德json\bus_station.shp'# 读取线路和站点数据
gdf_lines = gpd.read_file(lines_path)
gdf_stations = gpd.read_file(stations_path)# 创建一个空的 GeoDataFrame 用于存储截断后的线路
truncated_lines = gpd.GeoDataFrame(columns=gdf_lines.columns)# 遍历每条线路,逐条处理
for line_index, line in gdf_lines.iterrows():# 获取当前线路的坐标line_coords = list(line.geometry.coords)# 获取原始线路名称(用于line字段)original_line = line['name']# 创建一个字典来存储每个站点对应的最近线路点station_to_line_points = {}# 遍历每个站点,找到最近的线路点,确保线路名匹配for idx, station in gdf_stations.iterrows():if station['line'] == line['name']:nearest_point = nearest_points(line.geometry, station.geometry)[1]station_to_line_points[station['name']] = nearest_point# 找到每个站点在当前线路上的最近点matched_points = []for station_name, station_point in station_to_line_points.items():nearest_point = nearest_points(line.geometry, station_point)[1]matched_points.append((station_name, nearest_point))# 根据匹配的点进行截断for i in range(len(matched_points) - 1):start_station_name, start_point = matched_points[i]end_station_name, end_point = matched_points[i + 1]# 找到起始和结束点在原线路中的位置start_pos = min(range(len(line_coords)), key=lambda j: start_point.distance(Point(line_coords[j])))end_pos = min(range(len(line_coords)), key=lambda j: end_point.distance(Point(line_coords[j])))# 确保 start_pos 和 end_pos 的有效性if start_pos < end_pos and start_pos < len(line_coords) and end_pos < len(line_coords):# 创建截断后的线段,保持原线路的路径truncated_line = LineString(line_coords[start_pos:end_pos + 1])# 计算截断线段的长度truncated_length = truncated_line.length# 创建新的 GeoDataFrame 行new_line = line.copy()new_line.geometry = truncated_line# 使用站点名称创建名称new_line['name'] = f"{start_station_name}—{end_station_name}"new_line['length_m'] = truncated_lengthnew_line['line'] = original_line # 添加原始线路信息# 将新行添加到截断后的线路 GeoDataFrametruncated_lines = pd.concat([truncated_lines, gpd.GeoDataFrame([new_line])], ignore_index=True)# 保存截断后的线路到新的 shapefile
truncated_lines.to_file(r'D:\data\高德json\lines.shp', encoding='utf-8')print("处理完成!")
脚本执行结束,我们会得到一个文件名为lines的shp图层,标签包含line(线路名称)、dir、 name(站点—站点名称)、direc(线路编码)、length_m(线段长度)这些信息,生成线段结果如下,每条线路都基于线路与站点的唯一对应关系进行截断,并生成新的线段;
文章仅用于分享个人学习成果与个人存档之用,分享知识,如有侵权,请联系作者进行删除。所有信息均基于作者的个人理解和经验,不代表任何官方立场或权威解读。