DEV Community

Naresh Maharaj for Aerospike

Posted on • Originally published at developer.aerospike.com

Using AWS IAM for client authentication

heroimg

In this blog post, we detail how to create an Amazon Managed Streaming for Apache Kafka (Amazon MSK) resource using AWS Identity and Access Management (AWS IAM) in roles and policies to authenticate user access. In the initial step, we establish an Aerospike Database cluster and insert sample messages into the database. Subsequently, we observe in real time how these messages are streamed to Amazon MSK using Aerospike's Kafka Source Connector. <!--truncate-->Below we provide a comprehensive, step-by-step guide for users to successfully implement this process.

image

AWS MKS Kafka

In this section, you will set up a simple three-node Kafka cluster.

  1. Visit the AWS console and select MSK service.

image

  1. Create a new cluster by selecting Create ClusterQuick Create.

image

  1. Select the provisioned cluster and instance type of kafka.t3.small.

image

  1. Select the EBS storage type per broker of 10 GB.

image

NOTE: Take note of the VPC, subnets, and security group ID, as you will require these details later in the article.

The next step is the critical step where you will create the AWS IAM policy and roles. This setup ensures that the Aerospike Database authenticates using AWS IAM to write data to MSK.

  1. From the AWS Console, select the AWS IAM service.

image

  1. To create a new AWS IAM policy, copy the following JSON and paste it in the JSON tab. Replace region:Account-ID with your own region and AWS account ID.

image

  1. Save the policy and name it msk-tutorial-policy.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster" ], "Resource": [ "arn:aws:kafka:region:Account-ID:cluster/MSKTutorialCluster/*" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:region:Account-ID:topic/MSKTutorialCluster/*" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:region:Account-ID:group/MSKTutorialCluster/*" ] } ] } 
Enter fullscreen mode Exit fullscreen mode
  1. Create the IAM role.

image

  1. Under Common Use Cases, select EC2 and then Next.

image

  1. Under Permissions, select the policy named msk-tutorial-policy and then Next.

image

  1. Give the role a name like msk-tutorial-role and click the Create Role button.

Kafka client machine

Next, create a client machine to install the Kafka tools necessary to access our MSK cluster.

  1. Create a new ec2 instance using type t2.micro

image

  1. Use the default AMI: Amazon Linux 2023

image
The AMI may be different depending on your region

  1. Create a key-pair if required. I am using an already existing key-pair.

image

  1. Under Advanced Options.IAM instance profile, select the IAM role created earlier.

image

  1. Launch the instance!

  2. Under instances launched, choose the instance you just created. Click on the ‘Security’ tab and note the security group associated with this instance.
    e.g., sg-0914e6271c97ae4c9 (launch-wizard-1)

  3. Navigate to the VPC section and select Security Groups from the left-hand menu. Locate the security group associated with the MSK cluster, such as sg-e5f51dfb, and choose Edit Inbound Rules.

  4. Create a new rule to allow all traffic from the new ec2 instance.

image

Kafka topics

After successfully establishing your initial Kafka cluster and Kafka client machine, proceed to conduct testing. Verify the functionality by accessing the MSK cluster, creating a topic, producing and consuming sample messages, and ensuring that everything operates as anticipated.

  1. From the MSK Cluster, note the Kafka version being used. This examples uses 2.8.1.

  2. From the Kafka client machine, install Java 11+.

sudo yum -y install java-11 
Enter fullscreen mode Exit fullscreen mode
  1. Download Apache Kafka using wget, then extract the archive using tar.
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz tar -xzf kafka_2.12-2.8.1.tgz 
Enter fullscreen mode Exit fullscreen mode
  1. To use IAM, you will need the MSK IAM Auth jar file. Download the jar to the Kafka libs folder you just extracted.
cd kafka_2.12-2.8.1/libs/ wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar cd ../bin/ 
Enter fullscreen mode Exit fullscreen mode
  1. Create a file called client.properties to use when authenticating to MSK. It will define the SASL mechanism to use and reference the Java class file that will handle your IAM callbacks.
cat <<EOF> client.properties security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler EOF 
Enter fullscreen mode Exit fullscreen mode

Creating topics

  1. Go to the AWS Console and view the MSK Cluster Client Information. There will be three endpoints to choose from, but you only require one.

Example choose:

B-2.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098

image

  1. From the kafka/bin folder, run the command to create a topic. Let's call it aerospike-airforce-1.
export BootstrapServerString="b-2.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098" 
Enter fullscreen mode Exit fullscreen mode
./kafka-topics.sh --create --bootstrap-server $BootstrapServerString --command-config client.properties --replication-factor 3 --partitions 1 --topic aerospike-airforce-1 
Enter fullscreen mode Exit fullscreen mode

Listing topics

To list the topics, use the following command. Notice our latest topic, called aerospike-airforce-1, just showed up.

./kafka-topics.sh --bootstrap-server $BootstrapServerString --command-config client.properties --list MSKTutorialTopic __amazon_msk_canary __consumer_offsets aerospike aerospike-airforce-1 
Enter fullscreen mode Exit fullscreen mode

Producer and consumer

I agree that this is more of a Kafka-101 rather than a straightforward Hello-World scenario. Nonetheless, it is essential to test our configuration by sending and receiving messages from the designated Kafka topic before proceeding further.

Produce some messages by opening a new window and running the following Kafka producer command. Type three or four messages, hitting the 'Return' key after each message

./kafka-console-producer.sh --broker-list $BootstrapServerString --producer.config client.properties --topic aerospike-airforce-1 >Instrument Check >Pre flight checks confirmed >Ready for takeoff >Full throttle, flaps 
Enter fullscreen mode Exit fullscreen mode

You're now ready to start a client consumer application. Open a new window and run the consumer. You should now see the same messages you published earlier.

./kafka-console-consumer.sh --bootstrap-server $BootstrapServerString --consumer.config client.properties --topic aerospike-airforce-1 --from-beginning Instrument Check Pre flight checks confirmed Ready for takeoff Full throttle, flaps 
Enter fullscreen mode Exit fullscreen mode

Database source

Let's review your achievements thus far. You've established a 3-node Kafka cluster in AWS utilizing MSK, incorporating IAM roles and permissions. Additionally, you have successfully created topics and demonstrated the production and consumption of messages using the IAM credentials established during the setup.

The next phase of your journey involves installing the Aerospike Database, inserting messages, and configuring a simple XDR component. XDR is a Cross Datacenter Replication tool and is crucial for transmitting data from the Aerospike Database to the Aerospike Kafka Source Connector allowing us to subsequently forward messages to Amazon MSK.

Create the Aerospike Database

  1. Start by creating a new ec2 instance. For this demo, you can use Linux Centos 8

Rocky 8 AMI: ami-043ceee68871e0bb5 ( us-east-1 )

image

  1. Select the instance type as t2.medium.

image

  1. Add the extra volume for the Aerospike data storage layer. EBS volume is all that is required for now.

image

  1. Launch the instance and connect to the host using ssh. If you have an Aerospike license feature file, upload it to the instance.

Install the Aerospike Database server

  1. Run the following to install the Aerospike Database Server.
export VER="6.1.0.2" sudo yum install java python3 openssl-devel wget git gcc maven bind-utils sysstat nc -y wget -O aerospike-tools.tgz 'https://www.aerospike.com/download/tools/latest/artifact/el8' tar -xvf aerospike-tools.tgz cd aerospike-tools_* sudo ./dep-check sudo ./asinstall wget -O aerospike.tgz https://enterprise.aerospike.com/enterprise/download/server/$VER/artifact/el8 tar -xvf aerospike.tgz cd aerospike-server-enterprise-$VER-el8 sudo ./asinstall sudo mkdir -p /var/log/aerospike/ sudo systemctl enable aerospike 
Enter fullscreen mode Exit fullscreen mode
  1. Confirm the storage disk for Aerospike.
lsblk NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT xvda 202:0 0 10G 0 disk └─xvda1 202:1 0 10G 0 part / xvdb 202:16 0 10G 0 disk <<----------------- This one! 
Enter fullscreen mode Exit fullscreen mode
  1. When its data is available, replace the Aerospike configuration file under /etc/aerospike/aerospike.conf with the configuration file listed below, also replacing the following lines:
    • Under heartbeat.address add in your internal 172.x.x.x address
    • For xdr.dc.node-address-port enter the {kafka-client-machine-address}:8080

Aerospike Database configuration file for use with systemd

service { # paxos-single-replica-limit 1 # Number of nodes where the replica count is automatically proto-fd-max 15000 service-threads 10 feature-key-file /etc/aerospike/features.conf node-id A1 cluster-name CLA } logging { file /var/log/aerospike/aerospike.log { context any info } } # public and private addresses network { service { address any port 3000 } heartbeat { mode mesh address 172.31.94.201 port 3002 # Heartbeat port for this node. interval 150 # controls how often to send a heartbeat packet timeout 10 # number of intervals after which a node is considered to be missing } fabric { port 3001 } info { port 3003 } } namespace test { replication-factor 2 memory-size 40G default-ttl 0 index-type shmem high-water-disk-pct 50 high-water-memory-pct 60 stop-writes-pct 90 nsup-period 0 storage-engine device { device /dev/xvdb data-in-memory false write-block-size 128K min-avail-pct 5 } } xdr { # Change notification XDR block that round-robins between two connector nodes dc aerospike-kafka-source { connector true node-address-port 172.31.58.190 8080 namespace test { } } } 
Enter fullscreen mode Exit fullscreen mode

Start the Aerospike service

  1. Copy the license feature file to the aerospike configuration directory.
sudo cp features.conf /etc/aerospike/ 
Enter fullscreen mode Exit fullscreen mode
  1. Start the Aerospike server and check the logs to ensure there are no errors.
sudo systemctl start aerospike 
Enter fullscreen mode Exit fullscreen mode
sudo systemctl status aerospike 
Enter fullscreen mode Exit fullscreen mode

Aerospike Kafka Source Connector

The seamless flow of data from Aerospike Database Enterprise Edition to Apache Kafka hinges on the utilization of the Aerospike Kafka source (outbound) connector. This connector subscribes to change notifications. Upon receiving these notifications, the connector converts them into messages, which are dispatched to Kafka topics. Going back to the ec2 instance you created earlier with our Kafka client configured, go ahead and install the Aerospike Kafka Source Connector. This is your outbound connector to send data from the Aerospike to MSK.

sudo yum install java #( install 11+ JDK ) 
Enter fullscreen mode Exit fullscreen mode
wget https://enterprise.aerospike.com/artifacts/enterprise/aerospike-kafka-outbound/5.0.1/aerospike-kafka-outbound-5.0.1-1.noarch.rpm 
Enter fullscreen mode Exit fullscreen mode
sudo rpm -i aerospike-kafka-outbound-5.0.0-1.noarch.rpm 
Enter fullscreen mode Exit fullscreen mode

Configure the outbound connector

The terms “outbound” and “source connector” are used interchangeably in this article.

  1. Locate the following file on the Kafka client box: /etc/aerospike-kafka-outbound/aerospike-kafka-outbound.yml.

  2. Replace the broker address for one of the node addresses in the MSK Kafka cluster producer-props.bootstrap.servers.

  3. Then add the following contents to the file with the changes that have been outlined.

# Change the configuration for your use case. # # Refer to https://www.aerospike.com/docs/connectors/enterprise/kafka/outbound/configuration/index.html # for details. # The connector's listening ports, TLS, and network interface. service: port: 8080 # Format of the Kafka destination message. format: mode: flat-json metadata-key: metadata # Aerospike record routing to a Kafka destination. routing: mode: static destination: aerospike # Kafka producer initialization properties. producer-props: bootstrap.servers: - b-3.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098 ssl.truststore.location: /etc/aerospike-kafka-outbound/kafka.client.truststore.jks ssl.truststore.password: changeit security.protocol: SASL_SSL sasl.mechanism: AWS_MSK_IAM sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=default; sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler # The logging properties. logging: file: /var/log/aerospike-kafka-outbound/aerospike-kafka-outbound.log enable-console-logging: true levels: root: debug record-parser: debug server: debug com.aerospike.connect: debug ticker-interval: 3600 
Enter fullscreen mode Exit fullscreen mode
  1. Create the CA certificate trust store for use in the Kafka Outbound Connector config. You can see the SSL trust store location referenced in the file above as ssl.truststore.location.
sudo cp /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts /etc/aerospike-kafka-outbound/kafka.client.truststore.jks 
Enter fullscreen mode Exit fullscreen mode
sudo chmod 755 /etc/aerospike-kafka-outbound/kafka.client.truststore.jks 
Enter fullscreen mode Exit fullscreen mode
  1. Finally, make the AWS IAM Kafka Auth Jar file available to the Aerospike Outbound Kafka Connector. This is the same jar file that you downloaded and added to the kafka/libs folder.
sudo cp kafka_2.12-2.8.1/libs/aws-msk-iam-auth-1.1.1-all.jar /opt/aerospike-kafka-outbound/lib/aws-msk-iam-auth-1.1.1-all.jar 
Enter fullscreen mode Exit fullscreen mode
  1. Start the service.
sudo systemctl enable aerospike-kafka-outbound 
Enter fullscreen mode Exit fullscreen mode
sudo systemctl start aerospike-kafka-outbound 
Enter fullscreen mode Exit fullscreen mode

Send data from Aerospike to Kafka

  1. Open a separate window so you can list all messages on the Aerospike Kafka topic. Start by adding one of the private endpoint bootstrap servers as an environment variable for ease of use.
export BootstrapServerString="b-3.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098" 
Enter fullscreen mode Exit fullscreen mode
  1. Run the consumer client as follows:
./kafka-console-consumer.sh --bootstrap-server $BootstrapServerString --consumer.config client.properties --topic aerospike --from-beginning 
Enter fullscreen mode Exit fullscreen mode
  1. In a new window, start AQL, the Aerospike command line client which connects to your Aerospike Database.
aql -U auser -P a-secret-pwd 
Enter fullscreen mode Exit fullscreen mode
  1. Insert some data
insert into test (pk, a) values(400, "Your winning lottery ticket awaits you") 
Enter fullscreen mode Exit fullscreen mode
  1. Check to see if the message appears in the Kafka consumer window
{"metadata":{"namespace":"test","userKey":400,"digest":"W7eGav2hKfOU00xx7mnOPYa2uCo=","msg":"write","gen":1,"lut":1681488437767,"exp":0},"a":"Your winning lottery ticket awaits you"} 
Enter fullscreen mode Exit fullscreen mode

Conclusion

You've just discovered how straightforward it is to transmit data from Aerospike to AWS MSK Kafka while ensuring client authentication through AWS IAM permissions! From establishing an Aerospike Database from scratch to configuring the AWS MSK Kafka cluster and employing the Aerospike Outbound Kafka Connector, you've effortlessly constructed a real-time streaming data pipeline. Congratulations on this accomplishment!

Share your experience! Your feedback is important to us. Join our Aerospike community!

Top comments (0)