DEV Community

Flavio Curella
Flavio Curella

Posted on

Refining exceptions with Context Decorators

Very often, 3rd party API clients are designed to return a generic "API Exception", with details contained inside some property. One notorious example of this is boto3: most exceptions come as a ClientError with a response.

Usually this is fine, but occasionally I need finer-grained exceptions than what the client is giving me.

One common scenario is configuring a celery task to retry a while after the rate limiting: I want to get errors for all exceptions, but for rate-limiting, I just want to retry the task later. Celery offers a convenient autoretry_for argument for that, but we can't use it just for throttling because boto3 does not return an exception specific enough.

I could wrap the login in a try ... except clause and inspect the exception, but that get repetitive pretty quickly, especially as I add more and more tasks.

For these kind of situations, I create a context decorator. The decorator inspects the exception for me and, if it's the one I'm looking, raises a custom exception that then I can catch:

# myproject/exceptions.py  import contexlib from botocore.exceptions import ClientError class ThrottleException(ClientError): pass class raise_throttle(contextlib.ContextDecorator): def __enter__(self): return self def __exit__(self, exc_type, exc_value, tb): if exc_type == ClientError: if exc_value.response.get("Error", {}).get("Code") == "Throttling": raise ThrottleException( exc_value.response, exc_value.operation_name ) from exc_value # returning `False` makes the decorator  # raise the original exception, if any.  return False 

This decorator can be used on any function or method that calls the API:

# myproject/myapp/aws.py import boto3 @raise_throttle() def upload(content): s3 = boto3.resource("s3") bucket = s3.Bucket("my-bucket") bucket.put_object( Body=b"lorem ipsum", Key="Hamlet.txt", ) 

And the exception can be caught by celery's autoretry_for:

# myproject/myapp/tasks.py  from myproject.celery import app from myproject.exceptions import ThrottleException from myproject.myapp.aws import upload @app.task(autoretry_for=(ThrottleException,)) def my_task(content): upload(content) 

But I don't want to retry the task immediately, since the throttling takes a while to be lifted. Therefore I create a Task base class that retries with exponential backoff:

# myproject/tasks.py  import random from celery import Task def jitter(jitter_max=1.4): return random.uniform(1, jitter_max) def exponential(retries, factor=3): return (factor ** (retries + 1)) def exp_jitter(retries, exp_factor=3, jitter_max=1.4): return exponential(retries, exp_factor) * jitter(jitter_max) class ExpBackoffTask(Task): abstract = True max_retries = 10 def retry(self, *args, **kwargs): countdown = kwargs.get('countdown', None) if countdown is None: # if no explicit countdown is given, use an exponential backoff  # with some random jitter, giving us a top-end under 24 hours at  # which the last retry will be attempted.  kwargs['countdown'] = int( exp_jitter(self.request.retries) ) return super().retry(*args, **kwargs) 

I can then use the custom class as base:

# myproject/myapp/tasks.py  from myproject.celery import app from myproject.exceptions import ThrottleException from myproject.tasks import ExpBackoffTask from myproject.myapp.aws import upload @app.task(base=ExpBackoffTask, autoretry_for=(ThrottleException,)) def my_task(content): upload(content) 

Top comments (0)