1
2
3
4
5
6
7
8
9
10
11
12
13
14 import sys
15 import types
16 import threading
17 import Queue
18 import logging
19 log = logging.getLogger("zen.ApplyDataMap")
20
21 import transaction
22
23 from zope.event import notify
24 from zope.app.container.contained import ObjectRemovedEvent, ObjectMovedEvent
25 from zope.app.container.contained import ObjectAddedEvent
26 from Acquisition import aq_base
27
28 from Products.ZenUtils.Utils import importClass, getObjByPath
29 from Products.Zuul.catalog.events import IndexingEvent
30 from Exceptions import ObjectCreationError
31 from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked
32 from Products.ZenModel.Lockable import Lockable
33 import Products.ZenEvents.Event as Event
34 from zExceptions import NotFound
35
36 zenmarker = "__ZENMARKER__"
37
38 CLASSIFIER_CLASS = '/Classifier'
39
40 _notAscii = dict.fromkeys(range(128,256), u'?')
41
42
44 """
45 A more comprehensive check to see if existing model data is the same as
46 newly modeled data. The primary focus is comparing unsorted lists of
47 dictionaries.
48 """
49 if isinstance(x, (tuple, list)) and isinstance(y, (tuple, list)):
50 if len(x) > 0 and len(y) > 0 \
51 and isinstance(x[0], dict) and isinstance(y[0], dict):
52
53 x = set([ tuple(sorted(d.items())) for d in x ])
54 y = set([ tuple(sorted(d.items())) for d in y ])
55 else:
56 return sorted(x) == sorted(y)
57
58 return x == y
59
60
62
64 self.datacollector = datacollector
65
66
67 - def logChange(self, device, compname, eventClass, msg):
72
73
74 - def logEvent(self, device, component, eventClass, msg, severity):
75 ''' Used to report a change to a device model. Logs the given msg
76 to log.info and creates an event.
77 '''
78 device = device.device()
79 compname = ""
80 try:
81 compname = getattr(component, 'id', component)
82 if hasattr(component, 'name') and callable(component.name):
83 compname = component.name()
84 elif device.id == compname:
85 compname = ""
86 except: pass
87 log.debug(msg)
88 devname = device.device().id
89 if (self.datacollector
90
91
92 and getattr(self.datacollector, 'dmd', None)):
93 eventDict = {
94 'eventClass': eventClass,
95 'device': devname,
96 'component': compname,
97 'summary': msg,
98 'severity': severity,
99 'agent': 'ApplyDataMap',
100 'explanation': "Event sent as zCollectorLogChanges is True",
101 }
102 self.datacollector.dmd.ZenEventManager.sendEvent(eventDict)
103
104
106 """
107 A modeler plugin specifies the protocol (eg SNMP, WMI) and
108 the specific data to retrieve from the device (eg an OID).
109 This data is then processed by the modeler plugin and then
110 passed to this method to apply the results to the ZODB.
111
112 @parameter device: DMD device object
113 @type device: DMD device object
114 @parameter collectorClient: results of modeling
115 @type collectorClient: DMD object
116 """
117 log.debug("Processing data for device %s", device.id)
118 devchanged = False
119 try:
120 for pname, results in collectorClient.getResults():
121 log.debug("Processing plugin %s on device %s", pname, device.id)
122 if not results:
123 log.warn("Plugin %s did not return any results", pname)
124 continue
125 plugin = self.datacollector.collectorPlugins.get(pname, None)
126 if not plugin:
127 log.warn("Unable to get plugin %s from %s", pname,
128 self.datacollector.collectorPlugins)
129 continue
130
131 results = plugin.preprocess(results, log)
132 datamaps = plugin.process(device, results, log)
133
134 if (type(datamaps) != types.ListType
135 and type(datamaps) != types.TupleType):
136 datamaps = [datamaps,]
137 for datamap in datamaps:
138 changed = self._applyDataMap(device, datamap)
139 if changed: devchanged=True
140 if devchanged:
141 device.setLastChange()
142 log.info("Changes applied")
143 else:
144 log.info("No change detected")
145 device.setSnmpLastCollection()
146 trans = transaction.get()
147 trans.setUser("datacoll")
148 trans.note("data applied from automated collection")
149 trans.commit()
150 except (SystemExit, KeyboardInterrupt):
151 raise
152 except:
153 transaction.abort()
154 log.exception("Plugin %s device %s", pname, device.getId())
155
156
157 - def applyDataMap(self, device, datamap, relname="", compname="", modname=""):
167
168
176
177
179 """Apply a datamap to a device.
180 """
181 persist = True
182 try:
183 device.dmd._p_jar.sync()
184 except AttributeError:
185
186 persist = False
187
188 if hasattr(datamap, "compname"):
189 if datamap.compname:
190 try:
191 tobj = device.getObjByPath(datamap.compname)
192 except NotFound:
193 log.warn("Unable to find compname '%s'" % datamap.compname)
194 return False
195 else:
196 tobj = device
197 if hasattr(datamap, "relname"):
198 changed = self._updateRelationship(tobj, datamap)
199 elif hasattr(datamap, 'modname'):
200 changed = self._updateObject(tobj, datamap)
201 else:
202 changed = False
203 log.warn("plugin returned unknown map skipping")
204 else:
205 changed = False
206 if changed and persist:
207 device.setLastChange()
208 trans = transaction.get()
209 trans.setUser("datacoll")
210 trans.note("data applied from automated collection")
211 trans.commit()
212 return changed
213
214
216 """Add/Update/Remote objects to the target relationship.
217 """
218 changed = False
219 rname = relmap.relname
220 rel = getattr(device, rname, None)
221 if not rel:
222 log.warn("no relationship:%s found on:%s (%s %s)",
223 relmap.relname, device.id, device.__class__, device.zPythonClass)
224 return changed
225 relids = rel.objectIdsAll()
226 seenids = {}
227 for objmap in relmap:
228 from Products.ZenModel.ZenModelRM import ZenModelRM
229 if hasattr(objmap, 'modname') and hasattr(objmap, 'id'):
230 if seenids.has_key(objmap.id):
231 seenids[objmap.id] += 1
232 objmap.id = "%s_%s" % (objmap.id, seenids[objmap.id])
233 else:
234 seenids[objmap.id] = 1
235 if objmap.id in relids:
236 obj = rel._getOb(objmap.id)
237
238
239
240 existing_modname = ''
241 existing_classname = ''
242 try:
243 import inspect
244 existing_modname = inspect.getmodule(obj).__name__
245 existing_classname = obj.__class__.__name__
246 except:
247 pass
248
249 if objmap.modname == existing_modname and \
250 objmap.classname in ('', existing_classname):
251
252 objchange = self._updateObject(obj, objmap)
253 if not changed: changed = objchange
254 else:
255 rel._delObject(objmap.id)
256 objchange, obj = self._createRelObject(device, objmap, rname)
257 if not changed: changed = objchange
258
259 if objmap.id in relids: relids.remove(objmap.id)
260 else:
261 objchange, obj = self._createRelObject(device, objmap, rname)
262 if objchange: changed = True
263 if obj and obj.id in relids: relids.remove(obj.id)
264 elif isinstance(objmap, ZenModelRM):
265 self.logChange(device, objmap.id, Change_Add,
266 "linking object %s to device %s relation %s" % (
267 objmap.id, device.id, rname))
268 device.addRelation(rname, objmap)
269 changed = True
270 else:
271 objchange, obj = self._createRelObject(device, objmap, rname)
272 if objchange: changed = True
273 if obj and obj.id in relids: relids.remove(obj.id)
274
275 for id in relids:
276 obj = rel._getOb(id)
277 if isinstance(obj, Lockable) and obj.isLockedFromDeletion():
278 objname = obj.id
279 try: objname = obj.name()
280 except: pass
281 msg = "Deletion Blocked: %s '%s' on %s" % (
282 obj.meta_type, objname,obj.device().id)
283 log.warn(msg)
284 if obj.sendEventWhenBlocked():
285 self.logEvent(device, obj, Change_Remove_Blocked,
286 msg, Event.Warning)
287 continue
288 self.logChange(device, obj, Change_Remove,
289 "removing object %s from rel %s on device %s" % (
290 id, rname, device.id))
291 rel._delObject(id)
292 if relids: changed=True
293 return changed
294
295
297 """Update an object using a objmap.
298 """
299 changed = False
300 device = obj.device()
301
302 if isinstance(obj, Lockable) and obj.isLockedFromUpdates():
303 if device.id == obj.id:
304 msg = 'Update Blocked: %s' % device.id
305 else:
306 objname = obj.id
307 try: objname = obj.name()
308 except: pass
309 msg = "Update Blocked: %s '%s' on %s" % (
310 obj.meta_type, objname ,device.id)
311 log.warn(msg)
312 if obj.sendEventWhenBlocked():
313 self.logEvent(device, obj,Change_Set_Blocked,msg,Event.Warning)
314 return changed
315 for attname, value in objmap.items():
316 if attname.startswith('_'):
317 continue
318 if isinstance(value, basestring):
319 try:
320
321
322
323
324
325
326
327
328
329
330
331 codec = obj.zCollectorDecoding or sys.getdefaultencoding()
332 value = value.decode(codec)
333 value = value.encode(sys.getdefaultencoding())
334 except UnicodeDecodeError:
335
336
337 continue
338 att = getattr(aq_base(obj), attname, zenmarker)
339 if att == zenmarker:
340 log.warn('The attribute %s was not found on object %s from device %s',
341 attname, obj.id, device.id)
342 continue
343 if callable(att):
344 setter = getattr(obj, attname)
345 gettername = attname.replace("set","get")
346 getter = getattr(obj, gettername, None)
347
348 if not getter:
349
350 log.warn("getter '%s' not found on obj '%s', "
351 "skipping", gettername, obj.id)
352
353 else:
354
355 from plugins.DataMaps import MultiArgs
356 if isinstance(value, MultiArgs):
357
358 args = value.args
359 change = not isSameData(value.args, getter())
360
361 else:
362
363 args = (value,)
364 try:
365 change = not isSameData(value, getter())
366 except UnicodeDecodeError:
367 change = True
368
369 if change:
370 setter(*args)
371 self.logChange(device, obj, Change_Set,
372 "calling function '%s' with '%s' on "
373 "object %s" % (attname, value, obj.id))
374 changed = True
375
376 else:
377 try:
378 change = not isSameData(att, value)
379 except UnicodeDecodeError:
380 change = True
381 if change:
382 setattr(aq_base(obj), attname, value)
383 self.logChange(device, obj, Change_Set,
384 "set attribute '%s' "
385 "to '%s' on object '%s'" %
386 (attname, value, obj.id))
387 changed = True
388 if not changed:
389 try: changed = obj._p_changed
390 except: pass
391 if changed:
392 if getattr(aq_base(obj), "index_object", False):
393 log.debug("indexing object %s", obj.id)
394 obj.index_object()
395 notify(IndexingEvent(obj))
396 else:
397 obj._p_deactivate()
398 return changed
399
400
402 """Create an object on a relationship using its objmap.
403 """
404 constructor = importClass(objmap.modname, objmap.classname)
405 if hasattr(objmap, 'id'):
406 remoteObj = constructor(objmap.id)
407 else:
408 remoteObj = constructor(device, objmap)
409 if remoteObj is None:
410 log.debug("Constructor returned None")
411 return False, None
412 id = remoteObj.id
413 if not remoteObj:
414 raise ObjectCreationError(
415 "failed to create object %s in relation %s" % (id, relname))
416
417 realdevice = device.device()
418 if realdevice.isLockedFromUpdates():
419 objtype = ""
420 try: objtype = objmap.modname.split(".")[-1]
421 except: pass
422 msg = "Add Blocked: %s '%s' on %s" % (
423 objtype, id, realdevice.id)
424 log.warn(msg)
425 if realdevice.sendEventWhenBlocked():
426 self.logEvent(realdevice, id, Change_Add_Blocked,
427 msg, Event.Warning)
428 return False, None
429 rel = device._getOb(relname, None)
430 if not rel:
431 raise ObjectCreationError(
432 "No relation %s found on device %s (%s)" % (relname, device.id, device.__class__ ))
433
434 changed = False
435 try:
436 remoteObj = rel._getOb(remoteObj.id)
437 except AttributeError:
438 self.logChange(realdevice, remoteObj, Change_Add,
439 "adding object %s to relationship %s" %
440 (remoteObj.id, relname))
441 rel._setObject(remoteObj.id, remoteObj)
442 remoteObj = rel._getOb(remoteObj.id)
443 changed = True
444 notify(ObjectMovedEvent(remoteObj, rel, remoteObj.id, rel, remoteObj.id))
445 return self._updateObject(remoteObj, objmap) or changed, remoteObj
446
447
448 - def stop(self): pass
449
450
452 """
453 Thread that applies datamaps to a device. It reads from a queue that
454 should have tuples of (devid, datamaps) where devid is the primaryId to
455 the device and datamps is a list of datamaps to apply. Cache is synced at
456 the start of each transaction and there is one transaction per device.
457 """
458
459 - def __init__(self, datacollector, app):
460 threading.Thread.__init__(self)
461 ApplyDataMap.__init__(self, datacollector)
462 self.setName("ApplyDataMapThread")
463 self.setDaemon(1)
464 self.app = app
465 log.debug("Thread conn:%s", self.app._p_jar)
466 self.inputqueue = Queue.Queue()
467 self.done = False
468
469
471 """Apply datamps to device.
472 """
473 devpath = device.getPrimaryPath()
474 self.inputqueue.put((devpath, collectorClient))
475
476
478 """Process collectorClients as they are passed in from a data collector.
479 """
480 log.info("starting applyDataMap thread")
481 while not self.done or not self.inputqueue.empty():
482 devpath = ()
483 try:
484 devpath, collectorClient = self.inputqueue.get(True,1)
485 self.app._p_jar.sync()
486 device = getObjByPath(self.app, devpath)
487 ApplyDataMap.processClient(self, device, collectorClient)
488 except Queue.Empty: pass
489 except (SystemExit, KeyboardInterrupt): raise
490 except:
491 transaction.abort()
492 log.exception("processing device %s", "/".join(devpath))
493 log.info("stopping applyDataMap thread")
494
495
497 """Stop the thread once all devices are processed.
498 """
499 self.done = True
500 self.join()
501