Каков хороший подход к управлению подключением к базе данных в приложении Google Cloud SQL (GAE) Python?

Я только изучаю Google App Engine и пытаюсь найти хороший подход к управлению подключением моей базы данных к экземпляру Google Cloud SQL (если вы не использовали GC-SQL, в основном это MySQL в облаке с несколькими ограничения).

Я использую среду GAE python (2.7) с инфраструктурой webapp2 для обработки запросов. Я знаю, что в FAQ говорится, что рекомендуется устанавливать новое соединение с БД с каждым запросом, но я не знаю, каков рекомендуемый способ закрытия соединения. Каждый раз, когда я пытаюсь удалить таблицы во время разработки, GC-SQL зависает, и список процессов показывает, что есть куча процессов (вероятно, потому что я не закрываю БД) и что один из них ожидает блокировки (вероятно, процесс пытается удалить таблицы). Это раздражает и вынуждает меня перезапускать экземпляр GC-SQL (я думаю, что это похоже на перезапуск службы mysql-сервера). Также иногда случаются сбои в работе БД, которые, как мне кажется, связаны с тем, что я на самом деле не закрываю соединение с БД.

Итак, например, должен ли я иметь деструктор в моем экземпляре подкласса webapp2.Requesthandler для отключения от БД? Объекты GAE иногда кэшируются, так что это тоже нужно учитывать. Я полагаю, что мог бы просто подключаться/запрашивать/отключаться для каждого запроса, но это кажется неоптимальным.

Я знаю, что это расплывчатый вопрос, но я надеюсь, что кто-то, кто играл в этой области, может дать мне несколько советов.

Заранее спасибо!

Обновление: я попытался реализовать оболочку для методов, которым нужен курсор, используя ответ Шэя в качестве отправной точки. Я получаю ошибки GAE. Вот новый вопрос, относящийся к этому: Каковы ограничения на количество подключений для Google Cloud SQL из App Engine и как лучше всего повторно использовать подключения к БД?< /а>


person JJC    schedule 15.04.2012    source источник


Ответы (4)


Я не знаком с Google Cloud SQL, но не могли бы вы использовать промежуточное ПО WSGI для открытия и закрытия соединения?

person Guido van Rossum    schedule 15.04.2012
comment
Спасибо, Гвидо. Я унижен. К сожалению, webapp2, который является инфраструктурой WSGI, которую я использую в GAE, похоже, не включает оболочку DB API. Я слишком глубоко погрузился в свой проект и у меня мало времени, чтобы реорганизовать его для другого фреймворка. Итак, я застрял, управляя соединением с БД вручную. Любые другие подсказки? :-) Еще раз спасибо за ваше время. - person JJC; 15.04.2012
comment
Я не уверен, что понимаю - мое предложение состоит в том, чтобы вы сами написали крошечный фрагмент промежуточного программного обеспечения WSGI, который открывает и закрывает соединение с БД. Это все еще для меня вручную (поскольку вы пишете код) и не кажется несовместимым с webapp2, который, как вы говорите, совместим с WSGI. Что мне не хватает? (OTOH декоратор тоже кажется хорошей идеей.) - person Guido van Rossum; 15.04.2012
comment
Ааа. Извиняюсь. Я совершенно новичок в веб-разработке с Python и теперь понимаю, что раньше не понимал вашего ответа. :-) Теперь, когда я посмотрел и понял, что промежуточное ПО WSGI != веб-фреймворк совместим с WSGI, это имеет большой смысл. Фильтр/промежуточное программное обеспечение WSGI, которое оборачивает webapp2.RequestHandler для подключения, а затем отключается от БД, как вы предлагаете, имеет смысл и также соответствует духу ответа Шэя. Спасибо! - person JJC; 15.04.2012

Вот полный пример приложения helloworld из Руководства по началу работы. Он основан на фрагментах из Шая Эрлихмена и JJC, но эта версия является потокобезопасной.

Вы можете использовать его следующим образом:

  @with_db_cursor(do_commit = True)
  def get(self, cursor):
        cursor.execute('SELECT guestName, content, entryID FROM entries')

приложение.yaml

application: helloworld
version: 1
runtime: python27
api_version: 1
threadsafe: true

handlers:
- url: /.*
  script: helloworld.app

привет мир.py

import cgi
import logging
import os
import threading
import webapp2

from google.appengine.api import rdbms

_INSTANCE_NAME = <name goes here>

def _db_connect():
  return rdbms.connect(instance=_INSTANCE_NAME, database='guestbook')

_mydata = threading.local()

def with_db_cursor(do_commit = False):
  """ Decorator for managing DB connection by wrapping around web calls.

  Stores connections and open cursor count in a threadlocal
  between calls.  Sets a cursor variable in the wrapped function. Optionally
  does a commit.  Closes the cursor when wrapped method returns, and closes
  the DB connection if there are no outstanding cursors.

  If the wrapped method has a keyword argument 'existing_cursor', whose value
  is non-False, this wrapper is bypassed, as it is assumed another cursor is
  already in force because of an alternate call stack.
  """
  def method_wrap(method):
    def wrap(self, *args, **kwargs):
      if kwargs.get('existing_cursor', False):
        # Bypass everything if method called with existing open cursor.
        return method(self, None, *args, **kwargs)

      if not hasattr(_mydata, 'conn') or not _mydata.conn:
        _mydata.conn = _db_connect()
        _mydata.ref = 0
        _mydata.commit = False

      conn = _mydata.conn
      _mydata.ref = _mydata.ref + 1

      try:
        cursor = conn.cursor()
        try:
          result = method(self, cursor, *args, **kwargs)
          if do_commit or _mydata.commit:
            _mydata.commit = False
            conn.commit()
          return result
        finally:
          cursor.close()
      finally:
        _mydata.ref = _mydata.ref - 1
        if _mydata.ref == 0:
          _mydata.conn = None
          logging.info('Closing conn')
          conn.close()
    return wrap
  return method_wrap


class MainPage(webapp2.RequestHandler):
  @with_db_cursor(do_commit = True)
  def get(self, cursor):
        cursor.execute('SELECT guestName, content, entryID FROM entries')
        self.response.out.write("""
          <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
          <html xmlns="http://www.w3.org/1999/xhtml" lang="en" xml:lang="en">
            <head>
               <title>My Guestbook!</title>
            </head>
            <body>""")
        self.response.out.write("""
              <table style="border: 1px solid black">
                <tbody>
                  <tr>
                    <th width="35%" style="background-color: #CCFFCC; margin: 5px">Name</th>
                    <th style="background-color: #CCFFCC; margin: 5px">Message</th>
                    <th style="background-color: #CCFFCC; margin: 5px">ID</th>
                  </tr>""")
        for row in cursor.fetchall():
          self.response.out.write('<tr><td>')
          self.response.out.write(cgi.escape(row[0]))
          self.response.out.write('</td><td>')
          self.response.out.write(cgi.escape(row[1]))
          self.response.out.write('</td><td>')
          self.response.out.write(row[2])
          self.response.out.write('</td></tr>')

        self.response.out.write("""
          </tbody>
            </table>
              <br /> No more messages!
              <br /><strong>Sign the guestbook!</strong>
              <form action="/sign" method="post">
              <div>First Name: <input type="text" name="fname" style="border: 1px solid black"></div>
              <div>Message: <br /><textarea name="content" rows="3" cols="60"></textarea></div>
              <div><input type="submit" value="Sign Guestbook"></div>
            </form>
          </body>
        </html>""")

class Guestbook(webapp2.RequestHandler):
  @with_db_cursor(do_commit = True)
  def post(self, cursor):
    fname = self.request.get('fname')
    content = self.request.get('content')
    # Note that the only format string supported is %s
    cursor.execute('INSERT INTO entries (guestName, content) VALUES (%s, %s)', (fname, content))

    self.redirect("/")

app = webapp2.WSGIApplication(
    [('/', MainPage),
     ('/sign', Guestbook)],
    debug=True)
person Ken Ashcraft    schedule 03.05.2012
comment
Превосходно! Это очень поможет будущим читателям. Спасибо! Тем не менее, один, вероятно, глупый вопрос MySQL (ошибаюсь, Google Cloud SQL). Если в приведенном выше коде есть несколько открытых курсоров для данного соединения, и один из курсоров был создан с do_commit=True, а другие — нет, не приведет ли одна фиксация к фиксации всех изменений других курсоров, делает недействительным набор do_commit=False для остальных? Может, мне стоит спросить об этом Шэя, но я подумал, что сначала спрошу тебя. Спасибо! - person JJC; 04.05.2012
comment
Я думаю ты прав. Я не знаю, имеет ли смысл одновременно активировать несколько курсоров (и вообще работает ли это). Подсчет ссылок в основном нужен для того, чтобы выяснить, когда закрывать соединение. Наверное, лучше сделать что-то более явное. - person Ken Ashcraft; 04.05.2012
comment
Я могу подтвердить, что этот метод отлично работает для моего гораздо более сложного приложения GAE с Google Cloud SQL. Еще раз спасибо Кен! - person JJC; 05.05.2012
comment
Кен. Я разделяю соединение между потоками в порядке реализации Google интерфейса MySQL из Python? В документации MySQLdb говорится, что соединения НЕ должны быть разделены между потоками. У меня снова проблемы, и мне интересно, может ли это быть причиной. - person JJC; 08.05.2012
comment
Шей, os.environ не позволяет нескольким потокам использовать уникальные соединения. Все они будут использовать одно и то же соединение, но соединения не являются потокобезопасными. - person Ken Ashcraft; 14.05.2012
comment
Из тестов видно, что каждый запрос выполняется в своем собственном совершенно новом потоке — со своим собственным thread.local() — таким образом, запросы никогда не используют повторно одно и то же соединение, и все запросы открывают и закрывают соединение с БД. - person Ezequiel Muns; 16.08.2013

Я написал декоратор для обработки SQL-соединения, не стесняйтесь флеймить :)

# Here is how you use the decorator from below
# the open, commit, and close is done by the decorator 
@need_cursor(do_commit = True)
def get(self, cursor, request): # cursor param is added by the decorator
    execute_sql(cursor, sql)

def need_cursor(do_commit = False):
    def method_wrap(method):
        def wrap(*args, **kwargs):
            conn = os.environ.get("__data_conn")

            # Recycling connection for the current request
            # For some reason threading.local() didn't worked
            # and yes os.environ suppose to be thread safe 
            if not conn:                
                conn = create_connection() # You need to implement this
                os.environ["__data_conn"] = conn
                os.environ["__data_conn_ref"] = 1
            else:
                os.environ["__data_conn_ref"] = 
                    os.environ["__data_conn_ref"] + 1

            try:
                cursor = conn.cursor()
                try:
                    result = method(cursor, *args, **kwargs)

                    if do_commit or os.environ.get("__data_conn_commit"):
                        os.environ["__data_conn_commit"] = False
                        conn.commit()

                    return result                    
                finally:
                    cursor.close()                
            finally:
                os.environ["__data_conn_ref"] = 
                    os.environ["__data_conn_ref"] - 1
                if os.environ["__data_conn_ref"] == 0:
                    os.environ["__data_conn"] = None
                    conn.close()        

        return wrap

    return method_wrap 
person Shay Erlichmen    schedule 15.04.2012
comment
Интересно. Я не знал, что в среду GAE можно писать. Есть ли какая-то конкретная причина, по которой вы реализовали это с помощью декоратора, а не просто прямым методом? Спасибо! - person JJC; 15.04.2012
comment
@JJA С декораторами проще делать АОП - person Shay Erlichmen; 15.04.2012
comment
Да, я раньше не слышал об АОП, но в данном случае стоит помнить об этом. Безусловно, обслуживание соединения с БД является сквозным аспектом. Я почувствовал это, когда работал над своим кодом, и мне нужно было поговорить с соединением с БД из нескольких модулей. У меня просто не было названия для этого раздражения до сих пор. Спасибо! :-) - person JJC; 15.04.2012
comment
Вот ссылка на статью об АОП для будущих читателей этого вопроса: /Аспектно-ориентированное_программирование - person JJC; 15.04.2012
comment
Привет @Shay. Я попытался реализовать ваше предложение и получаю ApplicationError 1033. Экземпляр имеет слишком много одновременных запросов. после запуска пары сотен тестовых клиентов моего приложения. Вы когда-нибудь сталкивались с этим? Не могу найти хиты в сети. См. мой вопрос об этом: stackoverflow.com/q/10424442/379037 Еще раз спасибо за ваше предложение, я все еще считаю, что это Неплохо. Просто чтобы уточнить, пробовали ли вы это в Google Cloud SQL через приложение GAE? Спасибо. - person JJC; 03.05.2012
comment
@JJC Я написал это для использования в облачном SQL и использую его постоянно. Вы проверили, что соединения действительно закрываются? - person Shay Erlichmen; 03.05.2012
comment
Да, я пробовал пинговать соединение до и после закрытия. До этого работает, а после выходит из строя. Любые другие идеи? Спасибо! - person JJC; 03.05.2012
comment
Я обновил новый вопрос своим кодом. Пожалуйста, посмотрите: stackoverflow.com/q/10424442/379037 - person JJC; 03.05.2012
comment
Шей, как ты относишься к тому факту, что фиксация одного курсора (или откат) повлияет на другие открытые курсоры? Похоже, что MySQLdb (и, как я предполагаю, GCSQL) не поддерживает несколько курсоров на одно соединение (разрешает это, но приводит к проблемам согласованности данных). - person JJC; 09.05.2012
comment
@JJC для меня не проблема, потому что все вставки выполняются из одного места, вы можете отменить кэширование соединения, и оно будет работать для вас. - person Shay Erlichmen; 09.05.2012
comment
@JCC еще одна вещь, которую вы можете сделать, это не использовать совместное использование соединения, если do_commit имеет значение true. - person Shay Erlichmen; 09.05.2012
comment
Спасибо. Это проблема, о которой я не думал (т.е. я предполагал, что каждый курсор является отдельной транзакцией), и в итоге это вызвало у меня проблемы. Мое приложение интенсивно записывает, поэтому почти все курсоры выполняют вставки/обновления. Итак, я думаю, что мне придется вообще избегать совместного использования соединения. :-( - person JJC; 09.05.2012
comment
Без проблем. Вы можете поблагодарить меня, проголосовав больше за мои комментарии и вопросы. :-П - person JJC; 09.05.2012

Это мой подход, учитывающий возможные исключения. Я использую этот подход в производственной среде и работает хорошо:


def _create_connection(schema):

    if (os.getenv('SERVER_SOFTWARE') and
        os.getenv('SERVER_SOFTWARE').startswith('Google App Engine/')):
        socket = '/cloudsql/%s' % env.DB_INSTANCE_NAME
        return MySQLdb.connect(unix_socket=socket, user=env.DB_APP_USER,
                               passwd=env.DB_APP_PASS, db=schema)
    else:
        return MySQLdb.connect(host='127.0.0.1', port=3306,
                               user=env.DB_APP_USER, passwd=env.DB_APP_PASS,
                               db=schema)


def with_db(commit=False, schema=env.DB_SCHEMA_NAME):

    def method_wrap(method):
        @functools.wraps(method)
        def wrap(self, *args, **kwds):
            # If needed,a connection pool can be added here.
            connection = _create_connection(schema)

            try:
                cur = connection.cursor()
                self.cur = cur
                self.conn = connection

                result = method(self, *args, **kwds)

                if commit:
                    connection.commit()

            except OperationalError as e:

                logging.error('Operational error.\r\nSQL exception: {},\r\n'
                              'Last Query: {}'.format(e, cur._last_executed))

                if commit and connection.open:
                    connection.rollback()
                raise

            except MySQLError as e:

                try:
                    warns = self.conn.show_warnings()
                    error = self.conn.error()
                except:
                    warns = ""
                    error = ""

                logging.error('Try to rolling back transaction.\r\nSQL exception: {},\r\n'
                              'Last Query: {},\r\nConn warn: {},\r\nError: {}'
                              .format(e, cur._last_executed, warns, error))


                if commit and connection.open:
                    connection.rollback()
                raise

            except Exception as e:
                logging.error('Try to rolling back transaction. Non SQL exception: {0}'.format(e))

                if commit and connection.open:
                    connection.rollback()
                raise

            finally:
                connection.close()

            return result
        return wrap
    return method_wrap

Вы можете использовать его следующим образом:


@with_db(commit=True)
def update_user_phone(self, user, phone):
    self.cur.execute(_SQL_UPDATE_USER_PHONE, (phone, user.id))

    # add or replace existing user to cache
    user.phone = phone
    self._update_user_cache(user)
person Nico    schedule 30.09.2014