Реализация персистентного хранения данных в telegram боте
Практически ровно год назад я решил сделать telegram-бота для сохранения файлов (в первую очередь книг) в облачное хранилище. Для реализации бота взял самую популярную библиотеку для Python python-telegram-bot
. В прошлый раз запала хватило только на реализацию аутентификацию в tg-боте через OAuth 2.0. Детали про хранение access и refresh токенов я решил оставить на потом. И это потом наконец-то наступило, но несколько в ином виде: здесь будет отвлечённый пример без OAuth и предыдущего кода бота для более простого изложения.
Если кто-то ждал (ну, а вдруг?), то можете ругать меня в комментариях под постом за задержку :sad:
В этом посте будет:
- краткий пересказ документации
- реализация персистентности “коробочными” средствами
- кастомное хранилище на основе SQLite.
Краткий пересказ документации
Глобально есть два ~стула~ варианта реализации персистености в боте на python-telegram-bot
:
- Делать всё с нуля: самостоятельно реализовать все методы для работы с БД/файловой системы,
- Использовать класс
telegram.ext.BasePersistence
или один из его наследников.
Первый вариант рассматривать не буду — он ничем не отличается от любого другого приложения, которое работает в базой данных.
Второй вариант заставил меня немного понервничать, так как оказалось всё чуть запутаннее, чем кажется на первый взгляд. Ниже буду писать именно про него.
BasePersistence
и его наследники
Библиотека python-telegram-bot
содержит класс BasePersistence
и два класса-наследника: DictPersistence
и PicklePersistence
.
DictPersistence
и PicklePersistence
могут сохранять данные пользователя в словарь в памяти либо сериализовывать через pickle
.
BasePersistence
же используется для создания кастомных способов персистентности. Например, в базе данных.
Для того, чтобы бот научился сохранять данные в хранилище, предоставляемое одним из выше перечисленных persistence-провайдеров, требуется совсем немного усилий.
Реализация персистентности “коробочными” средствами
Для примера сделаю максимально простого бота на основе примера из репозитория python-telegram-bot
. Пример хороший, но, на мой взгляд, за дополнительными фичами не видно основного способа решения поставленной задачи.
Необходимо сделать всего два пункта, чтобы бот научился сохранять данные персистентно с помощью PicklePersistence
:
- указать persistence-провайдер в методе
persistence
application builder-а, - записывать данные, необходимые для персистентного хранения в соответствующую коллекцию (на уровне пользователя в
user_data
, чата —chat_data
или ботаbot_data
).
import logging
from typing import Dict
from telegram import Update
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
PicklePersistence,
filters,
CallbackContext
)
# Enable logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await update.message.reply_text('Demo pickle persistence')
async def show_data(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Display the gathered info."""
def _read_messages(chat_messages):
return '\n'.join([f'{x["message_ts"]}: {x["message"]}' for x in chat_messages])
messages = [f"\n{key}:\n{_read_messages(value)}" for key, value in context.chat_data.items()]
facts = '\n'.join(messages)
await update.message.reply_text(
f"This is what you already told me: {facts}"
)
async def save_message(update: Update, context: CallbackContext) -> None:
if 'messages' not in context.chat_data:
context.chat_data['messages'] = []
context.chat_data['messages'].append({'message': update.message.text, 'message_ts': update.message.date.timestamp()}) # (4)
def main() -> None:
"""Run the bot."""
# Create the Application and pass it your bot's token.
persistence = PicklePersistence(filepath='persitencebot', update_interval=1) # (1)
application = Application.builder().token('BOT_TOKEN').persistence(persistence).build() # (2)
show_data_handler = CommandHandler("show_data", show_data)
application.add_handler(show_data_handler)
application.add_handler(MessageHandler(filters=filters.ALL, callback=save_message)) # (3)
# Run the bot until the user presses Ctrl-C
application.run_polling()
if __name__ == "__main__":
main()
Пройдусь по отмеченным местам:
- Создание объекта класса
PicklePersistence
. Здесь стоит обратить внимание наupdate_interval
. Это время (в секундах) как часто состояние (chat_data
,user_data
,bot_data
) будут сохраняться в хранилище (в этом примере — на диск), - Добавление
PicklePersistence
к боту, - Создаю handler, который будет реагировать на все сообщения,
- Handler из п.3 сохраняет все приходящие сообщения от одного пользователя в список
messages
в рамках одного чата (*).
*) Если бот будет добавлен в другой чат, то для нового чата будет новый
chat_data
, который никак не связанный с “личной” перепиской пользователя с ботом.
Теперь при старте бота, код которого представлен выше, все сообщения будут записывать в файл persitencebot
, а по команде /show_data
отображаться в беседе:
Кастомное хранилище на основе SQLite
Всё оказалось сложнее, когда я решил сделать кастомное хранилище в БД. Для простоты взял SQLite.
Основные проблемы возникли с тем, что
- Документация минимальная,
- Примеров от авторов библиотеки нет,
- Примеров от сторонних разработчиков очень мало: нашёл на основе Djange и с Redis.
Оба варианта имеют рабочие, но в них я не нашёл ответов на все мои вопросы и пришлось закатать рукава и разбираться самостоятельно.
Persistence на SQLite
Что надо знать перед началом
Всё что есть в [документации про Persistence] — это утверждение, что для реализации своей персистентности необходимо создать класс-наследник от telelgram.ext.BasePersistence
и реализовать необходимые методы. Объяснения для каких целей необходимо реализовать какие методы мне понять не удалось. Ниже отрывок из документации:
abstract async update_bot_data(data)
Will be called by the telegram.ext.Application after a handler has handled an update.
PARAMETERS
data (dict | telegram.ext.ContextTypes.bot_data) – The telegram.ext.Application.bot_data.
abstract async update_callback_data(data)
Will be called by the telegram.ext.Application after a handler has handled an update.
New in version 13.6.
Changed in version 20.0: Changed this method into an abstractmethod().
PARAMETERS
data (Tuple[List[Tuple[str, float, Dict[str, Any]]], Dict[str, str]] | None) – The relevant data to restore telegram.ext.CallbackDataCache.
abstract async update_chat_data(chat_id, data)
Will be called by the telegram.ext.Application after a handler has handled an update.
PARAMETERS
chat_id (int) – The chat the data might have been changed for.
data (dict | telegram.ext.ContextTypes.chat_data) – The telegram.ext.Application.chat_data [chat_id].
Copy-paste одной и той же строки документации для каждого метода не даёт особого понимания что он реально делает.
Самое важное, что удалось понять методом проб, отладки и заглядывания в исходники PicklePersistence
и persistence-реализаций для Django и Redis:
- Для каждой “области” данных (
chat
,user
,bot
) есть три методов:get_*_data
для получения данных,refresh_*_data
обновить данные в памяти значениями из хранилища,update_*_data
обновить данные в хранилище данными из памяти,drop_*_data
удалить данные в хранилище,
flush
используется для корректного отключения от хранилища в случае завершения работы бота.
Важно здесь отметить, что методы get_*_data
, refresh_*_data
и остальные самостоятельно вызывать не надо — библиотека сама “понимает”, когда необходимо дёрнуть тот или иной метод.
Теперь, зная эти ☝️ штуки, можно переписать пример с сохранением истории на работу с SQLite.
Реализация
Структура база данных будет максимально простой: одна таблица, каждая строчка в таблице — одно сообщение в чате:
id
— идентификатор записи в таблице,chat_id
— идентификатор чата,message_ts
— timestamp сообщения,message
— текст сообщения.
SQLite базу данных создать проще всего прямо в Python REPL:
import sqlite3
conn = sqlite3.connect('demo.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE chat_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INT NOT NULL,
message_ts INT,
message CHAR(500)
);''')
Теперь можно взяться за сам persistence-слой. Здесь всё буду делать максимально “в лоб”, чтобы было меньше деталей, на которые можно было отвлечься.
Во-первых, создаём класс-наследник от BasePersistence
:
class SqlitePersistence(BasePersistence):
def __init__(self, name: str='demo.db'):
# про update_interval писал выше
super().__init__(update_interval=1)
# store_data определяет какие данные будут храниться. Для простоты оставлю только chat_data
self.store_data = PersistenceInput(chat_data=True, bot_data=False, user_data=False, callback_data=False)
# подключение к базе данных
self.conn = sqlite3.connect(name)
self.cursor = self.conn.cursor()
Во-вторых, для указанных в store_data
областей необходимо реализовать методы get_*_data
, refresh_*_data
, update_*_data
и drop_*_data
. В моём случае это get_chat_data
, refresh_chat_data
, update_chat_data
и drop_chat_data
.
get_chat_data
Метод должен вернуть словарь: ключ — идентификатор чата, значение — данные чата:
async def get_chat_data(self) -> t.DefaultDict[int, t.Any]:
# получаем данные для всех чатов
data = self.cursor.execute('''SELECT * FROM chat_data''').fetchall()
# проходим по всем строчкам из таблицы и заполняем словарь chat_data
# таким образом, чтобы для каждого chat_id была своя коллекция сообщенений.
chat_data = defaultdict(dict)
for row in data:
chat_id = row[1]
if 'messages' not in chat_data[chat_id]:
chat_data[chat_id] = { 'messages': [] }
chat_data[chat_id]['messages'].append(dict(zip(['id', 'chat_id', 'message_ts', 'message'], row)))
return chat_data
refresh_chat_data
Метод refresh_chat_data
должен получить из БД данные и обновить данные в памяти. Здесь всё очень похоже на get_chat_data
, но с ограничением на один chat_id
:
async def refresh_chat_data(self, chat_id: int, chat_data: t.Any) -> None:
data = self.cursor.execute('''SELECT * FROM chat_data WHERE chat_id = ?''', (chat_id, ))
chat_data['messages'] = [dict(zip(['id', 'chat_id', 'message_ts', 'message'], x)) for x in data]
update_chat_data
update_chat_data
чуть сложнее, т.к. необходимо учитывать два случая: если сообщение уже есть БД, то его надо обновить, если нет — добавить. Но, в целом, всё тоже тривиально:
async def update_chat_data(self, chat_id: int, data: CD) -> None:
for row in data['messages']:
db_row = self.cursor.execute('''SELECT *
FROM chat_data
WHERE chat_id = ? AND message_ts = ? AND message = ?''',
(chat_id, row['message_ts'], row['message'])).fetchone()
if db_row is None:
self.cursor.execute('''INSERT INTO chat_data
(chat_id, message_ts, message)
VALUES
(?, ?, ?)''', (chat_id, row['message_ts'], row['message']))
else:
self.cursor.execute('''UPDATE chat_data
SET
message = ?
WHERE
chat_id = ? AND message_ts = ?
''', (row['message'], chat_id, row['message_ts']))
self.conn.commit()
drop_chat_data
Удаление данных — самое простое:
async def drop_chat_data(self, chat_id: int) -> None:
self.cursor.execute('''DELETE FROM chat_data WHERE chat_id = ?''', (chat_id, ))
Остальные методы должны pass
-овать иначе будут внезапные ошибки в процессе выполнения программы.
Итоговый код SqlitePersistence
:
class SqlitePersistence(BasePersistence):
def __init__(self, name: str='demo.db'):
super().__init__(update_interval=1)
self.store_data = PersistenceInput(bot_data=False, user_data=False, callback_data=False)
self.conn = sqlite3.connect(name)
self.cursor = self.conn.cursor()
async def get_chat_data(self) -> t.DefaultDict[int, t.Any]:
data = self.cursor.execute('''SELECT * FROM chat_data''').fetchall()
chat_data = defaultdict(dict)
for row in data:
chat_id = row[1]
if 'messages' not in chat_data[chat_id]:
chat_data[chat_id] = { 'messages': [] }
chat_data[chat_id]['messages'].append(dict(zip(['id', 'chat_id', 'message_ts', 'message'], row)))
return chat_data
async def update_chat_data(self, chat_id: int, data: CD) -> None:
for row in data['messages']:
db_row = self.cursor.execute('''SELECT *
FROM chat_data
WHERE chat_id = ? AND message_ts = ? AND message = ?''',
(chat_id, row['message_ts'], row['message'])).fetchone()
if db_row is None:
self.cursor.execute('''INSERT INTO chat_data
(chat_id, message_ts, message)
VALUES
(?, ?, ?)''', (chat_id, row['message_ts'], row['message']))
else:
self.cursor.execute('''UPDATE chat_data
SET
message = ?
WHERE
chat_id = ? AND message_ts = ?
''', (row['message'], chat_id, row['message_ts']))
self.conn.commit()
async def refresh_chat_data(self, chat_id: int, chat_data: t.Any) -> None:
data = self.cursor.execute('''SELECT * FROM chat_data WHERE chat_id = ?''', (chat_id, ))
chat_data['messages'] = [dict(zip(['id', 'chat_id', 'message_ts', 'message'], x)) for x in data]
async def drop_chat_data(self, chat_id: int) -> None:
self.cursor.execute('''DELETE * FROM chat_data WHERE chat_id = ?''', (chat_id, ))
async def get_bot_data(self) -> t.Any:
pass
def update_bot_data(self, data) -> None:
pass
def refresh_bot_data(self, bot_data) -> None:
pass
def get_user_data(self) -> t.DefaultDict[int, t.Any]:
pass
def update_user_data(self, user_id: int, data: t.Any) -> None:
pass
def refresh_user_data(self, user_id: int, user_data: t.Any) -> None:
pass
def get_callback_data(self) -> t.Optional[t.Any]:
pass
def update_callback_data(self, data: t.Any) -> None:
pass
def get_conversations(self, name: str) -> t.Any:
pass
def update_conversation(self, name: str, key, new_state: t.Optional[object]) -> None:
pass
def flush(self) -> None:
self.conn.close()
async def drop_user_data(self, user_id: int) -> None:
pass
async def get_user_data(self) -> t.Dict[int, UD]:
pass
И, в завершение, новый persistence необходимо указать при создании бота:
persistence = SqlitePersistence()
application = Application.builder().token('BOT_TOKEN').persistence(persistence).build()
Всё. Теперь при старте приложения (если база данных существует и BOT_TOKEN
заменили на реальный токен), сообщения будут сохраняться в БД.
Итоговый код можно найти в gist.