Практически ровно год назад я решил сделать telegram-бота для сохранения файлов (в первую очередь книг) в облачное хранилище. Для реализации бота взял самую популярную библиотеку для Python python-telegram-bot. В прошлый раз запала хватило только на реализацию аутентификацию в tg-боте через OAuth 2.0. Детали про хранение access и refresh токенов я решил оставить на потом. И это потом наконец-то наступило, но несколько в ином виде: здесь будет отвлечённый пример без OAuth и предыдущего кода бота для более простого изложения.

Если кто-то ждал (ну, а вдруг?), то можете ругать меня в комментариях под постом за задержку :sad:

В этом посте будет:

  1. краткий пересказ документации
  2. реализация персистентности “коробочными” средствами
  3. кастомное хранилище на основе SQLite.

Краткий пересказ документации

Глобально есть два ~стула~ варианта реализации персистености в боте на python-telegram-bot:

  1. Делать всё с нуля: самостоятельно реализовать все методы для работы с БД/файловой системы,
  2. Использовать класс telegram.ext.BasePersistence или один из его наследников.

Первый вариант рассматривать не буду — он ничем не отличается от любого другого приложения, которое работает в базой данных.

Второй вариант заставил меня немного понервничать, так как оказалось всё чуть запутаннее, чем кажется на первый взгляд. Ниже буду писать именно про него.

BasePersistence и его наследники

Библиотека python-telegram-bot содержит класс BasePersistence и два класса-наследника: DictPersistence и PicklePersistence.

DictPersistence и PicklePersistence могут сохранять данные пользователя в словарь в памяти либо сериализовывать через pickle.

BasePersistence же используется для создания кастомных способов персистентности. Например, в базе данных.

Для того, чтобы бот научился сохранять данные в хранилище, предоставляемое одним из выше перечисленных persistence-провайдеров, требуется совсем немного усилий.

Реализация персистентности “коробочными” средствами

Для примера сделаю максимально простого бота на основе примера из репозитория python-telegram-bot. Пример хороший, но, на мой взгляд, за дополнительными фичами не видно основного способа решения поставленной задачи.

Необходимо сделать всего два пункта, чтобы бот научился сохранять данные персистентно с помощью PicklePersistence:

  1. указать persistence-провайдер в методе persistence application builder-а,
  2. записывать данные, необходимые для персистентного хранения в соответствующую коллекцию (на уровне пользователя в user_data, чата — chat_data или бота bot_data).
import logging
from typing import Dict
from telegram import Update
from telegram.ext import (
    Application,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    PicklePersistence,
    filters,
    CallbackContext
)

# Enable logging
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    await update.message.reply_text('Demo pickle persistence')


async def show_data(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Display the gathered info."""
    def _read_messages(chat_messages):
        return '\n'.join([f'{x["message_ts"]}: {x["message"]}' for x in chat_messages])

    messages = [f"\n{key}:\n{_read_messages(value)}" for key, value in context.chat_data.items()]
    facts = '\n'.join(messages)
    await update.message.reply_text(
        f"This is what you already told me: {facts}"
    )

async def save_message(update: Update, context: CallbackContext) -> None:
    if 'messages' not in context.chat_data:
        context.chat_data['messages'] = []
    context.chat_data['messages'].append({'message': update.message.text, 'message_ts': update.message.date.timestamp()}) # (4)


def main() -> None:
    """Run the bot."""
    # Create the Application and pass it your bot's token.
    persistence = PicklePersistence(filepath='persitencebot', update_interval=1) # (1)
    application = Application.builder().token('BOT_TOKEN').persistence(persistence).build() # (2)
    show_data_handler = CommandHandler("show_data", show_data)
    application.add_handler(show_data_handler)
    application.add_handler(MessageHandler(filters=filters.ALL, callback=save_message)) # (3)

    # Run the bot until the user presses Ctrl-C
    application.run_polling()


if __name__ == "__main__":
    main()

Пройдусь по отмеченным местам:

  1. Создание объекта класса PicklePersistence. Здесь стоит обратить внимание на update_interval. Это время (в секундах) как часто состояние (chat_data, user_data, bot_data) будут сохраняться в хранилище (в этом примере — на диск),
  2. Добавление PicklePersistence к боту,
  3. Создаю handler, который будет реагировать на все сообщения,
  4. Handler из п.3 сохраняет все приходящие сообщения от одного пользователя в список messages в рамках одного чата (*).

*) Если бот будет добавлен в другой чат, то для нового чата будет новый chat_data, который никак не связанный с “личной” перепиской пользователя с ботом.

Теперь при старте бота, код которого представлен выше, все сообщения будут записывать в файл persitencebot, а по команде /show_data отображаться в беседе:

conversation

Кастомное хранилище на основе SQLite

Всё оказалось сложнее, когда я решил сделать кастомное хранилище в БД. Для простоты взял SQLite.

Основные проблемы возникли с тем, что

  1. Документация минимальная,
  2. Примеров от авторов библиотеки нет,
  3. Примеров от сторонних разработчиков очень мало: нашёл на основе Djange и с Redis.

Оба варианта имеют рабочие, но в них я не нашёл ответов на все мои вопросы и пришлось закатать рукава и разбираться самостоятельно.

Persistence на SQLite

Что надо знать перед началом

Всё что есть в [документации про Persistence] — это утверждение, что для реализации своей персистентности необходимо создать класс-наследник от telelgram.ext.BasePersistence и реализовать необходимые методы. Объяснения для каких целей необходимо реализовать какие методы мне понять не удалось. Ниже отрывок из документации:

abstract async update_bot_data(data)
Will be called by the telegram.ext.Application after a handler has handled an update.

PARAMETERS
data (dict | telegram.ext.ContextTypes.bot_data) – The telegram.ext.Application.bot_data.

abstract async update_callback_data(data)
Will be called by the telegram.ext.Application after a handler has handled an update.

New in version 13.6.

Changed in version 20.0: Changed this method into an abstractmethod().

PARAMETERS
data (Tuple[List[Tuple[str, float, Dict[str, Any]]], Dict[str, str]] | None) – The relevant data to restore telegram.ext.CallbackDataCache.

abstract async update_chat_data(chat_id, data)
Will be called by the telegram.ext.Application after a handler has handled an update.

PARAMETERS
chat_id (int) – The chat the data might have been changed for.

data (dict | telegram.ext.ContextTypes.chat_data) – The telegram.ext.Application.chat_data [chat_id].

Copy-paste одной и той же строки документации для каждого метода не даёт особого понимания что он реально делает.

Самое важное, что удалось понять методом проб, отладки и заглядывания в исходники PicklePersistence и persistence-реализаций для Django и Redis:

  1. Для каждой “области” данных (chat, user, bot) есть три методов:
    1. get_*_data для получения данных,
    2. refresh_*_data обновить данные в памяти значениями из хранилища,
    3. update_*_data обновить данные в хранилище данными из памяти,
    4. drop_*_data удалить данные в хранилище,
  2. flush используется для корректного отключения от хранилища в случае завершения работы бота.

Важно здесь отметить, что методы get_*_data, refresh_*_data и остальные самостоятельно вызывать не надо — библиотека сама “понимает”, когда необходимо дёрнуть тот или иной метод.

Теперь, зная эти ☝️ штуки, можно переписать пример с сохранением истории на работу с SQLite.

Реализация

Структура база данных будет максимально простой: одна таблица, каждая строчка в таблице — одно сообщение в чате:

  • id — идентификатор записи в таблице,
  • chat_id — идентификатор чата,
  • message_ts — timestamp сообщения,
  • message — текст сообщения.

SQLite базу данных создать проще всего прямо в Python REPL:

import sqlite3
conn = sqlite3.connect('demo.db') 
cursor = conn.cursor()
cursor.execute('''CREATE TABLE chat_data (
    id                INTEGER PRIMARY KEY    AUTOINCREMENT,
    chat_id           INT     NOT NULL,
    message_ts        INT,
    message           CHAR(500)
);''')

Теперь можно взяться за сам persistence-слой. Здесь всё буду делать максимально “в лоб”, чтобы было меньше деталей, на которые можно было отвлечься.

Во-первых, создаём класс-наследник от BasePersistence:

class SqlitePersistence(BasePersistence):
    def __init__(self, name: str='demo.db'):
        # про update_interval писал выше
        super().__init__(update_interval=1)

        # store_data определяет какие данные будут храниться. Для простоты оставлю только chat_data
        self.store_data = PersistenceInput(chat_data=True, bot_data=False, user_data=False, callback_data=False)

        # подключение к базе данных
        self.conn = sqlite3.connect(name)
        self.cursor = self.conn.cursor()

Во-вторых, для указанных в store_data областей необходимо реализовать методы get_*_data, refresh_*_data, update_*_data и drop_*_data. В моём случае это get_chat_data, refresh_chat_data, update_chat_data и drop_chat_data.

get_chat_data

Метод должен вернуть словарь: ключ — идентификатор чата, значение — данные чата:

async def get_chat_data(self) -> t.DefaultDict[int, t.Any]:
    # получаем данные для всех чатов
    data = self.cursor.execute('''SELECT * FROM chat_data''').fetchall()
    
    # проходим по всем строчкам из таблицы и заполняем словарь chat_data
    # таким образом, чтобы для каждого chat_id была своя коллекция сообщенений.
    chat_data = defaultdict(dict)
    for row in data:
        chat_id = row[1]
        if 'messages' not in chat_data[chat_id]:
            chat_data[chat_id] = { 'messages': [] }
        chat_data[chat_id]['messages'].append(dict(zip(['id', 'chat_id', 'message_ts', 'message'], row)))
    return chat_data

refresh_chat_data

Метод refresh_chat_data должен получить из БД данные и обновить данные в памяти. Здесь всё очень похоже на get_chat_data, но с ограничением на один chat_id:

async def refresh_chat_data(self, chat_id: int, chat_data: t.Any) -> None:
    data = self.cursor.execute('''SELECT * FROM chat_data WHERE chat_id = ?''', (chat_id, ))
    chat_data['messages'] = [dict(zip(['id', 'chat_id', 'message_ts', 'message'], x)) for x in data]

update_chat_data

update_chat_data чуть сложнее, т.к. необходимо учитывать два случая: если сообщение уже есть БД, то его надо обновить, если нет — добавить. Но, в целом, всё тоже тривиально:

async def update_chat_data(self, chat_id: int, data: CD) -> None:
    for row in data['messages']:
        db_row = self.cursor.execute('''SELECT * 
                                        FROM chat_data 
                                        WHERE chat_id = ? AND message_ts = ? AND message = ?''', 
                                        (chat_id, row['message_ts'], row['message'])).fetchone()
        if db_row is None:
            self.cursor.execute('''INSERT INTO chat_data
                                        (chat_id, message_ts, message)
                                    VALUES 
                                        (?, ?, ?)''', (chat_id, row['message_ts'], row['message']))
        else:
            self.cursor.execute('''UPDATE chat_data
            SET
                message = ?
            WHERE
                chat_id = ? AND message_ts = ?
            ''', (row['message'], chat_id, row['message_ts']))
    self.conn.commit()

drop_chat_data

Удаление данных — самое простое:

async def drop_chat_data(self, chat_id: int) -> None:
    self.cursor.execute('''DELETE FROM chat_data WHERE chat_id = ?''', (chat_id, ))

Остальные методы должны pass-овать иначе будут внезапные ошибки в процессе выполнения программы.

Итоговый код SqlitePersistence:

class SqlitePersistence(BasePersistence):
    def __init__(self, name: str='demo.db'):
        super().__init__(update_interval=1)
        self.store_data = PersistenceInput(bot_data=False, user_data=False, callback_data=False)
        self.conn = sqlite3.connect(name)
        self.cursor = self.conn.cursor()

    async def get_chat_data(self) -> t.DefaultDict[int, t.Any]:
        data = self.cursor.execute('''SELECT * FROM chat_data''').fetchall()
        chat_data = defaultdict(dict)
        for row in data:
            chat_id = row[1]
            if 'messages' not in chat_data[chat_id]:
                chat_data[chat_id] = { 'messages': [] }
            chat_data[chat_id]['messages'].append(dict(zip(['id', 'chat_id', 'message_ts', 'message'], row)))
        return chat_data

    async def update_chat_data(self, chat_id: int, data: CD) -> None:
        for row in data['messages']:
            db_row = self.cursor.execute('''SELECT * 
                                            FROM chat_data 
                                            WHERE chat_id = ? AND message_ts = ? AND message = ?''', 
                                            (chat_id, row['message_ts'], row['message'])).fetchone()
            if db_row is None:
                self.cursor.execute('''INSERT INTO chat_data
                                           (chat_id, message_ts, message)
                                       VALUES 
                                           (?, ?, ?)''', (chat_id, row['message_ts'], row['message']))
            else:
                self.cursor.execute('''UPDATE chat_data
                SET
                    message = ?
                WHERE
                    chat_id = ? AND message_ts = ?
                ''', (row['message'], chat_id, row['message_ts']))
        self.conn.commit()

    async def refresh_chat_data(self, chat_id: int, chat_data: t.Any) -> None:
        data = self.cursor.execute('''SELECT * FROM chat_data WHERE chat_id = ?''', (chat_id, ))
        chat_data['messages'] = [dict(zip(['id', 'chat_id', 'message_ts', 'message'], x)) for x in data]

    async def drop_chat_data(self, chat_id: int) -> None:
        self.cursor.execute('''DELETE * FROM chat_data WHERE chat_id = ?''', (chat_id, ))
    
    async def get_bot_data(self) -> t.Any:
        pass

    def update_bot_data(self, data) -> None:
        pass

    def refresh_bot_data(self, bot_data) -> None:
        pass

    def get_user_data(self) -> t.DefaultDict[int, t.Any]:
        pass

    def update_user_data(self, user_id: int, data: t.Any) -> None:
        pass

    def refresh_user_data(self, user_id: int, user_data: t.Any) -> None:
        pass

    def get_callback_data(self) -> t.Optional[t.Any]:
        pass

    def update_callback_data(self, data: t.Any) -> None:
        pass

    def get_conversations(self, name: str) -> t.Any:
        pass

    def update_conversation(self, name: str, key, new_state: t.Optional[object]) -> None:
        pass

    def flush(self) -> None:
        self.conn.close()

    async def drop_user_data(self, user_id: int) -> None:
        pass

    async def get_user_data(self) -> t.Dict[int, UD]:
        pass

И, в завершение, новый persistence необходимо указать при создании бота:

persistence = SqlitePersistence()
application = Application.builder().token('BOT_TOKEN').persistence(persistence).build()

Всё. Теперь при старте приложения (если база данных существует и BOT_TOKEN заменили на реальный токен), сообщения будут сохраняться в БД.

sql

Итоговый код можно найти в gist.

Если вам понравилась статья, то можете зайти в мой telegram-канал. В канал попадают небольшие заметки о Python, .NET, Go.