Package Products :: Package DataCollector :: Module ApplyDataMap
[hide private]
[frames] | no frames]

Source Code for Module Products.DataCollector.ApplyDataMap

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import types 
 15  import threading 
 16  import Queue 
 17   
 18  import transaction 
 19   
 20  from Acquisition import aq_base 
 21   
 22  from Products.ZenUtils.Utils import importClass, getObjByPath 
 23  from Exceptions import * 
 24  from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked 
 25  from Products.ZenModel.Lockable import Lockable 
 26  import Products.ZenEvents.Event as Event 
 27  import logging 
 28  log = logging.getLogger("zen.ApplyDataMap") 
 29   
 30  zenmarker = "__ZENMARKER__" 
 31   
 32  _notAscii = dict.fromkeys(range(128,256), u'?') 
 33   
 34   
def isSameData(x, y):
    """
    Decide whether existing model data (x) equals newly modeled data (y).

    Ordinary values are compared with ``==``.  When both arguments are
    non-empty lists/tuples made up entirely of dictionaries, the comparison
    ignores ordering: every dictionary is reduced to a sorted tuple of its
    items and the two collections are compared as sets (so repeated identical
    dictionaries also count once).

    Sequences that mix dictionaries with other values, or dictionaries whose
    items cannot be sorted or hashed (for example values that are lists),
    cannot be normalised that way; they fall back to the plain ``==``
    comparison instead of raising.

    Returns True when the two values are considered the same.
    """
    if isinstance(x, (tuple, list)) and isinstance(y, (tuple, list)):
        if x and y:
            # Every element of both sequences must be a dict before the
            # order-insensitive comparison is attempted; checking only the
            # first element would crash on .items() for a mixed sequence.
            dictsOnly = True
            for item in list(x) + list(y):
                if not isinstance(item, dict):
                    dictsOnly = False
                    break

            if dictsOnly:
                try:
                    xset = set([tuple(sorted(d.items())) for d in x])
                    yset = set([tuple(sorted(d.items())) for d in y])
                except TypeError:
                    # Unhashable or unorderable items: use the ordinary
                    # comparison below rather than failing.
                    pass
                else:
                    return xset == yset

    return x == y
class ApplyDataMap(object):
    """
    Applies the datamaps produced by collector plugins to a device: objects
    on relationships are created, updated or removed and attributes are set.
    Each change can be reported as an event through the data collector's dmd.
    """

    def __init__(self, datacollector=None):
        """
        Remember the owning data collector.

        datacollector supplies 'collectorPlugins' (read by processClient) and
        'dmd' (read by logEvent to send events).  It may be None, in which
        case logEvent sends no events.
        """
        self.datacollector = datacollector

    def logChange(self, device, compname, eventClass, msg):
        """
        Report a model change as an Info severity event.

        Nothing is reported when the device's zCollectorLogChanges is false
        (it is treated as true when absent).  Unicode messages have the code
        points 128-255 replaced with '?' before being sent.
        """
        if not getattr(device, 'zCollectorLogChanges', True):
            return
        if isinstance(msg, type(u'')):
            msg = msg.translate(_notAscii)
        self.logEvent(device, compname, eventClass, msg, Event.Info)

    def logEvent(self, device, component, eventClass, msg, severity):
        """
        Log msg at debug level and, when the data collector has a dmd, send
        an event describing a change to the device model.

        component may be an object or a plain name.  The component field of
        the event is the object's name() when it has a callable 'name',
        otherwise its id (or the value itself); it is left empty when it
        would merely repeat the device id.
        """
        device = device.device()
        compname = ""
        try:
            compname = getattr(component, 'id', component)
            if hasattr(component, 'name') and callable(component.name):
                compname = component.name()
            elif device.id == compname:
                compname = ""
        except Exception:
            # Best effort only: keep whatever component name was worked out
            # before the failure.
            pass
        log.debug(msg)
        devname = device.device().id

        collector = self.datacollector
        if not collector:
            return
        # The collector's 'generateEvents' flag is deliberately not consulted
        # here; requiring it blocked events for modeling done inside Zope.
        dmd = getattr(collector, 'dmd', None)
        if not dmd:
            return
        eventDict = {
            'eventClass': eventClass,
            'device': devname,
            'component': compname,
            'summary': msg,
            'severity': severity,
            'agent': 'ApplyDataMap',
            'explanation': "Event sent as zCollectorLogChanges is True",
        }
        dmd.ZenEventManager.sendEvent(eventDict)

    def processClient(self, device, collectorClient):
        """
        Apply the datamaps from every plugin result of a collector client.

        For each (plugin name, results) pair returned by
        collectorClient.getResults() the matching plugin preprocesses and
        processes the results, and every resulting datamap is applied to the
        device.  The work is then committed in one transaction; on any error
        the transaction is aborted and the error is logged.
        """
        log.debug("processing data for device %s", device.id)
        devchanged = False
        # Bound up front so the error handler below can always log it, even
        # when the failure happens before the first plugin is reached.
        pname = None
        try:
            for pname, results in collectorClient.getResults():
                log.debug("processing plugin %s on device %s",
                          pname, device.id)
                if not results:
                    log.warning("plugin %s no results returned", pname)
                    continue
                plugin = self.datacollector.collectorPlugins.get(pname, None)
                if not plugin:
                    continue
                results = plugin.preprocess(results, log)
                datamaps = plugin.process(device, results, log)
                # A plugin may return a single map or several of them.
                if not isinstance(datamaps, (list, tuple)):
                    datamaps = [datamaps]
                for datamap in datamaps:
                    if self._applyDataMap(device, datamap):
                        devchanged = True
            if devchanged:
                device.setLastChange()
                log.info("changes applied")
            else:
                log.info("no change detected")
            device.setSnmpLastCollection()
            trans = transaction.get()
            trans.setUser("datacoll")
            trans.note("data applied from automated collection")
            trans.commit()
        except (SystemExit, KeyboardInterrupt):
            raise
        except:
            # Deliberately broad: whatever went wrong, the partially applied
            # transaction must be rolled back.
            transaction.abort()
            log.exception("plugin %s device %s", pname, device.getId())

    def applyDataMap(self, device, datamap, relname="", compname="",
                     modname=""):
        """
        Apply a datamap passed as a list of dicts through XML-RPC.

        With relname the data is wrapped in a RelationshipMap, otherwise in
        an ObjectMap.  Returns the changed flag reported by _applyDataMap.
        """
        from plugins.DataMaps import RelationshipMap, ObjectMap
        if relname:
            datamap = RelationshipMap(relname=relname, compname=compname,
                                      modname=modname, objmaps=datamap)
        else:
            datamap = ObjectMap(datamap, compname=compname, modname=modname)
        return self._applyDataMap(device, datamap)

    def _applyDataMap(self, device, datamap):
        """
        Apply one datamap to a device and commit when something changed.

        A map with 'relname' updates a relationship, a map with 'modname'
        updates a single object, and the target is the device itself or the
        attribute of the device named by the map's 'compname'.  Maps without
        'compname' are ignored.  Returns whether the model was changed.
        """
        persist = True
        try:
            device.dmd._p_jar.sync()
        except AttributeError:
            # This can occur in unit testing when the device is not persisted.
            persist = False

        changed = False
        if hasattr(datamap, "compname"):
            if datamap.compname:
                tobj = getattr(device, datamap.compname)
            else:
                tobj = device
            if hasattr(datamap, "relname"):
                changed = self._updateRelationship(tobj, datamap)
            elif hasattr(datamap, 'modname'):
                changed = self._updateObject(tobj, datamap)
            else:
                log.warning("plugin returned unknown map skipping")

        if changed and persist:
            device.setLastChange()
            trans = transaction.get()
            trans.setUser("datacoll")
            trans.note("data applied from automated collection")
            trans.commit()
        return changed

    def _updateRelationship(self, device, relmap):
        """
        Add, update and remove objects on the relationship named by relmap.

        Every object map in relmap is matched by id against the objects
        already on the relationship: matches are updated (or recreated when
        their module/class differs), unknown ids are created, and objects
        left unmatched afterwards are removed unless locked from deletion.
        Returns whether anything was changed.
        """
        changed = False
        rname = relmap.relname
        rel = getattr(device, rname, None)
        if not rel:
            log.warning("no relationship:%s found on:%s",
                        relmap.relname, device.id)
            return changed

        import inspect
        from Products.ZenModel.ZenModelRM import ZenModelRM

        # Ids still listed here once the maps are processed were not seen in
        # the new data and are removal candidates.
        relids = rel.objectIdsAll()
        seenids = {}
        for objmap in relmap:
            if hasattr(objmap, 'modname') and hasattr(objmap, 'id'):
                # Repeated ids within one relmap get a numeric suffix.
                if objmap.id in seenids:
                    seenids[objmap.id] += 1
                    objmap.id = "%s_%s" % (objmap.id, seenids[objmap.id])
                else:
                    seenids[objmap.id] = 1

                if objmap.id in relids:
                    obj = rel._getOb(objmap.id)

                    # Handle the possibility of objects changing class by
                    # recreating them. Ticket #5598.
                    existing_modname = ''
                    existing_classname = ''
                    try:
                        existing_modname = inspect.getmodule(obj).__name__
                        existing_classname = obj.__class__.__name__
                    except Exception:
                        pass

                    sameClass = (
                        objmap.modname == existing_modname and
                        objmap.classname in ('', existing_classname))
                    if sameClass:
                        objchange = self._updateObject(obj, objmap)
                    else:
                        rel._delObject(objmap.id)
                        objchange, obj = self._createRelObject(
                            device, objmap, rname)
                    if not changed:
                        changed = objchange

                    if objmap.id in relids:
                        relids.remove(objmap.id)
                else:
                    objchange, obj = self._createRelObject(
                        device, objmap, rname)
                    if objchange:
                        changed = True
                    if obj and obj.id in relids:
                        relids.remove(obj.id)
            elif isinstance(objmap, ZenModelRM):
                # An already built model object is linked as it is.
                self.logChange(device, objmap.id, Change_Add,
                               "linking object %s to device %s relation %s" % (
                                   objmap.id, device.id, rname))
                device.addRelation(rname, objmap)
                changed = True
            else:
                objchange, obj = self._createRelObject(device, objmap, rname)
                if objchange:
                    changed = True
                if obj and obj.id in relids:
                    relids.remove(obj.id)

        for staleid in relids:
            obj = rel._getOb(staleid)
            if isinstance(obj, Lockable) and obj.isLockedFromDeletion():
                objname = obj.id
                try:
                    objname = obj.name()
                except Exception:
                    pass
                msg = "Deletion Blocked: %s '%s' on %s" % (
                    obj.meta_type, objname, obj.device().id)
                log.warning(msg)
                if obj.sendEventWhenBlocked():
                    self.logEvent(device, obj, Change_Remove_Blocked,
                                  msg, Event.Warning)
                continue
            self.logChange(device, obj, Change_Remove,
                           "removing object %s from rel %s on device %s" % (
                               staleid, rname, device.id))
            rel._delObject(staleid)
            # Only a real removal counts as a change; a removal blocked by a
            # lock leaves the model exactly as it was.
            changed = True
        return changed

    def _updateObject(self, obj, objmap):
        """
        Update an object from the attribute/value pairs of an objmap.

        Names starting with '_' are skipped.  A name bound to a callable is
        treated as a setter and is called only when the matching getter
        reports different data (always for MultiArgs values); any other name
        is assigned when the stored value differs.  Objects locked from
        updates are left untouched.  Returns True when the object changed.
        """
        changed = False
        device = obj.device()

        if isinstance(obj, Lockable) and obj.isLockedFromUpdates():
            if device.id == obj.id:
                msg = 'Update Blocked: %s' % device.id
            else:
                objname = obj.id
                try:
                    objname = obj.name()
                except Exception:
                    pass
                msg = "Update Blocked: %s '%s' on %s" % (
                    obj.meta_type, objname, device.id)
            log.warning(msg)
            if obj.sendEventWhenBlocked():
                self.logEvent(device, obj, Change_Set_Blocked, msg,
                              Event.Warning)
            return changed

        for attname, value in objmap.items():
            if isinstance(value, str):
                # NOTE(review): on Python 2 byte strings, non-ASCII data
                # appears to surface here as UnicodeDecodeError (implicit
                # ASCII decode), so the zCollectorDecoding branch may rarely
                # run -- confirm before relying on it.
                try:
                    value.encode('ascii')
                except UnicodeEncodeError:
                    decoding = obj.zCollectorDecoding
                    value = value.decode(decoding)
                except UnicodeDecodeError:
                    continue
            if attname.startswith('_'):
                continue
            att = getattr(aq_base(obj), attname, zenmarker)
            if att is zenmarker:
                log.warning('The attribute %s was not found on object %s '
                            'from device %s',
                            attname, obj.id, device.id)
                continue

            if callable(att):
                setter = getattr(obj, attname)
                # Only the first "set" is swapped, so a name such as
                # setAssetTag maps to getAssetTag rather than getAsgetTag.
                gettername = attname.replace("set", "get", 1)
                getter = getattr(obj, gettername, None)
                if not getter:
                    log.warning("getter '%s' not found on obj '%s', "
                                "skipping", gettername, obj.id)
                    continue

                from plugins.DataMaps import MultiArgs
                if isinstance(value, MultiArgs):
                    # Multi-argument setters are always called; their value
                    # is not compared against the getter.
                    args = value.args
                    change = True
                else:
                    args = (value,)
                    try:
                        change = not isSameData(value, getter())
                    except UnicodeDecodeError:
                        change = True

                if change:
                    setter(*args)
                    self.logChange(device, obj, Change_Set,
                                   "calling function '%s' with '%s' on "
                                   "object %s" % (attname, value, obj.id))
                    changed = True
            else:
                try:
                    change = att != value
                except UnicodeDecodeError:
                    change = True
                if change:
                    setattr(aq_base(obj), attname, value)
                    self.logChange(device, obj, Change_Set,
                                   "set attribute '%s' "
                                   "to '%s' on object '%s'" %
                                   (attname, value, obj.id))
                    changed = True

        if not changed:
            # Fall back to the object's own _p_changed flag; it is coerced
            # to a bool so callers never receive None.
            try:
                changed = bool(obj._p_changed)
            except Exception:
                pass
        if getattr(aq_base(obj), "index_object", False) and changed:
            log.debug("indexing object %s", obj.id)
            obj.index_object()
        if not changed:
            obj._p_deactivate()
        return changed

    def _createRelObject(self, device, objmap, relname):
        """
        Create an object on a relationship using its objmap.

        The class named by objmap.modname/objmap.classname is instantiated,
        added to the relationship unless an object with that id is already
        there, and then updated from the objmap.  Returns a
        (changed, object) pair; (False, None) when the constructor returned
        None or the device is locked from updates.  Raises
        ObjectCreationError when the created object is false or the
        relationship does not exist.
        """
        constructor = importClass(objmap.modname, objmap.classname)
        if hasattr(objmap, 'id'):
            remoteObj = constructor(objmap.id)
        else:
            remoteObj = constructor(device, objmap)
        if remoteObj is None:
            log.debug("Constructor returned None")
            return False, None
        newid = remoteObj.id
        if not remoteObj:
            raise ObjectCreationError(
                "failed to create object %s in relation %s" % (
                    newid, relname))

        realdevice = device.device()
        if realdevice.isLockedFromUpdates():
            objtype = ""
            try:
                objtype = objmap.modname.split(".")[-1]
            except Exception:
                pass
            msg = "Add Blocked: %s '%s' on %s" % (
                objtype, newid, realdevice.id)
            log.warning(msg)
            if realdevice.sendEventWhenBlocked():
                self.logEvent(realdevice, newid, Change_Add_Blocked,
                              msg, Event.Warning)
            return False, None

        rel = device._getOb(relname, None)
        if not rel:
            raise ObjectCreationError(
                "No relation %s found on device %s" % (relname, device.id))

        changed = False
        try:
            # Reuse an object already stored under this id.
            remoteObj = rel._getOb(remoteObj.id)
        except AttributeError:
            self.logChange(realdevice, remoteObj, Change_Add,
                           "adding object %s to relationship %s" %
                           (remoteObj.id, relname))
            rel._setObject(remoteObj.id, remoteObj)
            remoteObj = rel._getOb(remoteObj.id)
            changed = True
        updated = self._updateObject(remoteObj, objmap)
        return updated or changed, remoteObj

    def stop(self):
        """Nothing to shut down when datamaps are applied synchronously."""
        pass
class ApplyDataMapThread(threading.Thread, ApplyDataMap):
    """
    Background thread that applies datamaps to devices.

    Work arrives on an input queue as (device primary path, collector
    client) tuples.  For every item the ZODB connection cache is synced,
    the device is looked up by its path and the client's results are
    applied, so there is one transaction per device.
    """

    def __init__(self, datacollector, app):
        """
        Set up the daemon thread and its (initially empty) input queue.

        app is the root object whose connection is synced and from which
        devices are resolved by path.
        """
        threading.Thread.__init__(self)
        ApplyDataMap.__init__(self, datacollector)
        self.setName("ApplyDataMapThread")
        self.setDaemon(1)
        self.app = app
        log.debug("Thread conn:%s", self.app._p_jar)
        self.inputqueue = Queue.Queue()
        self.done = False

    def processClient(self, device, collectorClient):
        """
        Queue the device (by primary path) and its collector client so the
        thread applies the datamaps later.
        """
        self.inputqueue.put((device.getPrimaryPath(), collectorClient))

    def run(self):
        """
        Apply queued collector clients until stop() has been requested and
        the queue has been drained.
        """
        log.info("starting applyDataMap thread")
        while True:
            if self.done and self.inputqueue.empty():
                break
            # Reset each round so the error message never names the device
            # of an earlier item.
            devpath = ()
            try:
                # Wait at most a second so the stop flag is checked again.
                devpath, client = self.inputqueue.get(True, 1)
                self.app._p_jar.sync()
                target = getObjByPath(self.app, devpath)
                ApplyDataMap.processClient(self, target, client)
            except (SystemExit, KeyboardInterrupt):
                raise
            except Queue.Empty:
                pass
            except:
                transaction.abort()
                log.exception("processing device %s", "/".join(devpath))
        log.info("stopping applyDataMap thread")

    def stop(self):
        """
        Ask the thread to finish once all queued devices are processed and
        wait for it to end.
        """
        self.done = True
        self.join()