1
2
3
4
5
6
7
8
9
10
11
12
13 import sys
14 import logging
15 from itertools import chain
16
17 from Globals import *
18
19 import transaction
20 from twisted.internet import defer, reactor, task
21 from OFS.ObjectManager import ObjectManager
22 from ZODB.POSException import ConflictError
23 from ZEO.Exceptions import ClientDisconnected
24 from ZEO.zrpc.error import DisconnectedError
25 from Products.ZenRelations.ToManyContRelationship import ToManyContRelationship
26 from Products.ZenModel.ZenModelRM import ZenModelRM
27 from Products.ZenUtils.ZCmdBase import ZCmdBase
28 from Products.Zuul.catalog.global_catalog import createGlobalCatalog
29
# Module-wide logger used by everything in this script.
log = logging.getLogger("zen.Catalog")


# Hide connection errors that the ZEO code logs automatically: a threshold
# above logging.CRITICAL (50) means these two loggers never emit anything.
HIGHER_THAN_CRITICAL = 100
logging.getLogger('ZODB.Connection').setLevel(HIGHER_THAN_CRITICAL)
logging.getLogger('ZEO.zrpc').setLevel(HIGHER_THAN_CRITICAL)

# Number of objects indexed (and committed) per batch.
CHUNK_SIZE = 100
class DisconnectedDuringGenerator(Exception):
    """
    A special exception that can be yielded during a generator and watched for.
    This lets us react to connection exceptions in the generator without killing it.

    Instances are yielded as ordinary values, not raised, so the consumer
    can spot them with C{isinstance} and keep iterating.

    @param value: the object that was being processed when the connection
        problem occurred. It is kept on C{self.value} so the consumer can
        push it back onto the iterator.
    """
    def __init__(self, value):
        # Pass the value up so str()/repr() of the marker stay informative.
        Exception.__init__(self, value)
        self.value = value
def chunk(iterable, callback, reconnect_cb=lambda: None, size=1, delay=1):
    """
    Iterate through a generator, splitting it into chunks of size C{size},
    calling C{callback(chunk)} on each. In case of a
    L{DisconnectedDuringGenerator}, pause for C{delay} seconds, then call
    C{reconnect_cb} and continue with the iteration.

    This is used to walk through the database object by object without dying if
    the database goes away or there's a C{ConflictError}.

    @param iterable: source of objects; it may also yield
        L{DisconnectedDuringGenerator} instances as in-band markers.
    @param callback: called with each chunk (a list). If it returns a
        Deferred, the next chunk is not built until that Deferred fires.
    @param reconnect_cb: zero-argument callable run C{delay} seconds after a
        L{DisconnectedDuringGenerator} marker is seen.
    @param size: number of objects per chunk; the final chunk may be shorter.
    @param delay: seconds to wait before calling C{reconnect_cb}.
    @return: a Deferred that fires once the iterable is exhausted and every
        chunk has been handed to C{callback}.
    """
    gen = iter(iterable)

    # inlineCallbacks resumes the generator only after each yielded Deferred
    # has fired, so chunks are processed strictly one after another.
    # ``gen`` is bound as a default argument because it is rebound inside
    # ``inner`` and Python 2 has no ``nonlocal``.
    @defer.inlineCallbacks
    def inner(gen=gen):
        d = defer.Deferred()
        d.addCallback(callback)
        batch = []
        while True:
            try:
                # Advance the iterator
                n = gen.next()
            except StopIteration:
                # Iterator's exhausted. Call back with the possibly
                # incomplete chunk, then stop. An empty chunk is never sent.
                if batch:
                    d.callback(batch)
                    yield d
                break
            else:
                if isinstance(n, DisconnectedDuringGenerator):
                    # The marker means the generator could not finish with
                    # an object. Push that object back onto the front of
                    # the iterator so it is not lost.
                    gen = chain((n.value,), gen)
                    # Wait ``delay`` seconds, then run reconnect_cb; the
                    # iterator does not advance until that has happened.
                    yield task.deferLater(reactor, delay, reconnect_cb)
                else:
                    # Normal yield; add it to the current chunk
                    batch.append(n)
                    # Chunk complete: fire its Deferred, wait for the
                    # callback to finish, then start a fresh chunk.
                    if len(batch) == size:
                        d.callback(batch)
                        batch = []
                        yield d
                        d = defer.Deferred()
                        d.addCallback(callback)

    return inner()

class ZenCatalog(ZCmdBase):
    """
    Command-line tool that creates and populates, or reindexes, the global
    catalog.

    NOTE(review): this class was rebuilt from a listing that had lost its
    indentation and its C{class}/C{def} lines. Method signatures are inferred
    from the call sites; spots marked NOTE(review) should be confirmed against
    the canonical source.
    """
    name = 'zencatalog'

    def buildOptions(self):
        """basic options setup sub classes can add more options here"""
        ZCmdBase.buildOptions(self)
        self.parser.add_option("--createcatalog",
                               action="store_true",
                               default=False,
                               help="Create global catalog and populate it")
        # NOTE(review): --forceindex is parsed here but nothing visible in
        # this class reads self.options.forceindex.
        self.parser.add_option("--forceindex",
                               action="store_true",
                               default=False,
                               help="works with --createcatalog to create index"
                                    " even if catalog exists")
        self.parser.add_option("--reindex",
                               action="store_true",
                               default=False,
                               help="reindex existing catalog")

    def run(self):
        """
        Start the reactor, perform the requested action, then stop the reactor.

        C{--createcatalog} takes precedence over C{--reindex}. The reactor is
        stopped whether the action succeeds, fails, or nothing was requested.
        """
        def stop(ignored):
            reactor.stop()

        def work():
            zport = self.dmd.getPhysicalRoot().zport
            if self.options.createcatalog:
                return self._createCatalog(zport)
            if self.options.reindex:
                return self._reindex(zport)
            log.warning('Nothing to do: use --createcatalog or --reindex')

        def main():
            # _reindex is synchronous and returns None, and work() may raise;
            # maybeDeferred turns every outcome into a Deferred so that stop()
            # always runs and the process cannot hang in reactor.run().
            d = defer.maybeDeferred(work)
            d.addErrback(self._logFailure)
            d.addBoth(stop)

        reactor.callWhenRunning(main)
        reactor.run()

    def _logFailure(self, failure):
        """
        Errback that logs a Twisted Failure with its traceback. Returning
        C{None} marks the failure as handled.
        """
        log.error('zencatalog failed:\n%s', failure.getTraceback())

    def _reindex(self, zport):
        """
        Re-catalog every object the existing global catalog knows about.

        Commits every C{CHUNK_SIZE} objects and once more at the end. Logs a
        warning and does nothing when C{zport} has no global catalog.
        """
        globalCat = self._getCatalog(zport)
        if globalCat is None:
            log.warning('Global Catalog does not exist, try --createcatalog option')
            return

        log.info('reindexing objects in catalog')
        i = 0
        catObj = globalCat.catalog_object
        for brain in globalCat():
            log.debug('indexing %s', brain.getPath())
            obj = brain.getObject()
            if obj is not None:
                if hasattr(obj, 'index_object'):
                    obj.index_object()
                # NOTE(review): assumed to run for every non-None object, not
                # only those with index_object -- confirm the nesting.
                catObj(obj)
                log.debug('Catalogued object %s', obj.absolute_url_path())
            else:
                log.debug('%s does not exists', brain.getPath())

            i += 1
            if not i % CHUNK_SIZE:
                # Progress: a dot on the console, or a log line as a daemon.
                if not self.options.daemon:
                    sys.stdout.write(".")
                    sys.stdout.flush()
                else:
                    log.info('Catalogued %s objects', i)
                transaction.commit()
        # Commit whatever is left from the last partial batch.
        transaction.commit()

    def _createCatalog(self, zport):
        """
        Create the global catalog if it is missing, then walk the object tree
        under C{zport} and catalog it in chunks of C{CHUNK_SIZE}.

        @return: a Deferred that fires after the walk has finished and the
            completion flag has been stored on C{zport}. Failures are logged
            and not propagated.
        """
        # One-element lists are mutable cells the nested functions can update
        # (Python 2 has no ``nonlocal``).
        # Set when the tree walk hit a connection problem and is waiting to
        # report that the connection is back.
        _reconnect = [False]

        catalog = self._getCatalog(zport)
        if catalog is None:
            log.info('Global catalog does not exist. Creating it.')
            createGlobalCatalog(zport)
            catalog = self._getCatalog(zport)
        else:
            log.info('Global catalog already exists.')

        def recurse(obj):
            """
            Generate C{obj}'s contained objects bottom-up, then C{obj} itself.
            On a connection problem, yield a L{DisconnectedDuringGenerator}
            wrapping C{obj} instead of raising.
            """
            if _reconnect[0]:
                log.info('Reconnected.')
                _reconnect[0] = False
            try:
                if isinstance(obj, ObjectManager):
                    # Children first
                    for ob in obj.objectValues():
                        for kid in recurse(ob):
                            yield kid
                if isinstance(obj, ZenModelRM):
                    # Only containing relationships are followed.
                    for rel in obj.getRelationships():
                        if not isinstance(rel, ToManyContRelationship):
                            continue
                        for kid in rel.objectValuesGen():
                            for gkid in recurse(kid):
                                yield gkid
                    # NOTE(review): assumed to be yielded only for ZenModelRM
                    # instances -- confirm the nesting.
                    yield obj
            except (AttributeError, ClientDisconnected, DisconnectedError):
                # Hand the problem to the consumer as a value so the
                # generators further up the stack stay alive.
                # NOTE(review): AttributeError is presumably how a dropped
                # connection shows up on attribute access -- confirm.
                log.info("Connection problem during object retrieval. "
                         "Trying again in 5 seconds...")
                _reconnect[0] = True
                yield DisconnectedDuringGenerator(obj)

        def catalog_object(ob):
            if hasattr(ob, 'index_object'):
                ob.index_object()
            catalog.catalog_object(ob)
            log.debug('Catalogued object %s', ob.absolute_url_path())

        # Running total of catalogued objects.
        count = [0]

        def handle_chunk(c, d=None, _is_reconnect=False):
            """
            Return a Deferred that will call back once the chunk has been
            catalogued. In case of a conflict, retry immediately; in case of a
            disconnect, wait 5 seconds, then try again. Because this is a
            callback chained to a C{chunk} Deferred yielded from an
            C{inlineCallbacks} function, the next chunk will not be created
            until this completes successfully.

            @param c: list of objects to catalog; falsy entries are skipped.
            @param d: Deferred to fire on completion; passed on retries so the
                same Deferred is reused.
            @param _is_reconnect: True when retrying after a disconnect.
            """
            if d is None:
                d = defer.Deferred()
            try:
                # syncdb() is inside the try because it is the first call to
                # touch the database when retrying after a disconnect.
                self.syncdb()
                for ob in filter(None, c):
                    catalog_object(ob)
                transaction.commit()
            except ConflictError as e:
                log.info('Conflict error during commit. Retrying...')
                # The lookup is only for the debug message; it must never be
                # able to break the retry.
                try:
                    culprit = repr(self.app._p_jar[e.oid])
                except Exception:
                    culprit = 'oid %r' % (getattr(e, 'oid', None),)
                log.debug('Object in conflict: %s', culprit)
                # Drop the failed transaction before trying again.
                transaction.abort()
                reactor.callLater(0, handle_chunk, c, d)
            except (ClientDisconnected, DisconnectedError):
                log.info('Connection problem during commit. '
                         'Trying again in 5 seconds...')
                reactor.callLater(5, handle_chunk, c, d, True)
            except Exception:
                # On a retry (invoked by callLater) a raised exception would
                # be lost and ``d`` would never fire. errback() with no
                # argument wraps the exception being handled.
                d.errback()
            else:
                if _is_reconnect:
                    log.info('Reconnected.')
                d.callback(None)
                # Count only chunks that were actually committed.
                count[0] += len(c)
                if self.options.daemon:
                    log.info("Catalogued %s objects", count[0])
                else:
                    sys.stdout.write('.')
                    sys.stdout.flush()
            return d

        def reconnect():
            """
            If we had a connection error, the db is probably in a weird state.
            Reset it and try again.
            """
            log.info("Reconnected.")
            self.syncdb()

        def set_flag(r):
            """
            Set a flag in the database saying we've finished indexing.
            """
            # Progress dots go to stdout only when not running as a daemon,
            # so that is when the line needs terminating.
            if not self.options.daemon:
                sys.stdout.write('\n')
            log.debug("Marking the indexing as completed in the database")
            self.syncdb()
            zport._zencatalog_completed = True
            transaction.commit()

        log.info("Reindexing your system. This may take some time.")
        d = chunk(recurse(zport), handle_chunk, reconnect, CHUNK_SIZE, 5)

        # A separate errback also covers failures raised by set_flag itself.
        d.addCallback(set_flag)
        d.addErrback(self._logFailure)
        return d

    def _getCatalog(self, zport):
        """Return C{zport.global_catalog}, or C{None} if it is not there."""
        return getattr(zport, 'global_catalog', None)

if __name__ == "__main__":
    zc = ZenCatalog()
    try:
        zc.run()
    except Exception as e:
        # Top-level boundary: log the error with its traceback rather than
        # letting the script die with an unlogged exception.
        log.exception(e)