36 lines
1.1 KiB
Python
36 lines
1.1 KiB
Python
import asyncio
|
|
|
|
import telnetlib3
|
|
from telnetlib3 import TelnetReaderUnicode, TelnetWriterUnicode
|
|
|
|
|
|
async def shell(reader: TelnetReaderUnicode, writer: TelnetWriterUnicode):
|
|
# Handle input function
|
|
async def on_input(inp: str):
|
|
# Switch case
|
|
match inp:
|
|
case '\x1b[C' | 'd': # Right
|
|
writer.write('Right pressed\r\n')
|
|
case '\x1b[D' | 'a': # Left
|
|
writer.write('Left pressed\r\n')
|
|
case '\x1b' | 'q' | '\x03': # Escape or q or Ctrl+C
|
|
writer.write('\r\nBye!\r\n')
|
|
writer.close()
|
|
return True
|
|
case _:
|
|
print(f'Unknown input: {repr(inp)}')
|
|
|
|
# Listen input
|
|
while True:
|
|
inp: str = await reader.read(3)
|
|
if inp and await on_input(inp):
|
|
return
|
|
|
|
if __name__ == '__main__':
|
|
# Create a new event loop, start the server and wait for it to close
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
coro = telnetlib3.create_server(port=2323, shell=shell)
|
|
server = loop.run_until_complete(coro)
|
|
loop.run_until_complete(server.wait_closed())
|