jdbc connector
Less than 1 minute
jdbc connector
introduction
- this is a demo for reading and writing data from/to jdbc, e.g. ClickHouse
source code
- https://github.com/ben-wangz/blog/tree/main/flink/connectors/jdbc
how to run with flink-kubernetes-operator
- setup clickhouse server
- reference: clickhouse
- setup flink kubernetes operator
- reference: flink-operator
- copy secret
clickhouse-admin-credentials
fromdatabase
namespace toflink
namespacekubectl -n database get secret clickhouse-admin-credentials -o json \ | jq 'del(.metadata["namespace","creationTimestamp","resourceVersion","selfLink","uid"])' \ | kubectl -n flink apply -f -
- build image and push to docker hub
#REGISTRY_USERNAME=your-registry-username #REGISTRY_PASSWORD=your-registry-password IMAGE=docker.io/wangz2019/flink-connectors-jdbc-demo:latest bash flink/connectors/jdbc/container/build.sh $IMAGE \ && podman login -u $REGISTRY_USERNAME -p $REGISTRY_PASSWORD ${REGISTRY:-docker.io} \ && podman push $IMAGE
- deploy flink job
- prepare
flink-job.template.yaml
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: flink-connectors-jdbc-demo spec: image: docker.io/wangz2019/flink-connectors-jdbc-demo:latest podTemplate: spec: containers: - name: flink-main-container imagePullPolicy: Always env: - name: TZ value: Asia/Shanghai - name: CLICK_HOUSE_HOST value: clickhouse.database - name: CLICK_HOUSE_PORT value: "8123" - name: CLICK_HOUSE_USERNAME value: admin - name: CLICK_HOUSE_PASSWORD valueFrom: secretKeyRef: name: clickhouse-admin-credentials key: password flinkVersion: v1_17 ingress: template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)" className: nginx annotations: cert-manager.io/cluster-issuer: self-signed-ca-issuer nginx.ingress.kubernetes.io/rewrite-target: /$2 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" serviceAccount: flink jobManager: resource: memory: 1024m cpu: 0.5 taskManager: resource: memory: 1024m cpu: 0.5 job: jarURI: local:///app/flink-application.jar parallelism: 2 entryClass: tech.geekcity.flink.connectors.jdbc.SinkToJdbc args: [] mode: native
- generate
flink-job.yaml
IMAGE=docker.io/wangz2019/flink-connectors-jdbc-demo:latest #ENTRY_CLASS=tech.geekcity.flink.connectors.jdbc.SourceFromJdbc ENTRY_CLASS=tech.geekcity.flink.connectors.jdbc.SinkToJdbc cp flink-job.template.yaml flink-job.yaml \ && yq eval ".spec.image = \"$IMAGE\"" -i flink-job.yaml \ && yq eval ".spec.job.entryClass = \"$ENTRY_CLASS\"" -i flink-job.yaml
- apply to k8s
kubectl -n flink apply -f flink-job.yaml
- prepare
- check data in clickhouse
curl -k "https://admin:${PASSWORD}@clickhouse.dev.geekcity.tech:32443/?database=sink_to_jdbc" --data 'select * from users limit 10'