package resource import ( "bytes" "context" "errors" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "log" "net/url" "os" "resource-manager/domain/cache" "strconv" "strings" "sync" "time" ) func newS3Bucket(cacheManager cache.Manager, expiration time.Duration) Manager { sess, err := session.NewSession(&aws.Config{ Region: aws.String(os.Getenv("AWS_REGION_NAME"))}, ) if err != nil { panic(err) } uploader := s3manager.NewUploader(sess) downloader := s3manager.NewDownloader(sess) upload := make(chan UploadRequest, 20) manager := awsManager{ cacheManager, upload, make(map[string][]onUpload), sync.Mutex{}, expiration, uploader, downloader, } go manager.uploadRunner() return &manager } type onUpload chan bool type awsManager struct { cache cache.Manager requests chan UploadRequest currentlyUploading map[string][]onUpload uploadSync sync.Mutex expiration time.Duration uploader *s3manager.Uploader downloader *s3manager.Downloader } func (a *awsManager) uploadRunner() { for req := range a.requests { status := true if !a.doesFileExist(req.Path) { status = a.upload(req.Path, req.Buffer.Bytes(), req.MimeType) } a.notifyOnUploadComplete(req.Path, status) } } func (a *awsManager) Upload(request UploadRequest) { a.addToCurrentlyUploading(request.Path) a.requests <- request } func (a *awsManager) Presign(_ context.Context, path string) (string, error) { if !a.waitForUploadComplete(path) { return "", errors.New("failed upload") } // ask cache for latest url if uri, err := a.cache.Get(path); err == nil { return uri, nil } // if there is no value in cache, presign url and add it to cache uri, err := a.presign(path, a.expiration) if err != nil { _ = a.cache.Set(path, uri, a.expiration) } return uri, err } func (a *awsManager) Download(_ context.Context, path string) (file []byte, err error) { return a.download(path) } func (a *awsManager) Copy(from string, to string, overwrite bool) error { fromPath := strings.SplitN(from, "/", 2) toPath := strings.SplitN(to, "/", 2) output, err := a.downloader.S3.ListObjectsV2(&s3.ListObjectsV2Input{ Bucket: aws.String(fromPath[0]), Prefix: aws.String(fromPath[1]), }) if err != nil { return err } for _, obj := range output.Contents { if overwrite || !a.doesFileExist(strings.Replace(*obj.Key, fromPath[1], toPath[1], 1)) { _, err = a.downloader.S3.CopyObject(&s3.CopyObjectInput{ Bucket: aws.String(toPath[0]), CopySource: aws.String(url.PathEscape(from)), Key: aws.String(toPath[1]), }) if err != nil { return err } } } return nil } func (a *awsManager) Delete(path string) error { paths := strings.SplitN(path, "/", 2) _, err := a.uploader.S3.DeleteObject(&s3.DeleteObjectInput{ Bucket: aws.String(paths[0]), Key: aws.String(paths[1]), }) if err != nil { return err } a.cache.Remove(path) return nil } func (a *awsManager) addToCurrentlyUploading(path string) { a.uploadSync.Lock() log.Println("Manager | (" + path + ") added to upload list") a.currentlyUploading[path] = make([]onUpload, 0) a.uploadSync.Unlock() } func (a *awsManager) waitForUploadComplete(path string) bool { a.uploadSync.Lock() defer a.uploadSync.Unlock() if _, contains := a.currentlyUploading[path]; contains { upload := make(onUpload) a.currentlyUploading[path] = append(a.currentlyUploading[path], upload) return <-upload } return true } func (a *awsManager) notifyOnUploadComplete(path string, status bool) { a.uploadSync.Lock() defer a.uploadSync.Unlock() log.Println("Manager | (" + path + ") finished uploading, success: " + strconv.FormatBool(status)) for _, upload := range a.currentlyUploading[path] { upload <- status close(upload) } delete(a.currentlyUploading, path) } func (a *awsManager) upload(path string, data []byte, mimeType string) bool { log.Println("Manager | (" + path + ") uploading") paths := strings.SplitN(path, "/", 2) _, err := a.uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String(paths[0]), ContentType: aws.String(mimeType), Key: aws.String(paths[1]), Body: bytes.NewReader(data), }) return err == nil } func (a *awsManager) presign(path string, expiration time.Duration) (string, error) { paths := strings.SplitN(path, "/", 2) requestInput := s3.GetObjectInput{ Bucket: aws.String(paths[0]), Key: aws.String(paths[1]), } request, _ := a.downloader.S3.GetObjectRequest(&requestInput) if request.Error != nil { log.Println("Manager | (" + path + ") failed presiging, cause: " + request.Error.Error()) } return request.Presign(expiration) } func (a *awsManager) download(path string) ([]byte, error) { paths := strings.SplitN(path, "/", 2) requestInput := s3.GetObjectInput{ Bucket: aws.String(paths[0]), Key: aws.String(paths[1]), } buf := aws.NewWriteAtBuffer([]byte{}) _, err := a.downloader.Download(buf, &requestInput) if err != nil { log.Println("Manager | (" + path + ") failed downloading, cause: " + err.Error()) } return buf.Bytes(), err } func (a *awsManager) doesFileExist(path string) bool { paths := strings.SplitN(path, "/", 2) requestInput := s3.HeadObjectInput{ Bucket: aws.String(paths[0]), Key: aws.String(paths[1]), } _, err := a.downloader.S3.HeadObject(&requestInput) if err != nil && err.Error() == s3.ErrCodeNoSuchKey { return true } return false }