1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """MySqlSendEvent
15 Populate the events database with incoming events
16 """
17
18 import types
19 import threading
20 from Queue import Queue, Empty
21 import logging
22 log = logging.getLogger("zen.Events")
23
24 from _mysql_exceptions import ProgrammingError, OperationalError
25 from ZEO.Exceptions import ClientDisconnected
26
27 import Products.ZenUtils.guid as guid
28 from Products.ZenUtils.Utils import zdecode as decode
29 from Event import buildEventFromDict
30 from ZenEventClasses import Heartbeat, Unknown
31 from Products.ZenEvents.Exceptions import *
32
34 """
35 Run a MySQL statement and return the results.
36 If there's an error, logs it then re-raises the exception.
37
38 @param cursor: an open connection to MySQL
39 @type cursor: database connection
40 @param statement: MySQL statement
41 @type statement: string
43 @return: value returned by cursor.execute() for the statement (logged as an integer)
44 @rtype: int
44 """
45 try:
46 result = cursor.execute(statement)
47 log.debug("%s: --> %d" % (statement, result) )
48 except Exception, ex:
49 log.info("Invalid statement: %s", statement)
50 log.error(str(ex))
51 raise ex
52 return result
53
54
56 """
57 Mix-in class that takes a MySQL db connection and builds inserts that
58 sends the event to the backend.
59 """
60
62 """
63 Send an event to the backend.
64
65 @param event: an event
66 @type event: Event class
67 @return: event id or None
68 @rtype: string
69 """
70 log.debug('%s%s%s' % ('=' * 15, ' incoming event ', '=' * 15))
71 if type(event) == types.DictType:
72 event = buildEventFromDict(event)
73
74 if getattr(event, 'eventClass', Unknown) == Heartbeat:
75 log.debug("Got a %s %s heartbeat event (timeout %s sec).",
76 getattr(event, 'device', 'Unknown'),
77 getattr(event, 'component', 'Unknown'),
78 getattr(event, 'timeout', 'Unknown'))
79 return self._sendHeartbeat(event)
80
81 for field in self.requiredEventFields:
82 if not hasattr(event, field):
83 log.error("Required event field %s not found" \
84 " -- ignoring event", field)
85 statusdata, detaildata = self.eventDataMaps(event)
86 log.error("Event info: %s", statusdata)
87 if detaildata:
88 log.error("Detail data: %s", detaildata)
89 return None
90
91
92 try:
93 event.severity = int(event.severity)
94 except:
95 event.severity = 1
96
97
98 known_actions = [ 'history', 'drop', 'status', 'heartbeat',
99 'alert_state', 'log', 'detail',
100 ]
101 if hasattr( event, '_action' ) and event._action not in known_actions:
102 event._action = 'status'
103
104
105
106 if not getattr(event, 'message', False):
107 event.message = getattr(event, 'summary', '')
108 event.summary = (getattr(event, 'summary', '') or event.message)[:128]
109
110 statusdata, detaildata = self.eventDataMaps(event)
111 log.debug("Event info: %s", statusdata)
112 if detaildata:
113 log.debug("Detail data: %s", detaildata)
114
115 if getattr(self, "getDmdRoot", False):
116 try:
117 event = self.applyEventContext(event)
118 except ClientDisconnected, e:
119 log.error(e)
120 raise ZenBackendFailure(str(e))
121 if not event:
122 log.debug("Unable to obtain event -- ignoring.(%s)", event)
123 return
124
125
126 if getattr(event, 'eventClass', Unknown) == Heartbeat:
127 log.debug("Transform created a heartbeat event.")
128 return self._sendHeartbeat(event)
129
130
131 if not hasattr(event, 'dedupid'):
132 dedupfields = event.getDedupFields(self.defaultEventId)
133 if not getattr(event, "eventKey", ""):
134 if type(dedupfields) != types.ListType:
135 dedupfields = list(dedupfields)
136 dedupfields = dedupfields + ["summary"]
137
138 dedupid = []
139 for field in dedupfields:
140 value = getattr(event, field, "")
141 dedupid.append('%s' % value)
142 dedupid = map(self.escape, dedupid)
143 event.dedupid = "|".join(dedupid)
144 log.debug("Created deupid of %s", event.dedupid)
145
146
147 cleanup = lambda : None
148 evid = None
149 try:
150 try:
151 evid = self.doSendEvent(event)
152 except ProgrammingError, e:
153 log.exception(e)
154 except OperationalError, e:
155 log.exception(e)
156 raise ZenBackendFailure(str(e))
157 finally:
158 cleanup()
159
160 if evid:
161 log.debug("New event id = %s", evid)
162 else:
163 log.debug("Duplicate event, updated database.")
164 return evid
165
166
168 """
169 Actually write the sanitized event into the database
170
171 @param event: event
172 @type event: Event class
173 @return: event id or None
174 @rtype: string
175 """
176 insert = ""
177 statusdata, detaildata = self.eventDataMaps(event)
178 if int(event.severity) == 0:
179 log.debug("Clear event found with event data %s",
180 statusdata)
181 else:
182 log.debug("Performing action '%s' on event %s",
183 event._action, statusdata)
184 if detaildata:
185 log.debug("Detail data: %s", detaildata)
186
187 conn = self.connect()
188 try:
189 curs = conn.cursor()
190 evid = guid.generate()
191 event.evid = evid
192 rows = 0
193 if int(event.severity) == 0:
194 event._action = "history"
195 clearcls = event.clearClasses()
196 if not clearcls:
197 log.debug("No clear classes in event -- no action taken.")
198 else:
199 rows = execute(curs, self.buildClearUpdate(event, clearcls))
200 log.debug("%d events matched clear criteria", rows)
201 if not rows:
202 return None
203 insert = ('insert into log '
204 '(evid, userName, text) '
205 'select evid, "admin", "auto cleared"'
206 ' from status where clearid = "%s"' % evid)
207 execute(curs, insert)
208 delete = 'DELETE FROM status WHERE clearid IS NOT NULL'
209 execute(curs, delete)
210 stmt = self.buildStatusInsert(statusdata, event._action, evid)
211 rescount = execute(curs, stmt)
212 if detaildata and rescount == 1:
213 execute(curs, self.buildDetailInsert(evid, detaildata))
214 if rescount != 1:
215 sql = ('select evid from %s where dedupid="%s"' % (
216 event._action, decode(self.dmd.Devices, event.dedupid)))
217 log.debug("%d events returned from insert -- selecting first match from %s.",
218 rescount, sql)
219 execute(curs, sql)
220 rs = curs.fetchone()
221 if rs:
222 evid = rs[0]
223 else:
224 log.debug("No matches found")
225 evid = None
226 finally: self.close(conn)
227 return evid
228
229
231 """
232 Find the device that owns an IP address by looking the address up in the Networks catalog.
233
234 @param ipaddress: IP address
235 @type ipaddress: string
236 @param networks: DMD network object
237 @type networks: DMD object
238 @return: device object
239 @rtype: device object
240 """
241 log.debug("Looking up IP %s" % ipaddress)
242 ipobj = networks.findIp(ipaddress)
243 if ipobj and ipobj.device():
244 device = ipobj.device()
245 log.debug("IP %s -> %s", ipobj.id, device.id)
246 return device
247
248
250 """
251 Return the network root and event
252
253 @param evt: event
254 @type evt: Event class
255 @return: DMD object and the event
256 @rtype: DMD object, evt
257 """
258 return self.getDmdRoot('Networks'), evt
259
260
def applyEventContext(self, evt):
    """
    Enrich an event with device and event-class context from the DMD.

    The device is searched for using the event's 'device' value and then
    its 'ipAddress' value, first through the devices root (or through the
    performance monitor named by evt.monitor, when such a monitor exists)
    and then through the networks root by way of self._findByIp.  When a
    device is found, evt.device is replaced by the device id and
    self.applyDeviceContext is applied.  The event class returned by
    events.lookup, if any, then runs its extraction, values and transform
    steps on the event.

    @param evt: event
    @type evt: Event class
    @return: updated event, or None when its _action ends up as "drop"
    @rtype: Event class or None
    """
    eventsRoot = self.getDmdRoot("Events")
    deviceSource = self.getDmdRoot("Devices")
    networks, evt = self.getNetworkRoot(evt)

    # A performance monitor named on the event, when it can be resolved,
    # replaces the devices root as the place to look devices up.
    monitorName = getattr(evt, 'monitor', False)
    if monitorName:
        performance = self.getDmdRoot('Monitors').Performance
        monitorObj = performance._getOb(monitorName, None)
        if monitorObj:
            deviceSource = monitorObj

    # Ordered lookup attempts: (search the networks root?, event attribute).
    # The loop stops at the first attempt that yields a device.
    attempts = (
        (False, 'device'),
        (False, 'ipAddress'),
        (True, 'device'),
        (True, 'ipAddress'),
    )
    device = None
    for viaNetworks, attrName in attempts:
        key = getattr(evt, attrName, None)
        if not key:
            continue
        if viaNetworks:
            device = self._findByIp(key, networks)
        else:
            device = deviceSource.findDevice(key)
        if device:
            break

    if device:
        evt.device = device.id
        log.debug("Found device %s and adding device-specific data",
                  evt.device)
        evt = self.applyDeviceContext(device, evt)

    # The lookup runs even when no device was found (device is then falsy).
    matchedClass = eventsRoot.lookup(evt, device)
    if matchedClass:
        evt = matchedClass.applyExtraction(evt)
        evt = matchedClass.applyValues(evt)
        evt = matchedClass.applyTransform(evt, device)

    if evt._action != "drop":
        return evt

    log.debug("Dropping event")
    return None
312
313
def applyDeviceContext(self, device, evt):
    """
    Copy device-derived attributes onto the event.

    evt.ipAddress is filled from device.manageIp only when the event has
    no ipAddress attribute at all; the remaining attributes (prodState,
    Location, DeviceClass, DeviceGroups, Systems, DevicePriority) are
    always overwritten.  DeviceGroups and Systems are stored as a single
    string in which every name is preceded by "|".

    @param device: device from DMD
    @type device: device object
    @param evt: event
    @type evt: Event class
    @return: the same event, updated in place
    @rtype: Event class
    """
    def _piped(names):
        # "|a|b" for ["a", "b"]; just "|" when there are no names.
        return "|" + "|".join(names)

    if not hasattr(evt, 'ipAddress'):
        evt.ipAddress = device.manageIp

    # Each value is computed right before it is assigned, so the device
    # is queried in the listed order.
    derived = (
        ('prodState', lambda: device.productionState),
        ('Location', lambda: device.getLocationName()),
        ('DeviceClass', lambda: device.getDeviceClassName()),
        ('DeviceGroups', lambda: _piped(device.getDeviceGroupNames())),
        ('Systems', lambda: _piped(device.getSystemNames())),
        ('DevicePriority', lambda: device.getPriority()),
    )
    for attrName, compute in derived:
        setattr(evt, attrName, compute())
    return evt
333
334
336 """
337 Add a heartbeat record to the heartbeat table.
338
339 @param event: event
340 @type event: Event class
341 """
342 evdict = {}
343 if hasattr(event, "device"):
344 evdict['device'] = event.device
345 else:
346 log.warn("heartbeat without device skipping")
347 return
348 if hasattr(event, "timeout"):
349 evdict['timeout'] = event.timeout
350 else:
351 log.warn("heartbeat from %s without timeout skipping", event.device)
352 return
353 if hasattr(event, "component"):
354 evdict['component'] = event.component
355 else:
356 evdict['component'] = ""
357 insert = self.buildInsert(evdict, "heartbeat")
358 insert += " on duplicate key update lastTime=Null"
359 insert += ", timeout=%s" % evdict['timeout']
360 try:
361 conn = self.connect()
362 try:
363 curs = conn.cursor()
364 execute(curs, insert)
365 finally: self.close(conn)
366 except ProgrammingError, e:
367 log.error(insert)
368 log.exception(e)
369 except OperationalError, e:
370 raise ZenBackendFailure(str(e))
371
372
374 """
375 Build an insert statement for the status table that looks like this:
376 insert into status set device='box', count=1, ...
377 on duplicate key update count=count+1, lastTime=23424.34;
378
379 @param statusdata: event
380 @type statusdata: dictionary
381 @param table: name of the table to insert into
382 @type table: string
383 @param evid: event id
384 @type evid: string
385 @return: MySQL insert command string
386 @rtype: string
387 """
388 insert = self.buildInsert(statusdata, table)
389 fields = []
390 if table == "history":
391 fields.append("deletedTime=null")
392 fields.append("evid='%s'" % evid)
393 insert += ","+",".join(fields)
394 if table == self.statusTable:
395 insert += " on duplicate key update "
396 if statusdata.has_key('prodState'):
397 insert += "prodState=%d," % int(statusdata['prodState'])
398 insert += "summary='%s',message='%s',%s=%s+1,%s=%.3f" % (
399 self.escape(decode(self.dmd.Devices, statusdata.get('summary',''))),
400 self.escape(decode(self.dmd.Devices, statusdata.get('message', ''))),
401 self.countField, self.countField,
402 self.lastTimeField,statusdata['lastTime'])
403 return insert
404
405
407 """
408 Build an insert to add detail values from an event to the details
409 table.
410
411 @param evid: event id
412 @type evid: string
413 @param detaildict: event
414 @type detaildict: dictionary
415 @return: MySQL insert command string
416 @rtype: string
417 """
418 insert = "insert into %s (evid, name, value) values " % self.detailTable
419 var = []
420 for field, value in detaildict.items():
421 if type(value) in types.StringTypes:
422 value = self.escape(decode(self.dmd.Devices, value))
423 var.append("('%s','%s','%s')" % (evid, field, value))
424 insert += ",".join(var)
425 return insert
426
427
429 """
430 Build an insert statement that looks like this:
431 insert into status set field='value', field=1, ...
432
433 @param datadict: event
434 @type datadict: dictionary
435 @param table: name of the table to insert into
436 @type table: string
437 @return: MySQL insert command string
438 @rtype: string
439 """
440 insert = "insert into %s set " % table
441 fields = []
442 for name, value in datadict.items():
443 if type(value) in types.StringTypes:
444 fields.append("%s='%s'" % (name, self.escape(value)))
445 elif type(value) == types.FloatType:
446 fields.append("%s=%.3f" % (name, value))
447 else:
448 fields.append("%s=%s" % (name, value))
449 insert = str(insert) + str(','.join(fields))
450 return insert
451
452
454 """
455 Build an update statement that will clear related events.
456
457 @param evt: event
458 @type evt: Event class
459 @param clearcls: other fields to use to define 'related events'
460 @type clearcls: list of strings
461 @return: MySQL update command string
462 @rtype: string
463 """
464 update = "update %s " % self.statusTable
465 update += "set clearid = '%s' where " % evt.evid
466 w = []
467 w.append("%s='%s'" % (self.deviceField, self.escape(evt.device)))
468 w.append("%s='%s'" % (self.componentField,
469 self.escape(evt.component)[:255]))
470 w.append("eventKey='%s'" % self.escape(evt.eventKey)[:128])
471 update += " and ".join(w)
472
473 w = []
474 for cls in clearcls:
475 w.append("%s='%s'" % (self.eventClassField, self.escape(cls)))
476 if w:
477 update += " and (" + " or ".join(w) + ")"
478 log.debug("Clear command: %s", update)
479 return update
480
481
483 """
484 Return tuple (statusdata, detaildata) for this event.
485
486 @param event: event
487 @type event: Event class
488 @return: (statusdata, detaildata)
489 @rtype: tuple of dictionaries
490 """
491 statusfields = self.getFieldList()
492 statusdata = {}
493 detaildata = {}
494 for name, value in event.__dict__.items():
495 if name.startswith("_") or name == "dedupfields": continue
496 if name in statusfields:
497 statusdata[name] = value
498 else:
499 detaildata[name] = value
500 return statusdata, detaildata
501
502
504 """
505 Prepare string values for db by escaping special characters.
506
507 @param value: string containing possibly nasty characters
508 @type value: string
509 @return: escaped string
510 @rtype: string
511 """
512 if type(value) not in types.StringTypes:
513 return value
514
515 import _mysql
516 if type(value) == type(u''):
517 return _mysql.escape_string(value.encode('iso-8859-1'))
518 return _mysql.escape_string(value)
519
520
521
523 """
524 Class that can connect to backend must be passed:
525 username - backend username to use
526 password - backend password
527 database - backend database name
528 host - hostname of database server
529 port - port
530 """
531 backend = "mysql"
532
533 copyattrs = (
534 "username",
535 "password",
536 "database",
537 "host",
538 "port",
539 "requiredEventFields",
540 "defaultEventId",
541 "statusTable",
542 "deviceField",
543 "componentField",
544 "eventClassField",
545 "firstTimeField",
546 "lastTimeField",
547 "countField",
548 "detailTable",
549 )
550
556
558 """
559 To be implemented by the subclass
560 """
561 pass
562
563
565 """
566 Return the list of fields
567
568 @return: list of fields
569 @rtype: list
570 """
571 return self._fieldlist
572
573
575 """
576 Wrapper around MySQL database connection
577 """
578
579 running = True
580
582 threading.Thread.__init__(self)
583 MySqlSendEvent.__init__(self, zem)
584 self.setDaemon(1)
585 self.setName("SendEventThread")
586 self._evqueue = Queue()
587
589 """
590 Called from main thread to put an event on to the send queue.
591 """
592 return self._evqueue.put(evt)
593
594
596 """
597 Main event loop
598 """
599 log.info("Starting")
600 while not self._evqueue.empty() or self.running:
601 try:
602 evt = self._evqueue.get(True,1)
603 MySqlSendEvent.sendEvent(self, evt)
604 except Empty: pass
605 except OperationalError, e:
606 log.warn(e)
607 except Exception, e:
608 log.exception(e)
609 log.info("Stopped")
610
611
613 """
614 Called from main thread to stop this thread.
615 """
616 log.info("Stopping...")
617 self.running = False
618 self.join(3)
619