class documentation
The asynchronous version of pulsar.Client.
| Method | __init__ |
See pulsar.Client.__init__ |
| Async Method | close |
Close the client and all the associated producers and consumers |
| Async Method | create_producer |
Create a new producer on a given topic |
| Async Method | subscribe |
Subscribe to the given topic and subscription combination. |
| Instance Variable | _client |
Undocumented |
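Because close is a coroutine that also closes the associated producers and consumers, one convenient pattern is to wrap the client in an async context manager so that it is closed even when an error occurs. The following is a minimal sketch and not part of the library: `closing_client` is a hypothetical helper, and it assumes only that the object passed in exposes an awaitable `close()`, as the client documented here does.

```python
import contextlib


@contextlib.asynccontextmanager
async def closing_client(client):
    """Yield `client` and always await its close() on exit.

    `closing_client` is a hypothetical helper, not a pulsar API.
    """
    try:
        yield client
    finally:
        # Closing the client also closes its producers and consumers.
        await client.close()
```

With a real broker this might be used as `async with closing_client(Client(service_url)) as client: ...`, where `service_url` stands for whatever URL your deployment uses.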
async def create_producer(self, topic: str, producer_name: str | None = None, schema: pulsar.schema.Schema | None = None, initial_sequence_id: int | None = None, send_timeout_millis: int = 30000, compression_type: CompressionType = CompressionType.NONE, max_pending_messages: int = 1000, max_pending_messages_across_partitions: int = 50000, block_if_queue_full: bool = False, batching_enabled: bool = True, batching_max_messages: int = 1000, batching_max_allowed_size_in_bytes: int = 128 * 1024, batching_max_publish_delay_ms: int = 10, chunking_enabled: bool = False, message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution, lazy_start_partitioned_producers: bool = False, properties: dict | None = None, batching_type: BatchingType = BatchingType.Default, encryption_key: str | None = None, crypto_key_reader: pulsar.CryptoKeyReader | None = None, access_mode: ProducerAccessMode = ProducerAccessMode.Shared, message_router: Callable[[pulsar.Message, int], int] | None = None) -> Producer:
Create a new producer on a given topic
| Parameters | |
topic:str | The topic name |
producer_name:str|None | Specify a name for the producer. If not assigned, the system will generate a globally
unique name which can be accessed with Producer.producer_name(). When specifying a
name, it is up to the user to ensure that, for a given topic, the producer name is
unique across all Pulsar's clusters. |
schema:pulsar.schema.Schema|None, default None | Define the schema of the data that will be published by this producer. |
initial_sequence_id:int|None, default None | Set the baseline for the sequence ids for messages published by the producer. |
send_timeout_millis:int, default 30000 | If a message is not acknowledged by the server before the send_timeout expires, an error will be reported. |
compression_type:CompressionType, default CompressionType.NONE | Set the compression type for the producer. |
max_pending_messages:int, default 1000 | Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. |
max_pending_messages_across_partitions:int, default 50000 | Set the max size of the queue holding the messages pending to receive an acknowledgment across partitions. |
block_if_queue_full:bool, default False | Set whether send operations should block when the outgoing message queue is full. |
batching_enabled:bool, default True | Enable automatic message batching. Note that, unlike the synchronous producer API in pulsar.Client.create_producer, batching is enabled by default for the asyncio producer. |
batching_max_messages:int, default 1000 | Maximum number of messages in a batch. |
batching_max_allowed_size_in_bytes:int, default 128*1024 | Maximum size in bytes of a batch. |
batching_max_publish_delay_ms:int, default 10 | The batch interval in milliseconds. |
chunking_enabled:bool, default False | Enable chunking of large messages. |
message_routing_mode:PartitionsRoutingMode, default PartitionsRoutingMode.RoundRobinDistribution | Set the message routing mode for the partitioned producer. |
lazy_start_partitioned_producers:bool, default False | Start partitioned producers lazily on demand. |
properties:dict|None, default None | Sets the properties for the producer. |
batching_type:BatchingType, default BatchingType.Default | Sets the batching type for the producer. |
encryption_key:str|None, default None | The key used for symmetric encryption. |
crypto_key_reader:pulsar.CryptoKeyReader|None, default None | Symmetric encryption class implementation. |
access_mode:ProducerAccessMode, default ProducerAccessMode.Shared | Set the type of access mode that the producer requires on the topic. |
message_router:Callable[[pulsar.Message, int], int]|None, default None | A custom message router function that takes a Message and the number of partitions and returns the partition index. |
| Returns | |
Producer | The producer created |
| Raises | |
PulsarException | |
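The message_router parameter expects a callable that receives a message and the number of partitions and returns a partition index. A minimal sketch of such a function is shown below. The name `crc32_key_router` is hypothetical, and the sketch assumes the message passed to the router exposes `partition_key()` the way pulsar.Message does; the CRC32 hashing is an illustrative choice, not the library's built-in routing policy.

```python
import zlib


def crc32_key_router(message, num_partitions: int) -> int:
    """Map a message to a partition index from its partition key.

    Hypothetical example router, not part of pulsar. Messages with the
    same key always map to the same partition for a fixed partition count.
    """
    # Assumption: the message offers partition_key(), as pulsar.Message does.
    key = message.partition_key() or ""
    # CRC32 is stable across processes, unlike Python's built-in hash().
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```

It would then be passed as `message_router=crc32_key_router` when calling create_producer on a partitioned topic.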
async def subscribe(self, topic: str | list[str], subscription_name: str, consumer_type: pulsar.ConsumerType = pulsar.ConsumerType.Exclusive, schema: pulsar.schema.Schema | None = None, receiver_queue_size: int = 1000, max_total_receiver_queue_size_across_partitions: int = 50000, consumer_name: str | None = None, unacked_messages_timeout_ms: int | None = None, broker_consumer_stats_cache_time_ms: int = 30000, negative_ack_redelivery_delay_ms: int = 60000, is_read_compacted: bool = False, properties: dict | None = None, initial_position: InitialPosition = InitialPosition.Latest, crypto_key_reader: pulsar.CryptoKeyReader | None = None, replicate_subscription_state_enabled: bool = False, max_pending_chunked_message: int = 10, auto_ack_oldest_chunked_message_on_queue_full: bool = False, start_message_id_inclusive: bool = False, batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None = None, key_shared_policy: pulsar.ConsumerKeySharedPolicy | None = None, batch_index_ack_enabled: bool = False, regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly, dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None = None, crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, is_pattern_topic: bool = False) -> Consumer:
Subscribe to the given topic and subscription combination.
| Parameters | |
topic:str, List[str], or regex pattern | The name of the topic, list of topics or regex pattern.
When is_pattern_topic is True, topic is treated as a regex. |
subscription_name:str | The name of the subscription. |
consumer_type:pulsar.ConsumerType, default pulsar.ConsumerType.Exclusive | Select the subscription type to be used when subscribing to the topic. |
schema:pulsar.schema.Schema|None, default None | Define the schema of the data that will be received by this consumer. |
receiver_queue_size:int, default 1000 | Sets the size of the consumer receive queue. |
max_total_receiver_queue_size_across_partitions:int, default 50000 | Set the max total receiver queue size across partitions. |
consumer_name:str|None, default None | Sets the consumer name. |
unacked_messages_timeout_ms:int|None, default None | Sets the timeout in milliseconds for unacknowledged messages. |
broker_consumer_stats_cache_time_ms:int, default 30000 | Sets the time duration for which the broker-side consumer stats will be cached in the client. |
negative_ack_redelivery_delay_ms:int, default 60000 | The delay after which to redeliver the messages that failed to be processed. |
is_read_compacted:bool, default False | Selects whether to read the compacted version of the topic. |
properties:dict|None, default None | Sets the properties for the consumer. |
initial_position:InitialPosition, default InitialPosition.Latest | Set the initial position of a consumer when subscribing to the topic. |
crypto_key_reader:pulsar.CryptoKeyReader|None, default None | Symmetric encryption class implementation. |
replicate_subscription_state_enabled:bool, default False | Set whether the subscription status should be replicated. |
max_pending_chunked_message:int, default 10 | Consumer buffers chunk messages into memory until it receives all the chunks. |
auto_ack_oldest_chunked_message_on_queue_full:bool, default False | Automatically acknowledge oldest chunked messages on queue full. |
start_message_id_inclusive:bool, default False | Set the consumer to include the given position of any reset operation. |
batch_receive_policy:pulsar.ConsumerBatchReceivePolicy|None, default None | Set the batch collection policy for batch receiving. |
key_shared_policy:pulsar.ConsumerKeySharedPolicy|None, default None | Set the key shared policy for use when the ConsumerType is KeyShared. |
batch_index_ack_enabled:bool, default False | Enable the batch index acknowledgement. |
regex_subscription_mode:RegexSubscriptionMode, default RegexSubscriptionMode.PersistentOnly | Set the regex subscription mode for use when the topic is a regex pattern. |
dead_letter_policy:pulsar.ConsumerDeadLetterPolicy|None, default None | Set dead letter policy for consumer. |
crypto_failure_action:ConsumerCryptoFailureAction, default ConsumerCryptoFailureAction.FAIL | Set the behavior when the decryption fails. |
is_pattern_topic:bool, default False | Whether topic is a regex pattern. If it's True when topic is a list, a ValueError
will be raised. |
| Returns | |
Consumer | The consumer created |
| Raises | |
PulsarException | |
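The interaction between topic and is_pattern_topic can be summarized in a few lines. The helper below is a hypothetical illustration of the rule stated in the parameter table (a string is a single topic unless is_pattern_topic is True, and a list combined with is_pattern_topic=True is rejected with a ValueError). It is not the library's implementation, and the returned labels are invented for this sketch.

```python
def describe_topic_argument(topic, is_pattern_topic: bool = False) -> str:
    """Classify a `topic` argument as the parameter table describes it.

    Hypothetical helper for illustration only; the real validation
    happens inside subscribe().
    """
    if isinstance(topic, list):
        if is_pattern_topic:
            # Documented rule: a list of topics cannot also be a regex.
            raise ValueError("is_pattern_topic=True requires topic to be a str")
        return "list"
    if is_pattern_topic:
        return "pattern"
    return "single"
```

For example, `describe_topic_argument(["a", "b"], is_pattern_topic=True)` raises ValueError, mirroring the behavior documented for subscribe.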