- Basic concepts of Lua scripting
- How to use the EVAL command in Redis
- Understanding how the script above works
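You also need to know how to use the EVAL command; the following command makes it easy to understand:
EVAL "return {KEYS[1], KEYS[2], ARGV[1], ARGV[2]}" 2 name:xing age:13
The 2 is the number of key arguments, so name:xing and age:13 become KEYS[1] and KEYS[2]; any arguments after them would fill ARGV.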
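To make the argument split concrete, here is a minimal Ruby sketch of how EVAL divides its trailing arguments. The helper `split_eval_args` is hypothetical and exists only for illustration; it is not part of Redis or of the redis-rb gem.

```ruby
# Hypothetical helper: mimics how EVAL divides the arguments that follow
# the script into KEYS and ARGV, based on numkeys.
def split_eval_args(numkeys, args)
  raise ArgumentError, "numkeys exceeds argument count" if numkeys > args.length
  keys = args.first(numkeys)  # the first numkeys arguments become KEYS
  argv = args.drop(numkeys)   # everything after them becomes ARGV
  [keys, argv]
end

keys, argv = split_eval_args(2, ["name:xing", "age:13"])
puts keys.inspect  # ["name:xing", "age:13"]
puts argv.inspect  # []
```

With the command shown above, both arguments are consumed as keys, so ARGV is empty inside the script.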
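- Parse the script string and generate a Lua function based on its checksum
- Store the checksum and the function in a lua_script dictionary; after that, the function can be executed directly by its checksum with the EVALSHA command.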
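
The two steps above can be sketched in plain Ruby. This is only an in-memory illustration, not how the server is implemented: `ScriptCache` and `script_load` are hypothetical names, and a Ruby block stands in for the compiled Lua function. The one detail taken from Redis itself is that the checksum is the SHA1 of the script text.

```ruby
require "digest"

# Hypothetical in-memory stand-in for the server-side script dictionary.
class ScriptCache
  def initialize
    @scripts = {} # checksum => callable standing in for the Lua function
  end

  # Like SCRIPT LOAD: store the function under the SHA1 of its source
  # and return that checksum to the caller.
  def script_load(source, &body)
    sha = Digest::SHA1.hexdigest(source)
    @scripts[sha] = body
    sha
  end

  # Like EVALSHA: look the function up by checksum and run it,
  # or fail with a NOSCRIPT-style error if it was never loaded.
  def evalsha(sha, keys = [], argv = [])
    body = @scripts.fetch(sha) { raise KeyError, "NOSCRIPT No matching script" }
    body.call(keys, argv)
  end
end

cache = ScriptCache.new
sha = cache.script_load("return KEYS[1]") { |keys, _argv| keys[0] }
puts sha.length                         # 40
puts cache.evalsha(sha, ["name:xing"])  # name:xing
```

Because the client only sends the 40-character checksum afterwards, the script body does not have to travel over the network on every call.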
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/inputs/threadable"
require "logstash/namespace"

# This input will read events from a Redis instance; it supports both Redis channels and lists.
# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
# the channel commands used by Logstash are found in Redis v1.3.8+.
# While you may be able to make these Redis versions work, the best performance
# and stability will be found in more recent stable versions. Versions 2.6.0+
# are recommended.
# For more information about Redis, see <http://redis.io/>
# `batch_count` note: If you use the `batch_count` setting, you *must* use a Redis version 2.6.0 or
# newer. Anything older does not support the operations used by batching.
class LogStash::Inputs::Redis < LogStash::Inputs::Threadable
  config_name "redis"

  default :codec, "json"

  # The `name` configuration is used for logging in case there are multiple instances.
  # This feature has no real function and will be removed in future versions.
  config :name, :validate => :string, :default => "default", :deprecated => true

  # The hostname of your Redis server.
  config :host, :validate => :string, :default => ""

  # The port to connect on.
  config :port, :validate => :number, :default => 6379

  # The Redis database number.
  config :db, :validate => :number, :default => 0

  # Initial connection timeout in seconds.
  config :timeout, :validate => :number, :default => 5

  # Password to authenticate with. There is no authentication by default.
  config :password, :validate => :password

  # The name of the Redis queue (we'll use BLPOP against this).
  # TODO: remove soon.
  config :queue, :validate => :string, :deprecated => true

  # The name of a Redis list or channel.
  # TODO: change required to true
  config :key, :validate => :string, :required => false

  # Specify either list or channel. If `redis\_type` is `list`, then we will BLPOP the
  # key. If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.
  # If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
  # TODO: change required to true
  config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => false

  # The number of events to return from Redis using EVAL.
  config :batch_count, :validate => :number, :default => 1

  public
  def register
    require 'redis'
    @redis = nil
    @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"

    # TODO remove after setting key and data_type to true
    if @queue
      if @key or @data_type
        raise RuntimeError.new("Cannot specify queue parameter and key or data_type")
      end
      @key = @queue
      @data_type = 'list'
    end

    if not @key or not @data_type
      raise RuntimeError.new("Must define queue, or key and data_type parameters")
    end
    # end TODO

    @logger.info("Registering Redis", :identity => identity)
  end # def register

  # A string used to identify a Redis instance in log messages
  # TODO(sissel): Use instance variables for this once the @name config
  # option is removed.
  private
  def identity
    @name || "#{@redis_url} #{@data_type}:#{@key}"
  end

  private
  def connect
    redis = Redis.new(
      :host => @host,
      :port => @port,
      :timeout => @timeout,
      :db => @db,
      :password => @password.nil? ? nil : @password.value
    )
    load_batch_script(redis) if @data_type == 'list' && (@batch_count > 1)
    return redis
  end # def connect

  private
  def load_batch_script(redis)
    #A Redis Lua EVAL script to fetch a count of keys
    #in case count is bigger than current items in queue whole queue will be returned without extra nil values
    redis_script = <<EOF
          local i = tonumber(ARGV[1])
          local res = {}
          local length = redis.call('llen',KEYS[1])
          if length < i then i = length end
          while (i > 0) do
            local item = redis.call("lpop", KEYS[1])
            if (not item) then
              break
            end
            table.insert(res, item)
            i = i-1
          end
          return res
EOF
    @redis_script_sha = redis.script(:load, redis_script)
  end

  private
  def queue_event(msg, output_queue)
    begin
      @codec.decode(msg) do |event|
        decorate(event)
        output_queue << event
      end
    rescue LogStash::ShutdownSignal => e
      # propagate up
      raise(e)
    rescue => e # parse or event creation error
      @logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace);
    end
  end

  private
  def list_listener(redis, output_queue)
    item = redis.blpop(@key, 0, :timeout => 1)
    return unless item # from timeout or other conditions

    # blpop returns the 'key' read from as well as the item result
    # we only care about the result (2nd item in the list).
    queue_event(item[1], output_queue)

    # If @batch_count is 1, there's no need to continue.
    return if @batch_count == 1

    begin
      redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]).each do |item|
        queue_event(item, output_queue)
      end

      # Below is a commented-out implementation of 'batch fetch'
      # using pipelined LPOP calls. This in practice has been observed to
      # perform exactly the same in terms of event throughput as
      # the evalsha method. Given that the EVALSHA implementation uses
      # one call to Redis instead of N (where N == @batch_count) calls,
      # I decided to go with the 'evalsha' method of fetching N items
      # from Redis in bulk.
      #redis.pipelined do
        #error, item = redis.lpop(@key)
        #(@batch_count-1).times { redis.lpop(@key) }
      #end.each do |item|
        #queue_event(item, output_queue) if item
      #end
      # --- End commented out implementation of 'batch fetch'
    rescue Redis::CommandError => e
      if e.to_s =~ /NOSCRIPT/ then
        @logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e);
        load_batch_script(redis)
        retry
      else
        raise e
      end
    end
  end

  private
  def channel_listener(redis, output_queue)
    redis.subscribe @key do |on|
      on.subscribe do |channel, count|
        @logger.info("Subscribed", :channel => channel, :count => count)
      end

      on.message do |channel, message|
        queue_event message, output_queue
      end

      on.unsubscribe do |channel, count|
        @logger.info("Unsubscribed", :channel => channel, :count => count)
      end
    end
  end

  private
  def pattern_channel_listener(redis, output_queue)
    redis.psubscribe @key do |on|
      on.psubscribe do |channel, count|
        @logger.info("Subscribed", :channel => channel, :count => count)
      end

      on.pmessage do |ch, event, message|
        queue_event message, output_queue
      end

      on.punsubscribe do |channel, count|
        @logger.info("Unsubscribed", :channel => channel, :count => count)
      end
    end
  end

  # Since both listeners have the same basic loop, we've abstracted the outer
  # loop.
  private
  def listener_loop(listener, output_queue)
    while !@shutdown_requested
      begin
        @redis ||= connect
        self.send listener, @redis, output_queue
      rescue Redis::BaseError => e
        @logger.warn("Redis connection problem", :exception => e)
        # Reset the redis variable to trigger reconnect
        @redis = nil
        sleep 1
      end
    end
  end # listener_loop

  public
  def run(output_queue)
    if @data_type == 'list'
      listener_loop :list_listener, output_queue
    elsif @data_type == 'channel'
      listener_loop :channel_listener, output_queue
    else
      listener_loop :pattern_channel_listener, output_queue
    end
  rescue LogStash::ShutdownSignal
    # ignore and quit
  end # def run

  public
  def teardown
    @shutdown_requested = true

    if @redis
      if @data_type == 'list'
        @redis.quit rescue nil
      elsif @data_type == 'channel'
        @redis.unsubscribe rescue nil
        @redis.connection.disconnect
      elsif @data_type == 'pattern_channel'
        @redis.punsubscribe rescue nil
        @redis.connection.disconnect
      end
      @redis = nil
    end
  end
end # class LogStash::Inputs::Redis
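
The Lua script loaded by `load_batch_script` in the class above pops at most N items from the head of a list and never returns padding nil values. Its logic can be mirrored in plain Ruby as a rough sketch; here `pop_batch` is a hypothetical helper and an Array stands in for the Redis list, so nothing below talks to a real server.

```ruby
# Plain-Ruby mirror of the Lua batch script. `queue` is an Array standing
# in for the Redis list (an assumption made purely for illustration).
def pop_batch(queue, count)
  count = [count, queue.length].min # like: if length < i then i = length end
  res = []
  while count > 0
    item = queue.shift              # LPOP takes from the head of the list
    break if item.nil?              # list ran dry: stop early
    res << item
    count -= 1
  end
  res
end

queue = %w[a b c]
puts pop_batch(queue, 2).inspect # ["a", "b"]
puts pop_batch(queue, 5).inspect # ["c"]
```

Running this loop inside a server-side script means the whole batch is fetched in one round trip, which is the reason the source comments give for choosing EVALSHA over N separate LPOP calls.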