# CaliGraph/main.py
#
# 649 lines
# 20 KiB
# Python

import os
import json
import math
import random
import numpy as np
from scipy.stats import norm
import matplotlib.pyplot as plt
import networkx as nx
from pyvis.network import Network
def getAllAuthors(books):
    """Return the distinct author names across ``books`` as an unordered list."""
    found = set()
    for entry in books:
        found.update(getAuthors(entry))
    return list(found)
def getAuthors(book):
    """Split the book's ``'authors'`` string on ``' & '`` and return the parts."""
    separator = ' & '
    combined = book['authors']
    return combined.split(separator)
def getRecommenders(book):
    """Yield the recommender name for each tag containing ``" Recommendation"``.

    The name is the tag with that marker text removed.
    """
    marker = " Recommendation"
    for label in book['tags']:
        if marker in label:
            yield label.replace(marker, "")
def getTags(book):
    """Yield the plain tags of ``book``.

    Tags containing ``" Recommendation"`` or ``" Top "`` are skipped; those
    are handled by the recommender and top-list helpers.
    """
    for label in book['tags']:
        if " Recommendation" in label or " Top " in label:
            continue
        yield label
def getAllRecommenders(books):
    """Return the distinct recommender names across ``books`` as an unordered list."""
    found = set()
    for entry in books:
        found.update(getRecommenders(entry))
    return list(found)
def getTopLists(book):
    """Return the distinct top-list names the book is tagged with.

    A top-list tag contains ``" Top "``; the list name is the text before it.
    The returned list is unordered.
    """
    marker = " Top "
    names = {label.split(marker)[0] for label in book['tags'] if marker in label}
    return list(names)
def getAllTopLists(books):
    """Return the distinct top-list names across ``books`` as an unordered list."""
    found = set()
    for entry in books:
        found.update(getTopLists(entry))
    return list(found)
def getAllSeries(books):
    """Return the distinct ``'series'`` values of all books that have that key."""
    names = {entry['series'] for entry in books if 'series' in entry}
    return list(names)
def getAllTags(books):
    """Return the distinct plain tags across ``books`` as an unordered list."""
    found = set()
    for entry in books:
        found.update(getTags(entry))
    return list(found)
def getTopListWheight(book, topList):
    """Return the weight of ``book`` on the top list named ``topList``.

    Top-list tags have the form ``"<list name> Top <scope>"``. The weight is
    ``100 / scope`` using the smallest scope the book is tagged with for this
    list, so a "Top 10" entry weighs more than a "Top 100" entry.

    Raises:
        ValueError: if the book has no tag for ``topList``.
    """
    scopes = []
    for tag in book['tags']:
        parts = tag.split(" Top ")
        # Compare the whole list name (same split rule as getTopLists). A
        # substring test would let list "A" pick up the scope of "BA Top 10".
        if len(parts) > 1 and parts[0] == topList:
            scopes.append(int(parts[1]))
    if not scopes:
        raise ValueError(
            "book has no '{} Top <n>' tag".format(topList))
    return 100/min(scopes)
def removeRead(G):
    """Remove every book node whose ``rating`` is set (not ``None``)."""
    ratedBooks = [
        key for key in G.nodes
        if G.nodes[key]['t'] == 'book' and G.nodes[key]['rating'] is not None
    ]
    for key in ratedBooks:
        G.remove_node(key)
def removeUnread(G):
    """Remove every book node whose ``rating`` is ``None``."""
    unratedBooks = [
        key for key in G.nodes
        if G.nodes[key]['t'] == 'book' and G.nodes[key]['rating'] is None
    ]
    for key in unratedBooks:
        G.remove_node(key)
def removePriv(G):
    """Remove every book node whose ``tags`` contain ``'priv'``."""
    for key in list(G.nodes):
        attrs = G.nodes[key]
        isPrivateBook = attrs['t'] == 'book' and 'priv' in attrs['tags']
        if isPrivateBook:
            G.remove_node(key)
def removeDangling(G, alsoBooks=False):
    """Remove nodes that have no neighbours.

    Book nodes are only considered when ``alsoBooks`` is true.
    """
    for key in list(G.nodes):
        if G.nodes[key]['t'] == 'book' and not alsoBooks:
            continue
        if len(G.adj[key]) == 0:
            G.remove_node(key)
def removeEdge(G):
    """Remove non-book nodes that have fewer than two neighbours."""
    for key in list(G.nodes):
        if G.nodes[key]['t'] == 'book':
            continue
        if len(G.adj[key]) < 2:
            G.remove_node(key)
def removeBad(G, threshold, groups=('book', 'topList', 'recommender', 'author', 'series', 'tag')):
    """Remove scored nodes whose score is ``None`` or below ``threshold``.

    Only nodes whose type ``t`` is listed in ``groups`` and that carry a
    ``'score'`` attribute are considered; nodes without a ``'score'`` key are
    left alone. ``groups`` accepts any container supporting ``in`` (the
    default is an immutable tuple rather than a shared mutable list).
    """
    for key in list(G.nodes):
        attrs = G.nodes[key]
        if attrs['t'] not in groups or 'score' not in attrs:
            continue
        if attrs['score'] is None or attrs['score'] < threshold:
            G.remove_node(key)
def removeKeepBest(G, num, maxDistForRead=1):
    """Keep only the ``num`` best-scored books and read books close to them.

    Books with a non-``None`` ``score`` are ranked by score (descending) and
    the first ``num`` are kept. Every other book, and every node of any type
    whose ``score`` is ``None``, is removed unless it has a rating that is at
    least ``cutoff - maxDistForRead``, where ``cutoff`` is the lowest score
    among the kept books.

    If no scored book is kept (e.g. ``num == 0``) there is nothing to compare
    ratings against, so rated books are removed too; the previous code raised
    ``IndexError`` in that situation.
    """
    ranked = [
        key for key in G.nodes
        if G.nodes[key]['t'] == 'book' and G.nodes[key].get('score') is not None
    ]
    ranked.sort(key=lambda key: G.nodes[key]['score'], reverse=True)
    kept = ranked[:num]
    # Track kept books by node key: O(1) lookups, and two books with equal
    # attribute dicts can no longer be mistaken for one another.
    keptKeys = set(kept)
    cutoff = G.nodes[kept[-1]]['score'] if kept else math.inf

    for key in list(G.nodes):
        attrs = G.nodes[key]
        droppedBook = attrs['t'] == 'book' and key not in keptKeys
        unscored = 'score' in attrs and attrs['score'] is None
        if not (droppedBook or unscored):
            continue
        rating = attrs.get('rating')
        if rating is None or rating < cutoff - maxDistForRead:
            G.remove_node(key)
def removeTags(G):
    """Remove every node of type ``'tag'``."""
    tagKeys = [key for key in G.nodes if G.nodes[key]['t'] == 'tag']
    for key in tagKeys:
        G.remove_node(key)
def pruneTags(G, minCons=2):
    """Remove tag nodes whose books are already well connected.

    For each tag, every neighbour of every adjacent book is counted unless it
    is a tag, top list or series. The tag is removed when that count is
    greater than ``minCons``.
    """
    ignoredTypes = ('tag', 'topList', 'series')
    for key in list(G.nodes):
        if G.nodes[key]['t'] != 'tag':
            continue
        links = sum(
            1
            for book in G.adj[key]
            for other in G.adj[book]
            if G.nodes[other]['t'] not in ignoredTypes
        )
        if links > minCons:
            G.remove_node(key)
def _pruneHubCons(G, hubType, maxCons):
    """Thin out weakly connected unread books around nodes of type ``hubType``.

    For every hub with more than ``maxCons`` neighbours, the ``maxCons``
    best-scored adjacent books are protected. Any other adjacent book (or
    neighbour whose ``score`` is ``None``) is removed if it has no rating and
    fewer than two neighbours that are not top lists.
    """
    for n in list(G.nodes):
        # Books removed earlier in this pass are still in the snapshot;
        # skip them instead of failing on the lookup.
        if n not in G.nodes:
            continue
        if G.nodes[n]['t'] != hubType or len(G.adj[n]) <= maxCons:
            continue
        neighbours = list(G.adj[n])
        ranked = [
            m for m in neighbours
            if G.nodes[m]['t'] == 'book' and G.nodes[m].get('score') is not None
        ]
        ranked.sort(key=lambda m: G.nodes[m]['score'], reverse=True)
        # Protected books are tracked by node key, not by comparing dicts.
        protected = set(ranked[:maxCons])
        for m in neighbours:
            book = G.nodes[m]
            unprotectedBook = book['t'] == 'book' and m not in protected
            unscored = 'score' in book and book['score'] is None
            if not (unprotectedBook or unscored):
                continue
            if book.get('rating') is not None:
                continue  # rated books are never pruned here
            otherCons = sum(
                1 for con in G.adj[m] if G.nodes[con]['t'] not in ['topList'])
            if otherCons < 2:
                G.remove_node(m)


def pruneRecommenderCons(G, maxCons=5):
    """Limit weak unread books per recommender; see ``_pruneHubCons``."""
    _pruneHubCons(G, 'recommender', maxCons)


def pruneAuthorCons(G, maxCons=3):
    """Limit weak unread books per author; see ``_pruneHubCons``."""
    _pruneHubCons(G, 'author', maxCons)
def removeHighSpanTags(G, maxCons=5):
    """Remove tag nodes that have more than ``maxCons`` neighbours."""
    for key in list(G.nodes):
        isTag = G.nodes[key]['t'] == 'tag'
        if isTag and len(G.adj[key]) > maxCons:
            G.remove_node(key)
def removeTopLists(G):
    """Remove every node of type ``'topList'``."""
    topListKeys = [key for key in G.nodes if G.nodes[key]['t'] == 'topList']
    for key in topListKeys:
        G.remove_node(key)
def removeRestOfSeries(G):
    """Per series, drop books beyond the next one after the furthest rated entry.

    ``seriesState`` is the largest ``int(series_index)`` among the rated books
    of the series (0 if none is rated). Adjacent books whose ``series_index``
    exceeds ``seriesState + 1.0001`` are removed.
    """
    for key in list(G.nodes):
        if G.nodes[key]['t'] != 'series':
            continue
        members = list(G.adj[key])
        ratedIndices = [
            int(G.nodes[m]['series_index'])
            for m in members
            if G.nodes[m]['rating'] is not None
        ]
        seriesState = max([0] + ratedIndices)
        for m in members:
            if G.nodes[m]['series_index'] > seriesState + 1.0001:
                G.remove_node(m)
def scoreOpinions(G, globMu, globStd, errorFac=0.7):
    """Score every non-book node from the ratings of its adjacent nodes.

    For top lists, recommenders, authors, series and tags the non-``None``
    ratings of all neighbours are collected. With at least one rating the node
    gets ``mean``/``std`` (normal fit), ``se`` (``globStd / sqrt(count)``),
    ``feedbacks`` (the ratings) and a ``score``: the mean minus an error term
    that shrinks as the rated share of neighbours grows, plus 0.001 for
    recommenders. Without ratings ``score`` is set to ``None``.

    ``globMu`` is accepted but not used here.
    """
    opinionTypes = ('topList', 'recommender', 'author', 'series', 'tag')
    for key in list(G.nodes):
        opinion = G.nodes[key]
        if opinion['t'] not in opinionTypes:
            continue
        neighbours = list(G.adj[key])
        ratings = [
            G.nodes[m]['rating'] for m in neighbours
            if G.nodes[m]['rating'] is not None
        ]
        if not ratings:
            opinion['score'] = None
            continue
        opinion['mean'], opinion['std'] = norm.fit(ratings)
        opinion['se'] = globStd / math.sqrt(len(ratings))
        ratedShare = len(ratings) / len(neighbours)
        recommenderBonus = 0.001 * (opinion['t'] == 'recommender')
        opinion['score'] = opinion['mean'] - errorFac * \
            opinion['se']*(6/7 + (1-ratedShare)/7) + recommenderBonus
        opinion['feedbacks'] = ratings
def scoreUnread(G, globMu, globStd, errorFac=0.6):
    """Estimate a score for every unrated book from its scored neighbours.

    Only neighbours with a non-``None`` score that are not tags contribute
    (tag scores do not influence the result). ``mean``/``std`` are fitted to
    the ratings behind those neighbours (their ``feedbacks``), ``mean2``/
    ``std2`` to the neighbour scores themselves. The book score is
    ``((mean - errorFac*se)*4 + mean2*2 + mean2*1) / 7`` with
    ``se = globStd / sqrt(number of ratings)``; a book whose ``series_index``
    is 1.0 gets a tiny bonus. Books without usable neighbours get score ``None``.

    ``globMu`` is accepted but not used here.
    """
    for key in list(G.nodes):
        book = G.nodes[key]
        if book['t'] != 'book' or book['rating'] is not None:
            continue
        opinionScores = []
        ratingsBehind = []
        for m in list(G.adj[key]):
            opinion = G.nodes[m]
            if opinion.get('score') is None or opinion['t'] == 'tag':
                continue
            opinionScores.append(opinion['score'])
            ratingsBehind.extend(opinion['feedbacks'])
        if not opinionScores:
            book['score'] = None
            continue
        book['mean'], book['std'] = norm.fit(ratingsBehind)
        book['mean2'], book['std2'] = norm.fit(opinionScores)
        book['se'] = globStd / math.sqrt(len(ratingsBehind))
        # The third term is the mean of the neighbour scores again (weight 1).
        book['score'] = (
            (book['mean'] - errorFac*book['se'])*4 + book['mean2']*2 + (book['mean2'])*1)/7
        if 'series' in book and book['series_index'] == 1.0:
            book['score'] += 0.000000001
def printBestList(G, num=-1):
    """Print the scored books, best first, one numbered line per book.

    Args:
        G: graph whose book nodes carry ``score``, ``title`` and ``authors``.
        num: maximum number of lines. A negative value prints every scored
            book, ``0`` prints nothing.

    The rank is zero-padded to the number of digits of ``num`` (four digits
    when there is no limit). Previously ``num == 0`` and negative values other
    than -1 failed inside ``math.log10``.
    """
    ranked = [G.nodes[key] for key in G.nodes]
    ranked = [
        book for book in ranked
        if book['t'] == 'book' and book.get('score') is not None
    ]
    ranked.sort(key=lambda book: book['score'], reverse=True)
    if num == 0:
        return
    if num > 0:
        width = int(math.log10(num) + 1)
        ranked = ranked[:num]
    else:
        width = 4
    for position, book in enumerate(ranked, start=1):
        print("["+str(position).zfill(width)+"] "+book['title'] +
              " ("+" & ".join(book['authors'])+"): {:.5f}".format(book['score']))
def readColor(book):
    """Return ``'green'`` if the book dict has a ``'rating'`` key, else ``'gray'``."""
    return 'green' if 'rating' in book else 'gray'
def loadBooksFromDB():
    """Run ``calibredb list --for-machine -f all`` and return its parsed JSON.

    Callers in this module iterate the result as a sequence of book dicts.
    """
    # Use the pipe as a context manager so it is closed even if reading or
    # parsing fails; it used to be left open.
    with os.popen("calibredb list --for-machine -f all") as pipe:
        return json.loads(pipe.read())
def buildBookGraph(books):
    """Create a graph with one ``t='book'`` node per entry of ``books``.

    The node key is the book's ``id``. ``rating``, ``series`` and
    ``series_index`` are ``None`` when the book dict lacks ``rating`` /
    ``series``. ``desc`` is always the empty string (comments are not copied).
    """
    G = nx.Graph()
    for book in books:
        inSeries = 'series' in book
        G.add_node(
            book['id'],
            t='book',
            label=book['title'],
            title=book['title'],
            shape='image',
            image=book['cover'],
            rating=book.get('rating'),
            tags=book['tags'],
            desc='',  # book['comments'] is intentionally not used
            isbn=book['isbn'],
            files=book['formats'],
            authors=getAuthors(book),
            series=book['series'] if inSeries else None,
            series_index=book['series_index'] if inSeries else None,
        )
    return G
def graphAddAuthors(G, books):
    """Add a green ``a/<name>`` node per author and link it to its books.

    Edge colour comes from ``readColor`` of the book. Returns ``G``.
    """
    for name in getAllAuthors(books):
        G.add_node(f'a/{name}', color='green', t='author', label=name)
    for book in books:
        edgeColor = readColor(book)
        for name in getAuthors(book):
            G.add_edge(f'a/{name}', book['id'], color=edgeColor)
    return G
def graphAddRecommenders(G, books):
    """Add an orange ``r/<name>`` node per recommender and link it to its books.

    Edge colour comes from ``readColor`` of the book. Returns ``G``.
    """
    for name in getAllRecommenders(books):
        G.add_node(f'r/{name}', color='orange', t='recommender', label=name)
    for book in books:
        edgeColor = readColor(book)
        for name in getRecommenders(book):
            G.add_edge(f'r/{name}', book['id'], color=edgeColor)
    return G
def graphAddTopLists(G, books):
    """Add a yellow ``tl/<name>`` node per top list and link it to its books.

    Each edge carries ``wheight`` (from ``getTopListWheight``) and a colour
    from ``readColor``. Returns ``G``.

    Top lists use the ``tl/`` key prefix so they cannot collide with tag
    nodes, which are keyed ``t/<tag>``: with a shared prefix a tag "X" and a
    top list "X" ended up as one node.
    """
    for tl in getAllTopLists(books):
        G.add_node('tl/'+tl, color='yellow', t='topList', label=tl)
    for book in books:
        for top in getTopLists(book):
            G.add_edge('tl/'+top, book['id'], wheight=getTopListWheight(
                book, top), color=readColor(book))
    return G
def graphAddSeries(G, books):
    """Add a red ``s/<name>`` node per series and link it to its books.

    Only books with a ``'series'`` key get an edge; edge colour comes from
    ``readColor``. Returns ``G``.
    """
    for name in getAllSeries(books):
        G.add_node('s/'+name, color='red', t='series', label=name)
    for book in books:
        if 'series' not in book:
            continue
        seriesKey = 's/'+book['series']
        G.add_edge(seriesKey, book['id'], color=readColor(book))
    return G
def graphAddTags(G, books):
    """Add a gray ``t/<tag>`` node per plain tag and link it to its books.

    Edge colour comes from ``readColor`` of the book. Returns ``G``.
    """
    for label in getAllTags(books):
        G.add_node(f't/{label}', color='gray', t='tag', label=label)
    for book in books:
        edgeColor = readColor(book)
        for label in getTags(book):
            G.add_edge(f't/{label}', book['id'], color=edgeColor)
    return G
def calcRecDist(G, books):
    """Fit a normal distribution to all non-``None`` book ratings in ``G``.

    Returns the ``(mean, std)`` pair produced by ``norm.fit``.
    """
    allRatings = (G.nodes[book['id']]['rating'] for book in books)
    knownRatings = [rating for rating in allRatings if rating is not None]
    return norm.fit(knownRatings)
def scaleBooksByRating(G):
    """Set each node's ``value`` from its rating, else its score, else 15.

    With a rating or score ``x`` the value is ``20 + 5 * int(x)``. The list of
    excluded node types is currently empty, so every node is processed.
    """
    excludedTypes = []
    for key in list(G.nodes):
        attrs = G.nodes[key]
        if attrs['t'] in excludedTypes:
            continue
        basis = attrs.get('rating')
        if basis is None:
            basis = attrs.get('score')
        attrs['value'] = 15 if basis is None else 20 + 5 * int(basis)
def scaleOpinionsByRating(G):
    """Set ``value`` on top-list, recommender, author and series nodes.

    The value is ``20 + 5 * int(score)`` when the node has a non-``None``
    score and 20 otherwise. Other node types are left untouched.
    """
    opinionTypes = ('topList', 'recommender', 'author', 'series')
    for key in list(G.nodes):
        attrs = G.nodes[key]
        if attrs['t'] not in opinionTypes:
            continue
        score = attrs.get('score')
        attrs['value'] = 20 if score is None else 20 + 5 * int(score)
def addScoreToLabels(G):
    """Append the rating or estimated score to the label of every non-tag node.

    Rated nodes get ``" (<rating>)"``, scored nodes ``" (~<score, 2 decimals>)"``
    and everything else ``" (~0)"``.
    """
    for key in list(G.nodes):
        attrs = G.nodes[key]
        if attrs['t'] == 'tag':
            continue
        if attrs.get('rating') is not None:
            suffix = " ("+str(attrs['rating'])+")"
        elif attrs.get('score') is not None:
            suffix = " (~{:.2f}".format(attrs['score'])+")"
        else:
            suffix = " (~0)"
        attrs['label'] += suffix
def genAndShowHTML(G, showButtons=False):
    """Render ``G`` with pyvis into ``nx.html`` and show it.

    With ``showButtons`` the pyvis configuration panels listed below are
    enabled before the graph is loaded.
    """
    net = Network('1080px', '1920px')
    if showButtons:
        panels = ['configure', 'layout', 'interaction', 'physics', 'edges']
        net.show_buttons(filter_=panels)
    net.from_nx(G)
    net.show('nx.html')
def buildFullGraph():
    """Load all books and build the complete graph; return ``(G, books)``.

    Layers are added in a fixed order: authors, recommenders, top lists,
    series, tags.
    """
    books = loadBooksFromDB()
    G = buildBookGraph(books)
    layers = (
        graphAddAuthors,
        graphAddRecommenders,
        graphAddTopLists,
        graphAddSeries,
        graphAddTags,
    )
    for addLayer in layers:
        addLayer(G, books)
    return G, books
def genScores(G, books):
    """Score opinion nodes, then unread books; return the global ``(mu, std)``."""
    ratingFit = calcRecDist(G, books)
    globMu, globStd = ratingFit
    # Opinions must be scored first: unread books are scored from them.
    scoreOpinions(G, globMu, globStd)
    scoreUnread(G, globMu, globStd)
    return globMu, globStd
def recommendNBooks(G, mu, std, n):
    """Reduce ``G`` in place to a recommendation graph of about ``n`` books.

    ``mu`` and ``std`` are the global rating mean and deviation used as score
    thresholds. The order of the steps matters; each works on the graph left
    by the previous one.
    """
    shortlistSize = int(n*2) + 5
    recommenderLimit = int(n/7)+1
    authorLimit = int(n/15)

    # Coarse pass: drop later series entries and clearly bad nodes.
    removeRestOfSeries(G)
    removeBad(G, mu-std-1.5)
    removeKeepBest(G, shortlistSize, maxDistForRead=1.5)
    removeEdge(G)
    removeHighSpanTags(G, 9)
    removeDangling(G, alsoBooks=False)
    pruneTags(G, 4)

    # Fine pass: only books at or above the global mean remain.
    removeBad(G, mu, groups=['book'])
    pruneTags(G, 3)
    pruneRecommenderCons(G, recommenderLimit)
    pruneAuthorCons(G, authorLimit)
    removeTopLists(G)
    removeDangling(G, alsoBooks=True)
    removeKeepBest(G, n, maxDistForRead=0.75)
    removeEdge(G)
    removeDangling(G, alsoBooks=True)

    # Presentation.
    scaleBooksByRating(G)
    scaleOpinionsByRating(G)
    addScoreToLabels(G)
def fullGraph(G):
    """Lightly prune ``G`` in place for a view of the whole library."""
    # Pruning.
    removeEdge(G)
    removeHighSpanTags(G, 7)
    removeDangling(G, alsoBooks=False)
    removeTopLists(G)
    pruneTags(G, 3)
    removeDangling(G, alsoBooks=True)

    # Presentation.
    scaleBooksByRating(G)
    scaleOpinionsByRating(G)
    addScoreToLabels(G)
def readBooksAnalysis(G, minRating=0, showAllTags=True, removeUnconnected=False):
    """Reduce ``G`` in place to the rated books and what connects them.

    Args:
        minRating: threshold passed to ``removeBad``.
        showAllTags: when false, weakly connected and very common tags are
            removed as well.
        removeUnconnected: also drop books that end up without neighbours.
    """
    removeUnread(G)
    removeBad(G, minRating)
    # NOTE(review): indentation was lost in the source this was restored from;
    # the conditional is assumed to cover exactly these two tag-thinning
    # steps - confirm against the project history.
    if not showAllTags:
        removeEdge(G)
        removeHighSpanTags(G, 15)
    removeDangling(G, alsoBooks=removeUnconnected)
    removeTopLists(G)
    pruneTags(G, 8)

    # Presentation.
    scaleBooksByRating(G)
    scaleOpinionsByRating(G)
    addScoreToLabels(G)
def analyze(G, type_name, name, dist=2.7):
    """Reduce ``G`` in place to the neighbourhood of the node matching ``name``.

    Args:
        type_name: node type to search (``'any'`` searches every type).
        name: label to look for. An exact label match wins immediately;
            otherwise the label with the highest fuzzy ratio is used.
        dist: expansion depth handed to ``pruneDist``.

    Raises:
        ValueError: if no node qualifies as a match (previously this surfaced
            as a ``TypeError`` deep inside ``pruneDist``).
    """
    from fuzzywuzzy import fuzz
    type_ident = type_name[0]
    full_name = type_ident + "/" + name
    bestRatio, match, n = 0, None, 0
    for ni in list(G.nodes):
        node = G.nodes[ni]
        if type_name != "any" and node['t'] != type_name:
            continue
        if node['label'] in (name, full_name):
            match, n = node, ni
            break
        ratio = fuzz.ratio(node['label'], name)
        if ratio > bestRatio:
            bestRatio, match, n = ratio, node, ni
    if match is None:
        raise ValueError(
            "no node of type '{}' resembles '{}'".format(type_name, name))

    # Collect everything within reach of the match, drop the rest.
    menge = set()
    pruneDist(G, match, n, dist, menge)
    for other in list(G.nodes):
        if other not in menge:
            G.remove_node(other)
    removeHighSpanTags(G, 12)
    if dist > 1:
        removeDangling(G, True)

    scaleBooksByRating(G)
    scaleOpinionsByRating(G)
    match['value'] += 100  # make the analysed node stand out
    addScoreToLabels(G)
def pruneDist(G, node, n, dist, menge, firstEdge=False):
    """Recursively collect into ``menge`` the node keys reachable from ``n``.

    Each level costs one unit of ``dist`` (a tag costs 0.1 more). Neighbours
    that are not top lists are split into scored and rated ones; neighbours
    with neither get ``score = 0`` written to them and count as scored. How
    many of the best neighbours are followed depends on the remaining ``dist``.

    NOTE(review): ``firstEdge`` becomes True when ``menge`` is empty and is
    passed on unchanged, so it stays True for the whole recursion started
    with an empty set - the early return for tags is then never taken.
    """
    if dist <= 0:
        return
    dist -= 1
    if not menge:
        firstEdge = True
    menge.add(n)
    if node['t'] == 'tag':
        if not firstEdge:
            return
        dist -= 0.1

    neighbours = list(G.adj[n])
    scored, rated = [], []
    for m in neighbours:
        attrs = G.nodes[m]
        if attrs['t'] == 'topList':
            continue
        if attrs.get('score') is not None:
            scored.append(attrs)
        elif attrs.get('rating') is not None:
            rated.append(attrs)
        else:
            attrs['score'] = 0
            scored.append(attrs)
    scored.sort(key=lambda attrs: attrs['score'], reverse=True)

    cap = int(dist*10)
    toKeep = min(cap, math.ceil(len(scored) * dist - len(rated)*0.5))
    if toKeep > 0:
        scored = scored[:toKeep]
    else:
        rated.sort(key=lambda attrs: attrs['rating'], reverse=True)
        rated = rated[:min(cap, int(len(rated) * dist))]
        scored = []

    for m in neighbours:
        attrs = G.nodes[m]
        if attrs in scored or attrs in rated:
            pruneDist(G, attrs, m, dist, menge, firstEdge=firstEdge)
def cliInterface():
    """Parse the command line, build and score the graph, run the sub-command.

    Sub-commands: ``recommend`` (alias ``rec``), ``read``, ``analyze`` and
    ``full``. Unless disabled with ``--no-list`` / ``--no-web`` the best list
    is printed and the HTML view is generated afterwards.
    """
    import argparse
    parser = argparse.ArgumentParser(description='TODO: Write Description.')
    for flag in ('--keep-priv', '--remove-read', '--remove-unread',
                 '--no-web', '--no-list'):
        parser.add_argument(flag, action="store_true")
    cmds = parser.add_subparsers(required=True, dest='cmd')

    p_rec = cmds.add_parser('recommend', description="TODO", aliases=['rec'])
    # nargs='?' makes the positional optional so its default can apply;
    # without it argparse always requires the value.
    p_rec.add_argument('num', type=int, nargs='?', default=25,
                       help='number of books to recommend')

    p_read = cmds.add_parser('read', description="TODO", aliases=[])
    p_read.add_argument('--min-rating', type=int, default=0)
    p_read.add_argument('--all-tags', action="store_true")
    p_read.add_argument('--only-connected', action="store_true")

    p_show = cmds.add_parser('analyze', description="TODO", aliases=[])
    p_show.add_argument('type', choices=['any', 'book', 'recommender', 'author', 'series'])
    p_show.add_argument('name', type=str)
    p_show.add_argument('depth', type=float, nargs='?', default=2.7,
                        help='depth of expansion')

    cmds.add_parser('full', description="TODO", aliases=[])

    args = parser.parse_args()

    G, books = buildFullGraph()
    mu, std = genScores(G, books)
    if not args.keep_priv:
        removePriv(G)
    if args.remove_read:
        removeRead(G)
    elif args.remove_unread:
        removeUnread(G)

    # argparse stores the name as typed, so the alias has to be accepted too.
    if args.cmd in ("recommend", "rec"):
        recommendNBooks(G, mu, std, args.num)
    elif args.cmd=="read":
        readBooksAnalysis(G, args.min_rating, args.all_tags, args.only_connected)
    elif args.cmd=="analyze":
        analyze(G, args.type, args.name, args.depth)
    elif args.cmd=="full":
        fullGraph(G)
    else:
        raise Exception("Bad")

    if not args.no_list:
        printBestList(G)
    if not args.no_web:
        genAndShowHTML(G)
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    cliInterface()