Копирование с сервера OnPrem SQL в DocumentDB с помощью настраиваемого действия в конвейере ADF

Я пытаюсь скопировать данные из таблицы SQL на локальном сервере sql и загрузить их в базу данных документов, используя настраиваемое действие в конвейере фабрики данных Azure. Может ли кто-нибудь сказать мне, как я могу это сделать с помощью IDotNetActivity или любого другого интерфейса или класса.


person Maverik    schedule 06.10.2016    source источник
comment
В этой статье подробно описано копирование из хранилище BLOB-объектов в DocumentDB. Посмотрите, сможете ли вы заставить это работать, а затем адаптируйтесь к локальному источнику данных.   -  person wBob    schedule 06.10.2016
comment
Спасибо за ответ, но я пытаюсь скопировать с помощью настраиваемого действия.   -  person Maverik    schedule 07.10.2016


Ответы (4)


Фактически, сегодня пользовательское действие не может получить доступ к локальным данным.

Аналогичный вопрос здесь: Локальное соединение SQL, выбрасывающее SqlException в Пользовательское действие Datafactory

Решение - копировать локальные данные в облако. Затем запустите настраиваемое действие для облачного хранилища. wBob поделился хорошим образцом выше.

Если вам нужно выполнить это за одно действие, вы можете настроить виртуальную сеть и ExpressRoute для подключения общедоступного облака Azure к среде onprem.

person Charles Gu    schedule 13.10.2016

Я получил это для работы с обычными задачами фабрики данных Azure (ADF). Никакой специальной задачи не требуется. Я бы не стал усложнять ситуацию, особенно с этими компонентами, которые может быть трудно отладить.

В следующем примере показано:

  1. Связанная служба типа OnPremisesSqlServer.
  2. Связанная служба типа DocumentDb.
  3. Входной набор данных типа SQLServerDataset.
  4. Выходной набор данных типа DocumentDbCollection.
  5. конвейер с действием копирования, использующим SqlSource и DocumentDbCollectionSink.

Связанная служба типа Локальный SQL Server:

{
    "name": "OnPremLinkedService",
    "properties": {
        "type": "OnPremisesSqlServer",
        "description": "",
        "typeProperties": {
            "connectionString": "Data Source=<servername - required for credential encryption>;Initial Catalog=<databasename - required for credential encryption>;Integrated Security=False;User ID=<username>;Password=<password>;",
            "gatewayName": "<Name of the gateway that the Data Factory service should use to connect to the on-premises SQL Server database - required for credential encryption>",
            "userName": "<Specify user name if you are using Windows Authentication>",
            "password": "<Specify password for the user account>"
        }
    }
}

Связанная служба типа DocumentDB:

{
    "name": "DocumentDbLinkedService",
    "properties": {
        "type": "DocumentDb",
        "typeProperties": {
            "connectionString": "AccountEndpoint=<EndpointUrl>;AccountKey=<AccessKey>;Database=<Database>"
        }
    }
}

Входной набор данных типа SqlServerTable:

{
    "name": "SQLServerDataset",
    "properties": {
        "structure": [
            {
                "name": "Id",
                "type": "Int32"
            },
            {
                "name": "FirstName",
                "type": "String"
            },
            {
                "name": "MiddleName",
                "type": "String"
            },
            {
                "name": "LastName",
                "type": "String"
            }
        ],
        "published": false,
        "type": "SqlServerTable",
        "linkedServiceName": "OnPremLinkedService",
        "typeProperties": {
            "tableName": "dbo.Users"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        },
        "external": true,
        "policy": {}
    }
}

Выходной набор данных типа DocumentDbCollection:

{
    "name": "PersonDocumentDbTableOut",
    "properties": {
        "structure": [
            {
                "name": "Id",
                "type": "Int32"
            },
            {
                "name": "Name.First",
                "type": "String"
            },
            {
                "name": "Name.Middle",
                "type": "String"
            },
            {
                "name": "Name.Last",
                "type": "String"
            }
        ],
        "published": false,
        "type": "DocumentDbCollection",
        "linkedServiceName": "DocumentDbLinkedService",
        "typeProperties": {
            "collectionName": "Person"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

Конвейер с активностью копирования с использованием SqlSource и DocumentDbCollectionSink:

{
    "name": "PipelineTemplate 3",
    "properties": {
        "description": "On prem to DocDb test",
        "activities": [
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "SqlSource"
                    },
                    "sink": {
                        "type": "DocumentDbCollectionSink",
                        "writeBatchSize": 2,
                        "writeBatchTimeout": "00:00:00"
                    },
                    "translator": {
                        "type": "TabularTranslator",
                        "columnMappings": "id: id, FirstName: Name.First, MiddleName: Name.Middle, LastName: Name.Last"
                    }
                },
                "inputs": [
                    {
                        "name": "SQLServerDataset"
                    }
                ],
                "outputs": [
                    {
                        "name": "PersonDocumentDbTableOut"
                    }
                ],
                "policy": {
                    "timeout": "1.00:00:00",
                    "concurrency": 1,
                    "retry": 3
                },
                "scheduler": {
                    "frequency": "Day",
                    "interval": 1
                },
                "name": "CopyActivityTemplate"
            }
        ],
        "start": "2016-10-05T00:00:00Z",
        "end": "2016-10-05T00:00:00Z",
        "isPaused": false,
        "hubName": "adfdocdb2_hub",
        "pipelineMode": "Scheduled"
    }
}
person wBob    schedule 09.10.2016
comment
Я смог заставить его работать с конфигурацией ADF, как упоминалось wBob. Однако требуется сделать это с помощью настраиваемого действия. - person Maverik; 10.10.2016
comment
Это интересно. Можете ли вы объяснить, почему вы должны использовать настраиваемые действия? По опыту, их может быть очень сложно отладить, когда что-то идет не так, поэтому обычно предпочтительнее выполнять это с помощью встроенных задач. Интересно узнать больше о вашем варианте использования. Спасибо. - person wBob; 10.10.2016
comment
wBob вы правы насчет сложности отладки. В настоящее время проект находится в начальной стадии. Здесь мы расскажем, почему нам нужны специальные действия и бизнес-потребности на более поздней стадии проекта. Спасибо за ваш быстрый ответ. - person Maverik; 10.10.2016

Я смог решить проблему. Решение состоит в том, чтобы написать код в самом настраиваемом действии, который копирует данные из локального SQL Server в DocumentDB, используя следующий код:

 public async Task CopyDataFromTo(string source)
    {
        try
        {
            DataTable dtSource = new DataTable();
            string EndpointUrl = "https://yourendpoint.documents.azure.com:443/";
            string AuthorizationKey = "*****";
            SecureString authKey = new SecureString();
            foreach(char c in AuthorizationKey.ToCharArray())
            {
                authKey.AppendChar(c);
            }
            SqlDataAdapter adapSource = new SqlDataAdapter("Select * From YourTable", source);
            adapSource.Fill(dtSource);
            foreach (DataRow Dr in dtSource.Rows)
            {
                dynamic docFirst = new
                {
                    UserID = Int32.Parse(Dr["ColumnOne"].ToString()),
                    UserAlias = Dr["ColumnTwo"].ToString()
                };
                using (var client = new DocumentClient(new Uri(EndpointUrl), authKey))
                {
                    Document newDocument = await client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri("DatabaseName", "CollectionName"), docFirst);
                };
            }
        }
        catch (Exception Ex)
        {
            throw Ex;
        }
    }
person Maverik    schedule 10.10.2016

Спасибо, Чарльз. Оказывается, вы правы. Я реализовал следующее решение:

Часть 1:

Реализован конвейер фабрики данных для перемещения данных из локальных баз данных в промежуточные коллекции DocumentDB.

Часть 2:

Используется настраиваемое действие для объединения данных из разных коллекций (поэтапно) в documentdb для создания новой коллекции documentdb с необходимыми выходными данными.

person Maverik    schedule 18.10.2016