mssqlsrc.json.bk


{
  "name": "MSSQLConn",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.server.name": "TestDB1",
    "database.hostname": "10.58.181.105",
    "database.port": "1433",
    "database.user": "testUser",
    "database.password": "P#",
    "database.dbname": "testDB1",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}


SQLSink.json.bk

{
  "name": "SQLSinkX",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "TestDB1.dbo.customers",
    "connection.url": "jdbc:sqlserver://10.58.181.105:1433;database=testdb;username=testUser;password=P#",
    "insert.mode": "insert",
    "table.name.format": "customers",
    "pk.fields": "id",
    "auto.create": "true",
    "auto.evolve": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  }
}
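
One way to load the two files above into a running Kafka Connect worker is the Connect REST API (POST /connectors). The sketch below is only an illustration: it assumes the worker listens on http://localhost:8083 (the default REST port), and the helper names build_register_request and register are hypothetical, not part of Confluent or Debezium.

```python
import json
import urllib.request

# Assumption: Kafka Connect worker REST endpoint; 8083 is the default port.
CONNECT_URL = "http://localhost:8083/connectors"


def build_register_request(connector, url=CONNECT_URL):
    """Build (but do not send) a POST request that registers a connector."""
    if "name" not in connector or "config" not in connector:
        raise ValueError("connector definition needs 'name' and 'config'")
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register(path, url=CONNECT_URL):
    """Read a connector JSON file and submit it to the Connect worker."""
    with open(path, encoding="utf-8") as fh:
        # json.load fails fast on invalid JSON, e.g. curly quotes.
        connector = json.load(fh)
    with urllib.request.urlopen(build_register_request(connector, url)) as resp:
        return resp.status

# Example use against a live worker (not run here):
#   register("mssqlsrc.json.bk")
#   register("SQLSink.json.bk")
```

Registering the source first means the TestDB1.dbo.customers topic already exists by the time the sink starts reading it.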

Confluent: how to set up source and sink connectors for MS SQL Server

{
  "name": "PolicyDataSrc",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.server.name": "DBNAME",
    "database.hostname": "10.10.10.10",
    "database.port": "1433",
    "database.user": "SSRS_User",
    "database.password": "PP",
    "database.dbname": "Premdat",
    "table.whitelist": "PolicyData",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}

{
  "name": "PolicyDataSink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "DBNAME.dbo.PolicyData",
    "connection.url": "jdbc:sqlserver://10.58.181.105:1433;database=Premdat;username=testUser;password=P",
    "insert.mode": "upsert",
    "table.name.format": "PolicyData",
    "pk.mode": "record_key",
    "pk.fields": "PolicyID",
    "auto.create": "true",
    "auto.evolve": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  }
}
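
The sink's topics value has to match the topic the source writes to, which Debezium names <database.server.name>.<schema>.<table>. A small sketch of that check, assuming the table lives in the dbo schema; the helpers expected_topic and sink_reads_topic are hypothetical names used only for illustration.

```python
def expected_topic(source_config, schema, table):
    """Topic a Debezium SQL Server source writes to for one table:
    <database.server.name>.<schema>.<table>."""
    return ".".join([source_config["database.server.name"], schema, table])


def sink_reads_topic(sink_config, topic):
    """True if the comma-separated 'topics' list of a sink includes topic."""
    topics = [t.strip() for t in sink_config.get("topics", "").split(",")]
    return topic in topics


# Only the keys relevant to the check are copied from the configs above.
source = {"database.server.name": "DBNAME"}
sink = {"topics": "DBNAME.dbo.PolicyData"}

topic = expected_topic(source, "dbo", "PolicyData")
print(topic, sink_reads_topic(sink, topic))
```

If database.server.name changes in the source config, the topics entry in the sink has to change with it, otherwise the sink subscribes to a topic that never receives data.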

Versions tested:

Confluent 5.0.3

Debezium 0.9.5

Debezium config example for source and sink connectors

{
  "name": "MSSQLConn",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.server.name": "TestDB1",
    "database.hostname": "10.58.181.105",
    "database.port": "1433",
    "database.user": "testUser",
    "database.password": "P#",
    "database.dbname": "testDB1",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}


Sink:

{
  "name": "SinkX",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "TestDB1.dbo.customers",
    "connection.url": "jdbc:sqlserver://10.58.181.105:1433;database=testdb;username=testUser;password=P1",
    "insert.mode": "upsert",
    "table.name.format": "customers",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "auto.create": "true",
    "auto.evolve": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  }
}
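
The unwrap transform is in the sink config because a Debezium change event wraps the row in an envelope (before, after, source, op, ts_ms), while the JDBC sink expects a flat row. The Python below only illustrates the idea and is not the real transform, which is a Java class; the unwrap function and the sample event (including the email column) are made up for this sketch.

```python
def unwrap(envelope):
    """Return the flat row state from a Debezium-style change event value."""
    if envelope is None:          # tombstone: nothing to write
        return None
    return envelope.get("after")  # None for a delete (op == "d")


# Made-up update event for the customers table; only 'id' comes from the config.
event = {
    "before": {"id": 1001, "email": "old@example.com"},
    "after": {"id": 1001, "email": "new@example.com"},
    "source": {"name": "TestDB1"},
    "op": "u",
    "ts_ms": 1559033904863,
}

row = unwrap(event)
print(row)
```

With pk.mode set to record_value and pk.fields set to id, the sink reads the primary key from the id field of this flattened row, which is why the transform has to run before the write.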