CaliGraph/caliGraph.py

714 lines
22 KiB
Python
Executable File

#!./.venv/bin/python3.9
import os
import json
import math
import random
import numpy as np
from scipy.stats import norm
import matplotlib.pyplot as plt
import networkx as nx
from pyvis.network import Network
def getAllAuthors(books):
    """Return every distinct author name found across ``books``.

    Each book's author string is split with ``getAuthors``. The result is a
    list without duplicates, in no particular order.
    """
    return list({name for entry in books for name in getAuthors(entry)})
def getAuthors(book):
    """Split the book's ``'authors'`` string on ``' & '`` into a list of names."""
    combined = book['authors']
    return combined.split(' & ')
def getRecommenders(book):
    """Yield the recommender name encoded in each recommendation tag.

    A tag containing ``" Recommendation"`` or ``"s Literature Club"`` is
    treated as a recommendation; the name is the tag with that marker removed.
    The first marker (in that order) found in a tag wins.
    """
    markers = (" Recommendation", "s Literature Club")
    for label in book['tags']:
        for marker in markers:
            if marker in label:
                yield label.replace(marker, "")
                break
def getTags(book):
    """Yield the book's plain tags.

    Tags that encode a recommendation (``" Recommendation"``,
    ``"s Literature Club"``) or a top list (``" Top "``) are skipped.
    """
    special = (" Recommendation", "s Literature Club", " Top ")
    for label in book['tags']:
        if not any(marker in label for marker in special):
            yield label
def getAllRecommenders(books):
    """Return the distinct recommender names over all ``books`` (unordered list)."""
    return list({name for entry in books for name in getRecommenders(entry)})
def getTopLists(book):
    """Return the distinct top-list names the book is tagged with.

    A top-list tag contains ``" Top "``; the list name is the text before its
    first occurrence. The returned list has no particular order.
    """
    names = {label.split(" Top ")[0] for label in book['tags'] if " Top " in label}
    return list(names)
def getAllTopLists(books):
    """Return the distinct top-list names over all ``books`` (unordered list)."""
    return list({name for entry in books for name in getTopLists(entry)})
def getAllSeries(books):
    """Return the distinct ``'series'`` values of all books that have one.

    Books without a ``'series'`` key are ignored. The list is unordered.
    """
    return list({entry['series'] for entry in books if 'series' in entry})
def getAllTags(books):
    """Return the distinct plain tags over all ``books`` (unordered list)."""
    return list({label for entry in books for label in getTags(entry)})
def getTopListWheight(book, topList):
    """Return the edge weight for ``book``'s membership in ``topList``.

    The book's tags are searched for ``"<topList> Top <N>"``. When several
    match, the smallest ``N`` (the most exclusive list) decides the weight:
    10 -> 1, 25 -> 0.85, 100 -> 0.5, anything else -> ``50 / N``.

    The list name must equal the text before the first ``" Top "`` in the tag,
    which is how ``getTopLists`` derives names. A plain substring test would
    let the list "Times" pick up tags that belong to "NY Times".

    Raises:
        ValueError: if no tag of the book belongs to ``topList``.
    """
    scopes = []
    for tag in book['tags']:
        name, sep, rest = tag.partition(" Top ")
        if sep and name == topList:
            # Same number the original extracted via tag.split(" Top ")[1].
            scopes.append(int(rest.split(" Top ")[0]))
    if not scopes:
        raise ValueError(
            'book has no "{} Top <N>" tag'.format(topList))
    minScope = min(scopes)
    fixedWheights = {10: 1, 25: 0.85, 100: 0.5}
    if minScope in fixedWheights:
        return fixedWheights[minScope]
    return 50 / minScope
def removeRead(G):
    """Remove every book node whose ``'rating'`` is set (i.e. not ``None``)."""
    rated = [
        key for key in list(G.nodes)
        if G.nodes[key]['t'] == 'book' and G.nodes[key]['rating'] is not None
    ]
    for key in rated:
        G.remove_node(key)
def removeUnread(G):
    """Remove every book node whose ``'rating'`` is ``None``."""
    unrated = [
        key for key in list(G.nodes)
        if G.nodes[key]['t'] == 'book' and G.nodes[key]['rating'] is None
    ]
    for key in unrated:
        G.remove_node(key)
def removePriv(G):
    """Remove every book node that carries the tag ``'priv'``."""
    private = [
        key for key in list(G.nodes)
        if G.nodes[key]['t'] == 'book' and 'priv' in G.nodes[key]['tags']
    ]
    for key in private:
        G.remove_node(key)
def removeDangling(G, alsoBooks=False):
    """Remove nodes that have no neighbours.

    Book nodes are only considered when ``alsoBooks`` is true.
    """
    for key in list(G.nodes):
        eligible = G.nodes[key]['t'] != 'book' or alsoBooks
        if eligible and len(G.adj[key]) == 0:
            G.remove_node(key)
def removeEdge(G):
    """Remove every non-book node that has fewer than two neighbours."""
    for key in list(G.nodes):
        if G.nodes[key]['t'] == 'book':
            continue
        if len(G.adj[key]) < 2:
            G.remove_node(key)
def removeBad(G, threshold, groups=('book', 'topList', 'recommender', 'author', 'series', 'tag')):
    """Remove scored nodes whose score is missing or below ``threshold``.

    Only nodes whose type ``'t'`` is in ``groups`` and that have a ``'score'``
    key are considered. Such a node is removed when its score is ``None`` or
    less than ``threshold``. Nodes without a ``'score'`` key are left alone.

    ``groups`` may be any container supporting ``in``. The default is a tuple
    so that no mutable object is shared between calls.
    """
    for n in list(G.nodes):
        node = G.nodes[n]
        if node['t'] not in groups or 'score' not in node:
            continue
        score = node['score']
        if score is None or score < threshold:
            G.remove_node(n)
def removeKeepBest(G, num, maxDistForRead=1):
    """Keep only the ``num`` best-scored books plus comparably rated read books.

    Books with a non-``None`` ``'score'`` are ranked by score (descending) and
    the first ``num`` are kept. Among the remaining candidates (books outside
    that selection, and any node whose ``'score'`` is ``None``) a node survives
    only if it has a rating of at least
    ``<lowest kept score> - maxDistForRead``.

    When no scored book exists there is no reference score. Rated books are
    then removed as well, where the original code raised ``IndexError``.

    Membership in the selection is tracked by node key rather than by comparing
    attribute dicts, so two books with equal attributes are not confused.
    """
    ranked = [
        n for n in G.nodes
        if G.nodes[n]['t'] == 'book' and G.nodes[n].get('score') is not None
    ]
    # list.sort is stable, so ties keep graph iteration order as before.
    ranked.sort(key=lambda n: G.nodes[n]['score'], reverse=True)
    best = ranked[:num]
    bestKeys = set(best)
    cutoff = G.nodes[best[-1]]['score'] - maxDistForRead if best else None

    for n in list(G.nodes):
        node = G.nodes[n]
        outsideBest = node['t'] == 'book' and n not in bestKeys
        unscored = 'score' in node and node['score'] is None
        if not (outsideBest or unscored):
            continue
        rating = node.get('rating')
        if rating is None or cutoff is None or rating < cutoff:
            G.remove_node(n)
def removeTags(G):
    """Remove every node of type ``'tag'``."""
    for key in [k for k in list(G.nodes) if G.nodes[k]['t'] == 'tag']:
        G.remove_node(key)
def pruneTags(G, minCons=2):
    """Remove tags whose books are already well connected through other nodes.

    For every tag, the neighbours of each of its books are summed up with a
    per-type weight: top lists count 0, recommenders 0.5, tags and series
    0.25, everything else 1. The tag itself is among those neighbours. The tag
    is removed when the total exceeds ``minCons``.
    """
    reduced = {'recommender': 0.5, 'tag': 0.25, 'series': 0.25}
    for key in list(G.nodes):
        if G.nodes[key]['t'] != 'tag':
            continue
        total = 0
        for book in G.adj[key]:
            for link in G.adj[book]:
                kind = G.nodes[link]['t']
                if kind == 'topList':
                    continue
                total += reduced.get(kind, 1)
        if total > minCons:
            G.remove_node(key)
def _pruneConsOfType(G, nodeType, maxCons):
    """Thin out weakly connected unread books around hubs of type ``nodeType``.

    For every ``nodeType`` node with more than ``maxCons`` neighbours, the
    neighbouring books with a non-``None`` score are ranked by score and the
    top ``maxCons`` are protected. Every other neighbouring book (or neighbour
    whose ``'score'`` is ``None``) that has no rating is removed if it has
    fewer than two neighbours that are not top lists.
    """
    for n in list(G.nodes):
        if n not in G.nodes:
            # Already removed while pruning around an earlier hub.
            continue
        if G.nodes[n]['t'] != nodeType or len(G.adj[n]) <= maxCons:
            continue
        ranked = [
            m for m in G.adj[n]
            if G.nodes[m]['t'] == 'book' and G.nodes[m].get('score') is not None
        ]
        ranked.sort(key=lambda m: G.nodes[m]['score'], reverse=True)
        # Track protected books by key; comparing attribute dicts would treat
        # two distinct books with equal attributes as the same book.
        keep = set(ranked[:maxCons])
        for m in list(G.adj[n]):
            book = G.nodes[m]
            outside = book['t'] == 'book' and m not in keep
            unscored = 'score' in book and book['score'] is None
            if not (outside or unscored):
                continue
            if book.get('rating') is not None:
                continue  # read books are never pruned here
            otherCons = sum(
                1 for con in G.adj[m] if G.nodes[con]['t'] != 'topList')
            if otherCons < 2:
                G.remove_node(m)


def pruneRecommenderCons(G, maxCons=5):
    """Limit each recommender to its ``maxCons`` best unread books.

    Unread books beyond that limit are removed only when the recommender is
    their sole non-top-list connection.
    """
    _pruneConsOfType(G, 'recommender', maxCons)


def pruneAuthorCons(G, maxCons=3):
    """Limit each author to their ``maxCons`` best unread books.

    Unread books beyond that limit are removed only when the author is their
    sole non-top-list connection.
    """
    _pruneConsOfType(G, 'author', maxCons)
def removeHighSpanTags(G, maxCons=5):
    """Remove every tag node that has more than ``maxCons`` neighbours."""
    for key in list(G.nodes):
        isTag = G.nodes[key]['t'] == 'tag'
        if isTag and len(G.adj[key]) > maxCons:
            G.remove_node(key)
def removeTopLists(G):
    """Remove every node of type ``'topList'``."""
    for key in [k for k in list(G.nodes) if G.nodes[k]['t'] == 'topList']:
        G.remove_node(key)
def removeRestOfSeries(G):
    """Keep only the next instalment of each series beyond what has been read.

    For every series node, the highest (integer-truncated) ``'series_index'``
    among its rated neighbours is determined, or 0 if none is rated. Every
    neighbour whose ``'series_index'`` is greater than that value plus 1.0001
    is removed.
    """
    for key in list(G.nodes):
        if G.nodes[key]['t'] != 'series':
            continue
        members = list(G.adj[key])
        readIndices = [
            int(G.nodes[m]['series_index'])
            for m in members
            if G.nodes[m]['rating'] is not None
        ]
        limit = max([0] + readIndices) + 1.0001
        for m in members:
            if G.nodes[m]['series_index'] > limit:
                G.remove_node(m)
def removeUnusedRecommenders(G):
    """Remove recommenders that are not connected to any scored book.

    A recommender is kept if at least one neighbour is a book node carrying a
    ``'score'`` key.
    """
    for key in list(G.nodes):
        if G.nodes[key]['t'] != 'recommender':
            continue
        hasScoredBook = any(
            G.nodes[m]['t'] == 'book' and 'score' in G.nodes[m]
            for m in G.adj[key]
        )
        if not hasScoredBook:
            G.remove_node(key)
def removeUselessReadBooks(G):
    """Remove books that share no neighbour with any scored book.

    A book is kept when at least one of its neighbours is adjacent to a book
    node carrying a ``'score'`` key (the book itself counts). Books without
    neighbours are removed.
    """
    def touchesScoredBook(link):
        # True if any neighbour of ``link`` is a book that has a score entry.
        return any(
            G.nodes[c]['t'] == 'book' and 'score' in G.nodes[c]
            for c in G.adj[link]
        )

    for key in list(G.nodes):
        if G.nodes[key]['t'] != 'book':
            continue
        if not any(touchesScoredBook(link) for link in G.adj[key]):
            G.remove_node(key)
def scoreOpinions(G, globMu, globStd, errorFac=-0.5):
    """Score every opinion node from the ratings of its neighbours.

    Opinion nodes are top lists, recommenders, authors, series and tags. For a
    node with at least one rated neighbour this stores ``'mean'`` and
    ``'std'`` (normal fit of the ratings), ``'se'``
    (``globStd / sqrt(count)``), ``'feedbacks'`` (the ratings) and
    ``'score'``. Nodes without rated neighbours get ``'score' = None``.

    The score is the mean, plus ``errorFac`` times the standard error scaled
    by the share of unrated neighbours, plus 0.01 for recommenders, minus
    ``0.5 / count**2``. ``globMu`` is accepted but not used here.
    """
    opinionTypes = ('topList', 'recommender', 'author', 'series', 'tag')
    for key in list(G.nodes):
        node = G.nodes[key]
        if node['t'] not in opinionTypes:
            continue
        neighbours = list(G.adj[key].keys())
        ratings = [
            G.nodes[m]['rating']
            for m in neighbours
            if G.nodes[m]['rating'] is not None
        ]
        if not ratings:
            node['score'] = None
            continue
        count = len(ratings)
        node['mean'], node['std'] = norm.fit(ratings)
        node['se'] = globStd / math.sqrt(count)
        ratio = count / len(neighbours)
        # Same expression order as before so float rounding is unchanged.
        node['score'] = node['mean'] + errorFac * \
            node['se']*(6/7 + (1-ratio)/7) + 0.01 * \
            (node['t'] == 'recommender') \
            - 0.5 / count**2
        node['feedbacks'] = ratings
def scoreUnread(G, globMu, globStd, errorFac=-0.6):
    """Predict a score for every unread book from its scored neighbours.

    For each book whose ``'rating'`` is ``None`` the feedback pool starts with
    the global mean ``globMu`` as a prior. Every rating stored in the
    ``'feedbacks'`` of each scored neighbour is added, weighted by
    ``getWheightForType`` for that neighbour (using the edge's ``'wheight'``
    attribute when present).

    Stored on the book: ``'meanUnweighted'`` and ``'std'`` (normal fit of the
    pool), ``'mean'`` (weighted mean), ``'se'`` (``globStd / sqrt(len(pool))``)
    and ``'score'`` (``mean + errorFac * se``). Books with ``'series_index'``
    equal to 1.0 get a 1e-9 bonus as a tie-breaker.

    The weighted mean is normalised by the sum of the weights. Dividing by the
    number of feedbacks, as before, pulled the mean toward zero whenever a
    down-weighted top-list rating was present. The pool always contains the
    prior, so the former empty-pool fallback could never run and was dropped.
    """
    priorWheight = getWheightForType('mu')
    for n in list(G.nodes):
        node = G.nodes[n]
        if node['t'] != 'book' or node['rating'] is not None:
            continue
        feedbacks = [globMu]
        wheights = [priorWheight]
        for adj in list(G.adj[n].keys()):
            adjNode = G.nodes[adj]
            if adjNode.get('score') is None:
                continue
            w = getWheightForType(adjNode['t'], G.adj[n][adj].get('wheight'))
            for fb in adjNode['feedbacks']:
                feedbacks.append(fb)
                wheights.append(w)
        node['meanUnweighted'], node['std'] = norm.fit(feedbacks)
        weightedSum = sum(fb * w for fb, w in zip(feedbacks, wheights))
        node['mean'] = weightedSum / sum(wheights)
        node['se'] = globStd / math.sqrt(len(feedbacks))
        node['score'] = node['mean'] + errorFac * node['se']
        if 'series' in node and node['series_index'] == 1.0:
            # Tiny bonus so the first book of a series wins ties.
            node['score'] += 0.000000001


# TODO: Make this neural and train it
def getWheightForType(nodeType, edgeWheight=None):
    """Return the weight given to feedback coming from a ``nodeType`` node.

    Top lists contribute half of their edge weight; every other type
    (including the ``'mu'`` prior) contributes 1.0. ``edgeWheight`` must be a
    number when ``nodeType`` is ``'topList'``.
    """
    if nodeType == 'topList':
        return edgeWheight*0.5
    return 1.0
def printBestList(G, num=-1):
    """Print the scored books of ``G``, best first.

    Each line has the form ``[rank] title (author & author): score`` with the
    score shown to five decimals. With the default ``num=-1`` (or any negative
    value) all scored books are printed and the rank is zero-padded to four
    digits. A positive ``num`` prints at most ``num`` books and pads the rank
    to the number of digits of ``num``. ``num=0`` prints nothing, where the
    original crashed on ``math.log10(0)``.
    """
    ranked = [
        G.nodes[n] for n in G.nodes
        if G.nodes[n]['t'] == 'book' and G.nodes[n].get('score') is not None
    ]
    ranked.sort(key=lambda book: book['score'], reverse=True)
    if num < 0:
        width = 4
    else:
        ranked = ranked[:num]
        width = len(str(int(num)))
    for rank, book in enumerate(ranked, start=1):
        print("["+str(rank).zfill(width)+"] "+book['title'] +
              " ("+" & ".join(book['authors'])+"): {:.5f}".format(book['score']))
def readColor(book):
    """Return ``'green'`` if the book dict has a ``'rating'`` key, else ``'gray'``."""
    return 'green' if 'rating' in book else 'gray'
def loadBooksFromDB():
    """Return the Calibre library as a list of book dicts.

    Runs ``calibredb list --for-machine -f all`` and parses its JSON output.
    The command is executed without a shell and its pipe is closed on return.

    Raises:
        subprocess.CalledProcessError: if ``calibredb`` exits with an error.
            Previously this surfaced as an opaque JSON decoding error.
        FileNotFoundError: if ``calibredb`` is not on the PATH.
    """
    import subprocess
    result = subprocess.run(
        ["calibredb", "list", "--for-machine", "-f", "all"],
        stdout=subprocess.PIPE,  # stderr stays attached to the terminal
        text=True,
        check=True,
    )
    return json.loads(result.stdout)
def buildBookGraph(books):
    """Create a graph with one ``'book'`` node per entry of ``books``.

    The node key is the book's ``'id'``. ``rating`` is ``None`` when the book
    has no ``'rating'`` key; ``series`` and ``series_index`` are ``None`` when
    it has no ``'series'`` key. The description is always stored empty.
    """
    G = nx.Graph()
    for book in books:
        inSeries = 'series' in book
        attrs = {
            't': 'book',
            'label': book['title'],
            'title': book['title'],
            'shape': 'image',
            'image': book['cover'],
            'rating': book['rating'] if 'rating' in book else None,
            'tags': book['tags'],
            'desc': '',  # book['comments'] is deliberately not used
            'isbn': book['isbn'],
            'files': book['formats'],
            'authors': getAuthors(book),
            'series': book['series'] if inSeries else None,
            'series_index': book['series_index'] if inSeries else None,
        }
        G.add_node(book['id'], **attrs)
    return G
def graphAddAuthors(G, books):
    """Add a green ``'author'`` node per author and link it to their books.

    Author node keys are ``'a/' + name``. Each edge is coloured with
    ``readColor`` of the book. Returns ``G``.
    """
    for name in getAllAuthors(books):
        G.add_node('a/' + name, color='green', t='author', label=name)
    for book in books:
        edgeColor = readColor(book)
        for name in getAuthors(book):
            G.add_edge('a/' + name, book['id'], color=edgeColor)
    return G
def graphAddRecommenders(G, books):
    """Add an orange ``'recommender'`` node per recommender and link its books.

    Recommender node keys are ``'r/' + name``. Each edge is coloured with
    ``readColor`` of the book. Returns ``G``.
    """
    for name in getAllRecommenders(books):
        G.add_node('r/' + name, color='orange', t='recommender', label=name)
    for book in books:
        edgeColor = readColor(book)
        for name in getRecommenders(book):
            G.add_edge('r/' + name, book['id'], color=edgeColor)
    return G
def graphAddTopLists(G, books):
    """Add a yellow ``'topList'`` node per top list and link its books.

    Top-list node keys are ``'t/' + name``. Each edge stores the weight from
    ``getTopListWheight`` under ``'wheight'`` and is coloured with
    ``readColor`` of the book. Returns ``G``.
    """
    for name in getAllTopLists(books):
        G.add_node('t/' + name, color='yellow', t='topList', label=name)
    for book in books:
        for name in getTopLists(book):
            listWheight = getTopListWheight(book, name)
            G.add_edge('t/' + name, book['id'], wheight=listWheight,
                       color=readColor(book))
    return G
def graphAddSeries(G, books):
    """Add a red triangular ``'series'`` node per series and link its books.

    Series node keys are ``'s/' + name``. Books without a ``'series'`` key get
    no edge. Each edge is coloured with ``readColor`` of the book. Returns ``G``.
    """
    for name in getAllSeries(books):
        G.add_node('s/' + name, color='red', t='series', label=name, shape='triangle')
    for book in (b for b in books if 'series' in b):
        G.add_edge('s/' + book['series'], book['id'], color=readColor(book))
    return G
def graphAddTags(G, books):
    """Add a light-gray box-shaped ``'tag'`` node per plain tag and link its books.

    Tag node keys are ``'tag/' + name``. They used to be ``'t/' + name``, the
    same prefix ``graphAddTopLists`` uses, so a tag and a top list with the
    same name collapsed into one node: the tag attributes overwrote the
    top-list ones and the top-list edges were scored as ordinary tag edges.
    Each edge is coloured with ``readColor`` of the book. Returns ``G``.
    """
    for tag in getAllTags(books):
        G.add_node('tag/' + tag, color='lightGray', t='tag', label=tag, shape='box')
    for book in books:
        edgeColor = readColor(book)
        for tag in getTags(book):
            G.add_edge('tag/' + tag, book['id'], color=edgeColor)
    return G
def calcRecDist(G, books):
    """Fit a normal distribution to all ratings and return ``(mean, std)``.

    The rating of each entry in ``books`` is read from its graph node; books
    whose node rating is ``None`` are skipped.
    """
    ratings = [G.nodes[book['id']]['rating'] for book in books]
    return norm.fit([value for value in ratings if value is not None])
def scaleBooksByRating(G):
    """Set the display size ``'value'`` of every node from its rating or score.

    Rated nodes get ``20 + 5 * int(rating)``, otherwise scored nodes get
    ``20 + 5 * int(score)``, and all remaining nodes get 15. Despite the name,
    the type filter is empty, so this applies to every node.
    """
    for key in list(G.nodes):
        node = G.nodes[key]
        if node['t'] in []:
            continue  # no type is excluded at present
        if node.get('rating') is not None:
            node['value'] = 20 + 5 * int(node['rating'])
        elif node.get('score') is not None:
            node['value'] = 20 + 5 * int(node['score'])
        else:
            node['value'] = 15
def scaleOpinionsByRating(G):
    """Set the display size ``'value'`` of top lists, recommenders, authors and series.

    A node with a non-``None`` score gets ``20 + 5 * int(score)``, otherwise
    20. Books and tags are not touched.
    """
    sizedTypes = ('topList', 'recommender', 'author', 'series')
    for key in list(G.nodes):
        node = G.nodes[key]
        if node['t'] not in sizedTypes:
            continue
        score = node.get('score')
        node['value'] = 20 if score is None else 20 + 5 * int(score)
def addScoreToLabels(G):
    """Append the rating or predicted score to the label of every non-tag node.

    Rated nodes get ``" (<rating>)"``, scored nodes ``" (~<score>)"`` with two
    decimals, and all others ``" (~0)"``.
    """
    for key in list(G.nodes):
        node = G.nodes[key]
        if node['t'] == 'tag':
            continue
        if node.get('rating') is not None:
            suffix = " (" + str(node['rating']) + ")"
        elif node.get('score') is not None:
            suffix = " (~{:.2f}".format(node['score']) + ")"
        else:
            suffix = " (~0)"
        node['label'] += suffix
def genAndShowHTML(G, showButtons=False):
    """Render ``G`` with pyvis into ``nx.html`` and show it.

    With ``showButtons`` the pyvis configuration panels for configure, layout,
    interaction, physics and edges are enabled.
    """
    view = Network('1080px', '1920px')
    if showButtons:
        panels = ['configure', 'layout',
                  'interaction', 'physics', 'edges']
        view.show_buttons(filter_=panels)
    view.from_nx(G)
    view.show('nx.html')
def buildFullGraph():
    """Load the library and build the complete graph.

    Returns ``(G, books)``: the graph with book, author, recommender,
    top-list, series and tag nodes, and the raw book list it was built from.
    """
    books = loadBooksFromDB()
    G = buildBookGraph(books)
    layers = (
        graphAddAuthors,
        graphAddRecommenders,
        graphAddTopLists,
        graphAddSeries,
        graphAddTags,
    )
    for addLayer in layers:
        addLayer(G, books)
    return G, books
def genScores(G, books):
    """Score all opinion nodes and unread books; return ``(globMu, globStd)``.

    The global rating distribution is fitted first and then passed to
    ``scoreOpinions`` and ``scoreUnread``, in that order.
    """
    stats = calcRecDist(G, books)
    globMu, globStd = stats
    for scorer in (scoreOpinions, scoreUnread):
        scorer(G, globMu, globStd)
    return globMu, globStd
def recommendNBooks(G, mu, std, n, removeTopListsB=True, removeUselessRecommenders=True):
    """Prune ``G`` in place down to a view around the ``n`` best unread books.

    ``mu`` and ``std`` are used only to derive score thresholds. The steps are
    order-dependent: each pruning pass works on what the previous ones left.
    Top-list nodes are dropped when ``removeTopListsB`` is true; recommenders
    without any scored book are dropped when ``removeUselessRecommenders`` is
    true. Finally node sizes are set and scores appended to the labels.
    """
    # Drop series books more than one step past the furthest rated instalment.
    removeRestOfSeries(G)
    # Drop scored nodes whose score is None or clearly below average.
    removeBad(G, mu-std-1.5)
    # First, generous cut: about twice as many books as requested.
    removeKeepBest(G, int(n*2) + 5, maxDistForRead=1.5)
    # Drop non-book nodes with fewer than two neighbours.
    removeEdge(G)
    # Drop tags with more than 9 neighbours.
    removeHighSpanTags(G, 9)
    removeDangling(G, alsoBooks=False)
    pruneTags(G, 6)
    # Books scored below the global mean are not recommended.
    removeBad(G, mu, groups=['book'])
    removeUselessReadBooks(G)
    pruneTags(G, 4.25)
    # Per-hub limits scale with the number of requested books.
    pruneRecommenderCons(G, int(n/7)+1)
    pruneAuthorCons(G, int(n/15))
    if removeTopListsB:
        removeTopLists(G)
    removeDangling(G, alsoBooks=True)
    # Final cut to the requested number of books.
    removeKeepBest(G, n, maxDistForRead=0.75)
    removeEdge(G)
    removeDangling(G, alsoBooks=True)
    if removeUselessRecommenders:
        removeUnusedRecommenders(G)
    # Presentation: node sizes and score suffixes on labels.
    scaleBooksByRating(G)
    scaleOpinionsByRating(G)
    addScoreToLabels(G)
def fullGraph(G, removeTopLists=True):
    """Prune ``G`` in place to a readable view of the whole library.

    Drops non-book nodes with fewer than two neighbours, tags with more than
    7 neighbours and dangling non-book nodes. Top-list nodes are dropped when
    ``removeTopLists`` is true. Then redundant tags are pruned, dangling nodes
    (books included) are dropped, node sizes are set and scores are appended
    to the labels.
    """
    # The parameter shadows the module-level removeTopLists() function, so
    # calling removeTopLists(G) here would call a bool and raise TypeError.
    # Fetch the function from the module namespace instead; the parameter
    # name is kept for keyword callers.
    dropTopLists = globals()['removeTopLists']
    removeEdge(G)
    removeHighSpanTags(G, 7)
    removeDangling(G, alsoBooks=False)
    if removeTopLists:
        dropTopLists(G)
    pruneTags(G, 3)
    removeDangling(G, alsoBooks=True)
    scaleBooksByRating(G)
    scaleOpinionsByRating(G)
    addScoreToLabels(G)
def readBooksAnalysis(G, minRating=0, showAllTags=True, removeUnconnected=False, removeTopListsB=True):
    """Prune ``G`` in place to a view of the books that have been rated.

    Unrated books are removed first. ``minRating`` is passed to ``removeBad``
    as its threshold, which filters on the ``'score'`` entry of nodes. Unless
    ``showAllTags`` is set, sparsely connected non-book nodes and tags with
    more than 15 neighbours are dropped. ``removeUnconnected`` also drops
    books without neighbours; ``removeTopListsB`` drops top-list nodes.
    Finally node sizes are set and scores appended to the labels.
    """
    removeUnread(G)
    removeBad(G, minRating)
    if not showAllTags:
        removeEdge(G)
        # NOTE(review): nesting reconstructed from a source that lost its
        # indentation; confirm this call belongs inside the branch.
        removeHighSpanTags(G, 15)
    removeDangling(G, alsoBooks=removeUnconnected)
    if removeTopListsB:
        removeTopLists(G)
    pruneTags(G, 8)
    scaleBooksByRating(G)
    scaleOpinionsByRating(G)
    addScoreToLabels(G)
def analyze(G, type_name, name, dist=2.7):
    """Reduce ``G`` in place to the neighbourhood of one named node.

    The node is looked up among nodes of type ``type_name`` (or all nodes for
    ``"any"``). An exact label match wins; otherwise the label with the
    highest fuzzy ratio is used, and ``"Best Match: <label>"`` is printed when
    that ratio is below 70. Starting from the match, ``waveFlow`` collects the
    nodes to keep within ``dist``; everything else is removed. The match is
    drawn as a star (unless it already has a shape) and its label is wrapped
    in asterisks.

    Raises:
        ValueError: if no candidate node resembles ``name`` at all. Previously
            this failed with a ``TypeError`` on ``None``.
    """
    from fuzzywuzzy import fuzz
    type_ident = type_name[0]
    full_name = type_ident + "/" + name
    bestRatio, match, n = 0, None, 0
    for ni in list(G.nodes):
        node = G.nodes[ni]
        if node['t'] == type_name or type_name == "any":
            if name == node['label'] or full_name == node['label']:
                # An exact hit is a perfect match; without setting the ratio
                # the "Best Match" notice below was printed for it as well.
                bestRatio, match, n = 100, node, ni
                break
            ratio = fuzz.ratio(node['label'], name)
            if ratio > bestRatio:
                bestRatio, match, n = ratio, node, ni

    if match is None:
        raise ValueError(
            "no node of type '{}' matches '{}'".format(type_name, name))
    if bestRatio < 70:
        print("Best Match: "+match['label'])

    menge = set()
    waveFlow(G, match, n, dist, menge)
    for key in list(G.nodes):
        if key not in menge:
            G.remove_node(key)
    removeHighSpanTags(G, 12)
    if dist > 1:
        removeDangling(G, True)
    scaleBooksByRating(G)
    scaleOpinionsByRating(G)
    if 'shape' not in match:
        match['shape'] = 'star'
    addScoreToLabels(G)
    match['label'] = "*"+match['label']+"*"
def waveFlow(G, node, n, dist, menge, firstEdge=False):
    """Recursively collect into ``menge`` the node keys reachable from ``n``.

    ``node`` is the attribute dict of ``n``. ``dist`` is the remaining budget:
    it drops by 1 per level and recursion stops once it is not positive.
    ``menge`` is the set of collected node keys and is mutated in place.

    Side effect: neighbours that have neither a score nor a rating get
    ``'score'`` set to 0 on their graph node.
    """
    if dist <= 0:
        return
    dist -= 1
    # An empty result set marks the root call.
    if menge==set():
        firstEdge=True
    # Top lists are never expanded; they are collected only when firstEdge.
    if node['t'] in ['topList']:
        if firstEdge:
            menge.add(n)
        return
    menge.add(n)
    # Tags are expanded only when firstEdge, at an extra cost of 0.1.
    if node['t'] in ['tag']:
        if firstEdge:
            dist-=0.1
        else:
            return
    bestlist = []  # neighbours ranked by score
    keeplist = []  # neighbours that have a rating but no score
    for m in list(G.adj[n]):
        book = G.nodes[m]
        # Only a node type literally named 'NOTHING' would be skipped here.
        if book['t'] not in ['NOTHING']:
            if 'score' in book and book['score'] != None:
                bestlist.append(book)
            elif 'rating' in book and book['rating'] != None:
                keeplist.append(book)
            else:
                book['score'] = 0
                bestlist.append(book)
    bestlist.sort(key=lambda node: node['score'], reverse=True)
    # Number of scored neighbours to follow; shrinks with the remaining
    # budget and with the number of rated neighbours.
    toKeep = min(int(dist*10), math.ceil(len(bestlist) * dist - len(keeplist)*0.5))
    if toKeep <= 0:
        # No scored neighbours are followed; only the best-rated ones remain.
        keeplist.sort(key=lambda node: node['rating'], reverse=True)
        keeplist = keeplist[:min(int(dist*10), int(len(keeplist) * dist))]
        bestlist = []
    else:
        bestlist = bestlist[:toKeep]
    for m in list(G.adj[n]):
        node = G.nodes[m]
        # Membership compares attribute dicts by equality.
        if node in bestlist or node in keeplist:
            # NOTE(review): firstEdge is passed on unchanged, so once it is
            # True at the root it stays True at every depth; confirm intended.
            waveFlow(G, node, m, dist, menge, firstEdge=firstEdge)
def cliInterface():
    """Parse the command line, build and score the graph, and run one command.

    Sub-commands: ``recommend`` (alias ``rec``), ``read``, ``analyze`` and
    ``full``. Global flags control which books are removed up front and
    whether the ranked list is printed and the HTML view generated.
    """
    import argparse
    parser = argparse.ArgumentParser(description='TODO: Write Description.')
    parser.add_argument('--keep-priv', action="store_true")
    parser.add_argument('--remove-read', action="store_true")
    parser.add_argument('--remove-unread', action="store_true")
    parser.add_argument('--no-web', action="store_true")
    parser.add_argument('--no-list', action="store_true")
    parser.add_argument('--remove-edge', action="store_true")
    parser.add_argument('--keep-top-lists', action="store_true")
    parser.add_argument('--keep-useless-recommenders', action="store_true")
    cmds = parser.add_subparsers(required=True, dest='cmd')

    p_rec = cmds.add_parser('recommend', description="TODO", aliases=['rec'])
    p_rec.add_argument('-n', type=int, default=25, help='number of books to recommend')

    p_read = cmds.add_parser('read', description="TODO", aliases=[])
    p_read.add_argument('--min-rating', type=int, default=0)
    p_read.add_argument('--all-tags', action="store_true")
    p_read.add_argument('--only-connected', action="store_true")

    p_show = cmds.add_parser('analyze', description="TODO", aliases=[])
    p_show.add_argument('type', choices=['any', 'book', 'recommender', 'author', 'series'])
    p_show.add_argument('name', type=str)
    p_show.add_argument('-d', type=float, default=2.7, help='depth of expansion')

    cmds.add_parser('full', description="TODO", aliases=[])

    args = parser.parse_args()

    G, books = buildFullGraph()
    mu, std = genScores(G, books)
    if not args.keep_priv:
        removePriv(G)
    if args.remove_read:
        removeRead(G)
    elif args.remove_unread:
        removeUnread(G)

    # argparse stores the name as typed, so the 'rec' alias arrives as 'rec'
    # and used to fall through to the "Bad" exception below.
    if args.cmd in ("recommend", "rec"):
        recommendNBooks(G, mu, std, args.n, not args.keep_top_lists, not args.keep_useless_recommenders)
    elif args.cmd == "read":
        readBooksAnalysis(G, args.min_rating, args.all_tags, args.only_connected, not args.keep_top_lists)
    elif args.cmd == "analyze":
        analyze(G, args.type, args.name, args.d)
    elif args.cmd == "full":
        fullGraph(G, not args.keep_top_lists)
    else:
        raise Exception("Bad")

    if args.remove_edge:
        removeEdge(G)
    if not args.no_list:
        printBestList(G)
    if not args.no_web:
        genAndShowHTML(G)
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    cliInterface()