# Rex/Rex.py
#
# 191 lines
# 8.6 KiB
# Python
# Raw Normal View History
#
# 2020-07-17 19:49:16 +02:00
import asyncio
import aiojobs
import inspect
import traceback
from prompt_toolkit import PromptSession
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit.completion import Completer
from prompt_toolkit.completion import Completion
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from fuzzywuzzy import fuzz
# 2020-07-17 20:28:20 +02:00
# @@@@
# &@((((((((@@&(@@@
# &(((((@(((((((@(((((((((((((((
# ,(((((((%(((((((((@((((((@
# (((((@(@(((((((((
# @((((((((((@@
# @(
# @(
# @( @@@@@@@@@@
# @( @ &
# @( @
# @( @@ @ &@
# @( / @ ( @
# @( % @ & (
# @( @ @ (
# @( @ @
# @( @ # @
# @( * # #* (
# @( @ ### ####
# @( @ ##### ## ## ,
# @( ## ## ## ## @
# @( @ ,, , ,, ## #,##. #### ## ,,,, ,. @
# @( (@ ,, , ,, ### ## # # ## ### ,,,, ,. @(@
# @( @ ((@ ,, , ,, ### # (### ,,,, , @((* @@
# @( @ ((( (((*
# @( @ @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
# %(((@ @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
# %(((@ @###################################################################
# %(((@ @####@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@#####
# %(((@ @ #########@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@########
# @ ##########&@@@@@@@@@@@@@@@@@@@@@@@@@@@###########
# @ ########%@@@@@@@@@@@@@@@@@#########
# @ ####@@@@@@@@@@@@@@#####
# * % ###@@@@@@@@@@@@@### @
# / % @ ###@@@@@@@@@@@### @ (
# @ @ ###@@@@@@@@@###. @ , @
# @ @ @ ###@@@@@@@@@### @
# @ @ ###@@@@@@@### @ @ @
# & @ #@@,,,,@@## @ @ @
# @ @ . @,@@@@,@@@,, @ @ @
# @ @ @ (@@@@#######@@@,@ @ % @
# @ # @ @@,@ ########### @@@ @ @
# @ @ ,,@ ########### &,, @ @@@
# @@@ ,, ########### *,, @@@@
# (,@ ########### @,,
# *, ########### ,,@ @@
# (. @ %, ########### @& #@ @
# @ @ #@ ########### , #@ @
# @ @ #@@@@@@@@@# @@ @
# .@ (@/ #@@@@@@@@@# @@ @@
# (@ @@@@@@@@@ @@ ,
# @ #@* @@@@@@@@@# @
# @ @
# @@@@, @@@
# 2020-07-17 19:49:16 +02:00
async def test():
    """Demo command: print a fixed greeting to stdout."""
    print("pokemon go")
async def close():
    """Demo command: terminate the program.

    Raises:
        SystemExit: Always. Raised directly rather than through ``exit()``,
            because ``exit`` is a helper installed by the ``site`` module
            for interactive use and is not guaranteed to exist.
    """
    raise SystemExit
async def nA():
    """Demo sub-command: print the "a" test message to stdout."""
    print("this is a test")
async def nB():
    """Demo sub-command: print the "b" test message to stdout."""
    print("this is b test")
async def nC():
    """Demo sub-command: print the "c" test message to stdout."""
    print("this is c test")
async def arg(arg):
    """Demo command: echo a single argument to stdout.

    Args:
        arg: Value to echo. Formatted with an f-string, so a non-string
            value is printed instead of raising ``TypeError``.
    """
    print(f"got {arg}")
async def arg2(arg1, arg2):
    """Demo command: echo two arguments to stdout.

    Args:
        arg1: First value to echo.
        arg2: Second value to echo.

    Values are formatted with an f-string, so non-string values are printed
    instead of raising ``TypeError``.
    """
    print(f"got {arg1} and {arg2}")
async def arg4(arg1, arg2, arg3, arg4):
    """Demo command: echo four arguments to stdout.

    Args:
        arg1: First value to echo.
        arg2: Second value to echo.
        arg3: Third value to echo.
        arg4: Fourth value to echo.

    Values are formatted with an f-string, so non-string values are printed
    instead of raising ``TypeError``.
    """
    print(f"got {arg1} and {arg2} and {arg3} and {arg4}")
# Demo command tree. Each key is one word typed at the prompt; a value is
# either a coroutine function (a runnable command) or a nested dict that
# groups further sub-commands.
cmds = {
    "test": test,
    "exit": close,
    "arg": arg,
    "arg2": arg2,
    "arg4": arg4,
    "nested": {
        "a": nA,
        "b": nB,
        "sub": {
            "c": nC,
        },
    },
}
# 2020-07-17 20:21:52 +02:00
class _CompletionLookup(Completer):
    """Completer that walks a nested command tree word by word.

    A command tree maps each command word either to a callable (a runnable
    command) or to another dict (a group of sub-commands).

    Args:
        commands: Command tree to complete against. When omitted, the
            module-level ``cmds`` tree is looked up at completion time.
    """

    # Upper bound on the number of command suggestions offered at once.
    _MAX_SUGGESTIONS = 5
    # A candidate is suggested for a partially typed word only when the sum
    # of its fuzz.partial_ratio and fuzz.ratio scores exceeds this value.
    _MIN_SCORE = 90

    def __init__(self, commands=None):
        super().__init__()
        self._commands = commands

    @staticmethod
    def _positional_names(func):
        """Return the names of *func*'s positional parameters, in order.

        ``inspect.signature`` is used so that the bound ``self`` of a method
        is not counted as an argument the user has to type.
        """
        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )
        return [
            name
            for name, param in inspect.signature(func).parameters.items()
            if param.kind in positional_kinds
        ]

    def get_completions(self, document, complete_event):
        """Yield completions for the text currently in the prompt.

        While the input still points into a command group, up to
        ``_MAX_SUGGESTIONS`` sub-command names are yielded, best fuzzy match
        first. Once the input has reached a command, a ``<name>`` placeholder
        for the next expected argument is yielded instead.

        Args:
            document: prompt_toolkit document; only ``document.text`` is read.
            complete_event: Unused.

        Yields:
            Completion: Suggestions for the current input.
        """
        words = document.text.split(" ")
        node = cmds if self._commands is None else self._commands

        # After the loop ``index`` is the position of the last word that was
        # inspected: the word that stopped the descent, or the final word
        # when every word was consumed. ``split`` always returns at least
        # one element, so the loop body runs at least once.
        index = -1
        for index, word in enumerate(words):
            if callable(node) or word not in node:
                break
            node = node[word]

        if not callable(node):
            partial = words[-1]
            # A trailing space right after a complete group name means
            # "show everything in this group", regardless of score.
            list_all = len(words) == index + 1 and document.text.endswith(" ")
            scored = []
            for candidate in node:
                score = (fuzz.partial_ratio(candidate, partial)
                         + fuzz.ratio(candidate, partial))
                if score > self._MIN_SCORE or list_all:
                    scored.append((score, candidate))
            # Stable sort: equally scored candidates keep tree order.
            scored.sort(key=lambda pair: pair[0], reverse=True)
            for _, candidate in scored[:self._MAX_SUGGESTIONS]:
                # A negative start position makes the completion replace the
                # partially typed word instead of being appended to it.
                yield Completion(candidate, start_position=-len(partial))
            return

        names = self._positional_names(node)
        # Number of argument slots before the one currently being entered.
        typed = len(words) - index - 1
        if typed < len(names):
            # The placeholder is a hint and is inserted at the cursor.
            yield Completion(f"<{names[typed]}>", start_position=0)
# 2020-07-17 20:21:52 +02:00
class Rex():
    """Interactive asynchronous command prompt.

    Args:
        cmds: Command tree. Each key is a command word; each value is either
            an awaitable-returning callable (the command) or a nested dict
            of sub-commands.
        prompt: Text shown in front of every input line.
        hasToolbar: Whether the bottom toolbar is passed to the prompt.
        printExceptions: Print the message and traceback of an exception
            raised while dispatching a line.
        raiseExceptions: Re-raise such an exception after the optional
            printing.
        pipeReturn: Make ``once()`` return the executed command's return
            value instead of ``True``.
    """

    def __init__(self, cmds=cmds, prompt="[~> ", hasToolbar = True, printExceptions = True, raiseExceptions = False,
                 pipeReturn = False):
        self.cmds = cmds
        self.prompt = prompt
        self.hasToolbar = hasToolbar
        self.session = PromptSession()
        # Toolbar content as a list of (style, text) pairs.
        self.toolbar = [("", "")]
        self.printExceptions = printExceptions
        self.raiseExceptions = raiseExceptions
        self.pipeReturn = pipeReturn

    @staticmethod
    def _positional_count(func):
        """Return how many positional parameters *func* declares.

        ``inspect.signature`` is used so that the bound ``self`` of a method
        is not counted as an argument the user has to supply.
        """
        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )
        return sum(
            1
            for param in inspect.signature(func).parameters.values()
            if param.kind in positional_kinds
        )

    def _make_completer(self):
        """Build the completer used for one prompt."""
        try:
            # Prefer completing against this instance's own command tree.
            return _CompletionLookup(self.cmds)
        except TypeError:
            # The completer's constructor does not accept a command tree.
            return _CompletionLookup()

    async def once(self):
        """Read one line from the prompt and dispatch it.

        The line is split on single spaces. Leading words select a command
        in ``self.cmds``; the remaining words are passed to it as positional
        string arguments, and the command is awaited.

        Returns:
            ``False`` when the prompt is interrupted by ``KeyboardInterrupt``.
            Otherwise ``True``, except that with ``pipeReturn`` set a
            successfully executed command's return value is returned instead.

        Raises:
            Exception: Whatever dispatching the line raised, when
                ``raiseExceptions`` is set.
        """
        with patch_stdout():
            try:
                line = await self.session.prompt_async(
                    self.prompt,
                    completer=self._make_completer(),
                    auto_suggest=AutoSuggestFromHistory(),
                    bottom_toolbar=self._bottom_toolbar if self.hasToolbar else None,
                )
            except KeyboardInterrupt:
                return False

            try:
                words = line.split(" ")
                # Walk this instance's tree, not the module-level default.
                node = self.cmds
                consumed = 0  # leading words that named the command path
                for word in words:
                    if callable(node) or word not in node:
                        break
                    node = node[word]
                    consumed += 1

                if not callable(node):
                    print("[!] No such command")
                else:
                    args = words[consumed:]
                    expected = self._positional_count(node)
                    if expected != len(args):
                        print(f"[!] The given commands expects {expected} arguments, "
                              f"but {len(args)} were given")
                    else:
                        ret = await node(*args)
                        if self.pipeReturn:
                            return ret
            except Exception as e:
                if self.printExceptions:
                    print("[!] An Exception Occured: " + str(e))
                    for tb_line in traceback.format_exc().split("\n"):
                        print("[ ] " + tb_line)
                if self.raiseExceptions:
                    # Bare raise keeps the original traceback intact.
                    raise
        return True

    async def run(self):
        """Prompt repeatedly until ``once()`` returns a falsy value.

        Raises:
            Exception: If ``pipeReturn`` is set, because ``once()`` would then
                return command results rather than a continue flag.
        """
        if self.pipeReturn:
            raise Exception("Cannot 'run', if pipeReturn is set to true")
        while await self.once():
            pass

    def runFromSync(self):
        """Run the prompt loop from synchronous code via ``asyncio.run``."""
        asyncio.run(self.run())

    def _bottom_toolbar(self):
        """Return the current toolbar content; passed to the prompt as a callback."""
        return self.toolbar

    async def setToolbarMsg(self, msg: str, col: str = "bg:black"):
        """Replace the toolbar content with a single styled message.

        Args:
            msg: Text to show; stored with one leading space.
            col: Style string stored alongside the text.
        """
        self.toolbar = [(col, " " + msg)]