Class | OpenWFE::Extras::SqsListener |
In: |
lib/openwfe/extras/listeners/sqslisteners.rb
|
Parent: | Service |
Polls an Amazon SQS queue for workitems
Workitems can be instances of InFlowWorkItem or LaunchItem.
require 'openwfe/extras/listeners/sqslisteners' ql = OpenWFE::SqsListener("workqueue1", engine.application_context) engine.add_workitem_listener(ql, "2m30s") # # thus, the engine will poll our "workqueue1" SQS queue # every 2 minutes and 30 seconds
queue_name | [R] | The name of the Amazon SQS whom this listener cares for |
# File lib/openwfe/extras/listeners/sqslisteners.rb, line 79 79: def initialize (queue_name, application_context) 80: 81: @queue_name = queue_name.to_s 82: 83: service_name = "#{self.class}::#{@queue_name}" 84: 85: super service_name, application_context 86: 87: linfo { "new() queue is '#{@queue_name}'" } 88: end
Extracts a workitem from the message‘s body.
By default, this listeners assumes the workitem is stored in its "hash form" (not directly as a Ruby InFlowWorkItem instance).
LaunchItem instances (as hash as well) are also accepted.
# File lib/openwfe/extras/listeners/sqslisteners.rb, line 135 135: def decode_object (message) 136: 137: o = Base64.decode64 message.message_body 138: o = YAML.load o 139: o = OpenWFE::workitem_from_h o 140: o 141: end
polls the SQS for incoming messages
# File lib/openwfe/extras/listeners/sqslisteners.rb, line 93 93: def trigger (params) 94: synchronize do 95: 96: ldebug { "trigger()" } 97: 98: qs = Rufus::SQS::QueueService.new 99: 100: qs.create_queue @queue_name 101: # just to be sure it is there 102: 103: while true 104: 105: l = qs.get_messages( 106: @queue_name, :timeout => 0, :count => 255) 107: 108: break if l.length < 1 109: 110: l.each do |msg| 111: 112: o = decode_object msg 113: 114: handle_item o 115: 116: msg.delete 117: 118: ldebug do 119: "trigger() " + 120: "handled successfully msg #{msg.message_id}" 121: end 122: end 123: end 124: end 125: end