简介 #
Kubernetes(K8s)是一种流行的容器编排系统,用于自动化部署、扩展和管理应用程序。Flink的原生Kubernetes集成允许您直接在正在运行的 Kubernetes 集群上部署 Flink。此外,Flink 能够根据所需资源动态分配和取消分配TaskManager,因为它可以直接与Kubernetes通信。
Apache Flink还提供了Kubernetes Operator,用于管理Kubernetes上的Flink集群。它支持独立部署和原生部署模式,极大简化了Flink在Kubernetes上的部署、配置和生命周期管理。
更多信息请参考:Flink Kubernetes Operator文档。
准备 #
假设您正在运行的Kubernetes集群满足以下要求:
- Kubernetes版本 >= 1.9。
- KubeConfig,作为列出、创建、删除pods和services权限的入口,可通过
~/.kube/config进行配置。 您可以通过运行命令:kubectl auth can-i <list|create|edit|delete> pods来验证权限。 - 已启用 Kubernetes DNS。
default用户具有创建、删除POD的 RBAC 权限。
如果您在配置Kubernetes集群时遇到问题,请参考:如何配置Kubernetes集群。
Session模式 #
Flink可以在所有类UNIX环境上运行,即Linux、Mac OS X和Cygwin(适用于 Windows)。
您可以参考 overview页面,查看支持的Flink版本并下载发行包,然后解压:
tar -xzf flink-*.tgz 设置FLINK_HOME环境变量:
export FLINK_HOME=/path/flink-* 启动Session集群 #
要在k8s上启动Session集群,请运行 Flink 附带的 bash 脚本:
cd /path/flink-* ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster 成功启动集群后,返回如下信息:
org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124 org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122 org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP. org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink session cluster my-first-flink-cluster successfully, JobManager Web Interface: http://my-first-flink-cluster-rest.default:8081 请参考Flink文档来暴露Flink Web UI和REST端口。
请确保您提交作业的节点可以访问REST端口。
然后,将以下两个配置添加到flink-conf.yaml中:
rest.bind-port: {{REST_PORT}} rest.address: {{NODE_IP}} {{REST_PORT}}和{{NODE_IP}}替换为JobManager Web界面的实际值。
配置Flink CDC #
从发行页面下载Flink CDC的tar文件并解压:
tar -xzf flink-cdc-*.tar.gz 解压后的flink-cdc包含四个目录: bin,lib,log和conf。
从发行页面下载连接器,并移动到lib路径下。
下载链接仅适用于稳定版本,SNAPSHOT依赖项需要您根据特定分支自行构建。
提交Flink CDC作业 #
以下是mysql整库同步到doris的示例配置文件:mysql-to-doris.yaml
################################################################################ # Description: Sync MySQL all tables to Doris ################################################################################ source: type: mysql hostname: localhost port: 3306 username: root password: 123456 tables: app_db.\.* server-id: 5400-5404 server-time-zone: UTC sink: type: doris fenodes: 127.0.0.1:8030 username: root password: "" pipeline: name: Sync MySQL Database to Doris parallelism: 2 请参考连接器信息,按需修改配置文件。
最后,通过Cli将作业提交到Flink Standalone集群。
cd /path/flink-cdc-* ./bin/flink-cdc.sh mysql-to-doris.yaml 成功提交作业后,返回如下信息:
Pipeline has been submitted to cluster. Job ID: ae30f4580f1918bebf16752d4963dc54 Job Description: Sync MySQL Database to Doris 通过Flink Web UI,您可以找到一个名为Sync MySQL Database to Doris的作业正在运行。
Kubernetes Operator模式 #
假设您已经在K8S集群上部署Flink Kubernetes Operator,您只需构建自定义的Flink CDC Docker镜像即可。
构建自定义Docker镜像 #
- 从发行页面下载Flink CDC的tar文件和需要的连接器,并移动到Docker镜像构建目录。
假设您的Docker构建目录为/opt/docker/flink-cdc,此时该目录下的文件结构如下:/opt/docker/flink-cdc ├── flink-cdc-3.5.0-bin.tar.gz ├── flink-cdc-pipeline-connector-doris-3.5.0.jar ├── flink-cdc-pipeline-connector-mysql-3.5.0.jar ├── mysql-connector-java-8.0.27.jar └── ... - 创建Dockerfile文件,从官方
flink镜像构建出自定义镜像并添加Flink CDC的依赖。Docker镜像构建目录最终如下:FROM flink:1.18.0-java8 ADD *.jar $FLINK_HOME/lib/ ADD flink-cdc*.tar.gz $FLINK_HOME/ RUN mv $FLINK_HOME/flink-cdc-3.5.0/lib/flink-cdc-dist-3.5.0.jar $FLINK_HOME/lib//opt/docker/flink-cdc ├── Dockerfile ├── flink-cdc-3.5.0-bin.tar.gz ├── flink-cdc-pipeline-connector-doris-3.5.0.jar ├── flink-cdc-pipeline-connector-mysql-3.5.0.jar ├── mysql-connector-java-8.0.27.jar └── ... - 构建自定义镜像并推送至仓库
docker build -t flink-cdc-pipeline:3.5.0 . docker push flink-cdc-pipeline:3.5.0
创建ConfigMap用于挂载Flink CDC配置文件 #
以下是一个示例文件,请修改其中对应的连接参数为实际值:
--- apiVersion: v1 data: flink-cdc.yaml: |- parallelism: 4 schema.change.behavior: EVOLVE mysql-to-doris.yaml: |- source: type: mysql hostname: localhost port: 3306 username: root password: 123456 tables: app_db.\.* server-id: 5400-5404 server-time-zone: UTC sink: type: doris fenodes: 127.0.0.1:8030 username: root password: "" pipeline: name: Sync MySQL Database to Doris parallelism: 2 kind: ConfigMap metadata: name: flink-cdc-pipeline-configmap 创建FlinkDeployment YAML文件 #
以下是示例文件flink-cdc-pipeline-job.yaml:
--- apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: flink-cdc-pipeline-job spec: flinkConfiguration: classloader.resolve-order: parent-first state.checkpoints.dir: 'file:///tmp/checkpoints' state.savepoints.dir: 'file:///tmp/savepoints' flinkVersion: v1_18 image: 'flink-cdc-pipeline:3.5.0' imagePullPolicy: Always job: args: - '--use-mini-cluster' - /opt/flink/flink-cdc-3.5.0/conf/mysql-to-doris.yaml entryClass: org.apache.flink.cdc.cli.CliFrontend jarURI: 'local:///opt/flink/flink-cdc-3.5.0/lib/flink-cdc-dist-3.5.0.jar' parallelism: 1 state: running upgradeMode: savepoint jobManager: replicas: 1 resource: cpu: 1 memory: 1024m podTemplate: apiVersion: v1 kind: Pod spec: containers: # don't modify this name - name: flink-main-container volumeMounts: - mountPath: /opt/flink/flink-cdc-3.5.0/conf name: flink-cdc-pipeline-config volumes: - configMap: name: flink-cdc-pipeline-configmap name: flink-cdc-pipeline-config restartNonce: 0 serviceAccount: flink taskManager: resource: cpu: 1 memory: 1024m
- 由于Flink的类加载机制,参数
classloader.resolve-order必须设置为parent-first。- Flink CDC默认提交作业到远程Flink集群,在Operator模式下,您需要通过指定
--use-mini-cluster参数在pod内部启动一个Standalone Flink集群。
提交Flink CDC作业 #
ConfigMap和FlinkDeployment YAML文件创建完成后,即可通过kubectl提交作业到Operator:
kubectl apply -f flink-cdc-pipeline-job.yaml 成功提交作业后,返回信息如下:
flinkdeployment.flink.apache.org/flink-cdc-pipeline-job created 如您需要查看日志、暴露Flink Web UI等,请参考:Flink Kubernetes Operator文档。
请注意,目前不支持使用native application mode提交作业。