Comment déployer une image docker de kafka connect avec un connecteur jdbc.
Prérequis
Docker installé et opérationnel :
- Installer docker, vous pouvez suivre ces instructions en fonction de l’environnement utilisé : https://docs.docker.com/engine/install/debian/
Pour mon cas, j’ai utilisé Docker Engine on debian.
Créer un fichier Dockerfile
Dans dossier ‘Docker’, nous allons créer un fichier ‘Dockerfile’ dont le contenu est :
FROM confluentinc/cp-kafka-connect-base:latest
run confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
Pour récupérer la dernière version de l’image de kafka-connect-base ainsi que la dernier version du connecteur Jdbc.
Build de l’image docker
Build l’image depuis Dockerfile :
docker build . -t containerproductitem:1.0.0
Créer un fichier (env.list)
Dans ce fichier, nous allons configurer notre accès au kafka Cluster et Kafka registry, afin de pouvoir consommer les messages depuis un topic.
CONNECT_BOOTSTRAP_SERVERS=serveur:port CONNECT_GROUP_ID=svc-TEST-app-connect-worker CONNECT_CONFIG_STORAGE_TOPIC= dev-europe-west1-APP-TEST-LM-FR-P1-C1-PRODUCTITEM-V1-configs CONNECT_OFFSET_STORAGE_TOPIC= dev-europe-west1-APP-TEST-LM-FR-P1-C1-PRODUCTITEM-V1-offsets CONNECT_STATUS_STORAGE_TOPIC= dev-europe-west1-APP-TEST-LM-FR-P1-C1-PRODUCTITEM-V1-status CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3 CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3 CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3 SCHEMA_REGISTRY_URL=io.confluent.connect.avro.AvroConverter BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=*******:******* CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=****** CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=https://schema-registry.*******-staging-europe-west1.streaming.data.******.cloud CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=******** CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=https://schema-registry.*******-staging-europe-west1.streaming.data.***.cloud CONNECT_REST_ADVERTISED_HOST_NAME="localhost" CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/ CONNECT_REST_PORT=8082 CONNECT_LOG4J_LOGGERS="kafka.controller=ERROR,kafka.foo.bar=ERROR" CONNECT_LOG4J_ROOT_LOGLEVEL=ERROR CONNECT_TOOLS_LOG4J_LOGLEVEL=ERROR CONNECT_SASL_MECHANISM=PLAIN CONNECT_SECURITY_PROTOCOL=SASL_SSL CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="*******"; CONNECT_PRODUCER_SASL_MECHANISM=PLAIN CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="*******"; CONNECT_PRODUCER_GROUP_ID=svc-TEST-app-connect-worker CONNECT_CONSUMER_SASL_MECHANISM=PLAIN CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="*******"; CONNECT_CONSUMER_GROUP_ID=svc-TEST-app-connect-worker CONNECT_CONSUMER_CLIENT_ID=svc-TEST-app-connect-worker
Run du container
Nous allons démarrer notre container « containerproductitem » avec les ressources suivantes :
docker run -e 8082 -l error --name containerproductitem --network host --env-file env.list containerproductitem:1.0.0
Config du connecteur jdbc
Maintenant que le Kafka Connect est lancé via le container containerproductitem, il reste à configurer le connecteur jbdc Sink, pour consommer un topic Kafka et pousser les messages dans la table « tb_references_step_import » sous postgresql.
Pour accéder au container et pouvoir lancer la commande curl :
docker exec -t -i containerproductitem /bin/bash
Commande curl :
Curl -X POST -H "Content-Type: application/json" --data '{"name":"connector-sinkJDBC","config": { "topics": "dev-europe-west1-APP-TEST-LM-FR-P1-C1-PRODUCTITEM-V1","tasks.max": "1","connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","flush.size":"3","format.class":"io.confluent.connect.sftp.sink.format.csv.avroFormat","connection.url":"jdbc:postgresql://dfrlm****.int.com:5432/postgres?verifyServerCertificate:false&useSSL:true&requireSSL:true","connection.user":"***","connection.password":"****","table.name.format":"tb_references_step_import","pk.mode":"record_value","insert.mode":"upsert","puto.create":"true","auto.evolve":"true","delete.enabled":"false"}}' http://localhost:8082/connectors
Cette configuration sera stockée dans le topic config ‘dev-europe-west1-APP-TEST-LM-FR-P1-C1-PRODUCTITEM-V1-configs‘ prédéfini dans notre fichier env.list.
Notre kafka connect est maintenant démarré. Le connecteur jdbc Sink est configuré, et les messages sont consommés du topic dev-europe-west1-APP-TEST-LM-FR-P1-C1-PRODUCTITEM-V1 en format Avro, et stockés dans la table « tb_references_step_import » sous postgresql :