Skip to main content

flink-operator

ben.wangz · Less than 1 minute

flink-operator

prepare

  1. k8s is ready
  2. argocd is ready and logged in
  3. ingress is ready
  4. cert-manager is ready
    • the clusterissuer named self-signed-ca-issuer is ready

installation

  1. prepare flink-operator.yaml
    • apiVersion: argoproj.io/v1alpha1
      # Argo CD Application that installs the flink-kubernetes-operator Helm chart.
      kind: Application
      metadata:
        name: flink-operator
      spec:
        syncPolicy:
          syncOptions:
          # let Argo CD create the destination namespace (flink) if it does not exist yet
          - CreateNamespace=true
        project: default
        source:
          # Helm repository, chart name and chart version to install
          repoURL: https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0
          chart: flink-kubernetes-operator
          targetRevision: 1.8.0
          helm:
            releaseName: flink-operator
            # inline Helm values handed to the chart as a literal block
            values: |
              image:
                repository: ghcr.io/apache/flink-kubernetes-operator
            version: v3  # Helm major version used to render the chart
        destination:
          # https://kubernetes.default.svc is the in-cluster API server address,
          # i.e. the operator is deployed into the cluster Argo CD itself runs in
          server: https://kubernetes.default.svc
          namespace: flink
      
      
  2. apply to k8s
    • kubectl -n argocd apply -f flink-operator.yaml
      
  3. sync by argocd
    • argocd app sync argocd/flink-operator
      
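  4. NOTE: the flink operator 1.8.0 application will be reported as not synced (OutOfSync) by argocd
    • CustomResourceDefinition named flinkdeployments.flink.apache.org: spec.versions[0].additionalPrinterColumns[0,1].priority = 0 is generated automatically on the cluster, so the live object differs from the one rendered by the chart
    • CustomResourceDefinition named flinksessionjobs.flink.apache.org: spec.versions[0].additionalPrinterColumns[0,1].priority = 0 is generated automatically on the cluster, so the live object differs from the one rendered by the chart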

basic

  1. prepare basic.yaml
    • apiVersion: flink.apache.org/v1beta1
      # FlinkDeployment that runs the StateMachineExample jar referenced in job.jarURI.
      kind: FlinkDeployment
      metadata:
        name: basic
      spec:
        image: docker.io/library/flink:1.17
        flinkVersion: v1_17
        ingress:
          # {{namespace}} and {{name}} are placeholders filled in by the operator;
          # for this deployment in the flink namespace the path becomes /flink/basic
          template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
          className: nginx
          annotations:
            # use the cluster issuer listed in the prepare section for the TLS certificate
            cert-manager.io/cluster-issuer: self-signed-ca-issuer
            # forward only the second capture group of the template, i.e. drop the
            # /{{namespace}}/{{name}} prefix before the request reaches the Flink web ui
            nginx.ingress.kubernetes.io/rewrite-target: /$2
        flinkConfiguration:
          # quoted so the value is passed as a string
          taskmanager.numberOfTaskSlots: "2"
        # NOTE(review): presumably the service account created by the operator chart
        # in the flink namespace — confirm it exists before applying
        serviceAccount: flink
        jobManager:
          resource:
            memory: 1024m
            cpu: 0.5
        taskManager:
          resource:
            memory: 1024m
            cpu: 0.5
        job:
          # local:// refers to a path inside the container image
          jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
          parallelism: 2
      
      
  2. apply to k8s
    • kubectl get namespaces flink > /dev/null 2>&1 || kubectl create namespace flink
      kubectl -n flink apply -f basic.yaml
      
  3. check status with web ui
    • flink.k8s.io should resolve to the nginx-ingress
      • for example, add the line "$K8S_MASTER_IP flink.k8s.io" to /etc/hosts
    • https://flink.k8s.io:32443/flink/basic/
  1. build docker image and push to registry
    • optional if you are using the pre-built image: ghcr.io/ben-wangz/blog-pyflink:main
    • prepare docker/Dockerfile
      • FROM docker.io/flink:1.17
        
        # Build CPython 3.7.9 from source, install it under /usr/local and expose it
        # as both python3 and python; sources and apt lists are removed in the same
        # layer to keep the image small.
        RUN apt-get update -y && \
            apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
            wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
            tar -xvf Python-3.7.9.tgz && \
            cd Python-3.7.9 && \
            ./configure --without-tests --enable-shared && \
            make -j"$(nproc)" && \
            make install && \
            ldconfig /usr/local/lib && \
            cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
            ln -s /usr/local/bin/python3 /usr/local/bin/python && \
            apt-get clean && \
            rm -rf /var/lib/apt/lists/*
        
        # Package index used by pip; override with --build-arg PYPI_MIRROR=<url>.
        ARG PYPI_MIRROR=https://pypi.python.org/simple
        # --no-cache-dir keeps pip's download cache out of the image layer.
        RUN pip3 install --no-cache-dir "apache-flink==1.17.2" -i ${PYPI_MIRROR}
        
        USER flink
        RUN mkdir /opt/flink/usrlib
        # COPY is sufficient for a plain local file (ADD is only needed for URLs/archives).
        COPY python_demo.py /opt/flink/usrlib/python_demo.py
        
        
    • prepare docker/python_demo.py
      • import logging
        import sys
        
        from pyflink.datastream import StreamExecutionEnvironment
        from pyflink.table import StreamTableEnvironment
        
        
        def python_demo():
            """Run the datagen-to-print demo job.

            Creates an ``orders`` table backed by the datagen connector and a
            ``print_table`` sink declared ``LIKE orders``, then inserts every row of
            ``orders`` into the sink.
            """
            stream_env = StreamExecutionEnvironment.get_execution_environment()
            stream_env.set_parallelism(1)
            table_env = StreamTableEnvironment.create(stream_execution_environment=stream_env)

            # Executed strictly in this order: source DDL, sink DDL, INSERT.
            statements = (
                """
            CREATE TABLE orders (
              order_number BIGINT,
              price        DECIMAL(32,2),
              buyer        ROW<first_name STRING, last_name STRING>,
              order_time   TIMESTAMP(3)
            ) WITH (
              'connector' = 'datagen'
            )""",
                """
                CREATE TABLE print_table WITH ('connector' = 'print')
                  LIKE orders""",
                """
                INSERT INTO print_table SELECT * FROM orders""",
            )
            for statement in statements:
                table_env.execute_sql(statement)


        if __name__ == "__main__":
            # Send bare log messages to stdout, then run the demo.
            logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
            python_demo()
        
        
    • build
      • # cd docker/
        # the build-arg name must match the ARG declared in the Dockerfile (PYPI_MIRROR);
        # the image name must match the one referenced in pyflink.yaml
        podman build --build-arg PYPI_MIRROR=https://mirrors.aliyun.com/pypi/simple -t ghcr.io/ben-wangz/blog-pyflink:main .
        
    • prepare pyflink.yaml
    • apiVersion: flink.apache.org/v1beta1
      # FlinkDeployment that runs /opt/flink/usrlib/python_demo.py through the PythonDriver entry class.
      kind: FlinkDeployment
      metadata:
        name: python-example
      spec:
        # image built from docker/Dockerfile (or the pre-built one)
        image: ghcr.io/ben-wangz/blog-pyflink:main
        flinkVersion: v1_17
        ingress:
          # {{namespace}} and {{name}} are placeholders filled in by the operator;
          # for this deployment in the flink namespace the path becomes /flink/python-example
          template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
          className: nginx
          annotations:
            # use the cluster issuer listed in the prepare section for the TLS certificate
            cert-manager.io/cluster-issuer: self-signed-ca-issuer
            # forward only the second capture group of the template, i.e. drop the
            # /{{namespace}}/{{name}} prefix before the request reaches the Flink web ui
            nginx.ingress.kubernetes.io/rewrite-target: /$2
        flinkConfiguration:
          # quoted so the value is passed as a string
          taskmanager.numberOfTaskSlots: "1"
        # NOTE(review): presumably the service account created by the operator chart
        # in the flink namespace — confirm it exists before applying
        serviceAccount: flink
        jobManager:
          resource:
            memory: 1024m
            cpu: 0.5
        taskManager:
          resource:
            memory: 1024m
            cpu: 0.5
        job:
          # NOTE(review): the upstream operator python example describes this jarURI as
          # a placeholder — confirm whether the file name has to match a jar in the image
          jarURI: local:///opt/flink/opt/flink-python_2.12-1.17.2.jar
          entryClass: org.apache.flink.client.python.PythonDriver
          args:
          # interpreter installed under /usr/local by the Dockerfile
          - -pyclientexec
          - /usr/local/bin/python3
          # script added to the image by the Dockerfile
          - -py
          - /opt/flink/usrlib/python_demo.py
          parallelism: 1
          upgradeMode: stateless
      
      
  2. apply to k8s
    • kubectl get namespaces flink > /dev/null 2>&1 || kubectl create namespace flink
      kubectl -n flink apply -f pyflink.yaml
      
  3. check status with web ui
    • flink.k8s.io should resolve to the nginx-ingress
      • for example, add the line "$K8S_MASTER_IP flink.k8s.io" to /etc/hosts
    • https://flink.k8s.io:32443/flink/python-example/