1. Docker 2. KSQL 3. Source Connector 4. Sink Connectors 5. Exception handling 6. Driver info ************************************ 1. Docker ************************************************** 1.1 Install Docker $ sudo apt update $ sudo apt install docker.io $ sudo apt install docker-compose 1.2 Start Docker Containers [If the containers are not present, this will download, install and set them up] **** It will install, connect and set up KSQL, Kafka-connect, Schema-registry, kafkacat, zookeeper and kafka **** $ cd docker/kafka $ vi docker-compose.yml // this file should be there $ docker-compose up -d [-d for detached mode] 1.3 Support for docker // Add user to docker group for multiple terminal access [Not Mandatory] $ sudo groupadd docker $ sudo usermod -aG docker ubuntu $ su - ubuntu // log in again so the new group membership takes effect // Docker commands // View all containers $ docker ps -a // Stop all running containers $ docker stop $(docker ps -a -q) // To start them again: docker-compose up -d // Remove all stopped containers $ docker rm $(docker ps -a -q) // Remove docker images $ docker rmi imageid/imagename // View docker logs $ docker logs -f containername ************************************ 2. KSQL ************************************************** 2.1 Start KSQL console $ docker exec -it ksqldb ksql http://ksqldb:8088 2.2 View all topics > show topics; 2.3 View all connectors > show connectors; 2.4 Print topic data (After creating the topic) > print topicname from beginning; // All data > print topicname; // Upcoming data ************************************ 3. 
SOURCE CONNECTOR ************************************************** 3.1 Run source connector $ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d ' { "name": "mysql5-prodcution-source", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "10", "database.hostname": "10.0.3.173", "database.port": "3306", "database.user": "root", "database.password": "XXXXXXXXXX", "database.server.id": "223344", "database.server.name": "dbserver", "database.whitelist": "indianmo_imc_new", "table.whitelist": "indianmo_imc_new.associate_leads,indianmo_imc_new.md_members,indianmo_imc_new.associates,indianmo_imc_new.lead_requirement_details,indianmo_imc_new.need_analyzer,indianmo_imc_new.associates_recharge_bill_log", "column.blacklist": "indianmo_imc_new.md_members.mem_tot_ttt,indianmo_imc_new.associates.total_tt,indianmo_imc_new.need_analyzer.na_sal_comp_name", "database.history.kafka.bootstrap.servers": "kafka:29092", "database.history.kafka.topic": "mysql5table", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "key.converter.schemas.enable":true, "value.converter.schemas.enable":true, "transforms": "unwrap,dropTopicPrefix,pushed_on,first_transfer_date,mem_dob,pushed_date,AL_Date,A_Last_login,live_time,A_Date,callMeDate,last_call_on,last_attempted_on,na_borrow_date,na_invest_date,na_edited_on,na_date_of_formation,arbl_updated_on", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.dropTopicPrefix.regex":"dbserver.indianmo_imc_new.(.*)", "transforms.dropTopicPrefix.replacement":"$1", "time.precision.mode":"connect", "decimal.handling.mode":"string", 
"snapshot.mode":"schema_only", "include.schema.changes":true, "transforms.pushed_on.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.pushed_on.target.type":"string", "transforms.pushed_on.field":"pushed_on", "transforms.pushed_on.format":"YYYY-MM-dd HH:mm:ss", "transforms.first_transfer_date.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.first_transfer_date.target.type":"string", "transforms.first_transfer_date.field":"first_transfer_date", "transforms.first_transfer_date.format":"YYYY-MM-dd HH:mm:ss", "transforms.mem_dob.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.mem_dob.target.type":"string", "transforms.mem_dob.field":"mem_dob", "transforms.mem_dob.format":"YYYY-MM-dd", "transforms.pushed_date.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.pushed_date.target.type":"string", "transforms.pushed_date.field":"pushed_date", "transforms.pushed_date.format":"YYYY-MM-dd", "transforms.AL_Date.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.AL_Date.target.type":"string", "transforms.AL_Date.field":"AL_Date", "transforms.AL_Date.format":"YYYY-MM-dd", "transforms.A_Last_login.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.A_Last_login.target.type":"string", "transforms.A_Last_login.field":"A_Last_login", "transforms.A_Last_login.format":"YYYY-MM-dd HH:mm:ss", "transforms.live_time.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.live_time.target.type":"string", "transforms.live_time.field":"live_time", "transforms.live_time.format":"YYYY-MM-dd HH:mm:ss", "transforms.A_Date.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.A_Date.target.type":"string", "transforms.A_Date.field":"A_Date", "transforms.A_Date.format":"YYYY-MM-dd", 
"transforms.callMeDate.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.callMeDate.target.type":"string", "transforms.callMeDate.field":"callMeDate", "transforms.callMeDate.format":"YYYY-MM-dd HH:mm:ss", "transforms.last_call_on.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.last_call_on.target.type":"string", "transforms.last_call_on.field":"last_call_on", "transforms.last_call_on.format":"YYYY-MM-dd HH:mm:ss", "transforms.last_attempted_on.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.last_attempted_on.target.type":"string", "transforms.last_attempted_on.field":"last_attempted_on", "transforms.last_attempted_on.format":"YYYY-MM-dd HH:mm:ss", "transforms.na_borrow_date.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.na_borrow_date.target.type":"string", "transforms.na_borrow_date.field":"na_borrow_date", "transforms.na_borrow_date.format":"YYYY-MM-dd HH:mm:ss", "transforms.na_invest_date.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.na_invest_date.target.type":"string", "transforms.na_invest_date.field":"na_invest_date", "transforms.na_invest_date.format":"YYYY-MM-dd HH:mm:ss", "transforms.na_edited_on.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.na_edited_on.target.type":"string", "transforms.na_edited_on.field":"na_edited_on", "transforms.na_edited_on.format":"YYYY-MM-dd HH:mm:ss", "transforms.na_date_of_formation.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.na_date_of_formation.target.type":"string", "transforms.na_date_of_formation.field":"na_date_of_formation", "transforms.na_date_of_formation.format":"YYYY-MM-dd", "transforms.arbl_updated_on.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.arbl_updated_on.target.type":"string", "transforms.arbl_updated_on.field":"arbl_updated_on", 
"transforms.arbl_updated_on.format":"YYYY-MM-dd HH:mm:ss", "transforms.lm_subscription_expiry_on.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.lm_subscription_expiry_on.target.type":"string", "transforms.lm_subscription_expiry_on.field":"lm_subscription_expiry_on", "transforms.lm_subscription_expiry_on.format":"YYYY-MM-dd HH:mm:ss" } }' 3.2 Check kafka connect log that it is connected or not $ docker logs kafka-connect -f | grep -v "INFO" | grep -v "WARN" 3.3 Check KSQL topics created or not according to table names [FOllow No. 2 section] ************************************ 4. SINK CONNECTOR ************************************************** 4.1 Note * After source connector connected then only you should run the sink connectors. * After running each sink connector check kafka connect logs by $ docker logs kafka-connect -f | grep -v "INFO" | grep -v "WARN" * If no errors are there then run next sink connector // MD Members Sink ********* curl -X PUT http://localhost:8083/connectors/sink-jdbc-md-members/config \ -H "Content-Type: application/json" -d '{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:mysql://15.207.62.112:3306/imc_lead_market", "topics": "md_members", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "connection.user": "root", "connection.password": "XXXXXXXXXX", "auto.create": false, "auto.evolve": true, "insert.mode": "upsert", "delete.enabled": true, "pk.mode": "record_key", "pk.fields": "mem_id", "transforms": "RenameKey,selectFields,renameFields,addTopicPrefix,convertTS", "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key", "transforms.RenameKey.renames": "mem_id:mem_id", "transforms.selectFields.type": 
"org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.selectFields.whitelist": "mem_id,mem_mobile,mem_fname,mem_lname,mem_email,mem_gender,mem_dob,mem_zip,mem_marital_status,mem_state,mem_city,mem_primary_lang,mem_created_on,mem_updated_on,mem_tot_ttt,transfer_count,first_transfer_date,last_transfer_date,dnd_status,wda_session_id", "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.renameFields.renames": "mem_fname:mem_name,mem_primary_lang:mem_primary_language,mem_tot_ttt:m5_mem_total_talktime,transfer_count:mem_transfer_count,first_transfer_date:m5_mem_first_transferred_on,last_transfer_date:m5_mem_last_transferred_on,dnd_status:mem_dnd_status,mem_dob:m5_mem_dob,mem_created_on:m5_mem_created_date,mem_updated_on:m5_mem_updated_on,mem_created_on:m5_mem_created_on,mem_lname:m5_mem_lname,mem_zip:m5_mem_zip,wda_session_id:kafka_flag", "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.addTopicPrefix.regex":"(.*)", "transforms.addTopicPrefix.replacement":"mdt_$1", "transforms.convertTS.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.convertTS.field":"first_transfer_date", "transforms.convertTS.format":"YYYY-MM-dd H:m:s", "transforms.convertTS.target.type":"unix", "value.converter.schemas.enable":false }' // Associate Leads Sink ********* curl -X PUT http://localhost:8083/connectors/sink-jdbc-associate-leads/config \ -H "Content-Type: application/json" -d '{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:mysql://15.207.62.112:3306/imc_lead_market", "topics": "associate_leads", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "connection.user": "root", "connection.password": 
"XXXXXXXXXX", "auto.create": false, "auto.evolve": true, "insert.mode": "upsert", "delete.enabled": true, "pk.mode": "record_key", "pk.fields": "al_id", "transforms": "RenameKey,selectFields,renameFields,addTopicPrefix", "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key", "transforms.RenameKey.renames": "AL_Id:al_id", "transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.selectFields.whitelist": "AL_Id,A_Id,L_Id,cityID,rtitle,lead_price,selling_price,lead_trans_type,R_Id,bought_by,bought_ip,leadSentBy,pushed_date,pushed_on,Al_Date,Al_Date_time,premium_lead_rating,lead_pincode,AL_Status,kafka_flag", "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.renameFields.renames": "AL_Id:al_id,A_Id:associate_id,L_Id:mem_id,cityID:city_id,rtitle:product_id,lead_price:lead_actual_price,selling_price:lead_selling_price,lead_trans_type:lead_type_msql5,R_Id:requirement_id,bought_by:lead_bought_by,bought_ip:lead_bought_ip,leadSentBy:lead_sent_by,premium_lead_rating:lead_rating,AL_Status:lead_al_status,pushed_date:m5_lead_pushed_date,pushed_on:lead_pushed_on,AL_Date:m5_lead_sold_date,AL_Date_Time:m5_lead_sold_on,pushed_on:m5_lead_pushed_on", "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.addTopicPrefix.regex":"(.*)", "transforms.addTopicPrefix.replacement":"ldt_lm_$1" }' // Requirement Details Sink ********* curl -X PUT http://localhost:8083/connectors/sink-jdbc-lead-req-details/config \ -H "Content-Type: application/json" -d '{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:mysql://15.207.62.112:3306/imc_lead_market", "topics": "lead_requirement_details", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", 
"value.converter.schema.registry.url": "http://schema-registry:8081", "connection.user": "root", "connection.password": "XXXXXXXXXX", "auto.create": false, "auto.evolve": true, "insert.mode": "upsert", "delete.enabled": true, "pk.mode": "record_key", "pk.fields": "lrd_id", "transforms": "RenameKey,selectFields,renameFields,addTopicPrefix", "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key", "transforms.RenameKey.renames": "lrd_id:lrd_id", "transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.selectFields.whitelist": "lrd_id,A_Id,L_Id,R_Id,AL_Id,rtitle,reqName,rdField1,rdField2,rdField3,rdField4,rdField5,rdField6,rdField7,rdField8,rdField9,rdField10,rdField11,rdField12,rdField13,rdField14,rdField15,callMeDate,preferred_meeting_loc,reqActStatus,lastUpdatedOn,lastUpdatedBy,kafka_flag", "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.renameFields.renames": "A_Id:associate_id,L_Id:mem_id,R_Id:requirement_id,AL_Id:al_id,rtitle:product_id,reqName:lrd_name,rdField1:lrd_field1,rdField2:lrd_field2,rdField3:lrd_field3,rdField4:lrd_field4,rdField5:lrd_field5,rdField6:lrd_field6,rdField7:lrd_field7,rdField8:lrd_field8,rdField9:lrd_field9,rdField10:lrd_field10,rdField11:lrd_field11,rdField12:lrd_field12,rdField13:lrd_field13,rdField14:lrd_field14,rdField15:lrd_field15,preferred_meeting_loc:lrd_meeting_location,reqActStatus:lrd_status,lastUpdatedBy:lrd_created_by,lastUpdatedOn:m5_lrd_created_on,callMeDate:m5_lrd_call_me_date", "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.addTopicPrefix.regex":"(.*)", "transforms.addTopicPrefix.replacement":"ldt_lm_$1" }' // Associates Sink ********* curl -X PUT http://localhost:8083/connectors/sink-associates/config \ -H "Content-Type: application/json" -d '{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": 
"jdbc:mysql://15.207.62.112:3306/imc_lead_market", "topics": "associates", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "connection.user": "root", "connection.password": "XXXXXXXXXX", "auto.create": false, "auto.evolve": true, "insert.mode": "upsert", "delete.enabled": true, "pk.mode": "record_key", "pk.fields": "associate_id", "transforms": "RenameKey,selectFields,renameFields,addTopicPrefix", "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key", "transforms.RenameKey.renames": "A_Id:associate_id", "transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.selectFields.whitelist": "A_Id,A_Mobile,A_Email,A_Contact_Name,A_Password,A_Gender,A_DOB,A_State,A_City,Pincode,A_Date_Time,A_Date,acc_manager,gcm_reg_id,KYC_Status,A_Last_login,mobile_app_status,live_time,internal_block,aw_balance,current_pincode,A_Lastupdated,lm_subscription_expiry_on,A_DStatus,kafka_flag", "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.renameFields.renames": "A_Id:associate_id,A_Email:associate_email,A_Mobile:associate_mobile,A_Contact_Name:associate_name,A_Password:associate_password,A_DOB:m5_A_DOB,A_State:associate_state,A_City:associate_city,Pincode:associate_pincode,KYC_Status:associate_kyc_status,mobile_app_status:associate_mobile_app_status,internal_block:associate_internal_block_flag,aw_balance:associate_current_balance,current_pincode:associate_current_pincode,A_Date_Time:m5_A_Date_Time,A_Last_login:m5_A_Last_login,live_time:m5_live_time,A_Date:m5_A_Date,A_Lastupdated:m5_A_Lastupdated,A_Gender:m5_associate_gender,lm_subscription_expiry_on:m5_lm_subscription_expiry_on", 
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.addTopicPrefix.regex":"(.*)", "transforms.addTopicPrefix.replacement":"mdt_lm_$1" }' // Md Members Details Sink ********* curl -X PUT http://localhost:8083/connectors/sink-jdbc-md-members-details/config \ -H "Content-Type: application/json" -d '{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:mysql://15.207.62.112:3306/imc_lead_market", "topics": "md_members", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "connection.user": "root", "connection.password": "XXXXXXXXXX", "auto.create": false, "auto.evolve": true, "insert.mode": "upsert", "delete.enabled": true, "pk.mode": "record_key", "pk.fields": "mem_id", "transforms": "selectFields,renameFields,addTopicPrefix", "transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.selectFields.whitelist": "mem_id,mem_languages,mem_profile_img,mem_education,mem_address,mem_office_address,mem_occupation,mem_sub_occupation,mem_other_address,mem_income,mem_family_income,mem_act_income", "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.renameFields.renames": "mem_languages:mdl_languages,mem_profile_img:mdl_profile_img_path,mem_education:mdl_education,mem_address:mdl_address,mem_office_address:mdl_office_address,mem_occupation:mdl_occupation,mem_sub_occupation:mdl_sub_occupation,mem_other_address:mdl_other_address,mem_income:mdl_income,mem_family_income:mdl_family_income,mem_act_income:mdl_actual_income", "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.addTopicPrefix.regex":"(.*)", "transforms.addTopicPrefix.replacement":"ldt_$1_details_log" }' 
// Associate Details Sink ********* curl -X PUT http://localhost:8083/connectors/sink-jdbc-associate-details/config \ -H "Content-Type: application/json" -d '{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:mysql://15.207.62.112:3306/imc_lead_market", "topics": "associates", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "connection.user": "root", "connection.password": "XXXXXXXXXX", "auto.create": false, "auto.evolve": true, "insert.mode": "upsert", "delete.enabled": true, "pk.mode": "record_key", "pk.fields": "associate_id", "transforms": "RenameKey,selectFields,renameFields,addTopicPrefix", "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key", "transforms.RenameKey.renames": "A_Id:associate_id", "transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.selectFields.whitelist": "A_Id,asc_address_line1,A_Highest_Edu,A_Language_Known,A_Marital,last_call_on,last_attempted_on,gst_number", "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.renameFields.renames": "A_Id:associate_id,asc_address_line1:adl_address,A_Highest_Edu:adl_highest_education,A_Language_Known:adl_languages_known,A_Marital:adl_marital_status,gst_number:adl_gst_number,last_call_on:m5_last_call_on,last_attempted_on:m5_last_attempted_on", "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.addTopicPrefix.regex":"(.*)", "transforms.addTopicPrefix.replacement":"ldt_lm_$1_details_log" }' // Associate Listing Details Sink ********* curl -X PUT http://localhost:8083/connectors/sink-jdbc-associate-listing-details-listing/config \ -H "Content-Type: application/json" -d '{ "connector.class": 
"io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:mysql://15.207.62.112:3306/imc_lead_market", "topics": "associates", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "connection.user": "root", "connection.password": "XXXXXXXXXX", "auto.create": false, "auto.evolve": true, "insert.mode": "upsert", "delete.enabled": true, "pk.mode": "record_key", "pk.fields": "associate_id", "transforms": "RenameKey,selectFields,renameFields,addTopicPrefix", "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key", "transforms.RenameKey.renames": "A_Id:associate_id", "transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.selectFields.whitelist": "A_Id,A_Companies,A_Designation,A_Exp_Month,A_Description,sp_products", "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.renameFields.renames": "A_Id:associate_id,A_Companies:ald_company_name,A_Designation:ald_designation,A_Exp_Month:ald_work_exp_months,A_Description:ald_description,sp_products:ald_sel_products", "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.addTopicPrefix.regex":"(.*)", "transforms.addTopicPrefix.replacement":"ldt_lm_$1_listing_details_log" }' // Need Analyzer Sink ********* curl -X PUT http://localhost:8083/connectors/sink-need-analyzer/config \ -H "Content-Type: application/json" -d '{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:mysql://15.207.62.112:3306/imc_lead_market", "topics": "need_analyzer", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": 
"http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "connection.user": "root", "connection.password": "XXXXXXXXXX", "auto.create": false, "auto.evolve": true, "insert.mode": "upsert", "delete.enabled": true, "pk.mode": "record_key", "pk.fields": "na_id", "transforms": "renameFields", "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.renameFields.renames": "na_borrow_date:m5_na_borrow_date,na_invest_date:m5_na_invest_date,na_edited_on:m5_na_edited_on,na_date_of_formation:m5_na_date_of_formation,na_added_on:m5_na_added_on" }' // Associate Recharge Bill Log Sink ********* curl -X PUT http://localhost:8083/connectors/associates-recharge-bill-log/config \ -H "Content-Type: application/json" -d '{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:mysql://15.207.62.112:3306/imc_lead_market", "topics": "associates_recharge_bill_log", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "connection.user": "root", "connection.password": "XXXXXXXXXX", "auto.create": false, "auto.evolve": true, "insert.mode": "upsert", "delete.enabled": true, "pk.mode": "record_key", "pk.fields": "arbl_id", "transforms": "addTopicPrefix,renameFields", "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.renameFields.renames": "aw_id:whl_id,arbl_added_on:m5_arbl_added_on,arbl_updated_on:m5_arbl_updated_on", "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.addTopicPrefix.regex":"(.*)", "transforms.addTopicPrefix.replacement":"ldt_lm_$1" }' ************************************ 5. 
Exception Handling ************************************************** 5.1 If 5.5 server restarted $ curl -X POST localhost:8083/connectors/connectorname/tasks/0/restart 5.2 If 5.5 schema changed or failed // Check docker logs $ docker logs kafka-connect -f | grep -v "INFO" | grep -v "WARN" // Stop, remove and re-run the docker containers (see section 1) 5.3 If source connector fails: $ docker stop $(docker ps -a -q) $ docker rm $(docker ps -a -q) $ cd docker/kafka $ docker-compose up -d [wait 2 mins] run source connector [wait 3 mins] run sink connectors at 30-second intervals ************************************ 6. Check Drivers are properly installed [Support] ************************************************** // Go to kafka-connect container $ cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc -- Check whether this folder exists: mysql-connector-java-8.0.22 [Note: versions can vary] -- If it is not there, download and extract the MySQL JDBC driver. Link https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.22.tar.gz Copy Folder (From root dir) $ docker cp /home/indianmoneycom/Downloads/mysql-connector-java-8.0.22/. kafka-connect:/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/mysql-connector-java-8.0.22 Youtube https://studio.youtube.com/channel/UC-QCVT-_8ULO-F5xO1J_-sw/playlists drive https://drive.google.com/file/d/1gTJeaAT3u8pII15SijH9YNuetf86L1nc/view?usp=sharing