pulsar
The Pulsar Python client library is based on the existing C++ client library. All the same features are exposed through the Python interface.
Currently, the supported Python versions are 2.7, 3.5, 3.6, 3.7 and 3.8.
Install from PyPI
Download Python wheel binary files for macOS and Linux directly from the PyPI archive.
#!shell
$ sudo pip install pulsar-client
Install from sources
Follow the instructions to compile the Pulsar C++ client library. This method will also build the Python binding for the library.
To install the Python bindings:
#!shell
$ cd pulsar-client-cpp/python
$ sudo python setup.py install
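With either installation method, a quick way to confirm that the binding is importable is to run a one-line check against the same Python interpreter you installed for (a minimal sanity check, not part of the official instructions):
#!shell
$ python -c "import pulsar; print(pulsar.Client)"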
Examples
Producer example
#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer('my-topic')

for i in range(10):
    producer.send(('Hello-%d' % i).encode('utf-8'))

client.close()
Consumer example
#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', 'my-subscription')

while True:
    msg = consumer.receive()
    try:
        print("Received message '%s' id='%s'" % (msg.data().decode('utf-8'), msg.message_id()))
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)

client.close()
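If receive() should not block indefinitely, a timeout in milliseconds can be passed; when no message arrives within it, an exception is raised. A minimal variation of the loop above (the exception is caught broadly here, since this page does not name the specific exception class):
#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', 'my-subscription')

try:
    # Wait at most 5 seconds for a message
    msg = consumer.receive(timeout_millis=5000)
    consumer.acknowledge(msg)
except Exception:
    # No message was received within the timeout
    pass

client.close()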
Async producer example
#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer(
    'my-topic',
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10
)

def send_callback(res, msg_id):
    print('Message published res=%s' % res)

for i in range(10):
    producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback)

# Wait for all pending messages to be persisted before closing
producer.flush()
client.close()
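Reader example

A reader positions itself on the topic independently of any subscription; see Client.create_reader in the reference below. This is a minimal sketch, assuming a topic named 'my-topic' that already contains messages:

#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')

# Start from the earliest message still available in the topic
reader = client.create_reader('my-topic', pulsar.MessageId.earliest)

while reader.has_message_available():
    msg = reader.read_next()
    print("Read message '%s'" % msg.data().decode('utf-8'))

client.close()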
API reference

class Client

    The Pulsar client. A single client instance can be used to create producers
    and consumers on multiple topics. The client shares the same connection pool
    and threads across all producers and consumers.

    Client(service_url, authentication=None, operation_timeout_seconds=30,
           io_threads=1, message_listener_threads=1, concurrent_lookup_requests=50000,
           log_conf_file_path=None, use_tls=False, tls_trust_certs_file_path=None,
           tls_allow_insecure_connection=False, tls_validate_hostname=False,
           logger=None, connection_timeout_ms=10000)

        Create a new Pulsar client instance.

        * service_url: The Pulsar service URL, e.g. pulsar://my-broker.com:6650/
        * authentication: The authentication provider to be used with the broker, for
          example AuthenticationTLS, AuthenticationToken, AuthenticationAthenz or
          AuthenticationOauth2
        * operation_timeout_seconds: Timeout on client operations (subscribe, create
          producer, close, unsubscribe)
        * io_threads: Number of IO threads used by the Pulsar client
        * message_listener_threads: Number of threads used to deliver messages through
          message listeners. Messages for distinct listeners may be delivered on
          different threads, but a single listener is always assigned to the same thread
        * concurrent_lookup_requests: Number of concurrent lookup requests allowed on
          each broker connection, to prevent overloading the broker
        * log_conf_file_path: Initialize log4cxx from a configuration file
        * use_tls: Deprecated. TLS is enabled automatically when the service URL starts
          with pulsar+ssl:// or https://
        * tls_trust_certs_file_path: Path to the trusted TLS certificate file; if empty,
          the certifi bundle is used
        * tls_allow_insecure_connection: Whether to accept untrusted TLS certificates
          from the broker
        * tls_validate_hostname: Whether to validate that the hostname of the endpoint
          matches the common name on the TLS certificate it presents
        * logger: A logging.Logger instance to be used by this client
        * connection_timeout_ms: Timeout in milliseconds for TCP connections

    create_producer(topic, ...)

        Create a new producer on a given topic. Options:

        * producer_name: A name for the producer. If not assigned, the system generates a
          globally unique name (see Producer.producer_name()). When specifying a name, it
          is up to the user to ensure it is unique for the topic across all Pulsar clusters
        * schema: The schema of the data published by this producer, used both to validate
          the data format against the topic schema and to serialize/deserialize between
          data and objects, e.g. schema=JsonSchema(MyRecordClass). Default: BytesSchema()
        * initial_sequence_id: Baseline for the sequence ids of published messages; the
          first message uses (initialSequenceId + 1) and subsequent messages are assigned
          incremental ids unless otherwise specified
        * send_timeout_millis (default 30000): An error is reported if the broker does not
          acknowledge a message within this timeout
        * compression_type: Payload compression, off by default. Supported values are
          CompressionType.LZ4, ZLib, ZSTD and SNAPPY. ZSTD requires consumers on Pulsar 2.3
          or later, SNAPPY requires consumers on Pulsar 2.4 or later
        * max_pending_messages (default 1000) and max_pending_messages_across_partitions
          (default 50000): Max size of the queues holding messages pending acknowledgment
          from the broker
        * block_if_queue_full (default False): Whether send_async should block when the
          outgoing message queue is full instead of failing
        * batching_enabled, batching_max_messages, batching_max_allowed_size_in_bytes,
          batching_max_publish_delay_ms: Control message batching (disabled by default)
        * message_routing_mode: Routing mode for a partitioned producer, either
          PartitionsRoutingMode.RoundRobinDistribution (default) or
          PartitionsRoutingMode.UseSinglePartition
        * lazy_start_partitioned_producers (default False): Affects partitioned topics
          only. The internal producer of the partition chosen by the routing policy is
          started eagerly; producers for additional partitions connect on demand when
          their first message arrives. This reduces broker load for topics with many
          partitions, especially with the SinglePartition routing policy and no keyed
          messages, at the cost of extra latency on the first message of a partition
        * properties: Application-defined properties used to identify the producer on the
          broker side
        * batching_type: BatchingType.Default batches incoming messages into a single
          batch message regardless of key; BatchingType.KeyBased groups messages with the
          same key into separate batch messages
        * encryption_key: Key used for symmetric encryption, configured on the producer side
        * crypto_key_reader: A CryptoKeyReader providing the public key used to encrypt
          messages on the producer and the private key used to decrypt them on the consumer

    subscribe(topic, subscription_name, ...)

        Subscribe to the given topic and subscription combination.

        * topic: A topic name, a list of topic names, or a compiled regex pattern such as
          re.compile('persistent://public/default/topic-*')
        * subscription_name: The name of the subscription
        * consumer_type: The subscription type (default ConsumerType.Exclusive)
        * schema: The schema of the data received by this consumer
        * message_listener: A function accepting (consumer, message). When set, messages
          are delivered through the listener and consumer.receive() may not be called
        * receiver_queue_size (default 1000): How many messages can be accumulated before
          the application calls receive(). Higher values can increase throughput at the
          expense of memory; zero disables pre-fetching, which improves distribution on
          shared subscriptions but cannot be combined with receive-with-timeout or
          partitioned topics
        * max_total_receiver_queue_size_across_partitions (default 50000): Max total
          receiver queue size across partitions; used to reduce the per-partition queue size
        * consumer_name: The consumer name
        * unacked_messages_timeout_ms: Timeout for unacknowledged messages; must be
          greater than 10 seconds. If a message is not acknowledged within the timeout,
          all unacknowledged messages are redelivered
        * broker_consumer_stats_cache_time_ms (default 30000): How long broker-side
          consumer stats are cached in the client
        * negative_ack_redelivery_delay_ms (default 60000): Delay after which negatively
          acknowledged messages are redelivered
        * is_read_compacted: Whether to read the compacted version of the topic
        * properties: Application-defined properties used to identify the consumer on the
          broker side
        * pattern_auto_discovery_period (default 60): Period, in seconds, at which the
          consumer auto-discovers topics matching a regex pattern
        * initial_position: Initial position when subscribing, InitialPosition.Earliest
          or InitialPosition.Latest (default)
        * crypto_key_reader: A CryptoKeyReader used to decrypt encrypted messages
        * replicate_subscription_state_enabled (default False): Whether the subscription
          state should be replicated

    create_reader(topic, start_message_id, ...)

        Create a reader on a particular topic.

        * topic: The name of the topic
        * start_message_id: The initial reader position: MessageId.earliest,
          MessageId.latest, or a specific MessageId (the first message read is the one
          following it). A message id can be serialized with msg.message_id().serialize()
          and restored with MessageId.deserialize(s)
        * reader_listener: A function accepting (reader, message). When set, messages are
          delivered through the listener and reader.read_next() may not be called
        * receiver_queue_size (default 1000), reader_name, subscription_role_prefix,
          is_read_compacted, crypto_key_reader: As described for subscribe()

    get_topic_partitions(topic)

        Get the list of partitions for a given topic. For a partitioned topic this
        returns the partition names; otherwise the list contains the topic name itself.
        This can be used to create Reader, Consumer or Producer instances directly on a
        particular partition.

    shutdown()

        Perform an immediate shutdown of the client, releasing all resources and closing
        all producers, consumers and readers without waiting for ongoing operations to
        complete.

    close()

        Close the client and all the associated producers and consumers.

class Producer

    The Pulsar message producer, used to publish messages on a topic.

    topic(), producer_name(), last_sequence_id()

        Return the topic being published to, the producer name (assigned by the system or
        specified by the client), and the last sequence id published and acknowledged by
        the broker. After recreating a producer with the same name, last_sequence_id()
        returns the last id published in the previous session, or -1 if no message was
        ever published.

    send(content, properties=None, partition_key=None, sequence_id=None,
         replication_clusters=None, disable_replication=False, event_timestamp=None,
         deliver_at=None, deliver_after=None)

        Publish a message on the topic and block until it is acknowledged. Returns a
        MessageId representing where the message is persisted.

        * content: A bytes object with the message payload
        * properties: A dict of application-defined string properties
        * partition_key: Partition key for message routing; a hash of this key determines
          the message's topic partition
        * sequence_id: A custom sequence id for the message being published
        * replication_clusters: Override the namespace replication clusters. It is the
          caller's responsibility to provide valid cluster names; an empty list means the
          message replicates according to the namespace configuration
        * disable_replication: Do not replicate this message
        * event_timestamp: Timestamp of event creation, in milliseconds
        * deliver_at: Do not deliver the message before this UTC timestamp, in milliseconds
        * deliver_after: A timedelta delay before the message is delivered

    send_async(content, callback, ...)

        Send a message asynchronously, with the same options as send(). The callback,
        invoked as callback(result, msg_id), is called once the message has been
        acknowledged by the broker. When the producer queue is full, by default the
        message is rejected and the callback is invoked with an error code (see
        block_if_queue_full).

    flush()

        Flush all the messages buffered in the client and wait until they have all been
        successfully persisted.

    is_connected()

        Check whether the producer is connected.

    close()

        Close the producer.

class Consumer

    Pulsar consumer.

    topic(), subscription_name()

        Return the topic this consumer is subscribed to and the subscription name.

    unsubscribe()

        Unsubscribe the current consumer from the topic. Blocks until the operation is
        completed; afterwards no more messages are received and the consumer object
        cannot be reused.

    receive(timeout_millis=None)

        Receive a single message, blocking until one is available. If timeout_millis is
        specified, an exception is raised when no message arrives within the timeout.

    acknowledge(message)

        Acknowledge the reception of a single message (a Message or a message id). Blocks
        until the acknowledgement is sent to the broker; after that, the message is not
        re-delivered to this consumer.

    acknowledge_cumulative(message)

        Acknowledge all the messages in the stream up to and including the provided
        message.

    negative_acknowledge(message)

        Acknowledge the failure to process a message. The message is redelivered after
        the delay configured with negative_ack_redelivery_delay_ms. Non-blocking.

    pause_message_listener(), resume_message_listener()

        Pause and resume delivery of messages through the message_listener.

    redeliver_unacknowledged_messages()

        Redeliver all the unacknowledged messages. In failover mode the request is
        ignored if the consumer is not active for the topic; in shared mode the messages
        are distributed across all connected consumers. Non-blocking; if the connection
        breaks, the messages are redelivered after reconnect.

    seek(messageid)

        Reset the subscription to a specific message id or publish timestamp. The message
        id can be a specific message or represent the first or last message in the topic.
        Note: this operation can only be done on non-partitioned topics; for partitioned
        topics, perform seek() on the individual partitions.

    is_connected()

        Check whether the consumer is connected.

    close()

        Close the consumer.

class Reader

    Pulsar topic reader.

    topic()

        Return the topic this reader is reading from.

    read_next(timeout_millis=None)

        Read a single message, blocking until one is available. If timeout_millis is
        specified, an exception is raised when no message arrives within the timeout.

    has_message_available()

        Check whether there is any message available to read from the current position.

    seek(messageid)

        Reset the reader to a specific message id or publish timestamp (non-partitioned
        topics only, as for Consumer.seek()).

    is_connected()

        Check whether the reader is connected.

    close()

        Close the reader.

class CryptoKeyReader

    Default crypto key reader implementation.

    CryptoKeyReader(public_key_path, private_key_path)

        Create a crypto key reader.
        * public_key_path: Path to the public key
        * private_key_path: Path to the private key
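The schema parameter accepted by create_producer and subscribe can be illustrated with a small sketch using the pulsar.schema helpers Record, String, Integer and JsonSchema; the topic name is a placeholder and the sketch assumes no conflicting schema is already registered on it:

#!python
import pulsar
from pulsar.schema import Record, String, Integer, JsonSchema

class Example(Record):
    name = String()
    count = Integer()

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer('my-schema-topic', schema=JsonSchema(Example))
producer.send(Example(name='hello', count=1))

consumer = client.subscribe('my-schema-topic', 'my-subscription', schema=JsonSchema(Example))
msg = consumer.receive()
record = msg.value()   # de-serialized Example instance
print(record.name, record.count)
consumer.acknowledge(msg)

client.close()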
class MessageId

    Represents a message id.

    MessageId(partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1)

    MessageId.earliest
        Represents the earliest message stored in a topic.
    MessageId.latest
        Represents the latest message published on a topic.

    ledger_id(), entry_id(), batch_index(), partition()
        Accessors for the individual components of the message id.

    serialize()
        Returns a bytes representation of the message id.
        This bytes sequence can be stored and later deserialized.

    MessageId.deserialize(message_id_bytes)   (static method)
        Deserialize a message id object from a previously serialized bytes sequence.
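For example, a consumer can persist its position by storing the serialized id and later use it as the starting point for a reader. A minimal sketch, reusing the topic and subscription names from the earlier examples (how the bytes are stored between runs is up to the application):

#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', 'my-subscription')

msg = consumer.receive()
consumer.acknowledge(msg)

# Persist the position of the last processed message as bytes
last_id_bytes = msg.message_id().serialize()

# ... later, possibly in another process: resume reading right after it
start_id = pulsar.MessageId.deserialize(last_id_bytes)
reader = client.create_reader('my-topic', start_id)
next_msg = reader.read_next()

client.close()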
class Message

    Message objects are returned by a consumer, either by calling receive or through
    a listener.

    data()
        Returns the payload of the message as a bytes object.
    value()
        Returns the de-serialized version of the message content, decoded with the
        configured schema.
    properties()
        Return the properties attached to the message. Properties are
        application-defined key/value pairs attached to the message.
    partition_key()
        Get the partitioning key for the message.
    publish_timestamp()
        Get the timestamp in milliseconds of the message publish time.
    event_timestamp()
        Get the timestamp in milliseconds of the message event time.
    message_id()
        The message id that can be used to refer to this particular message.
    topic_name()
        Get the name of the topic from which this message originated.
    redelivery_count()
        Get the redelivery count for this message.
    schema_version()
        Get the schema version for this message.
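A short self-contained sketch exercising these accessors on a received message; the topic and subscription names are the placeholders used in the earlier examples:

#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', 'my-subscription')

msg = consumer.receive()
print("payload:       %s" % msg.data())
print("properties:    %s" % msg.properties())
print("partition key: %s" % msg.partition_key())
print("published at:  %d" % msg.publish_timestamp())
print("message id:    %s" % msg.message_id())
print("topic:         %s" % msg.topic_name())
consumer.acknowledge(msg)

client.close()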
class MessageBatch

    Helper for decoding a raw batch payload into individual messages.

    MessageBatch()
    with_message_id(msg_id)
        Set the message id associated with the batch (accepts a MessageId or a raw
        _pulsar.MessageId) and return the MessageBatch itself.
    parse_from(data, size)
        Parse a raw batch payload of the given size and return the list of Message
        objects it contains.
260class Authentication: 261 """ 262 Authentication provider object. Used to load authentication from an external 263 shared library. 264 """ 265 def __init__(self, dynamicLibPath, authParamsString): 266 """ 267 Create the authentication provider instance. 268 269 **Args** 270 271 * `dynamicLibPath`: Path to the authentication provider shared library 272 (such as `tls.so`) 273 * `authParamsString`: Comma-separated list of provider-specific 274 configuration params 275 """ 276 _check_type(str, dynamicLibPath, 'dynamicLibPath') 277 _check_type(str, authParamsString, 'authParamsString') 278 self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
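As a sketch, a custom provider can be loaded from a shared library and passed to the client; the library path and parameter string below are placeholders for whatever your provider expects.

    #!python
    import pulsar
    from pulsar import Authentication

    # Hypothetical provider library and parameters
    auth = Authentication('/path/to/libmyauth.so', 'key1:value1,key2:value2')
    client = pulsar.Client('pulsar+ssl://broker.example.com:6651', authentication=auth)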
class AuthenticationTLS(Authentication):
    """
    TLS Authentication implementation
    """
    def __init__(self, certificate_path, private_key_path):
        """
        Create the TLS authentication provider instance.

        **Args**

        * `certificate_path`: Path to the public certificate
        * `private_key_path`: Path to the private TLS key
        """
        _check_type(str, certificate_path, 'certificate_path')
        _check_type(str, private_key_path, 'private_key_path')
        self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
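A minimal sketch of mutual TLS authentication; the certificate, key and trust-file paths are placeholders.

    #!python
    import pulsar
    from pulsar import AuthenticationTLS

    auth = AuthenticationTLS('/path/to/my-role.cert.pem', '/path/to/my-role.key-pk8.pem')
    client = pulsar.Client('pulsar+ssl://broker.example.com:6651',
                           authentication=auth,
                           tls_trust_certs_file_path='/path/to/ca.cert.pem')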
class AuthenticationToken(Authentication):
    """
    Token based authentication implementation
    """
    def __init__(self, token):
        """
        Create the token authentication provider instance.

        **Args**

        * `token`: A string containing the token, or a function that returns a
          string with the token
        """
        if not (isinstance(token, str) or callable(token)):
            raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
        self.auth = _pulsar.AuthenticationToken(token)
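A short sketch of both forms; the token value and file path are placeholders. Passing a function is intended for tokens that are refreshed over time, since the client can call it again to obtain the current value.

    #!python
    import pulsar
    from pulsar import AuthenticationToken

    # Pass the token string directly...
    client = pulsar.Client('pulsar://localhost:6650',
                           authentication=AuthenticationToken('eyJhbGciOi...'))

    # ...or pass a function that returns the current token
    def read_token():
        with open('/path/to/token.txt') as f:
            return f.read().strip()

    client = pulsar.Client('pulsar://localhost:6650',
                           authentication=AuthenticationToken(read_token))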
class AuthenticationAthenz(Authentication):
    """
    Athenz Authentication implementation
    """
    def __init__(self, auth_params_string):
        """
        Create the Athenz authentication provider instance.

        **Args**

        * `auth_params_string`: JSON encoded configuration for Athenz client
        """
        _check_type(str, auth_params_string, 'auth_params_string')
        self.auth = _pulsar.AuthenticationAthenz(auth_params_string)
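A hedged sketch only: the parameter keys below are assumptions about a typical Athenz setup and should be checked against the provider documentation for your deployment; all values are placeholders.

    #!python
    import json
    import pulsar
    from pulsar import AuthenticationAthenz

    athenz_params = {
        # assumed key names; verify against your Athenz configuration
        "tenantDomain": "my-tenant",
        "tenantService": "my-service",
        "providerDomain": "pulsar",
        "privateKey": "file:///path/to/private.pem",
        "ztsUrl": "https://zts.example.com:8443/zts/v1",
    }
    client = pulsar.Client('pulsar+ssl://broker.example.com:6651',
                           authentication=AuthenticationAthenz(json.dumps(athenz_params)))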
class AuthenticationOauth2(Authentication):
    """
    Oauth2 Authentication implementation
    """
    def __init__(self, auth_params_string):
        """
        Create the Oauth2 authentication provider instance.

        **Args**

        * `auth_params_string`: JSON encoded configuration for Oauth2 client
        """
        _check_type(str, auth_params_string, 'auth_params_string')
        self.auth = _pulsar.AuthenticationOauth2(auth_params_string)
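Another hedged sketch: the field names below reflect a common client-credentials setup and are assumptions to verify against your identity provider; values are placeholders.

    #!python
    import json
    import pulsar
    from pulsar import AuthenticationOauth2

    oauth2_params = {
        # assumed key names; verify against your OAuth2 provider configuration
        "issuer_url": "https://auth.example.com",
        "private_key": "file:///path/to/credentials.json",
        "audience": "urn:example:pulsar-instance",
    }
    client = pulsar.Client('pulsar+ssl://broker.example.com:6651',
                           authentication=AuthenticationOauth2(json.dumps(oauth2_params)))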
class Client:
    """
    The Pulsar client. A single client instance can be used to create producers
    and consumers on multiple topics.

    The client will share the same connection pool and threads across all
    producers and consumers.
    """

    def __init__(self, service_url,
                 authentication=None,
                 operation_timeout_seconds=30,
                 io_threads=1,
                 message_listener_threads=1,
                 concurrent_lookup_requests=50000,
                 log_conf_file_path=None,
                 use_tls=False,
                 tls_trust_certs_file_path=None,
                 tls_allow_insecure_connection=False,
                 tls_validate_hostname=False,
                 logger=None,
                 connection_timeout_ms=10000,
                 ):
        """
        Create a new Pulsar client instance.

        **Args**

        * `service_url`: The Pulsar service url, e.g. pulsar://my-broker.com:6650/

        **Options**

        * `authentication`:
          Set the authentication provider to be used with the broker. For example:
          `AuthenticationTLS`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2`
        * `operation_timeout_seconds`:
          Set timeout on client operations (subscribe, create producer, close,
          unsubscribe).
        * `io_threads`:
          Set the number of IO threads to be used by the Pulsar client.
        * `message_listener_threads`:
          Set the number of threads to be used by the Pulsar client when
          delivering messages through message listener. The default is 1 thread
          per Pulsar client. If using more than 1 thread, messages for distinct
          `message_listener`s will be delivered in different threads, however a
          single `MessageListener` will always be assigned to the same thread.
        * `concurrent_lookup_requests`:
          Number of concurrent lookup-requests allowed on each broker connection
          to prevent overload on the broker.
        * `log_conf_file_path`:
          Initialize log4cxx from a configuration file.
        * `use_tls`:
          Configure whether to use TLS encryption on the connection. This setting
          is deprecated. TLS will be automatically enabled if the `serviceUrl` is
          set to `pulsar+ssl://` or `https://`
        * `tls_trust_certs_file_path`:
          Set the path to the trusted TLS certificate file. If empty, defaults to
          certifi.
        * `tls_allow_insecure_connection`:
          Configure whether the Pulsar client accepts untrusted TLS certificates
          from the broker.
        * `tls_validate_hostname`:
          Configure whether the Pulsar client validates that the hostname of the
          endpoint matches the common name on the TLS certificate presented by
          the endpoint.
        * `logger`:
          Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
        * `connection_timeout_ms`:
          Set timeout in milliseconds on TCP connections.
        """
        _check_type(str, service_url, 'service_url')
        _check_type_or_none(Authentication, authentication, 'authentication')
        _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
        _check_type(int, connection_timeout_ms, 'connection_timeout_ms')
        _check_type(int, io_threads, 'io_threads')
        _check_type(int, message_listener_threads, 'message_listener_threads')
        _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
        _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
        _check_type(bool, use_tls, 'use_tls')
        _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
        _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
        _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
        _check_type_or_none(logging.Logger, logger, 'logger')

        conf = _pulsar.ClientConfiguration()
        if authentication:
            conf.authentication(authentication.auth)
        conf.operation_timeout_seconds(operation_timeout_seconds)
        conf.connection_timeout(connection_timeout_ms)
        conf.io_threads(io_threads)
        conf.message_listener_threads(message_listener_threads)
        conf.concurrent_lookup_requests(concurrent_lookup_requests)
        if log_conf_file_path:
            conf.log_conf_file_path(log_conf_file_path)
        if logger:
            conf.set_logger(logger)
        if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
            conf.use_tls(True)
        if tls_trust_certs_file_path:
            conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
        else:
            conf.tls_trust_certs_file_path(certifi.where())
        conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
        conf.tls_validate_hostname(tls_validate_hostname)
        self._client = _pulsar.Client(service_url, conf)
        self._consumers = []

    def create_producer(self, topic,
                        producer_name=None,
                        schema=schema.BytesSchema(),
                        initial_sequence_id=None,
                        send_timeout_millis=30000,
                        compression_type=CompressionType.NONE,
                        max_pending_messages=1000,
                        max_pending_messages_across_partitions=50000,
                        block_if_queue_full=False,
                        batching_enabled=False,
                        batching_max_messages=1000,
                        batching_max_allowed_size_in_bytes=128*1024,
                        batching_max_publish_delay_ms=10,
                        message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
                        lazy_start_partitioned_producers=False,
                        properties=None,
                        batching_type=BatchingType.Default,
                        encryption_key=None,
                        crypto_key_reader=None
                        ):
        """
        Create a new producer on a given topic.

        **Args**

        * `topic`:
          The topic name

        **Options**

        * `producer_name`:
          Specify a name for the producer. If not assigned,
          the system will generate a globally unique name which can be accessed
          with `Producer.producer_name()`. When specifying a name, it is up to
          the user to ensure that, for a given topic, the producer name is unique
          across all Pulsar's clusters.
        * `schema`:
          Define the schema of the data that will be published by this producer.
          The schema will be used for two purposes:
            - Validate the data format against the topic defined schema
            - Perform serialization/deserialization between data and objects
          An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
        * `initial_sequence_id`:
          Set the baseline for the sequence ids for messages
          published by the producer. The first message will use
          `(initialSequenceId + 1)` as its sequence id and subsequent messages will
          be assigned incremental sequence ids, if not otherwise specified.
        * `send_timeout_millis`:
          If a message is not acknowledged by the server before the
          `send_timeout` expires, an error will be reported.
        * `compression_type`:
          Set the compression type for the producer. By default, message
          payloads are not compressed. Supported compression types are
          `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
          ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
          release in order to be able to receive messages compressed with ZSTD.
          SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
          release in order to be able to receive messages compressed with SNAPPY.
        * `max_pending_messages`:
          Set the max size of the queue holding the messages pending to receive
          an acknowledgment from the broker.
        * `max_pending_messages_across_partitions`:
          Set the max size of the queue holding the messages pending to receive
          an acknowledgment across partitions from the broker.
        * `block_if_queue_full`: Set whether `send_async` operations should
          block when the outgoing message queue is full.
        * `message_routing_mode`:
          Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
          the other option is `PartitionsRoutingMode.UseSinglePartition`
        * `lazy_start_partitioned_producers`:
          This config affects producers of partitioned topics only. It controls whether
          producers register and connect immediately to the owner broker of each partition
          or start lazily on demand. The internal producer of one partition is always
          started eagerly, chosen by the routing policy, but the internal producers of
          any additional partitions are started on demand, upon receiving their first
          message.
          Using this mode can reduce the strain on brokers for topics with large numbers of
          partitions and when the SinglePartition routing policy is used without keyed messages.
          Because producer connections can be opened on demand, this can produce extra send latency
          for the first messages of a given partition.
        * `properties`:
          Sets the properties for the producer. The properties associated with a producer
          can be used to identify a producer at the broker side.
        * `batching_type`:
          Sets the batching type for the producer.
          There are two batching types: DefaultBatching and KeyBasedBatching.
            - Default batching
              incoming single messages:
              (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
              batched into single batch message:
              [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]

            - KeyBasedBatching
              incoming single messages:
              (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
              batched into single batch message:
              [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
        * `encryption_key`:
          The key used for symmetric encryption, configured on the producer side
        * `crypto_key_reader`:
          Symmetric encryption class implementation, configuring public key encryption messages for the producer
          and private key decryption messages for the consumer
        """
        _check_type(str, topic, 'topic')
        _check_type_or_none(str, producer_name, 'producer_name')
        _check_type(_schema.Schema, schema, 'schema')
        _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
        _check_type(int, send_timeout_millis, 'send_timeout_millis')
        _check_type(CompressionType, compression_type, 'compression_type')
        _check_type(int, max_pending_messages, 'max_pending_messages')
        _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
        _check_type(bool, block_if_queue_full, 'block_if_queue_full')
        _check_type(bool, batching_enabled, 'batching_enabled')
        _check_type(int, batching_max_messages, 'batching_max_messages')
        _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
        _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
        _check_type_or_none(dict, properties, 'properties')
        _check_type(BatchingType, batching_type, 'batching_type')
        _check_type_or_none(str, encryption_key, 'encryption_key')
        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
        _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')

        conf = _pulsar.ProducerConfiguration()
        conf.send_timeout_millis(send_timeout_millis)
        conf.compression_type(compression_type)
        conf.max_pending_messages(max_pending_messages)
        conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
        conf.block_if_queue_full(block_if_queue_full)
        conf.batching_enabled(batching_enabled)
        conf.batching_max_messages(batching_max_messages)
        conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
        conf.partitions_routing_mode(message_routing_mode)
        conf.batching_type(batching_type)
        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
        if producer_name:
            conf.producer_name(producer_name)
        if initial_sequence_id:
            conf.initial_sequence_id(initial_sequence_id)
        if properties:
            for k, v in properties.items():
                conf.property(k, v)

        conf.schema(schema.schema_info())
        if encryption_key:
            conf.encryption_key(encryption_key)
        if crypto_key_reader:
            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

        p = Producer()
        p._producer = self._client.create_producer(topic, conf)
        p._schema = schema
        p._client = self._client
        return p

    def subscribe(self, topic, subscription_name,
                  consumer_type=ConsumerType.Exclusive,
                  schema=schema.BytesSchema(),
                  message_listener=None,
                  receiver_queue_size=1000,
                  max_total_receiver_queue_size_across_partitions=50000,
                  consumer_name=None,
                  unacked_messages_timeout_ms=None,
                  broker_consumer_stats_cache_time_ms=30000,
                  negative_ack_redelivery_delay_ms=60000,
                  is_read_compacted=False,
                  properties=None,
                  pattern_auto_discovery_period=60,
                  initial_position=InitialPosition.Latest,
                  crypto_key_reader=None,
                  replicate_subscription_state_enabled=False
                  ):
        """
        Subscribe to the given topic and subscription combination.

        **Args**

        * `topic`: The name of the topic, list of topics or regex pattern.
          This method will accept these forms:
            - `topic='my-topic'`
            - `topic=['topic-1', 'topic-2', 'topic-3']`
            - `topic=re.compile('persistent://public/default/topic-*')`
        * `subscription_name`: The name of the subscription.

        **Options**

        * `consumer_type`:
          Select the subscription type to be used when subscribing to the topic.
        * `schema`:
          Define the schema of the data that will be received by this consumer.
        * `message_listener`:
          Sets a message listener for the consumer. When the listener is set,
          the application will receive messages through it. Calls to
          `consumer.receive()` will not be allowed. The listener function needs
          to accept (consumer, message), for example:

                #!python
                def my_listener(consumer, message):
                    # process message
                    consumer.acknowledge(message)

        * `receiver_queue_size`:
          Sets the size of the consumer receive queue. The consumer receive
          queue controls how many messages can be accumulated by the consumer
          before the application calls `receive()`. Using a higher value could
          potentially increase the consumer throughput at the expense of higher
          memory utilization. Setting the consumer queue size to zero decreases
          the throughput of the consumer by disabling pre-fetching of messages.
          This approach improves the message distribution on shared subscription
          by pushing messages only to those consumers that are ready to process
          them. Neither receive with timeout nor partitioned topics can be used
          if the consumer queue size is zero. The `receive()` function call
          should not be interrupted when the consumer queue size is zero. The
          default value is 1000 messages and should work well for most use
          cases.
        * `max_total_receiver_queue_size_across_partitions`:
          Set the max total receiver queue size across partitions.
          This setting will be used to reduce the receiver queue size for individual partitions.
        * `consumer_name`:
          Sets the consumer name.
        * `unacked_messages_timeout_ms`:
          Sets the timeout in milliseconds for unacknowledged messages. The
          timeout needs to be greater than 10 seconds. An exception is thrown if
          the given value is less than 10 seconds. If a successful
          acknowledgement is not sent within the timeout, all the unacknowledged
          messages are redelivered.
        * `negative_ack_redelivery_delay_ms`:
          The delay after which to redeliver the messages that failed to be
          processed (with `consumer.negative_acknowledge()`)
        * `broker_consumer_stats_cache_time_ms`:
          Sets the time duration for which the broker-side consumer stats will
          be cached in the client.
        * `is_read_compacted`:
          Selects whether to read the compacted version of the topic
        * `properties`:
          Sets the properties for the consumer. The properties associated with a consumer
          can be used to identify a consumer at the broker side.
        * `pattern_auto_discovery_period`:
          Period in seconds for the consumer to auto-discover matching topics.
        * `initial_position`:
          Set the initial position of a consumer when subscribing to the topic.
          It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
          Default: `Latest`.
        * `crypto_key_reader`:
          Symmetric encryption class implementation, configuring public key encryption messages for the producer
          and private key decryption messages for the consumer
        * `replicate_subscription_state_enabled`:
          Set whether the subscription status should be replicated.
          Default: `False`.
        """
        _check_type(str, subscription_name, 'subscription_name')
        _check_type(ConsumerType, consumer_type, 'consumer_type')
        _check_type(_schema.Schema, schema, 'schema')
        _check_type(int, receiver_queue_size, 'receiver_queue_size')
        _check_type(int, max_total_receiver_queue_size_across_partitions,
                    'max_total_receiver_queue_size_across_partitions')
        _check_type_or_none(str, consumer_name, 'consumer_name')
        _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
        _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
        _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
        _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
        _check_type(bool, is_read_compacted, 'is_read_compacted')
        _check_type_or_none(dict, properties, 'properties')
        _check_type(InitialPosition, initial_position, 'initial_position')
        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')

        conf = _pulsar.ConsumerConfiguration()
        conf.consumer_type(consumer_type)
        conf.read_compacted(is_read_compacted)
        if message_listener:
            conf.message_listener(_listener_wrapper(message_listener, schema))
        conf.receiver_queue_size(receiver_queue_size)
        conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
        if consumer_name:
            conf.consumer_name(consumer_name)
        if unacked_messages_timeout_ms:
            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)

        conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
        conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
        if properties:
            for k, v in properties.items():
                conf.property(k, v)
        conf.subscription_initial_position(initial_position)

        conf.schema(schema.schema_info())

        if crypto_key_reader:
            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

        conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)

        c = Consumer()
        if isinstance(topic, str):
            # Single topic
            c._consumer = self._client.subscribe(topic, subscription_name, conf)
        elif isinstance(topic, list):
            # List of topics
            c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
        elif isinstance(topic, _retype):
            # Regex pattern
            c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
        else:
            raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")

        c._client = self
        c._schema = schema
        self._consumers.append(c)
        return c

    def create_reader(self, topic, start_message_id,
                      schema=schema.BytesSchema(),
                      reader_listener=None,
                      receiver_queue_size=1000,
                      reader_name=None,
                      subscription_role_prefix=None,
                      is_read_compacted=False,
                      crypto_key_reader=None
                      ):
        """
        Create a reader on a particular topic

        **Args**

        * `topic`: The name of the topic.
        * `start_message_id`: The initial reader positioning is done by specifying a message id.
          The options are:
            * `MessageId.earliest`: Start reading from the earliest message available in the topic
            * `MessageId.latest`: Start reading from the end of the topic, only getting messages published
              after the reader was created
            * `MessageId`: When passing a particular message id, the reader will position itself on
              that specific position. The first message to be read will be the message next to the
              specified messageId. Message id can be serialized into a string and deserialized
              back into a `MessageId` object:

                  # Serialize to string
                  s = msg.message_id().serialize()

                  # Deserialize from string
                  msg_id = MessageId.deserialize(s)

        **Options**

        * `schema`:
          Define the schema of the data that will be received by this reader.
        * `reader_listener`:
          Sets a message listener for the reader. When the listener is set,
          the application will receive messages through it. Calls to
          `reader.read_next()` will not be allowed. The listener function needs
          to accept (reader, message), for example:

                def my_listener(reader, message):
                    # process message
                    pass

        * `receiver_queue_size`:
          Sets the size of the reader receive queue. The reader receive
          queue controls how many messages can be accumulated by the reader
          before the application calls `read_next()`. Using a higher value could
          potentially increase the reader throughput at the expense of higher
          memory utilization.
        * `reader_name`:
          Sets the reader name.
        * `subscription_role_prefix`:
          Sets the subscription role prefix.
        * `is_read_compacted`:
          Selects whether to read the compacted version of the topic
        * `crypto_key_reader`:
          Symmetric encryption class implementation, configuring public key encryption messages for the producer
          and private key decryption messages for the consumer
        """
        _check_type(str, topic, 'topic')
        _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
        _check_type(_schema.Schema, schema, 'schema')
        _check_type(int, receiver_queue_size, 'receiver_queue_size')
        _check_type_or_none(str, reader_name, 'reader_name')
        _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
        _check_type(bool, is_read_compacted, 'is_read_compacted')
        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')

        conf = _pulsar.ReaderConfiguration()
        if reader_listener:
            conf.reader_listener(_listener_wrapper(reader_listener, schema))
        conf.receiver_queue_size(receiver_queue_size)
        if reader_name:
            conf.reader_name(reader_name)
        if subscription_role_prefix:
            conf.subscription_role_prefix(subscription_role_prefix)
        conf.schema(schema.schema_info())
        conf.read_compacted(is_read_compacted)
        if crypto_key_reader:
            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

        c = Reader()
        c._reader = self._client.create_reader(topic, start_message_id, conf)
        c._client = self
        c._schema = schema
        self._consumers.append(c)
        return c

    def get_topic_partitions(self, topic):
        """
        Get the list of partitions for a given topic.

        If the topic is partitioned, this will return a list of partition names. If the topic is not
        partitioned, the returned list will contain the topic name itself.

        This can be used to discover the partitions and create Reader, Consumer or Producer
        instances directly on a particular partition.

        :param topic: the topic name to lookup
        :return: a list of partition names
        """
        _check_type(str, topic, 'topic')
        return self._client.get_topic_partitions(topic)

    def shutdown(self):
        """
        Perform an immediate shutdown of the Pulsar client.

        Release all resources and close all producers, consumers, and readers without waiting
        for ongoing operations to complete.
        """
        self._client.shutdown()

    def close(self):
        """
        Close the client and all the associated producers and consumers
        """
        self._client.close()
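The sketch below exercises `create_producer` with a few of the options documented above; the `Producer.send` options it uses are documented further below. The service URL, topic, property values and delay are placeholders.

    #!python
    from datetime import timedelta
    import pulsar
    from pulsar import CompressionType

    client = pulsar.Client('pulsar://localhost:6650')

    producer = client.create_producer('my-topic',
                                      producer_name='my-producer',
                                      compression_type=CompressionType.LZ4,
                                      batching_enabled=True,
                                      batching_max_publish_delay_ms=10,
                                      block_if_queue_full=True)

    # Attach application properties and a partition key, and delay delivery
    producer.send('hello'.encode('utf-8'),
                  properties={'app': 'demo'},
                  partition_key='k1',
                  deliver_after=timedelta(seconds=30))

    client.close()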
The Pulsar client. A single client instance can be used to create producers and consumers on multiple topics.
The client will share the same connection pool and threads across all producers and consumers.
356 def __init__(self, service_url, 357 authentication=None, 358 operation_timeout_seconds=30, 359 io_threads=1, 360 message_listener_threads=1, 361 concurrent_lookup_requests=50000, 362 log_conf_file_path=None, 363 use_tls=False, 364 tls_trust_certs_file_path=None, 365 tls_allow_insecure_connection=False, 366 tls_validate_hostname=False, 367 logger=None, 368 connection_timeout_ms=10000, 369 ): 370 """ 371 Create a new Pulsar client instance. 372 373 **Args** 374 375 * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/ 376 377 **Options** 378 379 * `authentication`: 380 Set the authentication provider to be used with the broker. For example: 381 `AuthenticationTls`, AuthenticaionToken, `AuthenticationAthenz`or `AuthenticationOauth2` 382 * `operation_timeout_seconds`: 383 Set timeout on client operations (subscribe, create producer, close, 384 unsubscribe). 385 * `io_threads`: 386 Set the number of IO threads to be used by the Pulsar client. 387 * `message_listener_threads`: 388 Set the number of threads to be used by the Pulsar client when 389 delivering messages through message listener. The default is 1 thread 390 per Pulsar client. If using more than 1 thread, messages for distinct 391 `message_listener`s will be delivered in different threads, however a 392 single `MessageListener` will always be assigned to the same thread. 393 * `concurrent_lookup_requests`: 394 Number of concurrent lookup-requests allowed on each broker connection 395 to prevent overload on the broker. 396 * `log_conf_file_path`: 397 Initialize log4cxx from a configuration file. 398 * `use_tls`: 399 Configure whether to use TLS encryption on the connection. This setting 400 is deprecated. TLS will be automatically enabled if the `serviceUrl` is 401 set to `pulsar+ssl://` or `https://` 402 * `tls_trust_certs_file_path`: 403 Set the path to the trusted TLS certificate file. If empty defaults to 404 certifi. 405 * `tls_allow_insecure_connection`: 406 Configure whether the Pulsar client accepts untrusted TLS certificates 407 from the broker. 408 * `tls_validate_hostname`: 409 Configure whether the Pulsar client validates that the hostname of the 410 endpoint, matches the common name on the TLS certificate presented by 411 the endpoint. 412 * `logger`: 413 Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`. 414 * `connection_timeout_ms`: 415 Set timeout in milliseconds on TCP connections. 
416 """ 417 _check_type(str, service_url, 'service_url') 418 _check_type_or_none(Authentication, authentication, 'authentication') 419 _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds') 420 _check_type(int, connection_timeout_ms, 'connection_timeout_ms') 421 _check_type(int, io_threads, 'io_threads') 422 _check_type(int, message_listener_threads, 'message_listener_threads') 423 _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests') 424 _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path') 425 _check_type(bool, use_tls, 'use_tls') 426 _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path') 427 _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection') 428 _check_type(bool, tls_validate_hostname, 'tls_validate_hostname') 429 _check_type_or_none(logging.Logger, logger, 'logger') 430 431 conf = _pulsar.ClientConfiguration() 432 if authentication: 433 conf.authentication(authentication.auth) 434 conf.operation_timeout_seconds(operation_timeout_seconds) 435 conf.connection_timeout(connection_timeout_ms) 436 conf.io_threads(io_threads) 437 conf.message_listener_threads(message_listener_threads) 438 conf.concurrent_lookup_requests(concurrent_lookup_requests) 439 if log_conf_file_path: 440 conf.log_conf_file_path(log_conf_file_path) 441 if logger: 442 conf.set_logger(logger) 443 if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'): 444 conf.use_tls(True) 445 if tls_trust_certs_file_path: 446 conf.tls_trust_certs_file_path(tls_trust_certs_file_path) 447 else: 448 conf.tls_trust_certs_file_path(certifi.where()) 449 conf.tls_allow_insecure_connection(tls_allow_insecure_connection) 450 conf.tls_validate_hostname(tls_validate_hostname) 451 self._client = _pulsar.Client(service_url, conf) 452 self._consumers = []
Create a new Pulsar client instance.
Args
service_url
: The Pulsar service url eg: pulsar://my-broker.com:6650/
Options
authentication
: Set the authentication provider to be used with the broker. For example:AuthenticationTls
, AuthenticaionToken,AuthenticationAthenz
orAuthenticationOauth2
operation_timeout_seconds
: Set timeout on client operations (subscribe, create producer, close, unsubscribe).io_threads
: Set the number of IO threads to be used by the Pulsar client.message_listener_threads
: Set the number of threads to be used by the Pulsar client when delivering messages through message listener. The default is 1 thread per Pulsar client. If using more than 1 thread, messages for distinctmessage_listener
s will be delivered in different threads, however a singleMessageListener
will always be assigned to the same thread.concurrent_lookup_requests
: Number of concurrent lookup-requests allowed on each broker connection to prevent overload on the broker.log_conf_file_path
: Initialize log4cxx from a configuration file.use_tls
: Configure whether to use TLS encryption on the connection. This setting is deprecated. TLS will be automatically enabled if theserviceUrl
is set topulsar+ssl://
orhttps://
tls_trust_certs_file_path
: Set the path to the trusted TLS certificate file. If empty defaults to certifi.tls_allow_insecure_connection
: Configure whether the Pulsar client accepts untrusted TLS certificates from the broker.tls_validate_hostname
: Configure whether the Pulsar client validates that the hostname of the endpoint, matches the common name on the TLS certificate presented by the endpoint.logger
: Set a Python logger for this Pulsar client. Should be an instance oflogging.Logger
.connection_timeout_ms
: Set timeout in milliseconds on TCP connections.
454 def create_producer(self, topic, 455 producer_name=None, 456 schema=schema.BytesSchema(), 457 initial_sequence_id=None, 458 send_timeout_millis=30000, 459 compression_type=CompressionType.NONE, 460 max_pending_messages=1000, 461 max_pending_messages_across_partitions=50000, 462 block_if_queue_full=False, 463 batching_enabled=False, 464 batching_max_messages=1000, 465 batching_max_allowed_size_in_bytes=128*1024, 466 batching_max_publish_delay_ms=10, 467 message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution, 468 lazy_start_partitioned_producers=False, 469 properties=None, 470 batching_type=BatchingType.Default, 471 encryption_key=None, 472 crypto_key_reader=None 473 ): 474 """ 475 Create a new producer on a given topic. 476 477 **Args** 478 479 * `topic`: 480 The topic name 481 482 **Options** 483 484 * `producer_name`: 485 Specify a name for the producer. If not assigned, 486 the system will generate a globally unique name which can be accessed 487 with `Producer.producer_name()`. When specifying a name, it is app to 488 the user to ensure that, for a given topic, the producer name is unique 489 across all Pulsar's clusters. 490 * `schema`: 491 Define the schema of the data that will be published by this producer. 492 The schema will be used for two purposes: 493 - Validate the data format against the topic defined schema 494 - Perform serialization/deserialization between data and objects 495 An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`. 496 * `initial_sequence_id`: 497 Set the baseline for the sequence ids for messages 498 published by the producer. First message will be using 499 `(initialSequenceId + 1)`` as its sequence id and subsequent messages will 500 be assigned incremental sequence ids, if not otherwise specified. 501 * `send_timeout_millis`: 502 If a message is not acknowledged by the server before the 503 `send_timeout` expires, an error will be reported. 504 * `compression_type`: 505 Set the compression type for the producer. By default, message 506 payloads are not compressed. Supported compression types are 507 `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`. 508 ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that 509 release in order to be able to receive messages compressed with ZSTD. 510 SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that 511 release in order to be able to receive messages compressed with SNAPPY. 512 * `max_pending_messages`: 513 Set the max size of the queue holding the messages pending to receive 514 an acknowledgment from the broker. 515 * `max_pending_messages_across_partitions`: 516 Set the max size of the queue holding the messages pending to receive 517 an acknowledgment across partitions from the broker. 518 * `block_if_queue_full`: Set whether `send_async` operations should 519 block when the outgoing message queue is full. 520 * `message_routing_mode`: 521 Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`, 522 other option is `PartitionsRoutingMode.UseSinglePartition` 523 * `lazy_start_partitioned_producers`: 524 This config affects producers of partitioned topics only. It controls whether 525 producers register and connect immediately to the owner broker of each partition 526 or start lazily on demand. 
The internal producer of one partition is always 527 started eagerly, chosen by the routing policy, but the internal producers of 528 any additional partitions are started on demand, upon receiving their first 529 message. 530 Using this mode can reduce the strain on brokers for topics with large numbers of 531 partitions and when the SinglePartition routing policy is used without keyed messages. 532 Because producer connection can be on demand, this can produce extra send latency 533 for the first messages of a given partition. 534 * `properties`: 535 Sets the properties for the producer. The properties associated with a producer 536 can be used for identify a producer at broker side. 537 * `batching_type`: 538 Sets the batching type for the producer. 539 There are two batching type: DefaultBatching and KeyBasedBatching. 540 - Default batching 541 incoming single messages: 542 (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) 543 batched into single batch message: 544 [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)] 545 546 - KeyBasedBatching 547 incoming single messages: 548 (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) 549 batched into single batch message: 550 [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)] 551 * encryption_key: 552 The key used for symmetric encryption, configured on the producer side 553 * crypto_key_reader: 554 Symmetric encryption class implementation, configuring public key encryption messages for the producer 555 and private key decryption messages for the consumer 556 """ 557 _check_type(str, topic, 'topic') 558 _check_type_or_none(str, producer_name, 'producer_name') 559 _check_type(_schema.Schema, schema, 'schema') 560 _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id') 561 _check_type(int, send_timeout_millis, 'send_timeout_millis') 562 _check_type(CompressionType, compression_type, 'compression_type') 563 _check_type(int, max_pending_messages, 'max_pending_messages') 564 _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions') 565 _check_type(bool, block_if_queue_full, 'block_if_queue_full') 566 _check_type(bool, batching_enabled, 'batching_enabled') 567 _check_type(int, batching_max_messages, 'batching_max_messages') 568 _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes') 569 _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms') 570 _check_type_or_none(dict, properties, 'properties') 571 _check_type(BatchingType, batching_type, 'batching_type') 572 _check_type_or_none(str, encryption_key, 'encryption_key') 573 _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') 574 _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers') 575 576 conf = _pulsar.ProducerConfiguration() 577 conf.send_timeout_millis(send_timeout_millis) 578 conf.compression_type(compression_type) 579 conf.max_pending_messages(max_pending_messages) 580 conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions) 581 conf.block_if_queue_full(block_if_queue_full) 582 conf.batching_enabled(batching_enabled) 583 conf.batching_max_messages(batching_max_messages) 584 conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes) 585 conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms) 586 
conf.partitions_routing_mode(message_routing_mode) 587 conf.batching_type(batching_type) 588 conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers) 589 if producer_name: 590 conf.producer_name(producer_name) 591 if initial_sequence_id: 592 conf.initial_sequence_id(initial_sequence_id) 593 if properties: 594 for k, v in properties.items(): 595 conf.property(k, v) 596 597 conf.schema(schema.schema_info()) 598 if encryption_key: 599 conf.encryption_key(encryption_key) 600 if crypto_key_reader: 601 conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) 602 603 p = Producer() 604 p._producer = self._client.create_producer(topic, conf) 605 p._schema = schema 606 p._client = self._client 607 return p
Create a new producer on a given topic.
Args
topic
: The topic name
Options
producer_name
: Specify a name for the producer. If not assigned, the system will generate a globally unique name which can be accessed withProducer.producer_name()
. When specifying a name, it is app to the user to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters.schema
: Define the schema of the data that will be published by this producer. The schema will be used for two purposes:- Validate the data format against the topic defined schema
- Perform serialization/deserialization between data and objects
An example for this parameter would be to pass
schema=JsonSchema(MyRecordClass)
.
initial_sequence_id
: Set the baseline for the sequence ids for messages published by the producer. First message will be using `(initialSequenceId + 1)`` as its sequence id and subsequent messages will be assigned incremental sequence ids, if not otherwise specified.send_timeout_millis
: If a message is not acknowledged by the server before thesend_timeout
expires, an error will be reported.compression_type
: Set the compression type for the producer. By default, message payloads are not compressed. Supported compression types areCompressionType.LZ4
,CompressionType.ZLib
,CompressionType.ZSTD
andCompressionType.SNAPPY
. ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that release in order to be able to receive messages compressed with ZSTD. SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that release in order to be able to receive messages compressed with SNAPPY.max_pending_messages
: Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.max_pending_messages_across_partitions
: Set the max size of the queue holding the messages pending to receive an acknowledgment across partitions from the broker.block_if_queue_full
: Set whethersend_async
operations should block when the outgoing message queue is full.message_routing_mode
: Set the message routing mode for the partitioned producer. Default isPartitionsRoutingMode.RoundRobinDistribution
, other option isPartitionsRoutingMode.UseSinglePartition
lazy_start_partitioned_producers
: This config affects producers of partitioned topics only. It controls whether producers register and connect immediately to the owner broker of each partition or start lazily on demand. The internal producer of one partition is always started eagerly, chosen by the routing policy, but the internal producers of any additional partitions are started on demand, upon receiving their first message. Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when the SinglePartition routing policy is used without keyed messages. Because producer connection can be on demand, this can produce extra send latency for the first messages of a given partition.properties
: Sets the properties for the producer. The properties associated with a producer can be used for identify a producer at broker side.batching_type
: Sets the batching type for the producer. There are two batching type: DefaultBatching and KeyBasedBatching.Default batching incoming single messages: (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) batched into single batch message: [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
KeyBasedBatching incoming single messages: (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) batched into single batch message: [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
- encryption_key: The key used for symmetric encryption, configured on the producer side
- crypto_key_reader: Symmetric encryption class implementation, configuring public key encryption messages for the producer and private key decryption messages for the consumer
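A subscription can target a single topic, a list of topics, or a compiled regex, and can deliver through a message listener instead of `receive()`; a minimal sketch with placeholder names:

    #!python
    import re
    import pulsar
    from pulsar import ConsumerType

    client = pulsar.Client('pulsar://localhost:6650')

    def listener(consumer, msg):
        # with a listener set, messages arrive here and receive() must not be called
        consumer.acknowledge(msg)

    consumer = client.subscribe(['topic-1', 'topic-2'], 'my-subscription',
                                consumer_type=ConsumerType.Shared,
                                message_listener=listener)

    # Or subscribe to every topic matching a regex pattern
    pattern_consumer = client.subscribe(re.compile('persistent://public/default/topic-.*'),
                                        'my-pattern-subscription')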
609 def subscribe(self, topic, subscription_name, 610 consumer_type=ConsumerType.Exclusive, 611 schema=schema.BytesSchema(), 612 message_listener=None, 613 receiver_queue_size=1000, 614 max_total_receiver_queue_size_across_partitions=50000, 615 consumer_name=None, 616 unacked_messages_timeout_ms=None, 617 broker_consumer_stats_cache_time_ms=30000, 618 negative_ack_redelivery_delay_ms=60000, 619 is_read_compacted=False, 620 properties=None, 621 pattern_auto_discovery_period=60, 622 initial_position=InitialPosition.Latest, 623 crypto_key_reader=None, 624 replicate_subscription_state_enabled=False 625 ): 626 """ 627 Subscribe to the given topic and subscription combination. 628 629 **Args** 630 631 * `topic`: The name of the topic, list of topics or regex pattern. 632 This method will accept these forms: 633 - `topic='my-topic'` 634 - `topic=['topic-1', 'topic-2', 'topic-3']` 635 - `topic=re.compile('persistent://public/default/topic-*')` 636 * `subscription`: The name of the subscription. 637 638 **Options** 639 640 * `consumer_type`: 641 Select the subscription type to be used when subscribing to the topic. 642 * `schema`: 643 Define the schema of the data that will be received by this consumer. 644 * `message_listener`: 645 Sets a message listener for the consumer. When the listener is set, 646 the application will receive messages through it. Calls to 647 `consumer.receive()` will not be allowed. The listener function needs 648 to accept (consumer, message), for example: 649 650 #!python 651 def my_listener(consumer, message): 652 # process message 653 consumer.acknowledge(message) 654 655 * `receiver_queue_size`: 656 Sets the size of the consumer receive queue. The consumer receive 657 queue controls how many messages can be accumulated by the consumer 658 before the application calls `receive()`. Using a higher value could 659 potentially increase the consumer throughput at the expense of higher 660 memory utilization. Setting the consumer queue size to zero decreases 661 the throughput of the consumer by disabling pre-fetching of messages. 662 This approach improves the message distribution on shared subscription 663 by pushing messages only to those consumers that are ready to process 664 them. Neither receive with timeout nor partitioned topics can be used 665 if the consumer queue size is zero. The `receive()` function call 666 should not be interrupted when the consumer queue size is zero. The 667 default value is 1000 messages and should work well for most use 668 cases. 669 * `max_total_receiver_queue_size_across_partitions` 670 Set the max total receiver queue size across partitions. 671 This setting will be used to reduce the receiver queue size for individual partitions 672 * `consumer_name`: 673 Sets the consumer name. 674 * `unacked_messages_timeout_ms`: 675 Sets the timeout in milliseconds for unacknowledged messages. The 676 timeout needs to be greater than 10 seconds. An exception is thrown if 677 the given value is less than 10 seconds. If a successful 678 acknowledgement is not sent within the timeout, all the unacknowledged 679 messages are redelivered. 680 * `negative_ack_redelivery_delay_ms`: 681 The delay after which to redeliver the messages that failed to be 682 processed (with the `consumer.negative_acknowledge()`) 683 * `broker_consumer_stats_cache_time_ms`: 684 Sets the time duration for which the broker-side consumer stats will 685 be cached in the client. 
686 * `is_read_compacted`: 687 Selects whether to read the compacted version of the topic 688 * `properties`: 689 Sets the properties for the consumer. The properties associated with a consumer 690 can be used for identify a consumer at broker side. 691 * `pattern_auto_discovery_period`: 692 Periods of seconds for consumer to auto discover match topics. 693 * `initial_position`: 694 Set the initial position of a consumer when subscribing to the topic. 695 It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`. 696 Default: `Latest`. 697 * crypto_key_reader: 698 Symmetric encryption class implementation, configuring public key encryption messages for the producer 699 and private key decryption messages for the consumer 700 * replicate_subscription_state_enabled: 701 Set whether the subscription status should be replicated. 702 Default: `False`. 703 """ 704 _check_type(str, subscription_name, 'subscription_name') 705 _check_type(ConsumerType, consumer_type, 'consumer_type') 706 _check_type(_schema.Schema, schema, 'schema') 707 _check_type(int, receiver_queue_size, 'receiver_queue_size') 708 _check_type(int, max_total_receiver_queue_size_across_partitions, 709 'max_total_receiver_queue_size_across_partitions') 710 _check_type_or_none(str, consumer_name, 'consumer_name') 711 _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') 712 _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') 713 _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms') 714 _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period') 715 _check_type(bool, is_read_compacted, 'is_read_compacted') 716 _check_type_or_none(dict, properties, 'properties') 717 _check_type(InitialPosition, initial_position, 'initial_position') 718 _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') 719 720 conf = _pulsar.ConsumerConfiguration() 721 conf.consumer_type(consumer_type) 722 conf.read_compacted(is_read_compacted) 723 if message_listener: 724 conf.message_listener(_listener_wrapper(message_listener, schema)) 725 conf.receiver_queue_size(receiver_queue_size) 726 conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions) 727 if consumer_name: 728 conf.consumer_name(consumer_name) 729 if unacked_messages_timeout_ms: 730 conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms) 731 732 conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms) 733 conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms) 734 if properties: 735 for k, v in properties.items(): 736 conf.property(k, v) 737 conf.subscription_initial_position(initial_position) 738 739 conf.schema(schema.schema_info()) 740 741 if crypto_key_reader: 742 conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) 743 744 conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled) 745 746 c = Consumer() 747 if isinstance(topic, str): 748 # Single topic 749 c._consumer = self._client.subscribe(topic, subscription_name, conf) 750 elif isinstance(topic, list): 751 # List of topics 752 c._consumer = self._client.subscribe_topics(topic, subscription_name, conf) 753 elif isinstance(topic, _retype): 754 # Regex pattern 755 c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf) 756 else: 757 raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)") 758 759 
c._client = self 760 c._schema = schema 761 self._consumers.append(c) 762 return c
Subscribe to the given topic and subscription combination.
Args
topic
: The name of the topic, list of topics or regex pattern. This method will accept these forms:topic='my-topic'
topic=['topic-1', 'topic-2', 'topic-3']
topic=re.compile('persistent://public/default/topic-*')
subscription
: The name of the subscription.
Options
consumer_type
: Select the subscription type to be used when subscribing to the topic.schema
: Define the schema of the data that will be received by this consumer.message_listener
: Sets a message listener for the consumer. When the listener is set, the application will receive messages through it. Calls toconsumer.receive()
will not be allowed. The listener function needs to accept (consumer, message), for example:#!python def my_listener(consumer, message): # process message consumer.acknowledge(message)
receiver_queue_size
: Sets the size of the consumer receive queue. The consumer receive queue controls how many messages can be accumulated by the consumer before the application callsreceive()
. Using a higher value could potentially increase the consumer throughput at the expense of higher memory utilization. Setting the consumer queue size to zero decreases the throughput of the consumer by disabling pre-fetching of messages. This approach improves the message distribution on shared subscription by pushing messages only to those consumers that are ready to process them. Neither receive with timeout nor partitioned topics can be used if the consumer queue size is zero. Thereceive()
function call should not be interrupted when the consumer queue size is zero. The default value is 1000 messages and should work well for most use cases.max_total_receiver_queue_size_across_partitions
Set the max total receiver queue size across partitions. This setting will be used to reduce the receiver queue size for individual partitionsconsumer_name
: Sets the consumer name.unacked_messages_timeout_ms
: Sets the timeout in milliseconds for unacknowledged messages. The timeout needs to be greater than 10 seconds. An exception is thrown if the given value is less than 10 seconds. If a successful acknowledgement is not sent within the timeout, all the unacknowledged messages are redelivered.negative_ack_redelivery_delay_ms
: The delay after which to redeliver the messages that failed to be processed (with theconsumer.negative_acknowledge()
)broker_consumer_stats_cache_time_ms
: Sets the time duration for which the broker-side consumer stats will be cached in the client.is_read_compacted
: Selects whether to read the compacted version of the topicproperties
: Sets the properties for the consumer. The properties associated with a consumer can be used for identify a consumer at broker side.pattern_auto_discovery_period
: Periods of seconds for consumer to auto discover match topics.initial_position
: Set the initial position of a consumer when subscribing to the topic. It could be either:InitialPosition.Earliest
orInitialPosition.Latest
. Default:Latest
.- crypto_key_reader: Symmetric encryption class implementation, configuring public key encryption messages for the producer and private key decryption messages for the consumer
- replicate_subscription_state_enabled:
Set whether the subscription status should be replicated.
Default:
False
.
764 def create_reader(self, topic, start_message_id, 765 schema=schema.BytesSchema(), 766 reader_listener=None, 767 receiver_queue_size=1000, 768 reader_name=None, 769 subscription_role_prefix=None, 770 is_read_compacted=False, 771 crypto_key_reader=None 772 ): 773 """ 774 Create a reader on a particular topic 775 776 **Args** 777 778 * `topic`: The name of the topic. 779 * `start_message_id`: The initial reader positioning is done by specifying a message id. 780 The options are: 781 * `MessageId.earliest`: Start reading from the earliest message available in the topic 782 * `MessageId.latest`: Start reading from the end topic, only getting messages published 783 after the reader was created 784 * `MessageId`: When passing a particular message id, the reader will position itself on 785 that specific position. The first message to be read will be the message next to the 786 specified messageId. Message id can be serialized into a string and deserialized 787 back into a `MessageId` object: 788 789 # Serialize to string 790 s = msg.message_id().serialize() 791 792 # Deserialize from string 793 msg_id = MessageId.deserialize(s) 794 795 **Options** 796 797 * `schema`: 798 Define the schema of the data that will be received by this reader. 799 * `reader_listener`: 800 Sets a message listener for the reader. When the listener is set, 801 the application will receive messages through it. Calls to 802 `reader.read_next()` will not be allowed. The listener function needs 803 to accept (reader, message), for example: 804 805 def my_listener(reader, message): 806 # process message 807 pass 808 809 * `receiver_queue_size`: 810 Sets the size of the reader receive queue. The reader receive 811 queue controls how many messages can be accumulated by the reader 812 before the application calls `read_next()`. Using a higher value could 813 potentially increase the reader throughput at the expense of higher 814 memory utilization. 815 * `reader_name`: 816 Sets the reader name. 817 * `subscription_role_prefix`: 818 Sets the subscription role prefix. 819 * `is_read_compacted`: 820 Selects whether to read the compacted version of the topic 821 * crypto_key_reader: 822 Symmetric encryption class implementation, configuring public key encryption messages for the producer 823 and private key decryption messages for the consumer 824 """ 825 _check_type(str, topic, 'topic') 826 _check_type(_pulsar.MessageId, start_message_id, 'start_message_id') 827 _check_type(_schema.Schema, schema, 'schema') 828 _check_type(int, receiver_queue_size, 'receiver_queue_size') 829 _check_type_or_none(str, reader_name, 'reader_name') 830 _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix') 831 _check_type(bool, is_read_compacted, 'is_read_compacted') 832 _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') 833 834 conf = _pulsar.ReaderConfiguration() 835 if reader_listener: 836 conf.reader_listener(_listener_wrapper(reader_listener, schema)) 837 conf.receiver_queue_size(receiver_queue_size) 838 if reader_name: 839 conf.reader_name(reader_name) 840 if subscription_role_prefix: 841 conf.subscription_role_prefix(subscription_role_prefix) 842 conf.schema(schema.schema_info()) 843 conf.read_compacted(is_read_compacted) 844 if crypto_key_reader: 845 conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) 846 847 c = Reader() 848 c._reader = self._client.create_reader(topic, start_message_id, conf) 849 c._client = self 850 c._schema = schema 851 self._consumers.append(c) 852 return c
Create a reader on a particular topic

Args

- `topic`: The name of the topic.
- `start_message_id`: The initial reader positioning is done by specifying a message id. The options are:
  - `MessageId.earliest`: Start reading from the earliest message available in the topic
  - `MessageId.latest`: Start reading from the end of the topic, only getting messages published after the reader was created
  - `MessageId`: When passing a particular message id, the reader will position itself on that specific position. The first message to be read will be the message next to the specified message id. A message id can be serialized into a string and deserialized back into a `MessageId` object:

        # Serialize to string
        s = msg.message_id().serialize()

        # Deserialize from string
        msg_id = MessageId.deserialize(s)
Options

- `schema`: Define the schema of the data that will be received by this reader.
- `reader_listener`: Sets a message listener for the reader. When the listener is set, the application will receive messages through it. Calls to `reader.read_next()` will not be allowed. The listener function needs to accept (reader, message), for example:

        def my_listener(reader, message):
            # process message
            pass

- `receiver_queue_size`: Sets the size of the reader receive queue. The reader receive queue controls how many messages can be accumulated by the reader before the application calls `read_next()`. Using a higher value could potentially increase the reader throughput at the expense of higher memory utilization.
- `reader_name`: Sets the reader name.
- `subscription_role_prefix`: Sets the subscription role prefix.
- `is_read_compacted`: Selects whether to read the compacted version of the topic.
- `crypto_key_reader`: Symmetric encryption class implementation, configuring public key encryption messages for the producer and private key decryption messages for the consumer.
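As a usage illustration, the sketch below creates a reader from the earliest available message, reads one message, and round-trips its message id through a string so that a later reader can resume from that position. The topic name and service URL are placeholders; only the calls documented here are used.

#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')

# Start from the earliest message retained in the topic
reader = client.create_reader('my-topic', pulsar.MessageId.earliest)

msg = reader.read_next()
print("Read '%s'" % msg.data().decode('utf-8'))

# Persist the position as a string so another process can resume later
checkpoint = msg.message_id().serialize()
reader.close()

# Resume reading right after the checkpointed message
resume_id = pulsar.MessageId.deserialize(checkpoint)
resumed_reader = client.create_reader('my-topic', resume_id)

client.close()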
    def get_topic_partitions(self, topic):
        """
        Get the list of partitions for a given topic.

        If the topic is partitioned, this will return a list of partition names. If the topic is not
        partitioned, the returned list will contain the topic name itself.

        This can be used to discover the partitions and create Reader, Consumer or Producer
        instances directly on a particular partition.
        :param topic: the topic name to lookup
        :return: a list of partition names
        """
        _check_type(str, topic, 'topic')
        return self._client.get_topic_partitions(topic)
Get the list of partitions for a given topic.
If the topic is partitioned, this will return a list of partition names. If the topic is not partitioned, the returned list will contain the topic name itself.
This can be used to discover the partitions and create Reader, Consumer or Producer instances directly on a particular partition.
Parameters
- topic: the topic name to lookup
Returns
a list of partition names
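For illustration, a short sketch (the topic name is a placeholder); the returned names can be fed straight back into `create_producer`, `subscribe` or `create_reader` to work against a single partition.

#!python
partitions = client.get_topic_partitions('persistent://public/default/my-topic')
print(partitions)
# One entry per partition for a partitioned topic; for a non-partitioned
# topic the list contains just the topic name itself

# Attach a producer directly to the first partition
producer = client.create_producer(partitions[0])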
    def shutdown(self):
        """
        Perform immediate shutdown of Pulsar client.

        Release all resources and close all producers, consumers, and readers without waiting
        for ongoing operations to complete.
        """
        self._client.shutdown()
Perform immediate shutdown of Pulsar client.
Release all resources and close all producers, consumers, and readers without waiting for ongoing operations to complete.
class Producer:
    """
    The Pulsar message producer, used to publish messages on a topic.
    """

    def topic(self):
        """
        Return the topic which the producer is publishing to
        """
        return self._producer.topic()

    def producer_name(self):
        """
        Return the producer name which could have been assigned by the
        system or specified by the client
        """
        return self._producer.producer_name()

    def last_sequence_id(self):
        """
        Get the last sequence id that was published by this producer.

        This represents either the automatically assigned or custom sequence id
        (set on the `MessageBuilder`) that was published and acknowledged by the broker.

        After recreating a producer with the same producer name, this will return the
        sequence id of the last message published in the previous producer session,
        or -1 if no message was ever published.
        """
        return self._producer.last_sequence_id()

    def send(self, content,
             properties=None,
             partition_key=None,
             sequence_id=None,
             replication_clusters=None,
             disable_replication=False,
             event_timestamp=None,
             deliver_at=None,
             deliver_after=None,
             ):
        """
        Publish a message on the topic. Blocks until the message is acknowledged

        Returns a `MessageId` object that represents where the message is persisted.

        **Args**

        * `content`:
           A `bytes` object with the message payload.

        **Options**

        * `properties`:
           A dict of application-defined string properties.
        * `partition_key`:
           Sets the partition key for message routing. A hash of this key is used
           to determine the message's topic partition.
        * `sequence_id`:
           Specify a custom sequence id for the message being published.
        * `replication_clusters`:
           Override namespace replication clusters. Note that it is the caller's
           responsibility to provide valid cluster names and that all clusters
           have been previously configured as topics. Given an empty list,
           the message will replicate according to the namespace configuration.
        * `disable_replication`:
           Do not replicate this message.
        * `event_timestamp`:
           Timestamp of event creation, in milliseconds.
        * `deliver_at`:
           Specify that this message should not be delivered earlier than the
           specified timestamp.
           The timestamp is in milliseconds and based on UTC.
        * `deliver_after`:
           Specify a delay in timedelta for the delivery of the messages.
        """
        msg = self._build_msg(content, properties, partition_key, sequence_id,
                              replication_clusters, disable_replication, event_timestamp,
                              deliver_at, deliver_after)
        return MessageId.deserialize(self._producer.send(msg))

    def send_async(self, content, callback,
                   properties=None,
                   partition_key=None,
                   sequence_id=None,
                   replication_clusters=None,
                   disable_replication=False,
                   event_timestamp=None,
                   deliver_at=None,
                   deliver_after=None,
                   ):
        """
        Send a message asynchronously.

        The `callback` will be invoked once the message has been acknowledged
        by the broker.

        Example:

            #!python
            def callback(res, msg_id):
                print('Message published: %s' % res)

            producer.send_async(msg, callback)

        When the producer queue is full, by default the message will be rejected
        and the callback invoked with an error code.

        **Args**

        * `content`:
           A `bytes` object with the message payload.

        **Options**

        * `properties`:
           A dict of application-defined string properties.
        * `partition_key`:
           Sets the partition key for the message routing. A hash of this key is
           used to determine the message's topic partition.
        * `sequence_id`:
           Specify a custom sequence id for the message being published.
        * `replication_clusters`: Override namespace replication clusters. Note
           that it is the caller's responsibility to provide valid cluster names
           and that all clusters have been previously configured as topics.
           Given an empty list, the message will replicate per the namespace
           configuration.
        * `disable_replication`:
           Do not replicate this message.
        * `event_timestamp`:
           Timestamp of event creation, in milliseconds.
        * `deliver_at`:
           Specify that this message should not be delivered earlier than the
           specified timestamp.
           The timestamp is in milliseconds and based on UTC.
        * `deliver_after`:
           Specify a delay in timedelta for the delivery of the messages.
        """
        msg = self._build_msg(content, properties, partition_key, sequence_id,
                              replication_clusters, disable_replication, event_timestamp,
                              deliver_at, deliver_after)
        self._producer.send_async(msg, callback)

    def flush(self):
        """
        Flush all the messages buffered in the client and wait until all messages have been
        successfully persisted
        """
        self._producer.flush()

    def close(self):
        """
        Close the producer.
        """
        self._producer.close()

    def _build_msg(self, content, properties, partition_key, sequence_id,
                   replication_clusters, disable_replication, event_timestamp,
                   deliver_at, deliver_after):
        data = self._schema.encode(content)

        _check_type(bytes, data, 'data')
        _check_type_or_none(dict, properties, 'properties')
        _check_type_or_none(str, partition_key, 'partition_key')
        _check_type_or_none(int, sequence_id, 'sequence_id')
        _check_type_or_none(list, replication_clusters, 'replication_clusters')
        _check_type(bool, disable_replication, 'disable_replication')
        _check_type_or_none(int, event_timestamp, 'event_timestamp')
        _check_type_or_none(int, deliver_at, 'deliver_at')
        _check_type_or_none(timedelta, deliver_after, 'deliver_after')

        mb = _pulsar.MessageBuilder()
        mb.content(data)
        if properties:
            for k, v in properties.items():
                mb.property(k, v)
        if partition_key:
            mb.partition_key(partition_key)
        if sequence_id:
            mb.sequence_id(sequence_id)
        if replication_clusters:
            mb.replication_clusters(replication_clusters)
        if disable_replication:
            mb.disable_replication(disable_replication)
        if event_timestamp:
            mb.event_timestamp(event_timestamp)
        if deliver_at:
            mb.deliver_at(deliver_at)
        if deliver_after:
            mb.deliver_after(deliver_after)

        return mb.build()

    def is_connected(self):
        """
        Check if the producer is connected or not.
        """
        return self._producer.is_connected()
The Pulsar message producer, used to publish messages on a topic.
def topic(self)
Return the topic which the producer is publishing to.
def producer_name(self)
Return the producer name which could have been assigned by the system or specified by the client
def last_sequence_id(self)
Get the last sequence id that was published by this producer.

This represents either the automatically assigned or custom sequence id (set on the `MessageBuilder`) that was published and acknowledged by the broker.

After recreating a producer with the same producer name, this will return the sequence id of the last message published in the previous producer session, or -1 if no message was ever published.
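A small sketch of how this is typically used; it assumes a producer created with a fixed `producer_name` (an option accepted by `create_producer`), so that a new session can see what a previous one published.

#!python
producer = client.create_producer('my-topic', producer_name='my-named-producer')

# -1 means this producer name has never published anything yet
print('Last published sequence id: %d' % producer.last_sequence_id())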
def send(self, content, properties=None, partition_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None)
Publish a message on the topic. Blocks until the message is acknowledged.

Returns a `MessageId` object that represents where the message is persisted.

Args

- `content`: A `bytes` object with the message payload.

Options

- `properties`: A dict of application-defined string properties.
- `partition_key`: Sets the partition key for message routing. A hash of this key is used to determine the message's topic partition.
- `sequence_id`: Specify a custom sequence id for the message being published.
- `replication_clusters`: Override namespace replication clusters. Note that it is the caller's responsibility to provide valid cluster names and that all clusters have been previously configured as topics. Given an empty list, the message will replicate according to the namespace configuration.
- `disable_replication`: Do not replicate this message.
- `event_timestamp`: Timestamp of event creation, in milliseconds.
- `deliver_at`: Specify that this message should not be delivered earlier than the specified timestamp. The timestamp is in milliseconds and based on UTC.
- `deliver_after`: Specify a delay, as a `timedelta`, for the delivery of the messages.
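To make a few of these options concrete, here is a hedged sketch; `client` is assumed to have been created as in the earlier examples, and the property values are placeholders.

#!python
from datetime import timedelta

producer = client.create_producer('my-topic')

msg_id = producer.send(
    'hello'.encode('utf-8'),
    properties={'source': 'example-app'},   # application-defined metadata
    partition_key='my-key',                  # messages with the same key route to the same partition
    deliver_after=timedelta(seconds=30),     # hold delivery for roughly 30 seconds
)
print('Persisted at %s' % msg_id)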
def send_async(self, content, callback, properties=None, partition_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None)
Send a message asynchronously.

The `callback` will be invoked once the message has been acknowledged by the broker.

Example:

#!python
def callback(res, msg_id):
    print('Message published: %s' % res)

producer.send_async(msg, callback)

When the producer queue is full, by default the message will be rejected and the callback invoked with an error code.

Args

- `content`: A `bytes` object with the message payload.

Options

- `properties`: A dict of application-defined string properties.
- `partition_key`: Sets the partition key for the message routing. A hash of this key is used to determine the message's topic partition.
- `sequence_id`: Specify a custom sequence id for the message being published.
- `replication_clusters`: Override namespace replication clusters. Note that it is the caller's responsibility to provide valid cluster names and that all clusters have been previously configured as topics. Given an empty list, the message will replicate per the namespace configuration.
- `disable_replication`: Do not replicate this message.
- `event_timestamp`: Timestamp of event creation, in milliseconds.
- `deliver_at`: Specify that this message should not be delivered earlier than the specified timestamp. The timestamp is in milliseconds and based on UTC.
- `deliver_after`: Specify a delay, as a `timedelta`, for the delivery of the messages.
def flush(self)
Flush all the messages buffered in the client and wait until all messages have been successfully persisted
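A common pattern is to publish a batch with `send_async` and then call `flush()` before shutting down, so that nothing buffered in the client is lost; a minimal sketch, reusing the producer from the examples above:

#!python
def on_published(res, msg_id):
    print('Publish result: %s, id: %s' % (res, msg_id))

for i in range(100):
    producer.send_async(('Hello-%d' % i).encode('utf-8'), on_published)

# Block until everything queued so far has been persisted by the broker
producer.flush()
producer.close()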
class Consumer:
    """
    Pulsar consumer.
    """

    def topic(self):
        """
        Return the topic this consumer is subscribed to.
        """
        return self._consumer.topic()

    def subscription_name(self):
        """
        Return the subscription name.
        """
        return self._consumer.subscription_name()

    def unsubscribe(self):
        """
        Unsubscribe the current consumer from the topic.

        This method will block until the operation is completed. Once the
        consumer is unsubscribed, no more messages will be received and
        subsequent new messages will not be retained for this consumer.

        This consumer object cannot be reused.
        """
        return self._consumer.unsubscribe()

    def receive(self, timeout_millis=None):
        """
        Receive a single message.

        If a message is not immediately available, this method will block until
        a new message is available.

        **Options**

        * `timeout_millis`:
           If specified, the receive will raise an exception if a message is not
           available within the timeout.
        """
        if timeout_millis is None:
            msg = self._consumer.receive()
        else:
            _check_type(int, timeout_millis, 'timeout_millis')
            msg = self._consumer.receive(timeout_millis)

        m = Message()
        m._message = msg
        m._schema = self._schema
        return m

    def acknowledge(self, message):
        """
        Acknowledge the reception of a single message.

        This method will block until an acknowledgement is sent to the broker.
        After that, the message will not be re-delivered to this consumer.

        **Args**

        * `message`:
           The received message or message id.
        """
        if isinstance(message, Message):
            self._consumer.acknowledge(message._message)
        else:
            self._consumer.acknowledge(message)

    def acknowledge_cumulative(self, message):
        """
        Acknowledge the reception of all the messages in the stream up to (and
        including) the provided message.

        This method will block until an acknowledgement is sent to the broker.
        After that, the messages will not be re-delivered to this consumer.

        **Args**

        * `message`:
           The received message or message id.
        """
        if isinstance(message, Message):
            self._consumer.acknowledge_cumulative(message._message)
        else:
            self._consumer.acknowledge_cumulative(message)

    def negative_acknowledge(self, message):
        """
        Acknowledge the failure to process a single message.

        When a message is "negatively acked" it will be marked for redelivery after
        some fixed delay. The delay is configurable when creating the consumer,
        via its negative acknowledgement redelivery delay setting.

        This call is not blocking.

        **Args**

        * `message`:
           The received message or message id.
        """
        if isinstance(message, Message):
            self._consumer.negative_acknowledge(message._message)
        else:
            self._consumer.negative_acknowledge(message)

    def pause_message_listener(self):
        """
        Pause receiving messages via the `message_listener` until
        `resume_message_listener()` is called.
        """
        self._consumer.pause_message_listener()

    def resume_message_listener(self):
        """
        Resume receiving the messages via the message listener.
        Asynchronously receive all the messages enqueued from the time
        `pause_message_listener()` was called.
        """
        self._consumer.resume_message_listener()

    def redeliver_unacknowledged_messages(self):
        """
        Redelivers all the unacknowledged messages. In failover mode, the
        request is ignored if the consumer is not active for the given topic. In
        shared mode, the consumer's messages to be redelivered are distributed
        across all the connected consumers. This is a non-blocking call and
        doesn't throw an exception. In case the connection breaks, the messages
        are redelivered after reconnect.
        """
        self._consumer.redeliver_unacknowledged_messages()

    def seek(self, messageid):
        """
        Reset the subscription associated with this consumer to a specific message id or publish timestamp.
        The message id can either be a specific message or represent the first or last messages in the topic.
        Note: this operation can only be done on non-partitioned topics. For partitioned topics,
        seek() can instead be performed on the individual partitions.

        **Args**

        * `messageid`:
           The message id for seek, OR an integer event time to seek to
        """
        self._consumer.seek(messageid)

    def close(self):
        """
        Close the consumer.
        """
        self._consumer.close()
        self._client._consumers.remove(self)

    def is_connected(self):
        """
        Check if the consumer is connected or not.
        """
        return self._consumer.is_connected()
Pulsar consumer.
def topic(self)
Return the topic this consumer is subscribed to.
def subscription_name(self)
Return the subscription name.
def unsubscribe(self)
Unsubscribe the current consumer from the topic.
This method will block until the operation is completed. Once the consumer is unsubscribed, no more messages will be received and subsequent new messages will not be retained for this consumer.
This consumer object cannot be reused.
def receive(self, timeout_millis=None)
Receive a single message.
If a message is not immediately available, this method will block until a new message is available.
Options

- `timeout_millis`: If specified, the receive will raise an exception if a message is not available within the timeout.
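As a sketch of the timeout behaviour: the exact exception type raised on timeout depends on the client version, so the example below simply catches `Exception`; `consumer` is assumed to come from `client.subscribe(...)` as in the earlier example.

#!python
try:
    msg = consumer.receive(timeout_millis=5000)
    print("Received '%s'" % msg.data().decode('utf-8'))
    consumer.acknowledge(msg)
except Exception:
    # No message arrived within 5 seconds (or the receive failed)
    pass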
def acknowledge(self, message)
Acknowledge the reception of a single message.
This method will block until an acknowledgement is sent to the broker. After that, the message will not be re-delivered to this consumer.
Args

- `message`: The received message or message id.
def acknowledge_cumulative(self, message)
Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
This method will block until an acknowledgement is sent to the broker. After that, the messages will not be re-delivered to this consumer.
Args

- `message`: The received message or message id.
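A hedged sketch of cumulative acknowledgement: process a small batch and acknowledge only the last message, which covers everything received before it on this subscription. (Cumulative acknowledgement is generally not intended for Shared subscriptions.)

#!python
last_msg = None
for _ in range(10):
    last_msg = consumer.receive()
    # ... process each message ...

# One acknowledgement covers every message up to and including last_msg
if last_msg is not None:
    consumer.acknowledge_cumulative(last_msg)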
def negative_acknowledge(self, message)
Acknowledge the failure to process a single message.
When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when creating the consumer, via its negative acknowledgement redelivery delay setting.

This call is not blocking.

Args

- `message`: The received message or message id.
def pause_message_listener(self)
Pause receiving messages via the `message_listener` until `resume_message_listener()` is called.
def resume_message_listener(self)
Resume receiving the messages via the message listener. Asynchronously receive all the messages enqueued from the time `pause_message_listener()` was called.
def redeliver_unacknowledged_messages(self)
Redelivers all the unacknowledged messages. In failover mode, the request is ignored if the consumer is not active for the given topic. In shared mode, the consumer's messages to be redelivered are distributed across all the connected consumers. This is a non-blocking call and doesn't throw an exception. In case the connection breaks, the messages are redelivered after reconnect.
def seek(self, messageid)
Reset the subscription associated with this consumer to a specific message id or publish timestamp. The message id can either be a specific message or represent the first or last messages in the topic. Note: this operation can only be done on non-partitioned topics. For partitioned topics, seek() can instead be performed on the individual partitions.

Args

- `messageid`: The message id to seek to, OR an integer event time (publish timestamp) to seek to.
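Two illustrative seeks follow; the second assumes the publish timestamp is expressed as epoch milliseconds.

#!python
import time

# Rewind the subscription to the very first message in the topic
consumer.seek(pulsar.MessageId.earliest)

# Or jump to messages published roughly one hour ago (epoch milliseconds assumed)
one_hour_ago_ms = int((time.time() - 3600) * 1000)
consumer.seek(one_hour_ago_ms)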
class Reader:
    """
    Pulsar topic reader.
    """

    def topic(self):
        """
        Return the topic this reader is reading from.
        """
        return self._reader.topic()

    def read_next(self, timeout_millis=None):
        """
        Read a single message.

        If a message is not immediately available, this method will block until
        a new message is available.

        **Options**

        * `timeout_millis`:
           If specified, the receive will raise an exception if a message is not
           available within the timeout.
        """
        if timeout_millis is None:
            msg = self._reader.read_next()
        else:
            _check_type(int, timeout_millis, 'timeout_millis')
            msg = self._reader.read_next(timeout_millis)

        m = Message()
        m._message = msg
        m._schema = self._schema
        return m

    def has_message_available(self):
        """
        Check if there is any message available to read from the current position.
        """
        return self._reader.has_message_available()

    def seek(self, messageid):
        """
        Reset this reader to a specific message id or publish timestamp.
        The message id can either be a specific message or represent the first or last messages in the topic.
        Note: this operation can only be done on non-partitioned topics. For partitioned topics,
        seek() can instead be performed on the individual partitions.

        **Args**

        * `messageid`:
           The message id for seek, OR an integer event time to seek to
        """
        self._reader.seek(messageid)

    def close(self):
        """
        Close the reader.
        """
        self._reader.close()
        self._client._consumers.remove(self)

    def is_connected(self):
        """
        Check if the reader is connected or not.
        """
        return self._reader.is_connected()
Pulsar topic reader.
def topic(self)
Return the topic this reader is reading from.
def read_next(self, timeout_millis=None)
Read a single message.
If a message is not immediately available, this method will block until a new message is available.
Options

- `timeout_millis`: If specified, the receive will raise an exception if a message is not available within the timeout.
def has_message_available(self)
Check if there is any message available to read from the current position.
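This is handy for draining whatever is currently available and then stopping, instead of blocking in `read_next()` indefinitely; a minimal sketch using the reader from the `create_reader` example:

#!python
while reader.has_message_available():
    msg = reader.read_next()
    print("Read '%s'" % msg.data().decode('utf-8'))

reader.close()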
def seek(self, messageid)
Reset this reader to a specific message id or publish timestamp. The message id can either be a specific message or represent the first or last messages in the topic. Note: this operation can only be done on non-partitioned topics. For partitioned topics, seek() can instead be performed on the individual partitions.

Args

- `messageid`: The message id to seek to, OR an integer event time (publish timestamp) to seek to.
class CryptoKeyReader:
    """
    Default crypto key reader implementation
    """
    def __init__(self, public_key_path, private_key_path):
        """
        Create crypto key reader.

        **Args**

        * `public_key_path`: Path to the public key
        * `private_key_path`: Path to private key
        """
        _check_type(str, public_key_path, 'public_key_path')
        _check_type(str, private_key_path, 'private_key_path')
        self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)
Default crypto key reader implementation
def __init__(self, public_key_path, private_key_path)
Create crypto key reader.
Args
- `public_key_path`: Path to the public key
- `private_key_path`: Path to private key
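A sketch of how the key reader is wired in; the key file paths are placeholders, and the object is passed through the `crypto_key_reader` option documented for `create_reader` above.

#!python
key_reader = pulsar.CryptoKeyReader('my_public.pem', 'my_private.pem')

reader = client.create_reader('my-encrypted-topic', pulsar.MessageId.earliest,
                              crypto_key_reader=key_reader)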