1
2
3
4
5
6
7
8
9
10
11
12
13
14 import types
15 import threading
16 import Queue
17
18 import transaction
19
20 from Acquisition import aq_base
21
22 from Products.ZenUtils.Utils import importClass, getObjByPath
23 from Exceptions import *
24 from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked
25 from Products.ZenModel.Lockable import Lockable
26 import Products.ZenEvents.Event as Event
27 import logging
28 log = logging.getLogger("zen.ApplyDataMap")
29
30 zenmarker = "__ZENMARKER__"
31
32 _notAscii = dict.fromkeys(range(128,256), u'?')
33
35
37 self.datacollector = datacollector
38
39
40 - def logChange(self, device, compname, eventClass, msg):
45
46
47 - def logEvent(self, device, component, eventClass, msg, severity):
48 ''' Used to report a change to a device model. Logs the given msg
49 to log.info and creates an event.
50 '''
51 device = device.device()
52 compname = ""
53 try:
54 compname = getattr(component, 'id', component)
55 if hasattr(component, 'name') and callable(component.name):
56 compname = component.name()
57 elif device.id == compname:
58 compname = ""
59 except: pass
60 log.debug(msg)
61 devname = device.device().id
62 if (self.datacollector
63
64
65 and getattr(self.datacollector, 'dmd', None)):
66 eventDict = {
67 'eventClass': eventClass,
68 'device': devname,
69 'component': compname,
70 'summary': msg,
71 'severity': severity,
72 }
73 self.datacollector.dmd.ZenEventManager.sendEvent(eventDict)
74
75
77 """Apply datamps to device.
78 """
79 log.debug("processing data for device %s", device.id)
80 devchanged = False
81 try:
82
83
84
85 for pname, results in collectorClient.getResults():
86 log.debug("processing plugin %s on device %s", pname, device.id)
87 if not results:
88 log.warn("plugin %s no results returned", pname)
89 continue
90 plugin = self.datacollector.collectorPlugins.get(pname, None)
91 if not plugin: continue
92 results = plugin.preprocess(results, log)
93 datamaps = plugin.process(device, results, log)
94
95 if (type(datamaps) != types.ListType
96 and type(datamaps) != types.TupleType):
97 datamaps = [datamaps,]
98 for datamap in datamaps:
99 changed = self._applyDataMap(device, datamap)
100 if changed: devchanged=True
101 if devchanged:
102 device.setLastChange()
103 log.info("changes applied")
104 else:
105 log.info("no change detected")
106 device.setSnmpLastCollection()
107 trans = transaction.get()
108 trans.setUser("datacoll")
109 trans.note("data applied from automated collection")
110 trans.commit()
111 except (SystemExit, KeyboardInterrupt):
112 raise
113 except:
114 transaction.abort()
115 log.exception("plugin %s device %s", pname, device.getId())
116
117
118 - def applyDataMap(self, device, datamap, relname="", compname="",modname=""):
128
129
130
132 """Apply a datamap to a device.
133 """
134 device.dmd._p_jar.sync()
135 if hasattr(datamap, "compname"):
136 if datamap.compname:
137 tobj = getattr(device, datamap.compname)
138 else:
139 tobj = device
140 if hasattr(datamap, "relname"):
141 changed = self._updateRelationship(tobj, datamap)
142 elif hasattr(datamap, 'modname'):
143 changed = self._updateObject(tobj, datamap)
144 else:
145 changed = False
146 log.warn("plugin returned unknown map skipping")
147 else:
148 changed = False
149 if changed:
150 device.setLastChange()
151 trans = transaction.get()
152 trans.setUser("datacoll")
153 trans.note("data applied from automated collection")
154 trans.commit()
155 return changed
156
157
159 """Add/Update/Remote objects to the target relationship.
160 """
161 changed = False
162 rname = relmap.relname
163 rel = getattr(device, rname, None)
164 if not rel:
165 log.warn("no relationship:%s found on:%s",
166 relmap.relname, device.id)
167 return changed
168 relids = rel.objectIdsAll()
169 seenids = {}
170 for objmap in relmap:
171 from Products.ZenModel.ZenModelRM import ZenModelRM
172 if hasattr(objmap, 'modname') and hasattr(objmap, 'id'):
173 if seenids.has_key(objmap.id):
174 seenids[objmap.id] += 1
175 objmap.id = "%s_%s" % (objmap.id, seenids[objmap.id])
176 else:
177 seenids[objmap.id] = 1
178 if objmap.id in relids:
179 obj = rel._getOb(objmap.id)
180 objchange = self._updateObject(obj, objmap)
181 if not changed: changed = objchange
182 if objmap.id in relids: relids.remove(objmap.id)
183 else:
184 objchange, obj = self._createRelObject(device, objmap, rname)
185 if objchange: changed = True
186 if obj and obj.id in relids: relids.remove(obj.id)
187 elif isinstance(objmap, ZenModelRM):
188 self.logChange(device, objmap.id, Change_Add,
189 "linking object %s to device %s relation %s" % (
190 objmap.id, device.id, rname))
191 device.addRelation(rname, objmap)
192 changed = True
193 else:
194 objchange, obj = self._createRelObject(device, objmap, rname)
195 if objchange: changed = True
196 if obj and obj.id in relids: relids.remove(obj.id)
197
198 for id in relids:
199 obj = rel._getOb(id)
200 if isinstance(obj, Lockable) and obj.isLockedFromDeletion():
201 objname = obj.id
202 try: objname = obj.name()
203 except: pass
204 msg = "Deletion Blocked: %s '%s' on %s" % (
205 obj.meta_type, objname,obj.device().id)
206 log.warn(msg)
207 if obj.sendEventWhenBlocked():
208 self.logEvent(device, obj, Change_Remove_Blocked,
209 msg, Event.Warning)
210 continue
211 self.logChange(device, obj, Change_Remove,
212 "removing object %s from rel %s on device %s" % (
213 id, rname, device.id))
214 rel._delObject(id)
215 if relids: changed=True
216 return changed
217
218
220 """Update an object using a objmap.
221 """
222 changed = False
223 device = obj.device()
224
225 if isinstance(obj, Lockable) and obj.isLockedFromUpdates():
226 if device.id == obj.id:
227 msg = 'Update Blocked: %s' % device.id
228 else:
229 objname = obj.id
230 try: objname = obj.name()
231 except: pass
232 msg = "Update Blocked: %s '%s' on %s" % (
233 obj.meta_type, objname ,device.id)
234 log.warn(msg)
235 if obj.sendEventWhenBlocked():
236 self.logEvent(device, obj,Change_Set_Blocked,msg,Event.Warning)
237 return changed
238 for attname, value in objmap.items():
239 if type(value) == type(''):
240 try:
241 value.encode('ascii')
242 except UnicodeEncodeError:
243 decoding = obj.zCollectorDecoding
244 value = value.decode(decoding)
245 except UnicodeDecodeError:
246 continue
247 if attname[0] == '_': continue
248 att = getattr(aq_base(obj), attname, zenmarker)
249 if att == zenmarker:
250 log.warn('The attribute %s was not found on object %s from device %s',
251 attname, obj.id, device.id)
252 continue
253 if callable(att):
254 setter = getattr(obj, attname)
255 gettername = attname.replace("set","get")
256 getter = getattr(obj, gettername, None)
257
258 if not getter:
259
260 log.warn("getter '%s' not found on obj '%s', "
261 "skipping", gettername, obj.id)
262
263 else:
264
265 from plugins.DataMaps import MultiArgs
266 if isinstance(value, MultiArgs):
267
268 args = value.args
269 change = True
270
271 else:
272
273 args = (value,)
274
275 try:
276 change = value != getter()
277 except UnicodeDecodeError:
278 change = True
279
280 if change:
281 setter(*args)
282 self.logChange(device, obj, Change_Set,
283 "calling function '%s' with '%s' on "
284 "object %s" % (attname, value, obj.id))
285 changed = True
286
287 else:
288 try:
289 change = att != value
290 except UnicodeDecodeError:
291 change = True
292 if change:
293 setattr(aq_base(obj), attname, value)
294 self.logChange(device, obj, Change_Set,
295 "set attribute '%s' "
296 "to '%s' on object '%s'" %
297 (attname, value, obj.id))
298 changed = True
299 if not changed:
300 try: changed = obj._p_changed
301 except: pass
302 if getattr(aq_base(obj), "index_object", False) and changed:
303 log.debug("indexing object %s", obj.id)
304 obj.index_object()
305 if not changed: obj._p_deactivate()
306 return changed
307
308
310 """Create an object on a relationship using its objmap.
311 """
312 constructor = importClass(objmap.modname, objmap.classname)
313 if hasattr(objmap, 'id'):
314 remoteObj = constructor(objmap.id)
315 else:
316 remoteObj = constructor(device, objmap)
317 if remoteObj is None:
318 log.debug("Constructor returned None")
319 return False, None
320 id = remoteObj.id
321 if not remoteObj:
322 raise ObjectCreationError(
323 "failed to create object %s in relation %s" % (id, relname))
324
325 realdevice = device.device()
326 if realdevice.isLockedFromUpdates():
327 objtype = ""
328 try: objtype = objmap.modname.split(".")[-1]
329 except: pass
330 msg = "Add Blocked: %s '%s' on %s" % (
331 objtype, id, realdevice.id)
332 log.warn(msg)
333 if realdevice.sendEventWhenBlocked():
334 self.logEvent(realdevice, id, Change_Add_Blocked,
335 msg, Event.Warning)
336 return False, None
337 rel = device._getOb(relname, None)
338 if not rel:
339 raise ObjectCreationError(
340 "No relation %s found on device %s" % (relname, device.id))
341 changed = False
342 try:
343 remoteObj = rel._getOb(remoteObj.id)
344 except AttributeError:
345 self.logChange(realdevice, remoteObj, Change_Add,
346 "adding object %s to relationship %s" %
347 (remoteObj.id, relname))
348 rel._setObject(remoteObj.id, remoteObj)
349 remoteObj = rel._getOb(remoteObj.id)
350 changed = True
351 return self._updateObject(remoteObj, objmap) or changed, remoteObj
352
353
354 - def stop(self): pass
355
356
358 """
359 Thread that applies datamaps to a device. It reads from a queue that
360 should have tuples of (devid, datamaps) where devid is the primaryId to
361 the device and datamps is a list of datamaps to apply. Cache is synced at
362 the start of each transaction and there is one transaction per device.
363 """
364
365 - def __init__(self, datacollector, app):
366 threading.Thread.__init__(self)
367 ApplyDataMap.__init__(self, datacollector)
368 self.setName("ApplyDataMapThread")
369 self.setDaemon(1)
370 self.app = app
371 log.debug("Thread conn:%s", self.app._p_jar)
372 self.inputqueue = Queue.Queue()
373 self.done = False
374
375
377 """Apply datamps to device.
378 """
379 devpath = device.getPrimaryPath()
380 self.inputqueue.put((devpath, collectorClient))
381
382
384 """Process collectorClients as they are passed in from a data collector.
385 """
386 log.info("starting applyDataMap thread")
387 while not self.done or not self.inputqueue.empty():
388 devpath = ()
389 try:
390 devpath, collectorClient = self.inputqueue.get(True,1)
391 self.app._p_jar.sync()
392 device = getObjByPath(self.app, devpath)
393 ApplyDataMap.processClient(self, device, collectorClient)
394 except Queue.Empty: pass
395 except (SystemExit, KeyboardInterrupt): raise
396 except:
397 transaction.abort()
398 log.exception("processing device %s", "/".join(devpath))
399 log.info("stopping applyDataMap thread")
400
401
403 """Stop the thread once all devices are processed.
404 """
405 self.done = True
406 self.join()
407