twitter worker
TwitterWorker (EventStreamConsumer, EventStreamProducer)
process tweets
get_author_data(tweet_data)
staticmethod
get the author data of a tweet
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tweet_data | | the tweet data we want an author from | required |
Source code in src/twitter_worker.py
@staticmethod
def get_author_data(tweet_data):
    """get the author data of a tweet

    Looks up the user whose 'id' equals the tweet's 'author_id' in
    ``tweet_data['includes']['users']``.

    Arguments:
        tweet_data: the tweet data we want an author from

    Returns:
        the matching user dict, or None when the tweet has no 'author_id',
        no 'includes'/'users' data, or no user with a matching id
    """
    author_id = tweet_data.get('author_id')
    if author_id is None:
        return None
    # a tweet without 'includes'/'users' is treated as "author unknown"
    # instead of raising KeyError; callers already handle a None result
    users = (tweet_data.get('includes') or {}).get('users') or []
    return next((user for user in users if user.get('id') == author_id), None)
normalize_abstract_value(value)
staticmethod
normalize the calculated value from an abstract comparison
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | | the value to be normalized | required |
Source code in src/twitter_worker.py
@staticmethod
def normalize_abstract_value(value):
    """normalize the calculated value from an abstract comparison

    Maps the raw similarity onto a score. The mapping is not monotonic:
    the 0.5-0.8 band scores highest (10). Values above 0.9 score 3,
    presumably because near-identical text adds little (confirm intent).

    Arguments:
        value: the value to be normalized

    Returns:
        3, 5, 10, 3 or 1, checked from the highest threshold downwards
    """
    # (exclusive lower bound, score), evaluated top-down; first hit wins
    bands = ((0.9, 3), (0.8, 5), (0.5, 10), (0.2, 3))
    for lower_bound, score in bands:
        if value > lower_bound:
            return score
    return 1
normalize_sentiment_value(value)
staticmethod
normalize the calculated value from a sentiment comparison: better sentiment -> better value
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | | the value to be normalized | required |
Source code in src/twitter_worker.py
@staticmethod
def normalize_sentiment_value(value):
    """normalize the calculated value from a sentiment comparison

    Better sentiment gives a better value. Polarity above 0.1 maps to 7/9/10,
    below -0.1 maps to 2/1/0, and anything in between is neutral (5).

    Arguments:
        value: the value to be normalized
    """
    # positive bands use strict '>' and negative bands strict '<',
    # each checked from the most extreme threshold inwards
    positive_bands = ((0.6, 10), (0.33, 9), (0.1, 7))
    negative_bands = ((-0.6, 0), (-0.33, 1), (-0.1, 2))
    for threshold, score in positive_bands:
        if value > threshold:
            return score
    for threshold, score in negative_bands:
        if value < threshold:
            return score
    return 5
on_message(self, json_msg)
process a tweet
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| json_msg | | the json_msg containing the event to be processed | required |
Source code in src/twitter_worker.py
def on_message(self, json_msg):
    """process a tweet

    Parses the event and derives features from the tweet:
    - punctuation counts, length and days since publication
    - hashtags and annotations
    - tweet type and author data
    - abstract similarity and sentiment

    It combines these into time, user, content and type scores and a
    weighted total stored under ``e.data['subj']['processed']``. It then
    saves the event through the DAO and publishes it. Events whose
    source_id is not 'twitter' are only logged.

    Arguments:
        json_msg: the json_msg containing the event to be processed
    """
    if not self.dao:
        # the DAO is created lazily on the first message
        self.dao = DAO()
        logging.warning(self.log + " create dao")
    logging.info(self.log + "on message twitter consumer")
    e = Event()
    e.from_json(json_msg)
    if e.get('source_id') != 'twitter':
        logging.warning('non twitter event')
        return

    subj = e.data['subj']
    tweet = subj['data']
    obj_data = e.data['obj']['data']
    processed = {}
    subj['processed'] = processed

    raw_text = tweet['text']
    processed['question_mark_count'] = raw_text.count("?")
    processed['exclamation_mark_count'] = raw_text.count("!")
    processed['length'] = len(raw_text)

    # days since publication: default 2012-01-01, refined by 'year' and then
    # by a '-'-separated pub_date whose first three parts are year, month, day
    pub_timestamp = date(2012, 1, 1)
    if 'year' in obj_data:
        pub_timestamp = date(int(obj_data['year']), 1, 1)
    if obj_data.get('pub_date'):
        split_date = obj_data['pub_date'].split('-')
        if len(split_date) > 2:
            pub_timestamp = date(int(split_date[0]), int(split_date[1]), int(split_date[2]))
    processed['time_past'] = (date.today() - pub_timestamp).days

    entities = tweet.get('entities') or {}
    processed['hashtags'] = [normalize(tag['tag']) for tag in entities.get('hashtags') or []]
    annotation_tags = entities.get('annotations') or []
    processed['annotations'] = [tag['normalized_text'] for tag in annotation_tags]
    processed['a_types'] = [tag['type'] for tag in annotation_tags]
    # context annotations are not extracted here; always stored as empty lists
    processed['context_domain'] = []
    processed['context_entity'] = []

    # typeOfTweet (quote, retweet, tweet)
    referenced = tweet.get('referenced_tweets')
    if referenced and 'type' in referenced[0]:
        processed['tweet_type'] = referenced[0]['type']
    else:
        processed['tweet_type'] = 'tweet'
        conversation_id = tweet.get('conversation_id')
        if conversation_id is not None and conversation_id == subj.get('pid'):
            logging.warning('conversation id matches id -> tweet')

    # author processing; the defaults below apply when no author is found
    author_data = TwitterWorker.get_author_data(tweet)
    processed['location'] = 'unknown'
    processed['followers'] = 0
    processed['bot_rating'] = 1
    # same value as a non-verified author, so the user score below can
    # always read 'verified' even without author data
    processed['verified'] = 5
    if author_data:
        if 'location' in author_data:
            temp_location = geoencode(author_data['location'])
            if temp_location:
                processed['location'] = temp_location
        processed['followers'] = author_data['public_metrics']['followers_count']
        processed['verified'] = 10 if author_data.get('verified') else 5
        processed['name'] = author_data['username']
        # a username or client 'source' mentioning 'bot' keeps the low rating
        source = tweet.get('source') or ''
        if 'bot' not in author_data['username'].lower() and 'bot' not in source:
            processed['bot_rating'] = 10

    content_score = 1
    text = raw_text.strip().lower()
    if text and 'abstract' in obj_data and 'lang' in tweet:
        spacy_result = self.spacy_process(text, obj_data['abstract'], tweet['lang'])
        processed['words'] = spacy_result['common_words']
        processed['contains_abstract_raw'] = spacy_result['abstract']
        processed['contains_abstract'] = self.normalize_abstract_value(spacy_result['abstract'])
        processed['sentiment_raw'] = spacy_result['sentiment']
        processed['sentiment'] = self.normalize_sentiment_value(spacy_result['sentiment'])
        content_score = processed['contains_abstract'] + processed['sentiment']
    content_score += score_length(processed['length'])

    user_score = processed['bot_rating']
    followers = processed['followers']
    # log2 is only defined for positive counts
    if isinstance(followers, (int, float)) and followers > 0:
        user_score += math.log(followers, 2)
    user_score += processed['verified']

    type_factor = score_type(processed['tweet_type'])
    time_score = score_time(processed['time_past'])
    logging.info('score %s - %s - %s - %s', type_factor, time_score, user_score, content_score)
    processed['time_score'] = time_score
    processed['type_factor'] = type_factor
    processed['user_score'] = user_score
    processed['content_score'] = content_score
    weights = {'time': 3, 'user': 6, 'content': 5}
    # weighted sum of the partial scores, scaled by the tweet-type factor
    processed['score'] = (weights['time'] * time_score
                          + weights['user'] * user_score
                          + weights['content'] * content_score) * type_factor

    e.set('state', 'processed')
    self.dao.save_discussion_data(e.data)
    logging.debug('publish %s', obj_data.get('doi'))
    self.publish(e)
process_text_for_similarity(nlp, text)
staticmethod
process a given text for language processing: remove stop words, punctuation and certain filler words, keep only nouns, proper nouns and verbs, and drop URLs while keeping @-mentions
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| nlp | | the language processing pipeline | required |
| text | | the text | required |
Source code in src/twitter_worker.py
@staticmethod
def process_text_for_similarity(nlp, text):
    """process a given text for language processing

    Runs ``nlp`` over the text and keeps the lower-cased lemmas of tokens that:
    - are not stop words, punctuation or whitespace
    - are longer than two characters
    - are not URLs (starting with 'http')
    - are not one of a few filler words
    - are alphabetic or start with '@'
    - are tagged NOUN, PROPN or VERB

    Arguments:
        nlp: the language processing
        text: the text

    Returns:
        list of the kept lemmas, lower-cased; empty if the doc is empty
    """
    ignored_lemmas = {'the', 'amp', 'and', 'pos', 'bull'}
    wanted_pos = {"NOUN", "PROPN", "VERB"}
    doc = nlp(text)
    if not doc:
        return []
    kept = []
    for token in doc:
        lemma = token.lemma_
        lowered = lemma.lower()
        if token.is_stop or token.is_punct or token.is_space:
            continue
        if len(lemma) <= 2 or lowered.startswith('http') or lowered in ignored_lemmas:
            continue
        # plain words and @-mentions only
        if not (lemma.isalpha() or lemma.startswith('@')):
            continue
        if token.pos_ not in wanted_pos:
            continue
        kept.append(lowered)
    return kept
spacy_process(self, text, abstract, lang)
process a tweet using spacy
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| text | | the tweet text | required |
| abstract | | the publication abstract | required |
| lang | | language of the tweet | required |
Source code in src/twitter_worker.py
def spacy_process(self, text, abstract, lang):
    """process a tweet using spacy

    Compares the tweet with the publication abstract and scores its
    sentiment. Pipelines are loaded lazily per language and cached in
    ``self.nlp``, which is assumed to be a dict (TODO confirm in __init__).

    Arguments:
        text: the tweet text
        abstract: the publication abstract
        lang: language of the tweet

    Returns:
        dict with the following keys; neutral 0/0/[] values when an input is
        missing or the language is not supported:
        - 'sentiment': the `._.polarity` of the text
        - 'abstract': vector similarity between the filtered tweet words and
          the filtered abstract words
        - 'common_words': up to 10 (word, count) pairs from the tweet
    """
    result = {
        'sentiment': 0,
        'abstract': 0,
        'common_words': []
    }
    if not text or not abstract or not lang:
        return result
    # https://spacy.io/universe/project/spacy-langdetect
    # in case we have an undefined language
    supported = ['de', 'es', 'en', 'fr', 'ja', 'it', 'ru', 'pl']
    if 'en' in lang:
        model_key, model_name = 'en', 'en_core_web_md'
    elif lang in supported:
        model_key, model_name = lang, lang + '_core_news_md'
    else:
        # neutral results if we have an unknown language
        logging.debug('unknown language')
        return result
    # load each pipeline once; .get() also covers a cache without an entry
    # for this language (the former self.nlp['en'] lookup raised KeyError)
    if not self.nlp.get(model_key):
        self.nlp[model_key] = spacy.load(model_name)
        self.nlp[model_key].add_pipe('spacytextblob')
    local_nlp = self.nlp[model_key]
    # https://www.trinnovative.de/blog/2020-09-08-natural-language-processing-mit-spacy.html
    words = TwitterWorker.process_text_for_similarity(local_nlp, text)
    abstract_words = TwitterWorker.process_text_for_similarity(local_nlp, abstract)
    sim = 0
    tweet_doc = local_nlp(" ".join(words))
    abstract_doc = local_nlp(" ".join(abstract_words))
    # only compare when both sides still contain words after filtering
    if tweet_doc and abstract_doc:
        sim = tweet_doc.similarity(abstract_doc)
    sentiment = local_nlp(text)._.polarity
    return {
        'sentiment': sentiment,
        'abstract': sim,
        'common_words': Counter(words).most_common(10)
    }
start(i=0)
staticmethod
start the consumer
Source code in src/twitter_worker.py
@staticmethod
def start(i=0):
    """start the consumer

    Builds a TwitterWorker, logs the start and then calls its consume().

    Arguments:
        i: value passed to the TwitterWorker constructor (default 0),
           also included in the start log message
    """
    worker = TwitterWorker(i)
    logging.warning(TwitterWorker.log + 'Start ' + str(i))
    worker.consume()
geoencode(location)
geocode a location, i.e. look up the country code for it
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| location | | the location we want a country for | required |
Source code in src/twitter_worker.py
@lru_cache(maxsize=50000)
def geoencode(location, timeout=10):
    """geoencode a location

    Searches the location with the OpenStreetMap Nominatim API and returns
    the country code of the first result. Results, including None, are
    memoized by ``lru_cache``.

    Arguments:
        location: the location we want a country for
        timeout: seconds to wait for Nominatim before giving up (default 10)

    Returns:
        the first result's ``address.country_code``, or None when the
        location is empty, the request fails, or no country code is found
    """
    if not location:
        return None
    base_url = 'https://nominatim.openstreetmap.org/search'
    # params= URL-encodes the free-text (user supplied) location
    params = {'format': 'jsonv2', 'addressdetails': 1, 'q': location}
    # NOTE(review): Nominatim's usage policy asks for an identifying
    # User-Agent header; consider setting one explicitly.
    try:
        # a timeout keeps a slow geocoder from blocking the worker forever
        r = requests.get(base_url, params=params, timeout=timeout)
    except requests.RequestException as err:
        logging.warning('geoencode request failed for %r: %s', location, err)
        return None
    if r.status_code != 200:
        return None
    try:
        json_response = r.json()
    except ValueError:
        logging.warning('geoencode got a non-JSON response for %r', location)
        return None
    if json_response and isinstance(json_response, list):
        json_object = json_response[0]
        if 'address' in json_object and 'country_code' in json_object['address']:
            return json_object['address']['country_code']
    return None
normalize(string)
normalize a string
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| string | | the string to be normalized | required |
Source code in src/twitter_worker.py
def normalize(string):
    """normalize a string

    Keeps only ASCII letters and spaces, case-folds the result and trims the
    surrounding whitespace. Digits, punctuation and non-ASCII letters are
    dropped, e.g. '#Covid19 Vaccine!' -> 'covid vaccine'.

    Arguments:
        string: the string to be normalized
    """
    # same character class as the regex [a-zA-Z ]
    kept = ''.join(ch for ch in string if ch == ' ' or 'a' <= ch <= 'z' or 'A' <= ch <= 'Z')
    return kept.casefold().strip()
score_length(length)
calculate a score based on a given length
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| length | | the length to base the score on | required |
Source code in src/twitter_worker.py
def score_length(length):
    """calculate a score based on a given length

    Shorter than 50 scores 3, shorter than 100 scores 6, anything else 10.

    Arguments:
        length: the length to base the score on
    """
    for upper_bound, score in ((50, 3), (100, 6)):
        if length < upper_bound:
            return score
    return 10
score_time(x)
calculate a score based on a given time in days
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| x | | the time to base the score on | required |
Source code in src/twitter_worker.py
def score_time(x):
    """calculate a score based on a given time in days

    The score is ``10 * (3 - log_7(x))`` clamped to [1, 30]. For example,
    1 day -> 30, 7 days -> 20, 49 days -> 10, and older items bottom out at 1.
    A falsy int, or a value that is neither int nor float, scores 1. Values
    rejected by math.log (0.0 or negatives) also end at 1.

    Arguments:
        x: the time to base the score on
    """
    is_number = (x and type(x) is int) or type(x) is float
    if not is_number:
        return 1
    try:
        y = (math.log(x) / math.log(1 / 7) + 3) * 10
    except ValueError:
        # math.log rejects zero and negative values
        logging.debug('ValueError %s' % str(x))
        y = 0
    return min(max(y, 1), 30)
score_type(type)
calculate a score based on a given type
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| type | | the type to base the score on | required |
Source code in src/twitter_worker.py
def score_type(type):
    """calculate a score based on a given type

    Quotes score 0.6, replies 0.7 and retweets 0.1. Anything else is
    treated as an original tweet and scores 1.

    Arguments:
        type: the type to base the score on
    """
    factors = (('quoted', 0.6), ('replied_to', 0.7), ('retweeted', 0.1))
    for tweet_kind, factor in factors:
        if type == tweet_kind:
            return factor
    # original tweet
    return 1