flink-operator
prepare
- k8s is ready
- argocd is ready and you are logged in
- an nginx ingress controller is ready
- cert-manager is ready
- the ClusterIssuer named self-signed-ca-issuer is ready
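The issuer's readiness can also be checked from a script before continuing. The Python sketch below assumes three things. First, kubectl is installed and configured for the cluster. Second, the issuer reports readiness through a status condition of type Ready, as cert-manager issuers do. Third, the helper names are hypothetical and chosen only for illustration.

```python
import json
import subprocess
from typing import Any, Dict


def issuer_is_ready(issuer: Dict[str, Any]) -> bool:
    """Return True if a ClusterIssuer object has a Ready condition with status "True"."""
    conditions = (issuer.get("status") or {}).get("conditions") or []
    return any(
        cond.get("type") == "Ready" and cond.get("status") == "True"
        for cond in conditions
    )


def fetch_cluster_issuer(name: str) -> Dict[str, Any]:
    """Read a ClusterIssuer as JSON through kubectl (kubectl must be on PATH and configured)."""
    result = subprocess.run(
        ["kubectl", "get", "clusterissuer", name, "-o", "json"],
        check=True,
        capture_output=True,
        text=True,
    )
    return json.loads(result.stdout)
```

For example, issuer_is_ready(fetch_cluster_issuer("self-signed-ca-issuer")) should return True once the issuer can sign certificates.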
installation
- prepare flink-operator.yaml
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: flink-operator
spec:
  syncPolicy:
    syncOptions:
    - CreateNamespace=true
  project: default
  source:
    repoURL: https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0
    chart: flink-kubernetes-operator
    targetRevision: 1.8.0
    helm:
      releaseName: flink-operator
      values: |
        image:
          repository: ghcr.io/apache/flink-kubernetes-operator
      version: v3
  destination:
    server: https://kubernetes.default.svc
    namespace: flink
- apply to k8s
kubectl -n argocd apply -f flink-operator.yaml
- sync by argocd
argocd app sync argocd/flink-operator
- NOTE: with flink operator 1.8.0, argocd will not report the application as synced. The following fields are generated automatically, so the live CRDs differ from the chart:
  - CustomResourceDefinition flinkdeployments.flink.apache.org: spec.versions[0].additionalPrinterColumns[0,1].priority = 0
  - CustomResourceDefinition flinksessionjobs.flink.apache.org: spec.versions[0].additionalPrinterColumns[0,1].priority = 0
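The reason such automatically generated fields leave the application out of sync can be shown with a simplified Python model. The chart's CRD has no priority field, while the live CRD has priority = 0 filled in. A plain comparison therefore reports a difference, whereas a comparison that applies the same default first does not. This only illustrates the idea; it is not how argocd actually computes diffs. The column names and paths are placeholders.

```python
import copy
from typing import Any, Dict


def with_printer_column_defaults(crd: Dict[str, Any]) -> Dict[str, Any]:
    """Return a deep copy of a CRD in which every additionalPrinterColumn
    without an explicit priority gets priority = 0. The input is not modified."""
    normalized = copy.deepcopy(crd)
    for version in normalized.get("spec", {}).get("versions", []):
        for column in version.get("additionalPrinterColumns", []):
            column.setdefault("priority", 0)
    return normalized


# Placeholder CRD fragment as rendered from the chart (names and paths are illustrative).
desired = {
    "spec": {
        "versions": [
            {
                "name": "v1beta1",
                "additionalPrinterColumns": [
                    {"name": "column-a", "type": "string", "jsonPath": ".status.a"},
                    {"name": "column-b", "type": "string", "jsonPath": ".status.b"},
                ],
            }
        ]
    }
}
# Model of the live object: the same CRD with priority filled in.
live = with_printer_column_defaults(desired)

print(desired == live)  # False: a plain comparison sees drift
print(with_printer_column_defaults(desired) == live)  # True once the default is applied
```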
deploy flink application
basic
- prepare basic.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic
spec:
  image: docker.io/library/flink:1.17
  flinkVersion: v1_17
  ingress:
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: nginx
    annotations:
      cert-manager.io/cluster-issuer: self-signed-ca-issuer
      nginx.ingress.kubernetes.io/rewrite-target: /$2
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: 1024m
      cpu: 0.5
  taskManager:
    resource:
      memory: 1024m
      cpu: 0.5
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
- apply to k8s
kubectl get namespaces flink > /dev/null 2>&1 || kubectl create namespace flink
kubectl -n flink apply -f basic.yaml
- check status with web ui
  - flink.k8s.io should resolve to the nginx-ingress controller
    - for example, add the following line to /etc/hosts (replace $K8S_MASTER_IP with the actual IP address)
      $K8S_MASTER_IP flink.k8s.io
  - open https://flink.k8s.io:32443/flink/basic/
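The ingress template and the rewrite-target /$2 annotation work together to map a URL such as https://flink.k8s.io:32443/flink/basic/ onto the root of the Flink web UI. The Python sketch below models only that path rewrite. It assumes the operator uses flink.k8s.io as the host and the remainder of the template as the path regex, matched from the start of the path. It also ignores the fact that nginx matches these regexes case-insensitively. path_regex and rewrite are hypothetical helpers.

```python
import re
from typing import Optional


def path_regex(namespace: str, name: str) -> str:
    """Path part of the template "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"."""
    return "/" + re.escape(namespace) + "/" + re.escape(name) + "(/|$)(.*)"


def rewrite(path: str, namespace: str, name: str) -> Optional[str]:
    """Mimic rewrite-target /$2: forward "/" followed by the second capture group.
    Returns None when the path does not match, i.e. this ingress rule would not apply."""
    match = re.match(path_regex(namespace, name), path)
    if match is None:
        return None
    return "/" + match.group(2)


print(rewrite("/flink/basic/", "flink", "basic"))  # /
print(rewrite("/flink/basic/jobs/overview", "flink", "basic"))  # /jobs/overview
print(rewrite("/flink/basicx", "flink", "basic"))  # None
```

The (/|$) alternative is why both /flink/basic and /flink/basic/ reach the UI root, while /flink/basicx does not match this rule.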
pyflink
- build the docker image and push it to a registry
  - this step is optional if you use the prebuilt image ghcr.io/ben-wangz/blog-pyflink:main
- prepare docker/Dockerfile
FROM docker.io/flink:1.17
RUN apt-get update -y && \
    apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
    wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
    tar -xvf Python-3.7.9.tgz && \
    cd Python-3.7.9 && \
    ./configure --without-tests --enable-shared && \
    make -j6 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
    ln -s /usr/local/bin/python3 /usr/local/bin/python && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
ARG PYPI_MIRROR=https://pypi.python.org/simple
RUN pip3 install "apache-flink==1.17.2" -i ${PYPI_MIRROR}
USER flink
RUN mkdir /opt/flink/usrlib
ADD python_demo.py /opt/flink/usrlib/python_demo.py
- prepare docker/python_demo.py
import logging
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


def python_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.execute_sql(
        """
        CREATE TABLE orders (
          order_number BIGINT,
          price        DECIMAL(32,2),
          buyer        ROW<first_name STRING, last_name STRING>,
          order_time   TIMESTAMP(3)
        ) WITH (
          'connector' = 'datagen'
        )"""
    )

    t_env.execute_sql(
        """
        CREATE TABLE print_table WITH ('connector' = 'print')
          LIKE orders"""
    )
    t_env.execute_sql(
        """
        INSERT INTO print_table SELECT * FROM orders"""
    )


if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    python_demo()
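- build and push (replace ghcr.io/ben-wangz with a registry you can push to, and update the image in pyflink.yaml accordingly)
cd docker/
podman build --build-arg PYPI_MIRROR=https://mirrors.aliyun.com/pypi/simple -t ghcr.io/ben-wangz/blog-pyflink:main .
podman push ghcr.io/ben-wangz/blog-pyflink:main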
- prepare pyflink.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: ghcr.io/ben-wangz/blog-pyflink:main
  flinkVersion: v1_17
  ingress:
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: nginx
    annotations:
      cert-manager.io/cluster-issuer: self-signed-ca-issuer
      nginx.ingress.kubernetes.io/rewrite-target: /$2
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: 1024m
      cpu: 0.5
  taskManager:
    resource:
      memory: 1024m
      cpu: 0.5
  job:
    jarURI: local:///opt/flink/opt/flink-python_2.12-1.17.2.jar
    entryClass: org.apache.flink.client.python.PythonDriver
    args:
    - -pyclientexec
    - /usr/local/bin/python3
    - -py
    - /opt/flink/usrlib/python_demo.py
    parallelism: 1
    upgradeMode: stateless
- apply to k8s
kubectl get namespaces flink > /dev/null 2>&1 || kubectl create namespace flink
kubectl -n flink apply -f pyflink.yaml
- check status with web ui
  - flink.k8s.io should resolve to the nginx-ingress controller
    - for example, add the following line to /etc/hosts (replace $K8S_MASTER_IP with the actual IP address)
      $K8S_MASTER_IP flink.k8s.io
  - open https://flink.k8s.io:32443/flink/python-example/
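Either deployment (basic or python-example) can also be checked without the web UI by reading the FlinkDeployment object itself. The Python sketch below makes three assumptions. First, it relies on the status fields jobManagerDeploymentStatus, jobStatus.state and lifecycleState as exposed by the operator's FlinkDeployment status; verify the names against the CRD installed in your cluster. Second, kubectl must be configured for the cluster. Third, the helper names are hypothetical.

```python
import json
import subprocess
from typing import Any, Dict


def summarize(deployment: Dict[str, Any]) -> Dict[str, Any]:
    """Pick out the FlinkDeployment status fields that show whether it is up."""
    status = deployment.get("status") or {}
    job_status = status.get("jobStatus") or {}
    return {
        "jobManagerDeploymentStatus": status.get("jobManagerDeploymentStatus"),
        "jobState": job_status.get("state"),
        "lifecycleState": status.get("lifecycleState"),
    }


def is_running(deployment: Dict[str, Any]) -> bool:
    """True once the JobManager deployment is READY and the job reports RUNNING."""
    summary = summarize(deployment)
    return (
        summary["jobManagerDeploymentStatus"] == "READY"
        and summary["jobState"] == "RUNNING"
    )


def fetch_flink_deployment(name: str, namespace: str = "flink") -> Dict[str, Any]:
    """Read a FlinkDeployment as JSON through kubectl (kubectl must be on PATH and configured)."""
    result = subprocess.run(
        ["kubectl", "-n", namespace, "get", "flinkdeployment", name, "-o", "json"],
        check=True,
        capture_output=True,
        text=True,
    )
    return json.loads(result.stdout)
```

For example, is_running(fetch_flink_deployment("python-example")) should become True after the job has started. Because the datagen source is unbounded, the job should then stay in RUNNING.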