| Class | OpenWFE::Extras::SqsListener |
| In: |
lib/openwfe/extras/listeners/sqslisteners.rb
|
| Parent: | Service |
Polls an Amazon SQS queue for workitems
Workitems can be instances of InFlowWorkItem or LaunchItem.
require 'openwfe/extras/listeners/sqslisteners'
ql = OpenWFE::SqsListener("workqueue1", engine.application_context)
engine.add_workitem_listener(ql, "2m30s")
#
# thus, the engine will poll our "workqueue1" SQS queue
# every 2 minutes and 30 seconds
| queue_name | [R] | The name of the Amazon SQS whom this listener cares for |
# File lib/openwfe/extras/listeners/sqslisteners.rb, line 79
79: def initialize (queue_name, application_context)
80:
81: @queue_name = queue_name.to_s
82:
83: service_name = "#{self.class}::#{@queue_name}"
84:
85: super service_name, application_context
86:
87: linfo { "new() queue is '#{@queue_name}'" }
88: end
Extracts a workitem from the message‘s body.
By default, this listeners assumes the workitem is stored in its "hash form" (not directly as a Ruby InFlowWorkItem instance).
LaunchItem instances (as hash as well) are also accepted.
# File lib/openwfe/extras/listeners/sqslisteners.rb, line 135
135: def decode_object (message)
136:
137: o = Base64.decode64 message.message_body
138: o = YAML.load o
139: o = OpenWFE::workitem_from_h o
140: o
141: end
polls the SQS for incoming messages
# File lib/openwfe/extras/listeners/sqslisteners.rb, line 93
93: def trigger (params)
94: synchronize do
95:
96: ldebug { "trigger()" }
97:
98: qs = Rufus::SQS::QueueService.new
99:
100: qs.create_queue @queue_name
101: # just to be sure it is there
102:
103: while true
104:
105: l = qs.get_messages(
106: @queue_name, :timeout => 0, :count => 255)
107:
108: break if l.length < 1
109:
110: l.each do |msg|
111:
112: o = decode_object msg
113:
114: handle_item o
115:
116: msg.delete
117:
118: ldebug do
119: "trigger() " +
120: "handled successfully msg #{msg.message_id}"
121: end
122: end
123: end
124: end
125: end