Package Products :: Package ZenEvents :: Module MySqlSendEvent
[hide private]
[frames | no frames]

Source Code for Module Products.ZenEvents.MySqlSendEvent

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2008, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """MySqlSendEvent 
 15  Populate the events database with incoming events 
 16  """ 
 17   
 18  import types 
 19  import threading 
 20  from Queue import Queue, Empty 
 21  import logging 
 22  log = logging.getLogger("zen.Events") 
 23   
 24  from _mysql_exceptions import ProgrammingError, OperationalError 
 25  from ZEO.Exceptions import ClientDisconnected 
 26   
 27  import Products.ZenUtils.guid as guid 
 28  from Products.ZenUtils.Utils import zdecode as decode 
 29  from Event import buildEventFromDict 
 30  from ZenEventClasses import Heartbeat, Unknown 
 31  from Products.ZenEvents.Exceptions import * 
 32   
def execute(cursor, statement):
    """
    Run a MySQL statement and return the results.
    If there's an error, logs it then re-raises the exception.

    Only the database call itself is guarded, so a problem while logging
    the result can never be reported as an invalid statement.

    @param cursor: an open cursor on a MySQL connection
    @type cursor: database cursor
    @param statement: MySQL statement
    @type statement: string
    @return: whatever cursor.execute() returns for the statement
    @rtype: number of affected rows (as reported by the cursor)
    @raise Exception: whatever cursor.execute() raised, with its original
        traceback intact
    """
    try:
        result = cursor.execute(statement)
    except Exception:
        # sys.exc_info() keeps this spelling valid on every Python version
        # the file may run under.
        import sys
        log.info("Invalid statement: %s", statement)
        log.error(str(sys.exc_info()[1]))
        # A bare raise keeps the traceback pointing at the real failure.
        raise
    # Lazy formatting: nothing is built unless debug logging is enabled.
    log.debug("%s: --> %s", statement, result)
    return result
55 -class MySqlSendEventMixin:
56 """ 57 Mix-in class that takes a MySQL db connection and builds inserts that 58 sends the event to the backend. 59 """ 60
61 - def sendEvent(self, event):
62 """ 63 Send an event to the backend. 64 65 @param event: an event 66 @type event: Event class 67 @return: event id or None 68 @rtype: string 69 """ 70 log.debug('%s%s%s' % ('=' * 15, ' incoming event ', '=' * 15)) 71 if type(event) == types.DictType: 72 event = buildEventFromDict(event) 73 74 if getattr(event, 'eventClass', Unknown) == Heartbeat: 75 log.debug("Got a %s %s heartbeat event (timeout %s sec).", 76 getattr(event, 'device', 'Unknown'), 77 getattr(event, 'component', 'Unknown'), 78 getattr(event, 'timeout', 'Unknown')) 79 return self._sendHeartbeat(event) 80 81 for field in self.requiredEventFields: 82 if not hasattr(event, field): 83 log.error("Required event field %s not found" \ 84 " -- ignoring event", field) 85 statusdata, detaildata = self.eventDataMaps(event) 86 log.error("Event info: %s", statusdata) 87 if detaildata: 88 log.error("Detail data: %s", detaildata) 89 return None 90 91 #FIXME - ungly hack to make sure severity is an int 92 try: 93 event.severity = int(event.severity) 94 except: 95 event.severity = 1 # Info 96 97 # Check for nasty haxor tricks 98 known_actions = [ 'history', 'drop', 'status', 'heartbeat', 99 'alert_state', 'log', 'detail', 100 ] 101 if hasattr( event, '_action' ) and event._action not in known_actions: 102 event._action = 'status' 103 104 # If either message or summary is empty then try to copy from the other. 
105 # Make sure summary is truncated to 128 106 if not getattr(event, 'message', False): 107 event.message = getattr(event, 'summary', '') 108 event.summary = (getattr(event, 'summary', '') or event.message)[:128] 109 110 statusdata, detaildata = self.eventDataMaps(event) 111 log.debug("Event info: %s", statusdata) 112 if detaildata: 113 log.debug("Detail data: %s", detaildata) 114 115 if getattr(self, "getDmdRoot", False): 116 try: 117 event = self.applyEventContext(event) 118 except ClientDisconnected, e: 119 log.error(e) 120 raise ZenBackendFailure(str(e)) 121 if not event: 122 log.debug("Unable to obtain event -- ignoring.(%s)", event) 123 return 124 125 # check again for heartbeat after context processing 126 if getattr(event, 'eventClass', Unknown) == Heartbeat: 127 log.debug("Transform created a heartbeat event.") 128 return self._sendHeartbeat(event) 129 130 131 if not hasattr(event, 'dedupid'): 132 dedupfields = event.getDedupFields(self.defaultEventId) 133 if not getattr(event, "eventKey", ""): 134 if type(dedupfields) != types.ListType: 135 dedupfields = list(dedupfields) 136 dedupfields = dedupfields + ["summary"] 137 138 dedupid = [] 139 for field in dedupfields: 140 value = getattr(event, field, "") 141 dedupid.append('%s' % value) 142 dedupid = map(self.escape, dedupid) 143 event.dedupid = "|".join(dedupid) 144 log.debug("Created deupid of %s", event.dedupid) 145 146 # WTH is 'cleanup' supposed to do? Never gets used 147 cleanup = lambda : None 148 evid = None 149 try: 150 try: 151 evid = self.doSendEvent(event) 152 except ProgrammingError, e: 153 log.exception(e) 154 except OperationalError, e: 155 log.exception(e) 156 raise ZenBackendFailure(str(e)) 157 finally: 158 cleanup() 159 160 if evid: 161 log.debug("New event id = %s", evid) 162 else: 163 log.debug("Duplicate event, updated database.") 164 return evid
165 166
167 - def doSendEvent(self, event):
168 """ 169 Actually write the sanitized event into the database 170 171 @param event: event 172 @type event: Event class 173 @return: event id or None 174 @rtype: string 175 """ 176 insert = "" 177 statusdata, detaildata = self.eventDataMaps(event) 178 if int(event.severity) == 0: 179 log.debug("Clear event found with event data %s", 180 statusdata) 181 else: 182 log.debug("Performing action '%s' on event %s", 183 event._action, statusdata) 184 if detaildata: 185 log.debug("Detail data: %s", detaildata) 186 187 conn = self.connect() 188 try: 189 curs = conn.cursor() 190 evid = guid.generate() 191 event.evid = evid 192 rows = 0 193 if int(event.severity) == 0: 194 event._action = "history" 195 clearcls = event.clearClasses() 196 if not clearcls: 197 log.debug("No clear classes in event -- no action taken.") 198 else: 199 rows = execute(curs, self.buildClearUpdate(event, clearcls)) 200 log.debug("%d events matched clear criteria", rows) 201 if not rows: 202 return None 203 insert = ('insert into log ' 204 '(evid, userName, text) ' 205 'select evid, "admin", "auto cleared"' 206 ' from status where clearid = "%s"' % evid) 207 execute(curs, insert) 208 delete = 'DELETE FROM status WHERE clearid IS NOT NULL' 209 execute(curs, delete) 210 stmt = self.buildStatusInsert(statusdata, event._action, evid) 211 rescount = execute(curs, stmt) 212 if detaildata and rescount == 1: 213 execute(curs, self.buildDetailInsert(evid, detaildata)) 214 if rescount != 1: 215 sql = ('select evid from %s where dedupid="%s"' % ( 216 event._action, decode(self.dmd.Devices, event.dedupid))) 217 log.debug("%d events returned from insert -- selecting first match from %s.", 218 rescount, sql) 219 execute(curs, sql) 220 rs = curs.fetchone() 221 if rs: 222 evid = rs[0] 223 else: 224 log.debug("No matches found") 225 evid = None 226 finally: self.close(conn) 227 return evid
228 229
230 - def _findByIp(self, ipaddress, networks):
231 """ 232 Find and ip by looking up it up in the Networks catalog. 233 234 @param ipaddress: IP address 235 @type ipaddress: string 236 @param networks: DMD network object 237 @type networks: DMD object 238 @return: device object 239 @rtype: device object 240 """ 241 log.debug("Looking up IP %s" % ipaddress) 242 ipobj = networks.findIp(ipaddress) 243 if ipobj and ipobj.device(): 244 device = ipobj.device() 245 log.debug("IP %s -> %s", ipobj.id, device.id) 246 return device
247 248
249 - def getNetworkRoot(self, evt):
250 """ 251 Return the network root and event 252 253 @param evt: event 254 @type evt: Event class 255 @return: DMD object and the event 256 @rtype: DMD object, evt 257 """ 258 return self.getDmdRoot('Networks'), evt
259 260
261 - def applyEventContext(self, evt):
262 """ 263 Apply event and devices contexts to the event. 264 Only valid if this object has zeo connection. 265 266 @param evt: event 267 @type evt: Event class 268 @return: updated event 269 @rtype: Event class 270 """ 271 events = self.getDmdRoot("Events") 272 devices = self.getDmdRoot("Devices") 273 networks, evt = self.getNetworkRoot(evt) 274 275 # if the event has a monitor field use the PerformanceConf 276 # findDevice so that the find is scoped to the monitor (collector) 277 if getattr(evt, 'monitor', False): 278 monitorObj = self.getDmdRoot('Monitors' 279 ).Performance._getOb(evt.monitor, None) 280 if monitorObj: 281 devices = monitorObj 282 283 # Look for the device by name, then IP 'globally' 284 # and then from the /Network class 285 device = None 286 if getattr(evt, 'device', None): 287 device = devices.findDevice(evt.device) 288 if not device and getattr(evt, 'ipAddress', None): 289 device = devices.findDevice(evt.ipAddress) 290 if not device and getattr(evt, 'device', None): 291 device = self._findByIp(evt.device, networks) 292 if not device and getattr(evt, 'ipAddress', None): 293 device = self._findByIp(evt.ipAddress, networks) 294 295 if device: 296 evt.device = device.id 297 log.debug("Found device %s and adding device-specific" 298 " data", evt.device) 299 evt = self.applyDeviceContext(device, evt) 300 301 evtclass = events.lookup(evt, device) 302 if evtclass: 303 evt = evtclass.applyExtraction(evt) 304 evt = evtclass.applyValues(evt) 305 evt = evtclass.applyTransform(evt, device) 306 307 if evt._action == "drop": 308 log.debug("Dropping event") 309 return None 310 311 return evt
312 313
314 - def applyDeviceContext(self, device, evt):
315 """ 316 Apply event attributes from device context. 317 318 @param device: device from DMD 319 @type device: device object 320 @param evt: event 321 @type evt: Event class 322 @return: updated event 323 @rtype: Event class 324 """ 325 if not hasattr(evt, 'ipAddress'): evt.ipAddress = device.manageIp 326 evt.prodState = device.productionState 327 evt.Location = device.getLocationName() 328 evt.DeviceClass = device.getDeviceClassName() 329 evt.DeviceGroups = "|"+"|".join(device.getDeviceGroupNames()) 330 evt.Systems = "|"+"|".join(device.getSystemNames()) 331 evt.DevicePriority = device.getPriority() 332 return evt
333 334
335 - def _sendHeartbeat(self, event):
336 """ 337 Add a heartbeat record to the heartbeat table. 338 339 @param event: event 340 @type event: Event class 341 """ 342 evdict = {} 343 if hasattr(event, "device"): 344 evdict['device'] = event.device 345 else: 346 log.warn("heartbeat without device skipping") 347 return 348 if hasattr(event, "timeout"): 349 evdict['timeout'] = event.timeout 350 else: 351 log.warn("heartbeat from %s without timeout skipping", event.device) 352 return 353 if hasattr(event, "component"): 354 evdict['component'] = event.component 355 else: 356 evdict['component'] = "" 357 insert = self.buildInsert(evdict, "heartbeat") 358 insert += " on duplicate key update lastTime=Null" 359 insert += ", timeout=%s" % evdict['timeout'] 360 try: 361 conn = self.connect() 362 try: 363 curs = conn.cursor() 364 execute(curs, insert) 365 finally: self.close(conn) 366 except ProgrammingError, e: 367 log.error(insert) 368 log.exception(e) 369 except OperationalError, e: 370 raise ZenBackendFailure(str(e))
371 372
373 - def buildStatusInsert(self, statusdata, table, evid):
374 """ 375 Build an insert statement for the status table that looks like this: 376 insert into status set device='box', count=1, ... 377 on duplicate key update count=count+1, lastTime=23424.34; 378 379 @param statusdata: event 380 @type statusdata: dictionary 381 @param table: name of the table to insert into 382 @type table: string 383 @param evid: event id 384 @type evid: string 385 @return: MySQL insert command string 386 @rtype: string 387 """ 388 insert = self.buildInsert(statusdata, table) 389 fields = [] 390 if table == "history": 391 fields.append("deletedTime=null") 392 fields.append("evid='%s'" % evid) 393 insert += ","+",".join(fields) 394 if table == self.statusTable: 395 insert += " on duplicate key update " 396 if statusdata.has_key('prodState'): 397 insert += "prodState=%d," % int(statusdata['prodState']) 398 insert += "summary='%s',message='%s',%s=%s+1,%s=%.3f" % ( 399 self.escape(decode(self.dmd.Devices, statusdata.get('summary',''))), 400 self.escape(decode(self.dmd.Devices, statusdata.get('message', ''))), 401 self.countField, self.countField, 402 self.lastTimeField,statusdata['lastTime']) 403 return insert
404 405
406 - def buildDetailInsert(self, evid, detaildict):
407 """ 408 Build an insert to add detail values from an event to the details 409 table. 410 411 @param evid: event id 412 @type evid: string 413 @param detaildict: event 414 @type detaildict: dictionary 415 @return: MySQL insert command string 416 @rtype: string 417 """ 418 insert = "insert into %s (evid, name, value) values " % self.detailTable 419 var = [] 420 for field, value in detaildict.items(): 421 if type(value) in types.StringTypes: 422 value = self.escape(decode(self.dmd.Devices, value)) 423 var.append("('%s','%s','%s')" % (evid, field, value)) 424 insert += ",".join(var) 425 return insert
426 427
428 - def buildInsert(self, datadict, table):
429 """ 430 Build a insert statement for that looks like this: 431 insert into status set field='value', field=1, ... 432 433 @param datadict: event 434 @type datadict: dictionary 435 @param table: name of the table to insert into 436 @type table: string 437 @return: MySQL insert command string 438 @rtype: string 439 """ 440 insert = "insert into %s set " % table 441 fields = [] 442 for name, value in datadict.items(): 443 if type(value) in types.StringTypes: 444 fields.append("%s='%s'" % (name, self.escape(value))) 445 elif type(value) == types.FloatType: 446 fields.append("%s=%.3f" % (name, value)) 447 else: 448 fields.append("%s=%s" % (name, value)) 449 insert = str(insert) + str(','.join(fields)) 450 return insert
451 452
453 - def buildClearUpdate(self, evt, clearcls):
454 """ 455 Build an update statement that will clear related events. 456 457 @param evt: event 458 @type evt: Event class 459 @param clearcls: other fields to use to define 'related events' 460 @type clearcls: list of strings 461 @return: MySQL update command string 462 @rtype: string 463 """ 464 update = "update %s " % self.statusTable 465 update += "set clearid = '%s' where " % evt.evid 466 w = [] 467 w.append("%s='%s'" % (self.deviceField, self.escape(evt.device))) 468 w.append("%s='%s'" % (self.componentField, 469 self.escape(evt.component)[:255])) 470 w.append("eventKey='%s'" % self.escape(evt.eventKey)[:128]) 471 update += " and ".join(w) 472 473 w = [] 474 for cls in clearcls: 475 w.append("%s='%s'" % (self.eventClassField, self.escape(cls))) 476 if w: 477 update += " and (" + " or ".join(w) + ")" 478 log.debug("Clear command: %s", update) 479 return update
480 481
482 - def eventDataMaps(self, event):
483 """ 484 Return tuple (statusdata, detaildata) for this event. 485 486 @param event: event 487 @type event: Event class 488 @return: (statusdata, detaildata) 489 @rtype: tuple of dictionaries 490 """ 491 statusfields = self.getFieldList() 492 statusdata = {} 493 detaildata = {} 494 for name, value in event.__dict__.items(): 495 if name.startswith("_") or name == "dedupfields": continue 496 if name in statusfields: 497 statusdata[name] = value 498 else: 499 detaildata[name] = value 500 return statusdata, detaildata
501 502
503 - def escape(self, value):
504 """ 505 Prepare string values for db by escaping special characters. 506 507 @param value: string containing possibly nasty characters 508 @type value: string 509 @return: escaped string 510 @rtype: string 511 """ 512 if type(value) not in types.StringTypes: 513 return value 514 515 import _mysql 516 if type(value) == type(u''): 517 return _mysql.escape_string(value.encode('iso-8859-1')) 518 return _mysql.escape_string(value)
519 520 521
class MySqlSendEvent(MySqlSendEventMixin):
    """
    Event sender that carries its own copy of the backend settings.

    The object handed to the constructor must provide every attribute
    named in copyattrs, among them:
        username - backend username to use
        password - backend password
        database - backend database name
        host - hostname of database server
        port - port
    and a getFieldList() method.
    """
    backend = "mysql"

    # Attributes copied verbatim from the object given to __init__.
    copyattrs = (
        # connection settings
        "username",
        "password",
        "database",
        "host",
        "port",
        # event handling settings
        "requiredEventFields",
        "defaultEventId",
        # table and column names
        "statusTable",
        "deviceField",
        "componentField",
        "eventClassField",
        "firstTimeField",
        "lastTimeField",
        "countField",
        "detailTable",
    )

    def __init__(self, zem):
        """
        Copy the settings listed in copyattrs, and the field list, from zem.

        @param zem: object providing the copyattrs attributes and
            getFieldList()
        @type zem: object
        """
        for attrname in self.copyattrs:
            setattr(self, attrname, getattr(zem, attrname))
        self._fieldlist = zem.getFieldList()

    def stop(self):
        """
        To be implemented by the subclass
        """

    def getFieldList(self):
        """
        Return the list of fields captured from zem at construction time.

        @return: list of fields
        @rtype: list
        """
        return self._fieldlist
574 -class MySqlSendEventThread(threading.Thread, MySqlSendEvent):
575 """ 576 Wrapper around MySQL database connection 577 """ 578 579 running = True 580
581 - def __init__(self, zem):
582 threading.Thread.__init__(self) 583 MySqlSendEvent.__init__(self, zem) 584 self.setDaemon(1) 585 self.setName("SendEventThread") 586 self._evqueue = Queue()
587
588 - def sendEvent(self, evt):
589 """ 590 Called from main thread to put an event on to the send queue. 591 """ 592 return self._evqueue.put(evt)
593 594
595 - def run(self):
596 """ 597 Main event loop 598 """ 599 log.info("Starting") 600 while not self._evqueue.empty() or self.running: 601 try: 602 evt = self._evqueue.get(True,1) 603 MySqlSendEvent.sendEvent(self, evt) 604 except Empty: pass 605 except OperationalError, e: 606 log.warn(e) 607 except Exception, e: 608 log.exception(e) 609 log.info("Stopped")
610 611
612 - def stop(self):
613 """ 614 Called from main thread to stop this thread. 615 """ 616 log.info("Stopping...") 617 self.running = False 618 self.join(3)
619