Schwarzschild/main.py

218 lines
7.4 KiB
Python
Raw Normal View History

2024-05-21 16:55:57 +02:00
#!/usr/bin/env python3
import traceback
import sys
import socket
import os
import yaml
import time
from ping3 import ping
import fritzconnection.core.exceptions as exp
from fritzconnection.cli import utils
from fritzconnection import FritzConnection
from fritzconnection.core.fritzconnection import (
FRITZ_IP_ADDRESS,
FRITZ_TCP_PORT,
FRITZ_ENV_USERNAME,
FRITZ_ENV_PASSWORD,
)
2024-05-21 16:57:14 +02:00
# Dry-run switch: when true, upsertPortMapping() prints the mapping it would
# send and returns without calling the router.
DRY = False
2024-05-21 16:55:57 +02:00
def readPortmappings(fc):
    """Return every port-mapping entry reported by the router.

    Asks the ``WANPPPConnection1`` service for the number of entries and then
    fetches each one by index via ``GetGenericPortMappingEntry``.

    Args:
        fc: connection object exposing ``call_action(service, action, **kw)``.

    Returns:
        A list with one reply per mapping index, in index order.
    """
    service = "WANPPPConnection1"
    count_reply = fc.call_action(service, "GetPortMappingNumberOfEntries")
    total = count_reply["NewPortMappingNumberOfEntries"]
    return [
        fc.call_action(service, "GetGenericPortMappingEntry", NewPortMappingIndex=index)
        for index in range(total)
    ]
def findPortmapping(fc, extPort, intPort):
    """Look up the router mapping that forwards extPort to intPort on this host.

    Returns the first mapping entry whose internal client equals the address
    from readMyIP() and whose internal and external ports equal the given
    ports (compared as integers), or None when no entry matches.
    """
    entries = readPortmappings(fc)
    own_ip = readMyIP()
    matches = (
        entry
        for entry in entries
        if entry["NewInternalClient"] == own_ip
        and entry["NewInternalPort"] == int(intPort)
        and entry["NewExternalPort"] == int(extPort)
    )
    return next(matches, None)
def printMappingHeader():
    """Print the column titles of the port-mapping table, followed by a blank line."""
    # (title, column width) for every left-aligned, padded column; the final
    # 'Status' column is printed unpadded.
    columns = (
        ('Description', 20),
        ('Protocol', 8),
        ('Dest. Host', 20),
        ('Dest. IP', 20),
        ('Mapping', 15),
        ('Lease Duration', 15),
    )
    cells = [title.ljust(width) for title, width in columns]
    cells.append('Status')
    print(' '.join(cells) + '\n')
def printMapping(m):
    """Print one port-mapping entry as a row of the mapping table.

    Args:
        m: mapping entry (dict-like) with the ``New...`` keys read below.

    Falsy fields are replaced by placeholders ('-', '?', 'infinite'). The
    internal client address is reverse-resolved to a host name on a
    best-effort basis; when that fails, or there is no client address, the
    host column shows the same value as the IP column.
    """
    desc = str(m['NewPortMappingDescription']) if m['NewPortMappingDescription'] else '-'
    proto = str(m['NewProtocol']) if m['NewProtocol'] else '-'
    client = m['NewInternalClient']
    ip = str(client) if client else '-'
    extport = str(m['NewExternalPort']) if m['NewExternalPort'] else "?"
    intport = str(m['NewInternalPort']) if m['NewInternalPort'] else "?"
    lease = str(m['NewLeaseDuration']) if m['NewLeaseDuration'] else "infinite"
    status = 'active' if m['NewEnabled'] else ''
    mapping = extport + "->" + intport

    host = ip
    if client:
        # Only resolve real addresses; looking up the '-' placeholder is pointless.
        try:
            host = socket.gethostbyaddr(ip)[0]
        except (OSError, UnicodeError):
            # Lookup failures (socket.herror / socket.gaierror are OSError
            # subclasses) are expected; keep the raw address. Anything else,
            # e.g. KeyboardInterrupt, must propagate.
            pass
    print(f'{desc:<20} {proto:<8} {host:<20} {ip:<20} {mapping:<15} {lease:<15} {status}')
def listPortmappings(fc):
    """Print a table of all port forwardings currently stored on the router."""
    print('Fritz Port Forwardings: \n')
    printMappingHeader()
    for entry in readPortmappings(fc):
        printMapping(entry)
    # Trailing empty line closes the table.
    print()
def readMyIP():
    """Return the local IPv4 address this host uses to reach 8.8.8.8.

    A UDP socket is connected to ("8.8.8.8", 80) and the local end of that
    socket is read back. The socket is closed before returning, so repeated
    calls do not leak file descriptors.

    Raises:
        OSError: if the socket cannot be created or connected (e.g. no route).
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(("8.8.8.8", 80))
        return s.getsockname()[0]
def upsertPortMapping(fc, extPort, intPort, name=None, protocol='TCP', active=True, client=None):
    """Create or update a port mapping on the router via ``AddPortMapping``.

    Args:
        fc: connection object exposing ``call_action``.
        extPort: external port of the mapping.
        intPort: internal port of the mapping.
        name: description stored with the mapping.
        protocol: transport protocol name; sent upper-cased so that values
            such as 'tcp' match what read_paths() compares against.
        active: whether the mapping is enabled.
        client: internal client address; defaults to this host's address as
            returned by readMyIP() when omitted.

    When the module-level DRY flag is set, the mapping is only printed and the
    router is not contacted.
    """
    mapping = {
        'NewRemoteHost': '0.0.0.0',
        'NewExternalPort': extPort,
        'NewProtocol': str(protocol).upper(),
        'NewInternalPort': intPort,
        'NewInternalClient': readMyIP() if client is None else client,
        'NewEnabled': active,
        'NewPortMappingDescription': name,
        'NewLeaseDuration': 0,
    }
    if DRY:
        print('DRY', mapping)
        return
    fc.call_action("WANPPPConnection1", "AddPortMapping", **mapping)
def load_config():
    """Parse ``config.yaml`` (relative to the working directory) and return the result."""
    with open('config.yaml', 'r') as stream:
        return yaml.safe_load(stream)
def make_con(ip, user, pw):
    """Build a cached FritzConnection for the router at *ip* using the given credentials."""
    connection = FritzConnection(
        address=ip,
        user=user,
        password=pw,
        use_cache=True,
    )
    return connection
def main():
    """Entry point: load the configuration, connect to the router and run the loop."""
    settings = load_config()
    connection = make_con(settings['ip'], settings['user'], settings['pass'])
    loop(settings, connection)
def loop(conf, fc):
    """Run cycle() forever, starting one cycle roughly every conf['period'] seconds.

    The initial state is read from the router with read_paths(). After each
    cycle the loop sleeps only for the part of the period that the cycle did
    not already use; if a cycle took longer than the period, the next one
    starts immediately.

    Args:
        conf: parsed configuration; ``conf['period']`` is the cycle period in seconds.
        fc: router connection passed through to cycle()/read_paths().
    """
    period = conf['period']
    active_paths = read_paths(conf, fc)
    clock = 0
    info('starting...')
    while True:
        begin = time.monotonic()
        active_paths, clock = cycle(active_paths, clock, conf, fc)
        # Remaining time in this period = period - elapsed. (The previous
        # expression added the elapsed time, so every cycle slept too long.)
        remaining = period - (time.monotonic() - begin)
        if remaining > 0:
            time.sleep(remaining)
def cycle(active_paths, clock, conf, fc):
    """Run one monitoring round and return the new state.

    Steps:
      1. When validation is due (``clock == -1``, or ``validate_paths_every``
         is non-zero and the clock reached it), re-read the paths from the
         router and adopt them if they differ from ``active_paths``.
         Otherwise just advance the clock.
      2. Select the desired path per route with select_paths().
      3. If the selection differs from ``active_paths``, push the changes with
         flush_paths(), report each switch and force validation next round.
      4. Print the resulting path of every configured route.

    Args:
        active_paths: dict route name -> path name believed to be active.
        clock: rounds since the last validation, or -1 to force one now.
        conf: parsed configuration (uses 'validate_paths_every' and 'routes').
        fc: router connection.

    Returns:
        Tuple ``(new_paths, clock)`` to feed into the next round.
    """
    validate_every = conf['validate_paths_every']
    if clock == -1 or (validate_every != 0 and clock >= validate_every - 1):
        clock = 0
        real_active_paths = read_paths(conf, fc)
        if paths_diffs(active_paths, real_active_paths):
            active_paths = real_active_paths
            alert('The paths configured in the router are different from what we expect based on our state. Were they changed manually? We will fix this.')
        else:
            info('Router paths validated.')
    else:
        clock += 1

    new_paths = select_paths(conf, fc)
    diff = paths_diffs(active_paths, new_paths)
    if diff:
        flush_paths(conf, fc, diff)
        for route_name in diff:
            # paths_diffs() also reports routes missing from active_paths, so
            # the old path may not exist; .get avoids a KeyError in that case.
            old_path = active_paths.get(route_name)
            new_path = new_paths[route_name]
            alert(f'Route change: Switched from path {old_path} to path {new_path} for route {route_name}')
        clock = -1  # force check next round
        info('New routes:')
    else:
        info('Routes ok:')

    for route in conf['routes']:
        print(f' - {route["name"]} -> {new_paths[route["name"]]}')
    return new_paths, clock
def paths_diffs(old_paths, new_paths, verbose=False):
    """Return the entries of *new_paths* that are new or changed versus *old_paths*.

    Args:
        old_paths: dict route name -> path name (previous state).
        new_paths: dict route name -> path name (desired state).
        verbose: accepted for compatibility; not used.

    Returns:
        Dict route name -> new path name for every route that is absent from
        *old_paths* or mapped to a different value there. Routes that exist
        only in *old_paths* are not reported.
    """
    return {
        name: target
        for name, target in new_paths.items()
        if name not in old_paths or old_paths[name] != target
    }
def select_paths(conf, fc):
    """Choose, for every route, the first configured path that is reachable.

    Paths are probed with check_avail() in configuration order and probing
    stops at the first one that responds.

    Args:
        conf: parsed configuration with a 'routes' list.
        fc: router connection (not used here).

    Returns:
        Dict route name -> name of the selected path, or None when all paths
        of that route are down.
    """
    selection = {}
    for route in conf['routes']:
        reachable = (
            candidate['name']
            for candidate in route['paths']
            if check_avail(candidate)
        )
        # None marks a route whose paths are all down.
        selection[route['name']] = next(reachable, None)
    return selection
def read_paths(conf, fc):
    """Work out which path of every route is currently active on the router.

    A path counts as active when the router holds an enabled mapping that
    matches the path's IP and the *first* forward of the path (description,
    external/internal port and upper-cased protocol).

    Args:
        conf: parsed configuration with a 'routes' list.
        fc: router connection used to read the mappings.

    Returns:
        Dict route name -> active path name, 'COLLISION' when more than one
        matching mapping was found for the route, or None when none was found.
    """
    ms = readPortmappings(fc)
    path_indices = {}
    for route in conf['routes']:
        found_map = False
        for path in route['paths']:
            # Only the first forward is checked, so a match means the path is
            # at least partially active.
            fstFwd = path['forwards'][0]
            for m in ms:
                matches = (
                    m['NewEnabled']
                    and m['NewInternalClient'] == path['ip']
                    and m['NewPortMappingDescription'] == fstFwd['name']
                    and m['NewExternalPort'] == fstFwd['external']
                    and m['NewInternalPort'] == fstFwd['internal']
                    and m['NewProtocol'] == fstFwd['protocol'].upper()
                )
                if not matches:
                    continue
                if found_map:  # Collision: a second mapping matches this route
                    path_indices[route['name']] = 'COLLISION'
                else:
                    path_indices[route['name']] = path['name']
                found_map = True
        if not found_map:
            # All paths are down
            path_indices[route['name']] = None
            # f-string so the route name is actually interpolated.
            alert(f'We found no active path for route {route["name"]}.')
    return path_indices
def flush_paths(conf, fc, active_paths):
    """Write the desired path selection to the router.

    For every route listed in *active_paths*, all paths other than the
    selected one are written as disabled and the selected one as enabled.
    Routes not present in *active_paths* are left untouched.

    Args:
        conf: parsed configuration with a 'routes' list.
        fc: router connection.
        active_paths: dict route name -> name of the path to activate; the
            value may be None (or an unknown name), in which case every path
            of the route is disabled and an alert is printed.
    """
    for route in conf['routes']:
        if route['name'] not in active_paths:
            continue
        active_path = active_paths[route['name']]
        make_active_path = None
        for path in route['paths']:
            if path['name'] == active_path:
                if make_active_path is not None:
                    alert(f'Illegal config for {route["name"]}: Has redundant path names.')
                make_active_path = path
            else:
                flush_forwards(path, False, fc)
        # Check for "nothing to activate" BEFORE touching the path: calling
        # flush_forwards(None, ...) would fail when subscripting None.
        if make_active_path is None:
            alert(f'No active path for route {route["name"]}')
        else:
            flush_forwards(make_active_path, True, fc)
def flush_forwards(path, make_active, fc):
    """Write every forward of *path* to the router, enabled or disabled.

    Args:
        path: path configuration with a 'forwards' list; each forward provides
            'external', 'internal', 'name' and 'protocol'.
        make_active: True to enable the mappings, False to disable them.
        fc: router connection.
    """
    # NOTE(review): path['ip'] is not handed to upsertPortMapping(), which
    # fills in the address from readMyIP() as internal client -- confirm that
    # this is intended.
    for fwd in path['forwards']:
        upsertPortMapping(
            fc,
            fwd['external'],
            fwd['internal'],
            name=fwd['name'],
            protocol=fwd['protocol'],
            active=make_active,
        )
2024-05-21 16:55:57 +02:00
def check_avail(path):
    """Return True when the host at ``path['ip']`` answers a ping within 3 seconds.

    ping3's ``ping`` reports a timeout as None and an error as False, so both
    must be treated as "not available". A measured delay of 0.0 is a valid
    reply, which is why plain truthiness is not used.
    """
    ip = path['ip']
    resp_time = ping(ip, timeout=3)
    return resp_time is not None and resp_time is not False
def alert(txt):
    """Print *txt* as a warning line, prefixed with '[!] '."""
    message = ''.join(('[!] ', txt))
    print(message)
def info(txt):
    """Print *txt* as an informational line, prefixed with '[i] '."""
    message = ''.join(('[i] ', txt))
    print(message)
# Run the monitor only when executed as a script, not when imported.
if __name__ == '__main__':
    main()