1
2
3
4
5
6
7
8
9
10
11
12
13
14 import types
15 import threading
16 import Queue
17
18 import transaction
19
20 from Acquisition import aq_base
21
22 from Products.ZenUtils.Utils import importClass, getObjByPath
23 from Exceptions import *
24 from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked
25 from Products.ZenModel.Lockable import Lockable
26 import Products.ZenEvents.Event as Event
27 import logging
28 log = logging.getLogger("zen.ApplyDataMap")
29
30 zenmarker = "__ZENMARKER__"
31
32 _notAscii = dict.fromkeys(range(128,256), u'?')
33
35
37 self.datacollector = datacollector
38
39
40 - def logChange(self, device, compname, eventClass, msg):
45
46
def logEvent(self, device, component, eventClass, msg, severity):
    """Report a change to a device model.

    Logs ``msg`` at debug level and, when a data collector carrying a
    ``dmd`` is attached to this instance, sends an event through
    ``dmd.ZenEventManager.sendEvent``.

    device -- object whose ``device()`` method yields the owning device.
    component -- a component object, or a plain component name/id.
    eventClass -- value stored under 'eventClass' in the event.
    msg -- text used for the log line and as the event summary.
    severity -- value stored under 'severity' in the event.
    """
    device = device.device()
    compname = ""
    try:
        compname = getattr(component, 'id', component)
        if hasattr(component, 'name') and callable(component.name):
            compname = component.name()
        elif device.id == compname:
            # The "component" is the device itself: leave the field empty.
            compname = ""
    except (SystemExit, KeyboardInterrupt):
        # Never hide interpreter shutdown behind the best-effort lookup.
        raise
    except Exception:
        # Best effort only: keep whatever name was resolved so far.
        pass
    log.debug(msg)
    devname = device.device().id
    if (self.datacollector
            and getattr(self.datacollector, 'dmd', None)):
        eventDict = {
            'eventClass': eventClass,
            'device': devname,
            'component': compname,
            'summary': msg,
            'severity': severity,
        }
        self.datacollector.dmd.ZenEventManager.sendEvent(eventDict)
74
75
def processClient(self, device, collectorClient):
    """Apply the datamaps produced by each collector plugin to a device.

    For every (plugin name, results) pair from
    ``collectorClient.getResults()`` the matching plugin in
    ``self.datacollector.collectorPlugins`` preprocesses and processes the
    results, and each resulting datamap is applied through
    ``_applyDataMap``.  The work is committed in one transaction at the end.

    SystemExit and KeyboardInterrupt propagate; any other error aborts the
    transaction and is logged.
    """
    log.debug("processing data for device %s", device.id)
    devchanged = False
    # Bound up front so the error handler below can always format its
    # message, even if getResults() fails before the first iteration.
    pname = None
    try:
        for pname, results in collectorClient.getResults():
            log.debug("processing plugin %s on device %s", pname, device.id)
            if not results:
                log.warn("plugin %s no results returned", pname)
                continue
            plugin = self.datacollector.collectorPlugins.get(pname, None)
            if not plugin:
                continue
            results = plugin.preprocess(results, log)
            datamaps = plugin.process(device, results, log)
            # A plugin may return a single map or a list/tuple of maps.
            # Exact type checks are kept on purpose: a map object that
            # merely subclasses list must still count as one map.
            if (type(datamaps) != types.ListType
                    and type(datamaps) != types.TupleType):
                datamaps = [datamaps,]
            for datamap in datamaps:
                changed = self._applyDataMap(device, datamap)
                if changed:
                    devchanged = True
        if devchanged:
            device.setLastChange()
            log.info("changes applied")
        else:
            log.info("no change detected")
        device.setSnmpLastCollection()
        trans = transaction.get()
        trans.setUser("datacoll")
        trans.note("data applied from automated collection")
        trans.commit()
    except (SystemExit, KeyboardInterrupt):
        raise
    except:
        # Anything else: roll back and log it with the last plugin seen.
        transaction.abort()
        log.exception("plugin %s device %s", pname, device.getId())
116
117
118 - def applyDataMap(self, device, datamap, relname="", compname="",modname=""):
128
129
130
def _applyDataMap(self, device, datamap):
    """Apply a single datamap to a device and report whether it changed.

    A map with a ``relname`` attribute is applied as a relationship map, a
    map with a ``modname`` attribute as an object map; anything else is
    skipped with a warning.  Maps without a ``compname`` attribute are
    ignored.  A change is committed only when ``device.dmd._p_jar`` could
    be synced at the start.
    """
    try:
        device.dmd._p_jar.sync()
    except AttributeError:
        # Syncing was not possible, so nothing is committed below.
        persist = False
    else:
        persist = True

    if not hasattr(datamap, "compname"):
        return False

    # An empty compname targets the device itself.
    if datamap.compname:
        target = getattr(device, datamap.compname)
    else:
        target = device

    if hasattr(datamap, "relname"):
        changed = self._updateRelationship(target, datamap)
    elif hasattr(datamap, 'modname'):
        changed = self._updateObject(target, datamap)
    else:
        changed = False
        log.warn("plugin returned unknown map skipping")

    if changed and persist:
        device.setLastChange()
        txn = transaction.get()
        txn.setUser("datacoll")
        txn.note("data applied from automated collection")
        txn.commit()
    return changed
162
163
def _updateRelationship(self, device, relmap):
    """Add, update and remove objects on the relationship named by relmap.

    device -- object that owns the relationship ``relmap.relname``.
    relmap -- iterable of object maps (or model objects) describing the
              desired content of the relationship.

    Ids present on the relationship but absent from ``relmap`` are removed
    unless the object is locked from deletion.  Returns a true value when
    something changed, and False right away when the relationship cannot
    be found on ``device``.
    """
    changed = False
    rname = relmap.relname
    rel = getattr(device, rname, None)
    if not rel:
        log.warn("no relationship:%s found on:%s",
                 relmap.relname, device.id)
        return changed

    # Function-local import as in the original code, hoisted out of the loop.
    from Products.ZenModel.ZenModelRM import ZenModelRM

    # Ids still listed here after the first loop were not reported.
    relids = rel.objectIdsAll()
    seenids = {}
    for objmap in relmap:
        if hasattr(objmap, 'modname') and hasattr(objmap, 'id'):
            # Repeated ids get a numeric suffix so they stay distinct.
            if objmap.id in seenids:
                seenids[objmap.id] += 1
                objmap.id = "%s_%s" % (objmap.id, seenids[objmap.id])
            else:
                seenids[objmap.id] = 1
            if objmap.id in relids:
                obj = rel._getOb(objmap.id)
                objchange = self._updateObject(obj, objmap)
                if not changed:
                    changed = objchange
                if objmap.id in relids:
                    relids.remove(objmap.id)
            else:
                objchange, obj = self._createRelObject(device, objmap, rname)
                if objchange:
                    changed = True
                if obj and obj.id in relids:
                    relids.remove(obj.id)
        elif isinstance(objmap, ZenModelRM):
            # Already a model object: link it instead of building one.
            self.logChange(device, objmap.id, Change_Add,
                           "linking object %s to device %s relation %s" % (
                               objmap.id, device.id, rname))
            device.addRelation(rname, objmap)
            changed = True
        else:
            objchange, obj = self._createRelObject(device, objmap, rname)
            if objchange:
                changed = True
            if obj and obj.id in relids:
                relids.remove(obj.id)

    for relid in relids:
        obj = rel._getOb(relid)
        if isinstance(obj, Lockable) and obj.isLockedFromDeletion():
            objname = obj.id
            try:
                objname = obj.name()
            except (SystemExit, KeyboardInterrupt):
                raise
            except Exception:
                # Best effort: fall back to the plain id.
                pass
            msg = "Deletion Blocked: %s '%s' on %s" % (
                obj.meta_type, objname, obj.device().id)
            log.warn(msg)
            if obj.sendEventWhenBlocked():
                self.logEvent(device, obj, Change_Remove_Blocked,
                              msg, Event.Warning)
            continue
        self.logChange(device, obj, Change_Remove,
                       "removing object %s from rel %s on device %s" % (
                           relid, rname, device.id))
        rel._delObject(relid)
    # NOTE(review): this also reports a change when every leftover id was
    # locked from deletion -- confirm that is intended.
    if relids:
        changed = True
    return changed
223
224
def _updateObject(self, obj, objmap):
    """Update an object from the attribute/value pairs of an objmap.

    obj -- object to update; it must provide ``device()``.
    objmap -- mapping whose ``items()`` yields (attribute name, value).

    Names starting with an underscore and names not found on ``obj`` are
    skipped.  A callable attribute is treated as a setter: it is called
    when the value differs from what the matching getter returns, or
    always when the value is a ``MultiArgs``.  Other attributes are set
    directly when they differ.  Nothing is applied when ``obj`` is locked
    from updates.

    Returns a true value when the object was changed.
    """
    changed = False
    device = obj.device()

    if isinstance(obj, Lockable) and obj.isLockedFromUpdates():
        if device.id == obj.id:
            msg = 'Update Blocked: %s' % device.id
        else:
            objname = obj.id
            try:
                objname = obj.name()
            except (SystemExit, KeyboardInterrupt):
                raise
            except Exception:
                # Best effort: fall back to the plain id.
                pass
            msg = "Update Blocked: %s '%s' on %s" % (
                obj.meta_type, objname, device.id)
        log.warn(msg)
        if obj.sendEventWhenBlocked():
            self.logEvent(device, obj, Change_Set_Blocked, msg, Event.Warning)
        return changed

    for attname, value in objmap.items():
        if type(value) == type(''):
            try:
                value.encode('ascii')
            except UnicodeEncodeError:
                decoding = obj.zCollectorDecoding
                value = value.decode(decoding)
            except UnicodeDecodeError:
                continue
        # startswith() also copes with an empty attribute name.
        if attname.startswith('_'):
            continue
        att = getattr(aq_base(obj), attname, zenmarker)
        if att == zenmarker:
            log.warn('The attribute %s was not found on object %s from device %s',
                     attname, obj.id, device.id)
            continue
        if callable(att):
            setter = getattr(obj, attname)
            # Replace only the first "set": a name such as "setAssetTag"
            # must map to "getAssetTag", not "getAsgetTag".
            gettername = attname.replace("set", "get", 1)
            getter = getattr(obj, gettername, None)

            if not getter:
                log.warn("getter '%s' not found on obj '%s', "
                         "skipping", gettername, obj.id)
            else:
                from plugins.DataMaps import MultiArgs
                if isinstance(value, MultiArgs):
                    # Cannot be compared with one getter value: always apply.
                    args = value.args
                    change = True
                else:
                    args = (value,)
                    try:
                        change = value != getter()
                    except UnicodeDecodeError:
                        change = True

                if change:
                    setter(*args)
                    self.logChange(device, obj, Change_Set,
                                   "calling function '%s' with '%s' on "
                                   "object %s" % (attname, value, obj.id))
                    changed = True
        else:
            try:
                change = att != value
            except UnicodeDecodeError:
                change = True
            if change:
                setattr(aq_base(obj), attname, value)
                self.logChange(device, obj, Change_Set,
                               "set attribute '%s' "
                               "to '%s' on object '%s'" %
                               (attname, value, obj.id))
                changed = True

    if not changed:
        # Fall back to the object's own _p_changed flag when available.
        try:
            changed = obj._p_changed
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception:
            pass
    if getattr(aq_base(obj), "index_object", False) and changed:
        log.debug("indexing object %s", obj.id)
        obj.index_object()
    if not changed:
        obj._p_deactivate()
    return changed
313
314
def _createRelObject(self, device, objmap, relname):
    """Create an object on a relationship from its objmap.

    device -- object that owns the relationship ``relname``.
    objmap -- object map naming the class to build (``modname`` and
              ``classname``) and carrying the attribute values.
    relname -- name of the relationship that receives the object.

    Returns a ``(changed, object)`` tuple, or ``(False, None)`` when the
    constructor returns None or the device is locked from updates.

    Raises ObjectCreationError when the created object is false or the
    relationship does not exist on ``device``.
    """
    constructor = importClass(objmap.modname, objmap.classname)
    if hasattr(objmap, 'id'):
        remoteObj = constructor(objmap.id)
    else:
        remoteObj = constructor(device, objmap)
    if remoteObj is None:
        log.debug("Constructor returned None")
        return False, None
    objid = remoteObj.id
    if not remoteObj:
        raise ObjectCreationError(
            "failed to create object %s in relation %s" % (objid, relname))

    realdevice = device.device()
    if realdevice.isLockedFromUpdates():
        objtype = ""
        try:
            objtype = objmap.modname.split(".")[-1]
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception:
            # Best effort: the type name only decorates the message.
            pass
        msg = "Add Blocked: %s '%s' on %s" % (
            objtype, objid, realdevice.id)
        log.warn(msg)
        if realdevice.sendEventWhenBlocked():
            self.logEvent(realdevice, objid, Change_Add_Blocked,
                          msg, Event.Warning)
        return False, None

    rel = device._getOb(relname, None)
    if not rel:
        raise ObjectCreationError(
            "No relation %s found on device %s" % (relname, device.id))

    changed = False
    try:
        # Reuse the object when the relationship already holds this id.
        remoteObj = rel._getOb(remoteObj.id)
    except AttributeError:
        self.logChange(realdevice, remoteObj, Change_Add,
                       "adding object %s to relationship %s" %
                       (remoteObj.id, relname))
        rel._setObject(remoteObj.id, remoteObj)
        remoteObj = rel._getOb(remoteObj.id)
        changed = True
    return self._updateObject(remoteObj, objmap) or changed, remoteObj
358
359
def stop(self):
    """Do nothing; there is nothing to shut down here."""
    return None
361
362
364 """
365 Thread that applies datamaps to a device. It reads from a queue that
366 should have tuples of (devid, datamaps) where devid is the primaryId to
367 the device and datamps is a list of datamaps to apply. Cache is synced at
368 the start of each transaction and there is one transaction per device.
369 """
370
def __init__(self, datacollector, app):
    """Set up the worker thread.

    datacollector -- passed on to ``ApplyDataMap.__init__``.
    app -- object stored as ``self.app``; its ``_p_jar`` is logged here.
    """
    # Initialise both bases explicitly, thread first.
    threading.Thread.__init__(self)
    ApplyDataMap.__init__(self, datacollector)

    self.setName("ApplyDataMapThread")
    self.setDaemon(1)

    self.app = app
    log.debug("Thread conn:%s", self.app._p_jar)

    # Work items are queued by processClient(); done is set by stop().
    self.inputqueue = Queue.Queue()
    self.done = False
380
381
def processClient(self, device, collectorClient):
    """Queue a device's collector client for the worker thread.

    The device is queued by its primary path, paired with the client, as
    one tuple on ``self.inputqueue``.
    """
    work_item = (device.getPrimaryPath(), collectorClient)
    self.inputqueue.put(work_item)
387
388
def run(self):
    """Apply queued collector clients until stopped and the queue is empty.

    Each queue entry is a (device path, collector client) pair.  The
    connection is synced, the device is looked up by path, and the client
    is handed to ``ApplyDataMap.processClient``.  SystemExit and
    KeyboardInterrupt propagate; any other error aborts the transaction
    and is logged.
    """
    log.info("starting applyDataMap thread")
    while not (self.done and self.inputqueue.empty()):
        # Reset so the error message is well-formed if get() itself fails.
        devpath = ()
        try:
            # Wait at most one second so the done flag is re-checked.
            devpath, client = self.inputqueue.get(True, 1)
            self.app._p_jar.sync()
            target = getObjByPath(self.app, devpath)
            ApplyDataMap.processClient(self, target, client)
        except Queue.Empty:
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except:
            transaction.abort()
            log.exception("processing device %s", "/".join(devpath))
    log.info("stopping applyDataMap thread")
406
407
def stop(self):
    """Set the ``done`` flag, then block until the thread has exited."""
    self.done = True
    # Wait for the worker to finish.
    self.join()
413