Package DataCollector :: Module ApplyDataMap
[hide private]
[frames | no frames]

Source Code for Module DataCollector.ApplyDataMap

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import types 
 15  import threading 
 16  import Queue 
 17   
 18  import transaction 
 19   
 20  from Acquisition import aq_base 
 21   
 22  from Products.ZenUtils.Utils import importClass, getObjByPath 
 23  from Exceptions import * 
 24  from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked 
 25  from Products.ZenModel.Lockable import Lockable 
 26  import Products.ZenEvents.Event as Event 
 27  import logging 
 28  log = logging.getLogger("zen.ApplyDataMap") 
 29   
 30  zenmarker = "__ZENMARKER__" 
 31   
 32  _notAscii = dict.fromkeys(range(128,256), u'?') 
 33   
class ApplyDataMap(object):
    """Apply datamaps produced by collector plugins to a device model.

    Model changes are committed through the ``transaction`` module.  When a
    data collector exposing a ``dmd`` attribute is attached, changes and
    blocked changes are also reported through ``dmd.ZenEventManager``.
    """

    def __init__(self, datacollector=None):
        """Remember the collector this applier works for.

        datacollector -- optional collector object.  This class reads its
            ``collectorPlugins`` mapping (processClient) and its ``dmd``
            attribute (logEvent).  Without one no events are sent.
        """
        self.datacollector = datacollector

    def logChange(self, device, compname, eventClass, msg):
        """Report a model change as an Event.Info severity event.

        Nothing happens when the device's ``zCollectorLogChanges`` attribute
        is false; a device without that attribute is treated as true.

        device     -- object the change belongs to.
        compname   -- component (object or name) the change belongs to.
        eventClass -- event class to file the event under.
        msg        -- human readable description of the change.
        """
        if not getattr(device, 'zCollectorLogChanges', True):
            return
        if isinstance(msg, type(u'')):
            # Unicode messages get code points 128-255 replaced with '?'.
            msg = msg.translate(_notAscii)
        self.logEvent(device, compname, eventClass, msg, Event.Info)

    def logEvent(self, device, component, eventClass, msg, severity):
        """Log msg at debug level and, if possible, send it as an event.

        An event is only sent when a data collector with a truthy ``dmd``
        attribute is attached to this applier.

        device     -- object providing ``device()``; the event is filed
                      against the id of the device it returns.
        component  -- component object or plain name.  The object's
                      ``name()`` is preferred, then its ``id``; a name equal
                      to the device id is reported as an empty component.
        eventClass -- event class of the event.
        msg        -- event summary.
        severity   -- event severity.
        """
        device = device.device()
        compname = ""
        try:
            compname = getattr(component, 'id', component)
            if hasattr(component, 'name') and callable(component.name):
                compname = component.name()
            elif device.id == compname:
                compname = ""
        except Exception:
            # Best effort only: a component that cannot be named must not
            # prevent the event from being reported.
            pass
        log.debug(msg)
        devname = device.device().id
        collector = self.datacollector
        # An additional check of the collector's 'generateEvents' flag used
        # to live here; it was disabled because it blocked events coming
        # from modeling inside Zope.
        if collector and getattr(collector, 'dmd', None):
            eventDict = {
                'eventClass': eventClass,
                'device': devname,
                'component': compname,
                'summary': msg,
                'severity': severity,
            }
            collector.dmd.ZenEventManager.sendEvent(eventDict)

    def processClient(self, device, collectorClient):
        """Apply all plugin results held by collectorClient to device.

        For each (plugin name, results) pair from
        ``collectorClient.getResults()`` the matching plugin preprocesses
        and processes the results, and every datamap it returns is applied.
        Afterwards the collection time is stamped and the transaction is
        committed.  Any failure aborts the transaction and is logged;
        SystemExit and KeyboardInterrupt are re-raised.

        device          -- device being modeled.
        collectorClient -- object providing ``getResults()``.
        """
        log.debug("processing data for device %s", device.id)
        devchanged = False
        # Bound up front so the error handler below can always format its
        # message, even when getResults() itself is what failed.
        pname = None
        try:
            for pname, results in collectorClient.getResults():
                log.debug("processing plugin %s on device %s",
                          pname, device.id)
                if not results:
                    log.warning("plugin %s no results returned", pname)
                    continue
                plugin = self.datacollector.collectorPlugins.get(pname, None)
                if not plugin:
                    continue
                results = plugin.preprocess(results, log)
                datamaps = plugin.process(device, results, log)
                # A plugin may return one map or several.  The exact-type
                # test is deliberate: only a plain list/tuple is treated as
                # a collection of maps.
                if type(datamaps) not in (list, tuple):
                    datamaps = [datamaps]
                for datamap in datamaps:
                    if self._applyDataMap(device, datamap):
                        devchanged = True
            if devchanged:
                device.setLastChange()
                log.info("changes applied")
            else:
                log.info("no change detected")
            device.setSnmpLastCollection()
            trans = transaction.get()
            trans.setUser("datacoll")
            trans.note("data applied from automated collection")
            trans.commit()
        except (SystemExit, KeyboardInterrupt):
            raise
        except:
            # Deliberate catch-all: one broken plugin or device must not
            # take the collector down.
            transaction.abort()
            log.exception("plugin %s device %s", pname, device.getId())

    def applyDataMap(self, device, datamap, relname="", compname="",
                     modname=""):
        """Apply a datamap passed as a list of dicts through XML-RPC.

        With a relname the data is wrapped in a RelationshipMap, otherwise
        in an ObjectMap, and then applied to the device.

        Returns the change flag reported by _applyDataMap.
        """
        from plugins.DataMaps import RelationshipMap, ObjectMap
        if relname:
            datamap = RelationshipMap(relname=relname, compname=compname,
                                      modname=modname, objmaps=datamap)
        else:
            datamap = ObjectMap(datamap, compname=compname, modname=modname)
        return self._applyDataMap(device, datamap)

    def _applyDataMap(self, device, datamap):
        """Apply a single datamap to a device.

        Maps with a ``relname`` update a relationship, maps with only a
        ``modname`` update one object, and anything without a ``compname``
        attribute is ignored.  A change is committed immediately unless the
        device has no database connection.

        Returns a true value when the model changed.
        """
        persist = True
        try:
            device.dmd._p_jar.sync()
        except AttributeError:
            # This can occur in unit testing when the device is not
            # persisted.
            persist = False

        changed = False
        if hasattr(datamap, "compname"):
            if datamap.compname:
                tobj = getattr(device, datamap.compname)
            else:
                tobj = device
            if hasattr(datamap, "relname"):
                changed = self._updateRelationship(tobj, datamap)
            elif hasattr(datamap, 'modname'):
                changed = self._updateObject(tobj, datamap)
            else:
                log.warning("plugin returned unknown map skipping")
        if changed and persist:
            device.setLastChange()
            trans = transaction.get()
            trans.setUser("datacoll")
            trans.note("data applied from automated collection")
            trans.commit()
        return changed

    def _updateRelationship(self, device, relmap):
        """Add, update and remove objects on the target relationship.

        device -- object owning the relationship named ``relmap.relname``
                  (called with either the device or one of its components).
        relmap -- iterable of object maps describing the desired content.

        Objects named by the map are created or updated.  Objects on the
        relationship that the map no longer mentions are removed, unless
        they are locked from deletion.

        Returns a true value when anything was added, updated or removed.
        """
        changed = False
        rname = relmap.relname
        rel = getattr(device, rname, None)
        if not rel:
            log.warning("no relationship:%s found on:%s",
                        relmap.relname, device.id)
            return changed

        from Products.ZenModel.ZenModelRM import ZenModelRM
        # Ids currently on the relationship; whatever is still listed after
        # the map has been walked is stale.
        relids = rel.objectIdsAll()
        seenids = {}
        for objmap in relmap:
            if hasattr(objmap, 'modname') and hasattr(objmap, 'id'):
                # Repeated ids inside one map are made unique by suffixing
                # them with a running counter.
                if objmap.id in seenids:
                    seenids[objmap.id] += 1
                    objmap.id = "%s_%s" % (objmap.id, seenids[objmap.id])
                else:
                    seenids[objmap.id] = 1
                if objmap.id in relids:
                    obj = rel._getOb(objmap.id)
                    objchange = self._updateObject(obj, objmap)
                    if not changed:
                        changed = objchange
                    if objmap.id in relids:
                        relids.remove(objmap.id)
                    continue
            elif isinstance(objmap, ZenModelRM):
                # The map entry already is a model object: just link it.
                self.logChange(device, objmap.id, Change_Add,
                               "linking object %s to device %s relation %s"
                               % (objmap.id, device.id, rname))
                device.addRelation(rname, objmap)
                changed = True
                continue
            objchange, obj = self._createRelObject(device, objmap, rname)
            if objchange:
                changed = True
            if obj and obj.id in relids:
                relids.remove(obj.id)

        for staleid in relids:
            obj = rel._getOb(staleid)
            if isinstance(obj, Lockable) and obj.isLockedFromDeletion():
                msg = "Deletion Blocked: %s '%s' on %s" % (
                    obj.meta_type, self._displayName(obj), obj.device().id)
                log.warning(msg)
                if obj.sendEventWhenBlocked():
                    self.logEvent(device, obj, Change_Remove_Blocked,
                                  msg, Event.Warning)
                continue
            self.logChange(device, obj, Change_Remove,
                           "removing object %s from rel %s on device %s" % (
                               staleid, rname, device.id))
            rel._delObject(staleid)
            # Only an actual removal counts as a change; a blocked deletion
            # leaves the model untouched.
            changed = True
        return changed

    def _displayName(self, obj):
        """Return obj.name() when it can be called, otherwise obj.id."""
        objname = obj.id
        try:
            objname = obj.name()
        except Exception:
            # Not every object provides name(); the id is good enough.
            pass
        return objname

    def _getterName(self, attname):
        """Return the getter name paired with a setter-style attribute name.

        Only the first "set" is rewritten, so "setAssetTag" maps to
        "getAssetTag" rather than "getAsgetTag".
        """
        return attname.replace("set", "get", 1)

    def _updateObject(self, obj, objmap):
        """Update an object using an objmap.

        Each (attribute name, value) pair of ``objmap.items()`` is applied:
        callable attributes are treated as setters and invoked when the
        paired getter reports a different value (MultiArgs values are always
        applied), plain attributes are assigned when they differ.  Names
        starting with an underscore and names unknown to the object are
        skipped.  Objects locked from updates are left alone.

        Returns a true value when the object changed.
        """
        changed = False
        device = obj.device()

        if isinstance(obj, Lockable) and obj.isLockedFromUpdates():
            if device.id == obj.id:
                msg = 'Update Blocked: %s' % device.id
            else:
                msg = "Update Blocked: %s '%s' on %s" % (
                    obj.meta_type, self._displayName(obj), device.id)
            log.warning(msg)
            if obj.sendEventWhenBlocked():
                self.logEvent(device, obj, Change_Set_Blocked, msg,
                              Event.Warning)
            return changed

        from plugins.DataMaps import MultiArgs
        for attname, value in objmap.items():
            if attname.startswith('_'):
                continue
            if type(value) == type(''):
                # NOTE(review): on Python 2 str.encode() first decodes with
                # the interpreter default encoding, so which of the two
                # handlers below runs for non-ASCII bytes depends on that
                # setting -- confirm the intended behaviour.
                try:
                    value.encode('ascii')
                except UnicodeEncodeError:
                    try:
                        value = value.decode(obj.zCollectorDecoding)
                    except UnicodeError:
                        # Undecodable with the configured codec: skip this
                        # attribute instead of aborting the whole update.
                        continue
                except UnicodeDecodeError:
                    continue
            att = getattr(aq_base(obj), attname, zenmarker)
            if att is zenmarker:
                log.warning('The attribute %s was not found on object %s '
                            'from device %s', attname, obj.id, device.id)
                continue
            if callable(att):
                setter = getattr(obj, attname)
                gettername = self._getterName(attname)
                getter = getattr(obj, gettername, None)
                if not getter:
                    log.warning("getter '%s' not found on obj '%s', "
                                "skipping", gettername, obj.id)
                    continue
                if isinstance(value, MultiArgs):
                    # Multi-argument setters cannot be compared against a
                    # single getter value, so they are always applied.
                    args = value.args
                    change = True
                else:
                    args = (value,)
                    try:
                        change = value != getter()
                    except UnicodeDecodeError:
                        change = True
                if change:
                    setter(*args)
                    self.logChange(device, obj, Change_Set,
                                   "calling function '%s' with '%s' on "
                                   "object %s" % (attname, value, obj.id))
                    changed = True
            else:
                try:
                    change = att != value
                except UnicodeDecodeError:
                    change = True
                if change:
                    setattr(aq_base(obj), attname, value)
                    self.logChange(device, obj, Change_Set,
                                   "set attribute '%s' "
                                   "to '%s' on object '%s'" %
                                   (attname, value, obj.id))
                    changed = True
        if not changed:
            try:
                changed = obj._p_changed
            except Exception:
                # Objects without persistence state simply stay unchanged.
                pass
        if getattr(aq_base(obj), "index_object", False) and changed:
            log.debug("indexing object %s", obj.id)
            obj.index_object()
        if not changed:
            obj._p_deactivate()
        return changed

    def _createRelObject(self, device, objmap, relname):
        """Create an object on a relationship using its objmap.

        device  -- object owning the relationship.
        objmap  -- object map naming the class (``modname``/``classname``)
                   and carrying the attribute values.
        relname -- name of the relationship on device.

        Returns a (changed, object) tuple; (False, None) when the
        constructor yields None or the device is locked from updates.
        Raises ObjectCreationError when the constructed object is false or
        the relationship cannot be found.
        """
        constructor = importClass(objmap.modname, objmap.classname)
        if hasattr(objmap, 'id'):
            remoteObj = constructor(objmap.id)
        else:
            remoteObj = constructor(device, objmap)
        if remoteObj is None:
            log.debug("Constructor returned None")
            return False, None
        newid = remoteObj.id
        if not remoteObj:
            raise ObjectCreationError(
                "failed to create object %s in relation %s"
                % (newid, relname))

        realdevice = device.device()
        if realdevice.isLockedFromUpdates():
            objtype = ""
            try:
                objtype = objmap.modname.split(".")[-1]
            except Exception:
                # The type is only used to decorate the message.
                pass
            msg = "Add Blocked: %s '%s' on %s" % (
                objtype, newid, realdevice.id)
            log.warning(msg)
            if realdevice.sendEventWhenBlocked():
                self.logEvent(realdevice, newid, Change_Add_Blocked,
                              msg, Event.Warning)
            return False, None
        rel = device._getOb(relname, None)
        if not rel:
            raise ObjectCreationError(
                "No relation %s found on device %s" % (relname, device.id))
        changed = False
        try:
            # Reuse an object that already lives under this id.
            remoteObj = rel._getOb(remoteObj.id)
        except AttributeError:
            self.logChange(realdevice, remoteObj, Change_Add,
                           "adding object %s to relationship %s" %
                           (remoteObj.id, relname))
            rel._setObject(remoteObj.id, remoteObj)
            remoteObj = rel._getOb(remoteObj.id)
            changed = True
        return self._updateObject(remoteObj, objmap) or changed, remoteObj

    def stop(self):
        """Do nothing; the synchronous applier has nothing to shut down."""
        pass
361 362
class ApplyDataMapThread(threading.Thread, ApplyDataMap):
    """
    Background worker that applies collected data to devices.

    processClient() only enqueues a (device primary path, collector client)
    pair.  run() takes the pairs off the queue one at a time, syncs the
    thread's database connection, resolves the path back to a device and
    hands both to ApplyDataMap.processClient().
    """

    def __init__(self, datacollector, app):
        """Set up the daemon thread and its work queue.

        datacollector -- passed through to ApplyDataMap.__init__.
        app           -- root object used to resolve device paths; its
                         ``_p_jar`` is synced before each device.
        """
        threading.Thread.__init__(self)
        ApplyDataMap.__init__(self, datacollector)
        self.app = app
        self.done = False
        self.inputqueue = Queue.Queue()
        self.setDaemon(1)
        self.setName("ApplyDataMapThread")
        log.debug("Thread conn:%s", app._p_jar)

    def processClient(self, device, collectorClient):
        """Queue the device's path and its collector client for run()."""
        job = (device.getPrimaryPath(), collectorClient)
        self.inputqueue.put(job)

    def run(self):
        """Work through queued collector clients until told to stop.

        The loop ends once stop() has been requested and the queue has been
        drained.  A failure while handling one device aborts the transaction
        and is logged; the loop then carries on with the next job.
        """
        log.info("starting applyDataMap thread")
        while not (self.done and self.inputqueue.empty()):
            # Reset so the error message never shows a previous device.
            devpath = ()
            try:
                # Wait at most a second so the stop flag gets re-checked.
                job = self.inputqueue.get(True, 1)
                devpath, collectorClient = job
                self.app._p_jar.sync()
                target = getObjByPath(self.app, devpath)
                ApplyDataMap.processClient(self, target, collectorClient)
            except Queue.Empty:
                continue
            except (SystemExit, KeyboardInterrupt):
                raise
            except:
                transaction.abort()
                log.exception("processing device %s", "/".join(devpath))
        log.info("stopping applyDataMap thread")

    def stop(self):
        """Ask run() to finish the queued work and wait for the thread."""
        self.done = True
        self.join()
413