Package ZenEvents :: Module MySqlSendEvent
[hide private]
[frames | no frames]

Source Code for Module ZenEvents.MySqlSendEvent

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  import time 
 14  import types 
 15  import threading 
 16  from Queue import Queue, Empty 
 17  import logging 
 18  log = logging.getLogger("zen.Events") 
 19   
 20  from _mysql_exceptions import ProgrammingError, OperationalError 
 21  from ZEO.Exceptions import ClientDisconnected 
 22   
 23  import Products.ZenUtils.guid as guid 
 24  from Products.ZenUtils.Utils import zdecode as decode 
 25  from Event import Event, EventHeartbeat, buildEventFromDict 
 26  from ZenEventClasses import Heartbeat, Unknown 
 27  from Products.ZenEvents.Exceptions import * 
 28   
def execute(cursor, statement):
    """Run ``statement`` on ``cursor`` and return what cursor.execute() returns.

    On success the statement and its result are logged at debug level.
    On failure the statement and the exception (with traceback) are logged
    and the original exception is re-raised unchanged.
    """
    try:
        result = cursor.execute(statement)
    except Exception as ex:
        log.debug(statement)
        log.exception(ex)
        # bare raise keeps the traceback of the failing cursor call
        raise
    # Logged outside the try block, with lazy %s formatting, so that a
    # result that is not an integer cannot turn a successful statement
    # into a reported failure.
    log.debug("%s: --> %s", statement, result)
    return result
38
class MySqlSendEventMixin:
    """
    Mix-in class that takes a mysql db connection and builds inserts that
    send the event to the backend.

    The host class is expected to supply what the methods here read from
    ``self``: connect() / close(conn), getFieldList(), dmd, and the
    attributes requiredEventFields, defaultEventId, statusTable,
    detailTable, deviceField, componentField, eventClassField, countField
    and lastTimeField.  getDmdRoot() is optional; when it is present,
    events are run through applyEventContext() before being stored.
    """

    def sendEvent(self, event):
        """Send an event to the backend.

        ``event`` may be an event object or a dict; a dict is converted
        with buildEventFromDict().  Heartbeat events are handed to
        _sendHeartbeat() and its result is returned.

        Returns the value of doSendEvent(), or None when the event was
        dropped by applyEventContext() or the insert failed with a
        ProgrammingError (which is logged, not raised).

        Raises ZenEventError when a required field is missing, and
        ZenBackendFailure when applyEventContext() raises
        ClientDisconnected or doSendEvent() raises OperationalError.
        """
        if isinstance(event, dict):
            event = buildEventFromDict(event)

        if getattr(event, 'eventClass', Unknown) == Heartbeat:
            return self._sendHeartbeat(event)

        for field in self.requiredEventFields:
            if not hasattr(event, field):
                raise ZenEventError(
                    "Required event field %s not found" % field)

        # FIXME - ugly hack to make sure severity is an int
        event.severity = int(event.severity)

        # If either message or summary is empty then try to copy from the
        # other.  Make sure summary is truncated to 128.
        if not getattr(event, 'message', False):
            event.message = getattr(event, 'summary', '')
        event.summary = (getattr(event, 'summary', '') or event.message)[:128]

        if getattr(self, "getDmdRoot", False):
            try:
                event = self.applyEventContext(event)
            except ClientDisconnected as e:
                log.error(e)
                raise ZenBackendFailure(str(e))
        if not event:
            return None

        # check again for heartbeat after context processing
        if getattr(event, 'eventClass', Unknown) == Heartbeat:
            return self._sendHeartbeat(event)

        if not hasattr(event, 'dedupid'):
            event.dedupid = self._buildDedupId(event)

        try:
            return self.doSendEvent(event)
        except ProgrammingError as e:
            # best effort: a malformed statement is logged, the event is lost
            log.exception(e)
            return None
        except OperationalError as e:
            log.exception(e)
            raise ZenBackendFailure(str(e))

    def _buildDedupId(self, event):
        """Return the escaped values of the event's dedup fields joined by "|"."""
        dedupfields = event.getDedupFields(self.defaultEventId)
        if not getattr(event, "eventKey", ""):
            # without an eventKey the summary takes part in deduplication
            dedupfields = list(dedupfields) + ["summary"]
        return "|".join([self._escapeValue(getattr(event, field, ""))
                         for field in dedupfields])

    def _escapeValue(self, value):
        """Render ``value`` as text and escape it for a quoted SQL literal."""
        # (value,) so that a tuple is formatted as one value, not unpacked
        return self.escape('%s' % (value,))

    def doSendEvent(self, event):
        """Store one event and return the evid under which it is kept.

        A fresh evid is generated and assigned to event.evid.  A severity-0
        event is filed under "history"; if it names clear classes, the
        matching status rows are first flagged with its evid, and when that
        update touches no rows the event is discarded and '' is returned.

        Returns the new evid when the insert reports exactly one affected
        row; otherwise the evid already stored for event.dedupid, or None
        when that lookup finds nothing.  The connection is always closed.
        """
        statusdata, detaildata = self.eventDataMaps(event)
        conn = self.connect()
        try:
            curs = conn.cursor()
            evid = guid.generate()
            event.evid = evid
            # NOTE(review): the nesting below was reconstructed from a
            # flattened listing.  The "no rows -> return ''" check and the
            # log insert are assumed to sit inside "if clearcls", and the
            # DELETE is assumed to run for every event - confirm against
            # the original file.
            if event.severity == 0:
                event._action = "history"
                clearcls = event.clearClasses()
                if clearcls:
                    rows = execute(curs,
                                   self.buildClearUpdate(event, clearcls))
                    if not rows:
                        return ''
                    # Same table buildClearUpdate() flagged, instead of a
                    # hard-coded "status".
                    insert = ('insert into log '
                              '(evid, userName, text) '
                              'select evid, "admin", "auto cleared"'
                              ' from %s where clearid = "%s"'
                              % (self.statusTable, evid))
                    execute(curs, insert)
            delete = ('DELETE FROM %s WHERE clearid IS NOT NULL'
                      % self.statusTable)
            execute(curs, delete)
            stmt = self.buildStatusInsert(statusdata, event._action, evid)
            rescount = execute(curs, stmt)
            if detaildata and rescount == 1:
                execute(curs, self.buildDetailInsert(evid, detaildata))
            if rescount != 1:
                # Not a plain single-row insert: report the evid that is
                # already stored for this dedupid.
                sql = ('select evid from %s where dedupid="%s"' % (
                    event._action, decode(self.dmd.Devices, event.dedupid)))
                execute(curs, sql)
                rs = curs.fetchone()
                if rs:
                    evid = rs[0]
                else:
                    evid = None
        finally:
            self.close(conn)
        return evid

    def _findByIp(self, ipaddress):
        """
        Find a device by looking its ip up in the Networks catalog.

        Returns the device of the matching ip object, or None when there is
        no such ip object or it has no device.
        """
        log.debug("looking up ip %s", ipaddress)
        nets = self.getDmdRoot("Networks")
        ipobj = nets.findIp(ipaddress)
        # resolve the device once instead of calling ipobj.device() twice
        device = ipobj and ipobj.device()
        if not device:
            return None
        log.debug("ip %s -> %s", ipobj.id, device.id)
        return device

    def applyEventContext(self, evt):
        """
        Apply event and devices contexts to the event.
        Only valid if this object has zeo connection.

        The device is looked up by evt.device, then evt.ipAddress, first
        through Devices.findDevice() and then through _findByIp().  Returns
        the processed event, or None when its _action ends up as "drop".
        """
        events = self.getDmdRoot("Events")
        devices = self.getDmdRoot("Devices")
        device = None
        if getattr(evt, 'device', None):
            device = devices.findDevice(evt.device)
        if not device and getattr(evt, 'ipAddress', None):
            device = devices.findDevice(evt.ipAddress)
        if not device and getattr(evt, 'device', None):
            device = self._findByIp(evt.device)
        if not device and getattr(evt, 'ipAddress', None):
            device = self._findByIp(evt.ipAddress)
        if device:
            evt.device = device.id
            log.debug("Found device=%s", evt.device)
            evt = self.applyDeviceContext(device, evt)
        evtclass = events.lookup(evt, device)
        if evtclass:
            log.debug("EventClassInst=%s", evtclass.id)
            evt = evtclass.applyExtraction(evt)
            evt = evtclass.applyValues(evt)
            evt = evtclass.applyTransform(evt, device)
        if evt._action == "drop":
            log.debug("dropping event")
            return None
        # getattr() because evtclass may be empty when the lookup found nothing
        if getattr(evtclass, "scUserFunction", False):
            log.debug("Found scUserFunction")
            evt = evtclass.scUserFunction(device, evt)
        return evt

    def applyDeviceContext(self, device, evt):
        """
        Apply event attributes from device context.

        Sets prodState, Location, DeviceClass, DeviceGroups and Systems on
        the event, and ipAddress when the event does not have one yet.
        Returns the same event object.
        """
        if not hasattr(evt, 'ipAddress'):
            evt.ipAddress = device.manageIp
        evt.prodState = device.productionState
        evt.Location = device.getLocationName()
        evt.DeviceClass = device.getDeviceClassName()
        # group and system names are stored "|"-prefixed and "|"-separated
        evt.DeviceGroups = "|" + "|".join(device.getDeviceGroupNames())
        evt.Systems = "|" + "|".join(device.getSystemNames())
        return evt

    def _sendHeartbeat(self, event):
        """Build insert to add heartbeat record to heartbeat table.

        A heartbeat without ``device`` or ``timeout`` is logged and skipped.
        A ProgrammingError is logged and swallowed; an OperationalError is
        re-raised as ZenBackendFailure.  Always returns None.
        """
        if not hasattr(event, "device"):
            log.warning("heartbeat without device skipping")
            return
        if not hasattr(event, "timeout"):
            log.warning("heartbeat from %s without timeout skipping",
                        event.device)
            return
        evdict = {
            'device': event.device,
            'timeout': event.timeout,
            'component': getattr(event, "component", ""),
        }
        insert = self.buildInsert(evdict, "heartbeat")
        insert += " on duplicate key update lastTime=Null"
        insert += ", timeout=%s" % evdict['timeout']
        try:
            conn = self.connect()
            try:
                curs = conn.cursor()
                execute(curs, insert)
            finally:
                self.close(conn)
        except ProgrammingError as e:
            log.error(insert)
            log.exception(e)
        except OperationalError as e:
            raise ZenBackendFailure(str(e))

    def buildStatusInsert(self, statusdata, table, evid):
        """
        Build an insert statement for the status table that looks like this:
        insert into status set device='box', count=1, ...
            on duplicate key update count=count+1, lastTime=23424.34;

        For the "history" table ``deletedTime=null`` is added.  The
        "on duplicate key update" clause is only added when ``table`` is
        self.statusTable, and it reads statusdata['lastTime'].
        """
        insert = self.buildInsert(statusdata, table)
        fields = []
        if table == "history":
            fields.append("deletedTime=null")
        fields.append("evid='%s'" % evid)
        insert += "," + ",".join(fields)
        if table == self.statusTable:
            insert += " on duplicate key update "
            if 'prodState' in statusdata:
                insert += "prodState=%d," % statusdata['prodState']
            summary = decode(self.dmd.Devices, statusdata.get('summary', ''))
            insert += "summary='%s',%s=%s+1,%s=%.3f" % (
                self.escape(summary),
                self.countField, self.countField,
                self.lastTimeField, statusdata['lastTime'])
        return insert

    def buildDetailInsert(self, evid, detaildict):
        """Build an insert to add detail values from an event to the details
        table.

        Produces one ``(evid, name, value)`` row per item of ``detaildict``.
        Field names and values are escaped; string values are additionally
        passed through decode() first.
        """
        rows = []
        for field, value in detaildict.items():
            if isinstance(value, types.StringTypes):
                value = self.escape(decode(self.dmd.Devices, value))
            else:
                # e.g. the repr of a list or dict contains quotes that
                # would otherwise end the SQL literal early
                value = self._escapeValue(value)
            rows.append("('%s','%s','%s')"
                        % (evid, self._escapeValue(field), value))
        return ("insert into %s (evid, name, value) values " % self.detailTable
                + ",".join(rows))

    def buildInsert(self, datadict, table):
        """
        Build an insert statement that looks like this:
        insert into status set field='value', field=1, ...

        String values are escaped and quoted, floats are written with three
        decimals, and every other value is formatted with %s as is.
        """
        fields = []
        for name, value in datadict.items():
            if isinstance(value, types.StringTypes):
                fields.append("%s='%s'" % (name, self.escape(value)))
            elif isinstance(value, float):
                fields.append("%s=%.3f" % (name, value))
            else:
                fields.append("%s=%s" % (name, value))
        return str("insert into %s set " % table) + str(','.join(fields))

    def buildClearUpdate(self, evt, clearcls):
        """Build an update statement that will clear related events.

        The statement sets clearid to evt.evid on the rows of
        self.statusTable that match the event's device, component and
        eventKey and, when ``clearcls`` is not empty, any of those event
        classes.  All matched values are escaped.
        """
        where = [
            "%s='%s'" % (self.deviceField, self._escapeValue(evt.device)),
            "%s='%s'" % (self.componentField,
                         self._escapeValue(evt.component)),
            "eventKey='%s'" % self._escapeValue(evt.eventKey),
        ]
        update = "update %s set clearid = '%s' where %s" % (
            self.statusTable, evt.evid, " and ".join(where))
        classes = ["%s='%s'" % (self.eventClassField, self._escapeValue(cls))
                   for cls in clearcls]
        if classes:
            update += " and (" + " or ".join(classes) + ")"
        return update

    def eventDataMaps(self, event):
        """Return tuple (statusdata, detaildata) for this event.

        Attributes named in getFieldList() go to statusdata, all others to
        detaildata.  Attributes starting with "_" and the attribute
        "dedupfields" are left out of both.
        """
        statusfields = self.getFieldList()
        statusdata = {}
        detaildata = {}
        for name, value in event.__dict__.items():
            if name.startswith("_") or name == "dedupfields":
                continue
            if name in statusfields:
                statusdata[name] = value
            else:
                detaildata[name] = value
        return statusdata, detaildata

    def escape(self, value):
        """Prepare string values for db by escaping special characters."""
        import _mysql
        if isinstance(value, type(u'')):
            # 'replace' so that a character outside iso-8859-1 is stored
            # as "?" instead of raising and losing the whole event
            value = value.encode('iso-8859-1', 'replace')
        return _mysql.escape_string(value)
class MySqlSendEvent(MySqlSendEventMixin):
    """
    Event sender configured from an existing manager object.

    The object passed to the constructor must provide every attribute
    named in ``copyattrs`` (among them username, password, database, host
    and port of the backend) as well as a getFieldList() method.
    """
    backend = "mysql"

    # attributes copied from the object handed to __init__
    copyattrs = (
        "username",
        "password",
        "database",
        "host",
        "port",
        "requiredEventFields",
        "defaultEventId",
        "statusTable",
        "deviceField",
        "componentField",
        "eventClassField",
        "firstTimeField",
        "lastTimeField",
        "countField",
        "detailTable",
    )

    def __init__(self, zem):
        """Copy the ``copyattrs`` attributes and the field list from ``zem``."""
        for name in self.copyattrs:
            setattr(self, name, getattr(zem, name))
        self._fieldlist = zem.getFieldList()

    def stop(self):
        """Do nothing; subclasses may override."""
        pass

    def getFieldList(self):
        """Return the field list captured from ``zem`` in __init__."""
        return self._fieldlist
class MySqlSendEventThread(threading.Thread, MySqlSendEvent):
    """
    Daemon thread that takes events from an internal queue and sends each
    one with MySqlSendEvent.sendEvent().
    """

    # cleared by stop(); run() keeps going until the queue is drained
    running = True

    def __init__(self, zem):
        """Set up the thread, the sender state from ``zem`` and the queue."""
        threading.Thread.__init__(self)
        MySqlSendEvent.__init__(self, zem)
        self.setDaemon(True)
        self.setName("SendEventThread")
        self._evqueue = Queue()

    def sendEvent(self, evt):
        """Called from main thread to put an event on to the send queue.
        """
        return self._evqueue.put(evt)

    def run(self):
        """Send queued events until stop() was called and the queue is empty.

        Errors raised while sending one event are logged and do not end the
        loop.
        """
        log.info("starting")
        while not self._evqueue.empty() or self.running:
            try:
                # wait at most one second so ``running`` is re-checked
                evt = self._evqueue.get(True, 1)
                MySqlSendEvent.sendEvent(self, evt)
            except Empty:
                pass
            except OperationalError as e:
                # the exception must be bound here; it was previously
                # logged through an unbound (or stale) name
                log.warning(e)
            except Exception as e:
                log.exception(e)
        log.info("stopped")

    def stop(self):
        """Called from main thread to stop this thread.

        Waits up to three seconds for the thread to finish.
        """
        log.info("stopping...")
        self.running = False
        self.join(3)
418