resource_manager/domain/resource/s3_manager.go

201 lines
5.4 KiB
Go

package resource
import (
"bytes"
"context"
"errors"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"log"
"net/url"
"os"
"resource-manager/domain/cache"
"strconv"
"strings"
"sync"
"time"
)
// newS3Bucket builds the S3-backed Manager.
//
// It opens an AWS session for the region named by the AWS_REGION_NAME
// environment variable, creates an uploader and a downloader on top of that
// session and starts the goroutine that drains the upload queue (buffered
// for 20 requests). The function panics when the session cannot be created.
//
// cacheManager is the cache used for presigned URLs; expiration is handed to
// both the presign call and the cache when a URL is stored.
func newS3Bucket(cacheManager cache.Manager, expiration time.Duration) Manager {
	region := os.Getenv("AWS_REGION_NAME")
	awsSession, err := session.NewSession(&aws.Config{Region: aws.String(region)})
	if err != nil {
		panic(err)
	}

	// uploadSync is left at its zero value, which is an unlocked mutex.
	m := &awsManager{
		cache:              cacheManager,
		requests:           make(chan UploadRequest, 20),
		currentlyUploading: make(map[string][]onUpload),
		expiration:         expiration,
		uploader:           s3manager.NewUploader(awsSession),
		downloader:         s3manager.NewDownloader(awsSession),
	}
	go m.uploadRunner()
	return m
}
// onUpload is the channel on which one waiter receives the final status of
// an upload, as reported by the upload runner.
type onUpload chan bool
// awsManager is the S3-backed implementation returned by newS3Bucket.
//
// NOTE: newS3Bucket fills this struct with a positional literal, so the
// field order below must not change without updating that literal.
type awsManager struct {
// cache stores presigned URLs keyed by object path.
cache cache.Manager
// requests is the upload queue: Upload sends to it, uploadRunner receives.
requests chan UploadRequest
// currentlyUploading maps the path of every queued upload to the channels
// of the callers waiting for that upload to finish.
currentlyUploading map[string][]onUpload
// uploadSync guards currentlyUploading.
uploadSync sync.Mutex
// expiration is passed to presign and to the cache when a URL is stored.
expiration time.Duration
// uploader performs uploads and deletions.
uploader *s3manager.Uploader
// downloader performs downloads, presigning, listing, copying and HEAD checks.
downloader *s3manager.Downloader
}
// uploadRunner processes queued upload requests one at a time until the
// requests channel is closed.
//
// A request counts as successful when the existence check reports the object
// as present (the upload is then skipped) or when the upload itself succeeds.
// Every waiter registered for the path is notified with that status.
func (a *awsManager) uploadRunner() {
	for request := range a.requests {
		// Short-circuit: the upload only runs when the existence check says no.
		ok := a.doesFileExist(request.Path) ||
			a.upload(request.Path, request.Buffer.Bytes(), request.MimeType)
		a.notifyOnUploadComplete(request.Path, ok)
	}
}
// Upload marks the request's path as currently uploading and then hands the
// request to the background runner through the requests channel.
//
// The send blocks while the channel has no free slot.
func (a *awsManager) Upload(request UploadRequest) {
	path := request.Path
	a.addToCurrentlyUploading(path)
	a.requests <- request
}
// Presign returns a presigned download URL for path ("<bucket>/<key>").
//
// If an upload for path is still pending, the call first waits for it and
// fails with "failed upload" when that upload did not succeed. A URL found in
// the cache is returned as is; otherwise a new URL is presigned with the
// manager's expiration and stored in the cache for later calls.
//
// The context argument is currently ignored.
func (a *awsManager) Presign(_ context.Context, path string) (string, error) {
	if !a.waitForUploadComplete(path) {
		return "", errors.New("failed upload")
	}

	// Ask the cache for the latest URL.
	if uri, err := a.cache.Get(path); err == nil {
		return uri, nil
	}

	// Nothing cached: presign a fresh URL.
	uri, err := a.presign(path, a.expiration)
	if err != nil {
		return "", err
	}

	// Only a successfully presigned URL is cached. Caching is best effort:
	// a failing cache must not prevent the caller from getting a valid URL.
	_ = a.cache.Set(path, uri, a.expiration)
	return uri, nil
}
// Download fetches the object stored at path ("<bucket>/<key>") and returns
// its bytes together with any error reported by the internal download helper.
//
// The context argument is currently ignored.
func (a *awsManager) Download(_ context.Context, path string) (file []byte, err error) {
	file, err = a.download(path)
	return file, err
}
// Copy copies every object whose key starts with the key part of from to the
// location described by to. Both arguments have the form "<bucket>/<key>";
// in each copied key the leading source prefix is replaced by the destination
// prefix, so a single object as well as a whole "directory" can be copied.
//
// When overwrite is false, objects that already exist at the destination are
// skipped. The first listing or copy error aborts the operation and is
// returned; objects copied before the failure are left in place.
func (a *awsManager) Copy(from string, to string, overwrite bool) error {
	fromPath := strings.SplitN(from, "/", 2)
	toPath := strings.SplitN(to, "/", 2)
	if len(fromPath) != 2 || len(toPath) != 2 {
		return errors.New("invalid copy path: expected <bucket>/<key>")
	}
	srcBucket, srcPrefix := fromPath[0], fromPath[1]
	dstBucket, dstPrefix := toPath[0], toPath[1]

	input := &s3.ListObjectsV2Input{
		Bucket: aws.String(srcBucket),
		Prefix: aws.String(srcPrefix),
	}
	// A single listing call returns a bounded page of keys, so keep following
	// the continuation token until the listing is no longer truncated.
	for {
		output, err := a.downloader.S3.ListObjectsV2(input)
		if err != nil {
			return err
		}

		for _, obj := range output.Contents {
			srcKey := aws.StringValue(obj.Key)
			// Every listed key starts with srcPrefix, so swapping the prefix
			// yields the destination key of this particular object.
			dstKey := dstPrefix + strings.TrimPrefix(srcKey, srcPrefix)

			// The existence check expects a full "<bucket>/<key>" path.
			if !overwrite && a.doesFileExist(dstBucket+"/"+dstKey) {
				continue
			}

			_, err = a.downloader.S3.CopyObject(&s3.CopyObjectInput{
				Bucket:     aws.String(dstBucket),
				CopySource: aws.String(url.PathEscape(srcBucket + "/" + srcKey)),
				Key:        aws.String(dstKey),
			})
			if err != nil {
				return err
			}
		}

		if !aws.BoolValue(output.IsTruncated) || output.NextContinuationToken == nil {
			return nil
		}
		input.ContinuationToken = output.NextContinuationToken
	}
}
// Delete removes the object stored at path ("<bucket>/<key>") and, once the
// deletion succeeded, drops the cache entry kept for that path.
//
// An error is returned when path has no "/" separating bucket and key, or
// when the delete request fails; the cache is left untouched in both cases.
func (a *awsManager) Delete(path string) error {
	paths := strings.SplitN(path, "/", 2)
	if len(paths) != 2 {
		// Without this guard paths[1] below would panic.
		return errors.New("invalid path (" + path + "): expected <bucket>/<key>")
	}

	_, err := a.uploader.S3.DeleteObject(&s3.DeleteObjectInput{
		Bucket: aws.String(paths[0]),
		Key:    aws.String(paths[1]),
	})
	if err != nil {
		return err
	}

	a.cache.Remove(path)
	return nil
}
// addToCurrentlyUploading records path as having a pending upload so that
// callers asking for the path can wait for the upload to finish.
//
// If the path is already registered, its existing waiter list is kept:
// replacing it would drop waiters that are blocked on their channel, and
// nobody would ever notify them.
func (a *awsManager) addToCurrentlyUploading(path string) {
	a.uploadSync.Lock()
	defer a.uploadSync.Unlock()

	log.Println("Manager | (" + path + ") added to upload list")
	if _, pending := a.currentlyUploading[path]; !pending {
		a.currentlyUploading[path] = make([]onUpload, 0)
	}
}
// waitForUploadComplete blocks until the pending upload of path has finished
// and returns its status. When no upload is registered for path it returns
// true immediately.
//
// The mutex is released before blocking on the channel: the notifier needs
// the same mutex to deliver the status, so waiting while holding it would
// deadlock both sides. The channel has a buffer of one so that the notifier
// never blocks on this waiter.
func (a *awsManager) waitForUploadComplete(path string) bool {
	a.uploadSync.Lock()
	waiters, uploading := a.currentlyUploading[path]
	if !uploading {
		a.uploadSync.Unlock()
		return true
	}

	done := make(onUpload, 1)
	a.currentlyUploading[path] = append(waiters, done)
	a.uploadSync.Unlock()

	return <-done
}
// notifyOnUploadComplete logs the outcome of the upload of path, delivers
// status to every waiter registered for that path, closes the waiters'
// channels and removes the path from the set of pending uploads.
//
// The whole operation, including the channel sends, runs while holding
// uploadSync.
func (a *awsManager) notifyOnUploadComplete(path string, status bool) {
	a.uploadSync.Lock()
	defer a.uploadSync.Unlock()

	log.Println("Manager | (" + path + ") finished uploading, success: " + strconv.FormatBool(status))

	waiters := a.currentlyUploading[path]
	for i := range waiters {
		waiters[i] <- status
		close(waiters[i])
	}
	delete(a.currentlyUploading, path)
}
// upload stores data at path ("<bucket>/<key>") with the given content type
// and reports whether the upload succeeded.
//
// Failures are logged with their cause, because the boolean result alone
// cannot carry it. A path without a "/" separating bucket and key is
// rejected instead of panicking on the missing key part.
func (a *awsManager) upload(path string, data []byte, mimeType string) bool {
	log.Println("Manager | (" + path + ") uploading")

	paths := strings.SplitN(path, "/", 2)
	if len(paths) != 2 {
		log.Println("Manager | (" + path + ") failed uploading, cause: path is not of the form <bucket>/<key>")
		return false
	}

	_, err := a.uploader.Upload(&s3manager.UploadInput{
		Bucket:      aws.String(paths[0]),
		ContentType: aws.String(mimeType),
		Key:         aws.String(paths[1]),
		Body:        bytes.NewReader(data),
	})
	if err != nil {
		log.Println("Manager | (" + path + ") failed uploading, cause: " + err.Error())
		return false
	}
	return true
}
// presign creates a presigned GET URL for the object at path
// ("<bucket>/<key>") that stays valid for expiration.
//
// An error is returned when path has no "/" separating bucket and key, when
// the request cannot be constructed, or when signing fails.
func (a *awsManager) presign(path string, expiration time.Duration) (string, error) {
	paths := strings.SplitN(path, "/", 2)
	if len(paths) != 2 {
		// Without this guard paths[1] below would panic.
		return "", errors.New("invalid path (" + path + "): expected <bucket>/<key>")
	}

	request, _ := a.downloader.S3.GetObjectRequest(&s3.GetObjectInput{
		Bucket: aws.String(paths[0]),
		Key:    aws.String(paths[1]),
	})
	if request.Error != nil {
		log.Println("Manager | (" + path + ") failed presiging, cause: " + request.Error.Error())
		// A request that is already in error cannot be signed; stop here.
		return "", request.Error
	}

	return request.Presign(expiration)
}
// download fetches the object at path ("<bucket>/<key>") into memory and
// returns its content.
//
// On failure the cause is logged and a nil slice is returned together with
// the error, so callers never receive a partially filled buffer. A path
// without a "/" separating bucket and key yields an error instead of a panic.
func (a *awsManager) download(path string) ([]byte, error) {
	paths := strings.SplitN(path, "/", 2)
	if len(paths) != 2 {
		return nil, errors.New("invalid path (" + path + "): expected <bucket>/<key>")
	}

	buf := aws.NewWriteAtBuffer([]byte{})
	_, err := a.downloader.Download(buf, &s3.GetObjectInput{
		Bucket: aws.String(paths[0]),
		Key:    aws.String(paths[1]),
	})
	if err != nil {
		log.Println("Manager | (" + path + ") failed downloading, cause: " + err.Error())
		return nil, err
	}
	return buf.Bytes(), nil
}
// doesFileExist reports whether an object is stored at path
// ("<bucket>/<key>"), using a HEAD request.
//
// It returns true only when the HEAD request succeeds. A missing object, a
// malformed path and any other failure all yield false; failures other than
// "object not found" are logged so that they are not silently mistaken for a
// missing object.
func (a *awsManager) doesFileExist(path string) bool {
	paths := strings.SplitN(path, "/", 2)
	if len(paths) != 2 {
		// No bucket/key separator: such an object cannot exist.
		return false
	}

	_, err := a.downloader.S3.HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String(paths[0]),
		Key:    aws.String(paths[1]),
	})
	if err == nil {
		return true
	}

	// SDK errors expose their code through a Code method. A HEAD response
	// has no body, so a missing key is reported as "NotFound"; the
	// NoSuchKey code is accepted as well for good measure.
	if coded, ok := err.(interface{ Code() string }); ok {
		switch coded.Code() {
		case "NotFound", s3.ErrCodeNoSuchKey:
			return false
		}
	}

	log.Println("Manager | (" + path + ") failed checking existence, cause: " + err.Error())
	return false
}