Код KafkaConsumer, завернутый в uwsgi, работающий в Docker, похоже, ничего не делает

Я пытаюсь запустить код KafkaConsumer в uwsgi и в контейнере Docker.

Код работает вне Docker/uwsgi, но после запуска в Docker uwsgi не сообщает об ошибках (но и не лотит ни в stdin/out, ни в лог-файл).

Итак, проблема в том, что этот потребитель ничего не фиксирует в ES, а также ничего не регистрирует в контейнере. У меня нет вариантов, поэтому мне нужна помощь.

Не могу понять, почему uwsgi не пишет/выводит никакой активности. Контейнер докера не содержит журналов (я проверял), и я не могу понять, почему многопроцессорные процессы не запускаются. Я сделал конфигурацию отложенного приложения, поэтому все воркеры должны создаваться независимо. Поведение такое же, если я запускаю один или несколько процессов, и оно было таким же в режиме master / non-master, и файл /var/log/uwsgi/app/myapp_consumer.log никогда не создается.

ини конфиг:

;uWSGI instance configuration
[uwsgi]
ini = /etc/uwsgi/apps-enabled/myapp_consumer.ini
uid = www-data
gid = www-data
plugins = python3
socket = /tmp/uwsgi.sock
chdir = /opt/myapp_consumer/src
enable-threads = true
lazy-apps = true
processes = 4
threads = 2
close-on-exec = true
show-config = true
master = true
logfile = file:/var/log/uwsgi/app/myapp_consumer.log
logfile-chmod = 644
logfile-chown = www-data:www-data
wsgi-file = main.py
env = MYAPP_CONSUMER_HOME=/opt/myapp_consumer
;end of configuration

журнал uwsgi

*** Starting uWSGI 2.0.12-debian (64bit) on [Sun Feb 25 23:53:51 2018] ***
compiled with version: 5.4.0 20160609 on 31 August 2017 21:02:04
os: Linux-4.9.60-linuxkit-aufs #1 SMP Mon Nov 6 16:00:12 UTC 2017
nodename: ecf416b71ce0
machine: x86_64
clock source: unix
pcre jit disabled
detected number of CPU cores: 4
current working directory: /
detected binary path: /usr/bin/uwsgi-core
setgid() to 33
setuid() to 33
chdir() to /opt/myapp_consumer/src
your memory page size is 4096 bytes
detected max file descriptor number: 1048576
lock engine: pthread robust mutexes
thunder lock: disabled (you can enable it with --thunder-lock)
uwsgi socket 0 bound to UNIX address /tmp/uwsgi.sock fd 3
Python version: 3.5.2 (default, Nov 23 2017, 16:37:01)  [GCC 5.4.0 20160609]
Python main interpreter initialized at 0xa3be40
python threads support enabled
your server socket listen backlog is limited to 100 connections
your mercy for graceful operations on workers is 60 seconds
mapped 415360 bytes (405 KB) for 8 cores
*** Operational MODE: preforking+threaded ***
*** uWSGI is running in multiple interpreter mode ***
spawned uWSGI master process (pid: 1)
spawned uWSGI worker 1 (pid: 7, cores: 2)
spawned uWSGI worker 2 (pid: 8, cores: 2)
spawned uWSGI worker 3 (pid: 9, cores: 2)
spawned uWSGI worker 4 (pid: 10, cores: 2)
WSGI app 0 (mountpoint='') ready in 0 seconds on interpreter 0xa3be40 pid: 9 (default app)
WSGI app 0 (mountpoint='') ready in 0 seconds on interpreter 0xa3be40 pid: 7 (default app)
WSGI app 0 (mountpoint='') ready in 0 seconds on interpreter 0xa3be40 pid: 10 (default app)
WSGI app 0 (mountpoint='') ready in 0 seconds on interpreter 0xa3be40 pid: 8 (default app)

код моего приложения

Обратите внимание, что код myapp сокращен для сокрытия клиентских данных. Тем не менее, я думаю, что ключ проблемы заключается в методе применения...

# !/usr/bin/env python
import logging
import time
import multiprocessing
import requests
import json

from kafka import KafkaConsumer

class AConsumer(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_event = multiprocessing.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=2000)
        consumer.subscribe(['mytopic'])

        while not self.stop_event.is_set():
            for message in consumer:
                entry = json.loads(message.value)
                Transaction.commit(entry)

                if self.stop_event.is_set():
                    break

        consumer.close()

class Transaction:
    @staticmethod
    def commit(id, payload):
        sPayload = payload
        sHeaders = {'Content-Type': 'application/json'}
        sEndpoint = 'http://localhost:9200/myapp/entries/
        response = requests.post(sEndpoint, data=sPayload, headers=sHeaders)
        logging.info(response.text)


def application(env, start_response):
    start_response("200 OK", [("Content-Type", "text/plain"),
                              ("Content-Encoding", "utf-8")])
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.DEBUG
    )

    tasks = [
        AConsumer()
    ]

    for t in tasks:
        t.start()

Докерфайл

FROM ubuntu:16.04

ENV DEBIAN_FRONTEND=noninteractive \
  TERM=linux \
  MYAPP_CONSUMER_HOME=/opt/myapp_consumer

RUN apt-get update \
  && apt-get upgrade -y \
  && apt-get install -y \
  apt-utils \
  build-essential \
  git \
  python3 \
  python3-dev \
  python3-setuptools \
  python3-pip \
  uwsgi \
  uwsgi-plugin-python3 \
  && apt-get autoremove \
  && apt-get clean \
  && rm -rf /var/lib/apt/lists/*

RUN pip3 install --upgrade pip
RUN pip3 install requests
RUN pip3 install xlwt
RUN pip3 install xlrd
RUN pip3 install kafka-python
RUN pip3 install -U uwsgi


# Copy our source files
COPY src ${MYAPP_CONSUMER_HOME}/src
COPY config ${MYAPP_CONSUMER_HOME}/config

RUN ln -s ${MYAPP_CONSUMER_HOME}/config/myapp_consumer.ini /etc/uwsgi/apps-enabled/
RUN chown -R www-data:www-data ${MYAPP_CONSUMER_HOME}/src


CMD ["/usr/bin/uwsgi", "--ini", "/etc/uwsgi/apps-enabled/myapp_consumer.ini"]

person Dex    schedule 26.02.2018    source источник
comment
При таком подходе я выявил две проблемы: я установил uwsgi как через pip3, так и через apt-get, и я думаю, что у меня несоответствие версии Python. Когда я использовал образ Docker uwsgi-nginx-flask:python3.6, проблема исчезла, поэтому я подозреваю, что проблема связана с версией Python. Тем не менее, я ищу помощь в том, как именно отладить это, чтобы быть действительно уверенным в первопричине.   -  person Dex    schedule 12.03.2018


Ответы (1)


У меня была похожая проблема, изменение типа bootstrap_server на список помогло в моем случае. Итак, ваш kafkaConsumer может выглядеть так:

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=2000)
person Mayank Gupta    schedule 04.04.2018
comment
Спасибо, что поделился! Однако я думаю, что с uwsgi происходит что-то подозрительное: приложение на самом деле не запускается. Мне не удалось найти в Интернете какие-либо следы того, как правильно настроить вызываемое приложение. Поэтому, если я использую Flask и говорю app = Flask(name), то все в порядке. Но - то, что происходит именно за кулисами, и есть ответ на мой вопрос. - person Dex; 05.04.2018