Last active
November 15, 2023 07:20
-
-
Save anacrolix/d0adb869016959fdc875800c3a9bbdb1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package s3storage | |
import ( | |
"bytes" | |
"crypto/sha1" | |
"encoding/hex" | |
"errors" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"path/filepath" | |
"strings" | |
"sync" | |
"github.com/anacrolix/torrent" | |
dataPkg "github.com/anacrolix/torrent/data" | |
"github.com/anacrolix/torrent/metainfo" | |
"github.com/mitchellh/goamz/s3" | |
) | |
// Permissions applied to locally staged piece data.
const (
	// filePerm is the mode used when creating an incomplete piece file.
	filePerm = 0640
	// dirPerm is the mode used when creating the incomplete piece directory.
	dirPerm = 0750
)

// Locations of piece data.
const (
	// completedPieceS3Path is the S3 key prefix under which completed pieces
	// are listed and uploaded.
	completedPieceS3Path = "completed-pieces/"
	// incompletePieceDirPath is the local directory holding pieces that have
	// not been uploaded yet.
	incompletePieceDirPath = "data/incomplete"
)
func New(bucket *s3.Bucket) (ret *pieceStorage, err error) { | |
ret = &pieceStorage{ | |
bucket: bucket, | |
} | |
ret.mu.Lock() | |
go func() { | |
defer ret.mu.Unlock() | |
if err := ret.updateComplete(); err != nil { | |
log.Printf("error updating completed pieces: %s", err) | |
} | |
}() | |
return | |
} | |
func (me *pieceStorage) updateComplete() error { | |
marker := "" | |
me.pieces = make(map[string]bool, len(me.pieces)) | |
for { | |
lr, err := me.bucket.List(completedPieceS3Path, "/", marker, 1000) | |
if err != nil { | |
return err | |
} | |
for _, key := range lr.Contents { | |
hash := strings.TrimPrefix(key.Key, completedPieceS3Path) | |
if len(hash) != 40 { | |
continue | |
} | |
me.pieces[hash] = true | |
} | |
if !lr.IsTruncated { | |
break | |
} | |
marker = lr.NextMarker | |
} | |
log.Printf("updated complete pieces: %d of them", len(me.pieces)) | |
return nil | |
} | |
type pieceStorage struct { | |
mu sync.Mutex | |
bucket *s3.Bucket | |
pieces map[string]bool | |
} | |
func (me *pieceStorage) piecePath(hash []byte) string { | |
return "completed-pieces/" + hex.EncodeToString(hash) | |
} | |
func (me *pieceStorage) havePiece(hash []byte) bool { | |
_, err := me.bucket.GetKey(me.piecePath(hash)) | |
return err == nil | |
} | |
func (me *pieceStorage) OpenTorrent(info *metainfo.Info) dataPkg.Data { | |
return &torrentStorage{info, me} | |
} | |
type torrentStorage struct { | |
info *metainfo.Info | |
pieces *pieceStorage | |
} | |
var _ torrent.SectionOpener = &torrentStorage{} | |
type section struct { | |
curPiece io.ReadCloser | |
sectionRemaining int64 | |
torrentStorage *torrentStorage | |
curPieceIndex int | |
pieceRemaining int | |
} | |
func (me *section) Close() error { | |
if me.curPiece != nil { | |
return me.curPiece.Close() | |
} | |
return nil | |
} | |
func (me *section) Read(b []byte) (n int, err error) { | |
if me.sectionRemaining == 0 { | |
err = io.EOF | |
return | |
} | |
if me.pieceRemaining == 0 { | |
me.curPieceIndex++ | |
if me.curPiece != nil { | |
me.curPiece.Close() | |
} | |
me.curPiece, err = me.torrentStorage.pieces.pieceReader(me.torrentStorage.info.Piece(me.curPieceIndex), 0) | |
if err != nil { | |
return | |
} | |
me.pieceRemaining = int(me.torrentStorage.info.Piece(me.curPieceIndex).Length()) | |
} | |
if len(b) > me.pieceRemaining { | |
b = b[:me.pieceRemaining] | |
} | |
if int64(len(b)) > me.sectionRemaining { | |
b = b[:me.sectionRemaining] | |
} | |
n, err = me.curPiece.Read(b) | |
me.sectionRemaining -= int64(n) | |
me.pieceRemaining -= n | |
if err == io.EOF { | |
if me.pieceRemaining != 0 { | |
err = io.ErrUnexpectedEOF | |
} else if me.sectionRemaining != 0 { | |
err = nil | |
} | |
} | |
return | |
} | |
func (me *torrentStorage) OpenSection(off, n int64) (rc io.ReadCloser, err error) { | |
pieceIndex := int(off / me.info.PieceLength) | |
pieceOff := off % me.info.PieceLength | |
pieceInfo := me.info.Piece(pieceIndex) | |
pieceReader, err := me.pieces.pieceReader(pieceInfo, pieceOff) | |
if err != nil { | |
return | |
} | |
rc = §ion{pieceReader, n, me, pieceIndex, int(pieceInfo.Length() - pieceOff)} | |
return | |
} | |
func (me *torrentStorage) ReadAt(p []byte, off int64) (n int, err error) { | |
piece := me.info.Piece(int(off / me.info.PieceLength)) | |
off %= me.info.PieceLength | |
maxLen := piece.Length() - off | |
if int64(len(p)) > maxLen { | |
p = p[:maxLen] | |
} | |
r, err := me.pieces.pieceReader(piece, off) | |
if err != nil { | |
return | |
} | |
defer r.Close() | |
n, err = r.Read(p) | |
// Individual piece io.Reader's can eagerly return io.EOF, but it isn't | |
// the end of the torrent yet. The next Read will be to another piece. | |
if n != 0 && err == io.EOF { | |
err = nil | |
} | |
return | |
} | |
func (me *pieceStorage) incompletePiecePath(piece metainfo.Piece) string { | |
return filepath.Join(incompletePieceDirPath, hex.EncodeToString(piece.Hash())) | |
} | |
func (me *pieceStorage) pieceWriter(piece metainfo.Piece, off int64) (ret io.WriteCloser, err error) { | |
if me.pieces[hex.EncodeToString(piece.Hash())] { | |
err = errors.New("can't write to completed piece") | |
return | |
} | |
os.MkdirAll(incompletePieceDirPath, dirPerm) | |
var f *os.File | |
f, err = os.OpenFile(me.incompletePiecePath(piece), os.O_WRONLY|os.O_CREATE, filePerm) | |
if err != nil { | |
return | |
} | |
_, err = f.Seek(off, os.SEEK_SET) | |
if err != nil { | |
f.Close() | |
return | |
} | |
ret = f | |
return | |
} | |
func (me *torrentStorage) WriteAt(p []byte, off int64) (n int, err error) { | |
piece := me.info.Piece(int(off / me.info.PieceLength)) | |
off %= me.info.PieceLength | |
for len(p) != 0 { | |
var w io.WriteCloser | |
w, err = me.pieces.pieceWriter(piece, off) | |
if err != nil { | |
return | |
} | |
p1 := p | |
maxN := piece.Length() - off | |
if int64(len(p1)) > maxN { | |
p1 = p1[:maxN] | |
} | |
var n1 int | |
n1, err = w.Write(p1) | |
w.Close() | |
n += n1 | |
if err != nil { | |
return | |
} | |
p = p[n1:] | |
off = 0 | |
} | |
return | |
} | |
func (me *pieceStorage) incompletePieceReader(piece metainfo.Piece, off int64) (ret io.ReadCloser, err error) { | |
var f *os.File | |
f, err = os.Open(me.incompletePiecePath(piece)) | |
if os.IsNotExist(err) { | |
err = io.EOF | |
return | |
} | |
if err != nil { | |
return | |
} | |
_, err = f.Seek(off, os.SEEK_SET) | |
if err != nil { | |
f.Close() | |
return | |
} | |
ret = f | |
return | |
} | |
func (me *pieceStorage) pieceReader(piece metainfo.Piece, off int64) (ret io.ReadCloser, err error) { | |
if me.pieceComplete(piece) { | |
path := me.piecePath(piece.Hash()) | |
range_ := fmt.Sprintf("bytes=%d-%d", off, piece.Length()) | |
log.Printf("opening s3://webtorrent/%s (%s)", path, range_) | |
ret, err = me.bucket.GetRangeReader(path, range_) | |
// ret, err = me.bucket.GetReader(me.piecePath(piece.Hash())) | |
// if err != nil { | |
// return | |
// } | |
// _, err = io.CopyN(ioutil.Discard, ret, off) | |
// if err != nil { | |
// ret.Close() | |
// return | |
// } | |
return | |
} | |
return me.incompletePieceReader(piece, off) | |
} | |
func (me *torrentStorage) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) { | |
i := int(off / me.info.PieceLength) | |
off %= me.info.PieceLength | |
for n != 0 { | |
var pr io.ReadCloser | |
pr, err = me.pieces.pieceReader(me.info.Piece(i), off) | |
if err != nil { | |
if err == io.EOF { | |
err = nil | |
} | |
return | |
} | |
var n1 int64 | |
n1, err = io.CopyN(w, pr, n) | |
written += n1 | |
n -= n1 | |
if err != nil { | |
return | |
} | |
off = 0 | |
} | |
return | |
} | |
func (me *pieceStorage) pieceComplete(piece metainfo.Piece) bool { | |
return me.pieces[hex.EncodeToString(piece.Hash())] | |
} | |
func (me *pieceStorage) PieceComplete(piece metainfo.Piece) bool { | |
me.mu.Lock() | |
defer me.mu.Unlock() | |
return me.pieceComplete(piece) | |
} | |
func (me *pieceStorage) pieceCompleted(piece metainfo.Piece) (err error) { | |
if me.pieceComplete(piece) { | |
return nil | |
} | |
incomplete, err := me.incompletePieceReader(piece, 0) | |
if err != nil { | |
return | |
} | |
defer incomplete.Close() | |
hash := sha1.New() | |
r := io.TeeReader(io.LimitReader(incomplete, piece.Length()), hash) | |
completedS3Path := completedPieceS3Path + hex.EncodeToString(piece.Hash()) | |
err = me.bucket.PutReaderHeader(completedS3Path, r, piece.Length(), map[string][]string{ | |
"x-amz-storage-class": []string{"REDUCED_REDUNDANCY"}, | |
}, s3.Private) | |
if err != nil { | |
return | |
} | |
if !bytes.Equal(hash.Sum(nil), piece.Hash()) { | |
err = errors.New("piece actually incomplete") | |
if err := me.bucket.Del(completedS3Path); err != nil { | |
log.Print(err) | |
} | |
return | |
} | |
me.pieces[hex.EncodeToString(piece.Hash())] = true | |
os.Remove(me.incompletePiecePath(piece)) | |
return | |
} | |
func (me *torrentStorage) PieceCompleted(piece int) (err error) { | |
return me.pieces.pieceCompleted(me.info.Piece(piece)) | |
} | |
func (me *torrentStorage) PieceComplete(piece int) bool { | |
return me.pieces.PieceComplete(me.info.Piece(piece)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment