Package Products :: Package ZenUtils :: Package celeryintegration :: Module backend
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenUtils.celeryintegration.backend

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2012, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  import Globals 
 12  import time 
 13  import threading 
 14  import Queue 
 15  import transaction 
 16  import logging 
 17  import traceback 
 18  from zope.component import getUtility 
 19  from ZODB.POSException import ConflictError 
 20  from datetime import datetime 
 21  from celery.backends.base import BaseDictBackend 
 22  from celery.exceptions import TimeoutError 
 23  import AccessControl.User 
 24  from AccessControl.SecurityManagement import newSecurityManager 
 25  from AccessControl.SecurityManagement import noSecurityManager 
 26  from Products.ZenUtils.celeryintegration import states 
 27  from Products.ZenUtils.ZodbFactory import IZodbFactoryLookup 
 28  from Products.Jobber.exceptions import NoSuchJobException 
 29  from Products.ZenRelations.ZenPropertyManager import setDescriptors 
 30  log = logging.getLogger("zen.celeryintegration") 
 31   
 32  CONNECTION_ENVIRONMENT = threading.local() 
33 34 35 -class ConnectionCloser(object):
36 - def __init__(self, connection):
37 self.connection = connection
38
39 - def __del__(self):
40 try: 41 transaction.abort() 42 except Exception: 43 pass 44 try: 45 noSecurityManager() 46 self.connection.close() 47 except Exception: 48 pass
49
50 51 -class ZODBBackend(BaseDictBackend):
52 """ 53 ZODB result backend for Celery. 54 """ 55 CONN_MARKER = 'ZODBBackendConnection' 56 _db = None 57
58 - def __init__(self, *args, **kwargs):
59 BaseDictBackend.__init__(self, *args, **kwargs) 60 self._db_lock = threading.Lock()
61
62 - def get_db_options(self):
63 options = getattr(self.app, 'db_options', None) 64 if options is not None: 65 return options.__dict__ 66 else: 67 # This path should never be hit except in testing, because 68 # Globals.DB will have been set before this method is even called. 69 # Having this lets us have zendmd open a new db so we can test in 70 # process, if we comment out getting the database from Globals in 71 # db() below. 72 from Products.ZenUtils.GlobalConfig import getGlobalConfiguration 73 return getGlobalConfiguration()
74 75 @property
76 - def db(self):
77 """ 78 Get a handle to the database by whatever means necessary 79 """ 80 with self._db_lock: 81 # Get the current database 82 db = self._db 83 if db is None: 84 # Try to get it off Globals (Zope, zendmd) 85 db = getattr(Globals, 'DB', None) 86 if db is None: 87 # Open a connection (CmdBase) 88 connectionFactory = getUtility(IZodbFactoryLookup).get() 89 db, storage = connectionFactory.getConnection(**self.get_db_options()) 90 self._db = db 91 return db
92 93 @property
94 - def dmd(self):
95 """ 96 Use a well-known connection to get a reliable dmd object. 97 """ 98 closer = getattr(CONNECTION_ENVIRONMENT, self.CONN_MARKER, None) 99 100 if closer is None: 101 conn = self.db.open() 102 setattr(CONNECTION_ENVIRONMENT, self.CONN_MARKER, 103 ConnectionCloser(conn)) 104 newSecurityManager(None, AccessControl.User.system) 105 app = conn.root()['Application'] 106 # Configure zProperty descriptors 107 setDescriptors(app.zport.dmd) 108 else: 109 app = closer.connection.root()['Application'] 110 111 return app.zport.dmd
112 113 @property
114 - def jobmgr(self):
115 return self.dmd.JobManager
116
117 - def update(self, task_id, **properties):
118 """ 119 Store properties on a JobRecord. 120 """ 121 def _update(): 122 """ 123 Give the database time to sync incase a job record update 124 was received before the job was created 125 """ 126 try: 127 for i in range(5): 128 try: 129 log.debug("Updating job %s - Pass %d", task_id, i+1) 130 self.jobmgr.update(task_id, **properties) 131 transaction.commit() 132 return 133 except (NoSuchJobException, ConflictError): 134 log.debug("Unable to find Job %s, retrying \n%s", task_id, 135 traceback.format_exc()) 136 # Race condition. Wait. 137 time.sleep(0.25) 138 self.dmd._p_jar.sync() 139 140 log.warn("Unable to save properties %s to job %s", properties, task_id) 141 finally: 142 self.reset()
143 144 log.debug("Updating job %s", task_id) 145 t = threading.Thread(target=_update) 146 t.start() 147 t.join()
148
149 - def _store_result(self, task_id, result, status, traceback=None):
150 """ 151 Store return value and status of an executed task. 152 153 This runs in a separate thread with a short-lived connection, thereby 154 guaranteeing isolation from the current transaction. 155 """ 156 self.update(task_id, result=result, status=status, 157 date_done=datetime.utcnow(), traceback=traceback) 158 return result
159
160 - def _get_task_meta_for(self, task_id):
161 """ 162 Get task metadata for a task by id. 163 """ 164 return self.jobmgr.getJob(task_id)
165
166 - def wait_for(self, task_id, timeout=None, propagate=True, interval=0.5):
167 """ 168 Check status of a task and return its result when complete. 169 170 This runs in a separate thread with a short-lived connection, thereby 171 guaranteeing isolation from the current transaction. 172 """ 173 status = self.get_status(task_id) 174 if status in states.READY_STATES: 175 # Already done, no need to spin up a thread to poll 176 result = self.get_result(task_id) 177 else: 178 result_queue = Queue.Queue() 179 180 def do_wait(): 181 try: 182 time_elapsed = 0.0 183 while True: 184 self.jobmgr._p_jar.sync() 185 status = self.get_status(task_id) 186 if status in states.READY_STATES: 187 result_queue.put((status, self.get_result(task_id))) 188 return 189 # avoid hammering the CPU checking status. 190 time.sleep(interval) 191 time_elapsed += interval 192 if timeout and time_elapsed >= timeout: 193 raise TimeoutError("The operation timed out.") 194 finally: 195 self.reset()
196 197 t = threading.Thread(target=do_wait) 198 t.start() 199 t.join() 200 201 try: 202 status, result = result_queue.get_nowait() 203 except Queue.Empty: 204 return 205 206 if status in states.PROPAGATE_STATES and propagate: 207 raise result 208 else: 209 return result 210
211 - def _forget(self, task_id):
212 """ 213 Forget about a result. 214 """ 215 # TODO: implement 216 raise NotImplementedError("ZODBBackend does not support forget")
217
218 - def cleanup(self):
219 """ 220 Delete expired metadata. 221 """ 222 # TODO: implement 223 raise NotImplementedError("ZODBBackend does not support cleanup")
224
225 - def reset(self):
226 self._db = None 227 try: 228 delattr(CONNECTION_ENVIRONMENT, self.CONN_MARKER) 229 except AttributeError: 230 pass
231
232 - def process_cleanup(self):
233 self.reset()
234