package resource

import (
	"bytes"
	"context"
	"errors"
	"io/ioutil"
	"log"
	"net/url"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"

	"resource-manager/domain/cache"
)

// newLocalFolder builds a Manager that stores files on the local file system.
// The root directory is read from the UPLOAD_PATH environment variable and
// falls back to the current working directory when that variable is empty.
// The expiration argument is not used by this backend.
func newLocalFolder(cacheManager cache.Manager, expiration time.Duration) Manager {
	path := os.Getenv("UPLOAD_PATH")
	if path == "" {
		wd, err := os.Getwd()
		if err != nil {
			log.Println("Manager | failed resolving working directory, cause: " + err.Error())
		}
		path = wd
	}
	log.Println("Manager | using local file system for data storage (" + path + ")")
	return &fileManager{cache: cacheManager, path: path}
}

// fileManager is the local file system backend. All request paths are
// resolved relative to path.
type fileManager struct {
	cache cache.Manager
	path  string
}

// Upload writes request.Buffer to request.Path below the root directory,
// creating parent directories as needed. An existing file is left untouched
// unless request.Overwrite is set. Failures are logged, not returned.
func (f *fileManager) Upload(request UploadRequest) {
	fullPath := filepath.Join(f.path, request.Path)
	createFolder(fullPath)
	if checkFileExists(fullPath) && !request.Overwrite {
		log.Println("Manager | cannot upload file as file on same path already exists")
		return
	}
	log.Println("Manager | uploading to (" + request.Path + ")")
	if err := os.WriteFile(fullPath, request.Buffer.Bytes(), 0o644); err != nil {
		log.Println("Manager | failed uploading (" + request.Path + ") cause: " + err.Error())
	}
}

// checkFileExists reports whether os.Stat succeeds for path.
func checkFileExists(path string) bool {
	_, err := os.Stat(path)
	return err == nil
}

// Download reads the file stored at path (relative to the root directory).
// The error is logged and also returned to the caller.
func (f *fileManager) Download(ctx context.Context, path string) (file []byte, err error) {
	fullPath := filepath.Join(f.path, path)
	file, err = ioutil.ReadFile(fullPath)
	if err != nil {
		log.Println("Manager | failed downloading (" + path + ") cause: " + err.Error())
	}
	return file, err
}

// Presign returns the API URL under which the file at path can be fetched.
//
// NOTE(review): path is concatenated into the query string unescaped; paths
// containing '&', '#', '+' or spaces will not round-trip unless callers
// escape them first — confirm the caller convention before changing this.
func (f *fileManager) Presign(_ context.Context, path string) (string, error) {
	return "/api/v1/get?path=" + path, nil
}

// Copy recursively copies the directory from into the directory to (both
// relative to the root directory). Existing destination files are replaced
// only when overwrite is true.
func (f *fileManager) Copy(from string, to string, overwrite bool) error {
	fromPath := filepath.Join(f.path, from)
	toPath := filepath.Join(f.path, to)
	log.Println("Manager | copying from (" + fromPath + ") to (" + toPath + ")")
	// createFolder creates the parent of its argument, so pass a dummy child
	// name to have toPath itself created.
	createFolder(filepath.Join(toPath, "file"))
	return copyFolder(fromPath, toPath, overwrite)
}

// Delete removes the file at path (relative to the root directory) and drops
// its cache entry. The removal error is logged and returned.
func (f *fileManager) Delete(path string) error {
	log.Println("Manager | deleting " + path)
	fullPath := filepath.Join(f.path, path)
	if err := os.Remove(fullPath); err != nil {
		log.Println("Manager | failed deleting (" + path + ") cause: " + err.Error())
		return err
	}
	f.cache.Remove(path)
	return nil
}

// createFolder creates the parent directory of path, including any missing
// ancestors. Failures are logged, not returned.
func createFolder(path string) {
	// filepath.Dir handles relative paths and the OS path separator, which
	// manual splitting on "/" does not.
	if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
		log.Println("[error] ", err)
	}
}

// copyFolder mirrors the directory tree rooted at source into destination.
// Files that already exist at the destination are skipped unless overwrite is
// true. The walk stops at, and returns, the first error encountered.
func copyFolder(source, destination string, overwrite bool) error {
	return filepath.WalkDir(source, func(path string, d os.DirEntry, walkErr error) error {
		// d may be nil when walkErr is set, so bail out before touching it.
		if walkErr != nil {
			return walkErr
		}
		relPath, err := filepath.Rel(source, path)
		if err != nil {
			return err
		}
		if relPath == "." {
			// The root itself; the destination root is created by the caller.
			return nil
		}
		target := filepath.Join(destination, relPath)
		if d.IsDir() {
			// MkdirAll tolerates a directory that already exists.
			return os.MkdirAll(target, 0o755)
		}
		if !overwrite {
			// Anything other than "does not exist" counts as existing.
			if _, statErr := os.Stat(target); !os.IsNotExist(statErr) {
				return nil
			}
		}
		data, err := ioutil.ReadFile(path)
		if err != nil {
			return err
		}
		return ioutil.WriteFile(target, data, 0o644)
	})
}

// newS3Bucket builds a Manager backed by S3. The region is read from the
// AWS_REGION_NAME environment variable. It panics when the AWS session cannot
// be created and starts the background goroutine that drains upload requests.
func newS3Bucket(cacheManager cache.Manager, expiration time.Duration) Manager {
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String(os.Getenv("AWS_REGION_NAME")),
	})
	if err != nil {
		panic(err)
	}
	manager := &awsManager{
		cache:              cacheManager,
		requests:           make(chan UploadRequest, 20),
		currentlyUploading: make(map[string][]onUpload),
		expiration:         expiration,
		uploader:           s3manager.NewUploader(sess),
		downloader:         s3manager.NewDownloader(sess),
	}
	go manager.uploadRunner()
	return manager
}

// onUpload delivers the outcome of one upload to a single waiter.
type onUpload chan bool

// awsManager is the S3 backend. Paths have the form "<bucket>/<key>".
// Uploads are queued on requests and processed sequentially by uploadRunner;
// currentlyUploading (guarded by uploadSync) maps each pending path to the
// channels of the callers waiting for it.
type awsManager struct {
	cache              cache.Manager
	requests           chan UploadRequest
	currentlyUploading map[string][]onUpload
	uploadSync         sync.Mutex
	expiration         time.Duration
	uploader           *s3manager.Uploader
	downloader         *s3manager.Downloader
}

// splitBucketKey splits a "<bucket>/<key>" path at the first slash. It
// returns an error instead of panicking when the path has no slash.
func splitBucketKey(path string) (bucket, key string, err error) {
	parts := strings.SplitN(path, "/", 2)
	if len(parts) != 2 {
		return "", "", errors.New("invalid storage path (" + path + "), expected <bucket>/<key>")
	}
	return parts[0], parts[1], nil
}

// uploadRunner processes queued upload requests until the requests channel is
// closed. An object that already exists is only re-uploaded when the request
// asks for an overwrite; in every case the waiters for the path are notified.
func (a *awsManager) uploadRunner() {
	for req := range a.requests {
		status := true
		if req.Overwrite || !a.doesFileExist(req.Path) {
			status = a.upload(req.Path, req.Buffer.Bytes(), req.MimeType)
		}
		a.notifyOnUploadComplete(req.Path, status)
	}
}

// Upload marks the path as pending and queues the request for uploadRunner.
// It blocks while the request queue is full.
func (a *awsManager) Upload(request UploadRequest) {
	a.addToCurrentlyUploading(request.Path)
	a.requests <- request
}

// Presign returns a presigned GET URL for path. It first waits for a pending
// upload of the same path, then serves the URL from the cache when present;
// otherwise it signs a new URL and caches it for the configured expiration.
func (a *awsManager) Presign(ctx context.Context, path string) (string, error) {
	if !a.waitForUploadComplete(path) {
		return "", errors.New("failed upload")
	}
	// ask cache for latest url
	if cached, err := a.cache.Get(path); err == nil {
		return cached, nil
	}
	// if there is no value in cache, presign url and add it to cache
	signed, err := a.presign(path, a.expiration)
	if err != nil {
		return "", err
	}
	a.cache.Set(path, signed, a.expiration)
	return signed, nil
}

// Download fetches the object stored at path.
func (a *awsManager) Download(ctx context.Context, path string) (file []byte, err error) {
	return a.download(path)
}

// Copy copies every object whose key starts with the prefix of from to the
// corresponding key below the prefix of to. Destination objects that already
// exist are skipped unless overwrite is true.
//
// NOTE(review): a single ListObjectsV2 call is issued, so only the first page
// of results is copied — confirm whether larger prefixes must be supported.
func (a *awsManager) Copy(from string, to string, overwrite bool) error {
	fromBucket, fromPrefix, err := splitBucketKey(from)
	if err != nil {
		return err
	}
	toBucket, toPrefix, err := splitBucketKey(to)
	if err != nil {
		return err
	}
	output, err := a.downloader.S3.ListObjectsV2(&s3.ListObjectsV2Input{
		Bucket: aws.String(fromBucket),
		Prefix: aws.String(fromPrefix),
	})
	if err != nil {
		return err
	}
	for _, obj := range output.Contents {
		srcKey := aws.StringValue(obj.Key)
		// Every listed key starts with fromPrefix; swap that prefix.
		dstKey := toPrefix + strings.TrimPrefix(srcKey, fromPrefix)
		if !overwrite && a.doesFileExist(toBucket+"/"+dstKey) {
			continue
		}
		_, err = a.downloader.S3.CopyObject(&s3.CopyObjectInput{
			Bucket:     aws.String(toBucket),
			CopySource: aws.String(url.PathEscape(fromBucket + "/" + srcKey)),
			Key:        aws.String(dstKey),
		})
		if err != nil {
			return err
		}
	}
	return nil
}

// Delete removes the object at path and drops its cache entry.
func (a *awsManager) Delete(path string) error {
	bucket, key, err := splitBucketKey(path)
	if err != nil {
		return err
	}
	_, err = a.uploader.S3.DeleteObject(&s3.DeleteObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return err
	}
	a.cache.Remove(path)
	return nil
}

// addToCurrentlyUploading registers path as having a pending upload. Waiters
// already registered for the same path are kept.
func (a *awsManager) addToCurrentlyUploading(path string) {
	a.uploadSync.Lock()
	defer a.uploadSync.Unlock()
	log.Println("Manager | (" + path + ") added to upload list")
	if _, pending := a.currentlyUploading[path]; !pending {
		a.currentlyUploading[path] = make([]onUpload, 0)
	}
}

// waitForUploadComplete blocks until the pending upload of path finishes and
// returns its status. It returns true immediately when no upload is pending.
func (a *awsManager) waitForUploadComplete(path string) bool {
	a.uploadSync.Lock()
	if _, pending := a.currentlyUploading[path]; !pending {
		a.uploadSync.Unlock()
		return true
	}
	// Buffered so the notifier never blocks on a slow waiter.
	done := make(onUpload, 1)
	a.currentlyUploading[path] = append(a.currentlyUploading[path], done)
	// Release the lock before waiting: notifyOnUploadComplete needs it.
	a.uploadSync.Unlock()
	return <-done
}

// notifyOnUploadComplete delivers status to every waiter registered for path
// and removes path from the pending set.
func (a *awsManager) notifyOnUploadComplete(path string, status bool) {
	a.uploadSync.Lock()
	defer a.uploadSync.Unlock()
	log.Println("Manager | (" + path + ") finished uploading, success: " + strconv.FormatBool(status))
	for _, waiter := range a.currentlyUploading[path] {
		waiter <- status
		close(waiter)
	}
	delete(a.currentlyUploading, path)
}

// upload stores data at path with the given content type and reports whether
// the upload succeeded. Failures are logged.
func (a *awsManager) upload(path string, data []byte, mimeType string) bool {
	log.Println("Manager | (" + path + ") uploading")
	bucket, key, err := splitBucketKey(path)
	if err == nil {
		_, err = a.uploader.Upload(&s3manager.UploadInput{
			Bucket:      aws.String(bucket),
			ContentType: aws.String(mimeType),
			Key:         aws.String(key),
			Body:        bytes.NewReader(data),
		})
	}
	if err != nil {
		log.Println("Manager | (" + path + ") failed uploading, cause: " + err.Error())
		return false
	}
	return true
}

// presign creates a presigned GET URL for path that is valid for expiration.
func (a *awsManager) presign(path string, expiration time.Duration) (string, error) {
	bucket, key, err := splitBucketKey(path)
	if err != nil {
		return "", err
	}
	request, _ := a.downloader.S3.GetObjectRequest(&s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if request.Error != nil {
		log.Println("Manager | (" + path + ") failed presiging, cause: " + request.Error.Error())
	}
	return request.Presign(expiration)
}

// download fetches the object at path into memory. On failure the error is
// logged and returned together with a nil slice.
func (a *awsManager) download(path string) ([]byte, error) {
	bucket, key, err := splitBucketKey(path)
	if err != nil {
		return nil, err
	}
	buf := aws.NewWriteAtBuffer([]byte{})
	_, err = a.downloader.Download(buf, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		log.Println("Manager | (" + path + ") failed downloading, cause: " + err.Error())
		return nil, err
	}
	return buf.Bytes(), nil
}

// doesFileExist reports whether a HeadObject request for path succeeds. Any
// failure (missing object, malformed path, access or network error) is
// reported as "does not exist".
func (a *awsManager) doesFileExist(path string) bool {
	bucket, key, err := splitBucketKey(path)
	if err != nil {
		return false
	}
	_, err = a.downloader.S3.HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	return err == nil
}