class Client:
Constructor: Client(service_url, authentication, operation_timeout_seconds, io_threads, ...)
The Pulsar client. A single client instance can be used to create producers and consumers on multiple topics.
The client will share the same connection pool and threads across all producers and consumers.
Method | __init__ | Create a new Pulsar client instance. |
Method | close | Close the client and all the associated producers and consumers. |
Method | create_producer | Create a new producer on a given topic. |
Method | create_reader | Create a reader on a particular topic. |
Method | get_topic_partitions | Get the list of partitions for a given topic. |
Method | shutdown | Perform immediate shutdown of Pulsar client. |
Method | subscribe | Subscribe to the given topic and subscription combination. |
Static Method | _prepare_logger | Undocumented |
Instance Variable | _client | Undocumented |
Instance Variable | _consumers | Undocumented |
Create a new Pulsar client instance.
Parameters | |
service_url:str | The Pulsar service URL, e.g. pulsar://my-broker.com:6650/ |
authentication:Authentication , optional | Set the authentication provider to be used with the broker. Supported methods: |
operation_timeout_seconds:int , default 30 | Set the timeout, in seconds, on client operations (subscribe, create producer, close, unsubscribe). |
io_threads:int , default 1 | Set the number of IO threads to be used by the Pulsar client. |
message_listener_threads:int , default 1 | Set the number of threads to be used by the Pulsar client when delivering messages through a message listener. The default is 1 thread per Pulsar client. If more than 1 thread is used, messages for distinct message listeners will be delivered in different threads; however, a single MessageListener will always be assigned to the same thread. |
concurrent_lookup_requests:int , default 50000 | Number of concurrent lookup requests allowed on each broker connection, to prevent overload on the broker. |
log_conf_file_path:str , optional | This parameter is deprecated and has no effect. It is retained only for compatibility. Use logger to customize a logger. |
use_tls:bool , default False | Configure whether to use TLS encryption on the connection. This setting is deprecated. TLS is enabled automatically if the service URL starts with pulsar+ssl:// or https:// |
tls_trust_certs_file_path:str , optional | Set the path to the trusted TLS certificate file. If empty, defaults to certifi. |
tls_allow_insecure_connection:bool , default False | Configure whether the Pulsar client accepts untrusted TLS certificates from the broker. |
tls_validate_hostname:bool , default False | Configure whether the Pulsar client validates that the hostname of the endpoint matches the common name on the TLS certificate presented by the endpoint. |
logger:optional | Set a Python logger for this Pulsar client. Should be an instance of logging.Logger. For example:
    import logging
    client = Client(service_url, logger=logging.getLogger('pulsar'))
    producer = client.create_producer(topic)
    # ...
    del producer
    del client |
connection_timeout_ms:int , default 10000 | Set the timeout in milliseconds on TCP connections. |
listener_name:str , optional | Listener name for lookup. Clients can use listener_name to choose one of the listeners as the service URL to create a connection to the broker, as long as the network is accessible. advertisedListeners must be enabled on the broker side. |
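The constructor parameters above can be sketched as follows. This is a minimal illustration, not the client's own logic: the helper names uses_tls and connect are hypothetical, uses_tls only mirrors the documented rule that TLS is implied by a pulsar+ssl:// or https:// service URL, and connect assumes the pulsar-client package is installed and a broker is reachable at the given URL.

```python
import logging
from urllib.parse import urlparse


def uses_tls(service_url: str) -> bool:
    """Return True when the URL scheme implies TLS, per the use_tls note above."""
    return urlparse(service_url).scheme in ("pulsar+ssl", "https")


def connect(service_url: str):
    """Create a client with a few of the documented options (hypothetical helper)."""
    import pulsar  # imported lazily so uses_tls() works without pulsar-client

    return pulsar.Client(
        service_url,
        operation_timeout_seconds=30,  # documented default
        io_threads=1,                  # documented default
        connection_timeout_ms=10000,   # documented default
        logger=logging.getLogger("pulsar"),
    )
```

A call such as connect("pulsar://my-broker.com:6650/") would return a client that should later be released with close().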
def create_producer(self, topic, producer_name=None, schema=schema.BytesSchema(), initial_sequence_id=None, send_timeout_millis=30000, compression_type: CompressionType = CompressionType.NONE, max_pending_messages=1000, max_pending_messages_across_partitions=50000, block_if_queue_full=False, batching_enabled=False, batching_max_messages=1000, batching_max_allowed_size_in_bytes=128*1024, batching_max_publish_delay_ms=10, chunking_enabled=False, message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution, lazy_start_partitioned_producers=False, properties=None, batching_type: BatchingType = BatchingType.Default, encryption_key=None, crypto_key_reader: Union[None, CryptoKeyReader] = None, access_mode: ProducerAccessMode = ProducerAccessMode.Shared):
Create a new producer on a given topic.
Parameters | |
topic:str | The topic name |
producer_name:str , optional | Specify a name for the producer. If not assigned, the system will generate a globally unique name, which can be accessed with Producer.producer_name(). When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique across all Pulsar clusters. |
schema:pulsar.schema.Schema , default pulsar.schema.BytesSchema | Define the schema of the data that will be published by this producer, e.g. schema=JsonSchema(MyRecordClass). |
initial_sequence_id:int , optional | Set the baseline for the sequence ids of messages published by the producer. The first message will use (initial_sequence_id + 1) as its sequence id, and subsequent messages will be assigned incremental sequence ids, if not otherwise specified. |
send_timeout_millis:int , default 30000 | If a message is not acknowledged by the server before send_timeout_millis expires, an error will be reported. |
compression_type:CompressionType , default CompressionType.NONE | Set the compression type for the producer. By default, message payloads are not compressed. Supported compression types:
ZSTD is supported since Pulsar 2.3. Consumers need to be at least at that release to receive messages compressed with ZSTD. SNAPPY is supported since Pulsar 2.4. Consumers need to be at least at that release to receive messages compressed with SNAPPY. |
max_pending_messages:int , default 1000 | Set the max size of the queue holding the messages pending an acknowledgment from the broker. |
max_pending_messages_across_partitions:int , default 50000 | Set the max size of the queue holding the messages pending an acknowledgment from the broker, across partitions. |
block_if_queue_full:bool , default False | Set whether send_async operations should block when the outgoing message queue is full. |
batching_enabled:bool , default False | When automatic batching is enabled, multiple calls to send can result in a single batch being sent to the broker, leading to better throughput, especially when publishing small messages. All messages in a batch will be published as a single batched message. The consumer will be delivered the individual messages in the batch in the same order they were enqueued. |
batching_max_messages:int , default 1000 | When you set this option to a value greater than 1, messages are queued until this threshold or batching_max_allowed_size_in_bytes is reached, or the batch interval has elapsed. |
batching_max_allowed_size_in_bytes:int , default 128*1024 | When you set this option to a value greater than 1, messages are queued until this threshold or batching_max_messages is reached, or the batch interval has elapsed. |
batching_max_publish_delay_ms:int , default 10 | The batch interval in milliseconds. Queued messages will be sent as a batch after this interval even if neither the batching_max_messages nor the batching_max_allowed_size_in_bytes threshold has been reached. |
chunking_enabled:bool , default False | If the message size exceeds the maximum publish payload size allowed by the broker, chunking_enabled lets the producer split the message into multiple chunks and publish them to the broker separately and in order. This allows the client to publish large messages to Pulsar. |
message_routing_mode:PartitionsRoutingMode , default PartitionsRoutingMode.RoundRobinDistribution | Set the message routing mode for the partitioned producer. Supported modes: |
lazy_start_partitioned_producers:bool , default False | This config affects producers of partitioned topics only. It controls whether producers register and connect immediately to the owner broker of each partition or start lazily on demand. The internal producer of one partition is always started eagerly, chosen by the routing policy, but the internal producers of any additional partitions are started on demand, upon receiving their first message. Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when the SinglePartition routing policy is used without keyed messages. Because producer connection can be on demand, this can add send latency for the first messages of a given partition. |
properties:dict , optional | Sets the properties for the producer. The properties associated with a producer can be used to identify the producer on the broker side. |
batching_type:BatchingType , default BatchingType.Default | Sets the batching type for the producer. There are two batching types: DefaultBatching and KeyBasedBatching. |
encryption_key:str , optional | The key used for symmetric encryption, configured on the producer side. |
crypto_key_reader:CryptoKeyReader , optional | Symmetric encryption class implementation, configuring public key encryption of messages for the producer and private key decryption of messages for the consumer. |
access_mode:ProducerAccessMode , optional | Set the type of access mode that the producer requires on the topic. Supported modes: |
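The three batching thresholds described above interact as an "any of" rule, which can be sketched as below. This is a model of the documented behaviour under stated assumptions, not the client's internal implementation: batch_is_ready and open_batching_producer are hypothetical helpers, and the non-default option values are chosen only for illustration.

```python
def batch_is_ready(num_messages, num_bytes, elapsed_ms,
                   batching_max_messages=1000,
                   batching_max_allowed_size_in_bytes=128 * 1024,
                   batching_max_publish_delay_ms=10):
    """Approximate the documented rule: a batch goes out when any limit is hit."""
    return (
        num_messages >= batching_max_messages
        or num_bytes >= batching_max_allowed_size_in_bytes
        or elapsed_ms >= batching_max_publish_delay_ms
    )


def open_batching_producer(client, topic):
    """Create a producer with batching enabled (hypothetical helper)."""
    return client.create_producer(
        topic,
        batching_enabled=True,
        batching_max_messages=500,         # illustrative value
        batching_max_publish_delay_ms=20,  # illustrative value
        block_if_queue_full=True,          # send_async blocks when the queue is full
    )
```

Here client is expected to be a Client instance; the helper only forwards keyword arguments that appear in the parameter table above.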
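def create_reader(self, topic, start_message_id, schema=schema.BytesSchema(), reader_listener=None, receiver_queue_size=1000, reader_name=None, subscription_role_prefix=None, is_read_compacted=False, crypto_key_reader: Union[None, CryptoKeyReader] = None, start_message_id_inclusive=False):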
Create a reader on a particular topic
Parameters | |
topic | The name of the topic. |
start_message_id | The initial reader positioning is done by specifying a message id. The options are:
* MessageId.earliest: Start reading from the earliest message available in the topic.
* MessageId.latest: Start reading from the end of the topic, only getting messages published after the reader was created.
* MessageId: When passing a particular message id, the reader will position itself on that specific position. The first message to be read will be the message next to the specified message id. A message id can be serialized into a string and deserialized back into a MessageId object:
    # Serialize to string
    s = msg.message_id().serialize()
    # Deserialize from string
    msg_id = MessageId.deserialize(s) |
schema:pulsar.schema.Schema , default pulsar.schema.BytesSchema | Define the schema of the data that will be received by this reader. |
reader_listener | Sets a message listener for the reader. When the listener is set, the application will receive messages through it. Calls to reader.read_next() will not be allowed. The listener function needs to accept (reader, message), for example:
    def my_listener(reader, message):
        # process message
        pass |
receiver_queue_size:int , default 1000 | Sets the size of the reader receive queue. The reader receive queue controls how many messages can be accumulated by the reader before the application calls read_next(). Using a higher value could potentially increase the reader throughput at the expense of higher memory utilization. |
reader_name:str , optional | Sets the reader name. |
subscription_role_prefix:str , optional | Sets the subscription role prefix. |
is_read_compacted:bool , default False | Selects whether to read the compacted version of the topic. |
crypto_key_reader:CryptoKeyReader , optional | Symmetric encryption class implementation, configuring public key encryption of messages for the producer and private key decryption of messages for the consumer. |
start_message_id_inclusive:bool , default False | Set the reader to include the start_message_id, or the given position of any reset operation such as Reader.seek. |
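A reader created with these parameters is typically drained with read_next(). The sketch below shows one way to do that; read_available is a hypothetical helper, and the caller is assumed to pass a start position such as pulsar.MessageId.earliest.

```python
def read_available(client, topic, start_message_id, max_messages=100):
    """Read up to max_messages payloads that are already available on the topic."""
    reader = client.create_reader(topic, start_message_id)
    payloads = []
    try:
        # Stop when the cap is reached or the reader has caught up with the topic.
        while len(payloads) < max_messages and reader.has_message_available():
            msg = reader.read_next()
            payloads.append(msg.data())
    finally:
        reader.close()
    return payloads
```

For example, read_available(client, 'my-topic', pulsar.MessageId.earliest) would return the payloads from the start of the topic, subject to the cap.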
Get the list of partitions for a given topic.
If the topic is partitioned, this will return a list of partition names. If the topic is not partitioned, the returned list will contain the topic name itself.
This can be used to discover the partitions and create Reader, Consumer or Producer instances directly on a particular partition.
Parameters | |
topic:str | The name of the topic to look up. |
Returns | |
list | A list of partition names. |
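Creating one producer per partition, as mentioned above, could look like the following sketch. The helper name producers_by_partition is hypothetical, and client is assumed to be a connected Client instance.

```python
def producers_by_partition(client, topic):
    """Map each partition name to a producer bound directly to that partition."""
    return {
        partition: client.create_producer(partition)
        for partition in client.get_topic_partitions(topic)
    }
```

For a non-partitioned topic the returned dictionary has a single entry, keyed by the topic name itself.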
Perform immediate shutdown of Pulsar client.
Release all resources and close all producers, consumers, and readers without waiting for ongoing operations to complete.
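Since shutdown() does not wait for ongoing operations, close() is the usual choice at the end of normal processing. One way to make sure it is always called is a small context manager; closing_client below is a hypothetical helper, not part of the client API.

```python
from contextlib import contextmanager


@contextmanager
def closing_client(client):
    """Yield the client and close it on exit, even if the body raises."""
    try:
        yield client
    finally:
        # close() also closes the associated producers and consumers;
        # shutdown() would instead stop without waiting for ongoing operations.
        client.close()
```

Usage would be of the form: with closing_client(Client(service_url)) as client: ...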
def subscribe(self, topic, subscription_name, consumer_type: ConsumerType = ConsumerType.Exclusive, schema=schema.BytesSchema(), message_listener=None, receiver_queue_size=1000, max_total_receiver_queue_size_across_partitions=50000, consumer_name=None, unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000, negative_ack_redelivery_delay_ms=60000, is_read_compacted=False, properties=None, pattern_auto_discovery_period=60, initial_position: InitialPosition = InitialPosition.Latest, crypto_key_reader: Union[None, CryptoKeyReader] = None, replicate_subscription_state_enabled=False, max_pending_chunked_message=10, auto_ack_oldest_chunked_message_on_queue_full=False, start_message_id_inclusive=False, batch_receive_policy=None, key_shared_policy=None, batch_index_ack_enabled=False, regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly, dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None):
Subscribe to the given topic and subscription combination.
Parameters | |
topic | The name of the topic, a list of topics, or a regex pattern. This method accepts these forms:
* topic='my-topic'
* topic=['topic-1', 'topic-2', 'topic-3']
* topic=re.compile('persistent://public/default/topic-*') |
subscription_name:str | The name of the subscription. |
consumer_type:ConsumerType , default ConsumerType.Exclusive | Select the subscription type to be used when subscribing to the topic. |
schema:pulsar.schema.Schema , default pulsar.schema.BytesSchema | Define the schema of the data that will be received by this consumer. |
message_listener | Sets a message listener for the consumer. When the listener is set, the application will receive messages through it. Calls to consumer.receive() will not be allowed. The listener function needs to accept (consumer, message), for example:
    def my_listener(consumer, message):
        # process message
        consumer.acknowledge(message) |
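receiver_queue_size:int , default 1000 | Sets the size of the consumer receive queue. The consumer receive queue controls how many messages can be accumulated by the consumer before the application calls receive(). Using a higher value could potentially increase the consumer throughput at the expense of higher memory utilization. Setting the consumer queue size to zero decreases the throughput of the consumer by disabling pre-fetching of messages. This approach improves the message distribution on shared subscriptions by pushing messages only to those consumers that are ready to process them. Neither receive with timeout nor partitioned topics can be used if the consumer queue size is zero. |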
max_total_receiver_queue_size_across_partitions:int , default 50000 | Set the max total receiver queue size across partitions. This setting will be used to reduce the receiver queue size for individual partitions. |
consumer_name:str , optional | Sets the consumer name. |
unacked_messages_timeout_ms:int , optional | Sets the timeout in milliseconds for unacknowledged messages. The timeout needs to be greater than 10 seconds. An exception is thrown if the given value is less than 10 seconds. If a successful acknowledgement is not sent within the timeout, all the unacknowledged messages are redelivered. |
broker_consumer_stats_cache_time_ms:int , default 30000 | Sets the time duration for which the broker-side consumer stats will be cached in the client. |
negative_ack_redelivery_delay_ms:int , default 60000 | The delay after which to redeliver messages that failed to be processed (via consumer.negative_acknowledge()). |
is_read_compacted:bool , default False | Selects whether to read the compacted version of the topic. |
properties:dict , optional | Sets the properties for the consumer. The properties associated with a consumer can be used to identify the consumer on the broker side. |
pattern_auto_discovery_period:int , default 60 | Period, in seconds, at which the consumer auto-discovers matching topics. |
initial_position:InitialPosition , default InitialPosition.Latest | Set the initial position of a consumer when subscribing to the topic. It can be either InitialPosition.Earliest or InitialPosition.Latest. |
crypto_key_reader:CryptoKeyReader , optional | Symmetric encryption class implementation, configuring public key encryption of messages for the producer and private key decryption of messages for the consumer. |
replicate_subscription_state_enabled:bool , default False | Set whether the subscription status should be replicated. |
max_pending_chunked_message:int , default 10 | The consumer buffers chunked messages in memory until it receives all the chunks of the original message. While consuming chunked messages, chunks from the same message might not be contiguous in the stream, and they might be mixed with other messages' chunks, so the consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently, or when a publisher failed to publish all chunks of a message. If it is zero, the number of pending chunked messages is not limited. |
auto_ack_oldest_chunked_message_on_queue_full:bool , default False | Buffering a large number of outstanding incomplete chunked messages can create memory pressure, which can be guarded against with the max_pending_chunked_message threshold. Once the consumer reaches this threshold, it drops the outstanding incomplete chunked messages by silently acknowledging them if auto_ack_oldest_chunked_message_on_queue_full is true; otherwise it marks them for redelivery. |
start_message_id_inclusive:bool , default False | Set the consumer to include the given position of any reset operation such as Consumer.seek. |
batch_receive_policy | Set the batch collection policy for batch receiving. |
key_shared_policy | Set the key shared policy for use when the ConsumerType is KeyShared. |
batch_index_ack_enabled | Note that this option only works when the broker also enables batch index acknowledgement. See the acknowledgmentAtBatchIndexLevelEnabled config in broker.conf. |
regex_subscription_mode:RegexSubscriptionMode , optional | Set the regex subscription mode for use when the topic is a regex pattern. Supported modes: |
dead_letter_policy | Set the dead letter policy for the consumer. By default, a message that cannot be processed may be redelivered many times, possibly without end. With a dead letter policy, messages have a maximum redelivery count; when a message exceeds it, the message is sent to the dead letter topic and acknowledged automatically. |
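The message_listener and negative_ack_redelivery_delay_ms parameters above can be combined as in the sketch below. The helper names make_listener and subscribe_with_listener, the topic pattern, and the subscription name are all hypothetical; the handler is assumed to be any callable that takes the raw message payload and raises on failure.

```python
import re


def make_listener(handler):
    """Wrap handler(payload) in a listener with the documented (consumer, message) signature."""
    def listener(consumer, message):
        try:
            handler(message.data())
        except Exception:
            # Ask the broker to redeliver after negative_ack_redelivery_delay_ms.
            consumer.negative_acknowledge(message)
        else:
            consumer.acknowledge(message)
    return listener


def subscribe_with_listener(client, handler):
    """Subscribe to topics matching a regex pattern (names are illustrative)."""
    return client.subscribe(
        re.compile("persistent://public/default/topic-.*"),
        "my-subscription",
        message_listener=make_listener(handler),
        negative_ack_redelivery_delay_ms=60000,  # documented default
    )
```

Because a listener is set, the returned consumer delivers messages through it, and calls to consumer.receive() are not allowed.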