Package DataCollector :: Module ApplyDataMap
[hide private]
[frames] | no frames]

Source Code for Module DataCollector.ApplyDataMap

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import time 
 15  import types 
 16  import threading 
 17  import Queue 
 18   
 19  import transaction 
 20   
 21  from Acquisition import aq_base 
 22   
 23  from Products.ZenUtils.Utils import importClass, getObjByPath 
 24  from Exceptions import * 
 25  from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked 
 26  from Products.ZenModel.Lockable import Lockable 
 27  import Products.ZenEvents.Event as Event 
 28  import logging 
 29  log = logging.getLogger("zen.ApplyDataMap") 
 30   
 31  zenmarker = "__ZENMARKER__" 
 32   
 33  _notAscii = dict.fromkeys(range(128,256), u'?') 
 34   
35 -class ApplyDataMap(object):
36
37 - def __init__(self, datacollector=None):
38 self.datacollector = datacollector
39 40
41 - def logChange(self, device, compname, eventClass, msg):
42 if not getattr(device, 'zCollectorLogChanges', True): return 43 if type(msg) == type(u''): 44 msg = msg.translate(_notAscii) 45 self.logEvent(device, compname, eventClass, msg, Event.Info)
46 47
48 - def logEvent(self, device, component, eventClass, msg, severity):
49 ''' Used to report a change to a device model. Logs the given msg 50 to log.info and creates an event. 51 ''' 52 device = device.device() 53 compname = "" 54 try: 55 compname = getattr(component, 'id', component) 56 if hasattr(component, 'name') and callable(component.name): 57 compname = component.name() 58 elif device.id == compname: 59 compname = "" 60 except: pass 61 log.debug(msg) 62 devname = device.device().id 63 if (self.datacollector 64 # why is this line here? Blocks evnets from model in zope 65 #and getattr(self.datacollector, 'generateEvents', False) 66 and getattr(self.datacollector, 'dmd', None)): 67 eventDict = { 68 'eventClass': eventClass, 69 'device': devname, 70 'component': compname, 71 'summary': msg, 72 'severity': severity, 73 } 74 self.datacollector.dmd.ZenEventManager.sendEvent(eventDict)
75 76
77 - def processClient(self, device, collectorClient):
78 """Apply datamps to device. 79 """ 80 log.debug("processing data for device %s", device.id) 81 devchanged = False 82 try: 83 #clientresults = collectorClient.getResults() 84 #clientresults.sort() 85 #for pname, results in clientresults: 86 for pname, results in collectorClient.getResults(): 87 log.debug("processing plugin %s on device %s", pname, device.id) 88 if not results: 89 log.warn("plugin %s no results returned", pname) 90 continue 91 plugin = self.datacollector.collectorPlugins.get(pname, None) 92 if not plugin: continue 93 results = plugin.preprocess(results, log) 94 datamaps = plugin.process(device, results, log) 95 #allow multiple maps to be returned from one plugin 96 if (type(datamaps) != types.ListType 97 and type(datamaps) != types.TupleType): 98 datamaps = [datamaps,] 99 for datamap in datamaps: 100 changed = self._applyDataMap(device, datamap) 101 if changed: devchanged=True 102 if devchanged: 103 device.setLastChange() 104 log.info("changes applied") 105 else: 106 log.info("no change detected") 107 device.setSnmpLastCollection() 108 trans = transaction.get() 109 trans.setUser("datacoll") 110 trans.note("data applied from automated collection") 111 trans.commit() 112 except (SystemExit, KeyboardInterrupt): 113 raise 114 except: 115 transaction.abort() 116 log.exception("plugin %s device %s", pname, device.getId())
117 118
119 - def applyDataMap(self, device, datamap, relname="", compname="",modname=""):
120 """Apply a datamap passed as a list of dicts through XML-RPC. 121 """ 122 from plugins.DataMaps import RelationshipMap, ObjectMap 123 if relname: 124 datamap = RelationshipMap(relname=relname, compname=compname, 125 modname=modname, objmaps=datamap) 126 else: 127 datamap = ObjectMap(datamap, compname=compname, modname=modname) 128 self._applyDataMap(device, datamap)
129 130 131
132 - def _applyDataMap(self, device, datamap):
133 """Apply a datamap to a device. 134 """ 135 changed = False 136 tobj = device 137 if getattr(datamap, "compname", None) is None: return changed 138 if datamap.compname: 139 tobj = getattr(device, datamap.compname) 140 if hasattr(datamap, "relname"): 141 changed = self._updateRelationship(tobj, datamap) 142 elif hasattr(datamap, 'modname'): 143 changed = self._updateObject(tobj, datamap) 144 else: 145 log.warn("plugin returned unknown map skipping") 146 return changed
147 148
149 - def _updateRelationship(self, device, relmap):
150 """Add/Update/Remote objects to the target relationship. 151 """ 152 changed = False 153 rname = relmap.relname 154 rel = getattr(device, rname, None) 155 if not rel: 156 log.warn("no relationship:%s found on:%s", 157 relmap.relname, device.id) 158 return changed 159 relids = rel.objectIdsAll() 160 seenids = {} 161 for objmap in relmap: 162 from Products.ZenModel.ZenModelRM import ZenModelRM 163 if hasattr(objmap, 'modname') and hasattr(objmap, 'id'): 164 if seenids.has_key(objmap.id): 165 seenids[objmap.id] += 1 166 objmap.id = "%s_%s" % (objmap.id, seenids[objmap.id]) 167 else: 168 seenids[objmap.id] = 1 169 if objmap.id in relids: 170 objchange = False 171 obj = rel._getOb(objmap.id) 172 objchange = self._updateObject(obj, objmap) 173 if not changed: changed = objchange 174 relids.remove(objmap.id) 175 else: 176 changed = self._createRelObject(device, objmap, rname) 177 elif isinstance(objmap, ZenModelRM): 178 self.logChange(device, objmap.id, Change_Add, 179 "linking object %s to device %s relation %s" % ( 180 objmap.id, device.id, rname)) 181 device.addRelation(rname, objmap) 182 changed = True 183 else: 184 log.warn("ignoring objmap no id found") 185 for id in relids: 186 obj = rel._getOb(id) 187 if isinstance(obj, Lockable) and obj.isLockedFromDeletion(): 188 objname = obj.id 189 try: objname = obj.name() 190 except: pass 191 msg = "Deletion Blocked: %s '%s' on %s" % ( 192 obj.meta_type, objname,obj.device().id) 193 log.warn(msg) 194 if obj.sendEventWhenBlocked(): 195 self.logEvent(device, obj, Change_Remove_Blocked, 196 msg, Event.Warning) 197 continue 198 self.logChange(device, obj, Change_Remove, 199 "removing object %s from rel %s on device %s" % ( 200 id, rname, device.id)) 201 rel._delObject(id) 202 if relids: changed=True 203 return changed
204 205
206 - def _updateObject(self, obj, objmap):
207 """Update an object using a objmap. 208 """ 209 changed = False 210 device = obj.device() 211 if isinstance(obj, Lockable) and obj.isLockedFromUpdates(): 212 if device.id == obj.id: 213 msg = 'Update Blocked: %s' % device.id 214 else: 215 objname = obj.id 216 try: objname = obj.name() 217 except: pass 218 msg = "Update Blocked: %s '%s' on %s" % ( 219 obj.meta_type, objname ,device.id) 220 log.warn(msg) 221 if obj.sendEventWhenBlocked(): 222 self.logEvent(device, obj,Change_Set_Blocked,msg,Event.Warning) 223 return changed 224 for attname, value in objmap.items(): 225 if type(value) == type(''): 226 try: 227 value.encode('ascii') 228 except UnicodeDecodeError: 229 decoding = obj.zCollectorDecoding 230 value = value.decode(decoding) 231 if attname[0] == '_': continue 232 att = getattr(aq_base(obj), attname, zenmarker) 233 if att == zenmarker: 234 log.warn('attribute %s not found on object %s', 235 attname, obj.id) 236 continue 237 if callable(att): 238 setter = getattr(obj, attname) 239 gettername = attname.replace("set","get") 240 getter = getattr(obj, gettername, None) 241 if not getter: 242 log.warn("getter '%s' not found on obj '%s', " 243 "skipping", gettername, obj.id) 244 else: 245 try: 246 change = value != getter() 247 except UnicodeDecodeError: 248 change = True 249 if change: 250 setter(value) 251 self.logChange(device, obj, Change_Set, 252 "calling function '%s' with '%s' on " 253 "object %s" % (attname, value, obj.id)) 254 changed = True 255 else: 256 try: 257 change = att != value 258 except UnicodeDecodeError: 259 change = True 260 if change: 261 setattr(aq_base(obj), attname, value) 262 self.logChange(device, obj, Change_Set, 263 "set attribute '%s' " 264 "to '%s' on object '%s'" % 265 (attname, value, obj.id)) 266 changed = True 267 if not changed: 268 try: changed = obj._p_changed 269 except: pass 270 if getattr(aq_base(obj), "index_object", False) and changed: 271 log.debug("indexing object %s", obj.id) 272 obj.index_object() 273 if not changed: obj._p_deactivate() 274 return changed
275 276
277 - def _createRelObject(self, device, objmap, relname):
278 """Create an object on a relationship using its objmap. 279 """ 280 realdevice = device.device() 281 if realdevice.isLockedFromUpdates(): 282 objtype = "" 283 try: objtype = objmap.modname.split(".")[-1] 284 except: pass 285 msg = "Add Blocked: %s '%s' on %s" % ( 286 objtype, objmap.id, realdevice.id) 287 log.warn(msg) 288 if realdevice.sendEventWhenBlocked(): 289 self.logEvent(realdevice, objmap.id, Change_Add_Blocked, 290 msg, Event.Warning) 291 return False 292 id = objmap.id 293 constructor = importClass(objmap.modname, objmap.classname) 294 remoteObj = constructor(id) 295 if not remoteObj: 296 raise ObjectCreationError( 297 "failed to create object %s in relation %s" % (id, relname)) 298 rel = device._getOb(relname, None) 299 if rel: 300 rel._setObject(remoteObj.id, remoteObj) 301 else: 302 raise ObjectCreationError( 303 "No relation %s found on device %s" % (relname, device.id)) 304 remoteObj = rel._getOb(remoteObj.id) 305 self.logChange(realdevice, remoteObj, Change_Add, 306 "adding object %s to relationship %s" % ( 307 remoteObj.id, relname)) 308 self._updateObject(remoteObj, objmap) 309 return True
310 311
312 - def stop(self): pass
313 314
315 -class ApplyDataMapThread(threading.Thread, ApplyDataMap):
316 """ 317 Thread that applies datamaps to a device. It reads from a queue that 318 should have tuples of (devid, datamaps) where devid is the primaryId to 319 the device and datamps is a list of datamaps to apply. Cache is synced at 320 the start of each transaction and there is one transaction per device. 321 """ 322
323 - def __init__(self, datacollector, app):
324 threading.Thread.__init__(self) 325 ApplyDataMap.__init__(self, datacollector) 326 self.setName("ApplyDataMapThread") 327 self.setDaemon(1) 328 self.app = app 329 log.debug("Thread conn:%s", self.app._p_jar) 330 self.inputqueue = Queue.Queue() 331 self.done = False
332 333
334 - def processClient(self, device, collectorClient):
335 """Apply datamps to device. 336 """ 337 devpath = device.getPrimaryPath() 338 self.inputqueue.put((devpath, collectorClient))
339 340
341 - def run(self):
342 """Process collectorClients as they are passed in from a data collector. 343 """ 344 log.info("starting applyDataMap thread") 345 while not self.done or not self.inputqueue.empty(): 346 devpath = () 347 try: 348 devpath, collectorClient = self.inputqueue.get(True,1) 349 self.app._p_jar.sync() 350 device = getObjByPath(self.app, devpath) 351 ApplyDataMap.processClient(self, device, collectorClient) 352 except Queue.Empty: pass 353 except (SystemExit, KeyboardInterrupt): raise 354 except: 355 transaction.abort() 356 log.exception("processing device %s", "/".join(devpath)) 357 log.info("stopping applyDataMap thread")
358 359
360 - def stop(self):
361 """Stop the thread once all devices are processed. 362 """ 363 self.done = True 364 self.join()
365