event stream base
EventStreamBase
a base class for connecting to kafka
build_topic_list(self)
build a list of topics from the configs
Source code in event_stream/event_stream_base.py
def build_topic_list(self):
    """Build the list of topic names from the configured states.

    Every key of ``self.config_states`` contributes its base topic. When the
    config of a state has an ``'own_topic'`` entry, one additional topic is
    added for each relation type listed there. The result is also stored
    in ``self.topics``.

    Returns:
        list: the topic names, in the iteration order of the config.
    """
    topics = []
    for state in self.config_states:
        topics.append(self.build_topic_name(state))
        state_config = self.config_states[state]
        if 'own_topic' in state_config:
            topics.extend(
                self.build_topic_name(state, relation_type)
                for relation_type in state_config['own_topic']
            )
    self.topics = topics
    # Hand the values to logging as arguments so the message is only
    # formatted when DEBUG output is actually enabled.
    logging.debug("%s current topics for events: %s", self.log, self.topics)
    return topics
build_topic_name(self, state, relation_type='')
build the name of the topic for a given state
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state |
the state to get the topic for |
required | |
relation_type |
optional, in case it has its own topic |
'' |
Source code in event_stream/event_stream_base.py
def build_topic_name(self, state, relation_type=''):
    """Build the name of the topic for a given state.

    Arguments:
        state: the state to get the topic for
        relation_type: optional, in case it has its own topic; an empty
            string or None means no relation type suffix is appended

    Returns:
        str: ``event_string + state_separator + state``, followed by
        ``relation_type_separator + relation_type`` when a relation type
        is given.
    """
    topic_name = self.event_string + self.state_separator + state
    # Truthiness check: None is treated like the '' default instead of
    # raising a TypeError on string concatenation.
    if relation_type:
        topic_name += self.relation_type_separator + relation_type
    return topic_name
get_topic_name(self, state, relation_type='')
get the name of the topic for a given state
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state |
the state to get the topic for |
required | |
relation_type |
optional, in case it has its own topic |
'' |
Source code in event_stream/event_stream_base.py
def get_topic_name(self, state, relation_type=''):
    """Get the name of the topic for a given state.

    Unlike a plain name build, the relation type suffix is only appended
    when the config of the state lists that relation type under
    ``'own_topic'``; otherwise the base topic of the state is returned.

    Arguments:
        state: the state to get the topic for
        relation_type: optional, in case it has its own topic; an empty
            string or None always yields the base topic of the state

    Returns:
        str: the topic name.
    """
    topic_name = self.event_string + self.state_separator + state
    # No relation type given (or None from a missing event field): the base
    # topic is the answer and the state config does not need to be consulted.
    if not relation_type:
        return topic_name
    state_config = self.config_states[state]
    # Only relation types explicitly configured with their own topic get one.
    if 'own_topic' in state_config and relation_type in state_config['own_topic']:
        topic_name += self.relation_type_separator + relation_type
    return topic_name
get_topic_name_event(self, event)
this will resolve an event to its respective Kafka topic
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
the event to be resolved |
required |
Source code in event_stream/event_stream_base.py
def get_topic_name_event(self, event):
    """Resolve an event to its respective Kafka topic name.

    Reads the ``'state'`` and ``'relation_type'`` entries of the event and
    delegates to ``get_topic_name``.

    Arguments:
        event: the event to be resolved

    Returns:
        whatever ``get_topic_name`` returns for the state and relation type
        of the event.
    """
    state = event.get('state')
    # A missing (or null) relation type is passed on as '' so that
    # get_topic_name receives its documented default rather than None.
    relation_type = event.get('relation_type') or ''
    return self.get_topic_name(state, relation_type)
resolve_event(self, event)
this will resolve an event to its respective Kafka topic, returning False if that topic is not in the known topic list
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
the event to be resolved |
required |
Source code in event_stream/event_stream_base.py
def resolve_event(self, event):
    """Resolve an event to its respective Kafka topic.

    The topic name is built from the ``'state'`` and ``'relation_type'``
    entries of the event and checked against ``self.topics``.

    Arguments:
        event: the event to be resolved; must contain ``'state'``,
            ``'relation_type'`` is optional

    Returns:
        the topic name if it is contained in ``self.topics``, otherwise
        ``False`` (a warning is logged in that case).

    Raises:
        KeyError: if the event has no ``'state'`` entry.
    """
    # The relation type is optional: a missing or null value falls back to
    # the '' default of build_topic_name instead of raising.
    relation_type = event.get('relation_type') or ''
    topic_name = self.build_topic_name(event['state'], relation_type)
    if topic_name in self.topics:
        return topic_name
    # Lazy logging arguments; the rendered message is unchanged.
    logging.warning("%sUnable to resolve event, topic_name %s not found", self.log, topic_name)
    return False
setup_logging()
staticmethod
logging config to be used
Source code in event_stream/event_stream_base.py
@staticmethod
def setup_logging():
    """Apply the logging configuration: WARNING level, timestamped messages."""
    logging.basicConfig(
        level=logging.WARNING,
        format="%(asctime)s: %(message)s",
        datefmt="%H:%M:%S",
    )