Skip to content

event stream producer

EventStreamProducer (EventStreamBase)

Produces messages for Kafka.

create_producer(self)

create the producer

Source code in event_stream/event_stream_producer.py
def create_producer(self):
    """Create the Kafka producer and store it on ``self.producer``.

    The producer is configured from this instance's ``bootstrap_servers``
    and ``api_version`` attributes.
    """
    settings = {
        'bootstrap_servers': self.bootstrap_servers,
        'api_version': self.api_version,
    }
    self.producer = KafkaProducer(**settings)

publish(self, event)

publish an event

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| event | | the event which should be shared | required |
Source code in event_stream/event_stream_producer.py
def publish(self, event):
    """publish an event

    The event is serialised with ``event.get_json()``, UTF-8 encoded and sent
    to the topic returned by ``self.get_topic_name_event(event)``. The
    producer is created on first use and flushed after every send. The
    outcome is logged: a debug message on success, an error message when the
    send failed. No exception is raised for a failed delivery.

    Arguments:
        event: the event which should be shared
    """
    topic_event = self.get_topic_name_event(event)

    # Create the producer lazily so that constructing this object does not
    # require a broker connection.
    if not self.producer:
        self.create_producer()

    payload = event.get_json().encode('utf-8')
    future = self.producer.send(topic_event, value=payload)
    # flush() blocks until the pending send has completed, but a delivery
    # error is only reported through the future returned by send(), so the
    # future has to be inspected before success can be claimed.
    self.producer.flush()

    if future.failed():
        logging.error('%sMessage could not be published to topic %s: %s',
                      self.log, topic_event, future.exception)
        return

    logging.debug('%sMessage published successfully to topic %s', self.log, topic_event)