Pulsar正则匹配订阅topic
Pulsar在很多情况下提供了比Kafka 更快的吞吐量和更低的延迟,并为开发人员提供了一组兼容的API,让他们可以很轻松地从Kafka 切换到Pulsar。 Pulsar 的最大优点在于它提供了比Apache Kafka 更简单明了、更健壮的一系列操作功能,特别在解决可观察性、地域复制和多租户方面的问题。
Pulsar 是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。 Pulsar 最初由 Yahoo 开发,目前由 Apache 软件基金会管理。
Pulsar 的关键特性如下:
- Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
- 极低的发布延迟和端到端延迟。
- 可无缝扩展到超过一百万个 topic。
- 简单的客户端 API,支持 Java、Go、Python 和 C++。
- 支持多种 topic 订阅模式(独占订阅、共享订阅、故障转移订阅)。
- 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
- 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
- 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3、GCS)中。
消息模型
Pulsar 提供了统一的消息模型和 API。streaming 模式——独占和故障切换订阅方式; queuing 模式——共享订阅的方式。
Kafka 主要集中在 streaming 模式,对单个 partition 是独占消费,没有共享(Queue)的消费模式;架构
Pulsar Broker是无状态的,与存储相互分离 可以轻松添加和删除节点,而无需重新平衡整个集群
Kafka的数据直接存储在Broker上(有状态的) 任何容量扩展都需要重新平衡分区,同时还需要将被平衡的分区重新拷贝到新添加的Broker上Ack
Pulsar使用专门的 Cursor 管理。累积确认(cumulative acknowledgment)和 Kafka 效果一样;提供单条确认(individual ack)。
Kafka使用偏移 OffsetRetention
Pulsar 消息只有被所有订阅消费后才会删除,不会丢失数据。也允许设置保留期,保留被消费的数据。支持 TTL
Kafka根据设置的保留期来删除消息。有可能消息没被消费,过期后被删除。 不支持 TTL。
参考链接:
https://pulsar.apache.org/docs/zh-CN/concepts-overview/
http://lxkaka.wang/2019/03/25/pulsar/
Pulsar正则匹配订阅topic
1.Maven依赖
<properties>
<junit.version>4.12</junit.version>
<pulsar.version>2.5.2</pulsar.version>
<slf4j-log4j12.version>1.7.30</slf4j-log4j12.version>
<log4j.version>1.2.17</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-log4j12.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>
2.创建一个client
package com.lzhpo.listentopic;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
/**
* Pulsar客户端
*
* @author lzhpo
*/
public class Client {
private PulsarClient client;
public Client() throws PulsarClientException {
client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
// .authentication(AuthenticationFactory.token("123"))
.build();
}
public void Close() throws PulsarClientException {
client.close();
}
public PulsarClient getPulsarClient(){
return client;
}
}
3.创建一个消费者进行测试
package com.lzhpo.listentopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
* Pulsar消费者
*
* @author lzhpo
*/
public class PulsarConsumerApp {
private Client client;
private Consumer<byte[]> consumer;
/**
* 构造器初始化Client
*
* @throws PulsarClientException
*/
public PulsarConsumerApp() throws PulsarClientException {
client = new Client();
}
/**
* 创建消费者
*
* @param topic
* @param subscription
* @return
* @throws PulsarClientException
*/
private void createConsumerByTopic(String topic, String subscription) throws PulsarClientException {
consumer = client.getPulsarClient().newConsumer()
.topic(topic)
.subscriptionName(subscription)
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
}
/**
* 正则匹配订阅的topic
*
* @param topicsPattern
* @param subscription
* @throws PulsarClientException
*/
private void createConsumerByTopicsPattern(String topicsPattern, String subscription) throws PulsarClientException {
consumer = client.getPulsarClient().newConsumer()
.topicsPattern(topicsPattern)
.subscriptionName(subscription)
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
}
/**
* 1.持续接收消息
*
* @throws ExecutionException
* @throws InterruptedException
* @throws PulsarClientException
*/
public void receiveMessageAlways() throws ExecutionException, InterruptedException, PulsarClientException {
// 用来异步获取,保持回话
do {
// 异步接收消息
CompletableFuture<Message<byte[]>> msg = consumer.receiveAsync();
System.out.println("Message received from ["+msg.get().getTopicName()+"] topic: " +new String(msg.get().getData()));
// 确认消息,以便消息代理可以将其删除
consumer.acknowledge(msg.get());
} while (true);
}
/**
* 2.获取一次,就关闭会话
*
* @return
* @throws ExecutionException
* @throws InterruptedException
* @throws PulsarClientException
*/
public void receiveMessageOnce() throws ExecutionException, InterruptedException, PulsarClientException {
// 异步接收
CompletableFuture<Message<byte[]>> msg = consumer.receiveAsync();
System.out.println("Message received from ["+msg.get().getTopicName()+"] topic: " +new String(msg.get().getData()));
// 确认消息,以便消息代理可以将其删除
consumer.acknowledge(msg.get());
consumer.close();
client.Close();
}
/**
* @param args
* @throws PulsarClientException
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
// 持续接收消息
PulsarConsumerApp consumer = new PulsarConsumerApp();
consumer.createConsumerByTopicsPattern("persistent://public/default/.*", "test2-sub");
consumer.receiveMessageAlways();
// 接收一次消息就关闭
// PulsarConsumerApp consumer = new PulsarConsumerApp();
// consumer.createConsumerByTopic(
// "persistent://public/default/gluon-schema-AvAccountStatusRequest",
// "test2-sub");
// consumer.receiveMessageOnce();
}
}
4.测试结果
当你往pulsar的topic发消息的时候,这边就会显示对应的topic和数据。
Pulsar生产者
package com.lzhpo.listentopic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.concurrent.TimeUnit;
/**
* Pulsar生产者
*
* @author lzhpo
*/
public class PulsarProducerApp {
private Client client;
private Producer<byte[]> producer;
/**
* 构造器初始化生产者
*
* @param topic 主题
* @throws PulsarClientException
*/
public PulsarProducerApp(String topic) throws PulsarClientException {
client = new Client();
producer = createProducer(topic);
}
/**
* 构建一个生产者
*
* @param topic 主题
* @return
* @throws PulsarClientException
*/
private Producer<byte[]> createProducer(String topic) throws PulsarClientException {
return client.getPulsarClient().newProducer()
.topic(topic)
// 批处理最大发布延迟
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
// 延迟发送
.sendTimeout(10, TimeUnit.SECONDS)
// 如果队列已满则阻止
.blockIfQueueFull(true)
.create();
}
/**
* 1.异步发送消息
*
* @param message
*/
public void sendMessage(String message) {
producer.sendAsync(message.getBytes()).thenAccept(msgId -> {
System.out.printf("Message with ID %s successfully sent", msgId);
});
}
/**
* 2.发送一次就关闭
*
* @param message
*/
public void sendOnce(String message) {
try {
producer.send(message.getBytes());
System.out.printf("Message with content %s successfully sent", message);
producer.close();
client.Close();
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
/**
* 手动异步关闭
*
* @param producer
*/
public void close(Producer<byte[]> producer){
producer.closeAsync()
.thenRun(() -> System.out.println("Producer closed"));
}
public static void main(String[] args) throws PulsarClientException {
PulsarProducerApp producer = new PulsarProducerApp("persistent://public/default/test01-topic");
// producer.sendMessage("Hello,I'm lewis.");
producer.sendOnce("Hello,lzhpo");
}
}
- 本文标签: Pulsar Java
- 本文链接: http://www.lzhpo.com/article/138
- 版权声明: 本文由lzhpo原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权