wsstat package

Submodules

wsstat.clients module

class wsstat.clients.ConnectedWebsocketConnection(ws, identifier)[source]

Bases: object

increment_message_counter()[source]
message_count
process_message(message)[source]
class wsstat.clients.WebsocketTestingClient(websocket_url, **kwargs)[source]

Bases: object

Setting up the websocket calls the following callbacks, which can be overridden to extend functionality. For an example, see WebsocketTestingClientWithApiTokenHeader.

def before_connect(self):
def setup_websocket_connection(self, statedict):
def get_identifier(self, statedict):
def after_connect(self, statedict):
def before_recv(self, statedict):
def after_recv(self, statedict, message):
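As a rough illustration of the override pattern described above, the sketch below uses a hypothetical stand-in base class (StandInClient) rather than the real WebsocketTestingClient, so it runs without a websocket server. The hook names and parameters come from the list above; the call order in simulate(), the use of a plain dict for statedict, and the "identifier" and "received" keys are assumptions made for the example. The real hooks may also be coroutines.

```python
class StandInClient:
    """Hypothetical stand-in for WebsocketTestingClient; not the real class."""

    # Hook names copied from the callback list; bodies are no-ops by default.
    def before_connect(self):
        pass

    def setup_websocket_connection(self, statedict):
        pass

    def get_identifier(self, statedict):
        return "connection"

    def after_connect(self, statedict):
        pass

    def before_recv(self, statedict):
        pass

    def after_recv(self, statedict, message):
        pass

    def simulate(self, messages):
        """Call the hooks in an assumed order (inferred from the list order)."""
        statedict = {}
        self.before_connect()
        self.setup_websocket_connection(statedict)
        statedict["identifier"] = self.get_identifier(statedict)
        self.after_connect(statedict)
        for message in messages:
            self.before_recv(statedict)
            self.after_recv(statedict, message)
        return statedict


class CountingClient(StandInClient):
    """Overrides two hooks to label the connection and count messages."""

    def get_identifier(self, statedict):
        return "client-1"

    def after_recv(self, statedict, message):
        statedict["received"] = statedict.get("received", 0) + 1


state = CountingClient().simulate(["a", "b", "c"])
print(state)  # {'identifier': 'client-1', 'received': 3}
```

Only the hooks that need custom behaviour are overridden; the rest keep their default no-op bodies.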

after_connect(statedict)[source]
after_recv(statedict, message)[source]
before_connect()[source]
before_recv(statedict)[source]
create_websocket_connection()[source]
exit()[source]
get_identifier(statedict)[source]
handle_exceptions(loop, context)[source]
handle_keypresses(keypress)[source]
log(identifier, message)[source]
messages_per_second
setup_tasks()[source]
setup_websocket_connection(statedict)[source]
update_urwid()[source]
class wsstat.clients.WebsocketTestingClientWithRandomApiTokenHeader(*args, **kwargs)[source]

Bases: wsstat.clients.WebsocketTestingClient

Introduces a new parameter, header_name, which specifies the key used in the ‘extra_headers’ mapping passed to websocket.connect.

before_connect()[source]
get_identifier(statedict)[source]
setup_websocket_connection(statedict)[source]
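To make the role of header_name concrete, here is a minimal sketch of how a one-entry header mapping keyed by that name could be built. The helper build_extra_headers, the "X-Api-Token" header name, and the use of secrets.token_hex are assumptions for illustration; how the class itself generates its token is not documented here.

```python
import secrets


def build_extra_headers(header_name, token_bytes=16):
    """Return a one-entry mapping keyed by header_name (hypothetical helper)."""
    # token_hex(n) returns 2 * n hexadecimal characters.
    return {header_name: secrets.token_hex(token_bytes)}


headers = build_extra_headers("X-Api-Token")
print(list(headers))  # ['X-Api-Token']
```

A mapping of this shape is the kind of value the description above says is passed as ‘extra_headers’.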

wsstat.gui module

class wsstat.gui.BlinkBoardWidget[source]

Bases: object

generate_blinkers(connected_sockets)[source]
get_ws_status(websocket, connected, error)[source]
update_large_blinkers(blinkers)[source]
update_small_blinkers(blinkers)[source]
class wsstat.gui.LoggerWidget[source]

Bases: object

graph_title = ' Messages / Second | 3 Second window '
log(string)[source]
log_messages = []
timelogger = deque([], maxlen=30)
update_graph_data(data)[source]
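The timelogger default above is a bounded deque. The short sketch below shows only the standard-library behaviour of deque with maxlen=30: once full, each append discards the oldest entry. What LoggerWidget actually stores in it is not documented here, so the integer samples are placeholders.

```python
from collections import deque

timelogger = deque([], maxlen=30)

# Append more samples than the deque can hold.
for sample in range(35):
    timelogger.append(sample)

print(len(timelogger))  # 30
print(timelogger[0])    # 5 (samples 0 to 4 were discarded)
```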
class wsstat.gui.WSStatConsoleApplication(client)[source]

Bases: object

class DummyScreen(input=sys.stdin, output=sys.stdout)[source]

Bases: urwid.raw_display.Screen

draw_screen(xxx_todo_changeme, r)[source]
class FixedAsyncLoop(**kwargs)[source]

Bases: urwid.main_loop.AsyncioEventLoop

run()[source]

Start the event loop. Exit the loop when any callback raises an exception. If ExitMainLoop is raised, exit cleanly.

run()[source]

wsstat.main module

wsstat.main.parse_args()[source]
wsstat.main.wsstat_console()[source]

Module contents