Manage Schemas
This page only shows some frequently used operations.
- For the latest and complete information about Pulsar admin, including commands, flags, descriptions, and more, see Pulsar admin docs.
- For the latest and complete information about REST API, including parameters, responses, samples, and more, see REST API doc.
- For the latest and complete information about Java admin API, including classes, methods, descriptions, and more, see Java admin API doc.
Manage schema
Upload a schema
To upload (register) a new schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the upload subcommand.
pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
The schema-definition-file is in JSON format.
{
"type": "<schema-type>",
"schema": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
Send a POST request to this endpoint: POST /admin/v2/schemas/{tenant}/{namespace}/{topic}/schema
The POST payload is in JSON format.
{
"type": "<schema-type>",
"schema": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
void createSchema(String topic, PostSchemaPayload schemaPayload)
Here is an example of PostSchemaPayload:
PulsarAdmin admin = …;
PostSchemaPayload payload = new PostSchemaPayload();
// INT8 is a primitive schema, so the schema definition is left blank.
payload.setType("INT8");
payload.setSchema("");
admin.schemas().createSchema("my-tenant/my-ns/my-topic", payload);
If the schema is a primitive schema, the schema field must be blank.
If the schema is a struct schema, this field must be a JSON string of the Avro schema definition.
The payload includes the following fields:
Field | Description |
---|---|
type | The schema type. |
schema | The schema definition data, encoded in UTF-8. |
properties | The additional properties associated with the schema. |
The following is an example of a JSON schema.
{
"type": "JSON",
"schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file3\",\"type\":[\"string\",\"null\"],\"default\":\"dfdf\"}]}",
"properties": {}
}
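Because the schema field carries the Avro definition as a string, the definition's own quotes must be escaped when the payload is assembled by hand. The following is a minimal sketch of that step using only the JDK's java.net.http classes. The class name, the helper names, and the http://localhost:8080 base URL are assumptions made for illustration and are not part of Pulsar; the request is built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class UploadSchemaRequest {

    // Escapes backslashes and double quotes so the definition can be embedded
    // as a JSON string value. Minimal on purpose: control characters such as
    // newlines are not handled.
    static String escapeJson(String raw) {
        return raw.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Assembles the payload with the type, schema, and properties fields.
    static String buildPayload(String type, String schemaDefinition) {
        return "{\"type\":\"" + type + "\","
                + "\"schema\":\"" + escapeJson(schemaDefinition) + "\","
                + "\"properties\":{}}";
    }

    // Builds (but does not send) the POST request for the upload endpoint.
    // baseUrl is an assumed web service URL of the cluster.
    static HttpRequest buildRequest(String baseUrl, String tenant, String namespace,
                                    String topic, String payload) {
        URI uri = URI.create(baseUrl + "/admin/v2/schemas/" + tenant + "/"
                + namespace + "/" + topic + "/schema");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
    }

    public static void main(String[] args) {
        String avro = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\","
                + "\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],"
                + "\"default\":null}]}";
        String payload = buildPayload("JSON", avro);
        HttpRequest request = buildRequest("http://localhost:8080",
                "my-tenant", "my-ns", "my-topic", payload);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(payload);
    }
}
```

In practice a JSON library would do the escaping more robustly; the hand-rolled helper is used here only to keep the sketch dependency-free.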
Get the latest schema
To get the latest schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the get subcommand.
pulsar-admin schemas get <topic-name>
Example output:
{
"version": 0,
"type": "String",
"timestamp": 0,
"data": "string",
"properties": {
"property1": "string",
"property2": "string"
}
}
Send a GET request to this endpoint: GET /admin/v2/schemas/{tenant}/{namespace}/{topic}/schema
Here is an example of a response, which is returned in JSON format.
{
"version": "<the-version-number-of-the-schema>",
"type": "<the-schema-type>",
"timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
"data": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
SchemaInfo getSchemaInfo(String topic)
Here is an example of SchemaInfo:
PulsarAdmin admin = …;
SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic");
Get a specific schema
To get a specific version of a schema, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the get subcommand.
pulsar-admin schemas get <topic-name> --version <version>
Send a GET request to a schema endpoint: GET /admin/v2/schemas/{tenant}/{namespace}/{topic}/schema/{version}
Here is an example of a response, which is returned in JSON format.
{
"version": "<the-version-number-of-the-schema>",
"type": "<the-schema-type>",
"timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
"data": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
SchemaInfo getSchemaInfo(String topic, long version)
Here is an example of SchemaInfo:
PulsarAdmin admin = …;
SchemaInfo si = admin.schemas().getSchemaInfo("my-tenant/my-ns/my-topic", 1L);
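The latest-schema and specific-version REST endpoints differ only in the trailing {version} segment. The following sketch shows one way to build either request with the JDK's java.net.http classes. The class name, the helper names, and the http://localhost:8080 base URL are hypothetical; the requests are built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.OptionalLong;

final class GetSchemaRequest {

    // Returns the schema endpoint for a topic. An empty version selects the
    // latest schema; a present version appends the /{version} segment.
    static URI schemaUri(String baseUrl, String tenant, String namespace,
                         String topic, OptionalLong version) {
        String path = "/admin/v2/schemas/" + tenant + "/" + namespace + "/"
                + topic + "/schema";
        if (version.isPresent()) {
            path += "/" + version.getAsLong();
        }
        return URI.create(baseUrl + path);
    }

    // Builds (but does not send) a GET request that asks for a JSON response.
    static HttpRequest buildRequest(URI uri) {
        return HttpRequest.newBuilder(uri)
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        String base = "http://localhost:8080"; // assumed web service URL
        HttpRequest latest = buildRequest(
                schemaUri(base, "my-tenant", "my-ns", "my-topic", OptionalLong.empty()));
        HttpRequest versionOne = buildRequest(
                schemaUri(base, "my-tenant", "my-ns", "my-topic", OptionalLong.of(1L)));
        System.out.println(latest.method() + " " + latest.uri());
        System.out.println(versionOne.method() + " " + versionOne.uri());
    }
}
```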
Extract a schema
To extract (provide) a schema via a topic, use the following method.
- Admin CLI
Use the extract subcommand.
pulsar-admin schemas extract --classname <class-name> --jar <absolute-jar-path> --type <type-name>
Delete a schema
The delete action always deletes all versions of a schema registered for a topic.
To delete a schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the delete subcommand.
pulsar-admin schemas delete <topic-name>
Send a DELETE request to a schema endpoint: DELETE /admin/v2/schemas/{tenant}/{namespace}/{topic}/schema
Here is an example of a response returned in JSON format.
{
"version": "<the-latest-version-number-of-the-schema>"
}
void deleteSchema(String topic)
Here is an example of deleting a schema.
PulsarAdmin admin = …;
admin.schemas().deleteSchema("my-tenant/my-ns/my-topic");
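The Java examples address a topic as tenant/namespace/topic, while the REST endpoint takes the three parts as separate path segments. As a rough sketch of how the two relate, the following code splits such a name and builds the DELETE request with the JDK's java.net.http classes. The class name, the helper names, and the http://localhost:8080 base URL are hypothetical; the request is built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class DeleteSchemaRequest {

    // Maps a "tenant/namespace/topic" name onto the schema endpoint path.
    static URI deleteUri(String baseUrl, String topicName) {
        String[] parts = topicName.split("/");
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "expected tenant/namespace/topic but got: " + topicName);
        }
        return URI.create(baseUrl + "/admin/v2/schemas/" + parts[0] + "/"
                + parts[1] + "/" + parts[2] + "/schema");
    }

    // Builds (but does not send) the DELETE request. Sending it would remove
    // every version of the schema registered for the topic.
    static HttpRequest buildRequest(String baseUrl, String topicName) {
        return HttpRequest.newBuilder(deleteUri(baseUrl, topicName))
                .DELETE()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("http://localhost:8080", // assumed URL
                "my-tenant/my-ns/my-topic");
        System.out.println(request.method() + " " + request.uri());
    }
}
```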
Manage schema AutoUpdate
Enable schema AutoUpdate
To enable/enforce schema auto-update at the namespace level, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the set-is-allow-auto-update-schema subcommand.
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace
Send a POST request to a namespace endpoint: POST /admin/v2/namespaces/{tenant}/{namespace}/isAllowAutoUpdateSchema
The POST payload is in JSON format.
{
“isAllowAutoUpdateSchema”: “true