[BitSail] Connector开发详解系列三:SourceReader

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群

Source Connector

本文将主要介绍负责数据读取的组件SourceReader:

SourceReader

每个SourceReader都在独立的线程中执行,只要我们保证SourceSplitCoordinator分配给不同SourceReader的切片没有交集,在SourceReader的执行周期中,我们就可以不考虑任何有关并发的细节。

SourceReader接口

public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {void start();void pollNext(SourcePipeline<T> pipeline) throws Exception;void addSplits(List<SplitT> splits);/*** Check source reader has more elements or not.*/boolean hasMoreElements();/*** There will no more split will send to this source reader.* Source reader could be exited after process all assigned split.*/default void notifyNoMoreSplits() {}/*** Process all events which from {@link SourceSplitCoordinator}.*/default void handleSourceEvent(SourceEvent sourceEvent) {}/*** Store the split to the external system to recover when task failed.*/List<SplitT> snapshotState(long checkpointId);/*** When all tasks finished snapshot, notify checkpoint complete will be invoked.*/default void notifyCheckpointComplete(long checkpointId) throws Exception {}interface Context {TypeInfo<?>[] getTypeInfos();String[] getFieldNames();int getIndexOfSubtask();void sendSplitRequest();}
}

构造方法

这里需要完成和数据源访问各种配置的提取,比如数据库库名表名、消息队列cluster和topic、身份认证的配置等等。

示例

public RocketMQSourceReader(BitSailConfiguration readerConfiguration,Context context,Boundedness boundedness) {this.readerConfiguration = readerConfiguration;this.boundedness = boundedness;this.context = context;this.assignedRocketMQSplits = Sets.newHashSet();this.finishedRocketMQSplits = Sets.newHashSet();this.deserializationSchema = new RocketMQDeserializationSchema(readerConfiguration,context.getTypeInfos(),context.getFieldNames());this.noMoreSplits = false;cluster = readerConfiguration.get(RocketMQSourceOptions.CLUSTER);topic = readerConfiguration.get(RocketMQSourceOptions.TOPIC);consumerGroup = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);consumerTag = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_TAG);pollBatchSize = readerConfiguration.get(RocketMQSourceOptions.POLL_BATCH_SIZE);pollTimeout = readerConfiguration.get(RocketMQSourceOptions.POLL_TIMEOUT);commitInCheckpoint = readerConfiguration.get(RocketMQSourceOptions.COMMIT_IN_CHECKPOINT);accessKey = readerConfiguration.get(RocketMQSourceOptions.ACCESS_KEY);secretKey = readerConfiguration.get(RocketMQSourceOptions.SECRET_KEY);
}

start方法

初始化数据源的访问对象,例如数据库的执行对象、消息队列的consumer对象或者文件系统的连接。

示例

消息队列

public void start() {try {if (StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey)) {AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));consumer = new DefaultMQPullConsumer(aclClientRPCHook);} else {consumer = new DefaultMQPullConsumer();}consumer.setConsumerGroup(consumerGroup);consumer.setNamesrvAddr(cluster);consumer.setInstanceName(String.format(SOURCE_READER_INSTANCE_NAME_TEMPLATE,cluster, topic, consumerGroup, UUID.randomUUID()));consumer.setConsumerPullTimeoutMillis(pollTimeout);consumer.start();} catch (Exception e) {throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);}
}

数据库

public void start() {this.connection = connectionHolder.connect();// Construct statement.String baseSql = ClickhouseJdbcUtils.getQuerySql(dbName, tableName, columnInfos);String querySql = ClickhouseJdbcUtils.decorateSql(baseSql, splitField, filterSql, maxFetchCount, true);try {this.statement = connection.prepareStatement(querySql);} catch (SQLException e) {throw new RuntimeException("Failed to prepare statement.", e);}LOG.info("Task {} started.", subTaskId);
}

FTP

public void start() {this.ftpHandler.loginFtpServer();if (this.ftpHandler.getFtpConfig().getSkipFirstLine()) {this.skipFirstLine = true;}
}

addSplits方法

将SourceSplitCoordinator给当前Reader分配的Splits列表添加到自己的处理队列(Queue)或者集合(Set)中。

示例

public void addSplits(List<RocketMQSplit> splits) {LOG.info("Subtask {} received {}(s) new splits, splits = {}.",context.getIndexOfSubtask(),CollectionUtils.size(splits),splits);assignedRocketMQSplits.addAll(splits);
}

hasMoreElements方法

在无界的流计算场景中,会一直返回true保证Reader线程不被销毁。

在批式场景中,分配给该Reader的切片处理完之后会返回false,表示该Reader生命周期的结束。

public boolean hasMoreElements() {if (boundedness == Boundedness.UNBOUNDEDNESS) {return true;}if (noMoreSplits) {return CollectionUtils.size(assignedRocketMQSplits) != 0;}return true;
}

pollNext方法

在addSplits方法添加完成切片处理队列且hasMoreElements返回true时,该方法调用,开发者实现此方法真正和数据交互。

开发者在实现pollNext方法时候需要关注下列问题:

  • 切片数据的读取

    • 从构造好的切片中去读取数据。

  • 数据类型的转换

    • 将外部数据转换成BitSail的Row类型

示例

以RocketMQSourceReader为例:

从split队列中选取split进行处理,读取其信息,之后需要将读取到的信息转换成BitSail的Row类型,发送给下游处理。

public void pollNext(SourcePipeline<Row> pipeline) throws Exception {for (RocketMQSplit rocketmqSplit : assignedRocketMQSplits) {MessageQueue messageQueue = rocketmqSplit.getMessageQueue();PullResult pullResult = consumer.pull(rocketmqSplit.getMessageQueue(),consumerTag,rocketmqSplit.getStartOffset(),pollBatchSize,pollTimeout);if (Objects.isNull(pullResult) || CollectionUtils.isEmpty(pullResult.getMsgFoundList())) {continue;}for (MessageExt message : pullResult.getMsgFoundList()) {Row deserialize = deserializationSchema.deserialize(message.getBody());pipeline.output(deserialize);if (rocketmqSplit.getStartOffset() >= rocketmqSplit.getEndOffset()) {LOG.info("Subtask {} rocketmq split {} in end of stream.",context.getIndexOfSubtask(),rocketmqSplit);finishedRocketMQSplits.add(rocketmqSplit);break;}}rocketmqSplit.setStartOffset(pullResult.getNextBeginOffset());if (!commitInCheckpoint) {consumer.updateConsumeOffset(messageQueue, pullResult.getMaxOffset());}}assignedRocketMQSplits.removeAll(finishedRocketMQSplits);
}

转换为BitSail Row类型的常用方式

自定义RowDeserializer类

对于不同格式的列应用不同converter,设置到相应Row的Field。

public class ClickhouseRowDeserializer {interface FiledConverter {Object apply(ResultSet resultSet) throws SQLException;}private final List<FiledConverter> converters;private final int fieldSize;public ClickhouseRowDeserializer(TypeInfo<?>[] typeInfos) {this.fieldSize = typeInfos.length;this.converters = new ArrayList<>();for (int i = 0; i < fieldSize; ++i) {converters.add(initFieldConverter(i + 1, typeInfos[i]));}}public Row convert(ResultSet resultSet) {Row row = new Row(fieldSize);try {for (int i = 0; i < fieldSize; ++i) {row.setField(i, converters.get(i).apply(resultSet));}} catch (SQLException e) {throw BitSailException.asBitSailException(ClickhouseErrorCode.CONVERT_ERROR, e.getCause());}return row;}private FiledConverter initFieldConverter(int index, TypeInfo<?> typeInfo) {if (!(typeInfo instanceof BasicTypeInfo)) {throw BitSailException.asBitSailException(CommonErrorCode.UNSUPPORTED_COLUMN_TYPE, typeInfo.getTypeClass().getName() + " is not supported yet.");}Class<?> curClass = typeInfo.getTypeClass();if (TypeInfos.BYTE_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getByte(index);}if (TypeInfos.SHORT_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getShort(index);}if (TypeInfos.INT_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getInt(index);}if (TypeInfos.LONG_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getLong(index);}if (TypeInfos.BIG_INTEGER_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> {BigDecimal dec = resultSet.getBigDecimal(index);return dec == null ? null : dec.toBigInteger();};}if (TypeInfos.FLOAT_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getFloat(index);}if (TypeInfos.DOUBLE_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getDouble(index);}if (TypeInfos.BIG_DECIMAL_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getBigDecimal(index);}if (TypeInfos.STRING_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getString(index);}if (TypeInfos.SQL_DATE_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getDate(index);}if (TypeInfos.SQL_TIMESTAMP_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getTimestamp(index);}if (TypeInfos.SQL_TIME_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getTime(index);}if (TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getBoolean(index);}if (TypeInfos.VOID_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> null;}throw new UnsupportedOperationException("Unsupported data type: " + typeInfo);}
}
实现DeserializationSchema接口

相对于实现RowDeserializer,我们更希望大家去实现一个继承DeserializationSchema接口的实现类,将一定类型格式的数据对数据比如JSON、CSV转换为BitSail Row类型。 

在具体的应用时,我们可以使用统一的接口创建相应的实现类

public class TextInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {private BitSailConfiguration deserializationConfiguration;private TypeInfo<?>[] typeInfos;private String[] fieldNames;private transient DeserializationSchema<byte[], Row> deserializationSchema;public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,TypeInfo<?>[] typeInfos,String[] fieldNames) {this.deserializationConfiguration = deserializationConfiguration;this.typeInfos = typeInfos;this.fieldNames = fieldNames;ContentType contentType = ContentType.valueOf(deserializationConfiguration.getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase());switch (contentType) {case CSV:this.deserializationSchema =new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);break;case JSON:this.deserializationSchema =new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);break;default:throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported parser type: " + contentType);}}@Overridepublic Row deserialize(Writable message) {return deserializationSchema.deserialize((message.toString()).getBytes());}@Overridepublic boolean isEndOfStream(Row nextElement) {return false;}
}

也可以自定义当前需要解析类专用的DeserializationSchema:

public class MapredParquetInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {private final BitSailConfiguration deserializationConfiguration;private final transient DateTimeFormatter localDateTimeFormatter;private final transient DateTimeFormatter localDateFormatter;private final transient DateTimeFormatter localTimeFormatter;private final int fieldSize;private final TypeInfo<?>[] typeInfos;private final String[] fieldNames;private final List<DeserializationConverter> converters;public MapredParquetInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,TypeInfo<?>[] typeInfos,String[] fieldNames) {this.deserializationConfiguration = deserializationConfiguration;this.typeInfos = typeInfos;this.fieldNames = fieldNames;this.localDateTimeFormatter = DateTimeFormatter.ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_TIME_PATTERN));this.localDateFormatter = DateTimeFormatter.ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_PATTERN));this.localTimeFormatter = DateTimeFormatter.ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.TIME_PATTERN));this.fieldSize = typeInfos.length;this.converters = Arrays.stream(typeInfos).map(this::createTypeInfoConverter).collect(Collectors.toList());}@Overridepublic Row deserialize(Writable message) {int arity = fieldNames.length;Row row = new Row(arity);Writable[] writables = ((ArrayWritable) message).get();for (int i = 0; i < fieldSize; ++i) {row.setField(i, converters.get(i).convert(writables[i].toString()));}return row;}@Overridepublic boolean isEndOfStream(Row nextElement) {return false;}private interface DeserializationConverter extends Serializable {Object convert(String input);}private DeserializationConverter createTypeInfoConverter(TypeInfo<?> typeInfo) {Class<?> typeClass = typeInfo.getTypeClass();if (typeClass == TypeInfos.VOID_TYPE_INFO.getTypeClass()) {return field -> null;}if (typeClass == TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass()) {return this::convertToBoolean;}if (typeClass == TypeInfos.INT_TYPE_INFO.getTypeClass()) {return this::convertToInt;}throw BitSailException.asBitSailException(CsvFormatErrorCode.CSV_FORMAT_COVERT_FAILED,String.format("Csv format converter not support type info: %s.", typeInfo));}private boolean convertToBoolean(String field) {return Boolean.parseBoolean(field.trim());}private int convertToInt(String field) {return Integer.parseInt(field.trim());}
}

snapshotState方法

生成并保存State的快照信息,用于ckeckpoint。

示例

public List<RocketMQSplit> snapshotState(long checkpointId) {LOG.info("Subtask {} start snapshotting for checkpoint id = {}.", context.getIndexOfSubtask(), checkpointId);if (commitInCheckpoint) {for (RocketMQSplit rocketMQSplit : assignedRocketMQSplits) {try {consumer.updateConsumeOffset(rocketMQSplit.getMessageQueue(), rocketMQSplit.getStartOffset());LOG.debug("Subtask {} committed message queue = {} in checkpoint id = {}.", context.getIndexOfSubtask(),rocketMQSplit.getMessageQueue(),checkpointId);} catch (MQClientException e) {throw new RuntimeException(e);}}}return Lists.newArrayList(assignedRocketMQSplits);
}

hasMoreElements方法

每次调用pollNext方法之前会做sourceReader.hasMoreElements()的判断,当且仅当判断通过,pollNext方法才会被调用。

示例

public boolean hasMoreElements() {if (noMoreSplits) {return CollectionUtils.size(assignedHadoopSplits) != 0;}return true;
}

notifyNoMoreSplits方法

当Reader处理完所有切片之后,会调用此方法。

示例

public void notifyNoMoreSplits() {LOG.info("Subtask {} received no more split signal.", context.getIndexOfSubtask());noMoreSplits = true;
}

【关于BitSail】:

⭐️ Star 不迷路 https://github.com/bytedance/bitsail

提交问题和建议:https://github.com/bytedance/bitsail/issues

贡献代码:https://github.com/bytedance/bitsail/pulls

BitSail官网:https://bytedance.github.io/bitsail/zh/

订阅邮件列表:bitsail+subscribe@googlegroups.com

加入BitSail技术社群

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/40216.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jmeter进阶使用:BeanShell实现接口前置和后置操作

一、背景 我们使用Jmeter做压力测试或者接口测试时&#xff0c;除了最简单的直接对接口发起请求&#xff0c;很多时候需要对接口进行一些前置操作&#xff1a;比如提前生成测试数据&#xff0c;以及一些后置操作&#xff1a;比如提取接口响应内容中的某个字段的值。举个最常用…

c语言——拷贝数组

这段代码是一个简单的数组拷贝示例。它的功能是将一个原始数组 original 的内容拷贝到另一个数组 copied 中&#xff0c;并输出两个数组的元素。 代码执行过程如下&#xff1a; 首先&#xff0c;在 main() 函数中定义了一个整型数组 original&#xff0c;并初始化了它的元素。…

物联网在制造业中的应用

制造业目前正在经历第四次工业革命&#xff0c;物联网、人工智能和机器人等技术进步正在推动行业的发展。研究表明&#xff0c;到2024年&#xff0c;全球制造商将在物联网解决方案上投资700亿美元&#xff0c;许多制造商正在实施物联网设备&#xff0c;以利用预测性维护和复杂的…

接口测试工具——Postman测试工具 Swagger接口测试+SpringBoot整合 JMeter高并发测试工具

目录 Postman测试工具接口测试工具swaggerKnife4j1.引入依赖2.配置3.常用注解4.接口测试 JMeter什么是JMeter?JMeter安装配置1.官网下载2.下载后解压3.汉语设置 JMeter的使用方法1.新建线程组2.设置参数3.添加取样器4.设置参数&#xff1a;协议&#xff0c;ip&#xff0c;端口…

SDK是什么,SDK和API有什么区别

SDK&#xff08;Software Development Kit&#xff09;是一种开发工具包&#xff0c;通常由软件开发公司或平台提供&#xff0c;用于帮助开发人员构建、测试和集成特定平台或软件的应用程序。SDK 包含一系列的库、工具、示例代码和文档&#xff0c;旨在简化开发过程并提供所需的…

基于Mysql+Vue+Django的协同过滤和内容推荐算法的智能音乐推荐系统——深度学习算法应用(含全部工程源码)+数据集

目录 前言总体设计系统整体结构图系统流程图 运行环境Python 环境MySQL环境VUE环境 模块实现1. 数据请求和储存2. 数据处理计算歌曲、歌手、用户相似度计算用户推荐集 3. 数据存储与后台4. 数据展示 系统测试工程源代码下载其它资料下载 前言 本项目以丰富的网易云音乐数据为基…

SQLSERVER 查询语句加with (NOLOCK) 报ORDER BY 报错 除非另外还指定了 TOP、OFFSET 或 FOR XML

最近有一个项目在客户使用时发现死锁问题&#xff0c;用的数据库是SQLSERVER &#xff0c;死锁的原因是有的客户经常去点报表&#xff0c;报表查询时间又慢&#xff0c;然后又有人在做单导致了死锁&#xff0c;然后主管要我们用SQLSERVER查询时要加with (NOLOCK),但是我在加完 …

2023骨传导耳机推荐,适合运动骨传导耳机推荐

相信很多人跟我一样&#xff0c;随着现在五花八门的耳机品种增多&#xff0c;选耳机的时候真是眼花缭乱&#xff0c;尤其还是网购&#xff0c;只能看&#xff0c;不能试&#xff0c;所以选择起来比较困难&#xff0c; 作为一个运动达人&#xff0c;为了让大家在购买耳机时少走弯…

〔012〕Stable Diffusion 之 中文提示词自动翻译插件 篇

✨ 目录 &#x1f388; 翻译插件&#x1f388; 下载谷歌翻译&#x1f388; 谷歌翻译使用方法&#x1f388; 谷歌翻译使用效果 &#x1f388; 翻译插件 在插件列表中搜索 Prompt Translator可以看到有2个插件选项&#xff1a;一个是基于谷歌翻译 〔推荐〕、一个基于百度和deepl…

奥威BI财务数据分析方案:借BI之利,成就智能财务分析

随着智能技术的发展&#xff0c;各行各业都走上借助智能技术高效运作道路&#xff0c;财务数据分析也不例外。借助BI商业智能技术能够让财务数据分析更高效、便捷、直观立体&#xff0c;也更有助于发挥财务数据分析作为企业经营管理健康晴雨表的作用。随着BI财务数据分析经验的…

【RP2040】香瓜树莓派RP2040之新建工程

本文最后修改时间&#xff1a;2022年09月05日 11:02 一、本节简介 本节介绍如何新建一个自己的工程。 二、实验平台 1、硬件平台 1&#xff09;树莓派pico开发板 ①树莓派pico开发板*2 ②micro usb数据线*2 2&#xff09;电脑 2、软件平台 1&#xff09;VS CODE 三、版…

【C++】一文带你初识C++继承

食用指南&#xff1a;本文在有C基础的情况下食用更佳 &#x1f340;本文前置知识&#xff1a; C类 ♈️今日夜电波&#xff1a;napori—Vaundy 1:21 ━━━━━━️&#x1f49f;──────── 3:23 …

CSS中的calc()函数有什么作用?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ CSS中的calc()函数及其作用⭐ 作用⭐ 示例1. 动态计算宽度&#xff1a;2. 响应式布局&#xff1a;3. 自适应字体大小&#xff1a;4. 计算间距&#xff1a; ⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点…

KCC@广州开源读书会广州开源建设讨论会

亲爱的开源读书会朋友们&#xff0c; 在下个周末我们将举办一场令人激动的线下读书会&#xff0c;探讨两本引人入胜的新书《只是为了好玩》和《开源之迷》。作为一个致力于推广开源精神和技术创新的社区&#xff0c;这次我们还邀请了圈内大咖前来参与&#xff0c;会给大家提供一…

[UE4][C++]使用qrencode动态生成二维码

一、使用CMake编译x64版本qrencode 下载地址 GitHub - fukuchi/libqrencode: A fast and compact QR Code encoding libraryA fast and compact QR Code encoding library. Contribute to fukuchi/libqrencode development by creating an account on GitHub.https://github.…

2023/08/13_______JVM(CG)垃圾回收 算法(复制算法,标记清除,标记清除压缩)

JVM GC算法 复制算法 1&#xff0c;每一次GC都会将伊甸&#xff08;Eden&#xff09;活的对象移到幸存区中&#xff1a;一旦Eden区被GC后 就会是空 只要有内容就是from区 谁空谁是to区 内存会从 伊甸->幸存区to->幸存from&#xff08;这个时候to和from交换区域&#xf…

EXPLAIN使用分析

系列文章目录 文章目录 系列文章目录一、type说明二、MySQL中使用Show Profile1.查看当前profiling配置2.在会话级别修改profiling配置3.查看profile记录4.要深入查看某条查询执行时间的分布 一、type说明 我们只需要注意一个最重要的type 的信息很明显的提现是否用到索引&…

kafka线上问题优化

如何防止消息丢失 生产者&#xff1a; 使用同步发送把ack设成1或者all&#xff08;非0&#xff0c;0可能会出现消息丢失的情况&#xff09;&#xff0c;并且设置同步的分区数>2 消费者&#xff1a;把自动提交改成手动提交 如何防止重复消费 在防止消息丢失的方案中&#…

leetcode 力扣刷题 数组交集(数组、set、map都可实现哈希表)

数组交集 349. 两个数组的交集排序&#xff0b;双指针数组实现哈希表unordered_setunordered_map 350. 两个数组的交集Ⅱ排序 双指针数组实现哈希表unordered_map 349. 两个数组的交集 题目链接&#xff1a;349. 两个数组的交集 题目内容如下&#xff0c;理解题意&#xff1a…

聊聊火车的发展

目录 1.火车的概念 2.火车的发展历史 3.火车对战争的影响 4.火车对人们出行造成的影响 1.火车的概念 火车是一种由机械动力驱动的陆上交通工具&#xff0c;通常用来运输人员和货物。它由一列或多列的连接在一起的车厢组成&#xff0c;有轨道作为其行驶的基础&#xff0c;并通…