Package Products :: Package ZenUtils :: Module zencatalog
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenUtils.zencatalog

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2010, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  import sys 
 12  import logging 
 13  from itertools import chain 
 14   
 15  from Globals import * 
 16   
 17  import transaction 
 18  from twisted.internet import defer, reactor, task 
 19  from OFS.ObjectManager import ObjectManager 
 20  from ZODB.POSException import ConflictError 
 21  from ZEO.Exceptions import ClientDisconnected 
 22  from ZEO.zrpc.error import DisconnectedError 
 23  from zope.component import getUtility 
 24  from Products.ZenRelations.ToManyContRelationship import ToManyContRelationship 
 25  from Products.ZenModel.ZenModelRM import ZenModelRM 
 26  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 27  from Products.Zuul.catalog.global_catalog import globalCatalogId, catalog_caching 
 28  from Products.Zuul.catalog.interfaces import IGlobalCatalogFactory 
 29   
 30  log = logging.getLogger("zen.Catalog") 
 31   
 32  # Hide connection errors. We handle them all ourselves. 
 33  HIGHER_THAN_CRITICAL = 100 
 34  logging.getLogger('ZODB.Connection').setLevel(HIGHER_THAN_CRITICAL) 
 35  logging.getLogger('ZEO.zrpc').setLevel(HIGHER_THAN_CRITICAL) 
 36   
 37  CHUNK_SIZE = 100 
class DisconnectedDuringGenerator(Exception):
    """
    A special exception that can be yielded during a generator and watched for.
    This lets us react to connection exceptions in the generator without killing it.

    @ivar value: the item the generator was working on when the problem
        occurred; C{chunk} pushes it back onto the front of the iteration
        so it is retried after reconnecting.
    """
    def __init__(self, value):
        # Initialise the base exception explicitly so args/str() are set by
        # the normal Exception machinery rather than implicitly.
        super(DisconnectedDuringGenerator, self).__init__(value)
        self.value = value
def chunk(iterable, callback, reconnect_cb=lambda: None, size=1, delay=1):
    """
    Walk C{iterable} in batches of C{size} items, passing each batch to
    C{callback}.

    Whenever the iterable yields a L{DisconnectedDuringGenerator}, its
    C{value} is pushed back onto the front of the iteration. Iteration then
    pauses for C{delay} seconds, after which C{reconnect_cb} is called and
    the walk resumes. A trailing partial batch is delivered at the end; an
    empty one is not.

    This lets the whole database be walked object by object without dying
    if the database goes away or a C{ConflictError} occurs.

    @return: the Deferred produced by the C{inlineCallbacks} driver. It
        fires once every batch has been handled.
    """
    source = iter(iterable)

    # With inlineCallbacks each yielded Deferred must fire before the loop
    # continues, so batches are processed strictly one after another.
    @defer.inlineCallbacks
    def inner(gen=source):
        pending = defer.Deferred()
        pending.addCallback(callback)
        batch = []
        while True:
            try:
                item = next(gen)
            except StopIteration:
                # Exhausted: flush any partial batch, then finish.
                if batch:
                    pending.callback(batch)
                    yield pending
                break

            if not isinstance(item, DisconnectedDuringGenerator):
                batch.append(item)
            else:
                # Re-queue the item that hit the connection problem.
                gen = chain((item.value,), gen)
                # Wait C{delay} seconds, then call C{reconnect_cb}. Yielding
                # this Deferred keeps the iterator from advancing until then.
                yield task.deferLater(reactor, delay, reconnect_cb)

            if len(batch) == size:
                # Batch full: hand it to the callback, wait for it, and start
                # a fresh batch with a fresh Deferred.
                pending.callback(batch)
                batch = []
                yield pending
                pending = defer.Deferred()
                pending.addCallback(callback)

    return inner()
class ZenCatalog(ZCmdBase):
    """
    Command-line tool that creates and populates (--createcatalog) or
    reindexes (--reindex) the global catalog stored on C{zport}, running
    the work inside the Twisted reactor.
    """
    name = 'zencatalog'

    def buildOptions(self):
        """basic options setup sub classes can add more options here"""
        ZCmdBase.buildOptions(self)
        self.parser.add_option("--createcatalog",
                               action="store_true",
                               default=False,
                               help="Create global catalog and populate it")
        self.parser.add_option("--forceindex",
                               action="store_true",
                               default=False,
                               help="works with --createcatalog to re-create index, if catalog exists it will be "
                                    "dropped first")
        self.parser.add_option("--reindex",
                               action="store_true",
                               default=False,
                               help="reindex existing catalog")
        self.parser.add_option("--permissionsOnly",
                               action="store_true",
                               default=False,
                               help="Only works with --reindex, only update the permissions catalog")

    def run(self):
        """
        Validate the options, run the selected operation inside the reactor,
        and stop the reactor when it finishes, whether or not it succeeded.
        """

        def stop(ignored):
            reactor.stop()

        def log_failure(failure):
            # Without this, stop() would silently swallow the failure.
            log.error("zencatalog failed:\n%s", failure.getTraceback())

        def start():
            zport = self.dmd.getPhysicalRoot().zport
            if self.options.createcatalog:
                return self._createCatalog(zport)
            # The options check below guarantees --reindex is set here.
            return self._reindex(zport)

        def main():
            # maybeDeferred converts a synchronous exception (e.g. raised by
            # reindex_catalog, which runs inline) into a failed Deferred.
            # Otherwise the exception would escape before stop() is attached
            # and the reactor, and so the process, would never exit.
            d = defer.maybeDeferred(start)
            d.addErrback(log_failure)
            d.addBoth(stop)

        if not self.options.createcatalog and not self.options.reindex:
            self.parser.error("Must use one of --createcatalog, --reindex")
        reactor.callWhenRunning(main)
        with catalog_caching():
            reactor.run()

    def _reindex(self, zport):
        """
        Synchronously reindex the existing global catalog, or log a warning
        if it does not exist.

        @return: an already-fired Deferred.
        """
        globalCat = self._getCatalog(zport)

        if globalCat:
            reindex_catalog(globalCat, self.options.permissionsOnly, not self.options.daemon)
        else:
            log.warning('Global Catalog does not exist, try --createcatalog option')
        return defer.succeed(None)

    def _createCatalog(self, zport):
        """
        Create the global catalog and populate it by walking every object
        under C{zport}. With --forceindex an existing catalog is dropped
        first.

        Objects are catalogued in chunks of C{CHUNK_SIZE}, with one commit
        per chunk. Conflicts and disconnects are retried.

        @return: a Deferred that fires when indexing has finished. If the
            catalog already exists and --forceindex was not given, an
            already-fired Deferred is returned instead.
        """

        # Whether we reconnected after a recursion failure. Because the nested
        # func has no access to this scope, make it a mutable.
        _reconnect = [False]

        catalog = self._getCatalog(zport)
        factory = getUtility(IGlobalCatalogFactory)
        if self.options.forceindex and catalog:
            factory.remove(zport)
            catalog = self._getCatalog(zport)

        if catalog is None:
            # Create the catalog
            log.debug("Creating global catalog")
            zport = self.dmd.getPhysicalRoot().zport
            factory.create(zport)
            catalog = self._getCatalog(zport)
            transaction.commit()
        else:
            log.info('Global catalog already exists. Run with --forceindex to drop and recreate catalog')
            return defer.succeed(None)

        def recurse(obj):
            """
            Yield every descendant of C{obj} (children before parents), then
            C{obj} itself. On a connection problem, yield a
            L{DisconnectedDuringGenerator} carrying C{obj} instead.
            """
            if _reconnect[0]:
                log.info('Reconnected.')
                _reconnect[0] = False
            try:
                if isinstance(obj, ObjectManager):
                    # Bottom up, for multiple-path efficiency
                    for ob in obj.objectValues():
                        for kid in recurse(ob):
                            yield kid
                    if isinstance(obj, ZenModelRM):
                        for rel in obj.getRelationships():
                            if not isinstance(rel, ToManyContRelationship):
                                continue
                            for kid in rel.objectValuesGen():
                                for gkid in recurse(kid):
                                    yield gkid
                yield obj
            except (AttributeError, ClientDisconnected, DisconnectedError):
                # Yield the special exception C{chunk} is watching for, so
                # it'll pause and wait for a connection. Feed it the current
                # object so it knows where to start from.
                # We'll also catch AttributeErrors, which are thrown when
                # ZenPacks get updated during the run.
                log.info("Connection problem during object retrieval. "
                         "Trying again in 5 seconds...")
                _reconnect[0] = True
                yield DisconnectedDuringGenerator(obj)

        def catalog_object(ob):
            if hasattr(ob, 'index_object'):
                ob.index_object()
            catalog.catalog_object(ob)
            log.debug('Catalogued object %s', ob.absolute_url_path())

        # Count of catalogued objects. Because the nested func has no access to
        # this scope, have to make it a mutable
        i = [0]

        def handle_chunk(c, d=None, _is_reconnect=False):
            """
            Return a Deferred that will call back once the chunk has been
            catalogued. In case of a conflict or disconnect, wait 5 seconds, then
            try again. Because this is a callback chained to a C{chunk} Deferred
            yielded from an C{inlineCallbacks} function, the next chunk will not be
            created until this completes successfully.
            """
            if d is None:
                d = defer.Deferred()
            self.syncdb()
            try:
                for ob in filter(None, c):
                    catalog_object(ob)
                transaction.commit()
            except ConflictError as e:
                log.info('Conflict error during commit. Retrying...')
                # Discard the failed transaction and schedule the retry
                # *before* the diagnostic lookup below. That lookup can itself
                # raise (e.g. when e.oid is None); if it ran first, the retry
                # would never be scheduled and the run would stall.
                transaction.abort()
                reactor.callLater(0, handle_chunk, c, d)
                if log.isEnabledFor(logging.DEBUG):
                    try:
                        log.debug('Object in conflict: %r', self.app._p_jar[e.oid])
                    except Exception:
                        log.debug('Object in conflict: oid %r (could not load)', e.oid)
            except (ClientDisconnected, DisconnectedError):
                log.info('Connection problem during commit. '
                         'Trying again in 5 seconds...')
                reactor.callLater(5, handle_chunk, c, d, True)
            else:
                if _is_reconnect:
                    log.info('Reconnected.')
                d.callback(None)
                # Increment the count
                i[0] += len(c)
                if self.options.daemon:
                    log.info("Catalogued %s objects", i[0])
                else:
                    sys.stdout.write('.')
                    sys.stdout.flush()
            return d

        def reconnect():
            """
            If we had a connection error, the db is probably in a weird state.
            Reset it and try again.
            """
            log.info("Reconnected.")
            self.syncdb()

        def set_flag(r):
            """
            Set a flag in the database saying we've finished indexing.
            """
            if not self.options.daemon:
                sys.stdout.write('\n')
            log.debug("Marking the indexing as completed in the database")
            self.syncdb()
            zport._zencatalog_completed = True
            transaction.commit()
            log.info("Reindexing completed.")

        def log_indexing_failure(failure):
            # log.exception() is useless here: it is not called inside an
            # except block, so it would not print this failure's traceback.
            log.error("Indexing failed:\n%s", failure.getTraceback())

        log.info("Reindexing your system. This may take some time.")
        d = chunk(recurse(zport), handle_chunk, reconnect, CHUNK_SIZE, 5)

        return d.addCallbacks(set_flag, log_indexing_failure)

    def _getCatalog(self, zport):
        """Return the global catalog attribute of C{zport}, or None if absent."""
        return getattr(zport, globalCatalogId, None)
def reindex_catalog(globalCat, permissionsOnly=False, printProgress=True, commit=True):
    """
    Re-catalog every object currently listed in C{globalCat}.

    Entries whose object cannot be loaded, or loads as None, are removed
    from the catalog.

    @param globalCat: the global catalog. It is called with no arguments to
        enumerate all of its entries (brains).
    @param permissionsOnly: only update the C{allowedRolesAndUsers} index
        and leave metadata untouched.
    @param printProgress: write a '.' to stdout every C{CHUNK_SIZE} objects
        and a newline at the end; otherwise log the running count at INFO.
    @param commit: commit the transaction every C{CHUNK_SIZE} objects and
        once more at the end.
    @raise ConflictError, ClientDisconnected, DisconnectedError: transient
        database errors raised while loading an object are propagated. They
        are not treated as a missing object, which would wrongly uncatalog it.
    """
    with catalog_caching():
        msg = 'permissions' if permissionsOnly else 'objects'
        log.info('reindexing %s in catalog', msg)

        # Empty kwargs make catalog_object update all indexes and metadata.
        # The dict is only unpacked, never mutated, so it can be shared by
        # every call.
        if permissionsOnly:
            index_kwargs = {'update_metadata': False,
                            'idxs': ("allowedRolesAndUsers",)}
        else:
            index_kwargs = {}
        # Avoid computing absolute_url_path() per object unless debug logging
        # is actually on.
        debug_enabled = log.isEnabledFor(logging.DEBUG)
        catObj = globalCat.catalog_object
        i = 0
        for brain in globalCat():
            path = brain.getPath()
            log.debug('indexing %s', path)
            try:
                obj = brain.getObject()
            except (ConflictError, ClientDisconnected, DisconnectedError):
                # Transient failure: the object may well exist, so do not
                # uncatalog it.
                raise
            except Exception:
                log.debug("Could not load object: %s", path)
                globalCat.uncatalog_object(path)
                continue
            if obj is not None:
                if not permissionsOnly and hasattr(obj, 'index_object'):
                    obj.index_object()
                catObj(obj, **index_kwargs)
                if debug_enabled:
                    log.debug('Catalogued object %s', obj.absolute_url_path())
            else:
                log.debug('%s does not exists', path)
                globalCat.uncatalog_object(path)
            i += 1
            if not i % CHUNK_SIZE:
                if printProgress:
                    sys.stdout.write(".")
                    sys.stdout.flush()
                else:
                    log.info('Catalogued %s objects', i)
                if commit:
                    transaction.commit()
        if printProgress:
            sys.stdout.write('\n')
            sys.stdout.flush()
        if commit:
            transaction.commit()
if __name__ == "__main__":
    zc = ZenCatalog()
    try:
        zc.run()
    except Exception as e:
        log.exception(e)
        # Report the failure to the calling shell/scheduler; previously the
        # process exited with status 0 even after an error.
        sys.exit(1)