Class | OpenWFE::SocketListener |
In: |
lib/openwfe/listeners/socketlisteners.rb
|
Parent: | Service |
Listens for workitems on a socket.
Workitems can be instances of InFlowWorkItem or LaunchItem.
By default, listens on port 7007.
require 'openwfe/listeners/socketlisteners'
engine.add_workitem_listener(OpenWFE::SocketListener)
But you can be more specific :
engine.add_workitem_listener(OpenWFE::SocketListener.new("sl_whatever_name", engine.application_context, 7707, "target.host.xx"))
server | [R] | |
thread | [R] |
# File lib/openwfe/listeners/socketlisteners.rb, line 81
#
# Builds the listener: binds the TCP server socket, then starts the thread
# that runs the accept loop (see #listen).
#
# service_name::        forwarded to the parent constructor.
# application_context:: forwarded to the parent constructor.
# port::                TCP port to bind; 7007 is used when nil (or false).
# iface::               handed unchanged to TCPServer.new as the host
#                       argument; no default is forced, nil is accepted.
def initialize(service_name, application_context, port = nil, iface = nil)
  super(service_name, application_context)

  listening_port = port || 7007

  @server = TCPServer.new(iface, listening_port)
  @thread = OpenWFE.call_in_thread(@service_name, self) { listen }
end
This base implementation is capable of decoding XML workitems and YAML workitems.
# File lib/openwfe/listeners/socketlisteners.rb, line 122
#
# Turns the raw text received on the socket into a workitem.
#
# * nil/false input, or anything shorter than 4 characters, yields nil.
# * Text starting with "<" is handed to OpenWFE::XmlCodec.decode.
# * Text starting with "---" is handed to YAML.load.
# * Anything else is assumed to be a header preceding the workitem:
#   pop_line is applied twice and the remainder is examined again.
#
# The header-skipping step is a loop rather than a recursive call, so a long
# payload that never reaches a recognisable workitem cannot exhaust the
# stack (the input comes straight from the network).
#
# data:: the String read from the socket (may be nil).
#
# Returns whatever the selected decoder returns, or nil when nothing
# decodable is left.
def decode_workitem(data)
  payload = data

  while payload && payload.length >= 4
    # seems like XML
    return OpenWFE::XmlCodec.decode(payload) if payload[0, 1] == '<'

    # must be YAML
    #
    # SECURITY NOTE(review): YAML.load is applied to data received from the
    # socket. Depending on the Ruby/Psych version this can instantiate
    # arbitrary objects; consider restricting access via is_allowed? or
    # moving to YAML.safe_load with an explicit class whitelist.
    return YAML.load(payload) if payload[0, 3] == '---'

    # perhaps OpenWFEja style header + workitem : drop two lines, try again
    payload = pop_line(pop_line(payload))
  end

  nil
end
The base implementation always returns true.
An override of this method might check the origin of the socket and maybe only allow a certain range of hosts…
# File lib/openwfe/listeners/socketlisteners.rb, line 168
#
# Access-control hook, consulted by #listen before a connection is handed
# to #handle_socket.
#
# socket:: the freshly accepted connection; not inspected here.
#
# This base version accepts every connection and therefore always answers
# true. A subclass may override it, for example to look at the peer address.
def is_allowed?(socket)
  true
end
Simply writes the given result back on the socket as a string, followed by an empty line, then closes the write side of the socket.
# File lib/openwfe/listeners/socketlisteners.rb, line 153
#
# Sends the reply to the peer.
#
# socket:: the connection to answer on; must respond to puts and close_write.
# result:: the reply; converted with to_s before being written.
#
# The reply text is written, followed by an empty line, after which the
# write side of the socket is shut. The socket itself is not closed here.
def reply_to_socket(socket, result)
  # one puts call, two lines : the reply itself, then the empty line
  socket.puts(result.to_s, '')
  socket.close_write
end
Stops this socket listener (shuts down its socket)
# File lib/openwfe/listeners/socketlisteners.rb, line 100
#
# Stops this socket listener.
#
# A "shutdown" error is raised inside the listening thread, then the server
# socket is closed. A failure while closing (for instance because the socket
# is already closed) is only logged at debug level, so calling stop is
# best-effort and does not raise for ordinary errors.
#
# Only StandardError is rescued: Interrupt, SystemExit and other low-level
# exceptions are deliberately left to propagate instead of being swallowed.
def stop
  # interrupts the thread started in the constructor
  @thread.raise "shutdown"

  begin
    @server.close
  rescue StandardError => e
    ldebug { "stop() exc : #{e}" }
  end

  linfo { "stop() shut socket down" }
end
The bulk of the work of handling a connection is done here. The incoming workitem is piped to the engine, then the result is written back as a string on the socket, which then gets closed.
# File lib/openwfe/listeners/socketlisteners.rb, line 205
#
# Handles one accepted connection from start to finish.
#
# 1. Everything the peer sends is read, up to end of file.
# 2. The text is decoded with #decode_workitem; when that yields nothing,
#    the connection is simply closed.
# 3. The workitem is passed to handle_item. On success the reply is
#    "<ok-reply/>"; when handle_item raises a StandardError the reply is
#    "ERROR", an empty line, and OpenWFE.exception_to_s of the error.
# 4. The reply is sent with #reply_to_socket.
#
# socket:: the accepted connection.
#
# The socket is closed in every case, including when decoding or replying
# raises; such errors still propagate to the caller after the close.
def handle_socket(socket)
  ldebug do
    "handle_socket() connection from #{socket.peeraddr.join(' ')}"
  end

  # a single read up to EOF instead of accumulating line by line
  data = socket.read

  wi = decode_workitem(data)

  unless wi
    ldebug { "handle_socket() >>>#{data}<<< doesn't contain a workitem" }
    return
  end

  ldebug { "handle_socket() received something of class #{wi.class}" }

  result =
    begin
      handle_item(wi)

      ok = "<ok-reply/>"
      ldebug { "handle_socket() result is >>#{ok}<<" }
      ok
    rescue StandardError => e
      # built by interpolation, no string literal gets mutated
      error_reply = "ERROR\n\n#{OpenWFE.exception_to_s(e)}"
      ldebug { "handle_socket() error reply :\n" + error_reply }
      error_reply
    end

  reply_to_socket(socket, result)
ensure
  # whatever happened above, do not leak the connection
  socket.close unless socket.closed?
end
Where the socket waiting loop is…
# File lib/openwfe/listeners/socketlisteners.rb, line 178
#
# The accept loop of the listener.
#
# Waits for connections on @server. Each accepted connection is processed
# inside a block given to OpenWFE.call_in_thread (presumably a separate
# thread - see that helper): the connection goes to #handle_socket when
# #is_allowed? approves it, and is closed right away otherwise, so that
# refused peers are not left hanging on an open socket.
#
# The loop ends (and the method returns) as soon as accept raises a
# StandardError, which is what happens when #stop raises "shutdown" in this
# thread or closes the server socket. The reason is logged at info level.
def listen
  linfo { "listen() listening on #{@server.addr.join(' ')}" }

  loop do
    socket =
      begin
        @server.accept
      rescue StandardError => e
        linfo { "listen() shut down '#{e}'" }
        nil
      end

    return unless socket

    OpenWFE.call_in_thread(@service_name, self) do
      if is_allowed?(socket)
        handle_socket(socket)
      else
        # refused : release the connection instead of leaking it
        socket.close
      end
    end
  end
end