jdbc connector

introduction

  • this is a demo for reading data from and writing data to a database via jdbc, e.g. ClickHouse

source code

  • https://github.com/ben-wangz/blog/tree/main/flink/connectors/jdbc
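  • the build script used in step 4 lives inside this repository, so the steps below assume the commands are run from the repository root after cloning it:
    • git clone https://github.com/ben-wangz/blog.git \
          && cd blog
      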
  1. set up clickhouse server
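    • the remaining steps assume ClickHouse is reachable inside the cluster at clickhouse.database:8123 with an admin user whose password is stored in the clickhouse-admin-credentials secret of the database namespace (these names come from the job configuration below); a quick check of the prerequisites:
      • kubectl -n database get service clickhouse \
            && kubectl -n database get secret clickhouse-admin-credentials
        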
  2. set up flink kubernetes operator
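    • a minimal sketch using the official helm chart (the operator version here is only an example; cert-manager must already be installed for the operator webhook, and by default the chart creates the flink job service account referenced in the deployment below):
      • helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/ \
            && helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
                --namespace flink \
                --create-namespace
        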
  3. copy the secret clickhouse-admin-credentials from the database namespace to the flink namespace
    • kubectl -n database get secret clickhouse-admin-credentials -o json \
          | jq 'del(.metadata["namespace","creationTimestamp","resourceVersion","selfLink","uid"])' \
          | kubectl -n flink apply -f -
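    • optionally confirm the secret now exists in the flink namespace:
      • kubectl -n flink get secret clickhouse-admin-credentials
        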
      
  4. build the image and push it to docker hub
    • #REGISTRY_USERNAME=your-registry-username
      #REGISTRY_PASSWORD=your-registry-password
      IMAGE=docker.io/wangz2019/flink-connectors-jdbc-demo:latest
      bash flink/connectors/jdbc/container/build.sh $IMAGE \
          && podman login -u $REGISTRY_USERNAME -p $REGISTRY_PASSWORD ${REGISTRY:-docker.io} \
          && podman push $IMAGE
      
  5. deploy the flink job
    • prepare flink-job.template.yaml
      • apiVersion: flink.apache.org/v1beta1
        kind: FlinkDeployment
        metadata:
          name: flink-connectors-jdbc-demo
        spec:
          image: docker.io/wangz2019/flink-connectors-jdbc-demo:latest
          podTemplate:
            spec:
              containers:
              - name: flink-main-container
                imagePullPolicy: Always
                env:
                - name: TZ
                  value: Asia/Shanghai
                - name: CLICK_HOUSE_HOST
                  value: clickhouse.database
                - name: CLICK_HOUSE_PORT
                  value: "8123"
                - name: CLICK_HOUSE_USERNAME
                  value: admin
                - name: CLICK_HOUSE_PASSWORD
                  valueFrom:
                    secretKeyRef:
                      name: clickhouse-admin-credentials
                      key: password
          flinkVersion: v1_17
          ingress:
            template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
            className: nginx
            annotations:
              cert-manager.io/cluster-issuer: self-signed-ca-issuer
              nginx.ingress.kubernetes.io/rewrite-target: /$2
          flinkConfiguration:
            taskmanager.numberOfTaskSlots: "2"
          serviceAccount: flink
          jobManager:
            resource:
              memory: 1024m
              cpu: 0.5
          taskManager:
            resource:
              memory: 1024m
              cpu: 0.5
          job:
            jarURI: local:///app/flink-application.jar
            parallelism: 2
            entryClass: tech.geekcity.flink.connectors.jdbc.SinkToJdbc
            args: []
          mode: native
        
        
    • generate flink-job.yaml
      • IMAGE=docker.io/wangz2019/flink-connectors-jdbc-demo:latest
        #ENTRY_CLASS=tech.geekcity.flink.connectors.jdbc.SourceFromJdbc
        ENTRY_CLASS=tech.geekcity.flink.connectors.jdbc.SinkToJdbc
        cp flink-job.template.yaml flink-job.yaml \
            && yq eval ".spec.image = \"$IMAGE\"" -i flink-job.yaml \
            && yq eval ".spec.job.entryClass = \"$ENTRY_CLASS\"" -i flink-job.yaml
        
    • apply to k8s
      • kubectl -n flink apply -f flink-job.yaml
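    • check the deployment status; the FlinkDeployment resource reports the job state once the operator has reconciled it:
      • kubectl -n flink get flinkdeployment flink-connectors-jdbc-demo \
            && kubectl -n flink get pods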
        
  6. check the data in clickhouse
    • curl -k "https://admin:${PASSWORD}@clickhouse.dev.geekcity.tech:32443/?database=sink_to_jdbc" --data 'select * from users limit 10'
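    • ${PASSWORD} above is the admin password stored in the clickhouse-admin-credentials secret, and the host clickhouse.dev.geekcity.tech:32443 with the -k flag reflect how ClickHouse is exposed (behind a self-signed certificate) in this environment, so adapt them to your own setup; the password can be extracted with:
      • PASSWORD=$(kubectl -n flink get secret clickhouse-admin-credentials -o jsonpath='{.data.password}' | base64 --decode)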