Package ZenEvents :: Module MySqlSendEvent
[hide private]
[frames | no frames]

Source Code for Module ZenEvents.MySqlSendEvent

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2008, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """MySqlSendEvent 
 15  Populate the events database with incoming events 
 16  """ 
 17   
 18  import types 
 19  import threading 
 20  from Queue import Queue, Empty 
 21  import logging 
 22  log = logging.getLogger("zen.Events") 
 23   
 24  from _mysql_exceptions import ProgrammingError, OperationalError 
 25  from ZEO.Exceptions import ClientDisconnected 
 26   
 27  import Products.ZenUtils.guid as guid 
 28  from Products.ZenUtils.Utils import zdecode as decode 
 29  from Event import buildEventFromDict 
 30  from ZenEventClasses import Heartbeat, Unknown 
 31  from Products.ZenEvents.Exceptions import * 
 32   
def execute(cursor, statement):
    """
    Run a MySQL statement and return the results.
    If there's an error, logs it then re-raises the exception.

    @param cursor: an open database cursor (only its C{execute} method
        is used here)
    @type cursor: database cursor
    @param statement: MySQL statement
    @type statement: string
    @return: whatever C{cursor.execute(statement)} returns; the debug log
        formats it as an integer
    @rtype: number
    @raise Exception: any exception raised by C{cursor.execute} is logged
        and then re-raised unchanged
    """
    try:
        result = cursor.execute(statement)
    except Exception:
        log.debug(statement)
        # log.exception() appends the active traceback, which already
        # carries the exception type and message.
        log.exception("MySQL statement failed")
        # A bare raise keeps the original traceback; "raise ex" would
        # restart it at this line.
        raise
    # Logged outside the try block and with lazy arguments, so a problem
    # formatting the debug message is never reported as a failed statement.
    log.debug("%s: --> %d", statement, result)
    return result
53 54
55 -class MySqlSendEventMixin:
56 """ 57 Mix-in class that takes a MySQL db connection and builds inserts that 58 sends the event to the backend. 59 """ 60
61 - def sendEvent(self, event):
62 """ 63 Send an event to the backend. 64 65 @param event: an event 66 @type event: Event class 67 @return: event id or None 68 @rtype: string 69 """ 70 if type(event) == types.DictType: 71 event = buildEventFromDict(event) 72 73 if getattr(event, 'eventClass', Unknown) == Heartbeat: 74 return self._sendHeartbeat(event) 75 76 for field in self.requiredEventFields: 77 if not hasattr(event, field): 78 raise ZenEventError( 79 "Required event field %s not found" % field) 80 81 #FIXME - ungly hack to make sure severity is an int 82 try: 83 event.severity = int(event.severity) 84 except: 85 event.severity = 1 # Info 86 87 # Check for nasty haxor tricks 88 known_actions = [ 'history', 'drop', 'status', 'heartbeat', 89 'alert_state', 'log', 'detail', 90 ] 91 if hasattr( event, '_action' ) and event._action not in known_actions: 92 event._action = 'status' 93 94 # If either message or summary is empty then try to copy from the other. 95 # Make sure summary is truncated to 128 96 if not getattr(event, 'message', False): 97 event.message = getattr(event, 'summary', '') 98 event.summary = (getattr(event, 'summary', '') or event.message)[:128] 99 100 if getattr(self, "getDmdRoot", False): 101 try: 102 event = self.applyEventContext(event) 103 except ClientDisconnected, e: 104 log.error(e) 105 raise ZenBackendFailure(str(e)) 106 if not event: return 107 108 # check again for heartbeat after context processing 109 if getattr(event, 'eventClass', Unknown) == Heartbeat: 110 return self._sendHeartbeat(event) 111 112 113 if not hasattr(event, 'dedupid'): 114 dedupfields = event.getDedupFields(self.defaultEventId) 115 if not getattr(event, "eventKey", ""): 116 if type(dedupfields) != types.ListType: 117 dedupfields = list(dedupfields) 118 dedupfields = dedupfields + ["summary"] 119 120 dedupid = [] 121 for field in dedupfields: 122 value = getattr(event, field, "") 123 dedupid.append('%s' % value) 124 dedupid = map(self.escape, dedupid) 125 event.dedupid = "|".join(dedupid) 126 127 # 
WTH is 'cleanup' supposed to do? Never gets used 128 cleanup = lambda : None 129 evid = None 130 try: 131 try: 132 evid = self.doSendEvent(event) 133 except ProgrammingError, e: 134 log.exception(e) 135 except OperationalError, e: 136 log.exception(e) 137 raise ZenBackendFailure(str(e)) 138 finally: 139 cleanup() 140 return evid
141 142
143 - def doSendEvent(self, event):
144 """ 145 Actually write the sanitized event into the database 146 147 @param event: event 148 @type event: Event class 149 @return: event id or None 150 @rtype: string 151 """ 152 insert = "" 153 statusdata, detaildata = self.eventDataMaps(event) 154 conn = self.connect() 155 try: 156 curs = conn.cursor() 157 evid = guid.generate() 158 event.evid = evid 159 rows = 0 160 if event.severity == 0: 161 event._action = "history" 162 clearcls = event.clearClasses() 163 if clearcls: 164 rows = execute(curs, self.buildClearUpdate(event, clearcls)) 165 if not rows: 166 return None 167 insert = ('insert into log ' 168 '(evid, userName, text) ' 169 'select evid, "admin", "auto cleared"' 170 ' from status where clearid = "%s"' % evid) 171 execute(curs, insert) 172 delete = 'DELETE FROM status WHERE clearid IS NOT NULL' 173 execute(curs, delete) 174 stmt = self.buildStatusInsert(statusdata, event._action, evid) 175 rescount = execute(curs, stmt) 176 if detaildata and rescount == 1: 177 execute(curs, self.buildDetailInsert(evid, detaildata)) 178 if rescount != 1: 179 sql = ('select evid from %s where dedupid="%s"' % ( 180 event._action, decode(self.dmd.Devices, event.dedupid))) 181 execute(curs, sql) 182 rs = curs.fetchone() 183 if rs: 184 evid = rs[0] 185 else: 186 evid = None 187 finally: self.close(conn) 188 return evid
189 190
191 - def _findByIp(self, ipaddress, networks):
192 """ 193 Find and ip by looking up it up in the Networks catalog. 194 195 @param ipaddress: IP address 196 @type ipaddress: string 197 @param networks: DMD network object 198 @type networks: DMD object 199 @return: device object 200 @rtype: device object 201 """ 202 log.debug("Looking up IP %s" % ipaddress) 203 ipobj = networks.findIp(ipaddress) 204 if ipobj and ipobj.device(): 205 device = ipobj.device() 206 log.debug("IP %s -> %s", ipobj.id, device.id) 207 return device
208 209
210 - def getNetworkRoot(self, evt):
211 """ 212 Return the network root and event 213 214 @param evt: event 215 @type evt: Event class 216 @return: DMD object and the event 217 @rtype: DMD object, evt 218 """ 219 return self.getDmdRoot('Networks'), evt
220 221
222 - def applyEventContext(self, evt):
223 """ 224 Apply event and devices contexts to the event. 225 Only valid if this object has zeo connection. 226 227 @param evt: event 228 @type evt: Event class 229 @return: updated event 230 @rtype: Event class 231 """ 232 events = self.getDmdRoot("Events") 233 devices = self.getDmdRoot("Devices") 234 networks, evt = self.getNetworkRoot(evt) 235 236 # if the event has a monitor field use the PerformanceConf 237 # findDevice so that the find is scoped to the monitor (collector) 238 if getattr(evt, 'monitor', False): 239 monitorObj = self.getDmdRoot('Monitors' 240 ).Performance._getOb(evt.monitor, None) 241 if monitorObj: 242 devices = monitorObj 243 244 # Look for the device by name, then IP 'globally' 245 # and then from the /Network class 246 device = None 247 if getattr(evt, 'device', None): 248 device = devices.findDevice(evt.device) 249 if not device and getattr(evt, 'ipAddress', None): 250 device = devices.findDevice(evt.ipAddress) 251 if not device and getattr(evt, 'device', None): 252 device = self._findByIp(evt.device, networks) 253 if not device and getattr(evt, 'ipAddress', None): 254 device = self._findByIp(evt.ipAddress, networks) 255 256 if device: 257 evt.device = device.id 258 log.debug("Found device %s and adding device-specific" 259 " rules", evt.device) 260 evt = self.applyDeviceContext(device, evt) 261 262 evtclass = events.lookup(evt, device) 263 if evtclass: 264 log.debug("EventClassInst=%s", evtclass.id) 265 evt = evtclass.applyExtraction(evt) 266 evt = evtclass.applyValues(evt) 267 evt = evtclass.applyTransform(evt, device) 268 269 if evt._action == "drop": 270 log.debug("Dropping event") 271 return None 272 273 return evt
274 275
276 - def applyDeviceContext(self, device, evt):
277 """ 278 Apply event attributes from device context. 279 280 @param device: device from DMD 281 @type device: device object 282 @param evt: event 283 @type evt: Event class 284 @return: updated event 285 @rtype: Event class 286 """ 287 if not hasattr(evt, 'ipAddress'): evt.ipAddress = device.manageIp 288 evt.prodState = device.productionState 289 evt.Location = device.getLocationName() 290 evt.DeviceClass = device.getDeviceClassName() 291 evt.DeviceGroups = "|"+"|".join(device.getDeviceGroupNames()) 292 evt.Systems = "|"+"|".join(device.getSystemNames()) 293 evt.DevicePriority = device.getPriority() 294 return evt
295 296
297 - def _sendHeartbeat(self, event):
298 """ 299 Add a heartbeat record to the heartbeat table. 300 301 @param event: event 302 @type event: Event class 303 """ 304 evdict = {} 305 if hasattr(event, "device"): 306 evdict['device'] = event.device 307 else: 308 log.warn("heartbeat without device skipping") 309 return 310 if hasattr(event, "timeout"): 311 evdict['timeout'] = event.timeout 312 else: 313 log.warn("heartbeat from %s without timeout skipping", event.device) 314 return 315 if hasattr(event, "component"): 316 evdict['component'] = event.component 317 else: 318 evdict['component'] = "" 319 insert = self.buildInsert(evdict, "heartbeat") 320 insert += " on duplicate key update lastTime=Null" 321 insert += ", timeout=%s" % evdict['timeout'] 322 try: 323 conn = self.connect() 324 try: 325 curs = conn.cursor() 326 execute(curs, insert) 327 finally: self.close(conn) 328 except ProgrammingError, e: 329 log.error(insert) 330 log.exception(e) 331 except OperationalError, e: 332 raise ZenBackendFailure(str(e))
333 334
335 - def buildStatusInsert(self, statusdata, table, evid):
336 """ 337 Build an insert statement for the status table that looks like this: 338 insert into status set device='box', count=1, ... 339 on duplicate key update count=count+1, lastTime=23424.34; 340 341 @param statusdata: event 342 @type statusdata: dictionary 343 @param table: name of the table to insert into 344 @type table: string 345 @param evid: event id 346 @type evid: string 347 @return: MySQL insert command string 348 @rtype: string 349 """ 350 insert = self.buildInsert(statusdata, table) 351 fields = [] 352 if table == "history": 353 fields.append("deletedTime=null") 354 fields.append("evid='%s'" % evid) 355 insert += ","+",".join(fields) 356 if table == self.statusTable: 357 insert += " on duplicate key update " 358 if statusdata.has_key('prodState'): 359 insert += "prodState=%d," % statusdata['prodState'] 360 insert += "summary='%s',%s=%s+1,%s=%.3f" % ( 361 self.escape(decode(self.dmd.Devices, statusdata.get('summary',''))), 362 self.countField, self.countField, 363 self.lastTimeField,statusdata['lastTime']) 364 return insert
365 366
367 - def buildDetailInsert(self, evid, detaildict):
368 """ 369 Build an insert to add detail values from an event to the details 370 table. 371 372 @param evid: event id 373 @type evid: string 374 @param detaildict: event 375 @type detaildict: dictionary 376 @return: MySQL insert command string 377 @rtype: string 378 """ 379 insert = "insert into %s (evid, name, value) values " % self.detailTable 380 var = [] 381 for field, value in detaildict.items(): 382 if type(value) in types.StringTypes: 383 value = self.escape(decode(self.dmd.Devices, value)) 384 var.append("('%s','%s','%s')" % (evid, field, value)) 385 insert += ",".join(var) 386 return insert
387 388
389 - def buildInsert(self, datadict, table):
390 """ 391 Build a insert statement for that looks like this: 392 insert into status set field='value', field=1, ... 393 394 @param datadict: event 395 @type datadict: dictionary 396 @param table: name of the table to insert into 397 @type table: string 398 @return: MySQL insert command string 399 @rtype: string 400 """ 401 insert = "insert into %s set " % table 402 fields = [] 403 for name, value in datadict.items(): 404 if type(value) in types.StringTypes: 405 fields.append("%s='%s'" % (name, self.escape(value))) 406 elif type(value) == types.FloatType: 407 fields.append("%s=%.3f" % (name, value)) 408 else: 409 fields.append("%s=%s" % (name, value)) 410 insert = str(insert) + str(','.join(fields)) 411 return insert
412 413
414 - def buildClearUpdate(self, evt, clearcls):
415 """ 416 Build an update statement that will clear related events. 417 418 @param evt: event 419 @type evt: Event class 420 @param clearcls: other fields to use to define 'related events' 421 @type clearcls: list of strings 422 @return: MySQL update command string 423 @rtype: string 424 """ 425 update = "update %s " % self.statusTable 426 update += "set clearid = '%s' where " % evt.evid 427 w = [] 428 w.append("%s='%s'" % (self.deviceField, self.escape(evt.device))) 429 w.append("%s='%s'" % (self.componentField, 430 self.escape(evt.component)[:255])) 431 w.append("eventKey='%s'" % self.escape(evt.eventKey)) 432 update += " and ".join(w) 433 434 w = [] 435 for cls in clearcls: 436 w.append("%s='%s'" % (self.eventClassField, self.escape(cls))) 437 if w: 438 update += " and (" + " or ".join(w) + ")" 439 return update
440 441
442 - def eventDataMaps(self, event):
443 """ 444 Return tuple (statusdata, detaildata) for this event. 445 446 @param event: event 447 @type event: Event class 448 @return: (statusdata, detaildata) 449 @rtype: tuple of dictionaries 450 """ 451 statusfields = self.getFieldList() 452 statusdata = {} 453 detaildata = {} 454 for name, value in event.__dict__.items(): 455 if name.startswith("_") or name == "dedupfields": continue 456 if name in statusfields: 457 statusdata[name] = value 458 else: 459 detaildata[name] = value 460 return statusdata, detaildata
461 462
463 - def escape(self, value):
464 """ 465 Prepare string values for db by escaping special characters. 466 467 @param value: string containing possibly nasty characters 468 @type value: string 469 @return: escaped string 470 @rtype: string 471 """ 472 if type(value) not in types.StringTypes: 473 return value 474 475 import _mysql 476 if type(value) == type(u''): 477 return _mysql.escape_string(value.encode('iso-8859-1')) 478 return _mysql.escape_string(value)
479 480 481
class MySqlSendEvent(MySqlSendEventMixin):
    """
    Event sender configured from an event manager object.

    Every attribute named in C{copyattrs} (connection settings such as
    username, password, database, host and port, plus the table and field
    names used when building statements) is copied from the object given
    to the constructor.
    """
    backend = "mysql"

    # Attributes copied verbatim from the event manager in __init__
    copyattrs = (
        "username",
        "password",
        "database",
        "host",
        "port",
        "requiredEventFields",
        "defaultEventId",
        "statusTable",
        "deviceField",
        "componentField",
        "eventClassField",
        "firstTimeField",
        "lastTimeField",
        "countField",
        "detailTable",
    )

    def __init__(self, zem):
        """
        Copy the configuration and the field list from C{zem}.

        @param zem: object providing every attribute in C{copyattrs} and a
            C{getFieldList()} method
        """
        for name in self.copyattrs:
            setattr(self, name, getattr(zem, name))
        self._fieldlist = zem.getFieldList()

    def stop(self):
        """
        Do nothing; subclasses override this to shut themselves down.
        """

    def getFieldList(self):
        """
        Return the field list captured from the event manager at
        construction time.

        @return: list of fields
        @rtype: list
        """
        return self._fieldlist
534 -class MySqlSendEventThread(threading.Thread, MySqlSendEvent):
535 """ 536 Wrapper around MySQL database connection 537 """ 538 539 running = True 540
541 - def __init__(self, zem):
542 threading.Thread.__init__(self) 543 MySqlSendEvent.__init__(self, zem) 544 self.setDaemon(1) 545 self.setName("SendEventThread") 546 self._evqueue = Queue()
547
548 - def sendEvent(self, evt):
549 """ 550 Called from main thread to put an event on to the send queue. 551 """ 552 return self._evqueue.put(evt)
553 554
555 - def run(self):
556 """ 557 Main event loop 558 """ 559 log.info("Starting") 560 while not self._evqueue.empty() or self.running: 561 try: 562 evt = self._evqueue.get(True,1) 563 MySqlSendEvent.sendEvent(self, evt) 564 except Empty: pass 565 except OperationalError, e: 566 log.warn(e) 567 except Exception, e: 568 log.exception(e) 569 log.info("Stopped")
570 571
572 - def stop(self):
573 """ 574 Called from main thread to stop this thread. 575 """ 576 log.info("Stopping...") 577 self.running = False 578 self.join(3)
579