在Kafka下载页面下载0.8版本,解压缩。
1.修改config目录下的server.properties 里面的host.name为机器的ip。假如部署kafka和开发运行kafka例子为同一台机器,不用修改,用默认的localhost也行。
2.修改config目录下的zookeeper.properties 里面的dataDir属性为你需要的目录。
3.假如你要配置集群,在kafka解压缩目录下新建zoo_data目录(第一次的时候需要新建),在zoo_data目录新建myid文件,设置内容为1。同时修改zookeeper.properties,具体可参考:solrcloud在tomcat下安装(三)
4.启动kafka。
//启动zookeeper server (用&是为了能退出命令行): bin/zookeeper-server-start.sh config/zookeeper.properties & //启动kafka server: bin/kafka-server-start.sh config/server.properties & |
5.新建一个生产者例子
import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class KafkaTest { public static void main(String[] args) { Properties props = new Properties(); props.put("zk.connect", "10.103.22.47:2181"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "10.103.22.47:9092"); props.put("request.required.acks", "1"); //props.put("partitioner.class", "com.xq.SimplePartitioner"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); String ip = "192.168.2.3"; String msg ="this is a messageuuu!"; KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", ip,msg); producer.send(data); producer.close(); } } |
新建一个消费者例子
import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.Message; import kafka.message.MessageAndMetadata; public class ConsumerSample { public static void main(String[] args) { // specify some consumer properties Properties props = new Properties(); props.put("zookeeper.connect", "10.103.22.47:2181"); props.put("zookeeper.connectiontimeout.ms", "1000000"); props.put("group.id", "test_group"); // Create the connection to the cluster ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String,Integer> topics = new HashMap<String,Integer>(); topics.put("test", 2); Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = connector.createMessageStreams(topics); List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test"); ExecutorService threadPool = Executors.newFixedThreadPool(2); for (final KafkaStream<byte[], byte[]> stream : streams) { threadPool.submit(new Runnable() { public void run() { for (MessageAndMetadata msgAndMetadata : stream) { // process message (msgAndMetadata.message()) System.out.println("topic: " + msgAndMetadata.topic()); Message message = (Message) msgAndMetadata.message(); ByteBuffer buffer = message.payload(); byte[] bytes = new byte[message.payloadSize()]; buffer.get(bytes); String tmp = new String(bytes); System.out.println("message content: " + tmp); } } }); } } } |
先启动消费者例子,然后再启动生产者例子,这样会立即看到效果。
相关推荐
kafka-java-demo 基于java的kafka生产消费者示例。 mvn
RabbitMQ和Kafka详细笔记以及示例代码
自己搭建的kafka开发环境,其中会参考其他的文档《03_重新设置maven的本地库位置》、《07_Nexus的介绍和安装》和《08_Nexus的配置》,这三篇文档也是我写的,可以在csdn中搜到。这个开发环境的搭建包括Kafka集群,...
kafka单机和集群的搭建步骤,都是亲测有效;还有kafka监控工具附带,亲测有效,需要自取哟,不懂私聊
本文不讲kafka集群原理,只谈部署步骤。 默认读者已对kafka有最基本的认知,纯粹作为部署笔记,方便回忆。 另外本文是基于Windows部署的,Linux的步骤是基本相同的(只是启动脚本位置不同)。 kafka集群类型: ...
示例涵盖了从环境搭建到代码实现的全过程,帮助您快速上手实时数据处理的开发。提供了一个完整的示例,演示了如何使用Spark Streaming和Kafka进行实时数据处理。通过该示例,我们可以学习到如何创建Kafka主题、发送...
kafka环境搭建(单机+集群) 详细,以及redis在虚拟机上的安装。
scala语言编写的spark streamming消费kafka数据存入hbase示例代码。打包成jar包可以在spark2.4下运行,测试环境是CDH6.2,运行没有问题。
kafka 配置 kerberos,设置 ACL权限, java 客户端连接。
kafka 安装手册(单机) 保证step by step 经验总结,生产环境和测试环境可用
《Kafka单机部署》可能用到的配置文件。需要的同学可以下载试一下。
通过定时任务生成消息,并定时消费kafka主题消息,大家可以试试
kafka单机版安装部署手册1
项目需要,自学初级kafka环境搭建,现将学习心得,解压包和样例代码上传供大家参考学习,有兴趣的可以私聊讨论。
使用带有Clojure的Kafka和Kafka流的示例示例。 我想通过Clojure而不是Clojure包装器来学习真正的Kafka API,因此这里是使用原始API的示例,它很干净,意味着您无需等待Clojure包装器库来升级Kafka。 有关示例的指南...
windows下kafka_2.12-2.9.0.rar(含单机伪分布式配置)
Kafka 是一种高吞吐量 的分布式发布订阅消息系统,本实例利用C#开发,同步数据,亲测可用!
整理的Kafka示例代码,包括集群上的生产者/消费者的Java示例代码,以及Scala编写的示例
Maven搭建Kafka Java开发环境需要的jar包,直接解压到maven本地仓库的com文件夹下
kafka单机版自动安装shell脚本、kafka_2.13-2.8.0.tgz、apache-zookeeper-3.6.3-bin.tar.gz