Package Products :: Package DataCollector :: Module ApplyDataMap
[hide private]
[frames] | no frames]

Source Code for Module Products.DataCollector.ApplyDataMap

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import sys 
 15  import types 
 16  import threading 
 17  import Queue 
 18  import logging 
 19  log = logging.getLogger("zen.ApplyDataMap") 
 20   
 21  import transaction 
 22   
 23  from zope.event import notify 
 24  from zope.app.container.contained import ObjectRemovedEvent, ObjectMovedEvent 
 25  from zope.app.container.contained import ObjectAddedEvent 
 26  from Acquisition import aq_base 
 27   
 28  from Products.ZenUtils.Utils import importClass, getObjByPath 
 29  from Products.Zuul.catalog.events import IndexingEvent 
 30  from Exceptions import ObjectCreationError 
 31  from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked 
 32  from Products.ZenModel.Lockable import Lockable 
 33  import Products.ZenEvents.Event as Event 
 34  from zExceptions import NotFound 
 35   
 36  zenmarker = "__ZENMARKER__" 
 37   
 38  CLASSIFIER_CLASS = '/Classifier' 
 39   
 40  _notAscii = dict.fromkeys(range(128,256), u'?') 
 41   
 42   
43 -def isSameData(x, y):
44 """ 45 A more comprehensive check to see if existing model data is the same as 46 newly modeled data. The primary focus is comparing unsorted lists of 47 dictionaries. 48 """ 49 if isinstance(x, (tuple, list)) and isinstance(y, (tuple, list)): 50 if len(x) > 0 and len(y) > 0 \ 51 and isinstance(x[0], dict) and isinstance(y[0], dict): 52 53 x = set([ tuple(sorted(d.items())) for d in x ]) 54 y = set([ tuple(sorted(d.items())) for d in y ]) 55 else: 56 return sorted(x) == sorted(y) 57 58 return x == y
59 60
61 -class ApplyDataMap(object):
62
63 - def __init__(self, datacollector=None):
64 self.datacollector = datacollector
65 66
67 - def logChange(self, device, compname, eventClass, msg):
68 if not getattr(device, 'zCollectorLogChanges', True): return 69 if type(msg) == type(u''): 70 msg = msg.translate(_notAscii) 71 self.logEvent(device, compname, eventClass, msg, Event.Info)
72 73
74 - def logEvent(self, device, component, eventClass, msg, severity):
75 ''' Used to report a change to a device model. Logs the given msg 76 to log.info and creates an event. 77 ''' 78 device = device.device() 79 compname = "" 80 try: 81 compname = getattr(component, 'id', component) 82 if hasattr(component, 'name') and callable(component.name): 83 compname = component.name() 84 elif device.id == compname: 85 compname = "" 86 except: pass 87 log.debug(msg) 88 devname = device.device().id 89 if (self.datacollector 90 # why is this line here? Blocks evnets from model in zope 91 #and getattr(self.datacollector, 'generateEvents', False) 92 and getattr(self.datacollector, 'dmd', None)): 93 eventDict = { 94 'eventClass': eventClass, 95 'device': devname, 96 'component': compname, 97 'summary': msg, 98 'severity': severity, 99 'agent': 'ApplyDataMap', 100 'explanation': "Event sent as zCollectorLogChanges is True", 101 } 102 self.datacollector.dmd.ZenEventManager.sendEvent(eventDict)
103 104
105 - def processClient(self, device, collectorClient):
106 """ 107 A modeler plugin specifies the protocol (eg SNMP, WMI) and 108 the specific data to retrieve from the device (eg an OID). 109 This data is then processed by the modeler plugin and then 110 passed to this method to apply the results to the ZODB. 111 112 @parameter device: DMD device object 113 @type device: DMD device object 114 @parameter collectorClient: results of modeling 115 @type collectorClient: DMD object 116 """ 117 log.debug("Processing data for device %s", device.id) 118 devchanged = False 119 try: 120 for pname, results in collectorClient.getResults(): 121 log.debug("Processing plugin %s on device %s", pname, device.id) 122 if not results: 123 log.warn("Plugin %s did not return any results", pname) 124 continue 125 plugin = self.datacollector.collectorPlugins.get(pname, None) 126 if not plugin: 127 log.warn("Unable to get plugin %s from %s", pname, 128 self.datacollector.collectorPlugins) 129 continue 130 131 results = plugin.preprocess(results, log) 132 datamaps = plugin.process(device, results, log) 133 #allow multiple maps to be returned from one plugin 134 if (type(datamaps) != types.ListType 135 and type(datamaps) != types.TupleType): 136 datamaps = [datamaps,] 137 for datamap in datamaps: 138 changed = self._applyDataMap(device, datamap) 139 if changed: devchanged=True 140 if devchanged: 141 device.setLastChange() 142 log.info("Changes applied") 143 else: 144 log.info("No change detected") 145 device.setSnmpLastCollection() 146 trans = transaction.get() 147 trans.setUser("datacoll") 148 trans.note("data applied from automated collection") 149 trans.commit() 150 except (SystemExit, KeyboardInterrupt): 151 raise 152 except: 153 transaction.abort() 154 log.exception("Plugin %s device %s", pname, device.getId())
155 156
157 - def applyDataMap(self, device, datamap, relname="", compname="", modname=""):
158 """Apply a datamap passed as a list of dicts through XML-RPC. 159 """ 160 from plugins.DataMaps import RelationshipMap, ObjectMap 161 if relname: 162 datamap = RelationshipMap(relname=relname, compname=compname, 163 modname=modname, objmaps=datamap) 164 else: 165 datamap = ObjectMap(datamap, compname=compname, modname=modname) 166 self._applyDataMap(device, datamap)
167 168
169 - def setDeviceClass(self, device, deviceClass=None):
170 """ 171 If a device class has been passed and the current class is not /Classifier 172 then move the device to the newly clssified device class. 173 """ 174 if deviceClass and device.getDeviceClassPath().startswith(CLASSIFIER_CLASS): 175 device.changeDeviceClass(deviceClass)
176 177
178 - def _applyDataMap(self, device, datamap):
179 """Apply a datamap to a device. 180 """ 181 persist = True 182 try: 183 device.dmd._p_jar.sync() 184 except AttributeError: 185 # This can occur in unit testing when the device is not persisted. 186 persist = False 187 188 if hasattr(datamap, "compname"): 189 if datamap.compname: 190 try: 191 tobj = device.getObjByPath(datamap.compname) 192 except NotFound: 193 log.warn("Unable to find compname '%s'" % datamap.compname) 194 return False 195 else: 196 tobj = device 197 if hasattr(datamap, "relname"): 198 changed = self._updateRelationship(tobj, datamap) 199 elif hasattr(datamap, 'modname'): 200 changed = self._updateObject(tobj, datamap) 201 else: 202 changed = False 203 log.warn("plugin returned unknown map skipping") 204 else: 205 changed = False 206 if changed and persist: 207 device.setLastChange() 208 trans = transaction.get() 209 trans.setUser("datacoll") 210 trans.note("data applied from automated collection") 211 trans.commit() 212 return changed
213 214
215 - def _updateRelationship(self, device, relmap):
216 """Add/Update/Remote objects to the target relationship. 217 """ 218 changed = False 219 rname = relmap.relname 220 rel = getattr(device, rname, None) 221 if not rel: 222 log.warn("no relationship:%s found on:%s (%s %s)", 223 relmap.relname, device.id, device.__class__, device.zPythonClass) 224 return changed 225 relids = rel.objectIdsAll() 226 seenids = {} 227 for objmap in relmap: 228 from Products.ZenModel.ZenModelRM import ZenModelRM 229 if hasattr(objmap, 'modname') and hasattr(objmap, 'id'): 230 if seenids.has_key(objmap.id): 231 seenids[objmap.id] += 1 232 objmap.id = "%s_%s" % (objmap.id, seenids[objmap.id]) 233 else: 234 seenids[objmap.id] = 1 235 if objmap.id in relids: 236 obj = rel._getOb(objmap.id) 237 238 # Handle the possibility of objects changing class by 239 # recreating them. Ticket #5598. 240 existing_modname = '' 241 existing_classname = '' 242 try: 243 import inspect 244 existing_modname = inspect.getmodule(obj).__name__ 245 existing_classname = obj.__class__.__name__ 246 except: 247 pass 248 249 if objmap.modname == existing_modname and \ 250 objmap.classname in ('', existing_classname): 251 252 objchange = self._updateObject(obj, objmap) 253 if not changed: changed = objchange 254 else: 255 rel._delObject(objmap.id) 256 objchange, obj = self._createRelObject(device, objmap, rname) 257 if not changed: changed = objchange 258 259 if objmap.id in relids: relids.remove(objmap.id) 260 else: 261 objchange, obj = self._createRelObject(device, objmap, rname) 262 if objchange: changed = True 263 if obj and obj.id in relids: relids.remove(obj.id) 264 elif isinstance(objmap, ZenModelRM): 265 self.logChange(device, objmap.id, Change_Add, 266 "linking object %s to device %s relation %s" % ( 267 objmap.id, device.id, rname)) 268 device.addRelation(rname, objmap) 269 changed = True 270 else: 271 objchange, obj = self._createRelObject(device, objmap, rname) 272 if objchange: changed = True 273 if obj and obj.id in relids: relids.remove(obj.id) 274 275 for id in relids: 276 obj = rel._getOb(id) 277 if isinstance(obj, Lockable) and obj.isLockedFromDeletion(): 278 objname = obj.id 279 try: objname = obj.name() 280 except: pass 281 msg = "Deletion Blocked: %s '%s' on %s" % ( 282 obj.meta_type, objname,obj.device().id) 283 log.warn(msg) 284 if obj.sendEventWhenBlocked(): 285 self.logEvent(device, obj, Change_Remove_Blocked, 286 msg, Event.Warning) 287 continue 288 self.logChange(device, obj, Change_Remove, 289 "removing object %s from rel %s on device %s" % ( 290 id, rname, device.id)) 291 rel._delObject(id) 292 if relids: changed=True 293 return changed
294 295
296 - def _updateObject(self, obj, objmap):
297 """Update an object using a objmap. 298 """ 299 changed = False 300 device = obj.device() 301 302 if isinstance(obj, Lockable) and obj.isLockedFromUpdates(): 303 if device.id == obj.id: 304 msg = 'Update Blocked: %s' % device.id 305 else: 306 objname = obj.id 307 try: objname = obj.name() 308 except: pass 309 msg = "Update Blocked: %s '%s' on %s" % ( 310 obj.meta_type, objname ,device.id) 311 log.warn(msg) 312 if obj.sendEventWhenBlocked(): 313 self.logEvent(device, obj,Change_Set_Blocked,msg,Event.Warning) 314 return changed 315 for attname, value in objmap.items(): 316 if attname.startswith('_'): 317 continue 318 if isinstance(value, basestring): 319 try: 320 # This looks confusing, and it is. The scenario is: 321 # A collector gathers some data as a raw byte stream, 322 # but really it has a specific encoding specified by 323 # by the zCollectorDecoding zProperty. Say, latin-1 or 324 # utf-16, etc. We need to decode that byte stream to get 325 # back a UnicodeString object. But, this version of Zope 326 # doesn't like UnicodeString objects for a variety of 327 # fields, such as object ids, so we then need to convert 328 # that UnicodeString back into a regular string of bytes, 329 # and for that we use the system default encoding, which 330 # is now utf-8. 331 codec = obj.zCollectorDecoding or sys.getdefaultencoding() 332 value = value.decode(codec) 333 value = value.encode(sys.getdefaultencoding()) 334 except UnicodeDecodeError: 335 # We don't know what to do with this, so don't set the 336 # value 337 continue 338 att = getattr(aq_base(obj), attname, zenmarker) 339 if att == zenmarker: 340 log.warn('The attribute %s was not found on object %s from device %s', 341 attname, obj.id, device.id) 342 continue 343 if callable(att): 344 setter = getattr(obj, attname) 345 gettername = attname.replace("set","get") 346 getter = getattr(obj, gettername, None) 347 348 if not getter: 349 350 log.warn("getter '%s' not found on obj '%s', " 351 "skipping", gettername, obj.id) 352 353 else: 354 355 from plugins.DataMaps import MultiArgs 356 if isinstance(value, MultiArgs): 357 358 args = value.args 359 change = not isSameData(value.args, getter()) 360 361 else: 362 363 args = (value,) 364 try: 365 change = not isSameData(value, getter()) 366 except UnicodeDecodeError: 367 change = True 368 369 if change: 370 setter(*args) 371 self.logChange(device, obj, Change_Set, 372 "calling function '%s' with '%s' on " 373 "object %s" % (attname, value, obj.id)) 374 changed = True 375 376 else: 377 try: 378 change = not isSameData(att, value) 379 except UnicodeDecodeError: 380 change = True 381 if change: 382 setattr(aq_base(obj), attname, value) 383 self.logChange(device, obj, Change_Set, 384 "set attribute '%s' " 385 "to '%s' on object '%s'" % 386 (attname, value, obj.id)) 387 changed = True 388 if not changed: 389 try: changed = obj._p_changed 390 except: pass 391 if changed: 392 if getattr(aq_base(obj), "index_object", False): 393 log.debug("indexing object %s", obj.id) 394 obj.index_object() 395 notify(IndexingEvent(obj)) 396 else: 397 obj._p_deactivate() 398 return changed
399 400
401 - def _createRelObject(self, device, objmap, relname):
402 """Create an object on a relationship using its objmap. 403 """ 404 constructor = importClass(objmap.modname, objmap.classname) 405 if hasattr(objmap, 'id'): 406 remoteObj = constructor(objmap.id) 407 else: 408 remoteObj = constructor(device, objmap) 409 if remoteObj is None: 410 log.debug("Constructor returned None") 411 return False, None 412 id = remoteObj.id 413 if not remoteObj: 414 raise ObjectCreationError( 415 "failed to create object %s in relation %s" % (id, relname)) 416 417 realdevice = device.device() 418 if realdevice.isLockedFromUpdates(): 419 objtype = "" 420 try: objtype = objmap.modname.split(".")[-1] 421 except: pass 422 msg = "Add Blocked: %s '%s' on %s" % ( 423 objtype, id, realdevice.id) 424 log.warn(msg) 425 if realdevice.sendEventWhenBlocked(): 426 self.logEvent(realdevice, id, Change_Add_Blocked, 427 msg, Event.Warning) 428 return False, None 429 rel = device._getOb(relname, None) 430 if not rel: 431 raise ObjectCreationError( 432 "No relation %s found on device %s (%s)" % (relname, device.id, device.__class__ )) 433 #"No relation %s found on device %s" % (relname, device.id)) 434 changed = False 435 try: 436 remoteObj = rel._getOb(remoteObj.id) 437 except AttributeError: 438 self.logChange(realdevice, remoteObj, Change_Add, 439 "adding object %s to relationship %s" % 440 (remoteObj.id, relname)) 441 rel._setObject(remoteObj.id, remoteObj) 442 remoteObj = rel._getOb(remoteObj.id) 443 changed = True 444 notify(ObjectMovedEvent(remoteObj, rel, remoteObj.id, rel, remoteObj.id)) 445 return self._updateObject(remoteObj, objmap) or changed, remoteObj
446 447
448 - def stop(self): pass
449 450
451 -class ApplyDataMapThread(threading.Thread, ApplyDataMap):
452 """ 453 Thread that applies datamaps to a device. It reads from a queue that 454 should have tuples of (devid, datamaps) where devid is the primaryId to 455 the device and datamps is a list of datamaps to apply. Cache is synced at 456 the start of each transaction and there is one transaction per device. 457 """ 458
459 - def __init__(self, datacollector, app):
460 threading.Thread.__init__(self) 461 ApplyDataMap.__init__(self, datacollector) 462 self.setName("ApplyDataMapThread") 463 self.setDaemon(1) 464 self.app = app 465 log.debug("Thread conn:%s", self.app._p_jar) 466 self.inputqueue = Queue.Queue() 467 self.done = False
468 469
470 - def processClient(self, device, collectorClient):
471 """Apply datamps to device. 472 """ 473 devpath = device.getPrimaryPath() 474 self.inputqueue.put((devpath, collectorClient))
475 476
477 - def run(self):
478 """Process collectorClients as they are passed in from a data collector. 479 """ 480 log.info("starting applyDataMap thread") 481 while not self.done or not self.inputqueue.empty(): 482 devpath = () 483 try: 484 devpath, collectorClient = self.inputqueue.get(True,1) 485 self.app._p_jar.sync() 486 device = getObjByPath(self.app, devpath) 487 ApplyDataMap.processClient(self, device, collectorClient) 488 except Queue.Empty: pass 489 except (SystemExit, KeyboardInterrupt): raise 490 except: 491 transaction.abort() 492 log.exception("processing device %s", "/".join(devpath)) 493 log.info("stopping applyDataMap thread")
494 495
496 - def stop(self):
497 """Stop the thread once all devices are processed. 498 """ 499 self.done = True 500 self.join()
501