Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class ReportProcessor
- attr_accessor :name, :report
- def initialize(name, report)
- # TODO: validate report
- @name = name
- @report = report
- @redis = Redis.new(:host => $config.redis.host, :port => $config.redis.port, :db => 5)
- end
- def prefix
- "reports_#{@name}"
- end
- def last_collected_at=(time)
- @redis.set "#{prefix}_last_collected_at", time.xmlschema
- end
- def last_collected_at
- value = @redis.get "#{prefix}_last_collected_at"
- Time.parse value if value
- end
- def last_record_index=(int)
- @redis.set "#{prefix}_last_record_index", int
- end
- def last_record_index
- (@redis.get("#{prefix}_last_record_index") || '-1').to_i
- end
- # sets an in use flag on the rows till the passed block is completed
- def lock
- if @redis.getset("#{prefix}_lock", 'locked') != 'locked'
- begin
- yield
- rescue StandardError => e
- raise e
- ensure
- @redis.del "#{prefix}_lock"
- end
- true
- else
- false
- end
- end
- def lock!
- throw StandardError, 'Lock could not be aquired' unless lock{ yield }
- end
- def reset!
- @redis.keys("#{prefix}*").each { |key| @redis.del key }
- end
- def keys(with_value=false)
- @redis.smembers("#{prefix}_keys").each do |key|
- @redis.smembers("#{prefix}_dimensions").each do |dimension|
- @report[:metrics].keys.each do |metric|
- if with_value
- value = @redis.get("#{prefix}:#{key}:#{dimension}:#{metric}") || '0'
- yield key, dimension, metric, value.to_i
- else
- yield key, dimension, metric
- end
- end
- end
- end
- end
- def collect(data)
- throw ArgumentError, "data must be enumerable" unless data.respond_to? :each
- last = nil
- data.each do |row|
- last = row
- key = @report[:key].call(row)
- dimensions = @report[:dimensions].call(row)
- @redis.sadd "#{prefix}_keys", key
- dimensions.each do |dimension|
- @redis.sadd "#{prefix}_dimensions", dimension
- @report[:metrics].each do |metric, proc|
- @redis.incrby "#{prefix}:#{key}:#{dimension}:#{metric}", proc.call(row)
- end
- end
- end
- if last
- self.last_record_index = @report[:index].call last
- self.last_collected_at = Time.now
- end
- end
- end
Add Comment
Please, Sign In to add comment