Examples of aiomonitor usage

Below is a list of examples from aiomonitor/examples

Every example is a correct tiny python program.

Basic Usage

Basic example, starts monitor around loop.run_forever() function:

import asyncio
import aiomonitor

loop = asyncio.get_event_loop()
with aiomonitor.start_monitor(loop=loop):
    print('Now you can connect with: nc localhost 50101')
    loop.run_forever()

aiohttp Example

Full feature example with aiohttp application:

import asyncio

import aiomonitor
from aiohttp import web


async def simple(request):
    loop = request.app.loop
    print('Start sleeping')
    await asyncio.sleep(100, loop=loop)
    return web.Response(text='Simple answer')


async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_get('/simple', simple)
    return app

loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))

with aiomonitor.start_monitor(loop=loop):
    web.run_app(app, port=8090, host='localhost')

Any above examples compatible with uvloop, in fact aiomonitor test suite executed against asyncio and uvloop.

aiohttp Example with additional command

Same as above, but showing a custom command:

import asyncio

import aiomonitor
import uvloop

from aiohttp import web
import requests


async def simple(request):
    loop = request.app.loop
    await asyncio.sleep(10, loop=loop)
    return web.Response(text='Simple answer')


async def hello(request):
    resp = web.StreamResponse()
    name = request.match_info.get('name', 'Anonymous')
    answer = ('Hello, ' + name).encode('utf8')
    resp.content_length = len(answer)
    resp.content_type = 'text/plain'
    await resp.prepare(request)
    await asyncio.sleep(10, loop=loop)
    await resp.write(answer)
    await resp.write_eof()
    return resp


async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_get('/simple', simple)
    app.router.add_get('/hello/{name}', hello)
    app.router.add_get('/hello', hello)
    return app

host, port = 'localhost', 8090
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
app = loop.run_until_complete(init(loop))


class WebMonitor(aiomonitor.Monitor):
    def do_hello(self, sin, sout, name=None):
        """Using the /hello GET interface

        There is one optional argument, "name".  This name argument must be
        provided with proper URL excape codes, like %20 for spaces.
        """
        name = '' if name is None else '/' + name
        r = requests.get('http://localhost:8090/hello' + name)
        sout.write(r.text + '\n')


with aiomonitor.start_monitor(loop, monitor=WebMonitor, locals=locals()):
    web.run_app(app, port=port, host=host)