pulsar.functions.context
context.py: Context defines context information available during
processing of a request.
1#!/usr/bin/env python 2# 3# Licensed to the Apache Software Foundation (ASF) under one 4# or more contributor license agreements. See the NOTICE file 5# distributed with this work for additional information 6# regarding copyright ownership. The ASF licenses this file 7# to you under the Apache License, Version 2.0 (the 8# "License"); you may not use this file except in compliance 9# with the License. You may obtain a copy of the License at 10# 11# http://www.apache.org/licenses/LICENSE-2.0 12# 13# Unless required by applicable law or agreed to in writing, 14# software distributed under the License is distributed on an 15# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 16# KIND, either express or implied. See the License for the 17# specific language governing permissions and limitations 18# under the License. 19# 20 21# -*- encoding: utf-8 -*- 22 23# Licensed to the Apache Software Foundation (ASF) under one 24# or more contributor license agreements. See the NOTICE file 25# distributed with this work for additional information 26# regarding copyright ownership. The ASF licenses this file 27# to you under the Apache License, Version 2.0 (the 28# "License"); you may not use this file except in compliance 29# with the License. You may obtain a copy of the License at 30# 31# http://www.apache.org/licenses/LICENSE-2.0 32# 33# Unless required by applicable law or agreed to in writing, 34# software distributed under the License is distributed on an 35# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 36# KIND, either express or implied. See the License for the 37# specific language governing permissions and limitations 38# under the License. 39# 40"""context.py: Context defines context information available during 41# processing of a request. 42""" 43from abc import abstractmethod 44 45class Context(object): 46 """Interface defining information available at process time""" 47 @abstractmethod 48 def get_message_id(self): 49 """Return the messageid of the current message that we are processing""" 50 pass 51 52 @abstractmethod 53 def get_message_key(self): 54 """Return the key of the current message that we are processing""" 55 pass 56 57 @abstractmethod 58 def get_message_eventtime(self): 59 """Return the event time of the current message that we are processing""" 60 pass 61 62 @abstractmethod 63 def get_message_properties(self): 64 """Return the message properties kv map of the current message that we are processing""" 65 pass 66 67 @abstractmethod 68 def get_current_message_topic_name(self): 69 """Returns the topic name of the message that we are processing""" 70 pass 71 72 @abstractmethod 73 def get_function_tenant(self): 74 """Returns the tenant of the message that's being processed""" 75 pass 76 77 @abstractmethod 78 def get_function_namespace(self): 79 """Returns the namespace of the message that's being processed""" 80 81 @abstractmethod 82 def get_function_name(self): 83 """Returns the function name that we are a part of""" 84 pass 85 86 @abstractmethod 87 def get_function_id(self): 88 """Returns the function id that we are a part of""" 89 pass 90 91 @abstractmethod 92 def get_instance_id(self): 93 """Returns the instance id that is executing the function""" 94 pass 95 96 @abstractmethod 97 def get_function_version(self): 98 """Returns the version of function that we are executing""" 99 pass 100 101 @abstractmethod 102 def get_logger(self): 103 """Returns the logger object that can be used to do logging""" 104 pass 105 106 @abstractmethod 107 def get_user_config_value(self, key): 108 """Returns the value of the user-defined config. If the key doesn't exist, None is returned""" 109 pass 110 111 @abstractmethod 112 def get_user_config_map(self): 113 """Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)""" 114 pass 115 116 @abstractmethod 117 def get_secret(self, secret_name): 118 """Returns the secret value associated with the name. None if nothing was found""" 119 pass 120 121 @abstractmethod 122 def get_partition_key(self): 123 """Returns partition key of the input message is one exists""" 124 pass 125 126 127 @abstractmethod 128 def record_metric(self, metric_name, metric_value): 129 """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)""" 130 pass 131 132 @abstractmethod 133 def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None): 134 """Publishes message to topic_name by first serializing the message using serde_class_name serde 135 The message will have properties specified if any 136 137 The available options for message_conf: 138 139 properties, 140 partition_key, 141 sequence_id, 142 replication_clusters, 143 disable_replication, 144 event_timestamp 145 146 """ 147 pass 148 149 @abstractmethod 150 def get_input_topics(self): 151 """Returns the input topics of function""" 152 pass 153 154 @abstractmethod 155 def get_output_topic(self): 156 """Returns the output topic of function""" 157 pass 158 159 @abstractmethod 160 def get_output_serde_class_name(self): 161 """return output Serde class""" 162 pass 163 164 @abstractmethod 165 def ack(self, msgid, topic): 166 """ack this message id""" 167 pass 168 169 @abstractmethod 170 def incr_counter(self, key, amount): 171 """incr the counter of a given key in the managed state""" 172 pass 173 174 @abstractmethod 175 def get_counter(self, key): 176 """get the counter of a given key in the managed state""" 177 pass 178 179 @abstractmethod 180 def del_counter(self, key): 181 """delete the counter of a given key in the managed state""" 182 pass 183 184 @abstractmethod 185 def put_state(self, key, value): 186 """update the value of a given key in the managed state""" 187 pass 188 189 @abstractmethod 190 def get_state(self, key): 191 """get the value of a given key in the managed state""" 192 pass
46class Context(object): 47 """Interface defining information available at process time""" 48 @abstractmethod 49 def get_message_id(self): 50 """Return the messageid of the current message that we are processing""" 51 pass 52 53 @abstractmethod 54 def get_message_key(self): 55 """Return the key of the current message that we are processing""" 56 pass 57 58 @abstractmethod 59 def get_message_eventtime(self): 60 """Return the event time of the current message that we are processing""" 61 pass 62 63 @abstractmethod 64 def get_message_properties(self): 65 """Return the message properties kv map of the current message that we are processing""" 66 pass 67 68 @abstractmethod 69 def get_current_message_topic_name(self): 70 """Returns the topic name of the message that we are processing""" 71 pass 72 73 @abstractmethod 74 def get_function_tenant(self): 75 """Returns the tenant of the message that's being processed""" 76 pass 77 78 @abstractmethod 79 def get_function_namespace(self): 80 """Returns the namespace of the message that's being processed""" 81 82 @abstractmethod 83 def get_function_name(self): 84 """Returns the function name that we are a part of""" 85 pass 86 87 @abstractmethod 88 def get_function_id(self): 89 """Returns the function id that we are a part of""" 90 pass 91 92 @abstractmethod 93 def get_instance_id(self): 94 """Returns the instance id that is executing the function""" 95 pass 96 97 @abstractmethod 98 def get_function_version(self): 99 """Returns the version of function that we are executing""" 100 pass 101 102 @abstractmethod 103 def get_logger(self): 104 """Returns the logger object that can be used to do logging""" 105 pass 106 107 @abstractmethod 108 def get_user_config_value(self, key): 109 """Returns the value of the user-defined config. If the key doesn't exist, None is returned""" 110 pass 111 112 @abstractmethod 113 def get_user_config_map(self): 114 """Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)""" 115 pass 116 117 @abstractmethod 118 def get_secret(self, secret_name): 119 """Returns the secret value associated with the name. None if nothing was found""" 120 pass 121 122 @abstractmethod 123 def get_partition_key(self): 124 """Returns partition key of the input message is one exists""" 125 pass 126 127 128 @abstractmethod 129 def record_metric(self, metric_name, metric_value): 130 """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)""" 131 pass 132 133 @abstractmethod 134 def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None): 135 """Publishes message to topic_name by first serializing the message using serde_class_name serde 136 The message will have properties specified if any 137 138 The available options for message_conf: 139 140 properties, 141 partition_key, 142 sequence_id, 143 replication_clusters, 144 disable_replication, 145 event_timestamp 146 147 """ 148 pass 149 150 @abstractmethod 151 def get_input_topics(self): 152 """Returns the input topics of function""" 153 pass 154 155 @abstractmethod 156 def get_output_topic(self): 157 """Returns the output topic of function""" 158 pass 159 160 @abstractmethod 161 def get_output_serde_class_name(self): 162 """return output Serde class""" 163 pass 164 165 @abstractmethod 166 def ack(self, msgid, topic): 167 """ack this message id""" 168 pass 169 170 @abstractmethod 171 def incr_counter(self, key, amount): 172 """incr the counter of a given key in the managed state""" 173 pass 174 175 @abstractmethod 176 def get_counter(self, key): 177 """get the counter of a given key in the managed state""" 178 pass 179 180 @abstractmethod 181 def del_counter(self, key): 182 """delete the counter of a given key in the managed state""" 183 pass 184 185 @abstractmethod 186 def put_state(self, key, value): 187 """update the value of a given key in the managed state""" 188 pass 189 190 @abstractmethod 191 def get_state(self, key): 192 """get the value of a given key in the managed state""" 193 pass
Interface defining information available at process time
48 @abstractmethod 49 def get_message_id(self): 50 """Return the messageid of the current message that we are processing""" 51 pass
Return the messageid of the current message that we are processing
53 @abstractmethod 54 def get_message_key(self): 55 """Return the key of the current message that we are processing""" 56 pass
Return the key of the current message that we are processing
58 @abstractmethod 59 def get_message_eventtime(self): 60 """Return the event time of the current message that we are processing""" 61 pass
Return the event time of the current message that we are processing
63 @abstractmethod 64 def get_message_properties(self): 65 """Return the message properties kv map of the current message that we are processing""" 66 pass
Return the message properties kv map of the current message that we are processing
68 @abstractmethod 69 def get_current_message_topic_name(self): 70 """Returns the topic name of the message that we are processing""" 71 pass
Returns the topic name of the message that we are processing
73 @abstractmethod 74 def get_function_tenant(self): 75 """Returns the tenant of the message that's being processed""" 76 pass
Returns the tenant of the message that's being processed
78 @abstractmethod 79 def get_function_namespace(self): 80 """Returns the namespace of the message that's being processed"""
Returns the namespace of the message that's being processed
82 @abstractmethod 83 def get_function_name(self): 84 """Returns the function name that we are a part of""" 85 pass
Returns the function name that we are a part of
87 @abstractmethod 88 def get_function_id(self): 89 """Returns the function id that we are a part of""" 90 pass
Returns the function id that we are a part of
92 @abstractmethod 93 def get_instance_id(self): 94 """Returns the instance id that is executing the function""" 95 pass
Returns the instance id that is executing the function
97 @abstractmethod 98 def get_function_version(self): 99 """Returns the version of function that we are executing""" 100 pass
Returns the version of function that we are executing
102 @abstractmethod 103 def get_logger(self): 104 """Returns the logger object that can be used to do logging""" 105 pass
Returns the logger object that can be used to do logging
107 @abstractmethod 108 def get_user_config_value(self, key): 109 """Returns the value of the user-defined config. If the key doesn't exist, None is returned""" 110 pass
Returns the value of the user-defined config. If the key doesn't exist, None is returned
112 @abstractmethod 113 def get_user_config_map(self): 114 """Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)""" 115 pass
Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)
117 @abstractmethod 118 def get_secret(self, secret_name): 119 """Returns the secret value associated with the name. None if nothing was found""" 120 pass
Returns the secret value associated with the name. None if nothing was found
122 @abstractmethod 123 def get_partition_key(self): 124 """Returns partition key of the input message is one exists""" 125 pass
Returns partition key of the input message is one exists
128 @abstractmethod 129 def record_metric(self, metric_name, metric_value): 130 """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)""" 131 pass
Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)
133 @abstractmethod 134 def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None): 135 """Publishes message to topic_name by first serializing the message using serde_class_name serde 136 The message will have properties specified if any 137 138 The available options for message_conf: 139 140 properties, 141 partition_key, 142 sequence_id, 143 replication_clusters, 144 disable_replication, 145 event_timestamp 146 147 """ 148 pass
Publishes message to topic_name by first serializing the message using serde_class_name serde The message will have properties specified if any
The available options for message_conf:
properties, partition_key, sequence_id, replication_clusters, disable_replication, event_timestamp
150 @abstractmethod 151 def get_input_topics(self): 152 """Returns the input topics of function""" 153 pass
Returns the input topics of function
155 @abstractmethod 156 def get_output_topic(self): 157 """Returns the output topic of function""" 158 pass
Returns the output topic of function
160 @abstractmethod 161 def get_output_serde_class_name(self): 162 """return output Serde class""" 163 pass
return output Serde class
170 @abstractmethod 171 def incr_counter(self, key, amount): 172 """incr the counter of a given key in the managed state""" 173 pass
incr the counter of a given key in the managed state
175 @abstractmethod 176 def get_counter(self, key): 177 """get the counter of a given key in the managed state""" 178 pass
get the counter of a given key in the managed state
180 @abstractmethod 181 def del_counter(self, key): 182 """delete the counter of a given key in the managed state""" 183 pass
delete the counter of a given key in the managed state