原创

Pulsar正则匹配订阅topic

温馨提示:
本文最后更新于 2020年07月29日,已超过 1,638 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

Pulsar在很多情况下提供了比Kafka 更快的吞吐量和更低的延迟,并为开发人员提供了一组兼容的API,让他们可以很轻松地从Kafka 切换到Pulsar。 Pulsar 的最大优点在于它提供了比Apache Kafka 更简单明了、更健壮的一系列操作功能,特别在解决可观察性、地域复制和多租户方面的问题。

Pulsar 是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。 Pulsar 最初由 Yahoo 开发,目前由 Apache 软件基金会管理。
Pulsar 的关键特性如下:

  1. Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
  2. 极低的发布延迟和端到端延迟。
  3. 可无缝扩展到超过一百万个 topic。
  4. 简单的客户端 API,支持 Java、Go、Python 和 C++。
  5. 支持多种 topic 订阅模式(独占订阅、共享订阅、故障转移订阅)。
  6. 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
    • 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
      基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。

分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3、GCS)中。

  1. 消息模型
    Pulsar 提供了统一的消息模型和 API。streaming 模式——独占和故障切换订阅方式; queuing 模式——共享订阅的方式。
    Kafka 主要集中在 streaming 模式,对单个 partition 是独占消费,没有共享(Queue)的消费模式;

  2. 架构
    Pulsar Broker是无状态的,与存储相互分离 可以轻松添加和删除节点,而无需重新平衡整个集群
    Kafka的数据直接存储在Broker上(有状态的) 任何容量扩展都需要重新平衡分区,同时还需要将被平衡的分区重新拷贝到新添加的Broker上

  3. Ack
    Pulsar使用专门的 Cursor 管理。累积确认(cumulative acknowledgment)和 Kafka 效果一样;提供单条确认(individual ack)。
    Kafka使用偏移 Offset

  4. Retention
    Pulsar 消息只有被所有订阅消费后才会删除,不会丢失数据。也允许设置保留期,保留被消费的数据。支持 TTL
    Kafka根据设置的保留期来删除消息。有可能消息没被消费,过期后被删除。 不支持 TTL。

参考链接:
https://pulsar.apache.org/docs/zh-CN/concepts-overview/
http://lxkaka.wang/2019/03/25/pulsar/

Pulsar正则匹配订阅topic

1.Maven依赖

    <properties>
        <junit.version>4.12</junit.version>
        <pulsar.version>2.5.2</pulsar.version>
        <slf4j-log4j12.version>1.7.30</slf4j-log4j12.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>${pulsar.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j-log4j12.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

2.创建一个client

package com.lzhpo.listentopic;

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

/**
 * Pulsar客户端
 * 
 * @author lzhpo
 */
public class Client {

    private PulsarClient client;

    public Client() throws PulsarClientException {
        client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
//                .authentication(AuthenticationFactory.token("123"))
                .build();
    }
    public void Close() throws PulsarClientException {
        client.close();
    }
    public PulsarClient getPulsarClient(){
        return client;
    }
}

3.创建一个消费者进行测试

package com.lzhpo.listentopic;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

/**
 * Pulsar消费者
 *
 * @author lzhpo
 */
public class PulsarConsumerApp {

    private Client client;
    private Consumer<byte[]> consumer;

    /**
     * 构造器初始化Client
     *
     * @throws PulsarClientException
     */
    public PulsarConsumerApp() throws PulsarClientException {
        client = new Client();
    }

    /**
     * 创建消费者
     *
     * @param topic
     * @param subscription
     * @return
     * @throws PulsarClientException
     */
    private void createConsumerByTopic(String topic, String subscription) throws PulsarClientException {
        consumer = client.getPulsarClient().newConsumer()
                .topic(topic)
                .subscriptionName(subscription)
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
    }

    /**
     * 正则匹配订阅的topic
     *
     * @param topicsPattern
     * @param subscription
     * @throws PulsarClientException
     */
    private void createConsumerByTopicsPattern(String topicsPattern, String subscription) throws PulsarClientException {
        consumer = client.getPulsarClient().newConsumer()
                .topicsPattern(topicsPattern)
                .subscriptionName(subscription)
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
    }

    /**
     * 1.持续接收消息
     *
     * @throws ExecutionException
     * @throws InterruptedException
     * @throws PulsarClientException
     */
    public void receiveMessageAlways() throws ExecutionException, InterruptedException, PulsarClientException {
        // 用来异步获取,保持回话
        do {
            // 异步接收消息
            CompletableFuture<Message<byte[]>> msg = consumer.receiveAsync();

            System.out.println("Message received from ["+msg.get().getTopicName()+"] topic: " +new String(msg.get().getData()));

            // 确认消息,以便消息代理可以将其删除
            consumer.acknowledge(msg.get());
        } while (true);
    }

    /**
     * 2.获取一次,就关闭会话
     *
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     * @throws PulsarClientException
     */
    public void receiveMessageOnce() throws ExecutionException, InterruptedException, PulsarClientException {
        // 异步接收
        CompletableFuture<Message<byte[]>> msg = consumer.receiveAsync();
        System.out.println("Message received from ["+msg.get().getTopicName()+"] topic: " +new String(msg.get().getData()));

        // 确认消息,以便消息代理可以将其删除
        consumer.acknowledge(msg.get());
        consumer.close();
        client.Close();
    }

    /**
     * @param args
     * @throws PulsarClientException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
        // 持续接收消息
        PulsarConsumerApp consumer = new PulsarConsumerApp();
        consumer.createConsumerByTopicsPattern("persistent://public/default/.*", "test2-sub");
        consumer.receiveMessageAlways();

        // 接收一次消息就关闭
//        PulsarConsumerApp consumer = new PulsarConsumerApp();
//        consumer.createConsumerByTopic(
//                "persistent://public/default/gluon-schema-AvAccountStatusRequest",
//                "test2-sub");
//        consumer.receiveMessageOnce();
    }
}

4.测试结果

当你往pulsar的topic发消息的时候,这边就会显示对应的topic和数据。

pulsar订阅多个topic.png

Pulsar生产者

package com.lzhpo.listentopic;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;

import java.util.concurrent.TimeUnit;

/**
 * Pulsar生产者
 * 
 * @author lzhpo
 */
public class PulsarProducerApp {

    private Client client;
    private Producer<byte[]> producer;

    /**
     * 构造器初始化生产者
     *
     * @param topic 主题
     * @throws PulsarClientException
     */
    public PulsarProducerApp(String topic) throws PulsarClientException {
        client = new Client();
        producer = createProducer(topic);
    }

    /**
     * 构建一个生产者
     *
     * @param topic 主题
     * @return
     * @throws PulsarClientException
     */
    private Producer<byte[]> createProducer(String topic) throws PulsarClientException {
        return client.getPulsarClient().newProducer()
                .topic(topic)
                // 批处理最大发布延迟
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                // 延迟发送
                .sendTimeout(10, TimeUnit.SECONDS)
                // 如果队列已满则阻止
                .blockIfQueueFull(true)
                .create();
    }

    /**
     * 1.异步发送消息
     *
     * @param message
     */
    public void sendMessage(String message) {
        producer.sendAsync(message.getBytes()).thenAccept(msgId -> {
            System.out.printf("Message with ID %s successfully sent", msgId);
        });

    }

    /**
     * 2.发送一次就关闭
     *
     * @param message
     */
    public void sendOnce(String message) {
        try {
            producer.send(message.getBytes());
            System.out.printf("Message with content %s successfully sent", message);
            producer.close();
            client.Close();
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 手动异步关闭
     *
     * @param producer
     */
    public void close(Producer<byte[]> producer){
        producer.closeAsync()
                .thenRun(() -> System.out.println("Producer closed"));
    }

    public static void main(String[] args) throws PulsarClientException {
        PulsarProducerApp producer = new PulsarProducerApp("persistent://public/default/test01-topic");
//        producer.sendMessage("Hello,I'm lewis.");
        producer.sendOnce("Hello,lzhpo");
    }
}
本文目录