class documentation

class Producer: (source)

View In Hierarchy

The Pulsar message producer, used to publish messages on a topic.

Examples

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic')
for i in range(10):
    producer.send(('Hello-%d' % i).encode('utf-8'))
client.close()
Method close Close the producer.
Method flush Flush all the messages buffered in the client and wait until all messages have been successfully persisted
Method is_connected Check if the producer is connected or not.
Method last_sequence_id Get the last sequence id that was published by this producer.
Method producer_name Return the producer name which could have been assigned by the system or specified by the client
Method send Publish a message on the topic. Blocks until the message is acknowledged
Method send_async Send a message asynchronously.
Method topic Return the topic which producer is publishing to
Method _build_msg Undocumented
def close(self): (source)

Close the producer.

def flush(self): (source)

Flush all the messages buffered in the client and wait until all messages have been successfully persisted

def is_connected(self): (source)

Check if the producer is connected or not.

def last_sequence_id(self): (source)

Get the last sequence id that was published by this producer.

This represents either the automatically assigned or custom sequence id (set on the MessageBuilder) that was published and acknowledged by the broker.

After recreating a producer with the same producer name, this will return the last message that was published in the previous producer session, or -1 if there was no message ever published.

def producer_name(self): (source)

Return the producer name which could have been assigned by the system or specified by the client

def send(self, content, properties=None, partition_key=None, ordering_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None): (source)

Publish a message on the topic. Blocks until the message is acknowledged

Returns a MessageId object that represents where the message is persisted.

Parameters
contentA bytes object with the message payload.
properties:optionalA dict of application-defined string properties.
partition_key:optionalSets the partition key for message routing. A hash of this key is used to determine the message's topic partition.
ordering_key:optionalSets the ordering key for message routing.
sequence_id:optionalSpecify a custom sequence id for the message being published.
replication_clusters:optionalOverride namespace replication clusters. Note that it is the caller's responsibility to provide valid cluster names and that all clusters have been previously configured as topics. Given an empty list, the message will replicate according to the namespace configuration.
disable_replication:bool, default FalseDo not replicate this message.
event_timestamp:optionalTimestamp in millis of the timestamp of event creation
deliver_at:optionalSpecify the message should not be delivered earlier than the specified timestamp. The timestamp is milliseconds and based on UTC
deliver_after:optionalSpecify a delay in timedelta for the delivery of the messages.
def send_async(self, content, callback, properties=None, partition_key=None, ordering_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None): (source)

Send a message asynchronously.

Examples

The callback will be invoked once the message has been acknowledged by the broker. Users are responsible to handle the exception inside the callback. If any exception was thrown from the callback, the process would terminate immediately.

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
                'my-topic',
                block_if_queue_full=True,
                batching_enabled=True,
                batching_max_publish_delay_ms=10)

def callback(res, msg_id):
    print('Message published res=%s', res)

while True:
    producer.send_async(('Hello-%d' % i).encode('utf-8'), callback)

client.close()

When the producer queue is full, by default the message will be rejected and the callback invoked with an error code.

Parameters
contentA bytes object with the message payload.
callbackA callback that is invoked once the message has been acknowledged by the broker.
properties:optionalA dict of application0-defined string properties.
partition_key:optionalSets the partition key for the message routing. A hash of this key is used to determine the message's topic partition.
ordering_key:optionalSets the ordering key for the message routing.
sequence_id:optionalSpecify a custom sequence id for the message being published.
replication_clusters:optionalOverride namespace replication clusters. Note that it is the caller's responsibility to provide valid cluster names and that all clusters have been previously configured as topics. Given an empty list, the message will replicate per the namespace configuration.
disable_replication:optionalDo not replicate this message.
event_timestamp:optionalTimestamp in millis of the timestamp of event creation
deliver_at:optionalSpecify the message should not be delivered earlier than the specified timestamp.
deliver_after:optionalSpecify a delay in timedelta for the delivery of the messages.
def topic(self): (source)

Return the topic which producer is publishing to

def _build_msg(self, content, properties, partition_key, ordering_key, sequence_id, replication_clusters, disable_replication, event_timestamp, deliver_at, deliver_after): (source)

Undocumented