Using Kafka
Published: 2019-06-12


Kafka depends on ZooKeeper.

Both Kafka and ZooKeeper need to be installed.

For installation instructions, see:

Start ZooKeeper: double-click zkServer.cmd.

Start Kafka:
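Once both processes have been started, a quick way to confirm they are accepting connections is to try opening a TCP socket to each port. The following is a minimal sketch, not part of the original project: the class name PortCheck is hypothetical, 9092 is the broker port used in the configuration later in this post, and 2181 is assumed to be the ZooKeeper client port (its default).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether something is listening on host:port.
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // 2181 is assumed (ZooKeeper default); 9092 matches the broker address used below.
        System.out.println("ZooKeeper (2181): " + isListening("127.0.0.1", 2181, 1000));
        System.out.println("Kafka (9092): " + isListening("127.0.0.1", 9092, 1000));
    }
}
```

A successful connection only shows that the port is open. It does not prove that the broker has finished starting up.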

If Kafka reports the following error on startup:

ERROR Error while deleting the clean shutdown file in dir E:\kafka_2.11-1.0.0\tmp\kafka-logs (kafka.server.LogDirFailureChannel)

java.nio.file.FileSystemException: E:\kafka_2.11-1.0.0\tmp\kafka-logs\__consumer_offsets-9\00000000000000000000.timeindex: The process cannot access the file because it is being used by another process.

Fix: delete the logs.

The log directory is set in Kafka's server.properties file: log.dirs=/tmp/kafka-logs. Delete the kafka-logs folder under the tmp folder, then restart Kafka.
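The cleanup can also be scripted. The sketch below is only an illustration: the class name KafkaLogCleaner is hypothetical, and the directory must be passed in explicitly. It assumes Kafka has already been stopped, since the error above comes from a file that is still locked. Note that deleting the log directory discards every message and offset stored locally, which is usually acceptable only on a development machine.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: removes a Kafka log directory and everything below it.
class KafkaLogCleaner {

    /** Recursively deletes dir; does nothing if dir does not exist. */
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> ordered;
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order puts children before their parent directories.
            ordered = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : ordered) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: KafkaLogCleaner <path-to-kafka-logs>");
            return;
        }
        deleteRecursively(Paths.get(args[0]));
    }
}
```

With the paths from the error above, the argument would be E:\kafka_2.11-1.0.0\tmp\kafka-logs.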

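Kafka configuration:

kafka-beans.xml (only the property values are shown; the property names are inferred from the standard Kafka client settings, and "topic" is the custom key read by the demo classes below):

Producer properties:
topic = my-replicated-topic
bootstrap.servers = 127.0.0.1:9092
acks = all
key.serializer = org.apache.kafka.common.serialization.StringSerializer
value.serializer = org.apache.kafka.common.serialization.StringSerializer
buffer.memory = 33554432

Consumer properties:
topic = my-replicated-topic
bootstrap.servers = 127.0.0.1:9092
group.id = group1
enable.auto.commit = true
auto.commit.interval.ms = 1000
session.timeout.ms = 30000
key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = org.apache.kafka.common.serialization.StringDeserializer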
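For readers who prefer to build the same settings in code rather than in Spring XML, here is a rough sketch using plain java.util.Properties. The values are the ones listed for kafka-beans.xml above. The property names are assumptions: "topic" is the key the demo classes read with getProperty("topic"), and the remaining names are the standard Kafka client settings that these values most plausibly belong to. The class name KafkaDemoConfig is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: builds the producer and consumer settings in code.
class KafkaDemoConfig {

    static Properties producerProperties() {
        Properties p = new Properties();
        // "topic" is read by the demo code itself; it is not a Kafka client setting.
        p.setProperty("topic", "my-replicated-topic");
        // Assumed key names (standard Kafka producer settings).
        p.setProperty("bootstrap.servers", "127.0.0.1:9092");
        p.setProperty("acks", "all");
        p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("buffer.memory", "33554432");
        return p;
    }

    static Properties consumerProperties() {
        Properties p = new Properties();
        p.setProperty("topic", "my-replicated-topic");
        // Assumed key names (standard Kafka consumer settings).
        p.setProperty("bootstrap.servers", "127.0.0.1:9092");
        p.setProperty("group.id", "group1");
        p.setProperty("enable.auto.commit", "true");
        p.setProperty("auto.commit.interval.ms", "1000");
        p.setProperty("session.timeout.ms", "30000");
        p.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    public static void main(String[] args) {
        producerProperties().list(System.out);
        consumerProperties().list(System.out);
    }
}
```

Either Properties object could then be passed to the matching demo class constructor, for example new KafkaProducerDemo(KafkaDemoConfig.producerProperties()).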

redis.properties:

# Maximum number of Jedis instances the pool can allocate
redis.pool.maxTotal=1000
# Maximum number of idle Jedis instances the pool keeps
redis.pool.maxIdle=200
# Maximum time to wait when borrowing a Jedis instance; once exceeded, a JedisConnectionException is thrown
redis.pool.maxWaitMillis=2000
# Whether to validate a Jedis instance before it is borrowed; if true, every instance handed out is usable
redis.pool.testOnBorrow=true
# Redis standalone
# Standalone host
jedis.host=127.0.0.1
# Standalone port
jedis.port=6379

KafkaProducerDemo.java:

package com.test.www.unionpay.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerDemo {

    // Producer settings; also carries the custom "topic" entry.
    Properties properties;

    public KafkaProducerDemo() {
    }

    public KafkaProducerDemo(Properties properties) {
        super();
        this.properties = properties;
    }

    public Properties getProperties() {
        return properties;
    }

    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    public void sendMessage(String msg) {
        // A new producer is created for every message and closed afterwards.
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(properties.getProperty("topic"), msg);
        producer.send(record);
        // close() waits for the pending send to complete before returning.
        producer.close();
    }
}

KafkaConsumerDemo.java:

package com.test.www.unionpay.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerDemo {

    // Consumer settings; also carries the custom "topic" entry.
    private Properties props;

    public KafkaConsumerDemo() {
    }

    public KafkaConsumerDemo(Properties props) {
        super();
        this.props = props;
    }

    public Properties getProps() {
        return props;
    }

    public void setProps(Properties props) {
        this.props = props;
    }

    public String receive() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(props.getProperty("topic")));
        String msg = "";
        while (true) {
            // Wait up to 100 ms for records.
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                msg += consumerRecord.value();
            }
            // The method returns after the first poll, so the loop body runs only once.
            consumer.close();
            return msg;
        }
    }
}
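Because receive() returns after a single 100 ms poll, it may come back empty: a freshly subscribed consumer can need longer than that to join its group and be assigned partitions. One possible variation is to keep polling until something arrives or a deadline passes. The sketch below shows only that retry pattern, with no Kafka dependency. The class PollUntil and its poller argument are hypothetical; poller stands in for a call that polls the consumer once and returns the record values.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper illustrating "poll until non-empty or timed out".
class PollUntil {

    /**
     * Calls poller until it returns a non-empty batch or timeoutMs has elapsed.
     * Returns the batch joined into one string, or "" if nothing arrived in time.
     * The poller is always called at least once.
     */
    static String firstNonEmpty(Supplier<List<String>> poller, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        do {
            List<String> batch = poller.get();
            if (!batch.isEmpty()) {
                return String.join("", batch);
            }
        } while (System.nanoTime() - deadline < 0);
        return "";
    }

    public static void main(String[] args) {
        // Stand-in poller: empty on the first two calls, then two values.
        int[] calls = {0};
        Supplier<List<String>> poller = () ->
                ++calls[0] < 3 ? Collections.<String>emptyList() : Arrays.asList("hello", "kafka");
        System.out.println(firstNonEmpty(poller, 5000));
    }
}
```

In the real consumer, the poller would wrap consumer.poll(100) and collect each record's value(). Since poll blocks for up to its timeout, the loop would not spin.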

KafkaController.java:

package com.test.www.web.controller;

import java.text.SimpleDateFormat;
import java.util.Date;

import javax.annotation.Resource;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.ModelAndView;

import com.test.www.unionpay.consumer.KafkaConsumerDemo;
import com.test.www.unionpay.producer.KafkaProducerDemo;

@Controller
public class KafkaController {

    @Resource(name = "kafkaProducerDemo")
    KafkaProducerDemo producer;

    @Resource(name = "kafkaConsumerDemo")
    KafkaConsumerDemo consumer;

    // Home page with links to the send and receive pages.
    @RequestMapping(value = "/welcome")
    public ModelAndView welcome() {
        System.out.println("--------welcome--------");
        ModelAndView mv = new ModelAndView();
        mv.setViewName("welcome");
        return mv;
    }

    // Shows the form for entering a message.
    @RequestMapping(value = "/sendmessage", method = RequestMethod.GET)
    public ModelAndView sendMessage() {
        System.out.println("--------sendmessage--------");
        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String now = sdf.format(date);
        ModelAndView mv = new ModelAndView();
        mv.addObject("time", now);
        mv.setViewName("kafka_send");
        return mv;
    }

    // Handles the submitted form and sends the message to Kafka.
    @RequestMapping(value = "/onsend", method = RequestMethod.POST)
    public ModelAndView onsend(@RequestParam("message") String msg) {
        System.out.println("--------onsend--------");
        producer.sendMessage(msg);
        ModelAndView mv = new ModelAndView();
        mv.setViewName("welcome");
        return mv;
    }

    // Reads messages from Kafka and shows them.
    @RequestMapping(value = "/receive")
    public ModelAndView receive() {
        System.out.println("--------receive--------");
        String msg = consumer.receive();
        ModelAndView mv = new ModelAndView();
        mv.addObject("msg", msg);
        mv.setViewName("kafka_receive");
        return mv;
    }
}

Pages:

welcome.jsp:

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>    
welcome

Welcome

Send a Message

Get a Message

kafka_send.jsp:

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>    
kafka_send

Send a Message

MessageText:

RETURN HOME

kafka_receive.jsp:

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>    
kafka_receive

Kafka_Receive!!!

Receive Message : ${msg}

RETURN HOME

Result:

As shown, sending and receiving messages through Kafka works.

Reposted from: https://www.cnblogs.com/super-chao/p/9304565.html
