当前位置: 移动技术网 > IT编程>开发语言>c# > Winows上简单配置使用kafka(.net使用)

Winows上简单配置使用kafka(.net使用)

2019年10月23日  | 移动技术网IT编程  | 我要评论

一、kafka环境配置

1.jdk安装

安装文件:http://www.oracle.com/technetwork/java/javase/downloads/ 下载jdk
安装完成后需要添加以下的环境变量(右键点击“我的电脑” -> "高级系统设置" -> "环境变量" ):

java_home: c:\program files\java\jdk-13.0.1(jdk的安装路径)

path: 现有值后追加 "%java_home%\bin"

 

 

 

 

 

 2.zookeeper安装

kafka的运行依赖于zookeeper,所以在运行kafka之前我们需要安装并运行zookeeper

下载安装文件: http://zookeeper.apache.org/releases.html

解压文件 apache-zookeeper-3.5.6-bin.tar

打开zookeeper-3.5.6\conf,把zoo_sample.cfg重命名成zoo.cfg
从文本编辑器里打开zoo.cfg, 把datadir的值改成“./apache-zookeeper-3.5.6/data”
添加如下系统变量:

zookeeper_home: c:\users\yc\work\apache-zookeeper-3.5.6 (zookeeper目录)

path: 在现有的值后面添加 ";%zookeeper_home%\bin;"

 

 

 

 运行zookeeper: 打开cmd然后执行 zkserver

 

 3.安装并运行kafka

 下载安装文件: 
 解压文件
 打开kafka_2.12-2.3.0\config
 从文本编辑器里打开 server.properties
 修改:log.dirs=./logs
     listeners=plaintext://localhost:9092
 打开cmd
 执行命令:c:\users\yc>cd c:\users\yc\work\kafka_2.12-2.3.0(进入此目录中)
  再执行:.\bin\windows\kafka-server-start.bat .\config\server.properties

 

 4.创建topics

cmd执行命令:cd c:\users\yc\work\kafka_2.12-2.3.0\bin\windows

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

执行成功后出现 :created topic test

 

 

 5.生产者使用测试

打开cmd窗口执行命令:cd c:\users\yc\work\kafka_2.12-2.3.0\bin\windows

kafka-console-producer.bat --broker-list localhost:9092 --topic test

 

 

6.消费者使用测试

 打开cmd窗口执行命令:cd c:\users\yc\work\kafka_2.12-2.3.0\bin\windows

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

 

 7..net程序消费者简单使用

引入confluent.kafka包

 

 

 

 public static void main(string[] args)
        {
            //地址及端口号
            var conf = new producerconfig { bootstrapservers = "localhost:9092" };

            action<deliveryreport<null, string>> handler = r =>
                console.writeline(!r.error.iserror
                    ? $"delivered message to {r.topicpartitionoffset}"
                    : $"delivery error: {r.error.reason}");

            using (var p = new producerbuilder<null, string>(conf).build())
            {
                //for (int i = 0; i < 100; ++i)
                //{
                
                    p.produce("test", new message<null, string> { value = "messagehowsf"}, handler);//kafka协议数据发送
                //}

                // wait for up to 10 seconds for any inflight messages to be delivered.
                p.flush(timespan.fromseconds(10));
            }
        }

 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网