import asyncio import aiojobs import inspect import traceback from prompt_toolkit import PromptSession from prompt_toolkit.patch_stdout import patch_stdout from prompt_toolkit.completion import Completer from prompt_toolkit.completion import Completion from prompt_toolkit.auto_suggest import AutoSuggestFromHistory from fuzzywuzzy import fuzz # @@@@ # &@((((((((@@&(@@@ # &(((((@(((((((@((((((((((((((( # ,(((((((%(((((((((@((((((@ # (((((@(@((((((((( # @((((((((((@@ # @( # @( # @( @@@@@@@@@@ # @( @ & # @( @ # @( @@ @ &@ # @( / @ ( @ # @( % @ & ( # @( @ @ ( # @( @ @ # @( @ # @ # @( * # #* ( # @( @ ### #### # @( @ ##### ## ## , # @( ## ## ## ## @ # @( @ ,, , ,, ## #,##. #### ## ,,,, ,. @ # @( (@ ,, , ,, ### ## # # ## ### ,,,, ,. @(@ # @( @ ((@ ,, , ,, ### # (### ,,,, , @((* @@ # @( @ ((( (((* # @( @ @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ # %(((@ @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ # %(((@ @################################################################### # %(((@ @####@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@##### # %(((@ @ #########@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@######## # @ ##########&@@@@@@@@@@@@@@@@@@@@@@@@@@@########### # @ ########%@@@@@@@@@@@@@@@@@######### # @ ####@@@@@@@@@@@@@@##### # * % ###@@@@@@@@@@@@@### @ # / % @ ###@@@@@@@@@@@### @ ( # @ @ ###@@@@@@@@@###. @ , @ # @ @ @ ###@@@@@@@@@### @ # @ @ ###@@@@@@@### @ @ @ # & @ #@@,,,,@@## @ @ @ # @ @ . @,@@@@,@@@,, @ @ @ # @ @ @ (@@@@#######@@@,@ @ % @ # @ # @ @@,@ ########### @@@ @ @ # @ @ ,,@ ########### &,, @ @@@ # @@@ ,, ########### *,, @@@@ # (,@ ########### @,, # *, ########### ,,@ @@ # (. @ %, ########### @& #@ @ # @ @ #@ ########### , #@ @ # @ @ #@@@@@@@@@# @@ @ # .@ (@/ #@@@@@@@@@# @@ @@ # (@ @@@@@@@@@ @@ , # @ #@* @@@@@@@@@# @ # @ @ # @@@@, @@@ async def test(): print("pokemon go") async def close(): exit() async def nA(): print("this is a test") async def nB(): print("this is b test") async def nC(): print("this is c test") async def arg(arg): print("got " + arg) async def arg2(arg1, arg2): print("got " + arg1 + " and " + arg2) async def arg4(arg1, arg2, arg3, arg4): print("got " + arg1 + " and " + arg2 + " and " + arg3 + " and " + arg4) cmds = { "test": test, "exit": close, "arg": arg, "arg2": arg2, "arg4": arg4, "nested": { "a": nA, "b": nB, "sub": { "c": nC } } } class _CompletionLookup(Completer): def get_completions(self, document, complete_event): words = document.text.split(" ") pos = cmds index = -1 for i,word in enumerate(words): index = i if not str(type(pos))=="" and word in pos: pos = pos[word] else: break if not str(type(pos))=="" : comps = [] for word in pos: score = fuzz.partial_ratio(word,words[-1]) + fuzz.ratio(word, words[-1]) if score > 90 or (len(words)==index+1 and str(document.text).endswith(" ")): comps.append([word, score]) comps.sort(key = lambda x: x[1], reverse = True) for i in range(min(5, len(comps))): yield Completion(comps[i][0], start_position=0) else: args = inspect.getfullargspec(pos)[0] if len(args)>len(words)-index-1: arg = args[len(words)-index-1] yield Completion("<"+str(arg)+">", start_position=0) class Rex(): def __init__(self, cmds=cmds, prompt="[~> ", hasToolbar = True, printExceptions = True, raiseExceptions = False, pipeReturn = False): self.cmds = cmds self.prompt = prompt self.hasToolbar = hasToolbar self.session = PromptSession() self.toolbar = [("", "")] self.printExceptions = printExceptions self.raiseExceptions = raiseExceptions self.pipeReturn = pipeReturn async def once(self): with patch_stdout(): try: inp = await self.session.prompt_async(self.prompt, completer = _CompletionLookup(), auto_suggest = AutoSuggestFromHistory(), bottom_toolbar = [None,self._bottom_toolbar][self.hasToolbar]) except KeyboardInterrupt: return False try: words = inp.split(" ") pos = cmds index = 0 for i,word in enumerate(words): if not str(type(pos))=="" and word in pos: pos = pos[word] else: break index = i if str(type(pos))=="": if len(inspect.getfullargspec(pos)[0])!=len(words[index+1:]): print("[!] The given commands expects "+str(len(inspect.getfullargspec(pos)[0]))+" arguments, "+ "but "+str(len(words[index+1:]))+" were given") else: ret = await pos(*words[index+1:]) if self.pipeReturn: return ret else: print("[!] No such command") except Exception as e: if self.printExceptions: print("[!] An Exception Occured: "+str(e)) for line in traceback.format_exc().split("\n"): print("[ ] "+line) if self.raiseExceptions: raise e return True async def run(self): if self.pipeReturn: raise Exception("Cannot 'run', if pipeReturn is set to true") while await self.once(): pass def runFromSync(self): asyncio.run(self.run()) def _bottom_toolbar(self): return self.toolbar async def setToolbarMsg(self, msg: str, col: str = "bg:black"): self.toolbar = [(col, " "+msg)]