event stream consumer
EventStreamConsumer (EventStreamBase)
A base consumer class for consuming from Kafka; uses multiprocessing to share the workload.
consume(self)
Consume messages and add them to a queue shared with the worker processes.
Source code in event_stream/event_stream_consumer.py
def consume(self):
    """Consume messages and add them to a queue shared with the worker processes.

    Creates the Kafka consumer if none exists yet and optionally starts the
    throughput-statistics timer. It then starts a pool of
    ``self.process_number`` processes, each initialized with ``self.worker``
    on ``self.task_queue``. Every consumed message is decoded as UTF-8 JSON
    and put on ``self.task_queue``.

    Consumption continues until ``self.running`` becomes ``False`` (see
    ``stop``). If iterating the consumer raises, the consumer is closed and a
    new one is created on the next pass of the loop.

    The loop replaces the earlier recursive ``self.consume()`` keep-alive.
    That version started an extra worker pool on every restart, grew the call
    stack, and kept iterating the already-closed consumer because
    ``self.consumer`` was still set.
    """
    logging.warning(self.log + "start consume")
    self.running = True
    if not self.consumer:
        self.create_consumer()
    if self.throughput_statistics_running and not self.counter:
        self.counter = Value('i', 0)
        counter_time = 10  # seconds between throughput log lines
        stats_timer = threading.Timer(counter_time, throughput_statistics, args=[self.counter, counter_time])
        stats_timer.daemon = True
        stats_timer.start()
    # Created exactly once per call: self.worker is the initializer of every
    # pool process and reads from self.task_queue.
    pool = Pool(self.process_number, self.worker, (self.task_queue,))
    try:
        while self.running:
            # keep alive: re-create the consumer if an earlier error closed it
            if not self.consumer:
                self.create_consumer()
            try:
                for msg in self.consumer:
                    logging.debug(self.log + 'msg in consumer ')
                    if self.counter:
                        with self.counter.get_lock():
                            self.counter.value += 1
                    self.task_queue.put(json.loads(msg.value.decode('utf-8')))
            except Exception as exc:
                self.consumer.close()
                # Drop the closed consumer so the next pass creates a fresh one.
                self.consumer = None
                logging.error(self.log + 'stream Consumer generated an exception: %s' % exc)
                logging.warning(self.log + "Consumer closed")
    finally:
        pool.close()
    logging.warning(self.log + "Consumer shutdown")
create_consumer(self)
Create the consumer and connect to Kafka.
Source code in event_stream/event_stream_consumer.py
def create_consumer(self):
    """Create the Kafka consumer and subscribe it to every configured topic.

    Resolves ``self.topics`` in one of two ways:

    - If ``self.state`` is ``'all'``, it comes from ``build_topic_list``.
    - Otherwise, if it is still empty, it holds ``get_topic_name`` for every
      combination of ``self.state`` and ``self.relation_type``.

    Both attributes are normalized to lists first when given as strings.
    The method then creates ``self.consumer`` and subscribes it to all
    topics in a single call.

    kafka-python's ``KafkaConsumer.subscribe`` is not incremental: each call
    replaces the previous subscription. Subscribing topic by topic would
    therefore leave the consumer listening only to the last topic.
    """
    logging.debug(self.log + "rt: %s" % self.relation_type)
    if self.state == 'all':
        self.topics = self.build_topic_list()
    if isinstance(self.state, six.string_types):
        self.state = [self.state]
    if isinstance(self.relation_type, six.string_types):
        self.relation_type = [self.relation_type]
    if not self.topics:
        self.topics = [
            self.get_topic_name(state=state, relation_type=relation_type)
            for state in self.state
            for relation_type in self.relation_type
        ]
    logging.debug(self.log + "get consumer for topic: %s" % self.topics)
    self.consumer = KafkaConsumer(group_id=self.group_id,
                                  bootstrap_servers=self.bootstrap_servers, api_version=self.api_version,
                                  consumer_timeout_ms=self.consumer_timeout_ms)
    for topic in self.topics:
        logging.debug(self.log + "consumer subscribe: %s" % topic)
    if self.topics:
        # One call with the full list; repeated calls would overwrite each other.
        self.consumer.subscribe(topics=list(self.topics))
    # subscription() reports this consumer's topics (topics() would list the whole cluster).
    logging.debug(self.log + "consumer subscribed to: %s" % self.consumer.subscription())
on_message(self, json_msg)
The message handler, to be implemented in subclasses.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
json_msg |
the message to process |
required |
Source code in event_stream/event_stream_consumer.py
def on_message(self, json_msg):
    """Handle a single item taken from the task queue; meant to be overridden.

    The base implementation ignores ``json_msg`` and only writes a debug
    log entry.

    Arguments:
        json_msg: the message to process
    """
    prefix = self.log
    logging.debug(prefix + "on message")
start(i=0)
staticmethod
start the consumer
Source code in event_stream/event_stream_consumer.py
@staticmethod
def start(i=0):
    """Build an ``EventStreamConsumer`` for index ``i`` and run its consume loop.
    """
    consumer = EventStreamConsumer(i)
    logging.debug(EventStreamBase.log + 'Start ' + str(i))
    consumer.consume()
stop(self)
stop the consumer
Source code in event_stream/event_stream_consumer.py
def stop(self):
    """Signal the consume loop to finish by clearing ``self.running``.

    NOTE(review): the pool workers run in separate processes, so this plain
    attribute change is likely not visible to them — confirm.
    """
    message = 'stop running consumer'
    self.running = False
    logging.debug(self.log + message)
worker(self, queue)
Worker function that takes items from the queue and processes them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queue |
the queue |
required |
Source code in event_stream/event_stream_consumer.py
def worker(self, queue):
    """Take items from ``queue`` and pass each one to ``self.on_message``.

    Used as the initializer of every process in the pool started by
    ``consume``. It loops while ``self.running`` is true.

    The parameter name ``queue`` shadows the standard-library ``queue``
    module. The former ``except queue.Empty`` therefore looked ``Empty`` up
    on the queue object, which raises AttributeError whenever ``get``
    raises. It could never match anyway, because a blocking ``get()`` never
    raises ``Empty``.

    ``Empty`` is now imported explicitly, and ``get`` uses a short timeout so
    the loop re-checks ``self.running`` while the queue is idle. The former
    5 ms sleep before every ``get`` was removed: with a blocking ``get``
    there is no busy loop to throttle, and the sleep only capped throughput.

    Arguments:
        queue: the queue to read items from (``self.task_queue`` when started
            by ``consume``)
    """
    # Import by name: the ``queue`` parameter hides the module of the same name.
    from queue import Empty

    logging.debug(self.log + "working %s" % os.getpid())
    while self.running:
        try:
            item = queue.get(timeout=0.1)
        except Empty:
            continue
        logging.debug(self.log + "got %s item" % os.getpid())
        self.on_message(item)
throughput_statistics(v, time_delta, no_throughput_counter=0)
Repeatedly logs, in its own timer thread, how many events were processed in each time delta. Once the no-throughput counter reaches 10 (10 consecutive time deltas with no data processed), it kills all Python processes so that the container exits.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v |
the value |
required | |
time_delta |
time delta we want to monitor |
required | |
no_throughput_counter |
number of consecutive time deltas without throughput |
0 |
Source code in event_stream/event_stream_consumer.py
def throughput_statistics(v, time_delta, no_throughput_counter=0):
    """Log the events counted in the last interval, then re-arm itself on a timer.

    ``v`` is read and reset to 0 atomically under its lock. Previously it was
    read unlocked and reset later, so increments made in between were lost,
    and the logged value could differ from the one that was tested.

    After 10 consecutive intervals without any counted events, every Python
    process is killed so that the container exits. ``>=`` is used so that the
    kill is retried on later intervals if it did not take effect.

    Arguments:
        v: shared counter (a ``multiprocessing.Value`` created by ``consume``)
            incremented once per consumed message
        time_delta: length of the monitored interval in seconds; also the
            delay before the next check
        no_throughput_counter: consecutive intervals without throughput so far
    """
    with v.get_lock():
        processed = v.value
        v.value = 0
    logging.warning("THROUGHPUT: %d / %d" % (processed, time_delta))
    if processed == 0:
        no_throughput_counter += 1
    else:
        no_throughput_counter = 0
    if no_throughput_counter >= 10:
        logging.warning('Exit Container because of no data throughput')
        # NOTE(review): kills *every* process named like "python" visible to
        # this one (the whole container/host), not only this program's tree.
        os.system("pkill -9 python")  # allows killing of multiprocessing programs
    stats_timer = threading.Timer(time_delta, throughput_statistics, args=[v, time_delta, no_throughput_counter])
    stats_timer.daemon = True
    stats_timer.start()