快乐大本营失恋33天,新没头脑和不高兴,疯狂雷电
# Kafka command-line reference. The relative paths below (../config/...)
# imply the commands are run from Kafka's bin directory.

# List the topics known to the ZooKeeper instance at 192.168.0.201:12181.
kafka-topics.sh --list --zookeeper 192.168.0.201:12181
# Describe topic testkj1.
kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic testkj1
# Execute the partition reassignment described in json/partitions-to-move.json.
kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file json/partitions-to-move.json --execute
# Create topic testkj1 with 1 partition and replication factor 1.
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testkj1
# Change the partition count of testkj1 to 20.
# Options must start with two ASCII hyphens; typographic dashes are not recognised.
kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --partitions 20 --topic testkj1
# Start a console producer writing to testkj1.
kafka-console-producer.sh --broker-list localhost:9092 --topic testkj1
# Start a console consumer reading testkj1 from the beginning.
kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic testkj1
# Start the broker in the background (-daemon is spelled with a single hyphen).
kafka-server-start.sh -daemon ../config/server.properties
Producer 端,引入 Confluent.Kafka
Install-Package Confluent.Kafka -Version 1.0-beta2
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace KafkaTest
{
    /// <summary>
    /// Sample producer: sends the numbers 0..99999 as string messages to the
    /// topic "my-topic" and prints a delivery report for each one.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            // Block until the producing routine has finished.
            Test().Wait();
        }

        /// <summary>
        /// Produces 100000 messages and flushes the producer before returning.
        /// </summary>
        /// <returns>An already-completed task; all work is done synchronously.</returns>
        static Task Test()
        {
            var conf = new ProducerConfig { BootstrapServers = "39.**.**.**:9092" };

            // Delivery-report callback: prints the resulting offset on success,
            // or the error reason on failure.
            Action<DeliveryReportResult<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"delivered message to {r.TopicPartitionOffset}"
                    : $"delivery error: {r.Error.Reason}");

            using (var p = new Producer<Null, string>(conf))
            {
                for (int i = 0; i < 100000; ++i)
                {
                    // The outcome of each send is reported through the handler.
                    p.BeginProduce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);
                }

                // Wait for up to 10 seconds for any inflight messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(10));
            }

            return Task.CompletedTask;
        }
    }
}
Consumer 端,引入 Confluent.Kafka
Install-Package Confluent.Kafka -Version 1.0-beta2
using Confluent.Kafka;
using System;
using System.Linq;
using System.Text;

namespace KafkaClient
{
    /// <summary>
    /// Sample consumer: subscribes to "my-topic" and prints every message it
    /// receives until a fatal client error is reported.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group4",
                BootstrapServers = "39.**.**.**:9092",
                // Note: The AutoOffsetReset property determines the start offset in the event
                // there are not yet any committed offsets for the consumer group for the
                // topic/partitions of interest. By default, offsets are committed
                // automatically, so in this example, consumption will only start from the
                // earliest message in the topic 'my-topic' the first time you run the program.
                AutoOffsetReset = AutoOffsetResetType.Earliest
            };

            using (var c = new Consumer<Ignore, string>(conf))
            {
                c.Subscribe("my-topic");

                bool consuming = true;
                // The client will automatically recover from non-fatal errors. You typically
                // don't need to take any action unless an error is marked as fatal.
                c.OnError += (_, e) => consuming = !e.IsFatal;

                while (consuming)
                {
                    try
                    {
                        var cr = c.Consume();
                        Console.WriteLine($"consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        // A failed consume is logged and the loop keeps going.
                        Console.WriteLine($"error occured: {e.Error.Reason}");
                    }
                }

                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}
如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复
Net Core Web Api项目与在NginX下发布的方法
asp.net core3.1 引用的元包dll版本兼容性问题解决方案
IdentityServer4实现.Net Core API接口权限认证(快速入门)
ASP.NET Core MVC通过IViewLocationExpander扩展视图搜索路径的实现
网友评论