Twitter, несколько процессов и база данных

Я новичок, пишу небольшой твиттер-инструмент для запланированных твитов и автоматических ретвитов в python/flask.

Я застрял с проблемами процессов, работающих в фоновом режиме.

Я хочу, чтобы запланированные твиты и ретвиты работали одновременно в фоновом режиме для данного пользователя.

Я хочу иметь возможность завершать эти фоновые процессы, выполняющие ретвиты/запланированные твиты, отдельно друг от друга.

Как бы вы изменили приведенный ниже код, чтобы добиться этого?

Если вы сейчас посмотрите на приведенный ниже код, он работает, но пользователь не может запускать запланированные твиты и ретвиты одновременно. Кроме того, если пользователь решает завершить один из процессов, скажем, ретвитит, другой процесс также завершается (запланированные твиты) и наоборот.

Я думал о том, чтобы помещать идентификационные данные для данного процесса в базу данных и вызывать эти идентификационные данные из базы данных, когда возникает необходимость его прекратить, вместо использования сеанса cookie, но я не знаю, как реализовать эту идею в коде.

import ........

mysql = MySQL()
app = Flask(__name__)
app.secret_key = 'xxx'

app.config['MYSQL_DATABASE_USER'] = 'xxx'
app.config['MYSQL_DATABASE_PASSWORD'] = 'xxx'
app.config['MYSQL_DATABASE_DB'] = 'xxx'
app.config['MYSQL_DATABASE_HOST'] = '0.0.0.0'
mysql.init_app(app)

@app.route('/showSignin')
def showSignin():
   if session.get('user'):
       return redirect('/userHome')
   else:
       return render_template('signin.html')

@app.route('/showscheduletweets')

def showscheduletweets():

     if session.get('user'):
      return render_template('scheduletweets.html')
    else:
       return render_template('signin.html')

     @app.route('/validateLogin',methods=['POST'])
def validateLogin():
   try:
    _username = request.form['inputEmail']
    _password = request.form['inputPassword']

    # connect to mysql

    con = mysql.connect()
    cursor = con.cursor()
    cursor.callproc('sp_validateLogin',(_username,))
    data = cursor.fetchall()

    if len(data) > 0:
        if check_password_hash(str(data[0][3]),_password):
            session['user'] = data[0][0]
            consumerkey = data [0][4]
            consumersecret = data [0][5]
            accesstoken = data [0][6]
            tokensecret = data [0][7]
            twitter = Twython(consumerkey, consumersecret, accesstoken, tokensecret)
            twitter.update_status(status="xxx says hello.")
            return render_template('userHome.html')
        else:
            return render_template('error.html',error = 'Wrong Email address or Password.')
    else:
        return render_template('error.html',error = 'Wrong Email address or Password.')

except Exception as e:
    return render_template('error.html',error = str(e))
finally:
    cursor.close()
    con.close()

#schedule tweets

@app.route('/scheduletweets',methods=['POST'])

def scheduletweets():
    if session.get('user'):
    _username = request.form['inputEmail']
    con = mysql.connect()
    cursor = con.cursor()
    cursor.callproc('sp_GetTwitter', (_username,))
    data = cursor.fetchall()

    session['user'] = data[0][0]
    consumerkey = data [0][4]
    consumersecret = data [0][5]
    accesstoken = data [0][6]
    tokensecret = data [0][7]
    twitter = Twython(consumerkey, consumersecret, accesstoken, tokensecret)

    tweet1 = request.form['inputTweet1']
    tweet2 = request.form['inputTweet2']
    tweet3 = request.form['inputTweet3']
    tweet4 = request.form['inputTweet4']
    tweet5 = request.form['inputTweet5']
    tweet6 = request.form['inputTweet6']

    Hash1 = request.form['inputHash1']
    Hash2 = request.form['inputHash2']
    Hash3 = request.form['inputHash3']
    Hash4 = request.form['inputHash4']

    fruits = [Hash1, Hash2, Hash3, Hash4]



    list = [tweet1, tweet2, tweet3, tweet4, tweet5, tweet6]
    def workit():

     while True:
        try:
            if len(list) > 0:
                z = random.randint(1, len(fruits))
                a = random.sample(fruits, z)


                b=" ".join(str(x) for x in a)
                toTweet = list[random.randint(0,len(list))-1] + " " + b

                twitter.update_status(status=toTweet)
                time.sleep(10)


            else:
                twitter.update_status(status="Oh dear... I'm afraid I'm rather empty =(")
                break
        except TwythonError as e:
            print (e)


    if 'work_process' not in session:
     process = Process(target=workit)
     process.start()
     pid = process.pid
     parent_pid = psutil.Process(process.pid).parent().pid
     session['work_process'] = (parent_pid, pid)
    return redirect('/showscheduletweets')
     #retweets
     @app.route('/retweet',methods=['POST'])
def retweet():
   if session.get('user'):

    _username = request.form['inputEmail']
    con = mysql.connect()
    cursor = con.cursor()
    cursor.callproc('sp_GetTwitter', (_username,))
    data = cursor.fetchall()

    session['user'] = data[0][0]
    consumerkey = data [0][4]
    consumersecret = data [0][5]
    accesstoken = data [0][6]
    tokensecret = data [0][7]


    Retweet1 = request.form['inputRetweet1']
    Retweet2 = request.form['inputRetweet2']
    Retweet3 = request.form['inputRetweet3']
    Retweet4 = request.form['inputRetweet4']
    Exclude1 = request.form['inputExclude1']
    Exclude2 = request.form['inputExclude2']




    def work():
     twitter = Twython(consumerkey, consumersecret, accesstoken, tokensecret)
     naughty_words = [Exclude1, Exclude2]
     good_words = [Retweet1, Retweet2, Retweet3, Retweet4]
     filter = " OR ".join(good_words)
     blacklist = " -".join(naughty_words)
     keywords = filter +" -"+ blacklist
     print(keywords)
     while True:
        search_results = twitter.search(q=keywords, count=10)
        try:
            for tweet in search_results["statuses"]:
                try:
                    twitter.retweet(id = tweet["id_str"])
                    time.sleep(60)
                except TwythonError as e:
                                            print (e)
        except TwythonError as e:
                                    print (e)

    if 'work_process' not in session:
     process = Process(target=work)
     process.start()
     pid = process.pid
     parent_pid = psutil.Process(process.pid).parent().pid
     session['work_process'] = (parent_pid, pid)
    return redirect('/showretweet')

       #terminating scheduled tweets and retweets
      @app.route('/stoptweet', methods=['POST'])
  def stoptweet():
    if 'work_process' in session:
    parent_pid, pid = session['work_process']
    try:
        process = psutil.Process(pid)
        if process.parent().pid == parent_pid:
            process.terminate()
    except psutil.NoSuchProcess:
        pass
    session.pop('work_process')
    return render_template('index.html')
else:
    return render_template('index.html')

 if __name__ == '__main__':
  app.run(host=os.getenv('IP', '0.0.0.0'),port=int(os.getenv('PORT', xxx)))

person Lucas    schedule 05.12.2016    source источник


Ответы (1)


Вы можете использовать модуль celery python и перемещать твиты и ретвиты по расписанию в фоновом режиме.

Для получения дополнительной информации см. документ: http://flask.pocoo.org/docs/0.11/patterns/celery/

Вы украсите те функции, которые связаны с сельдереем, а не с флягой.

Например:

В вашем скрипте:

import my_schedule_module

а затем в my_schedule_module.py:

from celery import Celery, Task
from celery.result import AsyncResult

from celery.task.base import periodic_task

import sqlite3 # Here I use sqlite, can be sql
import redis # Here I am using redis, you can use another db as well > check documentation

from datetime import timedelta # used to schedule your background jobs, see in configuration below


app_schedule = Celery('my_schedule_module')


'''
Celery Configuration
'''

# a mockup configuration of your background jobs, as example use retweet each 60s
app_schedule.conf.update(
    CELERY_ACCEPT_CONTENT = ['application/json'],
    CELERY_TASK_SERIALIZER='json',
    # CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    # CELERY_TIMEZONE='Europe/Oslo',
    # CELERY_ENABLE_UTC=True,
    CELERYD_TASK_TIME_LIMIT = 600,
    CELERYD_TASK_SOFT_TIME_LIMIT = 600,
    CELERYD_MAX_TASKS_PER_CHILD = 1000,
    CELERYD_OPTS="--time-limit=600 --concurrency=4",
    BROKER_URL = 'redis://localhost:6379/0',
    CELERY_RESULT_BACKEND = 'redis://localhost',
    CELERYBEAT_SCHEDULE = {
        'add-every-60-seconds': {
        'task': 'my_schedule_module.retweet',
        'schedule': timedelta(seconds=60)
        },
    }
)

@app_schedule.task()
def retweet(tweet):
     # your tweet function

@app_schedule.task()
def scheduletweets():
     # your background job
     # pseudo code
     tweets = get_tweets()
     process_tweet_list = []
     for tweet in tweets:
          process_tweet_list.append( retweet.s(tweet) ) 
     job = group(process_tweet_list)  #group is celery.group, see documentation
     result = job.apply_async() # process job list async
     print 'result', result.ready(), result.successful()

Вы также можете использовать функции обратного вызова - например, вы можете обновить дату и время в своей базе данных, когда ваш твит был ретвитнут.

В этом случае у вас будет такой синтаксис:

    result = my_schedule_module.retweet.apply_async( (tweet,) , link=my_schedule_module.callback_to_store_results_of_retweet.s())
person user305883    schedule 05.12.2016
comment
Спасибо за это, но я пробовал сельдерей раньше. У меня возникла проблема с контекстом запроса: RuntimeError: Работа вне контекста запроса. Обычно это означает, что вы пытались использовать функции, требующие активного HTTP-запроса. - person Lucas; 05.12.2016
comment
Далее, что это означает на практике? CELERYD_TASK_TIME_LIMIT = 600, CELERYD_TASK_SOFT_TIME_LIMIT = 600, CELERYD_MAX_TASKS_PER_CHILD = 1000, CELERYD_OPTS=--time-limit=600 --concurrency=4, - этот процесс ретвита может длиться максимум 600 секунд, а затем завершается? - person Lucas; 05.12.2016
comment
Что касается моего первого комментария о контексте запроса - после того, как все твиты, которые нужно запланировать, или слова/хастэги, которые нужно ретвитнуть, исходят от пользователя, вводящего их в html-форму. Я бы использовал сельдерей, но я не знаю, как применить сельдерей с контекстом html-формы. - person Lucas; 05.12.2016
comment
Celery — правильный подход к фоновым задачам. stackoverflow.com/a/32890603/305883 для запуска приложения вне контекста - проверьте, как вы импортируете модули: я поставил celery_app в отдельный модуль и импортировать его в Flask_app. Вы можете создавать пользовательские вводы в форме html, сохранять данные в mysql и асинхронно обрабатывать их. Используйте вторую базу данных для обработки очереди (например, Redis). Здесь вы найдете документы для настройки: docs.celeryproject.org/en/3.1/configuration.html . Параллелизм — это максимальное количество рабочих процессов, которое необходимо настроить (например, если вы хотите распараллелить задания). - person user305883; 05.12.2016