开发一个springboot项目
- 代码迭代整合工具 gitee
- 建模意义
- 程序处理方式
- 开发功能的步骤
- web服务
- 网络状态码
- web应用的开发分层
- springboot的作用
- springboot框架搭建
- 框架中各组件作用
- 框架的演变
- 如何提取hive中的表结构
- 创建springboot 工程的引导模版 要选择aliyun ,否则不支持java8
- springboot 工程
- spring 相关组件
- 代码生成器
- 动态数据源
- 整合druid连接池
- 具体代码:
- 代码说明:
- Controller层
- 注解 说明
- 小练习
- Redis(代码)
- 5大数据类型 场景
- 自测题
- HBase代码
- Region Server
- HBase写操作
- memstore 刷新机制
- StoreFile Compaction(文件合并)
- HBase读操作
- 布隆过滤器
- hbase 海量数据 的读取 高效是怎么做到的(面试重点)
- 数据倾斜
- 预分区(自定义分区)
- 小练习
- Flink
- Flink初级
- Yarn应用模式作业提交模式![啊啊](https://i-blog.csdnimg.cn/direct/9fe4a4dfb6cf48e187286c536d89fc56.png)
- union
- connect
- Flink高级
- 迟到数据的处理
- 非barrier对齐的精准一次 过程:
- 两次提交过程描述
- 设置事务的超时时间
- Kafka案例演示
代码迭代整合工具 gitee
建模意义
我们用最后一种方式
程序处理方式
开发功能的步骤
web服务
因为之前没有提过
所以现在简单提一下
网络状态码
1xx 正在处理中
2xx 200 成功
3xx 重定向 永久重定向 临时重定向
4xx 400 参数不匹配 404 资源找不到了 405 method not allowed 请求类型(get和post)没匹配上
5xx 500 后台程序抛异常 503 后端服务器等抛异常了
xxx 600 找第三方支付等问,这都是自定义的
web应用的开发分层
springboot的作用
springboot框架搭建
框架中各组件作用
- Springboot 作为一个web应用快速开发框架,整合了SpringMVC、Spring、Mybatis等框架。其中:
- SpringMVC负责接收返回web请求。
- Spring负责把业务处理类的组件化,管理生命周期。
- Mybatis负责连接数据库、封装参数、封装结果。
- Mybatis-plus是Mybatis的第三方扩展依赖,可以 方便的对数据单表进行插删改查操作。
- Mybatis-plus 提供了代码生成器generator ,辅助我们快速生成基础代码。
- DynamicDatasource是用来对接多个jdbc数据源的工具。
- Druid 是一个非常好用的数据连接池,可以使得数据连接循环利用减少开销。
- JSON 工具的使用
- SQL日志的控制打印
框架的演变
- php perl .net asp jsp
- ssh structs (1\2) + spring + hibernate
- ssm springmvc +spring + mybatis
- springboot:
- 简化开发流程 约定大于配置
- 内置了web服务容器 ,打好的jar包 直接运行即可 不用提前安装web容器
- 整了各种第三方的组件 starter application统一配置
如何提取hive中的表结构
hiveserver2 jdbc数据
metastore 元数据服务
HiveMetaStoreClient
package com.liang.dga.meta.service.impl;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SimplePropertyPreFilter;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.liang.common.utils.SqlUtil;
import com.liang.dga.meta.bean.TableMetaInfo;
import com.liang.dga.meta.bean.TableMetaInfoQuery;
import com.liang.dga.meta.bean.TableMetaInfoVO;
import com.liang.dga.meta.mapper.TableMetaInfoMapper;
import com.liang.dga.meta.service.TableMetaInfoExtraService;
import com.liang.dga.meta.service.TableMetaInfoService;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/*** <p>* 元数据表 服务实现类* </p>** @author liang* @since 2024-07-24*/
@Service
@DS("dga")
public class TableMetaInfoServiceImpl extends ServiceImpl<TableMetaInfoMapper, TableMetaInfo> implements TableMetaInfoService {@Value("${hive.metastore.uris}")String hiveMetaUri;@Value("${default.database}")String schemaName;HiveMetaStoreClient hiveMetaStoreClient;@AutowiredTableMetaInfoExtraService tableMetaInfoExtraService;@PostConstructpublic void initHiveClient() {HiveConf hiveConf = new HiveConf();hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetaUri);try {hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);} catch (MetaException e) {throw new RuntimeException(e);}}public void initTableMetaInfo(String assessDate) throws TException {List<String> allTableNames = hiveMetaStoreClient.getAllTables(schemaName);List<TableMetaInfo> tableMetaInfoList = new ArrayList<>(allTableNames.size());Table table = hiveMetaStoreClient.getTable(schemaName, allTableNames.get(0));System.out.println("111111111111111111111tableMetaInfoList in initTableMetaInfo: " + JSON.toJSONString(table));for (String tableName : allTableNames) {// 调用方法提取元数据信息TableMetaInfo tableMetaInfo = this.extractTableMetaInfoFromHive(tableName, schemaName);hiveMetaStoreClient.getTable(schemaName, tableName);// 调用方法提取HDFS环境信息this.extractHdfsInfo(tableMetaInfo);// 设置其他信息tableMetaInfo.setCreateTime(new Date());tableMetaInfo.setAssessDate(assessDate);tableMetaInfoList.add(tableMetaInfo);}// 去重,把今天的都删了this.remove(new QueryWrapper<TableMetaInfo>().eq("assess_date",assessDate));// 保存元数据列表saveBatch(tableMetaInfoList);QueryWrapper<TableMetaInfo> wrapper = new QueryWrapper<TableMetaInfo>().eq("assess_date", assessDate).notInSql("concat(schema_name,table_name)","select concat(schema_name,table_name) from table_meta_info_extra");List<TableMetaInfo> tableMetaInfoExtraWithoutList = this.list(wrapper);System.out.println("222222222222222222tableMetaInfoListWithout in initTableMetaInfo" + JSON.toJSONString(tableMetaInfoExtraWithoutList));// 补充辅助信息,把没填写的表格格式化一下tableMetaInfoExtraService.initTableMetaInfoExtra(tableMetaInfoExtraWithoutList);}private TableMetaInfo extractTableMetaInfoFromHive(String tableName,String SchemaName){try {// 获取表信息Table table = hiveMetaStoreClient.getTable(SchemaName, tableName);TableMetaInfo tableMetaInfo = new TableMetaInfo();tableMetaInfo.setTableName(tableName);tableMetaInfo.setSchemaName(SchemaName);// 获取分区字段信息SimplePropertyPreFilter filter = new SimplePropertyPreFilter("comment", "name", "type");tableMetaInfo.setPartitionColNameJson(JSON.toJSONString(table.getPartitionKeys(), filter));// 获取表字段信息StorageDescriptor sd = table.getSd();tableMetaInfo.setColNameJson(JSON.toJSONString(sd.getCols(),filter));// 获取表参数信息tableMetaInfo.setTableParametersJson(JSON.toJSONString(table.getParameters()));// 获取表格式信息tableMetaInfo.setTableInputFormat(sd.getInputFormat());tableMetaInfo.setTableOutputFormat(sd.getOutputFormat());// 分桶字段tableMetaInfo.setTableBucketColsJson(JSON.toJSONString(sd.getBucketCols()));// 获取表分桶排序信息tableMetaInfo.setTableSortColsJson(JSON.toJSONString(sd.getSortCols()));// 获取表桶数tableMetaInfo.setTableBucketNum(sd.getNumBuckets()+0L);// 获取表创建时间 ts->Date->string "yyyy-MM-dd HH:mm:ss"Date createTime = new Date(table.getCreateTime() * 1000L);tableMetaInfo.setTableCreateTime(DateFormatUtils.format(createTime,"yyyy-MM-dd HH:mm:ss"));// 获取表类型tableMetaInfo.setTableType(table.getTableType());// 获取表注释tableMetaInfo.setTableComment(table.getParameters().get("comment"));// 获取表拥有者tableMetaInfo.setTableFsOwner(table.getOwner());// 获取序列化类tableMetaInfo.setTableRowFormatSerde(sd.getSerdeInfo().getSerializationLib());//获取表路径tableMetaInfo.setTableFsPath(sd.getLocation());System.out.println("333333333333333333333333 tableMetaInfo in = extractTableMetaInfoFromHive: " + JSON.toJSONString(tableMetaInfo));return tableMetaInfo;} catch (Exception e){e.printStackTrace();throw new RuntimeException(e);}}private void extractHdfsInfo(TableMetaInfo tableMetaInfo){try {// 获取表存储信息URI uri = new URI(tableMetaInfo.getTableFsPath());FileSystem fs = FileSystem.get(uri, new Configuration(), "atguigu");// 当前目录下的一级子节点 的节点状态(目录和文件)FileStatus[] fileStatuses = fs.listStatus(new Path(tableMetaInfo.getTableFsPath()));// 递归获得该文件的信息getHdfsInfoRec(fs, fileStatuses, tableMetaInfo);System.out.println("tableMetaInfo = " +tableMetaInfo.getTableName()+":"+ tableMetaInfo.getTableSize()+":accessTime:"+tableMetaInfo.getTableLastAccessTime()+"modifyTIme:"+tableMetaInfo.getTableLastModifyTime());tableMetaInfo.setFsCapcitySize(fs.getStatus().getCapacity());tableMetaInfo.setFsRemainSize(fs.getStatus().getRemaining());tableMetaInfo.setFsUsedSize(fs.getStatus().getUsed());} catch (Exception e){e.printStackTrace();throw new RuntimeException(e);}}private void getHdfsInfoRec(FileSystem fs, FileStatus[] fileStatuses, TableMetaInfo tableMetaInfo) {for (FileStatus fileStatus : fileStatuses) {if(fileStatus.isDirectory()){try {// 递归FileStatus[] listStatus = fs.listStatus(fileStatus.getPath());getHdfsInfoRec(fs, listStatus, tableMetaInfo);} catch (IOException e) {e.printStackTrace();throw new RuntimeException(e);}}else {// 获取表大小信息// 获取表最后修改时间// 获取表最后访问时间try {// 获得文件大小tableMetaInfo.setTableSize(tableMetaInfo.getTableSize()+fileStatus.getLen());tableMetaInfo.setTableTotalSize(tableMetaInfo.getTableTotalSize()+fileStatus.getLen());// 获取表访问信息if (tableMetaInfo.getTableLastAccessTime() == null){tableMetaInfo.setTableLastAccessTime(new Date(fileStatus.getAccessTime()));} else {if (fileStatus.getAccessTime() > tableMetaInfo.getTableLastAccessTime().getTime()){tableMetaInfo.setTableLastAccessTime(new Date(fileStatus.getAccessTime()));}}// 获取表最后修改时间if (tableMetaInfo.getTableLastModifyTime() == null){tableMetaInfo.setTableLastModifyTime(new Date(fileStatus.getModificationTime()));} else {if (fileStatus.getModificationTime() > tableMetaInfo.getTableLastModifyTime().getTime()){tableMetaInfo.setTableLastModifyTime(new Date(fileStatus.getModificationTime()));}}} catch (Exception e){e.printStackTrace();throw new RuntimeException(e);}}}}// 获取前端的需连接两个表和分页的信息@Overridepublic List<TableMetaInfoVO> getListForQuery(TableMetaInfoQuery tableMetaInfoQuery) {StringBuilder sb = new StringBuilder(2048);sb.append("select tm.id ,tm.table_name,tm.schema_name,table_comment,table_size,table_total_size,tec_owner_user_name,\n" +" busi_owner_user_name, table_last_access_time,table_last_modify_time from table_meta_info tm\n" +" left join table_meta_info_extra te\n" +" on tm.table_name=te.table_name and tm.schema_name=te.schema_name\n" +"where assess_date = (select max( assess_date ) from table_meta_info tmi\n" +" where tm.table_name = tmi.table_name\n" +" and tm.schema_name=tmi.schema_name )");// 接收动态条件if(tableMetaInfoQuery.getSchemaName()!=null&&!tableMetaInfoQuery.getSchemaName().trim().equals("")){sb.append(" and tm.schema_name like '%"+ SqlUtil.filterUnsafeSql(tableMetaInfoQuery.getSchemaName()) +"%'");}if (tableMetaInfoQuery.getTableName()!=null&&!tableMetaInfoQuery.getTableName().trim().equals("")){sb.append(" and tm.table_name like '%"+SqlUtil.filterUnsafeSql(tableMetaInfoQuery.getTableName())+"%'");}if (tableMetaInfoQuery.getDwLevel()!=null&&!tableMetaInfoQuery.getDwLevel().trim().equals("")) {sb.append(" and te.dw_level like '%" + SqlUtil.filterUnsafeSql(tableMetaInfoQuery.getDwLevel()) + "%'");}// 处理分页sb.append("limit " + (tableMetaInfoQuery.getPageNo()-1) * tableMetaInfoQuery.getPageSize() + "," + tableMetaInfoQuery.getPageSize());System.out.println("sb = " + sb);List<TableMetaInfoVO> tableMetaInfoVOList = baseMapper.selectTableMetaListForQuery(sb.toString());return tableMetaInfoVOList;}// 有个数据总条数得单独算@Overridepublic Integer getTotalForQuery(TableMetaInfoQuery tableMetaInfoQuery) {StringBuilder sb = new StringBuilder(2048);sb.append("select count(*) from table_meta_info tm\n" +" left join table_meta_info_extra te\n" +" on tm.table_name=te.table_name and tm.schema_name=te.schema_name\n" +"where assess_date = (select max( assess_date ) from table_meta_info tmi\n" +" where tm.table_name = tmi.table_name\n" +" and tm.schema_name=tmi.schema_name )");// 接收动态条件if(tableMetaInfoQuery.getSchemaName()!=null&&!tableMetaInfoQuery.getSchemaName().trim().equals("")){sb.append(" and tm.schema_name like '%"+ SqlUtil.filterUnsafeSql(tableMetaInfoQuery.getSchemaName()) +"%'");}if (tableMetaInfoQuery.getTableName()!=null&&!tableMetaInfoQuery.getTableName().trim().equals("")){sb.append(" and tm.table_name like '%"+SqlUtil.filterUnsafeSql(tableMetaInfoQuery.getTableName())+"%'");}if (tableMetaInfoQuery.getDwLevel()!=null&&!tableMetaInfoQuery.getDwLevel().trim().equals("")) {sb.append(" and te.dw_level like '%" + SqlUtil.filterUnsafeSql(tableMetaInfoQuery.getDwLevel()) + "%'");}Integer total = baseMapper.selectTableMetaTotalForQuery(sb.toString());return total;}@Overridepublic List<TableMetaInfo> getListWithExtra(String assessDate) {List<TableMetaInfo> tableMetaInfoList = baseMapper.selectListWithExtra(assessDate);return tableMetaInfoList;}}
创建springboot 工程的引导模版 要选择aliyun ,否则不支持java8
springboot 选2.7.x
springboot 工程
xxxxxxApplication
启动类 用于启动整个应用程序 ,里面不添加任何业务代码
第一次打开新工程可能会有乱码,是编码集的问题,都改成UTF-8就可以了
spring 相关组件
- spring内部有一个组件容器池
包含了所有组件的单态实例
组件 :类上 有 @RestController @Servcie @Mapper @Component
这些组件在应用启动时都会以单态的形式创建并保存在容器中 - 如果想引用这些对象 利用@Autowired把对象 装配在引用上
这些组件都是固定的模式,所以有了代码生成器
代码生成器
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.TemplateConfig;
import com.baomidou.mybatisplus.generator.config.rules.DateType;
import com.baomidou.mybatisplus.generator.engine.FreemarkerTemplateEngine;
import org.apache.ibatis.annotations.Mapper;import java.util.function.Consumer;public class CodeGen3531 {public static void main(String[] args) {String[] tables = {"table_meta_info"};FastAutoGenerator.create("jdbc:mysql://hadoop102:3306/dga", "root", "000000").globalConfig(builder -> {builder.author("liang") //作者.outputDir("D:\\sgg\\19-数据治理Git\\dga02\\src\\main\\java") //输出路径(写到java目录).commentDate("yyyy-MM-dd").dateType(DateType.ONLY_DATE); //选择实体类中的日期类型 ,Date or LocalDatetime}).packageConfig(builder -> { //各个package 名称builder.parent("com.atguigu.dga").moduleName("meta").entity("bean") //目录名.service("service") //目录名.serviceImpl("service.impl") //目录名.controller("controller") //目录名.mapper("mapper"); //目录名}).strategyConfig(builder -> {builder.addInclude(tables).serviceBuilder().formatServiceFileName("%sService") //类后缀.formatServiceImplFileName("%sServiceImpl") //类后缀.entityBuilder().enableLombok() //允许使用lombok.controllerBuilder().formatFileName("%sController") //类后缀.enableRestStyle() //生成@RestController 否则是@Controller.mapperBuilder()//生成通用的resultMap 的xml映射.enableBaseResultMap() //生成xml映射.superClass(BaseMapper.class) //标配.formatMapperFileName("%sMapper") //类后缀//.enableFileOverride() //生成代码覆盖已有文件 谨慎开启.mapperAnnotation(Mapper.class); //生成代码Mapper上自带@Mapper}).templateConfig(new Consumer<TemplateConfig.Builder>() {@Overridepublic void accept(TemplateConfig.Builder builder) {// 实体类使用我们自定义模板builder.entity("templates/myentity.java");}}).templateEngine(new FreemarkerTemplateEngine()) // 使用Freemarker引擎模板,默认的是Velocity引擎模板.execute();}
}
配合其他配置即可使用
首先是在pom.xml中增加依赖
增加完后:
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.3.0</version></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.1</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.3.2</version><scope>compile</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-generator</artifactId><version>3.5.3.1</version></dependency><dependency><groupId>org.apache.velocity</groupId><artifactId>velocity-engine-core</artifactId><version>2.3</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-freemarker</artifactId></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-metastore</artifactId><version>3.1.2</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-common</artifactId><version>3.1.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.1.3</version></dependency><dependency><groupId>commons-beanutils</groupId><artifactId>commons-beanutils</artifactId><version>1.9.4</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>3.1.2</version><exclusions><exclusion><artifactId>hadoop-annotations</artifactId><groupId>org.apache.hadoop</groupId></exclusion><exclusion><artifactId>hadoop-yarn-server-common</artifactId><groupId>org.apache.hadoop</groupId></exclusion><exclusion><artifactId>hadoop-yarn-server-resourcemanager</artifactId><groupId>org.apache.hadoop</groupId></exclusion><exclusion><artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId><groupId>org.apache.hadoop</groupId></exclusion><exclusion><artifactId>hadoop-yarn-server-web-proxy</artifactId><groupId>org.apache.hadoop</groupId></exclusion><exclusion><artifactId>hadoop-yarn-common</artifactId><groupId>org.apache.hadoop</groupId></exclusion><exclusion><artifactId>jackson-core-asl</artifactId><groupId>org.codehaus.jackson</groupId></exclusion><exclusion><artifactId>jackson-mapper-asl</artifactId><groupId>org.codehaus.jackson</groupId></exclusion><exclusion><artifactId>jersey-core</artifactId><groupId>com.sun.jersey</groupId></exclusion><exclusion><artifactId>jersey-server</artifactId><groupId>com.sun.jersey</groupId></exclusion><exclusion><artifactId>zookeeper</artifactId><groupId>org.apache.zookeeper</groupId></exclusion></exclusions></dependency></dependencies>
把这个配置文件复制一份
放到这个位置
修改名称和文件中的两个地方即可
package ${package.Entity}; <#list table.importPackages as pkg>
import ${pkg};
</#list>
<#if springdoc>
import io.swagger.v3.oas.annotations.media.Schema;
<#elseif swagger>
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
</#if>
<#if entityLombokModel>
import lombok.Data; <#if chainModel>
import lombok.experimental.Accessors; </#if>
</#if> /** * <p> * ${table.comment!} * </p> * * @author ${author} * @since ${date} */
<#if entityLombokModel>
@Data <#if chainModel>
@Accessors(chain = true) </#if>
</#if>
<#if table.convert>
@TableName("${schemaName}${table.name}")
</#if>
<#if springdoc>
@Schema(name = "${entity}", description = "$!{table.comment}")
<#elseif swagger>
@ApiModel(value = "${entity}对象", description = "${table.comment!}")
</#if>
<#if superEntityClass??>
public class ${entity} extends ${superEntityClass}<#if activeRecord><${entity}></#if> {
<#elseif activeRecord>
public class ${entity} extends Model<${entity}> {
<#elseif entitySerialVersionUID>
public class ${entity} implements Serializable {
<#else>
public class ${entity} {
</#if>
<#if entitySerialVersionUID> private static final long serialVersionUID = 1L;
</#if>
<#-- ---------- BEGIN 字段循环遍历 ---------->
<#list table.fields as field> <#if field.keyFlag> <#assign keyPropertyName="${field.propertyName}"/> </#if> <#if field.comment!?length gt 0> <#if springdoc> @Schema(description = "${field.comment}") <#elseif swagger> @ApiModelProperty("${field.comment}") <#else> /** * ${field.comment} */ </#if> </#if> <#if field.keyFlag> <#-- 主键 --> <#if field.keyIdentityFlag> @TableId(value = "${field.annotationColumnName}", type = IdType.AUTO) <#elseif idType??> @TableId(value = "${field.annotationColumnName}", type = IdType.${idType}) <#elseif field.convert> @TableId("${field.annotationColumnName}") </#if> <#-- 普通字段 --> <#elseif field.fill??> <#-- ----- 存在字段填充设置 -----> <#if field.convert> @TableField(value = "${field.annotationColumnName}", fill = FieldFill.${field.fill}) <#else> @TableField(fill = FieldFill.${field.fill}) </#if> <#elseif field.convert> @TableField("${field.annotationColumnName}") </#if> <#-- 乐观锁注解 --> <#if field.versionField> @Version </#if> <#-- 逻辑删除注解 --> <#if field.logicDeleteField> @TableLogic </#if> private ${field.propertyType} ${field.propertyName};
</#list>
<#------------ END 字段循环遍历 ---------->
<#if !entityLombokModel> <#list table.fields as field> <#if field.propertyType == "boolean"> <#assign getprefix="is"/> <#else> <#assign getprefix="get"/> </#if> public ${field.propertyType} ${getprefix}${field.capitalName}() { return ${field.propertyName}; } <#if chainModel> public ${entity} set${field.capitalName}(${field.propertyType} ${field.propertyName}) { <#else> public void set${field.capitalName}(${field.propertyType} ${field.propertyName}) { </#if> this.${field.propertyName} = ${field.propertyName}; <#if chainModel> return this; </#if> } </#list>
</#if>
<#if entityColumnConstant> <#list table.fields as field> public static final String ${field.name?upper_case} = "${field.name}"; </#list>
</#if>
<#if activeRecord> @Override public Serializable pkVal() { <#if keyPropertyName??> return this.${keyPropertyName}; <#else> return null; </#if> }
</#if>
<#if !entityLombokModel> @Override public String toString() { return "${entity}{" + <#list table.fields as field> <#if field_index==0> "${field.propertyName} = " + ${field.propertyName} + <#else> ", ${field.propertyName} = " + ${field.propertyName} + </#if> </#list> "}"; }
</#if>
}
最后运行 CodeGen3531.java 的主函数就可以生成代码了。
动态数据源
步骤:
- 加依赖
- application.properties 加配置
- 在Service实现类 和Mapper上 加@DS(“xx”)
- 如果是不同数据库类型 ,修改不同的jdbc协议 和 不同的驱动包
- 方法和类上都有@DS 以方法为准
在pom.xml中增加依赖
<dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.3.2</version>
</dependency>
在service层和 mapper的类上增加默认数据源
@Service
@DS("mysql001")
public class CustomerServiceImpl extends ServiceImpl<CustomerMapper, Customer> implements CustomerService {@Mapper //实现类由mybatis 提供
@DS("mysql001")
public interface CustomerMapper extends BaseMapper<Customer> {
在特定的方法上增加特定数据源
@Insert("insert into customer(name,age) values (#{customer.name}, #{customer.age} )")
@DS("mysql002")
public void insertCustomer002(@Param("customer") Customer customer);spring.datasource.dynamic.datasource.demo.url=jdbc:mysql://bigdata01:3306/demo?characterEncoding=utf-8&useSSL=false
spring.datasource.dynamic.datasource.demo.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.demo.username=root
spring.datasource.dynamic.datasource.demo.password=000000spring.datasource.dynamic.datasource.demo2.url=jdbc:mysql://bigdata01:3306/demo2?characterEncoding=utf-8&useSSL=false
spring.datasource.dynamic.datasource.demo2.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.demo2.username=root
spring.datasource.dynamic.datasource.demo2.password=000000
整合druid连接池
数据连接池的主要目的是当应用反复访问数据库时,不会每次都重复创建连接。而是复用已有的连接。
假设应用程序访问一次数据库 100ms
查询流程:
- 格式转换 <1ms (视情况而定:内存、CPU、外存、网页前端、后端代码……)
- 网络传输 双向 60ms (很费时所以要用sql语句提前过滤)
- 建立连接 20ms(连接池在此处节约时间)
- sql的执行 20ms(根据主键进行查询 10万级)
首先要引入依赖
<dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.15</version>
</dependency>
配文件中加入
spring.autoconfigure.exclude=com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigurespring.datasource.dynamic.datasource.demo.url=jdbc:mysql://bigdata01:3306/demo?characterEncoding=utf-8&useSSL=false
spring.datasource.dynamic.datasource.demo.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.demo.username=root
spring.datasource.dynamic.datasource.demo.password=000000
spring.datasource.dynamic.datasource.demo.druid.initial-size=5
spring.datasource.dynamic.datasource.demo.druid.min-idle=5
spring.datasource.dynamic.datasource.demo.druid.max-active=20
spring.datasource.dynamic.datasource.demo.druid.max-wait=60000
spring.datasource.dynamic.datasource.demo.druid.test-on-borrow=true
spring.datasource.dynamic.datasource.demo.druid.test-while-idle=true
spring.datasource.dynamic.datasource.demo.druid.test-on-return=falsespring.datasource.dynamic.datasource.demo2.url=jdbc:mysql://bigdata01:3306/demo2?characterEncoding=utf-8&useSSL=false
spring.datasource.dynamic.datasource.demo2.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.demo2.username=root
spring.datasource.dynamic.datasource.demo2.password=000000
spring.datasource.dynamic.datasource.demo2.druid.initial-size=5
spring.datasource.dynamic.datasource.demo2.druid.min-idle=5
spring.datasource.dynamic.datasource.demo2.druid.max-active=20
spring.datasource.dynamic.datasource.demo2.druid.max-wait=60000
spring.datasource.dynamic.datasource.demo2.druid.test-on-borrow=true
spring.datasource.dynamic.datasource.demo2.druid.test-while-idle=true
spring.datasource.dynamic.datasource.demo2.druid.test-on-return=false
参数 | 含义 |
---|---|
initial-size | 初始连接数 |
min-idle | 最少保持空闲连接数 |
max-active | 最大可活动的连接数 |
max-wait | 最长等待时长 |
test-on-borrow | 借走时测试 |
test-while-idle | 空闲时测试 |
test-on-return | 归还时测试 |
具体代码:
gitee仓库地址
代码说明:
用hiveMetaStoreClient提取hive中所有表名,然后根据表名即可获取该表所有信息
然后放入我们事先准备好的TableMetaInfo bean中
分两部分放,普通字段信息和HDFS环境信息
体力活儿,挨个取
Controller层
注解 说明
注****解 | 位****置 | |
---|---|---|
@RestController | 类 | 标识入口类 |
@RequestMapping | 方法 | 标识入口方法 |
@GetMapping | 方法 | 标识入口方法GET请求专用 |
@PostMapping | 方法 | 标识入口方法POST请求专用 |
@RequestParam | 参数 | 接收路径上的键值对参数 http://xxxxx/xx?name=xxx&age=xxx |
@PathVariable | 参数 | 接收路径上的值http://xxxxx/customer/123 |
@RequestBody | 参数 | 接收请求体(payload) 中的参数 |
小练习
1 关于web开发描述正确的是:bce
a post请求的路径中不能携带键值对参数
b 请求路径中的键值对参数使用@RequestParam接收到方法参数中
c /customer/123 中的123可以通过@PathVariable 接收到方法参数中
d @ResponseBody 可以负责接收请求体中的数据。
e get请求中一般不携带请求体数据2 关于springboot描述正确的是ada @Service标识对象默认会在web容器启动时以单态的方式加载到内存中b 被@Autowired标识的接口,默认只有在其方法被调用时springboot才会为其装配实现类。(@Lazy)c 控制层直接访问数据层并不违反开发规范d 服务层的各个服务之间可以互相调用3 关于状态码描述正确的是abcda 见到页面500的报错,直接查web服务器端控制台日志,b 405的状态,一般是浏览器请求的get或post 与服务器的get或post不匹配c 400的状态,往往是因为参数传递缺失或者类型不匹配造成的。d 404的状态,有可能是请求路径填错了4 关于sprinboot分层描述正确的是abcda 一个控制层的方法代表一个请求b 一个服务层的方法代表一个业务操作c 一个数据层的类代表一张表d 一个数据层的方法代表一条sql语句5 关于开发调试正确的是abcdfa idea 进入当前断点的方法的快捷键是F7b idea 从当前断点位置执行一行代码的快捷键是F8c idea 从当前断点位置继续执行后面的代码的快捷键是F9d evaluate expression 可以帮助查看断点方法的执行结果e 通过run执行java代码,比debug方式执行,运行性能有明显提升f 在各种主流的浏览器上执行F12,都可以打开开发者工具窗口。6 关于web开发描述正确的是abcde
A.只要是支持jdbc的数据库,都可以使用mybatis
B.如果使用#{} 在SQL中作为参数替代符,即使是字符串参数也不用加单引
C.如果存在多张表join的操作,则无法使用mybatis-plus ,只能使用mybatis
D.mybatis-plus不仅封装了很多现成的数据层方法,还封装了很多服务层的方法
E.mybatis-plus中提供的数据层的方法,一定是使用select、insert、update、delete开头的,没有例外。7 以下对数据库连接池描述正确的是acd
A 可以节省程序反复创建连接的开销
B 连接池的初始连接数 主要看平均在线人数(平均访问并发数)
C 连接池的最大活跃数 主要看后台数据库能承担的压力
D 如果后台数据库发生了重启,所有的连接都会立刻失效。
E test-while-idle 可以绝对避免用户借走坏掉的连接 on-borrow可以1 以下为程序开发时的注意事项正确的是:
A 避免即席查询中在循环中操作数据库 对 场景 避免1+N关联查询 解决 1 join 2 从数据库查出来在java中拼接
B 避免循环使用String频繁拼接字符串 对 解决 stringbuilder stringbuffer
C mybatis中执行sql中涉及#{}时, 要手工对用户提交的参数进行过滤特殊字符. 错 只有${}时才涉及sql注入问题 才需要过滤特殊字符
D 大量数据存入一个集合对象时,要对集合对象进行容量初始化。 对 避免扩容
E 对数据库的插入操作较多时可以通过建立数据库索引提高速度。错 索引的目的是优化查询性能 对于插入操作来说 索引往往都是负担 2 向以下数据集合对象频繁写入操作会造成扩容的是
A hashMap put 会扩容 初始容量 16 扩容比例 翻倍 扩容因子 0.75
B Stringbuilder append 会扩容 初始容量16 新值2倍+2 不够就加
C ArrayList add 会扩容 初始容量10 扩容比例+50% 不够就加
D LinkedList add 不会扩容 3 以下时间复杂度描述正确的是
A 判断HashSet中是否存在某个元素 O(N) 错
B 判断HashSet中是否存在某个元素 O(1) 对
C 有序数组的折半查找指定值 O(logN) 对
D 无序List进行遍历查找指定值 O(N) 对
E 数组按下标查询指定值 O(N) 错 O(1)
F 链表按下标进行查询 O(logN) 错 O(N)
G 冒泡排序 O(N^2) 对
H 快速排序 O(N) 错 O(NlogN)
I Mysql 如果用主键查询 时间复杂度 O(1) 错 O(LogN) b+tree 非二叉树
J Hashmap中 的 红黑树 时间复杂度 O(1) 错 O(LogN) 二叉树为什么mysql选择了非二叉树 而hashmap选择二叉树 原因 二叉树非常的高 如果放在磁盘操作会增加很多的io 4 以下关于mapper层映射的描述正确的是
A 如果java对象的属性刚好与数据库字段对应(驼峰对蛇形),不需要任何手动映射处理,否则需要手动干预字段与属性的映射。
B 如果单表查询,实体类中存在某些属性,不存在于数据库中,需要增加@TableField(exists=false)
C @DS 可以放在方法上,当方法上的@DS和类上的@DS指向不同数据源时,以类上的为准。
D Mybatis-plus 执行getById方法时,一定是根据数据库表的主键进行查询。
E 当对象中还有子对象时,select语句查询的结果需要借助<association>封装到子对象中。5 以下信息哪些是java程序可以和Hdfs中获得
A 数据表的单副本大小
B 数据表含副本的大小
C 数据表下的文件数
D 数据表下的分区数
E 数据表的行数
F 数据表的最后访问和写入时间
G 数据库表的字段数1 以下关于设计模式描述正确的是
A 单例模式的初衷是减少对象重复创建,节省内存开销。 对
B 在多线程场景下,单例模式要考虑内部属性的线程安全问题。 对 i++(不安全) (使用原子对象) hashmap.put (不安全) cocurrentHashMap (线程安全) 或者使用锁
C 模板模式践行了开闭原则,封装不变的部分,扩展可变的部分。 对
D 模板模式践行了单一职责原则,每个子类实现一个职责。 对
E 模板模式由抽象父类进行控制,子类实现核心逻辑。 对
F 模板模式必要时子类可以重写父类的主控制方法。 错 为了避免子类重写父类的控制方法 可以在父类控制方法中加final2 以下对多线程场景描述正确的是
A 需要事务之间没有先后依赖
B 适合每个事务有大量io的操作
C 线程越多并行效果越明显
D 适合需要频繁访问共享资源的场景3 以下可以从dolphinscheduler获得的信息
A 任务的执行耗时
B 任务的sql定义
C 任务的yarn_id
D 任务的stage个数4 以下关于正则表达式的描述,哪些是正确的?
A. ^[a] 表示字符串中不能含有a字符
B. \d+ 表示至少有一个数字字符
C. .* 表示匹配任意长度的任意字符
D. \w 表示匹配任意字母和数字
E a{2,4} 可以匹配ababaa
F. (abc|def) 表示匹配字符串 "abc" 或 "def"5 关于数据治理考评平台
A 致力于对数据事前、事中、事后三个阶段进行治理。
B 比起庞大数据中台,搭建数据治理考评平台更快更直观且开发成本更低的暴露数据治理的问题。
C 如果有数据中台,则不需要进行数据考评治理。
D 如果有数据中台,则可以更好的对数据考评进行治理。
E 数据治理考评平台属于技术型项目,不需要管理制度的支持。6 关于语法树解析的描述正确的是
A 凡是使用java开发的框架使用的语法树解析工具都是通用的
B hive的语法树解析器中的walker遍历器使用后序遍历。
C 不同的需求要开发不同的自定义Dispatcher,放入遍历器,在遍历中收集信息。
D 在语法树中只用TOK_TABREF可以定位来源表
E 在语法树中只用TOK_TABLE_OR_COL可以定位where条件的字段7 关于数据考评指标的描述正确的是
A 所有指标分成【规范】、【存储】、【计算】、【质量】、【安全】五个板块。
B “是否数据倾斜”属于计算板块。
C “是否长期无产出” 属于存储板块。
D “生命周期合理” 属于存储版块
E 如果增加一个“产出数据行数监控”指标应该放在质量板块。
Redis(代码)
5大数据类型 场景
1 string 1对1 单值的kv 存不太容易变化的对象 key –json
2 list 1对多v 轻量级的队列 栈
3 set 1对多v 判存 去重 集合运算
4 zset 1对多v 排序
5 hash 1对多kv 可以存储 对象 需要字段独立管理 变化的场景
很简单,就是创建连接,创建对象,提交
看不懂可以用插件 通义灵码 解释一下,如图
gitee仓库地址
自测题
1 以下对 redis 描述正确的是
A redis 的默认端口号是6379 ,并且可以在配置文件中修改。 对
B 因为redis可以备份数据到磁盘中,所以不会丢失数据。 错 rdb有可能 aof也会丢概率极小 always不会丢 单代价极大
C 启动redis-server时如果不声明配置文件,则使用安装目录中的redis.conf 错 如果不指定配置文件 则使用程序中硬编码的配置
D redis默认是单线程的,可以通过配置改为多线程。错 redis就是单线程 配置6.x配置多线程只是辅助工作 核心工作线程还是单线程 2 以下关于redis 命令描述正确的是
A keys * 命令 可以查看所有库下的key 错 当前库 当前默认0号库
B expire 设置过期时间默认单位是毫秒 错 默认秒
C lrange mylst 0 3 含义是从左边取前4个元素 对 左闭右闭
D zrevrange 是从大到小 ,按下标范围取值 对 3 以下redis命令 为幂等操作的是
A sadd sadd s1 v1 幂等
B append 非幂等
C set 幂等
D incr 非幂等
E hmset 幂等
F lpush lpush l1 v1 非幂等
G zadd 幂等
H zincrby 非幂等4 需求:在实时计算中利用redis记录当日访问过网站的用户id,并且统计人数 ,则最好选用哪个数据类型A string 需要一对多 B list 不能去重 C zset 不需要排序 D hash 没有kv结构 不需要 E set 推荐 可以去重 也可以去个数 5 以下对 redis 描述正确的是
A redis默认不写错误日志,必须手动开启 对 配置logfile
B redis的内存淘汰策略 maxmemory-policy ,只有在内存使用达到了阈值才会触发 对
D 在命令行中执行config set 操作修改的配置,重启redis后就会失效 对
E 如果给redis设置了timeout 时间,会有可能导致应用系统中的连接池的对象连接失效 对 为了避免连接池失效 testOnBorrow(true) 6 redis 在正常使用一段时间后,抛出如下异常:redis.clients.jedis.exceptions.JedisConnectionException : Could not get a resource from the pool 应该考虑以下哪个解决方案 :
A 检查配置文件 bind 是否注释,保护模式是否关闭 错 如果有该问题 不会能够正常使用一段时间
B 检查redis.conf配置中,对于超时时间过短可能造成连接池中的连接损坏 timeout
C 检查程序中使用过的连接是否close掉 一开始可以从连接池中获得连接 但是因为没有close也就是没有归还连接 造成连接池中的连接 枯竭 所有报 题干上的错误
D 检查redis服务内存使用过大,超过阈值 OOM out of memory 7 以下关于rdb描述正确的是
A redis的rdb备份位置可以通过配置文件配置。 可以 dir "/xxx/xx"
B 当redis进行rdb备份时,会瞬间造成内存总量翻倍。 不会翻倍 copy on write 可以共享内存
C redis自动备份数据时,会阻塞主线程,一定程度影响用户使用。 错 自动备份使用独立进程 不影响用户使用
D flushall和shutdown 命令会触发rdb备份。 对 8 以下关于aof描述正确的是
A 相对于rdb,aof更占用存储空间。 对
B aof文件可以通过文本编辑器查看,但是不能修改。 错 可以改
C aof本质上是由rdb加命令文本日志组成。 对 通过rewrite 周期性的压缩为rdb
D aof和rdb同时启用,redis会优先加载rdb。 错 优先aof
E aof重写可以压缩aof的数据量。 对 9 以下关于主从复制描述正确的是
A 主从复制的主要作用是缓解服务器压力,扩展容量。 错 作用是高可用
B master如果宕机, slave不会自己自动升级为master。 对 原地等
C slave无法继承master建立关系前的历史数据。 错 可以获得
D slave一旦宕机,再次启动并成为slave后,无法获得失联这段时间的master数据。 错 可获得
E 如果已有自己数据的节点,被设定成为另个节点的从机,那么原先自己的数据依然还会保留。 错 不会保留 10 关于哨兵描述正确的是
A 一个主从集群中通常会有一个或多个哨兵,监控master及slave的健康状态。 对
B 当master出现健康问题时,哨兵们会发起投票,以默认过半数原则确定是否进行主从切换。 错 决定切换的票数 是 配置的 没有过半数的默认值
C 哨兵会选择优先级数值更小的slave 对
D 当旧master再次上线后,哨兵会把旧master转为slave。 对
E 哨兵可以为客户端提供路由功能,会把写操作分给master ,读操作分给slave,从而实现压力的分摊。错 哨兵 会把所有的请求都给master 不会做读写分离 读写分离会存在数据不一致的情况 而且还有线程不安全的情况
HBase代码
很简单,就是创建连接,创建对象,提交
gitee仓库地址
Region Server
1)MemStore
写缓存,由于HFile中的数据要求是有序的,所以数据是先存储在MemStore中,排好序后,等到达刷写时机才会刷写到HFile,每次刷写都会形成一个新的HFile,写入到对应的文件夹store中。
2)WAL
由于数据要经MemStore排序后才能刷写到HFile,但把数据保存在内存中会有很高的概率导致数据丢失,为了解决这个问题,数据会先写在一个叫做Write-Ahead logfile的文件中,然后再写入MemStore中。所以在系统出现故障的时候,数据可以通过这个日志文件重建。(write ahead log 预写日志 防治出现崩溃造成数据丢失
一般情况下是不会读取该日志 顺序写入(性能好) (随机读写非常难) 只有原主不在了 才会读 , WAL一定保存在Hdfs
MasterProcWAL 是记录了master日常的ddl操作
防止master崩溃 backupmaster可以通过读取wal 快速接手master 未完成事情 )
3)BlockCache
读缓存,每次查询出的数据会缓存在BlockCache中,方便下次查询。
HBase写操作
1 节点定位阶段 : 定位regionServer1首次访问需要查询zookeeper获得meta的regionserver 2在访问regionServer (meta) 获得meta表信息3 根据数据的rowkey 查询meta表 定位到regionServer (data)4 把数据提交给regionServer 优化: 在客户端会缓存meta信息 meta cache ,以后同样的region 不会再问zookeeper 跳过 1-2 环节 提高访问效率 减少通信 io2 写操作提交阶段 : 向regionserver提交数据 1 为什么写wal 避免regionserver 挂掉 刚刚写入缓存的数据丢失 2 为什么不直接写磁盘 因为wal是日志 是顺序写 写入速度非常快 3 先写wal还是缓存memstore 先写wal 如果先写缓存 会有数据丢失风险 如果完成wal和缓存的写入 本次put就完成了 会等数据真实落盘到hdfs 3 数据落盘阶段 : 写入hdfs 把memstore中的数据 排序 写入到对应store下 形成一个文件, store会有多个文件 ,是因为memstore 不同时间点的flush 产生 ,可能大也可能效 ,不同的文件可能 保存的是同样rowkey 数据 只是version不同
4 数据整理阶段 compaction 把文件进行合并整理
memstore 刷新机制
1 到达阈值 128 日常刷写
2 高频访问压力大 堆内的40%
3 空闲 每小时刷写一次
4 wal个数 超过32 刷写
StoreFile Compaction(文件合并)
memstore每次刷写都会生成一个新的HFile,文件过多读取不方便,所以会进行文件的合并,清理掉过期和删除的数据,会进行StoreFile Compaction。
major 会最终把一个store文件合并为一个 而且还会清理失效数据 非常的重 不好估计执行时间 ,建议设置为手动 ,根据企业情况 执行
minor 把文件 分为小 中大 大的一定不合并 小的一定合并(128mb) 中 :看和其他文件的相对大小 相对小的合并
合并个数 3到10 ,避免一次合并消耗太大
HBase读操作
布隆过滤器
hbase 海量数据 的读取 高效是怎么做到的(面试重点)
- 规划:hbase 把一个大表的数据分成多个region 分布在不同的regionsServer节点
- 定位:hbase的客户端可以缓存metacache 快速定位 数据节点
- 一张表的数据根据store(列族)进行划分,避免读取没有必要的列族
- 该文件有索引 根据rowkey 判断是否要扫描该文件
- 还可以根据布隆过滤器 判断是否要扫描该文件
- 如果确实要扫描该文件 该文件有block cache 可以直接从内存中读取
- hbase 还对已经存储的文件进行compaction ,进一步优化读取效率
数据倾斜
-
放在分布式批处理系统 产生的问题是 长尾效应
-
放在即席查询的场景 产生热点问题(hot pot)
-
预分区 + rowkey的设计 解决热点问题 让数据散列分布
预分区(自定义分区)
系统自动分区一般是伴随着数据量的增长,但是根据不同数据特征我们可以提前规划,直接把数据热点问题扼杀在前期。
每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高HBase性能。
1)手动设定预分区
create ‘staff1’,‘info’, SPLITS => [‘1000’,‘2000’,‘3000’,‘4000’]
生成效果如下:
注意:范围是左闭右开的。
2)按照文件中设置的规则预分区
(1)创建splits.txt文件内容如下:
aaaa
bbbb
dddd
cccc
(2)然后执行:
create ‘staff3’, ‘info’,SPLITS_FILE => ‘/xx/xx/splits.txt’
小练习
1 以下是Hbase的主要特征的是
A 大规模数据的随机读写 对
B 在线分析处理 (online-analysis-process) 错
C NoSQL 对
D 经过配置可以实现原子性事务 错 mysql oracle postgresql tidb
E 适合存储稀疏的数据 对
F 对于增加列族有非常高的弹性 错 对于增加列有非常高的弹性 2 以下哪个组件会负责存储数据
A HMaster 错
B RegionServer 对 memstore
C Zookeeper 错
D HDFS 对 wal+region3 以关于建立hbase表下描述正确的是
A 每个表都至少有一个列族 对
B 列族中的数据都至少会保留3个版本 错 1个版本
C 列族的名称可以随意更改 错 不能改 只能删掉在加新的
D 建议每张表有5个以上列族为宜 错 1-3个4 对于hbase写流程描述正确的是
A 数据提交到RegionServer中先保存MemStore,再保存WAL。 错 先写wal 保证数据不丢失
B MemStore每次写入Hdfs都会经过排序。 对
C flush时每个store对应一个memstore,会产生一个文件。 对
D 当某个memstore达到阈值触发flush时,该region下所有的memstore都会一起保存。 对
E flush时,都会阻塞往regionserver提交数据。 错 达到更高的阈值的时候才会阻塞住写操作5 以下哪些因素会影响flush的触发
A 某个memstore的大小是否超过阈值 对 128
B 所有memstore占regionserver总内存的比例超过阈值 对 40%
C 超过某个阈值时长未进行flush 。 对 1小时
D WAL 文件数达到某个阈值。 对 32 个6 影响触发小压缩 minor compaction的因素
A 有足够小的文件 对
B 有足够大的文件 错
C 需要压缩的文件个数 对
D 足够多的未合并天数 错 7 以下哪些是hbase对读取性能的优化
A 海量数据行被切分为多个region 对 分布式
B meta cache 对 客户端快速定位节点
C 列族 对 减少查询的列
D HFile上有rowkey范围索引 对 不在范围不扫描
E HFile上有布隆过滤器 对 过滤不属于该文件的rowkey
F Block Cache 对 内存
G Compaction压缩 对 减少小文件8 关于region 拆分描述正确的是
A 拆分的目的是为了避免过多的数据集中。 对 分布式
B 目前版本拆分规则采用阶梯制。 错 上个版本阶梯制
C 目前首次拆分以memstore flush为准,之后以region最大文件大小为准。 错 根据当前rs下是否是一个region memstore flush *2
D 刚刚拆分后只是逻辑拆分, 直到compaction才物理拆分。 对
E 预分区为了解决数据热点问题 对
F 预分区的region不会再发生拆分 错 大了还是会拆 9 rowkey设计要考虑的
A 唯一性 对
B 满足任意查询场景 错 只能满足个别查询场景
C 散列避免热点 对
D 长度尽可能优化 对
Flink
Flink学习仓库地址
Flink初级
Flink是一个框架是一个分布式处理引擎对有界或者无界流数据进行实时计算Flink和Sparkstreaming对比Flink Sparkstreaming时间语义 事件时间、处理时间 处理时间窗口 灵活 不够灵活状态 有状态 无状态(updateStateByKey)容错 检查点 弱动态SQL 支持 不支持Flink的分层APISQLTableAPIDataStreamAPI|DataSetAPI(Flink1.12后,基本不用)处理函数 process越顶层,使用越简单越底层,使用越灵活(可以处理较复杂的业务)通过WordCount对比DataStreamAPI和DataSetAPIDataStreamAPI DataSetAPI准备环境 StreamExecutionEnvironment ExecutionEnvironment分组 keyBy groupBy作业提交 env.execute() 自动提交处理方式 流中每来一条数据都会对其进行处理 收集齐所有数据后,处理一次Flink运行模式(指定Flink程序在什么地方执行)Standalone、k8s、mesos、Yarn...Flink集群中的角色客户端JobManagerTaskManagerFlink部署模式会话模式-session需要先启动集群多个job共享集群资源,作业在共享资源的时候,相互之间会有影响当作业取消的时候,对集群是没有影响的单作业模式-per-job不需要先启动集群当作业提交的时候,会给每一个作业单独启动一个集群当作业取消的时候,集群也会停掉应用模式-application不需要先启动集群当应用提交的时候,会给每一个应用(一个应用下(app),可能会有多个作业(job))单独启动一个集群当作业取消的时候,集群也会停掉如果是会话模式以及单作业模式,在客户端会执行一些转换操作,客户端压力比较大如果是应用模式,转换操作会到服务器(JobManager)上执行,客户端压力变小在开发的时候,首选应用模式Flink On YarnYarn会话启动集群 bin/yarn-session.shWEBUI提交作业命令行提交作业 bin/flink run -d -c 类的全限定名 jar包路径Yarn-Per-Job不需要启动集群只能通过命令行提交作业 bin/flink run -d -t yarn-per-job -c 类的全限定名 jar包路径YarnApplication不需要启动集群只能通过命令行提交作业 bin/flink run-application -d -t yarn-application -c 类的全限定名 jar包路径
Yarn应用模式作业提交模式
流程描述:
客户端将作业(做一些参数的解析和封装)以yarn-application形式打包并上传至Flink后去Yarn的ResourceManager
Yarn的ResourceManager接收到请求后根据作业封装一个AM,也是Flink的JobMaster,共包含分发器(针对每一个job封装一个jobMaster对象)、JobMaster(根据提交的作业生成逻辑、作业、执行流图)、资源管理器(根据执行流图向Yarn的资源管理器申请资源)
Yarn的ResourceManager启动TaskManager,看看哪一个TaskManager有足够的空间就创建slot,并将slot返回给对应的Task去运行job
该任务
并行度当要处理的数据量非常大时,我们可以把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks)一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)一个Flink应用程序的并行度取决于并行度最大的算子的并行度个数默认情况下,如果在本地(Idea)中运行程序,如果没有指定并行度,并行度的个数为当前CPU的线程数如何设置并行度在代码中全局设置env.setParallelism(3)针对某一个算子单独设置并行度算子.setParallelism(3)在flink的配置文件 flink-conf.yamlparallelism.default: 1在提交作业的时候通过-p参数指定并行度优先级算子单独设置并行度 > 全局设置 > 在提交作业的时候通过-p参数指定 > 在flink的配置文件算子链在Flink中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task)这样原来的算子就成为了真正任务里的一部分,这样的技术被称为“算子链”(Operator Chain)算子链是Flink提供一种非常有效的优化手段,不需要我们做什么处理,默认就会进行合并前提:算子间必须是one-to-one的关系并行度相同不能存在重分区操作
禁用算子链全局禁用算子链 env.disableOperatorChaining();算子禁用 算子.disableChaining()开始新链 算子.startNewChain()任务槽Flink程序在执行的时候,会被划分为多个算子子任务,每个子任务在执行的时候,需要TaskManager提供资源TaskManager是一个JVM进程,开始开启多个线程执行任务,每一个线程叫做任务槽任务槽可以均分TaskManager的内存资源在flink-conf.yaml的配置文件中可以设置一个tm上slot的数量taskmanager.numberOfTaskSlots: 1在设置slot数量的时候,除了要考虑内存资源外,也需要考虑CPU情况,一般slot的数量和cpu核数之间的关系是1:1或者2:1
slot共享只要属于同一个作业,那么对于不同任务节点(算子)的并行子任务,就可以放到同一个slot上执行一个Flink应用执行需要的Slot的数量 = 各个共享组中并行度最大的算子并行度个数之和四张图逻辑流程---程序执行拓扑作业图 ---合并算子链执行图 ---作业图的并行化版本物理"图" --- 程序的执行过程以Yarn的应用模式为例描述Flink作业提交流程
DataStreamAPI环境准备创建本地环境StreamExecutionEnvironment.createLocalEnvironment();创建本地环境带WEBUIStreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())需要在pom.xml文件中提前添加flink-runtime-web依赖创建远程环境StreamExecutionEnvironment.createRemoteEnvironment(远程服务器ip,端口号,jar)根据实际执行场景自动创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()->如果需要自动创建本地带webUI的执行环境,需要传递Configuration,并显式指定端口号关于运行模式设置STREAMINGBATCHAUTOMATIC源算子-Source从集合中读取数据env.fromCollection(集合)从指定的元素中读取数据env.fromElements(元素...)从指定的网络端口读取数据从文件中读取数据env.readTextFile() 已过时文件连接器FileSource<String> source = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("D:\\dev\\workspace\\bigdata-0318\\input\\words.txt")).build();env.fromSource(数据源对象,WaterMark,数据源名)Flink1.12前env.addSource(SourceFunction)Flink1.12后env.fromSource(Source)从kafka主题中读取数据KafkaSource<String> kafkaSource = KafkaSource.<String>builder()//设置kafka集群的地址.setBootstrapServers("hadoop102:9092")//设置消费的主题.setTopics("first")//设置消费者组.setGroupId("test")//设置消费位点//作为消费者,如何保证消费的精准一次:手动维护偏移量KafkaSource->KafkaSourceReader->SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>>// 从最早位点开始消费//.setStartingOffsets(OffsetsInitializer.earliest())// 从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest())// 从时间戳大于等于指定时间戳(毫秒)的数据开始消费//.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))// 从消费组提交的位点开始消费,不指定位点重置策略//.setStartingOffsets(OffsetsInitializer.committedOffsets())// 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点//.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))//设置反序列化器.setValueOnlyDeserializer(new SimpleStringSchema()).build();从DataGen中读取数据自定义Source数据源转换算子-Transform基本的转化算子map、filter、flatMap聚合算子必须先进行keyBy,经过keyBy之后,得到的流的类型是KeyedStream,分组流或者键控流,只有键控流才能直接调用聚合算子聚合算子会将原来的计算结果保留到状态中,用于计算简单聚合算子sum、min、minBy、max、maxBy规约聚合reduce如果流中只有一条数据,reduce方法不会被执行reduce(value1,value2)value1:累加的结果value2:新来的数据输出算子-Sink提交作业同步提交作业 env.execute()异步提交作业 env.executeAsync()以异步提交作业为例,如果一个应用App下面有多个作业Job,部署到Yarn的不同模式下效果会话-session先启动集群,多个job都部署到同一个集群中,共享集群资源作业停止,不会对集群产生影响单作业-perjob不需要先启动集群,给每一个作业启动一个集群如果停止某一个作业,当前作业对应的集群也会停止,对其它的作业对应的集群没有影响应用-application不需要先启动集群,给每一个应用启动一个集群同一个应用下的多个作业,共享这一个集群如果停止某一个作业,整个集群停止,对其它的作业也会有影响
富函数在流中数据进行处理的时候,相关算子要求传递一个处理函数实体作为参数例如:map(MapFunction)、filter(FilterFunction)、flatMap(FlatMapFunction)默认参数声明的形式是接口类型,其实除了接口之外,每一个声明的函数类,都有一个对应的富函数(抽象类)实现富函数的形式 Rich +接口名 例如: MapFunction -> RichMapFunction富函数和普通的接口比起来,多出了如下功能提供了上下文对象,可以通过上下文对象获取更丰富的信息提供了带生命周期的方法open方法初始化的时候执行的代码在每一个算子子任务(并行度)上,只执行一次用于进行连接的初始化close方法执行结束的时候执行的代码在每一个算子子任务(并行度)上,只执行一次用于资源的释放如果处理的是无界数据,close方法不会被执行
分区操作(算子)shufflerebalancerescalebroadcastglobalone-to-one --- forwardkeyBy---hashcustom---custom
分流将一条流拆分为2条流或者多条流filter涉及对同一条流的数据处理多次,效率较低侧输出流首先定义侧输出流标签OutputTag在定义侧输出流标签的时候,存在泛型擦除可以通过匿名内部类的方式创建对象可以在创建对象的传递侧输出流中的数据类型注意:如果要向使用侧输出流,只能通过process算子对流中数据进行处理,因为在底层的processElement方法中,可以获取上下文对象合流union可以合并2条或者多条流要求:参与合并的流中数据类型必须一致connect只能对2条流进行合并参与连接的两条流数据类型可以不一致Flink读写外部系统的方式Flink1.12前env.addSource(SourceFunction)流.addSink(SinkFunction)从Flink1.12开始env.fromSource(Source)流.sinkTo(Sink)Flink从kafka中读取数据创建KafkaSourceenv.fromSource(kafkaSource)KafkaSource可以保证读取的精准一次,KafkaSource->KafkaSourceReader->成员变量维护偏移量Flink向kafka中写入数据,一致性如何保证(先了解)检查点必须要开启.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("xxx")//检查点超时时间 < 事务超时时间 <= 事务最大超时时间(15min).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,15*60*1000 + "")在消费端,设置消费的隔离级别read_committed.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed")
union
connect
Flink高级
Jdbc连接器SinkFunction jdbcSinkFunction = JdbcSink.sink(sql,给问号占位符赋值,攒批配置(可选),连接选项);流.addSink(jdbcSinkFunction)
构造者设计模式对象创建和赋值一步搞定链式调用
自定义Sinkclass 类 implements SinkFunction|extends RichSinkFunction{invoke:向不同外部系统写入数据的逻辑}时间语义事件时间:数据真正产生的时间处理时间:Flink中算子对数据进行处理的操作摄入时间(了解):数据进入到Flink的Source时间从Flink1.12开始,默认时间语义事件时间窗口将无限的流数据划分一个个有限的数据库进行处理,就是所谓的窗口在理解窗口的时候,窗口是"桶"不是"框"窗口分类按照驱动方式分时间窗口:以时间作为窗口起始或者结束的标记计数窗口:以元素的个数作为窗口的结束标记按照数据划分的方式分滚动窗口特点:窗口和窗口之间首尾相接,不会重叠,同一个元素不会同时属于多个窗口需要传递的参数:窗口大小滑动窗口特点:窗口和窗口会重叠,同一个元素同时属于多个窗口需要传递的参数:窗口大小、滑动步长会话窗口特点:窗口和窗口不会重叠,同一个元素不会同时属于多个窗口,窗口和窗口之间应该有一个间隙(Gap)需要传递的参数:时间间隔全局窗口特点:将数据放到同一个窗口中,默认是没有结束的,所以在使用全局窗口的时候,需要自定义触发器计数窗口的底层就是通过全局窗口实现的窗口API开窗前是否进行了keyBy操作没有keyBy,针对整条流进行开窗,相当于并行设置为1wsDS.windowAll()wsDS.countWindowAll()keyBy,针对keyBy之后的每一组进行单独开窗,窗口之间相互不影响wsDS.keyBy().window()wsDS.keyBy().countWindow()窗口分配器---开什么样的窗口滚动处理时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))滑动处理时间窗口.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(2)))处理时间会话窗口.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))滚动事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(10)))滑动事件时间窗口.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(2)))事件时间会话窗口.window(EventTimeSessionWindows.withGap(Time.seconds(10)))滚动计数窗口.countWindow(10)滑动计数窗口.countWindow(10,2)全局窗口.window(GlobalWindows.create())窗口处理函数---如何处理窗口中的元素增量处理函数---窗口中来一条数据处理一次,不会对数据进行缓存优点:不会缓存数据,省空间缺点:获取不到窗口更详细的信息.reduce()窗口中元素的类型以及向下游传递的类型是一致的如果窗口中只有一个元素,reduce方法不会被执行reduce(value1,value2)value1:中间累加结果value2:新来的数据.aggregate()窗口中元素的类型、累加器的类型以及向下游传递的数据的类型可以不一致createAccumulator:属于当前窗口的第一个元素进来的时候执行add:窗口中每来一条数据都会被执行一次getResult: 窗口触发计算的时候执行一次全量处理函数---会将当前窗口的数据全部缓存下来,等窗口触发计算的时候再进行处理优点:可以获取窗口更详细的信息缺点:会将窗口的数据进行缓存,比较占用空间.apply()apply(String key, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out).process()process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out)process更底层,通过context对象除了可以获取窗口对象之外,还可以获取更丰富的信息在实际使用的过程中,可以增量 + 全量reduce + apply (WindowFunction)reduce + process (ProcessWindowFunction)aggregate + apply (WindowFunction)aggregate + process (ProcessWindowFunction)窗口触发器---何时触发窗口的计算.trigger()窗口移除器---在窗口触发计算后,在处理函数执行前或者后执行一些移除操作.evictor()以滚动处理时间窗口为例什么时候创建窗口对象当有属于这个窗口的第一个元素到来的时候,创建窗口对象窗口对象是左闭右开的[ )窗口对象的起始和结束时间起始时间:向下取整final long remainder = (timestamp - offset) % windowSize;// handle both positive and negative casesif (remainder < 0) {return timestamp - (remainder + windowSize);} else {return timestamp - remainder;}结束时间: 起始时间 + 窗口大小最大时间: 结束时间 - 1ms窗口如何触发计算当系统时间到了窗口的最大时间的时候触发计算
水位线前提:事件时间语义在Flink内部维护一个逻辑时钟,用于衡量事件时间的进展,我们称这个逻辑时钟为水位线(Watermark)主要用于事件时间窗口的触发、事件时间定时器的执行水位线也会作为流中的元素向下游传递flink会任务水位线前的数据都已经处理完了水位线是递增的,不会变小maxTimestamp - outOfOrdernessMillis - 1Flink提供水位线生成的两种方式单调递增--流中的数据不会出现乱序.<WaterSensor>forMonotonousTimestamps()new AscendingTimestampsWatermarks ---> extends ---> BoundedOutOfOrdernessWatermarks有界乱序--流中的数据可能出现乱序.<WaterSensor>forBoundedOutOfOrderness(Duration.ofMillis(3))new BoundedOutOfOrdernessWatermarks单调递增是有界乱序的子类,单调递增是有界乱序的一种特殊情况,乱序程度是0水位线的传递上游是一个并行度,下游是多个并行度广播上游是多个并行度,下游是一个并行度将上游所有的并行度上水位值拿过来取最小上游是多个并行度,下游也是多个并行度先广播,再取最小空闲数据源
以滚动事件时间窗口为例窗口对象什么时候创建当属于这个窗口的第一个元素到来的时候创建窗口对象窗口起始时间向下取整的算法窗口的结束时间起始时间 + 窗口大小窗口最大时间maxTimestamp = 结束时间 - 1ms窗口什么时候触发计算水位线到了窗口的最大时间window.maxTimestamp() <= ctx.getCurrentWatermark()窗口什么时候关闭水位线到了 window.maxTimestamp() + allowedLateness迟到数据的处理指定水位线的生成策略为有界乱序,指定乱序程度在开窗的时候,指定窗口的允许迟到时间侧输出流基于时间双流join基于窗口滚动滑动会话基于状态IntervalJoin基本语法:keyedA.intervalJoin(keyedB).between(下界,上界).process()底层实现:connect + 状态具体处理步骤判断是否迟到将当前数据放到状态中缓存起来用当前数据和另外一条流中缓存的数据进行关联清状态局限性:只支持内连接
迟到数据的处理
- 指定水位线的生成策略为有界乱序,指定乱序程度
- 在开窗的时候,指定窗口的允许迟到时间
- 侧输出流
处理函数.process(处理函数)处理函数可以获取上下文对象。通过上下文对象可以操作侧输出流、可以获取TimeService
处理函数分类ProcessFunction最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。KeyedProcessFunction对流按键分组后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,必须基于KeyedStream。ProcessWindowFunction开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。ProcessAllWindowFunction同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。CoProcessFunction合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。ProcessJoinFunction间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。BroadcastProcessFunction广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。关于广播流的相关操作,我们会在后续章节详细介绍。KeyedBroadcastProcessFunction按键分组的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。
定时器当处理时间或者事件时间到了定时器指定的时间点的时候定时器会触发执行---onTimer注意:只有KeyedProcessFunction中才能使用定时器
状态作用:用于保存程序运行的中间结果状态的分类原始状态托管状态(我们主要学习)算子状态作用范围:和普通的成员变量作用范围相同,算子子任务(每个并行度/每个slot) 普通的成员变量只能在内存中保存,状态可以被持久化ListState状态的使用流程实现CheckpointedFunction接口在成员变量的位置声明状态重写initializeState和snapshotState方法在initializeState方法中给状态进行初始化context.getOperatorStateStore().getListState(listStateDescriptor)在snapshotState方法中将成员变量放到状态中存起来在处理函数中使用的还是成员变量BroadcastState状态的使用流程准备两条流(非广播流-主流、广播流-规则|配置流)广播流进行广播---broadcast(广播状态描述器)将主流和广播流进行关联---connect对关联后的数据进行处理---process【Keyed】BroadcastProcessFunctionprocessElement:处理主流数据 --- 从广播状态取规则|配置对主流数据进行处理processBroadCastElement:处理广播流数据 --- 将数据放到广播状态中将处理结果向下游传递键控状态作用范围:经过keyBy之后的每一个组单独维护一个状态,组和组之间状态不共享,相互隔离值状态ValueState列表状态ListStatemap状态MapState规约状态ReducingState聚合状态AggregatingState键控状态的使用流程在成员变量的位置声明状态在open方法中给状态进行初始化getRuntimeContext().getState(状态描述器对象)在处理函数中使用状态状态后端主要决定状态以及检查点存储位置Flink1.13前状态 检查点Memory TaskManager堆内存 JobManager堆内存Fs TaskManager堆内存 文件系统 ,例如HDFSRocksDB rocksDB库 文件系统 ,例如HDFSFlink1.13后HashMap TaskManager堆内存 JobManager堆内存||文件系统 ,例如HDFSRocksDB rocksDB库 文件系统 ,例如HDFS检查点检查点是对状态进行的备份是状态的副本底层使用异步分界线快照算法核心:barrier工作原理当到了检查点触发时机后,JobManager中的检查点协调器会向Source发送barrier当source接收到barrier后,会将source的状态进行备份barrier也会随着流中的流动向下游传递下游算子接收到barrier后,也会对状态进行备份直到barrier走到sink,对sink上的状态做完备份后,说明barrier之前的数据肯定是已经处理完了相关的状态也已经都备份成功,这次检查点结束
barrier的传递(指的是barrier对齐的情况)上游是一个并行度,下游是多个并行度,广播上游是多个并行度,下游是一个并行度,将上游所有的barrier收齐后再进行备份上游是多个并行度,下游是多个并行度,先广播,再对齐检查点算法Barrier对齐的精准一次在等待barrier对齐的过程中,如果已经到达的barrier上,又有新的数据进来不会对其进行处理,会将新来的数据缓存起来的,等到barrier对齐后再进行处理优点:可以保证精准一次缺点:时效性差Barrier对齐的至少一次在等待barrier对齐的过程中,如果已经到达的barrier上,又有新的数据进来直接对其进行处理,如果出现故障,可能会出现重复处理的情况优点:时效性好缺点:保证不了精准一次非Barrier对齐的精准一次(Flink1.11后)只要上游并行度的barrier过来,就开始进行备份,但是在备份前,需要做如下几件事将已经到达的barrier移到算子的输出缓存区末端标记当前barrier跳过的数据标记其它未到的barrier之前的数据将标记的数据以及状态都保存到检查点优点:时效性好,能够保证精准一次缺点:保存数据量大,如果数据量大、算子链长、性能底
非barrier对齐的精准一次 过程:
当barrier来时,将其从输入缓冲区直接放到输出缓冲区末端,并将其(barrier)和跳过的数据(已处理的、待处理的)、和其他流的同一批未到来barrier之前的数据都保存到当前检查点(即备份)
检查点相关的配置启用检查点检查点存储检查点模式(CheckpointingMode)超时时间(checkpointTimeout)最小间隔时间(minPauseBetweenCheckpoints)最大并发检查点数量(maxConcurrentCheckpoints)开启外部持久化存储(enableExternalizedCheckpoints)--job取消后,检查点是否保留检查点连续失败次数(tolerableCheckpointFailureNumber)非对齐检查点(enableUnalignedCheckpoints)对齐检查点超时时间(alignedCheckpointTimeout)通用增量 checkpoint (changelog)最终检查点
检查点配置位置在flink-conf.yaml文件中在代码中通过API指定
保存点底层原理实现和检查点一样检查点是程序自动进行快照保存点是程序员手动进行快照--必须指定uid
一致性级别最多一次(At-Most-Once)至少一次(At-Least-Once)精确一次(Exactly-Once)Flink的端到端一致性Source(读取数据的外部系统)可重置偏移量Flink流处理框架本身检查点Sink(写入数据的外部系统)幂等外部系统必须支持幂等事务WAL:外部系统不支持事务*2PC:外部系统支持事务
两次提交过程描述
当处理完的数据到达Flink程序的SInk端时,要在接收端(如Kafka)开启一个事务,然后再将数据写过去(预提交)
一直写,直到barrier到达Sink,将当前检查点的状态(事务执行状态:未提交)保存起来,并开启新的状态,当所有并行度的检查点做完这两件事之后,JobManager向所有节点发送“本轮成功”的回调消息,预提交结束,进行真正提交,将写入Kafka的数据标记为已提交(若该步骤未完成,就从上次快照(检查点,就是本轮的检查点)的位置重新提交),提交成功后,Kafka会给SInk返回一个值,此时,Kafka将检查点(事务执行)状态修改为:已完成
设置事务的超时时间
检查点超时时间 < 事务的超时时间 <= 事务的最大超时时间
Kafka案例演示
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;/*** 该案例演示了两阶段提交_Sink* 如果将Flink流中的数据写到Kafka主题中,要想保证写入的精准一次,需要做如下操作* 开启检查点* 设置一致性级别为精准一次(默认barrier对齐)* .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)* 设置事务id的前缀* .setTransactionalIdPrefix("2pc_sink")* 设置事务的超时时间* 检查点超时时间 < 事务的超时时间 <= 事务的最大超时时间* 在消费端设置消费隔离级别* .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed") **/
public class Flink02_2pc_Sink {public static void main(String[] args) throws Exception {//TODO 1.环境准备//1.1 指定流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//1.2 设置并行度env.setParallelism(1);//开启检查点env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointTimeout(60000L);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));//TODO 2.从指定的网络端口读取数据DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 8888);//TODO 3.将流的数据写到kafka的主题//3.1 创建KafkaSinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("first").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("2pc_sink").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,15*60*1000 + "").build();//3.2 写入socketDS.sinkTo(kafkaSink);//TODO 4.提交作业env.execute();}
}
启动yarn的历史服务器bin/historyserv
flink客户端:bin/sql-client.sh -i conf/sql-client-init.sql