Как сериализовать вывод курсора pyodbc (из .fetchone
, .fetchmany
или .fetchall
) в виде словаря Python?
Я использую Bottlepy, и мне нужно вернуть dict, чтобы он мог вернуть его как JSON.
Как сериализовать вывод курсора pyodbc (из .fetchone
, .fetchmany
или .fetchall
) в виде словаря Python?
Я использую Bottlepy, и мне нужно вернуть dict, чтобы он мог вернуть его как JSON.
Если вы не знаете столбцы заранее, используйте Cursor.description для создания список имен столбцов и zip с каждой строкой, чтобы создать список словари. В примере предполагается, что соединение и запрос построены:
>>> cursor = connection.cursor().execute(sql)
>>> columns = [column[0] for column in cursor.description]
>>> print(columns)
['name', 'create_date']
>>> results = []
>>> for row in cursor.fetchall():
... results.append(dict(zip(columns, row)))
...
>>> print(results)
[{'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36, 390000), 'name': u'master'},
{'create_date': datetime.datetime(2013, 1, 30, 12, 31, 40, 340000), 'name': u'tempdb'},
{'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36, 390000), 'name': u'model'},
{'create_date': datetime.datetime(2010, 4, 2, 17, 35, 8, 970000), 'name': u'msdb'}]
cursor.description
. это просто сэкономило мне кучу времени.
- person TehTris; 19.03.2015
Decimal('1000.000')
, так как теперь я получаю сообщение об ошибке — Object of type Decimal is not JSON serializable
- person Loser Coder; 13.09.2018
Используя результат @Beargle с Bottlepy, я смог создать этот очень краткий запрос, раскрывающий конечную точку:
@route('/api/query/<query_str>')
def query(query_str):
cursor.execute(query_str)
return {'results':
[dict(zip([column[0] for column in cursor.description], row))
for row in cursor.fetchall()]}
Вот краткая версия, которую вы можете использовать
>>> cursor.select("<your SQL here>")
>>> single_row = dict(zip(zip(*cursor.description)[0], cursor.fetchone()))
>>> multiple_rows = [dict(zip(zip(*cursor.description)[0], row)) for row in cursor.fetchall()]
Как вы, возможно, знаете, когда вы добавляете * к списку, вы фактически удаляете список, оставляя отдельные записи списка в качестве параметров вызываемой функции. Используя zip, мы выбираем запись с 1-го по n и застегиваем их вместе, как молнию в штанах.
поэтому, используя
zip(*[(a,1,2),(b,1,2)])
# interpreted by python as zip((a,1,2),(b,1,2))
ты получаешь
[('a', 'b'), (1, 1), (2, 2)]
Поскольку описание представляет собой кортеж с кортежами, где каждый кортеж описывает заголовок и тип данных для каждого столбца, вы можете извлечь первый из каждого кортежа с помощью
>>> columns = zip(*cursor.description)[0]
эквивалентно
>>> columns = [column[0] for column in cursor.description]
TypeError: 'zip' object is not subscriptable
, поэтому я не могу использовать трюк zip(*description)[0]
.
- person malat; 19.10.2015
columns
, но умножили сложность функции, вычислив имена столбцов для каждой строки отдельно
- person Sergey Nudnov; 09.03.2020
В ситуациях, когда курсор недоступен — например, когда строки были возвращены вызовом какой-либо функции или внутренним методом, вы все равно можете создать представление словаря, используя row.cursor_description.
def row_to_dict(row):
return dict(zip([t[0] for t in row.cursor_description], row))
В основном, исходя из ответа @Torxed, я создал полный обобщенный набор функций для поиска схемы и данных в словаре:
def schema_dict(cursor):
cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns. object_id WHERE sys.objects.type = 'U';")
schema = {}
for it in cursor.fetchall():
if it[0] not in schema:
schema[it[0]]={'scheme':[]}
else:
schema[it[0]]['scheme'].append(it[1])
return schema
def populate_dict(cursor, schema):
for i in schema.keys():
cursor.execute("select * from {table};".format(table=i))
for row in cursor.fetchall():
colindex = 0
for col in schema[i]['scheme']:
if not 'data' in schema[i]:
schema[i]['data']=[]
schema[i]['data'].append(row[colindex])
colindex += 1
return schema
def database_to_dict():
cursor = connect()
schema = populate_dict(cursor, schema_dict(cursor))
Не стесняйтесь использовать код-гольф, чтобы уменьшить количество строк; а между тем это работает!
;)
Мне нравятся ответы @bryan и @foo-stack. Если вы работаете с postgresql и используете psycopg2
, вам могут пригодиться некоторые полезности от psycopg2. чтобы добиться того же, указав cursorfactory как DictCursor
при создании курсора из соединения, например:
cur = conn.cursor( cursor_factory=psycopg2.extras.DictCursor )
Итак, теперь вы можете выполнить свой sql-запрос и получить словарь для получения результатов без необходимости сопоставлять их вручную.
cur.execute( sql_query )
results = cur.fetchall()
for row in results:
print row['row_no']
Обратите внимание, что для этого вам потребуется import psycopg2.extras
.
Что мне нужно, что немного отличается от того, о чем просил ОП:
Если вы хотите полностью обобщить процедуру, выполняющую запросы SQL Select, но вам нужно ссылаться на результаты по номеру индекса, а не по имени, вы может сделать это со списком списков вместо словаря.
Каждая строка возвращаемых данных представлена в возвращаемом списке в виде списка значений полей (столбцов).
Имена столбцов могут быть предоставлены в качестве первой записи возвращаемого списка, поэтому синтаксический анализ возвращенного списка в вызывающей процедуре может быть очень простым и гибким.
Таким образом, подпрограмме, выполняющей вызов базы данных, не нужно ничего знать о данных, с которыми она обрабатывается. Вот такая рутина:
def read_DB_Records(self, tablename, fieldlist, wherefield, wherevalue) -> list:
DBfile = 'C:/DATA/MyDatabase.accdb'
# this connection string is for Access 2007, 2010 or later .accdb files
conn = pyodbc.connect(r'Driver={Microsoft Access Driver (*.mdb, *.accdb)};DBQ='+DBfile)
cursor = conn.cursor()
# Build the SQL Query string using the passed-in field list:
SQL = "SELECT "
for i in range(0, len(fieldlist)):
SQL = SQL + "[" + fieldlist[i] + "]"
if i < (len(fieldlist)-1):
SQL = SQL + ", "
SQL = SQL + " FROM " + tablename
# Support an optional WHERE clause:
if wherefield != "" and wherevalue != "" :
SQL = SQL + " WHERE [" + wherefield + "] = " + "'" + wherevalue + "';"
results = [] # Create the results list object
cursor.execute(SQL) # Execute the Query
# (Optional) Get a list of the column names returned from the query:
columns = [column[0] for column in cursor.description]
results.append(columns) # append the column names to the return list
# Now add each row as a list of column data to the results list
for row in cursor.fetchall(): # iterate over the cursor
results.append(list(row)) # add the row as a list to the list of lists
cursor.close() # close the cursor
conn.close() # close the DB connection
return results # return the list of lists
Предполагая, что вы знаете имена столбцов! Кроме того, вот три разных решения,
вы, вероятно, захотите взглянуть на последнее!
colnames = ['city', 'area', 'street']
data = {}
counter = 0
for row in x.fetchall():
if not counter in data:
data[counter] = {}
colcounter = 0
for colname in colnames:
data[counter][colname] = row[colcounter]
colcounter += 1
counter += 1
Это проиндексированная версия, не самое красивое решение, но оно будет работать. Другим было бы индексировать имя столбца как ключ словаря со списком в каждом ключе, содержащем данные в порядке номера строки. при выполнении:
colnames = ['city', 'area', 'street']
data = {}
for row in x.fetchall():
colindex = 0
for col in colnames:
if not col in data:
data[col] = []
data[col].append(row[colindex])
colindex += 1
Написав это, я понимаю, что выполнение for col in colnames
можно было бы заменить на for colindex in range(0, len())
, но вы поняли. Более поздний пример был бы полезен, когда извлекаются не все данные, а по одной строке за раз, например:
def fetchone_dict(stuff):
colnames = ['city', 'area', 'street']
data = {}
for colindex in range(0, colnames):
data[colnames[colindex]] = stuff[colindex]
return data
row = x.fetchone()
print fetchone_dict(row)['city']
Получение имен таблиц (я думаю... благодаря Foo Stack):
более прямое решение от Beargle ниже!
cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns. object_id WHERE sys.objects.type = 'U';")
schema = {}
for it in cursor.fetchall():
if it[0] in schema:
schema[it[0]].append(it[1])
else:
schema[it[0]] = [it[1]]
Я знаю, что это старо, и я просто резюмирую то, что уже сказали другие. Но я нашел этот способ аккуратным, так как он также безопасен для инъекций.
def to_dict(row):
return dict(zip([t[0] for t in row.cursor_description], row))
def query(cursor, query, params=[], cursor_func=to_dict):
cursor.execute(query, params)
results = [cursor_func(row) for row in cursor.fetchall()]
return results
quotes = query(cursor, "select * from currency where abbreviation like ?", ["USD"])
Шаги следующие:
from pandas import DataFrame
import pyodbc
import sqlalchemy
db_file = r'xxx.accdb'
odbc_conn_str = 'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=%s' % (db_file)
conn = pyodbc.connect(odbc_conn_str)
cur = conn.cursor()
qry = cur.execute("SELECT * FROM tbl")
columns = [column[0] for column in cur.description]
results = []
for row in cur.fetchall():
results.append(dict(zip(columns, row)))
df = DataFrame(results)
df