Skip to content

twitter connector

TwitterConnector (EventStreamProducer)

run(self)

run function ensuring the connector is either running correctly or restarting / reconnecting as needed

Parameters:

Name Type Description Default
- time_delta

how often to run this

required
Source code in src/twitter_connector.py
def run(self):
    """run function ensuring the connector is either running correctly or restarting / reconnecting as needed

    Arguments:
        - time_delta: how often to run this
    """
    while self.running:
        try:
            self.send_data()
        except (requests.Timeout, ValueError,
                requests.exceptions.ChunkedEncodingError, urllib3.exceptions.InvalidChunkLength,
                urllib3.exceptions.ReadTimeoutError, urllib3.exceptions.ProtocolError):
            logging.exception(self.log)
        except requests.ConnectionError:
            logging.exception(self.log)
            time.sleep(5)  # sleep a little to avoid issues with max connections
        finally:
            logging.warning('stream restart in 10')
            time.sleep(10)

send_data(self)

open the stream to twitter and send the data as events to kafka

Source code in src/twitter_connector.py
def send_data(self):
    """open the stream to twitter and send the data as events to kafka
    """
    time_delta = 30
    self.counter = 0

    api_limit_thread = threading.Timer(time_delta, self.throughput_statistics, args=[time_delta, 0])
    api_limit_thread.daemon = True
    api_limit_thread.start()

    bearer_token = os.getenv('TWITTER_BEARER_TOKEN')
    headers = create_headers(bearer_token)
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream",
        params={self.tweet_expansion_key: ','.join(self.tweet_expansion_value),
                self.tweet_fields_key: ','.join(self.tweet_fields_value),
                self.user_expansion_key: ','.join(self.user_expansion_value)},
        headers=headers,
        stream=True,
    )

    logging.debug('twitter response data')
    logging.debug({self.tweet_expansion_key: ','.join(self.tweet_expansion_value),
                   self.tweet_fields_key: ','.join(self.tweet_fields_value),
                   self.user_expansion_key: ','.join(self.user_expansion_value)})
    logging.debug(response.status_code)
    logging.debug(response)

    if response.status_code != 200:
        raise ConnectionError(
            "Cannot get stream (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )

    i = 0
    for line in response.iter_lines():
        if line:
            i += 1
            logging.debug(self.log + "got new message " + str(i))
            twitter_json = json.loads(line.decode('utf-8'))

            if 'data' in twitter_json and 'includes' in twitter_json and 'matching_rules' in twitter_json \
                    and 'id' in twitter_json['data'] and 'created_at' in twitter_json['data'] \
                    and 'author_id' in twitter_json['data']:
                e = Event()

                self.counter += 1

                e.set('id', str(uuid.uuid4()))

                e.set('subj_id', twitter_json['data']['id'])
                e.set('relation_type', 'discusses')
                e.set('occurred_at', twitter_json['data']['created_at'])
                e.set('source_id', 'twitter')
                e.set('state', 'unlinked')

                basePid = "twitter://status?id="
                baseAuthorUrl = "twitter://user?screen_name="
                e.data['subj']['pid'] = basePid + twitter_json['data']['id']
                e.data['subj']['url'] = basePid + twitter_json['data']['id']
                e.data['subj']['title'] = "Tweet " + twitter_json['data']['id']
                e.data['subj']['issued'] = twitter_json['data']['created_at']
                e.data['subj']['author'] = {
                    "url": baseAuthorUrl + get_author_name(
                        twitter_json['data']['author_id'],
                        twitter_json['includes']['users'])
                }

                if 'users' in twitter_json['includes'] and 'referenced_tweets' in twitter_json['data'] \
                        and 'id' in twitter_json['data']['referenced_tweets'][0]:
                    e.data['subj']['original-tweet-url'] = basePid + twitter_json['data']['referenced_tweets'][0][
                        'id']
                    e.data['subj']['original-tweet-author'] = baseAuthorUrl + get_author_name(
                        twitter_json['data']['author_id'], twitter_json['includes']['users'], True)
                e.data['subj']['alternative-id'] = twitter_json['data']['id']

                e.data['subj']['data'] = twitter_json['data']
                e.data['subj']['data']['includes'] = twitter_json['includes']
                e.data['subj']['data']['matching_rules'] = twitter_json['matching_rules']

                self.publish(e)
            else:
                logging.warning(twitter_json)
        else:
            logging.debug('keep alive')

start(i=0) staticmethod

start the consumer, wait 5 sec in case its a restart

  • i: id to use
Source code in src/twitter_connector.py
@staticmethod
def start(i=0):
    """start the consumer, wait 5 sec in case its a restart

    Arguments:
    - i: id to use
    """
    time.sleep(10)
    tc = TwitterConnector(i)
    logging.debug(TwitterConnector.log + 'Start %s' % str(i))
    tc.running = True
    tc.run()

throughput_statistics(self, time_delta, i)

statistic tools

Parameters:

Name Type Description Default
- time_delta

how often to run this

required
Source code in src/twitter_connector.py
def throughput_statistics(self, time_delta, i):
    """statistic tools

    Arguments:
        - time_delta: how often to run this
    """
    logging.warning("THROUGHPUT: %d / %d" % (self.counter, time_delta))
    if self.counter == 0:
        i += 1
    if i == 5:
        sys.exit() # end so it will restart clean

    self.counter = 0

    api_limit_thread = threading.Timer(time_delta, self.throughput_statistics, args=[time_delta, i])
    api_limit_thread.daemon = True
    api_limit_thread.start()

create_headers(bearer_token)

create the header needed for bearer authorization

Parameters:

Name Type Description Default
- time_delta

how often to run this

required
Source code in src/twitter_connector.py
def create_headers(bearer_token):
    """create the header needed for bearer authorization

    Arguments:
        - time_delta: how often to run this
    """
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers

get_author_name(author_id, users, original=False)

get the name of the author from the includes

Parameters:

Name Type Description Default
- author_id

the author_id

required
- users

the users part of the includes

required
- original required
Source code in src/twitter_connector.py
def get_author_name(author_id, users, original=False):
    """get the name of the author from the includes

    Arguments:
        - author_id: the author_id
        - users: the users part of the includes
        - original:
    """
    for user in users:
        if ('id' in user and 'username' in user) and (
                (user['id'] == author_id and not original) or user['id'] != author_id and original):
            return user['username']
    return ""