Commit ac856c69 authored by manas kashyap's avatar manas kashyap 💬

New upstream version 4.3.1+debian

parent b9096a08
......@@ -2,6 +2,15 @@
Formerly known as 'gitlab-git-http-server'.
v 4.3.1
- Objectstorage ETag checking !263
v 4.3.0
- Multipart upload support !257
- Make external commands extend the environment !261
v 4.2.1
- Fix objectstore error shadowing !259
......
......@@ -67,6 +67,17 @@ func NewAPI(myURL *url.URL, version string, roundTripper *badgateway.RoundTrippe
type HandleFunc func(http.ResponseWriter, *http.Request, *Response)
// MultipartUploadParams holds the presigned URLs and sizing information that
// Rails hands to workhorse for performing an S3 multipart upload on its behalf.
type MultipartUploadParams struct {
// PartSize is the exact size of each uploaded part. Only the last one can be smaller
PartSize int64
// PartURLs contains the presigned URLs for each part
PartURLs []string
// CompleteURL is a presigned URL for CompleteMultipartUpload
CompleteURL string
// AbortURL is a presigned URL for AbortMultipartUpload
AbortURL string
}
type RemoteObject struct {
// GetURL is an S3 GetObject URL
GetURL string
......@@ -78,6 +89,8 @@ type RemoteObject struct {
ID string
// Timeout is a number that represents timeout in seconds for sending data to StoreURL
Timeout int
// MultipartUpload contains presigned URLs for S3 MultipartUpload
MultipartUpload *MultipartUploadParams
}
type Response struct {
......
......@@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
)
......@@ -273,3 +274,45 @@ func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testin
testhelper.AssertResponseCode(t, response, 500)
assert.Equal(t, 1, putCalledTimes, "upload should be called only once")
}
// TestUploadHandlerMultipartUploadSizeLimit verifies that an upload bigger than
// PartSize * len(PartURLs) (here 1 byte * 1 part vs a 10 byte body) is rejected
// with 413 Request Entity Too Large, and that the in-progress multipart upload
// is aborted so no object is left behind in the store.
func TestUploadHandlerMultipartUploadSizeLimit(t *testing.T) {
// NOTE(review): "os" here shadows the standard library os package within this test.
os, server := test.StartObjectStore()
defer server.Close()
err := os.InitiateMultipartUpload(test.ObjectPath)
require.NoError(t, err)
objectURL := server.URL + test.ObjectPath
// Only one presigned part of 1 byte is offered, but 10 bytes will be sent.
partSize := int64(1)
uploadSize := 10
preauth := api.Response{
RemoteObject: api.RemoteObject{
ID: "store-id",
MultipartUpload: &api.MultipartUploadParams{
PartSize: partSize,
PartURLs: []string{objectURL + "?partNumber=1"},
AbortURL: objectURL, // DELETE
CompleteURL: objectURL, // POST
},
},
}
// The Rails callback must never fire: the upload fails before finalization.
responseProcessor := func(w http.ResponseWriter, r *http.Request) {
t.Fatal("it should not be called")
}
ts := testArtifactsUploadServer(t, preauth, responseProcessor)
defer ts.Close()
contentBuffer, contentType := createTestMultipartForm(t, make([]byte, uploadSize))
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, http.StatusRequestEntityTooLarge)
// Poll because AbortMultipartUpload is async
// (waits up to ~1s: 100 iterations of 10ms).
for i := 0; os.IsMultipartUpload(test.ObjectPath) && i < 100; i++ {
time.Sleep(10 * time.Millisecond)
}
assert.False(t, os.IsMultipartUpload(test.ObjectPath), "MultipartUpload should not be in progress anymore")
assert.Empty(t, os.GetObjectMD5(test.ObjectPath), "upload should have failed, so the object should not exists")
}
......@@ -67,10 +67,10 @@ func unpackFileFromZip(archivePath, encodedFilename string, headers http.Header,
}
catFile := exec.Command("gitlab-zip-cat")
catFile.Env = []string{
"ARCHIVE_PATH=" + archivePath,
"ENCODED_FILE_NAME=" + encodedFilename,
}
catFile.Env = append(os.Environ(),
"ARCHIVE_PATH="+archivePath,
"ENCODED_FILE_NAME="+encodedFilename,
)
catFile.Stderr = os.Stderr
catFile.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdout, err := catFile.StdoutPipe()
......
......@@ -12,9 +12,11 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)
type MD5Error error
type SizeError error
// ErrEntityTooLarge means that the uploaded content is bigger than the maximum allowed size
var ErrEntityTooLarge = errors.New("Entity is too large")
// FileHandler represent a file that has been processed for upload
// it may be either uploaded to an ObjectStore and/or saved on local path.
type FileHandler struct {
......@@ -81,7 +83,7 @@ func (fh *FileHandler) GitLabFinalizeFields(prefix string) map[string]string {
// SaveFileFromReader persists the provided reader content to all the locations specified in opts. A cleanup will be performed once ctx is Done
// Make sure the provided context will not expire before finalizing upload with GitLab Rails.
func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts *SaveFileOpts) (fh *FileHandler, err error) {
var object *objectstore.Object
var remoteWriter objectstore.Upload
fh = &FileHandler{
Name: opts.TempFilePrefix,
RemoteID: opts.RemoteID,
......@@ -97,13 +99,20 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
}
}()
if opts.IsRemote() {
object, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.Timeout, size)
if opts.IsMultipart() {
remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.Deadline, opts.PartSize)
if err != nil {
return nil, err
}
writers = append(writers, remoteWriter)
} else if opts.IsRemote() {
remoteWriter, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.Deadline, size)
if err != nil {
return nil, err
}
writers = append(writers, object)
writers = append(writers, remoteWriter)
}
if opts.IsLocal() {
......@@ -133,14 +142,16 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
if opts.IsRemote() {
// we need to close the writer in order to get ETag header
err = object.Close()
err = remoteWriter.Close()
if err != nil {
if err == objectstore.ErrNotEnoughParts {
return nil, ErrEntityTooLarge
}
return nil, err
}
if fh.MD5() != object.MD5() {
return nil, MD5Error(fmt.Errorf("expected md5 %s, got %s", fh.MD5(), object.MD5()))
}
etag := remoteWriter.ETag()
fh.hashes["etag"] = etag
}
return fh, err
......
......@@ -19,6 +19,10 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
)
// testDeadline builds an upload deadline comfortably in the future
// (now plus DefaultObjectStoreTimeout) so tests never hit it.
func testDeadline() time.Time {
	deadline := time.Now().Add(filestore.DefaultObjectStoreTimeout)
	return deadline
}
func assertFileGetsRemovedAsync(t *testing.T, filePath string) {
var err error
......@@ -76,31 +80,50 @@ func TestSaveFromDiskNotExistingFile(t *testing.T) {
assert.Nil(fh, "On error FileHandler should be nil")
}
func TestSaveFileWrongMD5(t *testing.T) {
assert := assert.New(t)
func TestSaveFileWrongETag(t *testing.T) {
tests := []struct {
name string
multipart bool
}{
{name: "single part"},
{name: "multi part", multipart: true},
}
osStub, ts := test.StartObjectStoreWithCustomMD5(map[string]string{test.ObjectPath: "brokenMD5"})
defer ts.Close()
for _, spec := range tests {
t.Run(spec.name, func(t *testing.T) {
assert := assert.New(t)
objectURL := ts.URL + test.ObjectPath
osStub, ts := test.StartObjectStoreWithCustomMD5(map[string]string{test.ObjectPath: "brokenMD5"})
defer ts.Close()
opts := &filestore.SaveFileOpts{
RemoteID: "test-file",
RemoteURL: objectURL,
PresignedPut: objectURL + "?Signature=ASignature",
PresignedDelete: objectURL + "?Signature=AnotherSignature",
}
objectURL := ts.URL + test.ObjectPath
ctx, cancel := context.WithCancel(context.Background())
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
assert.Nil(fh)
assert.Error(err)
_, isMD5Error := err.(filestore.MD5Error)
assert.True(isMD5Error, "Should fail with MD5Error")
assert.Equal(1, osStub.PutsCnt(), "File not uploaded")
opts := &filestore.SaveFileOpts{
RemoteID: "test-file",
RemoteURL: objectURL,
PresignedPut: objectURL + "?Signature=ASignature",
PresignedDelete: objectURL + "?Signature=AnotherSignature",
Deadline: testDeadline(),
}
if spec.multipart {
opts.PresignedParts = []string{objectURL + "?partNumber=1"}
opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSig"
opts.PresignedAbortMultipart = objectURL + "?Signature=AbortSig"
opts.PartSize = test.ObjectSize
cancel() // this will trigger an async cleanup
assertObjectStoreDeletedAsync(t, 1, osStub)
osStub.InitiateMultipartUpload(test.ObjectPath)
}
ctx, cancel := context.WithCancel(context.Background())
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
assert.Nil(fh)
assert.Error(err)
assert.Equal(1, osStub.PutsCnt(), "File not uploaded")
cancel() // this will trigger an async cleanup
assertObjectStoreDeletedAsync(t, 1, osStub)
assert.False(spec.multipart && osStub.IsMultipartUpload(test.ObjectPath), "there must be no multipart upload in progress now")
})
}
}
func TestSaveFileFromDiskToLocalPath(t *testing.T) {
......@@ -132,6 +155,13 @@ func TestSaveFileFromDiskToLocalPath(t *testing.T) {
}
func TestSaveFile(t *testing.T) {
type remote int
const (
notRemote remote = iota
remoteSingle
remoteMultipart
)
tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
require.NoError(t, err)
defer os.RemoveAll(tmpFolder)
......@@ -139,11 +169,13 @@ func TestSaveFile(t *testing.T) {
tests := []struct {
name string
local bool
remote bool
remote remote
}{
{name: "Local only", local: true},
{name: "Remote only", remote: true},
{name: "Both", local: true, remote: true},
{name: "Remote Single only", remote: remoteSingle},
{name: "Remote Single and Local", local: true, remote: remoteSingle},
{name: "Remote Multipart only", remote: remoteMultipart},
{name: "Remote Multipart and Local", local: true, remote: remoteMultipart},
}
for _, spec := range tests {
......@@ -156,16 +188,32 @@ func TestSaveFile(t *testing.T) {
osStub, ts := test.StartObjectStore()
defer ts.Close()
if spec.remote {
switch spec.remote {
case remoteSingle:
objectURL := ts.URL + test.ObjectPath
opts.RemoteID = "test-file"
opts.RemoteURL = objectURL
opts.PresignedPut = objectURL + "?Signature=ASignature"
opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
opts.Deadline = testDeadline()
expectedDeletes = 1
expectedPuts = 1
case remoteMultipart:
objectURL := ts.URL + test.ObjectPath
opts.RemoteID = "test-file"
opts.RemoteURL = objectURL
opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
opts.PartSize = int64(len(test.ObjectContent)/2) + 1
opts.PresignedParts = []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"}
opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSignature"
opts.Deadline = testDeadline()
osStub.InitiateMultipartUpload(test.ObjectPath)
expectedDeletes = 1
expectedPuts = 2
}
if spec.local {
......@@ -220,6 +268,41 @@ func TestSaveFile(t *testing.T) {
assert.Equal(test.ObjectSHA1, fields["file.sha1"])
assert.Equal(test.ObjectSHA256, fields["file.sha256"])
assert.Equal(test.ObjectSHA512, fields["file.sha512"])
if spec.remote == notRemote {
assert.NotContains(fields, "file.etag")
} else {
assert.Contains(fields, "file.etag")
}
})
}
}
// TestSaveMultipartInBodyFailure verifies that an error reported inside the
// HTTP response body of a multipart operation (a 200 response whose XML body
// carries the failure) is surfaced to the caller of SaveFileFromReader.
func TestSaveMultipartInBodyFailure(t *testing.T) {
assert := assert.New(t)
osStub, ts := test.StartObjectStore()
defer ts.Close()
// this is a broken path because it contains bucket name but no key
// this is the only way to get an in-body failure from our ObjectStoreStub
objectPath := "/bucket-but-no-object-key"
objectURL := ts.URL + objectPath
opts := filestore.SaveFileOpts{
RemoteID: "test-file",
RemoteURL: objectURL,
PartSize: test.ObjectSize,
PresignedParts: []string{objectURL + "?partNumber=1", objectURL + "?partNumber=2"},
PresignedCompleteMultipart: objectURL + "?Signature=CompleteSignature",
Deadline: testDeadline(),
}
osStub.InitiateMultipartUpload(objectPath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
// No FileHandler on failure; the error must match the stub's canned in-body error.
assert.Nil(fh)
require.Error(t, err)
assert.EqualError(err, test.MultipartUploadInternalError().Error())
}
package filestore
import (
"net/url"
"time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)
// DefaultObjectStoreTimeout is the timeout for ObjectStore upload operation
const DefaultObjectStoreTimeout = 4 * time.Hour
// SaveFileOpts represents all the options available for saving a file to object store
type SaveFileOpts struct {
// TempFilePrefix is the prefix used to create temporary local file
......@@ -22,8 +23,18 @@ type SaveFileOpts struct {
PresignedPut string
// PresignedDelete is a presigned S3 DeleteObject compatible URL.
PresignedDelete string
// Timeout is the S3 operation timeout. If 0, objectstore.DefaultObjectStoreTimeout will be used
Timeout time.Duration
// Deadline is the S3 operation deadline; the upload will be aborted if not completed in time
Deadline time.Time
//MultipartUpload parameters
// PartSize is the exact size of each uploaded part. Only the last one can be smaller
PartSize int64
// PresignedParts contains the presigned URLs for each part
PresignedParts []string
// PresignedCompleteMultipart is a presigned URL for CompleteMultipartUpload
PresignedCompleteMultipart string
// PresignedAbortMultipart is a presigned URL for AbortMultipartUpload
PresignedAbortMultipart string
}
// IsLocal checks if the options require the writing of the file on disk
......@@ -33,35 +44,36 @@ func (s *SaveFileOpts) IsLocal() bool {
// IsRemote checks if the options requires a remote upload
func (s *SaveFileOpts) IsRemote() bool {
return s.PresignedPut != ""
return s.PresignedPut != "" || s.IsMultipart()
}
func (s *SaveFileOpts) isGoogleCloudStorage() bool {
if !s.IsRemote() {
return false
}
getURL, err := url.Parse(s.RemoteURL)
if err != nil {
return false
}
return objectstore.IsGoogleCloudStorage(getURL)
// IsMultipart checks if the options require a multipart upload.
// Multipart mode is driven solely by PartSize: a positive value means the
// presigned part URLs are expected to be used instead of a single PUT.
func (s *SaveFileOpts) IsMultipart() bool {
return s.PartSize > 0
}
// GetOpts converts GitLab api.Response to a proper SaveFileOpts
func GetOpts(apiResponse *api.Response) *SaveFileOpts {
timeout := time.Duration(apiResponse.RemoteObject.Timeout) * time.Second
if timeout == 0 {
timeout = objectstore.DefaultObjectStoreTimeout
timeout = DefaultObjectStoreTimeout
}
return &SaveFileOpts{
opts := SaveFileOpts{
LocalTempPath: apiResponse.TempPath,
RemoteID: apiResponse.RemoteObject.ID,
RemoteURL: apiResponse.RemoteObject.GetURL,
PresignedPut: apiResponse.RemoteObject.StoreURL,
PresignedDelete: apiResponse.RemoteObject.DeleteURL,
Timeout: timeout,
Deadline: time.Now().Add(timeout),
}
if multiParams := apiResponse.RemoteObject.MultipartUpload; multiParams != nil {
opts.PartSize = multiParams.PartSize
opts.PresignedCompleteMultipart = multiParams.CompleteURL
opts.PresignedAbortMultipart = multiParams.AbortURL
opts.PresignedParts = append([]string(nil), multiParams.PartURLs...)
}
return &opts
}
......@@ -8,7 +8,6 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)
func TestSaveFileOptsLocalAndRemote(t *testing.T) {
......@@ -16,8 +15,10 @@ func TestSaveFileOptsLocalAndRemote(t *testing.T) {
name string
localTempPath string
presignedPut string
partSize int64
isLocal bool
isRemote bool
isMultipart bool
}{
{
name: "Only LocalTempPath",
......@@ -39,6 +40,20 @@ func TestSaveFileOptsLocalAndRemote(t *testing.T) {
presignedPut: "http://example.com",
isRemote: true,
},
{
name: "Multipart",
partSize: 10,
isRemote: true,
isMultipart: true,
},
{
name: "Multipart and Local",
partSize: 10,
localTempPath: "/tmp",
isRemote: true,
isMultipart: true,
isLocal: true,
},
}
for _, test := range tests {
......@@ -49,42 +64,80 @@ func TestSaveFileOptsLocalAndRemote(t *testing.T) {
opts := filestore.SaveFileOpts{
LocalTempPath: test.localTempPath,
PresignedPut: test.presignedPut,
PartSize: test.partSize,
}
assert.Equal(test.isLocal, opts.IsLocal(), "IsLocal() mismatch")
assert.Equal(test.isRemote, opts.IsRemote(), "IsRemote() mismatch")
assert.Equal(test.isMultipart, opts.IsMultipart(), "IsMultipart() mismatch")
})
}
}
func TestGetOpts(t *testing.T) {
assert := assert.New(t)
apiResponse := &api.Response{
TempPath: "/tmp",
RemoteObject: api.RemoteObject{
Timeout: 10,
ID: "id",
GetURL: "http://get",
StoreURL: "http://store",
DeleteURL: "http://delete",
tests := []struct {
name string
multipart *api.MultipartUploadParams
}{
{
name: "Single upload",
}, {
name: "Multipart upload",
multipart: &api.MultipartUploadParams{
PartSize: 10,
CompleteURL: "http://complete",
AbortURL: "http://abort",
PartURLs: []string{"http://part1", "http://part2"},
},
},
}
opts := filestore.GetOpts(apiResponse)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert := assert.New(t)
apiResponse := &api.Response{
TempPath: "/tmp",
RemoteObject: api.RemoteObject{
Timeout: 10,
ID: "id",
GetURL: "http://get",
StoreURL: "http://store",
DeleteURL: "http://delete",
MultipartUpload: test.multipart,
},
}
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
opts := filestore.GetOpts(apiResponse)
assert.Equal(apiResponse.TempPath, opts.LocalTempPath)
assert.Equal(time.Duration(apiResponse.RemoteObject.Timeout)*time.Second, opts.Timeout)
assert.Equal(apiResponse.RemoteObject.ID, opts.RemoteID)
assert.Equal(apiResponse.RemoteObject.GetURL, opts.RemoteURL)
assert.Equal(apiResponse.RemoteObject.StoreURL, opts.PresignedPut)
assert.Equal(apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete)
assert.Equal(apiResponse.TempPath, opts.LocalTempPath)
assert.WithinDuration(deadline, opts.Deadline, time.Second)
assert.Equal(apiResponse.RemoteObject.ID, opts.RemoteID)
assert.Equal(apiResponse.RemoteObject.GetURL, opts.RemoteURL)
assert.Equal(apiResponse.RemoteObject.StoreURL, opts.PresignedPut)
assert.Equal(apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete)
if test.multipart == nil {
assert.False(opts.IsMultipart())
assert.Empty(opts.PresignedCompleteMultipart)
assert.Empty(opts.PresignedAbortMultipart)
assert.Zero(opts.PartSize)
assert.Empty(opts.PresignedParts)
} else {
assert.True(opts.IsMultipart())
assert.Equal(test.multipart.CompleteURL, opts.PresignedCompleteMultipart)
assert.Equal(test.multipart.AbortURL, opts.PresignedAbortMultipart)
assert.Equal(test.multipart.PartSize, opts.PartSize)
assert.Equal(test.multipart.PartURLs, opts.PresignedParts)
}
})
}
}
func TestGetOptsDefaultTimeout(t *testing.T) {
assert := assert.New(t)
deadline := time.Now().Add(filestore.DefaultObjectStoreTimeout)
opts := filestore.GetOpts(&api.Response{})
assert.Equal(objectstore.DefaultObjectStoreTimeout, opts.Timeout)
assert.WithinDuration(deadline, opts.Deadline, time.Minute)
}
package objectstore
import (
"bytes"
"context"
"encoding/xml"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"time"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
// ErrNotEnoughParts will be used when writing more than size * len(partURLs),
// i.e. the presigned part URLs were exhausted before the whole content was consumed.
var ErrNotEnoughParts = errors.New("Not enough Parts")

// Multipart represents a MultipartUpload on a S3 compatible Object Store service.
// It can be used as io.WriteCloser for uploading an object
type Multipart struct {
// CompleteURL is a presigned URL for CompleteMultipartUpload
CompleteURL string
// AbortURL is a presigned URL for AbortMultipartUpload
AbortURL string
// DeleteURL is a presigned URL for RemoveObject
DeleteURL string
uploader
}
// NewMultipart provides a Multipart pointer that can be used for uploading. Data written will be buffered on disk in chunks of up to partSize bytes
// then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent.
// In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources
func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, deadline time.Time, partSize int64) (*Multipart, error) {