1
2
3
4
5
6
7
8
9
10
11 import sys
12 from collections import defaultdict
13 import threading
14 import Queue
15 import logging
16 log = logging.getLogger("zen.ApplyDataMap")
17
18 import transaction
19
20 from ZODB.transact import transact
21 from zope.event import notify
22 from zope.container.contained import ObjectRemovedEvent, ObjectMovedEvent
23 from zope.container.contained import ObjectAddedEvent
24 from Acquisition import aq_base
25
26 from Products.ZenUtils.Utils import importClass, getObjByPath
27 from Products.Zuul.catalog.events import IndexingEvent
28 from Exceptions import ObjectCreationError
29 from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked
30 from Products.ZenModel.Lockable import Lockable
31 from Products.ZenEvents import Event
32 from zExceptions import NotFound
33
34 zenmarker = "__ZENMARKER__"
35
36 CLASSIFIER_CLASS = '/Classifier'
37
38 _notAscii = dict.fromkeys(range(128,256), u'?')
42 """
43 A more comprehensive check to see if existing model data is the same as
44 newly modeled data. The primary focus is comparing unsorted lists of
45 dictionaries.
46 """
47 if isinstance(x, (tuple, list)) and isinstance(y, (tuple, list)):
48 if len(x) > 0 and len(y) > 0 \
49 and isinstance(x[0], dict) and isinstance(y[0], dict):
50
51 x = set( tuple(sorted(d.items())) for d in x )
52 y = set( tuple(sorted(d.items())) for d in y )
53 else:
54 return sorted(x) == sorted(y)
55
56 return x == y
57
60
62 self.datacollector = datacollector
63
64
65 - def logChange(self, device, compname, eventClass, msg):
70
71
72 - def logEvent(self, device, component, eventClass, msg, severity):
73 ''' Used to report a change to a device model. Logs the given msg
74 to log.info and creates an event.
75 '''
76 device = device.device()
77 compname = ""
78 try:
79 compname = getattr(component, 'id', component)
80 if device.id == compname:
81 compname = ""
82 except: pass
83 log.debug(msg)
84 devname = device.device().id
85 if (self.datacollector
86
87
88 and getattr(self.datacollector, 'dmd', None)):
89 eventDict = {
90 'eventClass': eventClass,
91 'device': devname,
92 'component': compname,
93 'summary': msg,
94 'severity': severity,
95 'agent': 'ApplyDataMap',
96 'explanation': "Event sent as zCollectorLogChanges is True",
97 }
98 self.datacollector.dmd.ZenEventManager.sendEvent(eventDict)
99
100
102 """
103 A modeler plugin specifies the protocol (eg SNMP, WMI) and
104 the specific data to retrieve from the device (eg an OID).
105 This data is then processed by the modeler plugin and then
106 passed to this method to apply the results to the ZODB.
107
108 @parameter device: DMD device object
109 @type device: DMD device object
110 @parameter collectorClient: results of modeling
111 @type collectorClient: DMD object
112 """
113 log.debug("Processing data for device %s", device.id)
114 devchanged = False
115 try:
116 for pname, results in collectorClient.getResults():
117 log.debug("Processing plugin %s on device %s", pname, device.id)
118 if not results:
119 log.warn("Plugin %s did not return any results", pname)
120 continue
121 plugin = self.datacollector.collectorPlugins.get(pname, None)
122 if not plugin:
123 log.warn("Unable to get plugin %s from %s", pname,
124 self.datacollector.collectorPlugins)
125 continue
126
127 results = plugin.preprocess(results, log)
128 datamaps = plugin.process(device, results, log)
129
130 if not isinstance(datamaps, (list, tuple, set)):
131 datamaps = [datamaps,]
132 for datamap in datamaps:
133 changed = self._applyDataMap(device, datamap)
134 if changed: devchanged=True
135 if devchanged:
136 device.setLastChange()
137 log.info("Changes applied")
138 else:
139 log.info("No change detected")
140 device.setSnmpLastCollection()
141 trans = transaction.get()
142 trans.setUser("datacoll")
143 trans.note("data applied from automated collection")
144 trans.commit()
145 except (SystemExit, KeyboardInterrupt):
146 raise
147 except:
148 transaction.abort()
149 log.exception("Plugin %s device %s", pname, device.getId())
150
151
152 - def applyDataMap(self, device, datamap, relname="", compname="", modname=""):
162
163
171
172
173 @transact
175 """Apply a datamap to a device.
176 """
177
178 if not hasattr(device.dmd, 'zport'):
179 transaction.abort()
180
181 if hasattr(datamap, "compname"):
182 if datamap.compname:
183 try:
184 tobj = device.getObjByPath(datamap.compname)
185 except NotFound:
186 log.warn("Unable to find compname '%s'" % datamap.compname)
187 return False
188 else:
189 tobj = device
190 if hasattr(datamap, "relname"):
191 changed = self._updateRelationship(tobj, datamap)
192 elif hasattr(datamap, 'modname'):
193 changed = self._updateObject(tobj, datamap)
194 else:
195 changed = False
196 log.warn("plugin returned unknown map skipping")
197 else:
198 changed = False
199 if not changed:
200 transaction.abort()
201 else:
202 device.setLastChange()
203 trans = transaction.get()
204 trans.setUser("datacoll")
205 trans.note("data applied from automated collection")
206 return changed
207
208
210 """Add/Update/Remote objects to the target relationship.
211 """
212 changed = False
213 rname = relmap.relname
214 rel = getattr(device, rname, None)
215 if not rel:
216 log.warn("no relationship:%s found on:%s (%s %s)",
217 relmap.relname, device.id, device.__class__, device.zPythonClass)
218 return changed
219 relids = rel.objectIdsAll()
220 seenids = defaultdict(int)
221 for objmap in relmap:
222 from Products.ZenModel.ZenModelRM import ZenModelRM
223 if hasattr(objmap, 'modname') and hasattr(objmap, 'id'):
224 objmap_id = objmap.id
225 seenids[objmap_id] += 1
226 if seenids[objmap_id] > 1:
227 objmap_id = objmap.id = "%s_%s" % (objmap_id, seenids[objmap_id])
228 if objmap_id in relids:
229 obj = rel._getOb(objmap_id)
230
231
232
233 existing_modname = ''
234 existing_classname = ''
235 try:
236 import inspect
237 existing_modname = inspect.getmodule(obj).__name__
238 existing_classname = obj.__class__.__name__
239 except:
240 pass
241
242 if objmap.modname == existing_modname and \
243 objmap.classname in ('', existing_classname):
244
245 objchange = self._updateObject(obj, objmap)
246 if not changed: changed = objchange
247 else:
248 rel._delObject(objmap_id)
249 objchange, obj = self._createRelObject(device, objmap, rname)
250 if not changed: changed = objchange
251
252 if objmap_id in relids: relids.remove(objmap_id)
253 else:
254 objchange, obj = self._createRelObject(device, objmap, rname)
255 if objchange: changed = True
256 if obj and obj.id in relids: relids.remove(obj.id)
257 elif isinstance(objmap, ZenModelRM):
258 self.logChange(device, objmap.id, Change_Add,
259 "linking object %s to device %s relation %s" % (
260 objmap.id, device.id, rname))
261 device.addRelation(rname, objmap)
262 changed = True
263 else:
264 objchange, obj = self._createRelObject(device, objmap, rname)
265 if objchange: changed = True
266 if obj and obj.id in relids: relids.remove(obj.id)
267
268 for id in relids:
269 obj = rel._getOb(id)
270 if isinstance(obj, Lockable) and obj.isLockedFromDeletion():
271 objname = obj.id
272 try: objname = obj.name()
273 except: pass
274 msg = "Deletion Blocked: %s '%s' on %s" % (
275 obj.meta_type, objname,obj.device().id)
276 log.warn(msg)
277 if obj.sendEventWhenBlocked():
278 self.logEvent(device, obj, Change_Remove_Blocked,
279 msg, Event.Warning)
280 continue
281 self.logChange(device, obj, Change_Remove,
282 "removing object %s from rel %s on device %s" % (
283 id, rname, device.id))
284 rel._delObject(id)
285 if relids: changed=True
286 return changed
287
288
290 """Update an object using a objmap.
291 """
292 changed = False
293 device = obj.device()
294
295 if isinstance(obj, Lockable) and obj.isLockedFromUpdates():
296 if device.id == obj.id:
297 msg = 'Update Blocked: %s' % device.id
298 else:
299 objname = obj.id
300 try: objname = obj.name()
301 except: pass
302 msg = "Update Blocked: %s '%s' on %s" % (
303 obj.meta_type, objname ,device.id)
304 log.warn(msg)
305 if obj.sendEventWhenBlocked():
306 self.logEvent(device, obj,Change_Set_Blocked,msg,Event.Warning)
307 return changed
308 for attname, value in objmap.items():
309 if attname.startswith('_'):
310 continue
311 if isinstance(value, basestring):
312 try:
313
314
315
316
317
318
319
320
321
322
323
324 codec = obj.zCollectorDecoding or sys.getdefaultencoding()
325 value = value.decode(codec)
326 value = value.encode(sys.getdefaultencoding())
327 except UnicodeDecodeError:
328
329
330 continue
331 att = getattr(aq_base(obj), attname, zenmarker)
332 if att == zenmarker:
333 log.warn('The attribute %s was not found on object %s from device %s',
334 attname, obj.id, device.id)
335 continue
336 if callable(att):
337 setter = getattr(obj, attname)
338 gettername = attname.replace("set","get")
339 getter = getattr(obj, gettername, None)
340
341 if not getter:
342
343 log.warn("getter '%s' not found on obj '%s', "
344 "skipping", gettername, obj.id)
345
346 else:
347
348 from plugins.DataMaps import MultiArgs
349 if isinstance(value, MultiArgs):
350
351 args = value.args
352 change = not isSameData(value.args, getter())
353
354 else:
355
356 args = (value,)
357 try:
358 change = not isSameData(value, getter())
359 except UnicodeDecodeError:
360 change = True
361
362 if change:
363 setter(*args)
364 self.logChange(device, obj, Change_Set,
365 "calling function '%s' with '%s' on "
366 "object %s" % (attname, value, obj.id))
367 changed = True
368
369 else:
370 try:
371 change = not isSameData(att, value)
372 except UnicodeDecodeError:
373 change = True
374 if change:
375 setattr(aq_base(obj), attname, value)
376 self.logChange(device, obj, Change_Set,
377 "set attribute '%s' "
378 "to '%s' on object '%s'" %
379 (attname, value, obj.id))
380 changed = True
381 if not changed:
382 try: changed = obj._p_changed
383 except: pass
384 if changed:
385 if getattr(aq_base(obj), "index_object", False):
386 log.debug("indexing object %s", obj.id)
387 obj.index_object()
388 notify(IndexingEvent(obj))
389 else:
390 obj._p_deactivate()
391 return changed
392
393
395 """Create an object on a relationship using its objmap.
396 """
397 constructor = importClass(objmap.modname, objmap.classname)
398 if hasattr(objmap, 'id'):
399 remoteObj = constructor(objmap.id)
400 else:
401 remoteObj = constructor(device, objmap)
402 if remoteObj is None:
403 log.debug("Constructor returned None")
404 return False, None
405 id = remoteObj.id
406 if not remoteObj:
407 raise ObjectCreationError(
408 "failed to create object %s in relation %s" % (id, relname))
409
410 realdevice = device.device()
411 if realdevice.isLockedFromUpdates():
412 objtype = ""
413 try: objtype = objmap.modname.split(".")[-1]
414 except: pass
415 msg = "Add Blocked: %s '%s' on %s" % (
416 objtype, id, realdevice.id)
417 log.warn(msg)
418 if realdevice.sendEventWhenBlocked():
419 self.logEvent(realdevice, id, Change_Add_Blocked,
420 msg, Event.Warning)
421 return False, None
422 rel = device._getOb(relname, None)
423 if not rel:
424 raise ObjectCreationError(
425 "No relation %s found on device %s (%s)" % (relname, device.id, device.__class__ ))
426
427 changed = False
428 try:
429 remoteObj = rel._getOb(remoteObj.id)
430 except AttributeError:
431 self.logChange(realdevice, remoteObj, Change_Add,
432 "adding object %s to relationship %s" %
433 (remoteObj.id, relname))
434 rel._setObject(remoteObj.id, remoteObj)
435 remoteObj = rel._getOb(remoteObj.id)
436 changed = True
437 notify(ObjectMovedEvent(remoteObj, rel, remoteObj.id, rel, remoteObj.id))
438 return self._updateObject(remoteObj, objmap) or changed, remoteObj
439
440
441 - def stop(self): pass
442
445 """
446 Thread that applies datamaps to a device. It reads from a queue that
447 should have tuples of (devid, datamaps) where devid is the primaryId to
448 the device and datamps is a list of datamaps to apply. Cache is synced at
449 the start of each transaction and there is one transaction per device.
450 """
451
452 - def __init__(self, datacollector, app):
453 threading.Thread.__init__(self)
454 ApplyDataMap.__init__(self, datacollector)
455 self.setName("ApplyDataMapThread")
456 self.setDaemon(1)
457 self.app = app
458 log.debug("Thread conn:%s", self.app._p_jar)
459 self.inputqueue = Queue.Queue()
460 self.done = False
461
462
464 """Apply datamps to device.
465 """
466 devpath = device.getPrimaryPath()
467 self.inputqueue.put((devpath, collectorClient))
468
469
471 """Process collectorClients as they are passed in from a data collector.
472 """
473 log.info("starting applyDataMap thread")
474 while not self.done or not self.inputqueue.empty():
475 devpath = ()
476 try:
477 devpath, collectorClient = self.inputqueue.get(True,1)
478 self.app._p_jar.sync()
479 device = getObjByPath(self.app, devpath)
480 ApplyDataMap.processClient(self, device, collectorClient)
481 except Queue.Empty: pass
482 except Exception:
483 transaction.abort()
484 log.exception("processing device %s", "/".join(devpath))
485 log.info("stopping applyDataMap thread")
486
487
489 """Stop the thread once all devices are processed.
490 """
491 self.done = True
492 self.join()
493