#!/usr/bin/env python3 import traceback import sys import socket import os import yaml import time from ping3 import ping import fritzconnection.core.exceptions as exp from fritzconnection.cli import utils from fritzconnection import FritzConnection from fritzconnection.core.fritzconnection import ( FRITZ_IP_ADDRESS, FRITZ_TCP_PORT, FRITZ_ENV_USERNAME, FRITZ_ENV_PASSWORD, ) DRY = False def readPortmappings(fc): num = fc.call_action("WANPPPConnection1", "GetPortMappingNumberOfEntries", ) portMappings = [] numi = num["NewPortMappingNumberOfEntries"] for i in range(numi): portMappings.append(fc.call_action("WANPPPConnection1", "GetGenericPortMappingEntry", NewPortMappingIndex=i)) return portMappings def findPortmapping(fc, extPort, intPort): portmappings = readPortmappings(fc) ip = readMyIP() for m in portmappings: if m["NewInternalClient"] == ip and m["NewInternalPort"] == int(intPort) and m["NewExternalPort"] == int(extPort): return m return None def printMappingHeader(): print('{:<20} {:<8} {:<20} {:<20} {:<15} {:<15} {}\n'.format( 'Description', 'Protocol', 'Dest. Host', 'Dest. IP', 'Mapping', 'Lease Duration', 'Status')) def printMapping(m): desc = str(m['NewPortMappingDescription']) if m['NewPortMappingDescription'] else '-' proto = str(m['NewProtocol']) if m['NewProtocol'] else '-' host = str(m['NewInternalClient']) if m['NewInternalClient'] else '-' extport = str(m['NewExternalPort']) if m['NewExternalPort'] else "?" intport = str(m['NewInternalPort']) if m['NewInternalPort'] else "?" lease = str(m['NewLeaseDuration']) if m['NewLeaseDuration'] else "infinite" status = 'active' if m['NewEnabled'] else '' ip = host mapping = extport + "->" + intport try: host = socket.gethostbyaddr(host)[0] except: pass print(f'{desc:<20} {proto:<8} {host:<20} {ip:<20} {mapping:<15} {lease:<15} {status}') def listPortmappings(fc): print('Fritz Port Forwardings: \n') printMappingHeader() portmappings = readPortmappings(fc) for m in portmappings: printMapping(m) print() def readMyIP(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) return s.getsockname()[0] def upsertPortMapping(fc, extPort, intPort, name=None, protocol='TCP', active=True): mapping = { 'NewRemoteHost': '0.0.0.0', 'NewExternalPort': extPort, 'NewProtocol': protocol, 'NewInternalPort': intPort, 'NewInternalClient': readMyIP(), 'NewEnabled': active, 'NewPortMappingDescription': name, 'NewLeaseDuration': 0 } if DRY: print('DRY', mapping) return fc.call_action("WANPPPConnection1", "AddPortMapping", **mapping ) def load_config(): with open('config.yaml', 'r') as f: conf = yaml.safe_load(f) return conf def make_con(ip, user, pw): return FritzConnection(address=ip, user=user, password=pw, use_cache=True) def main(): conf = load_config() fc = make_con(conf['ip'], conf['user'], conf['pass']) loop(conf, fc) def loop(conf, fc): period = conf['period'] active_paths = read_paths(conf, fc) clock = 0 info('starting...') while True: begin = time.time() active_paths, clock = cycle(active_paths, clock, conf, fc) sleep = time.time() - begin + period if sleep > 0: time.sleep(sleep) def cycle(active_paths, clock, conf, fc): if clock == -1 or (conf['validate_paths_every'] != 0 and clock >= conf['validate_paths_every']-1): clock = 0 real_active_paths = read_paths(conf, fc) diff = paths_diffs(active_paths, real_active_paths) if diff: active_paths = real_active_paths alert(f'The paths configured in the router are different from what we expect based on our state. Were they changed manually? We will fix this.') else: info(f'Router paths validated.') else: clock += 1 new_paths = select_paths(conf, fc) diff = paths_diffs(active_paths, new_paths) if diff: flush_paths(conf, fc, diff) for route_name in diff: old_path = active_paths[route_name] new_path = new_paths[route_name] alert(f'Route change: Switched from path {old_path} to path {new_path} for route {route_name}') clock = -1 # force check next round info(f'New routes:') else: info(f'Routes ok:') for route in conf['routes']: print(f' - {route["name"]} -> {new_paths[route["name"]]}') return new_paths, clock def paths_diffs(old_paths, new_paths, verbose=False): diffs = {} for route_name in new_paths: if route_name not in old_paths or old_paths[route_name] != new_paths[route_name]: diffs[route_name] = new_paths[route_name] return diffs def select_paths(conf, fc): path_indices = {} for route in conf['routes']: for path in route['paths']: if check_avail(path): path_indices[route['name']] = path['name'] break else: # All paths are down path_indices[route['name']] = None return path_indices def read_paths(conf, fc): ms = readPortmappings(fc) path_indices = {} for route in conf['routes']: found_map = False for path in route['paths']: # Lets see if we have such a path active fstFwd = path['forwards'][0] for m in ms: if m['NewEnabled'] and m['NewInternalClient'] == path['ip'] and m['NewPortMappingDescription'] == fstFwd['name'] and m['NewExternalPort'] == fstFwd['external'] and m['NewInternalPort'] == fstFwd['internal'] and m['NewProtocol'] == fstFwd['protocol'].upper(): # This path is (at least partially) active if found_map: # Collision!!! path_indices[route['name']] = 'COLLISION' else: path_indices[route['name']] = path['name'] found_map = True if not found_map: # All paths are down path_indices[route['name']] = None alert('We found no active path for route {route["name"]}.') return path_indices def flush_paths(conf, fc, active_paths): for route in conf['routes']: if route['name'] not in active_paths: continue active_path = active_paths[route['name']] make_active_path = None for path in route['paths']: if path['name'] == active_path: if not make_active_path == None: alert(f'Illegal config for {route["name"]}: Has redundant path names.') make_active_path = path else: flush_forwards(path, False, fc) flush_forwards(make_active_path, True, fc) if make_active_path==None: alert(f'No active path for route {route["name"]}') def flush_forwards(path, make_active, fc): for forward in path['forwards']: upsertPortMapping(fc, forward['external'], forward['internal'], name=forward['name'], protocol=forward['protocol'], active=make_active) def check_avail(path): ip = path['ip'] resp_time = ping(ip, timeout=3) return resp_time!=None def alert(txt): print('[!] '+txt) def info(txt): print('[i] '+txt) if __name__=='__main__': main()