#!./.venv/bin/python3.9 import os import json import math import random import numpy as np from scipy.stats import norm import matplotlib.pyplot as plt import networkx as nx from pyvis.network import Network def getAllAuthors(books): authors = set() for book in books: for author in getAuthors(book): authors.add(author) return list(authors) def getAuthors(book): return book['authors'].split(' & ') def getRecommenders(book): for tag in book['tags']: if tag.find(" Recommendation") != -1: yield tag.replace(" Recommendation", "") elif tag.find("s Literature Club") != -1: yield tag.replace("s Literature Club", "") def getTags(book): for tag in book['tags']: if tag.find(" Recommendation") == -1 and tag.find("s Literature Club") == -1 and tag.find(" Top ") == -1: yield tag def getAllRecommenders(books): recs = set() for book in books: for rec in getRecommenders(book): recs.add(rec) return list(recs) def getTopLists(book): lists = set() for tag in book['tags']: if tag.find(" Top ") != -1: lists.add(tag.split(" Top ")[0]) return list(lists) def getAllTopLists(books): tops = set() for book in books: for top in getTopLists(book): tops.add(top) return list(tops) def getAllSeries(books): series = set() for book in books: if 'series' in book: series.add(book['series']) return list(series) def getAllTags(books): tags = set() for book in books: for tag in getTags(book): tags.add(tag) return list(tags) def getTopListWheight(book, topList): minScope = 100000 for tag in book['tags']: if tag.find(topList+" Top ") != -1: scope = int(tag.split(" Top ")[1]) minScope = min(minScope, scope) if minScope == 100000: raise Exception("You stupid?") if minScope == 10: return 1 elif minScope == 25: return 0.85 elif minScope == 100: return 0.5 return minScope / 10 def removeRead(G): for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'book': if node['rating'] != None: G.remove_node(n) def removeUnread(G): for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'book': if node['rating'] == None: G.remove_node(n) def removePriv(G): for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'book': if 'priv' in node['tags']: G.remove_node(n) def removeDangling(G, alsoBooks=False): for n in list(G.nodes): node = G.nodes[n] if node['t'] != 'book' or alsoBooks: if not len(G.adj[n]): G.remove_node(n) def removeEdge(G): for n in list(G.nodes): node = G.nodes[n] if node['t'] != 'book': if len(G.adj[n]) < 2: G.remove_node(n) def removeBad(G, threshold, groups=['book', 'topList', 'recommender', 'author', 'series', 'tag']): for n in list(G.nodes): node = G.nodes[n] if node['t'] in groups: if 'score' in node and (node['score'] == None or node['score'] < threshold): G.remove_node(n) def removeKeepBest(G, num, maxDistForRead=1): bestlist = [] for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'book': if 'score' in node and node['score'] != None: bestlist.append(node) bestlist.sort(key=lambda node: node['score'], reverse=True) bestlist = bestlist[:num] for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'book' and node not in bestlist or 'score' in node and node['score'] == None: if not 'rating' in node or node['rating'] == None or node['rating'] < bestlist[-1]['score']-maxDistForRead: G.remove_node(n) def removeTags(G): for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'tag': G.remove_node(n) def pruneTags(G, minCons=2): for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'tag': foundCon = 0 for book in G.adj[n]: for con in G.adj[book]: conType = G.nodes[con]['t'] if conType not in ['topList']: if conType in ['recommender']: foundCon += 0.5 elif conType in ['tag', 'series']: foundCon += 0.25 else: foundCon += 1 if foundCon > minCons: G.remove_node(n) def pruneRecommenderCons(G, maxCons=5): for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'recommender': if len(G.adj[n]) > maxCons: bestlist = [] for m in list(G.adj[n]): book = G.nodes[m] if book['t'] == 'book': if 'score' in book and book['score'] != None: bestlist.append(book) bestlist.sort(key=lambda node: node['score'], reverse=True) bestlist = bestlist[:maxCons] for m in list(G.adj[n]): book = G.nodes[m] if book['t'] == 'book' and book not in bestlist or 'score' in book and book['score'] == None: if not 'rating' in book or book['rating'] == None: foundCon = 0 for con in G.adj[m]: if G.nodes[con]['t'] not in ['topList']: foundCon += 1 if foundCon < 2: G.remove_node(m) def pruneAuthorCons(G, maxCons=3): for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'author': if len(G.adj[n]) > maxCons: bestlist = [] for m in list(G.adj[n]): book = G.nodes[m] if book['t'] == 'book': if 'score' in book and book['score'] != None: bestlist.append(book) bestlist.sort(key=lambda node: node['score'], reverse=True) bestlist = bestlist[:maxCons] for m in list(G.adj[n]): book = G.nodes[m] if book['t'] == 'book' and book not in bestlist or 'score' in book and book['score'] == None: if not 'rating' in book or book['rating'] == None: foundCon = 0 for con in G.adj[m]: if G.nodes[con]['t'] not in ['topList']: foundCon += 1 if foundCon < 2: G.remove_node(m) def removeHighSpanTags(G, maxCons=5): for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'tag': if len(G.adj[n]) > maxCons: G.remove_node(n) def removeTopLists(G): for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'topList': G.remove_node(n) def removeRestOfSeries(G): for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'series': seriesState = 0 for adj in G.adj[n]: adjNode = G.nodes[adj] if adjNode['rating'] != None: seriesState = max(seriesState, int( adjNode['series_index'])) for adj in list(G.adj[n]): adjNode = G.nodes[adj] if adjNode['series_index'] > seriesState + 1.0001: G.remove_node(adj) def scoreOpinions(G, globMu, globStd, errorFac=0.7): for n in list(G.nodes): node = G.nodes[n] feedbacks = [] if node['t'] in ['topList', 'recommender', 'author', 'series', 'tag']: adjacens = list(G.adj[n].keys()) for adj in adjacens: adjNode = G.nodes[adj] if adjNode['rating'] != None: feedbacks.append(adjNode['rating']) if len(feedbacks): node['mean'], node['std'] = norm.fit(feedbacks) node['se'] = globStd / math.sqrt(len(feedbacks)) ratio = len(feedbacks) / len(adjacens) node['score'] = node['mean'] - errorFac * \ node['se']*(6/7 + (1-ratio)/7) + 0.01 * \ (node['t'] == 'recommender') \ - 0.5 / len(feedbacks)**2 node['feedbacks'] = feedbacks else: node['score'] = None def scoreUnread(G, globMu, globStd, errorFac=0.6): for n in list(G.nodes): feedbacks = [] deepFeedbacks = [globMu - globStd*0.5] deepLen = 1 node = G.nodes[n] if node['t'] == 'book': if node['rating'] == None: adjacens = list(G.adj[n].keys()) for adj in adjacens: adjNode = G.nodes[adj] if 'score' in adjNode and adjNode['score'] != None: if adjNode['t'] == 'tag': w = int(10/(len(G.adj[adj]))) elif adjNode['t'] == 'topList': w = int(G[n][adj]['wheight']*5) else: w = 10 feedbacks.append(adjNode['score']) for fb in adjNode['feedbacks']: for i in range(w): deepFeedbacks.append(fb) deepLen += w if len(feedbacks): node['mean'], node['std'] = norm.fit(deepFeedbacks) node['mean2'], node['std2'] = norm.fit(feedbacks) if deepLen: node['se'] = globStd / math.sqrt(deepLen) # - errorFac*node['se'] node['score'] = ( (node['mean'] - errorFac*node['se'])*3 + node['mean2']*2)/5 else: node['score'] = globMu - errorFac*globStd if 'series' in node: if node['series_index'] == 1.0: node['score'] += 0.000000001 else: node['score'] = None def printBestList(G, num=-1): bestlist = [] for n in list(G.nodes): node = G.nodes[n] if node['t'] == 'book': if 'score' in node and node['score'] != None: bestlist.append(node) bestlist.sort(key=lambda node: node['score'], reverse=True) for i, book in enumerate(bestlist): print("["+str(i+1).zfill(int((math.log10(num) if num!=-1 else 3)+1))+"] "+book['title'] + " ("+" & ".join(book['authors'])+"): {:.5f}".format(book['score'])) if num!=-1 and i == num-1: break def readColor(book): if 'rating' in book: return 'green' else: return 'gray' def loadBooksFromDB(): return json.loads(os.popen("calibredb list --for-machine -f all").read()) def buildBookGraph(books): G = nx.Graph() # Books for book in books: if 'rating' in book: rating = book['rating'] else: rating = None if 'comments' in book: desc = '' # book['comments'] else: desc = '' if 'series' in book: series = book['series'] series_index = book['series_index'] else: series = None series_index = None G.add_node(book['id'], t='book', label=book['title'], title=book['title'], shape='image', image=book['cover'], rating=rating, tags=book['tags'], desc=desc, isbn=book['isbn'], files=book['formats'], authors=getAuthors(book), series=series, series_index=series_index) return G def graphAddAuthors(G, books): for author in getAllAuthors(books): G.add_node('a/'+author, color='green', t='author', label=author) for book in books: for author in getAuthors(book): G.add_edge('a/'+author, book['id'], color=readColor(book)) return G def graphAddRecommenders(G, books): for rec in getAllRecommenders(books): G.add_node('r/'+rec, color='orange', t='recommender', label=rec) for book in books: for rec in getRecommenders(book): G.add_edge('r/'+rec, book['id'], color=readColor(book)) return G def graphAddTopLists(G, books): for tl in getAllTopLists(books): G.add_node('t/'+tl, color='yellow', t='topList', label=tl) for book in books: for top in getTopLists(book): G.add_edge('t/'+top, book['id'], wheight=getTopListWheight( book, top), color=readColor(book)) return G def graphAddSeries(G, books): for series in getAllSeries(books): G.add_node('s/'+series, color='red', t='series', label=series, shape='triangle') for book in books: if 'series' in book: G.add_edge('s/'+book['series'], book['id'], color=readColor(book)) return G def graphAddTags(G, books): for tag in getAllTags(books): G.add_node('t/'+tag, color='lightGray', t='tag', label=tag, shape='box') for book in books: for tag in getTags(book): G.add_edge('t/'+tag, book['id'], color=readColor(book)) return G def calcRecDist(G, books): globRatings = [] for book in books: if G.nodes[book['id']]['rating'] != None: globRatings.append(G.nodes[book['id']]['rating']) return norm.fit(globRatings) def scaleBooksByRating(G): for n in list(G.nodes): node = G.nodes[n] if node['t'] not in []: if 'rating' in node and node['rating'] != None: node['value'] = 20 + 5 * int(node['rating']) else: if 'score' in node and node['score'] != None: node['value'] = 20 + 5 * int(node['score']) else: node['value'] = 15 def scaleOpinionsByRating(G): for n in list(G.nodes): node = G.nodes[n] if node['t'] in ['topList', 'recommender', 'author', 'series']: if 'score' in node and node['score'] != None: node['value'] = 20 + 5 * int(node['score']) else: node['value'] = 20 def addScoreToLabels(G): for n in list(G.nodes): node = G.nodes[n] if node['t'] not in ['tag']: if 'rating' in node and node['rating'] != None: node['label'] += " ("+str(node['rating'])+")" else: if 'score' in node and node['score'] != None: node['label'] += " (~{:.2f}".format(node['score'])+")" else: node['label'] += " (~0)" def genAndShowHTML(G, showButtons=False): net = Network('1080px', '1920px') if showButtons: net.show_buttons(filter_=['configure', 'layout', 'interaction', 'physics', 'edges']) net.from_nx(G) net.show('nx.html') def buildFullGraph(): books = loadBooksFromDB() G = buildBookGraph(books) graphAddAuthors(G, books) graphAddRecommenders(G, books) graphAddTopLists(G, books) graphAddSeries(G, books) graphAddTags(G, books) return G, books def genScores(G, books): globMu, globStd = calcRecDist(G, books) scoreOpinions(G, globMu, globStd) scoreUnread(G, globMu, globStd) return globMu, globStd def recommendNBooks(G, mu, std, n): removeRestOfSeries(G) removeBad(G, mu-std-1.5) removeKeepBest(G, int(n*2) + 5, maxDistForRead=1.5) removeEdge(G) removeHighSpanTags(G, 9) removeDangling(G, alsoBooks=False) pruneTags(G, 6) removeBad(G, mu, groups=['book']) pruneTags(G, 4.25) pruneRecommenderCons(G, int(n/7)+1) pruneAuthorCons(G, int(n/15)) removeTopLists(G) removeDangling(G, alsoBooks=True) removeKeepBest(G, n, maxDistForRead=0.75) removeEdge(G) removeDangling(G, alsoBooks=True) scaleBooksByRating(G) scaleOpinionsByRating(G) addScoreToLabels(G) def fullGraph(G): removeEdge(G) removeHighSpanTags(G, 7) removeDangling(G, alsoBooks=False) removeTopLists(G) pruneTags(G, 3) removeDangling(G, alsoBooks=True) scaleBooksByRating(G) scaleOpinionsByRating(G) addScoreToLabels(G) def readBooksAnalysis(G, minRating=0, showAllTags=True, removeUnconnected=False): removeUnread(G) removeBad(G, minRating) if not showAllTags: removeEdge(G) removeHighSpanTags(G, 15) removeDangling(G, alsoBooks=removeUnconnected) removeTopLists(G) pruneTags(G, 8) scaleBooksByRating(G) scaleOpinionsByRating(G) addScoreToLabels(G) def analyze(G, type_name, name, dist=2.7): from fuzzywuzzy import fuzz type_ident = type_name[0] full_name = type_ident + "/" + name bestRatio, match, n = 0, None, 0 for ni in list(G.nodes): node = G.nodes[ni] if node['t'] == type_name or type_name=="any": if name==node['label'] or full_name==node['label']: match, n = node, ni break ratio = fuzz.ratio(node['label'], name) if ratio > bestRatio: bestRatio, match, n = ratio, node, ni if bestRatio < 70: print("Best Match: "+match['label']) menge = set() waveFlow(G, match, n, dist, menge) for n in list(G.nodes): if n not in menge: G.remove_node(n) removeHighSpanTags(G, 12) if dist > 1: removeDangling(G, True) scaleBooksByRating(G) scaleOpinionsByRating(G) #match['value'] = 100 if not 'shape' in match: match['shape'] = 'star' addScoreToLabels(G) match['label'] = "*"+match['label']+"*" def waveFlow(G, node, n, dist, menge, firstEdge=False): if dist <= 0: return dist -= 1 if menge==set(): firstEdge=True if node['t'] in ['topList']: if firstEdge: menge.add(n) return menge.add(n) if node['t'] in ['tag']: if firstEdge: dist-=0.1 else: return bestlist = [] keeplist = [] for m in list(G.adj[n]): book = G.nodes[m] if book['t'] not in ['NOTHING']: if 'score' in book and book['score'] != None: bestlist.append(book) elif 'rating' in book and book['rating'] != None: keeplist.append(book) else: book['score'] = 0 bestlist.append(book) bestlist.sort(key=lambda node: node['score'], reverse=True) toKeep = min(int(dist*10), math.ceil(len(bestlist) * dist - len(keeplist)*0.5)) if toKeep <= 0: keeplist.sort(key=lambda node: node['rating'], reverse=True) keeplist = keeplist[:min(int(dist*10), int(len(keeplist) * dist))] bestlist = [] else: bestlist = bestlist[:toKeep] for m in list(G.adj[n]): node = G.nodes[m] if node in bestlist or node in keeplist: waveFlow(G, node, m, dist, menge, firstEdge=firstEdge) def cliInterface(): import argparse parser = argparse.ArgumentParser(description='TODO: Write Description.') parser.add_argument('--keep-priv', action="store_true") parser.add_argument('--remove-read', action="store_true") parser.add_argument('--remove-unread', action="store_true") parser.add_argument('--no-web', action="store_true") parser.add_argument('--no-list', action="store_true") parser.add_argument('--remove-edge', action="store_true") cmds = parser.add_subparsers(required=True, dest='cmd') p_rec = cmds.add_parser('recommend', description="TODO", aliases=['rec']) p_rec.add_argument('-n', type=int, default=25, help='number of books to recommend') p_read = cmds.add_parser('read', description="TODO", aliases=[]) p_read.add_argument('--min-rating', type=int, default=0) p_read.add_argument('--all-tags', action="store_true") p_read.add_argument('--only-connected', action="store_true") p_show = cmds.add_parser('analyze', description="TODO", aliases=[]) p_show.add_argument('type', choices=['any', 'book', 'recommender', 'author', 'series']) p_show.add_argument('name', type=str) p_show.add_argument('-d', type=float, default=2.7, help='depth of expansion') p_full = cmds.add_parser('full', description="TODO", aliases=[]) args = parser.parse_args() G, books = buildFullGraph() mu, std = genScores(G, books) if not args.keep_priv: removePriv(G) if args.remove_read: removeRead(G) elif args.remove_unread: removeUnread(G) if args.cmd=="recommend": recommendNBooks(G, mu, std, args.n) elif args.cmd=="read": readBooksAnalysis(G, args.min_rating, args.all_tags, args.only_connected) elif args.cmd=="analyze": analyze(G, args.type, args.name, args.d) elif args.cmd=="full": fullGraph(G) else: raise Exception("Bad") if args.remove_edge: removeEdge(G) if not args.no_list: printBestList(G) if not args.no_web: genAndShowHTML(G) if __name__ == "__main__": cliInterface()