pulsar.functions.context

context.py: Context defines context information available during

processing of a request.

  1#!/usr/bin/env python
  2#
  3# Licensed to the Apache Software Foundation (ASF) under one
  4# or more contributor license agreements.  See the NOTICE file
  5# distributed with this work for additional information
  6# regarding copyright ownership.  The ASF licenses this file
  7# to you under the Apache License, Version 2.0 (the
  8# "License"); you may not use this file except in compliance
  9# with the License.  You may obtain a copy of the License at
 10#
 11#   http://www.apache.org/licenses/LICENSE-2.0
 12#
 13# Unless required by applicable law or agreed to in writing,
 14# software distributed under the License is distributed on an
 15# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 16# KIND, either express or implied.  See the License for the
 17# specific language governing permissions and limitations
 18# under the License.
 19#
 20
 21# -*- encoding: utf-8 -*-
 22
 23# Licensed to the Apache Software Foundation (ASF) under one
 24# or more contributor license agreements.  See the NOTICE file
 25# distributed with this work for additional information
 26# regarding copyright ownership.  The ASF licenses this file
 27# to you under the Apache License, Version 2.0 (the
 28# "License"); you may not use this file except in compliance
 29# with the License.  You may obtain a copy of the License at
 30#
 31#   http://www.apache.org/licenses/LICENSE-2.0
 32#
 33# Unless required by applicable law or agreed to in writing,
 34# software distributed under the License is distributed on an
 35# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 36# KIND, either express or implied.  See the License for the
 37# specific language governing permissions and limitations
 38# under the License.
 39#
 40"""context.py: Context defines context information available during
 41# processing of a request.
 42"""
 43from abc import abstractmethod
 44
 45class Context(object):
 46  """Interface defining information available at process time"""
 47  @abstractmethod
 48  def get_message_id(self):
 49    """Return the messageid of the current message that we are processing"""
 50    pass
 51
 52  @abstractmethod
 53  def get_message_key(self):
 54    """Return the key of the current message that we are processing"""
 55    pass
 56
 57  @abstractmethod
 58  def get_message_eventtime(self):
 59    """Return the event time of the current message that we are processing"""
 60    pass
 61
 62  @abstractmethod
 63  def get_message_properties(self):
 64    """Return the message properties kv map of the current message that we are processing"""
 65    pass
 66
 67  @abstractmethod
 68  def get_current_message_topic_name(self):
 69    """Returns the topic name of the message that we are processing"""
 70    pass
 71  
 72  @abstractmethod
 73  def get_function_tenant(self):
 74    """Returns the tenant of the message that's being processed"""
 75    pass
 76
 77  @abstractmethod
 78  def get_function_namespace(self):
 79    """Returns the namespace of the message that's being processed"""
 80
 81  @abstractmethod
 82  def get_function_name(self):
 83    """Returns the function name that we are a part of"""
 84    pass
 85
 86  @abstractmethod
 87  def get_function_id(self):
 88    """Returns the function id that we are a part of"""
 89    pass
 90
 91  @abstractmethod
 92  def get_instance_id(self):
 93    """Returns the instance id that is executing the function"""
 94    pass
 95
 96  @abstractmethod
 97  def get_function_version(self):
 98    """Returns the version of function that we are executing"""
 99    pass
100
101  @abstractmethod
102  def get_logger(self):
103    """Returns the logger object that can be used to do logging"""
104    pass
105
106  @abstractmethod
107  def get_user_config_value(self, key):
108    """Returns the value of the user-defined config. If the key doesn't exist, None is returned"""
109    pass
110  
111  @abstractmethod
112  def get_user_config_map(self):
113    """Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)"""
114    pass
115
116  @abstractmethod
117  def get_secret(self, secret_name):
118    """Returns the secret value associated with the name. None if nothing was found"""
119    pass
120
121  @abstractmethod
122  def get_partition_key(self):
123    """Returns partition key of the input message is one exists"""
124    pass
125
126
127  @abstractmethod
128  def record_metric(self, metric_name, metric_value):
129    """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
130    pass
131
132  @abstractmethod
133  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None):
134    """Publishes message to topic_name by first serializing the message using serde_class_name serde
135    The message will have properties specified if any
136
137    The available options for message_conf:
138
139      properties,
140      partition_key,
141      sequence_id,
142      replication_clusters,
143      disable_replication,
144      event_timestamp
145
146    """
147    pass
148
149  @abstractmethod
150  def get_input_topics(self):
151    """Returns the input topics of function"""
152    pass
153
154  @abstractmethod
155  def get_output_topic(self):
156    """Returns the output topic of function"""
157    pass
158
159  @abstractmethod
160  def get_output_serde_class_name(self):
161    """return output Serde class"""
162    pass
163
164  @abstractmethod
165  def ack(self, msgid, topic):
166    """ack this message id"""
167    pass
168
169  @abstractmethod
170  def incr_counter(self, key, amount):
171    """incr the counter of a given key in the managed state"""
172    pass
173
174  @abstractmethod
175  def get_counter(self, key):
176    """get the counter of a given key in the managed state"""
177    pass
178
179  @abstractmethod
180  def del_counter(self, key):
181    """delete the counter of a given key in the managed state"""
182    pass
183
184  @abstractmethod
185  def put_state(self, key, value):
186    """update the value of a given key in the managed state"""
187    pass
188
189  @abstractmethod
190  def get_state(self, key):
191    """get the value of a given key in the managed state"""
192    pass
class Context:
 46class Context(object):
 47  """Interface defining information available at process time"""
 48  @abstractmethod
 49  def get_message_id(self):
 50    """Return the messageid of the current message that we are processing"""
 51    pass
 52
 53  @abstractmethod
 54  def get_message_key(self):
 55    """Return the key of the current message that we are processing"""
 56    pass
 57
 58  @abstractmethod
 59  def get_message_eventtime(self):
 60    """Return the event time of the current message that we are processing"""
 61    pass
 62
 63  @abstractmethod
 64  def get_message_properties(self):
 65    """Return the message properties kv map of the current message that we are processing"""
 66    pass
 67
 68  @abstractmethod
 69  def get_current_message_topic_name(self):
 70    """Returns the topic name of the message that we are processing"""
 71    pass
 72  
 73  @abstractmethod
 74  def get_function_tenant(self):
 75    """Returns the tenant of the message that's being processed"""
 76    pass
 77
 78  @abstractmethod
 79  def get_function_namespace(self):
 80    """Returns the namespace of the message that's being processed"""
 81
 82  @abstractmethod
 83  def get_function_name(self):
 84    """Returns the function name that we are a part of"""
 85    pass
 86
 87  @abstractmethod
 88  def get_function_id(self):
 89    """Returns the function id that we are a part of"""
 90    pass
 91
 92  @abstractmethod
 93  def get_instance_id(self):
 94    """Returns the instance id that is executing the function"""
 95    pass
 96
 97  @abstractmethod
 98  def get_function_version(self):
 99    """Returns the version of function that we are executing"""
100    pass
101
102  @abstractmethod
103  def get_logger(self):
104    """Returns the logger object that can be used to do logging"""
105    pass
106
107  @abstractmethod
108  def get_user_config_value(self, key):
109    """Returns the value of the user-defined config. If the key doesn't exist, None is returned"""
110    pass
111  
112  @abstractmethod
113  def get_user_config_map(self):
114    """Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)"""
115    pass
116
117  @abstractmethod
118  def get_secret(self, secret_name):
119    """Returns the secret value associated with the name. None if nothing was found"""
120    pass
121
122  @abstractmethod
123  def get_partition_key(self):
124    """Returns partition key of the input message is one exists"""
125    pass
126
127
128  @abstractmethod
129  def record_metric(self, metric_name, metric_value):
130    """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
131    pass
132
133  @abstractmethod
134  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None):
135    """Publishes message to topic_name by first serializing the message using serde_class_name serde
136    The message will have properties specified if any
137
138    The available options for message_conf:
139
140      properties,
141      partition_key,
142      sequence_id,
143      replication_clusters,
144      disable_replication,
145      event_timestamp
146
147    """
148    pass
149
150  @abstractmethod
151  def get_input_topics(self):
152    """Returns the input topics of function"""
153    pass
154
155  @abstractmethod
156  def get_output_topic(self):
157    """Returns the output topic of function"""
158    pass
159
160  @abstractmethod
161  def get_output_serde_class_name(self):
162    """return output Serde class"""
163    pass
164
165  @abstractmethod
166  def ack(self, msgid, topic):
167    """ack this message id"""
168    pass
169
170  @abstractmethod
171  def incr_counter(self, key, amount):
172    """incr the counter of a given key in the managed state"""
173    pass
174
175  @abstractmethod
176  def get_counter(self, key):
177    """get the counter of a given key in the managed state"""
178    pass
179
180  @abstractmethod
181  def del_counter(self, key):
182    """delete the counter of a given key in the managed state"""
183    pass
184
185  @abstractmethod
186  def put_state(self, key, value):
187    """update the value of a given key in the managed state"""
188    pass
189
190  @abstractmethod
191  def get_state(self, key):
192    """get the value of a given key in the managed state"""
193    pass

Interface defining information available at process time

Context()
@abstractmethod
def get_message_id(self):
48  @abstractmethod
49  def get_message_id(self):
50    """Return the messageid of the current message that we are processing"""
51    pass

Return the messageid of the current message that we are processing

@abstractmethod
def get_message_key(self):
53  @abstractmethod
54  def get_message_key(self):
55    """Return the key of the current message that we are processing"""
56    pass

Return the key of the current message that we are processing

@abstractmethod
def get_message_eventtime(self):
58  @abstractmethod
59  def get_message_eventtime(self):
60    """Return the event time of the current message that we are processing"""
61    pass

Return the event time of the current message that we are processing

@abstractmethod
def get_message_properties(self):
63  @abstractmethod
64  def get_message_properties(self):
65    """Return the message properties kv map of the current message that we are processing"""
66    pass

Return the message properties kv map of the current message that we are processing

@abstractmethod
def get_current_message_topic_name(self):
68  @abstractmethod
69  def get_current_message_topic_name(self):
70    """Returns the topic name of the message that we are processing"""
71    pass

Returns the topic name of the message that we are processing

@abstractmethod
def get_function_tenant(self):
73  @abstractmethod
74  def get_function_tenant(self):
75    """Returns the tenant of the message that's being processed"""
76    pass

Returns the tenant of the message that's being processed

@abstractmethod
def get_function_namespace(self):
78  @abstractmethod
79  def get_function_namespace(self):
80    """Returns the namespace of the message that's being processed"""

Returns the namespace of the message that's being processed

@abstractmethod
def get_function_name(self):
82  @abstractmethod
83  def get_function_name(self):
84    """Returns the function name that we are a part of"""
85    pass

Returns the function name that we are a part of

@abstractmethod
def get_function_id(self):
87  @abstractmethod
88  def get_function_id(self):
89    """Returns the function id that we are a part of"""
90    pass

Returns the function id that we are a part of

@abstractmethod
def get_instance_id(self):
92  @abstractmethod
93  def get_instance_id(self):
94    """Returns the instance id that is executing the function"""
95    pass

Returns the instance id that is executing the function

@abstractmethod
def get_function_version(self):
 97  @abstractmethod
 98  def get_function_version(self):
 99    """Returns the version of function that we are executing"""
100    pass

Returns the version of function that we are executing

@abstractmethod
def get_logger(self):
102  @abstractmethod
103  def get_logger(self):
104    """Returns the logger object that can be used to do logging"""
105    pass

Returns the logger object that can be used to do logging

@abstractmethod
def get_user_config_value(self, key):
107  @abstractmethod
108  def get_user_config_value(self, key):
109    """Returns the value of the user-defined config. If the key doesn't exist, None is returned"""
110    pass

Returns the value of the user-defined config. If the key doesn't exist, None is returned

@abstractmethod
def get_user_config_map(self):
112  @abstractmethod
113  def get_user_config_map(self):
114    """Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)"""
115    pass

Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)

@abstractmethod
def get_secret(self, secret_name):
117  @abstractmethod
118  def get_secret(self, secret_name):
119    """Returns the secret value associated with the name. None if nothing was found"""
120    pass

Returns the secret value associated with the name. None if nothing was found

@abstractmethod
def get_partition_key(self):
122  @abstractmethod
123  def get_partition_key(self):
124    """Returns partition key of the input message is one exists"""
125    pass

Returns partition key of the input message is one exists

@abstractmethod
def record_metric(self, metric_name, metric_value):
128  @abstractmethod
129  def record_metric(self, metric_name, metric_value):
130    """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
131    pass

Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)

@abstractmethod
def publish( self, topic_name, message, serde_class_name='serde.IdentitySerDe', properties=None, compression_type=None, callback=None, message_conf=None):
133  @abstractmethod
134  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None):
135    """Publishes message to topic_name by first serializing the message using serde_class_name serde
136    The message will have properties specified if any
137
138    The available options for message_conf:
139
140      properties,
141      partition_key,
142      sequence_id,
143      replication_clusters,
144      disable_replication,
145      event_timestamp
146
147    """
148    pass

Publishes message to topic_name by first serializing the message using serde_class_name serde The message will have properties specified if any

The available options for message_conf:

properties, partition_key, sequence_id, replication_clusters, disable_replication, event_timestamp

@abstractmethod
def get_input_topics(self):
150  @abstractmethod
151  def get_input_topics(self):
152    """Returns the input topics of function"""
153    pass

Returns the input topics of function

@abstractmethod
def get_output_topic(self):
155  @abstractmethod
156  def get_output_topic(self):
157    """Returns the output topic of function"""
158    pass

Returns the output topic of function

@abstractmethod
def get_output_serde_class_name(self):
160  @abstractmethod
161  def get_output_serde_class_name(self):
162    """return output Serde class"""
163    pass

return output Serde class

@abstractmethod
def ack(self, msgid, topic):
165  @abstractmethod
166  def ack(self, msgid, topic):
167    """ack this message id"""
168    pass

ack this message id

@abstractmethod
def incr_counter(self, key, amount):
170  @abstractmethod
171  def incr_counter(self, key, amount):
172    """incr the counter of a given key in the managed state"""
173    pass

incr the counter of a given key in the managed state

@abstractmethod
def get_counter(self, key):
175  @abstractmethod
176  def get_counter(self, key):
177    """get the counter of a given key in the managed state"""
178    pass

get the counter of a given key in the managed state

@abstractmethod
def del_counter(self, key):
180  @abstractmethod
181  def del_counter(self, key):
182    """delete the counter of a given key in the managed state"""
183    pass

delete the counter of a given key in the managed state

@abstractmethod
def put_state(self, key, value):
185  @abstractmethod
186  def put_state(self, key, value):
187    """update the value of a given key in the managed state"""
188    pass

update the value of a given key in the managed state

@abstractmethod
def get_state(self, key):
190  @abstractmethod
191  def get_state(self, key):
192    """get the value of a given key in the managed state"""
193    pass

get the value of a given key in the managed state