Массовая начальная загрузка для postgresql в SymmetricDS

Я пытаюсь использовать symmetricds для настройки синхронизации mysql с postgres с преобразованиями. У меня очень низкая производительность вставки при начальной загрузке со 100% загрузкой ЦП на postgres. Когда я смотрю на журнал postgres, я обнаружил, что он использует INSERT. Это нормально для нормальной работы, но не для инициализации, потому что у меня миллионы записей. Я основал PostgresBulkDatabaseWriter в исходном коде, который использует COPY вместо INSERT, и это выглядит как хорошее решение (запрос COPY sql у меня работает довольно хорошо), но я не нашел, как я могу его использовать.

Итак, мои вопросы:

Как лучше сделать начальную загрузку с symmetricds для миллионов записей?

Как я могу включить PostgresBulkDatabaseWriter для начальной (обратной начальной) загрузки?

Спасибо

UPD:

Исходные таблицы mysql:

CREATE TABLE `companies` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cid` int(11) NOT NULL,
  `universalName` text NOT NULL,
  `name` text NOT NULL,
  `country` text NOT NULL,
  `city` text NOT NULL,
  `street` text NOT NULL,
  `phone` text NOT NULL,
  `foundedYear` text NOT NULL,
  `employeeCountRange` text NOT NULL,
  `specialties` text NOT NULL,
  `websiteUrl` text NOT NULL,
  `twitterId` text NOT NULL,
  `check` tinyint(4) NOT NULL DEFAULT '0',
  `date` datetime NOT NULL,
  PRIMARY KEY (`id`)
);

CREATE TABLE `search_results` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cid` int(11) NOT NULL,
  `title` text NOT NULL,
  `description` text NOT NULL,
  `link` text NOT NULL,
  `raw` text NOT NULL,
  `date` datetime NOT NULL,
  PRIMARY KEY (`id`)
);

Основные таблицы postgres:

CREATE TABLE res_country (
    id integer NOT NULL,
    create_uid integer,
    create_date timestamp without time zone,
    write_date timestamp without time zone,
    write_uid integer,
    address_format text,
    currency_id integer,
    code character varying(2),
    name character varying(64) NOT NULL
);

INSERT INTO res_country VALUES (1, 1, '2013-11-16 06:53:31.030363', '2013-11-16 06:53:31.030363', 1, '%(street)s
%(street2)s
%(city)s %(state_code)s %(zip)s
%(country_name)s', 1, 'AD', 'Andorra, Principality of');

CREATE TABLE res_partner (
    id integer NOT NULL,
    name character varying(128) NOT NULL,
    lang character varying(64),
    company_id integer,
    create_uid integer,
    create_date timestamp without time zone,
    write_date timestamp without time zone,
    write_uid integer,
    comment text,
    ean13 character varying(13),
    color integer,
    image bytea,
    use_parent_address boolean,
    active boolean,
    street character varying(128),
    supplier boolean,
    city character varying(128),
    user_id integer,
    zip character varying(24),
    title integer,
    function character varying(128),
    country_id integer,
    parent_id integer,
    employee boolean,
    type character varying,
    email character varying(240),
    vat character varying(32),
    website character varying(64),
    fax character varying(64),
    street2 character varying(128),
    phone character varying(64),
    credit_limit double precision,
    date date,
    tz character varying(64),
    customer boolean,
    image_medium bytea,
    mobile character varying(64),
    ref character varying(64),
    image_small bytea,
    birthdate character varying(64),
    is_company boolean,
    state_id integer,
    notification_email_send character varying NOT NULL,
    opt_out boolean,
    signup_type character varying,
    signup_expiration timestamp without time zone,
    signup_token character varying,
    last_reconciliation_date timestamp without time zone,
    debit_limit double precision,
    display_name character varying,
    vat_subjected boolean,
    section_id integer
);


CREATE TABLE my_res_partner_companies (
  id INT8 NOT NULL PRIMARY KEY,
  cid INT8 NOT NULL,
  universalName VARCHAR NOT NULL,
  employeeCountRange VARCHAR NOT NULL,
  specialties VARCHAR NOT NULL,
  twitterId VARCHAR NOT NULL,
  "check" INT4 NOT NULL DEFAULT '0',
  date TIMESTAMP NOT NULL
);

CREATE TABLE my_res_partner_search_result (
  id INT8 NOT NULL PRIMARY KEY,
  link VARCHAR NOT NULL,
  raw VARCHAR NOT NULL,
  date TIMESTAMP NOT NULL
);

Исходные свойства:

engine.name=source-001

# The class name for the JDBC Driver
db.driver=com.mysql.jdbc.Driver

# The JDBC URL used to connect to the database
db.url=jdbc:mysql://localhost:3307/data?tinyInt1isBit=false

# The user to login as who can create and update tables
db.user=root

# The password for the user to login as
db.password=

# The HTTP URL of the root node to contact for registration
registration.url=http://localhost:8080/sync/core-000
#auto.reload.reverse=true

# Do not change these for running the demo
group.id=source
external.id=001

# This is how often the routing job will be run in milliseconds
job.routing.period.time.ms=5000
# This is how often the push job will be run.
job.push.period.time.ms=10000
# This is how often the pull job will be run.
job.pull.period.time.ms=10000

Основные свойства:

engine.name=core-000

# The class name for the JDBC Driver
db.driver=org.postgresql.Driver

# The JDBC URL used to connect to the database
db.url=jdbc:postgresql://localhost:5432/data2?stringtype=unspecified

# The user to login as who can create and update tables
db.user=admin

# The password for the user to login as
db.password=admin

registration.url=
sync.url=http://localhost:8080/sync/core-000
auto.reload.reverse=true
datareload.batch.insert.transactional=true

# Do not change these for running the demo
group.id=core
external.id=000

# Don't muddy the waters with purge logging
job.purge.period.time.ms=7200000

# This is how often the routing job will be run in milliseconds
job.routing.period.time.ms=5000
# This is how often the push job will be run.
job.push.period.time.ms=10000
# This is how often the pull job will be run.
job.pull.period.time.ms=10000

Основная symmetric конфигурация:

-- Nodes
insert into sym_node_group (node_group_id, description)
values ('core', 'Core Storage');
insert into sym_node_group (node_group_id, description)
values ('source', 'Source Storage');

insert into sym_node_group_link (source_node_group_id, target_node_group_id, data_event_action)
values ('source', 'core', 'P');
insert into sym_node_group_link (source_node_group_id, target_node_group_id, data_event_action)
values ('core', 'source', 'W');

insert into sym_node (node_id, node_group_id, external_id, sync_enabled)
values ('000', 'core', '000', 1);
insert into sym_node_security (node_id,node_password,registration_enabled,registration_time,initial_load_enabled,initial_load_time,initial_load_id,initial_load_create_by,rev_initial_load_enabled,rev_initial_load_time,rev_initial_load_id,rev_initial_load_create_by,created_at_node_id)
values ('000','changeme',1,current_timestamp,1,current_timestamp,null,null,0,null,null,null,'000');
insert into sym_node_identity values ('000');

-- Channels
insert into sym_channel
(channel_id, processing_order, max_batch_size, enabled, description)
values('source__acc', 1, 100000, 1, 'accounting synchronisation');

-- Triggers
insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('source__companies','companies','source__acc',current_timestamp,current_timestamp);

insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('source__search_results','search_results','source__acc',current_timestamp,current_timestamp);

-- Routers
insert into sym_router
(router_id,source_node_group_id,target_node_group_id,router_type,create_time,last_update_time)
values('source_2_core', 'source', 'core', 'default',current_timestamp, current_timestamp);

-- Trigger Router Links
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order, INITIAL_LOAD_BATCH_COUNT,last_update_time,create_time)
values('source__companies','source_2_core', 100, 0, current_timestamp, current_timestamp);

insert into sym_trigger_router
(trigger_id,router_id,initial_load_order, INITIAL_LOAD_BATCH_COUNT,last_update_time,create_time)
values('source__search_results','source_2_core', 200, 0, current_timestamp, current_timestamp);

Основные преобразования:

-- Transform
insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__companies__main', 'source', 'core', 'LOAD', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED');
--  ('source__companies__main', 'source', 'core', 'EXTRACT', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__companies__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__companies__main', '*', 'name', 'name', 0, 'copy', NULL),
  ('source__companies__main', '*', 'country', 'country_id', 0, 'const', '1'),
  ('source__companies__main', '*', 'city', 'city', 0, 'copy', NULL),
  ('source__companies__main', '*', 'street', 'street', 0, 'copy', NULL),
  ('source__companies__main', '*', 'phone', 'phone', 0, 'copy', NULL),
  ('source__companies__main', '*', 'websiteUrl', 'website', 0, 'copy', NULL),
  ('source__companies__main', '*', NULL, 'notification_email_send', 0, 'const', '0'),
  ('source__companies__main', '*', NULL, 'is_company', 0, 'const', '1');

insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__companies__residue', 'source', 'core', 'LOAD', 'companies', 'my_res_partner_companies', 'DEL_ROW', 'SPECIFIED');
  -- ('source__companies__residue', 'source', 'core', 'EXTRACT', 'companies', 'my_res_partner_companies', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__companies__residue', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__companies__residue', '*', 'cid', 'cid', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'universalName', 'universalName', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'employeeCountRange', 'employeeCountRange', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'specialties', 'specialties', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'twitterId', 'twitterId', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'check', 'check', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'date', 'date', 0, 'copy', NULL);


insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point,
   source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__search_results__main', 'source', 'core', 'LOAD', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED');
  -- ('source__search_results__main', 'source', 'core', 'EXTRACT', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__search_results__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 2000000;'),
  ('source__search_results__main', '*', 'cid', 'parent_id', 0, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__search_results__main', '*', 'title', 'name', 0, 'copy', NULL),
  ('source__search_results__main', '*', 'description', 'comment', 0, 'copy', NULL),
  ('source__search_results__main', '*', NULL, 'use_parent_address', 0, 'const', '1'),
  ('source__search_results__main', '*', NULL, 'notification_email_send', 0, 'const', '0'),
  ('source__search_results__main', '*', NULL, 'is_company', 0, 'const', '0');

insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point,
   source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__search_results__residue', 'source', 'core', 'LOAD', 'search_results', 'my_res_partner_search_result', 'DEL_ROW', 'SPECIFIED');
  -- ('source__search_results__residue', 'source', 'core', 'EXTRACT', 'search_results', 'my_res_partner_search_result', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__search_results__residue', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 2000000;'),
  ('source__search_results__residue', '*', 'link', 'link', 0, 'copy', NULL),
  ('source__search_results__residue', '*', 'raw', 'raw', 0, 'copy', NULL),
  ('source__search_results__residue', '*', 'date', 'date', 0, 'copy', NULL);

Упрощенные преобразования:

-- Transform
insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__companies__main', 'source', 'core', 'LOAD', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED');
--  ('source__companies__main', 'source', 'core', 'EXTRACT', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__companies__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__companies__main', '*', 'name', 'name', 0, 'copy', NULL),
  ('source__companies__main', '*', 'country', 'country_id', 0, 'const', '1'),
  ('source__companies__main', '*', 'city', 'city', 0, 'copy', NULL),
  ('source__companies__main', '*', 'street', 'street', 0, 'copy', NULL),
  ('source__companies__main', '*', 'phone', 'phone', 0, 'copy', NULL),
  ('source__companies__main', '*', 'websiteUrl', 'website', 0, 'copy', NULL),
  ('source__companies__main', '*', NULL, 'notification_email_send', 0, 'const', '0'),
  ('source__companies__main', '*', NULL, 'is_company', 0, 'const', '1');


insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point,
   source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__search_results__main', 'source', 'core', 'LOAD', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED');
  -- ('source__search_results__main', 'source', 'core', 'EXTRACT', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__search_results__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 2000000;'),
  ('source__search_results__main', '*', 'cid', 'parent_id', 0, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__search_results__main', '*', 'title', 'name', 0, 'copy', NULL),
  ('source__search_results__main', '*', 'description', 'comment', 0, 'copy', NULL),
  ('source__search_results__main', '*', NULL, 'use_parent_address', 0, 'const', '1'),
  ('source__search_results__main', '*', NULL, 'notification_email_send', 0, 'const', '0'),
  ('source__search_results__main', '*', NULL, 'is_company', 0, 'const', '0');

Основная настройка:

update sym_channel set DATA_LOADER_TYPE = 'postgres_bulk' where channel_id = 'reload';

Выглядит как symmetric вставка с COPY последовательной записи с основными преобразованиями (LOAD и EXTRACT) и упрощенными преобразованиями (LOAD и EXTRACT).


person tbicr    schedule 14.01.2014    source источник
comment
С их веб-сайта: SymmetricDS Pro — это репликация корпоративного класса, построенная на SymmetricDS с открытым исходным кодом и других проверенных компонентах OSS. Повысьте удобство работы и производительность с помощью веб-интерфейса, который упрощает настройку, мониторинг и устранение неполадок. Ядро дополнено специальными функциями, включая быстрые загрузчики данных с массовой загрузкой и поддержку синхронизации с устройствами Android. недоступно в версии OSS...   -  person fvu    schedule 14.01.2014
comment
Только что основан symbolds.org/doc/3.5 /html-single/, вероятно, массовая загрузка существует в стандартной версии.   -  person tbicr    schedule 14.01.2014
comment
@fvu О, я люблю полуоткрытые приманки. Действительно.   -  person Craig Ringer    schedule 15.01.2014
comment
OSS SymmetricDS поддерживает загрузчики больших объемов данных. Я разработчик.   -  person Austin Brougher    schedule 15.01.2014


Ответы (2)


PostgresBulkDataLoaderFactory — ваш ответ.

Если вы просто хотите использовать средство массовой записи для начальной загрузки и перезагрузки, я предлагаю вам настроить канал перезагрузки только для использования средства массовой записи.

В таблице каналов (по умолчанию sym_channel) обновите столбец data_loader_type канала перезагрузки до «postgres_bulk».

В руководстве пользователя кратко объясняется, как реализовать DatabaseWriters.

person Austin Brougher    schedule 15.01.2014
comment
Действительно update sym_channel set DATA_LOADER_TYPE = 'postgres_bulk' where channel_id = 'reload'; на узле postgres включите его. Но похоже, что он делает COPY для каждой записи отдельно, а не массово. Возможно проблемы с моими преобразованиями. Моя загрузка ЦП: 35% по postgres, 75% по симметричному. - person tbicr; 16.01.2014
comment
Массовая обработка Postgres работает невероятно быстро без преобразований. С преобразованиями похоже, что записи вставляются одна за другой с COPY для EXTRACT и LOAD transform_point. Есть ли способ проверить мою теорию? Если попробовать, можно ли настроить преобразования на массовую вставку? - person tbicr; 16.01.2014
comment
Можете ли вы поделиться, какие преобразования вы используете? Вы запрашиваете БД? - person Austin Brougher; 16.01.2014
comment
Преобразования загрузки не выполняются с помощью массового загрузчика. Ваши преобразования должны выполняться на стороне извлечения. Некоторые преобразования могут выполняться дольше, чем другие. Если вы поделитесь своим преобразованием, я могу помочь предложить оптимизацию. - person Austin Brougher; 16.01.2014
comment
Обновленный пост с примером. - person tbicr; 17.01.2014
comment
Интерпретатор BSH может работать медленно, если вы преобразовываете большое количество данных. Ваш сценарий короткий, но многократный запуск интерпретатора требует больших затрат. Обойти это можно было бы, создав пользовательское преобразование. Если вы знакомы с Java, это не должно вызвать затруднений. Прежде чем вы попробуете пользовательское преобразование, я бы попробовал изменить ваши преобразования BSH на «копию» и снова запустить вашу первоначальную загрузку, чтобы убедиться, что это ваше узкое место. symbolds.org/doc/3.5/ html-один/ - person Austin Brougher; 17.01.2014
comment
Когда я меняю bsh на copy, я ускоряю время трансформации в 2 раза. Но, тем не менее, похоже на вставку записей с COPY по-прежнему одну за другой. Насколько я понял, symmetric не выполняет (или bsh не может) предварительную компиляцию и кэширование bsh скриптов? Также может ли symmetric кэшировать результаты запроса преобразования поиска? Можете ли вы описать, как я могу подключить свою собственную трансформацию (реализация IColumnTransform) к symmetric? - person tbicr; 18.01.2014
comment
BSH не поддерживает предварительную компиляцию. В будущем мы планируем поддерживать другие языки, включая возможность компилировать код Java на лету. А пока мы предлагаем создать пользовательскую точку расширения. Создайте объект, реализующий ISingleValueColumnTransform. Вам нужно будет скомпилировать это и поместить в папку lib. Когда SymmetricDS запускается, она ищет все, что реализует точку расширения, и загружает ее. Вы должны увидеть сообщение о том, что ваша точка расширения загружена. Затем вы можете ссылаться на свое новое расширение по имени, которое вы ему назначаете. Посмотрите на CopyColumnTransform. - person Austin Brougher; 19.01.2014
comment
Я проверил копию, работающую на отдельных записях. Я просмотрел код и не тестировал его, но похоже, что по умолчанию ставится в очередь 10000 записей перед записью в БД. Вы можете изменить этот параметр, обновив файл .property ваших двигателей в папке ./engines. Добавьте следующую строку, чтобы увеличить количество записей до 100 000. postgres.bulk.load.max.rows.before.flush=100000 - person Austin Brougher; 20.01.2014
comment
Я добавил собственное преобразование столбца с помощью этого примера bean-компонента: «nofollow noreferrer»> symbolds.org/doc/3.5/html-single/. postgres.bulk.load.max.rows.before.flush=100000 не ускоряет начальную загрузку. - person tbicr; 20.01.2014
comment
Сколько записей вы загружаете и как быстро это работает? Каковы ваши требования/ожидания? - person Austin Brougher; 20.01.2014
comment
две таблицы с тестовыми данными: 10000 для компаний и 930000 для записей search_results, занимают около 12 минут. Без преобразований это занимает около 2 минут и имеет красивый журнал операторов postgres. - person tbicr; 20.01.2014
comment
Был около 24 минут с bsh. Сейчас это нормальная скорость, но выглядят вставки стали не оптимальными. Теперь я хочу просто уточнить, изменило ли преобразование вставку пакетов для массовой вставки? - person tbicr; 20.01.2014
comment
Пользовательское преобразование на стороне извлечения не повлияет на массовую вставку на стороне загрузки. - person Austin Brougher; 20.01.2014
comment
Я создал запрос функции для преобразования математического выражения. Я поиграл с этим сегодня и увидел, что он примерно в 10-20 раз быстрее, чем BSH на простых выражениях. Вы можете отслеживать запрос функции здесь. Ошибка 1542 - person Austin Brougher; 22.01.2014
comment
Спасибо, но меня больше беспокоит, почему синхронизация с преобразованиями в 6 раз медленнее, чем прямая синхронизация для моего примера. - person tbicr; 23.01.2014

Ключом к хорошей производительности вставки больших наборов данных в postgresql является выполнение всего этого за минимально возможное количество транзакций. По умолчанию postgresql использует автоматическую фиксацию для каждой вставки, поэтому каждая вставка сама по себе является транзакцией. Просто оберните все вставки с помощью begin; совершить; может дать гораздо более высокую пропускную способность.

Таким образом, причины ускорения копирования двояки. 1: одна большая транзакция и 2: без синтаксического анализа отдельных операторов вставки. Накладные расходы на синтаксический анализ минимальны, но транзакционные накладные расходы, условно говоря, огромны для отдельных операторов.

Есть ли способ заставить это сделать:

begin;
insert ...;
insert ...;
insert ...;
commit;
person Scott Marlowe    schedule 14.01.2014
comment
SymmetricDS использовать транзакции (по умолчанию 5000 записей на транзакцию) как я понял. Однако COPY будет работать быстрее даже INSERT в транзакции. - person tbicr; 16.01.2014