iotaFS/next.py

666 lines
23 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# TODO: Implement File COW (except for append) (+ version / token updates caused by this)
# TODO: ? Stop using tokens for dirs, use hashed name + parent token
# TODO: ? Stop using tokens for files, use hashed name + short ?version-number + parent token instead
# TODO: Store version-numbers for files in parent-dir
# TODO: Chane Milestone-Format from [stone,stone,...] to [[dirStne,dirStone,...],[fileStone,,...]] and dont save type for the stones
# TODO: Decide how / when / from which class to push new blocks
# TODO: Unload 'overwritten' blobs
# TODO: Close blobs when they become unknown to the kernel or when we unmount (genesis only on unmount)
# TODO: inode_id -> actuall Inode lookup table
# TODO: Add chareable folders:
# TODO: No more 'tokens' -> write-key for address-gen and read-key for encryption
# TODO: Change iota receive address to last adress (encrypted using read-key + hashed)
# -> relation of different transactions as part of the file/dir-structore stays hidden;
# but someone with the read-key of a directory/file is able to traverse the associated blockchain one by one
# TODO: Add share-functionality:
# 1. publish read-key of the directory
# 2. publish public adress of current milestone-block
# Problem: No updateable milestone-index... -> Create share-genesis?
# Problem: When sharing a single file, updates are not received... -> new file-blobs reverence old one via rec-address
# TODO: ? Switch to salsa20 stream-cipher -> no padding required -> 2187 as blocksize instead of 2176 -> .5% more
from iota import Iota, ProposedTransaction, Address, TryteString, Tag
from iota.crypto.addresses import AddressGenerator
from iota.crypto.types import Seed
from iota.codecs import TrytesDecodeError
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import math
from pprint import pprint
import hashlib
import sys
import time
import msgpack
import gzip
import secrets
import stat
import errno
import pyfuse3
import trio
from collections import defaultdict
from pyfuse3 import FUSEError
try:
import faulthandler
except ImportError:
pass
else:
faulthandler.enable()
# For RAW:
#CHUNKSIZE = 2187
# For AES:
CHUNKSIZE = 2176
SYNCWRITES = True
def log(txt):
print("[-] "+str(txt))
def sendEmOff(bundles, api):
for bundle in bundles:
print("[->]")
api.send_trytes(
trytes=bundle
)
class Atom():
def __init__(self, milestone: bool, cont, name: str = None) -> None:
self.milestone = milestone
self.delta = not milestone
self.name = name
self.cont = cont
def dump(self):
if self.milestone:
return msgpack.dumps([True, self.cont])
else:
return msgpack.dumps([False, self.name, self.cont])
class BlobChunk():
def __init__(self, data: bytes = b'', sealed: bool = False) -> None:
self.data = data
self.sealed = sealed
def getData(self) -> bytes:
return self.data
def append(self, data: bytes) -> None:
if len(data)+len(self.data) > CHUNKSIZE:
raise Exception("That to big!")
self.data += data
if len(self.data) == CHUNKSIZE:
self.seal()
def getBytesLeft(self) -> int:
if self.sealed:
return 0
return CHUNKSIZE - len(self.data)
def seal(self) -> None:
self.sealed = True
def isSealed(self) -> bool:
return self.sealed
class TangleBlob():
def __init__(self, writeKey: bytes, readKey: bytes, iotaApi: Iota) -> None:
self.writeKey = writeKey
self.readKey = readKey
self.iotaApi = iotaApi
self.preChunks = 0
self.chunks = []
m = hashlib.sha3_512()
m.update(self.writeKey)
trSeed = TryteString.from_bytes(m.digest())[:81]
self.adressGen = AddressGenerator(Seed(trSeed))
self.fetched = False
self.pushedNum = 0
def _requireFetched(self):
if not self.fetched:
self.fetch()
def _getReadKey(self, chunkNum: int) -> bytes:
m = hashlib.sha3_384()
m.update(self.readKey)
m.update(chunkNum.to_bytes(8, "little")) # 64 bits should be enought...
m.update(self.readKey)
return m.digest()
def _genBundle(self, data, addr) -> str:
txMsg = TryteString.from_bytes(data)
trans = ProposedTransaction(
address = addr,
value = 0,
tag = Tag("IOTAFS"),
message = txMsg
)
return self.iotaApi.prepare_transfer(
transfers = [trans],
inputs = [addr]
)['trytes']
def _dumpChunk(self, chunkNum: int) -> str:
readKey = self._getReadKey(chunkNum + self.preChunks)
data = self.chunks[chunkNum].getData()
cipher = AES.new(readKey[16:][:16], AES.MODE_CBC, readKey[:16])
ct_bytes = cipher.encrypt(pad(data, AES.block_size))
addr = self.adressGen.get_addresses(start=chunkNum + self.preChunks, count=1)[0]
return self._genBundle(ct_bytes, addr)
def dumpAllSealed(self):
bundles = []
for i in range(max(0,len(self.chunks)-self.pushedNum)):
c = i + self.pushedNum
chunk = self.chunks[c]
if chunk.isSealed():
bundles.append(self._dumpChunk(c))
self.pushedNum+=1
return bundles
def sealAndDump(self):
# When unmounting / closing / ...
if self.chunks:
self.chunks[-1].seal()
return self.dumpAllSealed()
def append(self, data: bytes, newBlock: bool = False) -> None:
self._requireFetched()
if newBlock:
self.chunks[-1].seal()
elif len(self.chunks):
bytesLeft = self.chunks[-1].getBytesLeft()
if bytesLeft:
leftChunk = data[:bytesLeft]
data = data[bytesLeft:]
self.chunks[-1].append(leftChunk)
while len(data):
chunk = data[:CHUNKSIZE]
self.chunks.append(BlobChunk(chunk))
data = data[CHUNKSIZE:]
if SYNCWRITES:
bundles = self.dumpAllSealed()
if bundles:
sendEmOff(bundles, self.iotaApi)
def getChunkLen(self) -> int:
return self.preChunks + len(self.chunks)
def getSize(self) -> int:
if len(self.chunks):
return self.getChunkLen()*CHUNKSIZE - self.chunks[-1].getBytesLeft()
return self.preChunks
def read(self) -> bytes:
self._requireFetched()
data = b''
for chunk in self.chunks:
data += chunk.getData()
return data
def _dump(self) -> str:
self.chunks[-1].seal()
data = ""
for c in range(len(self.chunks)-self.pushedNum):
num = c + self.pushedNum
data += self._dumpChunk(num) # num is without preChunks
self.pushedNum = self.getChunkLen()
return data
def fetch(self) -> None:
skipChunks = self.preChunks
chunkNum = self.getChunkLen() + skipChunks
while True:
readKey = self._getReadKey(chunkNum)
cipher = AES.new(readKey[16:][:16], AES.MODE_CBC, readKey[:16])
addr = self.adressGen.get_addresses(start=chunkNum, count=1)[0]
txHash = self.iotaApi.find_transactions(tags=[Tag("IOTAFS")], addresses=[addr])["hashes"]
if len(txHash)==0:
break
bundles = self.iotaApi.get_bundles(txHash[0])["bundles"]
for bundle in bundles:
for tx in bundle.transactions:
# TODO: Can we just strip the 9s and call it a day?
tryteStr = TryteString(str(tx.signature_message_fragment).rstrip("9"))
try:
ct_bytes = tryteStr.as_bytes()
except TrytesDecodeError:
ct_bytes = (tryteStr+"9").as_bytes()
self.chunks.append(BlobChunk(unpad(cipher.decrypt(ct_bytes), AES.block_size), True))
chunkNum += 1
self.pushedNum = len(self.chunks)
self.fetched = True
self._afterFetch()
def _afterFetch(self) -> None:
return
def sealLastChunk(self) -> None:
self.chunks[-1].seal()
def chunkLayout(self, width=50):
fac = width / CHUNKSIZE
lines = []
for c,chunk in enumerate(self.chunks):
bytesWritten = len(chunk.data)
bytesEmpty = CHUNKSIZE - bytesWritten
if chunk.isSealed():
lines.append("["+"#"*int(bytesWritten*fac)+"="*int(bytesEmpty*fac)+"] (SEALED)")
else:
lines.append("["+"#"*int(bytesWritten*fac)+" "*int(bytesEmpty*fac)+"] ("+str(bytesWritten)+"/"+str(CHUNKSIZE)+")")
if self.pushedNum < c+1:
lines[-1]+=" {+}"
return "\n".join(lines)
def _close(self):
bundles = self.sealAndDump()
if bundles:
sendEmOff(bundles, self.iotaApi)
class TangleFileTreeElement(TangleBlob):
def __init__(self, name: str, lastMilestoneIndex: int, parent, iotaApi: Iota) -> None:
if isinstance(parent, bytes):
self.writeKey = hashlib.sha3_384(parent + name.encode() + b'write' + parent).digest()
self.readKey = hashlib.sha3_384(parent + name.encode() + b'read' + parent).digest()
else:
self.writeKey = hashlib.sha3_384(parent.writeKey + name.encode()).digest()
self.readKey = hashlib.sha3_384(parent.readKey + name.encode()).digest()
super(TangleFileTreeElement, self).__init__(self.writeKey, self.readKey, iotaApi)
self.name = name
self.inodes = {}
self.parent = parent
self.milestoneIndex = lastMilestoneIndex
self.preChunks = self.milestoneIndex
def _afterFetch(self) -> None:
raw = self.read()
if raw==b'':
return
unpacker = msgpack.Unpacker(raw=True)
unpacker.feed(raw)
for i, elem in enumerate(reversed(list(unpacker))):
if elem[0]:
# Is a milestone
# TODO: Update our known milestoneIndex, if we find one
# might have to rewrite .fetch() and merge it here...
self.milestoneIndex = self.getChunkLen()
self._applyMilestone(elem[1])
break
else:
name = elem[1].decode()
if name in self.inodes: #name
atom = Atom(False, elem[2], name)
self.inodes[name].applyAtom(atom)
else:
# new inode
type = ["dir","file"][elem[2][b't']]
self.inodes[name] = Inode(name, self.iotaApi, self, type)
atom = Atom(False, elem[2], name)
self.inodes[name].applyAtom(atom)
def _getSkipChunks(self):
return
def _applyMilestone(self, milestone) -> None:
self.inodes = {}
for stone in milestone:
atom = Atom()
atom.load(stone)
self.inodes[atom.name] = atom
def _applyAtom(self, atom: Atom) -> None:
if atom.name in self.inodes:
self.inodes[atom.name].applyAtom(atom)
else:
cont = atom.cont
type = ["dir","file"][cont[b't']]
inode = Inode(atom.name, self.iotaApi, self, type)
self.inodes[atom.name] = inode
def _newAtom(self, atom: Atom) -> None:
self.append(atom.dump())
def getNameList(self):
self._requireFetched()
return list(self.inodes.keys())
def _tree(self):
self._requireFetched()
dirs = {}
files = []
for inode in self.inodes:
if self.inodes[inode].type=="file":
files.append(inode)
elif self.inodes[inode].type=="dir":
dirs[inode] = self.inodes[inode].getRef()._tree()
else:
files.append("["+str(self.inodes[inode].type)+"]/"+inode)
lines = ["{"+self.name+"}"]
for i,d in enumerate(dirs):
dir = dirs[d]
if len(files)==0 and i==len(dirs)-1:
lines.append(" └──"+dir[0]+"")
for l in range(len(dir)-1):
lines.append("   "+dir[l+1])
else:
lines.append(" ├──"+dir[0]+"")
for l in range(len(dir)-1):
lines.append(" │  "+dir[l+1])
if len(files):
for f in range(len(files)-1):
lines.append(" ├──"+files[f])
lines.append(" └──"+files[-1])
return lines
def tree(self):
return "\n".join(self._tree())
def getInode(self, name: str) -> Atom:
self._requireFetched()
return self.inodes[name]
def mkdir(self, name: str) -> bool:
self._requireFetched()
if name in self.getNameList():
return False
inode = Inode(name, self.iotaApi, self, "dir")
atom = inode.change(milestoneIndex=0)
self._newAtom(atom)
self.inodes[name] = inode
return True
def mkfile(self, name: str) -> bool:
self._requireFetched()
if name in self.getNameList():
return False
inode = Inode(name, self.iotaApi, self, "file")
atom = inode.change(size=0, milestoneIndex=0, hash=b'NULL')
self._newAtom(atom)
self.inodes[name] = inode
return True
def _updateFileSize(self, name: str, size: int) -> None:
self._requireFetched()
atom = self.inodes[name].change(size=size)
self._newAtom(atom)
def _updateFileHash(self, name: str, hash: bytes, size: int) -> None:
log("New Hash for file '"+name+"' registered")
self._requireFetched()
atom = self.inodes[name].change(size=size, hash=hash)
self._newAtom(atom)
def performMilestone(self) -> None:
if isinstance(self.parent, bytes):
raise Exception("You cant create a milestone of the genesis block, you idiot!")
stones = {}
for a in self.inodes:
stones[a] = self.inodes[a].toStone()
self.atomStack = 0
milestoneAtom = Atom(True, stones)
data = milestoneAtom.dump()
self.milestoneIndex = self.getChunkLen()
if self.parent!=None:
self.parent._updateChildMilestone(self.name, self.milestoneIndex)
self.append(data, True)
def _updateChildMilestone(self, name: str, milestoneIndex: int):
atom = self.inodes[name].change(milestoneIndex = milestoneIndex)
if isinstance(self.parent, bytes):
# We are the genesis-block
self.milestoneIndex = milestoneIndex
self.append(msgpack.dumps(milestoneIndex))
else:
self._newAtom(atom)
def close(self) -> None:
for i in self.inodes:
inode = self.inodes[i]
if inode.hasRef:
ref = inode.getRef()
ref.close()
self._close()
class TangleFile():
def __init__(self, name: str, parent: TangleFileTreeElement, iotaApi: Iota) -> None:
self.iotaApi = iotaApi
self.name = name
self.parent = parent
self.reflexiveInode = parent.inodes[name]
self.size = self.reflexiveInode.size
self.hash = self.reflexiveInode.hash
self.writeKey = hashlib.sha3_384(b'f' + parent.writeKey + self.hash).digest()
self.readKey = hashlib.sha3_384(b'f' + parent.readKey + self.hash).digest()
self.blob = TangleBlob(self.token, iotaApi)
def write(self, offset: int, data: bytes):
if offset == self.size:
self.blob.append(data)
self.size = self.blob.getSize()
self.parent._updateFileSize(self.name, self.size)
else:
oldData = self.blob.read()
newData = oldData[:offset] + data + oldData[offset+len(data):]
self.hash = hashlib.sha256(newData).digest()
self.writeKey = hashlib.sha3_384(b'f' + self.parent.writeKey + self.hash).digest()
self.readKey = hashlib.sha3_384(b'f' + self.parent.readKey + self.hash).digest()
# Maybe this way the gc will remove it quicker?
del self.blob
self.blob = TangleBlob(self.writeKey, self.readKey, self.iotaApi)
self.blob.append(newData)
self.size = self.blob.getSize()
self.parent._updateFileHash(self.name, self.hash, self.size)
def close(self):
self.blob._close()
class Inode():
def __init__(self, name: str, iotaApi: Iota, parent: TangleFileTreeElement = None, type: str = None) -> None:
self.parent = parent
self.name = name
self.type = type
self.ref = None
self.iotaApi = iotaApi
def setType(self, type: str) -> None:
self.type = type
def change(self, size: int=None, hash: int=None, milestoneIndex: int=None) -> Atom:
delta = {}
delta[b't'] = (self.type=="file")
if size!=None:
self.size = size
delta[b's'] = size
if hash!=None:
self.hash = hash
delta[b'h'] = hash
if milestoneIndex!=None:
self.milestoneIndex = milestoneIndex
delta[b'm'] = milestoneIndex
return Atom(False, delta, self.name)
def applyAtom(self, atom: Atom) -> None:
if atom.name != self.name:
raise Exception("Cannot apply atom ment for a different inode (names differ)")
if atom.milestone:
stones = atom.cont
if self.name in stones:
self.applyAtom(Atom(False, stones[self.name], self.name))
else:
if (self.type=="file") != atom.cont[b't']:
raise Exception("I am a "+self.type+"; this atom is for the other thing")
delta = atom.cont
if b's' in delta:
self.size = delta[b's']
if b'h' in delta:
self.hash = delta[b'h']
if b'm' in delta:
self.milestoneIndex = delta[b'm']
def toStone(self) -> None:
if self.type=="file":
return [self.size, self.hash]
else:
return [self.milestoneIndex]
def getRef(self):
if self.name == "*":
return None
if not self.ref:
if self.type=="dir":
self.ref = TangleFileTreeElement(self.name, self.milestoneIndex, self.parent, self.iotaApi)
elif self.type=="file":
self.ref = TangleFile(self.name, self.parent, self.iotaApi)
else:
raise Exception("Cannot get reference of an inode of type "+self.type)
return self.ref
def hasRef(self):
return not self.ref==None
class IotaFS(pyfuse3.Operations):
#supports_dot_lookup = True
enable_writeback_cache = True
def __init__(self, token) -> None:
self.api = Iota('https://nodes.thetangle.org:443', local_pow=True)
# TODO Cache last known milestone-Index of genesis locally
self.genesis = TangleFileTreeElement("*", 0, token, None, self.api)
log("Fetching Genesis...")
self.genesis.fetch()
log("Retrieving reference to root")
if self.genesis.getNameList()!=["/"]:
if len(self.genesis.getNameList()):
# theres another directory in our genesis chain... WTF?!
raise Exception("Corrupted Genesis-Chain:"
+ "Unknown records for no root-directory in Genesis Chain: "+str(self.genesis.getNameList()))
else:
# we dont have a root yet, lets create one...
log("Unable to find reference to root: Creating new root")
self.genesis.mkdir("/")
log("Successfully Mounted!")
self.inodeIds = {}
self.fhs = {}
async def access(self, inodeId, mode, ctx):
# not called
raise Exception("This function should not been called; WTF")
async def create(self, parent_inodeId, name, mode, flags, ctx):
#return (fi, attr)
# $increase lookupN
pass
async def flush(self, fh):
# flush the file at fh
# basically means: close, but may be called multiple times,
# when open multiple times with same fh
pass
async def forget(self, inode_list):
# inodelist = [(fh, nlookup),...]
# decrement lookupN of file at fh
# if lookupN == 0:
# 'remove' Inode
# should be called at unmount to bring lookupN to 0 for all files
pass
async def getattr(self, inodeId, ctx):
# return EntryAttributes()
pass
#async def link(self, inodeId, new_parent_inode, new_name, ctx):
async def lookup(self, parent_inodeId, name, ctx):
#return EntryAttributes()
# not exists: raise FUSEError(errno.ENOENT)
# must handle .. and .
# $increase lookupN
pass
async def mkdir(self, parent_inodeId, name, mode, ctx):
#return EntryAttributes()
# $increase lookupN
pass
async def mknod(self, parent_inodeId, name, mode, rdev, ctx):
# create file
#return EntryAttributes()
# $increase lookupN
pass
async def open(self, inodeId, flags, ctx):
# open file at inodeId; give back fh
#return FileInode(..fh)
pass
async def opendir(self, inodeId, ctx):
#return fh
pass
async def read(self, fh, off, size):
# Read size bytes from fh at position off
pass
async def readdir(self, fh, start_id, token):
# fuck this shit
# http://www.rath.org/pyfuse3-docs/operations.html#pyfuse3.Operations.readdir
pass
async def release(self, fh):
# file no longer open -> close? uncache?
pass
async def releasedir(self, fh):
# dir no longer open -> uncache?
pass
#api = Iota('https://nodes.thetangle.org:443', local_pow=True)
#token = b'testToken'
#genesis = TangleFileTreeElement("*", 0, token, api)
if False:
genesis.mkdir("/")
root = genesis.inodes["/"].getRef()
root.mkdir("dir1")
root.mkdir("dir2")
root.mkdir("dir3")
root.mkdir("dir4")
root.mkfile("file.txt")
d2 = root.inodes["dir2"].getRef()
d2.mkfile("a.txt")
d3 = root.inodes["dir3"].getRef()
d3.mkfile("b.txt")
d3.mkfile("c.txt")
d3.mkfile("d.txt")
d3.mkdir("subDir")
d4 = root.inodes["dir4"].getRef()
d4.mkdir("sub")
sub = d4.inodes["sub"].getRef()
sub.mkdir("subsub")
print(genesis.tree())
def main():
iotaFs = IotaFS(b'This is a test token')
opts = set(pyfuse3.default_options)
opts.add('fsname=IotaFS')
#opts.add('debug')
pyfuse3.init(iotaFs, "mount", opts)
try:
trio.run(pyfuse3.main)
except:
pyfuse3.close(unmount=True)
raise
pyfuse3.close()