event stream producer
EventStreamProducer (EventStreamBase)
produce messages for kafka
create_producer(self)
create the producer
Source code in event_stream/event_stream_producer.py
def create_producer(self):
"""create the producer
"""
self.producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers, api_version=self.api_version)
publish(self, event)
publish an event
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
the event which should be shared |
required |
Source code in event_stream/event_stream_producer.py
def publish(self, event):
"""publish an event
Arguments:
event: the event which should be shared
"""
topic_event = self.get_topic_name_event(event)
if not self.producer:
self.create_producer()
value = event.get_json()
self.producer.send(topic_event, value=value.encode('utf-8'))
self.producer.flush()
logging.debug(self.log + 'Message published successfully to topic %s' % topic_event)