# s3-with-parquet connector

_About 2 min read_
## Introduction

This is a demo of reading data from and writing data to S3 in Parquet format.
## Source code

- https://github.com/ben-wangz/blog/tree/main/flink/demos/gaia3
## How to run with flink-kubernetes-operator

- Set up a MinIO server
  - reference: minio
- Set up the Flink Kubernetes operator
  - reference: flink-operator
- Copy the secret `minio-credentials` from the `storage` namespace to the `flink` namespace
# Copy the minio-credentials Secret from the storage namespace into the flink
# namespace. Server-populated metadata (namespace, creationTimestamp,
# resourceVersion, selfLink, uid) is stripped so the object can be re-created.
# ownerReferences and managedFields are stripped as well: an owner reference
# cannot point across namespaces, so a copied one may get the new Secret
# garbage-collected.
kubectl -n storage get secret minio-credentials -o json \
  | jq 'del(.metadata["namespace","creationTimestamp","resourceVersion","selfLink","uid","ownerReferences","managedFields"])' \
  | kubectl -n flink apply -f -
- Build the image and push it to Docker Hub
# Build the demo image and push it to ${REGISTRY:-docker.io}.
# Set the registry credentials first, e.g.:
#REGISTRY_USERNAME=your-registry-username
#REGISTRY_PASSWORD=your-registry-password
IMAGE=docker.io/wangz2019/flink-demos-gaia3:latest
# The password is fed through stdin (--password-stdin) so it does not show up
# in the process list or shell history the way `podman login -p` would.
bash flink/demos/gaia3/container/build.sh "$IMAGE" \
  && printf '%s' "${REGISTRY_PASSWORD:?set REGISTRY_PASSWORD first}" \
    | podman login -u "${REGISTRY_USERNAME:?set REGISTRY_USERNAME first}" --password-stdin "${REGISTRY:-docker.io}" \
  && podman push "$IMAGE"
- Create a bucket named `flink-demos-gaia3`
# Create the bucket the job writes to (--ignore-existing makes this a no-op if
# it already exists).
# change K8S_MASTER_IP to your k8s master ip
K8S_MASTER_IP=$(kubectl get node -l node-role.kubernetes.io/control-plane -o jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')
# Use the MinIO root user stored in the Secret instead of assuming "admin".
ACCESS_KEY=$(kubectl -n storage get secret minio-credentials -o jsonpath='{.data.rootUser}' | base64 -d)
ACCESS_SECRET=$(kubectl -n storage get secret minio-credentials -o jsonpath='{.data.rootPassword}' | base64 -d)
# Credentials reach the container through its environment (`--env NAME` copies
# the value from podman's own environment). The -c script is single-quoted and
# expands them inside the container, so special characters in the password
# cannot break the command line.
MC_ACCESS_KEY="$ACCESS_KEY" MC_ACCESS_SECRET="$ACCESS_SECRET" podman run --rm \
  --entrypoint bash \
  --add-host="minio-api.dev.geekcity.tech:${K8S_MASTER_IP}" \
  --env MC_ACCESS_KEY \
  --env MC_ACCESS_SECRET \
  -it docker.io/minio/mc:latest \
  -c 'mc alias set minio http://minio-api.dev.geekcity.tech:32080 "$MC_ACCESS_KEY" "$MC_ACCESS_SECRET" &&
      mc mb --ignore-existing minio/flink-demos-gaia3'
- Deploy the Flink job
- Prepare `flink-job.template.yaml`
# FlinkDeployment template for the gaia3 demo.
# spec.image, spec.job.entryClass and the s3.* placeholders under
# spec.flinkConfiguration are filled in by the yq steps that generate
# flink-job.yaml from this template.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-demos-gaia3
spec:
  image: docker.io/wangz2019/flink-demos-gaia3:latest
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          imagePullPolicy: Always
          # Settings handed to the job through the container environment.
          env:
            - name: TZ
              value: Asia/Shanghai
            - name: S3_SCHEMA
              value: http
            - name: S3_HOST
              value: minio.storage
            - name: S3_PORT
              value: "9000"
            # MinIO root credentials, taken from the Secret copied into this namespace.
            - name: S3_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: minio-credentials
                  key: rootUser
            - name: S3_ACCESS_SECRET
              valueFrom:
                secretKeyRef:
                  name: minio-credentials
                  key: rootPassword
            - name: S3_BUCKET
              value: flink-demos-gaia3
            - name: CHECKPOINT_INTERVAL
              value: "10000"
            - name: URL_LIMIT
              value: "10"
  flinkVersion: v1_19
  ingress:
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: nginx
    annotations:
      cert-manager.io/cluster-issuer: self-signed-ca-issuer
      nginx.ingress.kubernetes.io/rewrite-target: /$2
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    s3.path.style.access: "true"
    # Placeholders only; replaced with the real endpoint/credentials when
    # flink-job.yaml is generated.
    s3.endpoint: your-endpoint
    s3.access-key: your-access-key
    s3.secret-key: your-s3-secret-key
  serviceAccount: flink
  jobManager:
    resource:
      memory: 1024m
      cpu: 0.5
  taskManager:
    resource:
      memory: 1024m
      cpu: 0.5
  job:
    jarURI: local:///app/flink-application.jar
    parallelism: 2
    entryClass: tech.geekcity.flink.demos.gaia3.LoadIntoS3
    args: []
  mode: native
- Generate `flink-job.yaml` from the template
# Render flink-job.yaml from the template, pinning the image and entry class.
# The values reach yq through its environment and are read with strenv(), so
# they are never spliced into the yq expression itself (quotes or backslashes
# in a value cannot break the edit).
IMAGE=docker.io/wangz2019/flink-demos-gaia3:latest
ENTRY_CLASS=tech.geekcity.flink.demos.gaia3.LoadIntoS3
cp flink-job.template.yaml flink-job.yaml \
  && IMAGE="$IMAGE" yq eval '.spec.image = strenv(IMAGE)' -i flink-job.yaml \
  && ENTRY_CLASS="$ENTRY_CLASS" yq eval '.spec.job.entryClass = strenv(ENTRY_CLASS)' -i flink-job.yaml
- Add the S3 settings to the Flink configuration
  - reference: problem of s3 fs filesystem hadoop
# Replace the s3.* placeholders in flink-job.yaml with the in-cluster MinIO
# endpoint and the root credentials from the minio-credentials Secret.
# NOTE: this writes the credentials in plaintext into flink-job.yaml (and into
# the FlinkDeployment once applied); do not commit the generated file.
S3_ENDPOINT=http://minio.storage:9000
S3_ACCESS_KEY=$(kubectl -n storage get secret minio-credentials -o jsonpath='{.data.rootUser}' | base64 -d)
S3_SECRET_KEY=$(kubectl -n storage get secret minio-credentials -o jsonpath='{.data.rootPassword}' | base64 -d)
# strenv() reads each value from yq's environment, so a password containing
# quotes or backslashes is stored verbatim instead of corrupting the expression.
S3_ENDPOINT="$S3_ENDPOINT" S3_ACCESS_KEY="$S3_ACCESS_KEY" S3_SECRET_KEY="$S3_SECRET_KEY" \
  yq eval -i '
    .spec.flinkConfiguration."s3.endpoint" = strenv(S3_ENDPOINT)
    | .spec.flinkConfiguration."s3.access-key" = strenv(S3_ACCESS_KEY)
    | .spec.flinkConfiguration."s3.secret-key" = strenv(S3_SECRET_KEY)
  ' flink-job.yaml
- Apply it to Kubernetes
# Create or update the FlinkDeployment described by flink-job.yaml in the
# flink namespace.
kubectl apply --namespace flink --filename flink-job.yaml
- prepare
- Check the output with the mc client
# List the objects the job wrote under load-into-s3/ for the current hour.
# $(date ...) runs in the HOST shell before podman starts, so the container's
# TZ never affected it. The hour is computed here explicitly in Asia/Shanghai,
# the TZ the Flink pods run with.
# NOTE(review): assumes the job names its hourly directories in the pod's TZ;
# confirm against the job's bucket assigner.
# change K8S_MASTER_IP to your k8s master ip
K8S_MASTER_IP=$(kubectl get node -l node-role.kubernetes.io/control-plane -o jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')
ACCESS_KEY=$(kubectl -n storage get secret minio-credentials -o jsonpath='{.data.rootUser}' | base64 -d)
ACCESS_SECRET=$(kubectl -n storage get secret minio-credentials -o jsonpath='{.data.rootPassword}' | base64 -d)
HOUR_DIR=$(TZ=Asia/Shanghai date '+%Y-%m-%d--%H')
# Credentials are passed through the environment and expanded inside the
# single-quoted -c script, so special characters cannot break the command.
MC_ACCESS_KEY="$ACCESS_KEY" MC_ACCESS_SECRET="$ACCESS_SECRET" podman run --rm \
  --entrypoint bash \
  --add-host="minio-api.dev.geekcity.tech:${K8S_MASTER_IP}" \
  --env TZ=Asia/Shanghai \
  --env MC_ACCESS_KEY \
  --env MC_ACCESS_SECRET \
  --env HOUR_DIR="$HOUR_DIR" \
  -it docker.io/minio/mc:latest \
  -c 'mc alias set minio http://minio-api.dev.geekcity.tech:32080 "$MC_ACCESS_KEY" "$MC_ACCESS_SECRET" &&
      mc ls "minio/flink-demos-gaia3/load-into-s3/$HOUR_DIR"'
# Show the first 20 lines of one part file from the current hour's directory.
# Set PART_FILENAME to one of the names printed by `mc ls` first, e.g.:
#PART_FILENAME=part-2a79ffe3-76e2-4e6f-9306-4a3ede731af1-0
# NOTE(review): if the part files are Parquet, `mc head` prints raw binary.
# The hour is computed on the host in Asia/Shanghai (the Flink pods' TZ),
# because $(date ...) is expanded before the container starts.
# change K8S_MASTER_IP to your k8s master ip
K8S_MASTER_IP=$(kubectl get node -l node-role.kubernetes.io/control-plane -o jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')
ACCESS_KEY=$(kubectl -n storage get secret minio-credentials -o jsonpath='{.data.rootUser}' | base64 -d)
ACCESS_SECRET=$(kubectl -n storage get secret minio-credentials -o jsonpath='{.data.rootPassword}' | base64 -d)
HOUR_DIR=$(TZ=Asia/Shanghai date '+%Y-%m-%d--%H')
# ${PART_FILENAME:?} aborts the command with a message instead of running
# `mc head` against the directory path when the variable is unset.
MC_ACCESS_KEY="$ACCESS_KEY" MC_ACCESS_SECRET="$ACCESS_SECRET" podman run --rm \
  --entrypoint bash \
  --add-host="minio-api.dev.geekcity.tech:${K8S_MASTER_IP}" \
  --env TZ=Asia/Shanghai \
  --env MC_ACCESS_KEY \
  --env MC_ACCESS_SECRET \
  --env OBJECT_PATH="flink-demos-gaia3/load-into-s3/${HOUR_DIR}/${PART_FILENAME:?set PART_FILENAME to a file listed by mc ls}" \
  -it docker.io/minio/mc:latest \
  -c 'mc alias set minio http://minio-api.dev.geekcity.tech:32080 "$MC_ACCESS_KEY" "$MC_ACCESS_SECRET" &&
      mc head --lines 20 "minio/$OBJECT_PATH"'