1
2
3
4
5
6
7
8
9
10
11
12
13
14 import time
15 import types
16 import re
17 import socket
18
19 import Globals
20 import transaction
21 import DateTime
22 from twisted.internet import reactor
23
24 from Products.ZenUtils.ZCmdBase import ZCmdBase
25 from Products.ZenEvents.ZenEventClasses import Heartbeat
26
27 from ApplyDataMap import ApplyDataMap, ApplyDataMapThread
28 import SshClient
29 import TelnetClient
30 import SnmpClient
31 import PortscanClient
32
33 from Exceptions import *
34
35 defaultPortScanTimeout = 5
36 defaultParallel = 1
37 defaultProtocol = "ssh"
38 defaultPort = 22
39 defaultStartSleep = 10 * 60
40
41 from Plugins import loadPlugins
42
44
45 generateEvents = True
46
49 ZCmdBase.__init__(self, noopts, app, keeproot)
50
51 if self.options.daemon:
52 if self.options.now:
53 self.log.debug("Run as a daemon, starting immediately.")
54 else:
55 self.log.debug("Run as a daemon, waiting %s sec to start." % defaultStartSleep)
56 time.sleep(defaultStartSleep)
57 self.log.debug("Run as a daemon, slept %s sec, starting now." % defaultStartSleep)
58 else:
59 self.log.debug("Run in foreground, starting immediately.")
60
61 self.single = single
62 if self.options.device:
63 self.single = True
64 self.threaded = threaded
65 if self.threaded is None:
66 self.threaded = not self.options.nothread
67 self.cycletime = self.options.cycletime*60
68 self.collage = self.options.collage / 1440.0
69 self.clients = []
70 self.finished = []
71 self.collectorPlugins = {}
72 self.devicegen = None
73 self.loadPlugins()
74 self.slowDown = False
75 if self.threaded and not self.single:
76 self.log.info("starting apply in separate thread.")
77 self.applyData = ApplyDataMapThread(self, self.getConnection())
78 self.applyData.start()
79 else:
80 self.log.debug("in debug mode starting apply in main thread.")
81 self.applyData = ApplyDataMap(self)
82
84 """Load plugins from the plugin directory.
85 """
86 self.collectorPlugins = loadPlugins(self.dmd)
87
88
90 """Build a list of active plugins for a device.
91 """
92 names = getattr(device, 'zCollectorPlugins', [])
93 result = []
94 collectTest = lambda x: False
95 ignoreTest = lambda x: False
96 if self.options.collectPlugins:
97 collectTest = re.compile(self.options.collectPlugins).search
98 elif self.options.ignorePlugins:
99 ignoreTest = re.compile(self.options.ignorePlugins).search
100 for plugin in self.collectorPlugins.values():
101 if plugin.transport != transport:
102 continue
103 name = plugin.name()
104 try:
105 if not plugin.condition(device, self.log):
106 self.log.debug("condition failed %s on %s",name,device.id)
107 elif ignoreTest(name):
108 self.log.debug("ignoring %s on %s",name, device.id)
109 elif name in names and not self.options.collectPlugins:
110 self.log.debug("using %s on %s",name, device.id)
111 result.append(plugin)
112 elif collectTest(name):
113 self.log.debug("--collect %s on %s", name, device.id)
114 result.append(plugin)
115 else:
116 self.log.debug("skipping %s for %s", name, device.id)
117 except (SystemExit, KeyboardInterrupt): raise
118 except:
119 self.log.exception("failed to select plugin %s", name)
120 return result
121
122
132
133
145
146
148 """Start command collection client.
149 """
150 client = None
151 clientType = 'snmp'
152 hostname = device.id
153 try:
154 plugins = self.selectPlugins(device,"command")
155 commands = map(lambda x: (x.name(), x.command), plugins)
156 if not commands:
157 self.log.info("no cmd plugins found for %s" % hostname)
158 return
159 protocol = getattr(device, 'zCommandProtocol', defaultProtocol)
160 commandPort = getattr(device, 'zCommandPort', defaultPort)
161 if protocol == "ssh":
162 client = SshClient.SshClient(hostname, ip, commandPort,
163 options=self.options,
164 commands=commands, device=device,
165 datacollector=self)
166 clientType = 'ssh'
167 self.log.info('using ssh collection device %s' % hostname)
168 elif protocol == 'telnet':
169 if commandPort == 22: commandPort = 23
170 client = TelnetClient.TelnetClient(hostname, ip, commandPort,
171 options=self.options,
172 commands=commands, device=device,
173 datacollector=self)
174 clientType = 'telnet'
175 self.log.info('using telnet collection device %s' % hostname)
176 else:
177 self.log.warn("unknown protocol %s for device %s",
178 protocol, hostname)
179 if not client:
180 self.log.warn("cmd client creation failed")
181 else:
182 self.log.info("plugins: %s",
183 ", ".join(map(lambda p: p.name(), plugins)))
184 except (SystemExit, KeyboardInterrupt): raise
185 except:
186 self.log.exception("error opening cmdclient")
187 self.addClient(client, timeout, clientType, device.id)
188
189
191 """Start snmp collection client.
192 """
193 client = None
194 try:
195 plugins = []
196 hostname = device.id
197 plugins = self.selectPlugins(device,"snmp")
198 if not plugins:
199 self.log.info("no snmp plugins found for %s" % hostname)
200 return
201 if self.checkCollection(device):
202 self.log.info('snmp collection device %s' % hostname)
203 self.log.info("plugins: %s",
204 ", ".join(map(lambda p: p.name(), plugins)))
205 client = SnmpClient.SnmpClient(device.id,
206 ip,
207 self.options,
208 device,
209 self,
210 plugins)
211 if not client or not plugins:
212 self.log.warn("snmp client creation failed")
213 return
214 except (SystemExit, KeyboardInterrupt): raise
215 except:
216 self.log.exception("error opening snmpclient")
217 self.addClient(client, timeout, 'snmp', device.id)
218
219 - def addClient(self, obj, timeout, clientType, name):
220 if obj:
221 obj.timeout = timeout
222 self.clients.append(obj)
223 obj.run()
224 else:
225 self.log.warn('Unable to create a %s client for %s',
226 clientType, name)
227
228
229
231 """
232 Start portscan collection client.
233 """
234 client = None
235 try:
236 plugins = []
237 hostname = device.id
238 plugins = self.selectPlugins(device, "portscan")
239 if not plugins:
240 self.log.info("no portscan plugins found for %s" % hostname)
241 return
242 if self.checkCollection(device):
243 self.log.info('portscan collection device %s' % hostname)
244 self.log.info("plugins: %s",
245 ", ".join(map(lambda p: p.name(), plugins)))
246 client = PortscanClient.PortscanClient(device.id, ip,
247 self.options, device, self, plugins)
248 if not client or not plugins:
249 self.log.warn("portscan client creation failed")
250 return
251 except (SystemExit, KeyboardInterrupt): raise
252 except:
253 self.log.exception("error opening portscanclient")
254 self.addClient(client, timeout, 'portscan', device.id)
255
256
263
264
266 """Callback that processes the return values from a device.
267 """
268 try:
269 self.log.debug("client for %s finished collecting",
270 collectorClient.hostname)
271 device = collectorClient.device
272 self.applyData.processClient(device, collectorClient)
273 finally:
274 try:
275 self.clients.remove(collectorClient)
276 self.finished.append(collectorClient)
277 except ValueError:
278 self.log.warn("client %s not found in active clients",
279 collectorClient.hostname)
280 self.fillCollectionSlots()
281
282
301
325
326
328 ZCmdBase.buildOptions(self)
329 self.parser.add_option('--debug',
330 dest='debug', action="store_true", default=False,
331 help="don't fork threads for processing")
332 self.parser.add_option('--nothread',
333 dest='nothread', action="store_true", default=True,
334 help="do not use threads when applying updates")
335 self.parser.add_option('--parallel', dest='parallel',
336 type='int', default=defaultParallel,
337 help="number of devices to collect from in parallel")
338 self.parser.add_option('--cycletime',
339 dest='cycletime',default=720,type='int',
340 help="run collection every x minutes")
341 self.parser.add_option('--ignore',
342 dest='ignorePlugins',default="",
343 help="Comma separated list of collection maps to ignore")
344 self.parser.add_option('--collect',
345 dest='collectPlugins',default="",
346 help="Comma separated list of collection maps to use")
347 self.parser.add_option('-p', '--path', dest='path',
348 help="start path for collection ie /NetworkDevices")
349 self.parser.add_option('-d', '--device', dest='device',
350 help="fully qualified device name ie www.confmon.com")
351 self.parser.add_option('-a', '--collage',
352 dest='collage', default=0, type='float',
353 help="do not collect from devices whose collect date " +
354 "is within this many minutes")
355 self.parser.add_option('--writetries',
356 dest='writetries',default=2,type='int',
357 help="number of times to try to write if a "
358 "read conflict is found")
359 self.parser.add_option("-F", "--force",
360 dest="force", action='store_true', default=False,
361 help="force collection of config data "
362 "(even without change to the device)")
363 self.parser.add_option('--portscantimeout', dest='portscantimeout',
364 type='int', default=defaultPortScanTimeout,
365 help="time to wait for connection failures when port scanning")
366 self.parser.add_option('--now',
367 dest='now', action="store_true", default=False,
368 help="start daemon now, do not sleep before starting")
369 self.parser.add_option('--monitor',
370 dest='monitor',
371 help='Name of monitor instance to use for configuration.')
372 TelnetClient.buildOptions(self.parser, self.usage)
373
374
375
377 if not self.options.path and not self.options.device:
378 self.options.path = "/Devices"
379 if self.options.ignorePlugins and self.options.collectPlugins:
380 raise SystemExit("--ignore and --collect are mutually exclusive")
381
382
395
396
398 reactor.startRunning(installSignalHandlers=False)
399 while reactor.running:
400 try:
401 while reactor.running:
402 reactor.runUntilCurrent()
403 reactor.doIteration(0)
404 timeout = reactor.timeout()
405 self.slowDown = timeout < 0.01
406 reactor.doIteration(timeout)
407 except:
408 self.log.exception("Unexpected error in main loop.")
409
410
412 """Stop ZenModeler make sure reactor is stopped, join with
413 applyData thread and close the zeo connection.
414 """
415 self.log.info("stopping...")
416 self.applyData.stop()
417 transaction.abort()
418 reactor.callLater(0., reactor.crash)
419
420
421 - def mainLoop(self):
422 if self.options.cycle:
423 reactor.callLater(self.cycletime, self.mainLoop)
424
425 if self.clients:
426 self.log.error("modeling cycle taking too long")
427 return
428
429 self.log.info("starting collector loop")
430 self.app._p_jar.sync()
431 self.start = time.time()
432 if self.options.device:
433 self.log.info("collecting for device %s", self.options.device)
434 self.devicegen = iter([self.resolveDevice(self.options.device)])
435 elif self.options.monitor:
436 self.log.info("collecting for monitor %s", self.options.monitor)
437 self.devicegen = getattr(self.dmd.Monitors.Performance,
438 self.options.monitor).devices.objectValuesGen()
439 else:
440 self.log.info("collecting for path %s", self.options.path)
441 root = self.dmd.Devices.getOrganizer(self.options.path)
442 self.devicegen = root.getSubDevicesGen()
443 self.fillCollectionSlots()
444
445
447 'controlled shutdown of main loop on interrupt'
448 try:
449 ZCmdBase.sigTerm(self, *unused)
450 except SystemExit:
451 pass
452
454 self.finished = []
455 self.mainLoop()
456 self.timeoutClients()
457
464
465 if __name__ == '__main__':
466 dc = ZenModeler()
467 dc.processOptions()
468 dc.main()
469 dc.reactorLoop()
470