原创
Kafka Streams以及数据清洗小案例
温馨提示:
本文最后更新于 2019年03月21日,已超过 2,077 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我。
Kafka Streams 概述
Kafka Streams。Apache Kafka开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在Kafka上构建高可分布式、拓展性,容错的应用程序。
Kafka Streams 特点
1)功能强大
高扩展性,弹性,容错
2)轻量级
无需专门的集群
一个库,而不是框架
3)完全集成
100%的Kafka 0.10.0版本兼容
易于集成到现有的应用程序
4)实时性
毫秒级延迟
并非微批处理
窗口允许乱序数据
允许迟到数据
Kafka Streams 数据清洗案例
实时处理字符带有”>>>”前缀的内容。例如输入”111>>>222”,最终处理成“222”
package com.lzhpo.kafka.kafkaStreams;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
* <p> Author:lzhpo </p>
* <p> Title:</p>
* <p> Description:
* 具体业务处理
* </p>
*/
public class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(byte[] key, byte[] value) {
String input = new String(value);
// 如果包含“>>>”则只保留该标记后面的内容
if (input.contains(">>>")) {
input = input.split(">>>")[1].trim();
// 输出到下一个topic
context.forward("logProcessor".getBytes(), input.getBytes());
}else{
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}
package com.lzhpo.kafka.kafkaStreams;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import java.util.Properties;
/**
* <p> Author:lzhpo </p>
* <p> Title:【Kafka数据清洗案例】</p>
* <p> Description:
*
* 实时处理字符带有”>>>”前缀的内容。例如输入”111>>>222”,最终处理成“222”
*
* 生产者(定义输入的topic):
* kafka-console-producer.sh --broker-list localhost:9092 --topic first
*
* 消费者(定义输出的topic):
* kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic second
* </p>
*/
public class APP {
public static void main(String[] args) {
// 定义输入的topic
String from = "first";
// 定义输出的topic
String to = "second";
// 设置参数
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.111:9092");
StreamsConfig config = new StreamsConfig(settings);
// 构建拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", from)
.addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
@Override
public Processor<byte[], byte[]> get() {
// 具体分析处理
return new LogProcessor();
}
}, "SOURCE")
.addSink("SINK", to, "PROCESS");
// 创建kafka stream
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
}
让APP一直保持运行:
开启一个生产者(定义输入的topic):
kafka-console-producer.sh --broker-list localhost:9092 --topic first
发送消息
开启一个消费者(定义输出的topic):
kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic second
Consumer接收到的消息:
可以看到:
生产者发送111>>>222,消费者收到222;
生产者发送666>>>999,消费者收到999;
生产者发送222>>>666,消费者收到666;
生产者发送其它不相关的,消费者收到原型。
- 本文标签: Kafka
- 本文链接: http://www.lzhpo.com/article/14
- 版权声明: 本文由lzhpo原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权