class documentation
The asynchronous version of pulsar.Client.
| Method | __init__ | See pulsar.Client.__init__ |
| Async Method | close | Close the client and all the associated producers and consumers |
| Async Method | create_producer | Create a new producer on a given topic |
| Async Method | get_topic_partitions | Get the list of partitions for a given topic in asynchronous mode. |
| Method | shutdown | Shut down the client and all the associated producers and consumers |
| Async Method | subscribe | Subscribe to the given topic and subscription combination. |
| Instance Variable | _client | Undocumented |
async def create_producer(self, topic: str, producer_name: str | None = None, schema: pulsar.schema.Schema | None = None, initial_sequence_id: int | None = None, send_timeout_millis: int = 30000, compression_type: CompressionType = CompressionType.NONE, max_pending_messages: int = 1000, max_pending_messages_across_partitions: int = 50000, block_if_queue_full: bool = False, batching_enabled: bool = True, batching_max_messages: int = 1000, batching_max_allowed_size_in_bytes: int = 128 * 1024, batching_max_publish_delay_ms: int = 10, chunking_enabled: bool = False, message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution, lazy_start_partitioned_producers: bool = False, properties: dict | None = None, batching_type: BatchingType = BatchingType.Default, encryption_key: str | None = None, crypto_key_reader: pulsar.CryptoKeyReader | None = None, access_mode: ProducerAccessMode = ProducerAccessMode.Shared, message_router: Callable[[pulsar.Message, int], int] | None = None) -> Producer:
Create a new producer on a given topic
| Parameters | |
topic:str | The topic name |
producer_name:str|None, default None | Specify a name for the producer. If not assigned, the system will generate a globally
unique name which can be accessed with Producer.producer_name(). When specifying a
name, it is up to the user to ensure that, for a given topic, the producer name is
unique across all Pulsar clusters. |
schema:pulsar.schema.Schema|None, default None | Define the schema of the data that will be published by this producer. |
initial_sequence_id:int|None, default None | Set the baseline for the sequence ids for messages published by the producer. |
send_timeout_millis:int, default 30000 | If a message is not acknowledged by the server before the send_timeout expires, an error will be reported. |
compression_type:CompressionType, default CompressionType.NONE | Set the compression type for the producer. |
max_pending_messages:int, default 1000 | Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. |
max_pending_messages_across_partitions:int, default 50000 | Set the max size of the queue holding the messages pending to receive an acknowledgment across partitions. |
block_if_queue_full:bool, default False | Set whether send operations should block when the outgoing message queue is full. |
batching_enabled:bool, default True | Enable automatic message batching. Note that, unlike the synchronous producer API in pulsar.Client.create_producer, batching is enabled by default for the asyncio producer. |
batching_max_messages:int, default 1000 | Maximum number of messages in a batch. |
batching_max_allowed_size_in_bytes:int, default 128*1024 | Maximum size in bytes of a batch. |
batching_max_publish_delay_ms:int, default 10 | The batch interval in milliseconds. |
chunking_enabled:bool, default False | Enable chunking of large messages. |
message_routing_mode:PartitionsRoutingMode, default PartitionsRoutingMode.RoundRobinDistribution | Set the message routing mode for the partitioned producer. |
lazy_start_partitioned_producers:bool, default False | Start partitioned producers lazily on demand. |
properties:dict|None, default None | Sets the properties for the producer. |
batching_type:BatchingType, default BatchingType.Default | Sets the batching type for the producer. |
encryption_key:str|None, default None | The key used for symmetric encryption. |
crypto_key_reader:pulsar.CryptoKeyReader|None, default None | Symmetric encryption class implementation. |
access_mode:ProducerAccessMode, default ProducerAccessMode.Shared | Set the type of access mode that the producer requires on the topic. |
message_router:Callable[[pulsar.Message, int], int]|None, default None | A custom message router function that takes a Message and the number of partitions and returns the partition index. |
| Returns | |
Producer | The producer created |
| Raises | |
PulsarException | |
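A minimal sketch of how the batching parameters above might be combined when creating a producer. The helper `producer_options`, the function `publish`, and the specific values chosen are illustrative assumptions, not part of the library; `Producer.send` and `Producer.close` are assumed to be awaitable on the asyncio producer. The `pulsar` import is deferred so the helper can be used without a broker or an installed client.

```python
from __future__ import annotations


def producer_options(low_latency: bool) -> dict:
    """Hypothetical helper: build keyword arguments for create_producer.

    Only parameter names documented for create_producer are used.
    """
    if low_latency:
        # Batching is on by default for the asyncio producer, so it has to
        # be switched off explicitly when each message should go out alone.
        return {"batching_enabled": False, "send_timeout_millis": 5000}
    return {
        "batching_enabled": True,
        "batching_max_messages": 500,
        "batching_max_publish_delay_ms": 20,
    }


async def publish(service_url: str, topic: str, payloads: list[bytes],
                  low_latency: bool = False) -> None:
    """Illustrative only: send payloads, then release all resources."""
    # Deferred import: requires the pulsar-client package at call time.
    from pulsar.asyncio import Client

    client = Client(service_url)
    try:
        producer = await client.create_producer(
            topic, **producer_options(low_latency))
        for payload in payloads:
            # Assumption: send() is awaitable on the asyncio Producer.
            await producer.send(payload)
        await producer.close()
    finally:
        await client.close()
```

With a broker available, this could be run as `asyncio.run(publish("pulsar://localhost:6650", "my-topic", [b"hello"]))`, where the service URL and topic name are placeholders.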
Get the list of partitions for a given topic in asynchronous mode.
If the topic is partitioned, this will return a list of partition names. If the topic is not partitioned, the returned list will contain the topic name itself.
This can be used to discover the partitions and create Reader, Consumer or Producer instances directly on a particular partition.
| Parameters | |
topic:str | The topic name to look up |
| Returns | |
list | a list of partition names |
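A short sketch of how the returned partition names might be inspected. It assumes the method is exposed as `get_topic_partitions` on the asyncio client and relies on Pulsar's `-partition-N` naming suffix for partitioned topics; `partition_index` and `list_partitions` are hypothetical helpers introduced for illustration.

```python
from __future__ import annotations

import re

# Partitions of a partitioned topic are named "<topic>-partition-<N>".
_PARTITION_SUFFIX = re.compile(r"-partition-(\d+)$")


def partition_index(name: str) -> int | None:
    """Hypothetical helper: return the partition number encoded in a
    partition name, or None when the name is a plain topic."""
    match = _PARTITION_SUFFIX.search(name)
    return int(match.group(1)) if match else None


async def list_partitions(service_url: str, topic: str) -> dict[str, int | None]:
    """Illustrative only: map each returned name to its partition index."""
    # Deferred import: requires the pulsar-client package at call time.
    from pulsar.asyncio import Client

    client = Client(service_url)
    try:
        # Assumption: the method name is get_topic_partitions.
        names = await client.get_topic_partitions(topic)
        return {name: partition_index(name) for name in names}
    finally:
        await client.close()
```

For a topic that is not partitioned, the mapping would contain a single entry whose value is `None`, since the returned list holds the topic name itself.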
async def subscribe(self, topic: str | list[str], subscription_name: str, consumer_type: pulsar.ConsumerType = pulsar.ConsumerType.Exclusive, schema: pulsar.schema.Schema | None = None, receiver_queue_size: int = 1000, max_total_receiver_queue_size_across_partitions: int = 50000, consumer_name: str | None = None, unacked_messages_timeout_ms: int | None = None, broker_consumer_stats_cache_time_ms: int = 30000, negative_ack_redelivery_delay_ms: int = 60000, is_read_compacted: bool = False, properties: dict | None = None, initial_position: InitialPosition = InitialPosition.Latest, crypto_key_reader: pulsar.CryptoKeyReader | None = None, replicate_subscription_state_enabled: bool = False, max_pending_chunked_message: int = 10, auto_ack_oldest_chunked_message_on_queue_full: bool = False, start_message_id_inclusive: bool = False, batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None = None, key_shared_policy: pulsar.ConsumerKeySharedPolicy | None = None, batch_index_ack_enabled: bool = False, regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly, dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None = None, crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, is_pattern_topic: bool = False) -> Consumer:
Subscribe to the given topic and subscription combination.
| Parameters | |
topic:str, List[str], or regex pattern | The name of the topic, list of topics or regex pattern.
When is_pattern_topic is True, topic is treated as a regex. |
subscription_name:str | The name of the subscription. |
consumer_type:pulsar.ConsumerType, default pulsar.ConsumerType.Exclusive | Select the subscription type to be used when subscribing to the topic. |
schema:pulsar.schema.Schema|None, default None | Define the schema of the data that will be received by this consumer. |
receiver_queue_size:int, default 1000 | Sets the size of the consumer receive queue. |
max_total_receiver_queue_size_across_partitions:int, default 50000 | Set the max total receiver queue size across partitions. |
consumer_name:str|None, default None | Sets the consumer name. |
unacked_messages_timeout_ms:int|None, default None | Sets the timeout in milliseconds for unacknowledged messages. |
broker_consumer_stats_cache_time_ms:int, default 30000 | Sets the time duration for which the broker-side consumer stats will be cached in the client. |
negative_ack_redelivery_delay_ms:int, default 60000 | The delay after which to redeliver the messages that failed to be processed. |
is_read_compacted:bool, default False | Selects whether to read the compacted version of the topic. |
properties:dict|None, default None | Sets the properties for the consumer. |
initial_position:InitialPosition, default InitialPosition.Latest | Set the initial position of a consumer when subscribing to the topic. |
crypto_key_reader:pulsar.CryptoKeyReader|None, default None | Symmetric encryption class implementation. |
replicate_subscription_state_enabled:bool, default False | Set whether the subscription status should be replicated. |
max_pending_chunked_message:int, default 10 | Consumer buffers chunk messages into memory until it receives all the chunks. |
auto_ack_oldest_chunked_message_on_queue_full:bool, default False | Automatically acknowledge oldest chunked messages on queue full. |
start_message_id_inclusive:bool, default False | Set the consumer to include the given position of any reset operation. |
batch_receive_policy:pulsar.ConsumerBatchReceivePolicy|None, default None | Set the batch collection policy for batch receiving. |
key_shared_policy:pulsar.ConsumerKeySharedPolicy|None, default None | Set the key shared policy for use when the ConsumerType is KeyShared. |
batch_index_ack_enabled:bool, default False | Enable the batch index acknowledgement. |
regex_subscription_mode:RegexSubscriptionMode, default RegexSubscriptionMode.PersistentOnly | Set the regex subscription mode for use when the topic is a regex pattern. |
dead_letter_policy:pulsar.ConsumerDeadLetterPolicy|None, default None | Set dead letter policy for consumer. |
crypto_failure_action:ConsumerCryptoFailureAction, default ConsumerCryptoFailureAction.FAIL | Set the behavior when the decryption fails. |
is_pattern_topic:bool, default False | Whether topic is a regex pattern. If it's True when topic is a list, a ValueError
will be raised. |
| Returns | |
Consumer | The consumer created |
| Raises | |
PulsarException | |
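A rough sketch of a consume loop built on subscribe, including a local pre-check that mirrors the documented rule that a list of topics cannot be combined with is_pattern_topic=True. The names `check_topic_argument` and `consume_some` are hypothetical, and the sketch assumes the returned asyncio Consumer offers awaitable `receive()`, `acknowledge()` and `close()` methods; verify these against the Consumer documentation before relying on them.

```python
from __future__ import annotations


def check_topic_argument(topic: str | list[str],
                         is_pattern_topic: bool = False) -> str | list[str]:
    """Hypothetical helper: fail early on the documented invalid combination
    (a list of topics together with is_pattern_topic=True)."""
    if is_pattern_topic and isinstance(topic, list):
        raise ValueError(
            "is_pattern_topic=True expects a single regex string, not a list")
    return topic


async def consume_some(service_url: str, topic: str | list[str],
                       subscription_name: str, count: int,
                       is_pattern_topic: bool = False) -> list[bytes]:
    """Illustrative only: receive `count` messages and acknowledge each."""
    # Deferred imports: require the pulsar-client package at call time.
    import pulsar
    from pulsar.asyncio import Client

    client = Client(service_url)
    try:
        consumer = await client.subscribe(
            check_topic_argument(topic, is_pattern_topic),
            subscription_name,
            consumer_type=pulsar.ConsumerType.Shared,
            is_pattern_topic=is_pattern_topic,
        )
        payloads = []
        for _ in range(count):
            # Assumption: receive() and acknowledge() are awaitable.
            msg = await consumer.receive()
            payloads.append(msg.data())
            await consumer.acknowledge(msg)
        await consumer.close()
        return payloads
    finally:
        await client.close()
```

The Shared consumer type is chosen here only so that several instances of the sketch could attach to the same subscription; the default, per the table above, is Exclusive.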