Skip to content

event stream base

EventStreamBase

a base class for connecting to kafka

build_topic_list(self)

build a list of topics from the configs

Source code in event_stream/event_stream_base.py
def build_topic_list(self):
    """build a list of topics from the configs
    """
    result = []

    for c_state in self.config_states:
        result.append(self.build_topic_name(c_state))
        if 'own_topic' in self.config_states[c_state]:
            for c_o_topic in self.config_states[c_state]['own_topic']:
                result.append(self.build_topic_name(c_state, c_o_topic))

    self.topics = result
    logging.debug("%s current topics for events: %s" % (self.log, self.topics))
    return result

build_topic_name(self, state, relation_type='')

build the name of the topic for a given state

Parameters:

Name Type Description Default
state

the state to get the topic for

required
relation_type

optional, in case it has it's own topic

''
Source code in event_stream/event_stream_base.py
def build_topic_name(self, state, relation_type=''):
    """build the name of the topic for a given state

    Arguments:
        state: the state to get the topic for
        relation_type: optional, in case it has it's own topic
    """
    result = self.event_string + self.state_separator + state

    if relation_type != '':
        result = result + self.relation_type_separator + relation_type
    return result

get_topic_name(self, state, relation_type='')

get the name of the topic for a given state

Parameters:

Name Type Description Default
state

the state to get the topic for

required
relation_type

optional, in case it has it's own topic

''
Source code in event_stream/event_stream_base.py
def get_topic_name(self, state, relation_type=''):
    """get the name of the topic for a given state

    Arguments:
        state: the state to get the topic for
        relation_type: optional, in case it has it's own topic
    """
    result = self.event_string + self.state_separator + state

    # if a relation type is set and has is own topic
    if relation_type != '' and 'own_topic' in self.config_states[state] and relation_type in \
            self.config_states[state]['own_topic']:
        result = result + self.relation_type_separator + relation_type
    return result

get_topic_name_event(self, event)

this will resolve an event to it's respected kafka topic

Parameters:

Name Type Description Default
key

the event to be resolved

required
Source code in event_stream/event_stream_base.py
def get_topic_name_event(self, event):
    """this will resolve an event to it's respected kafka topic

    Arguments:
        key: the event to be resolved
    """
    state = event.get('state')
    relation_type = event.get('relation_type')
    return self.get_topic_name(state, relation_type)

resolve_event(self, event)

this will resolve an event to it's respected kafka topic

Parameters:

Name Type Description Default
key

the event to be resolved

required
Source code in event_stream/event_stream_base.py
def resolve_event(self, event):
    """this will resolve an event to it's respected kafka topic

    Arguments:
        key: the event to be resolved
    """
    topic_name = self.build_topic_name(event['state'], event['relation_type'])
    if topic_name in self.topics:
        return topic_name

    logging.warning(self.log + "Unable to resolve event, topic_name %s not found" % topic_name)
    return False

setup_logging() staticmethod

logging config to be used

Source code in event_stream/event_stream_base.py
@staticmethod
def setup_logging():
    """logging config to be used"""
    logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.WARNING, datefmt="%H:%M:%S")