event stream consumer

EventStreamConsumer (EventStreamBase)

a base consumer class for consuming from Kafka; uses multiprocessing to spread the workload across worker processes
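
A minimal usage sketch (the subclass and its handler body are hypothetical, assuming the constructor takes a worker index i as in start(i=0) below): override on_message and call consume().

from event_stream.event_stream_consumer import EventStreamConsumer

class MyConsumer(EventStreamConsumer):
    def on_message(self, json_msg):
        # handle one message; json_msg has already been parsed from JSON
        print("got event:", json_msg)

if __name__ == "__main__":
    consumer = MyConsumer(0)  # hypothetical: worker index, mirroring start(i=0)
    consumer.consume()        # blocks; creates the worker pool internally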

consume(self)

consume messages and add them to a queue shared with the worker processes

Source code in event_stream/event_stream_consumer.py
def consume(self):
    """consume messages and add them to a queue shared with the worker processes"""
    logging.warning(self.log + "start consume")
    self.running = True

    if not self.consumer:
        self.create_consumer()

    # start a repeating timer thread that logs throughput every 10 seconds
    if self.throughput_statistics_running and not self.counter:
        self.counter = Value('i', 0)
        counter_time = 10
        statistics_thread = threading.Timer(counter_time, throughput_statistics, args=[self.counter, counter_time])
        statistics_thread.daemon = True
        statistics_thread.start()

    # each pool process runs self.worker as a long-lived loop fed by task_queue
    pool = Pool(self.process_number, self.worker, (self.task_queue,))

    while self.running:
        try:
            for msg in self.consumer:
                logging.debug(self.log + 'msg in consumer ')
                if self.counter:
                    with self.counter.get_lock():
                        self.counter.value += 1
                # decode the Kafka payload and hand it to the worker processes
                self.task_queue.put(json.loads(msg.value.decode('utf-8')))

        except Exception as exc:
            self.consumer.close()
            logging.error(self.log + 'stream Consumer generated an exception: %s' % exc)
            logging.warning(self.log + "Consumer closed")
            break

    # keep alive: reconnect and resume consuming after an exception
    if self.running:
        return self.consume()

    pool.close()
    logging.warning(self.log + "Consumer shutdown")
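
For context, a standalone sketch of the fan-out pattern consume() uses: a multiprocessing Pool whose initializer is a long-running worker loop fed from a shared queue. Names and sizes are illustrative, not part of this module.

import time
from multiprocessing import Pool, Queue

def worker(task_queue):
    while True:
        item = task_queue.get()  # blocks until a message arrives
        print("processing", item)

if __name__ == "__main__":
    task_queue = Queue()
    pool = Pool(4, worker, (task_queue,))  # 4 processes, each running worker()
    for i in range(10):
        task_queue.put({"event": i})
    time.sleep(1)    # give the workers time to drain the queue
    pool.terminate()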

create_consumer(self)

create the consumer and connect to Kafka

Source code in event_stream/event_stream_consumer.py
def create_consumer(self):
    """create the consumer and connect to Kafka"""
    logging.debug(self.log + "rt: %s" % self.relation_type)

    if self.state == 'all':
        self.topics = self.build_topic_list()

    # normalize single strings to lists so the loops below work uniformly
    if isinstance(self.state, six.string_types):
        self.state = [self.state]

    if isinstance(self.relation_type, six.string_types):
        self.relation_type = [self.relation_type]

    # build one topic per (state, relation_type) combination
    if not self.topics:
        self.topics = list()
        for state in self.state:
            for relation_type in self.relation_type:
                self.topics.append(self.get_topic_name(state=state, relation_type=relation_type))

    logging.debug(self.log + "get consumer for topic: %s" % self.topics)
    self.consumer = KafkaConsumer(group_id=self.group_id,
                                  bootstrap_servers=self.bootstrap_servers, api_version=self.api_version,
                                  consumer_timeout_ms=self.consumer_timeout_ms)

    # subscribe once with the full list: KafkaConsumer.subscribe() replaces the
    # current subscription, so calling it per topic would keep only the last one
    logging.debug(self.log + "consumer subscribe: %s" % self.topics)
    self.consumer.subscribe(self.topics)

    logging.debug(self.log + "consumer subscribed to: %s" % self.consumer.subscription())
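
For reference, a minimal kafka-python sketch of the connect-and-subscribe step; broker address, group id and topic name are placeholders.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    group_id="my-group",
    bootstrap_servers=["localhost:9092"],
    consumer_timeout_ms=10000,  # stop iterating after 10 s without messages
)
consumer.subscribe(["some-topic"])  # one call with the full topic list
for msg in consumer:
    print(msg.topic, msg.value)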

on_message(self, json_msg)

the on-message handler, to be overridden in subclasses

Parameters:

    json_msg (required): the message to process
Source code in event_stream/event_stream_consumer.py
def on_message(self, json_msg):
    """the on-message handler, to be overridden in subclasses

    Arguments:
        json_msg: the message to process
    """
    logging.debug(self.log + "on message")

start(i=0) staticmethod

start the consumer

Source code in event_stream/event_stream_consumer.py
@staticmethod
def start(i=0):
    """start the consumer
    """
    esc = EventStreamConsumer(i)
    logging.debug(EventStreamBase.log + 'Start %s' % str(i))
    esc.consume()
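
A hypothetical sketch of running several consumers side by side, one start(i) call per process:

from multiprocessing import Process
from event_stream.event_stream_consumer import EventStreamConsumer

if __name__ == "__main__":
    processes = [Process(target=EventStreamConsumer.start, args=(i,)) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()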

stop(self)

stop the consumer

Source code in event_stream/event_stream_consumer.py
def stop(self):
    """stop the consumer
    """
    self.running = False
    logging.debug(self.log + 'stop running consumer')

worker(self, queue)

worker function to get items from the queue

Parameters:

    queue (required): the shared task queue to read messages from
Source code in event_stream/event_stream_consumer.py
def worker(self, queue):
    """worker function to get items from the queue

    Arguments:
        queue: the shared task queue to read messages from
    """
    logging.debug(self.log + "working %s" % os.getpid())
    while self.running:
        try:
            # use a timeout so the loop can re-check self.running
            item = queue.get(timeout=0.1)
        except Empty:  # from the standard `queue` module (the parameter shadows it)
            continue
        else:
            logging.debug(self.log + "got %s item" % os.getpid())
            self.on_message(item)

throughput_statistics(v, time_delta, no_throughput_counter=0)

show repeatedly, in its own timer thread, how many events were processed in the last interval; kills the process (so the container restarts) after 10 consecutive intervals with no data processed

Parameters:

    v (required): the shared counter value
    time_delta (required): the interval, in seconds, we want to monitor
    no_throughput_counter (default 0): count of consecutive intervals with no throughput
Source code in event_stream/event_stream_consumer.py
def throughput_statistics(v, time_delta, no_throughput_counter=0):
    """show repeatedly, in its own timer thread, how many events are processed;
        restarts the container if the no-throughput counter reaches 10
        (10 time deltas with no data processed)
    Arguments:
        v: the shared counter value
        time_delta: the interval, in seconds, we want to monitor
        no_throughput_counter: count of consecutive intervals with no throughput
    """
    logging.warning("THROUGHPUT: %d / %d" % (v.value, time_delta))

    # track consecutive idle intervals; any activity resets the count
    if v.value == 0:
        no_throughput_counter += 1
    else:
        no_throughput_counter = 0
    if no_throughput_counter == 10:
        logging.warning('Exit Container because of no data throughput')
        os.system("pkill -9 python")  # allows killing of multiprocessing programs

    # reset the counter for the next interval
    with v.get_lock():
        v.value = 0

    # reschedule this function so it runs once per interval
    statistics_thread = threading.Timer(time_delta, throughput_statistics, args=[v, time_delta, no_throughput_counter])
    statistics_thread.daemon = True
    statistics_thread.start()
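
Standing alone, the counter-plus-timer pattern this function relies on looks like the sketch below; the interval and names are illustrative.

import threading
from multiprocessing import Value

counter = Value('i', 0)

def report(v, interval):
    print("events in last %ds: %d" % (interval, v.value))
    with v.get_lock():  # reset for the next interval
        v.value = 0
    t = threading.Timer(interval, report, args=[v, interval])
    t.daemon = True
    t.start()

report(counter, 10)  # the first call schedules all subsequent ones
# producers elsewhere increment the counter:
#   with counter.get_lock():
#       counter.value += 1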