В начале 2023 года произошел кембрийский взрыв ИИ в пространстве естественного языка. Такие термины, как LLM, основополагающие модели, генеративный ИИ и многое другое, можно услышать в местной кофейне или в ежедневных новостях. Что-то под названием ChatGPT революционизирует мир. Что такое ChatGPT? Это действительно приходит на работу? Может ли ChatGPT писать эссе? Почему учителя бьют тревогу, когда студенты отправляют письменные эссе ИИ? И т. д. и т. д. Если наша работа как инженеров состоит в том, чтобы автоматизировать себя без работы, возможно, мы, наконец, преуспели! Одним из важнейших аспектов успеха ChatGPT является его способность поддерживать контекст не только через архитектуру модели, но и разговор между вами и ботом. Это то, что мы собираемся исследовать здесь, когда мы создаем приложение с отслеживанием состояния на Python с помощью Bytwax.

Прежде чем мы перейдем к нашему приложению, давайте кратко рассмотрим некоторые языки и термины. Хотите пропустить слова и сразу перейти к коду? Репозиторий на GitHub

LLM и базовые модели

LLM (Large Language Models) — это модели машинного обучения, обученные работе с огромными объемами текстовых данных. Они используют эти данные для изучения закономерностей и взаимосвязей в языке, что позволяет им генерировать человекоподобный текст с удивительной точностью. Модели часто обучаются на миллиардах слов из книг, веб-сайтов и других источников, что делает их одними из самых больших и сложных моделей в мире. Часто вы услышите ссылку на параметры в отношении размера модели. Параметрам присваиваются значения в виде весов или смещений, которые в упрощенном виде сообщают модели, какие токены важны.

Фундаментальные модели относятся к моделям, которые обучаются на огромном объеме данных без присмотра и затем могут быть настроены для различных приложений. Например, вы можете настроить базовую модель в области права, чтобы отвечать на юридические вопросы.

НАТС

NATS — это высокопроизводительная система обмена сообщениями с открытым исходным кодом, разработанная для облачных приложений и микросервисов. Он обеспечивает эффективную и масштабируемую связь между приложениями и службами и поддерживает такие функции, как публикация/подписка на сообщения, шаблоны запроса/ответа и потоковая передача. NATS написан на языке программирования Go и спроектирован таким образом, чтобы быть легким, быстрым и простым в использовании. Он стал популярным выбором для организаций, которые ищут быструю, эффективную и надежную систему обмена сообщениями для своих облачных приложений. Мы собираемся использовать NATS для простоты использования шаблона ответа на запрос.

Контекст и состояние

Когда мы разговариваем как люди, мы получаем информацию из множества различных источников в нашем окружении, что позволяет нам реагировать соответствующим образом; мы воспринимаем наше окружение, визуальные сигналы, язык тела и то, о чем говорилось в нашем разговоре. Это можно назвать контекстом. Программным приложениям трудно иметь такой мультимодальный контекст, но мы все же можем поддерживать тот контекст, который мы можем собрать, чтобы сделать приложение более интеллектуальным. Это то, что делает ChatGPT, он хранит информацию о нашем разговоре, чтобы отвечать разумно.

В приложении контекст хранится в состоянии, на которое может ссылаться приложение. Это решается несколькими способами, например, он может быть сохранен в браузере как часть сеанса или в базе данных, на которую ссылается сервер приложений в течение нескольких сеансов. С механизмом обработки распределенных потоков вы можете поддерживать состояние глобально или локально. Решение о том, используете ли вы один или другой, повлияет на задержку и сложность. Локально сохраненное состояние может обеспечить очень низкую задержку, но требует правильной маршрутизации данных нужному исполнителю. Глобальное состояние может быть медленнее, но вы не так заботитесь о маршрутизации правильных данных к правильному воркеру.

Bytewax поддерживает состояние каждого воркера и направляет данные правильному воркеру для операций с отслеживанием состояния. Мы воспользуемся этим, чтобы создать клон ChatGPT.

Разработка нашего клона ChatGPT

Мы собираемся создать поток данных Bytewax, который облегчит получение подсказок через NATS, поддержание диалога между пользователем и ИИ, а также упростит ответ на подсказку через NATS. Общая архитектура нашего приложения включена для справки.

Предпосылки

NATS
Входит в состав нашего docker-compose, вы также можете установить NATS через их веб-сайт.

Docker/Docker Compose
Мы будем использовать Docker, чтобы избежать тонкостей в разных ОС и средах. Docker Compose облегчит запуск нескольких сервисов, которым необходимо взаимодействовать друг с другом.

Ключ OpenAI API
Следуйте инструкциям на веб-сайте OpenAI.

Модули Python
bytewax Streamlit Backoff Pynats Openai

Вход НАТС

Для начала рассмотрим механизм ввода. Мы собираемся создать пользовательский ввод с помощью Bytewax manualInputConfig. Это будет подписка на субъект NATS с именем prompts, и когда будет получено новое сообщение NATS, он передаст необходимую информацию вниз по течению. Так, как работает NATS, мы получим уникальный INBOX идентификатор и полезную нагрузку. Нам нужна информация об идентификаторе INBOX, чтобы ответить этой информацией, поскольку наш интерфейс будет ждать ее. Так работает цикл запроса NATS.

from pynats import NATSClient
nats_client = NATSClient()
nats_client.connect()

def input_builder(worker_index, worker_count, state):
    # Ignore state recovery here
    state = None
    messages = []

    def callback(message):
        messages.append(message)

    nats_client.subscribe(subject="prompts", callback=callback)

    while True:
        nats_client.wait(count=1)
        yield None, messages.pop()
        
flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))

Ответ чат-бота и разговор

Если вы помните, выше мы упомянули процессоры с отслеживанием состояния и то, как в распределенном приложении нам нужно направлять данные нужному рабочему процессу, если состояние поддерживается локально. Мы вытащим информацию user_id из полезной нагрузки сообщения NATS и используем ее в качестве ключа. Затем Bytewax направит данные на основе этого ключа, чтобы они попали к нужному работнику для нашего оператора с отслеживанием состояния. Это обрабатывается оператором map.

import json

def key_on_user(msg):
    user_id = json.loads(msg.payload.decode())['user_id']
    return(user_id, msg)

flow.map(key_on_user)

После того, как данные были введены для пользователя, мы можем использовать оператор с отслеживанием состояния stateful_map, который позволит нам сделать запрос к OpenAI для ответа на запрос, включенный в сообщение NATS, а также сохранить состояние нашего разговора. stateful_map ожидает имя шага, функцию построения и функцию сопоставления.

Способ работы stateful_map заключается в том, что перед шагом данные обмениваются между работниками по мере необходимости на основе ключа. Затем, если ключ виден в первый раз, когда вызывается функция построителя, будет вызвана функция сопоставления.

class Conversation:

    def __init__(self) -> None:
        self.session_responses = []
        self.session_prompts = []
        self.session_start = time.time()
        self.elapsed_time = 0

    def generate_response(self, msg):
        if len(self.session_responses) >= 1:
            chat_log = "\n".join([f"Human: {x[0]}\nAI:{x[1]}" for x in zip(self.session_prompts, self.session_responses)])
        else:
            chat_log = ""
        prompt = f"{chat_log}\nHuman: {msg.payload.decode()}\nAI:"
        response = completions_with_backoff(
            model="text-davinci-003",
            prompt=prompt,
            stop=['\nHuman'],
            temperature=0.6,
            max_tokens=2000
        )
        print(response)
        self.session_responses.append(response.choices[0].text)
        self.session_prompts.append(msg.payload.decode())
        return self, (msg, response.choices[0].text)
    
flow.stateful_map(
    step_id = "conversation",
    builder = lambda: Conversation(),
    mapper = Conversation.generate_response,
)

Чтобы помочь с ограничением скорости в API openAI, мы добавляем функцию отсрочки/повторной попытки.

@backoff.on_exception(backoff.expo, openai.error.RateLimitError)
def completions_with_backoff(**kwargs):
    return openai.Completion.create(**kwargs)

Выход НАТС

Чтобы ответить NATS таким образом, чтобы наш ожидающий клиент получил сообщение, мы используем информацию из сообщения NATS, которое мы получили изначально, и запускаем код ответа внутри пользовательского оператора захвата.

def output_builder(worker_index, worker_count):
    def publish(key__msg__response):
        key, (msg, response) = key__msg__response
        nats_client.publish(msg.reply, payload=response)
    
    return publish

flow.capture(ManualOutputConfig(output_builder))

Запуск NATS

Теперь, когда у нас есть написанный поток данных, давайте запустим NATS и наш поток данных через docker compose, чтобы они могли легко взаимодействовать друг с другом.

version: "3.5"
services:
  nats-server:
    image: nats:latest
    restart: always
    ports:
      - "8222:8222"
      - "6222:6222"
      - "4222:4222"
  bytewax:
    image: bytewax-chatbot
    container_name: bytewax-1
    environment:
      - NATS_URL=nats://nats-server:4222
      - OPENAI_API_KEY=$OPENAI_API_KEY
    depends_on:
      - nats

Мы можем запустить их с помощью команд в простом сценарии запуска. Важно, чтобы ваш API-ключ сначала был сохранен как переменная среды. export OPENAI_API_KEY=<your key here>

# run.sh
# build docker image
docker build . -t bytewax-chatbot 

# run dataflow
docker compose up -e OPENAI_API_KEY=`$OPENAI_API_KEY`

Создание внешнего интерфейса Streamlit

Теперь у нас есть NATS, выступающий в роли нашего брокера, и наш поток данных, ожидающий подсказок и готовый предоставить ответ, мы можем добавить небольшой интерактивный виджет для написания и отправки наших подсказок. Streamlit — хороший инструмент для создания подобных приложений.

# frontend.py
import streamlit as st
from pynats import NATSClient

import json


# set up page details
st.set_page_config(
    page_title="Bytewax ChatGPT",
    page_icon="🐝",
    layout="wide",
)

st.title("Bytewax ChatGPT")

if __name__ == "__main__":
    
    user_id = st.text_input('Enter your user id')
    print(user_id)
    prompt = st.text_input('Enter your question here')
    if st.button("Submit"):
        print("querying for {prompt}")
        with NATSClient() as client:
            payload = json.dumps({
                    "user_id":user_id,
                    "prompt":prompt})
            print(payload)
            msg = client.request("prompts", payload=payload.encode())
        st.write(msg.payload.decode())

И мы можем запустить наше потоковое приложение с помощью инструмента командной строки.

streamlit run frontend.py

Масштабирование

Хорошая вещь в нашей архитектуре заключается в том, что масштабирование серверной части потока данных bytewax довольно просто. Просто масштабируйте worker_count_per_proc и/или proc_count в своем исполнении.

if __name__ == "__main__":
    spawn_cluster(flow, proc_count = 1, worker_count_per_proc = 1,)

Краткое содержание

Идите и играйте со своим новым клоном ChatGPT! Теперь вы можете создавать приложения с отслеживанием состояния с помощью Bytewax и NATS :)!

Следующие шаги могут включать:

  1. Добавление второго захвата в поток данных для записи запросов и ответов в базу данных для дальнейшей точной настройки.
  2. Добавление шага суммирования, который будет суммировать разговор, когда у нас закончатся токены, передающие все ответы обратно.
  3. Отказ от OpenAI с моделью трансформера с обнимающим лицом, чтобы сэкономить $$.
  4. Переход на следующий уровень и точная настройка собственной модели для определенного приложения.

Смотрите нашу полную галерею руководств → https://www.bytewax.io/guides

Первоначально опубликовано на https://www.bytewax.io.