client.query("""
CREATE TABLE IF NOT EXISTS angc3_aggregated_15sec (
ts DateTime64(0, 'UTC'),
name LowCardinality(String),
count_value UInt64,
sum_value Float64,
avg_value ALIAS sum_value / NULLIF(count_value, 0)
)
ENGINE = AggregatingMergeTree()
ORDER BY (name, ts)
PARTITION BY toYYYYMM(ts);""")
client.query("""
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_angc3_aggregate_15sec
TO angc3_aggregated_15sec
AS
SELECT
toStartOfInterval(ts_kafka, INTERVAL 15 SECOND) AS ts,
name,
count() AS count_value,
sum(value) AS sum_value
FROM angc3_raw_data
GROUP BY name, ts;
""")
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels.layers.InMemoryChannelLayer'
}
}
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [('redis', 6379)],
},
},
}
while True засунуть в отдельный поток
def main():
while 1:
for i in Profiles.objects.all():
current = time.time()
current = str(current).split(".")[0]
period_time = int(i.unix_date) + (int(i.period) * 60) * 60
if period_time < int(current):
period(start_profil=i)
current = time.time()
current = str(current).split(".")[0]
Profiles.objects.filter(name=i).update(unix_date=int(current))
time.sleep(1000)
a = Thread(target=main)
a.start()