class documentation

The asynchronous version of pulsar.Client.

Method             __init__          See pulsar.Client.__init__
Async Method       close             Close the client and all the associated producers and consumers.
Async Method       create_producer   Create a new producer on a given topic.
Async Method       subscribe         Subscribe to the given topic and subscription combination.
Instance Variable  _client           Undocumented
def __init__(self, service_url, **kwargs):

See pulsar.Client.__init__

async def close(self):

Close the client and all the associated producers and consumers.

Raises
PulsarException
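
Because close() is a coroutine, it has to be awaited even when the code using the client fails. The following is a minimal sketch of one way to guarantee that, not part of the library: open_client is a hypothetical helper name, and client_factory stands for any callable with the constructor shape shown above (for example pulsar.asyncio.Client).

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def open_client(client_factory, service_url, **client_options):
    """Build a client and guarantee that close() is awaited on exit.

    `open_client` and `client_factory` are hypothetical names used for
    illustration; pass the asyncio Client class as the factory.
    """
    client = client_factory(service_url, **client_options)
    try:
        yield client
    finally:
        # Per the documentation above, close() also closes the producers
        # and consumers associated with the client.
        await client.close()
```

Under these assumptions, a caller would write `async with open_client(pulsar.asyncio.Client, "pulsar://localhost:6650") as client:` where the service URL is only a placeholder for a local broker.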
async def create_producer(
    self,
    topic: str,
    producer_name: str | None = None,
    schema: pulsar.schema.Schema | None = None,
    initial_sequence_id: int | None = None,
    send_timeout_millis: int = 30000,
    compression_type: CompressionType = CompressionType.NONE,
    max_pending_messages: int = 1000,
    max_pending_messages_across_partitions: int = 50000,
    block_if_queue_full: bool = False,
    batching_enabled: bool = True,
    batching_max_messages: int = 1000,
    batching_max_allowed_size_in_bytes: int = 128 * 1024,
    batching_max_publish_delay_ms: int = 10,
    chunking_enabled: bool = False,
    message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution,
    lazy_start_partitioned_producers: bool = False,
    properties: dict | None = None,
    batching_type: BatchingType = BatchingType.Default,
    encryption_key: str | None = None,
    crypto_key_reader: pulsar.CryptoKeyReader | None = None,
    access_mode: ProducerAccessMode = ProducerAccessMode.Shared,
    message_router: Callable[[pulsar.Message, int], int] | None = None,
) -> Producer:

Create a new producer on a given topic.

Parameters
topic: str
    The topic name.
producer_name: str | None, default None
    Specify a name for the producer. If not assigned, the system generates a globally unique name, which can be accessed with Producer.producer_name(). When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique across all Pulsar clusters.
schema: pulsar.schema.Schema | None, default None
    Define the schema of the data that will be published by this producer.
initial_sequence_id: int | None, default None
    Set the baseline for the sequence ids of messages published by the producer.
send_timeout_millis: int, default 30000
    If a message is not acknowledged by the server before send_timeout_millis expires, an error is reported.
compression_type: CompressionType, default CompressionType.NONE
    Set the compression type for the producer.
max_pending_messages: int, default 1000
    Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
max_pending_messages_across_partitions: int, default 50000
    Set the max size of the queue holding the messages pending to receive an acknowledgment across partitions.
block_if_queue_full: bool, default False
    Set whether send operations should block when the outgoing message queue is full.
batching_enabled: bool, default True
    Enable automatic message batching. Note that, unlike the synchronous producer API in pulsar.Client.create_producer, batching is enabled by default for the asyncio producer.
batching_max_messages: int, default 1000
    Maximum number of messages in a batch.
batching_max_allowed_size_in_bytes: int, default 128 * 1024
    Maximum size in bytes of a batch.
batching_max_publish_delay_ms: int, default 10
    The batch interval in milliseconds.
chunking_enabled: bool, default False
    Enable chunking of large messages.
message_routing_mode: PartitionsRoutingMode, default PartitionsRoutingMode.RoundRobinDistribution
    Set the message routing mode for the partitioned producer.
lazy_start_partitioned_producers: bool, default False
    Start partitioned producers lazily on demand.
properties: dict | None, default None
    Sets the properties for the producer.
batching_type: BatchingType, default BatchingType.Default
    Sets the batching type for the producer.
encryption_key: str | None, default None
    The key used for symmetric encryption.
crypto_key_reader: pulsar.CryptoKeyReader | None, default None
    Symmetric encryption class implementation.
access_mode: ProducerAccessMode, default ProducerAccessMode.Shared
    Set the type of access mode that the producer requires on the topic.
message_router: Callable[[pulsar.Message, int], int] | None, default None
    A custom message router function that takes a Message and the number of partitions and returns the partition index.
Returns
Producer
    The producer created.
Raises
PulsarException
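
The parameters above can be combined as in the following sketch, which is an illustration under stated assumptions rather than the library's recommended usage. publish_all and route_by_key are hypothetical helper names. The sketch assumes that the returned Producer exposes awaitable send() and close() methods, and that the Message passed to a router provides partition_key() returning a string.

```python
import asyncio
import zlib


def route_by_key(message, num_partitions):
    """Example message_router: map a message to a partition index.

    Assumes message.partition_key() returns a str (possibly empty).
    A CRC32 of the key keeps the mapping stable across processes.
    """
    key = message.partition_key()
    return zlib.crc32(key.encode("utf-8")) % num_partitions


async def publish_all(client, topic, payloads, **producer_options):
    """Create a producer, send every payload, then close the producer.

    `producer_options` is forwarded unchanged to create_producer, so any
    keyword documented above (e.g. block_if_queue_full) can be supplied.
    """
    producer = await client.create_producer(topic, **producer_options)
    try:
        # Start all sends before awaiting any of them, so that messages
        # issued together can share a batch (batching_enabled is True by
        # default for the asyncio producer).
        return await asyncio.gather(*(producer.send(p) for p in payloads))
    finally:
        # Assumed: the producer has an awaitable close().
        await producer.close()
```

A call such as `await publish_all(client, "my-topic", [b"a", b"b"], message_router=route_by_key)` would then return one result per payload, in payload order. The topic name is a placeholder.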
async def subscribe(
    self,
    topic: str | list[str],
    subscription_name: str,
    consumer_type: pulsar.ConsumerType = pulsar.ConsumerType.Exclusive,
    schema: pulsar.schema.Schema | None = None,
    receiver_queue_size: int = 1000,
    max_total_receiver_queue_size_across_partitions: int = 50000,
    consumer_name: str | None = None,
    unacked_messages_timeout_ms: int | None = None,
    broker_consumer_stats_cache_time_ms: int = 30000,
    negative_ack_redelivery_delay_ms: int = 60000,
    is_read_compacted: bool = False,
    properties: dict | None = None,
    initial_position: InitialPosition = InitialPosition.Latest,
    crypto_key_reader: pulsar.CryptoKeyReader | None = None,
    replicate_subscription_state_enabled: bool = False,
    max_pending_chunked_message: int = 10,
    auto_ack_oldest_chunked_message_on_queue_full: bool = False,
    start_message_id_inclusive: bool = False,
    batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None = None,
    key_shared_policy: pulsar.ConsumerKeySharedPolicy | None = None,
    batch_index_ack_enabled: bool = False,
    regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly,
    dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None = None,
    crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
    is_pattern_topic: bool = False,
) -> Consumer:

Subscribe to the given topic and subscription combination.

Parameters
topic: str, List[str], or regex pattern
    The name of the topic, a list of topics, or a regex pattern. When is_pattern_topic is True, topic is treated as a regex.
subscription_name: str
    The name of the subscription.
consumer_type: pulsar.ConsumerType, default pulsar.ConsumerType.Exclusive
    Select the subscription type to be used when subscribing to the topic.
schema: pulsar.schema.Schema | None, default None
    Define the schema of the data that will be received by this consumer.
receiver_queue_size: int, default 1000
    Sets the size of the consumer receive queue.
max_total_receiver_queue_size_across_partitions: int, default 50000
    Set the max total receiver queue size across partitions.
consumer_name: str | None, default None
    Sets the consumer name.
unacked_messages_timeout_ms: int | None, default None
    Sets the timeout in milliseconds for unacknowledged messages.
broker_consumer_stats_cache_time_ms: int, default 30000
    Sets the time duration for which the broker-side consumer stats will be cached in the client.
negative_ack_redelivery_delay_ms: int, default 60000
    The delay after which to redeliver the messages that failed to be processed.
is_read_compacted: bool, default False
    Selects whether to read the compacted version of the topic.
properties: dict | None, default None
    Sets the properties for the consumer.
initial_position: InitialPosition, default InitialPosition.Latest
    Set the initial position of a consumer when subscribing to the topic.
crypto_key_reader: pulsar.CryptoKeyReader | None, default None
    Symmetric encryption class implementation.
replicate_subscription_state_enabled: bool, default False
    Set whether the subscription status should be replicated.
max_pending_chunked_message: int, default 10
    The maximum number of chunked messages that can be pending at once. The consumer buffers chunks in memory until it receives all the chunks of a message.
auto_ack_oldest_chunked_message_on_queue_full: bool, default False
    Automatically acknowledge the oldest chunked messages when the queue is full.
start_message_id_inclusive: bool, default False
    Set the consumer to include the given position of any reset operation.
batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None, default None
    Set the batch collection policy for batch receiving.
key_shared_policy: pulsar.ConsumerKeySharedPolicy | None, default None
    Set the key shared policy for use when the ConsumerType is KeyShared.
batch_index_ack_enabled: bool, default False
    Enable batch index acknowledgement.
regex_subscription_mode: RegexSubscriptionMode, default RegexSubscriptionMode.PersistentOnly
    Set the regex subscription mode for use when the topic is a regex pattern.
dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None, default None
    Set the dead letter policy for the consumer.
crypto_failure_action: ConsumerCryptoFailureAction, default ConsumerCryptoFailureAction.FAIL
    Set the behavior when decryption fails.
is_pattern_topic: bool, default False
    Whether topic is a regex pattern. If it is True when topic is a list, a ValueError is raised.
Returns
Consumer
    The consumer created.
Raises
PulsarException
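
A receive loop built on subscribe might look like the sketch below. It is illustrative only: consume_n is a hypothetical helper name, and the sketch assumes that the returned Consumer exposes awaitable receive(), acknowledge() and close() methods and that each received message provides data().

```python
async def consume_n(client, topic, subscription_name, count, **subscribe_options):
    """Subscribe, collect the payloads of `count` messages, then close.

    `subscribe_options` is forwarded unchanged to subscribe, so any keyword
    documented above (e.g. consumer_type, is_pattern_topic) can be supplied.
    """
    consumer = await client.subscribe(topic, subscription_name, **subscribe_options)
    payloads = []
    try:
        while len(payloads) < count:
            msg = await consumer.receive()   # assumed awaitable
            payloads.append(msg.data())      # assumed to return the payload
            # Acknowledge only after the payload has been recorded, so a
            # failure before this point leaves the message unacknowledged.
            await consumer.acknowledge(msg)
    finally:
        await consumer.close()
    return payloads
```

For example, `await consume_n(client, "my-topic", "my-sub", 10, consumer_type=pulsar.ConsumerType.Shared)` would collect ten payloads on a shared subscription. The topic and subscription names are placeholders.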

_client

Undocumented