1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """MySqlSendEvent
15 Populate the events database with incoming events
16 """
17
18 import types
19 import threading
20 from Queue import Queue, Empty
21 import logging
22 log = logging.getLogger("zen.Events")
23
24 from _mysql_exceptions import ProgrammingError, OperationalError
25 from ZEO.Exceptions import ClientDisconnected
26
27 import Products.ZenUtils.guid as guid
28 from Products.ZenUtils.Utils import zdecode as decode
29 from Event import buildEventFromDict
30 from ZenEventClasses import Heartbeat, Unknown
31 from Products.ZenEvents.Exceptions import *
32
34 """
35 Run a MySQL statement and return the results.
36 If there's an error, logs it then re-raises the exception.
37
38 @param cursor: an open connection to MySQL
39 @type cursor: database connection
40 @param statement: MySQL statement
41 @type statement: string
42 @return: result of the statement
43 @rtype: string
44 """
45 try:
46 result = cursor.execute(statement)
47 log.debug("%s: --> %d" % (statement, result) )
48 except Exception, ex:
49 log.info("Invalid statement: %s", statement)
50 log.error(str(ex))
51 raise ex
52 return result
53
54
56 """
57 Mix-in class that takes a MySQL db connection and builds inserts that
58 sends the event to the backend.
59 """
60
62 """
63 Send an event to the backend.
64
65 @param event: an event
66 @type event: Event class
67 @return: event id or None
68 @rtype: string
69 """
70 log.debug('%s%s%s' % ('=' * 15, ' incoming event ', '=' * 15))
71 if type(event) == types.DictType:
72 event = buildEventFromDict(event)
73
74 if getattr(event, 'eventClass', Unknown) == Heartbeat:
75 log.debug("Got a %s %s heartbeat event (timeout %s sec).",
76 getattr(event, 'device', 'Unknown'),
77 getattr(event, 'component', 'Unknown'),
78 getattr(event, 'timeout', 'Unknown'))
79 return self._sendHeartbeat(event)
80
81 for field in self.requiredEventFields:
82 if not hasattr(event, field):
83 raise ZenEventError(
84 "Required event field %s not found" % field)
85
86
87 try:
88 event.severity = int(event.severity)
89 except:
90 event.severity = 1
91
92
93 known_actions = [ 'history', 'drop', 'status', 'heartbeat',
94 'alert_state', 'log', 'detail',
95 ]
96 if hasattr( event, '_action' ) and event._action not in known_actions:
97 event._action = 'status'
98
99
100
101 if not getattr(event, 'message', False):
102 event.message = getattr(event, 'summary', '')
103 event.summary = (getattr(event, 'summary', '') or event.message)[:128]
104
105 statusdata, detaildata = self.eventDataMaps(event)
106 log.debug("Event info: %s", statusdata)
107 if detaildata:
108 log.debug("Detail data: %s", detaildata)
109
110 if getattr(self, "getDmdRoot", False):
111 try:
112 event = self.applyEventContext(event)
113 except ClientDisconnected, e:
114 log.error(e)
115 raise ZenBackendFailure(str(e))
116 if not event:
117 log.debug("Unable to obtain event -- ignoring.(%s)", event)
118 return
119
120
121 if getattr(event, 'eventClass', Unknown) == Heartbeat:
122 log.debug("Transform created a heartbeat event.")
123 return self._sendHeartbeat(event)
124
125
126 if not hasattr(event, 'dedupid'):
127 dedupfields = event.getDedupFields(self.defaultEventId)
128 if not getattr(event, "eventKey", ""):
129 if type(dedupfields) != types.ListType:
130 dedupfields = list(dedupfields)
131 dedupfields = dedupfields + ["summary"]
132
133 dedupid = []
134 for field in dedupfields:
135 value = getattr(event, field, "")
136 dedupid.append('%s' % value)
137 dedupid = map(self.escape, dedupid)
138 event.dedupid = "|".join(dedupid)
139 log.debug("Created deupid of %s", event.dedupid)
140
141
142 cleanup = lambda : None
143 evid = None
144 try:
145 try:
146 evid = self.doSendEvent(event)
147 except ProgrammingError, e:
148 log.exception(e)
149 except OperationalError, e:
150 log.exception(e)
151 raise ZenBackendFailure(str(e))
152 finally:
153 cleanup()
154
155 if evid:
156 log.debug("New event id = %s", evid)
157 else:
158 log.debug("Duplicate event, updated database.")
159 return evid
160
161
163 """
164 Actually write the sanitized event into the database
165
166 @param event: event
167 @type event: Event class
168 @return: event id or None
169 @rtype: string
170 """
171 insert = ""
172 statusdata, detaildata = self.eventDataMaps(event)
173 log.debug("Performing action '%s' on event %s",
174 event._action, statusdata)
175 if detaildata:
176 log.debug("Detail data: %s", detaildata)
177
178 conn = self.connect()
179 try:
180 curs = conn.cursor()
181 evid = guid.generate()
182 event.evid = evid
183 rows = 0
184 if int(event.severity) == 0:
185 event._action = "history"
186 clearcls = event.clearClasses()
187 if clearcls:
188 rows = execute(curs, self.buildClearUpdate(event, clearcls))
189 if not rows:
190 return None
191 insert = ('insert into log '
192 '(evid, userName, text) '
193 'select evid, "admin", "auto cleared"'
194 ' from status where clearid = "%s"' % evid)
195 execute(curs, insert)
196 delete = 'DELETE FROM status WHERE clearid IS NOT NULL'
197 execute(curs, delete)
198 stmt = self.buildStatusInsert(statusdata, event._action, evid)
199 rescount = execute(curs, stmt)
200 if detaildata and rescount == 1:
201 execute(curs, self.buildDetailInsert(evid, detaildata))
202 if rescount != 1:
203 sql = ('select evid from %s where dedupid="%s"' % (
204 event._action, decode(self.dmd.Devices, event.dedupid)))
205 execute(curs, sql)
206 rs = curs.fetchone()
207 if rs:
208 evid = rs[0]
209 else:
210 evid = None
211 finally: self.close(conn)
212 return evid
213
214
216 """
217 Find and ip by looking up it up in the Networks catalog.
218
219 @param ipaddress: IP address
220 @type ipaddress: string
221 @param networks: DMD network object
222 @type networks: DMD object
223 @return: device object
224 @rtype: device object
225 """
226 log.debug("Looking up IP %s" % ipaddress)
227 ipobj = networks.findIp(ipaddress)
228 if ipobj and ipobj.device():
229 device = ipobj.device()
230 log.debug("IP %s -> %s", ipobj.id, device.id)
231 return device
232
233
235 """
236 Return the network root and event
237
238 @param evt: event
239 @type evt: Event class
240 @return: DMD object and the event
241 @rtype: DMD object, evt
242 """
243 return self.getDmdRoot('Networks'), evt
244
245
246 - def applyEventContext(self, evt):
247 """
248 Apply event and devices contexts to the event.
249 Only valid if this object has zeo connection.
250
251 @param evt: event
252 @type evt: Event class
253 @return: updated event
254 @rtype: Event class
255 """
256 events = self.getDmdRoot("Events")
257 devices = self.getDmdRoot("Devices")
258 networks, evt = self.getNetworkRoot(evt)
259
260
261
262 if getattr(evt, 'monitor', False):
263 monitorObj = self.getDmdRoot('Monitors'
264 ).Performance._getOb(evt.monitor, None)
265 if monitorObj:
266 devices = monitorObj
267
268
269
270 device = None
271 if getattr(evt, 'device', None):
272 device = devices.findDevice(evt.device)
273 if not device and getattr(evt, 'ipAddress', None):
274 device = devices.findDevice(evt.ipAddress)
275 if not device and getattr(evt, 'device', None):
276 device = self._findByIp(evt.device, networks)
277 if not device and getattr(evt, 'ipAddress', None):
278 device = self._findByIp(evt.ipAddress, networks)
279
280 if device:
281 evt.device = device.id
282 log.debug("Found device %s and adding device-specific"
283 " data", evt.device)
284 evt = self.applyDeviceContext(device, evt)
285
286 evtclass = events.lookup(evt, device)
287 if evtclass:
288 evt = evtclass.applyExtraction(evt)
289 evt = evtclass.applyValues(evt)
290 evt = evtclass.applyTransform(evt, device)
291
292 if evt._action == "drop":
293 log.debug("Dropping event")
294 return None
295
296 return evt
297
298
299 - def applyDeviceContext(self, device, evt):
300 """
301 Apply event attributes from device context.
302
303 @param device: device from DMD
304 @type device: device object
305 @param evt: event
306 @type evt: Event class
307 @return: updated event
308 @rtype: Event class
309 """
310 if not hasattr(evt, 'ipAddress'): evt.ipAddress = device.manageIp
311 evt.prodState = device.productionState
312 evt.Location = device.getLocationName()
313 evt.DeviceClass = device.getDeviceClassName()
314 evt.DeviceGroups = "|"+"|".join(device.getDeviceGroupNames())
315 evt.Systems = "|"+"|".join(device.getSystemNames())
316 evt.DevicePriority = device.getPriority()
317 return evt
318
319
321 """
322 Add a heartbeat record to the heartbeat table.
323
324 @param event: event
325 @type event: Event class
326 """
327 evdict = {}
328 if hasattr(event, "device"):
329 evdict['device'] = event.device
330 else:
331 log.warn("heartbeat without device skipping")
332 return
333 if hasattr(event, "timeout"):
334 evdict['timeout'] = event.timeout
335 else:
336 log.warn("heartbeat from %s without timeout skipping", event.device)
337 return
338 if hasattr(event, "component"):
339 evdict['component'] = event.component
340 else:
341 evdict['component'] = ""
342 insert = self.buildInsert(evdict, "heartbeat")
343 insert += " on duplicate key update lastTime=Null"
344 insert += ", timeout=%s" % evdict['timeout']
345 try:
346 conn = self.connect()
347 try:
348 curs = conn.cursor()
349 execute(curs, insert)
350 finally: self.close(conn)
351 except ProgrammingError, e:
352 log.error(insert)
353 log.exception(e)
354 except OperationalError, e:
355 raise ZenBackendFailure(str(e))
356
357
359 """
360 Build an insert statement for the status table that looks like this:
361 insert into status set device='box', count=1, ...
362 on duplicate key update count=count+1, lastTime=23424.34;
363
364 @param statusdata: event
365 @type statusdata: dictionary
366 @param table: name of the table to insert into
367 @type table: string
368 @param evid: event id
369 @type evid: string
370 @return: MySQL insert command string
371 @rtype: string
372 """
373 insert = self.buildInsert(statusdata, table)
374 fields = []
375 if table == "history":
376 fields.append("deletedTime=null")
377 fields.append("evid='%s'" % evid)
378 insert += ","+",".join(fields)
379 if table == self.statusTable:
380 insert += " on duplicate key update "
381 if statusdata.has_key('prodState'):
382 insert += "prodState=%d," % statusdata['prodState']
383 insert += "summary='%s',%s=%s+1,%s=%.3f" % (
384 self.escape(decode(self.dmd.Devices, statusdata.get('summary',''))),
385 self.countField, self.countField,
386 self.lastTimeField,statusdata['lastTime'])
387 return insert
388
389
391 """
392 Build an insert to add detail values from an event to the details
393 table.
394
395 @param evid: event id
396 @type evid: string
397 @param detaildict: event
398 @type detaildict: dictionary
399 @return: MySQL insert command string
400 @rtype: string
401 """
402 insert = "insert into %s (evid, name, value) values " % self.detailTable
403 var = []
404 for field, value in detaildict.items():
405 if type(value) in types.StringTypes:
406 value = self.escape(decode(self.dmd.Devices, value))
407 var.append("('%s','%s','%s')" % (evid, field, value))
408 insert += ",".join(var)
409 return insert
410
411
413 """
414 Build a insert statement for that looks like this:
415 insert into status set field='value', field=1, ...
416
417 @param datadict: event
418 @type datadict: dictionary
419 @param table: name of the table to insert into
420 @type table: string
421 @return: MySQL insert command string
422 @rtype: string
423 """
424 insert = "insert into %s set " % table
425 fields = []
426 for name, value in datadict.items():
427 if type(value) in types.StringTypes:
428 fields.append("%s='%s'" % (name, self.escape(value)))
429 elif type(value) == types.FloatType:
430 fields.append("%s=%.3f" % (name, value))
431 else:
432 fields.append("%s=%s" % (name, value))
433 insert = str(insert) + str(','.join(fields))
434 return insert
435
436
438 """
439 Build an update statement that will clear related events.
440
441 @param evt: event
442 @type evt: Event class
443 @param clearcls: other fields to use to define 'related events'
444 @type clearcls: list of strings
445 @return: MySQL update command string
446 @rtype: string
447 """
448 update = "update %s " % self.statusTable
449 update += "set clearid = '%s' where " % evt.evid
450 w = []
451 w.append("%s='%s'" % (self.deviceField, self.escape(evt.device)))
452 w.append("%s='%s'" % (self.componentField,
453 self.escape(evt.component)[:255]))
454 w.append("eventKey='%s'" % self.escape(evt.eventKey)[:128])
455 update += " and ".join(w)
456
457 w = []
458 for cls in clearcls:
459 w.append("%s='%s'" % (self.eventClassField, self.escape(cls)))
460 if w:
461 update += " and (" + " or ".join(w) + ")"
462 return update
463
464
466 """
467 Return tuple (statusdata, detaildata) for this event.
468
469 @param event: event
470 @type event: Event class
471 @return: (statusdata, detaildata)
472 @rtype: tuple of dictionaries
473 """
474 statusfields = self.getFieldList()
475 statusdata = {}
476 detaildata = {}
477 for name, value in event.__dict__.items():
478 if name.startswith("_") or name == "dedupfields": continue
479 if name in statusfields:
480 statusdata[name] = value
481 else:
482 detaildata[name] = value
483 return statusdata, detaildata
484
485
487 """
488 Prepare string values for db by escaping special characters.
489
490 @param value: string containing possibly nasty characters
491 @type value: string
492 @return: escaped string
493 @rtype: string
494 """
495 if type(value) not in types.StringTypes:
496 return value
497
498 import _mysql
499 if type(value) == type(u''):
500 return _mysql.escape_string(value.encode('iso-8859-1'))
501 return _mysql.escape_string(value)
502
503
504
506 """
507 Class that can connect to backend must be passed:
508 username - backend username to use
509 password - backend password
510 database - backend database name
511 host - hostname of database server
512 port - port
513 """
514 backend = "mysql"
515
516 copyattrs = (
517 "username",
518 "password",
519 "database",
520 "host",
521 "port",
522 "requiredEventFields",
523 "defaultEventId",
524 "statusTable",
525 "deviceField",
526 "componentField",
527 "eventClassField",
528 "firstTimeField",
529 "lastTimeField",
530 "countField",
531 "detailTable",
532 )
533
539
541 """
542 To be implemented by the subclass
543 """
544 pass
545
546
548 """
549 Return the list of fields
550
551 @return: list of fields
552 @rtype: list
553 """
554 return self._fieldlist
555
556
558 """
559 Wrapper around MySQL database connection
560 """
561
562 running = True
563
565 threading.Thread.__init__(self)
566 MySqlSendEvent.__init__(self, zem)
567 self.setDaemon(1)
568 self.setName("SendEventThread")
569 self._evqueue = Queue()
570
572 """
573 Called from main thread to put an event on to the send queue.
574 """
575 return self._evqueue.put(evt)
576
577
579 """
580 Main event loop
581 """
582 log.info("Starting")
583 while not self._evqueue.empty() or self.running:
584 try:
585 evt = self._evqueue.get(True,1)
586 MySqlSendEvent.sendEvent(self, evt)
587 except Empty: pass
588 except OperationalError, e:
589 log.warn(e)
590 except Exception, e:
591 log.exception(e)
592 log.info("Stopped")
593
594
596 """
597 Called from main thread to stop this thread.
598 """
599 log.info("Stopping...")
600 self.running = False
601 self.join(3)
602