博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka的使用
阅读量:5065 次
发布时间:2019-06-12

本文共 6075 字,大约阅读时间需要 20 分钟。

kafka基于zookeeper。

需要安装kafka、zookeeper。

安装方法参考:

启动zookeeper:点击zkServer.cmd启动zookeeper。

启动kafka:

如果启动报错:

启动kafka的时候报错:

ERROR Error while deleting the clean shutdown file in dir E:\kafka_2.11-1.0.0\tmp\kafka-logs (kafka.server.LogDirFailureChannel)

java.nio.file.FileSystemException: E:\kafka_2.11-1.0.0\tmp\kafka-logs\__consumer_offsets-9\00000000000000000000.timeindex: 另一个程序正在使用此文件,进程无法访问。

解决办法:

删除日志:

日志的路径在kafka的文件中找到server.properties:log.dirs=/tmp/kafka-logs。删除tmp文件夹下的kafka-logs文件夹。重启kafka即可。

kafka配置:

kafka-beans.xml:

my-replicated-topic
127.0.0.1:9092
all
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.StringSerializer
33554432
my-replicated-topic
127.0.0.1:9092
group1
true
1000
30000
org.apache.kafka.common.serialization.StringDeserializer
org.apache.kafka.common.serialization.StringDeserializer

 redis.properties:

# 控制一个pool可分配多少个jedis实例redis.pool.maxTotal=1000# 控制一个pool最多有多少个状态为idle(空闲)的jedis实例redis.pool.maxIdle=200# 表示当borrow一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionExceptionredis.pool.maxWaitMillis=2000#在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的redis.pool.testOnBorrow=true# redis 单机# 单机 hostjedis.host=127.0.0.1# 单机 portjedis.port=6379

KafkaProducerDemo.java:

package com.test.www.unionpay.producer;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaProducerDemo { Properties properties; public KafkaProducerDemo() { } public KafkaProducerDemo(Properties properties) { super(); this.properties = properties; } public Properties getProperties() { return properties; } public void setProperties(Properties properties) { this.properties = properties; } public void sendMessage(String msg) { KafkaProducer
producer = new KafkaProducer
(properties); ProducerRecord
record = new ProducerRecord
(properties.getProperty("topic"),msg); producer.send(record); producer.close(); } }

KafkaConsumerDemo.java:

package com.test.www.unionpay.consumer;import java.util.Arrays;import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class KafkaConsumerDemo { private Properties props; public KafkaConsumerDemo() { } public KafkaConsumerDemo(Properties props) { super(); this.props = props; } public Properties getProps() { return props; } public void setProps(Properties props) { this.props = props; } public String receive(){ KafkaConsumer
consumer = new KafkaConsumer
(props); consumer.subscribe(Arrays.asList(props.getProperty("topic"))); String msg = ""; while(true){ ConsumerRecords
consumerRecords = consumer.poll(100); for(ConsumerRecord
consumerRecord:consumerRecords){ msg += consumerRecord.value(); } consumer.close(); return msg; } } }

KafkaController.java:

package com.test.www.web.controller;import java.text.SimpleDateFormat;import java.util.Date; import javax.annotation.Resource; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.servlet.ModelAndView; import com.test.www.unionpay.consumer.KafkaConsumerDemo; import com.test.www.unionpay.producer.KafkaProducerDemo; @Controller public class KafkaController { @Resource(name = "kafkaProducerDemo") KafkaProducerDemo producer; @Resource(name = "kafkaConsumerDemo") KafkaConsumerDemo consumer; @RequestMapping(value = "/welcome") public ModelAndView welcome() { System.out.println("--------welcome--------"); ModelAndView mv = new ModelAndView(); mv.setViewName("welcome"); return mv; } @RequestMapping(value = "/sendmessage", method = RequestMethod.GET) public ModelAndView sendMessage() { System.out.println("--------sendmessage--------"); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String now = sdf.format(date); ModelAndView mv = new ModelAndView(); mv.addObject("time", now); mv.setViewName("kafka_send"); return mv; } @RequestMapping(value = "/onsend", method = RequestMethod.POST) public ModelAndView onsend(@RequestParam("message") String msg) { System.out.println("--------onsend--------"); producer.sendMessage(msg); ModelAndView mv = new ModelAndView(); mv.setViewName("welcome"); return mv; } @RequestMapping(value = "/receive") public ModelAndView receive() { System.out.println("--------receive--------"); String msg = consumer.receive(); ModelAndView mv = new ModelAndView(); mv.addObject("msg", msg); mv.setViewName("kafka_receive"); return mv; } }

页面:

welcome.jsp:

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>    
welcome

Welcome

Send a Message

Get a Message

kafka_send.jsp:

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>    
kafka_send

Send a Message

MessageText:

RETURN HOME

kafka_receive.jsp:

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>    
kafka_receive

Kafka_Reveive!!!

Receive Message : ${msg}

RETURN HOME

效果图:

如图,kafka发送消息、接受消息运行成功。

 

转载于:https://www.cnblogs.com/super-chao/p/9304565.html

你可能感兴趣的文章
mysqladmin
查看>>
解决 No Entity Framework provider found for the ADO.NET provider
查看>>
设置虚拟机虚拟机中fedora上网配置-bridge连接方式(图解)
查看>>
[置顶] Android仿人人客户端(v5.7.1)——人人授权访问界面
查看>>
ES6内置方法find 和 filter的区别在哪
查看>>
Android实现 ScrollView + ListView无滚动条滚动
查看>>
java学习笔记之String类
查看>>
UVA 11082 Matrix Decompressing 矩阵解压(最大流,经典)
查看>>
硬件笔记之Thinkpad T470P更换2K屏幕
查看>>
iOS开发——缩放图片
查看>>
HTTP之URL的快捷方式
查看>>
满世界都是图论
查看>>
配置链路聚合中极小错误——失之毫厘谬以千里
查看>>
蓝桥杯-分小组-java
查看>>
Android Toast
查看>>
iOS开发UI篇—Quartz2D使用(绘制基本图形)
查看>>
docker固定IP地址重启不变
查看>>
桌面图标修复||桌面图标不正常
查看>>
JavaScript基础(四)关于对象及JSON
查看>>
JAVA面试常见问题之Redis篇
查看>>