1
2
3
4
5
6
7
8
9
10
11
12
13 import time
14 import types
15 import threading
16 from Queue import Queue, Empty
17 import logging
18 log = logging.getLogger("zen.Events")
19
20 from _mysql_exceptions import ProgrammingError, OperationalError
21 from ZEO.Exceptions import ClientDisconnected
22
23 import Products.ZenUtils.guid as guid
24 from Products.ZenUtils.Utils import zdecode as decode
25 from Event import Event, EventHeartbeat, buildEventFromDict
26 from ZenEventClasses import Heartbeat, Unknown
27 from Products.ZenEvents.Exceptions import *
28
38
40 """
41 Mix-in class that takes a mysql db connection and builds inserts that
42 send the event to the backend.
43 """
44
46 """Send an event to the backend.
47 """
48 if type(event) == types.DictType:
49 event = buildEventFromDict(event)
50
51 if getattr(event, 'eventClass', Unknown) == Heartbeat:
52 return self._sendHeartbeat(event)
53
54 for field in self.requiredEventFields:
55 if not hasattr(event, field):
56 raise ZenEventError(
57 "Required event field %s not found" % field)
58
59
60 event.severity = int(event.severity)
61
62
63
64 if not getattr(event, 'message', False):
65 event.message = getattr(event, 'summary', '')
66 event.summary = (getattr(event, 'summary', '') or event.message)[:128]
67
68 if getattr(self, "getDmdRoot", False):
69 try:
70 event = self.applyEventContext(event)
71 except ClientDisconnected, e:
72 log.error(e)
73 raise ZenBackendFailure(str(e))
74 if not event: return
75
76
77 if getattr(event, 'eventClass', Unknown) == Heartbeat:
78 return self._sendHeartbeat(event)
79
80
81 if not hasattr(event, 'dedupid'):
82 dedupid = []
83 dedupfields = event.getDedupFields(self.defaultEventId)
84 if not getattr(event, "eventKey", ""):
85 if type(dedupfields) != types.ListType:
86 dedupfields = list(dedupfields)
87 dedupfields = dedupfields + ["summary"]
88 for field in dedupfields:
89 value = getattr(event, field, "")
90 dedupid.append('%s' % value)
91 dedupid = map(self.escape, dedupid)
92 event.dedupid = "|".join(dedupid)
93
94 cleanup = lambda : None
95 evid = None
96 try:
97 try:
98 evid = self.doSendEvent(event)
99 except ProgrammingError, e:
100 log.exception(e)
101 except OperationalError, e:
102 log.exception(e)
103 raise ZenBackendFailure(str(e))
104 finally:
105 cleanup()
106 return evid
107
def doSendEvent(self, event):
    """Store one event and return its event id.

    A new id is generated and assigned to ``event.evid``.  An event with
    severity 0 is redirected to the "history" action; when it also names
    clear classes, the clear update is executed, the affected status rows
    are logged as "auto cleared" and rows carrying a clearid are deleted
    from status.

    Returns the generated id when exactly one row was affected by the
    status insert, the id already stored for the same dedupid (or None if
    that lookup finds nothing) otherwise, and '' when a clear event
    cleared no rows.  The connection is always closed.
    """
    statusdata, detaildata = self.eventDataMaps(event)
    conn = self.connect()
    try:
        curs = conn.cursor()
        evid = guid.generate()
        event.evid = evid

        if event.severity == 0:
            event._action = "history"
            clearcls = event.clearClasses()
            if clearcls:
                cleared = execute(
                    curs, self.buildClearUpdate(event, clearcls))
                if not cleared:
                    return ''
                execute(curs,
                        'insert into log '
                        '(evid, userName, text) '
                        'select evid, "admin", "auto cleared"'
                        ' from status where clearid = "%s"' % evid)
                execute(curs,
                        'DELETE FROM status WHERE clearid IS NOT NULL')

        rescount = execute(
            curs, self.buildStatusInsert(statusdata, event._action, evid))
        if rescount == 1:
            # Details are only written together with a one-row insert.
            if detaildata:
                execute(curs, self.buildDetailInsert(evid, detaildata))
        else:
            # Any other row count: report the id stored under this dedupid.
            lookup = 'select evid from %s where dedupid="%s"' % (
                event._action, decode(self.dmd.Devices, event.dedupid))
            execute(curs, lookup)
            row = curs.fetchone()
            evid = None
            if row:
                evid = row[0]
    finally:
        self.close(conn)
    return evid
146
147
def _findByIp(self, ipaddress):
    """
    Find a device by looking its ip up under the Networks root.

    Returns the device attached to the matching ip object, or None when
    no ip object is found or the ip object has no device.
    """
    log.debug("looking up ip %s", ipaddress)
    nets = self.getDmdRoot("Networks")
    ipobj = nets.findIp(ipaddress)
    if not ipobj:
        return None
    # Resolve the device once and reuse it for the check, the log line and
    # the return value.
    device = ipobj.device()
    if not device:
        return None
    log.debug("ip %s -> %s", ipobj.id, device.id)
    return device
159
160
def applyEventContext(self, evt):
    """
    Apply device and event-class context to the event.
    Only valid if this object has a zeo connection (getDmdRoot).

    The device is searched for by the event's device field, then its
    ipAddress field, first through the Devices root and then through
    _findByIp.  Returns the processed event, or None when the event's
    _action is "drop".
    """
    events = self.getDmdRoot("Events")
    devices = self.getDmdRoot("Devices")

    # (owner, finder name, event field) in the order they are tried; the
    # finder is resolved lazily so nothing is touched for absent fields.
    attempts = (
        (devices, "findDevice", "device"),
        (devices, "findDevice", "ipAddress"),
        (self, "_findByIp", "device"),
        (self, "_findByIp", "ipAddress"),
    )
    device = None
    for owner, finder, field in attempts:
        key = getattr(evt, field, None)
        if not key:
            continue
        device = getattr(owner, finder)(key)
        if device:
            break

    if device:
        evt.device = device.id
        log.debug("Found device=%s", evt.device)
        evt = self.applyDeviceContext(device, evt)

    evtclass = events.lookup(evt, device)
    if evtclass:
        log.debug("EventClassInst=%s", evtclass.id)
        for step in (evtclass.applyExtraction, evtclass.applyValues):
            evt = step(evt)
        evt = evtclass.applyTransform(evt, device)

    if evt._action == "drop":
        log.debug("dropping event")
        return None

    if getattr(evtclass, "scUserFunction", False):
        log.debug("Found scUserFunction")
        evt = evtclass.scUserFunction(device, evt)
    return evt
194
195
def applyDeviceContext(self, device, evt):
    """
    Copy device attributes onto the event and return the event.

    ipAddress is only filled in (from device.manageIp) when the event does
    not already have one.  Group and system names are stored as a single
    string in which every name is preceded by "|".
    """
    def piped(names):
        # Leading "|" followed by the "|"-separated names.
        return "|%s" % "|".join(names)

    if not hasattr(evt, 'ipAddress'):
        evt.ipAddress = device.manageIp
    evt.prodState = device.productionState
    evt.Location = device.getLocationName()
    evt.DeviceClass = device.getDeviceClassName()
    evt.DeviceGroups = piped(device.getDeviceGroupNames())
    evt.Systems = piped(device.getSystemNames())
    return evt
207
208
210 """Build insert to add heartbeat record to heartbeat table.
211 """
212 evdict = {}
213 if hasattr(event, "device"):
214 evdict['device'] = event.device
215 else:
216 log.warn("heartbeat without device skipping")
217 return
218 if hasattr(event, "timeout"):
219 evdict['timeout'] = event.timeout
220 else:
221 log.warn("heartbeat from %s without timeout skipping", event.device)
222 return
223 if hasattr(event, "component"):
224 evdict['component'] = event.component
225 else:
226 evdict['component'] = ""
227 insert = self.buildInsert(evdict, "heartbeat")
228 insert += " on duplicate key update lastTime=Null"
229 insert += ", timeout=%s" % evdict['timeout']
230 try:
231 conn = self.connect()
232 try:
233 curs = conn.cursor()
234 execute(curs, insert)
235 finally: self.close(conn)
236 except ProgrammingError, e:
237 log.error(insert)
238 log.exception(e)
239 except OperationalError, e:
240 raise ZenBackendFailure(str(e))
241
def buildStatusInsert(self, statusdata, table, evid):
    """
    Build the insert statement for an event row, e.g.:
    insert into status set device='box', count=1, ...
    on duplicate key update count=count+1, lastTime=23424.34;

    The evid is always appended to the field list; for the "history"
    table deletedTime=null is added before it.  The duplicate-key clause
    is only generated when ``table`` is this object's statusTable and
    requires statusdata['lastTime'].
    """
    pieces = [self.buildInsert(statusdata, table)]

    extras = ["evid='%s'" % evid]
    if table == "history":
        extras.insert(0, "deletedTime=null")
    pieces.append("," + ",".join(extras))

    if table == self.statusTable:
        pieces.append(" on duplicate key update ")
        if 'prodState' in statusdata:
            pieces.append("prodState=%d," % statusdata['prodState'])
        summary = self.escape(
            decode(self.dmd.Devices, statusdata.get('summary', '')))
        pieces.append("summary='%s',%s=%s+1,%s=%.3f" % (
            summary,
            self.countField, self.countField,
            self.lastTimeField, statusdata['lastTime']))

    statement = pieces[0]
    for piece in pieces[1:]:
        statement += piece
    return statement
263
264
def buildDetailInsert(self, evid, detaildict):
    """Build an insert to add detail values from an event to the details
    table.

    One (evid, name, value) row is generated per item of ``detaildict``.
    String values are decoded and escaped; every other value is converted
    with str() and escaped too, and so is the field name, because all of
    them end up inside single quotes in the statement.

    An empty ``detaildict`` yields a statement without rows, so callers
    should only use this when there are details to store.
    """
    rows = []
    for field, value in detaildict.items():
        if isinstance(value, types.StringTypes):
            value = decode(self.dmd.Devices, value)
        else:
            value = str(value)
        rows.append("('%s','%s','%s')" % (
            evid, self.escape(field), self.escape(value)))
    insert = "insert into %s (evid, name, value) values " % self.detailTable
    return insert + ",".join(rows)
277
278
def buildInsert(self, datadict, table):
    """
    Build an insert statement that looks like this:
    insert into status set field='value', field=1, ...

    String values (including subclasses of the string types) are escaped
    and quoted, floats are written with three decimals, and anything else
    is interpolated as-is.  Field order follows the dict's iteration order.
    """
    fields = []
    for name, value in datadict.items():
        # isinstance so subclasses are quoted/formatted like their base
        # type instead of falling through to the raw branch.
        if isinstance(value, types.StringTypes):
            fields.append("%s='%s'" % (name, self.escape(value)))
        elif isinstance(value, float):
            fields.append("%s=%.3f" % (name, value))
        else:
            fields.append("%s=%s" % (name, value))
    return str("insert into %s set " % table) + str(','.join(fields))
295
296
313
314
def eventDataMaps(self, event):
    """Return tuple (statusdata, detaildata) for this event.

    The event's instance attributes are split by name: names listed by
    getFieldList() go to statusdata, all others to detaildata.  Names
    starting with "_" and the name "dedupfields" are left out of both.
    """
    statusfields = self.getFieldList()
    statusdata, detaildata = {}, {}
    for name, value in event.__dict__.items():
        if name == "dedupfields" or name.startswith("_"):
            continue
        if name in statusfields:
            target = statusdata
        else:
            target = detaildata
        target[name] = value
    return statusdata, detaildata
328
329
def escape(self, value):
    """Prepare string values for db by escaping special characters.

    Unicode values are encoded to iso-8859-1 before escaping.  Characters
    that charset cannot represent are replaced with "?" rather than
    raising UnicodeEncodeError, so one unusual character cannot abort the
    statement being built.  Returns the result of _mysql.escape_string.
    """
    import _mysql
    # isinstance so unicode subclasses are encoded as well.
    if isinstance(value, type(u'')):
        value = value.encode('iso-8859-1', 'replace')
    return _mysql.escape_string(value)
336
337
338
340 """
341 class that can connect to backend must be passed:
342 username - backend username to use
343 password - backend password
344 database - backend database name
345 host - hostname of database server
346 port - port
347 """
348 backend = "mysql"
349
350 copyattrs = (
351 "username",
352 "password",
353 "database",
354 "host",
355 "port",
356 "requiredEventFields",
357 "defaultEventId",
358 "statusTable",
359 "deviceField",
360 "componentField",
361 "eventClassField",
362 "firstTimeField",
363 "lastTimeField",
364 "countField",
365 "detailTable",
366 )
367
373
def stop(self):
    """Do nothing and return None."""
375
376
378 return self._fieldlist
379
380
391
393 """Called from main thread to put an event on to the send queue.
394 """
395 return self._evqueue.put(evt)
396
397
410
411
413 """Called from main thread to stop this thread.
414 """
415 log.info("stopping...")
416 self.running = False
417 self.join(3)
418