Skip to content

Commit 5945eac

Browse files
authored
Add cluster name to s3 prefix for all objects (#1429)
1 parent b40eaeb commit 5945eac

File tree

14 files changed

+62
-57
lines changed

14 files changed

+62
-57
lines changed

cli/cmd/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ var _exportCmd = &cobra.Command{
659659
exit.Error(err)
660660
}
661661

662-
err = awsClient.DownloadFileFromS3(info.ClusterConfig.Bucket, apiSpec.RawAPIKey(), path.Join(baseDir, apiSpec.FileName))
662+
err = awsClient.DownloadFileFromS3(info.ClusterConfig.Bucket, apiSpec.RawAPIKey(info.ClusterConfig.ClusterName), path.Join(baseDir, apiSpec.FileName))
663663
if err != nil {
664664
exit.Error(err)
665665
}

cli/local/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func UpdateAPI(apiConfig *userconfig.API, configPath string, projectID string, d
7070
return nil, "", err
7171
}
7272

73-
newAPISpec := spec.GetAPISpec(apiConfig, projectID, _deploymentID)
73+
newAPISpec := spec.GetAPISpec(apiConfig, projectID, _deploymentID, "")
7474

7575
// apiConfig.Predictor.ModelPath was already added to apiConfig.Predictor.Models for ease of use
7676
if len(apiConfig.Predictor.Models) > 0 {

pkg/operator/operator/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
)
2424

2525
func DownloadAPISpec(apiName string, apiID string) (*spec.API, error) {
26-
s3Key := spec.Key(apiName, apiID)
26+
s3Key := spec.Key(apiName, apiID, config.Cluster.ClusterName)
2727
var api spec.API
2828

2929
if err := config.AWS.ReadJSONFromS3(&api, config.Cluster.Bucket, s3Key); err != nil {

pkg/operator/resources/batchapi/api.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string,
4242
return nil, "", err
4343
}
4444

45-
api := spec.GetAPISpec(apiConfig, projectID, "") // Deployment ID not needed for BatchAPI spec
45+
api := spec.GetAPISpec(apiConfig, projectID, "", config.Cluster.ClusterName) // Deployment ID not needed for BatchAPI spec
4646

4747
if prevVirtualService == nil {
4848
if err := config.AWS.UploadJSONToS3(api, config.Cluster.Bucket, api.Key); err != nil {
4949
return nil, "", errors.Wrap(err, "upload api spec")
5050
}
5151

52-
if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey()); err != nil {
52+
if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil {
5353
return nil, "", errors.Wrap(err, "upload raw api spec")
5454
}
5555

@@ -79,7 +79,7 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string,
7979
return nil, "", errors.Wrap(err, "upload api spec")
8080
}
8181

82-
if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey()); err != nil {
82+
if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil {
8383
return nil, "", errors.Wrap(err, "upload raw api spec")
8484
}
8585

@@ -153,11 +153,11 @@ func deleteK8sResources(apiName string) error {
153153
func deleteS3Resources(apiName string) error {
154154
return parallel.RunFirstErr(
155155
func() error {
156-
prefix := filepath.Join("apis", apiName)
156+
prefix := filepath.Join(config.Cluster.ClusterName, "apis", apiName)
157157
return config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true)
158158
},
159159
func() error {
160-
prefix := spec.BatchAPIJobPrefix(apiName)
160+
prefix := spec.BatchAPIJobPrefix(apiName, config.Cluster.ClusterName)
161161
go config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) // deleting job files may take a while
162162
return nil
163163
},

pkg/operator/resources/batchapi/enqueue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func randomMessageID() string {
4949
}
5050

5151
func updateLiveness(jobKey spec.JobKey) error {
52-
s3Key := path.Join(jobKey.Prefix(), _enqueuingLivenessFile)
52+
s3Key := path.Join(jobKey.Prefix(config.Cluster.ClusterName), _enqueuingLivenessFile)
5353
err := config.AWS.UploadJSONToS3(time.Now(), config.Cluster.Bucket, s3Key)
5454
if err != nil {
5555
return errors.Wrap(err, "failed to update liveness", jobKey.UserString())

pkg/operator/resources/batchapi/in_progress_cache.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ var (
2929
)
3030

3131
func inProgressS3Key(jobKey spec.JobKey) string {
32-
return path.Join(_inProgressFilePrefix, jobKey.APIName, jobKey.ID)
32+
return path.Join(config.Cluster.ClusterName, _inProgressFilePrefix, jobKey.APIName, jobKey.ID)
3333
}
3434

3535
func jobKeyFromInProgressS3Key(s3Key string) spec.JobKey {
@@ -57,15 +57,15 @@ func deleteInProgressFile(jobKey spec.JobKey) error {
5757
}
5858

5959
func deleteAllInProgressFilesByAPI(apiName string) error {
60-
err := config.AWS.DeleteS3Prefix(config.Cluster.Bucket, path.Join(_inProgressFilePrefix, apiName), true)
60+
err := config.AWS.DeleteS3Prefix(config.Cluster.Bucket, path.Join(config.Cluster.ClusterName, _inProgressFilePrefix, apiName), true)
6161
if err != nil {
6262
return err
6363
}
6464
return nil
6565
}
6666

6767
func listAllInProgressJobKeys() ([]spec.JobKey, error) {
68-
s3Objects, err := config.AWS.ListS3Dir(config.Cluster.Bucket, _inProgressFilePrefix, false, nil)
68+
s3Objects, err := config.AWS.ListS3Dir(config.Cluster.Bucket, path.Join(config.Cluster.ClusterName, _inProgressFilePrefix), false, nil)
6969
if err != nil {
7070
return nil, err
7171
}
@@ -79,7 +79,7 @@ func listAllInProgressJobKeys() ([]spec.JobKey, error) {
7979
}
8080

8181
func listAllInProgressJobKeysByAPI(apiName string) ([]spec.JobKey, error) {
82-
s3Objects, err := config.AWS.ListS3Dir(config.Cluster.Bucket, path.Join(_inProgressFilePrefix, apiName), false, nil)
82+
s3Objects, err := config.AWS.ListS3Dir(config.Cluster.Bucket, path.Join(config.Cluster.ClusterName, _inProgressFilePrefix, apiName), false, nil)
8383
if err != nil {
8484
return nil, err
8585
}

pkg/operator/resources/batchapi/job.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,15 @@ func SubmitJob(apiName string, submission *schema.JobSubmission) (*spec.Job, err
134134

135135
func downloadJobSpec(jobKey spec.JobKey) (*spec.Job, error) {
136136
jobSpec := spec.Job{}
137-
err := config.AWS.ReadJSONFromS3(&jobSpec, config.Cluster.Bucket, jobKey.SpecFilePath())
137+
err := config.AWS.ReadJSONFromS3(&jobSpec, config.Cluster.Bucket, jobKey.SpecFilePath(config.Cluster.ClusterName))
138138
if err != nil {
139139
return nil, errors.Wrap(err, "unable to download job specification", jobKey.UserString())
140140
}
141141
return &jobSpec, nil
142142
}
143143

144144
func uploadJobSpec(jobSpec *spec.Job) error {
145-
err := config.AWS.UploadJSONToS3(jobSpec, config.Cluster.Bucket, jobSpec.SpecFilePath())
145+
err := config.AWS.UploadJSONToS3(jobSpec, config.Cluster.Bucket, jobSpec.SpecFilePath(config.Cluster.ClusterName))
146146
if err != nil {
147147
return err
148148
}

pkg/operator/resources/batchapi/job_state.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func getStatusCode(lastUpdatedMap map[string]time.Time) status.JobCode {
9696
}
9797

9898
func getJobState(jobKey spec.JobKey) (*JobState, error) {
99-
s3Objects, err := config.AWS.ListS3Prefix(config.Cluster.Bucket, jobKey.Prefix(), false, nil)
99+
s3Objects, err := config.AWS.ListS3Prefix(config.Cluster.Bucket, jobKey.Prefix(config.Cluster.ClusterName), false, nil)
100100
if err != nil {
101101
return nil, errors.Wrap(err, "failed to get job state", jobKey.UserString())
102102
}
@@ -135,7 +135,7 @@ func getJobStateFromFiles(jobKey spec.JobKey, lastUpdatedFileMap map[string]time
135135

136136
func getMostRecentlySubmittedJobStates(apiName string, count int) ([]*JobState, error) {
137137
// a single job state may include 5 files on average, overshoot the number of files needed
138-
s3Objects, err := config.AWS.ListS3Prefix(config.Cluster.Bucket, spec.BatchAPIJobPrefix(apiName), false, pointer.Int64(int64(count*_averageFilesPerJobState)))
138+
s3Objects, err := config.AWS.ListS3Prefix(config.Cluster.Bucket, spec.BatchAPIJobPrefix(apiName, config.Cluster.ClusterName), false, pointer.Int64(int64(count*_averageFilesPerJobState)))
139139
if err != nil {
140140
return nil, err
141141
}
@@ -197,7 +197,7 @@ func setStatusForJob(jobKey spec.JobKey, jobStatus status.JobCode) error {
197197
}
198198

199199
func setEnqueuingStatus(jobKey spec.JobKey) error {
200-
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobEnqueuing.String()))
200+
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobEnqueuing.String()))
201201
if err != nil {
202202
return err
203203
}
@@ -211,7 +211,7 @@ func setEnqueuingStatus(jobKey spec.JobKey) error {
211211
}
212212

213213
func setRunningStatus(jobKey spec.JobKey) error {
214-
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobRunning.String()))
214+
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobRunning.String()))
215215
if err != nil {
216216
return err
217217
}
@@ -225,7 +225,7 @@ func setRunningStatus(jobKey spec.JobKey) error {
225225
}
226226

227227
func setStoppedStatus(jobKey spec.JobKey) error {
228-
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobStopped.String()))
228+
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobStopped.String()))
229229
if err != nil {
230230
return err
231231
}
@@ -239,7 +239,7 @@ func setStoppedStatus(jobKey spec.JobKey) error {
239239
}
240240

241241
func setSucceededStatus(jobKey spec.JobKey) error {
242-
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobSucceeded.String()))
242+
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobSucceeded.String()))
243243
if err != nil {
244244
return err
245245
}
@@ -253,7 +253,7 @@ func setSucceededStatus(jobKey spec.JobKey) error {
253253
}
254254

255255
func setCompletedWithFailuresStatus(jobKey spec.JobKey) error {
256-
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobCompletedWithFailures.String()))
256+
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobCompletedWithFailures.String()))
257257
if err != nil {
258258
return err
259259
}
@@ -267,7 +267,7 @@ func setCompletedWithFailuresStatus(jobKey spec.JobKey) error {
267267
}
268268

269269
func setWorkerErrorStatus(jobKey spec.JobKey) error {
270-
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobWorkerError.String()))
270+
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobWorkerError.String()))
271271
if err != nil {
272272
return err
273273
}
@@ -281,7 +281,7 @@ func setWorkerErrorStatus(jobKey spec.JobKey) error {
281281
}
282282

283283
func setWorkerOOMStatus(jobKey spec.JobKey) error {
284-
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobWorkerOOM.String()))
284+
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobWorkerOOM.String()))
285285
if err != nil {
286286
return err
287287
}
@@ -295,7 +295,7 @@ func setWorkerOOMStatus(jobKey spec.JobKey) error {
295295
}
296296

297297
func setEnqueueFailedStatus(jobKey spec.JobKey) error {
298-
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobEnqueueFailed.String()))
298+
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobEnqueueFailed.String()))
299299
if err != nil {
300300
return err
301301
}
@@ -309,7 +309,7 @@ func setEnqueueFailedStatus(jobKey spec.JobKey) error {
309309
}
310310

311311
func setUnexpectedErrorStatus(jobKey spec.JobKey) error {
312-
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobUnexpectedError.String()))
312+
err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobUnexpectedError.String()))
313313
if err != nil {
314314
return err
315315
}

pkg/operator/resources/batchapi/k8s_specs.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func pythonPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) {
5151
if container.Name == operator.APIContainerName {
5252
containers[i].Env = append(container.Env, kcore.EnvVar{
5353
Name: "CORTEX_JOB_SPEC",
54-
Value: "s3://" + config.Cluster.Bucket + "/" + job.SpecFilePath(),
54+
Value: "s3://" + config.Cluster.Bucket + "/" + job.SpecFilePath(config.Cluster.ClusterName),
5555
})
5656
}
5757
}
@@ -100,7 +100,7 @@ func tensorFlowPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, erro
100100
if container.Name == operator.APIContainerName {
101101
containers[i].Env = append(container.Env, kcore.EnvVar{
102102
Name: "CORTEX_JOB_SPEC",
103-
Value: "s3://" + config.Cluster.Bucket + "/" + job.SpecFilePath(),
103+
Value: "s3://" + config.Cluster.Bucket + "/" + job.SpecFilePath(config.Cluster.ClusterName),
104104
})
105105
}
106106
}
@@ -150,7 +150,7 @@ func onnxPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) {
150150
if container.Name == operator.APIContainerName {
151151
containers[i].Env = append(container.Env, kcore.EnvVar{
152152
Name: "CORTEX_JOB_SPEC",
153-
Value: "s3://" + config.Cluster.Bucket + "/" + job.SpecFilePath(),
153+
Value: "s3://" + config.Cluster.Bucket + "/" + job.SpecFilePath(config.Cluster.ClusterName),
154154
})
155155
}
156156
}

pkg/operator/resources/realtimeapi/api.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,14 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A
5252
deploymentID = prevDeployment.Labels["deploymentID"]
5353
}
5454

55-
api := spec.GetAPISpec(apiConfig, projectID, deploymentID)
55+
api := spec.GetAPISpec(apiConfig, projectID, deploymentID, config.Cluster.ClusterName)
5656

5757
if prevDeployment == nil {
5858
if err := config.AWS.UploadJSONToS3(api, config.Cluster.Bucket, api.Key); err != nil {
5959
return nil, "", errors.Wrap(err, "upload api spec")
6060
}
6161

62-
if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey()); err != nil {
62+
if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil {
6363
return nil, "", errors.Wrap(err, "upload raw api spec")
6464
}
6565

@@ -96,7 +96,7 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A
9696
return nil, "", errors.Wrap(err, "upload api spec")
9797
}
9898

99-
if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey()); err != nil {
99+
if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil {
100100
return nil, "", errors.Wrap(err, "upload raw api spec")
101101
}
102102

@@ -152,7 +152,7 @@ func RefreshAPI(apiName string, force bool) (string, error) {
152152
return "", err
153153
}
154154

155-
api = spec.GetAPISpec(api.API, api.ProjectID, deploymentID())
155+
api = spec.GetAPISpec(api.API, api.ProjectID, deploymentID(), config.Cluster.ClusterName)
156156

157157
if err := config.AWS.UploadJSONToS3(api, config.Cluster.Bucket, api.Key); err != nil {
158158
return "", errors.Wrap(err, "upload api spec")
@@ -436,7 +436,7 @@ func deleteK8sResources(apiName string) error {
436436
}
437437

438438
func deleteS3Resources(apiName string) error {
439-
prefix := filepath.Join("apis", apiName)
439+
prefix := filepath.Join(config.Cluster.ClusterName, "apis", apiName)
440440
return config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true)
441441
}
442442

0 commit comments

Comments
 (0)