1
2
3
4
5
6
7
8
9
10
11
12
13
14 import types
15 import threading
16 import Queue
17
18 import transaction
19
20 from Acquisition import aq_base
21
22 from Products.ZenUtils.Utils import importClass, getObjByPath
23 from Exceptions import *
24 from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked
25 from Products.ZenModel.Lockable import Lockable
26 import Products.ZenEvents.Event as Event
27 import logging
28 log = logging.getLogger("zen.ApplyDataMap")
29
30 zenmarker = "__ZENMARKER__"
31
32 _notAscii = dict.fromkeys(range(128,256), u'?')
33
34
def isSameData(x, y):
    """
    A more comprehensive check to see if existing model data is the same as
    newly modeled data. The primary focus is comparing unsorted lists of
    dictionaries.

    When both arguments are non-empty lists/tuples made up entirely of
    dictionaries, they are compared without regard to the order (or
    repetition) of the dictionaries. Every other pair of values is compared
    with a plain ``==``.

    Returns True when the two values are considered the same.
    """
    sequences = (tuple, list)
    if isinstance(x, sequences) and isinstance(y, sequences) and x and y:
        # Check every element, not just the first one: a stray non-dict
        # further down the list has no .items() to call.
        if (all(isinstance(d, dict) for d in x)
                and all(isinstance(d, dict) for d in y)):
            try:
                return (set(tuple(sorted(d.items())) for d in x) ==
                        set(tuple(sorted(d.items())) for d in y))
            except TypeError:
                # Dictionary values that cannot be hashed (nested lists,
                # dicts) or items that cannot be ordered. Fall back to a
                # pairwise membership test, which keeps the same
                # order-insensitive meaning without needing hashing.
                return (all(d in y for d in x) and
                        all(d in x for d in y))
    return x == y
49
50
52
54 self.datacollector = datacollector
55
56
57 - def logChange(self, device, compname, eventClass, msg):
62
63
def logEvent(self, device, component, eventClass, msg, severity):
    """Report a change to a device model.

    Logs msg at debug level and, when this instance has a data collector
    that exposes a dmd, sends an event through dmd.ZenEventManager.

    device     -- object whose device() returns the device concerned
    component  -- the component object, or a plain component name/id
    eventClass -- event class to put on the event
    msg        -- text used for the log entry and the event summary
    severity   -- severity to put on the event
    """
    device = device.device()
    compname = ""
    try:
        # Plain strings have no 'id', so they are used as the name as-is.
        compname = getattr(component, 'id', component)
        if hasattr(component, 'name') and callable(component.name):
            compname = component.name()
        elif device.id == compname:
            # The "component" is the device itself: leave the field empty.
            compname = ""
    except Exception:
        # Best effort only: keep whatever name was resolved so far.
        log.debug("could not resolve component name", exc_info=True)
    log.debug(msg)
    devname = device.device().id
    # Without a collector, or with one that has no dmd, there is nowhere
    # to send the event, so only the log entry above is produced.
    if (self.datacollector
            and getattr(self.datacollector, 'dmd', None)):
        eventDict = {
            'eventClass': eventClass,
            'device': devname,
            'component': compname,
            'summary': msg,
            'severity': severity,
            'agent': 'ApplyDataMap',
            'explanation': "Event sent as zCollectorLogChanges is True",
            }
        self.datacollector.dmd.ZenEventManager.sendEvent(eventDict)
93
94
def processClient(self, device, collectorClient):
    """Apply the datamaps produced by the collector plugins to device.

    Each (plugin name, results) pair from collectorClient.getResults() is
    run through the matching plugin's preprocess() and process(), and every
    resulting datamap is applied with _applyDataMap(). Afterwards the
    device's last-change time is updated if anything changed, its SNMP
    last-collection time is set, and the transaction is committed.

    Any error other than SystemExit/KeyboardInterrupt aborts the
    transaction and is logged; it is not propagated to the caller.
    """
    log.debug("processing data for device %s", device.id)
    devchanged = False
    # Bound before the try block so the error handler can always name the
    # plugin, even when getResults() itself fails before the first
    # iteration.
    pname = None
    try:
        for pname, results in collectorClient.getResults():
            log.debug("processing plugin %s on device %s", pname, device.id)
            if not results:
                log.warn("plugin %s no results returned", pname)
                continue
            plugin = self.datacollector.collectorPlugins.get(pname, None)
            if not plugin:
                continue
            results = plugin.preprocess(results, log)
            datamaps = plugin.process(device, results, log)
            # A plugin may return a single map or a list/tuple of maps.
            # The exact-type test is deliberate: any other object, even an
            # iterable one, is treated as one map.
            if type(datamaps) not in (list, tuple):
                datamaps = [datamaps,]
            for datamap in datamaps:
                if self._applyDataMap(device, datamap):
                    devchanged = True
        if devchanged:
            device.setLastChange()
            log.info("changes applied")
        else:
            log.info("no change detected")
        device.setSnmpLastCollection()
        trans = transaction.get()
        trans.setUser("datacoll")
        trans.note("data applied from automated collection")
        trans.commit()
    except (SystemExit, KeyboardInterrupt):
        raise
    except:
        transaction.abort()
        log.exception("plugin %s device %s", pname, device.getId())
135
136
137 - def applyDataMap(self, device, datamap, relname="", compname="",modname=""):
147
148
149
def _applyDataMap(self, device, datamap):
    """Apply a datamap to a device.

    A map with a 'relname' attribute is applied with
    _updateRelationship(); otherwise a map with a 'modname' attribute is
    applied with _updateObject(). The target is the device itself, or the
    attribute of the device named by datamap.compname when that is set.
    Maps without a 'compname' attribute are ignored.

    When something changed and the device's database connection could be
    synced, the device's last-change time is updated and the transaction
    is committed.

    Returns the "changed" result of the update (False when nothing was
    applied).
    """
    persist = True
    try:
        device.dmd._p_jar.sync()
    except AttributeError:
        # No usable dmd/_p_jar on this device, so there is nothing to sync
        # and nothing to commit later (presumably the device is not stored
        # in a database, e.g. in tests -- confirm).
        persist = False

    if not hasattr(datamap, "compname"):
        return False

    if datamap.compname:
        tobj = getattr(device, datamap.compname)
    else:
        tobj = device

    if hasattr(datamap, "relname"):
        changed = self._updateRelationship(tobj, datamap)
    elif hasattr(datamap, 'modname'):
        changed = self._updateObject(tobj, datamap)
    else:
        changed = False
        log.warn("plugin returned unknown map skipping")

    if changed and persist:
        device.setLastChange()
        trans = transaction.get()
        trans.setUser("datacoll")
        trans.note("data applied from automated collection")
        trans.commit()
    return changed
181
182
def _updateRelationship(self, device, relmap):
    """Add/Update/Remove objects on the target relationship.

    device -- object that owns the relationship named relmap.relname
    relmap -- iterable of object maps (or ready-made ZenModelRM objects)

    Objects named in relmap are updated in place, re-created when their
    module/class no longer matches the map, or created when missing.
    Objects present on the relationship but absent from relmap are
    removed, unless they are locked from deletion.

    Returns a true value when the relationship was modified.
    """
    changed = False
    rname = relmap.relname
    rel = getattr(device, rname, None)
    if not rel:
        log.warn("no relationship:%s found on:%s",
                 relmap.relname, device.id)
        return changed

    # Kept function-local as in the original code, but resolved once
    # instead of on every loop iteration.
    import inspect
    from Products.ZenModel.ZenModelRM import ZenModelRM

    # Ids still in this list after the loop were not reported by the map
    # and are candidates for removal.
    relids = rel.objectIdsAll()
    seenids = {}
    for objmap in relmap:
        if hasattr(objmap, 'modname') and hasattr(objmap, 'id'):
            # Make repeated ids within one map unique: id, id_2, id_3, ...
            if objmap.id in seenids:
                seenids[objmap.id] += 1
                objmap.id = "%s_%s" % (objmap.id, seenids[objmap.id])
            else:
                seenids[objmap.id] = 1
            if objmap.id in relids:
                obj = rel._getOb(objmap.id)

                # Find out what the existing object is, so an object whose
                # class changed can be re-created rather than updated.
                existing_modname = ''
                existing_classname = ''
                try:
                    existing_modname = inspect.getmodule(obj).__name__
                    existing_classname = obj.__class__.__name__
                except Exception:
                    # Best effort: empty names simply force a re-create.
                    pass

                if (objmap.modname == existing_modname and
                        objmap.classname in ('', existing_classname)):
                    objchange = self._updateObject(obj, objmap)
                    if not changed: changed = objchange
                else:
                    rel._delObject(objmap.id)
                    objchange, obj = self._createRelObject(device, objmap, rname)
                    if not changed: changed = objchange

                if objmap.id in relids: relids.remove(objmap.id)
            else:
                objchange, obj = self._createRelObject(device, objmap, rname)
                if objchange: changed = True
                if obj and obj.id in relids: relids.remove(obj.id)
        elif isinstance(objmap, ZenModelRM):
            self.logChange(device, objmap.id, Change_Add,
                           "linking object %s to device %s relation %s" % (
                           objmap.id, device.id, rname))
            device.addRelation(rname, objmap)
            changed = True
        else:
            objchange, obj = self._createRelObject(device, objmap, rname)
            if objchange: changed = True
            if obj and obj.id in relids: relids.remove(obj.id)

    for objid in relids:
        obj = rel._getOb(objid)
        if isinstance(obj, Lockable) and obj.isLockedFromDeletion():
            objname = obj.id
            try: objname = obj.name()
            except Exception: pass
            msg = "Deletion Blocked: %s '%s' on %s" % (
                    obj.meta_type, objname,obj.device().id)
            log.warn(msg)
            if obj.sendEventWhenBlocked():
                self.logEvent(device, obj, Change_Remove_Blocked,
                              msg, Event.Warning)
            continue
        self.logChange(device, obj, Change_Remove,
                       "removing object %s from rel %s on device %s" % (
                       objid, rname, device.id))
        rel._delObject(objid)
    # NOTE(review): this also reports a change when every leftover object
    # was locked and nothing was actually deleted -- kept as-is, confirm.
    if relids: changed=True
    return changed
262
263
def _updateObject(self, obj, objmap):
    """Update an object using an objmap.

    obj    -- the model object to update
    objmap -- mapping of attribute or setter-method names to new values

    Names starting with '_' and names that do not exist on obj are
    skipped. When the name refers to a callable it is treated as a setter:
    the matching getter is looked up and the setter is only called when
    the value differs (MultiArgs values are always applied). Plain
    attributes are set directly when the value differs.

    Nothing is applied when obj is locked from updates. When a change was
    made the object is re-indexed if it supports index_object().

    Returns a true value when the object was changed.
    """
    changed = False
    device = obj.device()

    if isinstance(obj, Lockable) and obj.isLockedFromUpdates():
        if device.id == obj.id:
            msg = 'Update Blocked: %s' % device.id
        else:
            objname = obj.id
            try: objname = obj.name()
            except Exception: pass
            msg = "Update Blocked: %s '%s' on %s" % (
                    obj.meta_type, objname ,device.id)
        log.warn(msg)
        if obj.sendEventWhenBlocked():
            self.logEvent(device, obj,Change_Set_Blocked,msg,Event.Warning)
        return changed

    for attname, value in objmap.items():
        if type(value) == type(''):
            # NOTE(review): on Python 2, encoding a byte string that holds
            # non-ASCII bytes raises UnicodeDecodeError (implicit decode),
            # so such values are skipped below and the zCollectorDecoding
            # branch looks unreachable for them -- behavior kept, confirm.
            try:
                value.encode('ascii')
            except UnicodeEncodeError:
                decoding = obj.zCollectorDecoding
                value = value.decode(decoding)
            except UnicodeDecodeError:
                continue
        if attname[0] == '_': continue
        att = getattr(aq_base(obj), attname, zenmarker)
        if att == zenmarker:
            log.warn('The attribute %s was not found on object %s from device %s',
                     attname, obj.id, device.id)
            continue
        if callable(att):
            setter = getattr(obj, attname)
            # Swap only the leading "set" for "get". A blanket replace
            # also rewrites a later "set" in the name (setOffset became
            # getOffget), so the getter was not found and the value was
            # never applied.
            if attname.startswith("set"):
                gettername = "get" + attname[3:]
            else:
                gettername = attname.replace("set","get")
            getter = getattr(obj, gettername, None)

            if not getter:
                log.warn("getter '%s' not found on obj '%s', "
                         "skipping", gettername, obj.id)
            else:
                from plugins.DataMaps import MultiArgs
                if isinstance(value, MultiArgs):
                    # Several arguments cannot be compared with a single
                    # getter result, so the setter is always called.
                    args = value.args
                    change = True
                else:
                    args = (value,)
                    try:
                        change = not isSameData(value, getter())
                    except UnicodeDecodeError:
                        change = True

                if change:
                    setter(*args)
                    self.logChange(device, obj, Change_Set,
                                   "calling function '%s' with '%s' on "
                                   "object %s" % (attname, value, obj.id))
                    changed = True
        else:
            try:
                change = att != value
            except UnicodeDecodeError:
                change = True
            if change:
                setattr(aq_base(obj), attname, value)
                self.logChange(device, obj, Change_Set,
                               "set attribute '%s' "
                               "to '%s' on object '%s'" %
                               (attname, value, obj.id))
                changed = True

    if not changed:
        # A setter may have modified the object without this loop noticing.
        try: changed = obj._p_changed
        except Exception: pass
    if getattr(aq_base(obj), "index_object", False) and changed:
        log.debug("indexing object %s", obj.id)
        obj.index_object()
    if not changed: obj._p_deactivate()
    return changed
351
352
def _createRelObject(self, device, objmap, relname):
    """Create an object on a relationship using its objmap.

    device  -- object that owns the relationship
    objmap  -- object map giving modname/classname (and optionally id)
    relname -- name of the relationship on device

    Returns a (changed, object) pair. The pair is (False, None) when the
    constructor returns None or when the device is locked from updates.

    Raises ObjectCreationError when the created object is false or the
    relationship does not exist on device.
    """
    constructor = importClass(objmap.modname, objmap.classname)
    if hasattr(objmap, 'id'):
        remoteObj = constructor(objmap.id)
    else:
        remoteObj = constructor(device, objmap)
    if remoteObj is None:
        log.debug("Constructor returned None")
        return False, None
    objid = remoteObj.id
    if not remoteObj:
        raise ObjectCreationError(
                "failed to create object %s in relation %s" % (objid, relname))

    realdevice = device.device()
    if realdevice.isLockedFromUpdates():
        objtype = ""
        try: objtype = objmap.modname.split(".")[-1]
        except Exception: pass
        msg = "Add Blocked: %s '%s' on %s" % (
                objtype, objid, realdevice.id)
        log.warn(msg)
        if realdevice.sendEventWhenBlocked():
            self.logEvent(realdevice, objid, Change_Add_Blocked,
                          msg, Event.Warning)
        return False, None

    rel = device._getOb(relname, None)
    if not rel:
        raise ObjectCreationError(
                "No relation %s found on device %s" % (relname, device.id))

    changed = False
    try:
        # Reuse an object of that id if the relationship already has one.
        remoteObj = rel._getOb(remoteObj.id)
    except AttributeError:
        self.logChange(realdevice, remoteObj, Change_Add,
                       "adding object %s to relationship %s" %
                       (remoteObj.id, relname))
        rel._setObject(remoteObj.id, remoteObj)
        # Fetch it back so the update below works on the stored object.
        remoteObj = rel._getOb(remoteObj.id)
        changed = True
    return self._updateObject(remoteObj, objmap) or changed, remoteObj
396
397
def stop(self):
    """Do nothing; there is nothing to shut down here."""
    pass
399
400
class ApplyDataMapThread(threading.Thread, ApplyDataMap):
    """
    Thread that applies datamaps to a device. It reads from a queue of
    (devpath, collectorClient) tuples, where devpath is the primary path of
    the device and collectorClient provides the plugin results to apply.
    The cache is synced before each device is processed.
    """

    def __init__(self, datacollector, app):
        """Set up the thread.

        datacollector -- passed on to ApplyDataMap.__init__
        app           -- root object used to sync the connection and to
                         look devices up by path
        """
        threading.Thread.__init__(self)
        ApplyDataMap.__init__(self, datacollector)
        self.setName("ApplyDataMapThread")
        self.setDaemon(1)
        self.app = app
        log.debug("Thread conn:%s", self.app._p_jar)
        self.inputqueue = Queue.Queue()
        self.done = False


    def processClient(self, device, collectorClient):
        """Queue the device's collector results to be applied by the thread.

        Only the device's primary path is queued; run() looks the device
        up again through self.app.
        """
        devpath = device.getPrimaryPath()
        self.inputqueue.put((devpath, collectorClient))


    def run(self):
        """Process collectorClients as they are passed in from a data
        collector.

        Keeps going until stop() has been called and the queue is empty.
        """
        log.info("starting applyDataMap thread")
        while not self.done or not self.inputqueue.empty():
            devpath = ()
            try:
                # Wait at most a second so the loop condition is re-checked
                # regularly.
                devpath, collectorClient = self.inputqueue.get(True,1)
                self.app._p_jar.sync()
                device = getObjByPath(self.app, devpath)
                ApplyDataMap.processClient(self, device, collectorClient)
            except Queue.Empty: pass
            except (SystemExit, KeyboardInterrupt): raise
            except:
                transaction.abort()
                log.exception("processing device %s", "/".join(devpath))
        log.info("stopping applyDataMap thread")


    def stop(self):
        """Stop the thread once all devices are processed.

        Blocks until the thread has finished.
        """
        self.done = True
        self.join()
451