原创
Pulsar Function简介以及使用
温馨提示:
本文最后更新于 2020年09月13日,已超过 1,592 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我。
概览
- input topic: 数据来源
- output topic:数据存入的topic
- log topic:日志消息
Pulsar Functions 是轻量级计算流程,具有以下特点:
- 从一个或多个 Pulsar topic 中消费消息;
- 将用户提供的处理逻辑应用于每条消息;
- 将运行结果发布到另一个 topic。
Pulsar Functions背后的核心目标是使您能够轻松创建各种级别的复杂的的处理逻辑,而无需部署单独的类似系统(例如 Apache Storm, Apache Heron, Apache Flink, 等等)
Pulsar支持的三种语言环境
在pulsar-common模块下面
的org.apache.pulsar.common.functions.FunctionConfig
类:
pulsar function的三种处理语义
At least once
是指消息至少发送一次。如果消息未能接受成功,可能会重发,直到接收成功。在整个 Pulsar Functions 处理消息的过程中,如果失败,都需要对该 message 执行 nack(重发) 操作,来保证At least once
语义的正确性。At most once
是指消息最多会被处理一次。从 input topics 中接收到之后,在真正处理消息之前去执行, at-most-once 模式下,不管 function 是否执行成功,这个 message 都会被确认(ack),而且只发送一次,无论是否发送成功,都不会重发。Effectively once
是指消息会被有效执行一次。上述两种语意都没办法保证系统 crash 之后数据的一致性问题,Effectively once
可以保证只会对结果产生一次影响。Effectively once
本身更像是一个事务的处理过程,首先我们在 setup 生产者的时候需要保证生产者的幂等性;其次在处理消息的过程中,如果出现错误,我们需要让整个 function 停止操作,这点不同于At least once
。
Pulsar Function三种订阅模式
为了同时兼容 queue 和 stream 的消费方式,Pulsar 在消费者之间抽象了一层订阅层,在 Pulasr 中,订阅的方式主要分为如下三种:
- exclusive
- failover
- share
但是 Pulsar Functions 中并没有支持 exclusive
的订阅方式。这是为什么呢?
在大部分 functions 的特定场景下,exclusive
的订阅类型没多大用,我们分为两种情况来讨论:
1.如果只有一个 instance,那么 failover
就相当于独占的类型。
2.如果有多个 instance,exclusive
类型的订阅会不断的 crash、 restart,而 failover
的订阅是通过 failover 的方式来进行切换,保证有一个 active 的 worker。(这个是本质原因)
如何部署Pulsar Function
1. local run
在本地运行或者集群外运行一个 Pulsar Functions,适用于开发者。
2. cluster
在集群内运行 Pulsar Functions。在该模式下部署 function 时,Apache BookKeeper 将自动处理状态存储,目前 go 版本的 function 暂时不支持状态存储。
pulsar function示例demo
生产者
public class ProducerApp {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVER_URL)
.build();
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(INPUT_TOPIC)
.producerName("func-pro1")
.create();
MessageId messageId = producer.send("Hello,lzhpo,lewis");
System.out.println("Send ok, messageId:" +messageId);
client.close();
}
}
pulsar function
在这里我简单的演示一下:按照逗号“,”去切分input topic的消息,然后转换成大写进行输出。
/**
* 部署此Jar包到Pulsar集群中:
* <pre>
* {@code
* $ bin/pulsar-admin functions create \
* --jar target/my-jar-with-dependencies.jar \
* --classname org.example.functions.WordCountFunction \
* --tenant public \
* --namespace default \
* --name word-count \
* --inputs persistent://public/default/sentences \
* --output persistent://public/default/count
* }
* </pre>
* @author Zhaopo Liu
*/
@Slf4j
public class HelloWorldFunction implements Function<String, Void> {
/**
* 每次将消息发布到输入主题时,都会调用此函数
*
* @param input
* @param context
* @return
* @throws Exception
*/
@Override
public Void process(String input, final Context context) throws Exception {
log.info("收到来自 {} 的消息 {} " ,context.getInputTopics(), input);
for (String word : input.split(",")) {
String wordUpperCase = word.toUpperCase();
System.out.println(wordUpperCase);
}
return null;
}
/**
* <pre>
* 设置localrun运行参数(包括pulsar地址、functionConfig、sourceConfig、sinkConfig等等),参考 LocalRunner {@link LocalRunner}类。
* </pre>
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
FunctionConfig functionConfig = new FunctionConfig();
// 设置pulsar function的名字
functionConfig.setName("wordcount");
// 设置pulsar function类的名字
functionConfig.setClassName(HelloWorldFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
// input topic,output topic,log topic
functionConfig.setInputs(Collections.singleton(INPUT_TOPIC));
functionConfig.setOutput(OUTPUT_TOPIC);
functionConfig.setLogTopic(LOG_TOPIC);
// 设置localrun运行参数
LocalRunner localRunner = LocalRunner.builder()
// 默认是本地
.brokerServiceUrl(SERVER_URL)
// 设置functionConfig
.functionConfig(functionConfig)
.build();
// 非阻塞
localRunner.start(false);
}
}
设置pulsar-function运行参数
@Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig", hidden = true, converter = FunctionConfigConverter.class)
protected FunctionConfig functionConfig;
@Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig", hidden = true, converter = SourceConfigConverter.class)
protected SourceConfig sourceConfig;
@Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig", hidden = true, converter = SinkConfigConverter.class)
protected SinkConfig sinkConfig;
@Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true)
protected String stateStorageServiceUrl;
@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
protected String brokerServiceUrl;
@Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true)
protected String clientAuthPlugin;
@Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
protected String clientAuthParams;
@Parameter(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = 1)
protected boolean useTls;
@Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n", hidden = true, arity = 1)
protected boolean tlsAllowInsecureConnection;
@Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true, arity = 1)
protected boolean tlsHostNameVerificationEnabled;
@Parameter(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true)
protected String tlsTrustCertFilePath;
@Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true)
protected int instanceIdOffset = 0;
@Parameter(names = "--runtime", description = "Function runtime to use (Thread/Process)", hidden = true, converter = RuntimeConverter.class)
protected RuntimeEnv runtimeEnv;
// 默认是在localhost,可以设置brokerServiceUrl进行更改
private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
- 本文标签: Pulsar Java
- 本文链接: http://www.lzhpo.com/article/146
- 版权声明: 本文由lzhpo原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权