聊聊PowerJob的OmsLogHandler

本文主要研究一下PowerJob的OmsLogHandler

OmsLogHandler

tech/powerjob/worker/background/OmsLogHandler.java

@Slf4j
public class OmsLogHandler {private final String workerAddress;private final Transporter transporter;private final ServerDiscoveryService serverDiscoveryService;// 处理线程,需要通过线程池启动public final Runnable logSubmitter = new LogSubmitter();// 上报锁,只需要一个线程上报即可private final Lock reportLock = new ReentrantLock();// 生产者消费者模式,异步上传日志private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);// 每次上报携带的数据条数private static final int BATCH_SIZE = 20;// 本地囤积阈值private static final int REPORT_SIZE = 1024;public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) {this.workerAddress = workerAddress;this.transporter = transporter;this.serverDiscoveryService = serverDiscoveryService;}/*** 提交日志* @param instanceId 任务实例ID* @param logContent 日志内容*/public void submitLog(long instanceId, LogLevel logLevel, String logContent) {if (logQueue.size() > REPORT_SIZE) {// 线程的生命周期是个不可循环的过程,一个线程对象结束了不能再次start,只能一直创建和销毁new Thread(logSubmitter).start();}InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);boolean offerRet = logQueue.offer(tuple);if (!offerRet) {log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);}}//......
}    

OmsLogHandler提供了submitLog方法,它先判断logQueue大小是否超过REPORT_SIZE(1024),超过则通过异步线程执行logSubmitter;接着将内容包装为InstanceLogContent,然后放入到logQueue

LogSubmitter

    private class LogSubmitter implements Runnable {@Overridepublic void run() {boolean lockResult = reportLock.tryLock();if (!lockResult) {return;}try {final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();// 当前无可用 Serverif (StringUtils.isEmpty(currentServerAddress)) {if (!logQueue.isEmpty()) {logQueue.clear();log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");}return;}List<InstanceLogContent> logs = Lists.newLinkedList();while (!logQueue.isEmpty()) {try {InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);logs.add(logContent);if (logs.size() >= BATCH_SIZE) {WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));// 不可靠请求,WEB日志不追求极致TransportUtils.reportLogs(req, currentServerAddress, transporter);logs.clear();}}catch (Exception ignore) {break;}}if (!logs.isEmpty()) {WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);TransportUtils.reportLogs(req, currentServerAddress, transporter);}}finally {reportLock.unlock();}}}

LogSubmitter实现了Runnable接口,其run方法先通过reportLock加锁,成功才继续,它通过serverDiscoveryService.getCurrentServerAddress()获取当前server的地址,若获取不到则清空logQueue;否则while循环,每次从logQueue拉取InstanceLogContent,放到linkedList,超过BATCH_SIZE(20)则创建WorkerLogReportReq,通过TransportUtils.reportLogs(req, currentServerAddress, transporter)上报,然后清空linkedList,跳出循环之后再上报剩下的日志,最后释放锁

reportLogs

tech/powerjob/worker/common/utils/TransportUtils.java

    public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address);transporter.tell(url, req);}public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {HandlerLocation handlerLocation = new HandlerLocation().setRootPath(rootPath).setMethodPath(handlerPath);return new URL().setServerType(serverType).setAddress(Address.fromIpv4(address)).setLocation(handlerLocation);}    

reportLogs先通过easyBuildUrl构建URL,再通过transporter.tell(url, req)发送请求,rootPath为server,handlerPath为reportLog

tell

AkkaTransporter

tech/powerjob/remote/akka/AkkaTransporter.java

    public void tell(URL url, PowerSerializable request) {ActorSelection actorSelection = fetchActorSelection(url);actorSelection.tell(request, null);}

AkkaTransporter直接使用actorSelection发送请求

VertxTransporter

tech/powerjob/remote/http/vertx/VertxTransporter.java

    public void tell(URL url, PowerSerializable request) {post(url, request, null);}private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {final String host = url.getAddress().getHost();final int port = url.getAddress().getPort();final String path = url.getLocation().toPath();RequestOptions requestOptions = new RequestOptions().setMethod(HttpMethod.POST).setHost(host).setPort(port).setURI(path);// 获取远程服务器的HTTP连接Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);// 转换 -> 发送请求获取响应Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest ->httpClientRequest.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON).send(JsonObject.mapFrom(request).toBuffer()));return responseFuture.compose(httpClientResponse -> {// throw exceptionfinal int statusCode = httpClientResponse.statusCode();if (statusCode != HttpResponseStatus.OK.code()) {// CompletableFuture.get() 时会传递抛出该异常throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",host, port, path, statusCode, httpClientResponse.statusMessage()));}return httpClientResponse.body().compose(x -> {if (clz == null) {return Future.succeededFuture(null);}if (clz.equals(String.class)) {return Future.succeededFuture((T) x.toString());}return Future.succeededFuture(x.toJsonObject().mapTo(clz));});}).toCompletionStage();}    

VertxTransporter则使用post方法通过vertx的httpClient发送请求

processWorkerLogReport

tech/powerjob/server/core/handler/AbWorkerRequestHandler.java

    @Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)public void processWorkerLogReport(WorkerLogReportReq req) {WorkerLogReportEvent event = new WorkerLogReportEvent().setWorkerAddress(req.getWorkerAddress()).setLogNum(req.getInstanceLogContents().size());try {processWorkerLogReport0(req, event);event.setStatus(WorkerLogReportEvent.Status.SUCCESS);} catch (RejectedExecutionException re) {event.setStatus(WorkerLogReportEvent.Status.REJECTED);} catch (Throwable t) {event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);log.warn("[WorkerRequestHandler] process worker report failed!", t);} finally {monitorService.monitor(event);}}

processWorkerLogReport通过processWorkerLogReport0进行处理,最后通过monitorService.monitor(event)上报监控

processWorkerLogReport0

tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java

    @Overrideprotected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {// 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧...instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());}

processWorkerLogReport0通过instanceLogService.submitLogs进行上报

submitLogs

tech/powerjob/server/core/instance/InstanceLogService.java

    /*** 提交日志记录,持久化到本地数据库中* @param workerAddress 上报机器地址* @param logs 任务实例运行时日志*/@Async(value = PJThreadPool.LOCAL_DB_POOL)public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {List<LocalInstanceLogDO> logList = logs.stream().map(x -> {instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());LocalInstanceLogDO y = new LocalInstanceLogDO();BeanUtils.copyProperties(x, y);y.setWorkerAddress(workerAddress);return y;}).collect(Collectors.toList());try {CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));}catch (Exception e) {log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);}}

InstanceLogService通过PJThreadPool.LOCAL_DB_POOL线程池进行异步,它通过localInstanceLogRepository.saveAll(logList)保存到本地数据库

monitor

tech/powerjob/server/monitor/PowerJobMonitorService.java

    public void monitor(Event event) {monitors.forEach(m -> m.record(event));}

monitor方法遍历monitors,挨个执行record

LogMonitor

tech/powerjob/server/monitor/monitors/LogMonitor.java

    public void record(Event event) {MDC.put(MDC_KEY_SERVER_ID, String.valueOf(serverInfo.getId()));LoggerFactory.getLogger(event.type()).info(event.message());}

LogMonitor的record方法通过日志打印event信息

小结

PowerJob的OmsLogHandler提供了submitLog方法,它先判断logQueue大小是否超过REPORT_SIZE(1024),超过则通过异步线程执行logSubmitter;接着将内容包装为InstanceLogContent,然后放入到logQueue;logSubmitter主要是执行reportLogs,它先通过easyBuildUrl构建URL,再通过transporter.tell(url, req)发送请求,rootPath为server,handlerPath为reportLog;服务端的processWorkerLogReport通过processWorkerLogReport0进行处理(通过localInstanceLogRepository.saveAll(logList)保存到本地数据库),最后通过monitorService.monitor(event)上报监控。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/409864.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[html] 为什么移动端页面的设计稿一般是750px/640px呢?

[html] 为什么移动端页面的设计稿一般是750px/640px呢&#xff1f; 750px 代表iphone6或inphone6s 设备的像素(宽) 640px 代表inpone3Gs&#xff0c;inpone4/4s iphone5系列 设备的像素(宽) 其他手机大多数时这两种规格 750px/640px 代表的逻辑像素是 375px/320px&#xff0c;…

m文件中函数的执行顺序

当进行多个GUI协同工作时&#xff0c;要用到uiwait和uiresume函数。此时&#xff0c;理解函数的执行顺序此时是很关键的。 首先理解uiwait和uiresume函数的作用。 uiwait函数&#xff1a;阻塞m文件的执行&#xff0c;直到uiresume解除这种阻塞&#xff1b; uiresume函数&#x…

Java从string数组创建临时文件

//从string数组创建临时文件 private static File createSampleFile(String[] strs) throws IOException {File file File.createTempFile("aws-java-sdk-", ".txt");file.deleteOnExit();Writer writer new OutputStreamWriter(new FileOutputStream(fi…

关刀机器人_小学机器人活动总结

页眉内容开展青少年机器人创新实践活动情况个人总结为进一步推动我校机器人活动的普及与发展&#xff0c;培养学生对科技制作的兴趣&#xff0c;促进学生整体素质的提高&#xff0c;同时培养学生动手实践的能力&#xff0c;我校于2016年3月开展了机器人创新实践活动。在学校领导…

[html] iframe可以使用父页面中的资源吗(如:css、js等)?

[html] iframe可以使用父页面中的资源吗&#xff08;如&#xff1a;css、js等&#xff09;&#xff1f; iframe 属于一个单独的文档不能直接使用父页面的资源&#xff0c;css的层叠不会影响iframeiframe如果和父页面同域则可以在iframe中使用parent对象来使用父页的js对象个人简…

error LNK2001:unresolved external symbol __imp__@ 解决方法

我在程序Lan中使用了winsock函数&#xff0c;出现如下错误&#xff1a;Lan.obj&#xff1a;error LNK2001: unresolved external symbol __imp__listen8 Lan.obj : error LNK2001: unresolved external symbol __imp__bind12 Lan.obj : error LN…

[html] 怎么去除img之间存在的间隔缝隙?

[html] 怎么去除img之间存在的间隔缝隙&#xff1f; 1.修改display:block/flex&#xff0c; 2.父级设置font-size:0个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题…

zabbix基础之环境搭建

zabbix入门 环境部署 安装mysql #安装MySQL&#xff0c;官方的MySQL的repo源地址&#xff1a;http://repo.mysql.com/ #选择指定的MySQL版本&#xff0c;我这里选mysql5.7的版本,复制对应版本的链接地址。 wget http://repo.mysql.com/mysql57-community-release-el7-10.noarch…

wordpress函数手册_WordPress主题开发手册

functions.php文件是您为 WordPress 主题添加功能的唯一位置。您可以在其中把自定义功能挂载到 WordPress 的核心功能上&#xff0c;使您的主题更加模块化、更具扩展性、功能更加丰富。什么是functions.php&#xff1f;functions.php文件的行为类似于 WordPress 插件&#xff0…

python中_python中的一些用法总结

用python写了一个测试Demo&#xff0c;其中涉及到一些常用的用法&#xff0c;现在记录在这里&#xff0c;方便后续查阅&#xff1a; 1 python中全局变量的使用&#xff1a; 全局变量在所有程序的外部进行定义&#xff0c;再函数内部使用的时候分为两种情况&#xff0c;一种是只…

Android 通讯录学习笔记之——目标:调用系统通讯录的编辑功能

资料来源&#xff1a;http://www.eoeandroid.com/thread-37271-1-1.html 代码功能&#xff1a;调用系统自带的编辑联系人功能 代码片段 // 如下用法在ANADOIR 2.3.6上面会报 “Caused by: android.database.sqlite.SQLiteException: no such column: raw_contact_id:” 异常  …

[html] table中给td设置宽度无效怎么解决?

[html] table中给td设置宽度无效怎么解决&#xff1f; 默认是列宽度由单元格内容设定 table 添加css tableLayout :fixed;个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端…

使用 vue-qrcode 生成二维码

直接安装xkeshi/vue-qrcode包 "dependencies": {"xkeshi/vue-qrcode": "^1.0.0"}<template id"demo"><div class"container"><qrcode :value"qrcodeUrl" :options"{ size: 100 }">&l…

台达变频器485通讯接线图_台达变频器RS485通讯设置

变频器配置一、00(第79页)功能码 名称 设定范围00-02 参数重置(基底频率为50HZ) 900-03 开机显示画面选择 100-20 频率指令来源设定 100-21 运转指令来源设定 2二、09(第101页…

新版个人所得税计算python_最新个税计算 / 个税计算器 小程序 wepy 开发

根据最新税改后计算个人所得税的计算器。 如有其它疑惑&#xff0c;也欢迎提出任何修改意见。可以在主题下留言或者在小程序中点击联系在线客服或者加入qq群&#xff1a;869113926 1、扫一扫2、效果图这次开发遇到几个问题因此记录下来。 (1)、在开发微信小程序组件框架时&…

[html] 如何在页面上显示Emoji表情?

[html] 如何在页面上显示Emoji表情&#xff1f; 如果客户端发送了一个带条件的GET 请求且该请求已被允许&#xff0c;而文档的内容&#xff08;自上次访问以来或者根据请求的条件&#xff09;并没有改变&#xff0c;则服务器应当返回这个304状态码。简单的表达就是&#xff1a…

Google Analytics 搜索引擎来源

转载于:https://www.cnblogs.com/dabaopku/archive/2012/05/08/2490164.html

实现点击按钮复制文本(Clipboard包)

操作如下&#xff1a; npm install clipboard --save import Clipboard from clipboard; Vue.use(VueClipboard) <template id"demo"><div class"container"><!-- 文本内容 --><input type"text" v-model"message&quo…

明机器人孔尧是哪里人_明我创始人孔尧:未来办公,始于聚点

2018 全球人工智能与机器人峰会(CCF-GAIR)于日前在深圳召开&#xff0c;这是国内人工智能和机器人学术界、工业界及投资界三大领域的顶级交流盛会&#xff0c;旨在打造国内人工智能领域最具实力的跨界交流合作平台。本次盛会共吸引超过2500名业界人士参与&#xff0c;包括了行业…

python 图片 变清晰_python实现图片变亮或者变暗的方法

python实现图片变亮或者变暗的方法 这篇文章主要介绍了python实现图片变亮或者变暗的方法,涉及Python中Image模块操作图片的相关技巧,分享给大家供大家参考。具体实现方法如下&#xff1a; import Image # open an image file (.jpg or.png) you have in the working folder im…