Package Products :: Package ZenEvents :: Module MySqlSendEvent
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenEvents.MySqlSendEvent

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2008, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """MySqlSendEvent 
 15  Populate the events database with incoming events 
 16  """ 
 17   
 18  import types 
 19  import threading 
 20  from Queue import Queue, Empty 
 21  import logging 
 22  log = logging.getLogger("zen.Events") 
 23   
 24  from _mysql_exceptions import ProgrammingError, OperationalError 
 25  from ZEO.Exceptions import ClientDisconnected 
 26   
 27  import Products.ZenUtils.guid as guid 
 28  from Products.ZenUtils.Utils import zdecode as decode 
 29  from Event import buildEventFromDict 
 30  from ZenEventClasses import Heartbeat, Unknown 
 31  from Products.ZenEvents.Exceptions import * 
 32   
def execute(cursor, statement):
    """
    Run a MySQL statement and return the results.
    If there's an error, logs it then re-raises the exception.

    @param cursor: an open cursor on a MySQL connection
    @type cursor: database cursor
    @param statement: MySQL statement
    @type statement: string
    @return: whatever cursor.execute() returns; callers in this module
        treat it as the number of affected rows
    @rtype: number
    @raise Exception: any error raised by cursor.execute() is logged and
        re-raised unchanged
    """
    try:
        result = cursor.execute(statement)
    except Exception as ex:
        log.info("Invalid statement: %s", statement)
        log.error(str(ex))
        # A bare raise keeps the original traceback; "raise ex" would
        # restart it at this line under Python 2.
        raise
    # Logged outside the try block (and with lazy %s formatting) so that a
    # problem formatting the result can never be mistaken for a failed
    # statement.
    log.debug("%s: --> %s", statement, result)
    return result
class MySqlSendEventMixin:
    """
    Mix-in class that takes a MySQL db connection and builds inserts that
    sends the event to the backend.

    The host class is expected to provide the attributes and methods the
    methods below read from self: connect(), close(conn), getFieldList(),
    requiredEventFields, defaultEventId, statusTable, detailTable,
    deviceField, componentField, eventClassField, countField,
    lastTimeField and dmd.  If the host also has getDmdRoot(), events are
    run through applyEventContext() before being stored.
    """

    def sendEvent(self, event):
        """
        Send an event to the backend.

        @param event: an event
        @type event: Event class (or a dictionary, which is converted)
        @return: event id or None
        @rtype: string
        @raise ZenEventError: a required event field is missing
        @raise ZenBackendFailure: the object database or MySQL is unavailable
        """
        log.debug('%s incoming event %s', '=' * 15, '=' * 15)
        if isinstance(event, dict):
            event = buildEventFromDict(event)

        if getattr(event, 'eventClass', Unknown) == Heartbeat:
            log.debug("Got a %s %s heartbeat event (timeout %s sec).",
                      getattr(event, 'device', 'Unknown'),
                      getattr(event, 'component', 'Unknown'),
                      getattr(event, 'timeout', 'Unknown'))
            return self._sendHeartbeat(event)

        for field in self.requiredEventFields:
            if not hasattr(event, field):
                raise ZenEventError(
                    "Required event field %s not found" % field)

        # FIXME - ugly hack to make sure severity is an int
        try:
            event.severity = int(event.severity)
        except (AttributeError, TypeError, ValueError, OverflowError):
            # Missing or non-numeric severity: fall back to 1
            # (labelled "Info" by the original author).
            event.severity = 1

        # Check for nasty haxor tricks: _action is later used as a table
        # name, so anything outside the known list is forced to 'status'.
        known_actions = ('history', 'drop', 'status', 'heartbeat',
                         'alert_state', 'log', 'detail')
        if hasattr(event, '_action') and event._action not in known_actions:
            event._action = 'status'

        # If either message or summary is empty then try to copy from the
        # other.  Make sure summary is truncated to 128
        if not getattr(event, 'message', False):
            event.message = getattr(event, 'summary', '')
        event.summary = (getattr(event, 'summary', '') or event.message)[:128]

        statusdata, detaildata = self.eventDataMaps(event)
        log.debug("Event info: %s", statusdata)
        if detaildata:
            log.debug("Detail data: %s", detaildata)

        if getattr(self, "getDmdRoot", False):
            try:
                event = self.applyEventContext(event)
            except ClientDisconnected as e:
                log.error(e)
                raise ZenBackendFailure(str(e))
        if not event:
            # applyEventContext() returns None for dropped events
            log.debug("Unable to obtain event -- ignoring.(%s)", event)
            return

        # check again for heartbeat after context processing
        if getattr(event, 'eventClass', Unknown) == Heartbeat:
            log.debug("Transform created a heartbeat event.")
            return self._sendHeartbeat(event)

        if not hasattr(event, 'dedupid'):
            dedupfields = event.getDedupFields(self.defaultEventId)
            if not getattr(event, "eventKey", ""):
                # Without an event key the summary takes part in the
                # de-duplication id as well.
                dedupfields = list(dedupfields) + ["summary"]

            dedupid = [self.escape('%s' % (getattr(event, field, ""),))
                       for field in dedupfields]
            event.dedupid = "|".join(dedupid)
            log.debug("Created deupid of %s", event.dedupid)

        try:
            evid = self.doSendEvent(event)
        except ProgrammingError as e:
            # Bad SQL: log it and give up on this event without claiming
            # it was a duplicate.
            log.exception(e)
            return None
        except OperationalError as e:
            log.exception(e)
            raise ZenBackendFailure(str(e))

        if evid:
            log.debug("New event id = %s", evid)
        else:
            log.debug("Duplicate event, updated database.")
        return evid

    def doSendEvent(self, event):
        """
        Actually write the sanitized event into the database

        @param event: event
        @type event: Event class
        @return: event id or None
        @rtype: string
        """
        # The data maps are taken before event.evid is assigned below.
        statusdata, detaildata = self.eventDataMaps(event)
        log.debug("Performing action '%s' on event %s",
                  event._action, statusdata)
        if detaildata:
            log.debug("Detail data: %s", detaildata)

        conn = self.connect()
        try:
            curs = conn.cursor()
            evid = guid.generate()
            event.evid = evid
            if int(event.severity) == 0:
                # Severity 0 events go to history and clear related events.
                event._action = "history"
                clearcls = event.clearClasses()
                if clearcls:
                    rows = execute(curs,
                                   self.buildClearUpdate(event, clearcls))
                    # NOTE(review): nesting of this early return was
                    # reconstructed from a flattened listing -- confirm.
                    if not rows:
                        return None
                    # NOTE(review): these two statements name the "status"
                    # table literally while buildClearUpdate() uses
                    # self.statusTable -- confirm they are meant to agree.
                    insert = ('insert into log '
                              '(evid, userName, text) '
                              'select evid, "admin", "auto cleared"'
                              ' from status where clearid = "%s"' % evid)
                    execute(curs, insert)
                    delete = 'DELETE FROM status WHERE clearid IS NOT NULL'
                    execute(curs, delete)
            stmt = self.buildStatusInsert(statusdata, event._action, evid)
            rescount = execute(curs, stmt)
            if detaildata and rescount == 1:
                execute(curs, self.buildDetailInsert(evid, detaildata))
            if rescount != 1:
                # The insert did not create exactly one row, so look up the
                # id of the row that already carries this dedupid.  The
                # value is escaped because it is placed inside a quoted
                # SQL literal.
                sql = ('select evid from %s where dedupid="%s"' % (
                    event._action,
                    self.escape(decode(self.dmd.Devices, event.dedupid))))
                execute(curs, sql)
                rs = curs.fetchone()
                if rs:
                    evid = rs[0]
                else:
                    evid = None
        finally:
            self.close(conn)
        return evid

    def _findByIp(self, ipaddress, networks):
        """
        Find a device by looking its IP address up in the Networks catalog.

        @param ipaddress: IP address
        @type ipaddress: string
        @param networks: DMD network object
        @type networks: DMD object
        @return: device object, or None when nothing matches
        @rtype: device object
        """
        log.debug("Looking up IP %s", ipaddress)
        ipobj = networks.findIp(ipaddress)
        if not ipobj:
            return None
        device = ipobj.device()
        if not device:
            return None
        log.debug("IP %s -> %s", ipobj.id, device.id)
        return device

    def getNetworkRoot(self, evt):
        """
        Return the network root and event

        @param evt: event
        @type evt: Event class
        @return: DMD object and the event
        @rtype: DMD object, evt
        """
        return self.getDmdRoot('Networks'), evt

    def applyEventContext(self, evt):
        """
        Apply event and devices contexts to the event.
        Only valid if this object has zeo connection.

        @param evt: event
        @type evt: Event class
        @return: updated event, or None when the event is to be dropped
        @rtype: Event class
        """
        events = self.getDmdRoot("Events")
        devices = self.getDmdRoot("Devices")
        networks, evt = self.getNetworkRoot(evt)

        # if the event has a monitor field use the PerformanceConf
        # findDevice so that the find is scoped to the monitor (collector)
        if getattr(evt, 'monitor', False):
            performance = self.getDmdRoot('Monitors').Performance
            monitorObj = performance._getOb(evt.monitor, None)
            if monitorObj:
                devices = monitorObj

        # Look for the device by name, then IP 'globally'
        # and then from the /Network class
        device = None
        if getattr(evt, 'device', None):
            device = devices.findDevice(evt.device)
        if not device and getattr(evt, 'ipAddress', None):
            device = devices.findDevice(evt.ipAddress)
        if not device and getattr(evt, 'device', None):
            device = self._findByIp(evt.device, networks)
        if not device and getattr(evt, 'ipAddress', None):
            device = self._findByIp(evt.ipAddress, networks)

        if device:
            evt.device = device.id
            log.debug("Found device %s and adding device-specific"
                      " data", evt.device)
            evt = self.applyDeviceContext(device, evt)

        evtclass = events.lookup(evt, device)
        if evtclass:
            evt = evtclass.applyExtraction(evt)
            evt = evtclass.applyValues(evt)
            evt = evtclass.applyTransform(evt, device)

        if evt._action == "drop":
            log.debug("Dropping event")
            return None

        return evt

    def applyDeviceContext(self, device, evt):
        """
        Apply event attributes from device context.

        @param device: device from DMD
        @type device: device object
        @param evt: event
        @type evt: Event class
        @return: updated event
        @rtype: Event class
        """
        # An ipAddress already present on the event is kept.
        if not hasattr(evt, 'ipAddress'):
            evt.ipAddress = device.manageIp
        evt.prodState = device.productionState
        evt.Location = device.getLocationName()
        evt.DeviceClass = device.getDeviceClassName()
        evt.DeviceGroups = "|" + "|".join(device.getDeviceGroupNames())
        evt.Systems = "|" + "|".join(device.getSystemNames())
        evt.DevicePriority = device.getPriority()
        return evt

    def _sendHeartbeat(self, event):
        """
        Add a heartbeat record to the heartbeat table.

        Heartbeats without a device or a timeout are logged and skipped.

        @param event: event
        @type event: Event class
        @raise ZenBackendFailure: MySQL reported an operational error
        """
        if not hasattr(event, "device"):
            log.warn("heartbeat without device skipping")
            return
        if not hasattr(event, "timeout"):
            log.warn("heartbeat from %s without timeout skipping",
                     event.device)
            return
        evdict = {
            'device': event.device,
            'timeout': event.timeout,
            'component': getattr(event, "component", ""),
        }
        insert = self.buildInsert(evdict, "heartbeat")
        insert += " on duplicate key update lastTime=Null"
        # Quote the timeout exactly as buildInsert() does, so a string
        # value cannot inject SQL through the update clause.
        insert += ", timeout=%s" % self._sqlLiteral(evdict['timeout'])
        try:
            conn = self.connect()
            try:
                curs = conn.cursor()
                execute(curs, insert)
            finally:
                self.close(conn)
        except ProgrammingError as e:
            log.error(insert)
            log.exception(e)
        except OperationalError as e:
            raise ZenBackendFailure(str(e))

    def buildStatusInsert(self, statusdata, table, evid):
        """
        Build an insert statement for the status table that looks like this:
        insert into status set device='box', count=1, ...
            on duplicate key update count=count+1, lastTime=23424.34;

        @param statusdata: event
        @type statusdata: dictionary
        @param table: name of the table to insert into
        @type table: string
        @param evid: event id
        @type evid: string
        @return: MySQL insert command string
        @rtype: string
        """
        insert = self.buildInsert(statusdata, table)
        fields = []
        if table == "history":
            fields.append("deletedTime=null")
        fields.append("evid='%s'" % evid)
        insert += "," + ",".join(fields)
        if table == self.statusTable:
            # Only the status table de-duplicates: a repeat refreshes the
            # summary and last-seen time and bumps the counter.
            insert += " on duplicate key update "
            if 'prodState' in statusdata:
                insert += "prodState=%d," % statusdata['prodState']
            insert += "summary='%s',%s=%s+1,%s=%.3f" % (
                self.escape(decode(self.dmd.Devices,
                                   statusdata.get('summary', ''))),
                self.countField, self.countField,
                self.lastTimeField, statusdata['lastTime'])
        return insert

    def buildDetailInsert(self, evid, detaildict):
        """
        Build an insert to add detail values from an event to the details
        table.

        @param evid: event id
        @type evid: string
        @param detaildict: event
        @type detaildict: dictionary
        @return: MySQL insert command string
        @rtype: string
        """
        insert = ("insert into %s (evid, name, value) values "
                  % self.detailTable)
        rows = []
        for field, value in detaildict.items():
            if isinstance(value, types.StringTypes):
                value = decode(self.dmd.Devices, value)
            else:
                # Render non-strings first so their text form (which may
                # itself contain quotes) is escaped too.
                value = "%s" % (value,)
            rows.append("('%s','%s','%s')" % (
                evid, self.escape(field), self.escape(value)))
        insert += ",".join(rows)
        return insert

    def _sqlLiteral(self, value):
        """
        Render a value as the right-hand side of a "field=value" SQL
        assignment: strings are escaped and single-quoted, floats are
        written with three decimals, anything else is interpolated as-is.

        @param value: value to render
        @type value: any
        @return: SQL text for the value
        @rtype: string
        """
        if isinstance(value, types.StringTypes):
            return "'%s'" % self.escape(value)
        if isinstance(value, float):
            return "%.3f" % value
        return "%s" % (value,)

    def buildInsert(self, datadict, table):
        """
        Build a insert statement for that looks like this:
        insert into status set field='value', field=1, ...

        @param datadict: event
        @type datadict: dictionary
        @param table: name of the table to insert into
        @type table: string
        @return: MySQL insert command string
        @rtype: string
        """
        insert = "insert into %s set " % table
        fields = ["%s=%s" % (name, self._sqlLiteral(value))
                  for name, value in datadict.items()]
        insert = str(insert) + str(','.join(fields))
        return insert

    def buildClearUpdate(self, evt, clearcls):
        """
        Build an update statement that will clear related events.

        @param evt: event
        @type evt: Event class
        @param clearcls: other fields to use to define 'related events'
        @type clearcls: list of strings
        @return: MySQL update command string
        @rtype: string
        """
        update = "update %s " % self.statusTable
        update += "set clearid = '%s' where " % evt.evid
        # Truncate BEFORE escaping: cutting an already-escaped string can
        # split an escape sequence and leave a dangling backslash in front
        # of the closing quote.
        where = [
            "%s='%s'" % (self.deviceField, self.escape(evt.device)),
            "%s='%s'" % (self.componentField,
                         self.escape(evt.component[:255])),
            "eventKey='%s'" % self.escape(evt.eventKey[:128]),
        ]
        update += " and ".join(where)

        classes = ["%s='%s'" % (self.eventClassField, self.escape(cls))
                   for cls in clearcls]
        if classes:
            update += " and (" + " or ".join(classes) + ")"
        return update

    def eventDataMaps(self, event):
        """
        Return tuple (statusdata, detaildata) for this event.

        Attributes named in getFieldList() go to statusdata, every other
        attribute goes to detaildata.  Attributes starting with "_" and
        the "dedupfields" attribute are left out of both.

        @param event: event
        @type event: Event class
        @return: (statusdata, detaildata)
        @rtype: tuple of dictionaries
        """
        statusfields = self.getFieldList()
        statusdata = {}
        detaildata = {}
        for name, value in event.__dict__.items():
            if name.startswith("_") or name == "dedupfields":
                continue
            if name in statusfields:
                statusdata[name] = value
            else:
                detaildata[name] = value
        return statusdata, detaildata

    def escape(self, value):
        """
        Prepare string values for db by escaping special characters.

        @param value: string containing possibly nasty characters
        @type value: string
        @return: escaped string; non-string values are returned unchanged
        @rtype: string
        """
        if not isinstance(value, types.StringTypes):
            return value

        import _mysql
        if isinstance(value, type(u'')):
            # Unicode is encoded as ISO-8859-1 before escaping.
            return _mysql.escape_string(value.encode('iso-8859-1'))
        return _mysql.escape_string(value)
class MySqlSendEvent(MySqlSendEventMixin):
    """
    Event sender that takes its backend settings from an event manager.

    The manager object handed to the constructor must expose every
    attribute named in copyattrs, among them:
        username - backend username to use
        password - backend password
        database - backend database name
        host - hostname of database server
        port - port
    """
    backend = "mysql"

    # Attributes copied verbatim from the event manager onto this object.
    copyattrs = (
        "username",
        "password",
        "database",
        "host",
        "port",
        "requiredEventFields",
        "defaultEventId",
        "statusTable",
        "deviceField",
        "componentField",
        "eventClassField",
        "firstTimeField",
        "lastTimeField",
        "countField",
        "detailTable",
    )

    def __init__(self, zem):
        """
        Copy the attributes listed in copyattrs, plus the field list,
        from the event manager.

        @param zem: object providing the copyattrs attributes and
            getFieldList()
        @type zem: event manager object
        """
        for name in self.copyattrs:
            setattr(self, name, getattr(zem, name))
        self._fieldlist = zem.getFieldList()

    def stop(self):
        """
        Does nothing here; subclasses may override it.
        """
        pass

    def getFieldList(self):
        """
        Return the field list captured from the event manager.

        @return: list of fields
        @rtype: list
        """
        return self._fieldlist
class MySqlSendEventThread(threading.Thread, MySqlSendEvent):
    """
    Daemon thread that takes queued events and sends them with
    MySqlSendEvent.sendEvent().
    """

    # Cleared by stop(); run() exits once this is False and the queue is
    # empty.
    running = True

    def __init__(self, zem):
        """
        Set up the thread, the sender settings and an empty event queue.

        @param zem: event manager object passed on to MySqlSendEvent
        @type zem: event manager object
        """
        threading.Thread.__init__(self)
        MySqlSendEvent.__init__(self, zem)
        self.setDaemon(1)
        self.setName("SendEventThread")
        self._evqueue = Queue()

    def sendEvent(self, evt):
        """
        Called from main thread to put an event on to the send queue.

        @param evt: event to queue
        @type evt: Event class
        @return: result of Queue.put()
        """
        return self._evqueue.put(evt)

    def run(self):
        """
        Main event loop: keep sending queued events until stop() has been
        called and the queue is empty.
        """
        log.info("Starting")
        while True:
            if self._evqueue.empty() and not self.running:
                break
            try:
                # Wait at most one second so the loop condition is
                # re-checked regularly.
                queued = self._evqueue.get(True, 1)
                MySqlSendEvent.sendEvent(self, queued)
            except Empty:
                pass
            except OperationalError as err:
                log.warn(err)
            except Exception as err:
                # Any other failure is logged; the loop carries on.
                log.exception(err)
        log.info("Stopped")

    def stop(self):
        """
        Called from main thread to stop this thread.
        Waits up to three seconds for the thread to finish.
        """
        log.info("Stopping...")
        self.running = False
        self.join(3)