Package Products :: Package DataCollector :: Module ApplyDataMap
[hide private]
[frames] | no frames]

Source Code for Module Products.DataCollector.ApplyDataMap

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2007, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  import sys 
 12  from collections import defaultdict 
 13  import threading 
 14  import Queue 
 15  import logging 
 16  log = logging.getLogger("zen.ApplyDataMap") 
 17   
 18  import transaction 
 19   
 20  from ZODB.transact import transact 
 21  from zope.event import notify 
 22  from zope.container.contained import ObjectRemovedEvent, ObjectMovedEvent 
 23  from zope.container.contained import ObjectAddedEvent 
 24  from Acquisition import aq_base 
 25   
 26  from Products.ZenUtils.Utils import importClass, getObjByPath 
 27  from Products.Zuul.catalog.events import IndexingEvent 
 28  from Exceptions import ObjectCreationError 
 29  from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked 
 30  from Products.ZenModel.Lockable import Lockable 
 31  from Products.ZenEvents import Event 
 32  from zExceptions import NotFound 
 33   
 34  zenmarker = "__ZENMARKER__" 
 35   
 36  CLASSIFIER_CLASS = '/Classifier' 
 37   
 38  _notAscii = dict.fromkeys(range(128,256), u'?') 
39 40 41 -def isSameData(x, y):
42 """ 43 A more comprehensive check to see if existing model data is the same as 44 newly modeled data. The primary focus is comparing unsorted lists of 45 dictionaries. 46 """ 47 if isinstance(x, (tuple, list)) and isinstance(y, (tuple, list)): 48 if len(x) > 0 and len(y) > 0 \ 49 and isinstance(x[0], dict) and isinstance(y[0], dict): 50 51 x = set( tuple(sorted(d.items())) for d in x ) 52 y = set( tuple(sorted(d.items())) for d in y ) 53 else: 54 return sorted(x) == sorted(y) 55 56 return x == y
57
58 59 -class ApplyDataMap(object):
60
61 - def __init__(self, datacollector=None):
62 self.datacollector = datacollector
63 64
65 - def logChange(self, device, compname, eventClass, msg):
66 if not getattr(device, 'zCollectorLogChanges', True): return 67 if isinstance(msg, unicode): 68 msg = msg.translate(_notAscii) 69 self.logEvent(device, compname, eventClass, msg, Event.Info)
70 71
72 - def logEvent(self, device, component, eventClass, msg, severity):
73 ''' Used to report a change to a device model. Logs the given msg 74 to log.info and creates an event. 75 ''' 76 device = device.device() 77 compname = "" 78 try: 79 compname = getattr(component, 'id', component) 80 if device.id == compname: 81 compname = "" 82 except: pass 83 log.debug(msg) 84 devname = device.device().id 85 if (self.datacollector 86 # why is this line here? Blocks evnets from model in zope 87 #and getattr(self.datacollector, 'generateEvents', False) 88 and getattr(self.datacollector, 'dmd', None)): 89 eventDict = { 90 'eventClass': eventClass, 91 'device': devname, 92 'component': compname, 93 'summary': msg, 94 'severity': severity, 95 'agent': 'ApplyDataMap', 96 'explanation': "Event sent as zCollectorLogChanges is True", 97 } 98 self.datacollector.dmd.ZenEventManager.sendEvent(eventDict)
99 100
101 - def processClient(self, device, collectorClient):
102 """ 103 A modeler plugin specifies the protocol (eg SNMP, WMI) and 104 the specific data to retrieve from the device (eg an OID). 105 This data is then processed by the modeler plugin and then 106 passed to this method to apply the results to the ZODB. 107 108 @parameter device: DMD device object 109 @type device: DMD device object 110 @parameter collectorClient: results of modeling 111 @type collectorClient: DMD object 112 """ 113 log.debug("Processing data for device %s", device.id) 114 devchanged = False 115 try: 116 for pname, results in collectorClient.getResults(): 117 log.debug("Processing plugin %s on device %s", pname, device.id) 118 if not results: 119 log.warn("Plugin %s did not return any results", pname) 120 continue 121 plugin = self.datacollector.collectorPlugins.get(pname, None) 122 if not plugin: 123 log.warn("Unable to get plugin %s from %s", pname, 124 self.datacollector.collectorPlugins) 125 continue 126 127 results = plugin.preprocess(results, log) 128 datamaps = plugin.process(device, results, log) 129 #allow multiple maps to be returned from one plugin 130 if not isinstance(datamaps, (list, tuple, set)): 131 datamaps = [datamaps,] 132 for datamap in datamaps: 133 changed = self._applyDataMap(device, datamap) 134 if changed: devchanged=True 135 if devchanged: 136 device.setLastChange() 137 log.info("Changes applied") 138 else: 139 log.info("No change detected") 140 device.setSnmpLastCollection() 141 trans = transaction.get() 142 trans.setUser("datacoll") 143 trans.note("data applied from automated collection") 144 trans.commit() 145 except (SystemExit, KeyboardInterrupt): 146 raise 147 except: 148 transaction.abort() 149 log.exception("Plugin %s device %s", pname, device.getId())
150 151
152 - def applyDataMap(self, device, datamap, relname="", compname="", modname=""):
153 """Apply a datamap passed as a list of dicts through XML-RPC. 154 """ 155 from plugins.DataMaps import RelationshipMap, ObjectMap 156 if relname: 157 datamap = RelationshipMap(relname=relname, compname=compname, 158 modname=modname, objmaps=datamap) 159 else: 160 datamap = ObjectMap(datamap, compname=compname, modname=modname) 161 self._applyDataMap(device, datamap)
162 163
164 - def setDeviceClass(self, device, deviceClass=None):
165 """ 166 If a device class has been passed and the current class is not /Classifier 167 then move the device to the newly clssified device class. 168 """ 169 if deviceClass and device.getDeviceClassPath().startswith(CLASSIFIER_CLASS): 170 device.changeDeviceClass(deviceClass)
171 172 173 @transact
174 - def _applyDataMap(self, device, datamap):
175 """Apply a datamap to a device. 176 """ 177 # This can cause breakage in unit testing when the device is persisted. 178 if not hasattr(device.dmd, 'zport'): 179 transaction.abort() 180 181 if hasattr(datamap, "compname"): 182 if datamap.compname: 183 try: 184 tobj = device.getObjByPath(datamap.compname) 185 except NotFound: 186 log.warn("Unable to find compname '%s'" % datamap.compname) 187 return False 188 else: 189 tobj = device 190 if hasattr(datamap, "relname"): 191 changed = self._updateRelationship(tobj, datamap) 192 elif hasattr(datamap, 'modname'): 193 changed = self._updateObject(tobj, datamap) 194 else: 195 changed = False 196 log.warn("plugin returned unknown map skipping") 197 else: 198 changed = False 199 if not changed: 200 transaction.abort() 201 else: 202 device.setLastChange() 203 trans = transaction.get() 204 trans.setUser("datacoll") 205 trans.note("data applied from automated collection") 206 return changed
207 208
209 - def _updateRelationship(self, device, relmap):
210 """Add/Update/Remote objects to the target relationship. 211 """ 212 changed = False 213 rname = relmap.relname 214 rel = getattr(device, rname, None) 215 if not rel: 216 log.warn("no relationship:%s found on:%s (%s %s)", 217 relmap.relname, device.id, device.__class__, device.zPythonClass) 218 return changed 219 relids = rel.objectIdsAll() 220 seenids = defaultdict(int) 221 for objmap in relmap: 222 from Products.ZenModel.ZenModelRM import ZenModelRM 223 if hasattr(objmap, 'modname') and hasattr(objmap, 'id'): 224 objmap_id = objmap.id 225 seenids[objmap_id] += 1 226 if seenids[objmap_id] > 1: 227 objmap_id = objmap.id = "%s_%s" % (objmap_id, seenids[objmap_id]) 228 if objmap_id in relids: 229 obj = rel._getOb(objmap_id) 230 231 # Handle the possibility of objects changing class by 232 # recreating them. Ticket #5598. 233 existing_modname = '' 234 existing_classname = '' 235 try: 236 import inspect 237 existing_modname = inspect.getmodule(obj).__name__ 238 existing_classname = obj.__class__.__name__ 239 except: 240 pass 241 242 if objmap.modname == existing_modname and \ 243 objmap.classname in ('', existing_classname): 244 245 objchange = self._updateObject(obj, objmap) 246 if not changed: changed = objchange 247 else: 248 rel._delObject(objmap_id) 249 objchange, obj = self._createRelObject(device, objmap, rname) 250 if not changed: changed = objchange 251 252 if objmap_id in relids: relids.remove(objmap_id) 253 else: 254 objchange, obj = self._createRelObject(device, objmap, rname) 255 if objchange: changed = True 256 if obj and obj.id in relids: relids.remove(obj.id) 257 elif isinstance(objmap, ZenModelRM): 258 self.logChange(device, objmap.id, Change_Add, 259 "linking object %s to device %s relation %s" % ( 260 objmap.id, device.id, rname)) 261 device.addRelation(rname, objmap) 262 changed = True 263 else: 264 objchange, obj = self._createRelObject(device, objmap, rname) 265 if objchange: changed = True 266 if obj and obj.id in relids: relids.remove(obj.id) 267 268 for id in relids: 269 obj = rel._getOb(id) 270 if isinstance(obj, Lockable) and obj.isLockedFromDeletion(): 271 objname = obj.id 272 try: objname = obj.name() 273 except: pass 274 msg = "Deletion Blocked: %s '%s' on %s" % ( 275 obj.meta_type, objname,obj.device().id) 276 log.warn(msg) 277 if obj.sendEventWhenBlocked(): 278 self.logEvent(device, obj, Change_Remove_Blocked, 279 msg, Event.Warning) 280 continue 281 self.logChange(device, obj, Change_Remove, 282 "removing object %s from rel %s on device %s" % ( 283 id, rname, device.id)) 284 rel._delObject(id) 285 if relids: changed=True 286 return changed
287 288
289 - def _updateObject(self, obj, objmap):
290 """Update an object using a objmap. 291 """ 292 changed = False 293 device = obj.device() 294 295 if isinstance(obj, Lockable) and obj.isLockedFromUpdates(): 296 if device.id == obj.id: 297 msg = 'Update Blocked: %s' % device.id 298 else: 299 objname = obj.id 300 try: objname = obj.name() 301 except: pass 302 msg = "Update Blocked: %s '%s' on %s" % ( 303 obj.meta_type, objname ,device.id) 304 log.warn(msg) 305 if obj.sendEventWhenBlocked(): 306 self.logEvent(device, obj,Change_Set_Blocked,msg,Event.Warning) 307 return changed 308 for attname, value in objmap.items(): 309 if attname.startswith('_'): 310 continue 311 if isinstance(value, basestring): 312 try: 313 # This looks confusing, and it is. The scenario is: 314 # A collector gathers some data as a raw byte stream, 315 # but really it has a specific encoding specified by 316 # by the zCollectorDecoding zProperty. Say, latin-1 or 317 # utf-16, etc. We need to decode that byte stream to get 318 # back a UnicodeString object. But, this version of Zope 319 # doesn't like UnicodeString objects for a variety of 320 # fields, such as object ids, so we then need to convert 321 # that UnicodeString back into a regular string of bytes, 322 # and for that we use the system default encoding, which 323 # is now utf-8. 324 codec = obj.zCollectorDecoding or sys.getdefaultencoding() 325 value = value.decode(codec) 326 value = value.encode(sys.getdefaultencoding()) 327 except UnicodeDecodeError: 328 # We don't know what to do with this, so don't set the 329 # value 330 continue 331 att = getattr(aq_base(obj), attname, zenmarker) 332 if att == zenmarker: 333 log.warn('The attribute %s was not found on object %s from device %s', 334 attname, obj.id, device.id) 335 continue 336 if callable(att): 337 setter = getattr(obj, attname) 338 gettername = attname.replace("set","get") 339 getter = getattr(obj, gettername, None) 340 341 if not getter: 342 343 log.warn("getter '%s' not found on obj '%s', " 344 "skipping", gettername, obj.id) 345 346 else: 347 348 from plugins.DataMaps import MultiArgs 349 if isinstance(value, MultiArgs): 350 351 args = value.args 352 change = not isSameData(value.args, getter()) 353 354 else: 355 356 args = (value,) 357 try: 358 change = not isSameData(value, getter()) 359 except UnicodeDecodeError: 360 change = True 361 362 if change: 363 setter(*args) 364 self.logChange(device, obj, Change_Set, 365 "calling function '%s' with '%s' on " 366 "object %s" % (attname, value, obj.id)) 367 changed = True 368 369 else: 370 try: 371 change = not isSameData(att, value) 372 except UnicodeDecodeError: 373 change = True 374 if change: 375 setattr(aq_base(obj), attname, value) 376 self.logChange(device, obj, Change_Set, 377 "set attribute '%s' " 378 "to '%s' on object '%s'" % 379 (attname, value, obj.id)) 380 changed = True 381 if not changed: 382 try: changed = obj._p_changed 383 except: pass 384 if changed: 385 if getattr(aq_base(obj), "index_object", False): 386 log.debug("indexing object %s", obj.id) 387 obj.index_object() 388 notify(IndexingEvent(obj)) 389 else: 390 obj._p_deactivate() 391 return changed
392 393
394 - def _createRelObject(self, device, objmap, relname):
395 """Create an object on a relationship using its objmap. 396 """ 397 constructor = importClass(objmap.modname, objmap.classname) 398 if hasattr(objmap, 'id'): 399 remoteObj = constructor(objmap.id) 400 else: 401 remoteObj = constructor(device, objmap) 402 if remoteObj is None: 403 log.debug("Constructor returned None") 404 return False, None 405 id = remoteObj.id 406 if not remoteObj: 407 raise ObjectCreationError( 408 "failed to create object %s in relation %s" % (id, relname)) 409 410 realdevice = device.device() 411 if realdevice.isLockedFromUpdates(): 412 objtype = "" 413 try: objtype = objmap.modname.split(".")[-1] 414 except: pass 415 msg = "Add Blocked: %s '%s' on %s" % ( 416 objtype, id, realdevice.id) 417 log.warn(msg) 418 if realdevice.sendEventWhenBlocked(): 419 self.logEvent(realdevice, id, Change_Add_Blocked, 420 msg, Event.Warning) 421 return False, None 422 rel = device._getOb(relname, None) 423 if not rel: 424 raise ObjectCreationError( 425 "No relation %s found on device %s (%s)" % (relname, device.id, device.__class__ )) 426 #"No relation %s found on device %s" % (relname, device.id)) 427 changed = False 428 try: 429 remoteObj = rel._getOb(remoteObj.id) 430 except AttributeError: 431 self.logChange(realdevice, remoteObj, Change_Add, 432 "adding object %s to relationship %s" % 433 (remoteObj.id, relname)) 434 rel._setObject(remoteObj.id, remoteObj) 435 remoteObj = rel._getOb(remoteObj.id) 436 changed = True 437 notify(ObjectMovedEvent(remoteObj, rel, remoteObj.id, rel, remoteObj.id)) 438 return self._updateObject(remoteObj, objmap) or changed, remoteObj
439 440
441 - def stop(self): pass
442
443 444 -class ApplyDataMapThread(threading.Thread, ApplyDataMap):
445 """ 446 Thread that applies datamaps to a device. It reads from a queue that 447 should have tuples of (devid, datamaps) where devid is the primaryId to 448 the device and datamps is a list of datamaps to apply. Cache is synced at 449 the start of each transaction and there is one transaction per device. 450 """ 451
452 - def __init__(self, datacollector, app):
453 threading.Thread.__init__(self) 454 ApplyDataMap.__init__(self, datacollector) 455 self.setName("ApplyDataMapThread") 456 self.setDaemon(1) 457 self.app = app 458 log.debug("Thread conn:%s", self.app._p_jar) 459 self.inputqueue = Queue.Queue() 460 self.done = False
461 462
463 - def processClient(self, device, collectorClient):
464 """Apply datamps to device. 465 """ 466 devpath = device.getPrimaryPath() 467 self.inputqueue.put((devpath, collectorClient))
468 469
470 - def run(self):
471 """Process collectorClients as they are passed in from a data collector. 472 """ 473 log.info("starting applyDataMap thread") 474 while not self.done or not self.inputqueue.empty(): 475 devpath = () 476 try: 477 devpath, collectorClient = self.inputqueue.get(True,1) 478 self.app._p_jar.sync() 479 device = getObjByPath(self.app, devpath) 480 ApplyDataMap.processClient(self, device, collectorClient) 481 except Queue.Empty: pass 482 except Exception: 483 transaction.abort() 484 log.exception("processing device %s", "/".join(devpath)) 485 log.info("stopping applyDataMap thread")
486 487
488 - def stop(self):
489 """Stop the thread once all devices are processed. 490 """ 491 self.done = True 492 self.join()
493