| Class | OpenWFE::SocketListener |
| In: |
lib/openwfe/listeners/socketlisteners.rb
|
| Parent: | Service |
Listens for workitems on a socket.
Workitems can be instances of InFlowWorkItem or LaunchItem.
By default, listens on port 7007.
require 'openwfe/listeners/socketlisteners'
engine.add_workitem_listener(OpenWFE::SocketListener)
But you can be more specific :
engine.add_workitem_listener(
OpenWFE::SocketListener.new(
"sl_whatever_name",
engine.application_context,
7707,
"target.host.xx"))
| server | [R] | |
| thread | [R] |
# File lib/openwfe/listeners/socketlisteners.rb, line 81
81: def initialize (service_name, application_context, port=nil, iface=nil)
82:
83: super(service_name, application_context)
84:
85: #iface ||= "127.0.0.1"
86: # not necessary
87:
88: port ||= 7007
89:
90: @server = TCPServer.new(iface, port)
91:
92: @thread = OpenWFE.call_in_thread(@service_name, self) do
93: listen
94: end
95: end
This base implementation is capable of decoding XML workitems and YAML workitems.
# File lib/openwfe/listeners/socketlisteners.rb, line 122
122: def decode_workitem (data)
123:
124: return nil if not data or data.length < 4
125:
126: if data[0, 1] == "<"
127: #
128: # seems like XML
129:
130: OpenWFE::XmlCodec::decode(data)
131:
132: elsif data[0, 3] == "---"
133: #
134: # must be YAML
135:
136: YAML.load(data)
137:
138: else
139: #
140: # perhaps OpenWFEja style header + workitem
141:
142: data = pop_line(data)
143: data = pop_line(data)
144:
145: decode_workitem(data)
146: end
147: end
The base implementation always returns true.
An override of this method might check the origin of the socket and maybe only allow a certain range of hosts…
# File lib/openwfe/listeners/socketlisteners.rb, line 168
168: def is_allowed? (socket)
169:
170: true
171: end
Simply writes the given result back on the socket as a string, followed by an empty line, then closes the write side of the socket.
# File lib/openwfe/listeners/socketlisteners.rb, line 153
153: def reply_to_socket (socket, result)
154:
155: socket.puts result.to_s
156: socket.puts
157: socket.close_write
158:
159: #ldebug { "reply_to_socket() result is >#{result}<" }
160: end
Stops this socket listener (shuts down its socket)
# File lib/openwfe/listeners/socketlisteners.rb, line 100
100: def stop
101:
102: @thread.raise "shutdown"
103:
104: begin
105: @server.close
106: rescue Exception => e
107: ldebug { "stop() exc : #{e.to_s}" }
108: end
109: #begin
110: # @server.shutdown
111: #rescue Exception => e
112: # ldebug { "stop() exc : #{e.to_s}" }
113: #end
114:
115: linfo { "stop() shut socket down" }
116: end
The bulk of the work of handling a connection is done here. The incoming workitem is piped to the engine, then the result is written back as a string on the socket, which then gets closed.
# File lib/openwfe/listeners/socketlisteners.rb, line 205
205: def handle_socket (socket)
206:
207: ldebug do
208: "handle_socket() "+
209: "connection from #{socket.peeraddr.join(' ')}"
210: end
211:
212: data = ""
213: loop do
214: s = socket.gets
215: break unless s
216: data += s
217: end
218:
219: wi = decode_workitem(data)
220:
221: if not wi
222:
223: ldebug do
224: "handle_socket() "+
225: ">>>#{data}<<< doesn't contain a workitem"
226: end
227: socket.close
228: return
229:
230: else
231:
232: ldebug do
233: "handle_socket() "+
234: "received something of class #{wi.class}"
235: end
236: end
237:
238: result = nil
239:
240: begin
241:
242: #result = get_engine.reply(wi)
243: #result = handle_item(wi)
244: handle_item wi
245:
246: result = "<ok-reply/>"
247:
248: ldebug { "handle_socket() result is >>#{result}<<" }
249:
250: rescue Exception => e
251:
252: result = "ERROR\n\n"
253: result << OpenWFE::exception_to_s(e)
254:
255: ldebug { "handle_socket() error reply :\n" + result }
256: end
257:
258: reply_to_socket(socket, result)
259:
260: socket.close
261: end
Where the socket waiting loop is…
# File lib/openwfe/listeners/socketlisteners.rb, line 178
178: def listen
179:
180: linfo { "listen() listening on #{@server.addr.join(' ')}" }
181:
182: loop do
183:
184: socket = nil
185:
186: begin
187: socket = @server.accept
188: rescue Exception => e
189: linfo { "listen() shut down '#{e}'" }
190: end
191:
192: return unless socket
193:
194: OpenWFE.call_in_thread(@service_name, self) do
195: handle_socket(socket) if socket and is_allowed?(socket)
196: end
197: end
198: end