1
2
3
4
5
6
7
8
9
10
11
12
13
14 import time
15 import types
16 import threading
17 import Queue
18
19 import transaction
20
21 from Acquisition import aq_base
22
23 from Products.ZenUtils.Utils import importClass, getObjByPath
24 from Exceptions import *
25 from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked
26 from Products.ZenModel.Lockable import Lockable
27 import Products.ZenEvents.Event as Event
28 import logging
29 log = logging.getLogger("zen.ApplyDataMap")
30
31 zenmarker = "__ZENMARKER__"
32
33 _notAscii = dict.fromkeys(range(128,256), u'?')
34
36
38 self.datacollector = datacollector
39
40
41 - def logChange(self, device, compname, eventClass, msg):
46
47
def logEvent(self, device, component, eventClass, msg, severity):
    """Report a change to a device model.

    Logs ``msg`` at debug level and, when ``self.datacollector`` is set and
    has a truthy ``dmd`` attribute, sends an event through
    ``dmd.ZenEventManager.sendEvent``.

    Arguments:
    device -- object providing ``device()``; the event's 'device' field is
        the id of the object that call resolves to.
    component -- a component object or a plain value.  When the component
        has a callable ``name`` the event's 'component' field is
        ``component.name()``.  Otherwise it is ``component.id`` (or
        ``component`` itself when there is no ``id``), blanked to "" when
        that value equals the device id.
    eventClass -- value stored in the event's 'eventClass' field.
    msg -- text that is logged and used as the event's 'summary'.
    severity -- value stored in the event's 'severity' field.
    """
    device = device.device()
    compname = ""
    try:
        compname = getattr(component, 'id', component)
        if hasattr(component, 'name') and callable(component.name):
            compname = component.name()
        elif device.id == compname:
            compname = ""
    except (SystemExit, KeyboardInterrupt):
        # Same convention as the other handlers in this file: never swallow
        # interpreter shutdown or a user interrupt.
        raise
    except Exception:
        # Resolving the component name is best effort: keep whatever was
        # resolved before the failure, but leave a trace instead of hiding it.
        log.debug("unable to resolve component name for event",
                  exc_info=True)
    log.debug(msg)
    devname = device.device().id
    collector = self.datacollector
    # Events can only be sent when a collector with a dmd is attached.
    if collector and getattr(collector, 'dmd', None):
        eventDict = {
            'eventClass': eventClass,
            'device': devname,
            'component': compname,
            'summary': msg,
            'severity': severity,
        }
        collector.dmd.ZenEventManager.sendEvent(eventDict)
75
76
78 """Apply datamaps to device.
79 """
80 log.debug("processing data for device %s", device.id)
81 devchanged = False
82 try:
83
84
85
86 for pname, results in collectorClient.getResults():
87 log.debug("processing plugin %s on device %s", pname, device.id)
88 if not results:
89 log.warn("plugin %s no results returned", pname)
90 continue
91 plugin = self.datacollector.collectorPlugins.get(pname, None)
92 if not plugin: continue
93 results = plugin.preprocess(results, log)
94 datamaps = plugin.process(device, results, log)
95
96 if (type(datamaps) != types.ListType
97 and type(datamaps) != types.TupleType):
98 datamaps = [datamaps,]
99 for datamap in datamaps:
100 changed = self._applyDataMap(device, datamap)
101 if changed: devchanged=True
102 if devchanged:
103 device.setLastChange()
104 log.info("changes applied")
105 else:
106 log.info("no change detected")
107 device.setSnmpLastCollection()
108 trans = transaction.get()
109 trans.setUser("datacoll")
110 trans.note("data applied from automated collection")
111 trans.commit()
112 except (SystemExit, KeyboardInterrupt):
113 raise
114 except:
115 transaction.abort()
116 log.exception("plugin %s device %s", pname, device.getId())
117
118
119 - def applyDataMap(self, device, datamap, relname="", compname="",modname=""):
129
130
131
133 """Apply a datamap to a device.
134 """
135 changed = False
136 tobj = device
137 if getattr(datamap, "compname", None) is None: return changed
138 if datamap.compname:
139 tobj = getattr(device, datamap.compname)
140 if hasattr(datamap, "relname"):
141 changed = self._updateRelationship(tobj, datamap)
142 elif hasattr(datamap, 'modname'):
143 changed = self._updateObject(tobj, datamap)
144 else:
145 log.warn("plugin returned unknown map skipping")
146 return changed
147
148
150 """Add/Update/Remove objects to the target relationship.
151 """
152 changed = False
153 rname = relmap.relname
154 rel = getattr(device, rname, None)
155 if not rel:
156 log.warn("no relationship:%s found on:%s",
157 relmap.relname, device.id)
158 return changed
159 relids = rel.objectIdsAll()
160 seenids = {}
161 for objmap in relmap:
162 from Products.ZenModel.ZenModelRM import ZenModelRM
163 if hasattr(objmap, 'modname') and hasattr(objmap, 'id'):
164 if seenids.has_key(objmap.id):
165 seenids[objmap.id] += 1
166 objmap.id = "%s_%s" % (objmap.id, seenids[objmap.id])
167 else:
168 seenids[objmap.id] = 1
169 if objmap.id in relids:
170 objchange = False
171 obj = rel._getOb(objmap.id)
172 objchange = self._updateObject(obj, objmap)
173 if not changed: changed = objchange
174 relids.remove(objmap.id)
175 else:
176 changed = self._createRelObject(device, objmap, rname)
177 elif isinstance(objmap, ZenModelRM):
178 self.logChange(device, objmap.id, Change_Add,
179 "linking object %s to device %s relation %s" % (
180 objmap.id, device.id, rname))
181 device.addRelation(rname, objmap)
182 changed = True
183 else:
184 log.warn("ignoring objmap no id found")
185 for id in relids:
186 obj = rel._getOb(id)
187 if isinstance(obj, Lockable) and obj.isLockedFromDeletion():
188 objname = obj.id
189 try: objname = obj.name()
190 except: pass
191 msg = "Deletion Blocked: %s '%s' on %s" % (
192 obj.meta_type, objname,obj.device().id)
193 log.warn(msg)
194 if obj.sendEventWhenBlocked():
195 self.logEvent(device, obj, Change_Remove_Blocked,
196 msg, Event.Warning)
197 continue
198 self.logChange(device, obj, Change_Remove,
199 "removing object %s from rel %s on device %s" % (
200 id, rname, device.id))
201 rel._delObject(id)
202 if relids: changed=True
203 return changed
204
205
207 """Update an object using an objmap.
208 """
209 changed = False
210 device = obj.device()
211 if isinstance(obj, Lockable) and obj.isLockedFromUpdates():
212 if device.id == obj.id:
213 msg = 'Update Blocked: %s' % device.id
214 else:
215 objname = obj.id
216 try: objname = obj.name()
217 except: pass
218 msg = "Update Blocked: %s '%s' on %s" % (
219 obj.meta_type, objname ,device.id)
220 log.warn(msg)
221 if obj.sendEventWhenBlocked():
222 self.logEvent(device, obj,Change_Set_Blocked,msg,Event.Warning)
223 return changed
224 for attname, value in objmap.items():
225 if type(value) == type(''):
226 try:
227 value.encode('ascii')
228 except UnicodeDecodeError:
229 decoding = obj.zCollectorDecoding
230 value = value.decode(decoding)
231 if attname[0] == '_': continue
232 att = getattr(aq_base(obj), attname, zenmarker)
233 if att == zenmarker:
234 log.warn('attribute %s not found on object %s',
235 attname, obj.id)
236 continue
237 if callable(att):
238 setter = getattr(obj, attname)
239 gettername = attname.replace("set","get")
240 getter = getattr(obj, gettername, None)
241 if not getter:
242 log.warn("getter '%s' not found on obj '%s', "
243 "skipping", gettername, obj.id)
244 else:
245 try:
246 change = value != getter()
247 except UnicodeDecodeError:
248 change = True
249 if change:
250 setter(value)
251 self.logChange(device, obj, Change_Set,
252 "calling function '%s' with '%s' on "
253 "object %s" % (attname, value, obj.id))
254 changed = True
255 else:
256 try:
257 change = att != value
258 except UnicodeDecodeError:
259 change = True
260 if change:
261 setattr(aq_base(obj), attname, value)
262 self.logChange(device, obj, Change_Set,
263 "set attribute '%s' "
264 "to '%s' on object '%s'" %
265 (attname, value, obj.id))
266 changed = True
267 if not changed:
268 try: changed = obj._p_changed
269 except: pass
270 if getattr(aq_base(obj), "index_object", False) and changed:
271 log.debug("indexing object %s", obj.id)
272 obj.index_object()
273 if not changed: obj._p_deactivate()
274 return changed
275
276
278 """Create an object on a relationship using its objmap.
279 """
280 realdevice = device.device()
281 if realdevice.isLockedFromUpdates():
282 objtype = ""
283 try: objtype = objmap.modname.split(".")[-1]
284 except: pass
285 msg = "Add Blocked: %s '%s' on %s" % (
286 objtype, objmap.id, realdevice.id)
287 log.warn(msg)
288 if realdevice.sendEventWhenBlocked():
289 self.logEvent(realdevice, objmap.id, Change_Add_Blocked,
290 msg, Event.Warning)
291 return False
292 id = objmap.id
293 constructor = importClass(objmap.modname, objmap.classname)
294 remoteObj = constructor(id)
295 if not remoteObj:
296 raise ObjectCreationError(
297 "failed to create object %s in relation %s" % (id, relname))
298 rel = device._getOb(relname, None)
299 if rel:
300 rel._setObject(remoteObj.id, remoteObj)
301 else:
302 raise ObjectCreationError(
303 "No relation %s found on device %s" % (relname, device.id))
304 remoteObj = rel._getOb(remoteObj.id)
305 self.logChange(realdevice, remoteObj, Change_Add,
306 "adding object %s to relationship %s" % (
307 remoteObj.id, relname))
308 self._updateObject(remoteObj, objmap)
309 return True
310
311
def stop(self):
    """Intentionally a no-op; always returns None."""
    return None
313
314
316 """
317 Thread that applies datamaps to a device. It reads from a queue that
318 should have tuples of (devpath, collectorClient) where devpath is the
319 primary path to the device and collectorClient supplies the plugin results
320 to apply. Cache is synced at the start of each transaction and there is one transaction per device.
321 """
322
def __init__(self, datacollector, app):
    """Set up the worker thread and its input queue.

    datacollector -- forwarded unchanged to ``ApplyDataMap.__init__``.
    app -- stored as ``self.app``; its ``_p_jar`` is written to the debug
        log when the thread object is built.
    """
    # Both base initialisers are invoked explicitly, thread first.
    threading.Thread.__init__(self)
    ApplyDataMap.__init__(self, datacollector)

    # Thread identity and daemon flag.
    self.setName("ApplyDataMapThread")
    self.setDaemon(1)

    self.app = app
    log.debug("Thread conn:%s", app._p_jar)

    # Work items are handed over through this queue; ``done`` starts False.
    self.inputqueue = Queue.Queue()
    self.done = False
332
333
335 """Apply datamaps to device.
336 """
337 devpath = device.getPrimaryPath()
338 self.inputqueue.put((devpath, collectorClient))
339
340
342 """Process collectorClients as they are passed in from a data collector.
343 """
344 log.info("starting applyDataMap thread")
345 while not self.done or not self.inputqueue.empty():
346 devpath = ()
347 try:
348 devpath, collectorClient = self.inputqueue.get(True,1)
349 self.app._p_jar.sync()
350 device = getObjByPath(self.app, devpath)
351 ApplyDataMap.processClient(self, device, collectorClient)
352 except Queue.Empty: pass
353 except (SystemExit, KeyboardInterrupt): raise
354 except:
355 transaction.abort()
356 log.exception("processing device %s", "/".join(devpath))
357 log.info("stopping applyDataMap thread")
358
359
361 """Stop the thread once all devices are processed.
362 """
363 self.done = True
364 self.join()
365