Manage Schemas
This page only shows some frequently used operations.
- 
For the latest and complete information about Pulsar admin, including commands, flags, descriptions, and more, see Pulsar admin docs.
- 
For the latest and complete information about REST API, including parameters, responses, samples, and more, see REST API doc.
- 
For the latest and complete information about Java admin API, including classes, methods, descriptions, and more, see Java admin API doc.
Manage schema
Upload a schema
To upload (register) a new schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the upload subcommand.
pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
The schema-definition-file is in JSON format.
{
    "type": "<schema-type>",
    "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
    "properties": {} // the properties associated with the schema
}
Send a POST request to the endpoint documented here: POST /admin/v2/schemas/:tenant/:namespace/:topic/schema/SchemasResource_postSchema
Below is an example with CURL with a payload stored on the schema.json file, Pulsar broker running on localhost and the topic my-tenant/my-ns/my-topic:
curl -X POST -H 'Content-Type: application/json' -d @schema.json http://localhost:8080/admin/v2/schemas/my-tenant/my-ns/my-topic/schema
The post payload is in JSON format.
{
    "type": "<schema-type>",
    "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
    "properties": {} // the properties associated with the schema
}
The method on PulsarAdmin client is:
void createSchema(String topic, PostSchemaPayload schemaPayload)
Here is an example of PostSchemaPayload:
PulsarAdmin admin = …;
PostSchemaPayload payload = new PostSchemaPayload();
payload.setType("INT8");
payload.setSchema("");
admin.createSchema("my-tenant/my-ns/my-topic", payload);
If the schema is a primitive schema, the schema field must be blank.
If the schema is a struct schema, this field must be a JSON string of the Avro schema definition.
The payload includes the following fields:
| Field | Description | 
|---|---|
| type | |
| schema | The schema definition data, which is encoded in UTF 8 charset. | 
| properties | The additional properties associated with the schema. | 
The following is an example for a JSON schema.
Example
{
    "type": "JSON",
    "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file3\",\"type\":[\"string\",\"null\"],\"default\":\"dfdf\"}]}",
    "properties": {}
}
Get the latest schema
To get the latest schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the get subcommand.
pulsar-admin schemas get <topic-name>
Example output:
{
    "version": 0,
    "type": "String",
    "timestamp": 0,
    "data": "string",
    "properties": {
        "property1": "string",
        "property2": "string"
    }
}
Send a GET request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/SchemasResource_getSchema
Here is an example of a response, which is returned in JSON format.
{
    "version": "<the-version-number-of-the-schema>",
    "type": "<the-schema-type>",
    "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
    "data": "<an-utf8-encoded-string-of-schema-definition-data>",
    "properties": {} // the properties associated with the schema
}
SchemaInfo createSchema(String topic)
Here is an example of SchemaInfo:
PulsarAdmin admin = …;
SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic");
Get a specific schema
To get a specific version of a schema, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the get subcommand.
pulsar-admin schemas get <topic-name> --version <version>
Send a GET request to a schema endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/:version/SchemasResource_getSchema
Here is an example of a response, which is returned in JSON format.
{
    "version": "<the-version-number-of-the-schema>",
    "type": "<the-schema-type>",
    "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
    "data": "<an-utf8-encoded-string-of-schema-definition-data>",
    "properties": {} // the properties associated with the schema
}
SchemaInfo createSchema(String topic, long version)
Here is an example of SchemaInfo:
PulsarAdmin admin = …;
SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic", 1L);
Extract a schema
To extract (provide) a schema via a topic, use the following method.
- Admin CLI
Use the extract subcommand.
pulsar-admin schemas extract --classname <class-name> --jar <absolute-jar-path> --type <type-name>
Delete a schema
In any case, the delete action deletes all versions of a schema registered for a topic.
To delete a schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the delete subcommand.
pulsar-admin schemas delete <topic-name>
Send a DELETE request to a schema endpoint: DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema/SchemasResource_deleteSchema
Here is an example of a response returned in JSON format.
{
    "version": "<the-latest-version-number-of-the-schema>",
}
void deleteSchema(String topic)
Here is an example of deleting a schema.
PulsarAdmin admin = …;
admin.deleteSchema("my-tenant/my-ns/my-topic");
Manage schema AutoUpdate
Enable schema AutoUpdate
To enable/enforce schema auto-update at the namespace level, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the set-is-allow-auto-update-schema subcommand.
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace
Send a POST request to a namespace endpoint: POST /admin/v2/namespaces/:tenant/:namespace/isAllowAutoUpdateSchema/Namespaces_setIsAllowAutoUpdateSchema
The post payload is in JSON format.
{
“isAllowAutoUpdateSchema”: “true”
}
Here is an example to enable schema auto-update for a tenant/namespace.
admin.namespaces().setIsAllowAutoUpdateSchema("my-namspace", true);
Disable schema AutoUpdate
When schema auto-update is disabled, you can only register a new schema.
To disable schema auto-update at the namespace level, you can use one of the following commands.
- Admin CLI
- REST API
- Java
Use the set-is-allow-auto-update-schema subcommand.
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace
Send a POST request to a namespace endpoint: POST /admin/v2/namespaces/:tenant/:namespace/isAllowAutoUpdateSchema/Namespaces_setIsAllowAutoUpdateSchema
The post payload is in JSON format.
{
“isAllowAutoUpdateSchema”: “false”
}
Here is an example to enable schema auto-unpdate of a tenant/namespace.
admin.namespaces().setIsAllowAutoUpdateSchema("my-namspace", false);
Manage schema validation enforcement
Enable schema validation enforcement
To enforce schema validation enforcement at the cluster level, you can configure isSchemaValidationEnforced to true in the conf/broker.conf file.
To enable schema validation enforcement at the namespace level, you can use one of the following commands.
- Admin CLI
- REST API
- Java
Use the set-schema-validation-enforce subcommand.
bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace
Send a POST request to a namespace endpoint: POST /admin/v2/namespaces/:tenant/:namespace/schemaValidationEnforced/Namespaces_setSchemaValidationEnforced
The post payload is in JSON format.
{
“schemaValidationEnforced”: “true”
}
Here is an example to enable schema validation enforcement for a tenant/namespace.
admin.namespaces().setSchemaValidationEnforced("my-namspace", true);
Disable schema validation enforcement
To disable schema validation enforcement at the namespace level, you can use one of the following commands.
- Admin CLI
- REST API
- Java
Use the set-schema-validation-enforce subcommand.
bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace
Send a POST request to a namespace endpoint: POST /admin/v2/namespaces/:tenant/:namespace/schemaValidationEnforced/Namespaces_setSchemaValidationEnforced
The post payload is in JSON format.
{
“schemaValidationEnforced”: “false”
}
Here is an example to enable schema validation enforcement for a tenant/namespace.
admin.namespaces().setSchemaValidationEnforced("my-namspace", false);
Manage schema compatibility strategy
The schema compatibility check strategy configured at different levels has priority: topic level > namespace level > cluster level. In other words:
- If you set the strategy at both topic and namespace levels, the topic-level strategy is used.
- If you set the strategy at both namespace and cluster levels, the namespace-level strategy is used.
Set schema compatibility strategy
Set topic-level schema compatibility strategy
To set a schema compatibility check strategy at the topic level, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the pulsar-admin topicPolicies set-schema-compatibility-strategy command.
pulsar-admin topicPolicies set-schema-compatibility-strategy <strategy> <topicName>
Send a PUT request to this endpoint: PUT /admin/v2/topics/:tenant/:namespace/:topic/PersistentTopics_setSchemaCompatibilityStrategy
void setSchemaCompatibilityStrategy(String topic, SchemaCompatibilityStrategy strategy)
Here is an example of setting a schema compatibility check strategy at the topic level.
PulsarAdmin admin = …;
admin.topicPolicies().setSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);