logstash-input-redis源码解析

首先是程序的自定义,这里设置了redis插件需要的参数,默认值,以及校验等。

然后注册Redis实例需要的信息,比如key的名字或者url等,可以看到默认的data_type是list模式。

程序运行的主要入口,根据不同的data_type,传递不同的实现方法,然后调用listener_loop执行循环监听

Listner_loop方法传递了两个参数,一个是监听器实现的方法,一个是处理的数据队列。循环是每秒钟执行一次,如果循环标识被设置,则退出。

上面的循环方法可以看到,是通过一个参数shutdown_requested来判断是否继续循环。该参数通过tear_down方法设置为true,然后根据不同的模式,指定不同的退出方式。
如果是list模式,则直接退出;如果是channel模式,则发送redis的unsubsribe命令退出;如果是pattern_channel,则发送punsubscribe退出。

在循环内部,判断是否已经创建了redis实例,如果没有创建,则调用connect方法创建;否则直接执行。

这里前一段是调用Redis的new方法,初始化一个redis实例。紧接着判断batch_count是否大于1,如果等于1,就什么也不做,然后返回redis。
如果batch_count大于1,那么就调用load_batch_script方法,加载Lua脚本,存储到redis中的lua脚本字典中,供后面使用。代码如下:

上面的代码应该是这个插件最难理解的部分了。为了弄清楚这段代码的工作,需要了解下面几个知识点:

  • lua脚本基本概念
  • Redis中的EVAL命令如何使用
  • 理解上面脚本的工作

首先,要想运行上面的脚本,必须是Redis2.6+的版本,才支持EVAL,否则会报错!EVAL命令与js中的差不多,就是可以把某一个字符串当做命令解析,其中字符串就包括lua脚本。这样有什么好处呢?

说白了,就是能一次性进行多个操作。比如我们可以在脚本中写入一连串的操作,这些操作会以原子模式,一次性在服务器执行完,在返回回来。

Lua脚本

关于lua脚本,其实没有详细研究的必要,但是一定要知道一个local和table的概念。local是创建本地的变量,这样就不会污染redis的数据。table是lua的一种数据结构,有点类似于json,可以存储数据。

EVAL命令

另外还要知道EVAL命令的使用方法,看下面这个命令,就好理解了!
EVAL "return KEYS[1] KEYS[2] ARGV[1] ARGV[2];" 2 name:xing age:13
就会返回:

name
age
xing
13

这段代码没有经过真正的操作,但是有助于理解就好!也就是说,EVAL后面跟着一段脚本,脚本后面跟着的就是参数,可以通过KEYS和ARGV数组获得,但是下标从1开始。

再来说说EVAL命令,它的执行过程如下:

  • 解析字符串脚本,根据校验和生成lua的方法
  • 把校验和和函数放入一个lua_script字典里面,之后就可以通过EVALSHA命令直接使用校验和执行函数。

有了这些理论基础以后,就可以看看上面的代码都做了什么了!
首先是获取参数,这个参数赋值给i;然后创建了一个对象res;紧接着调用llen命令,获得指定list的长度;如果list的长度大于i,则什么也不做;如果小于i,那么i就等于lenth;然后执行命令lpop,取出list中的元素,一共取i次,放入res中,最后返回。

说得通俗点,就是比较一下list元素个数与设置batch_count的值。如果batch_count为5,列表list中有5条以上的数据,那么直接取5条,一次性返回;否则取length条返回。

可以看到这段脚本的作用,就是让logstash一次请求,最多获得batch_count条事件,减小了服务器处理请求的压力。

讲完这段代码,可以看看不同的工作模式的实现代码了:

首先是list的代码,其实就是执行BLPOP命令,获取数据。如果在list模式中,还会去判断batch_count的值,如果是1直接退出;如果大于1,则使用evalsha命令调用之前保存的脚本方法。

至于channel和pattern_channel,就没啥解释的了,就是分别调用subscribe和psubsribe命令而已。

其实最难理解的,就是中间那段lua脚本~明白它的用处,redis插件也就不难理解了。

代码

# encoding: utf-8
require "logstash/inputs/base"
require "logstash/inputs/threadable"
require "logstash/namespace"# This input will read events from a Redis instance; it supports both Redis channels and lists.
# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
# the channel commands used by Logstash are found in Redis v1.3.8+.
# While you may be able to make these Redis versions work, the best performance
# and stability will be found in more recent stable versions.  Versions 2.6.0+
# are recommended.
#
# For more information about Redis, see <http://redis.io/>
#
# `batch_count` note: If you use the `batch_count` setting, you *must* use a Redis version 2.6.0 or
# newer. Anything older does not support the operations used by batching.
#
class LogStash::Inputs::Redis < LogStash::Inputs::Threadableconfig_name "redis"default :codec, "json"# The `name` configuration is used for logging in case there are multiple instances.# This feature has no real function and will be removed in future versions.config :name, :validate => :string, :default => "default", :deprecated => true# The hostname of your Redis server.config :host, :validate => :string, :default => "127.0.0.1"# The port to connect on.config :port, :validate => :number, :default => 6379# The Redis database number.config :db, :validate => :number, :default => 0# Initial connection timeout in seconds.config :timeout, :validate => :number, :default => 5# Password to authenticate with. There is no authentication by default.config :password, :validate => :password# The name of the Redis queue (we'll use BLPOP against this).# TODO: remove soon.config :queue, :validate => :string, :deprecated => true# The name of a Redis list or channel.# TODO: change required to trueconfig :key, :validate => :string, :required => false# Specify either list or channel.  If `redis\_type` is `list`, then we will BLPOP the# key.  If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.# If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.# TODO: change required to trueconfig :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => false# The number of events to return from Redis using EVAL.config :batch_count, :validate => :number, :default => 1publicdef registerrequire 'redis'@redis = nil@redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"# TODO remove after setting key and data_type to trueif @queueif @key or @data_typeraise RuntimeError.new("Cannot specify queue parameter and key or data_type")end@key = @queue@data_type = 'list'endif not @key or not @data_typeraise RuntimeError.new("Must define queue, or key and data_type parameters")end# end TODO@logger.info("Registering Redis", :identity => identity)end # def register# A string used to identify a Redis instance in log messages# TODO(sissel): Use instance variables for this once the @name config# option is removed.privatedef identity@name || "#{@redis_url} #{@data_type}:#{@key}"endprivatedef connectredis = Redis.new(:host => @host,:port => @port,:timeout => @timeout,:db => @db,:password => @password.nil? ? nil : @password.value)load_batch_script(redis) if @data_type == 'list' && (@batch_count > 1)return redisend # def connectprivatedef load_batch_script(redis)#A Redis Lua EVAL script to fetch a count of keys#in case count is bigger than current items in queue whole queue will be returned without extra nil valuesredis_script = <<EOFlocal i = tonumber(ARGV[1])local res = {}local length = redis.call('llen',KEYS[1])if length < i then i = length endwhile (i > 0) dolocal item = redis.call("lpop", KEYS[1])if (not item) thenbreakendtable.insert(res, item)i = i-1endreturn res
EOF@redis_script_sha = redis.script(:load, redis_script)endprivatedef queue_event(msg, output_queue)begin@codec.decode(msg) do |event|decorate(event)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               output_queue << eventendrescue LogStash::ShutdownSignal => e# propagate upraise(e)rescue => e # parse or event creation error@logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace);endendprivatedef list_listener(redis, output_queue)item = redis.blpop(@key, 0, :timeout => 1)return unless item # from timeout or other conditions# blpop returns the 'key' read from as well as the item result# we only care about the result (2nd item in the list).queue_event(item[1], output_queue)# If @batch_count is 1, there's no need to continue.return if @batch_count == 1beginredis.evalsha(@redis_script_sha, [@key], [@batch_count-1]).each do |item|queue_event(item, output_queue)end# Below is a commented-out implementation of 'batch fetch'# using pipelined LPOP calls. This in practice has been observed to# perform exactly the same in terms of event throughput as# the evalsha method. Given that the EVALSHA implementation uses# one call to Redis instead of N (where N == @batch_count) calls,# I decided to go with the 'evalsha' method of fetching N items# from Redis in bulk.#redis.pipelined do#error, item = redis.lpop(@key)#(@batch_count-1).times { redis.lpop(@key) }#end.each do |item|#queue_event(item, output_queue) if item#end# --- End commented out implementation of 'batch fetch'rescue Redis::CommandError => eif e.to_s =~ /NOSCRIPT/ then@logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e);load_batch_script(redis)retryelseraise eendendendprivatedef channel_listener(redis, output_queue)redis.subscribe @key do |on|on.subscribe do |channel, count|@logger.info("Subscribed", :channel => channel, :count => count)endon.message do |channel, message|queue_event message, output_queueendon.unsubscribe do |channel, count|@logger.info("Unsubscribed", :channel => channel, :count => count)endendendprivatedef pattern_channel_listener(redis, output_queue)redis.psubscribe @key do |on|on.psubscribe do |channel, count|@logger.info("Subscribed", :channel => channel, :count => count)endon.pmessage do |ch, event, message|queue_event message, output_queueendon.punsubscribe do |channel, count|@logger.info("Unsubscribed", :channel => channel, :count => count)endendend# Since both listeners have the same basic loop, we've abstracted the outer# loop.privatedef listener_loop(listener, output_queue)while !@shutdown_requestedbegin@redis ||= connectself.send listener, @redis, output_queuerescue Redis::BaseError => e@logger.warn("Redis connection problem", :exception => e)# Reset the redis variable to trigger reconnect@redis = nilsleep 1endendend # listener_looppublicdef run(output_queue)if @data_type == 'list'listener_loop :list_listener, output_queueelsif @data_type == 'channel'listener_loop :channel_listener, output_queueelselistener_loop :pattern_channel_listener, output_queueendrescue LogStash::ShutdownSignal# ignore and quitend # def runpublicdef teardown@shutdown_requested = trueif @redisif @data_type == 'list'@redis.quit rescue nilelsif @data_type == 'channel'@redis.unsubscribe rescue nil@redis.connection.disconnectelsif @data_type == 'pattern_channel'@redis.punsubscribe rescue nil@redis.connection.disconnectend@redis = nilendend
end # class LogStash::Inputs::Redis

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/535376.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SSL双向认证和SSL单向认证的区别

双向认证 SSL 协议要求服务器和用户双方都有证书。单向认证 SSL 协议不需要客户拥有CA证书&#xff0c;具体的过程相对于上面的步骤&#xff0c;只需将服务器端验证客户证书的过程去掉&#xff0c;以及在协商对称密码方案&#xff0c;对称通话密钥时&#xff0c;服务器发送给客…

双向认证SSL原理

文中首先解释了加密解密的一些基础知识和概念&#xff0c;然后通过一个加密通信过程的例子说明了加密算法的作用&#xff0c;以及数字证书的出现所起的作用。接着对数字证书做一个详细的解释&#xff0c;并讨论一下windows中数字证书的管理&#xff0c;最后演示使用makecert生成…

Xtrabackup备份与恢复

一、Xtrabackup介绍 Percona-xtrabackup是 Percona公司开发的一个用于MySQL数据库物理热备的备份工具&#xff0c;支持MySQL、Percona server和MariaDB&#xff0c;开源免费&#xff0c;是目前较为受欢迎的主流备份工具。xtrabackup只能备份innoDB和xtraDB两种数据引擎的表&…

实时备份工具之inotify+rsync

1.inotify简介 inotify 是一个从 2.6.13 内核开始&#xff0c;对 Linux 文件系统进行高效率、细粒度、异步地监控机制&#xff0c; 用于通知用户空间程序的文件系统变化。可利用它对用户空间进行安全、性能、以及其他方面的监控。Inotify 反应灵敏&#xff0c;用法非常简单&…

mysql主从延迟

在实际的生产环境中&#xff0c;由单台MySQL作为独立的数据库是完全不能满足实际需求的&#xff0c;无论是在安全性&#xff0c;高可用性以及高并发等各个方面 因此&#xff0c;一般来说都是通过集群主从复制&#xff08;Master-Slave&#xff09;的方式来同步数据&#xff0c…

16张图带你吃透高性能 Redis 集群

现如今 Redis 变得越来越流行&#xff0c;几乎在很多项目中都要被用到&#xff0c;不知道你在使用 Redis 时&#xff0c;有没有思考过&#xff0c;Redis 到底是如何稳定、高性能地提供服务的&#xff1f; 你也可以尝试回答一下以下这些问题&#xff1a; 我使用 Redis 的场景很…

Redis与MySQL双写一致性如何保证

谈谈一致性 一致性就是数据保持一致&#xff0c;在分布式系统中&#xff0c;可以理解为多个节点中数据的值是一致的。 强一致性&#xff1a;这种一致性级别是最符合用户直觉的&#xff0c;它要求系统写入什么&#xff0c;读出来的也会是什么&#xff0c;用户体验好&#xff0c;…

weblogic忘记console密码

进入 cd /sotware/oracle_ldap/Middleware/user_projects/domains/base_domain/security/ 目录 执行 java -classpath /sotware/oracle_ldap/Middleware/wlserver_10.3/server/lib/weblogic.jar weblogic.security.utils.AdminAccount weblogic(账号) weblogic123(密码) . …

Redis的AOF日志

如果 Redis 每执行一条写操作命令&#xff0c;就把该命令以追加的方式写入到一个文件里&#xff0c;然后重启 Redis 的时候&#xff0c;先去读取这个文件里的命令&#xff0c;并且执行它&#xff0c;这不就相当于恢复了缓存数据了吗&#xff1f; 这种保存写操作命令到日志的持久…

Redis 核心技术与实战

目录 开篇词 | 这样学 Redis&#xff0c;才能技高一筹 01 | 基本架构&#xff1a;一个键值数据库包含什么&#xff1f; 02 | 数据结构&#xff1a;快速的Redis有哪些慢操作&#xff1f; 键和值用什么结构组织&#xff1f; 为什么哈希表操作变慢了&#xff1f; 有哪些底层数…

plsql定时器

Oralce中的任务有2种&#xff1a;Job和Dbms_job&#xff0c;两者的区别有&#xff1a; 1&#xff0e; jobs是oracle数据库的对象&#xff0c; dbms_jobs只是jobs对象的一个实例&#xff0c; 就像对于tables&#xff0c; emp和dept都是表的实例。 2&#xff0e; 创建方式也有…

PL/SQL批处理语句:BULK COLLECT 和 FORALL

PL/SQL程序中运行SQL语句是存在开销的&#xff0c;因为SQL语句是要提交给SQL引擎处理&#xff0c;这种在PL/SQL引擎和SQL引擎之间的控制转移叫做上下文却换&#xff0c;每次却换时&#xff0c;都有额外的开销 请看下图&#xff1a; 但是&#xff0c;FORALL和BULK COLLEC…

redis-full-check

https://github.com/alibaba/RedisFullCheck/releases redis-full-check是阿里云Redis&MongoDB团队开源的用于校验2个redis数据是否一致的工具。   redis-full-check通过全量对比源端和目的端的redis中的数据的方式来进行数据校验&#xff0c;其比较方式通过多轮次比较&a…

Docker目录挂载

Docker容器启动的时候&#xff0c;如果要挂载宿主机的一个目录&#xff0c;可以用-v参数指定。 譬如我要启动一个centos容器&#xff0c;宿主机的/test目录挂载到容器的/soft目录&#xff0c;可通过以下方式指定&#xff1a; # docker run -it -v /test:/soft centos /bin/ba…

Redis主从复制原理学习

Redis主从复制原理学习总结 - 运维笔记 和Mysql主从复制的原因一样&#xff0c;Redis虽然读取写入的速度都特别快&#xff0c;但是也会产生读压力特别大的情况。为了分担读压力&#xff0c;Redis支持主从复制&#xff0c;Redis的主从结构可以采用一主多从或者级联结构&#xff…

Redis哨兵模式(sentinel)学习总结及部署记录(主从复制、读写分离、主从切换)

Redis的集群方案大致有三种&#xff1a;1&#xff09;redis cluster集群方案&#xff1b;2&#xff09;master/slave主从方案&#xff1b;3&#xff09;哨兵模式来进行主从替换以及故障恢复。 一、sentinel哨兵模式介绍 Sentinel(哨兵)是用于监控redis集群中Master状态的工具&…

Redis之Redis内存模型

Redis是目前最火爆的内存数据库之一&#xff0c;通过在内存中读写数据&#xff0c;大大提高了读写速度&#xff0c;可以说Redis是实现网站高并发不可或缺的一部分。 我们使用Redis时&#xff0c;会接触Redis的5种对象类型&#xff08;字符串、哈希、列表、集合、有序集合&…

Redis监控指标

监控指标 •性能指标&#xff1a;Performance•内存指标: Memory•基本活动指标&#xff1a;Basic activity•持久性指标: Persistence•错误指标&#xff1a;Error 性能指标&#xff1a;Performance NameDescriptionlatencyRedis响应一个请求的时间instantaneous_ops_per_s…

MHA高可用

manager 组件 masterha_manger # 启动MHA masterha_check_ssh # 检查MHA的SSH配置状况 masterha_check_repl # 检查MySQL复制状况&#xff0c;配置信息 masterha_master_monitor # 检测master是否宕机 masterha_check_status # 检测当…

MySQL Replication需要注意的问题

主库意外宕机 如果没有设置主库的sync_binlog选项&#xff0c;就可能在奔溃前没有将最后的几个二进制日志事件刷新到磁盘中。备库I/O线程因此也可一直处于读不到尚未写入磁盘的事件的状态中。当主库从新启动时&#xff0c;备库将重连到主库并再次尝试去读该事件&#xff0c;但…