Журналы являются ценным источником информации для мониторинга трафика веб-сайта и устранения неполадок. Однако управление журналами может оказаться непростой задачей, особенно если у вас большой веб-сайт с большим трафиком.

Одним из способов упростить управление журналами является использование брокера сообщений, такого как RabbitMQ. RabbitMQ — это легкий и надежный брокер обмена сообщениями, который можно использовать для разделения приложений и распределения сообщений между различными системами.

В этой статье мы создадим трекер посетителей, который отправляет логи боту Telegram с помощью RabbitMQ и FastAPI. Кроме того, средство отслеживания посетителей использует базу данных Postgres для хранения журналов, но вы можете пропустить эту часть, если хотите, и создать только API с его промежуточным программным обеспечением. Будет два отдельных проекта: трекер посетителей и телеграмм-бот. А подключать их мы будем с помощью RabbitMQ.

К концу этой статьи у вас будет работающая система, которая может отправлять логи боту Telegram в режиме реального времени. Это позволит вам отслеживать трафик вашего веб-сайта и легче устранять проблемы.

Архитектура, управляемая событиями

Как объясняется в этой статье в Amazon, архитектура, управляемая событиями, использует события для запуска и обмена данными между несвязанными службами и распространена в современных приложениях, созданных с помощью микросервисов. Событие — это изменение состояния или обновление, например, добавление товара в корзину на веб-сайте электронной коммерции. События могут либо нести состояние (приобретенный товар, его цена и адрес доставки), либо события могут быть идентификаторами (уведомление об отправке заказа).

У управляемых событиями архитектур есть три ключевых компонента: производители событий, маршрутизаторы событий и потребители событий. Производитель публикует событие на маршрутизаторе, который фильтрует и отправляет события потребителям. Сервисы-производители и сервисы-потребители разделены, что позволяет масштабировать, обновлять и развертывать их независимо друг от друга.

Кроме того, в этой статье в RedHat говорится, что архитектура, управляемая событиями, может быть основана либо на модели pub/sub, либо на модели потока событий.

Паб/саб модель

Это инфраструктура обмена сообщениями, основанная на подписках на поток событий. В этой модели после того, как событие происходит или публикуется, оно отправляется подписчикам, которых необходимо проинформировать.

Модель потоковой передачи событий

В модели потоковой передачи событий события записываются в журнал. Потребители событий не подписываются на поток событий. Вместо этого они могут читать из любой части потока и могут присоединиться к потоку в любое время.

В этом примере мы собираемся создать микросервис с FastAPI и Postgres, который отслеживает запросы API, затем каждый журнал будет отправляться в очередь RabbitMQ и, наконец, в бот Telegram. Сервер FastaAPI будет производителем, а бот Telegram — потребителем.

Требования

  • Python установлен
  • Базовые знания Python
  • Пип установлен
  • Постгрес установлен
  • Установлен Ngrok или любое программное обеспечение для туннелирования.
  • Установлен RabbitMQ (Скачать отсюда)

Создание системы отслеживания посетителей

Во-первых, мы создаем каталог для этого приложения.

mkdir visitor_tracker
cd

#Windows users
py -m venv venv
cd venv/Scripts
./activate

#Linux
python3 -m venv venv
source venv/bin/activate

Устанавливаем все необходимые нам зависимости.

pip install fastapi psycopg2-binary python-dotenv

Внутри каталога visitor_tracker мы создаем новый файл, main.py.

main.py

from fastapi import FastAPI, Request
from datetime import datetime

app = FastAPI()

@app.get("/")
def index(request: Request):
    """Track website visitor."""
    ip_address = request.client.host
    request_url = request.url._url
    request_port = request.url.port
    request_path = request.url.path
    request_method = request.method
    request_time = datetime.now()
    browser_type = request.headers["User-Agent"]
    operating_system = request.headers["Accept"]

    return {
        "ip_address": ip_address,
        "request_url": request_url,
        "request_port": request_port,
        "request_path": request_path,
        "request_method": request_method,
        "request_time": request_time,
        "browser_type": browser_type,
        "operating_system": operating_system,
    }



if __name__ == "__main__":
    app.run(debug=True)

В этом файле мы определяем приложение FastAPI, которое отслеживает посетителей веб-сайта. Маршрут index() — единственный маршрут, определенный в приложении. Этот маршрут принимает объект Request в качестве входных данных и возвращает словарь информации о посетителе веб-сайта. Отслеживаемая информация включает IP-адрес посетителя, URL-адрес запроса, порт запроса, путь запроса, метод запроса, время запроса, тип браузера и операционную систему.

Мы переходим к «localhost: 8000», и мы должны увидеть этот ответ в нашем окне.

Но мы получили эти данные от конечной точки, мы не хотим писать этот код для каждой конечной точки. Итак, мы собираемся создать промежуточное ПО, которое собирает эти данные из каждого запроса и для каждой конечной точки.

Мы создаем новый файл tracker.py.

from fastapi import Request
from datetime import datetime


def visitor_tracker(request: Request):
    ip_address = request.client.host
    request_url = request.url._url
    request_port = request.url.port
    request_path = request.url.path
    request_method = request.method
    request_time = datetime.now()
    browser_type = request.headers["User-Agent"]
    operating_system = request.headers["Accept"]

    return {
        "ip_address": ip_address,
        "request_url": request_url,
        "request_port": request_port,
        "request_path": request_path,
        "request_method": request_method,
        "request_time": request_time,
        "browser_type": browser_type,
        "operating_system": operating_system,
    }

Функция visitor_tracker() используется для отслеживания одного посетителя веб-сайта. Функция принимает на вход объект Request и возвращает словарь информации о посетителе сайта. Отслеживаемая информация включает IP-адрес посетителя, URL-адрес запроса, порт запроса, путь запроса, метод запроса, время запроса, тип браузера и операционную систему.

Затем мы переходим к файлу main.py и создаем промежуточное ПО.

from fastapi import FastAPI, Request
from datetime import datetime
from tracker import visitor_tracker

app = FastAPI()


@app.middleware("tracker")
async def tracker(request: Request, call_next):
    tracker = visitor_tracker(request)
    response = await call_next(request)

    return response


@app.get("/")
def index():
    return "Hello, world"

@app.get("/json")
def some_func():
    return {
        "some_json": "Some Json"
    }



if __name__ == "__main__":
    app.run(debug=True)

Функция промежуточного программного обеспечения вызывается для каждого запроса к приложению. Функция промежуточного программного обеспечения сначала вызывает функцию visitor_tracker() для отслеживания посетителя веб-сайта. Затем функция промежуточного программного обеспечения вызывает функцию call_next() для обработки запроса.

Это данные, которые мы хотим, чтобы API хранил в базе данных и отправлял брокеру (RabbitMQ).

Теперь нам нужно создать базу данных для его хранения.

Нам нужно создать базу данных.

В нашей командной строке мы запускаем следующую команду:

CREATE DATABASE logs_db;

init_db.py

import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()
USER = os.getenv('USER')
PASSWORD = os.getenv('PASSWORD')

def get_db_connection():
    conn = psycopg2.connect(
        dbname = "logs_db",
        user = "postgres",
        password = PASSWORD
    )
    return conn

conn = get_db_connection()
cur = conn.cursor()

cur.execute('DROP TABLE IF EXISTS logs;')
cur.execute('CREATE TABLE logs (id serial PRIMARY KEY,'
                                 'ip_address varchar (150) NOT NULL,'
                                 'request_url varchar (50) NOT NULL,'
                                 'request_port integer NOT NULL,'
                                 'request_path varchar (50) NOT NULL,'
                                 'request_method varchar (50) NOT NULL,'
                                 'browser_type varchar (150) NOT NULL,'
                                 'operating_system varchar (150) NOT NULL,'
                                 'request_time timestamp (50) NOT NULL,'

                                 'date_added date DEFAULT CURRENT_TIMESTAMP);'
                                 )

cur.execute('INSERT INTO logs (ip_address,'
                                 'request_url,'
                                 'request_port,'
                                 'request_path,'
                                 'request_method,'
                                 'browser_type,'
                                 'operating_system,'
                                 'request_time)'
                                 'VALUES (%s, %s, %s, %s, %s, %s, %s, %s)',
            ('127.0.0.1',
             'http://localhost:8000',
             8000,
             "/",
             "GET",
             "Chrome",
             "Windows 11",
             "2023-06-25T16:03:24.722256"
             )
            )




conn.commit()

cur.close()
conn.close()

Здесь мы настраиваем соединение с базой данных для хранения данных журнала. Сначала мы загружаем файл .env, используя dotenv, чтобы получить переменные имени пользователя и пароля базы данных. Затем мы определяем функцию get_db_connection(), которая устанавливает соединение с базой данных PostgreSQL с именем logs_db. Затем код вызывает эту функцию, чтобы получить соединение с базой данных и курсор.

В этом файле код также удаляет таблицу журналов, если она существует, и воссоздает ее с заданной схемой — со столбцами для хранения IP-адреса, URL-адреса запроса, порта, пути, метода, браузера, ОС, времени и т. д. Он вставляет образцы данных журнала. в таблицу со своими значениями. Он фиксирует изменения в базе данных и закрывает курсор и соединение.

helpers.py

import collections


def to_dict(psycopg_tuple:tuple):
    tracker = collections.OrderedDict()
    tracker['id'] = psycopg_tuple[0]

    tracker["ip_address"] = psycopg_tuple[1]
    tracker["request_url"] = psycopg_tuple[2]
    tracker["request_port"] = psycopg_tuple[3]

    tracker["request_path"] = psycopg_tuple[4]
    tracker["request_method"] = psycopg_tuple[5]
    tracker["browser_type"] = psycopg_tuple[6]
    tracker["operating_system"] = psycopg_tuple[7]
    tracker["request_time"] = psycopg_tuple[8].strftime("%d-%m-%Y, %H:%M:%S")
    return tracker


def list_dict(rows:list):

    row_list = []
    for row in rows:
        book_dict = to_dict(row)
        row_list.append(book_dict)

    return row_list

Этот файл имеет две функции: to_dict() и list_dict(). Функция to_dict() преобразует кортеж PostgreSQL в словарь. Функция list_dict() преобразует список кортежей PostgreSQL в список словарей.

Функция to_dict() принимает на вход кортеж PostgreSQL и возвращает словарь. Словарь содержит значения кортежа в том же порядке, что и кортеж. Функция list_dict() принимает на вход список кортежей PostgreSQL и возвращает список словарей. Словари создаются с помощью функции to_dict().

controllers.py

from init_db import get_db_connection
from helpers import to_dict,list_dict
import json


def all_logs():
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute('SELECT * FROM logs;')
    logs = list_dict(cur.fetchall())
    cur.close()
    conn.close()


    return logs

def new_log(ip_address: str,
         request_url: str,
         request_port: int,
         request_path: str,
         request_method: str,
         browser_type: str,
         operating_system: str,
         request_time: str,):

    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute('INSERT INTO logs (ip_address, request_url, request_port, request_path, request_method, browser_type, operating_system, request_time)'
                    'VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING *;',(ip_address,
                                                    request_url,
                                                    request_port,
                                                    request_path,
                                                    request_method,
                                                    browser_type,
                                                    operating_system,
                                                    request_time,))

    log = cur.fetchone()[:]
    log_dict = to_dict(log)
    conn.commit()
    cur.close()
    conn.close()

    return json.dumps(log_dict)

Функция all_logs() получает все журналы из базы данных и возвращает список словарей. Каждый словарь содержит информацию об одном журнале.

Функция new_log() вставляет новый журнал в базу данных.

Создание телеграмм бота

БотОтец

Для создания Telegram-бота нам нужен токен.

Перейдите в приложение Telegram и введите @BotFather в строке поиска.

Выберите /start, чтобы активировать бота. Затем выбираем /newbot и следуем инструкциям по созданию бота.

Мы должны выбрать имя, которое увидят пользователи, и имя пользователя. Затем бот отправит нам сообщение с нашим токеном.

Теперь мы создаем новый проект для бота телеграммы.

mkdir telegram_bot
cd telegram_bot

Создайте виртуальную среду для проекта телеграмм-бота.

Установите библиотеку pyTelegramBotAPI с помощью pip:

pip install pyTelegramBotAPI python-dotenv

Создаем файл .env и вставляем в него токен, который дал нам BotFather.

.env

BOT_TOKEN=<Telegram Token>

Создаем новый файл bot.py, куда будем писать обработчики бота Telegram.

bot.py

import os
import logging
import telebot
from dotenv import load_dotenv

logger = telebot.logger
telebot.logger.setLevel(logging.INFO)


load_dotenv()

BOT_TOKEN = os.getenv('BOT_TOKEN')



bot = telebot.TeleBot(BOT_TOKEN)

@bot.message_handler(commands=['start', 'hello'])
def send_welcome(message):
    bot.reply_to(message, f"Howdy, how are you doing? This is your chat ID: {message.chat.id}")



bot.infinity_polling()

Мы инициализируем экземпляр Telebot, используя файл BOT_TOKEN.

Вышеприведенное отправляет сообщение «Привет, как дела? Это ваш идентификатор чата: { message.chat.id}» для пользователя, когда пользователь отправляет команду /start или /hello. Нам понадобится идентификатор чата для отправки журналов определенному чату или пользователю.

Запускаем опрос бота с помощью infinity_polling(). Это будет опрос для новых обновлений на неопределенный срок.

Запускаем python3 bot.py в своей командной строке и переходим в Telegram для отправки команды /start или /hello.

Но это всего лишь сценарий. Нам нужна программа, которая получает журналы из службы FastAPI, которую мы создали ранее.

Мы должны запустить этот скрипт на сервере. Мы собираемся использовать FastAPI для запуска бота.

pip install fastapi uvicorn

Мы создаем новый файл main.py.

from fastapi import FastAPI, Request
from datetime import datetime
import os
import logging
import telebot
from dotenv import load_dotenv



logger = telebot.logger
telebot.logger.setLevel(logging.INFO)


load_dotenv()

BOT_TOKEN = os.getenv('BOT_TOKEN')
WEBHOOK_HOST = '<ip/domain>'
WEBHOOK_PORT = 8443  # 443, 80, 88 or 8443 (port need to be 'open')
WEBHOOK_LISTEN = '0.0.0.0'  # In some VPS you may need to put here the IP addr
WEBHOOK_URL_BASE = "<Ngrok URL>"
WEBHOOK_URL_PATH = "/{}/".format(BOT_TOKEN)

app = FastAPI()

bot = telebot.TeleBot(BOT_TOKEN)


@app.post(f'/{BOT_TOKEN}/')
def process_webhook(update: dict):
    """
    Process webhook calls
    """
    if update:
        update = telebot.types.Update.de_json(update)
        bot.process_new_updates([update])
    else:
        return



@bot.message_handler(commands=['start', 'hello'])
def send_welcome(message):
    bot.reply_to(message, f"Howdy, how are you doing? This is your chat ID: {message.chat.id}")




bot.remove_webhook()

# Set webhook
bot.set_webhook(
    url=WEBHOOK_URL_BASE + WEBHOOK_URL_PATH,

)  


if __name__ == "__main__":

    app.run(app,
    host="127.0.0.1",
    port=5000)

main.py

Мы инициализируем экземпляр Telebot, используя BOT_TOKEN. Мы определяем переменные, связанные с веб-перехватчиком:

  • WEBHOOK_HOST: IP-адрес или домен, используемый для веб-перехватчика.
  • WEBHOOK_PORT: порт для прослушивания (443, 80, 88 или 8443).
  • WEBHOOK_LISTEN: IP-адрес для прослушивания (0.0.0.0 для прослушивания со всех адресов)
  • WEBHOOK_URL_BASE: базовый URL-адрес веб-перехватчика (из URL-адреса Ngrok).
  • WEBHOOK_URL_PATH: URL-адрес для веб-перехватчика.

Мы инициализируем приложение FastAPI. Мы определяем конечную точку POST для обработки вызовов веб-перехватчиков из Telegram. Он обрабатывает JSON обновления и передает его методу process_new_updates() Telebot. Мы удаляем любой существующий веб-хук для бота. Мы устанавливаем URL-адрес веб-перехватчика, используя URL-адрес Ngrok и путь к веб-перехватчику.

Константа WEBHOOK_URL_BASE должна быть URL-адресом, сгенерированным Ngrok для получения сообщений из чата.

Мы собираемся оставить телеграмм-бота незавершенным и вернемся позже.

Теперь мы собираемся настроить сервер RabbitMQ для отправки и получения сообщений.

Настройка RabbitMQ

Нам нужно установить RabbitMQ, скачать его можно здесь. И следуйте инструкциям на веб-странице, чтобы установить его.

Теперь мы собираемся построить отправителя.

Отправитель — это программа, которая будет отправлять журналы в очередь RabbitMQ.

В папке visitor_tracker создаем новый файл send.py и устанавливаем клиент RabbitMQ.

pip install pika

отправить.py

import pika


def sender(body: dict):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='logs')

    channel.basic_publish(exchange='', routing_key='logs', body=body)
    print(" [x] Sent 'Logs'")
    connection.close()

Функция sender() принимает словарь в качестве входных данных и отправляет словарь брокеру RabbitMQ. Словарь содержит информацию об одном журнале.

Функция sender() сначала создает соединение с брокером RabbitMQ. Затем функция создает канал в соединении. Затем функция объявляет очередь с именем logs на канале. Наконец, функция публикует словарь в очередь.

Функция sender() выводит на консоль сообщение о том, что журнал отправлен. Затем функция закрывает соединение с брокером RabbitMQ.

Журналы, которые sender() отправляет в очередь, будут потребляться ботом телеграммы, и бот отправляет их в чат.

Теперь мы переходим к файлу main.py, чтобы добавить отправителя в промежуточное ПО.

main.py

from send import sender

@app.middleware("tracker")
async def tracker(request: Request, call_next):


    tracker = visitor_tracker(request)


    log = new_log(tracker["ip_address"], tracker["request_url"], tracker["request_port"],
                      tracker["request_path"], tracker["request_method"],
                      tracker["browser_type"], tracker["operating_system"],tracker["request_time"])

    sender(log)



    response = await call_next(request)
    return response

Теперь мы запускаем этот сервер. И делаем запрос к http://localhost:8000 через веб-клиент или веб-браузер.

Когда мы делаем запрос, сервер отправляет значение переменной log в очередь.

Мы можем проверить, находится ли сообщение в очереди, выполнив команду rabbitmqctl.bat list_queues в нашей командной строке.

В очереди журналов есть сообщение, ожидающее обработки.

Теперь мы переходим к боту телеграммы и создаем приемник, обработчик для получения сообщений из очереди.

Нам также нужно установить клиент RabbitMQ.

pip install pika

В файле main.py телеграм-бота создаем функцию для получения сообщений из очереди и отправки их в чат.

import pika, sys, os

...
def receiver():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='logs')

    def callback(ch, method, properties, body:dict):
        print(" [x] Received %r" % body)
        log = f"This is the body : {body}"
        bot.send_message(<CHAT ID>, log)

    channel.basic_consume(queue='logs', on_message_callback=callback, auto_ack=True)


    print(' [*] Waiting for messages.')
    channel.start_consuming()
....

@bot.message_handler(func=receiver())
def echo_all(message):
    pass #This is intentional, you may leave this function like this

Функция receiver() получает логи от брокера RabbitMQ и отправляет их в Telegram. Журналы отправляются боту Telegram.

Функция receiver() сначала создает соединение с брокером RabbitMQ. Затем функция создает канал в соединении. Затем функция объявляет очередь с именем logs на канале. Наконец, функция начинает потреблять сообщения из очереди.

Функция receiver() вызывает функцию callback() при получении сообщения. Функция callback() выводит сообщение в консоль, а затем отправляет сообщение боту Telegram.

Пользователь будет получать сообщение каждый раз, когда запрашивается API.

Полный main.py файл сервера.

from fastapi import FastAPI, Request
from datetime import datetime
from tracker import visitor_tracker
from controllers import new_log, all_logs

from send import sender

app = FastAPI()

@app.middleware("tracker")
async def tracker(request: Request, call_next):


    tracker = visitor_tracker(request)


    log = new_log(tracker["ip_address"], tracker["request_url"], tracker["request_port"],
                      tracker["request_path"], tracker["request_method"],
                      tracker["browser_type"], tracker["operating_system"],tracker["request_time"])

    sender(log)



    response = await call_next(request)
    return response


@app.get("/logs")
def logs():
    logs = all_logs()
    return logs

@app.get("/")
def index():
    return "Hello, world"

@app.get("/json")
def some_func():
    return {
        "some_json": "Some Json"
    }


if __name__ == "__main__":
    app.run(port=5000)

Заполните main.py файл телеграм-бота.

from fastapi import FastAPI, Request
from datetime import datetime
import os
import logging
import telebot
from dotenv import load_dotenv
import pika, sys, os


logger = telebot.logger
telebot.logger.setLevel(logging.INFO)


load_dotenv()

BOT_TOKEN = os.getenv('BOT_TOKEN')
WEBHOOK_HOST = '<ip/domain>'
WEBHOOK_PORT = 8443  # 443, 80, 88 or 8443 (port need to be 'open')
WEBHOOK_LISTEN = '0.0.0.0'  # In some VPS you may need to put here the IP addr
WEBHOOK_URL_BASE = "<Ngrok URL>"
WEBHOOK_URL_PATH = "/{}/".format(BOT_TOKEN)

app = FastAPI()

bot = telebot.TeleBot(BOT_TOKEN)


def receiver():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='logs')

    def callback(ch, method, properties, body:dict):
        print(" [x] Received %r" % body)
        log = f"This is the body : {body}"
        bot.send_message(1047727961, log)

    channel.basic_consume(queue='logs', on_message_callback=callback, auto_ack=True)


    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()



@app.post(f'/{BOT_TOKEN}/')
def process_webhook(update: dict):
    """
    Process webhook calls
    """
    if update:
        update = telebot.types.Update.de_json(update)
        bot.process_new_updates([update])
    else:
        return



@bot.message_handler(commands=['start', 'hello'])
def send_welcome(message):
    bot.reply_to(message, f"Howdy, how are you doing? This is your chat ID: {message.chat.id}")


@bot.message_handler(func=receiver())
def echo_all(message):
    pass #This is intentional, you may leave this function like this



bot.remove_webhook()

# Set webhook
bot.set_webhook(
    url=WEBHOOK_URL_BASE + WEBHOOK_URL_PATH,

)


if __name__ == "__main__":

    app.run(app,
    host="127.0.0.1",
    port=8000)

Запускаем оба сервера, запуская команду uvicorn main:app в разных терминалах. И мы переходим к locahost:5000/. Мы должны получить в логах телеграм-бота такое сообщение:

А если зайти в наш Telegram-чат, то мы должны получить такое сообщение:

Заключение

В этой статье мы показали вам, как отправлять журналы боту Telegram с помощью RabbitMQ и FastAPI. Мы надеемся, что эта статья была полезной, я начал писать эту статью, чтобы узнать, как использовать RabbitMQ и как я могу использовать его с FastAPI, а также попрактиковаться и немного узнать о приложениях, управляемых событиями, что является концепцией, которую я пытаюсь понять.

Даже когда эти программы выполняли то, что я имел в виду, отправляя логи в Telegram-бот. Меня не устраивает телеграм-бот, особенно обработчик, который имеет receiver() в качестве параметра и callback() отправляет сообщение в чат.

Кроме того, у него есть проблема, когда мы останавливаем бота телеграммы или перезагружаемся, сервер не запускается полностью.

Спасибо, что нашли время прочитать эту статью.

Если у вас есть рекомендации по другим пакетам, архитектурам, как улучшить мой код, мой английский или что-то в этом роде; оставьте комментарий или свяжитесь со мной через Twitter или LinkedIn.

Код трекера посетителей здесь.

Исходный код Telegram Bot находится здесь.

Ресурсы

Первоначально опубликовано на https://carlosmv.hashnode.dev.