Class OpenWFE::Extras::SqsListener
In: lib/openwfe/extras/listeners/sqslisteners.rb
Parent: Service

Polls an Amazon SQS queue for workitems.

Workitems can be instances of InFlowWorkItem or LaunchItem.

    require 'openwfe/extras/listeners/sqslisteners'

    ql = OpenWFE::Extras::SqsListener.new("workqueue1", engine.application_context)

    engine.add_workitem_listener(ql, "2m30s")
        #
        # thus, the engine will poll our "workqueue1" SQS queue
        # every 2 minutes and 30 seconds

Methods

decode_object   new   trigger  

Included Modules

MonitorMixin WorkItemListener Rufus::Schedulable

Attributes

queue_name  [R]  The name of the Amazon SQS queue that this listener polls

Public Class methods

    # File lib/openwfe/extras/listeners/sqslisteners.rb, line 79
    def initialize (queue_name, application_context)

        @queue_name = queue_name.to_s

        service_name = "#{self.class}::#{@queue_name}"

        super service_name, application_context

        linfo { "new() queue is '#{@queue_name}'" }
    end

Public Instance methods

Extracts a workitem from the message's body.

By default, this listener assumes the workitem is stored in its "hash form" (not directly as a Ruby InFlowWorkItem instance).

LaunchItem instances (also in hash form) are accepted as well.

    # File lib/openwfe/extras/listeners/sqslisteners.rb, line 135
    def decode_object (message)

        o = Base64.decode64 message.message_body
        o = YAML.load o
        o = OpenWFE::workitem_from_h o
        o
    end
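
The decoding order above (Base64 first, then YAML) implies that a producer would encode in the reverse order. The following is a minimal sketch of that round trip using only the Ruby standard library. The helper names encode_body and decode_body and the keys of the sample hash are assumptions made for illustration; the real hash form of a workitem is defined by OpenWFE and is not shown here, and the final OpenWFE::workitem_from_h step is left out.

```ruby
require 'base64'
require 'yaml'

# Hypothetical hash form of a workitem. The real key names are defined
# by OpenWFE and are not documented on this page.
workitem_hash = {
  'type' => 'OpenWFE::InFlowWorkItem',
  'attributes' => { 'customer' => 'alice', 'amount' => 42 }
}

# Producer side (assumed): dump to YAML, then Base64-encode,
# i.e. the reverse of what decode_object does.
def encode_body(hash)
  Base64.encode64(YAML.dump(hash))
end

# Consumer side: the first two steps of decode_object, without the
# OpenWFE-specific conversion into a workitem instance.
def decode_body(body)
  YAML.load(Base64.decode64(body))
end

body = encode_body(workitem_hash)
decoded = decode_body(body)
```

A subclass that expects another wire format (for example JSON) could presumably override decode_object, since trigger only relies on its return value.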

Polls the SQS queue for incoming messages.

    # File lib/openwfe/extras/listeners/sqslisteners.rb, line 93
    def trigger (params)
        synchronize do

            ldebug { "trigger()" }

            qs = Rufus::SQS::QueueService.new

            qs.create_queue @queue_name
                # just to be sure it is there

            while true

                l = qs.get_messages(
                    @queue_name, :timeout => 0, :count => 255)

                break if l.length < 1

                l.each do |msg|

                    o = decode_object msg

                    handle_item o

                    msg.delete

                    ldebug do
                        "trigger() " +
                        "handled successfully msg #{msg.message_id}"
                    end
                end
            end
        end
    end
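
The loop in trigger follows a drain pattern: fetch a batch, stop when the batch is empty, and delete each message only after it has been handled. The sketch below reproduces that control flow against an in-memory stand-in, so it can be run without AWS credentials. FakeQueue and drain are hypothetical names introduced for illustration and are not part of Rufus::SQS or OpenWFE.

```ruby
# Hypothetical in-memory stand-in for an SQS queue, for illustration only.
class FakeQueue
  def initialize(messages)
    @messages = messages.dup
  end

  # Returns up to `count` pending messages without removing them.
  def get_messages(count)
    @messages.first(count)
  end

  # Removes a message once it has been handled.
  def delete(message)
    @messages.delete(message)
  end

  def size
    @messages.size
  end
end

# Same control flow as trigger: keep fetching batches until one comes
# back empty, handling and then deleting each message in turn.
def drain(queue, batch_size)
  handled = []
  loop do
    batch = queue.get_messages(batch_size)
    break if batch.empty?

    batch.each do |msg|
      handled << msg.upcase # stand-in for decode_object + handle_item
      queue.delete(msg)     # delete only after the item was handled
    end
  end
  handled
end

queue = FakeQueue.new(%w[a b c d e])
handled = drain(queue, 2)
```

Because deletion happens after handling, an exception raised while handling leaves the message in the queue. In this sketch that message would simply be returned again by the next get_messages call.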
