Pulsar 2.5.0: Java client


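Original title on the official website: "Pulsar Java client"

Translated: 2020-02-14

Original URL: http://pulsar.apache.org/docs/en/client-libraries-java/

Translator's note: this article explains how to use the Java client to create producers, consumers, and readers of messages.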


Pulsar Java client

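The Java client can be used to create producers, consumers, and readers of messages. The current API version is 2.5.0, and the client consists of two packages:

Package | Description | Maven artifact
--- | --- | ---
org.apache.pulsar.client.api | API for creating producers and consumers | org.apache.pulsar:pulsar-client:2.5.0
org.apache.pulsar.client.admin | Admin API | org.apache.pulsar:pulsar-client-admin:2.5.0

This article focuses on the producer and consumer API. For the admin client API, see the Pulsar admin interface documentation.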

Installing the Java client

Maven

Add the following to your pom.xml file:

<!-- in your <properties> block -->
<pulsar.version>2.5.0</pulsar.version>

<!-- in your <dependencies> block -->
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>${pulsar.version}</version>
</dependency>

Gradle

Add the following to your build.gradle file:

def pulsarVersion = '2.5.0'

dependencies {
    implementation group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
}

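Java client URLs

The client communicates with brokers over the Pulsar protocol. Pulsar protocol URLs use the pulsar:// scheme (default port 6650), or pulsar+ssl:// when TLS is used (default port 6651). Examples are pulsar://localhost:6650 and pulsar+ssl://my-broker.com:6651.

Creating a PulsarClient

Default example: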

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();

Example with multiple brokers:

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")
    .build();

If you run a local cluster in standalone mode, the broker is available at pulsar://localhost:6650 by default.

You can customize the PulsarClient with loadConf. It accepts a map of the parameters below.

Configuration parameters

Type | Name | Default
--- | --- | ---
String | serviceUrl | None
String | authPluginClassName | None
String | authParams | None
long | operationTimeoutMs | 30000
long | statsIntervalSeconds | 60
int | numIoThreads | 1
int | numListenerThreads | 1
boolean | useTcpNoDelay | true
boolean | useTls | false
String | tlsTrustCertsFilePath | None
boolean | tlsAllowInsecureConnection | false
boolean | tlsHostnameVerificationEnable | false
int | concurrentLookupRequest | 5000
int | maxLookupRequest | 50000
int | maxNumberOfRejectedRequestPerConnection | 50
int | keepAliveIntervalSeconds | 30
int | connectionTimeoutMs | 10000
int | requestTimeoutMs | 60000
int | defaultBackoffIntervalNanos | TimeUnit.MILLISECONDS.toNanos(100)
long | maxBackoffIntervalNanos | TimeUnit.SECONDS.toNanos(30)
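Below is a minimal sketch of loadConf. It assumes a broker at pulsar://localhost:6650. The keys are parameter names from the table above. The chosen values and the class name ClientConfExample are only illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class ClientConfExample {
    // Keys are parameter names from the table above; values use the listed types.
    static Map<String, Object> clientConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("serviceUrl", "pulsar://localhost:6650");
        conf.put("operationTimeoutMs", 10000L); // long
        conf.put("numIoThreads", 2);            // int
        conf.put("useTcpNoDelay", true);        // boolean
        return conf;
    }

    public static void main(String[] args) throws PulsarClientException {
        // loadConf applies the map to the builder; build() then creates the client.
        try (PulsarClient client = PulsarClient.builder()
                .loadConf(clientConf())
                .build()) {
            System.out.println("Client created");
        }
    }
}
```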

Creating and using a Producer

To send messages, create a Producer for a specific topic:

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();
Producer<byte[]> producer = client.newProducer()
    .topic("my-topic")
    .create();
// You can then send messages to the broker and topic you specified:
producer.send("My message".getBytes());

By default, producers send messages as byte arrays (the bytes schema). You can choose another schema to suit your use case. For example, you can use a string schema:

Producer<String> stringProducer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .create();
stringProducer.send("My message");

Close producers, consumers, and clients once you no longer need them:

producer.close();
consumer.close();
client.close();

Close operations can also be performed asynchronously:

producer.closeAsync()
    .thenRun(() -> System.out.println("Producer closed"))
    .exceptionally((ex) -> {
        System.err.println("Failed to close producer: " + ex);
        return null;
    });
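PulsarClient and Producer implement java.io.Closeable, and so does Consumer. This means a try-with-resources block can close them automatically. Below is a sketch that assumes a local broker. CloseExample is a hypothetical class name.

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class CloseExample {
    public static void main(String[] args) throws PulsarClientException {
        // Resources are closed in reverse order: the producer first, then the client.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
             Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-topic")
                .create()) {
            producer.send("My message");
        }
    }
}
```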

A Producer can use the default configuration, or you can customize it with loadConf using the following parameters:

Type | Name | Default
--- | --- | ---
String | topicName | null
String | producerName | null
long | sendTimeoutMs | 30000
boolean | blockIfQueueFull | false
int | maxPendingMessages | 1000
int | maxPendingMessagesAcrossPartitions | 50000
MessageRoutingMode | messageRoutingMode | pulsar.RoundRobinDistribution
HashingScheme | hashingScheme | HashingScheme.JavaStringHash
ProducerCryptoFailureAction | cryptoFailureAction | ProducerCryptoFailureAction.FAIL
long | batchingMaxPublishDelayMicros | TimeUnit.MILLISECONDS.toMicros(1)
int | batchingMaxMessages | 1000
boolean | batchingEnabled | true
CompressionType | compressionType | No compression
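Below is a sketch of producer configuration through loadConf. It assumes a local broker. The values and the class name ProducerConfExample are illustrative. The map is loaded first, and the topic is set afterwards with topic().

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class ProducerConfExample {
    // Keys are parameter names from the producer table above.
    static Map<String, Object> producerConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("producerName", "my-producer");
        conf.put("sendTimeoutMs", 10000L);
        conf.put("blockIfQueueFull", true);
        conf.put("maxPendingMessages", 500);
        return conf;
    }

    public static void main(String[] args) throws PulsarClientException {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
             Producer<byte[]> producer = client.newProducer()
                .loadConf(producerConf())
                .topic("my-topic")
                .create()) {
            producer.send("My message".getBytes());
        }
    }
}
```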

Example of a producer configured through builder methods:

Producer<byte[]> producer = client.newProducer()
  .topic("my-topic")
  .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
  .sendTimeout(10, TimeUnit.SECONDS)
  .blockIfQueueFull(true)
  .create();

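If you create a producer on a partitioned topic, you can choose a message routing mode. The routing mode decides which partition each message is sent to. For more on message routing, read the Partitioned Topics cookbook.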
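Below is a sketch of choosing a routing mode. It assumes that my-partitioned-topic (a hypothetical name) already exists as a partitioned topic on a local broker. With SinglePartition, messages without a key all go to one randomly chosen partition. Keyed messages are routed by the hash of their key, using the configured hashing scheme.

```java
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class RoutingExample {
    public static void main(String[] args) throws PulsarClientException {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
             Producer<byte[]> producer = client.newProducer()
                .topic("my-partitioned-topic")
                // Messages without a key all go to a single, randomly chosen partition
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                // Keyed messages are assigned to a partition by hashing the key
                .hashingScheme(HashingScheme.Murmur3_32Hash)
                .create()) {
            producer.newMessage().key("key-1").value("hello".getBytes()).send();
        }
    }
}
```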

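You can also publish messages asynchronously. sendAsync returns a CompletableFuture that completes with the MessageId of the published message: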

producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
  System.out.printf("Message with ID %s successfully sent", msgId);
});

You can also build a message with a key and properties before sending it:

producer.newMessage()
  .key("my-message-key")
  .value("my-async-message".getBytes())
  .property("my-key", "my-value")
  .property("my-other-key", "my-other-value")
  .send();

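You can finish the builder chain with sendAsync() instead of send(). It returns a CompletableFuture<MessageId> instead of blocking.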


Consumer

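Create a Consumer through the PulsarClient. When you create a consumer, you specify the topic and the subscription name. Once the consumer is created, it consumes messages from the subscribed topic.

Example: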

Consumer consumer = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscribe();

Use a while loop to listen for and receive messages. If processing a message fails, call negativeAcknowledge so that the message is redelivered later:

while (true) {
    // Wait for a message
    Message msg = consumer.receive();
    try {
        // Do something with the message
        System.out.printf("Message received: %s%n", new String(msg.getData()));
        // Acknowledge the message so that it can be deleted by the message broker
        consumer.acknowledge(msg);
    } catch (Exception e) {
        // Message failed to process, redeliver later
        consumer.negativeAcknowledge(msg);
    }
}

A consumer can use the default configuration, or you can customize it with loadConf using the following parameters:

Type | Name | Default
--- | --- | ---
Set<String> | topicNames | Sets.newTreeSet()
Pattern | topicsPattern | None
String | subscriptionName | None
SubscriptionType | subscriptionType | SubscriptionType.Exclusive
int | receiverQueueSize | 1000
long | acknowledgementsGroupTimeMicros | TimeUnit.MILLISECONDS.toMicros(100)
long | negativeAckRedeliveryDelayMicros | TimeUnit.MINUTES.toMicros(1)
int | maxTotalReceiverQueueSizeAcrossPartitions | 50000
String | consumerName | null
long | ackTimeoutMillis | 0
long | tickDurationMillis | 1000
int | priorityLevel | 0
ConsumerCryptoFailureAction | cryptoFailureAction | ConsumerCryptoFailureAction.FAIL
SortedMap<String, String> | properties | new TreeMap<>()
boolean | readCompacted | false
SubscriptionInitialPosition | subscriptionInitialPosition | SubscriptionInitialPosition.Latest
int | patternAutoDiscoveryPeriod | 1
RegexSubscriptionMode | regexSubscriptionMode | RegexSubscriptionMode.PersistentOnly
DeadLetterPolicy | deadLetterPolicy | None
boolean | autoUpdatePartitions | true
boolean | replicateSubscriptionState | false

Example:

Consumer consumer = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .ackTimeout(10, TimeUnit.SECONDS)
    .subscriptionType(SubscriptionType.Exclusive)
    .subscribe();

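Receiving messages

Besides the blocking receive() shown above, a consumer can receive messages in the following ways:

  • Async receive: receiveAsync() returns immediately with a CompletableFuture that completes once a new message is available.

    CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

  • Batch receive: batchReceive() waits until a batch is complete according to the batch receive policy, then returns the messages together.

    Messages<byte[]> messages = consumer.batchReceive();
    for (Message<byte[]> message : messages) {
        // do something
    }
    consumer.acknowledge(messages);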

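A batch receive policy limits the number of messages and the number of bytes in a single batch. It also sets a timeout for waiting for enough messages. A batch is complete as soon as any one of these conditions is met. Custom batch receive policy:

Consumer consumer = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .batchReceivePolicy(BatchReceivePolicy.builder()
        .maxNumMessages(100)
        .maxNumBytes(1024 * 1024)
        .timeout(200, TimeUnit.MILLISECONDS)
        .build())
    .subscribe();

Default batch receive policy:

BatchReceivePolicy.builder()
    .maxNumMessages(-1)
    .maxNumBytes(10 * 1024 * 1024)
    .timeout(100, TimeUnit.MILLISECONDS)
    .build();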

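Multi-topic subscriptions

When a consumer subscribes to a Pulsar topic, by default it subscribes to one specific topic, such as persistent://public/default/my-topic. Since Pulsar 1.23.0-incubating, a Pulsar consumer can subscribe to multiple topics at the same time. You can define the list of topics in two ways: with a regular expression (regex), or by listing the topics explicitly.

When you subscribe to multiple topics with a regex, all topics must be in the same namespace.

When you subscribe with a regex, the Pulsar client automatically discovers all topics that match the expression and subscribes to all of them. If a matching topic does not exist yet, the consumer subscribes to it automatically once it is created.

No ordering guarantees

When a consumer subscribes to multiple topics, the ordering guarantees that Pulsar provides for a single topic no longer hold. If your use case has strict ordering requirements, we strongly recommend against using this feature.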

Regex subscriptions

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// pulsarClient is an existing PulsarClient; subscription is the subscription name
ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
    .subscriptionName(subscription);
// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer allTopicsConsumer = consumerBuilder.clone()
    .topicsPattern(allTopicsInNamespace)
    .subscribe();
// Alternatively, subscribe only to a subset of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer someTopicsConsumer = consumerBuilder.clone()
    .topicsPattern(someTopicsInNamespace)
    .subscribe();
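The following pure-JDK sketch shows which topics a pattern covers by filtering topic names with the same Pattern objects. It assumes the pattern is matched against the full topic name as a whole-string match. The Pulsar client's internal topic discovery is not shown here, and TopicPatternDemo is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternDemo {
    // Keeps only the topic names that the pattern matches in full.
    static List<String> matching(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList(
                "persistent://public/default/foo-1",
                "persistent://public/default/foo-2",
                "persistent://public/default/bar",
                "persistent://public/default-2/foo-3");
        Pattern fooTopics = Pattern.compile("persistent://public/default/foo.*");
        Pattern withoutSlash = Pattern.compile("persistent://public/default.*");
        // Only the two foo topics in the default namespace
        System.out.println(matching(fooTopics, topics));
        // Without the trailing slash, the topic in default-2 matches too
        System.out.println(matching(withoutSlash, topics));
    }
}
```

This is why the namespace patterns above end in "default/.*". Without the slash, the expression would also match topics in a namespace whose name merely starts with "default".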

Subscribing to a list of topics

List<String> topics = Arrays.asList(
    "topic-1",
    "topic-2",
    "topic-3"
);
Consumer multiTopicConsumer = consumerBuilder
    .topics(topics)
    .subscribe();
// Alternatively, pass the topic names as varargs to topic():
Consumer anotherMultiTopicConsumer = consumerBuilder
    .topic(
        "topic-1",
        "topic-2",
        "topic-3"
    )
    .subscribe();

Async subscription

You can also subscribe to multiple topics asynchronously with subscribeAsync instead of the synchronous subscribe:

Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
consumerBuilder
    .topicsPattern(allTopicsInNamespace)
    .subscribeAsync()
    .thenAccept(this::receiveMessageFromConsumer);

private void receiveMessageFromConsumer(Consumer consumer) {
    consumer.receiveAsync().thenAccept(message -> {
        // Do something with the received message, then wait for the next one
        receiveMessageFromConsumer(consumer);
    });
}

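Subscription modes

Pulsar offers several subscription modes to fit different use cases.

To compare the modes, the examples below use a producer that sends 10 messages with four different keys to a topic named my-topic.

Producer: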

Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .enableBatching(false)
    .create();
// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();

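Exclusive mode

In exclusive mode, only one consumer can attach to a subscription. If more than one consumer tries to subscribe to the topic with the same subscription, the additional consumers receive an error.

Consumer consumer = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Exclusive)
    .subscribe();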

Failover mode

In failover mode, multiple consumers can attach to the same subscription. The consumers are sorted lexicographically by consumer name. The first one is initially the only consumer that receives messages, and it is called the master consumer. When the master consumer disconnects, all messages are delivered to the next consumer in line. This includes both unacknowledged messages and subsequent ones.

Consumer consumer1 = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Failover)
    .subscribe();
Consumer consumer2 = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Failover)
    .subscribe();

Result:

// consumer1 is the active consumer; consumer2 is the standby consumer.
// consumer1 receives 5 messages and then crashes; consumer2 takes over as the active consumer.

Shared mode

In shared (round robin) mode, multiple consumers can attach to the same subscription. Messages are distributed across the consumers in round robin fashion, and each message is delivered to only one consumer. When a consumer disconnects, all messages that were sent to it but not acknowledged are rescheduled and delivered to the remaining consumers.

Consumer consumer1 = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Shared)
    .subscribe();
Consumer consumer2 = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Shared)
    .subscribe();
// Both consumer1 and consumer2 are active consumers.

A possible result:

consumer1:
("key-1", "message-1-1")
("key-1", "message-1-3")
("key-2", "message-2-2")
("key-3", "message-3-1")
("key-4", "message-4-1")

consumer2:
("key-1", "message-1-2")
("key-2", "message-2-1")
("key-2", "message-2-3")
("key-3", "message-3-2")
("key-4", "message-4-2")

Key_Shared mode

Key_Shared is a subscription mode added in release 2.4.0. Multiple consumers can attach to the same subscription, and all messages with the same key are delivered to the same consumer.

Consumer consumer1 = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Key_Shared)
    .subscribe();
Consumer consumer2 = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Key_Shared)
    .subscribe();
// Both consumer1 and consumer2 are active consumers.

A possible result:

consumer1:
("key-1", "message-1-1")
("key-1", "message-1-2")
("key-1", "message-1-3")
("key-3", "message-3-1")
("key-3", "message-3-2")

consumer2:
("key-2", "message-2-1")
("key-2", "message-2-2")
("key-2", "message-2-3")
("key-4", "message-4-1")
("key-4", "message-4-2")

Reader

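With the Pulsar reader interface, applications can manage cursors manually. When you connect to a topic with a reader instead of a consumer, you must specify which message the reader starts reading from. A reader can start from:

  • the earliest available message in the topic
  • the latest available message in the topic
  • a specific message ID

Example: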

// loadMessageIdBytes() is a hypothetical helper returning some message ID byte array,
// for example one saved earlier with MessageId.toByteArray()
byte[] msgIdBytes = loadMessageIdBytes();
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader<byte[]> reader = pulsarClient.newReader()
    .topic(topic)
    .startMessageId(id)
    .create();

while (true) {
    Message<byte[]> message = reader.readNext();
    // Process message
}
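Below is a sketch that reads a topic from the earliest message until the reader has caught up. It assumes a local broker and the topic my-topic. The restore helper shows how a message ID saved with toByteArray() can be turned back into a MessageId for startMessageId. Both restore and ReaderExample are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;

public class ReaderExample {
    // Turns bytes produced by MessageId.toByteArray() back into a MessageId.
    static MessageId restore(byte[] stored) {
        try {
            return MessageId.fromByteArray(stored);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
             Reader<String> reader = client.newReader(Schema.STRING)
                .topic("my-topic")
                .startMessageId(MessageId.earliest)
                .create()) {
            // Stop once there are no more messages available right now.
            while (reader.hasMessageAvailable()) {
                Message<String> msg = reader.readNext();
                System.out.println(msg.getValue());
                // msg.getMessageId().toByteArray() could be saved and later passed to restore(...)
            }
        }
    }
}
```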

 

Reader configuration

Type | Name | Default
--- | --- | ---
String | topicName | None
int | receiverQueueSize | 1000
ReaderListener<T> | readerListener | None
String | readerName | null
String | subscriptionRolePrefix | null
CryptoKeyReader | cryptoKeyReader | null
ConsumerCryptoFailureAction | cryptoFailureAction | ConsumerCryptoFailureAction.FAIL
boolean | readCompacted | false
boolean | resetIncludeHead | false

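Reading a key hash range

A reader can be limited to one or more ranges of message-key hash values. It then only receives messages whose key hash falls within those ranges. Hash values range from 0 to 65535, so no range may end above 65535.

Example: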

pulsarClient.newReader()
     .topic(topic)
     .startMessageId(MessageId.earliest)
     .keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))
     .create();

Schema

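In Pulsar, if you do not specify a schema for a topic, producers and consumers handle raw bytes. In practice, you usually want to send data in your own format, so the client needs to support different data types. Message schemas cover this use case.

Schema example: define a POJO, then create a producer that uses a JSON schema for it.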

public class SensorReading {
  public float temperature;
  public SensorReading(float temperature) {
    this.temperature = temperature;
  }
  // A no-arg constructor is required
  public SensorReading() {

  }
  public float getTemperature() {
   return temperature;
  }
  public void setTemperature(float temperature) {
    this.temperature = temperature;
  }

}
Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
    .topic("sensor-readings")
    .create();
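The JSON schema is what turns SensorReading objects into bytes and back. The following sketch exercises that round trip without a broker by calling the schema's encode and decode methods directly. SensorReading is repeated as a nested class to keep the example self-contained, and SchemaRoundTrip is a hypothetical name.

```java
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.JSONSchema;

public class SchemaRoundTrip {
    // Same shape as the SensorReading class above.
    public static class SensorReading {
        public float temperature;

        public SensorReading() { } // no-arg constructor required for deserialization

        public SensorReading(float temperature) { this.temperature = temperature; }

        public float getTemperature() { return temperature; }

        public void setTemperature(float temperature) { this.temperature = temperature; }
    }

    static final Schema<SensorReading> SCHEMA = JSONSchema.of(SensorReading.class);

    // Serializes and deserializes a reading the way a producer and consumer using SCHEMA would.
    static SensorReading roundTrip(SensorReading reading) {
        byte[] bytes = SCHEMA.encode(reading);
        return SCHEMA.decode(bytes);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new SensorReading(21.5f)).getTemperature());
    }
}
```

A consumer created with client.newConsumer(JSONSchema.of(SensorReading.class)) receives Message<SensorReading> values that are decoded in the same way.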

The Java client currently supports the following schemas:

  • Schema.BYTES

    Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
        .topic("some-raw-bytes-topic")
        .create();

    // or, since bytes is the default schema:

    Producer<byte[]> bytesProducer = client.newProducer()
        .topic("some-raw-bytes-topic")
        .create();

  • Schema.STRING

    Producer<String> stringProducer = client.newProducer(Schema.STRING)
        .topic("some-string-topic")
        .create();

  • Schema.JSON

    Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))
        .topic("some-pojo-topic")
        .create();

  • Schema.PROTOBUF

    Producer<MyProtobuf> protobufProducer = client.newProducer(Schema.PROTOBUF(MyProtobuf.class))
        .topic("some-protobuf-topic")
        .create();

  • Schema.AVRO

    Producer<MyAvro> avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))
        .topic("some-avro-topic")
        .create();

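Authentication

The Java client supports the following authentication mechanisms:

  • TLS
  • Athenz

To use TLS authentication, do the following:

  • enable TLS (the useTls setting, set with enableTls(true) on the builder)
  • point the client to the trusted TLS certificate file with tlsTrustCertsFilePath
  • provide the paths of the client certificate and key files as authentication parameters

Example: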

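import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;

Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", "/path/to/client-cert.pem");
authParams.put("tlsKeyFile", "/path/to/client-key.pem");
Authentication tlsAuth = AuthenticationFactory
    .create(AuthenticationTls.class.getName(), authParams);
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://my-broker.com:6651")
    .enableTls(true)
    .tlsTrustCertsFilePath("/path/to/cacert.pem")
    .authentication(tlsAuth)
    .build();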
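AuthenticationFactory also has a TLS(certFilePath, keyFilePath) shortcut. It builds the same TLS authentication without an explicit parameter map. Below is a sketch that reuses the placeholder paths above. TlsAuthExample is a hypothetical name.

```java
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class TlsAuthExample {
    // Client certificate and key paths, as in the map-based example above.
    static Authentication tlsAuth() {
        return AuthenticationFactory.TLS("/path/to/client-cert.pem", "/path/to/client-key.pem");
    }

    public static void main(String[] args) throws PulsarClientException {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar+ssl://my-broker.com:6651")
                .enableTls(true)
                .tlsTrustCertsFilePath("/path/to/cacert.pem")
                .authentication(tlsAuth())
                .build()) {
            System.out.println("TLS client created");
        }
    }
}
```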

To use Athenz as the authentication provider, you need to use TLS and provide values for the following parameters:

  • tenantDomain
  • tenantService
  • providerDomain
  • privateKey

The privateKey parameter supports three formats:

  • file:///path/to/file
  • file:/path/to/file
  • data:application/x-pem-file;base64,<base64-encoded value>
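The third form embeds the key itself in the parameter value. Below is a pure-JDK sketch of building such a value from PEM text. PemDataUrl and its methods are hypothetical helpers. In practice, the input would be the full contents of the private key file.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PemDataUrl {
    static final String PREFIX = "data:application/x-pem-file;base64,";

    // Builds the data: form of privateKey from PEM text.
    static String toDataUrl(String pem) {
        return PREFIX + Base64.getEncoder().encodeToString(pem.getBytes(StandardCharsets.UTF_8));
    }

    // Reverses toDataUrl, e.g. to check what a configured value contains.
    static String fromDataUrl(String url) {
        if (!url.startsWith(PREFIX)) {
            throw new IllegalArgumentException("not a base64 PEM data URL: " + url);
        }
        byte[] decoded = Base64.getDecoder().decode(url.substring(PREFIX.length()));
        return new String(decoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(toDataUrl("abc")); // data:application/x-pem-file;base64,YWJj
    }
}
```

The result can then be passed as the parameter value, for example authParams.put("privateKey", PemDataUrl.toDataUrl(pemText)).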

Example:

import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
// AuthenticationAthenz is provided by the separate pulsar-client-auth-athenz artifact
import org.apache.pulsar.client.impl.auth.AuthenticationAthenz;

Map<String, String> authParams = new HashMap<>();
authParams.put("tenantDomain", "shopping"); // Tenant domain name
authParams.put("tenantService", "some_app"); // Tenant service name
authParams.put("providerDomain", "pulsar"); // Provider domain name
authParams.put("privateKey", "file:///path/to/private.pem"); // Tenant private key path
authParams.put("keyId", "v1"); // Key id for the tenant private key (optional, default: "0")
Authentication athenzAuth = AuthenticationFactory
    .create(AuthenticationAthenz.class.getName(), authParams);
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://my-broker.com:6651")
    .enableTls(true)
    .tlsTrustCertsFilePath("/path/to/cacert.pem")
    .authentication(athenzAuth)
    .build();