Я хотел бы настроить тесты для моих преобразований в Foundry, передавая тестовые входные данные и проверяя, что результат является ожидаемым. Можно ли вызвать преобразование с фиктивными наборами данных (файл .csv в репозитории) или мне следует создать функции внутри преобразования, которые будут вызываться тестами (данные, созданные в коде)?
Модульные тесты Python для преобразований Foundry?
Ответы (1)
Если вы посмотрите документацию по вашей платформе в разделе Code Repositories
-> Python Transforms
-> Python Unit Tests
, вы найдете там довольно много полезных ресурсов.
В частности, вам нужны разделы, посвященные написанию и запуску тестов.
// НАЧАЛО ДОКУМЕНТАЦИИ
Написание теста
Полную документацию можно найти по адресу https://docs.pytest.org.
Pytest находит тесты в любом файле Python, который начинается с test_. Рекомендуется поместить все ваши тесты в тестовый пакет в каталоге src вашего проекта. Тесты — это просто функции Python, имена которых также имеют префикс test_, а утверждения выполняются с помощью оператора assert Python. PyTest также будет запускать тесты, написанные с использованием встроенного в Python модуля unittest. Например, в файле transforms-python/src/test/test_increment.py простой тест будет выглядеть так:
def increment(num):
return num + 1
def test_increment():
assert increment(3) == 5
Запуск этого теста приведет к сбою проверки с сообщением, которое выглядит следующим образом:
============================= test session starts =============================
collected 1 item
test_increment.py F [100%]
================================== FAILURES ===================================
_______________________________ test_increment ________________________________
def test_increment():
> assert increment(3) == 5
E assert 4 == 5
E + where 4 = increment(3)
test_increment.py:5: AssertionError
========================== 1 failed in 0.08 seconds ===========================
Тестирование с помощью PySpark
Фикстуры PyTest — это мощная функция, которая позволяет вводить значения в тестовые функции, просто добавляя параметр с тем же именем. Эта функция используется для предоставления фикстуры spark_session для использования в ваших тестовых функциях. Например:
def test_dataframe(spark_session):
df = spark_session.createDataFrame([['a', 1], ['b', 2]], ['letter', 'number'])
assert df.schema.names == ['letter', 'number']
// КОНЕЦ ДОКУМЕНТАЦИИ
Если вы не хотите указывать свои схемы в коде, вы также можете прочитать их в файле в своем репозитории, следуя инструкциям в документации в разделе How To
-> Read file in Python repository
.
// НАЧАЛО ДОКУМЕНТАЦИИ
Прочитать файл в репозитории Python
Вы можете читать другие файлы из вашего репозитория в контексте преобразования. Это может быть полезно при настройке параметров для ссылки на ваш код преобразования.
Для начала в вашем репозитории Python отредактируйте setup.py:
setup(
name=os.environ['PKG_NAME'],
# ...
package_data={
'': ['*.yaml', '*.csv']
}
)
Это говорит python о том, что нужно связать файлы yaml и csv в пакет. Затем поместите файл конфигурации (например, config.yaml, но также может быть csv или txt) рядом с преобразованием python (например, read_yml.py, см. ниже):
- name: tbl1
primaryKey:
- col1
- col2
update:
- column: col3
with: 'XXX'
Вы можете прочитать это в своем преобразовании read_yml.py
с помощью кода ниже:
from transforms.api import transform_df, Input, Output
from pkg_resources import resource_stream
import yaml
import json
@transform_df(
Output("/Demo/read_yml")
)
def my_compute_function(ctx):
stream = resource_stream(__name__, "config.yaml")
docs = yaml.load(stream)
return ctx.spark_session.createDataFrame([{'result': json.dumps(docs)}])
Таким образом, структура вашего проекта будет такой:
- some_folder
- config.yaml
- read_yml.py
Это выведет в вашем наборе данных одну строку с одним результатом столбца с содержимым:
[{"primaryKey": ["col1", "col2"], "update": [{"column": "col3", "with": "XXX"}], "name": "tbl1"}]
// КОНЕЦ ДОКУМЕНТАЦИИ