Kubernetes Setup

This page describes how to deploy a Flink Job and Session cluster on Kubernetes.

Info This page describes deploying a standalone Flink cluster on top of Kubernetes. You can find more information on native Kubernetes deployments here.

Setup Kubernetes

Please follow Kubernetes’ setup guide in order to deploy a Kubernetes cluster. If you want to run Kubernetes locally, we recommend using MiniKube.

Note: If using MiniKube please make sure to execute minikube ssh 'sudo ip link set docker0 promisc on' before deploying a Flink cluster. Otherwise Flink components are not able to reference themselves through a Kubernetes service.
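
For example, a minimal sketch of preparing a local minikube cluster before deploying Flink (assumes minikube is installed):

    # Start a local Kubernetes cluster
    minikube start
    # Allow Flink components to reach themselves through the Kubernetes service
    minikube ssh 'sudo ip link set docker0 promisc on'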

Before deploying the Flink Kubernetes components, please read the Flink Docker image documentation to learn about the available image tags, how to customize the Flink Docker image, and how to enable plugins, so that you can use the image in the Kubernetes definition files.
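
If you build a customized image, one possible way to make it available to a local minikube cluster is to build it directly against minikube's Docker daemon (a sketch; the Dockerfile and the flink-custom image name are hypothetical, and you would reference that name instead of flink:latest in the definition files):

    # Point the local Docker client at minikube's Docker daemon
    eval $(minikube docker-env)
    # Build the customized Flink image from a Dockerfile in the current directory
    docker build -t flink-custom:latest .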

Using the common resource definitions, launch the common cluster components with the kubectl command:

    kubectl create -f flink-configuration-configmap.yaml
    kubectl create -f jobmanager-service.yaml

Note that you can define your own customized flink-conf.yaml options within flink-configuration-configmap.yaml.
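
For example, after editing an option under the flink-conf.yaml key of flink-configuration-configmap.yaml, you could re-apply the ConfigMap and inspect what is stored in the cluster (a sketch):

    # Re-apply the ConfigMap after editing it locally
    kubectl apply -f flink-configuration-configmap.yaml
    # Inspect the configuration currently stored in the cluster
    kubectl get configmap flink-config -o yaml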

Then launch the specific components depending on whether you want to deploy a Session or Job cluster.

You can then access the Flink UI via different ways:

  • kubectl port-forward:
    1. Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your jobmanager's web UI port to local port 8081.
    2. Navigate to http://localhost:8081 in your browser.
    3. Moreover, you could use the following command to submit jobs to the cluster:
./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar
  • Create a NodePort service on the rest service of jobmanager:
    1. Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on jobmanager. The example of jobmanager-rest-service.yaml can be found in the appendix.
    2. Run kubectl get svc flink-jobmanager-rest to know the node-port of this service and navigate to http://<public-node-ip>:<node-port> in your browser.
    3. If you use minikube, you can get its public IP by running minikube ip.
    4. Similarly to the port-forward solution, you could also use the following command to submit jobs to the cluster (see also the sketch after this list):
./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/WordCount.jar
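
As an illustration, the node IP and node port can also be looked up from the command line (a sketch that assumes minikube and the jobmanager-rest-service.yaml from the appendix):

    # Look up the NodePort assigned to the REST service and the node's public IP
    NODE_PORT=$(kubectl get svc flink-jobmanager-rest -o jsonpath='{.spec.ports[0].nodePort}')
    NODE_IP=$(minikube ip)
    # Submit the example job against the exposed REST endpoint
    ./bin/flink run -m ${NODE_IP}:${NODE_PORT} ./examples/streaming/WordCount.jar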

In order to terminate the Flink cluster, delete the specific Session or Job cluster components and use kubectl to terminate the common components:

    kubectl delete -f jobmanager-service.yaml
    kubectl delete -f flink-configuration-configmap.yaml
    # if created then also the rest service
    kubectl delete -f jobmanager-rest-service.yaml

Deploy Session Cluster

A Flink Session cluster is executed as a long-running Kubernetes Deployment. Note that you can run multiple Flink jobs on a Session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.

A Flink Session cluster deployment in Kubernetes has at least three components:

  • a Deployment which runs a Flink Master
  • a Deployment for a pool of TaskManagers
  • a Service exposing the Flink Master’s REST and UI ports

After creating the common cluster components, use the Session specific resource definitions to launch the Session cluster with the kubectl command:

    kubectl create -f jobmanager-session-deployment.yaml
    kubectl create -f taskmanager-session-deployment.yaml
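
One way to check that the Session cluster came up (a sketch; the label selectors and names match the resource definitions in the appendix):

    # List the jobmanager and taskmanager pods
    kubectl get pods -l app=flink
    # Follow the log of the jobmanager
    kubectl logs -f deployment/flink-jobmanager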

To terminate the Session cluster, these components can be deleted along with the common ones with the kubectl command:

    kubectl delete -f taskmanager-session-deployment.yaml
    kubectl delete -f jobmanager-session-deployment.yaml

Deploy Job Cluster

A Flink Job cluster is a dedicated cluster which runs a single job. You can find more details here.

A basic Flink Job cluster deployment in Kubernetes has three components:

  • a Job which runs a Flink Master
  • a Deployment for a pool of TaskManagers
  • a Service exposing the Flink Master’s REST and UI ports

Check the Job cluster specific resource definitions and adjust them accordingly.

The args attribute in jobmanager-job.yaml has to specify the main class of the user job. See also how to specify the Flink Master arguments to understand how to pass additional args to the Flink image in jobmanager-job.yaml.

The job artifacts should be available from the job-artifacts-volume in the resource definition examples. The definition examples mount the volume as a local directory of the host, assuming that you create the components in a minikube cluster. If you do not use a minikube cluster, you can use any other type of volume available in your Kubernetes cluster to supply the job artifacts. Alternatively, you can build a custom image which already contains the artifacts.
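
For example, with minikube one way to supply the artifacts at the hostPath used in the examples is to mount a local directory into the minikube VM (a sketch; ./job-artifacts is a hypothetical local directory containing the job jar):

    # Keep this command running for as long as the cluster needs access to the artifacts
    minikube mount ./job-artifacts:/host/path/to/job/artifacts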

After creating the common cluster components, use the Job cluster specific resource definitions to launch the cluster with the kubectl command:

    kubectl create -f jobmanager-job.yaml
    kubectl create -f taskmanager-job-deployment.yaml
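
Once the pods are up, you can for instance follow the Flink Master's log to see the job being started (a sketch):

    # Follow the log of the Flink Master started by the Kubernetes Job
    kubectl logs -f job/flink-jobmanager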

To terminate the single job cluster, these components can be deleted along with the common ones with the kubectl command:

    kubectl delete -f taskmanager-job-deployment.yaml
    kubectl delete -f jobmanager-job.yaml

Appendix

Common cluster resource definitions

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.server.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j.properties: |+
    rootLogger.level = INFO
    rootLogger.appenderRef.file.ref = MainAppender
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    appender.main.name = MainAppender
    appender.main.type = File
    appender.main.append = false
    appender.main.fileName = ${sys:log.file}
    appender.main.layout.type = PatternLayout
    appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = ERROR

jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: query-state
    port: 6125
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-rest-service.yaml. Optional service that exposes the jobmanager REST port as a public Kubernetes node port.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081  # must be within the cluster's NodePort range (30000-32767 by default)
  selector:
    app: flink
    component: jobmanager

Session cluster resource definitions

jobmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 6125
          name: query-state
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

Job cluster resource definitions

jobmanager-job.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: flink:latest
          env:
          args: ["standalone-job", "--job-classname", "com.job.ClassName", ["--job-id", "<job id>",] ["--fromSavepoint", "/path/to/savepoint", ["--allowNonRestoredState",]] [job arguments]]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 6125
              name: query-state
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: job-artifacts-volume
              mountPath: /opt/flink/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
        - name: job-artifacts-volume
          hostPath:
            path: /host/path/to/job/artifacts

taskmanager-job-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: job-artifacts-volume
          mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
      - name: job-artifacts-volume
        hostPath:
          path: /host/path/to/job/artifacts
