Schwarzschild/main.py

221 lines
7.4 KiB
Python

#!/usr/bin/env python3
import traceback
import sys
import socket
import os
import yaml
import time
from ping3 import ping
import fritzconnection.core.exceptions as exp
from fritzconnection.cli import utils
from fritzconnection import FritzConnection
from fritzconnection.core.fritzconnection import (
FRITZ_IP_ADDRESS,
FRITZ_TCP_PORT,
FRITZ_ENV_USERNAME,
FRITZ_ENV_PASSWORD,
)
DRY = False
def readPortmappings(fc):
num = fc.call_action("WANPPPConnection1", "GetPortMappingNumberOfEntries", )
portMappings = []
numi = num["NewPortMappingNumberOfEntries"]
for i in range(numi):
portMappings.append(fc.call_action("WANPPPConnection1", "GetGenericPortMappingEntry", NewPortMappingIndex=i))
return portMappings
def findPortmapping(fc, extPort, intPort):
portmappings = readPortmappings(fc)
ip = readMyIP()
for m in portmappings:
if m["NewInternalClient"] == ip and m["NewInternalPort"] == int(intPort) and m["NewExternalPort"] == int(extPort):
return m
return None
def printMappingHeader():
print('{:<20} {:<8} {:<20} {:<20} {:<15} {:<15} {}\n'.format(
'Description', 'Protocol', 'Dest. Host', 'Dest. IP', 'Mapping', 'Lease Duration', 'Status'))
def printMapping(m):
desc = str(m['NewPortMappingDescription']) if m['NewPortMappingDescription'] else '-'
proto = str(m['NewProtocol']) if m['NewProtocol'] else '-'
host = str(m['NewInternalClient']) if m['NewInternalClient'] else '-'
extport = str(m['NewExternalPort']) if m['NewExternalPort'] else "?"
intport = str(m['NewInternalPort']) if m['NewInternalPort'] else "?"
lease = str(m['NewLeaseDuration']) if m['NewLeaseDuration'] else "infinite"
status = 'active' if m['NewEnabled'] else ''
ip = host
mapping = extport + "->" + intport
try:
host = socket.gethostbyaddr(host)[0]
except:
pass
print(f'{desc:<20} {proto:<8} {host:<20} {ip:<20} {mapping:<15} {lease:<15} {status}')
def listPortmappings(fc):
print('Fritz Port Forwardings: \n')
printMappingHeader()
portmappings = readPortmappings(fc)
for m in portmappings: printMapping(m)
print()
def readMyIP():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
def addPortMapping(fc, extPort, intPort, name=None, protocol='TCP', active=True):
mapping = {
'NewRemoteHost': '0.0.0.0',
'NewExternalPort': extPort,
'NewProtocol': protocol,
'NewInternalPort': intPort,
'NewInternalClient': readMyIP(),
'NewEnabled': active,
'NewPortMappingDescription': name,
'NewLeaseDuration': 0
}
if DRY:
print('DRY', mapping)
return
fc.call_action("WANPPPConnection1", "AddPortMapping", **mapping )
def deletePortMapping(fc, extPort, intPort, protocol='TCP', name=None):
addPortMapping(fc, extPort, intPort, name=name, protocol=protocol, active=False)
def load_config():
with open('config.yaml', 'r') as f:
conf = yaml.safe_load(f)
return conf
def make_con(ip, user, pw):
return FritzConnection(address=ip, user=user, password=pw, use_cache=True)
def main():
conf = load_config()
fc = make_con(conf['ip'], conf['user'], conf['pass'])
loop(conf, fc)
def loop(conf, fc):
period = conf['period']
active_paths = read_paths(conf, fc)
clock = 0
info('starting...')
while True:
begin = time.time()
active_paths, clock = cycle(active_paths, clock, conf, fc)
sleep = time.time() - begin + period
if sleep > 0:
time.sleep(sleep)
def cycle(active_paths, clock, conf, fc):
if clock == -1 or (conf['validate_paths_every'] != 0 and clock >= conf['validate_paths_every']-1):
clock = 0
real_active_paths = read_paths(conf, fc)
diff = paths_diffs(active_paths, real_active_paths)
if diff:
active_paths = real_active_paths
alert(f'The paths configured in the router are different from what we expect based on our state. Were they changed manually? We will fix this.')
else:
info(f'Router paths validated.')
else:
clock += 1
new_paths = select_paths(conf, fc)
diff = paths_diffs(active_paths, new_paths)
if diff:
flush_paths(conf, fc, diff)
for route_name in diff:
old_path = active_paths[route_name]
new_path = new_paths[route_name]
alert(f'Route change: Switched from path {old_path} to path {new_path} for route {route_name}')
clock = -1 # force check next round
info(f'New routes:')
else:
info(f'Routes ok:')
for route in conf['routes']:
print(f' - {route["name"]} -> {new_paths[route["name"]]}')
return new_paths, clock
def paths_diffs(old_paths, new_paths, verbose=False):
diffs = {}
for route_name in new_paths:
if route_name not in old_paths or old_paths[route_name] != new_paths[route_name]:
diffs[route_name] = new_paths[route_name]
return diffs
def select_paths(conf, fc):
path_indices = {}
for route in conf['routes']:
for path in route['paths']:
if check_avail(path):
path_indices[route['name']] = path['name']
break
else:
# All paths are down
path_indices[route['name']] = None
return path_indices
def read_paths(conf, fc):
ms = readPortmappings(fc)
path_indices = {}
for route in conf['routes']:
for path in route['paths']:
# Lets see if we have such a path active
fstFwd = path['forwards'][0]
found_map = False
for m in ms:
if m['NewEnabled'] and m['NewInternalClient'] == path['ip'] and m['NewPortMappingDescription'] == fstFwd['name'] and m['NewExternalPort'] == fstFwd['external'] and m['NewInternalPort'] == fstFwd['internal'] and m['NewProtocol'] == fstFwd['protocol'].upper():
# This path is (at least partially) active
path_indices[route['name']] = path['name']
found_map = True
break
if found_map:
break
else:
# All paths are down
path_indices[route['name']] = None
alert('We found no active path for route {route["name"]}.')
return path_indices
def flush_paths(conf, fc, active_paths):
for route in conf['routes']:
if route['name'] not in active_paths:
continue
active_path = active_paths[route['name']]
make_active_path = None
for path in route['paths']:
if path['name'] == active_path:
if not make_active_path == None:
alert(f'Illegal config for {route["name"]}: Has redundant path names.')
make_active_path = path
else:
flush_forwards(path, False, fc)
flush_forwards(make_active_path, True, fc)
if make_active_path==None:
alert(f'No active path for route {route["name"]}')
def flush_forwards(path, make_active, fc):
for forward in path['forwards']:
addPortMapping(fc, forward['external'], forward['internal'], name=forward['name'], protocol=forward['protocol'], active=make_active)
def check_avail(path):
ip = path['ip']
resp_time = ping(ip, timeout=3)
return resp_time!=None
def alert(txt):
print('[!] '+txt)
def info(txt):
print('[i] '+txt)
if __name__=='__main__':
main()