Skip to content

Commit 52b242f

Browse files
committed
1 parent 8110dab commit 52b242f

File tree

1 file changed

+394
-0
lines changed

1 file changed

+394
-0
lines changed

zktop.py

Lines changed: 394 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,394 @@
1+
#!/usr/bin/env python
2+
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
19+
from optparse import OptionParser
20+
21+
import curses
22+
import threading
23+
try:
24+
import Queue
25+
except ImportError: # py3
26+
import queue as Queue
27+
import socket
28+
import signal
29+
import re
30+
try:
31+
from StringIO import StringIO
32+
except ImportError: # py3
33+
from io import StringIO
34+
import logging as LOG
35+
36+
ZK_DEFAULT_PORT = 2181
37+
38+
usage = "usage: %prog [options]"
39+
parser = OptionParser(usage=usage)
40+
parser.add_option("", "--servers",
41+
dest="servers", default="localhost:%s" % ZK_DEFAULT_PORT,
42+
help="comma separated list of host:port (default localhost:2181)")
43+
parser.add_option("-n", "--names",
44+
action="store_true", dest="names", default=False,
45+
help="resolve session name from ip (default False)")
46+
parser.add_option("", "--fix_330",
47+
action="store_true", dest="fix_330", default=False,
48+
help="workaround for a bug in ZK 3.3.0")
49+
parser.add_option("-v", "--verbosity",
50+
dest="verbosity", default="DEBUG",
51+
help="log level verbosity (DEBUG, INFO, WARN(ING), ERROR, CRITICAL/FATAL)")
52+
parser.add_option("-l", "--logfile",
53+
dest="logfile", default=None,
54+
help="directory in which to place log file, or empty for none")
55+
parser.add_option("-c", "--config",
56+
dest="configfile", default=None,
57+
help="zookeeper configuration file to lookup servers from")
58+
parser.add_option("-t", "--timeout",
59+
dest="timeout", default=None,
60+
help="connection timeout to zookeeper instance")
61+
62+
(options, args) = parser.parse_args()
63+
64+
if options.logfile:
65+
LOG.basicConfig(filename=options.logfile, level=getattr(LOG, options.verbosity))
66+
else:
67+
LOG.disable(LOG.CRITICAL)
68+
69+
resized_sig = False
70+
71+
def strToLong(str, base):
72+
try:
73+
return long(str, base)
74+
except: # py3
75+
return int(str, base)
76+
77+
# threads to get server data
78+
# UI class
79+
# track current data and historical
80+
81+
class Session(object):
82+
def __init__(self, session, server_id):
83+
# allow both ipv4 and ipv6 addresses
84+
m = re.search('/([\da-fA-F:\.]+):(\d+)\[(\d+)\]\((.*)\)', session)
85+
self.host = m.group(1)
86+
self.port = m.group(2)
87+
self.server_id = server_id
88+
self.interest_ops = m.group(3)
89+
for d in m.group(4).split(","):
90+
k,v = d.split("=")
91+
setattr(self, k, v)
92+
93+
class ZKServer(object):
94+
def __init__(self, server, server_id):
95+
self.server_id = server_id
96+
self.host, self.port = server.split(':')
97+
try:
98+
stat = send_cmd(self.host, self.port, b'stat\n')
99+
100+
sio = StringIO(stat)
101+
line = sio.readline()
102+
m = re.search('.*: (\d+\.\d+\.\d+)-.*', line)
103+
self.version = m.group(1)
104+
sio.readline()
105+
self.sessions = []
106+
for line in sio:
107+
if not line.strip():
108+
break
109+
self.sessions.append(Session(line.strip(), server_id))
110+
for line in sio:
111+
attr, value = line.split(':')
112+
attr = attr.strip().replace(" ", "_").replace("/", "_").lower()
113+
setattr(self, attr, value.strip())
114+
115+
self.min_latency, self.avg_latency, self.max_latency = self.latency_min_avg_max.split("/")
116+
117+
self.unavailable = False
118+
except:
119+
self.unavailable = True
120+
self.mode = "Unavailable"
121+
self.sessions = []
122+
self.version = "Unknown"
123+
return
124+
125+
def send_cmd(host, port, cmd):
126+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
127+
if options.timeout:
128+
s.settimeout(float(options.timeout))
129+
s.connect((host, int(port)))
130+
result = []
131+
try:
132+
s.sendall(cmd)
133+
134+
# shutting down the socket write side helps ensure
135+
# that we don't end up with TIME_WAIT sockets
136+
if not options.fix_330:
137+
s.shutdown(socket.SHUT_WR)
138+
139+
while True:
140+
data = s.recv(4096)
141+
if not data:
142+
break
143+
144+
data = data.decode()
145+
result.append(data)
146+
finally:
147+
s.close()
148+
149+
return "".join(result)
150+
151+
q_stats = Queue.Queue()
152+
153+
p_wakeup = threading.Condition()
154+
155+
def wakeup_poller():
156+
p_wakeup.acquire()
157+
p_wakeup.notifyAll()
158+
p_wakeup.release()
159+
160+
def reset_server_stats(server):
161+
host, port = server.split(':')
162+
send_cmd(host, port, b'srst\n')
163+
164+
server_id = 0
165+
class StatPoller(threading.Thread):
166+
def __init__(self, server):
167+
self.server = server
168+
global server_id
169+
self.server_id = server_id
170+
server_id += 1
171+
threading.Thread.__init__(self)
172+
173+
def run(self):
174+
p_wakeup.acquire()
175+
while True:
176+
s = ZKServer(self.server, self.server_id)
177+
q_stats.put(s)
178+
p_wakeup.wait(3.0)
179+
# no need - never hit here except exit - "p_wakeup.release()"
180+
# also, causes error on console
181+
182+
class BaseUI(object):
183+
def __init__(self, win):
184+
self.win = win
185+
global mainwin
186+
self.maxy, self.maxx = mainwin.getmaxyx()
187+
self.resize(self.maxy, self.maxx)
188+
189+
def resize(self, maxy, maxx):
190+
LOG.debug("resize called y %d x %d" % (maxy, maxx))
191+
self.maxy = maxy
192+
self.maxx = maxx
193+
194+
def addstr(self, y, x, line, flags = 0):
195+
LOG.debug("addstr with maxx %d" % (self.maxx))
196+
self.win.addstr(y, x, line[:self.maxx-1], flags)
197+
self.win.clrtoeol()
198+
self.win.noutrefresh()
199+
200+
class SummaryUI(BaseUI):
201+
def __init__(self, height, width, server_count):
202+
BaseUI.__init__(self, curses.newwin(1, width, 0, 0))
203+
self.session_counts = [0 for i in range(server_count)]
204+
self.node_counts = [0 for i in range(server_count)]
205+
self.zxids = [0 for i in range(server_count)]
206+
207+
def update(self, s):
208+
self.win.erase()
209+
if s.unavailable:
210+
self.session_counts[s.server_id] = 0
211+
self.node_counts[s.server_id] = 0
212+
self.zxids[s.server_id] = 0
213+
else:
214+
self.session_counts[s.server_id] = len(s.sessions)
215+
self.node_counts[s.server_id] = int(s.node_count)
216+
self.zxids[s.server_id] = strToLong(s.zxid, 16)
217+
nc = max(self.node_counts)
218+
zxid = max(self.zxids)
219+
sc = sum(self.session_counts)
220+
self.addstr(0, 0, "Ensemble -- nodecount:%d zxid:0x%x sessions:%d" %
221+
(nc, zxid, sc))
222+
223+
class ServerUI(BaseUI):
224+
def __init__(self, height, width, server_count):
225+
BaseUI.__init__(self, curses.newwin(server_count + 2, width, 1, 0))
226+
227+
def resize(self, maxy, maxx):
228+
BaseUI.resize(self, maxy, maxx)
229+
self.addstr(1, 0, "ID SERVER PORT M OUTST RECVD SENT CONNS MINLAT AVGLAT MAXLAT", curses.A_REVERSE)
230+
231+
def update(self, s):
232+
if s.unavailable:
233+
self.addstr(s.server_id + 2, 0, "%-2s %-15s %5s %s" %
234+
(s.server_id, s.host[:15], s.port, s.mode[:1].upper()))
235+
else:
236+
self.addstr(s.server_id + 2, 0, "%-2s %-15s %5s %s %8s %8s %8s %5d %6s %6s %6s" %
237+
(s.server_id, s.host[:15], s.port, s.mode[:1].upper(),
238+
s.outstanding, s.received, s.sent, len(s.sessions),
239+
s.min_latency, s.avg_latency, s.max_latency))
240+
241+
class SessionUI(BaseUI):
242+
def __init__(self, height, width, server_count):
243+
BaseUI.__init__(self, curses.newwin(height - server_count - 3, width, server_count + 3, 0))
244+
self.sessions = [[] for i in range(server_count)]
245+
self.ip_to_hostname = {}
246+
247+
def hostname(self, session):
248+
if session.host not in self.ip_to_hostname:
249+
hostname = socket.getnameinfo((session.host, int(session.port)), 0)[0]
250+
self.ip_to_hostname[session.host] = hostname
251+
return self.ip_to_hostname[session.host]
252+
253+
def update(self, s):
254+
self.win.erase()
255+
self.addstr(1, 0, "CLIENT PORT S I QUEUED RECVD SENT", curses.A_REVERSE)
256+
self.sessions[s.server_id] = s.sessions
257+
items = []
258+
for l in self.sessions:
259+
items.extend(l)
260+
items.sort(key=lambda x: int(x.queued), reverse=True)
261+
for i, session in enumerate(items):
262+
try:
263+
#ugh, need to handle if slow - thread for async resolver?
264+
host = self.hostname(session) if options.names else session.host
265+
self.addstr(i + 2, 0, "%-15s %5s %1s %1s %8s %8s %8s" %
266+
(host[:15], session.port, session.server_id, session.interest_ops,
267+
session.queued, session.recved, session.sent))
268+
except:
269+
break
270+
271+
mainwin = None
272+
class Main(object):
273+
def __init__(self, servers):
274+
self.servers = servers.split(",")
275+
276+
def show_ui(self, stdscr):
277+
global mainwin
278+
mainwin = stdscr
279+
curses.use_default_colors()
280+
# w/o this for some reason takes 1 cycle to draw wins
281+
stdscr.refresh()
282+
283+
signal.signal(signal.SIGWINCH, sigwinch_handler)
284+
285+
TIMEOUT = 250
286+
stdscr.timeout(TIMEOUT)
287+
288+
server_count = len(self.servers)
289+
maxy, maxx = stdscr.getmaxyx()
290+
uis = (SummaryUI(maxy, maxx, server_count),
291+
ServerUI(maxy, maxx, server_count),
292+
SessionUI(maxy, maxx, server_count))
293+
294+
# start the polling threads
295+
pollers = [StatPoller(server) for server in self.servers]
296+
for poller in pollers:
297+
poller.setName("PollerThread:" + poller.server)
298+
poller.setDaemon(True)
299+
poller.start()
300+
301+
LOG.debug("starting main loop")
302+
global resized_sig
303+
flash = None
304+
while True:
305+
try:
306+
if resized_sig:
307+
resized_sig = False
308+
self.resize(uis)
309+
wakeup_poller()
310+
311+
while not q_stats.empty():
312+
zkserver = q_stats.get_nowait()
313+
for ui in uis:
314+
ui.update(zkserver)
315+
316+
ch = stdscr.getch()
317+
if 0 < ch <=255:
318+
if ch == ord('q'):
319+
return
320+
elif ch == ord('h'):
321+
flash = "Help: q:quit r:reset stats spc:refresh"
322+
flash_count = 1000/TIMEOUT * 5
323+
elif ch == ord('r'):
324+
for server in self.servers:
325+
try:
326+
reset_server_stats(server)
327+
except:
328+
pass
329+
330+
flash = "Server stats reset"
331+
flash_count = 1000/TIMEOUT * 5
332+
wakeup_poller()
333+
elif ch == ord(' '):
334+
wakeup_poller()
335+
336+
stdscr.move(1, 0)
337+
if flash:
338+
stdscr.addstr(1, 0, flash)
339+
flash_count -= 1
340+
if flash_count == 0:
341+
flash = None
342+
stdscr.clrtoeol()
343+
344+
curses.doupdate()
345+
346+
except KeyboardInterrupt:
347+
break
348+
349+
def resize(self, uis):
350+
curses.endwin()
351+
curses.doupdate()
352+
353+
global mainwin
354+
mainwin.refresh()
355+
maxy, maxx = mainwin.getmaxyx()
356+
357+
for ui in uis:
358+
ui.resize(maxy, maxx)
359+
360+
def sigwinch_handler(*nada):
361+
LOG.debug("sigwinch called")
362+
global resized_sig
363+
resized_sig = True
364+
365+
def read_zk_config(filename):
366+
config = {}
367+
f = open(filename, 'r')
368+
try:
369+
for line in f:
370+
if line.rstrip() and not line.startswith('#'):
371+
k,v = tuple(line.replace(' ', '').strip().split('=', 1))
372+
config[k] = v
373+
except IOError as e:
374+
print("Unable to open `{0}': I/O error({1}): {2}".format(filename, e.errno, e.strerror))
375+
finally:
376+
f.close()
377+
return config
378+
379+
def get_zk_servers(filename):
380+
if filename:
381+
config = read_zk_config(options.configfile)
382+
client_port = config['clientPort']
383+
return ','.join("%s:%s" % (v.split(':', 1)[0], client_port)
384+
for k, v in config.items() if k.startswith('server.'))
385+
else:
386+
return ','.join("%s:%s" % (s.strip(), ZK_DEFAULT_PORT) if not ':' in s else "%s" % s
387+
for s in options.servers.split(',', 1))
388+
389+
if __name__ == '__main__':
390+
LOG.debug("startup")
391+
392+
ui = Main(get_zk_servers(options.configfile))
393+
curses.wrapper(ui.show_ui)
394+

0 commit comments

Comments
 (0)