1
2
3
4
5
6
7
8
9
10
11 import Globals
12 import time
13 import threading
14 import Queue
15 import transaction
16 import logging
17 import traceback
18 from zope.component import getUtility
19 from ZODB.POSException import ConflictError
20 from datetime import datetime
21 from celery.backends.base import BaseDictBackend
22 from celery.exceptions import TimeoutError
23 import AccessControl.User
24 from AccessControl.SecurityManagement import newSecurityManager
25 from AccessControl.SecurityManagement import noSecurityManager
26 from Products.ZenUtils.celeryintegration import states
27 from Products.ZenUtils.ZodbFactory import IZodbFactoryLookup
28 from Products.Jobber.exceptions import NoSuchJobException
29 from Products.ZenRelations.ZenPropertyManager import setDescriptors
30 log = logging.getLogger("zen.celeryintegration")
31
32 CONNECTION_ENVIRONMENT = threading.local()
37 self.connection = connection
38
40 try:
41 transaction.abort()
42 except Exception:
43 pass
44 try:
45 noSecurityManager()
46 self.connection.close()
47 except Exception:
48 pass
49
52 """
53 ZODB result backend for Celery.
54 """
55 CONN_MARKER = 'ZODBBackendConnection'
56 _db = None
57
59 BaseDictBackend.__init__(self, *args, **kwargs)
60 self._db_lock = threading.Lock()
61
74
75 @property
77 """
78 Get a handle to the database by whatever means necessary
79 """
80 with self._db_lock:
81
82 db = self._db
83 if db is None:
84
85 db = getattr(Globals, 'DB', None)
86 if db is None:
87
88 connectionFactory = getUtility(IZodbFactoryLookup).get()
89 db, storage = connectionFactory.getConnection(**self.get_db_options())
90 self._db = db
91 return db
92
93 @property
112
113 @property
115 return self.dmd.JobManager
116
117 - def update(self, task_id, **properties):
118 """
119 Store properties on a JobRecord.
120 """
121 def _update():
122 """
123 Give the database time to sync incase a job record update
124 was received before the job was created
125 """
126 try:
127 for i in range(5):
128 try:
129 log.debug("Updating job %s - Pass %d", task_id, i+1)
130 self.jobmgr.update(task_id, **properties)
131 transaction.commit()
132 return
133 except (NoSuchJobException, ConflictError):
134 log.debug("Unable to find Job %s, retrying \n%s", task_id,
135 traceback.format_exc())
136
137 time.sleep(0.25)
138 self.dmd._p_jar.sync()
139
140 log.warn("Unable to save properties %s to job %s", properties, task_id)
141 finally:
142 self.reset()
143
144 log.debug("Updating job %s", task_id)
145 t = threading.Thread(target=_update)
146 t.start()
147 t.join()
148
149 - def _store_result(self, task_id, result, status, traceback=None):
150 """
151 Store return value and status of an executed task.
152
153 This runs in a separate thread with a short-lived connection, thereby
154 guaranteeing isolation from the current transaction.
155 """
156 self.update(task_id, result=result, status=status,
157 date_done=datetime.utcnow(), traceback=traceback)
158 return result
159
165
166 - def wait_for(self, task_id, timeout=None, propagate=True, interval=0.5):
167 """
168 Check status of a task and return its result when complete.
169
170 This runs in a separate thread with a short-lived connection, thereby
171 guaranteeing isolation from the current transaction.
172 """
173 status = self.get_status(task_id)
174 if status in states.READY_STATES:
175
176 result = self.get_result(task_id)
177 else:
178 result_queue = Queue.Queue()
179
180 def do_wait():
181 try:
182 time_elapsed = 0.0
183 while True:
184 self.jobmgr._p_jar.sync()
185 status = self.get_status(task_id)
186 if status in states.READY_STATES:
187 result_queue.put((status, self.get_result(task_id)))
188 return
189
190 time.sleep(interval)
191 time_elapsed += interval
192 if timeout and time_elapsed >= timeout:
193 raise TimeoutError("The operation timed out.")
194 finally:
195 self.reset()
196
197 t = threading.Thread(target=do_wait)
198 t.start()
199 t.join()
200
201 try:
202 status, result = result_queue.get_nowait()
203 except Queue.Empty:
204 return
205
206 if status in states.PROPAGATE_STATES and propagate:
207 raise result
208 else:
209 return result
210
212 """
213 Forget about a result.
214 """
215
216 raise NotImplementedError("ZODBBackend does not support forget")
217
219 """
220 Delete expired metadata.
221 """
222
223 raise NotImplementedError("ZODBBackend does not support cleanup")
224
231
234