От Digoal.

Все больше и больше данных, особенно финансовых данных, данных, собранных датчиками Интернета вещей (IoT), и данных онлайн-игр, все чаще используется в приложениях для анализа, агрегации и поиска в реальном времени. А учитывая такие тенденции, важно уметь находить мощные и эффективные средства обработки данных, конкретно выясняя, как быстро и эффективно записывать и обновлять данные по мере их сбора в режиме реального времени.

Следуя этой тенденции, в этой статье мы рассмотрим преимущества и недостатки использования трех основных методов: потоковая передача данных, лямбда-архитектура и синхронный анализ данных в реальном времени (или триггерный режим). Мы уделим основное внимание тому, как запись и обновление данных, собранных в режиме реального времени, работают с этими различными методами.

Дизайн сценария

В этой статье мы рассмотрим случай, когда развернут миллион датчиков и каждый датчик периодически сообщает данные. Требования к пользователю для этого сценария обработки данных следующие:

  1. Просматривайте последние значения датчиков в режиме реального времени.
  2. Просматривайте статистику исторических данных датчиков в режиме реального времени по периодам времени.
  3. Просматривайте подробные исторические данные датчика в режиме реального времени.
  4. Просматривайте статистику исторических данных датчика в режиме реального времени на основе других измерений.

Объем данных может достигать нескольких сотен терабайт. Чтобы выполнить предыдущие четыре требования, система должна вычислять данные в режиме реального или близкого к реальному времени.

Для этого сценария вы можете использовать следующий дизайн:

Дизайн структуры таблицы

Данные в деталях

create table sensor_data(    
  pk serial8 primary key, -- Primary key    
  ts timestamp,  -- Timestamp    
  sid int,  -- Sensor ID    
  val numeric(10,2)  -- Data    
);

Каждый фрагмент данных, как показано выше, включает в себя показанную выше информацию, которая представляет собой первичный ключ, отметку времени, идентификатор датчика и сам вывод данных.

Проектирование агрегации данных в реальном времени

Сначала вы получите последнее значение данных, собранное каждым датчиком.

create table sensor_lastdata(    
  sid int primary key,  -- Sensor ID, which is the primary key    
  last_ts timestamp,  -- Timestamp    
  last_val numeric(10,2)  -- Value    
);

Информация, которую вы увидите, включает в себя все значения каждого датчика за каждый период времени (скажем, час), а также другие значения, включая значение суммы, количество записей, максимальное значение, минимальное значение. значение, среднее значение и отклонение значения, как показано ниже:

create table sensor_aggdata(    
  sid int,  -- Sensor ID    
  ts_group varchar(10),  -- Group by time, such as by hour (yyyymmddhh24)    
  sum_val numeric,  -- Sum    
  min_val numeric(10,2),  -- Minimum value    
  max_val numeric(10,2),  -- Maximum value    
  avg_val numeric(10,2),  -- Average value    
  count_val int,  -- Count    
  all_vals numeric(10,2)[],  -- Detail value    
  unique (sid,ts_group)  -- Unique constraint    
);

Наконец, вы можете собирать статистику данных, сообщаемых датчиками, в зависимости от региона или других параметров в режиме реального времени.

Как я могу получить последние значения датчика из подробных данных?

Чтобы получить последние значения каждого идентификатора датчика (SID) с помощью операторов SQL, вы можете использовать функцию агрегации или окна. Но прежде чем делать что-либо из этого, вставьте пакет тестовых данных.

postgres=#  insert into sensor_data(ts,sid,val) select clock_timestamp(), random()*100, random()*10000 from generate_series(1,100000);

Рассмотрим, например, использование одной из этих функций. Из вариантов сначала рассмотрим агрегацию. Для этого метода вам нужно сгруппировать данные на основе идентификатора датчика (сокращенно SID), агрегировать VAL в массивы (в порядке убывания на основе первичного ключа каждой отдельной записи данных) и, наконец, получить первое значение каждого массива. .

Для получения дополнительной информации ознакомьтесь с этой статьей: Агрегатные функции. Далее ниже приведен пример кода, в котором используется этот метод:

postgres=#  select sid, (array_agg(ts order by pk desc))[1] as last_ts, (array_agg(val order by pk desc))[1] as last_val from sensor_data group by sid;    
 sid |          last_ts           | last_val     
-----+----------------------------+----------    
   0 | 2017-05-18 14:09:10.625812 |  6480.54    
   1 | 2017-05-18 14:09:10.627607 |  9644.29    
   2 | 2017-05-18 14:09:10.627951 |  3995.04    
   3 | 2017-05-18 14:09:10.627466 |   840.80    
   4 | 2017-05-18 14:09:10.627703 |  1500.59    
   5 | 2017-05-18 14:09:10.627813 |  3109.42    
   6 | 2017-05-18 14:09:10.62754  |  4131.31    
   7 | 2017-05-18 14:09:10.627851 |  9333.88    
......

В качестве альтернативы вы можете использовать оконный метод, который мы здесь обсуждать не будем. На самом деле очень похоже. Рассмотрим приведенный ниже пример кода для справки:

postgres=# select sid,ts,val from (select sid,ts,val,row_number() over(partition by sid order by pk desc) as rn from sensor_data) t where rn=1;    
 sid |             ts             |   val       
-----+----------------------------+---------    
   0 | 2017-05-18 14:09:10.625812 | 6480.54    
   1 | 2017-05-18 14:09:10.627607 | 9644.29    
   2 | 2017-05-18 14:09:10.627951 | 3995.04    
   3 | 2017-05-18 14:09:10.627466 |  840.80    
   4 | 2017-05-18 14:09:10.627703 | 1500.59    
   5 | 2017-05-18 14:09:10.627813 | 3109.42    
   6 | 2017-05-18 14:09:10.62754  | 4131.31    
   7 | 2017-05-18 14:09:10.627851 | 9333.88    
......

Ну, на наш взгляд, какая функция и метод, на наш взгляд, лучше в итоге? Что ж, чтобы ответить на этот вопрос, давайте проверим планы выполнения. Подводя итог, оконный метод имеет немного более быстрое время выполнения; однако они оба довольно быстрые.

postgres=# set work_mem ='16MB';    
SET    
postgres=# explain (analyze,verbose,timing,costs,buffers) select sid, (array_agg(ts order by pk desc))[1] as last_ts, (array_agg(val order by pk desc))[1] as last_val from sensor_data group by sid;    
                                                             QUERY PLAN                                                                 
------------------------------------------------------------------------------------------------------------------------------------    
 GroupAggregate  (cost=7117.15..7823.57 rows=101 width=44) (actual time=29.628..88.095 rows=101 loops=1)    
   Output: sid, (array_agg(ts ORDER BY pk DESC))[1], (array_agg(val ORDER BY pk DESC))[1]    
   Group Key: sensor_data.sid    
   Buffers: shared hit=736    
   ->  Sort  (cost=7117.15..7293.38 rows=70490 width=26) (actual time=29.273..36.249 rows=70490 loops=1)    
         Output: sid, ts, pk, val    
         Sort Key: sensor_data.sid    
         Sort Method: quicksort  Memory: 8580kB    
         Buffers: shared hit=736    
         ->  Seq Scan on public.sensor_data  (cost=0.00..1440.90 rows=70490 width=26) (actual time=0.243..9.768 rows=70490 loops=1)    
               Output: sid, ts, pk, val    
               Buffers: shared hit=736    
 Planning time: 0.077 ms    
 Execution time: 88.489 ms    
(14 rows)    
    
postgres=# explain (analyze,verbose,timing,costs,buffers) select sid,ts,val from (select sid,ts,val,row_number() over(partition by sid order by pk desc) as rn from sensor_data) t where rn=1;    
                                                                QUERY PLAN                                                                    
------------------------------------------------------------------------------------------------------------------------------------------    
 Subquery Scan on t  (cost=7117.15..9408.08 rows=352 width=18) (actual time=46.074..81.377 rows=101 loops=1)    
   Output: t.sid, t.ts, t.val    
   Filter: (t.rn = 1)    
   Rows Removed by Filter: 70389    
   Buffers: shared hit=736    
   ->  WindowAgg  (cost=7117.15..8526.95 rows=70490 width=34) (actual time=46.072..76.115 rows=70490 loops=1)    
         Output: sensor_data.sid, sensor_data.ts, sensor_data.val, row_number() OVER (?), sensor_data.pk    
         Buffers: shared hit=736    
         ->  Sort  (cost=7117.15..7293.38 rows=70490 width=26) (actual time=46.065..51.742 rows=70490 loops=1)    
               Output: sensor_data.sid, sensor_data.pk, sensor_data.ts, sensor_data.val    
               Sort Key: sensor_data.sid, sensor_data.pk DESC    
               Sort Method: quicksort  Memory: 8580kB    
               Buffers: shared hit=736    
               ->  Seq Scan on public.sensor_data  (cost=0.00..1440.90 rows=70490 width=26) (actual time=0.245..9.863 rows=70490 loops=1)    
                     Output: sensor_data.sid, sensor_data.pk, sensor_data.ts, sensor_data.val    
                     Buffers: shared hit=736    
 Planning time: 0.100 ms    
 Execution time: 82.480 ms    
(18 rows)

Обновление и статистика в реальном времени

В этом разделе мы рассмотрим, как работает режим Lamdba, потоковая передача данных, а также как работают режимы синхронного анализа данных, когда речь идет об обновлениях и статистике в реальном времени. Мы сосредоточимся на их общем дизайне и на том, как работает стресс-тестирование для каждого из них.

1. Лямбда-режим

В лямбда-режиме данные датчика записываются в подробную таблицу. Система извлекает, а затем удаляет данные из таблицы сведений посредством планирования задач, затем собирает добавочную статистику по извлеченным данным и объединяет статистические результаты.

Поскольку существует множество статистических измерений, поиск данных изолирован от удаления данных для достижения параллелизма. Система получает и удаляет подробные данные пакетами и упорядочивает подробные данные на основе первичных ключей. Несколько записей данных получаются одновременно. Функция выглядит следующим образом:

create or replace function get_sensor_data(i_limit int) returns sensor_data[] as 
$$
    
declare    
  arr_pk int8[];    
  arr_sensor_data sensor_data[];    
begin    
  select array_agg(t.sensor_data), array_agg((t.sensor_data).pk)    
    into arr_sensor_data, arr_pk    
    from (select sensor_data from sensor_data order by pk limit i_limit for update skip locked) t ;    
  delete from sensor_data WHERE pk = any (arr_pk);    
  return arr_sensor_data;    
end;    
$$
 language plpgsql strict;

После получения подробных данных система переходит к следующему действию. Если существуют более свежие значения, система обновляет эти значения. Если самое последнее из значений не может быть найдено, система вставит значения с помощью синтаксиса INSERT ON CONFLICT PostgreSQL.

В частности, система делает следующее. Он обновляет последние значения датчиков в режиме реального времени.

insert into sensor_lastdata    
  select sid, (array_agg(ts order by pk desc))[1] as last_ts, (array_agg(val order by pk desc))[1] as last_val from     
    unnest(get_sensor_data(1000))     
  group by sid    
on conflict (sid) do update set last_ts=excluded.last_ts,last_val=excluded.last_val;

А затем он также собирает добавочную статистику по значениям датчиков в пакетах.

Если вам интересно, как работает метод объединения статистических значений, обратите внимание на материалы, связанные с SQL. Подробные данные объединяются в массивы в соответствии с идентификатором датчика (SID) и сохраняются на основе первичного ключа (PK).

insert into sensor_aggdata (sid,ts_group,sum_val,min_val,max_val,avg_val,count_val,all_vals)    
select sid,to_char(ts,'yyyymmddhh24'),sum(val),min(val),max(val),avg(val),count(val),array_agg(val order by pk) from unnest(get_sensor_data(1000))     
  group by sid,to_char(ts,'yyyymmddhh24')    
  on conflict (sid,ts_group) do update set     
    sum_val=sensor_aggdata.sum_val+excluded.sum_val,    
    min_val=least(sensor_aggdata.min_val, excluded.min_val),    
    max_val=greatest(sensor_aggdata.max_val, excluded.max_val),    
    avg_val=(sensor_aggdata.sum_val+excluded.sum_val)/(sensor_aggdata.count_val+excluded.count_val),    
    count_val=sensor_aggdata.count_val+excluded.count_val,    
    all_vals=array_cat(sensor_aggdata.all_vals, excluded.all_vals);

Стресс-тестирование

Ниже приведены некоторые стресс-тесты. Для этих стресс-тестов вам необходимо создать эти таблицы.

create table sensor_data(    
  pk serial8 primary key, -- Primary key    
  ts timestamp,  -- Timestamp    
  sid int,  -- Sensor ID    
  val numeric(10,2)  -- Data    
);    
    
create table sensor_lastdata(    
  sid int primary key,  -- Sensor ID, which is the primary key    
  last_ts timestamp,  -- Timestamp    
  last_val numeric(10,2)  -- Value    
);    
    
create table sensor_aggdata(    
  sid int,  -- Sensor ID    
  ts_group varchar(10),  -- Group by time, such as by hour (yyyymmddhh24)    
  sum_val numeric,  -- Sum    
  min_val numeric(10,2),  -- Minimum value    
  max_val numeric(10,2),  -- Maximum value    
  avg_val numeric(10,2),  -- Average value    
  count_val int,  -- Count    
  all_vals numeric(10,2)[],  -- Detail value    
  unique (sid,ts_group)  -- Unique constraint    
);

Сценарий стресс-теста 1

Для этого первого сценария стресс-теста у нас будут данные, записанные и обновленные до самых последних значений датчиков в режиме реального времени.

vi ins.sql    
\set sid random(1,1000000)    
insert into sensor_data(ts,sid,val) values (clock_timestamp(), :sid, random()*1000);

Вам потребуется каждый раз объединять 50 000 записей данных.

vi lambda1.sql    
insert into sensor_lastdata select sid, (array_agg(ts order by pk desc))[1] as last_ts, (array_agg(val order by pk desc))[1] as last_val from unnest(get_sensor_data(50000)) group by sid on conflict (sid) do update set last_ts=excluded.last_ts,last_val=excluded.last_val;

Вы также захотите иметь около 100 000 записей данных каждую секунду.

pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 64 -j 64 -T 120    
    
transaction type: ./ins.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 64    
number of threads: 64    
duration: 120 s    
number of transactions actually processed: 12742596    
latency average = 0.603 ms    
latency stddev = 2.163 ms    
tps = 106184.095420 (including connections establishing)    
tps = 106188.650794 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         0.001  \set sid random(1,1000000)    
         0.602  insert into sensor_data(ts,sid,val) values (clock_timestamp(), :sid, random()*1000);

Затем используйте данные постепенно и обновляйте примерно 50 000 последних значений каждую секунду.

pgbench -M prepared -n -r -P 1 -f ./lambda1.sql -c 1 -j 1 -T 1200    
    
progress: 236.0 s, 1.0 tps, lat 649.196 ms stddev 0.000    
progress: 237.0 s, 2.0 tps, lat 868.952 ms stddev 6.024    
progress: 238.0 s, 1.0 tps, lat 728.553 ms stddev 0.000    
progress: 239.0 s, 258.1 tps, lat 5.335 ms stddev 44.167    
progress: 240.0 s, 850.9 tps, lat 0.983 ms stddev 14.506    
progress: 241.0 s, 7962.2 tps, lat 0.146 ms stddev 3.672    
progress: 242.0 s, 13488.1 tps, lat 0.074 ms stddev 0.006    
    
postgres=# select count(*) from sensor_data;    
 count     
-------    
     0    
(1 row)    
    
postgres=# select * from sensor_lastdata  limit 10;    
 sid  |          last_ts           | last_val     
------+----------------------------+----------    
  672 | 2017-05-18 16:33:43.569255 |   196.01    
  178 | 2017-05-18 16:33:31.23651  |   593.16    
  686 | 2017-05-18 16:33:38.792138 |   762.95    
 4906 | 2017-05-18 16:33:43.498217 |   150.13    
  544 | 2017-05-18 16:33:45.338635 |   410.31    
  165 | 2017-05-18 16:33:28.393902 |   678.75    
  625 | 2017-05-18 16:33:37.077898 |   229.06    
 1316 | 2017-05-18 16:33:45.218268 |    27.55    
 3091 | 2017-05-18 16:33:33.320828 |   697.75    
  340 | 2017-05-18 16:33:31.567852 |    24.18    
(10 rows)

При пакетном сборе статистики по 100 000 значений производительность можно немного повысить.

progress: 211.0 s, 1.0 tps, lat 1428.401 ms stddev 0.000    
progress: 212.0 s, 0.0 tps, lat -nan ms stddev -nan    
progress: 213.0 s, 1.0 tps, lat 1375.766 ms stddev 0.000    
progress: 214.0 s, 2665.9 tps, lat 0.699 ms stddev 23.234    
progress: 215.0 s, 8963.1 tps, lat 0.083 ms stddev 0.008    
progress: 216.0 s, 1699.4 tps, lat 0.741 ms stddev 12.434    
progress: 217.0 s, 13247.9 tps, lat 0.075 ms stddev 0.006

Сценарий стресс-теста 2

Для этого второго текста стресса у нас будут данные, записанные в режиме реального времени и собранные для дополнительной статистики по значениям датчиков в пакетах. Для этого сценария первое, что вам нужно, чтобы 100 000 записей данных каждый раз объединялись.

vi lambda2.sql    
insert into sensor_aggdata (sid,ts_group,sum_val,min_val,max_val,avg_val,count_val,all_vals) select sid,to_char(ts,'yyyymmddhh24'),sum(val),min(val),max(val),avg(val),count(val),array_agg(val order by pk) from unnest(get_sensor_data(100000))   group by sid,to_char(ts,'yyyymmddhh24')  on conflict (sid,ts_group) do update set     sum_val=sensor_aggdata.sum_val+excluded.sum_val,    min_val=least(sensor_aggdata.min_val, excluded.min_val),    max_val=greatest(sensor_aggdata.max_val, excluded.max_val),    avg_val=(sensor_aggdata.sum_val+excluded.sum_val)/(sensor_aggdata.count_val+excluded.count_val),    count_val=sensor_aggdata.count_val+excluded.count_val,    all_vals=array_cat(sensor_aggdata.all_vals, excluded.all_vals);

Затем запишите около 100 000 записей данных каждую секунду.

pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 64 -j 64 -T 120    
    
transaction type: ./ins.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 64    
number of threads: 64    
duration: 120 s    
number of transactions actually processed: 12753950    
latency average = 0.602 ms    
latency stddev = 2.733 ms    
tps = 106272.985233 (including connections establishing)    
tps = 106277.604416 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         0.001  \set sid random(1,1000000)    
         0.601  insert into sensor_data(ts,sid,val) values (clock_timestamp(), :sid, random()*1000);

Затем постепенно используйте данные и собирайте статистику со скоростью 44 000 записей данных каждую секунду.

pgbench -M prepared -n -r -P 1 -f ./lambda2.sql -c 1 -j 1 -T 1200    
    
progress: 287.0 s, 1.0 tps, lat 2107.584 ms stddev 0.000    
progress: 288.0 s, 0.0 tps, lat -nan ms stddev -nan    
progress: 289.0 s, 100.1 tps, lat 29.854 ms stddev 213.634    
progress: 290.0 s, 1855.0 tps, lat 0.540 ms stddev 5.677    
progress: 291.0 s, 8447.0 tps, lat 0.118 ms stddev 0.005    
    
postgres=# select * from sensor_aggdata limit 10;    
  sid   |  ts_group  | sum_val  | min_val | max_val | avg_val | count_val |                                                                      all_vals                                                                          
--------+------------+----------+---------+---------+---------+-----------+--------------------------------------------------------------------------------  
      6 | 2017051816 |  1842.71 |   42.47 |  577.09 |  307.12 |         6 | {42.47,559.47,577.09,193.62,75.74,394.32}    
      2 | 2017051816 |  5254.01 |   69.98 |  861.77 |  437.83 |        12 | {628.03,77.15,662.74,69.98,337.83,563.70,750.44,423.81,158.27,861.77,649.27,71.02}    
    226 | 2017051816 |  2756.42 |  144.00 |  680.45 |  344.55 |         8 | {350.57,144.00,194.23,352.52,680.45,302.66,420.01,311.98}    
    509 | 2017051816 |  6235.10 |   44.98 |  939.43 |  566.83 |        11 | {939.43,598.33,741.12,535.66,44.98,732.00,694.66,440.00,327.80,312.98,868.14}    
     20 | 2017051816 |  4684.00 |    7.01 |  878.64 |  425.82 |        11 | {209.70,288.67,76.35,544.31,289.33,7.01,841.21,878.64,418.05,651.01,479.72}    
 934042 | 2017051816 | 10210.41 |   46.44 |  945.59 |  486.21 |        21 | {235.86,656.24,450.73,945.59,932.06,256.10,46.44,903.74,694.43,713.79,523.25,325.82,333.67,603.01,743.63,137.48,238.60,321.65,466.50,70.49,611.33}   
    960 | 2017051816 |  3621.60 |   20.59 |  895.01 |  603.60 |         6 | {347.70,876.07,895.01,20.59,871.64,610.59}    
     81 | 2017051816 |  4209.38 |  459.06 |  949.42 |  701.56 |         6 | {716.38,949.42,706.20,459.06,613.36,764.96}    
 723065 | 2017051816 |  7176.00 |   12.37 |  983.84 |  512.57 |        14 | {869.29,715.48,323.42,595.29,983.84,700.06,716.37,741.55,137.88,12.37,334.74,951.94,46.85,46.92}    
     77 | 2017051816 |  5394.54 |   87.43 |  872.90 |  490.41 |        11 | {301.87,777.52,872.90,219.96,87.43,525.80,308.87,509.80,383.90,608.52,797.97}    
(10 rows)

2. Потоковая передача данных (режим потоковых вычислений)

В решении для потоковых вычислений PipelineDB используется для создания потока (и подробной таблицы), таблицы обновления в реальном времени и статистической таблицы.

Для этого создайте поток для хранения данных датчика.

create sequence seq;  -- Create the PK sequence.    
    
pipeline=# create stream sensor_data(    
pk int8, -- Latest PK value, used for sequencing    
ts timestamp, -- Timestamp    
sid int, -- Sensor ID    
val numeric(10,2)  -- Value    
);    
    
CREATE STREAM

Затем создайте непрерывное представление для обновления последних значений датчиков в режиме реального времени. Вам нужно использовать уникальную функцию агрегации PipelineDB для получения последних значений:

keyed_max ( key, value )  
  
Returns the value associated with the "highest" key.  
keyed_min ( key, value )  
  
Returns the value associated with the "lowest" key.

Обратите внимание, что вы не хотите использовать (array_agg(ts order by pk desc))[1], потому что PipelineDB не обеспечивает хорошей поддержки agg(order by).

-- An error occurs during the writing test because PipelineDB currently does not provide good support for agg(order by).    
CREATE CONTINUOUS VIEW sensor_lastdata1 AS     
  select sid, (array_agg(ts order by pk desc))[1] as last_ts, (array_agg(val order by pk desc))[1] as last_val     
    from  sensor_data    
  group by sid;    
    
--1. Replace the preceding SQL statement with this one.  
CREATE CONTINUOUS VIEW sensor_lastdata1 AS     
  select sid, keyed_max(pk, ts) as last_ts, keyed_max(pk, val) as last_val     
    from  sensor_data    
  group by sid;    
    
-- PipelineDB currently does not support the window function. You can replace the window function with keyed_max and keyed_min.      
CREATE CONTINUOUS VIEW sensor_lastdata2 AS     
  select sid,ts as last_ts,val as last_val from sensor_data    
  where row_number() over(partition by sid order by pk desc)=1;    
    
ERROR:  subqueries in continuous views cannot contain window functions

Затем создайте непрерывное представление для сбора статистики о значениях датчиков и агрегирования подробных данных в режиме реального времени.

-- An error occurs during the writing test because PipelineDB currently does not provide good support for agg(order by).    
CREATE CONTINUOUS VIEW sensor_aggdata1 AS     
  select     
  sid,    
  to_char(ts,'yyyymmddhh24') as ts_group,    
  sum(val) as sum_val,    
  min(val) as min_val,    
  max(val) as max_val,    
  avg(val) as avg_val,    
  count(val) as count_val,    
  array_agg(val order by pk) as all_vals    
    from sensor_data    
  group by sid,to_char(ts,'yyyymmddhh24');    
    
--2. Replace the preceding SQL statement with this one.  
CREATE CONTINUOUS VIEW sensor_aggdata1 AS     
  select     
  sid,    
  to_char(ts,'yyyymmddhh24') as ts_group,    
  sum(val) as sum_val,    
  min(val) as min_val,    
  max(val) as max_val,    
  avg(val) as avg_val,    
  count(val) as count_val,    
  jsonb_object_agg (pk, val) as all_vals    
    from sensor_data    
  group by sid,to_char(ts,'yyyymmddhh24');

Затем активируйте непрерывные виды.

pipeline=# activate sensor_lastdata1;    
ACTIVATE    
pipeline=# activate sensor_aggdata1;    
ACTIVATE

Стресс-тестирование

vi ins.sql    
    
\set sid random(1,1000000)    
insert into sensor_data(pk,ts,sid,val) values (nextval('seq'), clock_timestamp(), :sid, random()*1000);

Если вы не используете приведенный выше оператор замены SQL, появятся следующие сообщения об ошибках, поскольку PipelineDB в настоящее время не обеспечивает хорошей поддержки для agg(order by).

/home/digoal/pgsql10/bin/pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 1 -j 1 -T 100    
    
progress: 1.0 s, 12.0 tps, lat 1.302 ms stddev 0.455    
WARNING:  a background worker crashed while processing this batch    
HINT:  Some of the tuples inserted in this batch might have been lost.    
progress: 2.0 s, 16.0 tps, lat 70.528 ms stddev 253.719    
WARNING:  a background worker crashed while processing this batch    
HINT:  Some of the tuples inserted in this batch might have been lost.    
WARNING:  a background worker crashed while processing this batch    
HINT:  Some of the tuples inserted in this batch might have been lost.    
WARNING:  a background worker crashed while processing this batch    
HINT:  Some of the tuples inserted in this batch might have been lost.

Ниже приведены результаты стресс-тестирования после использования замещающего оператора SQL:

Для этого результаты стресс-тестирования выводятся при агрегировании значений. Для этого скорость записи составляет 127 000 записей в секунду.

/home/digoal/pgsql10/bin/pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 256 -j 256 -T 100    
  
transaction type: ./ins.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 256  
number of threads: 256  
duration: 100 s  
number of transactions actually processed: 12840629  
latency average = 1.994 ms  
latency stddev = 14.671 ms  
tps = 127857.131372 (including connections establishing)  
tps = 127864.890658 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set sid random(1,1000000)    
         1.997  insert into sensor_data(pk,ts,sid,val) values (nextval('seq'), clock_timestamp(), :sid, random()*1000);  
pipeline=# select * from sensor_aggdata1 limit 10;  
-[ RECORD 1 ]----------------------------------------------------------------------------------------------------------------------------------------------  
sid       | 444427  
ts_group  | 2017052410  
sum_val   | 4902.07  
min_val   | 18.69  
max_val   | 980.26  
avg_val   | 445.6427272727272727  
count_val | 11  
all_vals  | {"41971591": 731.45, "42075280": 69.63, "42629210": 980.26, "45243895": 18.69, "45524545": 320.88, "46971341": 741.88, "47036195": 357.47, "47895869": 562.16, "49805560": 136.78, "51753795": 344.00, "53039367": 638.87}

Вывод результатов стресс-тестирования, когда значения не агрегированы. Скорость записи составляет 200 000 записей в секунду.

CREATE CONTINUOUS VIEW sensor_aggdata2 AS     
  select     
  sid,    
  to_char(ts,'yyyymmddhh24') as ts_group,    
  sum(val) as sum_val,    
  min(val) as min_val,    
  max(val) as max_val,    
  avg(val) as avg_val,    
  count(val) as count_val    
  -- jsonb_object_agg (pk, val) as all_vals    
    from sensor_data    
  group by sid,to_char(ts,'yyyymmddhh24');    
/home/digoal/pgsql10/bin/pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 256 -j 256 -T 100    
    
transaction type: ./ins.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 256    
number of threads: 256    
duration: 100 s    
number of transactions actually processed: 20940292    
latency average = 1.222 ms    
latency stddev = 0.423 ms    
tps = 208834.531839 (including connections establishing)    
tps = 208854.792937 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         0.001  \set sid random(1,1000000)    
         1.222  insert into sensor_data(pk,ts,sid,val) values (nextval('seq'), clock_timestamp(), :sid, random()*1000);    
    
    
pipeline=# select * from sensor_aggdata2;    
 sid  |  ts_group  |   sum_val   | min_val | max_val |       avg_val        | count_val     
------+------------+-------------+---------+---------+----------------------+-----------    
  196 | 2017051815 | 11462397.00 |    0.00 |  999.99 | 503.1780948200175593 |     22780    
  833 | 2017051815 | 11479990.49 |    0.07 |  999.99 | 498.4365443730461966 |     23032    
  700 | 2017051815 | 11205820.52 |    0.04 |  999.97 | 497.1967574762623125 |     22538    
   83 | 2017051815 | 11466423.01 |    0.01 |  999.93 | 501.3959075604530150 |     22869    
  526 | 2017051815 | 11389541.40 |    0.01 |  999.99 | 503.4496485877204615 |     22623    
  996 | 2017051815 | 11416373.92 |    0.03 |  999.99 | 502.1938996172964413 |     22733    
  262 | 2017051815 | 11458700.05 |    0.03 |  999.98 | 499.5509656465254163 |     22938    
  542 | 2017051815 | 11365373.33 |    0.00 |  999.95 | 499.6427366246098387 |     22747    
......

3. Синхронный анализ данных (режим запуска в реальном времени)

Теперь давайте рассмотрим сценарий синхронного анализа данных. Для этого мы хотим записывать подробные данные в режиме реального времени и синхронно обновлять окончательный статус.

Вообще понятно, что использовать синхронную статистику не рекомендуется, поскольку этот метод может негативно сказаться на производительности записи RT. Тем не менее, давайте пока продолжим с этим.

Чтобы настроить это, вам нужно, чтобы итоговая таблица состояния датчиков обновлялась в режиме реального времени.

create table sensor_lastdata(    
  sid int primary key,    
  last_ts timestamp,    
  last_val numeric(10,2)    
);

Теперь, чтобы настроить сценарий стресс-теста 1, вам нужно обновить состояние датчиков в реальном времени.

vi ins.sql    
    
\set sid random(1,1000000)    
insert into sensor_lastdata values (:sid, now(), random()*1000) on conflict (sid) do update set last_ts=excluded.last_ts,last_val=excluded.last_val;

В результате производительность составляет около 180 000 записей состояния каждую секунду, как вы можете видеть ниже:

/home/digoal/pgsql10/bin/pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 128 -j 128 -T 100    
    
transaction type: ./ins.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 128    
number of threads: 128    
duration: 100 s    
number of transactions actually processed: 18659587    
latency average = 0.686 ms    
latency stddev = 2.566 ms    
tps = 186557.140033 (including connections establishing)    
tps = 186565.458460 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         0.001  \set sid random(1,1000000)    
         0.684  insert into sensor_lastdata values (:sid, now(), random()*1000) on conflict (sid) do update set last_ts=excluded.last_ts,last_val=excluded.last_val;

Сравнение трех методов

Сравнение производительности

Теперь давайте посмотрим, как каждый из них сравнивается.

1. Подробная запись скорости записи

  • Лямбда: 106 000 записей данных/с
  • Потоковая передача данных (включая агрегацию данных VAL): 127 800 записей/с
  • Потоковая передача данных (исключая агрегацию данных VAL): 208 000 записей/с

2. Скорость обновления конечного статуса

  • Лямбда: 59 800 записей/с
  • Синхронный анализ данных: 186 000 записей/с
  • Потоковая передача данных: 208 000 записей/с

3. Скорость статистики

  • Лямбда (включая агрегацию данных VAL): 44 000 записей/с
  • Потоковая передача данных (включая агрегацию данных VAL): 127 800 записей/с
  • Потоковая передача данных (исключая агрегацию данных VAL): 208 000 записей/с

Преимущества, недостатки и применимые сценарии

1. Лямбда-режим

Производительность Lambda относительно средняя — ничего особенного, с нашей точки зрения. Однако благодаря использованию пользовательских функций (UDF) и добавочного планирования Lambda поддерживает все статистические режимы. Функция Lambda имеет множество вариантов использования и поддерживает статистику почти в реальном времени для нескольких терабайт данных в день.

Мы ожидаем, что сообщество PostgreSQL разработает следующую функцию:

delete from table order by pk limit xxx skip locked returning array_agg(ts),array_agg(val) group by sid;

Такой запрос может удалить пакет записей из таблицы и вернуть те же записи с минимальными затратами. По сравнению с предыдущим примером производительность может быть улучшена на 100%.

2. Режим потоковых вычислений

Это решение обеспечивает максимальную производительность и удобство использования. Поэтому мы рекомендуем вам использовать это решение. Потоковое решение станет еще удобнее после преобразования PipelineDB в плагин.

3. Синхронный анализ данных (режим триггера в реальном времени)

Это решение рекомендуется, если вам нужно только обновить окончательный статус. Если это используется, ваша рабочая нагрузка по разработке будет сведена к минимуму, а также не потребуется планирование.

Оригинальный источник



📝 Читайте эту историю позже в Журнале.

👩‍💻 Просыпайтесь каждое воскресное утро и слушайте самые примечательные новости недели в области технологий, ожидающие в вашем почтовом ящике. Читать информационный бюллетень Noteworthy in Tech.