1
2
3
4
5
6
7
8
9
10
11 import sys
12 import logging
13 from itertools import chain
14
15 from Globals import *
16
17 import transaction
18 from twisted.internet import defer, reactor, task
19 from OFS.ObjectManager import ObjectManager
20 from ZODB.POSException import ConflictError
21 from ZEO.Exceptions import ClientDisconnected
22 from ZEO.zrpc.error import DisconnectedError
23 from zope.component import getUtility
24 from Products.ZenRelations.ToManyContRelationship import ToManyContRelationship
25 from Products.ZenModel.ZenModelRM import ZenModelRM
26 from Products.ZenUtils.ZCmdBase import ZCmdBase
27 from Products.Zuul.catalog.global_catalog import globalCatalogId, catalog_caching
28 from Products.Zuul.catalog.interfaces import IGlobalCatalogFactory
29
30 log = logging.getLogger("zen.Catalog")
31
32
33 HIGHER_THAN_CRITICAL = 100
34 logging.getLogger('ZODB.Connection').setLevel(HIGHER_THAN_CRITICAL)
35 logging.getLogger('ZEO.zrpc').setLevel(HIGHER_THAN_CRITICAL)
36
37 CHUNK_SIZE = 100
40 """
41 A special exception that can be yielded during a generator and watched for.
42 This lets us react to connection exceptions in the generator without killing it.
43 """
46
47
def chunk(iterable, callback, reconnect_cb=lambda: None, size=1, delay=1):
    """
    Iterate through C{iterable} in batches of up to C{size} items, calling
    C{callback(batch)} on each and waiting for any Deferred it returns
    before pulling the next batch.

    When the iterable yields a L{DisconnectedDuringGenerator}, its C{value}
    is pushed back onto the front of the stream, and iteration resumes only
    after C{delay} seconds, once C{reconnect_cb} has been called.

    This is used to walk through the database object by object without
    dying if the database goes away mid-walk.

    @return: a Deferred (from C{inlineCallbacks}) that fires once the
        iterable is exhausted and the final, possibly short, batch has been
        handled.
    """
    source = iter(iterable)

    @defer.inlineCallbacks
    def inner(gen=source):
        pending = defer.Deferred()
        pending.addCallback(callback)
        batch = []
        while True:
            try:
                item = next(gen)
            except StopIteration:
                # Flush whatever is left as a final, short batch.
                if batch:
                    pending.callback(batch)
                    yield pending
                break

            if isinstance(item, DisconnectedDuringGenerator):
                # Re-queue the wrapped value at the head of the stream, then
                # wait `delay` seconds and run the reconnect hook.
                gen = chain((item.value,), gen)
                yield task.deferLater(reactor, delay, reconnect_cb)
                continue

            batch.append(item)
            if len(batch) == size:
                pending.callback(batch)
                batch = []
                # Block until callback (and any Deferred it returned) is done.
                yield pending
                pending = defer.Deferred()
                pending.addCallback(callback)

    return inner()
105
108 name = 'zencatalog'
109
111 """basic options setup sub classes can add more options here"""
112 ZCmdBase.buildOptions(self)
113 self.parser.add_option("--createcatalog",
114 action="store_true",
115 default=False,
116 help="Create global catalog and populate it")
117 self.parser.add_option("--forceindex",
118 action="store_true",
119 default=False,
120 help="works with --createcatalog to re-create index, if catalog exists it will be "\
121 "dropped first")
122 self.parser.add_option("--reindex",
123 action="store_true",
124 default=False,
125 help="reindex existing catalog")
126 self.parser.add_option("--permissionsOnly",
127 action="store_true",
128 default=False,
129 help="Only works with --reindex, only update the permissions catalog")
130
131
133
def stop(ignored):
    """
    Stop the reactor once the selected operation's Deferred has fired.

    C{ignored} is that Deferred's result or failure. Because this addBoth
    handler returns None, a failure is consumed here rather than propagated.
    """
    reactor.stop()
136
def main():
    """
    Run whichever operation the command line selected (--createcatalog
    takes precedence over --reindex) against zport, then stop the reactor
    when it finishes, successfully or not.
    """
    root = self.dmd.getPhysicalRoot()
    zport = root.zport
    if self.options.createcatalog:
        operation = self._createCatalog
    elif self.options.reindex:
        operation = self._reindex
    operation(zport).addBoth(stop)
144
145 if not self.options.createcatalog and not self.options.reindex:
146 self.parser.error("Must use one of --createcatalog, --reindex")
147 reactor.callWhenRunning(main)
148 with catalog_caching():
149 reactor.run()
150
152 globalCat = self._getCatalog(zport)
153
154 if globalCat:
155 reindex_catalog(globalCat, self.options.permissionsOnly, not self.options.daemon)
156 else:
157 log.warning('Global Catalog does not exist, try --createcatalog option')
158 return defer.succeed(None)
159
161
162
163
164 _reconnect = [False]
165
166
167 catalog = self._getCatalog(zport)
168 factory = getUtility(IGlobalCatalogFactory)
169 if self.options.forceindex and catalog:
170 factory.remove(zport)
171 catalog = self._getCatalog(zport)
172
173 if catalog is None:
174
175 log.debug("Creating global catalog")
176 zport = self.dmd.getPhysicalRoot().zport
177 factory.create(zport)
178 catalog = self._getCatalog(zport)
179 transaction.commit()
180 else:
181 log.info('Global catalog already exists. Run with --forceindex to drop and recreate catalog')
182 return defer.succeed(None)
183
def recurse(obj):
    """
    Walk the object tree under C{obj} depth-first, yielding descendants
    before C{obj} itself.

    Children come from C{objectValues()} for ObjectManagers and, for
    ZenModelRM objects, from each ToManyContRelationship's
    C{objectValuesGen()}. If a connection error (or AttributeError)
    interrupts the walk of C{obj}, yield a L{DisconnectedDuringGenerator}
    wrapping C{obj} and raise the reconnect flag, so that the next call
    logs "Reconnected.".
    """
    # _reconnect is a one-element list so this closure can rebind the flag
    # (no `nonlocal` in Python 2).
    if _reconnect[0]:
        log.info('Reconnected.')
        _reconnect[0] = False
    try:
        if isinstance(obj, ObjectManager):
            for child in obj.objectValues():
                for descendant in recurse(child):
                    yield descendant
            if isinstance(obj, ZenModelRM):
                for rel in obj.getRelationships():
                    if isinstance(rel, ToManyContRelationship):
                        for child in rel.objectValuesGen():
                            for descendant in recurse(child):
                                yield descendant
        yield obj
    except (AttributeError, ClientDisconnected, DisconnectedError):
        log.info("Connection problem during object retrieval. "
                 "Trying again in 5 seconds...")
        _reconnect[0] = True
        yield DisconnectedDuringGenerator(obj)
214
def catalog_object(ob):
    """
    Add C{ob} to the global catalog, first calling its own
    C{index_object()} when it defines one.
    """
    if hasattr(ob, 'index_object'):
        ob.index_object()
    catalog.catalog_object(ob)
    # This runs once per object in the database; only build the path for
    # the debug message when debug logging is actually enabled.
    if log.isEnabledFor(logging.DEBUG):
        log.debug('Catalogued object %s', ob.absolute_url_path())
220
221
222
223 i = [0]
224
def handle_chunk(c, d=None, _is_reconnect=False):
    """
    Catalog the objects in chunk C{c} and commit, returning a Deferred that
    fires (with None) once the chunk has been committed successfully.

    On a C{ConflictError} the chunk is retried on the next reactor
    iteration; on a ZEO disconnect it is retried after 5 seconds. Retries
    reuse the same Deferred C{d}, so the caller sees a single result.
    Because this is a callback chained to a C{chunk} Deferred yielded from
    an C{inlineCallbacks} function, the next chunk will not be created
    until this Deferred fires.

    @param c: list of objects to catalog; falsy entries are skipped.
    @param d: Deferred to fire on success (internal; passed on retries).
    @param _is_reconnect: True when retrying after a disconnect, so that
        "Reconnected." is logged once the commit succeeds.
    """
    if d is None:
        d = defer.Deferred()
    self.syncdb()
    try:
        for ob in filter(None, c):
            catalog_object(ob)
        transaction.commit()
    except ConflictError as e:
        log.info('Conflict error during commit. Retrying...')
        # Resolving the conflicting object is best-effort and debug-only:
        # e.oid may be None, and any exception escaping this handler would
        # fail the chunk instead of scheduling the retry below.
        oid = getattr(e, 'oid', None)
        if oid is not None and log.isEnabledFor(logging.DEBUG):
            try:
                log.debug('Object in conflict: %r' % (self.app._p_jar[oid],))
            except Exception:
                log.debug('Object in conflict: oid %r', oid)
        reactor.callLater(0, handle_chunk, c, d)
    except (ClientDisconnected, DisconnectedError):
        log.info('Connection problem during commit. '
                 'Trying again in 5 seconds...')
        reactor.callLater(5, handle_chunk, c, d, True)
    else:
        if _is_reconnect:
            log.info('Reconnected.')
        # Count only committed chunks, so a retried chunk is not counted
        # twice. Report before firing d, because firing it may resume the
        # chunk() loop and start on the next chunk.
        i.append(i.pop() + len(c))
        if self.options.daemon:
            log.info("Catalogued %s objects" % i[0])
        else:
            sys.stdout.write('.')
            sys.stdout.flush()
        d.callback(None)
    return d
260
def reconnect():
    """
    Reconnect hook run after the delay that follows a connection error
    during the object walk: resynchronise the database connection, since it
    is probably in an inconsistent state, so that the walk can resume.

    Note: "Reconnected." is logged before the sync is attempted.
    """
    log.info("Reconnected.")
    self.syncdb()
268
def set_flag(r):
    """
    Record in the database that indexing has finished, by setting
    C{zport._zencatalog_completed} and committing.

    C{r} is the result of the chunk Deferred and is not used.
    """
    daemon = self.options.daemon
    if not daemon:
        # End the line of progress dots written by handle_chunk.
        sys.stdout.write('\n')
    log.debug("Marking the indexing as completed in the database")
    self.syncdb()
    zport._zencatalog_completed = True
    transaction.commit()
    log.info("Reindexing completed.")
280
281 log.info("Reindexing your system. This may take some time.")
282 d = chunk(recurse(zport), handle_chunk, reconnect, CHUNK_SIZE, 5)
283
284 return d.addCallbacks(set_flag, log.exception)
285
286
288 return getattr(zport, globalCatalogId, None)
289
def reindex_catalog(globalCat, permissionsOnly=False, printProgress=True, commit=True):
    """
    Re-catalog every object currently listed in C{globalCat}.

    @param globalCat: the global catalog; each brain it returns is resolved
        to its object, which is passed back to C{globalCat.catalog_object}.
    @param permissionsOnly: refresh only the C{allowedRolesAndUsers} index,
        without updating metadata or calling the object's C{index_object}.
    @param printProgress: write '.' to stdout every CHUNK_SIZE objects (and
        a final newline) instead of logging the running count.
    @param commit: commit the transaction every CHUNK_SIZE objects and at
        the end.

    Brains whose object cannot be loaded, or resolves to None, are
    uncatalogued. Conflict and ZEO connection errors are re-raised instead:
    treating them as "object is gone" would strip valid entries from the
    catalog whenever the database connection drops mid-run.
    """
    with catalog_caching():
        msg = 'permissions' if permissionsOnly else 'objects'
        log.info('reindexing %s in catalog', msg)

        # The catalog_object keyword arguments are the same for every object
        # (empty means all indexes plus metadata); build them once.
        if permissionsOnly:
            catalog_kwargs = {'update_metadata': False,
                              'idxs': ("allowedRolesAndUsers",)}
        else:
            catalog_kwargs = {}

        i = 0
        catObj = globalCat.catalog_object
        for brain in globalCat():
            path = brain.getPath()
            log.debug('indexing %s', path)
            try:
                obj = brain.getObject()
            except (ConflictError, ClientDisconnected, DisconnectedError):
                # Transient database problems, not evidence the object is gone.
                raise
            except Exception:
                log.debug("Could not load object: %s", path)
                globalCat.uncatalog_object(path)
                continue

            if obj is None:
                log.debug('%s does not exists', path)
                globalCat.uncatalog_object(path)
            else:
                if not permissionsOnly and hasattr(obj, 'index_object'):
                    obj.index_object()
                catObj(obj, **catalog_kwargs)
                log.debug('Catalogued object %s', obj.absolute_url_path())

            i += 1
            if i % CHUNK_SIZE == 0:
                if printProgress:
                    sys.stdout.write(".")
                    sys.stdout.flush()
                else:
                    log.info('Catalogued %s objects', i)
                if commit:
                    transaction.commit()

        if printProgress:
            sys.stdout.write('\n')
            sys.stdout.flush()
        if commit:
            transaction.commit()
334
if __name__ == "__main__":
    zc = ZenCatalog()
    try:
        zc.run()
    except Exception as e:
        log.exception(e)
        # Report the failure to whatever invoked the script (shell, init
        # script, cron) instead of exiting with status 0.
        sys.exit(1)
341