流处理和批处理讲解、主流框架对比、流批一体架构
什么是流处理和批处理?
流处理:对数据进行实时处理的方式,数据会以流的形式不断地产生和处理。
流处理可以快速响应数据的变化,及时地进行数据处理和分析,适用于需要实时处理数据的场景。
例如:实时数仓、实时监控、实时推荐等等。
- 优点:
- 实时性:数据在产生的时候就立即被处理,能及时反馈结果。
- 高效性:不间断接受新数据并进行处理,因此可以更加高效利用硬件资源。
- 缺点:
- 数据突发性:因为流式数据具有不可预测性,可能会突然出现突发的高峰,会导致系统压力急剧增加。
- 处理复杂度高:实时处理可能需要更高的处理能力和更复杂的算法。
- 优点:
批处理:对数据进行离线处理的方式,数据会按照一定的时间间隔或者数据量进行批量处理。
批处理可以对大量数据进行高效处理和分析,适用于需要对历史数据进行分析和挖掘的场景。
例如:离线数仓、批量报表、离线推荐等等。
- 优点:
- 处理复杂度低:通常不需要考虑数据的顺序、时间窗口等因素。
- 容错性高:数据多批次集中处理,通常一条数据的失败不会影响后续数据的处理,也可以采用多种容错机制来确保任务正确完成。
- 缺点:
- 响应速度慢:由于批处理是周期性执行,不能及时响应数据变化。
- 处理结果滞后:由于批处理是周期性执行,在某些场景下可能会出现数据结果滞后的情况。
- 优点:
流处理和批处理都是常用的数据处理方式,它们各有优劣。流处理通常用于需要实时响应的场景,如在线监控和警报系统等。而批处理则通常用于离线数据分析和挖掘等大规模数据处理场景。选择合适的处理方式取决于具体的业务需求和数据处理场景。
什么是流批一体架构?
以前很多系统的架构都是采用的Lambda架构,它将所有的数据分成了三个层次:批处理层、服务层和速率层,每个层次都有自己的功能和目的。
- 批处理层:负责离线计算和历史数据的存储。
- 服务层:负责在线查询和实时数据的处理。
- 速率层:负责对实时数据进行快速的处理和查询。
这种架构,需要一套流处理平台和一套批处理平台,这就可能导致了一些问题:
- 资源浪费:一般来说,白天是流计算的高峰期,此时需要更多的计算资源,相对来说,批计算就没有严格的限制,可以选择凌晨或者白天任意时刻,但是,流计算和批计算的资源无法进行混合调度,无法对资源进行错峰使用,这就会导致资源的浪费。
- 成本高:流计算和批计算使用的是不同的技术,意味着需要维护两套代码,不论是学习成本还是维护成本都会更高。
- 数据一致性:两套平台都是不一样的,可能会导致数据不一致的问题。
因此,流批一体诞生了!
流批一体的技术理念最早是2015年提出的,初衷就是让开发能用同一套代码和API实现流计算和批计算,但是那时候实际落地的就少之又少,阿里巴巴在2020年双十一首次实际落地。
Flink流批一体架构:
有哪些流处理的框架?
Kafka Stream
基于 Kafka 的一个轻量级流式计算框架,我们可以使用它从一个或多个输入流中读取数据,对数据进行转换和处理,然后将结果写入一个或多个输出流中。
工作原理:读取数据流 -> 数据转换/时间窗口处理/状态管理 -> 任务调度 -> 输出结果
简单示例:统计20秒内每个input的key输入的次数,典型的例子:统计网站20秒内用户的点击次数。
public class WindowCountApplication {
private static final String STREAM_INPUT_TOPIC = "streams-window-input";
private static final String STREAM_OUTPUT_TOPIC = "streams-window-output";
public static void main(String[] args) {
Properties props = new Properties();
props.put(APPLICATION_ID_CONFIG, WindowCountApplication.class.getSimpleName());
props.put(BOOTSTRAP_SERVERS_CONFIG, KafkaConstant.BOOTSTRAP_SERVERS);
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
builder.stream(STREAM_INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
.peek((key, value) -> Console.log("[input] key={}, value={}", key, value))
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(20)))
.count()
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), value))
.peek((key, value) -> Console.log("[output] key={}, value={}", key, value))
.to(STREAM_OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams kStreams = new KafkaStreams(builder.build(), props);
Runtime.getRuntime().addShutdownHook(new Thread(kStreams::close));
kStreams.start();
}
}
运行结果:{key}={value}
,发送了3次A=1,2次B=1,以及1次C=1,统计结果在预期之内,即A出现3次,B出现2次,C出现1次。
Pulsar Function
和 Kafka Stream 类似,也是轻量级的流处理框架,不过它是基于 Pulsar 实现的一个流处理框架,同样的,也是从一个或多个输入流中读取数据,对数据进行转换和处理,然后将结果写入一个或多个输出流中。感兴趣的可以参考我之前写的文章:Pulsar Function简介以及使用
工作原理:订阅消息流 -> 处理消息 -> 发布处理结果
简单示例:LocalRunner模式,按照逗号“,”去切分 input topic 的消息,然后转换成数字进行求和,结果发送至 output topic。
public class IntSumFunction implements Function<String, Integer> {
public static final String BROKER_SERVICE_URL = "pulsar://localhost:6650";
public static final String INPUT_TOPIC = "persistent://public/default/int-sum-input";
public static final String OUTPUT_TOPIC = "persistent://public/default/int-sum-output";
public static final String LOG_TOPIC = "persistent://public/default/int-sum-log";
@Override
public Integer process(String input, Context context) {
Console.log("input: {}", input);
return Arrays.stream(input.split(","))
.map(Integer::parseInt)
.mapToInt(Integer::intValue)
.sum();
}
public static void main(String[] args) throws Exception {
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName(IntSumFunction.class.getSimpleName());
functionConfig.setClassName(IntSumFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setInputs(Collections.singleton(INPUT_TOPIC));
functionConfig.setOutput(OUTPUT_TOPIC);
functionConfig.setLogTopic(LOG_TOPIC);
LocalRunner localRunner = LocalRunner.builder()
.brokerServiceUrl(BROKER_SERVICE_URL)
.functionConfig(functionConfig)
.build();
localRunner.start(true);
}
}
运行结果:1+2+3+4+5+6=21
Flink
- 一种流处理框架,具有低延迟、高吞吐量和高可靠性的特性。
- 支持流处理和批处理,并支持基于事件时间和处理时间的窗口操作、状态管理、容错机制等。
- 提供了丰富的算子库和 API,支持复杂的数据流处理操作。
工作原理:接收数据流 -> 数据转换 -> 数据处理 -> 状态管理 -> 容错处理 -> 输出结果
简单来说就是将数据流分成多个分区,在多个任务中并行处理,同时维护状态信息,实现高吞吐量、低延迟的流处理。
简单示例:从9966端口读取数据,将输入的句子用空格分割成多个单词,每隔5秒做一次单词统计。
public class WindowSocketWordCount {
private static final String REGEX = " ";
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketTextStreamSource = env.socketTextStream("localhost", 9966);
SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = socketTextStreamSource
.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (sentence, collector) -> {
for (String word : sentence.split(REGEX)) {
collector.collect(new Tuple2<>(word, 1));
}
})
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
streamOperator.print();
env.execute();
}
}
运行结果:
Storm
- 一个开源的流处理引擎,旨在实现快速、可靠的数据流处理。
- 是业界最早出现的一个流处理框架(2011年),但是现在已经有许多其它优秀的流处理框架了,所以它在现在并不是唯一选择。
工作原理:将数据流分成多个小的流(也称为tuple),并将这些小流通过一系列的操作(也称为bolt)进行处理。
简单示例:在本地模式,使用Storm内置的RandomSentenceSpout
充当数据源进行测试,用空格拆分生成的句子为多个单词,统计每个单词出现次数。
public class WindowedWordCountApplication {
public static void main(String[] args) throws Exception {
StreamBuilder builder = new StreamBuilder();
builder.newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
.window(TumblingWindows.of(Duration.seconds(2)))
.flatMap(sentence -> Arrays.asList(sentence.split(" ")))
.peek(sentence -> Console.log("Random sentence: {}", sentence))
.mapToPair(word -> Pair.of(word, 1))
.countByKey()
.peek(pair -> Console.log("Count word: ", pair.toString()));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("windowedWordCount", new Config(), builder.build());
Utils.sleep(20000);
cluster.shutdown();
}
}
内置的RandomSentenceSpout
随机生成数据关键源代码:
@Override
public void nextTuple() {
Utils.sleep(100);
String[] sentences = new String[]{
sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")
};
final String sentence = sentences[rand.nextInt(sentences.length)];
LOG.debug("Emitting tuple: {}", sentence);
collector.emit(new Values(sentence));
}
运行结果:随机找一个单词“nature”,统计的次数为10次。
Spark Streaming
基于 Spark API 的扩展,支持对实时数据流进行可扩展、高吞吐量、容错的流处理。
工作原理:接收实时输入数据流并将数据分成批次,然后由 Spark 引擎处理以批次生成最终结果流。
简单示例:从 kafka 的 spark-streaming topic 读取数据,按照空格“ ”拆分,统计每一个单词出现的次数并打印。
public class JavaDirectKafkaWordCount {
private static final String KAFKA_BROKERS = "localhost:9092";
private static final String KAFKA_GROUP_ID = "spark-consumer-group";
private static final String KAFKA_TOPICS = "spark-streaming";
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
Configurator.setRootLevel(Level.WARN);
SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("spark-streaming-word-count");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Set<String> topicsSet = new HashSet<>(Arrays.asList(KAFKA_TOPICS.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_GROUP_ID);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
JavaDStream<String> linesStream = messages.map(ConsumerRecord::value);
JavaPairDStream<String, Integer> wordCountStream = linesStream
.flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey(Integer::sum);
wordCountStream.print();
streamingContext.start();
streamingContext.awaitTermination();
}
}
运行结果:
如何选择流处理框架?
简单数据流处理
如果只是轻量级使用的话,可以结合技术栈使用消息中间件自带的流处理框架就更节省成本。
使用的 Kafka 就用 Kafka Stream。
使用的 Pulsar 就用 Pulsar Function。
复杂数据流场景
Flink Spark Streaming Storm 容错性 基于CheckPoint机制 WAL及RDD机制 Records ACK 延迟性 亚秒级 秒级 亚秒级 吞吐量 非常高 高 中等 一致性 Excatly-Once Excatly-Once Excatly-Once 状态支持 √ √ × 流批一体 √ √ × 窗口支持 √ √ √ 机器学习 √ √ × SQL查询 √ √ × 图计算 √ √ × 社区活跃度 高 高 中等
综上,可以结合数据规模、技术栈、处理延迟功能特性、未来的考虑、社区活跃度、成本和可用性等等进行选择。
参考文章:
- 本文标签: Flink Spark Pulsar
- 本文链接: http://www.lzhpo.com/article/1649890657303629826
- 版权声明: 本文由lzhpo原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权