class documentation

The asynchronous version of pulsar.Client.

Method __init__ See pulsar.Client.__init__
Async Method close Close the client and all the associated producers and consumers
Async Method create_producer Create a new producer on a given topic
Async Method get_topic_partitions Get the list of partitions for a given topic in asynchronous mode.
Method shutdown Shutdown the client and all the associated producers and consumers
Async Method subscribe Subscribe to the given topic and subscription combination.
Instance Variable _client Undocumented
def __init__(self, service_url, **kwargs): (source)
async def close(self): (source)

Close the client and all the associated producers and consumers

Raises
PulsarException
async def create_producer(self, topic: str, producer_name: str | None = None, schema: pulsar.schema.Schema | None = None, initial_sequence_id: int | None = None, send_timeout_millis: int = 30000, compression_type: CompressionType = CompressionType.NONE, max_pending_messages: int = 1000, max_pending_messages_across_partitions: int = 50000, block_if_queue_full: bool = False, batching_enabled: bool = True, batching_max_messages: int = 1000, batching_max_allowed_size_in_bytes: int = 128 * 1024, batching_max_publish_delay_ms: int = 10, chunking_enabled: bool = False, message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution, lazy_start_partitioned_producers: bool = False, properties: dict | None = None, batching_type: BatchingType = BatchingType.Default, encryption_key: str | None = None, crypto_key_reader: pulsar.CryptoKeyReader | None = None, access_mode: ProducerAccessMode = ProducerAccessMode.Shared, message_router: Callable[[pulsar.Message, int], int] | None = None) -> Producer: (source)

Create a new producer on a given topic

Parameters
topic:strThe topic name
producer_name:str|
None
Specify a name for the producer. If not assigned, the system will generate a globally unique name which can be accessed with Producer.producer_name(). When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters.
schema:pulsar.schema.Schema|
None
, default None
Define the schema of the data that will be published by this producer.
initial_sequence_id:int|
None
, default None
Set the baseline for the sequence ids for messages published by the producer.
send_timeout_millis:int, default 30000If a message is not acknowledged by the server before the send_timeout expires, an error will be reported.
compression_type:CompressionType, default CompressionType.NONESet the compression type for the producer.
max_pending_messages:int, default 1000Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
max_pending_messages_across_partitions:int, default 50000Set the max size of the queue holding the messages pending to receive an acknowledgment across partitions.
block_if_queue_full:bool, default FalseSet whether send operations should block when the outgoing message queue is full.
batching_enabled:bool, default TrueEnable automatic message batching. Note that, unlike the synchronous producer API in pulsar.Client.create_producer, batching is enabled by default for the asyncio producer.
batching_max_messages:int, default 1000Maximum number of messages in a batch.
batching_max_allowed_size_in_bytes:int, default 128*1024Maximum size in bytes of a batch.
batching_max_publish_delay_ms:int, default 10The batch interval in milliseconds.
chunking_enabled:bool, default FalseEnable chunking of large messages.
message_routing_mode:PartitionsRoutingMode, default=PartitionsRoutingMode.RoundRobinDistribution Set the message routing mode for the partitioned producer.
lazy_start_partitioned_producers:bool, default FalseStart partitioned producers lazily on demand.
properties:dict|
None
, default None
Sets the properties for the producer.
batching_type:BatchingType, default BatchingType.DefaultSets the batching type for the producer.
encryption_key:str|
None
, default None
The key used for symmetric encryption.
crypto_key_reader:pulsar.CryptoKeyReader|
None
, default None
Symmetric encryption class implementation.
access_mode:ProducerAccessMode, default ProducerAccessMode.SharedSet the type of access mode that the producer requires on the topic.
message_router:Callable[[pulsar.Message, int], int] |
None
, default None
A custom message router function that takes a Message and the number of partitions and returns the partition index.
Returns
ProducerThe producer created
Raises
PulsarException
async def get_topic_partitions(self, topic: str) -> list[str]: (source)

Get the list of partitions for a given topic in asynchronous mode.

If the topic is partitioned, this will return a list of partition names. If the topic is not partitioned, the returned list will contain the topic name itself.

This can be used to discover the partitions and create Reader, Consumer or Producer instances directly on a particular partition.

Parameters
topic:strthe topic name to lookup
Returns
lista list of partition names
def shutdown(self): (source)

Shutdown the client and all the associated producers and consumers

Raises
PulsarException
async def subscribe(self, topic: str | list[str], subscription_name: str, consumer_type: pulsar.ConsumerType = pulsar.ConsumerType.Exclusive, schema: pulsar.schema.Schema | None = None, receiver_queue_size: int = 1000, max_total_receiver_queue_size_across_partitions: int = 50000, consumer_name: str | None = None, unacked_messages_timeout_ms: int | None = None, broker_consumer_stats_cache_time_ms: int = 30000, negative_ack_redelivery_delay_ms: int = 60000, is_read_compacted: bool = False, properties: dict | None = None, initial_position: InitialPosition = InitialPosition.Latest, crypto_key_reader: pulsar.CryptoKeyReader | None = None, replicate_subscription_state_enabled: bool = False, max_pending_chunked_message: int = 10, auto_ack_oldest_chunked_message_on_queue_full: bool = False, start_message_id_inclusive: bool = False, batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None = None, key_shared_policy: pulsar.ConsumerKeySharedPolicy | None = None, batch_index_ack_enabled: bool = False, regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly, dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None = None, crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, is_pattern_topic: bool = False) -> Consumer: (source)

Subscribe to the given topic and subscription combination.

Parameters
topic:str, List[str], or regex patternThe name of the topic, list of topics or regex pattern. When is_pattern_topic is True, topic is treated as a regex.
subscription_name:strThe name of the subscription.
consumer_type:pulsar.ConsumerType, default pulsar.ConsumerType.ExclusiveSelect the subscription type to be used when subscribing to the topic.
schema:pulsar.schema.Schema|
None
, default None
Define the schema of the data that will be received by this consumer.
receiver_queue_size:int, default 1000Sets the size of the consumer receive queue.
max_total_receiver_queue_size_across_partitions:int, default 50000Set the max total receiver queue size across partitions.
consumer_name:str|
None
, default None
Sets the consumer name.
unacked_messages_timeout_ms:int|
None
, default None
Sets the timeout in milliseconds for unacknowledged messages.
broker_consumer_stats_cache_time_ms:int, default 30000Sets the time duration for which the broker-side consumer stats will be cached in the client.
negative_ack_redelivery_delay_ms:int, default 60000The delay after which to redeliver the messages that failed to be processed.
is_read_compacted:bool, default FalseSelects whether to read the compacted version of the topic.
properties:dict|
None
, default None
Sets the properties for the consumer.
initial_position:InitialPosition, default InitialPosition.LatestSet the initial position of a consumer when subscribing to the topic.
crypto_key_reader:pulsar.CryptoKeyReader|
None
, default None
Symmetric encryption class implementation.
replicate_subscription_state_enabled:bool, default FalseSet whether the subscription status should be replicated.
max_pending_chunked_message:int, default 10Consumer buffers chunk messages into memory until it receives all the chunks.
auto_ack_oldest_chunked_message_on_queue_full:bool, default FalseAutomatically acknowledge oldest chunked messages on queue full.
start_message_id_inclusive:bool, default FalseSet the consumer to include the given position of any reset operation.
batch_receive_policy:pulsar.ConsumerBatchReceivePolicy|
None
, default None
Set the batch collection policy for batch receiving.
key_shared_policy:pulsar.ConsumerKeySharedPolicy|
None
, default None
Set the key shared policy for use when the ConsumerType is KeyShared.
batch_index_ack_enabled:bool, default FalseEnable the batch index acknowledgement.
regex_subscription_mode:RegexSubscriptionMode, default=RegexSubscriptionMode.PersistentOnly Set the regex subscription mode for use when the topic is a regex pattern.
dead_letter_policy:pulsar.ConsumerDeadLetterPolicy|
None
, default None
Set dead letter policy for consumer.
crypto_failure_action:ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL Set the behavior when the decryption fails.
is_pattern_topic:bool, default FalseWhether topic is a regex pattern. If it's True when topic is a list, a ValueError will be raised.
Returns
ConsumerThe consumer created
Raises
PulsarException

Undocumented