@@ -44,6 +44,9 @@ func NewBlobStore(containerClient *container.Client, options *BlobStoreOptions)
4444
4545// ClaimOwnership attempts to claim ownership of the partitions in partitionOwnership and returns
4646// the actual partitions that were claimed.
47+ //
48+ // If we fail to claim ownership because of another update then it will be omitted from the
49+ // returned slice of [Ownership]'s. It is not considered an error.
4750func (b * BlobStore ) ClaimOwnership (ctx context.Context , partitionOwnership []azeventhubs.Ownership , options * azeventhubs.ClaimOwnershipOptions ) ([]azeventhubs.Ownership , error ) {
4851var ownerships []azeventhubs.Ownership
4952
@@ -54,13 +57,12 @@ func (b *BlobStore) ClaimOwnership(ctx context.Context, partitionOwnership []aze
5457if err != nil {
5558return nil , err
5659}
57- lastModified , etag , err := b .setMetadata (ctx , blobName , newOwnershipBlobMetadata ( po ), po . ETag )
60+ lastModified , etag , err := b .setOwnershipMetadata (ctx , blobName , po )
5861
5962if err != nil {
60- if bloberror .HasCode (err , bloberror .ConditionNotMet ) {
61- // we can fail to claim ownership and that's okay - it's expected that clients will
62- // attempt to claim with whatever state they hold locally. If they fail it just means
63- // someone else claimed ownership before them.
63+ if bloberror .HasCode (err ,
64+ bloberror .ConditionNotMet , // updated before we could update it
65+ bloberror .BlobAlreadyExists ) { // created before we could create it
6466continue
6567}
6668
@@ -179,25 +181,28 @@ func (b *BlobStore) ListOwnership(ctx context.Context, fullyQualifiedNamespace s
179181}
180182
181183// UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
184+ //
185+ // NOTE: This function doesn't attempt to prevent simultaneous checkpoint updates - ownership is assumed.
182186func (b * BlobStore ) UpdateCheckpoint (ctx context.Context , checkpoint azeventhubs.Checkpoint , options * azeventhubs.UpdateCheckpointOptions ) error {
183187blobName , err := nameForCheckpointBlob (checkpoint )
184188
185189if err != nil {
186190return err
187191}
188192
189- _ , _ , err = b .setMetadata (ctx , blobName , newCheckpointBlobMetadata ( checkpoint ), nil )
193+ _ , _ , err = b .setCheckpointMetadata (ctx , blobName , checkpoint )
190194return err
191195}
192196
193- func (b * BlobStore ) setMetadata (ctx context.Context , blobName string , blobMetadata map [string ]* string , etag * azcore.ETag ) (* time.Time , azcore.ETag , error ) {
197+ func (b * BlobStore ) setOwnershipMetadata (ctx context.Context , blobName string , ownership azeventhubs.Ownership ) (* time.Time , azcore.ETag , error ) {
198+ blobMetadata := newOwnershipBlobMetadata (ownership )
194199blobClient := b .cc .NewBlockBlobClient (blobName )
195200
196- if etag != nil {
201+ if ownership . ETag != nil {
197202setMetadataResp , err := blobClient .SetMetadata (ctx , blobMetadata , & blob.SetMetadataOptions {
198203AccessConditions : & blob.AccessConditions {
199204ModifiedAccessConditions : & blob.ModifiedAccessConditions {
200- IfMatch : etag ,
205+ IfMatch : ownership . ETag ,
201206},
202207},
203208})
@@ -207,29 +212,52 @@ func (b *BlobStore) setMetadata(ctx context.Context, blobName string, blobMetada
207212}
208213
209214return setMetadataResp .LastModified , * setMetadataResp .ETag , nil
210- } else {
211- setMetadataResp , err := blobClient .SetMetadata (ctx , blobMetadata , nil )
215+ }
212216
213- if err == nil {
214- return setMetadataResp .LastModified , * setMetadataResp .ETag , nil
215- }
217+ uploadResp , err := blobClient .Upload (ctx , streaming .NopCloser (bytes .NewReader ([]byte {})), & blockblob.UploadOptions {
218+ Metadata : blobMetadata ,
219+ AccessConditions : & blob.AccessConditions {
220+ ModifiedAccessConditions : & blob.ModifiedAccessConditions {
221+ IfNoneMatch : to .Ptr (azcore .ETag ("*" )),
222+ },
223+ },
224+ })
216225
217- if ! bloberror . HasCode ( err , bloberror . BlobNotFound ) {
218- return nil , "" , err
219- }
226+ if err != nil {
227+ return nil , "" , err
228+ }
220229
221- // in JS they check to see if the error is BlobNotFound. If it is, then they
222- // do a full upload of a blob instead.
223- uploadResp , err := blobClient .Upload (ctx , streaming .NopCloser (bytes .NewReader ([]byte {})), & blockblob.UploadOptions {
224- Metadata : blobMetadata ,
225- })
230+ return uploadResp .LastModified , * uploadResp .ETag , nil
231+ }
226232
227- if err != nil {
228- return nil , "" , err
229- }
233+ // setCheckpointMetadata sets the metadata for a checkpoint, falling back to creating
234+ // the blob if it doesn't already exist.
235+ //
236+ // NOTE: unlike [setOwnershipMetadata] this function doesn't attempt to prevent simultaneous
237+ // checkpoint updates - ownership is assumed.
238+ func (b * BlobStore ) setCheckpointMetadata (ctx context.Context , blobName string , checkpoint azeventhubs.Checkpoint ) (* time.Time , azcore.ETag , error ) {
239+ blobMetadata := newCheckpointBlobMetadata (checkpoint )
240+ blobClient := b .cc .NewBlockBlobClient (blobName )
241+
242+ setMetadataResp , err := blobClient .SetMetadata (ctx , blobMetadata , nil )
243+
244+ if err == nil {
245+ return setMetadataResp .LastModified , * setMetadataResp .ETag , nil
246+ }
230247
231- return uploadResp .LastModified , * uploadResp .ETag , nil
248+ if ! bloberror .HasCode (err , bloberror .BlobNotFound ) {
249+ return nil , "" , err
232250}
251+
252+ uploadResp , err := blobClient .Upload (ctx , streaming .NopCloser (bytes .NewReader ([]byte {})), & blockblob.UploadOptions {
253+ Metadata : blobMetadata ,
254+ })
255+
256+ if err != nil {
257+ return nil , "" , err
258+ }
259+
260+ return uploadResp .LastModified , * uploadResp .ETag , nil
233261}
234262
235263func nameForCheckpointBlob (a azeventhubs.Checkpoint ) (string , error ) {
0 commit comments