Package DataCollector :: Module ApplyDataMap
[hide private]
[frames] | no frames]

Source Code for Module DataCollector.ApplyDataMap

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import types 
 15  import threading 
 16  import Queue 
 17   
 18  import transaction 
 19   
 20  from Acquisition import aq_base 
 21   
 22  from Products.ZenUtils.Utils import importClass, getObjByPath 
 23  from Exceptions import * 
 24  from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked 
 25  from Products.ZenModel.Lockable import Lockable 
 26  import Products.ZenEvents.Event as Event 
 27  import logging 
 28  log = logging.getLogger("zen.ApplyDataMap") 
 29   
 30  zenmarker = "__ZENMARKER__" 
 31   
 32  _notAscii = dict.fromkeys(range(128,256), u'?') 
 33   
34 -class ApplyDataMap(object):
35
36 - def __init__(self, datacollector=None):
37 self.datacollector = datacollector
38 39
40 - def logChange(self, device, compname, eventClass, msg):
41 if not getattr(device, 'zCollectorLogChanges', True): return 42 if type(msg) == type(u''): 43 msg = msg.translate(_notAscii) 44 self.logEvent(device, compname, eventClass, msg, Event.Info)
45 46
47 - def logEvent(self, device, component, eventClass, msg, severity):
48 ''' Used to report a change to a device model. Logs the given msg 49 to log.info and creates an event. 50 ''' 51 device = device.device() 52 compname = "" 53 try: 54 compname = getattr(component, 'id', component) 55 if hasattr(component, 'name') and callable(component.name): 56 compname = component.name() 57 elif device.id == compname: 58 compname = "" 59 except: pass 60 log.debug(msg) 61 devname = device.device().id 62 if (self.datacollector 63 # why is this line here? Blocks evnets from model in zope 64 #and getattr(self.datacollector, 'generateEvents', False) 65 and getattr(self.datacollector, 'dmd', None)): 66 eventDict = { 67 'eventClass': eventClass, 68 'device': devname, 69 'component': compname, 70 'summary': msg, 71 'severity': severity, 72 } 73 self.datacollector.dmd.ZenEventManager.sendEvent(eventDict)
74 75
76 - def processClient(self, device, collectorClient):
77 """Apply datamps to device. 78 """ 79 log.debug("processing data for device %s", device.id) 80 devchanged = False 81 try: 82 #clientresults = collectorClient.getResults() 83 #clientresults.sort() 84 #for pname, results in clientresults: 85 for pname, results in collectorClient.getResults(): 86 log.debug("processing plugin %s on device %s", pname, device.id) 87 if not results: 88 log.warn("plugin %s no results returned", pname) 89 continue 90 plugin = self.datacollector.collectorPlugins.get(pname, None) 91 if not plugin: continue 92 results = plugin.preprocess(results, log) 93 datamaps = plugin.process(device, results, log) 94 #allow multiple maps to be returned from one plugin 95 if (type(datamaps) != types.ListType 96 and type(datamaps) != types.TupleType): 97 datamaps = [datamaps,] 98 for datamap in datamaps: 99 changed = self._applyDataMap(device, datamap) 100 if changed: devchanged=True 101 if devchanged: 102 device.setLastChange() 103 log.info("changes applied") 104 else: 105 log.info("no change detected") 106 device.setSnmpLastCollection() 107 trans = transaction.get() 108 trans.setUser("datacoll") 109 trans.note("data applied from automated collection") 110 trans.commit() 111 except (SystemExit, KeyboardInterrupt): 112 raise 113 except: 114 transaction.abort() 115 log.exception("plugin %s device %s", pname, device.getId())
116 117
118 - def applyDataMap(self, device, datamap, relname="", compname="",modname=""):
119 """Apply a datamap passed as a list of dicts through XML-RPC. 120 """ 121 from plugins.DataMaps import RelationshipMap, ObjectMap 122 if relname: 123 datamap = RelationshipMap(relname=relname, compname=compname, 124 modname=modname, objmaps=datamap) 125 else: 126 datamap = ObjectMap(datamap, compname=compname, modname=modname) 127 self._applyDataMap(device, datamap)
128 129 130
131 - def _applyDataMap(self, device, datamap):
132 """Apply a datamap to a device. 133 """ 134 device.dmd._p_jar.sync() 135 if hasattr(datamap, "compname"): 136 if datamap.compname: 137 tobj = getattr(device, datamap.compname) 138 else: 139 tobj = device 140 if hasattr(datamap, "relname"): 141 changed = self._updateRelationship(tobj, datamap) 142 elif hasattr(datamap, 'modname'): 143 changed = self._updateObject(tobj, datamap) 144 else: 145 changed = False 146 log.warn("plugin returned unknown map skipping") 147 else: 148 changed = False 149 if changed: 150 device.setLastChange() 151 trans = transaction.get() 152 trans.setUser("datacoll") 153 trans.note("data applied from automated collection") 154 trans.commit() 155 return changed
156 157
158 - def _updateRelationship(self, device, relmap):
159 """Add/Update/Remote objects to the target relationship. 160 """ 161 changed = False 162 rname = relmap.relname 163 rel = getattr(device, rname, None) 164 if not rel: 165 log.warn("no relationship:%s found on:%s", 166 relmap.relname, device.id) 167 return changed 168 relids = rel.objectIdsAll() 169 seenids = {} 170 for objmap in relmap: 171 from Products.ZenModel.ZenModelRM import ZenModelRM 172 if hasattr(objmap, 'modname') and hasattr(objmap, 'id'): 173 if seenids.has_key(objmap.id): 174 seenids[objmap.id] += 1 175 objmap.id = "%s_%s" % (objmap.id, seenids[objmap.id]) 176 else: 177 seenids[objmap.id] = 1 178 if objmap.id in relids: 179 obj = rel._getOb(objmap.id) 180 objchange = self._updateObject(obj, objmap) 181 if not changed: changed = objchange 182 if objmap.id in relids: relids.remove(objmap.id) 183 else: 184 objchange, obj = self._createRelObject(device, objmap, rname) 185 if objchange: changed = True 186 if obj and obj.id in relids: relids.remove(obj.id) 187 elif isinstance(objmap, ZenModelRM): 188 self.logChange(device, objmap.id, Change_Add, 189 "linking object %s to device %s relation %s" % ( 190 objmap.id, device.id, rname)) 191 device.addRelation(rname, objmap) 192 changed = True 193 else: 194 objchange, obj = self._createRelObject(device, objmap, rname) 195 if objchange: changed = True 196 if obj and obj.id in relids: relids.remove(obj.id) 197 198 for id in relids: 199 obj = rel._getOb(id) 200 if isinstance(obj, Lockable) and obj.isLockedFromDeletion(): 201 objname = obj.id 202 try: objname = obj.name() 203 except: pass 204 msg = "Deletion Blocked: %s '%s' on %s" % ( 205 obj.meta_type, objname,obj.device().id) 206 log.warn(msg) 207 if obj.sendEventWhenBlocked(): 208 self.logEvent(device, obj, Change_Remove_Blocked, 209 msg, Event.Warning) 210 continue 211 self.logChange(device, obj, Change_Remove, 212 "removing object %s from rel %s on device %s" % ( 213 id, rname, device.id)) 214 rel._delObject(id) 215 if relids: changed=True 216 return changed
217 218
219 - def _updateObject(self, obj, objmap):
220 """Update an object using a objmap. 221 """ 222 changed = False 223 device = obj.device() 224 225 if isinstance(obj, Lockable) and obj.isLockedFromUpdates(): 226 if device.id == obj.id: 227 msg = 'Update Blocked: %s' % device.id 228 else: 229 objname = obj.id 230 try: objname = obj.name() 231 except: pass 232 msg = "Update Blocked: %s '%s' on %s" % ( 233 obj.meta_type, objname ,device.id) 234 log.warn(msg) 235 if obj.sendEventWhenBlocked(): 236 self.logEvent(device, obj,Change_Set_Blocked,msg,Event.Warning) 237 return changed 238 for attname, value in objmap.items(): 239 if type(value) == type(''): 240 try: 241 value.encode('ascii') 242 except UnicodeEncodeError: 243 decoding = obj.zCollectorDecoding 244 value = value.decode(decoding) 245 except UnicodeDecodeError: 246 continue 247 if attname[0] == '_': continue 248 att = getattr(aq_base(obj), attname, zenmarker) 249 if att == zenmarker: 250 log.warn('The attribute %s was not found on object %s from device %s', 251 attname, obj.id, device.id) 252 continue 253 if callable(att): 254 setter = getattr(obj, attname) 255 gettername = attname.replace("set","get") 256 getter = getattr(obj, gettername, None) 257 258 if not getter: 259 260 log.warn("getter '%s' not found on obj '%s', " 261 "skipping", gettername, obj.id) 262 263 else: 264 265 from plugins.DataMaps import MultiArgs 266 if isinstance(value, MultiArgs): 267 268 args = value.args 269 change = True 270 271 else: 272 273 args = (value,) 274 275 try: 276 change = value != getter() 277 except UnicodeDecodeError: 278 change = True 279 280 if change: 281 setter(*args) 282 self.logChange(device, obj, Change_Set, 283 "calling function '%s' with '%s' on " 284 "object %s" % (attname, value, obj.id)) 285 changed = True 286 287 else: 288 try: 289 change = att != value 290 except UnicodeDecodeError: 291 change = True 292 if change: 293 setattr(aq_base(obj), attname, value) 294 self.logChange(device, obj, Change_Set, 295 "set attribute '%s' " 296 "to '%s' on object '%s'" % 297 (attname, value, obj.id)) 298 changed = True 299 if not changed: 300 try: changed = obj._p_changed 301 except: pass 302 if getattr(aq_base(obj), "index_object", False) and changed: 303 log.debug("indexing object %s", obj.id) 304 obj.index_object() 305 if not changed: obj._p_deactivate() 306 return changed
307 308
309 - def _createRelObject(self, device, objmap, relname):
310 """Create an object on a relationship using its objmap. 311 """ 312 constructor = importClass(objmap.modname, objmap.classname) 313 if hasattr(objmap, 'id'): 314 remoteObj = constructor(objmap.id) 315 else: 316 remoteObj = constructor(device, objmap) 317 if remoteObj is None: 318 log.debug("Constructor returned None") 319 return False, None 320 id = remoteObj.id 321 if not remoteObj: 322 raise ObjectCreationError( 323 "failed to create object %s in relation %s" % (id, relname)) 324 325 realdevice = device.device() 326 if realdevice.isLockedFromUpdates(): 327 objtype = "" 328 try: objtype = objmap.modname.split(".")[-1] 329 except: pass 330 msg = "Add Blocked: %s '%s' on %s" % ( 331 objtype, id, realdevice.id) 332 log.warn(msg) 333 if realdevice.sendEventWhenBlocked(): 334 self.logEvent(realdevice, id, Change_Add_Blocked, 335 msg, Event.Warning) 336 return False, None 337 rel = device._getOb(relname, None) 338 if not rel: 339 raise ObjectCreationError( 340 "No relation %s found on device %s" % (relname, device.id)) 341 changed = False 342 try: 343 remoteObj = rel._getOb(remoteObj.id) 344 except AttributeError: 345 self.logChange(realdevice, remoteObj, Change_Add, 346 "adding object %s to relationship %s" % 347 (remoteObj.id, relname)) 348 rel._setObject(remoteObj.id, remoteObj) 349 remoteObj = rel._getOb(remoteObj.id) 350 changed = True 351 return self._updateObject(remoteObj, objmap) or changed, remoteObj
352 353
354 - def stop(self): pass
355 356
357 -class ApplyDataMapThread(threading.Thread, ApplyDataMap):
358 """ 359 Thread that applies datamaps to a device. It reads from a queue that 360 should have tuples of (devid, datamaps) where devid is the primaryId to 361 the device and datamps is a list of datamaps to apply. Cache is synced at 362 the start of each transaction and there is one transaction per device. 363 """ 364
365 - def __init__(self, datacollector, app):
366 threading.Thread.__init__(self) 367 ApplyDataMap.__init__(self, datacollector) 368 self.setName("ApplyDataMapThread") 369 self.setDaemon(1) 370 self.app = app 371 log.debug("Thread conn:%s", self.app._p_jar) 372 self.inputqueue = Queue.Queue() 373 self.done = False
374 375
376 - def processClient(self, device, collectorClient):
377 """Apply datamps to device. 378 """ 379 devpath = device.getPrimaryPath() 380 self.inputqueue.put((devpath, collectorClient))
381 382
383 - def run(self):
384 """Process collectorClients as they are passed in from a data collector. 385 """ 386 log.info("starting applyDataMap thread") 387 while not self.done or not self.inputqueue.empty(): 388 devpath = () 389 try: 390 devpath, collectorClient = self.inputqueue.get(True,1) 391 self.app._p_jar.sync() 392 device = getObjByPath(self.app, devpath) 393 ApplyDataMap.processClient(self, device, collectorClient) 394 except Queue.Empty: pass 395 except (SystemExit, KeyboardInterrupt): raise 396 except: 397 transaction.abort() 398 log.exception("processing device %s", "/".join(devpath)) 399 log.info("stopping applyDataMap thread")
400 401
402 - def stop(self):
403 """Stop the thread once all devices are processed. 404 """ 405 self.done = True 406 self.join()
407