Package Products :: Package ZenUtils :: Module zencatalog
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenUtils.zencatalog

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2010, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  import sys 
 14  import logging 
 15  from itertools import chain 
 16   
 17  from Globals import * 
 18   
 19  import transaction 
 20  from twisted.internet import defer, reactor, task 
 21  from OFS.ObjectManager import ObjectManager 
 22  from ZODB.POSException import ConflictError 
 23  from ZEO.Exceptions import ClientDisconnected 
 24  from ZEO.zrpc.error import DisconnectedError 
 25  from Products.ZenRelations.ToManyContRelationship import ToManyContRelationship 
 26  from Products.ZenModel.ZenModelRM import ZenModelRM 
 27  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 28  from Products.Zuul.catalog.global_catalog import createGlobalCatalog 
 29   
 30  log = logging.getLogger("zen.Catalog") 
 31   
 32  # Hide connection errors. We handle them all ourselves. 
 33  HIGHER_THAN_CRITICAL = 100 
 34  logging.getLogger('ZODB.Connection').setLevel(HIGHER_THAN_CRITICAL) 
 35  logging.getLogger('ZEO.zrpc').setLevel(HIGHER_THAN_CRITICAL) 
 36   
 37  CHUNK_SIZE = 100 
class DisconnectedDuringGenerator(Exception):
    """
    Sentinel that a generator I{yields} (rather than raises) when it runs
    into a connection problem while handling C{value}.

    L{chunk} recognises instances of this class in the stream it consumes:
    it pauses, calls its reconnect callback and then re-queues C{value}, so
    the generator's consumer survives the failure instead of dying with it.
    """

    def __init__(self, value):
        # The item that was being processed when the problem occurred.
        self.value = value
def chunk(iterable, callback, reconnect_cb=lambda:None, size=1, delay=1):
    """
    Walk C{iterable} in batches of C{size} items, calling C{callback(batch)}
    for each batch and waiting for it to finish before starting the next.

    If the iterable yields a L{DisconnectedDuringGenerator}, wait C{delay}
    seconds, call C{reconnect_cb}, push the wrapped C{value} back onto the
    front of the stream and carry on. A final, possibly short, batch is
    delivered when the iterable is exhausted.

    This is used to walk through the database object by object without dying
    if the database goes away or there's a C{ConflictError}.

    @return: the C{Deferred} produced by the C{inlineCallbacks} driver; it
        fires when every batch has been handled.
    """
    source = iter(iterable)

    def new_batch_deferred():
        # Each batch gets its own Deferred with the user callback attached.
        batch_d = defer.Deferred()
        batch_d.addCallback(callback)
        return batch_d

    # With inlineCallbacks, every Deferred yielded below must complete before
    # the generator is resumed, so batches are processed strictly in order.
    @defer.inlineCallbacks
    def inner(gen=source):
        pending = new_batch_deferred()
        batch = []
        while True:
            try:
                item = gen.next()
            except StopIteration:
                # Flush whatever is left, then finish.
                if batch:
                    pending.callback(batch)
                    yield pending
                break

            if isinstance(item, DisconnectedDuringGenerator):
                # Re-queue the item that failed, then block the iteration
                # until reconnect_cb has run after `delay` seconds.
                gen = chain((item.value,), gen)
                yield task.deferLater(reactor, delay, reconnect_cb)
            else:
                batch.append(item)

            if len(batch) == size:
                # Batch is full: hand it off, wait for it, start a new one.
                pending.callback(batch)
                batch = []
                yield pending
                pending = new_batch_deferred()

    return inner()
class ZenCatalog(ZCmdBase):
    """
    Command-line tool that creates and populates (C{--createcatalog}) or
    reindexes (C{--reindex}) the global catalog stored on zport.
    """
    name = 'zencatalog'

    def buildOptions(self):
        """basic options setup sub classes can add more options here"""
        ZCmdBase.buildOptions(self)
        self.parser.add_option("--createcatalog",
                               action="store_true",
                               default=False,
                               help="Create global catalog and populate it")
        self.parser.add_option("--forceindex",
                               action="store_true",
                               default=False,
                               help="works with --createcatalog to create index"
                                    " even if catalog exists")
        self.parser.add_option("--reindex",
                               action="store_true",
                               default=False,
                               help="reindex existing catalog")

    def run(self):
        """
        Perform the operation selected on the command line inside the Twisted
        reactor, stopping the reactor once it finishes or fails.

        Calls C{self.parser.error} (optparse's error() prints usage and
        exits) if neither C{--createcatalog} nor C{--reindex} was given.
        """
        if not (self.options.createcatalog or self.options.reindex):
            # Otherwise main() would have no Deferred to wait on and the
            # reactor would never be stopped.
            self.parser.error("Must use one of --createcatalog, --reindex")

        def stop(ignored):
            reactor.stop()

        def log_failure(failure):
            log.error("zencatalog failed:\n%s", failure.getTraceback())

        def main():
            if self.options.createcatalog:
                operation = self._createCatalog
            else:
                operation = self._reindex
            # maybeDeferred wraps the synchronous _reindex (which returns
            # None) and turns any exception into a failed Deferred, so the
            # reactor is always stopped.
            d = defer.maybeDeferred(
                lambda: operation(self.dmd.getPhysicalRoot().zport))
            d.addErrback(log_failure)
            d.addBoth(stop)

        reactor.callWhenRunning(main)
        reactor.run()

    def _reindex(self, zport):
        """
        Re-catalog every object already recorded in the global catalog.

        Commits after every C{CHUNK_SIZE} objects and once more at the end.
        Logs a warning and does nothing if the catalog does not exist.
        Runs synchronously and returns None.
        """
        globalCat = self._getCatalog(zport)

        # Test for None rather than truthiness: a catalog that defines
        # __len__ (as ZCatalog does) is falsy while it is empty.
        if globalCat is None:
            log.warning('Global Catalog does not exist, try --createcatalog option')
            return

        log.info('reindexing objects in catalog')
        catObj = globalCat.catalog_object
        i = 0
        for brain in globalCat():
            log.debug('indexing %s', brain.getPath())
            obj = brain.getObject()
            if obj is None:
                log.debug('%s does not exists', brain.getPath())
                # TODO: uncatalog the stale entry
            else:
                if hasattr(obj, 'index_object'):
                    obj.index_object()
                catObj(obj)
                log.debug('Catalogued object %s', obj.absolute_url_path())
            i += 1
            if i % CHUNK_SIZE == 0:
                if self.options.daemon:
                    log.info('Catalogued %s objects', i)
                else:
                    sys.stdout.write(".")
                    sys.stdout.flush()
                transaction.commit()
        transaction.commit()

    def _createCatalog(self, zport):
        """
        Create the global catalog if it is missing, then walk every object
        under zport and catalog it in chunks of C{CHUNK_SIZE}, retrying on
        conflicts and connection failures.

        @return: a Deferred that fires once indexing has finished and
            C{zport._zencatalog_completed} has been set. Failures are logged
            and consumed.
        """
        # Whether we reconnected after a recursion failure. A one-element
        # list so the nested generator can rebind it (Python 2 has no
        # ``nonlocal``).
        _reconnect = [False]

        catalog = self._getCatalog(zport)
        if catalog is None:
            # Create the catalog
            createGlobalCatalog(zport)
            catalog = self._getCatalog(zport)
        else:
            log.info('Global catalog already exists.')
        # NOTE(review): --forceindex is parsed in buildOptions but never
        # consulted; an existing catalog is always re-populated. Confirm
        # whether existing catalogs should be skipped without --forceindex.

        def recurse(obj):
            """
            Yield every object below C{obj} (children before parents), then
            C{obj} itself. On a connection or attribute error, yield a
            L{DisconnectedDuringGenerator} wrapping C{obj} instead.
            """
            if _reconnect[0]:
                log.info('Reconnected.')
                _reconnect[0] = False
            try:
                if isinstance(obj, ObjectManager):
                    # Bottom up, for multiple-path efficiency
                    for ob in obj.objectValues():
                        for kid in recurse(ob):
                            yield kid
                if isinstance(obj, ZenModelRM):
                    for rel in obj.getRelationships():
                        if not isinstance(rel, ToManyContRelationship):
                            continue
                        for kid in rel.objectValuesGen():
                            for gkid in recurse(kid):
                                yield gkid
                yield obj
            except (AttributeError, ClientDisconnected, DisconnectedError):
                # Yield the special exception chunk() is watching for, so it
                # pauses and waits for a connection; feed it the current
                # object so it knows where to start from. AttributeErrors are
                # caught too, since they are thrown when ZenPacks get updated
                # during the run.
                # NOTE(review): chunk() re-queues only obj itself; children of
                # obj not yet visited when the error hit are not walked again.
                log.info("Connection problem during object retrieval. "
                         "Trying again in 5 seconds...")
                _reconnect[0] = True
                yield DisconnectedDuringGenerator(obj)

        def catalog_object(ob):
            if hasattr(ob, 'index_object'):
                ob.index_object()
            catalog.catalog_object(ob)
            log.debug('Catalogued object %s', ob.absolute_url_path())

        # Count of catalogued objects (a mutable cell for the nested func).
        i = [0]

        def handle_chunk(c, d=None, _is_reconnect=False):
            """
            Catalog one chunk C{c} of objects and commit it.

            Returns a Deferred that fires once the chunk has been committed.
            On a C{ConflictError} the chunk is retried immediately; on a
            connection error it is retried after 5 seconds, reusing the same
            Deferred C{d}. Any other error fails C{d}. Because this is a
            callback on a Deferred yielded from an C{inlineCallbacks}
            function, the next chunk is not produced until this completes.
            """
            if d is None:
                d = defer.Deferred()
            try:
                # Inside the try: on a retry after a disconnect the sync may
                # itself fail (presumably while the connection is still down),
                # and it must be retried too rather than escaping from a
                # reactor.callLater call and leaving d unfired forever.
                self.syncdb()
                for ob in filter(None, c):
                    catalog_object(ob)
                transaction.commit()
            except ConflictError:
                log.info('Conflict error during commit. Retrying...')
                # Take details from the exception itself: looking the object
                # up through the connection here could raise and would then
                # stop the retry from being scheduled.
                log.debug('Conflict details:', exc_info=True)
                reactor.callLater(0, handle_chunk, c, d)
            except (ClientDisconnected, DisconnectedError):
                log.info('Connection problem during commit. '
                         'Trying again in 5 seconds...')
                reactor.callLater(5, handle_chunk, c, d, True)
            except Exception:
                # Not retryable: fail the Deferred (with the current
                # exception) so the run reports it and ends instead of
                # hanging.
                d.errback()
            else:
                if _is_reconnect:
                    log.info('Reconnected.')
                i[0] += len(c)
                if self.options.daemon:
                    log.info("Catalogued %s objects" % i[0])
                else:
                    sys.stdout.write('.')
                    sys.stdout.flush()
                # Fire last: on a retry d is already chained, so firing it
                # immediately resumes iteration and starts the next chunk.
                d.callback(None)
            return d

        def reconnect():
            """
            If we had a connection error, the db is probably in a weird state.
            Reset it and try again.
            """
            log.info("Reconnected.")
            self.syncdb()

        def set_flag(r):
            """
            Set a flag in the database saying we've finished indexing.
            """
            # Progress dots are only written when not running as a daemon,
            # so only then does the dot line need terminating.
            if not self.options.daemon:
                sys.stdout.write('\n')
            log.debug("Marking the indexing as completed in the database")
            self.syncdb()
            zport._zencatalog_completed = True
            transaction.commit()

        def log_failure(failure):
            # Called outside an ``except`` block, so log the Failure's own
            # traceback instead of relying on log.exception().
            log.error("Indexing failed:\n%s", failure.getTraceback())

        log.info("Reindexing your system. This may take some time.")
        d = chunk(recurse(zport), handle_chunk, reconnect, CHUNK_SIZE, 5)
        # Chained rather than parallel (addCallbacks) so that a failure in
        # set_flag's own commit is logged as well.
        d.addCallback(set_flag)
        d.addErrback(log_failure)
        return d

    def _getCatalog(self, zport):
        return getattr(zport, 'global_catalog', None)
if __name__ == "__main__":
    # Entry point: build the tool and run it, logging (with traceback) any
    # exception that escapes run() instead of letting it propagate.
    tool = ZenCatalog()
    try:
        tool.run()
    except Exception:
        log.exception(sys.exc_info()[1])