Я пытаюсь скопировать данные из таблицы SQL на локальном сервере sql и загрузить их в базу данных документов, используя настраиваемое действие в конвейере фабрики данных Azure. Может ли кто-нибудь сказать мне, как я могу это сделать с помощью IDotNetActivity или любого другого интерфейса или класса.
Копирование с сервера OnPrem SQL в DocumentDB с помощью настраиваемого действия в конвейере ADF
Ответы (4)
Фактически, сегодня пользовательское действие не может получить доступ к локальным данным.
Аналогичный вопрос здесь: Локальное соединение SQL, выбрасывающее SqlException в Пользовательское действие Datafactory
Решение - копировать локальные данные в облако. Затем запустите настраиваемое действие для облачного хранилища. wBob поделился хорошим образцом выше.
Если вам нужно выполнить это за одно действие, вы можете настроить виртуальную сеть и ExpressRoute для подключения общедоступного облака Azure к среде onprem.
Я получил это для работы с обычными задачами фабрики данных Azure (ADF). Никакой специальной задачи не требуется. Я бы не стал усложнять ситуацию, особенно с этими компонентами, которые может быть трудно отладить.
В следующем примере показано:
- Связанная служба типа OnPremisesSqlServer.
- Связанная служба типа DocumentDb.
- Входной набор данных типа SQLServerDataset.
- Выходной набор данных типа DocumentDbCollection.
- конвейер с действием копирования, использующим SqlSource и DocumentDbCollectionSink.
Связанная служба типа Локальный SQL Server:
{
"name": "OnPremLinkedService",
"properties": {
"type": "OnPremisesSqlServer",
"description": "",
"typeProperties": {
"connectionString": "Data Source=<servername - required for credential encryption>;Initial Catalog=<databasename - required for credential encryption>;Integrated Security=False;User ID=<username>;Password=<password>;",
"gatewayName": "<Name of the gateway that the Data Factory service should use to connect to the on-premises SQL Server database - required for credential encryption>",
"userName": "<Specify user name if you are using Windows Authentication>",
"password": "<Specify password for the user account>"
}
}
}
Связанная служба типа DocumentDB:
{
"name": "DocumentDbLinkedService",
"properties": {
"type": "DocumentDb",
"typeProperties": {
"connectionString": "AccountEndpoint=<EndpointUrl>;AccountKey=<AccessKey>;Database=<Database>"
}
}
}
Входной набор данных типа SqlServerTable:
{
"name": "SQLServerDataset",
"properties": {
"structure": [
{
"name": "Id",
"type": "Int32"
},
{
"name": "FirstName",
"type": "String"
},
{
"name": "MiddleName",
"type": "String"
},
{
"name": "LastName",
"type": "String"
}
],
"published": false,
"type": "SqlServerTable",
"linkedServiceName": "OnPremLinkedService",
"typeProperties": {
"tableName": "dbo.Users"
},
"availability": {
"frequency": "Day",
"interval": 1
},
"external": true,
"policy": {}
}
}
Выходной набор данных типа DocumentDbCollection:
{
"name": "PersonDocumentDbTableOut",
"properties": {
"structure": [
{
"name": "Id",
"type": "Int32"
},
{
"name": "Name.First",
"type": "String"
},
{
"name": "Name.Middle",
"type": "String"
},
{
"name": "Name.Last",
"type": "String"
}
],
"published": false,
"type": "DocumentDbCollection",
"linkedServiceName": "DocumentDbLinkedService",
"typeProperties": {
"collectionName": "Person"
},
"availability": {
"frequency": "Day",
"interval": 1
}
}
}
Конвейер с активностью копирования с использованием SqlSource и DocumentDbCollectionSink:
{
"name": "PipelineTemplate 3",
"properties": {
"description": "On prem to DocDb test",
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource"
},
"sink": {
"type": "DocumentDbCollectionSink",
"writeBatchSize": 2,
"writeBatchTimeout": "00:00:00"
},
"translator": {
"type": "TabularTranslator",
"columnMappings": "id: id, FirstName: Name.First, MiddleName: Name.Middle, LastName: Name.Last"
}
},
"inputs": [
{
"name": "SQLServerDataset"
}
],
"outputs": [
{
"name": "PersonDocumentDbTableOut"
}
],
"policy": {
"timeout": "1.00:00:00",
"concurrency": 1,
"retry": 3
},
"scheduler": {
"frequency": "Day",
"interval": 1
},
"name": "CopyActivityTemplate"
}
],
"start": "2016-10-05T00:00:00Z",
"end": "2016-10-05T00:00:00Z",
"isPaused": false,
"hubName": "adfdocdb2_hub",
"pipelineMode": "Scheduled"
}
}
Я смог решить проблему. Решение состоит в том, чтобы написать код в самом настраиваемом действии, который копирует данные из локального SQL Server в DocumentDB, используя следующий код:
public async Task CopyDataFromTo(string source)
{
try
{
DataTable dtSource = new DataTable();
string EndpointUrl = "https://yourendpoint.documents.azure.com:443/";
string AuthorizationKey = "*****";
SecureString authKey = new SecureString();
foreach(char c in AuthorizationKey.ToCharArray())
{
authKey.AppendChar(c);
}
SqlDataAdapter adapSource = new SqlDataAdapter("Select * From YourTable", source);
adapSource.Fill(dtSource);
foreach (DataRow Dr in dtSource.Rows)
{
dynamic docFirst = new
{
UserID = Int32.Parse(Dr["ColumnOne"].ToString()),
UserAlias = Dr["ColumnTwo"].ToString()
};
using (var client = new DocumentClient(new Uri(EndpointUrl), authKey))
{
Document newDocument = await client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri("DatabaseName", "CollectionName"), docFirst);
};
}
}
catch (Exception Ex)
{
throw Ex;
}
}
Спасибо, Чарльз. Оказывается, вы правы. Я реализовал следующее решение:
Часть 1:
Реализован конвейер фабрики данных для перемещения данных из локальных баз данных в промежуточные коллекции DocumentDB.
Часть 2:
Используется настраиваемое действие для объединения данных из разных коллекций (поэтапно) в documentdb для создания новой коллекции documentdb с необходимыми выходными данными.