Nicol Acosta

Posted on

Event Based System with Localstack (Elixir Edition): Notifying SQS when a file is uploaded

Configure SQS

Easy! If you read the previous post, "Uploading files to S3 with PresignedURL's", you already know how the docker-compose.yml file is configured, so adding SQS to the Localstack init is simple.

The git diff is this:

   localstack:
     image: localstack/localstack:latest
     ports:
       - 4566:4566
     environment:
       # own env vars
       BUCKET_NAME: files
+      BUCKET_QUEUE: new_files_queue
+      QUEUES: general_events_queue,example_queue
       # service env vars
       DEFAULT_REGION: us-west-2
       AWS_ACCESS_KEY_ID: test
       AWS_SECRET_ACCESS_KEY: test
-      SERVICES: s3
+      SERVICES: s3,sqs
       DISABLE_CORS_CHECKS: 1
       PROVIDER_OVERRIDE_S3: asf
       S3_SKIP_SIGNATURE_VALIDATION: 1
     volumes:
       - ./.localstack:/var/lib/localstack
       - ./init_localstack.sh:/etc/localstack/init/ready.d/init_localstack.sh

Full code here

And apply this diff to the init_localstack.sh file:

 ...
 echo "########### Setting resource names as env variables ###########"
 LOCALSTACK_DUMMY_ID=000000000000

+guess_arn_for_sqs() {
+  local QUEUE_NAME=$1
+  echo "arn:aws:sqs:${DEFAULT_REGION}:${LOCALSTACK_DUMMY_ID}:$QUEUE_NAME"
+}
+
+create_queue() {
+  local QUEUE_NAME_TO_CREATE=$1
+  awslocal sqs create-queue\
+    --region $DEFAULT_REGION\
+    --queue-name $QUEUE_NAME_TO_CREATE
+}
+
+echo "########### Creating upload file event SQS ###########"
+create_queue $BUCKET_QUEUE
+BUCKET_QUEUE_ARN=$(guess_arn_for_sqs $BUCKET_QUEUE)
+
+echo "########### Creating queues in SQS ###########"
+IFS=','
+read -ra Queues <<< "$QUEUES"
+for q in "${Queues[@]}";
+do
+  create_queue $q
+done
+
 echo "########### Create S3 bucket ###########"
 awslocal s3api create-bucket\
   --region $DEFAULT_REGION\
 ...
   ]
 }'

+echo "########### Set S3 bucket notification configurations ###########"
+aws --endpoint-url=http://localhost:4566 s3api put-bucket-notification-configuration\
+  --bucket $BUCKET_NAME\
+  --notification-configuration '{
+    "QueueConfigurations":
+    [
+      {
+        "QueueArn": "'"$BUCKET_QUEUE_ARN"'",
+        "Events": ["s3:ObjectCreated:Put"]
+      }
+    ]
+  }'
+
 echo "########### List S3 bucket ###########"
 awslocal s3api list-buckets
+
+echo "########### Get S3 bucket notification configurations ###########"
+aws --endpoint-url=http://localhost:4566 s3api get-bucket-notification-configuration\
+  --bucket $BUCKET_NAME

Full code here

And add this config to config.exs:

aws_host = System.get_env("AWS_HOST")
aws_region = System.get_env("AWS_REGION")
aws_port = System.get_env("AWS_PORT")

config :ex_aws, :sqs,
  scheme: "http://",
  region: aws_region,
  host: aws_host,
  port: aws_port,
  base_queue_url: "http://#{aws_host}:#{aws_port}/000000000000/",
  new_files_queue: System.get_env("AWS_SQS_NEW_FILES_QUEUE"),
  general_events_queue: System.get_env("AWS_SQS_GENERAL_EVENTS_QUEUE")

In this config we set up the AWS connection for ex_aws, plus some custom keys (the base queue URL and the queue names) that we will use in the next steps.
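These custom keys are plain application config, so later the Broadway modules can rebuild a full queue URL from them. A minimal sketch of that lookup (the same pattern used in the modules below):

sqs_config = Application.get_env(:ex_aws, :sqs)

# e.g. "http://localhost:4566/000000000000/new_files_queue",
# assuming AWS_HOST=localhost and AWS_PORT=4566 for Localstack
queue_url = "#{sqs_config[:base_queue_url]}#{sqs_config[:new_files_queue]}"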

Listening to the Queues

Now we have SQS configured, but who is listening?

To consume messages from a broker, the most used library is Broadway. It helps you create GenServers that listen to a specific queue and process messages one by one (or in batches).

The first queue that needs to be listened to is the new files queue.

Basically, it gets the file data and adds a message to the next queue; see the call to BroadwayGeneralEvents.insert_message(data).
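For reference, after Jason.decode! the S3 notification has roughly this shape (the values here are illustrative, but these are the keys the handler below pattern matches on):

%{
  "Records" => [
    %{
      "eventName" => "ObjectCreated:Put",
      "eventSource" => "aws:s3",
      "eventTime" => "2023-01-01T00:00:00.000Z",
      "s3" => %{
        "bucket" => %{"name" => "files"},
        "object" => %{
          "eTag" => "d41d8cd98f00b204e9800998ecf8427e",
          "key" => "some/object_key.png",
          "size" => 1024
        }
      }
    }
  ]
}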

defmodule EventsArqBackend.QueueWorkers.BroadwayNewFileEvents do
  @moduledoc false
  use Broadway

  alias Broadway.Message
  alias EventsArqBackend.QueueWorkers.BroadwayGeneralEvents

  def start_link(_opts) do
    {module, opts} = producer_module()
    options = opts ++ [queue_url: queue_url()]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {module, options}
      ],
      processors: [
        default: []
      ]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context) do
    decoded_data =
      case Jason.decode!(data) do
        %{"Message" => message} -> Jason.decode!(message)
        message -> message
      end

    %{"Records" => records} = decoded_data

    Enum.each(records, fn
      %{
        "eventName" => event_name,
        "eventSource" => "aws:s3",
        "eventTime" => event_time,
        "s3" => %{
          "bucket" => %{
            "name" => bucket_name
          },
          "object" => %{
            "eTag" => entity_id,
            "key" => object_key,
            "size" => object_size
          }
        }
      }
      when event_name in ["ObjectCreated:Put", "ObjectCreated:Post"] ->
        data = %{
          bucket_name: bucket_name,
          entity_id: entity_id,
          object_key: object_key,
          object_size: object_size,
          inserted_at: event_time
        }

        BroadwayGeneralEvents.insert_message(data)

      _ ->
        :ok
    end)

    Message.update_data(message, fn _data -> decoded_data end)
  end

  defp queue_url,
    do:
      "#{Application.get_env(:ex_aws, :sqs)[:base_queue_url]}#{Application.get_env(:ex_aws, :sqs)[:new_files_queue]}"

  defp producer_module,
    do: Application.get_env(:events_arq_backend, :broadway)[:producer_module]
end
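Note that producer_module/0 reads the Broadway producer from application config, which this post does not show. Under that assumption, it presumably looks something like the sketch below (BroadwaySQS.Producer comes from the broadway_sqs package; Broadway.DummyProducer is Broadway's built-in producer for tests):

# Hypothetical config matching the {module, opts} tuple expected above
config :events_arq_backend, :broadway,
  producer_module: {BroadwaySQS.Producer, []}

# In config/test.exs, avoid hitting SQS at all:
config :events_arq_backend, :broadway,
  producer_module: {Broadway.DummyProducer, []}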

And the other queue listener:

defmodule EventsArqBackend.QueueWorkers.BroadwayGeneralEvents do
  @moduledoc false
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    {module, opts} = producer_module()
    options = opts ++ [queue_url: queue_url()]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {module, options}
      ],
      processors: [
        default: []
      ]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context) do
    decoded_data =
      case Jason.decode!(data) do
        %{"Message" => message} -> Jason.decode!(message)
        message -> message
      end

    IO.inspect(decoded_data, label: "*****************")
    # do something with the data
    # send notification

    Message.update_data(message, fn _data -> decoded_data end)
  end

  def insert_message(data), do: SqsClient.add_message_to_queue(queue_name(), data)

  defp queue_name, do: Application.get_env(:ex_aws, :sqs)[:general_events_queue]

  defp queue_url,
    do: "#{Application.get_env(:ex_aws, :sqs)[:base_queue_url]}#{queue_name()}"

  defp producer_module,
    do: Application.get_env(:events_arq_backend, :broadway)[:producer_module]
end
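insert_message/1 delegates to an SqsClient module that is not shown in this post. A minimal sketch of it, assuming the module name from the call above and the ex_aws_sqs package, could be:

defmodule SqsClient do
  @moduledoc false
  # Hypothetical sketch: encodes the payload and sends it to the given queue.
  # ExAws resolves scheme, host, port, and region from the :ex_aws, :sqs config above.

  def add_message_to_queue(queue_name, data) do
    queue_url = "#{Application.get_env(:ex_aws, :sqs)[:base_queue_url]}#{queue_name}"

    queue_url
    |> ExAws.SQS.send_message(Jason.encode!(data))
    |> ExAws.request()
  end
end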

So... two queues for the same event? Why?

There is a reason: normally the queues you use in SQS are .fifo queues, but the queue that receives S3 event notifications must be a standard (non-.fifo) queue. That's why I created two queues: a regular queue for the S3 events and a .fifo queue for everything else.
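One detail not shown above: both Broadway pipelines are processes, so they have to be started under the application's supervision tree. A minimal sketch, assuming a standard Application module:

# Hypothetical snippet from the application's start/2 callback
children = [
  # ... other children ...
  EventsArqBackend.QueueWorkers.BroadwayNewFileEvents,
  EventsArqBackend.QueueWorkers.BroadwayGeneralEvents
]

Supervisor.start_link(children, strategy: :one_for_one, name: EventsArqBackend.Supervisor)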

The full code of these changes is here

In the next post we will:

  • Create a Channel in Phoenix
  • Integrate Phoenix Channels into a React project
  • Broadcast updates to all connected clients

See you in the next posts
