Хранение данных временных рядов в Postgres

Мне нужно хранить данные сетевого потока в Postgresql. Это данные о сетевом трафике. Каждая запись содержит следующее:

  • Время начала подключения
  • Время окончания соединения
  • Всего передано данных
  • Исходные/целевые IP-адреса/ASN
  • (Есть еще куча, но этого достаточно для целей этого вопроса).

Мой вопрос заключается в следующем: как я могу хранить эти данные, чтобы я мог эффективно рассчитывать скорость передачи данных за последние X дней/часов? Например, мне может понадобиться нарисовать график всего трафика на ASN Netflix за последние 7 дней с почасовым разрешением.

Разница между временем начала и окончания подключения может составлять миллисекунды или может превышать час.


Моим первым шагом было бы сохранить соединение в поле TSTZRANGE с индексом GiST. Затем, чтобы запросить данные о почасовом трафике за последние 7 дней:

  1. Используйте CTE для создания последовательности почасовых сегментов времени.
  2. Ищите любые TSTZRANGE, которые перекрываются с каждым сегментом.
  3. Рассчитать продолжительность перекрытия
  4. Рассчитать скорость передачи данных для записи в байтах в секунду
  5. Сделайте продолжительность * байт в секунду, чтобы получить общие данные
  6. Сгруппируйте все это в ведро, просуммировав общие значения данных

Тем не менее, это звучит как много тяжелой работы. Может кто придумает лучший вариант?


person Adam Charnock    schedule 25.01.2021    source источник
comment
Упрощение шагов 1 и 2: select tstzrange('1/25/2021 11:35-8', '1/25//2021 12:45-8', '[]')* tstzrange('1/25/2021 12:00-8', '1/25/2021 13:00-8', '[]'); ["2021-01-25 12:00:00-08","2021-01-25 12:45:00-08"]. Где * — оператор пересечения.   -  person Adrian Klaver    schedule 26.01.2021


Ответы (2)


Первый черновик:

WITH ts_bucket AS (
    SELECT
        LAG(gs, 1) OVER () AS begin_period,
        gs AS end_period
    FROM
        generate_series('1/25/2021 0:00-8'::timestamptz, '1/26/2021 0:00-8'::timestamptz, '1 hour') AS gs
),
se AS (
    SELECT
        1000000 AS bytes,
        '01/25/2021 11:35-8'::timestamptz AS start_ts,
        '01/25/2021 12:45-08'::timestamptz AS end_ts
)
SELECT
    *,
    extract('epoch' FROM (upper(tstzrange(begin_period, end_period, '[]') * tstzrange(start_ts, end_ts, '[]'))) - (lower(tstzrange(begin_period, end_period, '[]') * tstzrange(start_ts, end_ts, '[]')))) * bytes / extract('epoch' FROM end_ts - start_ts) AS data_transferred
FROM
    ts_bucket,
    se
WHERE
    begin_period IS NOT NULL
    AND tstzrange(se.start_ts, se.end_ts, '[]') && tstzrange(ts_bucket.begin_period, ts_bucket.end_period, '[]');

 begin_period      |       end_period       |  bytes  |        start_ts        |         end_ts         |  data_transferred  
------------------------+------------------------+---------+------------------------+------------------------+--------------------
 2021-01-25 11:00:00-08 | 2021-01-25 12:00:00-08 | 1000000 | 2021-01-25 11:35:00-08 | 2021-01-25 12:45:00-08 | 357142.85714285716
 2021-01-25 12:00:00-08 | 2021-01-25 13:00:00-08 | 1000000 | 2021-01-25 11:35:00-08 | 2021-01-25 12:45:00-08 |  642857.1428571428

Это основано на том, что время начала и окончания подключения сохраняется в отдельных полях, а затем при необходимости преобразуется в диапазоны.

person Adrian Klaver    schedule 25.01.2021
comment
Спасибо за это Адриану! В конце концов я пошел другим путем, см. мой другой ответ. - person Adam Charnock; 31.01.2021

Изучив это еще немного, я думаю, что реальный ответ заключается в том, что не существует готового способа добиться этого эффективным образом. Тем более, что объем данных увеличивается. В конечном счете, объединение многих тысяч строк будет медленным, потому что это просто большой объем доступа к данным.

Вместо этого я пошел другим путем. Я использую триггер Postgresql для таблицы, в которой хранятся необработанные потоки (traffic_flow). Каждый раз, когда запись вставляется в traffic_flow, триггер будет вставлять новые данные в отдельные таблицы агрегации для ежедневных, почасовых и поминутных данных.


Вот моя экспериментальная реализация, вдруг кому пригодится. Это можно улучшить, чтобы также обрабатывать обновления и удаления.

create or replace function update_aggregated_traffic(NEW RECORD, table_name TEXT, interval_name text, store_customer BOOLEAN)
    returns void
    language plpgsql
as
$body$
declare
    aggregate_interval interval;
    customer_ip_ inet;
begin
    -- Update the data aggregated traffic data given the insertion of a new flow.
    -- A flow is the data about a single connection (start time, stop time, total
    -- bytes/packets). This function essentially rasterises that data into a
    -- series of aggregation buckets.

    -- interval_name should be second, hour, or minute
    -- turn the interval_name into an actual INTERVAL
    aggregate_interval = ('1 ' || interval_name)::INTERVAL;
    if store_customer then
        customer_ip_ = NEW.source_address;
    else
        customer_ip_ = '100.64.0.0'::INET;
    end if;

    -- We need to insert into a dynamically generated table name. There is
    -- no way to do this without writing the whole SQL statement as a string.
    -- Instead, let's use a trick. Create a temporary view, then insert into that.
    -- Postgres will proxy this insert into the desired table
    drop view if exists table_pointer;
    execute format('create temporary view table_pointer as select * from %s', table_name);

    -- We use a CTE to keep things readable, even though it is pretty long
    with aggregate_range AS (
        -- Create all the aggregate buckets spanned by the inserted flow
        SELECT generate_series(
            date_trunc(interval_name, lower(NEW.range)),
            date_trunc(interval_name, upper(NEW.range)),
            aggregate_interval
        ) as range_lower
    ),
    -- For each bucket, figure out its overlap with the provided flow data.
    -- Only the first and last buckets will have less than than complete overlap,
    -- but we do the calculation for all buckets anyway
    with_overlaps AS (
        SELECT
            NEW.range * tstzrange(range_lower, range_lower + aggregate_interval) AS overlap,
            range_lower
        FROM
        aggregate_range
    ),
    -- Convert the overlap intervals into seconds (FLOAT)
    with_overlap_seconds AS (
        SELECT
            extract(epoch from (upper(overlap) - lower(overlap))) as overlap_seconds,
            range_lower
        FROM
            with_overlaps
    )
    -- Now we have enough information to do the inserts
    insert into table_pointer as traffic
        (timestamp, customer_ip, as_number, bytes, packets)
        select
            range_lower,
            customer_ip_,
            NEW.as_number,
            -- Scale the packets/bytes per second to be a total number of
            -- of packets/bytes
            round(NEW.bytes_per_second * overlap_seconds)::INT,
            round(NEW.packets_per_second * overlap_seconds)::INT
        from with_overlap_seconds
        -- We shouldn't have any 0-second overlaps, but let's just be sure
        where overlap_seconds > 0
        -- If there is already existing data, then increment the bytes/packets values
        on conflict (customer_ip, timestamp, as_number) DO UPDATE SET
            bytes = EXCLUDED.bytes + traffic.bytes,
            packets = EXCLUDED.packets + traffic.packets
    ;
end;
$body$;


create or replace function update_aggregated_traffic_hourly() returns trigger
    language plpgsql
as
$body$
begin
    -- Store aggregated data for different resolutions. For each we also store data
    -- without the customer information. This way we can efficiently see traffic data
    -- for the whole network
    PERFORM update_aggregated_traffic(NEW, 'traffic_perdaytraffic','day', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perdaytraffic','day', False);

    PERFORM update_aggregated_traffic(NEW, 'traffic_perhourtraffic','hour', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perhourtraffic','hour', False);

    PERFORM update_aggregated_traffic(NEW, 'traffic_perminutetraffic','minute', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perminutetraffic','minute', False);

    PERFORM update_aggregated_traffic(NEW, 'traffic_persecondtraffic','second', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_persecondtraffic','second', False);

    return NEW;
end;
$body$;

create trigger update_aggregated_traffic_hourly_trigger AFTER INSERT ON traffic_flow
    FOR EACH ROW EXECUTE PROCEDURE update_aggregated_traffic_hourly();
person Adam Charnock    schedule 31.01.2021