pulsar

The Pulsar Python client library is based on the existing C++ client library. All the same features are exposed through the Python interface.

Currently, the supported Python versions are 2.7, 3.5, 3.6, 3.7 and 3.8.

Install from PyPI

Download Python wheel binary files for MacOS and Linux directly from the PyPI archive.

#!shell
$ sudo pip install pulsar-client

Install from sources

Follow the instructions to compile the Pulsar C++ client library. This method will also build the Python binding for the library.

To install the Python bindings:

#!shell
$ cd pulsar-client-cpp/python
$ sudo python setup.py install

Examples

Producer example

#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer('my-topic')

for i in range(10):
    producer.send(('Hello-%d' % i).encode('utf-8'))

client.close()

Consumer Example

#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', 'my-subscription')

while True:
    msg = consumer.receive()
    try:
        print("Received message '%s' id='%s'", msg.data().decode('utf-8'), msg.message_id())
        consumer.acknowledge(msg)
    except:
        consumer.negative_acknowledge(msg)

client.close()

Producer.send_async">Async producer example

#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer(
                'my-topic',
                block_if_queue_full=True,
                batching_enabled=True,
                batching_max_publish_delay_ms=10
            )

def send_callback(res, msg_id):
    print('Message published res=%s', res)

while True:
    producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback)

client.close()
   1#
   2# Licensed to the Apache Software Foundation (ASF) under one
   3# or more contributor license agreements.  See the NOTICE file
   4# distributed with this work for additional information
   5# regarding copyright ownership.  The ASF licenses this file
   6# to you under the Apache License, Version 2.0 (the
   7# "License"); you may not use this file except in compliance
   8# with the License.  You may obtain a copy of the License at
   9#
  10#   http://www.apache.org/licenses/LICENSE-2.0
  11#
  12# Unless required by applicable law or agreed to in writing,
  13# software distributed under the License is distributed on an
  14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15# KIND, either express or implied.  See the License for the
  16# specific language governing permissions and limitations
  17# under the License.
  18#
  19
  20"""
  21The Pulsar Python client library is based on the existing C++ client library.
  22All the same features are exposed through the Python interface.
  23
  24Currently, the supported Python versions are 2.7, 3.5, 3.6, 3.7 and 3.8.
  25
  26## Install from PyPI
  27
  28Download Python wheel binary files for MacOS and Linux
  29directly from the PyPI archive.
  30
  31    #!shell
  32    $ sudo pip install pulsar-client
  33
  34## Install from sources
  35
  36Follow the instructions to compile the Pulsar C++ client library. This method
  37will also build the Python binding for the library.
  38
  39To install the Python bindings:
  40
  41    #!shell
  42    $ cd pulsar-client-cpp/python
  43    $ sudo python setup.py install
  44
  45## Examples
  46
  47### [Producer](#pulsar.Producer) example
  48
  49    #!python
  50    import pulsar
  51
  52    client = pulsar.Client('pulsar://localhost:6650')
  53
  54    producer = client.create_producer('my-topic')
  55
  56    for i in range(10):
  57        producer.send(('Hello-%d' % i).encode('utf-8'))
  58
  59    client.close()
  60
  61#### [Consumer](#pulsar.Consumer) Example
  62
  63    #!python
  64    import pulsar
  65
  66    client = pulsar.Client('pulsar://localhost:6650')
  67    consumer = client.subscribe('my-topic', 'my-subscription')
  68
  69    while True:
  70        msg = consumer.receive()
  71        try:
  72            print("Received message '%s' id='%s'", msg.data().decode('utf-8'), msg.message_id())
  73            consumer.acknowledge(msg)
  74        except:
  75            consumer.negative_acknowledge(msg)
  76
  77    client.close()
  78
  79### [Async producer](#pulsar.Producer.send_async) example
  80
  81    #!python
  82    import pulsar
  83
  84    client = pulsar.Client('pulsar://localhost:6650')
  85
  86    producer = client.create_producer(
  87                    'my-topic',
  88                    block_if_queue_full=True,
  89                    batching_enabled=True,
  90                    batching_max_publish_delay_ms=10
  91                )
  92
  93    def send_callback(res, msg_id):
  94        print('Message published res=%s', res)
  95
  96    while True:
  97        producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback)
  98
  99    client.close()
 100"""
 101
 102import logging
 103import _pulsar
 104
 105from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType  # noqa: F401
 106
 107from pulsar.exceptions import *
 108
 109from pulsar.functions.function import Function
 110from pulsar.functions.context import Context
 111from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
 112from pulsar import schema
 113_schema = schema
 114
 115import re
 116_retype = type(re.compile('x'))
 117
 118import certifi
 119from datetime import timedelta
 120
 121
 122class MessageId:
 123    """
 124    Represents a message id
 125    """
 126
 127    def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
 128        self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
 129
 130    'Represents the earliest message stored in a topic'
 131    earliest = _pulsar.MessageId.earliest
 132
 133    'Represents the latest message published on a topic'
 134    latest = _pulsar.MessageId.latest
 135
 136    def ledger_id(self):
 137        return self._msg_id.ledger_id()
 138
 139    def entry_id(self):
 140        return self._msg_id.entry_id()
 141
 142    def batch_index(self):
 143        return self._msg_id.batch_index()
 144
 145    def partition(self):
 146        return self._msg_id.partition()
 147
 148    def serialize(self):
 149        """
 150        Returns a bytes representation of the message id.
 151        This bytes sequence can be stored and later deserialized.
 152        """
 153        return self._msg_id.serialize()
 154
 155    @staticmethod
 156    def deserialize(message_id_bytes):
 157        """
 158        Deserialize a message id object from a previously
 159        serialized bytes sequence.
 160        """
 161        return _pulsar.MessageId.deserialize(message_id_bytes)
 162
 163
 164class Message:
 165    """
 166    Message objects are returned by a consumer, either by calling `receive` or
 167    through a listener.
 168    """
 169
 170    def data(self):
 171        """
 172        Returns object typed bytes with the payload of the message.
 173        """
 174        return self._message.data()
 175
 176    def value(self):
 177        """
 178        Returns object with the de-serialized version of the message content
 179        """
 180        return self._schema.decode(self._message.data())
 181
 182    def properties(self):
 183        """
 184        Return the properties attached to the message. Properties are
 185        application-defined key/value pairs that will be attached to the
 186        message.
 187        """
 188        return self._message.properties()
 189
 190    def partition_key(self):
 191        """
 192        Get the partitioning key for the message.
 193        """
 194        return self._message.partition_key()
 195
 196    def publish_timestamp(self):
 197        """
 198        Get the timestamp in milliseconds with the message publish time.
 199        """
 200        return self._message.publish_timestamp()
 201
 202    def event_timestamp(self):
 203        """
 204        Get the timestamp in milliseconds with the message event time.
 205        """
 206        return self._message.event_timestamp()
 207
 208    def message_id(self):
 209        """
 210        The message ID that can be used to refere to this particular message.
 211        """
 212        return self._message.message_id()
 213
 214    def topic_name(self):
 215        """
 216        Get the topic Name from which this message originated from
 217        """
 218        return self._message.topic_name()
 219
 220    def redelivery_count(self):
 221        """
 222        Get the redelivery count for this message
 223        """
 224        return self._message.redelivery_count()
 225
 226    def schema_version(self):
 227        """
 228        Get the schema version for this message
 229        """
 230        return self._message.schema_version()
 231
 232    @staticmethod
 233    def _wrap(_message):
 234        self = Message()
 235        self._message = _message
 236        return self
 237
 238
 239class MessageBatch:
 240
 241    def __init__(self):
 242        self._msg_batch = _pulsar.MessageBatch()
 243
 244    def with_message_id(self, msg_id):
 245        if not isinstance(msg_id, _pulsar.MessageId):
 246            if isinstance(msg_id, MessageId):
 247                msg_id = msg_id._msg_id
 248            else:
 249                raise TypeError("unknown message id type")
 250        self._msg_batch.with_message_id(msg_id)
 251        return self
 252
 253    def parse_from(self, data, size):
 254        self._msg_batch.parse_from(data, size)
 255        _msgs = self._msg_batch.messages()
 256        return list(map(Message._wrap, _msgs))
 257
 258
 259class Authentication:
 260    """
 261    Authentication provider object. Used to load authentication from an external
 262    shared library.
 263    """
 264    def __init__(self, dynamicLibPath, authParamsString):
 265        """
 266        Create the authentication provider instance.
 267
 268        **Args**
 269
 270        * `dynamicLibPath`: Path to the authentication provider shared library
 271          (such as `tls.so`)
 272        * `authParamsString`: Comma-separated list of provider-specific
 273          configuration params
 274        """
 275        _check_type(str, dynamicLibPath, 'dynamicLibPath')
 276        _check_type(str, authParamsString, 'authParamsString')
 277        self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
 278
 279
 280class AuthenticationTLS(Authentication):
 281    """
 282    TLS Authentication implementation
 283    """
 284    def __init__(self, certificate_path, private_key_path):
 285        """
 286        Create the TLS authentication provider instance.
 287
 288        **Args**
 289
 290        * `certificatePath`: Path to the public certificate
 291        * `privateKeyPath`: Path to private TLS key
 292        """
 293        _check_type(str, certificate_path, 'certificate_path')
 294        _check_type(str, private_key_path, 'private_key_path')
 295        self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
 296
 297
 298class AuthenticationToken(Authentication):
 299    """
 300    Token based authentication implementation
 301    """
 302    def __init__(self, token):
 303        """
 304        Create the token authentication provider instance.
 305
 306        **Args**
 307
 308        * `token`: A string containing the token or a functions that provides a
 309                   string with the token
 310        """
 311        if not (isinstance(token, str) or callable(token)):
 312            raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
 313        self.auth = _pulsar.AuthenticationToken(token)
 314
 315
 316class AuthenticationAthenz(Authentication):
 317    """
 318    Athenz Authentication implementation
 319    """
 320    def __init__(self, auth_params_string):
 321        """
 322        Create the Athenz authentication provider instance.
 323
 324        **Args**
 325
 326        * `auth_params_string`: JSON encoded configuration for Athenz client
 327        """
 328        _check_type(str, auth_params_string, 'auth_params_string')
 329        self.auth = _pulsar.AuthenticationAthenz(auth_params_string)
 330
 331class AuthenticationOauth2(Authentication):
 332    """
 333    Oauth2 Authentication implementation
 334    """
 335    def __init__(self, auth_params_string):
 336        """
 337        Create the Oauth2 authentication provider instance.
 338
 339        **Args**
 340
 341        * `auth_params_string`: JSON encoded configuration for Oauth2 client
 342        """
 343        _check_type(str, auth_params_string, 'auth_params_string')
 344        self.auth = _pulsar.AuthenticationOauth2(auth_params_string)
 345
 346class Client:
 347    """
 348    The Pulsar client. A single client instance can be used to create producers
 349    and consumers on multiple topics.
 350
 351    The client will share the same connection pool and threads across all
 352    producers and consumers.
 353    """
 354
 355    def __init__(self, service_url,
 356                 authentication=None,
 357                 operation_timeout_seconds=30,
 358                 io_threads=1,
 359                 message_listener_threads=1,
 360                 concurrent_lookup_requests=50000,
 361                 log_conf_file_path=None,
 362                 use_tls=False,
 363                 tls_trust_certs_file_path=None,
 364                 tls_allow_insecure_connection=False,
 365                 tls_validate_hostname=False,
 366                 logger=None,
 367                 connection_timeout_ms=10000,
 368                 ):
 369        """
 370        Create a new Pulsar client instance.
 371
 372        **Args**
 373
 374        * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/
 375
 376        **Options**
 377
 378        * `authentication`:
 379          Set the authentication provider to be used with the broker. For example:
 380          `AuthenticationTls`, AuthenticaionToken, `AuthenticationAthenz`or `AuthenticationOauth2`
 381        * `operation_timeout_seconds`:
 382          Set timeout on client operations (subscribe, create producer, close,
 383          unsubscribe).
 384        * `io_threads`:
 385          Set the number of IO threads to be used by the Pulsar client.
 386        * `message_listener_threads`:
 387          Set the number of threads to be used by the Pulsar client when
 388          delivering messages through message listener. The default is 1 thread
 389          per Pulsar client. If using more than 1 thread, messages for distinct
 390          `message_listener`s will be delivered in different threads, however a
 391          single `MessageListener` will always be assigned to the same thread.
 392        * `concurrent_lookup_requests`:
 393          Number of concurrent lookup-requests allowed on each broker connection
 394          to prevent overload on the broker.
 395        * `log_conf_file_path`:
 396          Initialize log4cxx from a configuration file.
 397        * `use_tls`:
 398          Configure whether to use TLS encryption on the connection. This setting
 399          is deprecated. TLS will be automatically enabled if the `serviceUrl` is
 400          set to `pulsar+ssl://` or `https://`
 401        * `tls_trust_certs_file_path`:
 402          Set the path to the trusted TLS certificate file. If empty defaults to
 403          certifi.
 404        * `tls_allow_insecure_connection`:
 405          Configure whether the Pulsar client accepts untrusted TLS certificates
 406          from the broker.
 407        * `tls_validate_hostname`:
 408          Configure whether the Pulsar client validates that the hostname of the
 409          endpoint, matches the common name on the TLS certificate presented by
 410          the endpoint.
 411        * `logger`:
 412          Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
 413        * `connection_timeout_ms`:
 414          Set timeout in milliseconds on TCP connections.
 415        """
 416        _check_type(str, service_url, 'service_url')
 417        _check_type_or_none(Authentication, authentication, 'authentication')
 418        _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
 419        _check_type(int, connection_timeout_ms, 'connection_timeout_ms')
 420        _check_type(int, io_threads, 'io_threads')
 421        _check_type(int, message_listener_threads, 'message_listener_threads')
 422        _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
 423        _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
 424        _check_type(bool, use_tls, 'use_tls')
 425        _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
 426        _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
 427        _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
 428        _check_type_or_none(logging.Logger, logger, 'logger')
 429
 430        conf = _pulsar.ClientConfiguration()
 431        if authentication:
 432            conf.authentication(authentication.auth)
 433        conf.operation_timeout_seconds(operation_timeout_seconds)
 434        conf.connection_timeout(connection_timeout_ms)
 435        conf.io_threads(io_threads)
 436        conf.message_listener_threads(message_listener_threads)
 437        conf.concurrent_lookup_requests(concurrent_lookup_requests)
 438        if log_conf_file_path:
 439            conf.log_conf_file_path(log_conf_file_path)
 440        if logger:
 441            conf.set_logger(logger)
 442        if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
 443            conf.use_tls(True)
 444        if tls_trust_certs_file_path:
 445            conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
 446        else:
 447            conf.tls_trust_certs_file_path(certifi.where())
 448        conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
 449        conf.tls_validate_hostname(tls_validate_hostname)
 450        self._client = _pulsar.Client(service_url, conf)
 451        self._consumers = []
 452
 453    def create_producer(self, topic,
 454                        producer_name=None,
 455                        schema=schema.BytesSchema(),
 456                        initial_sequence_id=None,
 457                        send_timeout_millis=30000,
 458                        compression_type=CompressionType.NONE,
 459                        max_pending_messages=1000,
 460                        max_pending_messages_across_partitions=50000,
 461                        block_if_queue_full=False,
 462                        batching_enabled=False,
 463                        batching_max_messages=1000,
 464                        batching_max_allowed_size_in_bytes=128*1024,
 465                        batching_max_publish_delay_ms=10,
 466                        message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
 467                        lazy_start_partitioned_producers=False,
 468                        properties=None,
 469                        batching_type=BatchingType.Default,
 470                        encryption_key=None,
 471                        crypto_key_reader=None
 472                        ):
 473        """
 474        Create a new producer on a given topic.
 475
 476        **Args**
 477
 478        * `topic`:
 479          The topic name
 480
 481        **Options**
 482
 483        * `producer_name`:
 484           Specify a name for the producer. If not assigned,
 485           the system will generate a globally unique name which can be accessed
 486           with `Producer.producer_name()`. When specifying a name, it is app to
 487           the user to ensure that, for a given topic, the producer name is unique
 488           across all Pulsar's clusters.
 489        * `schema`:
 490           Define the schema of the data that will be published by this producer.
 491           The schema will be used for two purposes:
 492             - Validate the data format against the topic defined schema
 493             - Perform serialization/deserialization between data and objects
 494           An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
 495        * `initial_sequence_id`:
 496           Set the baseline for the sequence ids for messages
 497           published by the producer. First message will be using
 498           `(initialSequenceId + 1)`` as its sequence id and subsequent messages will
 499           be assigned incremental sequence ids, if not otherwise specified.
 500        * `send_timeout_millis`:
 501          If a message is not acknowledged by the server before the
 502          `send_timeout` expires, an error will be reported.
 503        * `compression_type`:
 504          Set the compression type for the producer. By default, message
 505          payloads are not compressed. Supported compression types are
 506          `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
 507          ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
 508          release in order to be able to receive messages compressed with ZSTD.
 509          SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
 510          release in order to be able to receive messages compressed with SNAPPY.
 511        * `max_pending_messages`:
 512          Set the max size of the queue holding the messages pending to receive
 513          an acknowledgment from the broker.
 514        * `max_pending_messages_across_partitions`:
 515          Set the max size of the queue holding the messages pending to receive
 516          an acknowledgment across partitions from the broker.
 517        * `block_if_queue_full`: Set whether `send_async` operations should
 518          block when the outgoing message queue is full.
 519        * `message_routing_mode`:
 520          Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
 521          other option is `PartitionsRoutingMode.UseSinglePartition`
 522        * `lazy_start_partitioned_producers`:
 523          This config affects producers of partitioned topics only. It controls whether
 524          producers register and connect immediately to the owner broker of each partition
 525          or start lazily on demand. The internal producer of one partition is always
 526          started eagerly, chosen by the routing policy, but the internal producers of
 527          any additional partitions are started on demand, upon receiving their first
 528          message.
 529          Using this mode can reduce the strain on brokers for topics with large numbers of
 530          partitions and when the SinglePartition routing policy is used without keyed messages.
 531          Because producer connection can be on demand, this can produce extra send latency
 532          for the first messages of a given partition.
 533        * `properties`:
 534          Sets the properties for the producer. The properties associated with a producer
 535          can be used for identify a producer at broker side.
 536        * `batching_type`:
 537          Sets the batching type for the producer.
 538          There are two batching type: DefaultBatching and KeyBasedBatching.
 539            - Default batching
 540            incoming single messages:
 541            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
 542            batched into single batch message:
 543            [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
 544
 545            - KeyBasedBatching
 546            incoming single messages:
 547            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
 548            batched into single batch message:
 549            [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
 550        * encryption_key:
 551           The key used for symmetric encryption, configured on the producer side
 552        * crypto_key_reader:
 553           Symmetric encryption class implementation, configuring public key encryption messages for the producer
 554           and private key decryption messages for the consumer
 555        """
 556        _check_type(str, topic, 'topic')
 557        _check_type_or_none(str, producer_name, 'producer_name')
 558        _check_type(_schema.Schema, schema, 'schema')
 559        _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
 560        _check_type(int, send_timeout_millis, 'send_timeout_millis')
 561        _check_type(CompressionType, compression_type, 'compression_type')
 562        _check_type(int, max_pending_messages, 'max_pending_messages')
 563        _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
 564        _check_type(bool, block_if_queue_full, 'block_if_queue_full')
 565        _check_type(bool, batching_enabled, 'batching_enabled')
 566        _check_type(int, batching_max_messages, 'batching_max_messages')
 567        _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
 568        _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
 569        _check_type_or_none(dict, properties, 'properties')
 570        _check_type(BatchingType, batching_type, 'batching_type')
 571        _check_type_or_none(str, encryption_key, 'encryption_key')
 572        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
 573        _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')
 574
 575        conf = _pulsar.ProducerConfiguration()
 576        conf.send_timeout_millis(send_timeout_millis)
 577        conf.compression_type(compression_type)
 578        conf.max_pending_messages(max_pending_messages)
 579        conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
 580        conf.block_if_queue_full(block_if_queue_full)
 581        conf.batching_enabled(batching_enabled)
 582        conf.batching_max_messages(batching_max_messages)
 583        conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
 584        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
 585        conf.partitions_routing_mode(message_routing_mode)
 586        conf.batching_type(batching_type)
 587        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
 588        if producer_name:
 589            conf.producer_name(producer_name)
 590        if initial_sequence_id:
 591            conf.initial_sequence_id(initial_sequence_id)
 592        if properties:
 593            for k, v in properties.items():
 594                conf.property(k, v)
 595
 596        conf.schema(schema.schema_info())
 597        if encryption_key:
 598            conf.encryption_key(encryption_key)
 599        if crypto_key_reader:
 600            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
 601
 602        p = Producer()
 603        p._producer = self._client.create_producer(topic, conf)
 604        p._schema = schema
 605        p._client = self._client
 606        return p
 607
 608    def subscribe(self, topic, subscription_name,
 609                  consumer_type=ConsumerType.Exclusive,
 610                  schema=schema.BytesSchema(),
 611                  message_listener=None,
 612                  receiver_queue_size=1000,
 613                  max_total_receiver_queue_size_across_partitions=50000,
 614                  consumer_name=None,
 615                  unacked_messages_timeout_ms=None,
 616                  broker_consumer_stats_cache_time_ms=30000,
 617                  negative_ack_redelivery_delay_ms=60000,
 618                  is_read_compacted=False,
 619                  properties=None,
 620                  pattern_auto_discovery_period=60,
 621                  initial_position=InitialPosition.Latest,
 622                  crypto_key_reader=None,
 623                  replicate_subscription_state_enabled=False
 624                  ):
 625        """
 626        Subscribe to the given topic and subscription combination.
 627
 628        **Args**
 629
 630        * `topic`: The name of the topic, list of topics or regex pattern.
 631                  This method will accept these forms:
 632                    - `topic='my-topic'`
 633                    - `topic=['topic-1', 'topic-2', 'topic-3']`
 634                    - `topic=re.compile('persistent://public/default/topic-*')`
 635        * `subscription`: The name of the subscription.
 636
 637        **Options**
 638
 639        * `consumer_type`:
 640          Select the subscription type to be used when subscribing to the topic.
 641        * `schema`:
 642           Define the schema of the data that will be received by this consumer.
 643        * `message_listener`:
 644          Sets a message listener for the consumer. When the listener is set,
 645          the application will receive messages through it. Calls to
 646          `consumer.receive()` will not be allowed. The listener function needs
 647          to accept (consumer, message), for example:
 648
 649                #!python
 650                def my_listener(consumer, message):
 651                    # process message
 652                    consumer.acknowledge(message)
 653
 654        * `receiver_queue_size`:
 655          Sets the size of the consumer receive queue. The consumer receive
 656          queue controls how many messages can be accumulated by the consumer
 657          before the application calls `receive()`. Using a higher value could
 658          potentially increase the consumer throughput at the expense of higher
 659          memory utilization. Setting the consumer queue size to zero decreases
 660          the throughput of the consumer by disabling pre-fetching of messages.
 661          This approach improves the message distribution on shared subscription
 662          by pushing messages only to those consumers that are ready to process
 663          them. Neither receive with timeout nor partitioned topics can be used
 664          if the consumer queue size is zero. The `receive()` function call
 665          should not be interrupted when the consumer queue size is zero. The
 666          default value is 1000 messages and should work well for most use
 667          cases.
 668        * `max_total_receiver_queue_size_across_partitions`
 669          Set the max total receiver queue size across partitions.
 670          This setting will be used to reduce the receiver queue size for individual partitions
 671        * `consumer_name`:
 672          Sets the consumer name.
 673        * `unacked_messages_timeout_ms`:
 674          Sets the timeout in milliseconds for unacknowledged messages. The
 675          timeout needs to be greater than 10 seconds. An exception is thrown if
 676          the given value is less than 10 seconds. If a successful
 677          acknowledgement is not sent within the timeout, all the unacknowledged
 678          messages are redelivered.
 679        * `negative_ack_redelivery_delay_ms`:
 680           The delay after which to redeliver the messages that failed to be
 681           processed (with the `consumer.negative_acknowledge()`)
 682        * `broker_consumer_stats_cache_time_ms`:
 683          Sets the time duration for which the broker-side consumer stats will
 684          be cached in the client.
 685        * `is_read_compacted`:
 686          Selects whether to read the compacted version of the topic
 687        * `properties`:
 688          Sets the properties for the consumer. The properties associated with a consumer
 689          can be used for identify a consumer at broker side.
 690        * `pattern_auto_discovery_period`:
 691          Periods of seconds for consumer to auto discover match topics.
 692        * `initial_position`:
 693          Set the initial position of a consumer  when subscribing to the topic.
 694          It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
 695          Default: `Latest`.
 696        * crypto_key_reader:
 697           Symmetric encryption class implementation, configuring public key encryption messages for the producer
 698           and private key decryption messages for the consumer
 699        * replicate_subscription_state_enabled:
 700          Set whether the subscription status should be replicated.
 701          Default: `False`.
 702        """
 703        _check_type(str, subscription_name, 'subscription_name')
 704        _check_type(ConsumerType, consumer_type, 'consumer_type')
 705        _check_type(_schema.Schema, schema, 'schema')
 706        _check_type(int, receiver_queue_size, 'receiver_queue_size')
 707        _check_type(int, max_total_receiver_queue_size_across_partitions,
 708                    'max_total_receiver_queue_size_across_partitions')
 709        _check_type_or_none(str, consumer_name, 'consumer_name')
 710        _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
 711        _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
 712        _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
 713        _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
 714        _check_type(bool, is_read_compacted, 'is_read_compacted')
 715        _check_type_or_none(dict, properties, 'properties')
 716        _check_type(InitialPosition, initial_position, 'initial_position')
 717        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
 718
 719        conf = _pulsar.ConsumerConfiguration()
 720        conf.consumer_type(consumer_type)
 721        conf.read_compacted(is_read_compacted)
 722        if message_listener:
 723            conf.message_listener(_listener_wrapper(message_listener, schema))
 724        conf.receiver_queue_size(receiver_queue_size)
 725        conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
 726        if consumer_name:
 727            conf.consumer_name(consumer_name)
 728        if unacked_messages_timeout_ms:
 729            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
 730
 731        conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
 732        conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
 733        if properties:
 734            for k, v in properties.items():
 735                conf.property(k, v)
 736        conf.subscription_initial_position(initial_position)
 737
 738        conf.schema(schema.schema_info())
 739
 740        if crypto_key_reader:
 741            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
 742
 743        conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
 744
 745        c = Consumer()
 746        if isinstance(topic, str):
 747            # Single topic
 748            c._consumer = self._client.subscribe(topic, subscription_name, conf)
 749        elif isinstance(topic, list):
 750            # List of topics
 751            c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
 752        elif isinstance(topic, _retype):
 753            # Regex pattern
 754            c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
 755        else:
 756            raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
 757
 758        c._client = self
 759        c._schema = schema
 760        self._consumers.append(c)
 761        return c
 762
 763    def create_reader(self, topic, start_message_id,
 764                      schema=schema.BytesSchema(),
 765                      reader_listener=None,
 766                      receiver_queue_size=1000,
 767                      reader_name=None,
 768                      subscription_role_prefix=None,
 769                      is_read_compacted=False,
 770                      crypto_key_reader=None
 771                      ):
 772        """
 773        Create a reader on a particular topic
 774
 775        **Args**
 776
 777        * `topic`: The name of the topic.
 778        * `start_message_id`: The initial reader positioning is done by specifying a message id.
 779           The options are:
 780            * `MessageId.earliest`: Start reading from the earliest message available in the topic
 781            * `MessageId.latest`: Start reading from the end topic, only getting messages published
 782               after the reader was created
 783            * `MessageId`: When passing a particular message id, the reader will position itself on
 784               that specific position. The first message to be read will be the message next to the
 785               specified messageId. Message id can be serialized into a string and deserialized
 786               back into a `MessageId` object:
 787
 788                   # Serialize to string
 789                   s = msg.message_id().serialize()
 790
 791                   # Deserialize from string
 792                   msg_id = MessageId.deserialize(s)
 793
 794        **Options**
 795
 796        * `schema`:
 797           Define the schema of the data that will be received by this reader.
 798        * `reader_listener`:
 799          Sets a message listener for the reader. When the listener is set,
 800          the application will receive messages through it. Calls to
 801          `reader.read_next()` will not be allowed. The listener function needs
 802          to accept (reader, message), for example:
 803
 804                def my_listener(reader, message):
 805                    # process message
 806                    pass
 807
 808        * `receiver_queue_size`:
 809          Sets the size of the reader receive queue. The reader receive
 810          queue controls how many messages can be accumulated by the reader
 811          before the application calls `read_next()`. Using a higher value could
 812          potentially increase the reader throughput at the expense of higher
 813          memory utilization.
 814        * `reader_name`:
 815          Sets the reader name.
 816        * `subscription_role_prefix`:
 817          Sets the subscription role prefix.
 818        * `is_read_compacted`:
 819          Selects whether to read the compacted version of the topic
 820        * crypto_key_reader:
 821           Symmetric encryption class implementation, configuring public key encryption messages for the producer
 822           and private key decryption messages for the consumer
 823        """
 824        _check_type(str, topic, 'topic')
 825        _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
 826        _check_type(_schema.Schema, schema, 'schema')
 827        _check_type(int, receiver_queue_size, 'receiver_queue_size')
 828        _check_type_or_none(str, reader_name, 'reader_name')
 829        _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
 830        _check_type(bool, is_read_compacted, 'is_read_compacted')
 831        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
 832
 833        conf = _pulsar.ReaderConfiguration()
 834        if reader_listener:
 835            conf.reader_listener(_listener_wrapper(reader_listener, schema))
 836        conf.receiver_queue_size(receiver_queue_size)
 837        if reader_name:
 838            conf.reader_name(reader_name)
 839        if subscription_role_prefix:
 840            conf.subscription_role_prefix(subscription_role_prefix)
 841        conf.schema(schema.schema_info())
 842        conf.read_compacted(is_read_compacted)
 843        if crypto_key_reader:
 844            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
 845
 846        c = Reader()
 847        c._reader = self._client.create_reader(topic, start_message_id, conf)
 848        c._client = self
 849        c._schema = schema
 850        self._consumers.append(c)
 851        return c
 852
 853    def get_topic_partitions(self, topic):
 854        """
 855        Get the list of partitions for a given topic.
 856
 857        If the topic is partitioned, this will return a list of partition names. If the topic is not
 858        partitioned, the returned list will contain the topic name itself.
 859
 860        This can be used to discover the partitions and create Reader, Consumer or Producer
 861        instances directly on a particular partition.
 862        :param topic: the topic name to lookup
 863        :return: a list of partition name
 864        """
 865        _check_type(str, topic, 'topic')
 866        return self._client.get_topic_partitions(topic)
 867
 868    def shutdown(self):
 869        """
 870        Perform immediate shutdown of Pulsar client.
 871
 872        Release all resources and close all producer, consumer, and readers without waiting
 873        for ongoing operations to complete.
 874        """
 875        self._client.shutdown()
 876
 877    def close(self):
 878        """
 879        Close the client and all the associated producers and consumers
 880        """
 881        self._client.close()
 882
 883
 884class Producer:
 885    """
 886    The Pulsar message producer, used to publish messages on a topic.
 887    """
 888
 889    def topic(self):
 890        """
 891        Return the topic which producer is publishing to
 892        """
 893        return self._producer.topic()
 894
 895    def producer_name(self):
 896        """
 897        Return the producer name which could have been assigned by the
 898        system or specified by the client
 899        """
 900        return self._producer.producer_name()
 901
 902    def last_sequence_id(self):
 903        """
 904        Get the last sequence id that was published by this producer.
 905
 906        This represent either the automatically assigned or custom sequence id
 907        (set on the `MessageBuilder`) that was published and acknowledged by the broker.
 908
 909        After recreating a producer with the same producer name, this will return the
 910        last message that was published in the previous producer session, or -1 if
 911        there no message was ever published.
 912        """
 913        return self._producer.last_sequence_id()
 914
 915    def send(self, content,
 916             properties=None,
 917             partition_key=None,
 918             sequence_id=None,
 919             replication_clusters=None,
 920             disable_replication=False,
 921             event_timestamp=None,
 922             deliver_at=None,
 923             deliver_after=None,
 924             ):
 925        """
 926        Publish a message on the topic. Blocks until the message is acknowledged
 927
 928        Returns a `MessageId` object that represents where the message is persisted.
 929
 930        **Args**
 931
 932        * `content`:
 933          A `bytes` object with the message payload.
 934
 935        **Options**
 936
 937        * `properties`:
 938          A dict of application-defined string properties.
 939        * `partition_key`:
 940          Sets the partition key for message routing. A hash of this key is used
 941          to determine the message's topic partition.
 942        * `sequence_id`:
 943          Specify a custom sequence id for the message being published.
 944        * `replication_clusters`:
 945          Override namespace replication clusters. Note that it is the caller's
 946          responsibility to provide valid cluster names and that all clusters
 947          have been previously configured as topics. Given an empty list,
 948          the message will replicate according to the namespace configuration.
 949        * `disable_replication`:
 950          Do not replicate this message.
 951        * `event_timestamp`:
 952          Timestamp in millis of the timestamp of event creation
 953        * `deliver_at`:
 954          Specify the this message should not be delivered earlier than the
 955          specified timestamp.
 956          The timestamp is milliseconds and based on UTC
 957        * `deliver_after`:
 958          Specify a delay in timedelta for the delivery of the messages.
 959
 960        """
 961        msg = self._build_msg(content, properties, partition_key, sequence_id,
 962                              replication_clusters, disable_replication, event_timestamp,
 963                              deliver_at, deliver_after)
 964        return MessageId.deserialize(self._producer.send(msg))
 965
 966    def send_async(self, content, callback,
 967                   properties=None,
 968                   partition_key=None,
 969                   sequence_id=None,
 970                   replication_clusters=None,
 971                   disable_replication=False,
 972                   event_timestamp=None,
 973                   deliver_at=None,
 974                   deliver_after=None,
 975                   ):
 976        """
 977        Send a message asynchronously.
 978
 979        The `callback` will be invoked once the message has been acknowledged
 980        by the broker.
 981
 982        Example:
 983
 984            #!python
 985            def callback(res, msg_id):
 986                print('Message published: %s' % res)
 987
 988            producer.send_async(msg, callback)
 989
 990        When the producer queue is full, by default the message will be rejected
 991        and the callback invoked with an error code.
 992
 993        **Args**
 994
 995        * `content`:
 996          A `bytes` object with the message payload.
 997
 998        **Options**
 999
1000        * `properties`:
1001          A dict of application0-defined string properties.
1002        * `partition_key`:
1003          Sets the partition key for the message routing. A hash of this key is
1004          used to determine the message's topic partition.
1005        * `sequence_id`:
1006          Specify a custom sequence id for the message being published.
1007        * `replication_clusters`: Override namespace replication clusters. Note
1008          that it is the caller's responsibility to provide valid cluster names
1009          and that all clusters have been previously configured as topics.
1010          Given an empty list, the message will replicate per the namespace
1011          configuration.
1012        * `disable_replication`:
1013          Do not replicate this message.
1014        * `event_timestamp`:
1015          Timestamp in millis of the timestamp of event creation
1016        * `deliver_at`:
1017          Specify the this message should not be delivered earlier than the
1018          specified timestamp.
1019          The timestamp is milliseconds and based on UTC
1020        * `deliver_after`:
1021          Specify a delay in timedelta for the delivery of the messages.
1022        """
1023        msg = self._build_msg(content, properties, partition_key, sequence_id,
1024                              replication_clusters, disable_replication, event_timestamp,
1025                              deliver_at, deliver_after)
1026        self._producer.send_async(msg, callback)
1027
1028
1029    def flush(self):
1030        """
1031        Flush all the messages buffered in the client and wait until all messages have been
1032        successfully persisted
1033        """
1034        self._producer.flush()
1035
1036
1037    def close(self):
1038        """
1039        Close the producer.
1040        """
1041        self._producer.close()
1042
1043    def _build_msg(self, content, properties, partition_key, sequence_id,
1044                   replication_clusters, disable_replication, event_timestamp,
1045                   deliver_at, deliver_after):
1046        data = self._schema.encode(content)
1047
1048        _check_type(bytes, data, 'data')
1049        _check_type_or_none(dict, properties, 'properties')
1050        _check_type_or_none(str, partition_key, 'partition_key')
1051        _check_type_or_none(int, sequence_id, 'sequence_id')
1052        _check_type_or_none(list, replication_clusters, 'replication_clusters')
1053        _check_type(bool, disable_replication, 'disable_replication')
1054        _check_type_or_none(int, event_timestamp, 'event_timestamp')
1055        _check_type_or_none(int, deliver_at, 'deliver_at')
1056        _check_type_or_none(timedelta, deliver_after, 'deliver_after')
1057
1058        mb = _pulsar.MessageBuilder()
1059        mb.content(data)
1060        if properties:
1061            for k, v in properties.items():
1062                mb.property(k, v)
1063        if partition_key:
1064            mb.partition_key(partition_key)
1065        if sequence_id:
1066            mb.sequence_id(sequence_id)
1067        if replication_clusters:
1068            mb.replication_clusters(replication_clusters)
1069        if disable_replication:
1070            mb.disable_replication(disable_replication)
1071        if event_timestamp:
1072            mb.event_timestamp(event_timestamp)
1073        if deliver_at:
1074            mb.deliver_at(deliver_at)
1075        if deliver_after:
1076            mb.deliver_after(deliver_after)
1077
1078        return mb.build()
1079
1080    def is_connected(self):
1081        """
1082        Check if the producer is connected or not.
1083        """
1084        return self._producer.is_connected()
1085
1086
1087class Consumer:
1088    """
1089    Pulsar consumer.
1090    """
1091
1092    def topic(self):
1093        """
1094        Return the topic this consumer is subscribed to.
1095        """
1096        return self._consumer.topic()
1097
1098    def subscription_name(self):
1099        """
1100        Return the subscription name.
1101        """
1102        return self._consumer.subscription_name()
1103
1104    def unsubscribe(self):
1105        """
1106        Unsubscribe the current consumer from the topic.
1107
1108        This method will block until the operation is completed. Once the
1109        consumer is unsubscribed, no more messages will be received and
1110        subsequent new messages will not be retained for this consumer.
1111
1112        This consumer object cannot be reused.
1113        """
1114        return self._consumer.unsubscribe()
1115
1116    def receive(self, timeout_millis=None):
1117        """
1118        Receive a single message.
1119
1120        If a message is not immediately available, this method will block until
1121        a new message is available.
1122
1123        **Options**
1124
1125        * `timeout_millis`:
1126          If specified, the receive will raise an exception if a message is not
1127          available within the timeout.
1128        """
1129        if timeout_millis is None:
1130            msg = self._consumer.receive()
1131        else:
1132            _check_type(int, timeout_millis, 'timeout_millis')
1133            msg = self._consumer.receive(timeout_millis)
1134
1135        m = Message()
1136        m._message = msg
1137        m._schema = self._schema
1138        return m
1139
1140    def acknowledge(self, message):
1141        """
1142        Acknowledge the reception of a single message.
1143
1144        This method will block until an acknowledgement is sent to the broker.
1145        After that, the message will not be re-delivered to this consumer.
1146
1147        **Args**
1148
1149        * `message`:
1150          The received message or message id.
1151        """
1152        if isinstance(message, Message):
1153            self._consumer.acknowledge(message._message)
1154        else:
1155            self._consumer.acknowledge(message)
1156
1157    def acknowledge_cumulative(self, message):
1158        """
1159        Acknowledge the reception of all the messages in the stream up to (and
1160        including) the provided message.
1161
1162        This method will block until an acknowledgement is sent to the broker.
1163        After that, the messages will not be re-delivered to this consumer.
1164
1165        **Args**
1166
1167        * `message`:
1168          The received message or message id.
1169        """
1170        if isinstance(message, Message):
1171            self._consumer.acknowledge_cumulative(message._message)
1172        else:
1173            self._consumer.acknowledge_cumulative(message)
1174
1175    def negative_acknowledge(self, message):
1176        """
1177        Acknowledge the failure to process a single message.
1178
1179        When a message is "negatively acked" it will be marked for redelivery after
1180        some fixed delay. The delay is configurable when constructing the consumer
1181        with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
1182
1183        This call is not blocking.
1184
1185        **Args**
1186
1187        * `message`:
1188          The received message or message id.
1189        """
1190        if isinstance(message, Message):
1191            self._consumer.negative_acknowledge(message._message)
1192        else:
1193            self._consumer.negative_acknowledge(message)
1194
1195    def pause_message_listener(self):
1196        """
1197        Pause receiving messages via the `message_listener` until
1198        `resume_message_listener()` is called.
1199        """
1200        self._consumer.pause_message_listener()
1201
1202    def resume_message_listener(self):
1203        """
1204        Resume receiving the messages via the message listener.
1205        Asynchronously receive all the messages enqueued from the time
1206        `pause_message_listener()` was called.
1207        """
1208        self._consumer.resume_message_listener()
1209
1210    def redeliver_unacknowledged_messages(self):
1211        """
1212        Redelivers all the unacknowledged messages. In failover mode, the
1213        request is ignored if the consumer is not active for the given topic. In
1214        shared mode, the consumer's messages to be redelivered are distributed
1215        across all the connected consumers. This is a non-blocking call and
1216        doesn't throw an exception. In case the connection breaks, the messages
1217        are redelivered after reconnect.
1218        """
1219        self._consumer.redeliver_unacknowledged_messages()
1220
1221    def seek(self, messageid):
1222        """
1223        Reset the subscription associated with this consumer to a specific message id or publish timestamp.
1224        The message id can either be a specific message or represent the first or last messages in the topic.
1225        Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
1226        seek() on the individual partitions.
1227
1228        **Args**
1229
1230        * `message`:
1231          The message id for seek, OR an integer event time to seek to
1232        """
1233        self._consumer.seek(messageid)
1234
1235    def close(self):
1236        """
1237        Close the consumer.
1238        """
1239        self._consumer.close()
1240        self._client._consumers.remove(self)
1241
1242    def is_connected(self):
1243        """
1244        Check if the consumer is connected or not.
1245        """
1246        return self._consumer.is_connected()
1247
1248
1249
1250class Reader:
1251    """
1252    Pulsar topic reader.
1253    """
1254
1255    def topic(self):
1256        """
1257        Return the topic this reader is reading from.
1258        """
1259        return self._reader.topic()
1260
1261    def read_next(self, timeout_millis=None):
1262        """
1263        Read a single message.
1264
1265        If a message is not immediately available, this method will block until
1266        a new message is available.
1267
1268        **Options**
1269
1270        * `timeout_millis`:
1271          If specified, the receive will raise an exception if a message is not
1272          available within the timeout.
1273        """
1274        if timeout_millis is None:
1275            msg = self._reader.read_next()
1276        else:
1277            _check_type(int, timeout_millis, 'timeout_millis')
1278            msg = self._reader.read_next(timeout_millis)
1279
1280        m = Message()
1281        m._message = msg
1282        m._schema = self._schema
1283        return m
1284
1285    def has_message_available(self):
1286        """
1287        Check if there is any message available to read from the current position.
1288        """
1289        return self._reader.has_message_available();
1290
1291    def seek(self, messageid):
1292        """
1293        Reset this reader to a specific message id or publish timestamp.
1294        The message id can either be a specific message or represent the first or last messages in the topic.
1295        Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
1296        seek() on the individual partitions.
1297
1298        **Args**
1299
1300        * `message`:
1301          The message id for seek, OR an integer event time to seek to
1302        """
1303        self._reader.seek(messageid)
1304
1305    def close(self):
1306        """
1307        Close the reader.
1308        """
1309        self._reader.close()
1310        self._client._consumers.remove(self)
1311
1312    def is_connected(self):
1313        """
1314        Check if the reader is connected or not.
1315        """
1316        return self._reader.is_connected()
1317
1318
1319class CryptoKeyReader:
1320    """
1321    Default crypto key reader implementation
1322    """
1323    def __init__(self, public_key_path, private_key_path):
1324        """
1325        Create crypto key reader.
1326
1327        **Args**
1328
1329        * `public_key_path`: Path to the public key
1330        * `private_key_path`: Path to private key
1331        """
1332        _check_type(str, public_key_path, 'public_key_path')
1333        _check_type(str, private_key_path, 'private_key_path')
1334        self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)
1335
1336def _check_type(var_type, var, name):
1337    if not isinstance(var, var_type):
1338        raise ValueError("Argument %s is expected to be of type '%s' and not '%s'"
1339                         % (name, var_type.__name__, type(var).__name__))
1340
1341
1342def _check_type_or_none(var_type, var, name):
1343    if var is not None and not isinstance(var, var_type):
1344        raise ValueError("Argument %s is expected to be either None or of type '%s'"
1345                         % (name, var_type.__name__))
1346
1347
1348def _listener_wrapper(listener, schema):
1349    def wrapper(consumer, msg):
1350        c = Consumer()
1351        c._consumer = consumer
1352        m = Message()
1353        m._message = msg
1354        m._schema = schema
1355        listener(c, m)
1356    return wrapper
class MessageId:
123class MessageId:
124    """
125    Represents a message id
126    """
127
128    def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
129        self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
130
131    'Represents the earliest message stored in a topic'
132    earliest = _pulsar.MessageId.earliest
133
134    'Represents the latest message published on a topic'
135    latest = _pulsar.MessageId.latest
136
137    def ledger_id(self):
138        return self._msg_id.ledger_id()
139
140    def entry_id(self):
141        return self._msg_id.entry_id()
142
143    def batch_index(self):
144        return self._msg_id.batch_index()
145
146    def partition(self):
147        return self._msg_id.partition()
148
149    def serialize(self):
150        """
151        Returns a bytes representation of the message id.
152        This bytes sequence can be stored and later deserialized.
153        """
154        return self._msg_id.serialize()
155
156    @staticmethod
157    def deserialize(message_id_bytes):
158        """
159        Deserialize a message id object from a previously
160        serialized bytes sequence.
161        """
162        return _pulsar.MessageId.deserialize(message_id_bytes)

Represents a message id

MessageId(partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1)
128    def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
129        self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
earliest = <_pulsar.MessageId object>

Represents the latest message published on a topic

latest = <_pulsar.MessageId object>
def ledger_id(self)
137    def ledger_id(self):
138        return self._msg_id.ledger_id()
def entry_id(self)
140    def entry_id(self):
141        return self._msg_id.entry_id()
def batch_index(self)
143    def batch_index(self):
144        return self._msg_id.batch_index()
def partition(self)
146    def partition(self):
147        return self._msg_id.partition()
def serialize(self)
149    def serialize(self):
150        """
151        Returns a bytes representation of the message id.
152        This bytes sequence can be stored and later deserialized.
153        """
154        return self._msg_id.serialize()

Returns a bytes representation of the message id. This bytes sequence can be stored and later deserialized.

@staticmethod
def deserialize(message_id_bytes)
156    @staticmethod
157    def deserialize(message_id_bytes):
158        """
159        Deserialize a message id object from a previously
160        serialized bytes sequence.
161        """
162        return _pulsar.MessageId.deserialize(message_id_bytes)

Deserialize a message id object from a previously serialized bytes sequence.

class Message:
165class Message:
166    """
167    Message objects are returned by a consumer, either by calling `receive` or
168    through a listener.
169    """
170
171    def data(self):
172        """
173        Returns object typed bytes with the payload of the message.
174        """
175        return self._message.data()
176
177    def value(self):
178        """
179        Returns object with the de-serialized version of the message content
180        """
181        return self._schema.decode(self._message.data())
182
183    def properties(self):
184        """
185        Return the properties attached to the message. Properties are
186        application-defined key/value pairs that will be attached to the
187        message.
188        """
189        return self._message.properties()
190
191    def partition_key(self):
192        """
193        Get the partitioning key for the message.
194        """
195        return self._message.partition_key()
196
197    def publish_timestamp(self):
198        """
199        Get the timestamp in milliseconds with the message publish time.
200        """
201        return self._message.publish_timestamp()
202
203    def event_timestamp(self):
204        """
205        Get the timestamp in milliseconds with the message event time.
206        """
207        return self._message.event_timestamp()
208
209    def message_id(self):
210        """
211        The message ID that can be used to refere to this particular message.
212        """
213        return self._message.message_id()
214
215    def topic_name(self):
216        """
217        Get the topic Name from which this message originated from
218        """
219        return self._message.topic_name()
220
221    def redelivery_count(self):
222        """
223        Get the redelivery count for this message
224        """
225        return self._message.redelivery_count()
226
227    def schema_version(self):
228        """
229        Get the schema version for this message
230        """
231        return self._message.schema_version()
232
233    @staticmethod
234    def _wrap(_message):
235        self = Message()
236        self._message = _message
237        return self

Message objects are returned by a consumer, either by calling receive or through a listener.

Message()
def data(self)
171    def data(self):
172        """
173        Returns object typed bytes with the payload of the message.
174        """
175        return self._message.data()

Returns object typed bytes with the payload of the message.

def value(self)
177    def value(self):
178        """
179        Returns object with the de-serialized version of the message content
180        """
181        return self._schema.decode(self._message.data())

Returns object with the de-serialized version of the message content

def properties(self)
183    def properties(self):
184        """
185        Return the properties attached to the message. Properties are
186        application-defined key/value pairs that will be attached to the
187        message.
188        """
189        return self._message.properties()

Return the properties attached to the message. Properties are application-defined key/value pairs that will be attached to the message.

def partition_key(self)
191    def partition_key(self):
192        """
193        Get the partitioning key for the message.
194        """
195        return self._message.partition_key()

Get the partitioning key for the message.

def publish_timestamp(self)
197    def publish_timestamp(self):
198        """
199        Get the timestamp in milliseconds with the message publish time.
200        """
201        return self._message.publish_timestamp()

Get the timestamp in milliseconds with the message publish time.

def event_timestamp(self)
203    def event_timestamp(self):
204        """
205        Get the timestamp in milliseconds with the message event time.
206        """
207        return self._message.event_timestamp()

Get the timestamp in milliseconds with the message event time.

def message_id(self)
209    def message_id(self):
210        """
211        The message ID that can be used to refere to this particular message.
212        """
213        return self._message.message_id()

The message ID that can be used to refere to this particular message.

def topic_name(self)
215    def topic_name(self):
216        """
217        Get the topic Name from which this message originated from
218        """
219        return self._message.topic_name()

Get the topic Name from which this message originated from

def redelivery_count(self)
221    def redelivery_count(self):
222        """
223        Get the redelivery count for this message
224        """
225        return self._message.redelivery_count()

Get the redelivery count for this message

def schema_version(self)
227    def schema_version(self):
228        """
229        Get the schema version for this message
230        """
231        return self._message.schema_version()

Get the schema version for this message

class MessageBatch:
240class MessageBatch:
241
242    def __init__(self):
243        self._msg_batch = _pulsar.MessageBatch()
244
245    def with_message_id(self, msg_id):
246        if not isinstance(msg_id, _pulsar.MessageId):
247            if isinstance(msg_id, MessageId):
248                msg_id = msg_id._msg_id
249            else:
250                raise TypeError("unknown message id type")
251        self._msg_batch.with_message_id(msg_id)
252        return self
253
254    def parse_from(self, data, size):
255        self._msg_batch.parse_from(data, size)
256        _msgs = self._msg_batch.messages()
257        return list(map(Message._wrap, _msgs))
MessageBatch()
242    def __init__(self):
243        self._msg_batch = _pulsar.MessageBatch()
def with_message_id(self, msg_id)
245    def with_message_id(self, msg_id):
246        if not isinstance(msg_id, _pulsar.MessageId):
247            if isinstance(msg_id, MessageId):
248                msg_id = msg_id._msg_id
249            else:
250                raise TypeError("unknown message id type")
251        self._msg_batch.with_message_id(msg_id)
252        return self
def parse_from(self, data, size)
254    def parse_from(self, data, size):
255        self._msg_batch.parse_from(data, size)
256        _msgs = self._msg_batch.messages()
257        return list(map(Message._wrap, _msgs))
class Authentication:
260class Authentication:
261    """
262    Authentication provider object. Used to load authentication from an external
263    shared library.
264    """
265    def __init__(self, dynamicLibPath, authParamsString):
266        """
267        Create the authentication provider instance.
268
269        **Args**
270
271        * `dynamicLibPath`: Path to the authentication provider shared library
272          (such as `tls.so`)
273        * `authParamsString`: Comma-separated list of provider-specific
274          configuration params
275        """
276        _check_type(str, dynamicLibPath, 'dynamicLibPath')
277        _check_type(str, authParamsString, 'authParamsString')
278        self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)

Authentication provider object. Used to load authentication from an external shared library.

Authentication(dynamicLibPath, authParamsString)
265    def __init__(self, dynamicLibPath, authParamsString):
266        """
267        Create the authentication provider instance.
268
269        **Args**
270
271        * `dynamicLibPath`: Path to the authentication provider shared library
272          (such as `tls.so`)
273        * `authParamsString`: Comma-separated list of provider-specific
274          configuration params
275        """
276        _check_type(str, dynamicLibPath, 'dynamicLibPath')
277        _check_type(str, authParamsString, 'authParamsString')
278        self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)

Create the authentication provider instance.

Args

  • dynamicLibPath: Path to the authentication provider shared library (such as tls.so)
  • authParamsString: Comma-separated list of provider-specific configuration params
class AuthenticationTLS(Authentication):
281class AuthenticationTLS(Authentication):
282    """
283    TLS Authentication implementation
284    """
285    def __init__(self, certificate_path, private_key_path):
286        """
287        Create the TLS authentication provider instance.
288
289        **Args**
290
291        * `certificatePath`: Path to the public certificate
292        * `privateKeyPath`: Path to private TLS key
293        """
294        _check_type(str, certificate_path, 'certificate_path')
295        _check_type(str, private_key_path, 'private_key_path')
296        self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)

TLS Authentication implementation

AuthenticationTLS(certificate_path, private_key_path)
285    def __init__(self, certificate_path, private_key_path):
286        """
287        Create the TLS authentication provider instance.
288
289        **Args**
290
291        * `certificatePath`: Path to the public certificate
292        * `privateKeyPath`: Path to private TLS key
293        """
294        _check_type(str, certificate_path, 'certificate_path')
295        _check_type(str, private_key_path, 'private_key_path')
296        self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)

Create the TLS authentication provider instance.

Args

  • certificatePath: Path to the public certificate
  • privateKeyPath: Path to private TLS key
class AuthenticationToken(Authentication):
299class AuthenticationToken(Authentication):
300    """
301    Token based authentication implementation
302    """
303    def __init__(self, token):
304        """
305        Create the token authentication provider instance.
306
307        **Args**
308
309        * `token`: A string containing the token or a functions that provides a
310                   string with the token
311        """
312        if not (isinstance(token, str) or callable(token)):
313            raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
314        self.auth = _pulsar.AuthenticationToken(token)

Token based authentication implementation

AuthenticationToken(token)
303    def __init__(self, token):
304        """
305        Create the token authentication provider instance.
306
307        **Args**
308
309        * `token`: A string containing the token or a functions that provides a
310                   string with the token
311        """
312        if not (isinstance(token, str) or callable(token)):
313            raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
314        self.auth = _pulsar.AuthenticationToken(token)

Create the token authentication provider instance.

Args

  • token: A string containing the token or a functions that provides a string with the token
class AuthenticationAthenz(Authentication):
317class AuthenticationAthenz(Authentication):
318    """
319    Athenz Authentication implementation
320    """
321    def __init__(self, auth_params_string):
322        """
323        Create the Athenz authentication provider instance.
324
325        **Args**
326
327        * `auth_params_string`: JSON encoded configuration for Athenz client
328        """
329        _check_type(str, auth_params_string, 'auth_params_string')
330        self.auth = _pulsar.AuthenticationAthenz(auth_params_string)

Athenz Authentication implementation

AuthenticationAthenz(auth_params_string)
321    def __init__(self, auth_params_string):
322        """
323        Create the Athenz authentication provider instance.
324
325        **Args**
326
327        * `auth_params_string`: JSON encoded configuration for Athenz client
328        """
329        _check_type(str, auth_params_string, 'auth_params_string')
330        self.auth = _pulsar.AuthenticationAthenz(auth_params_string)

Create the Athenz authentication provider instance.

Args

  • auth_params_string: JSON encoded configuration for Athenz client
class AuthenticationOauth2(Authentication):
332class AuthenticationOauth2(Authentication):
333    """
334    Oauth2 Authentication implementation
335    """
336    def __init__(self, auth_params_string):
337        """
338        Create the Oauth2 authentication provider instance.
339
340        **Args**
341
342        * `auth_params_string`: JSON encoded configuration for Oauth2 client
343        """
344        _check_type(str, auth_params_string, 'auth_params_string')
345        self.auth = _pulsar.AuthenticationOauth2(auth_params_string)

Oauth2 Authentication implementation

AuthenticationOauth2(auth_params_string)
336    def __init__(self, auth_params_string):
337        """
338        Create the Oauth2 authentication provider instance.
339
340        **Args**
341
342        * `auth_params_string`: JSON encoded configuration for Oauth2 client
343        """
344        _check_type(str, auth_params_string, 'auth_params_string')
345        self.auth = _pulsar.AuthenticationOauth2(auth_params_string)

Create the Oauth2 authentication provider instance.

Args

  • auth_params_string: JSON encoded configuration for Oauth2 client
class Client:
347class Client:
348    """
349    The Pulsar client. A single client instance can be used to create producers
350    and consumers on multiple topics.
351
352    The client will share the same connection pool and threads across all
353    producers and consumers.
354    """
355
356    def __init__(self, service_url,
357                 authentication=None,
358                 operation_timeout_seconds=30,
359                 io_threads=1,
360                 message_listener_threads=1,
361                 concurrent_lookup_requests=50000,
362                 log_conf_file_path=None,
363                 use_tls=False,
364                 tls_trust_certs_file_path=None,
365                 tls_allow_insecure_connection=False,
366                 tls_validate_hostname=False,
367                 logger=None,
368                 connection_timeout_ms=10000,
369                 ):
370        """
371        Create a new Pulsar client instance.
372
373        **Args**
374
375        * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/
376
377        **Options**
378
379        * `authentication`:
380          Set the authentication provider to be used with the broker. For example:
381          `AuthenticationTls`, AuthenticaionToken, `AuthenticationAthenz`or `AuthenticationOauth2`
382        * `operation_timeout_seconds`:
383          Set timeout on client operations (subscribe, create producer, close,
384          unsubscribe).
385        * `io_threads`:
386          Set the number of IO threads to be used by the Pulsar client.
387        * `message_listener_threads`:
388          Set the number of threads to be used by the Pulsar client when
389          delivering messages through message listener. The default is 1 thread
390          per Pulsar client. If using more than 1 thread, messages for distinct
391          `message_listener`s will be delivered in different threads, however a
392          single `MessageListener` will always be assigned to the same thread.
393        * `concurrent_lookup_requests`:
394          Number of concurrent lookup-requests allowed on each broker connection
395          to prevent overload on the broker.
396        * `log_conf_file_path`:
397          Initialize log4cxx from a configuration file.
398        * `use_tls`:
399          Configure whether to use TLS encryption on the connection. This setting
400          is deprecated. TLS will be automatically enabled if the `serviceUrl` is
401          set to `pulsar+ssl://` or `https://`
402        * `tls_trust_certs_file_path`:
403          Set the path to the trusted TLS certificate file. If empty defaults to
404          certifi.
405        * `tls_allow_insecure_connection`:
406          Configure whether the Pulsar client accepts untrusted TLS certificates
407          from the broker.
408        * `tls_validate_hostname`:
409          Configure whether the Pulsar client validates that the hostname of the
410          endpoint, matches the common name on the TLS certificate presented by
411          the endpoint.
412        * `logger`:
413          Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
414        * `connection_timeout_ms`:
415          Set timeout in milliseconds on TCP connections.
416        """
417        _check_type(str, service_url, 'service_url')
418        _check_type_or_none(Authentication, authentication, 'authentication')
419        _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
420        _check_type(int, connection_timeout_ms, 'connection_timeout_ms')
421        _check_type(int, io_threads, 'io_threads')
422        _check_type(int, message_listener_threads, 'message_listener_threads')
423        _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
424        _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
425        _check_type(bool, use_tls, 'use_tls')
426        _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
427        _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
428        _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
429        _check_type_or_none(logging.Logger, logger, 'logger')
430
431        conf = _pulsar.ClientConfiguration()
432        if authentication:
433            conf.authentication(authentication.auth)
434        conf.operation_timeout_seconds(operation_timeout_seconds)
435        conf.connection_timeout(connection_timeout_ms)
436        conf.io_threads(io_threads)
437        conf.message_listener_threads(message_listener_threads)
438        conf.concurrent_lookup_requests(concurrent_lookup_requests)
439        if log_conf_file_path:
440            conf.log_conf_file_path(log_conf_file_path)
441        if logger:
442            conf.set_logger(logger)
443        if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
444            conf.use_tls(True)
445        if tls_trust_certs_file_path:
446            conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
447        else:
448            conf.tls_trust_certs_file_path(certifi.where())
449        conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
450        conf.tls_validate_hostname(tls_validate_hostname)
451        self._client = _pulsar.Client(service_url, conf)
452        self._consumers = []
453
454    def create_producer(self, topic,
455                        producer_name=None,
456                        schema=schema.BytesSchema(),
457                        initial_sequence_id=None,
458                        send_timeout_millis=30000,
459                        compression_type=CompressionType.NONE,
460                        max_pending_messages=1000,
461                        max_pending_messages_across_partitions=50000,
462                        block_if_queue_full=False,
463                        batching_enabled=False,
464                        batching_max_messages=1000,
465                        batching_max_allowed_size_in_bytes=128*1024,
466                        batching_max_publish_delay_ms=10,
467                        message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
468                        lazy_start_partitioned_producers=False,
469                        properties=None,
470                        batching_type=BatchingType.Default,
471                        encryption_key=None,
472                        crypto_key_reader=None
473                        ):
474        """
475        Create a new producer on a given topic.
476
477        **Args**
478
479        * `topic`:
480          The topic name
481
482        **Options**
483
484        * `producer_name`:
485           Specify a name for the producer. If not assigned,
486           the system will generate a globally unique name which can be accessed
487           with `Producer.producer_name()`. When specifying a name, it is app to
488           the user to ensure that, for a given topic, the producer name is unique
489           across all Pulsar's clusters.
490        * `schema`:
491           Define the schema of the data that will be published by this producer.
492           The schema will be used for two purposes:
493             - Validate the data format against the topic defined schema
494             - Perform serialization/deserialization between data and objects
495           An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
496        * `initial_sequence_id`:
497           Set the baseline for the sequence ids for messages
498           published by the producer. First message will be using
499           `(initialSequenceId + 1)`` as its sequence id and subsequent messages will
500           be assigned incremental sequence ids, if not otherwise specified.
501        * `send_timeout_millis`:
502          If a message is not acknowledged by the server before the
503          `send_timeout` expires, an error will be reported.
504        * `compression_type`:
505          Set the compression type for the producer. By default, message
506          payloads are not compressed. Supported compression types are
507          `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
508          ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
509          release in order to be able to receive messages compressed with ZSTD.
510          SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
511          release in order to be able to receive messages compressed with SNAPPY.
512        * `max_pending_messages`:
513          Set the max size of the queue holding the messages pending to receive
514          an acknowledgment from the broker.
515        * `max_pending_messages_across_partitions`:
516          Set the max size of the queue holding the messages pending to receive
517          an acknowledgment across partitions from the broker.
518        * `block_if_queue_full`: Set whether `send_async` operations should
519          block when the outgoing message queue is full.
520        * `message_routing_mode`:
521          Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
522          other option is `PartitionsRoutingMode.UseSinglePartition`
523        * `lazy_start_partitioned_producers`:
524          This config affects producers of partitioned topics only. It controls whether
525          producers register and connect immediately to the owner broker of each partition
526          or start lazily on demand. The internal producer of one partition is always
527          started eagerly, chosen by the routing policy, but the internal producers of
528          any additional partitions are started on demand, upon receiving their first
529          message.
530          Using this mode can reduce the strain on brokers for topics with large numbers of
531          partitions and when the SinglePartition routing policy is used without keyed messages.
532          Because producer connection can be on demand, this can produce extra send latency
533          for the first messages of a given partition.
534        * `properties`:
535          Sets the properties for the producer. The properties associated with a producer
536          can be used for identify a producer at broker side.
537        * `batching_type`:
538          Sets the batching type for the producer.
539          There are two batching type: DefaultBatching and KeyBasedBatching.
540            - Default batching
541            incoming single messages:
542            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
543            batched into single batch message:
544            [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
545
546            - KeyBasedBatching
547            incoming single messages:
548            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
549            batched into single batch message:
550            [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
551        * encryption_key:
552           The key used for symmetric encryption, configured on the producer side
553        * crypto_key_reader:
554           Symmetric encryption class implementation, configuring public key encryption messages for the producer
555           and private key decryption messages for the consumer
556        """
557        _check_type(str, topic, 'topic')
558        _check_type_or_none(str, producer_name, 'producer_name')
559        _check_type(_schema.Schema, schema, 'schema')
560        _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
561        _check_type(int, send_timeout_millis, 'send_timeout_millis')
562        _check_type(CompressionType, compression_type, 'compression_type')
563        _check_type(int, max_pending_messages, 'max_pending_messages')
564        _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
565        _check_type(bool, block_if_queue_full, 'block_if_queue_full')
566        _check_type(bool, batching_enabled, 'batching_enabled')
567        _check_type(int, batching_max_messages, 'batching_max_messages')
568        _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
569        _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
570        _check_type_or_none(dict, properties, 'properties')
571        _check_type(BatchingType, batching_type, 'batching_type')
572        _check_type_or_none(str, encryption_key, 'encryption_key')
573        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
574        _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')
575
576        conf = _pulsar.ProducerConfiguration()
577        conf.send_timeout_millis(send_timeout_millis)
578        conf.compression_type(compression_type)
579        conf.max_pending_messages(max_pending_messages)
580        conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
581        conf.block_if_queue_full(block_if_queue_full)
582        conf.batching_enabled(batching_enabled)
583        conf.batching_max_messages(batching_max_messages)
584        conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
585        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
586        conf.partitions_routing_mode(message_routing_mode)
587        conf.batching_type(batching_type)
588        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
589        if producer_name:
590            conf.producer_name(producer_name)
591        if initial_sequence_id:
592            conf.initial_sequence_id(initial_sequence_id)
593        if properties:
594            for k, v in properties.items():
595                conf.property(k, v)
596
597        conf.schema(schema.schema_info())
598        if encryption_key:
599            conf.encryption_key(encryption_key)
600        if crypto_key_reader:
601            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
602
603        p = Producer()
604        p._producer = self._client.create_producer(topic, conf)
605        p._schema = schema
606        p._client = self._client
607        return p
608
609    def subscribe(self, topic, subscription_name,
610                  consumer_type=ConsumerType.Exclusive,
611                  schema=schema.BytesSchema(),
612                  message_listener=None,
613                  receiver_queue_size=1000,
614                  max_total_receiver_queue_size_across_partitions=50000,
615                  consumer_name=None,
616                  unacked_messages_timeout_ms=None,
617                  broker_consumer_stats_cache_time_ms=30000,
618                  negative_ack_redelivery_delay_ms=60000,
619                  is_read_compacted=False,
620                  properties=None,
621                  pattern_auto_discovery_period=60,
622                  initial_position=InitialPosition.Latest,
623                  crypto_key_reader=None,
624                  replicate_subscription_state_enabled=False
625                  ):
626        """
627        Subscribe to the given topic and subscription combination.
628
629        **Args**
630
631        * `topic`: The name of the topic, list of topics or regex pattern.
632                  This method will accept these forms:
633                    - `topic='my-topic'`
634                    - `topic=['topic-1', 'topic-2', 'topic-3']`
635                    - `topic=re.compile('persistent://public/default/topic-*')`
636        * `subscription`: The name of the subscription.
637
638        **Options**
639
640        * `consumer_type`:
641          Select the subscription type to be used when subscribing to the topic.
642        * `schema`:
643           Define the schema of the data that will be received by this consumer.
644        * `message_listener`:
645          Sets a message listener for the consumer. When the listener is set,
646          the application will receive messages through it. Calls to
647          `consumer.receive()` will not be allowed. The listener function needs
648          to accept (consumer, message), for example:
649
650                #!python
651                def my_listener(consumer, message):
652                    # process message
653                    consumer.acknowledge(message)
654
655        * `receiver_queue_size`:
656          Sets the size of the consumer receive queue. The consumer receive
657          queue controls how many messages can be accumulated by the consumer
658          before the application calls `receive()`. Using a higher value could
659          potentially increase the consumer throughput at the expense of higher
660          memory utilization. Setting the consumer queue size to zero decreases
661          the throughput of the consumer by disabling pre-fetching of messages.
662          This approach improves the message distribution on shared subscription
663          by pushing messages only to those consumers that are ready to process
664          them. Neither receive with timeout nor partitioned topics can be used
665          if the consumer queue size is zero. The `receive()` function call
666          should not be interrupted when the consumer queue size is zero. The
667          default value is 1000 messages and should work well for most use
668          cases.
669        * `max_total_receiver_queue_size_across_partitions`
670          Set the max total receiver queue size across partitions.
671          This setting will be used to reduce the receiver queue size for individual partitions
672        * `consumer_name`:
673          Sets the consumer name.
674        * `unacked_messages_timeout_ms`:
675          Sets the timeout in milliseconds for unacknowledged messages. The
676          timeout needs to be greater than 10 seconds. An exception is thrown if
677          the given value is less than 10 seconds. If a successful
678          acknowledgement is not sent within the timeout, all the unacknowledged
679          messages are redelivered.
680        * `negative_ack_redelivery_delay_ms`:
681           The delay after which to redeliver the messages that failed to be
682           processed (with the `consumer.negative_acknowledge()`)
683        * `broker_consumer_stats_cache_time_ms`:
684          Sets the time duration for which the broker-side consumer stats will
685          be cached in the client.
686        * `is_read_compacted`:
687          Selects whether to read the compacted version of the topic
688        * `properties`:
689          Sets the properties for the consumer. The properties associated with a consumer
690          can be used for identify a consumer at broker side.
691        * `pattern_auto_discovery_period`:
692          Periods of seconds for consumer to auto discover match topics.
693        * `initial_position`:
694          Set the initial position of a consumer  when subscribing to the topic.
695          It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
696          Default: `Latest`.
697        * crypto_key_reader:
698           Symmetric encryption class implementation, configuring public key encryption messages for the producer
699           and private key decryption messages for the consumer
700        * replicate_subscription_state_enabled:
701          Set whether the subscription status should be replicated.
702          Default: `False`.
703        """
704        _check_type(str, subscription_name, 'subscription_name')
705        _check_type(ConsumerType, consumer_type, 'consumer_type')
706        _check_type(_schema.Schema, schema, 'schema')
707        _check_type(int, receiver_queue_size, 'receiver_queue_size')
708        _check_type(int, max_total_receiver_queue_size_across_partitions,
709                    'max_total_receiver_queue_size_across_partitions')
710        _check_type_or_none(str, consumer_name, 'consumer_name')
711        _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
712        _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
713        _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
714        _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
715        _check_type(bool, is_read_compacted, 'is_read_compacted')
716        _check_type_or_none(dict, properties, 'properties')
717        _check_type(InitialPosition, initial_position, 'initial_position')
718        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
719
720        conf = _pulsar.ConsumerConfiguration()
721        conf.consumer_type(consumer_type)
722        conf.read_compacted(is_read_compacted)
723        if message_listener:
724            conf.message_listener(_listener_wrapper(message_listener, schema))
725        conf.receiver_queue_size(receiver_queue_size)
726        conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
727        if consumer_name:
728            conf.consumer_name(consumer_name)
729        if unacked_messages_timeout_ms:
730            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
731
732        conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
733        conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
734        if properties:
735            for k, v in properties.items():
736                conf.property(k, v)
737        conf.subscription_initial_position(initial_position)
738
739        conf.schema(schema.schema_info())
740
741        if crypto_key_reader:
742            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
743
744        conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
745
746        c = Consumer()
747        if isinstance(topic, str):
748            # Single topic
749            c._consumer = self._client.subscribe(topic, subscription_name, conf)
750        elif isinstance(topic, list):
751            # List of topics
752            c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
753        elif isinstance(topic, _retype):
754            # Regex pattern
755            c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
756        else:
757            raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
758
759        c._client = self
760        c._schema = schema
761        self._consumers.append(c)
762        return c
763
764    def create_reader(self, topic, start_message_id,
765                      schema=schema.BytesSchema(),
766                      reader_listener=None,
767                      receiver_queue_size=1000,
768                      reader_name=None,
769                      subscription_role_prefix=None,
770                      is_read_compacted=False,
771                      crypto_key_reader=None
772                      ):
773        """
774        Create a reader on a particular topic
775
776        **Args**
777
778        * `topic`: The name of the topic.
779        * `start_message_id`: The initial reader positioning is done by specifying a message id.
780           The options are:
781            * `MessageId.earliest`: Start reading from the earliest message available in the topic
782            * `MessageId.latest`: Start reading from the end topic, only getting messages published
783               after the reader was created
784            * `MessageId`: When passing a particular message id, the reader will position itself on
785               that specific position. The first message to be read will be the message next to the
786               specified messageId. Message id can be serialized into a string and deserialized
787               back into a `MessageId` object:
788
789                   # Serialize to string
790                   s = msg.message_id().serialize()
791
792                   # Deserialize from string
793                   msg_id = MessageId.deserialize(s)
794
795        **Options**
796
797        * `schema`:
798           Define the schema of the data that will be received by this reader.
799        * `reader_listener`:
800          Sets a message listener for the reader. When the listener is set,
801          the application will receive messages through it. Calls to
802          `reader.read_next()` will not be allowed. The listener function needs
803          to accept (reader, message), for example:
804
805                def my_listener(reader, message):
806                    # process message
807                    pass
808
809        * `receiver_queue_size`:
810          Sets the size of the reader receive queue. The reader receive
811          queue controls how many messages can be accumulated by the reader
812          before the application calls `read_next()`. Using a higher value could
813          potentially increase the reader throughput at the expense of higher
814          memory utilization.
815        * `reader_name`:
816          Sets the reader name.
817        * `subscription_role_prefix`:
818          Sets the subscription role prefix.
819        * `is_read_compacted`:
820          Selects whether to read the compacted version of the topic
821        * crypto_key_reader:
822           Symmetric encryption class implementation, configuring public key encryption messages for the producer
823           and private key decryption messages for the consumer
824        """
825        _check_type(str, topic, 'topic')
826        _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
827        _check_type(_schema.Schema, schema, 'schema')
828        _check_type(int, receiver_queue_size, 'receiver_queue_size')
829        _check_type_or_none(str, reader_name, 'reader_name')
830        _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
831        _check_type(bool, is_read_compacted, 'is_read_compacted')
832        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
833
834        conf = _pulsar.ReaderConfiguration()
835        if reader_listener:
836            conf.reader_listener(_listener_wrapper(reader_listener, schema))
837        conf.receiver_queue_size(receiver_queue_size)
838        if reader_name:
839            conf.reader_name(reader_name)
840        if subscription_role_prefix:
841            conf.subscription_role_prefix(subscription_role_prefix)
842        conf.schema(schema.schema_info())
843        conf.read_compacted(is_read_compacted)
844        if crypto_key_reader:
845            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
846
847        c = Reader()
848        c._reader = self._client.create_reader(topic, start_message_id, conf)
849        c._client = self
850        c._schema = schema
851        self._consumers.append(c)
852        return c
853
854    def get_topic_partitions(self, topic):
855        """
856        Get the list of partitions for a given topic.
857
858        If the topic is partitioned, this will return a list of partition names. If the topic is not
859        partitioned, the returned list will contain the topic name itself.
860
861        This can be used to discover the partitions and create Reader, Consumer or Producer
862        instances directly on a particular partition.
863        :param topic: the topic name to lookup
864        :return: a list of partition name
865        """
866        _check_type(str, topic, 'topic')
867        return self._client.get_topic_partitions(topic)
868
869    def shutdown(self):
870        """
871        Perform immediate shutdown of Pulsar client.
872
873        Release all resources and close all producer, consumer, and readers without waiting
874        for ongoing operations to complete.
875        """
876        self._client.shutdown()
877
878    def close(self):
879        """
880        Close the client and all the associated producers and consumers
881        """
882        self._client.close()

The Pulsar client. A single client instance can be used to create producers and consumers on multiple topics.

The client will share the same connection pool and threads across all producers and consumers.

Client( service_url, authentication=None, operation_timeout_seconds=30, io_threads=1, message_listener_threads=1, concurrent_lookup_requests=50000, log_conf_file_path=None, use_tls=False, tls_trust_certs_file_path=None, tls_allow_insecure_connection=False, tls_validate_hostname=False, logger=None, connection_timeout_ms=10000)
356    def __init__(self, service_url,
357                 authentication=None,
358                 operation_timeout_seconds=30,
359                 io_threads=1,
360                 message_listener_threads=1,
361                 concurrent_lookup_requests=50000,
362                 log_conf_file_path=None,
363                 use_tls=False,
364                 tls_trust_certs_file_path=None,
365                 tls_allow_insecure_connection=False,
366                 tls_validate_hostname=False,
367                 logger=None,
368                 connection_timeout_ms=10000,
369                 ):
370        """
371        Create a new Pulsar client instance.
372
373        **Args**
374
375        * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/
376
377        **Options**
378
379        * `authentication`:
380          Set the authentication provider to be used with the broker. For example:
381          `AuthenticationTls`, AuthenticaionToken, `AuthenticationAthenz`or `AuthenticationOauth2`
382        * `operation_timeout_seconds`:
383          Set timeout on client operations (subscribe, create producer, close,
384          unsubscribe).
385        * `io_threads`:
386          Set the number of IO threads to be used by the Pulsar client.
387        * `message_listener_threads`:
388          Set the number of threads to be used by the Pulsar client when
389          delivering messages through message listener. The default is 1 thread
390          per Pulsar client. If using more than 1 thread, messages for distinct
391          `message_listener`s will be delivered in different threads, however a
392          single `MessageListener` will always be assigned to the same thread.
393        * `concurrent_lookup_requests`:
394          Number of concurrent lookup-requests allowed on each broker connection
395          to prevent overload on the broker.
396        * `log_conf_file_path`:
397          Initialize log4cxx from a configuration file.
398        * `use_tls`:
399          Configure whether to use TLS encryption on the connection. This setting
400          is deprecated. TLS will be automatically enabled if the `serviceUrl` is
401          set to `pulsar+ssl://` or `https://`
402        * `tls_trust_certs_file_path`:
403          Set the path to the trusted TLS certificate file. If empty defaults to
404          certifi.
405        * `tls_allow_insecure_connection`:
406          Configure whether the Pulsar client accepts untrusted TLS certificates
407          from the broker.
408        * `tls_validate_hostname`:
409          Configure whether the Pulsar client validates that the hostname of the
410          endpoint, matches the common name on the TLS certificate presented by
411          the endpoint.
412        * `logger`:
413          Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
414        * `connection_timeout_ms`:
415          Set timeout in milliseconds on TCP connections.
416        """
417        _check_type(str, service_url, 'service_url')
418        _check_type_or_none(Authentication, authentication, 'authentication')
419        _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
420        _check_type(int, connection_timeout_ms, 'connection_timeout_ms')
421        _check_type(int, io_threads, 'io_threads')
422        _check_type(int, message_listener_threads, 'message_listener_threads')
423        _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
424        _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
425        _check_type(bool, use_tls, 'use_tls')
426        _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
427        _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
428        _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
429        _check_type_or_none(logging.Logger, logger, 'logger')
430
431        conf = _pulsar.ClientConfiguration()
432        if authentication:
433            conf.authentication(authentication.auth)
434        conf.operation_timeout_seconds(operation_timeout_seconds)
435        conf.connection_timeout(connection_timeout_ms)
436        conf.io_threads(io_threads)
437        conf.message_listener_threads(message_listener_threads)
438        conf.concurrent_lookup_requests(concurrent_lookup_requests)
439        if log_conf_file_path:
440            conf.log_conf_file_path(log_conf_file_path)
441        if logger:
442            conf.set_logger(logger)
443        if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
444            conf.use_tls(True)
445        if tls_trust_certs_file_path:
446            conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
447        else:
448            conf.tls_trust_certs_file_path(certifi.where())
449        conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
450        conf.tls_validate_hostname(tls_validate_hostname)
451        self._client = _pulsar.Client(service_url, conf)
452        self._consumers = []

Create a new Pulsar client instance.

Args

  • service_url: The Pulsar service url eg: pulsar://my-broker.com:6650/

Options

  • authentication: Set the authentication provider to be used with the broker. For example: AuthenticationTls, AuthenticaionToken, AuthenticationAthenzor AuthenticationOauth2
  • operation_timeout_seconds: Set timeout on client operations (subscribe, create producer, close, unsubscribe).
  • io_threads: Set the number of IO threads to be used by the Pulsar client.
  • message_listener_threads: Set the number of threads to be used by the Pulsar client when delivering messages through message listener. The default is 1 thread per Pulsar client. If using more than 1 thread, messages for distinct message_listeners will be delivered in different threads, however a single MessageListener will always be assigned to the same thread.
  • concurrent_lookup_requests: Number of concurrent lookup-requests allowed on each broker connection to prevent overload on the broker.
  • log_conf_file_path: Initialize log4cxx from a configuration file.
  • use_tls: Configure whether to use TLS encryption on the connection. This setting is deprecated. TLS will be automatically enabled if the serviceUrl is set to pulsar+ssl:// or https://
  • tls_trust_certs_file_path: Set the path to the trusted TLS certificate file. If empty defaults to certifi.
  • tls_allow_insecure_connection: Configure whether the Pulsar client accepts untrusted TLS certificates from the broker.
  • tls_validate_hostname: Configure whether the Pulsar client validates that the hostname of the endpoint, matches the common name on the TLS certificate presented by the endpoint.
  • logger: Set a Python logger for this Pulsar client. Should be an instance of logging.Logger.
  • connection_timeout_ms: Set timeout in milliseconds on TCP connections.
def create_producer( self, topic, producer_name=None, schema=<pulsar.schema.schema.BytesSchema object>, initial_sequence_id=None, send_timeout_millis=30000, compression_type=_pulsar.CompressionType.NONE, max_pending_messages=1000, max_pending_messages_across_partitions=50000, block_if_queue_full=False, batching_enabled=False, batching_max_messages=1000, batching_max_allowed_size_in_bytes=131072, batching_max_publish_delay_ms=10, message_routing_mode=_pulsar.PartitionsRoutingMode.RoundRobinDistribution, lazy_start_partitioned_producers=False, properties=None, batching_type=_pulsar.BatchingType.Default, encryption_key=None, crypto_key_reader=None)
454    def create_producer(self, topic,
455                        producer_name=None,
456                        schema=schema.BytesSchema(),
457                        initial_sequence_id=None,
458                        send_timeout_millis=30000,
459                        compression_type=CompressionType.NONE,
460                        max_pending_messages=1000,
461                        max_pending_messages_across_partitions=50000,
462                        block_if_queue_full=False,
463                        batching_enabled=False,
464                        batching_max_messages=1000,
465                        batching_max_allowed_size_in_bytes=128*1024,
466                        batching_max_publish_delay_ms=10,
467                        message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
468                        lazy_start_partitioned_producers=False,
469                        properties=None,
470                        batching_type=BatchingType.Default,
471                        encryption_key=None,
472                        crypto_key_reader=None
473                        ):
474        """
475        Create a new producer on a given topic.
476
477        **Args**
478
479        * `topic`:
480          The topic name
481
482        **Options**
483
484        * `producer_name`:
485           Specify a name for the producer. If not assigned,
486           the system will generate a globally unique name which can be accessed
487           with `Producer.producer_name()`. When specifying a name, it is app to
488           the user to ensure that, for a given topic, the producer name is unique
489           across all Pulsar's clusters.
490        * `schema`:
491           Define the schema of the data that will be published by this producer.
492           The schema will be used for two purposes:
493             - Validate the data format against the topic defined schema
494             - Perform serialization/deserialization between data and objects
495           An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
496        * `initial_sequence_id`:
497           Set the baseline for the sequence ids for messages
498           published by the producer. First message will be using
499           `(initialSequenceId + 1)`` as its sequence id and subsequent messages will
500           be assigned incremental sequence ids, if not otherwise specified.
501        * `send_timeout_millis`:
502          If a message is not acknowledged by the server before the
503          `send_timeout` expires, an error will be reported.
504        * `compression_type`:
505          Set the compression type for the producer. By default, message
506          payloads are not compressed. Supported compression types are
507          `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
508          ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
509          release in order to be able to receive messages compressed with ZSTD.
510          SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
511          release in order to be able to receive messages compressed with SNAPPY.
512        * `max_pending_messages`:
513          Set the max size of the queue holding the messages pending to receive
514          an acknowledgment from the broker.
515        * `max_pending_messages_across_partitions`:
516          Set the max size of the queue holding the messages pending to receive
517          an acknowledgment across partitions from the broker.
518        * `block_if_queue_full`: Set whether `send_async` operations should
519          block when the outgoing message queue is full.
520        * `message_routing_mode`:
521          Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
522          other option is `PartitionsRoutingMode.UseSinglePartition`
523        * `lazy_start_partitioned_producers`:
524          This config affects producers of partitioned topics only. It controls whether
525          producers register and connect immediately to the owner broker of each partition
526          or start lazily on demand. The internal producer of one partition is always
527          started eagerly, chosen by the routing policy, but the internal producers of
528          any additional partitions are started on demand, upon receiving their first
529          message.
530          Using this mode can reduce the strain on brokers for topics with large numbers of
531          partitions and when the SinglePartition routing policy is used without keyed messages.
532          Because producer connection can be on demand, this can produce extra send latency
533          for the first messages of a given partition.
534        * `properties`:
535          Sets the properties for the producer. The properties associated with a producer
536          can be used for identify a producer at broker side.
537        * `batching_type`:
538          Sets the batching type for the producer.
539          There are two batching type: DefaultBatching and KeyBasedBatching.
540            - Default batching
541            incoming single messages:
542            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
543            batched into single batch message:
544            [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
545
546            - KeyBasedBatching
547            incoming single messages:
548            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
549            batched into single batch message:
550            [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
551        * encryption_key:
552           The key used for symmetric encryption, configured on the producer side
553        * crypto_key_reader:
554           Symmetric encryption class implementation, configuring public key encryption messages for the producer
555           and private key decryption messages for the consumer
556        """
557        _check_type(str, topic, 'topic')
558        _check_type_or_none(str, producer_name, 'producer_name')
559        _check_type(_schema.Schema, schema, 'schema')
560        _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
561        _check_type(int, send_timeout_millis, 'send_timeout_millis')
562        _check_type(CompressionType, compression_type, 'compression_type')
563        _check_type(int, max_pending_messages, 'max_pending_messages')
564        _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
565        _check_type(bool, block_if_queue_full, 'block_if_queue_full')
566        _check_type(bool, batching_enabled, 'batching_enabled')
567        _check_type(int, batching_max_messages, 'batching_max_messages')
568        _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
569        _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
570        _check_type_or_none(dict, properties, 'properties')
571        _check_type(BatchingType, batching_type, 'batching_type')
572        _check_type_or_none(str, encryption_key, 'encryption_key')
573        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
574        _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')
575
576        conf = _pulsar.ProducerConfiguration()
577        conf.send_timeout_millis(send_timeout_millis)
578        conf.compression_type(compression_type)
579        conf.max_pending_messages(max_pending_messages)
580        conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
581        conf.block_if_queue_full(block_if_queue_full)
582        conf.batching_enabled(batching_enabled)
583        conf.batching_max_messages(batching_max_messages)
584        conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
585        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
586        conf.partitions_routing_mode(message_routing_mode)
587        conf.batching_type(batching_type)
588        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
589        if producer_name:
590            conf.producer_name(producer_name)
591        if initial_sequence_id:
592            conf.initial_sequence_id(initial_sequence_id)
593        if properties:
594            for k, v in properties.items():
595                conf.property(k, v)
596
597        conf.schema(schema.schema_info())
598        if encryption_key:
599            conf.encryption_key(encryption_key)
600        if crypto_key_reader:
601            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
602
603        p = Producer()
604        p._producer = self._client.create_producer(topic, conf)
605        p._schema = schema
606        p._client = self._client
607        return p

Create a new producer on a given topic.

Args

  • topic: The topic name

Options

  • producer_name: Specify a name for the producer. If not assigned, the system will generate a globally unique name which can be accessed with Producer.producer_name(). When specifying a name, it is app to the user to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters.
  • schema: Define the schema of the data that will be published by this producer. The schema will be used for two purposes:
    • Validate the data format against the topic defined schema
    • Perform serialization/deserialization between data and objects An example for this parameter would be to pass schema=JsonSchema(MyRecordClass).
  • initial_sequence_id: Set the baseline for the sequence ids for messages published by the producer. First message will be using `(initialSequenceId + 1)`` as its sequence id and subsequent messages will be assigned incremental sequence ids, if not otherwise specified.
  • send_timeout_millis: If a message is not acknowledged by the server before the send_timeout expires, an error will be reported.
  • compression_type: Set the compression type for the producer. By default, message payloads are not compressed. Supported compression types are CompressionType.LZ4, CompressionType.ZLib, CompressionType.ZSTD and CompressionType.SNAPPY. ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that release in order to be able to receive messages compressed with ZSTD. SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that release in order to be able to receive messages compressed with SNAPPY.
  • max_pending_messages: Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
  • max_pending_messages_across_partitions: Set the max size of the queue holding the messages pending to receive an acknowledgment across partitions from the broker.
  • block_if_queue_full: Set whether send_async operations should block when the outgoing message queue is full.
  • message_routing_mode: Set the message routing mode for the partitioned producer. Default is PartitionsRoutingMode.RoundRobinDistribution, other option is PartitionsRoutingMode.UseSinglePartition
  • lazy_start_partitioned_producers: This config affects producers of partitioned topics only. It controls whether producers register and connect immediately to the owner broker of each partition or start lazily on demand. The internal producer of one partition is always started eagerly, chosen by the routing policy, but the internal producers of any additional partitions are started on demand, upon receiving their first message. Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when the SinglePartition routing policy is used without keyed messages. Because producer connection can be on demand, this can produce extra send latency for the first messages of a given partition.
  • properties: Sets the properties for the producer. The properties associated with a producer can be used for identify a producer at broker side.
  • batching_type: Sets the batching type for the producer. There are two batching type: DefaultBatching and KeyBasedBatching.

    • Default batching incoming single messages: (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) batched into single batch message: [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]

    • KeyBasedBatching incoming single messages: (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) batched into single batch message: [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]

  • encryption_key: The key used for symmetric encryption, configured on the producer side
  • crypto_key_reader: Symmetric encryption class implementation, configuring public key encryption messages for the producer and private key decryption messages for the consumer
def subscribe( self, topic, subscription_name, consumer_type=_pulsar.ConsumerType.Exclusive, schema=<pulsar.schema.schema.BytesSchema object>, message_listener=None, receiver_queue_size=1000, max_total_receiver_queue_size_across_partitions=50000, consumer_name=None, unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000, negative_ack_redelivery_delay_ms=60000, is_read_compacted=False, properties=None, pattern_auto_discovery_period=60, initial_position=_pulsar.InitialPosition.Latest, crypto_key_reader=None, replicate_subscription_state_enabled=False)
609    def subscribe(self, topic, subscription_name,
610                  consumer_type=ConsumerType.Exclusive,
611                  schema=schema.BytesSchema(),
612                  message_listener=None,
613                  receiver_queue_size=1000,
614                  max_total_receiver_queue_size_across_partitions=50000,
615                  consumer_name=None,
616                  unacked_messages_timeout_ms=None,
617                  broker_consumer_stats_cache_time_ms=30000,
618                  negative_ack_redelivery_delay_ms=60000,
619                  is_read_compacted=False,
620                  properties=None,
621                  pattern_auto_discovery_period=60,
622                  initial_position=InitialPosition.Latest,
623                  crypto_key_reader=None,
624                  replicate_subscription_state_enabled=False
625                  ):
626        """
627        Subscribe to the given topic and subscription combination.
628
629        **Args**
630
631        * `topic`: The name of the topic, list of topics or regex pattern.
632                  This method will accept these forms:
633                    - `topic='my-topic'`
634                    - `topic=['topic-1', 'topic-2', 'topic-3']`
635                    - `topic=re.compile('persistent://public/default/topic-*')`
636        * `subscription`: The name of the subscription.
637
638        **Options**
639
640        * `consumer_type`:
641          Select the subscription type to be used when subscribing to the topic.
642        * `schema`:
643           Define the schema of the data that will be received by this consumer.
644        * `message_listener`:
645          Sets a message listener for the consumer. When the listener is set,
646          the application will receive messages through it. Calls to
647          `consumer.receive()` will not be allowed. The listener function needs
648          to accept (consumer, message), for example:
649
650                #!python
651                def my_listener(consumer, message):
652                    # process message
653                    consumer.acknowledge(message)
654
655        * `receiver_queue_size`:
656          Sets the size of the consumer receive queue. The consumer receive
657          queue controls how many messages can be accumulated by the consumer
658          before the application calls `receive()`. Using a higher value could
659          potentially increase the consumer throughput at the expense of higher
660          memory utilization. Setting the consumer queue size to zero decreases
661          the throughput of the consumer by disabling pre-fetching of messages.
662          This approach improves the message distribution on shared subscription
663          by pushing messages only to those consumers that are ready to process
664          them. Neither receive with timeout nor partitioned topics can be used
665          if the consumer queue size is zero. The `receive()` function call
666          should not be interrupted when the consumer queue size is zero. The
667          default value is 1000 messages and should work well for most use
668          cases.
669        * `max_total_receiver_queue_size_across_partitions`
670          Set the max total receiver queue size across partitions.
671          This setting will be used to reduce the receiver queue size for individual partitions
672        * `consumer_name`:
673          Sets the consumer name.
674        * `unacked_messages_timeout_ms`:
675          Sets the timeout in milliseconds for unacknowledged messages. The
676          timeout needs to be greater than 10 seconds. An exception is thrown if
677          the given value is less than 10 seconds. If a successful
678          acknowledgement is not sent within the timeout, all the unacknowledged
679          messages are redelivered.
680        * `negative_ack_redelivery_delay_ms`:
681           The delay after which to redeliver the messages that failed to be
682           processed (with the `consumer.negative_acknowledge()`)
683        * `broker_consumer_stats_cache_time_ms`:
684          Sets the time duration for which the broker-side consumer stats will
685          be cached in the client.
686        * `is_read_compacted`:
687          Selects whether to read the compacted version of the topic
688        * `properties`:
689          Sets the properties for the consumer. The properties associated with a consumer
690          can be used for identify a consumer at broker side.
691        * `pattern_auto_discovery_period`:
692          Periods of seconds for consumer to auto discover match topics.
693        * `initial_position`:
694          Set the initial position of a consumer  when subscribing to the topic.
695          It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
696          Default: `Latest`.
697        * crypto_key_reader:
698           Symmetric encryption class implementation, configuring public key encryption messages for the producer
699           and private key decryption messages for the consumer
700        * replicate_subscription_state_enabled:
701          Set whether the subscription status should be replicated.
702          Default: `False`.
703        """
704        _check_type(str, subscription_name, 'subscription_name')
705        _check_type(ConsumerType, consumer_type, 'consumer_type')
706        _check_type(_schema.Schema, schema, 'schema')
707        _check_type(int, receiver_queue_size, 'receiver_queue_size')
708        _check_type(int, max_total_receiver_queue_size_across_partitions,
709                    'max_total_receiver_queue_size_across_partitions')
710        _check_type_or_none(str, consumer_name, 'consumer_name')
711        _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
712        _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
713        _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
714        _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
715        _check_type(bool, is_read_compacted, 'is_read_compacted')
716        _check_type_or_none(dict, properties, 'properties')
717        _check_type(InitialPosition, initial_position, 'initial_position')
718        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
719
720        conf = _pulsar.ConsumerConfiguration()
721        conf.consumer_type(consumer_type)
722        conf.read_compacted(is_read_compacted)
723        if message_listener:
724            conf.message_listener(_listener_wrapper(message_listener, schema))
725        conf.receiver_queue_size(receiver_queue_size)
726        conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
727        if consumer_name:
728            conf.consumer_name(consumer_name)
729        if unacked_messages_timeout_ms:
730            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
731
732        conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
733        conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
734        if properties:
735            for k, v in properties.items():
736                conf.property(k, v)
737        conf.subscription_initial_position(initial_position)
738
739        conf.schema(schema.schema_info())
740
741        if crypto_key_reader:
742            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
743
744        conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
745
746        c = Consumer()
747        if isinstance(topic, str):
748            # Single topic
749            c._consumer = self._client.subscribe(topic, subscription_name, conf)
750        elif isinstance(topic, list):
751            # List of topics
752            c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
753        elif isinstance(topic, _retype):
754            # Regex pattern
755            c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
756        else:
757            raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
758
759        c._client = self
760        c._schema = schema
761        self._consumers.append(c)
762        return c

Subscribe to the given topic and subscription combination.

Args

  • topic: The name of the topic, list of topics or regex pattern. This method will accept these forms: - topic='my-topic' - topic=['topic-1', 'topic-2', 'topic-3'] - topic=re.compile('persistent://public/default/topic-*')
  • subscription: The name of the subscription.

Options

  • consumer_type: Select the subscription type to be used when subscribing to the topic.
  • schema: Define the schema of the data that will be received by this consumer.
  • message_listener: Sets a message listener for the consumer. When the listener is set, the application will receive messages through it. Calls to consumer.receive() will not be allowed. The listener function needs to accept (consumer, message), for example:

    #!python
    def my_listener(consumer, message):
        # process message
        consumer.acknowledge(message)
    
  • receiver_queue_size: Sets the size of the consumer receive queue. The consumer receive queue controls how many messages can be accumulated by the consumer before the application calls receive(). Using a higher value could potentially increase the consumer throughput at the expense of higher memory utilization. Setting the consumer queue size to zero decreases the throughput of the consumer by disabling pre-fetching of messages. This approach improves the message distribution on shared subscription by pushing messages only to those consumers that are ready to process them. Neither receive with timeout nor partitioned topics can be used if the consumer queue size is zero. The receive() function call should not be interrupted when the consumer queue size is zero. The default value is 1000 messages and should work well for most use cases.

  • max_total_receiver_queue_size_across_partitions Set the max total receiver queue size across partitions. This setting will be used to reduce the receiver queue size for individual partitions
  • consumer_name: Sets the consumer name.
  • unacked_messages_timeout_ms: Sets the timeout in milliseconds for unacknowledged messages. The timeout needs to be greater than 10 seconds. An exception is thrown if the given value is less than 10 seconds. If a successful acknowledgement is not sent within the timeout, all the unacknowledged messages are redelivered.
  • negative_ack_redelivery_delay_ms: The delay after which to redeliver the messages that failed to be processed (with the consumer.negative_acknowledge())
  • broker_consumer_stats_cache_time_ms: Sets the time duration for which the broker-side consumer stats will be cached in the client.
  • is_read_compacted: Selects whether to read the compacted version of the topic
  • properties: Sets the properties for the consumer. The properties associated with a consumer can be used for identify a consumer at broker side.
  • pattern_auto_discovery_period: Periods of seconds for consumer to auto discover match topics.
  • initial_position: Set the initial position of a consumer when subscribing to the topic. It could be either: InitialPosition.Earliest or InitialPosition.Latest. Default: Latest.
  • crypto_key_reader: Symmetric encryption class implementation, configuring public key encryption messages for the producer and private key decryption messages for the consumer
  • replicate_subscription_state_enabled: Set whether the subscription status should be replicated. Default: False.
def create_reader( self, topic, start_message_id, schema=<pulsar.schema.schema.BytesSchema object>, reader_listener=None, receiver_queue_size=1000, reader_name=None, subscription_role_prefix=None, is_read_compacted=False, crypto_key_reader=None)
764    def create_reader(self, topic, start_message_id,
765                      schema=schema.BytesSchema(),
766                      reader_listener=None,
767                      receiver_queue_size=1000,
768                      reader_name=None,
769                      subscription_role_prefix=None,
770                      is_read_compacted=False,
771                      crypto_key_reader=None
772                      ):
773        """
774        Create a reader on a particular topic
775
776        **Args**
777
778        * `topic`: The name of the topic.
779        * `start_message_id`: The initial reader positioning is done by specifying a message id.
780           The options are:
781            * `MessageId.earliest`: Start reading from the earliest message available in the topic
782            * `MessageId.latest`: Start reading from the end topic, only getting messages published
783               after the reader was created
784            * `MessageId`: When passing a particular message id, the reader will position itself on
785               that specific position. The first message to be read will be the message next to the
786               specified messageId. Message id can be serialized into a string and deserialized
787               back into a `MessageId` object:
788
789                   # Serialize to string
790                   s = msg.message_id().serialize()
791
792                   # Deserialize from string
793                   msg_id = MessageId.deserialize(s)
794
795        **Options**
796
797        * `schema`:
798           Define the schema of the data that will be received by this reader.
799        * `reader_listener`:
800          Sets a message listener for the reader. When the listener is set,
801          the application will receive messages through it. Calls to
802          `reader.read_next()` will not be allowed. The listener function needs
803          to accept (reader, message), for example:
804
805                def my_listener(reader, message):
806                    # process message
807                    pass
808
809        * `receiver_queue_size`:
810          Sets the size of the reader receive queue. The reader receive
811          queue controls how many messages can be accumulated by the reader
812          before the application calls `read_next()`. Using a higher value could
813          potentially increase the reader throughput at the expense of higher
814          memory utilization.
815        * `reader_name`:
816          Sets the reader name.
817        * `subscription_role_prefix`:
818          Sets the subscription role prefix.
819        * `is_read_compacted`:
820          Selects whether to read the compacted version of the topic
821        * crypto_key_reader:
822           Symmetric encryption class implementation, configuring public key encryption messages for the producer
823           and private key decryption messages for the consumer
824        """
825        _check_type(str, topic, 'topic')
826        _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
827        _check_type(_schema.Schema, schema, 'schema')
828        _check_type(int, receiver_queue_size, 'receiver_queue_size')
829        _check_type_or_none(str, reader_name, 'reader_name')
830        _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
831        _check_type(bool, is_read_compacted, 'is_read_compacted')
832        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
833
834        conf = _pulsar.ReaderConfiguration()
835        if reader_listener:
836            conf.reader_listener(_listener_wrapper(reader_listener, schema))
837        conf.receiver_queue_size(receiver_queue_size)
838        if reader_name:
839            conf.reader_name(reader_name)
840        if subscription_role_prefix:
841            conf.subscription_role_prefix(subscription_role_prefix)
842        conf.schema(schema.schema_info())
843        conf.read_compacted(is_read_compacted)
844        if crypto_key_reader:
845            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
846
847        c = Reader()
848        c._reader = self._client.create_reader(topic, start_message_id, conf)
849        c._client = self
850        c._schema = schema
851        self._consumers.append(c)
852        return c

Create a reader on a particular topic

Args

  • topic: The name of the topic.
  • start_message_id: The initial reader positioning is done by specifying a message id. The options are:

    • MessageId.earliest: Start reading from the earliest message available in the topic
    • MessageId.latest: Start reading from the end topic, only getting messages published after the reader was created
    • MessageId: When passing a particular message id, the reader will position itself on that specific position. The first message to be read will be the message next to the specified messageId. Message id can be serialized into a string and deserialized back into a MessageId object:

      # Serialize to string s = msg.message_id().serialize()

      # Deserialize from string msg_id = MessageId.deserialize(s)

Options

  • schema: Define the schema of the data that will be received by this reader.
  • reader_listener: Sets a message listener for the reader. When the listener is set, the application will receive messages through it. Calls to reader.read_next() will not be allowed. The listener function needs to accept (reader, message), for example:

    def my_listener(reader, message):
        # process message
        pass
    
  • receiver_queue_size: Sets the size of the reader receive queue. The reader receive queue controls how many messages can be accumulated by the reader before the application calls read_next(). Using a higher value could potentially increase the reader throughput at the expense of higher memory utilization.

  • reader_name: Sets the reader name.
  • subscription_role_prefix: Sets the subscription role prefix.
  • is_read_compacted: Selects whether to read the compacted version of the topic
  • crypto_key_reader: Symmetric encryption class implementation, configuring public key encryption messages for the producer and private key decryption messages for the consumer
def get_topic_partitions(self, topic)
854    def get_topic_partitions(self, topic):
855        """
856        Get the list of partitions for a given topic.
857
858        If the topic is partitioned, this will return a list of partition names. If the topic is not
859        partitioned, the returned list will contain the topic name itself.
860
861        This can be used to discover the partitions and create Reader, Consumer or Producer
862        instances directly on a particular partition.
863        :param topic: the topic name to lookup
864        :return: a list of partition name
865        """
866        _check_type(str, topic, 'topic')
867        return self._client.get_topic_partitions(topic)

Get the list of partitions for a given topic.

If the topic is partitioned, this will return a list of partition names. If the topic is not partitioned, the returned list will contain the topic name itself.

This can be used to discover the partitions and create Reader, Consumer or Producer instances directly on a particular partition.

Parameters
  • topic: the topic name to lookup
Returns

a list of partition name

def shutdown(self)
869    def shutdown(self):
870        """
871        Perform immediate shutdown of Pulsar client.
872
873        Release all resources and close all producer, consumer, and readers without waiting
874        for ongoing operations to complete.
875        """
876        self._client.shutdown()

Perform immediate shutdown of Pulsar client.

Release all resources and close all producer, consumer, and readers without waiting for ongoing operations to complete.

def close(self)
878    def close(self):
879        """
880        Close the client and all the associated producers and consumers
881        """
882        self._client.close()

Close the client and all the associated producers and consumers

class Producer:
 885class Producer:
 886    """
 887    The Pulsar message producer, used to publish messages on a topic.
 888    """
 889
 890    def topic(self):
 891        """
 892        Return the topic which producer is publishing to
 893        """
 894        return self._producer.topic()
 895
 896    def producer_name(self):
 897        """
 898        Return the producer name which could have been assigned by the
 899        system or specified by the client
 900        """
 901        return self._producer.producer_name()
 902
 903    def last_sequence_id(self):
 904        """
 905        Get the last sequence id that was published by this producer.
 906
 907        This represent either the automatically assigned or custom sequence id
 908        (set on the `MessageBuilder`) that was published and acknowledged by the broker.
 909
 910        After recreating a producer with the same producer name, this will return the
 911        last message that was published in the previous producer session, or -1 if
 912        there no message was ever published.
 913        """
 914        return self._producer.last_sequence_id()
 915
 916    def send(self, content,
 917             properties=None,
 918             partition_key=None,
 919             sequence_id=None,
 920             replication_clusters=None,
 921             disable_replication=False,
 922             event_timestamp=None,
 923             deliver_at=None,
 924             deliver_after=None,
 925             ):
 926        """
 927        Publish a message on the topic. Blocks until the message is acknowledged
 928
 929        Returns a `MessageId` object that represents where the message is persisted.
 930
 931        **Args**
 932
 933        * `content`:
 934          A `bytes` object with the message payload.
 935
 936        **Options**
 937
 938        * `properties`:
 939          A dict of application-defined string properties.
 940        * `partition_key`:
 941          Sets the partition key for message routing. A hash of this key is used
 942          to determine the message's topic partition.
 943        * `sequence_id`:
 944          Specify a custom sequence id for the message being published.
 945        * `replication_clusters`:
 946          Override namespace replication clusters. Note that it is the caller's
 947          responsibility to provide valid cluster names and that all clusters
 948          have been previously configured as topics. Given an empty list,
 949          the message will replicate according to the namespace configuration.
 950        * `disable_replication`:
 951          Do not replicate this message.
 952        * `event_timestamp`:
 953          Timestamp in millis of the timestamp of event creation
 954        * `deliver_at`:
 955          Specify the this message should not be delivered earlier than the
 956          specified timestamp.
 957          The timestamp is milliseconds and based on UTC
 958        * `deliver_after`:
 959          Specify a delay in timedelta for the delivery of the messages.
 960
 961        """
 962        msg = self._build_msg(content, properties, partition_key, sequence_id,
 963                              replication_clusters, disable_replication, event_timestamp,
 964                              deliver_at, deliver_after)
 965        return MessageId.deserialize(self._producer.send(msg))
 966
 967    def send_async(self, content, callback,
 968                   properties=None,
 969                   partition_key=None,
 970                   sequence_id=None,
 971                   replication_clusters=None,
 972                   disable_replication=False,
 973                   event_timestamp=None,
 974                   deliver_at=None,
 975                   deliver_after=None,
 976                   ):
 977        """
 978        Send a message asynchronously.
 979
 980        The `callback` will be invoked once the message has been acknowledged
 981        by the broker.
 982
 983        Example:
 984
 985            #!python
 986            def callback(res, msg_id):
 987                print('Message published: %s' % res)
 988
 989            producer.send_async(msg, callback)
 990
 991        When the producer queue is full, by default the message will be rejected
 992        and the callback invoked with an error code.
 993
 994        **Args**
 995
 996        * `content`:
 997          A `bytes` object with the message payload.
 998
 999        **Options**
1000
1001        * `properties`:
1002          A dict of application0-defined string properties.
1003        * `partition_key`:
1004          Sets the partition key for the message routing. A hash of this key is
1005          used to determine the message's topic partition.
1006        * `sequence_id`:
1007          Specify a custom sequence id for the message being published.
1008        * `replication_clusters`: Override namespace replication clusters. Note
1009          that it is the caller's responsibility to provide valid cluster names
1010          and that all clusters have been previously configured as topics.
1011          Given an empty list, the message will replicate per the namespace
1012          configuration.
1013        * `disable_replication`:
1014          Do not replicate this message.
1015        * `event_timestamp`:
1016          Timestamp in millis of the timestamp of event creation
1017        * `deliver_at`:
1018          Specify the this message should not be delivered earlier than the
1019          specified timestamp.
1020          The timestamp is milliseconds and based on UTC
1021        * `deliver_after`:
1022          Specify a delay in timedelta for the delivery of the messages.
1023        """
1024        msg = self._build_msg(content, properties, partition_key, sequence_id,
1025                              replication_clusters, disable_replication, event_timestamp,
1026                              deliver_at, deliver_after)
1027        self._producer.send_async(msg, callback)
1028
1029
1030    def flush(self):
1031        """
1032        Flush all the messages buffered in the client and wait until all messages have been
1033        successfully persisted
1034        """
1035        self._producer.flush()
1036
1037
1038    def close(self):
1039        """
1040        Close the producer.
1041        """
1042        self._producer.close()
1043
1044    def _build_msg(self, content, properties, partition_key, sequence_id,
1045                   replication_clusters, disable_replication, event_timestamp,
1046                   deliver_at, deliver_after):
1047        data = self._schema.encode(content)
1048
1049        _check_type(bytes, data, 'data')
1050        _check_type_or_none(dict, properties, 'properties')
1051        _check_type_or_none(str, partition_key, 'partition_key')
1052        _check_type_or_none(int, sequence_id, 'sequence_id')
1053        _check_type_or_none(list, replication_clusters, 'replication_clusters')
1054        _check_type(bool, disable_replication, 'disable_replication')
1055        _check_type_or_none(int, event_timestamp, 'event_timestamp')
1056        _check_type_or_none(int, deliver_at, 'deliver_at')
1057        _check_type_or_none(timedelta, deliver_after, 'deliver_after')
1058
1059        mb = _pulsar.MessageBuilder()
1060        mb.content(data)
1061        if properties:
1062            for k, v in properties.items():
1063                mb.property(k, v)
1064        if partition_key:
1065            mb.partition_key(partition_key)
1066        if sequence_id:
1067            mb.sequence_id(sequence_id)
1068        if replication_clusters:
1069            mb.replication_clusters(replication_clusters)
1070        if disable_replication:
1071            mb.disable_replication(disable_replication)
1072        if event_timestamp:
1073            mb.event_timestamp(event_timestamp)
1074        if deliver_at:
1075            mb.deliver_at(deliver_at)
1076        if deliver_after:
1077            mb.deliver_after(deliver_after)
1078
1079        return mb.build()
1080
1081    def is_connected(self):
1082        """
1083        Check if the producer is connected or not.
1084        """
1085        return self._producer.is_connected()

The Pulsar message producer, used to publish messages on a topic.

Producer()
def topic(self)
890    def topic(self):
891        """
892        Return the topic which producer is publishing to
893        """
894        return self._producer.topic()

Return the topic which producer is publishing to

def producer_name(self)
896    def producer_name(self):
897        """
898        Return the producer name which could have been assigned by the
899        system or specified by the client
900        """
901        return self._producer.producer_name()

Return the producer name which could have been assigned by the system or specified by the client

def last_sequence_id(self)
903    def last_sequence_id(self):
904        """
905        Get the last sequence id that was published by this producer.
906
907        This represent either the automatically assigned or custom sequence id
908        (set on the `MessageBuilder`) that was published and acknowledged by the broker.
909
910        After recreating a producer with the same producer name, this will return the
911        last message that was published in the previous producer session, or -1 if
912        there no message was ever published.
913        """
914        return self._producer.last_sequence_id()

Get the last sequence id that was published by this producer.

This represent either the automatically assigned or custom sequence id (set on the MessageBuilder) that was published and acknowledged by the broker.

After recreating a producer with the same producer name, this will return the last message that was published in the previous producer session, or -1 if there no message was ever published.

def send( self, content, properties=None, partition_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None)
916    def send(self, content,
917             properties=None,
918             partition_key=None,
919             sequence_id=None,
920             replication_clusters=None,
921             disable_replication=False,
922             event_timestamp=None,
923             deliver_at=None,
924             deliver_after=None,
925             ):
926        """
927        Publish a message on the topic. Blocks until the message is acknowledged
928
929        Returns a `MessageId` object that represents where the message is persisted.
930
931        **Args**
932
933        * `content`:
934          A `bytes` object with the message payload.
935
936        **Options**
937
938        * `properties`:
939          A dict of application-defined string properties.
940        * `partition_key`:
941          Sets the partition key for message routing. A hash of this key is used
942          to determine the message's topic partition.
943        * `sequence_id`:
944          Specify a custom sequence id for the message being published.
945        * `replication_clusters`:
946          Override namespace replication clusters. Note that it is the caller's
947          responsibility to provide valid cluster names and that all clusters
948          have been previously configured as topics. Given an empty list,
949          the message will replicate according to the namespace configuration.
950        * `disable_replication`:
951          Do not replicate this message.
952        * `event_timestamp`:
953          Timestamp in millis of the timestamp of event creation
954        * `deliver_at`:
955          Specify the this message should not be delivered earlier than the
956          specified timestamp.
957          The timestamp is milliseconds and based on UTC
958        * `deliver_after`:
959          Specify a delay in timedelta for the delivery of the messages.
960
961        """
962        msg = self._build_msg(content, properties, partition_key, sequence_id,
963                              replication_clusters, disable_replication, event_timestamp,
964                              deliver_at, deliver_after)
965        return MessageId.deserialize(self._producer.send(msg))

Publish a message on the topic. Blocks until the message is acknowledged

Returns a MessageId object that represents where the message is persisted.

Args

  • content: A bytes object with the message payload.

Options

  • properties: A dict of application-defined string properties.
  • partition_key: Sets the partition key for message routing. A hash of this key is used to determine the message's topic partition.
  • sequence_id: Specify a custom sequence id for the message being published.
  • replication_clusters: Override namespace replication clusters. Note that it is the caller's responsibility to provide valid cluster names and that all clusters have been previously configured as topics. Given an empty list, the message will replicate according to the namespace configuration.
  • disable_replication: Do not replicate this message.
  • event_timestamp: Timestamp in millis of the timestamp of event creation
  • deliver_at: Specify the this message should not be delivered earlier than the specified timestamp. The timestamp is milliseconds and based on UTC
  • deliver_after: Specify a delay in timedelta for the delivery of the messages.
def send_async( self, content, callback, properties=None, partition_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None)
 967    def send_async(self, content, callback,
 968                   properties=None,
 969                   partition_key=None,
 970                   sequence_id=None,
 971                   replication_clusters=None,
 972                   disable_replication=False,
 973                   event_timestamp=None,
 974                   deliver_at=None,
 975                   deliver_after=None,
 976                   ):
 977        """
 978        Send a message asynchronously.
 979
 980        The `callback` will be invoked once the message has been acknowledged
 981        by the broker.
 982
 983        Example:
 984
 985            #!python
 986            def callback(res, msg_id):
 987                print('Message published: %s' % res)
 988
 989            producer.send_async(msg, callback)
 990
 991        When the producer queue is full, by default the message will be rejected
 992        and the callback invoked with an error code.
 993
 994        **Args**
 995
 996        * `content`:
 997          A `bytes` object with the message payload.
 998
 999        **Options**
1000
1001        * `properties`:
1002          A dict of application0-defined string properties.
1003        * `partition_key`:
1004          Sets the partition key for the message routing. A hash of this key is
1005          used to determine the message's topic partition.
1006        * `sequence_id`:
1007          Specify a custom sequence id for the message being published.
1008        * `replication_clusters`: Override namespace replication clusters. Note
1009          that it is the caller's responsibility to provide valid cluster names
1010          and that all clusters have been previously configured as topics.
1011          Given an empty list, the message will replicate per the namespace
1012          configuration.
1013        * `disable_replication`:
1014          Do not replicate this message.
1015        * `event_timestamp`:
1016          Timestamp in millis of the timestamp of event creation
1017        * `deliver_at`:
1018          Specify the this message should not be delivered earlier than the
1019          specified timestamp.
1020          The timestamp is milliseconds and based on UTC
1021        * `deliver_after`:
1022          Specify a delay in timedelta for the delivery of the messages.
1023        """
1024        msg = self._build_msg(content, properties, partition_key, sequence_id,
1025                              replication_clusters, disable_replication, event_timestamp,
1026                              deliver_at, deliver_after)
1027        self._producer.send_async(msg, callback)

Send a message asynchronously.

The callback will be invoked once the message has been acknowledged by the broker.

Example:

#!python
def callback(res, msg_id):
    print('Message published: %s' % res)

producer.send_async(msg, callback)

When the producer queue is full, by default the message will be rejected and the callback invoked with an error code.

Args

  • content: A bytes object with the message payload.

Options

  • properties: A dict of application0-defined string properties.
  • partition_key: Sets the partition key for the message routing. A hash of this key is used to determine the message's topic partition.
  • sequence_id: Specify a custom sequence id for the message being published.
  • replication_clusters: Override namespace replication clusters. Note that it is the caller's responsibility to provide valid cluster names and that all clusters have been previously configured as topics. Given an empty list, the message will replicate per the namespace configuration.
  • disable_replication: Do not replicate this message.
  • event_timestamp: Timestamp in millis of the timestamp of event creation
  • deliver_at: Specify the this message should not be delivered earlier than the specified timestamp. The timestamp is milliseconds and based on UTC
  • deliver_after: Specify a delay in timedelta for the delivery of the messages.
def flush(self)
1030    def flush(self):
1031        """
1032        Flush all the messages buffered in the client and wait until all messages have been
1033        successfully persisted
1034        """
1035        self._producer.flush()

Flush all the messages buffered in the client and wait until all messages have been successfully persisted

def close(self)
1038    def close(self):
1039        """
1040        Close the producer.
1041        """
1042        self._producer.close()

Close the producer.

def is_connected(self)
1081    def is_connected(self):
1082        """
1083        Check if the producer is connected or not.
1084        """
1085        return self._producer.is_connected()

Check if the producer is connected or not.

class Consumer:
1088class Consumer:
1089    """
1090    Pulsar consumer.
1091    """
1092
1093    def topic(self):
1094        """
1095        Return the topic this consumer is subscribed to.
1096        """
1097        return self._consumer.topic()
1098
1099    def subscription_name(self):
1100        """
1101        Return the subscription name.
1102        """
1103        return self._consumer.subscription_name()
1104
1105    def unsubscribe(self):
1106        """
1107        Unsubscribe the current consumer from the topic.
1108
1109        This method will block until the operation is completed. Once the
1110        consumer is unsubscribed, no more messages will be received and
1111        subsequent new messages will not be retained for this consumer.
1112
1113        This consumer object cannot be reused.
1114        """
1115        return self._consumer.unsubscribe()
1116
1117    def receive(self, timeout_millis=None):
1118        """
1119        Receive a single message.
1120
1121        If a message is not immediately available, this method will block until
1122        a new message is available.
1123
1124        **Options**
1125
1126        * `timeout_millis`:
1127          If specified, the receive will raise an exception if a message is not
1128          available within the timeout.
1129        """
1130        if timeout_millis is None:
1131            msg = self._consumer.receive()
1132        else:
1133            _check_type(int, timeout_millis, 'timeout_millis')
1134            msg = self._consumer.receive(timeout_millis)
1135
1136        m = Message()
1137        m._message = msg
1138        m._schema = self._schema
1139        return m
1140
1141    def acknowledge(self, message):
1142        """
1143        Acknowledge the reception of a single message.
1144
1145        This method will block until an acknowledgement is sent to the broker.
1146        After that, the message will not be re-delivered to this consumer.
1147
1148        **Args**
1149
1150        * `message`:
1151          The received message or message id.
1152        """
1153        if isinstance(message, Message):
1154            self._consumer.acknowledge(message._message)
1155        else:
1156            self._consumer.acknowledge(message)
1157
1158    def acknowledge_cumulative(self, message):
1159        """
1160        Acknowledge the reception of all the messages in the stream up to (and
1161        including) the provided message.
1162
1163        This method will block until an acknowledgement is sent to the broker.
1164        After that, the messages will not be re-delivered to this consumer.
1165
1166        **Args**
1167
1168        * `message`:
1169          The received message or message id.
1170        """
1171        if isinstance(message, Message):
1172            self._consumer.acknowledge_cumulative(message._message)
1173        else:
1174            self._consumer.acknowledge_cumulative(message)
1175
1176    def negative_acknowledge(self, message):
1177        """
1178        Acknowledge the failure to process a single message.
1179
1180        When a message is "negatively acked" it will be marked for redelivery after
1181        some fixed delay. The delay is configurable when constructing the consumer
1182        with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
1183
1184        This call is not blocking.
1185
1186        **Args**
1187
1188        * `message`:
1189          The received message or message id.
1190        """
1191        if isinstance(message, Message):
1192            self._consumer.negative_acknowledge(message._message)
1193        else:
1194            self._consumer.negative_acknowledge(message)
1195
1196    def pause_message_listener(self):
1197        """
1198        Pause receiving messages via the `message_listener` until
1199        `resume_message_listener()` is called.
1200        """
1201        self._consumer.pause_message_listener()
1202
1203    def resume_message_listener(self):
1204        """
1205        Resume receiving the messages via the message listener.
1206        Asynchronously receive all the messages enqueued from the time
1207        `pause_message_listener()` was called.
1208        """
1209        self._consumer.resume_message_listener()
1210
1211    def redeliver_unacknowledged_messages(self):
1212        """
1213        Redelivers all the unacknowledged messages. In failover mode, the
1214        request is ignored if the consumer is not active for the given topic. In
1215        shared mode, the consumer's messages to be redelivered are distributed
1216        across all the connected consumers. This is a non-blocking call and
1217        doesn't throw an exception. In case the connection breaks, the messages
1218        are redelivered after reconnect.
1219        """
1220        self._consumer.redeliver_unacknowledged_messages()
1221
1222    def seek(self, messageid):
1223        """
1224        Reset the subscription associated with this consumer to a specific message id or publish timestamp.
1225        The message id can either be a specific message or represent the first or last messages in the topic.
1226        Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
1227        seek() on the individual partitions.
1228
1229        **Args**
1230
1231        * `message`:
1232          The message id for seek, OR an integer event time to seek to
1233        """
1234        self._consumer.seek(messageid)
1235
1236    def close(self):
1237        """
1238        Close the consumer.
1239        """
1240        self._consumer.close()
1241        self._client._consumers.remove(self)
1242
1243    def is_connected(self):
1244        """
1245        Check if the consumer is connected or not.
1246        """
1247        return self._consumer.is_connected()

Pulsar consumer.

Consumer()
def topic(self)
1093    def topic(self):
1094        """
1095        Return the topic this consumer is subscribed to.
1096        """
1097        return self._consumer.topic()

Return the topic this consumer is subscribed to.

def subscription_name(self)
1099    def subscription_name(self):
1100        """
1101        Return the subscription name.
1102        """
1103        return self._consumer.subscription_name()

Return the subscription name.

def unsubscribe(self)
1105    def unsubscribe(self):
1106        """
1107        Unsubscribe the current consumer from the topic.
1108
1109        This method will block until the operation is completed. Once the
1110        consumer is unsubscribed, no more messages will be received and
1111        subsequent new messages will not be retained for this consumer.
1112
1113        This consumer object cannot be reused.
1114        """
1115        return self._consumer.unsubscribe()

Unsubscribe the current consumer from the topic.

This method will block until the operation is completed. Once the consumer is unsubscribed, no more messages will be received and subsequent new messages will not be retained for this consumer.

This consumer object cannot be reused.

def receive(self, timeout_millis=None)
1117    def receive(self, timeout_millis=None):
1118        """
1119        Receive a single message.
1120
1121        If a message is not immediately available, this method will block until
1122        a new message is available.
1123
1124        **Options**
1125
1126        * `timeout_millis`:
1127          If specified, the receive will raise an exception if a message is not
1128          available within the timeout.
1129        """
1130        if timeout_millis is None:
1131            msg = self._consumer.receive()
1132        else:
1133            _check_type(int, timeout_millis, 'timeout_millis')
1134            msg = self._consumer.receive(timeout_millis)
1135
1136        m = Message()
1137        m._message = msg
1138        m._schema = self._schema
1139        return m

Receive a single message.

If a message is not immediately available, this method will block until a new message is available.

Options

  • timeout_millis: If specified, the receive will raise an exception if a message is not available within the timeout.
def acknowledge(self, message)
1141    def acknowledge(self, message):
1142        """
1143        Acknowledge the reception of a single message.
1144
1145        This method will block until an acknowledgement is sent to the broker.
1146        After that, the message will not be re-delivered to this consumer.
1147
1148        **Args**
1149
1150        * `message`:
1151          The received message or message id.
1152        """
1153        if isinstance(message, Message):
1154            self._consumer.acknowledge(message._message)
1155        else:
1156            self._consumer.acknowledge(message)

Acknowledge the reception of a single message.

This method will block until an acknowledgement is sent to the broker. After that, the message will not be re-delivered to this consumer.

Args

  • message: The received message or message id.
def acknowledge_cumulative(self, message)
1158    def acknowledge_cumulative(self, message):
1159        """
1160        Acknowledge the reception of all the messages in the stream up to (and
1161        including) the provided message.
1162
1163        This method will block until an acknowledgement is sent to the broker.
1164        After that, the messages will not be re-delivered to this consumer.
1165
1166        **Args**
1167
1168        * `message`:
1169          The received message or message id.
1170        """
1171        if isinstance(message, Message):
1172            self._consumer.acknowledge_cumulative(message._message)
1173        else:
1174            self._consumer.acknowledge_cumulative(message)

Acknowledge the reception of all the messages in the stream up to (and including) the provided message.

This method will block until an acknowledgement is sent to the broker. After that, the messages will not be re-delivered to this consumer.

Args

  • message: The received message or message id.
def negative_acknowledge(self, message)
1176    def negative_acknowledge(self, message):
1177        """
1178        Acknowledge the failure to process a single message.
1179
1180        When a message is "negatively acked" it will be marked for redelivery after
1181        some fixed delay. The delay is configurable when constructing the consumer
1182        with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
1183
1184        This call is not blocking.
1185
1186        **Args**
1187
1188        * `message`:
1189          The received message or message id.
1190        """
1191        if isinstance(message, Message):
1192            self._consumer.negative_acknowledge(message._message)
1193        else:
1194            self._consumer.negative_acknowledge(message)

Acknowledge the failure to process a single message.

When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.

This call is not blocking.

Args

  • message: The received message or message id.
def pause_message_listener(self)
1196    def pause_message_listener(self):
1197        """
1198        Pause receiving messages via the `message_listener` until
1199        `resume_message_listener()` is called.
1200        """
1201        self._consumer.pause_message_listener()

Pause receiving messages via the message_listener until resume_message_listener() is called.

def resume_message_listener(self)
1203    def resume_message_listener(self):
1204        """
1205        Resume receiving the messages via the message listener.
1206        Asynchronously receive all the messages enqueued from the time
1207        `pause_message_listener()` was called.
1208        """
1209        self._consumer.resume_message_listener()

Resume receiving the messages via the message listener. Asynchronously receive all the messages enqueued from the time pause_message_listener() was called.

def redeliver_unacknowledged_messages(self)
1211    def redeliver_unacknowledged_messages(self):
1212        """
1213        Redelivers all the unacknowledged messages. In failover mode, the
1214        request is ignored if the consumer is not active for the given topic. In
1215        shared mode, the consumer's messages to be redelivered are distributed
1216        across all the connected consumers. This is a non-blocking call and
1217        doesn't throw an exception. In case the connection breaks, the messages
1218        are redelivered after reconnect.
1219        """
1220        self._consumer.redeliver_unacknowledged_messages()

Redelivers all the unacknowledged messages. In failover mode, the request is ignored if the consumer is not active for the given topic. In shared mode, the consumer's messages to be redelivered are distributed across all the connected consumers. This is a non-blocking call and doesn't throw an exception. In case the connection breaks, the messages are redelivered after reconnect.

def seek(self, messageid)
1222    def seek(self, messageid):
1223        """
1224        Reset the subscription associated with this consumer to a specific message id or publish timestamp.
1225        The message id can either be a specific message or represent the first or last messages in the topic.
1226        Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
1227        seek() on the individual partitions.
1228
1229        **Args**
1230
1231        * `message`:
1232          The message id for seek, OR an integer event time to seek to
1233        """
1234        self._consumer.seek(messageid)

Reset the subscription associated with this consumer to a specific message id or publish timestamp. The message id can either be a specific message or represent the first or last messages in the topic. Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on the individual partitions.

Args

  • message: The message id for seek, OR an integer event time to seek to
def close(self)
1236    def close(self):
1237        """
1238        Close the consumer.
1239        """
1240        self._consumer.close()
1241        self._client._consumers.remove(self)

Close the consumer.

def is_connected(self)
1243    def is_connected(self):
1244        """
1245        Check if the consumer is connected or not.
1246        """
1247        return self._consumer.is_connected()

Check if the consumer is connected or not.

class Reader:
1251class Reader:
1252    """
1253    Pulsar topic reader.
1254    """
1255
1256    def topic(self):
1257        """
1258        Return the topic this reader is reading from.
1259        """
1260        return self._reader.topic()
1261
1262    def read_next(self, timeout_millis=None):
1263        """
1264        Read a single message.
1265
1266        If a message is not immediately available, this method will block until
1267        a new message is available.
1268
1269        **Options**
1270
1271        * `timeout_millis`:
1272          If specified, the receive will raise an exception if a message is not
1273          available within the timeout.
1274        """
1275        if timeout_millis is None:
1276            msg = self._reader.read_next()
1277        else:
1278            _check_type(int, timeout_millis, 'timeout_millis')
1279            msg = self._reader.read_next(timeout_millis)
1280
1281        m = Message()
1282        m._message = msg
1283        m._schema = self._schema
1284        return m
1285
1286    def has_message_available(self):
1287        """
1288        Check if there is any message available to read from the current position.
1289        """
1290        return self._reader.has_message_available();
1291
1292    def seek(self, messageid):
1293        """
1294        Reset this reader to a specific message id or publish timestamp.
1295        The message id can either be a specific message or represent the first or last messages in the topic.
1296        Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
1297        seek() on the individual partitions.
1298
1299        **Args**
1300
1301        * `message`:
1302          The message id for seek, OR an integer event time to seek to
1303        """
1304        self._reader.seek(messageid)
1305
1306    def close(self):
1307        """
1308        Close the reader.
1309        """
1310        self._reader.close()
1311        self._client._consumers.remove(self)
1312
1313    def is_connected(self):
1314        """
1315        Check if the reader is connected or not.
1316        """
1317        return self._reader.is_connected()

Pulsar topic reader.

Reader()
def topic(self)
1256    def topic(self):
1257        """
1258        Return the topic this reader is reading from.
1259        """
1260        return self._reader.topic()

Return the topic this reader is reading from.

def read_next(self, timeout_millis=None)
1262    def read_next(self, timeout_millis=None):
1263        """
1264        Read a single message.
1265
1266        If a message is not immediately available, this method will block until
1267        a new message is available.
1268
1269        **Options**
1270
1271        * `timeout_millis`:
1272          If specified, the receive will raise an exception if a message is not
1273          available within the timeout.
1274        """
1275        if timeout_millis is None:
1276            msg = self._reader.read_next()
1277        else:
1278            _check_type(int, timeout_millis, 'timeout_millis')
1279            msg = self._reader.read_next(timeout_millis)
1280
1281        m = Message()
1282        m._message = msg
1283        m._schema = self._schema
1284        return m

Read a single message.

If a message is not immediately available, this method will block until a new message is available.

Options

  • timeout_millis: If specified, the receive will raise an exception if a message is not available within the timeout.
def has_message_available(self)
1286    def has_message_available(self):
1287        """
1288        Check if there is any message available to read from the current position.
1289        """
1290        return self._reader.has_message_available();

Check if there is any message available to read from the current position.

def seek(self, messageid)
1292    def seek(self, messageid):
1293        """
1294        Reset this reader to a specific message id or publish timestamp.
1295        The message id can either be a specific message or represent the first or last messages in the topic.
1296        Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
1297        seek() on the individual partitions.
1298
1299        **Args**
1300
1301        * `message`:
1302          The message id for seek, OR an integer event time to seek to
1303        """
1304        self._reader.seek(messageid)

Reset this reader to a specific message id or publish timestamp. The message id can either be a specific message or represent the first or last messages in the topic. Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on the individual partitions.

Args

  • message: The message id for seek, OR an integer event time to seek to
def close(self)
1306    def close(self):
1307        """
1308        Close the reader.
1309        """
1310        self._reader.close()
1311        self._client._consumers.remove(self)

Close the reader.

def is_connected(self)
1313    def is_connected(self):
1314        """
1315        Check if the reader is connected or not.
1316        """
1317        return self._reader.is_connected()

Check if the reader is connected or not.

class CryptoKeyReader:
1320class CryptoKeyReader:
1321    """
1322    Default crypto key reader implementation
1323    """
1324    def __init__(self, public_key_path, private_key_path):
1325        """
1326        Create crypto key reader.
1327
1328        **Args**
1329
1330        * `public_key_path`: Path to the public key
1331        * `private_key_path`: Path to private key
1332        """
1333        _check_type(str, public_key_path, 'public_key_path')
1334        _check_type(str, private_key_path, 'private_key_path')
1335        self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)

Default crypto key reader implementation

CryptoKeyReader(public_key_path, private_key_path)
1324    def __init__(self, public_key_path, private_key_path):
1325        """
1326        Create crypto key reader.
1327
1328        **Args**
1329
1330        * `public_key_path`: Path to the public key
1331        * `private_key_path`: Path to private key
1332        """
1333        _check_type(str, public_key_path, 'public_key_path')
1334        _check_type(str, private_key_path, 'private_key_path')
1335        self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)

Create crypto key reader.

Args

  • public_key_path: Path to the public key
  • private_key_path: Path to private key