Manage Schemas
This page only shows some frequently used operations.
- For the latest and complete information about Pulsar admin, including commands, flags, descriptions, and more, see Pulsar admin docs.
- For the latest and complete information about REST API, including parameters, responses, samples, and more, see REST API doc.
- For the latest and complete information about Java admin API, including classes, methods, descriptions, and more, see Java admin API doc.
Manage schema
Upload a schema
To upload (register) a new schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java
With the admin CLI, use the upload subcommand.
pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
The schema-definition-file is in JSON format.
{
"type": "<schema-type>",
"schema": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
With the REST API, send a POST request to this endpoint: POST /admin/v2/schemas/:tenant/:namespace/:topic/schema
The following curl example assumes that the payload is stored in the schema.json file, the Pulsar broker runs on localhost, and the topic is my-tenant/my-ns/my-topic:
curl -X POST -H 'Content-Type: application/json' -d @schema.json http://localhost:8080/admin/v2/schemas/my-tenant/my-ns/my-topic/schema
The post payload is in JSON format.
{
"type": "<schema-type>",
"schema": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
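The curl call above can also be expressed with the HTTP client that ships with Java 11 and later. The following is a minimal sketch, not the Pulsar admin client: the class name UploadSchemaRequestSketch and the helper buildUploadRequest are hypothetical, and the base URL and topic are the same assumed values as in the curl example. It only builds the request; sending it requires a running broker.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper class, used only for illustration.
class UploadSchemaRequestSketch {

    // Builds the POST request for the schema upload endpoint.
    // topicPath is expected in the form tenant/namespace/topic.
    static HttpRequest buildUploadRequest(String baseUrl, String topicPath, String payloadJson) {
        URI uri = URI.create(baseUrl + "/admin/v2/schemas/" + topicPath + "/schema");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payloadJson))
                .build();
    }

    public static void main(String[] args) {
        // Assumed values: a local broker and a primitive INT8 schema payload.
        String payload = "{\"type\":\"INT8\",\"schema\":\"\",\"properties\":{}}";
        HttpRequest request = buildUploadRequest(
                "http://localhost:8080", "my-tenant/my-ns/my-topic", payload);
        System.out.println(request.method() + " " + request.uri());
        // To send it against a running broker:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

Building the request separately from sending it makes it possible to inspect the URL and headers before anything reaches the broker.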
With the Java admin API, the method on the PulsarAdmin client (through admin.schemas()) is:
void createSchema(String topic, PostSchemaPayload schemaPayload)
Here is an example of PostSchemaPayload:
PulsarAdmin admin = …;
PostSchemaPayload payload = new PostSchemaPayload();
payload.setType("INT8");
payload.setSchema("");
admin.schemas().createSchema("my-tenant/my-ns/my-topic", payload);
If the schema is a primitive schema, the schema field must be blank.
If the schema is a struct schema, this field must be a JSON string of the Avro schema definition.
The payload includes the following fields:
Field | Description |
---|---|
type | The schema type. |
schema | The schema definition data, which is encoded in the UTF-8 charset. |
properties | The additional properties associated with the schema. |
The following is an example of a payload for a JSON schema.
{
"type": "JSON",
"schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file3\",\"type\":[\"string\",\"null\"],\"default\":\"dfdf\"}]}",
"properties": {}
}
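Because the schema field carries the Avro definition as a string inside the JSON payload, the inner quotes must be escaped, as in the example above. The following is a minimal sketch of producing such a payload with plain string handling. The class SchemaPayloadSketch and its methods are hypothetical, and the escaping only covers backslashes and double quotes, so it assumes a single-line definition without control characters. A JSON library would be the more robust choice in real code.

```java
// Hypothetical helper class, used only for illustration.
class SchemaPayloadSketch {

    // Escapes backslashes and double quotes so that the definition can be
    // embedded as a JSON string value. Control characters are not handled.
    static String escapeJson(String raw) {
        return raw.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the upload payload. For a primitive schema, pass an empty
    // definition; for a struct schema, pass the Avro schema definition.
    static String buildPayload(String type, String schemaDefinition) {
        return "{\"type\":\"" + type + "\","
                + "\"schema\":\"" + escapeJson(schemaDefinition) + "\","
                + "\"properties\":{}}";
    }

    public static void main(String[] args) {
        String avro = "{\"type\":\"record\",\"name\":\"User\","
                + "\"namespace\":\"com.foo\",\"fields\":[]}";
        System.out.println(buildPayload("JSON", avro)); // struct schema
        System.out.println(buildPayload("INT8", ""));   // primitive schema
    }
}
```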
Get the latest schema
To get the latest schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java
With the admin CLI, use the get subcommand.
pulsar-admin schemas get <topic-name>
Example output:
{
"version": 0,
"type": "String",
"timestamp": 0,
"data": "string",
"properties": {
"property1": "string",
"property2": "string"
}
}
With the REST API, send a GET request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema
Here is an example of a response, which is returned in JSON format.
{
"version": "<the-version-number-of-the-schema>",
"type": "<the-schema-type>",
"timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
"data": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
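With the Java admin API, the method on the PulsarAdmin client (through admin.schemas()) is:
SchemaInfo getSchemaInfo(String topic)
Here is an example of getting the SchemaInfo:
PulsarAdmin admin = …;
SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic");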
Get a specific schema
To get a specific version of a schema, you can use one of the following methods.
- Admin CLI
- REST API
- Java
With the admin CLI, use the get subcommand with the --version option.
pulsar-admin schemas get <topic-name> --version <version>
With the REST API, send a GET request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/:version
Here is an example of a response, which is returned in JSON format.
{
"version": "<the-version-number-of-the-schema>",
"type": "<the-schema-type>",
"timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
"data": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
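The latest-schema and specific-version endpoints differ only in the trailing version segment. The following is a minimal sketch of building either URL. The class SchemaGetUriSketch and the helper schemaUri are hypothetical, and the base URL is an assumed local broker address.

```java
import java.net.URI;
import java.util.OptionalLong;

// Hypothetical helper class, used only for illustration.
class SchemaGetUriSketch {

    // Returns the URI of the latest schema when version is empty,
    // otherwise the URI of the requested schema version.
    static URI schemaUri(String baseUrl, String topicPath, OptionalLong version) {
        String path = baseUrl + "/admin/v2/schemas/" + topicPath + "/schema";
        return URI.create(version.isPresent() ? path + "/" + version.getAsLong() : path);
    }

    public static void main(String[] args) {
        String base = "http://localhost:8080"; // assumed broker address
        System.out.println(schemaUri(base, "my-tenant/my-ns/my-topic", OptionalLong.empty()));
        System.out.println(schemaUri(base, "my-tenant/my-ns/my-topic", OptionalLong.of(1L)));
    }
}
```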
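With the Java admin API, the method on the PulsarAdmin client (through admin.schemas()) is:
SchemaInfo getSchemaInfo(String topic, long version)
Here is an example of getting the SchemaInfo for a specific version:
PulsarAdmin admin = …;
SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic", 1L);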
Extract a schema
To extract (provide) a schema via a topic, use the following method.
- Admin CLI
Use the extract subcommand.
pulsar-admin schemas extract --classname <class-name> --jar <absolute-jar-path> --type <type-name>
Delete a schema
In any case, the delete action deletes all versions of a schema registered for a topic.
To delete a schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java
With the admin CLI, use the delete subcommand.
pulsar-admin schemas delete <topic-name>
With the REST API, send a DELETE request to this endpoint: DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema
Here is an example of a response returned in JSON format.
{
"version": "<the-latest-version-number-of-the-schema>"
}
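With the Java admin API, the method on the PulsarAdmin client (through admin.schemas()) is:
void deleteSchema(String topic)
Here is an example of deleting a schema.
PulsarAdmin admin = …;
admin.schemas().deleteSchema("my-tenant/my-ns/my-topic");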
Manage schema AutoUpdate
Enable schema AutoUpdate
To enable/enforce schema auto-update at the namespace level, you can use one of the following methods.
- Admin CLI
- REST API
- Java
With the admin CLI, use the set-is-allow-auto-update-schema subcommand.
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace
With the REST API, send a POST request to this namespace endpoint: POST /admin/v2/namespaces/:tenant/:namespace/isAllowAutoUpdateSchema
The post payload is in JSON format.
{
"isAllowAutoUpdateSchema": "true"
}
With the Java admin API, here is an example of enabling schema auto-update for a tenant/namespace.
admin.namespaces().setIsAllowAutoUpdateSchema("my-tenant/my-namespace", true);
Disable schema AutoUpdate
When schema auto-update is disabled, you can only register a new schema explicitly (see Upload a schema).
To disable schema auto-update at the namespace level, you can use one of the following methods.
- Admin CLI
- REST API
- Java
With the admin CLI, use the set-is-allow-auto-update-schema subcommand.
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace
With the REST API, send a POST request to this namespace endpoint: POST /admin/v2/namespaces/:tenant/:namespace/isAllowAutoUpdateSchema
The post payload is in JSON format.
{
"isAllowAutoUpdateSchema": "false"
}
With the Java admin API, here is an example of disabling schema auto-update for a tenant/namespace.
admin.namespaces().setIsAllowAutoUpdateSchema("my-tenant/my-namespace", false);
Manage schema validation enforcement
Enable schema validation enforcement
To enable schema validation enforcement at the cluster level, set isSchemaValidationEnforced to true in the conf/broker.conf file.
To enable schema validation enforcement at the namespace level, you can use one of the following methods.
- Admin CLI
- REST API
- Java
With the admin CLI, use the set-schema-validation-enforce subcommand.
bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace
With the REST API, send a POST request to this namespace endpoint: POST /admin/v2/namespaces/:tenant/:namespace/schemaValidationEnforced
The post payload is in JSON format.
{
"schemaValidationEnforced": "true"
}
With the Java admin API, here is an example of enabling schema validation enforcement for a tenant/namespace.
admin.namespaces().setSchemaValidationEnforced("my-tenant/my-namespace", true);
Disable schema validation enforcement
To disable schema validation enforcement at the namespace level, you can use one of the following methods.
- Admin CLI
- REST API
- Java
With the admin CLI, use the set-schema-validation-enforce subcommand.
bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace
With the REST API, send a POST request to this namespace endpoint: POST /admin/v2/namespaces/:tenant/:namespace/schemaValidationEnforced
The post payload is in JSON format.
{
"schemaValidationEnforced": "false"
}
With the Java admin API, here is an example of disabling schema validation enforcement for a tenant/namespace.
admin.namespaces().setSchemaValidationEnforced("my-tenant/my-namespace", false);
Manage schema compatibility strategy
The schema compatibility check strategy configured at different levels has priority: topic level > namespace level > cluster level. In other words:
- If you set the strategy at both topic and namespace levels, the topic-level strategy is used.
- If you set the strategy at both namespace and cluster levels, the namespace-level strategy is used.
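The priority order above can be sketched as a simple fallback chain. This is an illustration of the rule as stated on this page, not the broker's actual implementation: the class EffectiveStrategySketch and the method resolve are hypothetical, and strategies are represented as plain strings.

```java
import java.util.Optional;

// Hypothetical helper class, used only for illustration.
class EffectiveStrategySketch {

    // Picks the topic-level strategy if set, otherwise the namespace-level
    // strategy if set, otherwise the cluster-level strategy.
    static String resolve(Optional<String> topicLevel,
                          Optional<String> namespaceLevel,
                          String clusterLevel) {
        return topicLevel.or(() -> namespaceLevel).orElse(clusterLevel);
    }

    public static void main(String[] args) {
        // Topic and namespace both set: the topic-level value is used.
        System.out.println(resolve(Optional.of("ALWAYS_INCOMPATIBLE"), Optional.of("FULL"), "BACKWARD"));
        // Only namespace set: the namespace-level value is used.
        System.out.println(resolve(Optional.empty(), Optional.of("FULL"), "BACKWARD"));
        // Neither set: the cluster-level value is used.
        System.out.println(resolve(Optional.empty(), Optional.empty(), "BACKWARD"));
    }
}
```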
Set schema compatibility strategy
Set topic-level schema compatibility strategy
To set a schema compatibility check strategy at the topic level, you can use one of the following methods.
- Admin CLI
- REST API
- Java
With the admin CLI, use the pulsar-admin topicPolicies set-schema-compatibility-strategy command.
pulsar-admin topicPolicies set-schema-compatibility-strategy <strategy> <topicName>
With the REST API, send a PUT request to this endpoint: PUT /admin/v2/topics/:tenant/:namespace/:topic/schemaCompatibilityStrategy
With the Java admin API, the method on the PulsarAdmin client (through admin.topicPolicies()) is:
void setSchemaCompatibilityStrategy(String topic, SchemaCompatibilityStrategy strategy)
Here is an example of setting a schema compatibility check strategy at the topic level.
PulsarAdmin admin = …;
admin.topicPolicies().setSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
Set namespace-level schema compatibility strategy
To set schema compatibility check strategy at the namespace level, you can use one of the following methods.
- Admin CLI
- REST API
- Java
With the admin CLI, use the pulsar-admin namespaces set-schema-compatibility-strategy command.
pulsar-admin namespaces set-schema-compatibility-strategy options
With the REST API, send a PUT request to this endpoint: PUT /admin/v2/namespaces/:tenant/:namespace/schemaCompatibilityStrategy
With the Java admin API, use the setSchemaCompatibilityStrategy method.
admin.namespaces().setSchemaCompatibilityStrategy("test", SchemaCompatibilityStrategy.FULL);
Set cluster-level schema compatibility strategy
To set the schema compatibility check strategy at the cluster level, set schemaCompatibilityStrategy in the conf/broker.conf file.
The following is an example:
schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE
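A typo in this value is easy to miss, so it can help to check the setting before restarting a broker. The following is a minimal sketch that reads the key from configuration text in key=value form and validates it against a list of strategy names. The class ClusterStrategyConfigSketch is hypothetical, and its local Strategy enum is an assumed stand-in for Pulsar's SchemaCompatibilityStrategy, listing the values as I understand them; check it against the Pulsar version in use.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper class, used only for illustration.
class ClusterStrategyConfigSketch {

    // Local stand-in for the strategy names (an assumption, see the text above).
    enum Strategy {
        UNDEFINED, ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE,
        BACKWARD, FORWARD, FULL,
        BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
    }

    // Parses key=value configuration text and returns the configured strategy.
    // Throws IllegalArgumentException if the key is missing or the value is unknown.
    static Strategy readStrategy(String confContents) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confContents));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("schemaCompatibilityStrategy");
        if (value == null) {
            throw new IllegalArgumentException("schemaCompatibilityStrategy is not set");
        }
        return Strategy.valueOf(value.trim());
    }

    public static void main(String[] args) {
        System.out.println(readStrategy("schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE\n"));
    }
}
```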
Get schema compatibility strategy
Get topic-level schema compatibility strategy
To get the topic-level schema compatibility check strategy, you can use one of the following methods.
- Admin CLI
- REST API
- Java
With the admin CLI, use the pulsar-admin topicPolicies get-schema-compatibility-strategy command.
pulsar-admin topicPolicies get-schema-compatibility-strategy <topicName>
With the REST API, send a GET request to this endpoint: GET /admin/v2/topics/:tenant/:namespace/:topic/schemaCompatibilityStrategy
With the Java admin API, the method on the PulsarAdmin client (through admin.topicPolicies()) is:
SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean applied)
Here is an example of getting the topic-level schema compatibility check strategy.
PulsarAdmin admin = …;
// get the current applied schema compatibility strategy
admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", true);
// only get the schema compatibility strategy from topic policies
admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", false);
Get namespace-level schema compatibility strategy
You can get schema compatibility check strategy at namespace level using one of the following methods.
- Admin CLI
- REST API
- Java
With the admin CLI, use the pulsar-admin namespaces get-schema-compatibility-strategy command.
pulsar-admin namespaces get-schema-compatibility-strategy options
With the REST API, send a GET request to this endpoint: GET /admin/v2/namespaces/:tenant/:namespace/schemaCompatibilityStrategy
With the Java admin API, use the getSchemaCompatibilityStrategy method.
admin.namespaces().getSchemaCompatibilityStrategy("test");