Refactor git command stdio pipe (#36422)
Most potential deadlock problems should now be fixed, and with the new design, new code is unlikely to introduce new ones. Also raise the minimum required Git version to 2.6.0 (released in 2015).
This commit is contained in:
@@ -8,96 +8,84 @@ package lfs
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"code.gitea.io/gitea/modules/git"
|
||||
"code.gitea.io/gitea/modules/git/gitcmd"
|
||||
"code.gitea.io/gitea/modules/git/pipeline"
|
||||
"code.gitea.io/gitea/modules/util"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// SearchPointerBlobs scans the whole repository for LFS pointer files
|
||||
func SearchPointerBlobs(ctx context.Context, repo *git.Repository, pointerChan chan<- PointerBlob, errChan chan<- error) {
|
||||
basePath := repo.Path
|
||||
func SearchPointerBlobs(ctx context.Context, repo *git.Repository, pointerChan chan<- PointerBlob) error {
|
||||
cmd1AllObjs, cmd3BatchContent := gitcmd.NewCommand(), gitcmd.NewCommand()
|
||||
|
||||
catFileCheckReader, catFileCheckWriter := io.Pipe()
|
||||
shasToBatchReader, shasToBatchWriter := io.Pipe()
|
||||
catFileBatchReader, catFileBatchWriter := io.Pipe()
|
||||
cmd1AllObjsStdout, cmd1AllObjsStdoutClose := cmd1AllObjs.MakeStdoutPipe()
|
||||
defer cmd1AllObjsStdoutClose()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(4)
|
||||
|
||||
// Create the go-routines in reverse order.
|
||||
cmd3BatchContentIn, cmd3BatchContentOut, cmd3BatchContentClose := cmd3BatchContent.MakeStdinStdoutPipe()
|
||||
defer cmd3BatchContentClose()
|
||||
|
||||
// Create the goroutines in reverse order (update: this order is no longer required, because the pipes are prepared before the goroutines are started)
|
||||
wg := errgroup.Group{}
|
||||
// 4. Take the output of cat-file --batch and check each file in turn
|
||||
// to see if it is a pointer to a file in the LFS store
|
||||
go createPointerResultsFromCatFileBatch(ctx, catFileBatchReader, &wg, pointerChan)
|
||||
wg.Go(func() error {
|
||||
return createPointerResultsFromCatFileBatch(cmd3BatchContentOut, pointerChan)
|
||||
})
|
||||
|
||||
// 3. Take the shas of the blobs and batch read them
|
||||
go pipeline.CatFileBatch(ctx, shasToBatchReader, catFileBatchWriter, &wg, basePath)
|
||||
wg.Go(func() error {
|
||||
return pipeline.CatFileBatch(ctx, cmd3BatchContent, repo.Path)
|
||||
})
|
||||
|
||||
// 2. From the provided objects restrict to blobs <=1k
|
||||
go pipeline.BlobsLessThan1024FromCatFileBatchCheck(catFileCheckReader, shasToBatchWriter, &wg)
|
||||
wg.Go(func() error {
|
||||
return pipeline.BlobsLessThan1024FromCatFileBatchCheck(cmd1AllObjsStdout, cmd3BatchContentIn)
|
||||
})
|
||||
|
||||
// 1. Run batch-check on all objects in the repository
|
||||
if !git.DefaultFeatures().CheckVersionAtLeast("2.6.0") {
|
||||
revListReader, revListWriter := io.Pipe()
|
||||
shasToCheckReader, shasToCheckWriter := io.Pipe()
|
||||
wg.Add(2)
|
||||
go pipeline.CatFileBatchCheck(ctx, shasToCheckReader, catFileCheckWriter, &wg, basePath)
|
||||
go pipeline.BlobsFromRevListObjects(revListReader, shasToCheckWriter, &wg)
|
||||
go pipeline.RevListAllObjects(ctx, revListWriter, &wg, basePath, errChan)
|
||||
} else {
|
||||
go pipeline.CatFileBatchCheckAllObjects(ctx, catFileCheckWriter, &wg, basePath, errChan)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
wg.Go(func() error {
|
||||
return pipeline.CatFileBatchCheckAllObjects(ctx, cmd1AllObjs, repo.Path)
|
||||
})
|
||||
err := wg.Wait()
|
||||
close(pointerChan)
|
||||
close(errChan)
|
||||
return err
|
||||
}
|
||||
|
||||
func createPointerResultsFromCatFileBatch(ctx context.Context, catFileBatchReader *io.PipeReader, wg *sync.WaitGroup, pointerChan chan<- PointerBlob) {
|
||||
defer wg.Done()
|
||||
func createPointerResultsFromCatFileBatch(catFileBatchReader io.ReadCloser, pointerChan chan<- PointerBlob) error {
|
||||
defer catFileBatchReader.Close()
|
||||
|
||||
bufferedReader := bufio.NewReader(catFileBatchReader)
|
||||
buf := make([]byte, 1025)
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break loop
|
||||
default:
|
||||
}
|
||||
|
||||
// File descriptor line: sha
|
||||
sha, err := bufferedReader.ReadString(' ')
|
||||
if err != nil {
|
||||
_ = catFileBatchReader.CloseWithError(err)
|
||||
break
|
||||
return util.Iif(errors.Is(err, io.EOF), nil, err)
|
||||
}
|
||||
sha = strings.TrimSpace(sha)
|
||||
// Throw away the blob
|
||||
if _, err := bufferedReader.ReadString(' '); err != nil {
|
||||
_ = catFileBatchReader.CloseWithError(err)
|
||||
break
|
||||
return err
|
||||
}
|
||||
sizeStr, err := bufferedReader.ReadString('\n')
|
||||
if err != nil {
|
||||
_ = catFileBatchReader.CloseWithError(err)
|
||||
break
|
||||
return err
|
||||
}
|
||||
size, err := strconv.Atoi(sizeStr[:len(sizeStr)-1])
|
||||
if err != nil {
|
||||
_ = catFileBatchReader.CloseWithError(err)
|
||||
break
|
||||
return err
|
||||
}
|
||||
pointerBuf := buf[:size+1]
|
||||
if _, err := io.ReadFull(bufferedReader, pointerBuf); err != nil {
|
||||
_ = catFileBatchReader.CloseWithError(err)
|
||||
break
|
||||
return err
|
||||
}
|
||||
pointerBuf = pointerBuf[:size]
|
||||
// Now we need to check if the pointerBuf is an LFS pointer
|
||||
@@ -105,7 +93,6 @@ loop:
|
||||
if !pointer.IsValid() {
|
||||
continue
|
||||
}
|
||||
|
||||
pointerChan <- PointerBlob{Hash: sha, Pointer: pointer}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user