PoDSI piece parsing to match boost
package main

// attempt to replicate https://github.com/filecoin-project/boost/blob/main/piecedirectory/piecedirectory.go behaviour

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"sync/atomic"

	"github.com/filecoin-project/go-data-segment/datasegment"
	commcid "github.com/filecoin-project/go-fil-commcid"
	commp "github.com/filecoin-project/go-fil-commp-hashhash"
	"github.com/filecoin-project/go-state-types/abi"
	carv2 "github.com/ipld/go-car/v2"
	"golang.org/x/sync/errgroup"
)
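
// Usage: go run . <path-to-piece-file>
//
// The program recomputes the piece CommP, locates and parses the PoDSI data
// segment index at the end of the piece, validates its entries, writes each
// valid segment out to <path-to-piece-file>_segment_<n>, and lists the CAR
// blocks found inside each segment.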
func main() {
	filePath := os.Args[1]
	r, err := os.Open(filePath)
	if err != nil {
		panic(err)
	}
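
	// Recompute the piece commitment (CommP) by streaming the whole file through the commp hasher.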
	var cp commp.Calc
	if _, err := io.Copy(&cp, r); err != nil {
		panic(err)
	}
	rawCommP, size, err := cp.Digest()
	if err != nil {
		panic(err)
	}
	if _, err := r.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	c, _ := commcid.DataCommitmentV1ToCID(rawCommP)
	fmt.Println("Padded piece size: ", size)
	fmt.Println("Unpadded piece size: ", abi.PaddedPieceSize(size).Unpadded())
	dsis := datasegment.DataSegmentIndexStartOffset(abi.PaddedPieceSize(size))

	// unnecessary, something about the way boost works
	if _, err = r.Seek(0, io.SeekEnd); err != nil {
		panic(err)
	}

	fmt.Printf("Seeking back to %d\n", dsis)
	if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil {
		panic(err)
	}
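
	// Count reads while parsing the data segment index from the end of the piece.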
	var readsCnt int32
	cr := &countingReader{
		Reader: r,
		cnt:    &readsCnt,
	}
	index, err := datasegment.ParseDataSegmentIndex(bufio.NewReaderSize(cr, 20e6))
	if err != nil {
		panic(err)
	}
fmt.Println("CommP: ", c) | |
jsonData, err := json.Marshal(index) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Println("Found index data: " + string(jsonData)) | |
readsCnt = 0 | |
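
	// Validate every index entry, fanned out across goroutines; with concurrency == len(Entries), each chunk holds a single entry.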
	concurrency := len(index.Entries)
	chunkSize := len(index.Entries) / concurrency
	results := make([][]datasegment.SegmentDesc, concurrency)
	var eg errgroup.Group
	for i := 0; i < concurrency; i++ {
		i := i
		eg.Go(func() error {
			start := i * chunkSize
			end := start + chunkSize
			if i == concurrency-1 {
				end = len(index.Entries)
			}
			res, err := validateEntries(index.Entries[start:end])
			if err != nil {
				return err
			}
			results[i] = res
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		panic(err)
	}
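
	// Gather the descriptors that passed validation.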
	validSegments := make([]datasegment.SegmentDesc, 0, len(index.Entries))
	for _, res := range results {
		validSegments = append(validSegments, res...)
	}
	if len(validSegments) == 0 {
		panic("no valid data segments found")
	}
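
	// For each valid segment: copy its unpadded bytes out to <piece>_segment_<i>, then re-read it as a CAR stream and list its blocks.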
	for i, e := range validSegments {
		if err := e.Validate(); err != nil {
			fmt.Printf("Error validating entry %d: %s\n", i, err)
			// continue
		}
		segOffset := e.UnpaddedOffest() // method name as spelled (sic) in go-data-segment
		segSize := e.UnpaddedLength()
		lr := io.NewSectionReader(r, int64(segOffset), int64(segSize))

		// write the segment to args[1]_segment_<i>
		segmentPath := fmt.Sprintf("%s_segment_%d", filePath, i)
		sw, err := os.Create(segmentPath)
		if err != nil {
			panic(err)
		}
		if _, err := io.Copy(sw, lr); err != nil {
			panic(err)
		}
		if err := sw.Close(); err != nil {
			panic(err)
		}
		fmt.Printf("Segment #%d written to %s\n", i, segmentPath)
		lr = io.NewSectionReader(r, int64(segOffset), int64(segSize))
		cr = &countingReader{
			Reader: lr,
			cnt:    &readsCnt,
		}
		opts := []carv2.Option{carv2.ZeroLengthSectionAsEOF(true)}
		blockReader, err := carv2.NewBlockReader(bufio.NewReaderSize(cr, 20e6), opts...)
		if err != nil {
			panic(err)
		}
		blockMetadata, err := blockReader.SkipNext()
		for err == nil {
			fmt.Printf("Segment #%d CAR Block: %s, Offset: %d, Size: %d\n", i, blockMetadata.Cid, blockMetadata.SourceOffset, blockMetadata.Size)
			blockMetadata, err = blockReader.SkipNext()
		}
		if !errors.Is(err, io.EOF) {
			fmt.Printf("Error reading blocks: %s\n", err)
		}
	}
	fmt.Printf("Parsed PoDSI piece (with %d reads)\n", readsCnt)
}
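
// countingReader wraps an io.Reader and atomically counts how many Read calls pass through it.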
type countingReader struct {
	io.Reader
	cnt *int32
}

func (cr *countingReader) Read(p []byte) (n int, err error) {
	atomic.AddInt32(cr.cnt, 1)
	return cr.Reader.Read(p)
}
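
// validateEntries checks each SegmentDesc, skipping entries that fail with a validation error and returning on any other error.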
func validateEntries(entries []datasegment.SegmentDesc) ([]datasegment.SegmentDesc, error) {
	res := make([]datasegment.SegmentDesc, 0, len(entries))
	for i, e := range entries {
		if err := e.Validate(); err != nil {
			if errors.Is(err, datasegment.ErrValidation) {
				fmt.Printf("Error validating entry: %s\n", err)
				continue
			} else {
				return nil, fmt.Errorf("got unknown error for entry %d: %w", i, err)
			}
		}
		res = append(res, e)
	}
	return res, nil
}