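Handling retract streams
A retract stream appears in stream processing because the records of a two-table join arrive one at a time: rows that will eventually join may have no match when they first arrive, so (for example in a left join) the join first emits an unmatched result and retracts it once the matching row shows up.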
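
To make the effect concrete, here is a minimal sketch of a toy streaming left join that records the changelog it would emit. It is not Flink code: the class `LeftJoinChangelogSketch`, its methods, and the sample rows are hypothetical, each key is assumed to carry at most one row per side, and the `+I` / `-D` prefixes simply mimic the short names Flink uses for insert and delete rows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy streaming left join (hypothetical) that records the changelog it emits. */
class LeftJoinChangelogSketch {
    private final Map<String, String> leftByKey = new HashMap<>();
    private final Map<String, String> rightByKey = new HashMap<>();
    final List<String> changelog = new ArrayList<>();

    /** A left-side row arrives: emit it with whatever right side is known so far. */
    void onLeft(String key, String left) {
        leftByKey.put(key, left);
        String right = rightByKey.get(key); // null when the right row has not arrived yet
        changelog.add("+I[" + left + ", " + right + "]");
    }

    /** A right-side row arrives: retract the unmatched result, then emit the joined one. */
    void onRight(String key, String right) {
        rightByKey.put(key, right);
        String left = leftByKey.get(key);
        if (left != null) {
            changelog.add("-D[" + left + ", null]");
            changelog.add("+I[" + left + ", " + right + "]");
        }
    }

    public static void main(String[] args) {
        LeftJoinChangelogSketch join = new LeftJoinChangelogSketch();
        join.onLeft("o1", "order-1");   // the order arrives first
        join.onRight("o1", "detail-1"); // its detail row arrives later
        join.changelog.forEach(System.out::println);
    }
}
```

In this run the same order appears three times downstream: once unmatched, once as a retraction, and once joined. A naive aggregation over those rows would count the order more than once, which is the problem the rest of this section deals with.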
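- Format of a retract stream:
- Solutions
  - Timer: register a 10 s timer (the maximum time gap between related records); when it fires, emit the data held in state
  - Alternatively, let the repeated records be computed again and keep the result correct by replacing each measure with `current measure - previous measure`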
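
The second solution can be sketched as follows. This is only an illustration under assumptions: the class `DeltaMeasureSketch` and its methods are hypothetical, and a plain `HashMap` stands in for the per-key state that a real job would keep in Flink keyed state.

```java
import java.util.HashMap;
import java.util.Map;

/** Emits "current measure - previous measure" so repeated records do not inflate a sum. */
class DeltaMeasureSketch {
    // Last measure seen per key; a stand-in for keyed state.
    private final Map<String, Long> lastByKey = new HashMap<>();
    private long total = 0;

    /** Returns the delta to forward downstream for this record. */
    long accept(String key, long measure) {
        long previous = lastByKey.getOrDefault(key, 0L);
        lastByKey.put(key, measure);
        long delta = measure - previous;
        total += delta; // a downstream sum only ever sees deltas
        return delta;
    }

    long total() {
        return total;
    }

    public static void main(String[] args) {
        DeltaMeasureSketch sketch = new DeltaMeasureSketch();
        System.out.println(sketch.accept("detail-1", 100)); // first arrival
        System.out.println(sketch.accept("detail-1", 100)); // same record again after a retraction
        System.out.println(sketch.total());
    }
}
```

The first arrival contributes its full measure, and a repeat of the same record contributes zero, so the running total matches what a single arrival would have produced.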
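Async I/O
- Reduces the time spent waiting and makes full use of the resources already available
- With async I/O, every step must be asynchronous from start to finish, which means using asynchronous connectors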

Helper methods for opening and closing the async connections (Redis through Lettuce, HBase through its async client):

    /**
     * Get an async connection to Redis.
     *
     * @return the async connection object
     */
    public static StatefulRedisConnection<String, String> getRedisAsyncConnection() {
        RedisClient redisClient = RedisClient.create("redis://hadoop102:6379/2");
        return redisClient.connect();
    }

    /**
     * Close the async Redis connection.
     *
     * @param redisAsyncConn the connection to close
     */
    public static void closeRedisAsyncConnection(StatefulRedisConnection<String, String> redisAsyncConn) {
        if (redisAsyncConn != null) {
            redisAsyncConn.close();
        }
    }

    /**
     * Get an async connection to HBase.
     *
     * @return the async connection object
     */
    public static AsyncConnection getHBaseAsyncConnection() {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "hadoop102");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            // createAsyncConnection returns a CompletableFuture; get() waits for the connection
            return ConnectionFactory.createAsyncConnection(conf).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Close the async HBase connection.
     *
     * @param asyncConn the async connection to close
     */
    public static void closeAsyncHbaseConnection(AsyncConnection asyncConn) {
        if (asyncConn != null) {
            try {
                asyncConn.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

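
Why overlapping the waits helps can be shown with nothing but the JDK. The sketch below is an assumption-laden illustration: `slowLookup` is a hypothetical stand-in for a network round trip, and a thread pool is used only to simulate requests in flight at the same time (a real async client would use callbacks rather than one thread per request).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncLookupSketch {
    /** Hypothetical slow lookup that stands in for a network round trip. */
    static String slowLookup(String key) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "dim-" + key;
    }

    /** One lookup after another: the waits add up. */
    static List<String> lookupBlocking(List<String> keys) {
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            out.add(slowLookup(key));
        }
        return out;
    }

    /** All lookups in flight at once; results are collected in key order. */
    static List<String> lookupAsync(List<String> keys) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, keys.size()));
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String key : keys) {
                futures.add(CompletableFuture.supplyAsync(() -> slowLookup(key), pool));
            }
            List<String> out = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                out.add(future.join());
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d", "e");
        long t0 = System.nanoTime();
        lookupBlocking(keys);
        long t1 = System.nanoTime();
        lookupAsync(keys);
        long t2 = System.nanoTime();
        System.out.println("blocking ms: " + (t1 - t0) / 1_000_000);
        System.out.println("async ms:    " + (t2 - t1) / 1_000_000);
    }
}
```

Both methods return the same values; the difference is that the blocking version pays for each wait in turn while the async version pays for them together.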
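Async I/O join
- `AsyncDataStream.unorderedWait(stream, asyncFunction, 60, TimeUnit.SECONDS)`, where `stream` is the input `DataStream` and `asyncFunction` holds the core async logic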
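- Join the dimension table asynchronously
  - `CompletableFuture.supplyAsync(new Supplier<>() { /* read the data from Redis asynchronously */ })` returns a future
    - First build the `redisKey` to look up
    - Obtain the `dimSkuInfoFuture` future
    - Call `dimSkuInfoFuture.get()` to get the async result
  - `thenApplyAsync(new Function<>())` performs the cache-aside check: if nothing was read from Redis, query HBase
    - Rewrite the HBase `getCells` method as `getAsyncCells`
    - Switch the connection to the async connection
    - Values of future type need a further `get()` call to obtain the actual value
    - The connection does not need to be closed after each lookup
    - Save the data read from HBase to Redis: `redisAsyncConnection.async().setex(redisKey, 24 * 60 * 60, dimJsonObj.toJSONString());`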
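
The cache-aside flow in the steps above can be sketched with JDK classes alone. Everything here is a stand-in chosen for illustration: `CacheAsideSketch` is hypothetical, two in-memory maps play the roles of Redis and HBase, the `table:rowKey` key format is an assumption, and the 24-hour expiry of the real `setex` call is left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class CacheAsideSketch {
    final Map<String, String> cache = new ConcurrentHashMap<>(); // stands in for Redis
    final Map<String, String> store = new ConcurrentHashMap<>(); // stands in for HBase
    final AtomicInteger storeReads = new AtomicInteger();        // counts cache misses

    CompletableFuture<String> lookup(String table, String rowKey) {
        String cacheKey = table + ":" + rowKey; // assumed key format
        return CompletableFuture
                // Step 1: read the cache asynchronously.
                .supplyAsync(() -> cache.get(cacheKey))
                // Step 2: on a miss, read the store and write the value back to the cache.
                .thenApplyAsync(cached -> {
                    if (cached != null) {
                        return cached;
                    }
                    storeReads.incrementAndGet();
                    String fromStore = store.get(rowKey);
                    if (fromStore != null) {
                        cache.put(cacheKey, fromStore);
                    }
                    return fromStore;
                });
    }

    public static void main(String[] args) {
        CacheAsideSketch sketch = new CacheAsideSketch();
        sketch.store.put("1001", "{\"sku_name\":\"phone\"}");
        System.out.println(sketch.lookup("dim_sku_info", "1001").join()); // miss, then store read
        System.out.println(sketch.lookup("dim_sku_info", "1001").join()); // served from the cache
        System.out.println("store reads: " + sketch.storeReads.get());
    }
}
```

The second lookup never touches the store, which is the point of writing the value back after the first miss.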
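Encapsulating the async dimension join
- Extend the `RichAsyncFunction<TradeSkuOrderBean, TradeSkuOrderBean>` class
- Make the methods that supply the table name and the row key abstract, so that the caller provides them
- Wrap the merge step in a `join(TradeSkuOrderBean input, JSONObject dim)` method; the body of `join` holds the logic that combines the record with its dimension data
- Separate the abstract methods from the concrete ones: move the abstract methods into an interface, then implement that interface
- Replace the `TradeSkuOrderBean` class with the generic type parameter `T`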
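
The resulting generic class:

    // fastjson is assumed for JSONObject; RedisUtil, HBaseUtil, Constant and
    // DimJoinFunction are the project's own classes and are not imported here.
    import com.alibaba.fastjson.JSONObject;
    import io.lettuce.core.RedisFuture;
    import io.lettuce.core.api.StatefulRedisConnection;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.hadoop.hbase.client.AsyncConnection;

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.function.Supplier;

    public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T>
            implements DimJoinFunction<T> {
        StatefulRedisConnection<String, String> redisAsyncConnection;
        AsyncConnection hBaseAsyncConnection;

        @Override
        public void open(Configuration parameters) throws Exception {
            redisAsyncConnection = RedisUtil.getRedisAsyncConnection();
            hBaseAsyncConnection = HBaseUtil.getHBaseAsyncConnection();
        }

        @Override
        public void close() throws Exception {
            RedisUtil.closeRedisAsyncConnection(redisAsyncConnection);
            HBaseUtil.closeAsyncHbaseConnection(hBaseAsyncConnection);
        }

        @Override
        public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
            // Java's asynchronous programming style
            String tableName = getTableName();
            String rowKey = getId(input);
            String redisKey = RedisUtil.getRedisKey(tableName, rowKey);
            CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    // Step 1: read the cached value from Redis asynchronously
                    RedisFuture<String> dimSkuInfoFuture = redisAsyncConnection.async().get(redisKey);
                    String dimInfo = null;
                    try {
                        dimInfo = dimSkuInfoFuture.get();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return dimInfo;
                }
            }).thenApplyAsync(new Function<String, JSONObject>() {
                @Override
                public JSONObject apply(String dimInfo) {
                    JSONObject dimJsonObj = null;
                    // Cache-aside check
                    if (dimInfo == null || dimInfo.isEmpty()) {
                        try {
                            // Cache miss: query HBase
                            dimJsonObj = HBaseUtil.getAsyncCells(hBaseAsyncConnection, Constant.HBASE_NAMESPACE, tableName, rowKey);
                            // Save the value to Redis, skipping the write when HBase returned nothing
                            if (dimJsonObj != null) {
                                redisAsyncConnection.async().setex(redisKey, 24 * 60 * 60, dimJsonObj.toJSONString());
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } else {
                        // Cache hit: parse the value stored in Redis
                        dimJsonObj = JSONObject.parseObject(dimInfo);
                    }
                    return dimJsonObj;
                }
            }).thenAccept(new Consumer<JSONObject>() {
                @Override
                public void accept(JSONObject dim) {
                    // Merge the dimension data
                    if (dim == null) {
                        // No dimension data could be found
                        System.out.println("Could not find dimension data for: " + tableName + ":" + rowKey);
                    } else {
                        join(input, dim);
                    }
                    // Return the result
                    resultFuture.complete(Collections.singletonList(input));
                }
            });
        }
    }
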
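
The `DimJoinFunction<T>` interface itself is not shown in these notes. The sketch below is a guess at its shape, inferred only from the three calls the class makes (`getTableName()`, `getId(input)`, `join(input, dim)`), together with a usage example. It avoids Flink entirely so that it runs on the JDK alone: all names ending in `Sketch`, the `Map<String, String>` used in place of `JSONObject`, the `table:rowKey` key format, and the sample data are assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Assumed shape of the interface: the three hooks the caller must supply. */
interface DimJoinSketch<T> {
    String getTableName();

    String getId(T input);

    void join(T input, Map<String, String> dim);
}

/** Generic template: owns the lookup flow and leaves the hooks abstract. */
abstract class DimLookupSketch<T> implements DimJoinSketch<T> {
    private final Map<String, Map<String, String>> dims; // stands in for Redis and HBase

    DimLookupSketch(Map<String, Map<String, String>> dims) {
        this.dims = dims;
    }

    CompletableFuture<T> enrich(T input) {
        String key = getTableName() + ":" + getId(input);
        return CompletableFuture.supplyAsync(() -> dims.get(key))
                .thenApply(dim -> {
                    if (dim != null) {
                        join(input, dim); // caller-defined merge
                    }
                    return input;
                });
    }
}

/** Hypothetical record type standing in for TradeSkuOrderBean. */
class OrderSketch {
    final String skuId;
    String skuName;

    OrderSketch(String skuId) {
        this.skuId = skuId;
    }
}

class DimJoinDemo {
    public static void main(String[] args) {
        Map<String, String> sku = new HashMap<>();
        sku.put("sku_name", "phone");
        Map<String, Map<String, String>> dims = new HashMap<>();
        dims.put("dim_sku_info:1001", sku);

        // The caller only states which table, which key, and how to merge.
        DimLookupSketch<OrderSketch> lookup = new DimLookupSketch<OrderSketch>(dims) {
            @Override
            public String getTableName() {
                return "dim_sku_info";
            }

            @Override
            public String getId(OrderSketch input) {
                return input.skuId;
            }

            @Override
            public void join(OrderSketch input, Map<String, String> dim) {
                input.skuName = dim.get("sku_name");
            }
        };

        OrderSketch enriched = lookup.enrich(new OrderSketch("1001")).join();
        System.out.println(enriched.skuName);
    }
}
```

The design choice this illustrates is the split of responsibilities: the lookup flow is written once in the generic class, and each dimension table needs only a small anonymous subclass.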