Examples of aiomonitor usage¶
Below is a list of examples from aiomonitor/examples
Every example is a correct tiny python program.
Basic Usage¶
Basic example, starts monitor around loop.run_forever()
function:
import asyncio
import aiomonitor
loop = asyncio.get_event_loop()
with aiomonitor.start_monitor(loop=loop):
print('Now you can connect with: nc localhost 50101')
loop.run_forever()
aiohttp Example¶
Full feature example with aiohttp application:
import asyncio
import aiomonitor
from aiohttp import web
async def simple(request):
loop = request.app.loop
print('Start sleeping')
await asyncio.sleep(100, loop=loop)
return web.Response(text='Simple answer')
async def init(loop):
app = web.Application(loop=loop)
app.router.add_get('/simple', simple)
return app
loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
with aiomonitor.start_monitor(loop=loop):
web.run_app(app, port=8090, host='localhost')
Any above examples compatible with uvloop, in fact aiomonitor test suite executed against asyncio and uvloop.
aiohttp Example with additional command¶
Same as above, but showing a custom command:
import asyncio
import aiomonitor
import uvloop
from aiohttp import web
import requests
async def simple(request):
loop = request.app.loop
await asyncio.sleep(10, loop=loop)
return web.Response(text='Simple answer')
async def hello(request):
resp = web.StreamResponse()
name = request.match_info.get('name', 'Anonymous')
answer = ('Hello, ' + name).encode('utf8')
resp.content_length = len(answer)
resp.content_type = 'text/plain'
await resp.prepare(request)
await asyncio.sleep(10, loop=loop)
await resp.write(answer)
await resp.write_eof()
return resp
async def init(loop):
app = web.Application(loop=loop)
app.router.add_get('/simple', simple)
app.router.add_get('/hello/{name}', hello)
app.router.add_get('/hello', hello)
return app
host, port = 'localhost', 8090
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
app = loop.run_until_complete(init(loop))
class WebMonitor(aiomonitor.Monitor):
def do_hello(self, sin, sout, name=None):
"""Using the /hello GET interface
There is one optional argument, "name". This name argument must be
provided with proper URL excape codes, like %20 for spaces.
"""
name = '' if name is None else '/' + name
r = requests.get('http://localhost:8090/hello' + name)
sout.write(r.text + '\n')
with aiomonitor.start_monitor(loop, monitor=WebMonitor, locals=locals()):
web.run_app(app, port=port, host=host)