aggregator
calculate_trend(data)
calculate the Theil-Sen slope for a given array of DOIs with attached data frames
Source code in src/aggregator.py
def calculate_trend(data):
"""
calculate the Theil-Sen slope for a given array of DOIs with attached data frames
"""
result = {}
for d in data:
doi = d['doi']
df = d['df']
trend = mk.sens_slope(df)
result[doi] = trend.slope
return result
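A minimal usage sketch with hypothetical data, assuming mk is pymannkendall (whose sens_slope accepts a single-column frame and returns a result with a .slope attribute):
import pandas as pd
times = pd.date_range('2021-06-01', periods=5, freq='D')
df = pd.DataFrame(data={'score': [1.0, 2.0, 2.5, 4.0, 5.5]}, index=times)
print(calculate_trend([{'doi': '10.1000/example', 'df': df}]))
# {'10.1000/example': 1.0625} -- a positive slope means the score is trending upwards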
delete_trending_table(session, duration)
delete all rows for the given duration from the trending table
Source code in src/aggregator.py
def delete_trending_table(session, duration):
"""
delete all rows for the given duration from the trending table
"""
query = """
DELETE FROM trending
WHERE duration=:duration;
"""
params = {'duration': duration, }
s = text(query)
s = s.bindparams(bindparam('duration'))
return session.execute(s, params)
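A minimal usage sketch, mirroring the session setup used in get_base_trend_table; the duration argument is a trending definition name such as 'today':
session_factory = sessionmaker(bind=DAO.engine)
Session = scoped_session(session_factory)
session = Session()
delete_trending_table(session, 'today')
session.commit()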
doi_filter_list(doi_list, params)
helper function to generate a custom Flux filter based on a list of DOIs; this is faster than influx's own array functions but needs to be dynamically adjusted for the number of DOIs
Source code in src/aggregator.py
def doi_filter_list(doi_list, params):
"""
helper function to generate a custom Flux filter based on a list of DOIs; this is faster than influx's own array
functions but needs to be dynamically adjusted for the number of DOIs
"""
filter_string = "|> filter(fn: (r) =>"
i = 0
for doi in doi_list:
filter_string += 'r["doi"] == _doi_nr_' + str(i) + ' or '
params['_doi_nr_' + str(i)] = doi
i += 1
filter_string = filter_string[:-4] + ')'
return {"string": filter_string, "params": params}
get_base_trend_table(trending)
calculate trends and save them in the db and influx
Source code in src/aggregator.py
def get_base_trend_table(trending):
"""
calculate trends and save them in the db and influx
"""
p = {"_start": trending['duration'],
"_bucket": trending['name'],
"_exponent": trending['time_exponent'],
"_window": trending['window_size'],
"_window_count": trending['window_count'],
}
dois = get_doi_list_trending(trending)
if len(dois) > 0:
# print('done pubs')
session_factory = sessionmaker(bind=DAO.engine)
Session = scoped_session(session_factory)
session = Session()
frames = get_dataframes(trending)
trend = calculate_trend(frames)
a = time.time()
# split dois in smaller lists to reduce time per connection and read timeouts
query_dois = split_list(dois, 200)
records = []
for part_dois in query_dois:
tables = run_influx_trend_calculation(part_dois, p)
print(time.time() - a)
if tables:
for table in tables:
for record in table.records:
records.append(record)
print('total: ' + str(time.time() - a) + ' with ' + str(len(records)) + ' dois total')
if len(records) > 0:
trending_objects = []
delete_trending_table(session, trending['name'])
for record in records:
trending_value = 0
if record['doi'] in trend:
trending_value = trend[record['doi']]
save_trend_to_influx(record, trending_value, trending['trending_bucket'])
t_obj = Trending(publication_doi=record['doi'],
duration=trending['name'],
score=record['score'], count=record['count'],
mean_sentiment=record['mean_sentiment'],
sum_followers=record['sum_followers'],
mean_age=record['mean_age'],
mean_length=record['length'],
mean_questions=record['questions'],
mean_exclamations=record['exclamations'],
abstract_difference=record['contains_abstract_raw'],
mean_bot_rating=record['mean_bot_rating'],
ema=record['ema'],
kama=record['kama'],
ker=record['ker'],
mean_score=record['mean'],
stddev=record['stddev'],
trending=trending_value,
projected_change=record['prediction'])
t_obj = save_or_update(session, t_obj, Trending,
{'publication_doi': t_obj.publication_doi, 'duration': t_obj.duration})
trending_objects.append(t_obj)
return trending_objects
return []
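A minimal usage sketch; the crontab tasks further below enqueue exactly these trending definitions:
trending_objects = get_base_trend_table(trending_time_definition['today'])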
get_dataframes(trending)
get data frames for a given trending definition
Source code in src/aggregator.py
def get_dataframes(trending):
"""
get data frames for a given trending definition
"""
p = {"_bucket": trending['name'],
"_start": trending['duration'],
}
filter_obj = doi_filter_list(get_doi_list_trending(trending), p)
query = """
_stop = now()
totalTable = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
|> filter(fn: (r) => r["_field"] == "score")"""
query += filter_obj['string']
query += """
|> yield()
"""
try:
result = query_api.query(org=org, query=query, params=filter_obj['params'])
except urllib3.exceptions.ReadTimeoutError:
print('ReadTimeoutError')
return []
else:
results = []
if result:
for table in result:
scores = []
times = []
doi = None
for record in table.records:
if not doi:
doi = record['doi']
scores.append(record['_value'])
times.append(record['_time'])
df = pd.DataFrame(data={'score': scores}, index=times)
results.append({"doi": doi, "df": df})
return results
get_doi_list_trending(trending)
Return a list of DOIs that have the required minimum count for the given trending definition. It uses the count of data points, not the actual tweet count.
Source code in src/aggregator.py
def get_doi_list_trending(trending):
"""
Return a list of DOIs that have the required minimum count for the given trending definition.
It uses the count of data points, not the actual tweet count.
"""
p = {"_bucket": trending['name'],
"_min_count": trending['min_count'],
"_start": trending['duration'],
}
query = """
_stop = now()
countTable = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
|> filter(fn: (r) => r["_field"] == "score")
|> count()
|> filter(fn: (r) => r["_value"] > _min_count)
|> group()
|> keep(columns: ["doi"])
|> yield()
"""
try:
result = query_api.query(org=org, query=query, params=p)
except urllib3.exceptions.ReadTimeoutError:
print('ReadTimeoutError')
return []
else:
results = []
for table in result:
for record in table.records:
results.append(record['doi'])
return results
hot_papers()
Tweet the three hottest COVID-19 papers. Extracts the needed data, generates the tweet text, adjusting the title length as needed, and then uses the bot credentials to tweet.
Source code in src/aggregator.py
def hot_papers():
"""
Tweet the three hottest COVID-19 papers.
Extracts the needed data, generates the tweet text, adjusting the title length as needed, and then uses the bot
credentials to tweet.
"""
query = """
SELECT distinct on (trending_ranking) * FROM trending_covid_papers tcp
JOIN publication_author pa on tcp.publication_doi = pa.publication_doi
JOIN author a on pa.author_id = a.id
WHERE duration = 'today' ORDER BY trending_ranking LIMIT 3;
"""
s = text(query)
session_factory = sessionmaker(bind=DAO.engine)
Session = scoped_session(session_factory)
session = Session()
result = session.execute(s).fetchall()
length = 40
end = 4
pretext = ""
while end > 0:
pretext = "Today's trending COVID-19 papers:"
for r in result:
pretext += "\n" + str(r['trending_ranking']) + '. ' + r['name'].split(' ')[-1] + ' et al. ' \
+ smart_truncate(r['title'], length)
pretext += '\nhttps://bit.ly/3vUSZFg'
if len(pretext) > 280:
length -= 10
else:
break
end -= 1
print(pretext)
print(len(pretext))
if 100 < len(pretext) < 280:
consumer_key = os.environ.get('CONSUMER_KEY_TWITTER_BOT')
consumer_secret = os.environ.get('CONSUMER_SECRET_TWITTER_BOT')
access_token = os.environ.get('ACCESS_TOKEN_TWITTER_BOT')
access_token_secret = os.environ.get('ACCESS_TOKEN_SECRET_TWITTER_BOT')
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
print(api.verify_credentials().name)
api.update_status(status=pretext)
return True
init_influx()
async
connect to influx and set up the buckets and tasks needed
Source code in src/aggregator.py
@app.task
async def init_influx():
"""connect to influx and set up buckets and tasks needed"""
buckets_api = client.buckets_api()
buckets = buckets_api.find_buckets().buckets
org_api = client.organizations_api()
org_obj = org_api.find_organizations(org=org)[0]
tasks_api = client.tasks_api()
for key, item in trending_time_definition.items():
exist = False
for b in buckets:
if b.name == item['name']:
exist = True
if not exist:
if item['retention']:
print('create bucket %s with retention %s' % (item['name'], str(item['retention'].total_seconds())))
retention_rules = BucketRetentionRules(type="expire",
every_seconds=int(item['retention'].total_seconds()))
created_bucket = buckets_api.create_bucket(bucket_name=item['name'],
retention_rules=retention_rules, org=org)
else:
print('create bucket %s without retention' % item['name'])
created_bucket = buckets_api.create_bucket(bucket_name=item['name'], org=org)
if item['trending_bucket']:
print('create trending bucket %s without retention' % item['trending_bucket'])
created_bucket = buckets_api.create_bucket(bucket_name=item['trending_bucket'], org=org)
if item['downsample_bucket']:
print('create downsample task')
name = "task_" + item['downsample_bucket']
cron = item['downsample_cron']
offset = item['downsample_offset']
flux = '''
import "date"
import "math"
import "experimental"
option task = {
name: "''' + name + '''",
cron: "''' + cron + '''",
offset: ''' + offset + ''',
}
'''
flux += '_w_int = '
flux += str(int(item['downsample_window'].total_seconds() // 60))
flux += """
_window = duration(v: string(v: _w_int) + "m")
_duration = duration(v: string(v: 2 * _w_int) + "m")
_start = experimental.subDuration(d: _duration, from: date.truncate(t: now(), unit: _window))
baseTable = from(bucket: """
flux += '"' + item['name'] + '"'
flux += """)
|> range(start: _start, stop: date.truncate(t: now(), unit: _window))
|> filter(fn: (r) => r["_measurement"] == "trending")
a = baseTable
|> filter(fn: (r) => r["_field"] == "followers")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> aggregateWindow(fn: sum, every: _window, createEmpty: false)
|> group()
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> keep(columns: ["_value", "_time", "_field", "doi", "_measurement"])
b = baseTable
|> filter(fn: (r) => r["_field"] == "sentiment_raw")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> aggregateWindow(fn: mean, every: _window, createEmpty: false)
|> group()
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> keep(columns: ["_value", "_time", "_field", "doi", "_measurement"])
c = baseTable
|> filter(fn: (r) => r["_field"] == "contains_abstract_raw")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> aggregateWindow(fn: mean, every: _window, createEmpty: false)
|> group()
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> keep(columns: ["_value", "_time", "_field", "doi", "_measurement"])
ad = baseTable
|> filter(fn: (r) => r["_field"] == "questions")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> aggregateWindow(fn: mean, every: _window, createEmpty: false)
|> group()
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> keep(columns: ["_value", "_time", "_field", "doi", "_measurement"])
e = baseTable
|> filter(fn: (r) => r["_field"] == "exclamations")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> aggregateWindow(fn: mean, every: _window, createEmpty: false)
|> group()
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> keep(columns: ["_value", "_time", "_field", "doi", "_measurement"])
j = baseTable
|> filter(fn: (r) => r["_field"] == "count")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> aggregateWindow(fn: sum, every: _window, createEmpty: false)
|> group()
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> set(key: "_field", value: "count")
|> keep(columns: ["_value", "_time", "_field", "doi", "_measurement"])
f = baseTable
|> filter(fn: (r) => r["_field"] == "length")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> aggregateWindow(fn: mean, every: _window, createEmpty: false)
|> group()
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> keep(columns: ["_value", "_time", "_field", "doi", "_measurement"])
jk = baseTable
|> filter(fn: (r) => r["_field"] == "bot_rating")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> aggregateWindow(fn: mean, every: _window, createEmpty: false)
|> group()
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> keep(columns: ["_value", "_time", "_field", "doi", "_measurement"])
g = baseTable
|> filter(fn: (r) => r["_field"] == "score")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> aggregateWindow(fn: sum, every: _window, createEmpty: false)
|> group()
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> keep(columns: ["_value", "_time", "_field", "doi", "_measurement"])
union(tables: [a, b, c, ad, e, j, f, jk, g])
|> group(columns: ["doi"])
|> to(bucket: """
flux += '"' + item['downsample_bucket'] + '"'
flux += """, org: "ambalytics")"""
task = Task(id=0, name=name, org_id=org_obj.id, status="active", flux=flux)
task = tasks_api.create_task(task)
print('create numbers task')
name = "task_numbers_" + item['name']
cron = item['numbers_cron']
offset = item['numbers_offset']
flux = '''
import "date"
import "math"
import "experimental"
option task = {
name: "''' + name + '''",
cron: "''' + cron + '''",
offset: ''' + offset + ''',
}
'''
flux += """
_start = """ + str(int(item['duration'].total_seconds())) + """s
_stop = now()
_bucket = """ + '"' + item['name'] + '"' + """
numbers = (tables=<-, field_selector, aggregator, bucket, name) =>
tables
|> filter(fn: (r) => r["_field"] == field_selector)
|> group()
|> aggregator()
|> keep(columns: ["_value"])
|> toFloat()
|> set(key: "_measurement", value: bucket)
|> set(key: "_field", value: name)
|> map(fn: (r) => ({ r with _time: now() }))
aa = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
|> numbers(aggregator: experimental.mean, field_selector: "bot_rating", bucket: _bucket, name: "bot_rating")
ab = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
|> numbers(aggregator: experimental.mean, field_selector: "contains_abstract_raw", bucket: _bucket, name: "contains_abstract_raw")
ac = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
|> numbers(aggregator: experimental.mean, field_selector: "exclamations", bucket: _bucket, name: "exclamations")
ad = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
|> numbers(aggregator: sum, field_selector: "followers", bucket: _bucket, name: "followers")
ae = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
|> numbers(aggregator: experimental.mean, field_selector: "length", bucket: _bucket, name: "length")
af = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
|> numbers(aggregator: experimental.mean, field_selector: "questions", bucket: _bucket, name: "questions")
ag = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
|> numbers(aggregator: sum, field_selector: "score", bucket: _bucket, name: "score")
ah = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
|> numbers(aggregator: experimental.mean, field_selector: "sentiment_raw", bucket: _bucket, name: "sentiment_raw")
ai = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
|> numbers(aggregator: sum, field_selector: "count", bucket: _bucket, name: "count")
aj = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
|> filter(fn: (r) => r["_field"] == "score")
|> group()
|> distinct(column: "doi")
|> count()
|> keep(columns: ["_value"])
|> toFloat()
|> set(key: "_measurement", value: _bucket)
|> set(key: "_field", value: "pub_count")
|> map(fn: (r) => ({ r with _time: now() }))
union(tables: [aa, ab, ac, ad, ae, af, ag, ah, ai, aj])
|> to(bucket: "numbers", org: "ambalytics")"""
task = Task(id=1, name=name, org_id=org_obj.id, status="active", flux=flux)
task = tasks_api.create_task(task)
loop = asyncio.get_event_loop()
loop.create_task(run_trend_calculation())
run_influx_trend_calculation(dois, p)
start running the trend calculations in influx
Source code in src/aggregator.py
def run_influx_trend_calculation(dois, p):
"""
start running the trend calculations in influx
"""
filter_obj = doi_filter_list(dois, p)
query = '''
import "math"
import "experimental"
_stop = now()
baseTable = from(bucket: _bucket)
|> range(start: _start, stop: _stop)
|> filter(fn: (r) => r["_measurement"] == "trending")
'''
query += filter_obj['string']
query += '''
windowTable = baseTable
|> filter(fn: (r) => r["_field"] == "score")
|> aggregateWindow(every: _window, fn: sum, createEmpty: true)
|> sort(columns: ["_time"], desc: true)
|> limit(n: _window_count)
|> sort(columns: ["_time"])
|> map(fn:(r) => ({
r with _value:
if exists r._value then r._value
else 0.0
}))
|> keep(columns: ["_time", "_value", "doi"])
stddev = windowTable
|> stddev()
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "stddev"})
ema = windowTable
|> exponentialMovingAverage(n: _window_count)
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "ema"})
j1 = join(
tables: {stddev:stddev, ema:ema},
on: ["doi"]
)
ker = windowTable
|> kaufmansER(n: _window_count - 1)
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "ker"})
j2 = join(
tables: {j1:j1, ker:ker},
on: ["doi"]
)
kama = windowTable
|> kaufmansAMA(n: _window_count -1)
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "kama"})
j3 = join(
tables: {kama:kama, j2:j2},
on: ["doi"]
)
mean = baseTable
|> filter(fn: (r) => r["_field"] == "score" or r["_field"] == "count")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> map(fn: (r) => ({ r with _value: float(v:r.score / float(v: r.count)) }))
|> experimental.mean()
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "mean"})
j5 = join(
tables: {mean:mean, j3:j3},
on: ["doi"]
)
prediction = windowTable
|> holtWinters(n: 1, seasonality: 0, interval: _window, withFit: false, timeColumn: "_time", column: "_value")
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "prediction"})
j6 = join(
tables: {prediction:prediction, j5:j5},
on: ["doi"]
)
score = baseTable
|> filter(fn: (r) => r["_field"] == "score")
|> keep(columns: ["_value", "_time", "doi"])
|> map(fn:(r) => ({ r with _value: float(v: r._value) * math.exp(x: _exponent * float(v: uint(v: now()) - uint(v: r._time)) / (10.0 ^ 9.0)) }))
|> cumulativeSum(columns: ["_value"])
|> last(column: "_value")
|> keep(columns: ["_value", "doi"])
|> group()
|> rename(columns: {_value: "score"})
j7 = join(
tables: {j6:j6, score:score},
on: ["doi"]
)
a = baseTable
|> filter(fn: (r) => r["_field"] == "followers")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> sum()
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "sum_followers"})
b = baseTable
|> filter(fn: (r) => r["_field"] == "sentiment_raw")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> experimental.mean()
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "mean_sentiment"})
c = baseTable
|> filter(fn: (r) => r["_field"] == "contains_abstract_raw")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> experimental.mean()
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "contains_abstract_raw"})
ad = baseTable
|> filter(fn: (r) => r["_field"] == "questions")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> experimental.mean()
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "questions"})
e = baseTable
|> filter(fn: (r) => r["_field"] == "exclamations")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> experimental.mean()
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "exclamations"})
f = baseTable
|> filter(fn: (r) => r["_field"] == "length")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> experimental.mean()
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "length"})
g = baseTable
|> filter(fn: (r) => r["_field"] == "length")
|> map(fn: (r) => ({r with _value: math.round(x: float(v: uint(v: now()) - uint(v: r._time)) / (10.0 ^ 9.0)) }))
|> experimental.mean()
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "mean_age"})
j = baseTable
|> filter(fn: (r) => r["_field"] == "count")
|> sum()
|> group()
|> toInt()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "count"})
jk = baseTable
|> filter(fn: (r) => r["_field"] == "bot_rating")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
|> experimental.mean()
|> group()
|> keep(columns: ["_value", "doi"])
|> rename(columns: {_value: "mean_bot_rating"})
join1 = join(
tables: {a:a, b:b},
on: ["doi"]
)
join2 = join(
tables: {join1:join1, c:c},
on: ["doi"]
)
join3 = join(
tables: {join2:join2, ad:ad},
on: ["doi"]
)
join4 = join(
tables: {join3:join3, e:e},
on: ["doi"]
)
join46 = join(
tables: {join4:join4, j:j},
on: ["doi"]
)
join5 = join(
tables: {join46:join46, f:f},
on: ["doi"]
)
join6 = join(
tables: {join5:join5, g:g},
on: ["doi"]
)
join68 = join(
tables: {join6:join6, jk:jk},
on: ["doi"]
)
join9 = join(
tables: {join68:join68, j7:j7},
on: ["doi"]
)
|> sort(columns: ["score"], desc: true)
|> yield(name: "join10")
'''
try:
return query_api.query(query, params=p)
except urllib3.exceptions.ReadTimeoutError:
print('ReadTimeoutError')
return []
run_trend_calculation()
async
run a trend calculation; this includes calculating the trend table as well as the subsequent update of the covid trends view. Trends are run sequentially, extracted from the trending queue
Source code in src/aggregator.py
async def run_trend_calculation():
"""run a trend calculation, this includes calculating the trend table as well as the consecutive update of the
covid trends view
Trends are run sequential extracted from the trending queue"""
while True:
trending_time = await queue.get()
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, get_base_trend_table, trending_time)
await loop.run_in_executor(None, update_covid_trends)
queue.task_done()
save_data_to_influx(data, retries=0)
save an event to influx, retrying up to 5 times on API errors and up to 10 times on connection errors
Source code in src/aggregator.py
def save_data_to_influx(data, retries=0):
"""
save an event to influx, retrying up to 5 times on API errors and up to 10 times on connection errors
"""
doi = data['obj']['data']['doi']
createdAt = data['timestamp']
score = data['subj']['processed']['score']
point = {
"measurement": "trending",
"tags": {
"doi": doi
},
"fields": {
"score": float(score),
"contains_abstract_raw": float(
data['subj']['processed']['contains_abstract_raw']) if 'contains_abstract_raw' in data['subj'][
'processed'] else 0.0,
"sentiment_raw": float(data['subj']['processed']['sentiment_raw']),
"followers": data['subj']['processed']['followers'],
"length": data['subj']['processed']['length'],
"questions": data['subj']['processed']['question_mark_count'],
"exclamations": data['subj']['processed']['exclamation_mark_count'],
"bot_rating": data['subj']['processed']['bot_rating'],
"time_score": float(data['subj']['processed']['time_score']),
"type_factor": float(data['subj']['processed']['type_factor']),
"user_score": float(data['subj']['processed']['user_score']),
"content_score": float(data['subj']['processed']['content_score']),
"count": 1
},
"time": createdAt}
try:
write_api.write('currently', org, [point])
except influxdb_client.rest.ApiException as e:
print(e)
if retries < 5: # to much?
save_data_to_influx(data, (retries + 1))
time.sleep(5)
else:
print('LOST DATA ' + str(point['time']))
except urllib3.exceptions.NewConnectionError:
print('NewConnectionError')
if retries < 10:
save_data_to_influx(data, (retries + 1))
time.sleep(3)
else:
print('LOST DATA')
os.system("pkill -9 python")
save_or_update(session, obj, table, kwargs)
upsert a given object into the given PostgreSQL table
Source code in src/aggregator.py
def save_or_update(session, obj, table, kwargs):
"""
upsert a given object into the given PostgreSQL table
"""
obj_db = DAO.get_object(session, table, kwargs)
if obj_db:
session.delete(obj_db)
session.commit()
DAO.save_object(session, obj)
return obj
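Note that the upsert is implemented as delete-then-insert rather than a native INSERT ... ON CONFLICT, so it is not atomic. A minimal usage sketch mirroring the call in get_base_trend_table (hypothetical values, assuming the remaining Trending columns are nullable):
t_obj = Trending(publication_doi='10.1000/example', duration='today', score=42.0)
t_obj = save_or_update(session, t_obj, Trending,
                       {'publication_doi': t_obj.publication_doi, 'duration': t_obj.duration})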
save_trend_to_influx(record, trending_value, bucket)
save a calculated trend to influx
Source code in src/aggregator.py
def save_trend_to_influx(record, trending_value, bucket):
"""
save a calculated trend to influx
"""
point = {
"measurement": "trending",
"tags": {
"doi": record['doi']
},
"fields": {
"score": float(record['score']),
"count": record['count'],
"mean_sentiment": float(record['mean_sentiment']),
"sum_follower": record['sum_followers'],
"mean_age": float(record['mean_age']),
"mean_length": float(record['length']),
"mean_questions": float(record['questions']),
"mean_exclamations": float(record['exclamations']),
"abstract_difference": float(record['contains_abstract_raw']),
"mean_bot_rating": float(record['mean_bot_rating']),
"ema": float(record['ema']),
"kama": float(record['kama']),
"ker": float(record['ker']),
"mean_score": float(record['mean']),
"stddev": float(record['stddev']),
"trending": float(trending_value),
"projected_change": float(record['prediction'])
},
"time": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')}
write_api.write(bucket, org, [point])
smart_truncate(content, length=100, suffix=' (...)')
Truncate a string as close as possible to the given length, breaking only at word boundaries (spaces), and append a suffix
Source code in src/aggregator.py
def smart_truncate(content, length=100, suffix=' (...)'):
"""
Truncate a string as close as possible to the given length, breaking only at word boundaries (spaces), and append a suffix
"""
if len(content) <= length:
return content
else:
return ' '.join(content[:length + 1].split(' ')[0:-1]) + suffix
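Two quick examples: content within the limit is returned unchanged, while longer content is cut back to the last full word and suffixed:
print(smart_truncate('short title', 100))               # short title
print(smart_truncate('The quick brown fox jumps', 10))  # The quick (...)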
split_list(total_list, n)
split a list into a list of lists, each with a maximum length of n
Source code in src/aggregator.py
def split_list(total_list, n):
"""
split a list into a list of lists, each with a maximum length of n
"""
return [total_list[i:i + n] for i in range(0, len(total_list), n)]
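A quick worked example:
print(split_list([1, 2, 3, 4, 5], 2))  # [[1, 2], [3, 4], [5]]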
trend_calc_currently()
async
run the trend calculation at the defined interval
Source code in src/aggregator.py
@app.crontab('*/3 * * * *')
async def trend_calc_currently():
"""run trend calculation in the defined interval"""
await asyncio.sleep(15)
print('calc trend currently')
await queue.put(trending_time_definition['currently'])
trend_calc_month()
async
run the trend calculation at the defined interval
Source code in src/aggregator.py
@app.crontab('5 */3 * * *')
async def trend_calc_month():
"""run trend calculation in the defined interval"""
await asyncio.sleep(15)
print('calc trend month')
await queue.put(trending_time_definition['month'])
trend_calc_today()
async
run the trend calculation at the defined interval
Source code in src/aggregator.py
@app.crontab('1-59/10 * * * *')
async def trend_calc_today():
"""run trend calculation in the defined interval"""
await asyncio.sleep(15)
print('calc trend today')
await queue.put(trending_time_definition['today'])
trend_calc_week()
async
run the trend calculation at the defined interval
Source code in src/aggregator.py
@app.crontab('25 * * * *')
async def trend_calc_week():
"""run trend calculation in the defined interval"""
await asyncio.sleep(15)
print('calc trend week')
await queue.put(trending_time_definition['week'])
trend_calc_year()
async
run the trend calculation at the defined interval
Source code in src/aggregator.py
@app.crontab('4 2 * * *')
async def trend_calc_year():
"""run trend calculation in the defined interval"""
await asyncio.sleep(15)
print('calc trend year')
await queue.put(trending_time_definition['year'])
update_covid_trends()
Update the trending_covid_papers materialized view; this includes creating the view if it does not already exist and creating the unique index needed to allow a concurrent refresh
Source code in src/aggregator.py
def update_covid_trends():
"""
Update the trending_covid_papers materialized view; this includes creating the view if it does not already exist
and creating the unique index needed to allow a concurrent refresh
"""
print('calc covid trends')
q1 = """
CREATE MATERIALIZED VIEW IF NOT EXISTS trending_covid_papers AS
SELECT t.*
FROM (
SELECT rank() over (partition by duration order by score desc) trending_ranking, p.doi, p.pub_date, p.year, p.citation_count, p.title, t.*
FROM trending t
JOIN publication p on p.doi = t.publication_doi
) t
INNER JOIN (
SELECT publication_doi
FROM (
SELECT value,
count,
publication_doi,
rank() over (partition by publication_doi order by publication_doi, count desc) rn
FROM (
SELECT publication_doi, SUM(ddp.count) as count, dd.value
FROM discussion_data_point as ddp
JOIN discussion_data as dd ON (ddp.discussion_data_point_id = dd.id)
WHERE type = 'entity'
GROUP BY (dd.value, publication_doi)
ORDER BY count DESC) AS temp1) as temp2
WHERE value LIKE '%COVID-19%' AND rn <= 3
) ddp ON t.doi = ddp.publication_doi
ORDER BY trending_ranking;
"""
q2 = """
CREATE UNIQUE INDEX IF NOT EXISTS trending_covid_papers_doi_duration_index
ON trending_covid_papers (publication_doi, duration);
"""
q3 = """
REFRESH MATERIALIZED VIEW CONCURRENTLY trending_covid_papers;
"""
a = time.time()
with DAO.engine.connect() as connection:
result = connection.execute(text(q1))
with DAO.engine.connect() as connection:
result = connection.execute(text(q2))
with DAO.engine.connect() as connection:
result = connection.execute(text(q3))
print(time.time() - a)
return True
write_event(data)
async
write a given event; this wraps the blocking sync API function in an async thread pool executor
Source code in src/aggregator.py
async def write_event(data):
"""
write a given event; this wraps the blocking sync API function in an async thread pool executor
"""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, save_data_to_influx, data)
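A minimal usage sketch showing the event payload shape that save_data_to_influx reads (all values hypothetical):
event = {
    'timestamp': '2021-06-01T12:00:00Z',
    'obj': {'data': {'doi': '10.1000/example'}},
    'subj': {'processed': {
        'score': 1.5,
        'contains_abstract_raw': 0.3,
        'sentiment_raw': 0.2,
        'followers': 120,
        'length': 140,
        'question_mark_count': 0,
        'exclamation_mark_count': 1,
        'bot_rating': 0.1,
        'time_score': 0.9,
        'type_factor': 1.0,
        'user_score': 0.5,
        'content_score': 0.7,
    }},
}
await write_event(event)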