原创
SpringBoot集成Kafka
温馨提示:
本文最后更新于 2019年03月21日,已超过 2,074 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我。
pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!--转json-->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
配置文件
application.properties:
spring.kafka.bootstrap-servers=192.168.200.111:9092
spring.kafka.consumer.group-id=lzhpo
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
消息实体类
package com.lzhpo.kafka.springbootkafkademo;
import java.util.Date;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:</p>
*/
public class Message {
private Long id;
private String msg;
private Date sendTime;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Date getSendTime() {
return sendTime;
}
public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", msg='" + msg + '\'' +
", sendTime=" + sendTime +
'}';
}
}
生产者
package com.lzhpo.kafka.springbootkafkademo;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.UUID;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:</p>
*/
@Component
public class KafkaSender {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
public void send(){
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg(UUID.randomUUID().toString());
message.setSendTime(new Date());
kafkaTemplate.send("first", gson.toJson(message));
}
}
消费者
package com.lzhpo.kafka.springbootkafkademo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:</p>
*/
@Component
public class KafkaReceiver {
private static final Logger log = LoggerFactory.getLogger(KafkaReceiver.class);
@KafkaListener(topics = {"first"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("record =" + record);
log.info("message =" + message);
}
}
}
启动类
package com.lzhpo.kafka.springbootkafkademo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class SpringbootKafkaDemoApplication {
public static void main(String[] args) {
//SpringApplication.run(SpringbootKafkaDemoApplication.class, args);
ConfigurableApplicationContext context = SpringApplication.run(SpringbootKafkaDemoApplication.class, args);
KafkaSender sender = context.getBean(KafkaSender.class);
for (int i = 0; i < 6; i++) {
sender.send();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果
- 本文标签: Kafka SpringBoot
- 本文链接: http://www.lzhpo.com/article/15
- 版权声明: 本文由lzhpo原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权