DEV Community

Cover image for Apache Airflow In EKS Cluster

Apache Airflow In EKS Cluster

Abstract

  • TL;DR
  • Airflow is one of the most popular tools for running workflows especially data-pipeline.
  • A successful pipeline moves data efficiently, minimizing pauses and blockages between tasks, keeping every process along the way operational. Apache Airflow provides a single customizable environment for building and managing data pipelines
  • In this post, it provides step-by-step to deploy airflow on EKS cluster using Helm for the default chart with customization in values.yaml, cdk for creating AWS resources such as EFS, node group with Taints for pod toleration in the SPOT instance.

Table Of Contents


πŸš€ Architecture Overview

  • This blog guide you deploy airflow on an existing EKS cluster using namespace airflow and its own resources
  • Prerequiste:
    • EKS cluster
    • IAM admin role or enough permission to create AWS resources
    • CDK installation (Typescript)
    • Helm-chart

Architecture Overview

  • Airflow components: Worker, scheduler, web server, flower. Other things are airflow database, redis and sync.
  • Use EFS to share logs, DAGs between components such as worker, scheduler and webserver. Plus use EFS to store airflow database metadata
  • There are 3 node groups:
    • On-demand EC2: workers (If you can control the retries of Airflow task and DAGS, this group is better to use Spot instances instead)
    • On-demand EC2: Airflow database, pgBouncer and redis
    • Spot-fleet EC2: Schedulers, webs, and other components

πŸš€ Using CDK to create EFS and Access Point (AP)

  • It's neccessary to create EFS as a way to persistent airflow DAGs, logs between workers and scheduler
  • Checkout AWS EKS With EFS CSI Driver And IRSA Using CDK to undestand AWS EFS and CSI Driver provisioner
  • Create EFS within the VPC of EKS cluster and add security group for open access from EKS nodes

efs-stack.ts
import * as efs from '@aws-cdk/aws-efs'; import * as ec2 from '@aws-cdk/aws-ec2'; import { RemovalPolicy, App, Stack, StackProps, Tags } from '@aws-cdk/core'; export class AirflowEfsStack extends Stack { constructor(scope: App, id: string, pattern: string, vpc: ec2.IVpc, ip: string, env_tag: string, props?: StackProps) { super(scope, id, props); const securityGroup = new ec2.SecurityGroup(this, 'AirflowEfsSG', { vpc, securityGroupName: `${pattern}-airflow-sg`, description: 'Security group for EFS CSI', allowAllOutbound: true }) securityGroup.addIngressRule(ec2.Peer.ipv4(ip), ec2.Port.allTraffic(), 'Allow internal private vpc') Tags.of(securityGroup).add('Name', `${pattern}-efs-sg`) Tags.of(securityGroup).add('cdk:sg:stack', 'sg-stack') Tags.of(securityGroup).add('env', env_tag) const fileSystem = new efs.FileSystem( this, 'AirflowEFS', { vpc, securityGroup, fileSystemName: `${pattern}-airflow-efs`, lifecyclePolicy: efs.LifecyclePolicy.AFTER_14_DAYS, removalPolicy: RemovalPolicy.DESTROY } ) Tags.of(fileSystem).add('Name', `${pattern}-airflow-efs`) Tags.of(fileSystem).add('cdk:efs:stack', `${pattern}-efs-${env_tag}`) Tags.of(fileSystem).add('env', env_tag) const fsAcccessPoint = fileSystem.addAccessPoint( `${pattern}EFSAccesPoint`, { posixUser: { uid: '1001', gid: '1001' }, createAcl: { ownerUid: '1001', ownerGid: '1001', permissions: '0700' }, path: '/airflow/data' } ) Tags.of(fsAcccessPoint).add('Name', `${pattern}-data-airflow-pg`) } } 
Enter fullscreen mode Exit fullscreen mode

πŸš€ Install EFS CSI Driver using helm-chart within airflow workers only

  • CSI driver provisioner is DaemonSet, it will deploy to all worker nodes as default but we can restrict to run EFS kube-system pods for airflow nodes only by adding toleration and affinity in values.yaml

values.yaml
controller: tolerations: - key: 'dedicated' operator: 'Equal' value: 'airflow' effect: 'NoSchedule' affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: role operator: In values: - airflow node: tolerations: - key: 'dedicated' operator: 'Equal' value: 'airflow' effect: 'NoSchedule' nodeSelector: role: airflow 
Enter fullscreen mode Exit fullscreen mode

  • Deploy
helm repo add aws-efs-csi-driver <https://kubernetes-sigs.github.io/aws-efs-csi-driver/> helm repo update helm upgrade -i aws-efs-csi-driver aws-efs-csi-driver/aws-efs-csi-driver --values efs-values.yaml -n kube-system 
Enter fullscreen mode Exit fullscreen mode

πŸš€ Create EFS storage class, PV, and PVC

  • Pre-requisite: EFS and AP IDs which are created from previous step
  • Update the EFS and AP ID to the storage class and persistent volume then create them

pvc.yaml
kind: StorageClass apiVersion: storage.k8s.io/v1 metadata: name: af-efs-sc provisioner: efs.csi.aws.com parameters: provisioningMode: efs-ap fileSystemId: fs-3529be16 directoryPerms: "700" gidRangeStart: "1000" gidRangeEnd: "2000" --- apiVersion: v1 kind: PersistentVolume metadata: name: af-efs-pg-pv spec: capacity: storage: 8Gi volumeMode: Filesystem accessModes: - ReadWriteMany persistentVolumeReclaimPolicy: Retain storageClassName: af-efs-sc csi: driver: efs.csi.aws.com volumeHandle: fs-3529be15::fsap-0679931424ff5f0ce --- apiVersion: v1 kind: PersistentVolumeClaim metadata: name: data-airflow-postgresql spec: accessModes: - ReadWriteMany storageClassName: af-efs-sc resources: requests: storage: 8Gi 
Enter fullscreen mode Exit fullscreen mode

  • Get PVC, we will need this to update to values.yaml later
$ kubectl get pvc NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE data-airflow-postgresql Bound efs-pg-pv 8Gi RWX efs-sc 29h 
Enter fullscreen mode Exit fullscreen mode

πŸš€ Use CDK to create airflow IAM role

  • Best practice is to use IAM role service account (IRSA), but within airflow we can create its own role as instance profile with permission for node to join EKS cluster and permission to read/write EFS or other AWS resources

iamRole.ts
 createWorkerRole(): IRole { // Airflow IAM worker role const worker_role = new Role( this, 'AirflowIamRole', { assumedBy: new ServicePrincipal('ec2.amazonaws.com'), roleName: `${this.eksCluserName}-airflow` } ); const attachPolicies = ['AmazonEC2ContainerRegistryReadOnly', 'AmazonEKSWorkerNodePolicy', 'AmazonS3ReadOnlyAccess', 'AmazonEKS_CNI_Policy']; for (var policy of attachPolicies) { worker_role.addManagedPolicy(ManagedPolicy.fromAwsManagedPolicyName(policy)) } Tags.of(worker_role).add('Name', `${this.eksCluserName}-airflow`) Tags.of(worker_role).add('cdk:iam:stack', `${this.pattern}-iam-${this.env_tag}`) Tags.of(worker_role).add('env', this.env_tag) const autoscalingStatement = new PolicyStatement({ sid: 'AutoScalingGroup', actions: [ "autoscaling:DescribeAutoScalingGroups", "autoscaling:DescribeAutoScalingInstances", "autoscaling:DescribeLaunchConfigurations", "autoscaling:DescribeTags", "autoscaling:CreateOrUpdateTags", "autoscaling:UpdateAutoScalingGroup", "autoscaling:TerminateInstanceInAutoScalingGroup", "ec2:DescribeLaunchTemplateVersions", "elasticfilesystem:*", "tag:GetResources", ], effect: Effect.ALLOW, resources: ['*'], conditions: { 'StringEquals': {"aws:RequestedRegion": this.region} } }); worker_role.addToPolicy(autoscalingStatement); return worker_role; }; 
Enter fullscreen mode Exit fullscreen mode

  • Note: ASG launch template will add this Airflow role to aws-auth configmap mapRoles for joining new nodes to the EKS cluster
  • Checkout AWS EKS - Launch Template Of Node Group for understanding how IAM role in nodegroup works

πŸš€ Use CDK to create Auto scaling groups (ASG)

  • Why do we need auto scaling here? In order to handle work loads between when running workflows
  • Create a manage node group with spot instances for saving cost? Of course we should care about spot instance might be interrupted. If a workflow task gets interrupted due to node termination, you may lose a part of progress, but the cost of losing ten minutes of progress is much lower than losing four hours of work.
  • Using Spot instances for long-running tasks can be disruptive if node running the task gets terminated by Spot. Consider breaking long-running tasks into multiple sub-tasks with a shorter execution time that can be used to checkpoint a workflow’s progress.
  • We also recommend that you configure automatic retries in DAGs, especially when using Spot.

  • Two autoscaling groups: Instance types are based on the need of Airflow processes such as workload, data size, number of DAGs, tasks and their complexity

  1. ASG-pet: min 2, max 2

    • spot: 100%
    • type: c5a.xlarge, c5a.large
    • components: 2 sch, 2 web, 1 flower, 1 sync
  2. ASG-sts (stateful set): min 1, max 3

    • calculate the number of workers and auto-scaling
    • spot: 100%
    • type: c5a.2xlarge, c5a.large
    • components: workers
  3. ASG-db (database): min 1, max 1

    • on-demand: 100%
    • type: c5a.xlarge
    • components: airflow database, redis, pgBouncer

asg-stack.ts
import { Stack, App, Tags, StackProps} from '@aws-cdk/core'; import { Cluster, Nodegroup, CapacityType, TaintEffect } from '@aws-cdk/aws-eks' import { Role, ServicePrincipal, ManagedPolicy, PolicyStatement, Effect, IRole } from '@aws-cdk/aws-iam'; import * as ec2 from '@aws-cdk/aws-ec2'; interface Ec2Type { pet: Array<string>, pet_size: Array<number>, sts: string, sts_size: Array<number>, db: string }; export class AirflowAsgStack extends Stack { public eks_cluster: any; public worker_role: any; public airflowSG: any; public eksCluserName: string; public launchTemplate: any; constructor(public scope: App, id: string, public pattern: string, public vpc: ec2.IVpc, public vpcSg: ec2.ISecurityGroup, public region: string, public ip: string, ec2_types: Ec2Type, public env_tag: string, props?: StackProps) { super(scope, id, props); this.eksCluserName = 'us-eks' // EKS cluster this.eks_cluster = Cluster.fromClusterAttributes( this, `EksCluster${pattern.toUpperCase()}`, { vpc, clusterName: this.eksCluserName } ); this.launchTemplate = this.createLaunchTemplate(); // Airflow-SG to workers this.airflowSG = this.createAirflowSG(); this.worker_role = this.createWorkerRole(); this.createAsgPet(ec2_types.pet, ec2_types.pet_size); this.createAsgSts(ec2_types.sts, ec2_types.sts_size); this.createAsgDb(ec2_types.db); }; createAirflowSG(): any { /** * Airflow security group */ const airflowSG = new ec2.SecurityGroup(this, 'AirflowairflowSG', { vpc: this.vpc, securityGroupName: 'airflow-priv-sg', description: 'Security group to access to worker in private vpc', allowAllOutbound: true }); airflowSG.connections.allowFrom(airflowSG, ec2.Port.allTcp(), 'Allow node to communicate with each other'); airflowSG.connections.allowFrom(this.vpcSg, ec2.Port.allTcp(), 'Allow nodes in another ASG communicate to airflow nodes'); Tags.of(airflowSG).add('Name', 'airflow-priv-ssh-sg'); Tags.of(airflowSG).add('cdk:sg:stack', `${this.pattern}-sg-${this.env_tag}`); Tags.of(airflowSG).add('env', this.env_tag); return airflowSG; }; createLaunchTemplate(): any { /** * More about launch-templates: https://github.com/awsdocs/amazon-eks-user-guide/blob/master/doc_source/launch-templates.md * Notes: * - Nodegroup auto-generates role if not specify * - Launch template node group automatically add the worker role to aws-auth configmap */ const airflowLaunchTemplate = new ec2.LaunchTemplate(this, 'AirflowLaunchTemplate-lt', { launchTemplateName: 'airflow-asg-lt', securityGroup: this.airflowSG, blockDevices: [{ deviceName: '/dev/xvda', volume: ec2.BlockDeviceVolume.ebs(20) }], keyName: `${this.pattern}-airflow` }); Tags.of(airflowLaunchTemplate).add('Name', 'airflow-asg-lt'); Tags.of(airflowLaunchTemplate).add('cdk:lt:stack', `${this.pattern}:lt:${this.env_tag}`); Tags.of(airflowLaunchTemplate).add('env', this.env_tag); return airflowLaunchTemplate; } createAsgPet(types: Array<string>, sizes: Array<number>) { /** * ASG Pet is used to assign deployments. Due to using spot instances so recommendation min size 2 */ const asgNodeGroupName = 'eks-airflow-nodegroup-pet'; const asgPet = new Nodegroup(this, 'AirflowPetAsg', { nodegroupName: 'eks-airflow-nodegroup-pet', subnets: this.eks_cluster.vpc.selectSubnets({subnetType: ec2.SubnetType.PRIVATE}), cluster: this.eks_cluster, capacityType: CapacityType.SPOT, nodeRole: this.worker_role, instanceTypes: [ new ec2.InstanceType(types[0]), new ec2.InstanceType(types[1]) ], minSize: sizes[0], maxSize: sizes[1], labels: { 'role': 'airflow', 'type': 'af-stateless', 'lifecycle': 'spot' }, taints: [ { effect: TaintEffect.NO_SCHEDULE, key: 'dedicated', value: 'airflow' } ], tags: { 'Name': 'eks-airflow-nodegroup-pet', 'cdk:asg:stack': `${this.pattern}-asg-${this.env_tag}` }, launchTemplateSpec: { id: this.launchTemplate.launchTemplateId! } }); }; createAsgSts(type: string, sizes: Array<number>) { /** * ASG STS: Assign Airflow workers */ const asgNodeGroupName = 'eks-airflow-nodegroup-sts'; const asgStatefulSet = new Nodegroup(this, 'AirflowStsAsg', { nodegroupName: asgNodeGroupName, subnets: this.eks_cluster.vpc.selectSubnets({subnetType: ec2.SubnetType.PRIVATE}), cluster: this.eks_cluster, nodeRole: this.worker_role, instanceTypes: [new ec2.InstanceType(type)], capacityType: CapacityType.SPOT, minSize: sizes[0], maxSize: sizes[1], labels: { 'role': 'airflow', 'type': 'af-stateful', 'lifecycle': 'spot' }, taints: [ { effect: TaintEffect.NO_SCHEDULE, key: 'dedicated', value: 'airflow' } ], tags: { 'Name': 'eks-airflow-nodegroup-sts', 'cdk:asg:stack': `${this.pattern}-asg-${this.env_tag}` }, launchTemplateSpec: { id: this.launchTemplate.launchTemplateId! } }); }; createAsgDb(type: string) { /** * ASG Db: Assign Airflow database and redis to this worker group */ const asgNodeGroupName = 'eks-airflow-nodegroup-db'; const asgStatefulSet = new Nodegroup(this, 'AirflowDbAsg', { nodegroupName: asgNodeGroupName, subnets: this.eks_cluster.vpc.selectSubnets({subnetType: ec2.SubnetType.PRIVATE}), cluster: this.eks_cluster, nodeRole: this.worker_role, instanceTypes: [new ec2.InstanceType(type)], capacityType: CapacityType.ON_DEMAND, minSize: 1, maxSize: 1, labels: { 'role': 'airflow', 'type': 'af-db', 'lifecycle': 'on-demand' }, taints: [ { effect: TaintEffect.NO_SCHEDULE, key: 'dedicated', value: 'airflow' } ], tags: { 'Name': 'eks-airflow-nodegroup-db', 'cdk:asg:stack': `${this.pattern}-asg-${this.env_tag}` }, launchTemplateSpec: { id: this.launchTemplate.launchTemplateId! } }); }; } 
Enter fullscreen mode Exit fullscreen mode

  • Deploy
$ cdk deploy AirflowAsgStackUS 
Enter fullscreen mode Exit fullscreen mode
  • Check result
# kf get node |grep 20-eks ip-172-10-13-169.us-east-1.compute.internal Ready <none> 2d2h v1.18.20-eks-c9f1ce ip-172-10-41-179.us-east-1.compute.internal Ready <none> 2d14h v1.18.20-eks-c9f1ce ip-172-10-51-199.us-east-1.compute.internal Ready <none> 23h v1.18.20-eks-c9f1ce 
Enter fullscreen mode Exit fullscreen mode

EKS ASG

  • IMPORTANT: The above code contains creating Airflow security group, you should checkout Understand Pods communication for deep understanding how EKS node join EKS cluster and how Pods communicate

πŸš€ Set up gitlab repo for git-sync sidecar

  • Create new project to store DAGs on Git and let git-sync update DAGs automatically.

  • Create user airflow and add to project as reporter/developer

  • Generate its token as impersonating which will be used later

πŸš€ Create values.yaml

  • values.yaml is used to customize some attributes from airflow helm chart to adapt with our system and deployments such as credentials, Airflow configs, Pods assignment, autoscaling, etc.

  • Use community airflow helm chart (not the official one). Template values.yaml from airflow-helm/charts

  • Customizations
    1. Update airflow image if you'd like to deploy from your own one or use latest public one

    airflow: image: repository: apache/airflow tag: 2.1.4-python3.9 pullPolicy: IfNotPresent 

2. Persist airflow logs using EFS

logs
 airflow: kubernetesPodTemplate: securityContext: fsGroup: 65534 sync: securityContext: fsGroup: 65534 scheduler: securityContext: fsGroup: 65534 web: securityContext: fsGroup: 65534 workers: securityContext: fsGroup: 65534 flower: securityContext: fsGroup: 65534 logs: ## the airflow logs folder ## path: /opt/airflow/efs/logs persistence: enabled: true subPath: "" storageClass: "efs-sc" accessMode: ReadWriteMany size: 1Gi 
Enter fullscreen mode Exit fullscreen mode

Reference to How to persist airflow logs? for more options.

3. Update web.service.type to NodePort for later use Ingress

web: service: type: NodePort 
Enter fullscreen mode Exit fullscreen mode

4. Set up workers replicas for balance workload
How to set up celery worker autoscaling?

5. Add toleration to airflow components
- Airflow nodes will only accept airflow Pods
- Global setting airflow.defaultTolerations

 defaultTolerations: - key: 'dedicated' operator: 'Equal' value: 'airflow' effect: 'NoSchedule' 
Enter fullscreen mode Exit fullscreen mode
  • For other extra components such as postgresql, redis, pgBouncer we need to add tolerations at each field

6. Add nodeAffinity to assign airflow components to their node group

  • For deployment services such as web, scheduler, sync, flower
 affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: type operator: In values: - af-stateless 
Enter fullscreen mode Exit fullscreen mode
  • For stateful service such as workers
 affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: type operator: In values: - af-stateful 
Enter fullscreen mode Exit fullscreen mode
  • For postgresql, redis, pgBouncer
 affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: type operator: In values: - af-db 
Enter fullscreen mode Exit fullscreen mode

7. Persist DAGs using git-sync
7.1 git-sync sidecar (SSH auth)
- Pre-requiste: Create Linux user airflow, generate its rsa ssh-key, add public key to airflow gitlab user and then use id_rsa to generate secret

 $ kubectl create secret generic \ airflow-ssh-git-secret \ --from-file=id_rsa=$(pwd)/id_rsa \ --namespace airflow secret/airflow-ssh-git-secret created 
Enter fullscreen mode Exit fullscreen mode
  • Get gitlab known_hosts
 ssh-keyscan -H gitlab.cloudopz.co >> known_hosts 
Enter fullscreen mode Exit fullscreen mode
  • Update values.yaml at dags.gitSync
 dags: ## the airflow dags folder ## path: /opt/airflow/efs/dags gitSync: enabled: true repo: "git@gitlab.cloudopz.co:devops/airflow-dags.git" branch: "master" revision: "HEAD" syncWait: 60 sshSecret: "airflow-ssh-git-secret" sshSecretKey: "id_rsa" # "known_hosts" verification can be disabled by setting to "" sshKnownHosts: |- gitlab.cloudopz.co ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDD18bjpsGTussTrWvrU9LzqDdb0DJ5XVlOvmv4E+svIN6ItUvS3UrNNKiABs9IlexsEnDTdYTfZkV14Ft7f79lTeLKqvEJowF6RZg1SDOtW4ljPxRUh8n8MgGftI37m3baRYBqdsSbTebjfDRkykgI3k5AW+Lw9syeckncL3ZLAUQ894bh/uIqu4tCP59UlxZ43B6n6nna9g+ZFz/EGCsQTxPsTUIy13Lrgeh6kLQk77zvxdz838/JDH7ELdmd1nwCJIXOKhFwwSDkRUAkHgVSHWsFZdfNM4Na3TRBR0IUCnEqwDuEqSlTh1yfv1oymu6FeA/JaiTgfhvDVleB4wSx 
Enter fullscreen mode Exit fullscreen mode

7.2 git-sync sidecar (HTTP auth)

 kubectl create secret generic \ airflow-http-git-secret \ --from-literal=username=airflow \ --from-literal=password=EKZvPi8TCKVaNSz45rjn \ --namespace airflow 
Enter fullscreen mode Exit fullscreen mode
  • Update values.yaml at dags.gitSync
 dags: ## the airflow dags folder ## path: /opt/airflow/efs/dags gitSync: enabled: true repo: "https://gitlab.cloudopz.co/devops/airflow-dags.git" branch: "master" revision: "HEAD" syncWait: 60 httpSecret: "airflow-http-git-secret" httpSecretUsernameKey: username httpSecretPasswordKey: password 
Enter fullscreen mode Exit fullscreen mode

8. Database - PostgreSQL Chart with enabling debug and using EFS

 postgresql: enabled: true postgresqlDatabase: airflow postgresqlUsername: postgres postgresqlPassword: airflow existingSecretKey: "postgresql-password" extraEnv: - name: BITNAMI_DEBUG value: "true" persistence: enabled: true storageClass: "efs-sc" existingClaim: "data-airflow-postgresql" accessModes: - ReadWriteMany size: 8Gi master: podAnnotations: cluster-autoscaler.kubernetes.io/safe-to-evict: "true" volumePermissions: enabled: true 
Enter fullscreen mode Exit fullscreen mode

9. Create Airflow variables

  • We can use plain/text or Secrets/Configmaps
  • How-to

10. Generate Fernet

  • Airflow uses Fernet to encrypt passwords in the connection configuration and the variable configuration. It guarantees that a password encrypted using it cannot be manipulated or read without the key. Fernet is an implementation of symmetric (also known as β€œsecret key”) authenticated cryptography.
  • Generating Fernet key
 from cryptography.fernet import Fernet fernet_key= Fernet.generate_key() print(fernet_key.decode()) # your fernet_key, keep it in secured place! 
Enter fullscreen mode Exit fullscreen mode

πŸš€ Airflow secrets

  • For airflow users, reference to Access Control to assign proper permissions.

airflow-secret.yaml
 apiVersion: v1 kind: Secret metadata: name: airflow-secret namespace: airflow data: airflow-admin-password: == airflow-user-password: airflow-viewer-password: = airflow-fernet: = clh-password: == db_user: db_password: ttdb_user: = ttdb_password: == slack_host: slack_webhook: == # airflow database postgresql-password: = # git-sync access token airflow-gitlab-user: == airflow-gitlab-token: = 
Enter fullscreen mode Exit fullscreen mode

πŸš€ Deploy Airflow using helm chart

  • The community usage helm chart
# add this repo as "airflow-stable" helm repo add airflow-stable https://airflow-helm.github.io/charts helm repo update kubectl create namespace airflow helm upgrade -i "airflow" airflow-stable/airflow \ --version "8.5.2" \ --namespace "airflow" \ --values values.yaml 
Enter fullscreen mode Exit fullscreen mode
  • Check PVC
$ kubectl get pvc NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE airflow-logs Bound pvc-177df93c-331f-4514-9733-9e67be7612ee 1Gi RWX af-efs-sc 7d18h data-airflow-postgresql Bound af-efs-pg-pv 8Gi RWX af-efs-sc 8d 
Enter fullscreen mode Exit fullscreen mode

  • Check pods
$ kubectl get pod -owide NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES airflow-db-migrations-8666f9b775-p9n6p 2/2 Running 0 7d2h 10.0.7.85 ip-10-0-7-188.ec2.internal <none> <none> airflow-flower-79bf6f7b95-mst9x 2/2 Running 0 6d15h 10.0.6.117 ip-10-0-6-175.ec2.internal <none> <none> airflow-pgbouncer-75b9ddcf89-q8vkf 1/1 Running 0 7d2h 10.0.9.109 ip-10-0-9-82.ec2.internal <none> <none> airflow-postgresql-0 1/1 Running 0 7d2h 10.0.9.65 ip-10-0-9-82.ec2.internal <none> <none> airflow-redis-master-0 1/1 Running 0 7d2h 10.0.9.237 ip-10-0-9-82.ec2.internal <none> <none> airflow-scheduler-6dc74f564f-44tzx 2/2 Running 0 6d15h 10.0.6.33 ip-10-0-6-175.ec2.internal <none> <none> airflow-scheduler-6dc74f564f-cb5d4 2/2 Running 0 7d2h 10.0.15.232 ip-10-0-15-181.ec2.internal <none> <none> airflow-sync-connections-8579c86d44-dg9jq 2/2 Running 0 6d15h 10.0.15.150 ip-10-0-15-181.ec2.internal <none> <none> airflow-sync-pools-7f4f55776b-qdcjn 2/2 Running 0 6d15h 10.0.6.203 ip-10-0-6-175.ec2.internal <none> <none> airflow-sync-users-6d98bb49ff-zx5rs 2/2 Running 0 6d15h 10.0.15.140 ip-10-0-15-181.ec2.internal <none> <none> airflow-sync-variables-7f9959cd-j8p8x 2/2 Running 0 6d15h 10.0.6.27 ip-10-0-6-175.ec2.internal <none> <none> airflow-web-7f46664c6c-bvgls 2/2 Running 0 7d2h 10.0.15.111 ip-10-0-15-181.ec2.internal <none> <none> airflow-web-7f46664c6c-vq6kr 2/2 Running 0 6d15h 10.0.6.4 ip-10-0-6-175.ec2.internal <none> <none> airflow-worker-0 2/2 Running 0 7d2h 10.0.7.100 ip-10-0-7-188.ec2.internal <none> <none> airflow-worker-1 2/2 Running 0 7d2h 10.0.14.153 ip-10-0-14-28.ec2.internal <none> <none> 
Enter fullscreen mode Exit fullscreen mode
  • We see that the two workers locate on different nodes. Check node to be taints and pods are in toleration
$ kubectl describe node ip-10-0-7-188.ec2.internal | grep Taints -B 1 CreationTimestamp: Thu, 15 Jul 2021 07:19:29 +0000 Taints: dedicated=airflow:NoSchedule kubectl describe pod airflow-worker-0 |grep Tolerations Tolerations: dedicated=airflow:NoSchedule $ kubectl describe pod airflow-worker-1 |grep Tolerations Tolerations: dedicated=airflow:NoSchedule 
Enter fullscreen mode Exit fullscreen mode
  • Check git-sync for up-to-date DAGs. The DAG file 101_postgres_operator.py


$ kubectl exec -it airflow-worker-0 -- ls -la efs/dags/repo/ Defaulting container name to airflow-worker. Use 'kubectl describe pod/airflow-worker-0 -n airflow' to see all of the containers in this pod. total 8 drwxr-sr-x 2 65533 nogroup 67 Jul 15 16:33 . drwxrwsrwx 4 root nogroup 82 Jul 15 16:33 .. -rw-r--r-- 1 65533 nogroup 71 Jul 15 16:33 .git -rw-r--r-- 1 65533 nogroup 3221 Jul 15 16:33 101_postgres_operator.py -rw-r--r-- 1 65533 nogroup 0 Jul 15 16:33 README.md 
Enter fullscreen mode Exit fullscreen mode

πŸš€ Create airflow ingress

airflowingress.k8s.yaml
apiVersion: networking.k8s.io/v1beta1 kind: Ingress metadata: annotations: alb.ingress.kubernetes.io/backend-protocol: HTTP alb.ingress.kubernetes.io/certificate-arn: arn:aws:acm:ap-northeast-2:123456789012:certificate/b8299bf5-ae3b-4f69-zzzz-xxxxyyyy alb.ingress.kubernetes.io/group.name: dev alb.ingress.kubernetes.io/group.order: "9" alb.ingress.kubernetes.io/listen-ports: '[{"HTTPS":443}]' kubernetes.io/ingress.class: alb labels: dev: airflow name: airflow namespace: airflow spec: rules: - host: airflow.vinceredev.com http: paths: - backend: serviceName: airflow-web servicePort: 8080 
Enter fullscreen mode Exit fullscreen mode

πŸš€ Conclusion

  • With all of steps above, you can now deploy airflow in EKS cluster with control assigment of airflow components to AWS node group
  • By using Amazon EC2 Spot, you can reduce your spend on EC2 instances significantly with minimal configuration changes.
  • By using EFS with CSI driver you can persite data, logs cross multiple zones.
  • By using CDK, you get more familiar to create IaC and Kubernetes yaml as code.

Top comments (4)

Collapse
 
ranjithreddyn9 profile image
Ranjith

How and where can I perform changes for configurations of Airflow i.e., airflow.cfg. I can see the below files and directories under airflow helm directory.

Image description

Collapse
 
rafaelfol profile image
RafaelFOl

How i submit spark job to k8s spark-operator?
I try this but not work

stackoverflow.com/questions/725398...

Collapse
 
ranjithreddyn9 profile image
Ranjith

I followed the steps and at last showing the pods status as given screenshot.
Image description

Collapse
 
vumdao profile image
πŸš€ Vu Dao πŸš€

The pod airflow-postgresql-0 as well as airflow-redis-master-0 are the main ones for other airflow components work. You should check why they were not scheduled on nodes.

At the time of writing this post, I use AWS nodegroup but now using Karpenter is better way.