class documentation

The Pulsar client. A single client instance can be used to create producers and consumers on multiple topics.

The client will share the same connection pool and threads across all producers and consumers.

Method             __init__              Create a new Pulsar client instance.
Method             close                 Close the client and all the associated producers and consumers.
Method             create_producer       Create a new producer on a given topic.
Method             create_reader         Create a reader on a particular topic.
Method             get_topic_partitions  Get the list of partitions for a given topic.
Method             shutdown              Perform immediate shutdown of Pulsar client.
Method             subscribe             Subscribe to the given topic and subscription combination.
Static Method      _prepare_logger       Undocumented
Instance Variable  _client               Undocumented
Instance Variable  _consumers            Undocumented
def __init__(self, service_url, authentication=None, operation_timeout_seconds=30, io_threads=1, message_listener_threads=1, concurrent_lookup_requests=50000, log_conf_file_path=None, use_tls=False, tls_trust_certs_file_path=None, tls_allow_insecure_connection=False, tls_validate_hostname=False, logger=None, connection_timeout_ms=10000, listener_name=None):

Create a new Pulsar client instance.

Parameters
service_url:str

The Pulsar service URL, e.g. pulsar://my-broker.com:6650/

authentication:Authentication, optional

Set the authentication provider to be used with the broker. Supported methods:

operation_timeout_seconds:int, default 30

Set the timeout on client operations (subscribe, create producer, close, unsubscribe).

io_threads:int, default 1

Set the number of IO threads to be used by the Pulsar client.

message_listener_threads:int, default 1

Set the number of threads to be used by the Pulsar client when delivering messages through a message listener. The default is 1 thread per Pulsar client. If more than 1 thread is used, messages for distinct message listeners will be delivered in different threads; however, a single message listener will always be assigned to the same thread.

concurrent_lookup_requests:int, default 50000

Number of concurrent lookup requests allowed on each broker connection, to prevent overload on the broker.

log_conf_file_path:str, optional

This parameter is deprecated and has no effect. It is retained only for compatibility. Use logger to customize the logger.

use_tls:bool, default False

Configure whether to use TLS encryption on the connection. This setting is deprecated: TLS is enabled automatically if the service URL starts with pulsar+ssl:// or https://.

tls_trust_certs_file_path:str, optional

Set the path to the trusted TLS certificate file. If empty, defaults to certifi.

tls_allow_insecure_connection:bool, default False

Configure whether the Pulsar client accepts untrusted TLS certificates from the broker.

tls_validate_hostname:bool, default False

Configure whether the Pulsar client validates that the hostname of the endpoint matches the common name on the TLS certificate presented by the endpoint.

logger:optional

Set a Python logger for this Pulsar client. It should be an instance of logging.Logger. Note that if a Python logger is configured, it becomes unavailable while the Python interpreter is terminating, and the default logger is used for logging instead. To avoid unexpected behavior, delete all instances explicitly before exiting:

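import logging
from pulsar import Client

# service_url and topic are placeholders defined elsewhere.
client = Client(service_url, logger=logging.getLogger('pulsar'))
producer = client.create_producer(topic)
# ...
# Delete the producer and the client explicitly before the interpreter exits.
del producer
del client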
connection_timeout_ms:int, default 10000

Set the timeout in milliseconds on TCP connections.

listener_name:str, optional

Listener name for lookup. Clients can use listenerName to choose one of the listeners as the service URL to create a connection to the broker, as long as the network is accessible. advertisedListeners must be enabled on the broker side.

def close(self):

Close the client and all the associated producers and consumers.
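
As an illustration, close() can be tied to a with block so that it runs even when an operation fails. This is a minimal sketch, assuming the client is constructed elsewhere; the helper name send_and_close is hypothetical, contextlib.closing comes from the Python standard library, and producer.send() is assumed to behave as in the Pulsar Python client.

```python
from contextlib import closing


def send_and_close(client, topic, payload):
    """Send one message, then close the client even if sending fails.

    `client` is assumed to be an already-constructed Client instance.
    The helper name is hypothetical and not part of the client API.
    """
    # closing() calls client.close() when the block exits, which also
    # closes the producer created inside the block.
    with closing(client):
        producer = client.create_producer(topic)
        producer.send(payload)
```

A call such as send_and_close(client, 'my-topic', b'hello') leaves no open producer or consumer behind, whether or not the send succeeds.
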

def create_producer(self, topic, producer_name=None, schema=schema.BytesSchema(), initial_sequence_id=None, send_timeout_millis=30000, compression_type: CompressionType = CompressionType.NONE, max_pending_messages=1000, max_pending_messages_across_partitions=50000, block_if_queue_full=False, batching_enabled=False, batching_max_messages=1000, batching_max_allowed_size_in_bytes=128*1024, batching_max_publish_delay_ms=10, chunking_enabled=False, message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution, lazy_start_partitioned_producers=False, properties=None, batching_type: BatchingType = BatchingType.Default, encryption_key=None, crypto_key_reader: Union[None, CryptoKeyReader] = None, access_mode: ProducerAccessMode = ProducerAccessMode.Shared):

Create a new producer on a given topic.

Parameters
topic:str

The topic name.

producer_name:str, optional

Specify a name for the producer. If not assigned, the system will generate a globally unique name, which can be accessed with Producer.producer_name(). When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique across all Pulsar clusters.

schema:pulsar.schema.Schema, default pulsar.schema.BytesSchema

Define the schema of the data that will be published by this producer, e.g. schema=JsonSchema(MyRecordClass).

The schema will be used for two purposes:
  • Validate the data format against the topic defined schema
  • Perform serialization/deserialization between data and objects
initial_sequence_id:int, optional

Set the baseline for the sequence IDs of messages published by the producer. The first message will use (initial_sequence_id + 1) as its sequence ID, and subsequent messages will be assigned incrementing sequence IDs unless otherwise specified.

send_timeout_millis:int, default 30000

If a message is not acknowledged by the server before send_timeout_millis expires, an error will be reported.

compression_type:CompressionType, default CompressionType.NONE

Set the compression type for the producer. By default, message payloads are not compressed.

Supported compression types:

  • CompressionType.LZ4
  • CompressionType.ZLib
  • CompressionType.ZSTD
  • CompressionType.SNAPPY

ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that release in order to be able to receive messages compressed with ZSTD.

SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that release in order to be able to receive messages compressed with SNAPPY.

max_pending_messages:int, default 1000

Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.

max_pending_messages_across_partitions:int, default 50000

Set the max size of the queue holding the messages pending to receive an acknowledgment across partitions from the broker.

block_if_queue_full:bool, default False

Set whether send_async operations should block when the outgoing message queue is full.

batching_enabled:bool, default False

When automatic batching is enabled, multiple calls to send can result in a single batch being sent to the broker, leading to better throughput, especially when publishing small messages. All messages in a batch will be published as a single batched message. The consumer receives the individual messages of a batch in the same order they were enqueued.

batching_max_messages:int, default 1000

When you set this option to a value greater than 1, messages are queued until this threshold or batching_max_allowed_size_in_bytes is reached, or the batch interval has elapsed.

batching_max_allowed_size_in_bytes:int, default 128*1024

When you set this option to a value greater than 1, messages are queued until this threshold or batching_max_messages is reached, or the batch interval has elapsed.

batching_max_publish_delay_ms:int, default 10

The batch interval in milliseconds. Queued messages will be sent as a batch after this interval even if neither the batching_max_messages nor the batching_max_allowed_size_in_bytes threshold has been reached.
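
The batching options above are typically set together. The following is a minimal sketch, assuming a client constructed elsewhere; the helper name create_batching_producer and the specific values (500 messages, 64 KiB, 5 ms) are illustrative assumptions, not recommendations from the library.

```python
def create_batching_producer(client, topic):
    """Create a producer that batches messages before sending them.

    `client` is assumed to be an already-constructed Client instance.
    The helper name and the threshold values are hypothetical.
    """
    return client.create_producer(
        topic,
        batching_enabled=True,
        # A batch is flushed when any one of the three limits is hit.
        batching_max_messages=500,
        batching_max_allowed_size_in_bytes=64 * 1024,
        batching_max_publish_delay_ms=5,
    )
```

Lower limits trade throughput for latency: a smaller batching_max_publish_delay_ms sends batches sooner but with fewer messages in each.
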
chunking_enabled:bool, default False

If a message is larger than the maximum publish payload size allowed by the broker, chunking_enabled lets the producer split the message into multiple chunks and publish them to the broker separately and in order. This allows the client to publish large messages to Pulsar.

message_routing_mode:PartitionsRoutingMode, default PartitionsRoutingMode.RoundRobinDistribution

Set the message routing mode for the partitioned producer.

Supported modes:

  • PartitionsRoutingMode.RoundRobinDistribution
  • PartitionsRoutingMode.UseSinglePartition
lazy_start_partitioned_producers:bool, default False

This config affects producers of partitioned topics only. It controls whether producers register and connect immediately to the owner broker of each partition or start lazily on demand. The internal producer of one partition is always started eagerly, chosen by the routing policy, but the internal producers of any additional partitions are started on demand, upon receiving their first message.

Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when the SinglePartition routing policy is used without keyed messages. Because producer connection can be on demand, this can produce extra send latency for the first messages of a given partition.

properties:dict, optional

Sets the properties for the producer. The properties associated with a producer can be used to identify the producer on the broker side.

batching_type:BatchingType, default BatchingType.Default

Sets the batching type for the producer.

There are two batching types: DefaultBatching and KeyBasedBatching.

DefaultBatching will batch incoming single messages:
(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
... into a single batch message:
[(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
KeyBasedBatching will batch incoming single messages:
(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
... into one batch message per key:
[(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
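
The difference between the two groupings can be reproduced in plain Python. This is only an illustration of the grouping described above, not the client's implementation; the function names default_batching and key_based_batching are hypothetical.

```python
def default_batching(messages):
    """Put every queued message into one batch, preserving arrival order."""
    return [list(messages)]


def key_based_batching(messages):
    """Group queued messages into one batch per key.

    Keys keep the order in which they were first seen, and messages keep
    their arrival order within each batch (dicts preserve insertion order).
    """
    batches = {}
    for key, value in messages:
        batches.setdefault(key, []).append((key, value))
    return list(batches.values())


queued = [
    ("k1", "v1"), ("k2", "v1"), ("k3", "v1"),
    ("k1", "v2"), ("k2", "v2"), ("k3", "v2"),
    ("k1", "v3"), ("k2", "v3"), ("k3", "v3"),
]

print(default_batching(queued))    # one batch holding all nine messages
print(key_based_batching(queued))  # three batches, one per key
```
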
encryption_key:str, optional

The key used for symmetric encryption, configured on the producer side.

crypto_key_reader:CryptoKeyReader, optional

Symmetric encryption class implementation that configures public-key encryption of messages for the producer and private-key decryption of messages for the consumer.

access_mode:ProducerAccessMode, optional

Set the type of access mode that the producer requires on the topic.

Supported modes:

  • Shared: By default multiple producers can publish on a topic.
  • Exclusive: Require exclusive access for producer.
    Fail immediately if there's already a producer connected.
  • WaitForExclusive: Producer creation is pending until it can acquire exclusive access.
  • ExclusiveWithFencing: Acquire exclusive access for the producer.
    Any existing producer will be removed and invalidated immediately.
def create_reader(self, topic, start_message_id, schema=schema.BytesSchema(), reader_listener=None, receiver_queue_size=1000, reader_name=None, subscription_role_prefix=None, is_read_compacted=False, crypto_key_reader: Union[None, CryptoKeyReader] = None, start_message_id_inclusive=False):

Create a reader on a particular topic.

Parameters
topic

The name of the topic.

start_message_id

The initial reader positioning is done by specifying a message id. The options are:

  • MessageId.earliest:

Start reading from the earliest message available in the topic

  • MessageId.latest:

Start reading from the end of the topic, only getting messages published after the reader was created

  • MessageId:

When passing a particular message ID, the reader will position itself on that specific position. The first message to be read will be the message following the specified message ID. A message ID can be serialized into a string and deserialized back into a MessageId object:

# Serialize to string
s = msg.message_id().serialize()

# Deserialize from string
msg_id = MessageId.deserialize(s)
schema:pulsar.schema.Schema, default pulsar.schema.BytesSchema

Define the schema of the data that will be received by this reader.

reader_listener:optional

Sets a message listener for the reader. When the listener is set, the application will receive messages through it. Calls to reader.read_next() will not be allowed. The listener function needs to accept (reader, message), for example:

def my_listener(reader, message):
    # process message
    pass
receiver_queue_size:int, default 1000

Sets the size of the reader receive queue. The reader receive queue controls how many messages can be accumulated by the reader before the application calls read_next(). Using a higher value could potentially increase the reader throughput at the expense of higher memory utilization.

reader_name:str, optional

Sets the reader name.

subscription_role_prefix:str, optional

Sets the subscription role prefix.

is_read_compacted:bool, default False

Selects whether to read the compacted version of the topic.

crypto_key_reader:CryptoKeyReader, optional

Symmetric encryption class implementation that configures public-key encryption of messages for the producer and private-key decryption of messages for the consumer.

start_message_id_inclusive:bool, default False

Set the reader to include the start message ID, or the given position of any reset operation like Reader.seek.
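
A reader is usually driven by a loop around read_next(). The following is a minimal sketch, assuming a client constructed elsewhere and a start position such as MessageId.earliest passed in by the caller. The helper name drain_topic is hypothetical, and has_message_available(), Message.data(), and Reader.close() are assumed from the Pulsar Python client rather than documented in this section.

```python
def drain_topic(client, topic, start_message_id):
    """Return the payloads of all messages currently available on a topic.

    `client` is assumed to be an already-constructed Client instance and
    `start_message_id` a value such as MessageId.earliest. The helper name
    is hypothetical.
    """
    reader = client.create_reader(topic, start_message_id)
    payloads = []
    try:
        # Stop once the reader has caught up with the end of the topic.
        while reader.has_message_available():
            message = reader.read_next()
            payloads.append(message.data())
    finally:
        # Release the reader even if reading fails part-way through.
        reader.close()
    return payloads
```
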
def get_topic_partitions(self, topic):

Get the list of partitions for a given topic.

If the topic is partitioned, this will return a list of partition names. If the topic is not partitioned, the returned list will contain the topic name itself.

This can be used to discover the partitions and create Reader, Consumer or Producer instances directly on a particular partition.

Parameters
topic:str

The topic name to look up.

Returns
list

A list of partition names.
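
As a sketch of the per-partition use described above, the returned names can be passed straight back to create_producer. This assumes a client constructed elsewhere; the helper name create_partition_producers is hypothetical.

```python
def create_partition_producers(client, topic):
    """Create one producer per partition of `topic`.

    `client` is assumed to be an already-constructed Client instance.
    For a non-partitioned topic the result holds a single producer keyed
    by the topic name itself. The helper name is hypothetical.
    """
    partitions = client.get_topic_partitions(topic)
    return {partition: client.create_producer(partition) for partition in partitions}
```
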
def shutdown(self):

Perform immediate shutdown of Pulsar client.

Release all resources and close all producers, consumers, and readers without waiting for ongoing operations to complete.

def subscribe(self, topic, subscription_name, consumer_type: ConsumerType = ConsumerType.Exclusive, schema=schema.BytesSchema(), message_listener=None, receiver_queue_size=1000, max_total_receiver_queue_size_across_partitions=50000, consumer_name=None, unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000, negative_ack_redelivery_delay_ms=60000, is_read_compacted=False, properties=None, pattern_auto_discovery_period=60, initial_position: InitialPosition = InitialPosition.Latest, crypto_key_reader: Union[None, CryptoKeyReader] = None, replicate_subscription_state_enabled=False, max_pending_chunked_message=10, auto_ack_oldest_chunked_message_on_queue_full=False, start_message_id_inclusive=False, batch_receive_policy=None, key_shared_policy=None, batch_index_ack_enabled=False, regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly, dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None):

Subscribe to the given topic and subscription combination.

Parameters
topic

The name of the topic, a list of topics, or a regex pattern. This method accepts these forms:

  • topic='my-topic'
  • topic=['topic-1', 'topic-2', 'topic-3']
  • topic=re.compile('persistent://public/default/topic-*')
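
The three topic forms can be sketched as follows, assuming a client constructed elsewhere. The helper name, the subscription names, and the '.*' suffix in the pattern are illustrative assumptions; separate subscription names are used so that the default exclusive consumers do not compete for the same subscription on overlapping topics.

```python
import re


def subscribe_each_form(client):
    """Subscribe using a single topic, a list of topics, and a regex pattern.

    `client` is assumed to be an already-constructed Client instance.
    The helper name and the subscription names are hypothetical.
    """
    single = client.subscribe('my-topic', 'sub-single')
    several = client.subscribe(['topic-1', 'topic-2', 'topic-3'], 'sub-list')
    # In regex syntax, '.*' matches any suffix after 'topic-'.
    pattern = re.compile('persistent://public/default/topic-.*')
    matching = client.subscribe(pattern, 'sub-pattern')
    return single, several, matching
```
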
subscription_name:str

The name of the subscription.

consumer_type:ConsumerType, default ConsumerType.Exclusive

Select the subscription type to be used when subscribing to the topic.

schema:pulsar.schema.Schema, default pulsar.schema.BytesSchema

Define the schema of the data that will be received by this consumer.

message_listener:optional

Sets a message listener for the consumer. When the listener is set, the application will receive messages through it. Calls to consumer.receive() will not be allowed. The listener function needs to accept (consumer, message), for example:

def my_listener(consumer, message):
    # process message
    consumer.acknowledge(message)
receiver_queue_size:int, default 1000

Sets the size of the consumer receive queue. The consumer receive queue controls how many messages can be accumulated by the consumer before the application calls receive(). Using a higher value could potentially increase the consumer throughput at the expense of higher memory utilization. Setting the consumer queue size to zero decreases the throughput of the consumer by disabling pre-fetching of messages.

This approach improves the message distribution on shared subscription by pushing messages only to those consumers that are ready to process them. Neither receive with timeout nor partitioned topics can be used if the consumer queue size is zero. The receive() function call should not be interrupted when the consumer queue size is zero. The default value is 1000 messages and should work well for most use cases.

max_total_receiver_queue_size_across_partitions:int, default 50000

Set the max total receiver queue size across partitions. This setting will be used to reduce the receiver queue size for individual partitions.

consumer_name:str, optional

Sets the consumer name.

unacked_messages_timeout_ms:int, optional

Sets the timeout in milliseconds for unacknowledged messages. The timeout needs to be greater than 10 seconds; an exception is thrown if the given value is less than 10 seconds. If a successful acknowledgement is not sent within the timeout, all the unacknowledged messages are redelivered.

broker_consumer_stats_cache_time_ms:int, default 30000

Sets the time duration for which the broker-side consumer stats will be cached in the client.

negative_ack_redelivery_delay_ms:int, default 60000

The delay after which messages that failed to be processed (signalled with consumer.negative_acknowledge()) are redelivered.
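
The acknowledgement options above come into play in a receive loop. The following is a minimal sketch of handling one message, assuming a consumer returned by subscribe(); the helper name process_one and the handler callback are hypothetical, and Message.data() is assumed from the Pulsar Python client.

```python
def process_one(consumer, handler):
    """Receive one message and acknowledge it only if `handler` succeeds.

    `consumer` is assumed to be the object returned by subscribe() and
    `handler` any callable taking the message payload. Both the helper
    name and the callback are hypothetical.
    """
    message = consumer.receive()
    try:
        handler(message.data())
    except Exception:
        # Ask the broker to redeliver after negative_ack_redelivery_delay_ms.
        consumer.negative_acknowledge(message)
        return False
    consumer.acknowledge(message)
    return True
```

A message that is neither acknowledged nor negatively acknowledged is only redelivered once unacked_messages_timeout_ms elapses, if that timeout is set.
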
is_read_compacted:bool, default False

Selects whether to read the compacted version of the topic.

properties:dict, optional

Sets the properties for the consumer. The properties associated with a consumer can be used to identify the consumer on the broker side.

pattern_auto_discovery_period:int, default 60

Period, in seconds, at which the consumer auto-discovers topics that match the pattern.

initial_position:InitialPosition, default InitialPosition.Latest

Set the initial position of a consumer when subscribing to the topic. It can be either InitialPosition.Earliest or InitialPosition.Latest.

crypto_key_reader:CryptoKeyReader, optional

Symmetric encryption class implementation that configures public-key encryption of messages for the producer and private-key decryption of messages for the consumer.

replicate_subscription_state_enabled:bool, default False

Set whether the subscription status should be replicated.

max_pending_chunked_message:int, default 10

The consumer buffers chunked messages in memory until it receives all the chunks of the original message. While consuming chunked messages, chunks from the same message might not be contiguous in the stream, and they might be mixed with other messages' chunks. The consumer therefore has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently, or when a publisher failed to publish all chunks of a message.

If it's zero, the pending chunked messages will not be limited.

auto_ack_oldest_chunked_message_on_queue_full:bool, default False

Buffering a large number of outstanding incomplete chunked messages can create memory pressure, which can be guarded against by setting the max_pending_chunked_message threshold. Once the consumer reaches this threshold, it drops the outstanding incomplete chunked messages by silently acknowledging them if auto_ack_oldest_chunked_message_on_queue_full is true; otherwise it marks them for redelivery.

start_message_id_inclusive:bool, default False

Set the consumer to include the given position of any reset operation like Consumer.seek.

batch_receive_policy:class ConsumerBatchReceivePolicy

Set the batch collection policy for batch receiving.

key_shared_policy:class ConsumerKeySharedPolicy

Set the key shared policy for use when the ConsumerType is KeyShared.

batch_index_ack_enabled:bool, default False

Enable batch index acknowledgement. Note that this option only works when the broker also enables batch index acknowledgement. See the acknowledgmentAtBatchIndexLevelEnabled config in broker.conf.

regex_subscription_mode:RegexSubscriptionMode, optional

Set the regex subscription mode for use when the topic is a regex pattern.

Supported modes:

  • PersistentOnly: By default only subscribe to persistent topics.
  • NonPersistentOnly: Only subscribe to non-persistent topics.
  • AllTopics: Subscribe to both persistent and non-persistent topics.
dead_letter_policy:class ConsumerDeadLetterPolicy

Set the dead letter policy for the consumer. By default, some messages may be redelivered many times, possibly without ever stopping. With the dead letter mechanism, messages have a maximum redelivery count. When a message exceeds the maximum number of redeliveries, it is sent to the dead letter topic and acknowledged automatically.

@staticmethod
def _prepare_logger(logger):

Undocumented

_client

Undocumented

_consumers: list

Undocumented