Package Products :: Package ZenUtils :: Module PBUtil
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenUtils.PBUtil

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2007, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  # taken from r1.10 of buildbot.sf.net/buildbot/pbutil.py 
 12   
 13  # flumotion has nearly the same code 
 14   
 15  __doc__ = """PBUtil 
 16  Base classes handy for use with PB clients. 
 17  """ 
 18   
 19  import logging 
 20  zenlog = logging.getLogger("zen.pbclientfactory") 
 21   
 22  from twisted.spread import pb 
 23   
 24  from twisted.spread.pb import PBClientFactory 
 25  from twisted.internet import protocol, reactor, defer, task 
 26  from twisted.internet.error import ConnectionClosed 
27 28 -class ReconnectingPBClientFactory(PBClientFactory, 29 protocol.ReconnectingClientFactory):
30 """Reconnecting client factory for PB brokers. 31 32 Like PBClientFactory, but if the connection fails or is lost, the factory 33 will attempt to reconnect. 34 35 Instead of using f.getRootObject (which gives a Deferred that can only 36 be fired once), override the gotRootObject method. 37 38 Instead of using the newcred f.login (which is also one-shot), call 39 f.startLogin() with the credentials and client, and override the 40 gotPerspective method. 41 42 Instead of using the oldcred f.getPerspective (also one-shot), call 43 f.startGettingPerspective() with the same arguments, and override 44 gotPerspective. 45 46 gotRootObject and gotPerspective will be called each time the object is 47 received (once per successful connection attempt). You will probably want 48 to use obj.notifyOnDisconnect to find out when the connection is lost. 49 50 If an authorization error occurs, failedToGetPerspective() will be 51 invoked. 52 53 To use me, subclass, then hand an instance to a connector (like 54 TCPClient). 55 """ 56 __pychecker__='no-override' 57 58 # maxDelay(secs) set to 5 minute maximum delay before attempting to 59 # reconnect 60 maxDelay = 300 61
62 - def __init__(self, connectTimeout=30, pingPerspective=False, pingInterval=30, pingtimeout=120):
63 PBClientFactory.__init__(self) 64 self._doingLogin = False 65 self._doingGetPerspective = False 66 self._scheduledConnectTimeout = None 67 self._connectTimeout = connectTimeout 68 # should the perspective be pinged. Perspective must have a ping method 69 self._shouldPingPerspective = pingPerspective 70 # how often to ping 71 self._pingInterval = pingInterval 72 # how long to wait for a ping before closing connection 73 self._pingTimeoutTime = pingtimeout 74 # ref to the scheduled ping timeout call 75 self._pingTimeout = None 76 # looping call doing the ping 77 self._pingCheck = None 78 79 self._perspective = None
80
81 - def connectTCP(self, host, port):
82 factory = self 83 self.connector = reactor.connectTCP(host, port, factory) 84 return self.connector
85
86 - def clientConnectionFailed(self, connector, reason):
87 zenlog.debug("Failed to create connection to %s:%s - %s", 88 connector.host, connector.port, reason) 89 self._perspective = None 90 self._cancelConnectTimeout() 91 PBClientFactory.clientConnectionFailed(self, connector, reason) 92 # Twisted-1.3 erroneously abandons the connection on non-UserErrors. 93 # To avoid this bug, don't upcall, and implement the correct version 94 # of the method here. 95 if self.continueTrying: 96 self.connector = connector 97 self.retry()
98
99 - def clientConnectionLost(self, connector, reason, reconnecting=1):
100 zenlog.debug("Lost connection to %s:%s - %s", connector.host, 101 connector.port, reason) 102 self._perspective = None 103 self._cancelConnectTimeout() 104 PBClientFactory.clientConnectionLost(self, connector, reason, 105 reconnecting=reconnecting) 106 RCF = protocol.ReconnectingClientFactory 107 RCF.clientConnectionLost(self, connector, reason)
108
109 - def clientConnectionMade(self, broker):
110 zenlog.debug("Connected") 111 self._cancelConnectTimeout() 112 self.resetDelay() 113 PBClientFactory.clientConnectionMade(self, broker) 114 if self._doingLogin: 115 self._startConnectTimeout("Login") 116 self.doLogin(self._root) 117 if self._doingGetPerspective: 118 self.doGetPerspective(self._root) 119 self.gotRootObject(self._root)
120
121 - def startedConnecting(self, connector):
122 zenlog.debug("Starting connection...") 123 self._startConnectTimeout("Initial connect") 124 self.connecting()
125
126 - def __getstate__(self):
127 # this should get folded into ReconnectingClientFactory 128 d = self.__dict__.copy() 129 d['connector'] = None 130 d['_callID'] = None 131 return d
132 133 # oldcred methods 134
135 - def getPerspective(self, *args):
136 raise RuntimeError( "getPerspective is one-shot: use startGettingPerspective instead" )
137
138 - def startGettingPerspective(self, username, password, serviceName, 139 perspectiveName=None, client=None):
140 self._doingGetPerspective = True 141 if perspectiveName == None: 142 perspectiveName = username 143 self._oldcredArgs = (username, password, serviceName, 144 perspectiveName, client)
145
146 - def doGetPerspective(self, root):
147 # oldcred getPerspective() 148 (username, password, 149 serviceName, perspectiveName, client) = self._oldcredArgs 150 d = self._cbAuthIdentity(root, username, password) 151 d.addCallback(self._cbGetPerspective, 152 serviceName, perspectiveName, client) 153 d.addCallbacks(self._gotPerspective, self.failedToGetPerspective)
154 155 156 # newcred methods 157
158 - def login(self, credentials, client=None):
159 from Products.ZenUtils.Utils import unused 160 unused(credentials, client) 161 raise RuntimeError( "Login is one-shot: use startLogin instead" )
162
163 - def startLogin(self, credentials, client=None):
164 self._credentials = credentials 165 self._client = client 166 self._doingLogin = True
167
168 - def doLogin(self, root):
169 # newcred login() 170 zenlog.debug("Sending credentials") 171 d = self._cbSendUsername(root, self._credentials.username, 172 self._credentials.password, self._client) 173 d.addCallbacks(self._gotPerspective, self.failedToGetPerspective) 174 return d
175
176 - def _gotPerspective(self, perspective):
177 self._cancelConnectTimeout() 178 self._cancelPingTimeout() 179 self._perspective = perspective 180 if self._shouldPingPerspective: 181 reactor.callLater(0, self._startPingCycle) 182 self.gotPerspective(perspective)
183 184
185 - def _disconnect(self):
186 if self._broker: 187 self.disconnect() 188 elif self.connector: 189 try: 190 self.connector.disconnect() 191 except Exception: 192 zenlog.exception('Could not disconnect') 193 else: 194 zenlog.debug('No connector or broker to disconnect')
195 196 # methods for connecting and login timeout
197 - def _startConnectTimeout(self, msg):
198 self._cancelConnectTimeout() 199 self._scheduledConnectTimeout = reactor.callLater(self._connectTimeout, self._timeoutConnect, msg)
200
201 - def _timeoutConnect(self, msg):
202 zenlog.info("%s timed out after %s seconds", msg, self._connectTimeout) 203 self._disconnect()
204
205 - def _cancelConnectTimeout(self):
206 self._scheduledConnectTimeout, timeout = None, self._scheduledConnectTimeout 207 if timeout and timeout.active(): 208 zenlog.debug("Cancelling connect timeout") 209 timeout.cancel()
210 211 # methods to check connection is active
212 - def _startPingTimeout(self):
213 if not self._pingTimeout: 214 self._pingTimeout = reactor.callLater(self._pingTimeoutTime, 215 self._doPingTimeout)
216
217 - def _cancelPingTimeout(self):
218 self._pingTimeout, timeout = None, self._pingTimeout 219 if timeout and timeout.active(): 220 zenlog.debug("Cancelling ping timeout") 221 timeout.cancel()
222
223 - def _doPingTimeout(self):
224 if self._perspective: 225 zenlog.warn("Perspective ping timed out after %s seconds", self._pingTimeoutTime) 226 self._disconnect()
227 228 @defer.inlineCallbacks
229 - def _startPingCycle(self):
230 if not self._pingCheck: 231 pingCheck = task.LoopingCall(self._pingPerspective) 232 self._pingCheck = pingCheck 233 try: 234 yield pingCheck.start(self._pingInterval) 235 except Exception: 236 zenlog.exception("perspective ping loop died") 237 finally: 238 # should only happen at shutdown 239 zenlog.info("perspective ping loop ended")
240 241 @defer.inlineCallbacks
242 - def _pingPerspective(self):
243 try: 244 if self._perspective: 245 zenlog.debug('pinging perspective') 246 self._startPingTimeout() 247 response = yield self._perspective.callRemote('ping') 248 zenlog.debug("perspective %sed", response) 249 else: 250 zenlog.debug('skipping perspective ping') 251 self._cancelPingTimeout() 252 except ConnectionClosed: 253 zenlog.info("Connection was closed") 254 self._cancelPingTimeout() 255 except Exception: 256 zenlog.exception("ping perspective exception")
257 258 # methods to override 259
260 - def connecting(self):
261 """ 262 Called when a connection is about to be attempted. Can be the initial 263 connect or a retry/reconnect 264 """ 265 pass
266
267 - def gotPerspective(self, perspective):
268 """The remote avatar or perspective (obtained each time this factory 269 connects) is now available.""" 270 pass
271
272 - def gotRootObject(self, root):
273 """The remote root object (obtained each time this factory connects) 274 is now available. This method will be called each time the connection 275 is established and the object reference is retrieved.""" 276 pass
277
278 - def failedToGetPerspective(self, why):
279 """The login process failed, most likely because of an authorization 280 failure (bad password), but it is also possible that we lost the new 281 connection before we managed to send our credentials. 282 """ 283 self._cancelConnectTimeout() 284 zenlog.debug("ReconnectingPBClientFactory.failedToGetPerspective") 285 if why.check(pb.PBConnectionLost): 286 zenlog.debug("we lost the brand-new connection") 287 # retrying might help here, let clientConnectionLost decide 288 return 289 290 zenlog.warning("Cancelling attempts to connect") 291 self.stopTrying() # logging in harder won't help 292 if why.type == 'twisted.cred.error.UnauthorizedLogin': 293 zenlog.critical("zenhub username/password combination is incorrect!") 294 # Don't exit as Enterprise caches info and can survive 295 else: 296 zenlog.critical("Unknown connection problem to zenhub %s", why.type)
297