1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """MySqlSendEvent
15 Populate the events database with incoming events
16 """
17
18 import types
19 import threading
20 from Queue import Queue, Empty
21 import logging
22 log = logging.getLogger("zen.Events")
23
24 from _mysql_exceptions import ProgrammingError, OperationalError
25 from ZEO.Exceptions import ClientDisconnected
26
27 import Products.ZenUtils.guid as guid
28 from Products.ZenUtils.Utils import zdecode as decode
29 from Event import buildEventFromDict
30 from ZenEventClasses import Heartbeat, Unknown
31 from Products.ZenEvents.Exceptions import *
32
34 """
35 Run a MySQL statement and return the results.
36 If there's an error, logs it then re-raises the exception.
37
38 @param cursor: an open connection to MySQL
39 @type cursor: database connection
40 @param statement: MySQL statement
41 @type statement: string
42 @return: result of the statement
43 @rtype: string
44 """
45 try:
46 result = cursor.execute(statement)
47 log.debug("%s: --> %d" % (statement, result) )
48 except Exception, ex:
49 log.debug(statement)
50 log.exception(ex)
51 raise ex
52 return result
53
54
56 """
57 Mix-in class that takes a MySQL db connection and builds inserts that
58 send the event to the backend.
59 """
60
62 """
63 Send an event to the backend.
64
65 @param event: an event
66 @type event: Event class
67 @return: event id or None
68 @rtype: string
69 """
70 if type(event) == types.DictType:
71 event = buildEventFromDict(event)
72
73 if getattr(event, 'eventClass', Unknown) == Heartbeat:
74 return self._sendHeartbeat(event)
75
76 for field in self.requiredEventFields:
77 if not hasattr(event, field):
78 raise ZenEventError(
79 "Required event field %s not found" % field)
80
81
82 try:
83 event.severity = int(event.severity)
84 except:
85 event.severity = 1
86
87
88 known_actions = [ 'history', 'drop', 'status', 'heartbeat',
89 'alert_state', 'log', 'detail',
90 ]
91 if hasattr( event, '_action' ) and event._action not in known_actions:
92 event._action = 'status'
93
94
95
96 if not getattr(event, 'message', False):
97 event.message = getattr(event, 'summary', '')
98 event.summary = (getattr(event, 'summary', '') or event.message)[:128]
99
100 if getattr(self, "getDmdRoot", False):
101 try:
102 event = self.applyEventContext(event)
103 except ClientDisconnected, e:
104 log.error(e)
105 raise ZenBackendFailure(str(e))
106 if not event: return
107
108
109 if getattr(event, 'eventClass', Unknown) == Heartbeat:
110 return self._sendHeartbeat(event)
111
112
113 if not hasattr(event, 'dedupid'):
114 dedupfields = event.getDedupFields(self.defaultEventId)
115 if not getattr(event, "eventKey", ""):
116 if type(dedupfields) != types.ListType:
117 dedupfields = list(dedupfields)
118 dedupfields = dedupfields + ["summary"]
119
120 dedupid = []
121 for field in dedupfields:
122 value = getattr(event, field, "")
123 dedupid.append('%s' % value)
124 dedupid = map(self.escape, dedupid)
125 event.dedupid = "|".join(dedupid)
126
127
128 cleanup = lambda : None
129 evid = None
130 try:
131 try:
132 evid = self.doSendEvent(event)
133 except ProgrammingError, e:
134 log.exception(e)
135 except OperationalError, e:
136 log.exception(e)
137 raise ZenBackendFailure(str(e))
138 finally:
139 cleanup()
140 return evid
141
142
def doSendEvent(self, event):
    """
    Actually write the sanitized event into the database

    A new evid is generated and stored on the event.  An event with
    severity 0 is routed to "history" and first clears the matching status
    rows.  When the status insert does not report exactly one row, the evid
    of the row already holding this dedupid is looked up instead.

    NOTE(review): the nesting below was rebuilt from a listing that had
    lost its indentation -- confirm against the original file.

    @param event: event
    @type event: Event class
    @return: event id or None
    @rtype: string
    """
    statusdata, detaildata = self.eventDataMaps(event)
    conn = self.connect()
    try:
        cursor = conn.cursor()
        evid = guid.generate()
        event.evid = evid
        if event.severity == 0:
            event._action = "history"
            clearcls = event.clearClasses()
            if clearcls:
                cleared = execute(cursor,
                                  self.buildClearUpdate(event, clearcls))
                if not cleared:
                    # Nothing matched the clear criteria.
                    return None
                logsql = ('insert into log '
                          '(evid, userName, text) '
                          'select evid, "admin", "auto cleared"'
                          ' from status where clearid = "%s"' % evid)
                execute(cursor, logsql)
                execute(cursor,
                        'DELETE FROM status WHERE clearid IS NOT NULL')
        inserted = execute(
            cursor, self.buildStatusInsert(statusdata, event._action, evid))
        if inserted == 1:
            if detaildata:
                execute(cursor, self.buildDetailInsert(evid, detaildata))
        else:
            lookup = ('select evid from %s where dedupid="%s"' % (
                event._action, decode(self.dmd.Devices, event.dedupid)))
            execute(cursor, lookup)
            row = cursor.fetchone()
            if row:
                evid = row[0]
            else:
                evid = None
    finally:
        self.close(conn)
    return evid
189
190
def _findByIp(self, ipaddress, networks):
    """
    Find a device by looking up its IP address in the Networks catalog.

    @param ipaddress: IP address
    @type ipaddress: string
    @param networks: DMD network object
    @type networks: DMD object
    @return: device object, or None when the address is unknown or has no
        device attached
    @rtype: device object
    """
    log.debug("Looking up IP %s", ipaddress)
    ipobj = networks.findIp(ipaddress)
    if not ipobj:
        return None
    # Resolve the device once instead of once for the test and once more
    # for the result.
    device = ipobj.device()
    if not device:
        return None
    log.debug("IP %s -> %s", ipobj.id, device.id)
    return device
208
209
def getNetworkRoot(self, evt):
    """
    Look up the 'Networks' DMD root and hand it back with the event.

    @param evt: event
    @type evt: Event class
    @return: DMD object and the unchanged event
    @rtype: DMD object, evt
    """
    networks = self.getDmdRoot('Networks')
    return networks, evt
220
221
def applyEventContext(self, evt):
    """
    Apply event and device contexts to the event.
    Only valid if this object has a zeo connection.

    The device is searched by evt.device and then evt.ipAddress, first in
    the device root (or in the event's performance monitor, when it names
    one that exists) and then through the networks root.  The matching
    event class, if any, then applies its extraction, values and transform.

    @param evt: event
    @type evt: Event class
    @return: updated event, or None when its action ends up as "drop"
    @rtype: Event class
    """
    events = self.getDmdRoot("Events")
    devices = self.getDmdRoot("Devices")
    networks, evt = self.getNetworkRoot(evt)

    # Narrow the device search to the event's monitor when it exists.
    if getattr(evt, 'monitor', False):
        performance = self.getDmdRoot('Monitors').Performance
        monitorObj = performance._getOb(evt.monitor, None)
        if monitorObj:
            devices = monitorObj

    def viaDevices(key):
        return devices.findDevice(key)

    def viaNetworks(key):
        return self._findByIp(key, networks)

    # Try each lookup in turn and stop at the first one that finds a device.
    device = None
    attempts = (
        ('device', viaDevices),
        ('ipAddress', viaDevices),
        ('device', viaNetworks),
        ('ipAddress', viaNetworks),
    )
    for attr, finder in attempts:
        key = getattr(evt, attr, None)
        if not key:
            continue
        device = finder(key)
        if device:
            break

    if device:
        evt.device = device.id
        log.debug("Found device %s and adding device-specific"
                  " rules", evt.device)
        evt = self.applyDeviceContext(device, evt)

    evtclass = events.lookup(evt, device)
    if evtclass:
        log.debug("EventClassInst=%s", evtclass.id)
        evt = evtclass.applyExtraction(evt)
        evt = evtclass.applyValues(evt)
        evt = evtclass.applyTransform(evt, device)

    if evt._action != "drop":
        return evt
    log.debug("Dropping event")
    return None
274
275
def applyDeviceContext(self, device, evt):
    """
    Copy device-derived attributes onto the event.

    ipAddress is only filled in when the event does not have one yet; the
    other attributes are always overwritten.

    @param device: device from DMD
    @type device: device object
    @param evt: event
    @type evt: Event class
    @return: updated event
    @rtype: Event class
    """
    def piped(names):
        # Leading "|" followed by the names joined with "|".
        return "|" + "|".join(names)

    if not hasattr(evt, 'ipAddress'):
        evt.ipAddress = device.manageIp
    evt.prodState = device.productionState
    evt.Location = device.getLocationName()
    evt.DeviceClass = device.getDeviceClassName()
    evt.DeviceGroups = piped(device.getDeviceGroupNames())
    evt.Systems = piped(device.getSystemNames())
    evt.DevicePriority = device.getPriority()
    return evt
295
296
298 """
299 Add a heartbeat record to the heartbeat table.
300
301 @param event: event
302 @type event: Event class
303 """
304 evdict = {}
305 if hasattr(event, "device"):
306 evdict['device'] = event.device
307 else:
308 log.warn("heartbeat without device skipping")
309 return
310 if hasattr(event, "timeout"):
311 evdict['timeout'] = event.timeout
312 else:
313 log.warn("heartbeat from %s without timeout skipping", event.device)
314 return
315 if hasattr(event, "component"):
316 evdict['component'] = event.component
317 else:
318 evdict['component'] = ""
319 insert = self.buildInsert(evdict, "heartbeat")
320 insert += " on duplicate key update lastTime=Null"
321 insert += ", timeout=%s" % evdict['timeout']
322 try:
323 conn = self.connect()
324 try:
325 curs = conn.cursor()
326 execute(curs, insert)
327 finally: self.close(conn)
328 except ProgrammingError, e:
329 log.error(insert)
330 log.exception(e)
331 except OperationalError, e:
332 raise ZenBackendFailure(str(e))
333
334
def buildStatusInsert(self, statusdata, table, evid):
    """
    Build an insert statement for the status table that looks like this:
    insert into status set device='box', count=1, ...
        on duplicate key update count=count+1, lastTime=23424.34;

    The "on duplicate key update" part is only added when the target is
    the status table; inserts into "history" also set deletedTime=null.

    @param statusdata: event
    @type statusdata: dictionary
    @param table: name of the table to insert into
    @type table: string
    @param evid: event id
    @type evid: string
    @return: MySQL insert command string
    @rtype: string
    """
    extras = []
    if table == "history":
        extras.append("deletedTime=null")
    extras.append("evid='%s'" % evid)
    insert = self.buildInsert(statusdata, table) + "," + ",".join(extras)
    if table != self.statusTable:
        return insert

    # A duplicate bumps the counter and refreshes summary and last time.
    updates = []
    if 'prodState' in statusdata:
        updates.append("prodState=%d" % statusdata['prodState'])
    summary = self.escape(
        decode(self.dmd.Devices, statusdata.get('summary', '')))
    updates.append("summary='%s'" % summary)
    updates.append("%s=%s+1" % (self.countField, self.countField))
    updates.append("%s=%.3f" % (self.lastTimeField, statusdata['lastTime']))
    return insert + " on duplicate key update " + ",".join(updates)
365
366
def buildDetailInsert(self, evid, detaildict):
    """
    Build an insert to add detail values from an event to the details
    table.

    One (evid, name, value) row is produced per dictionary entry.  The
    caller is expected to pass a non-empty dictionary; an empty one yields
    a statement without any value rows.

    @param evid: event id
    @type evid: string
    @param detaildict: event
    @type detaildict: dictionary
    @return: MySQL insert command string
    @rtype: string
    """
    rows = []
    for field, value in detaildict.items():
        if isinstance(value, types.StringTypes):
            value = decode(self.dmd.Devices, value)
        else:
            # Everything ends up inside SQL quotes, so non-string values are
            # rendered as text first and escaped like any other string.
            value = '%s' % (value,)
        rows.append("('%s','%s','%s')" % (
            evid, self.escape(field), self.escape(value)))
    return "insert into %s (evid, name, value) values %s" % (
        self.detailTable, ",".join(rows))
387
388
def buildInsert(self, datadict, table):
    """
    Build an insert statement that looks like this:
    insert into status set field='value', field=1, ...

    Strings are escaped and quoted, floats are written with three decimals,
    None becomes NULL and anything else is written as is.

    @param datadict: event
    @type datadict: dictionary
    @param table: name of the table to insert into
    @type table: string
    @return: MySQL insert command string
    @rtype: string
    """
    fields = []
    for name, value in datadict.items():
        if value is None:
            # "field=None" is not valid SQL.
            fields.append("%s=NULL" % name)
        elif isinstance(value, types.StringTypes):
            # isinstance also catches str/unicode subclasses, which would
            # otherwise be emitted unquoted.
            fields.append("%s='%s'" % (name, self.escape(value)))
        elif isinstance(value, float):
            fields.append("%s=%.3f" % (name, value))
        else:
            fields.append("%s=%s" % (name, value))
    insert = "insert into %s set " % table
    return str(insert) + str(','.join(fields))
412
413
def buildClearUpdate(self, evt, clearcls):
    """
    Build an update statement that will clear related events.

    Rows are related when device, component and eventKey equal those of
    the event and their event class is one of clearcls; such rows get
    their clearid set to the event's evid.

    @param evt: event
    @type evt: Event class
    @param clearcls: event classes that this event clears
    @type clearcls: list of strings
    @return: MySQL update command string
    @rtype: string
    """
    # Cut the component to 255 characters before escaping.  Cutting the
    # escaped text could split an escape sequence and leave a dangling
    # backslash in front of the closing quote.
    # NOTE(review): 255 is presumably the column width -- confirm.
    component = self.escape(evt.component[:255])
    where = [
        "%s='%s'" % (self.deviceField, self.escape(evt.device)),
        "%s='%s'" % (self.componentField, component),
        "eventKey='%s'" % self.escape(evt.eventKey),
    ]
    update = "update %s " % self.statusTable
    update += "set clearid = '%s' where " % evt.evid
    update += " and ".join(where)

    classes = ["%s='%s'" % (self.eventClassField, self.escape(cls))
               for cls in clearcls]
    if classes:
        update += " and (" + " or ".join(classes) + ")"
    return update
440
441
def eventDataMaps(self, event):
    """
    Split the event's attributes into status data and detail data.

    Attributes named in getFieldList() go to the status dictionary and the
    rest to the detail dictionary.  Names starting with "_" and the
    "dedupfields" attribute are left out of both.

    @param event: event
    @type event: Event class
    @return: (statusdata, detaildata)
    @rtype: tuple of dictionaries
    """
    statusfields = self.getFieldList()
    statusdata, detaildata = {}, {}
    for name, value in event.__dict__.items():
        if name == "dedupfields" or name.startswith("_"):
            continue
        if name in statusfields:
            target = statusdata
        else:
            target = detaildata
        target[name] = value
    return statusdata, detaildata
461
462
def escape(self, value):
    """
    Prepare string values for db by escaping special characters.
    Values that are not strings are returned unchanged.

    @param value: string containing possibly nasty characters
    @type value: string
    @return: escaped string
    @rtype: string
    """
    # isinstance also covers str/unicode subclasses; an exact type
    # comparison would hand those back unescaped.
    if not isinstance(value, types.StringTypes):
        return value

    import _mysql
    if isinstance(value, unicode):
        # NOTE: characters outside ISO-8859-1 raise UnicodeEncodeError here.
        value = value.encode('iso-8859-1')
    return _mysql.escape_string(value)
479
480
481
483 """
484 Class that can connect to the backend. It must be passed:
485 username - backend username to use
486 password - backend password
487 database - backend database name
488 host - hostname of database server
489 port - port
490 """
491 backend = "mysql"
492
493 copyattrs = (
494 "username",
495 "password",
496 "database",
497 "host",
498 "port",
499 "requiredEventFields",
500 "defaultEventId",
501 "statusTable",
502 "deviceField",
503 "componentField",
504 "eventClassField",
505 "firstTimeField",
506 "lastTimeField",
507 "countField",
508 "detailTable",
509 )
510
516
518 """
519 To be implemented by the subclass
520 """
521 pass
522
523
def getFieldList(self):
    """
    Give back the field list stored on this object as _fieldlist.

    @return: list of fields
    @rtype: list
    """
    fields = self._fieldlist
    return fields
532
533
535 """
536 Wrapper around MySQL database connection
537 """
538
539 running = True
540
542 threading.Thread.__init__(self)
543 MySqlSendEvent.__init__(self, zem)
544 self.setDaemon(1)
545 self.setName("SendEventThread")
546 self._evqueue = Queue()
547
549 """
550 Called from main thread to put an event on to the send queue.
551 """
552 return self._evqueue.put(evt)
553
554
556 """
557 Main event loop
558 """
559 log.info("Starting")
560 while not self._evqueue.empty() or self.running:
561 try:
562 evt = self._evqueue.get(True,1)
563 MySqlSendEvent.sendEvent(self, evt)
564 except Empty: pass
565 except OperationalError, e:
566 log.warn(e)
567 except Exception, e:
568 log.exception(e)
569 log.info("Stopped")
570
571
573 """
574 Called from main thread to stop this thread.
575 """
576 log.info("Stopping...")
577 self.running = False
578 self.join(3)
579