Пиксай

Project X Intelligence

Вы определяете X — мы помогаем найти решение

Наши проекты

PXI Messenger

Мессенджер с открытым исходным кодом. Развернут и готов к работе.

URL_checker

TG-бот для мониторинга сетевой доступности веб-сайтов.

Запустить бота
Что нового в URL_checker?

Мои посты

[2026-05-02 08:44]
#PXI_NET

🙂Всем привет! Этот пост является продолжением предыдущего, снова направляю интересующихся почитать RFC 9110. А для тех, кто хочет краткую выжимку, читайте этот пост.

⚡️Наиболее используемые статус коды

3xx (Перенаправление)

300 (Multiple choices, множественный выбор) — указывает на то, что целевой ресурс имеет более одного представления, каждое со своим более конкретным идентификатором, и предоставляется информация об альтернативах, чтобы пользователь (или пользовательский агент) мог выбрать предпочтительное представление, перенаправив свой запрос на один или несколько из этих идентификаторов.

301 (Moved permanently, перемещено навсегда) — указывает на то, что целевому ресурсу присвоен новый постоянный URI, и любые будущие ссылки на этот ресурс должны использовать один из включенных в него URI.

302 (Found, найдено) — указывает на то, что целевой ресурс временно находится по другому URI. Поскольку перенаправление может периодически изменяться, клиенту следует продолжать использовать целевой URI для будущих запросов.

303 (See Other, смотри другое) — указывает на то, что сервер перенаправляет пользовательский агент на другой ресурс, как указано в URI в поле заголовка Location , который предназначен для предоставления косвенного ответа на исходный запрос. Пользовательский агент может выполнить запрос на получение данных, нацеленный на этот URI (запрос GET или HEAD при использовании HTTP), который также может быть перенаправлен, и представить конечный результат в качестве ответа на исходный запрос. Обратите внимание, что новый URI в поле заголовка Location не считается эквивалентным целевому URI.

304 (Not Modified, не изменено) — указывает на то, что у нас и так актуальная версия ресурса и нет надобности сервера отправлять его снова.

307 (Temporary redirect, временное перенаправление) — указывает на то, что целевой ресурс временно находится под другим URI, и пользовательский агент НЕ ДОЛЖЕН изменять метод запроса, если он выполняет автоматическое перенаправление на этот URI.

308 (Permanent redirect, постоянное перенаправление) — указывает на то, что целевому ресурсу присвоен новый постоянный URI, и любые будущие ссылки на этот ресурс должны использовать один из включенных в него URI. Аналог 301, но с гарантией неизменности методов.

4xx (Ошибка клиента)

400 (Bad Request, неверный запрос) — указывает на то, что сервер не может или не будет обрабатывать запрос из-за ошибки, которая воспринимается как ошибка клиента. Например, если не подходит к pydantic схеме, то выбрасываем это.

401 (Unauthorized, не авторизован или несанкционированный доступ) — указывает на то, что запрос не был выполнен, поскольку отсутствуют действительные учётные данные для аутентификации целевого ресурса. Я кидаю её, если пользователь дёргает ручку без токена или с невалидным токеном.

403 (Forbidden, запрещено) — указывает на то, что сервер понял запрос, но отказался его выполнить. Иногда его используют для неявных блокировках пользователей, а я его кидаю, когда пользователь пытается редактировать не свой профиль и тд.

404 (Not Found, не найдено) — указывает на то, что исходный сервер не обнаружил актуального представления целевого ресурса или не желает сообщать о его существовании.

405 (Method not allowed, метод не разрешен) — указывает на то, что метод, полученный в строке запроса, известен исходному серверу, но не применим к этому ресурсу.

408 (Request Time-out, тайм-аут запроса) — указывает на то, что сервер не получил полный запрос в течение времени, отведенного на ожидание

409 (Conflict, конфликт) — указывает на то, что запрос не может быть выполнен из-за конфликта с текущим состоянием целевого ресурса. Этот код используется в ситуациях, когда пользователь может разрешить конфликт и повторно отправить запрос. Я его кидаю, когда пользователь с таким именем уже есть.


5xx (Ошибка сервера)

500 (Внутренняя ошибка сервера) — исключительная ситуация на сервере.

503 (Сервис недоступен) — указывает на то, что сервер в данный момент не может обработать запрос из-за временной перегрузки или планового технического обслуживания
[2026-04-28 20:07]
#PXI_NET

🙂Всем привет! В дополнение к прошлому посту решил сделать для вас шпаргалку с основными статус-кодами протокола HTTP/HTTPS. Для более подробного погружения в тему рекомендую прочитать RFC 9110, который регламентирует статус коды и весь протокол. В этот пост вместятся основные понятие и первые две группы статус кодов.

Что такое status code или коды состояния ответа ?

Код состояния ответа — это трёхзначный целочисленный код, описывающий результат запроса и семантику ответа, включая информацию об успешности запроса и его содержимом (если таковое имеется). Все допустимые коды состояния находятся в диапазоне от 100 до 599 включительно.

Первая цифра кода состояния определяет класс ответа. Последние две цифры не имеют никакого отношения к классификации.

Первая цифра может принимать пять значений:
1️⃣ 1xx (Информационное сообщение) : Запрос получен, процесс продолжается
2️⃣ 2xx (Успешно) : Запрос был успешно получен, понят и принят
3️⃣ 3xx (Перенаправление) : Для завершения запроса необходимо предпринять дополнительные действия
4️⃣ 4xx (Ошибка клиента) : Запрос содержит некорректный синтаксис или не может быть выполнен
5️⃣ 5xx (Ошибка сервера) : Сервер не смог выполнить, по-видимому, действительный запрос

Важное уточнение, что коды состояния HTTP являются расширяемыми. Клиент не обязан понимать значение всех зарегистрированных кодов состояния, хотя такое понимание желательно.

Однако клиент ОБЯЗАН понимать класс любого кода состояния, определяемый первой цифрой, и рассматривать нераспознанный код состояния как эквивалентный коду состояния x00 этого класса.

Иногда используют статус коды вне диапазона 100-599 (например, 600-999), но они используются для внутренней связи не связанной с HTTP/HTTPS, например, ошибка конкретной библиотеки. Клиент такую ошибку может приравнивать к 5xx (Ошибка сервера).

Некоторые статус коды являются эвристически кэшируемыми (например, 200, 203, 204, 206, 300, 301, 308, 404, 405, 410, 414 и 501), то есть браузер кэширует ответ с ними, даже если это не указано явно.

📎Что такое URI, URL и URN?

В документации встречается аббревиатура URI, кратко разберём что это такое сразу.

URI (Uniform Resource Identifier) — это компактная последовательность символов, идентифицирующая абстрактный или физический ресурс. Он включает в себя URN и URL.

URL (Uniform Resource Locator) — это тип URI, который идентифицирует ресурс через описание способа его нахождения (протокол и тд).

URN (Uniform Resource Name) — это тип URI, который должен оставаться глобально уникальным и постоянным (имя ресурса, ISBN книг).

⚡️Наиболее используемые статус коды

1xx (Информационные)

100 (Continue, продолжить) — сервер получил начальную часть запроса и ждёт недостающих данных, чтобы дать окончательный ответ клиенту.

101 (Switching Protocols, переключение протокола) — сервер понимает и готов выполнить запрос клиента, поступающий через поле заголовка Upgrade, об изменении используемого протокола приложения в данном соединении (так происходит переход с HTTP на websocket).

2xx (Успешные)

200 (Ok, успех) — запрос выполнен успешно, обычно есть содержимое в ответе.

201 (Created, создано) — запрос выполнен и в результате создан один или несколько новых ресурсов, содержимое ответа обычно содержит этот ресурс.

202 (Accepted, принято) — запрос принят к обработке, но обработка еще не завершена.

203 (Non-authoritative information, неавторитетная информация) — запрос был успешным, но содержимое было изменено по сравнению с ответом исходного сервера 200 (OK) преобразующим прокси-сервером.

204 (No content, нет содержимого) — сервер успешно выполнил запрос и в ответе нет дополнительного содержимого для отправки.

205 (Reset content, сброс содержимого) — сервер выполнил запрос и желает, чтобы пользовательский агент сбросил «представление документа», которое вызвало отправку запроса, до его исходного состояния, полученного от исходного сервера.

206 (Partial content, частичное содержание) — сервер успешно выполняет запрос диапазона для целевого ресурса, передавая одну или несколько частей выбранного представления.
[2026-04-27 17:17]
#PXI_WEB

🙂Вновь всех приветствую! И вновь пост про нововведения в нашем мессенджере. Не беспокойтесь, следующий пост будет уже о кое-чём более интересном. А сегодня я расскажу, как добавил профили пользователей и возможность их редактирования.

⚡️Ручка получения данных о профиле пользователя

На фронтенде я привязал клик на аватарку или имя пользователя к функции, которая дёргает эту ручку через fetch.

@router.get("/get_profile/{username}",status_code=200)
async def get_profile(username:str, db: SessionDep):
    stmt = select(models.Profiles).where(models.Profiles.username == username)
    result = await db.execute(stmt)
    result = result.scalar_one_or_none()
    if not result : raise HTTPException(status_code=404, detail="Нет такого пользователя")
    result = {
        "display_name": result.display_name,
        "username": result.username,
        "avatar_url": "🤬",
        "bio": result.bio,
        "is_online": result.is_online,
        "last_seen": result.last_seen.isoformat().replace("T"," ").split(".")[0] if result.last_seen else None
    }
    return result

Здесь мы делаем select из таблицы Profiles, получая нужный нам профиль по имени, в не нахождения кидаем 404. Далее формируем словарь с нужными нам данными, last_seen это объект datatime, поэтому нужно самому превратить его в строку. Я использовал isoformat() и последующую ручную правку (хотя надо было просто использовать strftime).

В конце возвращаем наш словарь, а FastAPI сам его упакует в JSON или можем сами прописать JSONResponse(content = result).


☄️Добавим создание профилей новым и старым пользователям

В нашу ручку аутентификации и авторизации добавим создание профилей.

@router.post("")
async def auth(
    response: Response,
    database: SessionDep,
    user_data: UserAuth
):
    if not user_data.is_login:
        query = select(models.Users).where(models.Users.username == user_data.username)
        result = await database.execute(query)
        existing_user = result.scalar_one_or_none()
        if existing_user:
            return {"error": "пользователь уже существует"}
        user = models.Users(
            username=user_data.username,
            password=pwd_context.hash(user_data.password)
        )
        profile = models.Profiles(username=user_data.username,display_name=user_data.username, avatar_url="⭐", bio="Я новый пользователь!", is_online=True)
        database.add(user)
        await database.flush()
        database.add(profile)
        await database.commit()
        return {"status": "успешно зарегистрированы"}

    query = select(models.Users).where(models.Users.username == user_data.username)
    result = await database.execute(query)
    user = result.scalar_one_or_none()
   
    if not user or not pwd_context.verify(user_data.password, user.password):
        return {"error": "Неверный логин или пароль"}

    prof_query = select(models.Profiles).where(models.Profiles.username == user_data.username)
    prof_result = await database.execute(prof_query)
    if not prof_result.scalar_one_or_none():
        new_profile = models.Profiles(
            username=user_data.username,
            display_name=user_data.username,
            avatar_url="⭐",
            bio="Старый добрый пользователь",
            is_online=True
        )
        database.add(new_profile)
        await database.commit()

    token = jwt.encode({"sub": user_data.username}, SECRET_KEY, algorithm=ALGORITHM)

    response.set_cookie(key="access_token", value=token, httponly=True)
    return {"status": "ok"}


✏️Ручка редактирования профиля

Тут просто обновляем запись в таблице. Возвращаю 204, так как запрос выполнен, но данных не отправляю.

@router.post("/edit_profile", status_code=204)
async def edit_profile(profile: models.EditProfile, db: SessionDep, username: str = Depends(auth.get_current_user)):
    stmt = update(models.Profiles).where(models.Profiles.username == username).values(display_name=profile.display_name, bio=profile.bio)
    await db.execute(stmt)
    await db.commit()
    return None
[2026-04-26 00:29]
#PXI_WEB #PXI_PROJECT

🙂Всем привет! Хочу сказать вам, что вышла новая версия мессенджера v2.4.0. В ней добавлены новые смайлики, свайпы (не везде работает плавно, исправлю позже), возможность редактирования профиля, исправлены некоторые баги и много чего ещё. Так что заходите, пользуйтесь.

⚠️В этом и других постах я рассказываю о нововведениях, когда обновляется минорная или мажорная версия, а если вы хотите узнавать о нововведениях даже в патч версиях, то смотрите наш сайт.
[2026-04-24 23:43]
#PXI_WEB

🙂Всем привет! Продолжаю рассказывать про обновления в мессенджере. На этот раз расскажу про то, как я реализовал удаление чатов и получение групп и каналов, для отображения во вкладках.

⚡️Отображаем группы и каналы, в которых мы состоим

В прошлый раз мы с вами реализовали отправку сообщений в группах и каналах, получение сообщений, создание и поиск групп и каналов. В текущей реализации мы становимся участником после того, как отправим первое сообщение. Но до сих пор не реализовали отображение групп и каналов, в которых мы являемся участниками.

@router.get("/get_chats")
async def get_chats(type: str, db: SessionDep, username: str = Depends(auth.get_current_user)):
    try:
        chat_type_enum = models.ChatType(type)
    except ValueError:
        return []

    query = (
        select(models.Chats.id, models.Chats.name)
        .join(models.ChatMembers, models.ChatMembers.chat_id == models.Chats.id)
        .where(and_(
            models.Chats.type == chat_type_enum,
            models.ChatMembers.username == username
        ))
    )
   
    result = await db.execute(query)
    rows = result.all()

    return [{"id": row[0], "name": row[1]} for row in rows]

Здесь мы в зависимости от вкладки получаем разное значение в строке type от фронтенда, потом используем нашу функцию get_current_user и Depends (чтобы даже если нам отошлют имя в quary параметрах вызвалась наша функция и взялось наше значение), которая возвращает имя пользователя из токена (чтобы никто не мог представиться другим).

Приводим строку type к значению enum (models.ChatType(type), если type равен direct, то получим ChatType.DIRECT), иначе база данных не поймет, ведь в ней лежит именно объект enum.

Далее делаем select, выбирая все chat_id и name чатов нужного типа, в которых мы состоим. Вызываем метод .all() так как результат простые объекты кортежи, а не словари или списки. Возвращаем список словарей, FastAPI сам переведёт его в JSON.

💥Удаление личных чатов

Мы уже реализовали основу для групп и каналов (они сейчас правда работает как группы), перейдём теперь к другим важным функциям. Реализуем удаление личных чатов, сейчас я решил реализовать так, что удаляется сразу у обоих участников.

@router.delete("/chat/direct/{interlocutor_name}", status_code=204)
async def delete_direct_chat(interlocutor_name: str, db: SessionDep, current_user: str = Depends(auth.get_current_user)):
    stmt = (
        select(models.ChatMembers.chat_id)
        .join(models.Chats, models.Chats.id == models.ChatMembers.chat_id)
        .where(models.Chats.type == models.ChatType.DIRECT)
        .where(models.ChatMembers.username.in_([current_user, interlocutor_name]))
        .group_by(models.ChatMembers.chat_id)
        .having(func.count(models.ChatMembers.chat_id) == 2)
    )
   
    result = await db.execute(stmt)
    chat_id = result.scalar_one_or_none()

    if not chat_id:
        raise HTTPException(status_code=404, detail="Переписка не найдена")
    await db.execute(delete(models.Messages).where(models.Messages.chat_id == chat_id))   
    await db.execute(delete(models.ChatMembers).where(models.ChatMembers.chat_id == chat_id))
    await db.execute(delete(models.Chats).where(models.Chats.id == chat_id))
   
    await db.commit()
    return None

Вначале мы ищем chat_id по именам, присоединяя таблицу самих чатов (приписываем поля из Chats), чтобы потом отсечь группы, каналы и тд. Метод scalar_one_or_none() возвращает либо один объект, либо None.

Если чата нет возвращаем 404 ошибку, если есть, то удаляем сначала сообщения этого чата, потом участников, потом сам чат (как в зависимостях в базе данных). Здесь execute заставляет базу выполнить запрос, но пока не завершать транзакцию, после всех удалений мы завершим общую транзакцию.

💬На этом пока что всё, задавайте интересующие вас вопросы!
[2026-04-23 20:49]
#PXI_WEB #PXI_PROJECT

🙂Всем привет! Сегодня у меня для вас две новости. Во-первых, я наконец-то выкатил обновление мессенджера, в котором адаптировал фронтенд под мобильные устройства. Во-вторых, несколько людей просило, и я наконец-то создал репозитории в github. Подробнее про обновление или репозитории можете посмотреть на сайте!
[2026-04-22 21:00]
#PXI_WEB #PXI_PROJECT

🙂Всем привет! Сегодня произошло пополнение в проектах. Я сделал первую версию сайта, в котором содержаться все ссылки на другие проекты, социальные сети и репозиторий с кодом (скоро добавлю все исходники в репозитории на Хабре). В дальнейшем планирую дублировать туда посты и статьи, добавить комментарии для общения между друг другом.

🖥Попробуйте сейчас, думаю вам понравится! Также пишите свои идеи, что вы хотите там видеть

http://132.243.17.128/
[2026-04-21 23:30]
#PXI_WEB

🙂Всем привет! Продолжаю рассказывать про нововведения в нашем мессенджере. В пошлых постах мы добавили новые таблицы Chats, ChatMembers и другие, теперь наши сообщения в директах тоже имеют chat_id. Мы добавили также создание групп и каналов, а также обновили отправку сообщений, добавив функцию, которая отправляет всем участникам сразу. Но есть проблемы, которые надо решить...

⚡️Как найти группы и каналы?

Однако, хоть во фронтенде мы добавили вкладки для групп и каналов, сейчас там ничего нет, мы не можем даже найти каналы, которые мы создали. Исправим это добавив к поиску пользователей, поиск групп или каналов в зависимости от открытой вкладки.

@router.get("/search_chats")
async def search_chats(query:str, type: str, db:SessionDep):
    stmt = select(models.Chats).where(models.Chats.type == models.ChatType(type)).where(models.Chats.name.ilike(f"%{query}%"))
    result = await db.execute(stmt)
    return [{"id": c.id, "name": c.name} for c in result.scalars().all()]

Функция аналогична поиску пользователей, но теперь используем ChatType.

☄️Как получить сообщения относящиеся к группе/каналу?

Мы можем теперь находить группы и каналы, даже отправить сообщение, но как нам получить историю сообщений? Надо переписать функцию для получения сообщений. Я решил для директов тянуть получение через имена (как и было), чтобы не менять фронтенд, а для групп и каналов будем использовать chat_id.

Получение по именам для директов:
@router.get("/messages/by_name/{chat_with}")
async def get_messages_by_name(
    chat_with: str,
    database: SessionDep,
    current_user: str = Depends(auth.get_current_user)
):
    stmt = (
        select(models.Messages)
        .join(models.Chats, models.Chats.id == models.Messages.chat_id)
        .where(models.Chats.type == models.ChatType.DIRECT)
        .where(
            or_(
                and_(models.Messages.sender_name == current_user, models.Messages.receiver_name == chat_with),
                and_(models.Messages.sender_name == chat_with, models.Messages.receiver_name == current_user)
            )
        )
        .order_by(models.Messages.timestamp.asc())
    )
    result = await database.execute(stmt)
    messages = result.scalars().all()

    return [
        {
            "sender": m.sender_name,
            "text": m.content,
            "time": m.timestamp.strftime("%H:%M"),
            "type": "outgoing" if m.sender_name == current_user else "incoming"
        } for m in messages
    ]


Получение по chat_id:
@router.get("/messages/{chat_id}")
async def get_messages(chat_id: int, database: SessionDep, current_user: str = Depends(auth.get_current_user)):
    check_stmt = select(models.ChatMembers).where(
        and_(
            models.ChatMembers.chat_id == chat_id,
            models.ChatMembers.username == current_user
        )
    )
    is_member = await database.execute(check_stmt)
    if not is_member.scalar_one_or_none():
        raise HTTPException(status_code=403, detail="Вы не состоите в этом чате")
    stmt = (
        select(models.Messages)
        .where(models.Messages.chat_id == chat_id)
        .order_by(models.Messages.timestamp.asc())
    )
    result = await database.execute(stmt)
    messages = result.scalars().all()

    return [
        {
            "sender": m.sender_name,
            "text": m.content,
            "time": m.timestamp.strftime("%H:%M"),
            "type": "outgoing" if m.sender_name == current_user else "incoming"
        } for m in messages
    ]

В обоих случаях мы делаем запрос в базу данных, вытягиваем нужные сообщения и отправляем список словарей. FastAPI сам преобразует его в JSON, а на фронте мы преобразуем JSON в массив объектов JavaScript .

💬На этом пока что всё, в следующий раз расскажу, как я реализовал профили, удаление чатов и загрузку групп и каналов, в которых мы состоим, при переходе в нужную вкладку.
[2026-04-18 21:48]
#PXI_WEB

🙂Всем привет! Во-первых, начну с того, что я обновил наш мессенджер, добавив в него профили и отслеживание активности (онлайн или нет и когда).

Сегодня покажу обещанную ручку, которая соглашается на переход с HTTP/HTTPS на ws/wss, а также именно она занимается хранением текущих соединений, сохранением сообщений в базу данных и передачей их другим пользователям через сокет.


⚡️Ручка /ws/{username}

@router.websocket("/{username}")
async def websocket_endpoint(websocket: WebSocket, username: str):
    await manager.connect(username, websocket)
    try:
        while True:
            data = await websocket.receive_json()
            receiver = data.get("receiver")
            text = data.get("text")
            chat_id = data.get("chat_id")
            if(chat_id):
                async with SESSIONLOCAL() as db:
                    new_msg = models.Messages(
                        chat_id=chat_id,
                        sender_name=username,
                        content=text
                    )
                    db.add(new_msg)

                    member_stmt = select(models.ChatMembers).where(
                        and_(
                            models.ChatMembers.chat_id == chat_id,
                            models.ChatMembers.username == username
                        )
                    )
                    member_check = await db.execute(member_stmt)

                    if not member_check.scalar_one_or_none():
                        new_member = models.ChatMembers(
                            chat_id=chat_id,
                            username=username
                        )
                        db.add(new_member)
                    await db.commit()
                    await manager.send_broadcast({
                    "chat_id": chat_id,
                    "sender": username,
                    "text": text,
                    "time": datetime.now().strftime("%H:%M")
                },username,chat_id)

            else:
                async with SESSIONLOCAL() as db:
                    stmt = (
                        select(models.ChatMembers.chat_id)
                        .join(models.Chats, models.Chats.id == models.ChatMembers.chat_id)
                        .where(models.Chats.type == models.ChatType.DIRECT)
                        .where(models.ChatMembers.username.in_([username, receiver]))
                        .group_by(models.ChatMembers.chat_id)
                        .having(func.count(models.ChatMembers.username) == 2)
                    )
                    result = await db.execute(stmt)
                    chat_id = result.scalar()

                    if not chat_id:
                        new_chat = models.Chats(type=models.ChatType.DIRECT)
                        db.add(new_chat)
                        await db.flush()
                   
                        db.add_all([
                            models.ChatMembers(chat_id=new_chat.id, username=username),
                            models.ChatMembers(chat_id=new_chat.id, username=receiver)
                        ])
                        chat_id = new_chat.id

                    new_msg = models.Messages(
                        chat_id=chat_id,
                        sender_name=username,
                        receiver_name=receiver,
                        content=text
                    )
                    db.add(new_msg)
                    await db.commit()
                if receiver == "URL_checker🌐":
                    asyncio.create_task(handle_bot_logic(username, text,chat_id))
           
                await manager.send_personal_message({
                    "chat_id": chat_id,
                    "sender": username,
                    "text": text,
                    "time": datetime.now().strftime("%H:%M")
                }, receiver)
    except WebSocketDisconnect:
        manager.disconnect(username)

Здесь если фронт передал chat_id, то считаю группой/каналом и шлю сообщение всем, иначе личкой.

💬Отвечу на ваши любые вопросы
[2026-04-17 23:47]
#PXI_WEB

🙂Всем привет! Продолжаю рассказывать про то, как я добавил группы и каналы в наш мессенджер (http://2.26.118.52). Сегодня расскажу про изменения в работе сокетов и отправки сообщений.

☄️Обновим отправку сообщений для отправки нескольким людям сразу

Ранее наш класс ConnectionManager хранил текущие соединения с пользователями и имел только метод отправки персональных сообщений для личек. Добавим метод для рассылки всем участникам группы или канала.

class ConnectionManager:
    def __init__(self):
        self.active_connections: dict[str, WebSocket] = {}

    async def connect(self, username: str, websocket: WebSocket):
        await websocket.accept()
        self.active_connections[username] = websocket

    def disconnect(self, username: str):
        if username in self.active_connections:
            del self.active_connections[username]

    async def send_personal_message(self, message: dict, receiver: str):
        if receiver in self.active_connections:
            await self.active_connections[receiver].send_json(message)

    async def send_broadcast(self, message:dict, sender: str, chat_id: int):
        async with SESSIONLOCAL() as db:
            stmt = (select(models.ChatMembers.username).where(models.ChatMembers.chat_id == chat_id).where(models.ChatMembers.username != sender))
            results = await db.execute(stmt)
            results = results.scalars().all()
            for user in results:
                if user in self.active_connections:
                    await self.active_connections[user].send_json(message)

Я добавил метод send_broadcast, в нём мы получаем всех участников для чата по chat_id и отправляем им данные, если они сейчас активны через сокет.

☄️Мы добавили chat_id и множественную рассылку, теперь надо обновить функцию, которая занималась созданием и хранением соединений, а также отправкой сообщений.

Здесь стоит начать с фронтенда, для облегчения работы я использую websocket, а не чистые сокеты.

const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; const socket = new WebSocket(`${wsProtocol}//${window.location.host}/ws/${myName}`);

Первым делом определяем протокол и домен сайта (в моём случае сразу IP), если защищённый https то и переходим (обновляем) на защищённый wss, иначе ws. Важно, что websocket это не сырой сокет 4 уровня OSI, а надстройка над HTTP/HTTPS и отправляет данные с такими же заголовками.

Далее мы создаём объект websocket передавая ему протокол ws или wss и домен нашего сервера, он его резолвит и отправляет HTTP запрос на переход с HTTP/HTTPS на ws/wss. Также передаем, какую ручку нужно дёрнуть (/ws/myName), чтобы наш сервер смог ответить на этот запрос корректно (в случае согласия отправит код 101).

Важно, что обновляется не то соединение, которое было создано при открытии сайта. При создании websocket сначала создаётся новое HTTP/HTTPS соединение и в нем уже GET запрос с заголовком на переход.

💬На этом пока что всё, ручку опишу в следующем посте!
[2026-04-16 23:37]
#PXI_WEB

🙂Всем привет! Что-то давненько я не показывал код нововведений в нашем мессенджере (http://2.26.118.52). Сегодня я покажу как реализовал группы и каналы.

⚡️Начнём с обновления нашей базы данных, добавим новые таблицы и привяжем все чаты к chat_id

До этого у меня было всего две таблицы Messages и Users. Все сообщения находились через имена пользователей, но раз мы хотим добавлять группы и каналы это уже не прокатит, так как писать может любой человек, поэтому добавим chat_id, таблицу Chats для хранения информации о чатах и ChatMembers для хранения информации о участниках чата.

class Messages(Base):
    __tablename__ = "messages"
    id = Column(Integer, primary_key=True, autoincrement=True)
    chat_id = Column(Integer, ForeignKey("chats.id"), index=True)
    sender_name = Column(Text, ForeignKey("users.username"))
    receiver_name = Column(Text, nullable=True) # Для лички
    content = Column(Text)
    is_read = Column(Boolean, default=False)
    timestamp = Column(DateTime, default=get_moscow_now)

class ChatType(enum.Enum):
    DIRECT = "direct"
    GROUP = "group"
    CHANNEL = "channel"

class Chats(Base):
    __tablename__ = "chats"
    id = Column(Integer, primary_key=True, autoincrement=True)
    type = Column(Enum(ChatType), default=ChatType.DIRECT)
    name = Column(Text, nullable=True)
    created_at = Column(DateTime, default=get_moscow_now)

class ChatMembers(Base):
    __tablename__ = "chat_members"
    chat_id = Column(Integer, ForeignKey("chats.id"), primary_key=True)
    username = Column(Text, ForeignKey("users.username"), primary_key=True)
    joined_at = Column(DateTime, default=get_moscow_now)

#pydantic-models
class CreateGroup(BaseModel):
    name: str
    type: str

Также добавил pydantic-модель CreateGroup для валидации данных при создании новых групп и каналов.

🖥Теперь добавим метод создания групп и каналов

Начать я решил с реализации вкладок и кнопки создания канала на фронтенде, а потом перешёл к более интересной бекенд части.

@router.post("/chat")
async def create_group(request:Request, data: models.CreateGroup, db: SessionDep, username = Depends(auth.get_current_user)):
    existing_chat = await db.execute(
        select(models.Chats).where(
            and_(
                models.Chats.name == data.name,
                models.Chats.type == (models.ChatType.GROUP if data.type == 'group' else models.ChatType.CHANNEL)
            )
        )
    )
    if existing_chat.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Группа или канал с таким названием уже существует"
        )

    new_chat = models.Chats(name=data.name, type=models.ChatType.GROUP if data.type == 'group' else models.ChatType.CHANNEL)
    db.add(new_chat)
    await db.flush()

    new_member = models.ChatMembers(chat_id=new_chat.id, username=username)
    db.add(new_member)
    await db.commit()
    chat_data = {"chat_id": new_chat.id, "name": new_chat.name}
   
    return JSONResponse(status_code=status.HTTP_201_CREATED,content=chat_data)

Здесь мы получаем название и тип создаваемого чата (канал или группа) и валидируем с помощью нашей pydantic-модели.

Далее проверяем нет ли уже с таким названием группы или канала и в случае нахождения отправляем 400 ошибку. Если всё хорошо, то создаём и комитим в базу данных этот чат.

Сначала делаем flush, чтобы данные не зафиксировались в базе данных, но при этом мы смогли получить chat_id и добавить создателя в участники. В конце возвращаем статус код 201 и JSON с chat_id и названием группы или канала.

💬Накопилось очень много нового функционала и в один пост я всё не смог вместить, поэтому обязательно расскажу всё в следующих.
[2026-04-15 21:26]
#PXI_NET

🙂Всем привет! Сегодня будет обещанный пост про VPS/VDS. Кстати, я докупил VPS в России и теперь мой бот проводит анализ из двух точек присутствия (@URL_checker_PXI_bot).

Что такое VPS и VDS?

VPS/VDS — это сервер, который развернут на платформе виртуализации и сдается провайдером в аренду. Можно сказать, что это аналог виртуальной машины.

На дата-центрах есть физические сервера, обычно все их ресурсы объединяют в пул и уже из этого пула продают ресурсы нам. Это позволяет эффективно использовать оборудование и не простаивать ему.

Часто используется динамическое выделение ресурсов под нужны пользователя, например, вы купили VPS с диском на 50 Гб, но используете только 30 тогда ваши 20 можно кому-то отдать, пока вы их не запросите.

⚡️В чём разница между VPS и VDS?

VPS (Virtual Private Server) — виртуальный приватный сервер.

VDS (Virtual Dedicated Server) — виртуальный выделенный сервер.

Считается, что VPS использует программную виртуализацию (разделение ресурсов происходит на уровне ОС, OpenVZ), а VDS — аппаратную (KVM).

Также существует Shared Hosting — это такой вид хостинга, когда на одном физическом сервере находится множество сайтов различных владельцев. При этом данные этих сайтов изолированы друг от друга. Это наиболее бюджетный вид хостинга и подходит для простых веб-проектов, которые не требуют больших ресурсов.

🖥Виды VPS/VDS

VPS/VDS разделяют на управляемые, неуправляемые и полууправляемые.

Управляемые VPS/VDS — это когда провайдер берет на себя все задачи по обслуживанию и обновлению серверов, например, установка актуального ПО (с услугой администрирования).

Полууправляемые VPS/VDS — это когда провайдер отвечает только за часть приложений или обновление ядра. Например, он может поддерживать безопасность сервера и связанных с этим приложений.

Неуправляемые или самоуправляемые VPS/VDS — это когда провайдер отвечает только за выделение необходимых ресурсов, а также за доступность.

🌐Я купил VPS/VDS, что дальше?

В большинстве случаев вам на почту придёт белый IP адрес, логин и пароль от пользователя на сервере. Подключаемся к нему через SSH и используем для своих целей.

Желательно поменять пароль сразу и прописать простенький iptables. Если как я планируете поднимать какой-то сервис с базой данных (например, мессенджер), то не надо этот порт открывать для всех. Вместо 5432:5432 в докере используйте 127.0.0.1:5432:5432.

К сожалению, я решил это оставить на потом и это привело к тому, что бот, сканирующий сеть, успел поставить майнер в мой докер контейнер, благо он изолирован от остального сервера и я легко от него избавился и залатал дыры безопасности.

💬На этом пока что всё! Буду рад, если вы будете пользоваться моими проектами.
[2026-04-14 21:17]
#PXI_PROJECTготовые и развивающиеся проекты

Я развиваю экосистему полезных сервисов и на данном этапе готовы следующие проекты:

1️⃣ Мой собственный сайт, который является хабом всех проектов и социальных сетей.

http://132.243.17.128/

2️⃣ Бот для проверки сетевой доступности сайтов. Удобно мониторить свои ресурсы по URL или IP прямо в Telegram.

@URL_checker_PXI_bot

3️⃣ PXI Messenger — мой собственный мессенджер.
Проект находится в стадии активной разработки, и сейчас он открыт для всех желающих! Заходите, создавайте аккаунты и общайтесь:

http://2.26.118.52

📎Полный исходный код можете посмотреть здесь:

https://github.com/geses-off

💬Буду рад любой обратной связи — ваши отзывы помогают развивать проекты дальше!
[2026-04-12 12:33]
#PXI_PROJECT #PXI_WEB

🙂Всем привет! Сегодня мой мессенджер стал доступен всем для тестирования. Пишите обо всех найденных багах или о падении мессенджера мне в личку или группу моего мессенджера (PXI, мой ник и имя группы именно большими буквами и без смайликов и тд), в личку в Telegram или в комментариях канала.

Как найти и пользоваться мессенджером?

Просто нажмите сюда:
http://2.26.118.52

Зарегистрируйте свой аккаунт и можете искать других пользователей для общения или создавать группы и каналы.

⚡️Что с ним сейчас не так

Во-первых, если у вас какая-то проблема, попробуйте обновить сайт. Например, сейчас если вы впервые пишете человеку или в группу, то он у вас в списке чатов появится только после обновления страницы, это скоро исправлю (уже исправил). С удалением чатов такой проблемы нет.

Также исправлю проблему с тем, что наши сообщения и нашего собеседника прижимаются все влево (тоже исправил).

В любом случае пишите обо всём, что вам не нравится и кажется сломанным.

⚠️На данный момент мессенджер нацелен удобством на компьютеры и ноутбуки. Позже добавлю поддержку красивого фронта для смартфонов.

💬В следующем посте расскажу про VPS, и как вам его можно использовать.
[2026-04-12 10:34]
#PXI_PROJECT #PXI_NET

🙂Всем привет! Наконец-то вы сможете использовать мои проекты каждый день, и каждый сможет поучаствовать в разработке. Сегодня хочу рассказать про бота для анализа сетевой доступности веб-сайтов. Пишите обо всех найденных багах или о падении бота мне в личку моего мессенджера (PXI, мой ник именно большими буквами), в личку в Telegram или в комментариях канала.

Как найти и пользоваться ботом?

Просто нажмите сюда:
@URL_checker_PXI_bot

После того, как перешли в бота, напишите /start, и бот вам всё пояснит.

⚡️Что с ним сейчас не так

На данный момент я купил VPS в Европе ещё не настроил прокси или ВПН для России, поэтому бот показывает данные для Европы. Но это я в скором времени исправлю (исправил).

💬В следующем посте расскажу про мессенджер, а ещё чуть позже про VPS, и как вам его можно использовать.
[2026-04-09 23:34]
#PXI_NET

🙂Всем привет! Да-да-да, сегодня я снова хочу рассказать про то, что я добавил в средство анализа доступности веб-сайтов. Кстати, совсем скоро я его запущу на постоянку вместе с мессенджером. Расскажу я про то, как я реализовал периодическую проверку веб-сайтов, при этом не заблокировал возможность пользователя делать другие запросы на проверку.

📎Начнём с обработчика для команды /periodtest

Тут мы получаем вместе с командой аргументы (/periodtest 1 pxi.net), первый аргумент уйдёт в переменную time (время периода в минутах), а все остальные в список urls.

active_checks = {}

@dp.message(Command("periodtest"))
async def period_test_handler(message: types.Message):
    chat_id = message.chat.id
    try:
        time, *urls = message.text.replace("/periodtest","").strip().split()
    except ValueError:
        await message.answer("Неверная команда, вот пример: /periodtest 10 https://google.com")
        return
    if float(time) < 0.5:
        return await message.answer("Слишком часто! Минимум — раз в 0.5 мин.")
    if not urls:
        return await message.answer("Неверная команда, вот пример: /periodtest 10 https://google.com")
    if chat_id in active_checks:
        return await message.answer("У вас уже запущена проверка! Сначала остановите её (команда /stop).")
    await message.answer(f"🚀 Запускаю периодическую проверку каждые {time} минут для {len(urls)} URL...")

    task = asyncio.create_task(period_check(chat_id,time,urls))
    active_checks[chat_id] = task

Далее мы проверяем не запустили ли уже проверку для этого пользователя, если нет, то создаём и запускаем задачу в фоне с выполнением нашей функции (покажу её ниже) и заносим эту задачу в словарь, чтоб потом проверять не запущена ли такая задача и суметь остановить её.

⚠️Остановка задачи

Здесь реализуем функцию остановки задачи (/stop). При помощи метода cancel() останавливаем выполнение задачи в фоне, которую мы получили из словаря по chat_id.

@dp.message(Command("stop"))
async def stop(message: types.Message):
    chat_id = message.chat.id
    if chat_id in active_checks:
        active_checks[chat_id].cancel()
        del active_checks[chat_id]
        await message.answer("⏹ Мониторинг остановлен.")
    else:
        await message.answer("У вас нет активных проверок.")

В конце также удаляем объект задачи из словаря.

⚡️Функция переодической проверки

Здесь мы в цикле вызываем наши асинхронные функции через gather(), формируем общий ответ и уже не изменяем прошлое сообщение, а отправляем новое, так как в тг через некоторое время нельзя будет изменить сообщение.

async def period_check(chat_id:int, time:float, urls):
    try:
        while True:
            analyze = [analyze_availability(url) for url in urls]
            results = await asyncio.gather(*analyze)
            response = ""
            for result in results:
                color_emoji = "✅" if result.status_label == "Доступен" else "❌"
                if result.status_label == "Нас заблокировали": color_emoji = "⚠️"
                response += (
                    f"{color_emoji} {hbold('Результат для:')} {result.url}\n"
                    f"🌐 {hbold('IP:')} {hcode(result.ip or 'N/A')}\n"
                    f"📊 {hbold('Статус:')} {result.status_label}\n"
                    f"⏱ {hbold('Задержка:')} {result.latency_ms:.1f}ms\n"
                    f"📝 {hbold('Детали:')} {result.detail or '—'}\n\n"
                )
            await bot.send_message(chat_id, response, parse_mode="HTML")
            await asyncio.sleep(float(time)*60)
    except asyncio.CancelledError:
        logging.info(f"Мониторинг для чата {chat_id} остановлен.")

При помощи asyncio.sleep() заставили ждать только именно эту задачу, другие пользователи и даже тот, который эту задачу вызвал могут пользоваться ботом дальше.

💬На этом пока что всё, пишите вопросы в комментариях. Буду рад помочь :)
[2026-04-06 16:27]
#PXI_NET

🙂Привет! Продолжаю прошлый пост про ТГ-бота своего программного средства анализа доступности веб-сайтов.

🌐Функция с анализом доступности

Создаём функцию с декоратором @dp.message(), то есть она будет срабатывать на любое сообщение от пользователя (если не среагировала какая-то другая функция выше).

Получаем от пользователя текст его сообщения через message.text, и передаём в нашу функцию анализа. Получаем результат в виде объекта pydantic модели.

@dp.message()
async def check_url_handler(message: types.Message):
    url = message.text.strip()
    status_msg = await message.answer(f"🔍 Проверяю {url}...")
    try:
        result = await analyze_availability(url)
        color_emoji = "✅" if result.status_label == "Доступен" else "❌"
        if result.status_label == "Нас заблокировали": color_emoji = "⚠️"

        response = (
            f"{color_emoji} {hbold('Результат для:')} {result.url}\n"
            f"🌐 {hbold('IP:')} {hcode(result.ip or 'N/A')}\n"
            f"📊 {hbold('Статус:')} {result.status_label}\n"
            f"⏱ {hbold('Задержка:')} {result.latency_ms:.1f}ms\n"
            f"📝 {hbold('Детали:')} {result.detail or '—'}"
        )
       
        await status_msg.edit_text(response, parse_mode="HTML")
       
    except Exception as e:
        await status_msg.edit_text(f"❗ Ошибка при проверке: {str(e)}")

Получив результат анализа мы начинаем формировать красивый ответ. Телеграм имеет встроенные теги для шрифтов (hbold — жирный, hcode — моно), воспользуемся ими.

Далее мы вместо того, чтобы отправить новое сообщение, изменим уже отправленное нами сообщение через функцию status_msg.edit_text. Первым параметром передадим наш сформированый ответ, вторым — метод парсинга (чтобы он понял, что текст имеет теги и их надо правильно отобразить, то есть перевести в HTML теги, а не просто текстом)

В случае ошибки уведомляем пользователя.

Зачем нужна функция main?

Как обычно в асинхронном коде мы создаём цикл событий и вызываем в нём функцию main при помощи asyncio.run(main()).

Функция main() включает логи с уровнем INFO. Метод dp.start_polling(bot) запускает режим опроса, объект bot постоянно опрашивает сервера telegram на наличие новых сообщений. Если они есть, то Dispatcher отправляет их в свои обработчики (check_url_handler или cmd_start у меня).

async def main():
    logging.basicConfig(level=logging.INFO)
    await dp.start_polling(bot)

if __name__ == "__main__":
    asyncio.run(main())


💬На этом хочу закончить. Обязательно скажу вам, когда снова можно будет пользоваться ботом.

В следующий раз думаю подробнее рассказать про то, как я добавил схожего бота в свой мессенджер, а позже про асинхронность, многопоточность или же про декораторы.
[2026-04-05 23:04]
#PXI_NET

🙂Всем привет! Надеюсь вы помните про программное средство анализа доступности веб-сайтов. Я обещал, что сделаю телеграмм бота, чтобы вы могли тоже проверять интересующие вас сайты. Особенно это актуально сейчас. Пару дней подписчики, которые успели, смогли поучаствовать в тестировании бота и остались довольны (за это я вас благодарю). Сегодня я расскажу про то, как я его сделал.

⚡️Как делаются ТГ-Боты?

Начнём с того, что есть достаточно много способов создания своего тг бота, но почти все они так или иначе взаимодействуют с botfather (бот, созданный telegram специально для создания других ботов). Я использовал асинхронную библиотеку aiogram, которая предоставляет API для работы с telegram.

Сначала мы должны создать объект класса Bot, этот объект ничего сам не делает, он лишь предоставляет нам доступ к API telegram. Далее идём в botfather и создаём нового бота, даём ему имя и получаем свой токен. Этот токен передаём в качестве параметра для конструктора при создании объекта класса Bot.

Далее интересный момент, телеграмм сейчас у меня тормозят, а для общения с его серверами нужен прокси или впн. Но если просто включить впн на компьютере, где запущено программное средство, то и проверка будет уже не для меня, а как будто я нахожусь в том месте, где сервер VPN. Поэтому я использовал свой телефон и VPN на нём в качестве прокси сервера именно для обращения к серверам телеграмма. Это задаётся параметром session при создании объекта класса Bot.

Как я сказал сам объект Bot лишь предоставляет доступ к API, то нам нужен тот, кто умеет что-то делать и знает, что делать на конкретный запрос от пользователя. Это объект класса Dispatcher.

import asyncio
import logging
from aiogram import Bot, Dispatcher, types
from aiogram.filters import Command
from aiogram.client.session.aiohttp import AiohttpSession
from aiogram.utils.markdown import hbold, hcode
from app.checker import analyze_availability
import dotenv
import os
import httpx

dotenv.load_dotenv()

TOKEN=os.getenv("BOT_TOKEN")

proxy_url = os.getenv("PROXY_URL", "http://10.166.129.106:8080" )

session = AiohttpSession(proxy=proxy_url)
bot = Bot(token=TOKEN, session=session)
#bot = Bot(token=TOKEN)
dp = Dispatcher()

@dp.message(Command("start"))
async def cmd_start(message: types.Message):
    await message.answer("Пришли мне URL (например, https://google.com), и я проверю его доступность.")

Теперь перейдём к первой функции с декоратором @db.message(Command("start")).
Эта функция срабатывает, когда пользователь отправляет /start боту. Объект types.Message автоматически отправляет бот нам, это объект сообщения пользователя в данном случае, но может быть и нашего. В нём есть информация о имени пользователя, тексте сообщения, с какого устройства отправлено и тд. Мы же в ответ на это вызываем метод answer, который отправляет сообщение пользователю, а также возвращает объект уже нашего сообщения.

💬На этом пока что всё, завтра расскажу про то, как реализовал непосредственно функцию анализа доступности веб-сайтов.
[2026-04-05 08:00]
#PXI_AI

🙂Приветствую вновь. Продолжаю прошлый пост, сейчас я наконец-то покажу пакетный градиентный спуск и дропаут на деле.

🛡Обучение и тестирование

Тут мне было тяжело если честно, поэтому постараюсь донести всё максимально подробно.

Начинаем проходиться по всему обучающему набору изображений пакетами по 100 штук. Их подаём на вход первого слоя, то есть подали матрицу 100 на 784.

Первый слой считает взвешенную сумму сразу для всех примеров, применяет tanh и записывает в матрицу 100 на 100, где строки это примеры, а столбцы — вывод нейрона для этого примера.

Далее применяем дропаут, отключая случайную половину нейронов, не забывая усилить сигнал (сохранить математическое ожидание). И передаём матрицу 100 на 100 на вход втрого слоя, применяем функцию softmax и получаем матрицу 100 на 10. Где строки это примеры, а в столбцах будет стоять число и чем оно больше тем вероятнее, что именно эта цифра на картинке (то есть если в строке 3 и столбце 6 стоит число близкое к 1, то на 4 картинке нарисована 7).

После этого мы считаем количество правильно отгаданных изображений, сравнивая в каком индексе у нас стоит наибольшее число. Тут важное отличие от прошлых наших сетей, я не прописываю в коде расчёт ошибки, потому что здесь уже не простая среднеквадратическая ошибка (SSE), а я использую перекрёстную энтропию. Она нам облегчит производную для функции softmax, производная перекрёстной энтропии по входам функции softmax это просто истина - предсказание, как в SSE вообще без функций активации на выходном слое.

#прямое распространение 

for j in range(iteration):
    correct_cnt = 0
    for i in range(int(len(images)/batch_size)):
        start, end = ((i*batch_size),((i+1)*batch_size))
        layer_0 = images[start:end]
        layer_1 = tanh(np.dot(layer_0,weight_1))
        dropout = np.random.randint(2,size=layer_1.shape)
        layer_1 *= dropout*2
        layer_2 = softmax(np.dot(layer_1,weight_2))

        for k in range(batch_size):
            correct_cnt += int(np.argmax(layer_2[k:k+1]) == np.argmax(labels[start+k:start+k+1]))

#обратное распространение

        layer_2_delta = (labels[start:end] - layer_2)/batch_size
        layer_1_delta = np.dot(layer_2_delta,weight_2.T)*tanh2deriv(layer_1)
        layer_1_delta *= dropout
        weight_2 += alpha*layer_1.T.dot(layer_2_delta)
        weight_1 += alpha*layer_0.T.dot(layer_1_delta)

    test_correct_cnt = 0
    for i in range(len(x_test)):
        layer_0 = x_test[i:i+1]
        layer_1 = tanh(layer_0.dot(weight_1))
        layer_2 = softmax(np.dot(layer_1,weight_2))
        test_correct_cnt += int(np.argmax(layer_2) == np.argmax(test_labels[i:i+1]))

    if(j %10 == 0):
        print(f"iteration: {j} Test_acc: {test_correct_cnt/float(len(x_test))} Train_acc: {correct_cnt/float(len(images))}")

Теперь переходим к ещё более сложному этапу. Обратное распространение начинается у нас с расчёта дельты для выходного слоя. Мы получаем вектор дельт, полученных для каждого примера и делим их значение на количество примеров в пакете (то есть в итоге у нас матрица 100 на 10, где строки это примеры, а столбцы это дельты для соответствующего нейрона). Эту дельту мы получили очень просто благодаря тому, что производная перекрёстной энтропии к взвешенной суммы подаваемой на вход softmax — это просто истина - результат.

Тут странным может показаться зачем нам целых 100 значений дельт (по одной дельте от каждого примера на каждый нейрон) для одного нейрона, но мы их поделили на количество примеров, и потом при градиентном спуске всё равно все перемножаются и складываются, то есть усреднились в одну для каждого веса.

Далее считаем дельту для первого слоя, это есть взвешенная сумма дельт следующего слоя, умноженная на производную функции активации от выхода этого слоя. Не забываем обнулить дельты для тех нейронов, которые были отключены при дропауте.

Теперь вычисляем новые веса. Как обычно умножаем дельту слоя на входные значения для этого слоя и на альфу.

В конце проводим тестирование нашей модели после каждой итерации (прохода по всему набору изображений).
[2026-04-03 08:05]
#PXI_AI

🙂Всем привет! Сегодня я покажу вам на практике методы регуляризации (пакетный градиентный спуск и дропаут). Показывать буду на примере нейронный сети для распознавания изображения чисел (датасет mnist) с двумя слоями, скрытый слой состоит из 100 нейронов с функцией активации tanh, а выходной слой из 10 нейронов с функцией активации softmax.

⚡️Начнём с обучающего и тестового набора данных

Первым делом мы для одинаковой инициализации весов при каждом запуске вызываем np.random.seed(1).

В качестве набора данных я использую изображения из датасета mnist. Метод mnist.load_data() возвращает кортеж с двумя двухэлеметными кортежами. В x_train и x_test изображения представлены в виде трёхмерного тензора. Обучающий набор состоит из 60000 изображений по 28 пикселей на строку и столбец. А тестовый тоже самое но 10000.

Поэтому для удобства мы меняем размерность на обычную матрицу или двухмерный тензор, где будет 1000 строк и 28*28 столбцов.

Также создаём две матрицы 1000 на 10, где в нужном месте будет стоять 1 (то есть если в 5 строке и в 6 столбце стоит 1, то 6 картинка содержит цифру 7).

import numpy as np 
from keras.datasets import mnist
np.random.seed(1)

(x_train,y_train),(x_test,y_test) = mnist.load_data()
images, labels = (x_train[0:1000].reshape(1000,28*28)/255, y_train[0:1000])
one_hot_labels = np.zeros((len(labels),10))
for i,j in enumerate(labels):
    one_hot_labels[i][j] = 1
labels = one_hot_labels
x_test = x_test[0:1000].reshape(1000,28*28)/255
test_labels = np.zeros((len(y_test),10))
for i,j in enumerate(y_test):
    test_labels[i][j] = 1


📊Реализуем функции активации

def tanh(x): return np.tanh(x)

def tanh2deriv(x):
    return 1 - x**2

def softmax(x):
    temp = np.exp(x)
    return temp/np.sum(temp, axis=1, keepdims=True)

Тут всё очевидно, просто реализуем известные функции и их производные, производная softmax нам не нужна, так как она сократится потом. Об этом ниже.

🥸Зададим начальные значения

Здесь мы зададим начальные гиперпараметры для нашей сети.

alpha, iteration, neirons = (0.07,300,100)
batch_size = 100
image_size, labels_num = 784, 10

weight_1 = 0.02*np.random.random((image_size, neirons)) - 0.01
weight_2 = 0.2*np.random.random((neirons,labels_num)) - 0.1

Можете попробовать проиграться с количеством итераций и альфа-коэффициентом, но не ставьте его слишком большим, иначе модель не сможет обучиться.

💬К сожалению, пост ограничен в размере, поэтому самый сок про обучение и тестирование покажу в следующем посте.
[2026-04-02 11:12]
@URL_checker_PXI_bot

🙂Всем привет! Каждый желающий может сейчас опробовать моё программное средство проверки доступности веб-сайтов.

🥲Уже закрыто, скажу, когда сможете в следующий раз. Также думаю добавить возможность эту в свой мессенджер (уже добавил👀).
[2026-04-01 22:58]
#PXI_AI

🙂Всем привет! Возможно вы сталкивались с проблемой переобучения, когда по какой-то причине ваша модель хорошо обучалась на обучающем наборе, но на тестовом не справляется. Сегодня я расскажу про методы регуляризации, которые помогают избежать этой проблемы.

⚡️Почему возникает переобучение?

Начнём с повторения, что такое переобучение и когда встречается.
⌛Переобучение встречается, когда используется слишком малый индуктивный сдвиг (предложение о функции, то есть даём большую свободу в выборе функции нашему алгоритму МО или нейронной сети , чтобы максимально соответствовать набору данных и больше акцентирует внимание на наборе данных). Из-за переобучения модель слишком сильно акцентирует на данный набор данных, может захватывать шумы данных, ошибки в данных, а также получиться слишком сложной.

Что такое регуляризация?

Для того, чтобы препятствовать переобучению моделей существует ряд методов регуляризации.

Регуляризация — это подмножество методов, способствующих обобщению изучаемых моделей, часто за счёт препятствования изучению мелких деталей.
Самые простые из них: ранняя остановка, прореживание (дропаут) и пакетный градиентный спуск.

⚠️Ранняя остановка

Этот метод предполагает остановку обучения сети, когда начнёт падать её точность на тестовом наборе данных. Благодаря этому модель просто не успеет захватить лишние шумы из обучающего набора данных. В некоторых случаях использование контрольного тестового набора данных, чтобы узнать, когда остановится, может привести к переобучению на контрольных данных.

📊Прореживание (Дропаут)

Начну с того, что мелкие сети не подвержены настолько сильно переобучению. Причина в том, что они не могут реализовать сложную функцию, которая может зацепить много деталей и шумов.

Это можно использовать, что и сделано в этом методе. Мы будем отключать случайную часть нейронов, тем самым не давая ей захватить шумы.

Важно, что раз мы отключили эти нейроны при прямом проходе, то и при обратном распространении надо их отключить и не обновлять их веса.

Если мы просто так выключим часть нейронов во время обучения, веса сети подстроятся под эту уменьшенную сумму. Но когда мы включим все нейроны для предсказания на реальных данных, сумма на входе в следующий слой резко вырастет, и сеть выдаст неверный результат.

Суть этого процесса заключается в сохранении математического ожидания (среднего значения) взвешенных сумм между обучением и тестированием.

❗️Пакетный градиентный спуск

Пакетный градиентный спуск корректирует веса не после каждого примера (стохастический) или всего набора данных (полный), а после просмотра пакета с указанным числом примеров (обычно от 8 до 256).

Этот метод основан на усреднении корректирующих воздействий на весовые коэффициенты в процессе обучения.

💬В заключении, эти методы не единственные, существуют и другие. Также на практике методы могут объединять для лучшего результата (пакетный градиентный спуск и дропаут).

В следующий раз покажу это на практике. Пишите свои вопросы в комментариях.
[2026-03-30 08:00]
#PXI_NET

🙂И вновь я приветствую всех! Сегодня продолжу про программное средство анализа доступности веб-сайтов, покажу как вызываю все те функции и как я использую для вывода данных библиотеку Rich

⚡️Подготовительный этап

Начнём с того, что создадим .env файл, в который запишем следующее:
TIMEOUT=5.0
SITES_FILE=app/sites.txt
В sites.txt напишем URL или доменное имя интересующих нас сайтов.

Я использую библиотеку dotenv и env-файл, чтобы не пришлось пересобирать докер или лезть в исходный код ради того, чтоб поменять значения переменных. Библиотека dotenv сама откроет файл .env и добавит переменные окружения в среду окружения. Это делается командой load_dotenv().

Также создаём объект консоли из библиотеки rich, чтобы выводить данные красивым образом.

import asyncio
import os
from dotenv import load_dotenv
from rich.console import Console
from rich.table import Table

from app.checker import analyze_availability

load_dotenv()
console = Console()

async def main():
    user_file = console.input("[bold blue]Введите путь до файла или нажмите Enter, тогда будет использоваться базовый файл \n[/bold blue]")
    sites_path = user_file if user_file.strip() != "" else os.getenv("SITES_FILE","/code/app/sites.txt")
    timeout = float(os.getenv("TIMEOUT",10.0))

    if not os.path.exists(sites_path):
        console.print(f"[red]Файл не найден: {sites_path}[/red]")
        return

    with open(sites_path, "r", encoding="utf8") as f:
        urls = [line.strip() for line in f if line.strip() and not line.startswith("#")]

    console.print(f"[bold blue]Анализ доступности ресурсов ({len(urls)} шт.)...[/bold blue]")

Тут мы сначала просим ввести путь к файлу, если его нет используем свой стандартный файл sites.txt. Далее пытаемся получить переменные окружения из файла, если их нет, то используем параметры по умолчанию.

Открываем файл и считываем все URLs и домены сайтов, заносим их в список.

📎Вызываем наши функции и рисуем таблицу в консоли

Напомню, что асинхронные функции вызываются только через await, если написать без, то мы получим корутину. То есть асинхронная функция будет ждать того, что мы её вызовем с уже переданными параметрами.

    tasks = [analyze_availability(url, timeout=timeout) for url in urls]
    results = await asyncio.gather(*tasks)

    table = Table(show_header=True, header_style="bold white")
    table.add_column("URL", width=30)
    table.add_column("IP", width=20)
    table.add_column("Status", width=30)
    table.add_column("Code", width=15, justify="center")
    table.add_column("Задержка", width=15, justify="center")

    for res in results:
        color = "green" if res.status_label == "Доступен" else "red"
        if res.status_label == "Нас заблокировали": color = "yellow"
        if res.status_label == "Домен не найден/нет сети": color = "blue"
        if res.status_label == "Нет ответа на HTTP запрос": color = "white"
        if res.status_label == "Проблема с SSL/SSL сертификатом": color = "white"
        table.add_row(
            res.url,
            res.ip or "N/A",
            f"[{color}]{res.status_label}[/{color}]",
            str(res.status_code) if res.status_code else "—",
            f"{res.latency_ms:.1f}ms" if res.latency_ms > 0 else "—"
        )

    console.print(table)

if name == "main":
    asyncio.run(main())

Сначала мы заносим в список объекты-корутины. А вызываем их все разом с помощью gather. Далее создаём нашу таблицу, для отображения в консоли и заполняем её нашим объектом CheckerResult.

Для вызова и создания цикла событий используется asyncio.run(main()).

💬На этом всё! В следующий раз думаю реализовать ТГ бота, чтобы вы могли пользоваться программным средством для проверки интересующих вас сайтов тоже. Конечно, если мой ПК включен будет, а то покупать сервер не очень хочется на данный момент
[2026-03-28 08:03]
#PXI_NET

🙂Всем привет! Продолжаю тему прошлого поста. Сегодня покажу как я реализовал проверку по http, tcp-traceroute и как всё это объединить в одну функцию

⚡️Реализуем отслеживание маршрута нашего IP-пакета

Здесь воспользуемся библиотекой scapy для создания RAW-пакетов (сырых, нужен root) с полезной нагрузкой типа TCP. К сожалению, библиотека scapy строго синхронная, поэтому создадим сами пул потоков (ThreadPoolExecutor) и будем запускать через run_in_executor в отдельном потоке

from scapy.all import IP, TCP, sr1
from concurrent.futures import ThreadPoolExecutor

traceroute_executor = ThreadPoolExecutor(max_workers=10)

def _sync_traceroute_logic(target_host, port, max_hops=15):
    path = []
    for ttl in range(1, max_hops + 1):
        pkt = IP(dst=target_host, ttl=ttl) / TCP(dport=port, flags="S")
        reply = sr1(pkt, verbose=0, timeout=1.0)
       
        if reply is None:
            path.append(None)
        else:
            path.append(reply.src)
            # 0х12 SYN-ACK
            if reply.haslayer(TCP) and reply.getlayer(TCP).flags == 0x12:
                break
    return path

async def run_diagnostics(host: str, port: int):
    loop = asyncio.get_running_loop()

    ips = await loop.run_in_executor(traceroute_executor, _sync_traceroute_logic, host, port)
   
    formatted = []
    for i, ip in enumerate(ips, 1):
        if ip is None:
            pass
        else:
            formatted.append(f"{i}. {ip}")
    return " -> ".join(formatted)

Здесь мы реализуем синхронную функцию для трассировки маршрута пакета с заданным TTL. Отправляем пакет, sr1 ждёт ровно 1 ответ, и смотрим что там. Отдельно запускаем эту функцию в новом потоке

🖥Соберём воедино и добавим http

async def analyze_availability(url: str, timeout: float = 5.0) -> CheckResult:
    if url.startswith("https://"):
        use_ssl = True
        port = 443
    elif url.startswith("http://"):
        use_ssl = False
        port = 80
    else:
        url = f"https://{url}"
        use_ssl = True
        port = 443

    host = url.replace("https://", "").replace("http://", "").split("/")[0]
   
    is_up, ip, lat, err = await check_socket(host, port, use_ssl, timeout=timeout)
   
    if not is_up:
        if err == "Домен не найден/нет сети":
            return CheckResult(url=url, ip=ip, status_label="Домен не найден/нет сети", detail=err or "Проблема с сокетом")
        if err in ["SSL сертификат недействителен","Проблема с SSL"]:
            return CheckResult(url=url, ip=ip, status_label="Проблема с SSL/SSL сертификатом", detail=err or "Проблема с сокетом")   

        trace = await run_diagnostics(host,port) if ip != "127.0.0.1" else "---"
        return CheckResult(url=url, ip=ip, status_label=f"Заблокирован/не работает Trace:{trace}", detail=err or "Проблема с сокетом")

    async with httpx.AsyncClient(timeout=timeout, follow_redirects=True, verify=False) as client:
        try:
            start = perf_counter()
            headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
            response = await client.get(url, headers=headers)
           
            total_lat = (perf_counter() - start) * 1000 + lat           
            label = "Доступен"
            if response.status_code in [403, 451]:
                label = "Нас заблокировали"
            elif response.status_code >= 500:
                label = "Ошибка на сервере"
           
            return CheckResult(
                url=url, ip=ip, port_open=True,
                status_code=response.status_code,
                latency_ms=total_lat,
                status_label=label,
                detail=f"HTTP {response.status_code}"
            )
        except Exception as e:
            return CheckResult(
                url=url, ip=ip, port_open=True,
                status_label="Нет ответа на HTTP запрос", detail=f"Request failed: {str(e)}"
            )

Тут вначале подбираем порт, проверяем запись DNS, доступность по сокету. И реализуем HTTP проверку отправив GET

💬В следующий раз покажу как запускать
[2026-03-26 23:33]
#PXI_NET

🙂Всем привет! Сегодня будет весьма специфичная тема, которая затронет работу с сокетами, IP-пакетами и DNS. Если коротко, то я начал разрабатывать программное средство анализа сетевой доступности веб-сайтов.

⚡️Начнем с модели

Создадим pydantic-модель для хранения нужных нам данных.

from pydantic import Field, BaseModel

class CheckResult(BaseModel):
    url: str
    ip: str | None = None
    port_open: bool = False
    status_code: int | None = None
    latency_ms: float = 0.0
    status_label: str
    detail: str


🖥Реализуем вначале проверку записи в DNS

import socket
import asyncio
import httpx
from time import perf_counter
from app.models import CheckResult
import ssl

async def resolve_host(host: str, timeout: float = 2.0):
    """резолвинг домена"""
    loop = asyncio.get_running_loop()
    try:
        info = await asyncio.wait_for(
            loop.getaddrinfo(host, None, family=socket.AF_INET),
            timeout=timeout
        )
        return info[0][4][0]
    except (asyncio.TimeoutError, socket.gaierror):
        return None

Тут я создал объект loop текущего цикла событий и использовал низкоуровневую функцию этого объекта, породив при этом новый поток в цикле. Метод wait_for выполняет функцию и завершает её автоматически при истечении времени. Метод getaddrinfo возвращает сложную структуру, в которой также находится IPv4, который мы и вернём. В socket есть свой метод для этого (getbyhostname), но он не асинхронный.

Реализуем функцию проверки через сокеты

Тут мы пытаемся (вспоминаем пост про TCP) сделать TCP рукопожатие и проверить SSL (если к 443 порту, а не 80), в случае успеха вернём IPv4 и задержку.

async def check_socket(host: str, port: int = 443, use_ssl: bool = True,timeout: float = 3.0):
    """Проверка доступности порта через сокеты"""
    start = perf_counter()
    ip = await resolve_host(host,2.0)
    if not ip:
        return False, None, 0, "Домен не найден/нет сети"
    ssl_context = ssl.create_default_context() if use_ssl else None
    try:
        conn = asyncio.open_connection(ip, port, ssl=ssl_context, server_hostname=host if use_ssl else None) #обькт корутина, короче обещаю что запущу потом
        reader, writer = await asyncio.wait_for(conn, timeout=timeout)
        writer.close()
        await writer.wait_closed()
       
        latency = (perf_counter() - start) * 1000
        return True, ip, latency, None
    except (ssl.SSLCertVerificationError, ssl.CertificateError) as e:
        return False, ip, 0, f"SSL сертификат недействителен"
    except ssl.SSLError as e:
        return False, ip, 0, f"Проблема с SSL"
    except Exception:
        return False, ip, 0, "Порт закрыт или таймаут"

Сначала вызываем нашу функцию поиска айпи по домену. Далее создаём стандартный контекст SSL, для установки правил проверки протокола при подключении по сокету. Используем метод acyncio, вместо методов socket, так как open_connection высокоуровневый и асинхронный сразу (то есть не надо самому создавать объект сокета, коннектиться и тд). Имя сервера передаем для проверки на чьё имя сертификат SSL. Wait_close используем для того, чтобы произошёл мягкий разрыв (тоже вспоминаем пост про TCP).

💭На этом пока что закончу и про остальное расскажу в следующем посте. Удачи!
[2026-03-22 12:15]
#PXI_AI

🙂Всем привет! В одном из прошлых постов я рассказывал про обратное распространение и обещал показать его на практике. Сегодня мы с вами создадим простую двухслойную нейронную сеть (3 нейрона во входном, 3 в скрытом и 1 в выходном слое), обновлять веса будем с помощью стохастического градиентного спуска.

⚡️Начнём с задания весов и входных данных

Здесь мы продолжаем задачу со светофором. Нужно определить можно ли идти или нет. Теперь скрытый слой у нас будет иметь функцию активации ReLU. Для обратного распространения сразу реализуем функцию производной ReLU от взвешенной суммы.
import numpy as np

data = np.array([
    [1,0,1],
    [1,1,1],
    [0,0,1],
    [0,1,1]
])

def relu(x):
    return (x > 0) * x

def relu2deriv(output):
    return output > 0

alpha = 0.2
size = 3 #количество нейронов в нашем скрытом слое
real_pred = np.array([1,1,0,0])
weight_1 = 2 * np.random.random((3, size)) - 1
weight_2 = 2 * np.random.random((size)) - 1

Важно, что random.random заполняет матрицу значениями от 0 до 1, я же увеличил этот диапазон до 2, а затем центрировал. В итоге матрицу заполнило числами в диапазоне [-1,+1].

🔖Далее реализуем обратное распространение и градиентный спуск

for i in range(60):
    layer_2_sse = 0
    for z in range(len(data)):
        layer_0 = data[z]
        layer_1 = relu(np.dot(layer_0, weight_1))
        layer_2 = np.dot(layer_1, weight_2)
        layer_2_error = layer_2 - real_pred[z]
        layer_2_sse += 0.5*(layer_2_error ** 2)
        layer_2_delta = layer_2_error
        layer_1_delta = layer_2_delta * weight_2 * relu2deriv(layer_1)
        weight_2 -= alpha * layer_1 * layer_2_delta
        weight_1 -= alpha * np.outer(layer_0, layer_1_delta)
    if i % 9 == 0:
        print(f"Iteration: {i} SSE: {layer_2_sse}")

Тут всё достаточно очевидно, используются формулы, которые я объяснял в посте про обратное распространение. Обратите внимание только на знак, тут используется минус, так как ошибка = предсказанное значение - реальное значение. А в посте было наоборот.

Протестируем нашу модель на новом примере

print("Тестируем на новом наборе [1,0,0], где модель должна выдать 1")
ans = relu(np.array([1,0,0]).dot(weight_1)).dot(weight_2)
print(f"Модель дала ответ:  {1 if ans > 0.7 else 0}")

Из скриншота видно, что модель справилась.

💬На этом всё! Пишите, что вам ещё будет интересно разобрать.
[2026-03-19 23:38]
#PXI_LEARN

🙂Всем привет! Сегодня я хочу рассказать про идентификацию, аутентификацию и авторизацию.

⚡️Определения

Идентификация — процедура, в результате выполнения которой для субъекта идентификации выявляется его идентификатор, однозначно определяющий этого субъекта в информационной системе (например, ваш логин при входе в мой мессенджер, я смотрю есть ли вообще такой пользователь).

Аутентификация — процедура проверки подлинности, например проверка подлинности пользователя путем сравнения введенного им пароля с паролем, сохраненным в базе данных.

Авторизация — предоставление определенному лицу или группе лиц прав на выполнение определенных действий, например я показываю нужные чаты и сообщения.
Авторизация может быть единственным этапом, а вот остальные без других бессмысленны (зачем нам просить пароль, если мы не знаем от какого аккаунта он и тд).

‼️Разновидности аутентификации

В зависимости от того, что пользователь предоставляет для проверки, выделяют три основных фактора:
1️⃣Это то, что знает субъект (Knowledge Factor).
Сюда относятся пароли, графические ключи, поверочные вопросы и тд.

2️⃣То, что есть у субъекта (Possession Factor).
Это разные токены, смс-коды, приложения-аутентификаторы.

3️⃣То, кем субъект является (Inherence Factor).
Например, биометрия.

Обычно аутентификацию делят на три уровня по надёжности:
1️⃣Однофакторная(SFA) — это метод проверки личности пользователя при доступе к системе, требующий только одного типа учетных данных, чаще всего пароля.

2️⃣Двухфакторная (2FA) и многофакторная (MFA) — методы защиты, требующие нескольких доказательств для проверки личности.

3️⃣Биометрическая — это метод проверки личности пользователя на основе его уникальных биологических характеристик (отпечатки, лицо, голос, радужка глаза) или поведенческих черт.

Односторонняя и двухсторонняя аутентификация:
1️⃣Односторонняя (One-way) —
метод аутентификации, при котором только клиент доказывает серверу свою личность (мы логинимся в мессенджере). Мы доверяем серверу, что он тот, за кого себя выдает (на базе SSL-сертификата).

2️⃣Двусторонняя (Mutual Auth / mTLS) — метод аутентификации объектов и ресурсов доступа, обеспечивающий взаимную проверку принадлежности предъявленных объектом (ресурсом) доступа идентификаторов при их взаимодействии.
Обычно это делается через обмен цифровыми сертификатами.
Используется в банковских шлюзах, межсерверном взаимодействии (M2M) и критической инфраструктуре, чтобы исключить атаку «человек посередине» (MitM).
💬На этом этот простой пост закончился, в следующий раз расскажу про алгоритмы аутентификации.
[2026-03-17 22:50]
#PXI_WEB

🙂Всем привет, я всё ещё продолжаю работать над мессенджером. Сегодня я хочу рассказать про то, как я и мои бета-тестеры пропустили дыру в безопасности, а также о других модификациях кода.

⚡️Что такое APIRouter?

APIRouter — это инструмент в веб-фреймворке FastAPI, который позволяет разбивать большое приложение на отдельные части.
С самого начала мы делали монолитное приложение, вся логика в одном файле. Но в больших проектах это неудобно и здесь нам поможет APIRouter. Мы создадим директорию routers, в ней отдельные логические роутеры и подключим их в main.py.

Как создаём роутеры:

from fastapi import APIRouter

router = APIRouter(prefix="",tags=["Работа с чатами и пользователями"])

@router.get("/chat")
async def chat(request:Request,database: SessionDep, username: str = Depends(auth.get_current_user)):
    Какой-то код

@router.get("/messages/{chat_with}")
async def get_messages(chat_with):

Здесь мы создаём роутер с префиксом, который будет вставляться в начало всех ручек, но у моих ручек разные пути поэтому пустой. И задаём тэг, чтобы потом выделялось в swagger.

Теперь подключение роутеров в main.py:

from routers import auth,chat,websocket

app = FastAPI()

app.include_router(auth.router)
app.include_router(websocket.router)
app.include_router(chat.router)

Вуаля, теперь у нас нет огромного файла, а разбито всё по полочкам и работает как раньше.

Как я вошёл в аккаунт другого пользователя без пароля?

Если вы забыли, то я делал аутентификацию пользователя и выдавал ему токен. Но я забыл его проверять. В итоге, я решил попробовать, а что если я в url добавлю вместо своего имени, имя другого пользователя... Я случайно прочитал диалоги другого человека, одного из бета-тестеров.

Как я это исправил:

async def get_current_user(request: Request):
    token = request.cookies.get("access_token")
    if not token:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Не авторизован")
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Невалидный токен")
        return username
    except jwt.JWTError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Ошибка авторизации")

Во-первых, написал функцию, которая проверяет токен из куки. Она в случае проблемы выдаст ошибку, иначе вернёт имя пользователя.

В ручке chat изменил следующее:

@router.get("/chat")
async def chat(request:Request,database: SessionDep, username: str = Depends(auth.get_current_user)):

То есть теперь имя берётся из токена всегда. И подставить просто так не выйдет. Надеюсь больше нет никаких проблем с безопасностью, хотя... В сообщении можно сейчас передавать HTML код и он выполнится, это надо тоже исправить🙂

⚠️Мелкие обновления

Теперь использую pydantic-модели

from pydantic import BaseModel, Field

class UserAuth(BaseModel):
    username: str = Field(...)
    password: str = Field(...)
    is_login: bool

Они обеспечивают валидацию входящих данных. То есть автоматически проверяет тип данных и выбрасывает ошибку.

Обновил ручку чата, чтобы получать вверху самые новые

router.get("/chat")
async def chat(request:Request,database: SessionDep, username: str = Depends(auth.get_current_user)):
    query = select(
        case(
            (models.Messages.sender_name == username, models.Messages.receiver_name),
            else_=models.Messages.sender_name
        ).label("interlocutor"),
        func.max(models.Messages.timestamp).label("last_msg_time")
    ).where(or_(
        models.Messages.sender_name == username,
        models.Messages.receiver_name == username
    )).group_by("interlocutor").order_by(text("last_msg_time DESC"))

    result = await database.execute(query)
    chats = [row[0] for row in result.all()]

    return templates.TemplateResponse("chat.html",{"request":request,"name":username, "chats":chats})


💬На этом всё
[2026-03-15 21:19]
#PXI_AI

🙂Всем привет! Я уже успел вам рассказать и показать на практике градиентный спуск, но он позволяет обучить только выходной слой сети. Для решения этой проблемы используют обратное распространение. Сегодня будут теоретические аспекты, а в следующий раз покажу на деле.

☄️В чем суть обратного распространения?

🪙При прямом проходе (от входа до получения результата) мы записываем взвешенную сумму вводов и выход каждого нейрона.
🪙Мы это делаем для того, чтобы смогли просчитать градиент ошибки для нейронов (дельта, насколько сильно меняется отклонение сети относительно взвешенной суммы нейрона) всех слоёв при обратном проходе (от выходного ко входному слою). Главная проблема тут в том, что градиент ошибок уменьшается с каждым слоем.
🪙На основе значения градиента нейронов и алгоритма обновления весов (пусть градиентный спуск) считаем градиенты ошибок для весов сети, затем обновляем веса.
Теперь рассмотрим подробнее.

Передача дельт в обратном направлении

Напомню, что дельта у нас — это скорость изменения отклонения сети относительно взвешенной суммы какого-то (пусть далее будет k-того) нейрона:
дельта = dE/dz = dE/da × da/dz, где здесь и далее E — отклонение сети, а — вывод нейрона, z — взвешенная сумма вводов нейрона

Для выходного слоя справедливо:
dE/da = t - a, где t — ожидаемый вывод

Для скрытого слоя нельзя привязать вывод к отклонению непосредственно, эту связь можно вычислить как взвешенную сумму значений дельта тех нейронов, которые сразу за ним:
dE/da = 🤝w × дельта, где сумма по всем нейронам следующего слоя.

da/dz вычисляется проще, но она включает в себя производную функции активации, поэтому и используют функции различные, одни проще дифференцируются, а другие не так сильно уменьшают нашу дельту.

📊Обновление весов

Основной принцип состоит в том, что вес обновляется прямо пропорционально чувствительности отклонения сети относительно этого веса.
Возьмём вес между нейроном j и k, частная производная отклонения сети будет:
dE/dw = dz/dw × dE/dz

Понятнее будет объяснять на рисунке выше. Начинаем с вывода всей сети, вычисляем дельту для нейрона l по формуле выше, далее умножаем на производную dz/dw (она всегда константа, так как входит только в одну сумму вес) и получаем градиент для этого веса. Далее считаем дельту уже для нейрона k (по формуле выше):
Дельта_k = (da_k/dz_k)×(w_k,l × дельта_l)

Умножая на dz_k/dw_j,k получаем градиент для нашего веса w_j,k.

Для градиентного спуска правило обновления будет таким:
w_j,k = w_j,k + (h × дельта_k × a_j)

💬На этом экскурс в теорию обратного распространения окончен. В следующий раз обязательно покажу это на практике. Не стесняйтесь задавать вопросы, всегда буду рад ответить.
[2026-03-12 15:45]
#PXI_WEB

🙂 Всем привет! Сегодня я покажу обещанный фронтенд моего мессенджера, связанный с отправкой сообщений, поиском других пользователей и чатом

☄️Обработка входящих сообщений и уведомления

        socket.onmessage = function(event) {
            const data = JSON.parse(event.data);
            let item = document.getElementById(chat-item-${data.sender});
            if (data.sender === chatTitle.innerText) {
            renderMessage(data.sender, data.text, 'incoming');
            } else {
                const items = document.querySelectorAll('.list-item');
                items.forEach(item => {
                    if (item.querySelector('.item-name').innerText === data.sender) {
                        item.classList.add('unread-notify');
                        const last = item.querySelector('.item-last');
                        last.innerText = 📩 ${data.text};
                        last.classList.add('unread-badge');
                        contactsList.prepend(item);
                    }
                });
            }
        };

При получении данных вызываем функцию отображения сообщения если мы в том чате, а иначе ищем контакт в списке чатов, добавляем подпись и заносим в класс непрочитанные. Также поднимаем этот чат наверх.

💬Отправка сообщений

        function sendMessage() {
            const text = messageInput.value.trim();
            const target = chatTitle.innerText;
            if (target === "Разработка") return alert("Выберите чат!");
            if (text && socket.readyState === WebSocket.OPEN) {
            socket.send(JSON.stringify({ receiver: target, text: text }));
                renderMessage(myName, text, 'outgoing');
                messageInput.value = '';
            }
        }
        document.getElementById('send-btn').onclick = sendMessage;
        messageInput.onkeypress = (e) => { if(e.key === 'Enter') sendMessage() };

Здесь мы убираем пробелы слева и справа в сообщении. Дальше проверяем с кем мы общаемся и посылаем сообщение серверу.

🖥Загрузка истории чата

async function selectChat(username, element) {
            if (element) {
                element.classList.remove('unread-notify');
                const lastMsg = element.querySelector('.item-last');
                if (lastMsg) lastMsg.classList.remove('unread-badge');
            }

            chatTitle.innerText = username;
            chatViewport.innerHTML = 'Загрузка...';
           
            document.querySelectorAll('.list-item').forEach(el => el.classList.remove('active'));
            if (element) element.classList.add('active');

            try {
                const response = await fetch(/messages/${username}?current_user=${myName});
                const history = await response.json();
                chatViewport.innerHTML = '';
                history.forEach(msg => renderMessage(msg.sender, msg.text, msg.type, msg.time));
            } catch (err) {
                chatViewport.innerHTML = 'Ошибка загрузки';
            }
        }

Здесь главное, что мы получаем сообщения от сервера и рендерим их.

📎Функция визуализации сообщения

function renderMessage(name, text, type, time = null) {
            const displayTime = time || new Date().toLocaleTimeString([], {hour: '2-digit', minute:'2-digit'});
            const msgHtml =
               

                   

                        ${type === 'incoming' ? ${name} : ''}
                       
${text}

                        ${displayTime}
                   

               
;
            chatViewport.insertAdjacentHTML('beforeend', msgHtml);
            chatViewport.scrollTop = chatViewport.scrollHeight;
        }

Создает HTML-структуру для сообщений. Показывает имя отправителя только для входящих сообщений и вставляет сообщение в конец окна чата, автоматически прокручивает к последнему сообщению
[2026-03-10 07:30]
#PXI_AI

🙂 Всем привет! В одном из прошлых постов я рассказывал про градиентный спуск и обещал показать его на практике. Сегодня мы с вами создадим один нейрон, который должен будет обучиться при помощи градиентного спуска определять можно ли нам идти или нет по загадочному светофору.

☄️ Начну с пояснения задачи

Представьте, что мы в другой стране, где светофор имеет 3 индикатора. Понаблюдав за ними мы вычленили 5 образцов и составили размеченный обучающий набор данных для нашей модели из 5, а 6 оставили на тестирование модели.

import numpy as np

input_data = np.array([
    [1,0,1],
    [0,1,1],
    [0,0,1],
    [1,1,1],
    [0,1,1],
])

goal_pred = np.array([0,1,0,1,1])
weight = np.array([0.1, 0.1, 0.1]) # лучше начинать не с полных нулей
alpha = 0.1

Здесь матрица input_data содержит индикаторы светофора (в одном строке один образец), а вектор goal_pred содержит соответствующие этим столбцам правильные решения.
Альфа-коэффициент выбираете на свой вкус.

🥸Создадим наш нейрон

def neiron(input_row, weight):
    return input_row.dot(weight)

Да, всё настолько просто! Если вы помните из прошлых постов, то каждый нейрон считает взвешенную сумму и применяет функцию активации, у нас пример простой и функцию активации можно убрать. Функция dot считает скалярное произведение.

Реализуем градиентный спуск

for i in range(40):
    total_error = 0
    for z in range(len(input_data)):
        pred = neiron(input_data[z], weight)
        error = pred - goal_pred[z]
        sse = error**2
        total_error += sse
       
        weight = weight - (alpha * (input_data[z] * error))
       
    if i % 10 == 0:
        print(f"Итерация номер {i}, Суммарная ошибка: {total_error}")

print("\nИтоговые веса:", weight)

Здесь мы используем стохастический градиентный спуск, а не полный как в давал в посте раньше. Разница лишь в том, что тут мы ошибку считаем и обновляем веса после каждого образца, а в полном после всех образцов из набора данных. Обратите внимание на знак (к весу прибавляем или от него вычитаем), он зависит от того, как считалась чистая ошибка error.

Как поведёт себя наша модель из одного нейрона на новом образце?

#У меня остался ещё один набор на котором модель не обучалась на [1, 0, 0] должна выдать 0
print("Проверка модели на наборе 1,0,0 где ответ верный 0")
pred = neiron(np.array([1,0,0]),weight)
a = 0 if pred < 0.01 else 1
print("Модель дала ответ: " + str(a) )

Как вы можете видеть на скриншоте выше, наша модель справилась на отлично!

💬На этом кончается этот небольшой, но надеюсь интересный пост. Буду рад прочитать ваши комментарии на этот счёт.
[2026-03-07 22:34]
#PXI_WEB

🙂Привет! Сегодня я хочу рассказать про то, как я добавил в мессенджер отправку сообщений, поиск других пользователей и много чего другого.

🏴‍☠️Отправка и получение сообщений при помощи сокетов

Для чего использовать именно сокеты? Чтобы пользователю не приходилось обновлять страницу (дергать ручку) ради нового сообщения.

class ConnectionManager:
    def init(self):
        self.active_connections: dict[str, WebSocket] = {}

    async def connect(self, username: str, websocket: WebSocket):
        await websocket.accept()
        self.active_connections[username] = websocket

    def disconnect(self, username: str):
        if username in self.active_connections:
            del self.active_connections[username]

    async def send_personal_message(self, message: dict, receiver: str):
        if receiver in self.active_connections:
            await self.active_connections[receiver].send_json(message)

manager = ConnectionManager()

Тут думаю класс говорит сам за себя, он нам помогает хранить открытые сокеты в словаре и отправлять сообщения через сокет.

@app.websocket("/ws/{username}")
async def websocket_endpoint(websocket: WebSocket, username: str):
    await manager.connect(username, websocket)
    try:
        while True:
            data = await websocket.receive_json()
            receiver = data.get("receiver")
            text = data.get("text")

            async with SESSIONLOCAL() as db:
                new_msg = models.Messages(
                    sender_name=username,
                    receiver_name=receiver,
                    content=text
                )
                db.add(new_msg)
                await db.commit()

            await manager.send_personal_message({
                "sender": username,
                "text": text,
                "time": datetime.now().strftime("%H:%M")
            }, receiver)
    except WebSocketDisconnect:
     manager.disconnect(username)

Это основная функция, которая слушает всё, что к ней приходит. Потом сохраняет в базу данных и по сокету передаёт нужному пользователю. Для каждого пользователя получаем новый сокет. В случае разрыва сокета вызываем функцию disconnect.

☄️Получение сообщений из базы данных для чата

@app.get("/messages/{chat_with}")
async def get_messages(chat_with: str, current_user: str, database: SessionDep):
    stmt = select(models.Messages).where(
        or_(
            and_(models.Messages.sender_name == current_user, models.Messages.receiver_name == chat_with),
            and_(models.Messages.sender_name == chat_with, models.Messages.receiver_name == current_user)
        )
    ).order_by(models.Messages.timestamp.asc())
    result = await database.execute(stmt)
    return [
        {
            "sender": m.sender_name,
            "text": m.content,
            "type": "outgoing" if m.sender_name == current_user else "incoming",
            "time": m.timestamp.strftime("%H:%M")
        } for m in result.scalars().all()
    ]

Эту функцию мы вызываем каждый раз, когда открываем чат с пользователем. Она просто делает выборку сообщений из базы данных и возвращает их.

📎Поиск других пользователей

@app.get("/search_users")
async def search_users(query: str, database: SessionDep):
    stmt = select(models.Users).where(models.Users.username.ilike(f"%{query}%"))
    result = await database.execute(stmt)
    return [{"username": u.username} for u in result.scalars().all()]

Для поиска других пользователей дергаем эту ручку. ilike(f"%{query}%") — это регистронезависимый поиск.
% — символы подстановки:

🪙%query% — ищет вхождение строки в любом месте имени
🪙query% — ищет имена, начинающиеся с query
🪙%query — ищет заканчивающиеся на query.

Она возвращает нам список словарей с именами.

🖥 С бэкендом понятно, что же на фронте?

А это к сожалению, не вмещается в этот пост, но обязательно расскажу про это в следующем! Хочу услышать ваши идеи насчёт мессенджера.

🌸Также совсем скоро 8️⃣ марта, я хочу поздравить всех девушек, которым по какой-либо причине зашёл мой канал.
[2026-03-05 23:23]
#PXI_WEB

🙂Всем привет! Продолжу про мессенджер, сегодня я реализовал работу с базой данных postgres при помощи orm и sql-алхимии.

☄️Начнем с создании модели, с помощью которой мы создадим таблицу и куда будем записывать данные из таблицы. Я реализовал на данный момент одну модель Users (и таблицу users).

from sqlalchemy import Column, Integer, Boolean, Text, DateTime, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Users(Base):
__tablename__ = "users"
username = Column(Text, primary_key=True)
password = Column(Text) #в конце понял, что лучше бы назвал hash_password

Здесь наследуем от базовой orm-модели и создаём нужную нам модель (таблицу).

🖥 Установление соединения, создание таблицы в БД

import os
from fastapi import Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import OperationalError
from typing import Annotated
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

from models import Base

DB_URL = os.getenv("DATABASE_URL")

ENGINE = create_async_engine(DB_URL,echo=True)

SESSIONLOCAL = sessionmaker(
autocommit=False,
expire_on_commit=False,
autoflush=False,
bind=ENGINE, #к нашему движку
class_=AsyncSession)

async def init_db():
    async with ENGINE.begin() as conn:
        #await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

async def get_db() -> AsyncSession:
    async with SESSIONLOCAL() as database:
        yield database

SessionDep = Annotated[AsyncSession, Depends(get_db)

Здесь создаём ENGINE (соединение с базой данных), путь до бд берём из переменной окружения, я её заранее прокинул в docker compose.

Дальше создаём сессию без автокоммитов и тд.

Реализуем синхронные функции для инициализации базы данных и получения новой сессии к базе данных.

В конце создаю новый тип, чтобы меньше писать потом в сигнатурах функций.

Обновленный код аутентификации

from sqlalchemy.sql.annotation import Annotated
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database import init_db, get_db, Session, SessionDep
import models

from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent
DATA_DIR = BASE_DIR.parent / "data"

app = FastAPI()

@app.on_event("startup")
async def on_startup():
    """Инициализация базы данных при запуске приложения"""
    await init_db()

templates = Jinja2Templates(directory="templates")

app.mount("/static", StaticFiles(directory="static"), name="static")

@app.post("/auth")
async def auth(
    response: Response,
    database: SessionDep,
    username: str = Form(...),
    password: str = Form(...),
    is_login: bool = Form(...)
):
    if not is_login:
        query = select(models.Users).where(models.Users.username == username)
        result = await database.execute(query)
        existing_user = result.scalar_one_or_none()
        if existing_user:
            return {"error": "пользователь уже существует"}
        user = models.Users(
            username=username,
            password=pwd_context.hash(password)
        )
        database.add(user)
        await database.commit()
        return {"status": "успешно зарегистрированы"}

    query = select(models.Users).where(models.Users.username == username)
    result = await database.execute(query)
    user = result.scalar_one_or_none()
   
    if not user or not pwd_context.verify(password, user.password):
        return {"error": "Неверный логин или пароль"}

    token = jwt.encode({"sub": username}, SECRET_KEY, algorithm=ALGORITHM)
response.set_cookie(key="access_token", value=token, httponly=True)
    return {"status": "ok"}

Тут сначала получаем путь до папки data, затем при помощи app.on_event, которая срабатывает при запуске инициализируем БД.

Далее обращаемся к БД, вместо словаря как было. Scalar — превращает чистую строку таблицы в объект (нашу модель).

Что дальше?

Вскоре реализую отправку и хранение сообщений. Скоро увидимся💬
[2026-03-02 19:12]
#PXI_WEB

🙂И вновь всем привет! Продолжаю делать мессенджер, сегодня хочу рассказать про то, как я добавил аутентификацию и сделал шаблон страницы для чатов и каналов.

☄️Аутентификация

Реализовал простую аутентификацию с использованием JWT-токенов и библиотек passlib, jose, bcrypt. Сначала мы придумаем приватный ключ для алгоритма HS256, по сути это SHA256 (симметричный алгоритм шифрования). И создадим объект класса CryptContext, а в качестве схемы или алгоритма шифрования используем bcrypt:
from fastapi.staticfiles import StaticFiles
from passlib.context import CryptContext
from jose import jwt
from datetime import datetime, timedelta

SECRET_KEY = "afvefvvfhhvgyhvd"
ALGORITHM  = "HS256" #SHA256
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

users_db = {}

@app.post("/auth")
async def auth(
    response: Response,
    username: str = Form(...),
    password: str = Form(...),
    is_login: bool = Form(...)
):
    if not is_login:
        if username in users_db:
            return {"error": "пользователь уже существует"}
        users_db[username] = pwd_context.hash(password)
        return {"status": "успешно зарегистрированы"}

    hashed_pass = users_db.get(username)
   
    if not hashed_pass or not pwd_context.verify(password, hashed_pass):
        return {"error": "Неверный логин или пароль"}

    token = jwt.encode({"sub": username}, SECRET_KEY, algorithm=ALGORITHM)

    response.set_cookie(key="access_token", value=token, httponly=True)
    return {"status": "ok"}


Далее создадим словарь для хранения имени и хэша пароля, позже заменю на базу данных. Ну и сама логика проверки пароля или внесения его и имени в базу данных. Каждому пользователю выдаётся свой токен для идентификации на основе имени и приватного ключа, а далее создаётся куки с этим значением.

🖥Что у нас на фронтенде?

Главное отличие in.html от прошлой версии в том, что мы теперь создали объект в который собираем имя, пароль и регистрируется пользователь или входит, далее отправляем на сервер:
authForm.addEventListener('submit', async (e) => {
            e.preventDefault();
            if (usernameInput.value.trim() === "" || passwordInput.value.trim() === "") {
                errorMsg.innerText = "Пожалуйста, заполните все поля";
                errorMsg.style.display = 'block';
                return;
            }
            const formData = new FormData();
            formData.append('username', usernameInput.value);
            formData.append('password', passwordInput.value);
            formData.append('is_login', isLogin);
           
            try {
                const response = await fetch('/auth', {
                    method: 'POST',
                    body: formData
                });
                const result = await response.json();

                if (result.error) {
                    errorMsg.innerText = result.error;
                    errorMsg.style.display = 'block';
                } else if (result.status === "ok") {
                    window.location.href = "/chat";
                } else {
                    alert(result.status);
                    if (!isLogin) link.click();
                }
            } catch (err) {
                errorMsg.innerText = "Ошибка сервера";
                errorMsg.style.display = 'block';
            }
        });

        [usernameInput, passwordInput].forEach(input => {
            input.addEventListener('input', () => {
                errorMsg.style.display = 'none';
            });
        });
   



Окно диалогов и чатов

Так же я добавил ручку /chat и реализовал каркас для окна чатов и каналов:
@app.get("/chat")
async def chat(request:Request):
return templates.TemplateResponse("chat.html",{"request":request})

Следующий шаг будет заключаться в переносе на базу данных postgres и добавление рабочих элементов для окна чатов.

💬На этом я хочу закончить. Как обычно HTML документ для окна чата оставлю в комментариях (вместе с обновленным CSS).
[2026-02-28 22:36]
#PXI_WEB

🙂 Всем привет! Это первый пост в категории PXI_WEB, ведь я начал делать свой небольшой мессенджер. Сегодня хочу поделиться своими мыслями и узнать ваше мнение.

На чём я решил его писать?

Я выбрал язык программирования python и фреймворк fastapi, а также в качестве СУБД postgres.

Разработка ведётся исключительно в виртуальном окружении или докере. Так как надо поднимать не только контейнер с интерпретатором питона и кодом, а ещё и саму базу данных, то использую для простоты docker compose (apt install docker-compose-v2).

На данный момент я подготовил всё необходимое для дальнейшей работы, определил структуру проекта и начал с разработки окна регистрации и входа пользователя, то есть аутентификацию (по заветам Павла Дурова).

☄️Кода на питоне на данный момент немного, потому что не реализована логика работы с паролями и тд:
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles

templates = Jinja2Templates(directory="templates")

app = FastAPI()

app.mount("/static", StaticFiles(directory="static"), name="static")

@app.get("/")
def home(request:Request):
return templates.TemplateResponse("in.html",{"request": request})

Здесь наверное стоит пояснить только, что app.mount позволяет нашему серверу понимать, что GET запрос по пути /static это просьба дать файл именно из директории, а не URL, на который должна быть отдельная функция.

🖥Всю красоту нам сделал html, css и JavaScript.

Я лично не считаю, что плохо использовать ИИ для написания фронтенда, тем более раз он очень хорошо его пишет (но естественно надо хотя бы понимать, что он нагенерил). А вот с бэкендом дела обстоят похуже, поэтому его делаем полностью сами.




   
   
    Messenger | Вход
   


   

       

           

                Logo
           

       


       

С возвращением!


       

Пожалуйста, войдите в свой аккаунт



       

           

               
           

           

               
           

           

           
       


       

            Нет аккаунта?
            Зарегистрироваться
       

   




💥Полный код оставлю файлом в комментариях.

Если вкратце, то мы создали два поля для ввода имени и пароля, при наведении на них меняется цвет, то же самое с кнопкой войти. И отдельно кнопка зарегистрироваться.

🙂 На этом я хочу закончить это небольшое введение. Если будут пожелания или идеи, что можно добавить, — пишите в комментариях. Скоро увидимся снова!
[2026-02-27 09:54]
#PXI_AI

🙂 Всем привет! Сегодня я хочу рассказать про один из методов обучения нейронных сетей. Из этого поста вы узнаете немного больше про градиентный спуск.

Что такое градиентный спуск?

Градиентный спуск — алгоритм оптимизации для поиска функций с минимальным отклонением при моделировании закономерностей в наборе данных. Именно благодаря нему можно подобрать сочетание весов, которое минимизирует погрешность вывода нейрона. Он часто используется не самостоятельно, а вместе с обратном распространением (расскажу в другом посте про него).

📎 Отклонение (погрешность) модели можно считать по-разному. Я буду использовать формулу среднеквадратической ошибки (SSE):
SSE = 0,5×🤝(y - ŷ)², сумма по всем целевым значениям из правильных образцов, вычитая целевое значение, которое предсказала модель.
Куда спускается градиентный спуск?

Чтобы лучше понять эту идею я начну издалека. Пусть у нас есть один нейрон и два входных параметра (признака), обозначим веса для этих параметров w1, w2. Тогда мы можем нарисовать график зависимости SSE от w1 и w2. У этого графика будет точка минимума, указывающая на нужный нам вес модели. В простейшем случае мы инициализируем модель любым весом и считаем градиент функции SSE в этой точке.
Напомню, градиент — это скорость изменения функции для заданного ввода, то есть вектор из частных производных по всем параметрам в определенной точке функции.
Если градиент большой, то надо двигаться в противоположную от него сторону и с большим шагом, иначе с меньшим шагом.

ℹ️Теперь давайте в нашей формуле для SSE заменим обозначение предсказанного целевого признака (ŷ) на реальную функцию, которую у нас реализует нейрон. Тогда:
SSE = 0,5×🤝(y - (x1×w1+x2×w2))², сумма по всем образцам.

Или можем обобщить её для n входных параметров:
SSE = 0,5×👆(y-👆x×w)², где первая сумма по всем признакам в образце, а вторая по всем образцам

Теперь для градиента нам нужны частные производные (здесь и далее i — это нижний индекс):
dSSE/dwi = 🤝((y - ŷ) × (-xi)),  сумма по всем образцам и только по признаку i-тому, так как смотрим отклонение модели относительно веса для i-того параметра

🥸Эта частная производная определяет, как вычислить градиент ошибок
для веса wi в наборе данных,в котором хi — это ввод, относящийся к весу
wi при обработке каждого образца наборе.

На эту формулу можно посмотреть следующим образом: если при изменении веса существенно меняется вывод взвешенной суммы, градиент ошибок по отношению весу будет большим (резким), так как изменение веса сильно влияет на отклонение. Однако этот градиент является восходящим,а мы хотим двигаться вниз по кривой ошибок.

📉Поэтому в правиле обновления весов методом градиентного спуска знак «-» перед вводом хi опускается. Это правило можно определить так:
wi = wi + (h×👆((y - ŷ) × xi)), где сумма по всем образцам и значение х именно i-того признака, h - гиперпараметр
Эта формула позволяет нам с каждым проходом по набору данных корректировать веса любого признака на определенную величину.

Из этой формулы вытекает, что каждый вес обновляется независимо друг от друга и на разную величину прямо пропорционально своему градиенту ошибок в этой точке.

💬На данный момент мы разобрали только простой случай с нейроном без функции активации, рассмотрим в общих чертах теперь и его.

Пусть w0 — это сдвиг нейрона, а остальные веса относятся к другим его входным значениям.
Вычисление частной производной SSE зависит от структуры функции, которая генерирует ŷ. Чем сложнее эта
функция, тем сложнее становится частная производная.

Наличие функции активации добавляет нее еще один член — частную производную функцию активации по выводу функции взвешенной суммы. Это вызвано тем, что данный вывод подаётся на вход функции активации. Изменения веса влияют на ее вывод опосредованно, через обновление взвешенной суммы.

Логистическая функция активации имеет очень простую производную по своим входным значениям.

Правило для логистической функции:
wi = wi + h*👆((y - ŷ)×(ŷ × (1 - ŷ))×xi)

🏴‍☠️На этом хочу закончить! В следующий раз хочу показать это на практике.
[2026-02-24 10:05]
#PXI_AI

🙂Всем привет! В этот раз я расскажу вам введение в свёрточные нейронные сети.

☄️Начну с того, что свёрточные нейронные сети (CNN) являются результатом адаптации структуры нейронной сети к определённым характеристикам данных.

Что такое свёрточная нейронная сеть?

Свёрточные нейронные сети были созданы для распознавания образов. Основная задача этой архитектуры состояла создании сети, в которой начальный слой извлекает локальные признаки,а последующие слои объединяют их в признаки более высокого уровня.

👀При использовании этого подхода фундаментальная задача распознавания образов сводится к функциям обнаружения признаков, то есть способным определять наличие или отсутствие на изображении локальных визуальных элементов. При этом она должна понимать, что элемент есть вне зависимости от его расположения.

💡Для достижения этой пространственной независимости при обнаружении признаков нейроны в сетях CNN используют общие веса.

В контексте распознавания образов функцию, которую реализует нейрон, можно считать механизмом обнаружения визуальных признаков.

🥸Например, нейроны в первом скрытом слое сети получают на вход набор пиксельных значений, в случае присутствия в них определенной закономерности (локального визуального элемента) сильно активируются.

То, что функция, реализованная нейроном, определяется его весами, означает: два нейрона с одинаковым сочетанием весов реализуют одну и ту же функцию (механизм обнаружения признаков).

Если у двух нейронов разные входные данные (рецептивные поля), то вместе они себя ведут подобно механизму обнаружения признаков их общего набора данных, при наличии хоть в одном наборе признака,то он активируется.

‼️ Основные операции для CNN

На рисунке выше проиллюстрированы разные этапы обработки, которые часто
можно встретить CNN.

Матрица размером 6x6 в левой части представляет изображение, которое подаётся на вход сети. Справа от ввода находится матрица размером 4x4, представляющая слой нейронов, которые вместе прочёсывают изображение в поисках конкретного локального признака. Каждый из них соединен со своим рецептивным полем изображения размером 3х3, все они применяют к своему вводу одну и ту же матрицу весов 3х3.

Здесь рецептивные поля пересекаются, потому что имеют сдвиг равный единице, сдвиг является тут гиперпараметром (его выбирает сам человек).

🔼По сути реализуется операция свёртки, мы последовательно применяем одну и ту же матрицу весов (матрицу свёртки или ядро) к различным частям входных данных. Результатом скалярного умножения матриц является число, которое мы записываем в одну из ячеек карты признаков.

❗️Следует обратить внимание, что операция свёртки не включает в себя нелинейную функции активации, её применим позже.

После заполнения карты признаков применяется нелинейная функция активации к её элементам.

Операция пуллинга (объединения)

После того как свёрточный слой нашёл признаки (например, края, углы, текстуры), приступаем к пуллингу.

Что делает пуллинг?

1️⃣ Уменьшает размерность карты признаков.
2️⃣ Делает сеть устойчивее к небольшим смещениям и искажениям изображения.
3️⃣ Выделяет самые важные признаки из каждой области.

По сути мы делим нашу карту признаков на квадраты меньшего размера (это называется окном пуллинга). Для каждого квадрата берём максимальное значение (Max Pooling) или среднее значение (Average Pooling) и записываем его в новую, уменьшенную карту.

❗️Обычно свёртку, применение нелинейной функции активации и пуллинг называют слоем свёртки и этих слоёв может быть большое количество, но в итоге мы получаем плотный или полносвязный слой.

Все нейроны плотного слоя принимают на вход выходы всех нейронов предыдущего слоя и именно плотный слой генерирует окончательный ответ.

🙂 На этом введение в свёрточные нейронные сети подошло к концу, в следующий раз расскажу что-нибудь ещё более интересное.
[2026-02-23 15:29]
#PXI_AI

🙂 Всем привет! Я наконец-то добавил в нашего голосового ассистента мозг, чтобы с ним можно было вести диалог. Сегодня я вам расскажу про то, как я это сделал.

☄️ Метод инициализации объекта LLM

    def __init__(self, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct"):
        try:
            self.MODEL_NAME = model_name
            self.TOKENIZER = AutoTokenizer.from_pretrained(self.MODEL_NAME)
            self.MODEL = AutoModelForCausalLM.from_pretrained(
                self.MODEL_NAME,
                torch_dtype=torch.float32,
                device_map="cpu"
            )
            print("Загружено из локального кэша.")
        except Exception:
            print("Загружаю из интернета.")
            os.environ['TRANSFORMERS_OFFLINE'] = '0'
            os.environ['HF_HUB_OFFLINE'] = '0'
            self.MODEL_NAME = model_name
            self.TOKENIZER = AutoTokenizer.from_pretrained(self.MODEL_NAME)
            self.MODEL = AutoModelForCausalLM.from_pretrained(
                self.MODEL_NAME,
                torch_dtype=torch.float32,
                device_map="cpu"
            )

⌛Здесь мы в качестве параметра передаём название нашей модели, также можно передать путь до неё, если скачали её уже заранее куда-то.

Потом по идее пытаемся загрузить модель из кэша, а в случае неудачи пробуем скачать из интернета (к сожалению, работает не совсем так). Об этой проблеме ниже в блоке про импорты.

🪙AutoTokenizer - автоматически загружает токенизатор, который подходит нашей модели.
🪙AutoModelForCausalLM - загружает саму модель для генерации текста.
🪙torch.float32 - использует максимальную точность.
🪙device_map="cpu" - загружает модель на процессор.

⚡️Метод, который непосредственно генерирует ответ

   def ask_qwen(self,prompt:str):
        messages = [
        {"role": "system", "content": "Ты краткий ассистент. Отвечай на русском языке (английские слова пиши русскими буквами) лаконично (1-2 предложения)."},
        {"role": "user", "content": prompt}
        ]

        text = self.TOKENIZER.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs = self.TOKENIZER([text], return_tensors="pt").to("cpu")

        # Параметры для скорости на CPU
        with torch.no_grad():
            outputs = self.MODEL.generate(
                **inputs,
                max_new_tokens=64,
                do_sample=False,
                pad_token_id=self.TOKENIZER.eos_token_id
            )
   
        response = self.TOKENIZER.decode(outputs[0][len(inputs.input_ids[0]):], skip_special_tokens=True)
        return response


💬Тут мы реализуем возможность общения. Пишем системный и пользовательский промпт в стиле ChatML.

Разные модели (Llama, Qwen, Mistral) воспринимают текст по-разному (с разными разделителями, например, <|im_start|>). Функция apply_chat_template берет список messages и превращает его в одну длинную строку, которую модель обучена понимать. Флаг add_generation_prompt=True добавляет в конец маркер того, что сейчас должна начать отвечать именно модель. Флаг tokenize=False говорит, что вернуть нужно текстом, а не токенами.

Далее переводим текст в токены и помещаем в словарь inputs. Токенизатор превращает текст в список тензоров, подходящих для pytorch (return_tensors="pt").

💭Генерация ответа

🪙torch.no_grad(): Отключает расчет градиентов.
🪙max_new_tokens=64: Ограничение длины.
🪙do_sample=False: Детерминированные ответы.
Далее декодируем токены в текст, обрезая часть с нашим вопросом и убираем специальные токены.

🙂 И не забываем про импорты

import os
os.environ['TRANSFORMERS_OFFLINE'] = '1'
os.environ['HF_HUB_OFFLINE'] = '1'
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer


🥲Я установил специальные переменные окружения в 1, что значит всё брать только из кэша, но они работают только если их изменить перед импортом библиотек, если их изменить после ничего не происходит. Из-за этого метод инициализации работает не так как надо.

‼️ Важно, хоть помощник и работает, но предстоит исправить ещё много ошибок.
[2026-02-21 20:03]
#PXI_LEARN

🙂Всем привет, сегодня я хочу затронуть достаточно неожиданную для многих тему. Я расскажу немного про теорию информационного кодирования, а точнее про дискретный ансамбль, количество информации, энтропию и зачем нужно помехоустойчивое кодирование и теория информационного кодирования.

❗️Теория информационного кодирования — ...

Её главные задачи:
1) Убрать из сообщения лишнее (избыточность), чтобы передать его быстрее. Это «кодирование источника».

2) Добавить в сообщение правильную избыточность, чтобы, если при передаче возникнут шумы, получатель смог восстановить данные.
Главным является понятие неопределенности, наше сообщение несёт больше информации о событии, если это событие маловероятно и наоборот.

‼️ Дискретный ансамбль

В теории информации дискретный ансамбль (A,p) — это полная математическая модель источника сообщений. Он состоит из двух частей:
1) A — алфавит, множество букв или слов,символов.
2) p — распределение, вероятности всех элементов из A (очевидно их сумма должна быть равна 1).

🌐Количество информации и энтропия

Количество информации (I) — мера уменьшения неопределенности при получении конкретного сообщения a из A (по сути это то насколько мы удивимся получив это сообщение):
I(a) = —log(p(a))

Энтропия Шеннона (H) — среднее количество информации или математическое ожидание количества информации (мера хаоса или неопределенности источника, насколько тяжело угадать, что придёт нам
H = —👆p(a)*log(p(a)),
👆 — это сумма по всем a из А.

При увеличении алфавита и длины сообщения энтропия тоже растёт.

Средняя длина кода — сумма произведений количества бит на символ (слово и тд) и его вероятности.
L = 👆p(a)*n(a),
👆 — это сумма по всем a из А.

Зачем это нужно?

Представьте, что мы передаем текст, где буква «А» встречается в 90% случаев, а «Б» — в 10%.

Без теории: Мы просто кодируем равномерным кодом по 1 биту на букву (А=0, Б=1). На 100 букв — 100 бит. Длина кода L = 1 бит.

С теорией (сжатие): Энтропия такого источника 0,469 бит на символ. Это значит, что математически этот текст можно «упаковать» так, что он будет занимать почти в два раза меньше места без потери данных. Энтропия максимум равна logN, где N — количество разных символов и тд.

Мы можем для повышения энтропии кодировать не отдельные буквы, а пары. Самым частым отведём меньше битов (неравномерное кодирование).
АА —> 0
АБ —> 10
БА —> 110
ББ —>111

💡Теперь наша энтропия для нового ансамбля равна 0,938 бит на блок из двух символов, то есть на символ та же самая энтропия. И здесь тонкий момент, который стоит усвоить.

Почему мы выиграли?

Всё просто: мы смотрим не на «потолок» (максимальную энтропию), а на отношение реальной информации (энтропии) к длине кода (эффективность). В равномерном коде мы тратили 1 бит на символ (длина кода), а в нашем коде Хаффмана — в среднем длина кода всего 1,29 бита на два символа. Мы стали на 35% эффективнее!

💬Фух, это было не просто, надеюсь получилось у меня объяснить более-менее понятно. Если хотите продолжения этой темы — пишите в комментарии.
[2026-02-19 23:39]
#PXI_AI

🙂 Всем привет! В одном из прошлых постов я затронул глубокое обучение, а в этом я хочу рассказать про него подробнее. Сегодня вы узнаете немного больше про нейронные сети.

Что такое нейронные сети?

Если говорить простым языком, то нейронные сети (сети глубокого обучения) — это вычислительная математическая модель (или если более понятно, то функция), имеющая отдаленное сходство со структурой головного мозга.

Самым простым примером модели является линейная модель, где функция есть взвешенная сумма весов и входных переменных + сдвиг (меняя веса мы можем получить нужный нам результат, также сдвиг можно внести в взвешенную сумму, приняв её вес равным 1, так мы позволим модели самой подобрать сдвиг).

❗️Взвешенная сумма весов и входных переменных — это умножение входных значений на их веса с последующим сложением (то есть умножение вектора-строки выходов одного слоя на матрицу весов следующего слоя).

Что из себя представляет семейство моделей нейронных сетей (или просто нейронные сети)?

Нейронная сеть имитирует работу мозга (нейрон имеет несколько дендритов, тело и выход — аксон) и состоит из множества простых элементов обработки информации, которые называются нейронами.

Принято считать, что нейроны организованы в виде слоёв, на рисунке выше их четыре: один входной, два скрытых и один выходной.

💡Сети глубокого обучения — это нейронные сети со множеством скрытых слоёв. Чтобы сеть считалась глубокой скрытых слоёв должно быть не меньше двух.

Глубина сети — это количество скрытых слоев плюс выходной. Каждый нейрон принимает на вход набор значений и генерирует выход. Информация проходит по сети от одного слоя к другому, каждая связь в сети имеет свой вес, соединяет два нейрона и является направленной.

Вес — это число, которое показывает насколько сильно должна влиять эта связь на нейрон. Обучение нейронной сети сводится к поиску оптимального сочетания весов.

🔈Нейрон осуществляет двухэтапный процесс для сопоставления входных значений с выводом. На первом этапе происходит вычисление взвешенной суммы значений, поданных на вход нейрону. Затем результат пропускается через вторую функцию, которая связывает его с итоговым выходным значением.

Обычно вывод нейрона называют значением активации, поэтому вторая функция, связывающая его взвешенной суммой, называется функцией активации. В качестве функции активации могут использоваться много различных функций, например, пороговая, гиперболический тангенс или выпрямляющая линейная функция.

Функция активации нужна для формирования не только линейных, но и нелинейных моделей. Обычно в одном слое все нейроны имеют одинаковую функцию активации.

⚡️Также нейрон с определенной функцией активацией называется модулем, например, нейрон с выпрямляющей функцией активации называют выпрямляющим модулем (ReLU).

На этом пока что всё, в следующий раз расскажу про рекуррентные и свёрточные нейронные сети, а так же про алгоритмы обучения нейронных сетей.
[2026-02-18 19:56]
#PXI_NET

💡 Всем привет, продолжаю прошлый пост. Теперь про установку соединения, передачу данных и разрыв соединения в TCP, а также про таймеры и немного про алгоритмы работы TCP.

⌛Пусть 1 — это отправитель, 2 — получатель, а 1 —> 2  ACK означает, что первый отправил второму сегмент с флагом ACK равным 1.

Установка соединения (трёхэтапное рукопожатие):
1) 1 —> 2 SYN (первый  выбирает свой последовательный номер сегмента).
2) 2 —> 1 SYN + ACK (второй выбирает и отправляет свой последовательный номер и соглашается с отсчетом первого).
3) 1 —> 2 ACK (соглашается и в этом же сегменте может уже послать данные).

Передача данных:
1 —> 2 PSH и данные (хотя раньше флаг PSH указывал на немедленную передачу данных приложению, не ожидая заполнения буфера, сейчас он часто игнорируется, поэтому данные можно отправлять и без PSH).

2 —> 1 ACK

Разрыв соединения:
Мягкий (в 4 этапа):
1 —> 2 FIN
2 —> 1 ACK
2 —> 1 FIN
1 —> 2 ACK (здесь перед окончательным разрывом инициатор ждёт время равное 2*MSL, чтобы дошли все пакеты со старыми сегментами, про MSL ниже)

Жёсткий:
1 —> 2 RST

По тайм-ауту:
Когда кончится таймер контроля работоспособности, а ответа не последовало.

⚙️Алгоритмы работы TCP

Их обычно делят по вариантам подтверждения передачи данных:
1️⃣ Остановка и ожидание (после каждого сегмента с данными следует ACK, небольшой буфер, если размер окна 0, то ждём)
2️⃣ Скользящее окно (после нескольких сегментов с данными один ACK, количество сегментов отправляемых группой определяется в зависимости от размера окна, большой буфер)

🔜 Таймеры и временные параметры TCP
1) RTT — время, за которое сегмент доходит до получателя и обратно.
2) RTO — таймер повторной передачи (RTO = f(RTT), могут использоваться разные функции).
3) Таймер запросов — таймер, который запускается при получении квитанции (сегмента с ACK без других данных) с нулевым размером окна, чтобы переодически проверять не изменилось ли оно, вдруг сегмент про окно потерялся, тогда мы сами через время спросим про размер окна.
4) Таймер контроля работоспособности — проверка, что собеседник всё ещё активен.
5) Таймер времени жизни сегмента = 2*максимальное время жизни сегмента (то самое MSL, обычно принимается за 2 минуты)

Почему порядковый номер берется рандомно

Если бы ISN был легко предсказуем (например, увеличивался на фиксированную величину с каждым новым соединением), это открыло бы двери для серьёзной атаки — TCP Sequence Prediction Attack (или IP Spoofing Attack).

Как работает атака:
1) Злоумышленник (Z) подслушивает трафик между клиентом (A) и сервером (B).
2) Z предсказывает, какой ISN сервер B отправит в ответ на следующий SYN от любого клиента.
3) Z отправляет серверу B поддельный пакет (SYN) с IP-адресом жертвы (A), инициируя соединение.
4) Сервер B отвечает пакетом (SYN-ACK) на адрес A (который его не запрашивал и проигнорирует).
5) Ключевой момент: Чтобы завершить handshake и начать отправку данных от имени A, злоумышленнику Z нужно отправить правильный ACK с предсказанным ISN+1.
6) Если Z угадывает ISN, он устанавливает соединение с B от имени A и может, например, отправить вредоносную команду.

🌛На этом небольшой рассказ про транспортный уровень закончился.
[2026-02-17 23:05]
#PXI_NET

🙂И вновь я всех приветствую, сегодня будет новая для моего канала тема. Я расскажу вкратце про транспортный уровень, UDP и TCP. Хоть стоило бы начать с более низких уровней модели OSI или стека TCP/IP, но про них я обязательно расскажу позже.

Транспортный уровень (Transport Layer) — это 4-й уровень модели OSI и 3 уровень стека TCP/IP, отвечающий за сквозную end-to-end (то есть ответственность за передачу данных лежит на конечных, а не промежуточных узлах) доставку данных между приложениями на разных хостах в сети.

📉Функции транспортного уровня:
Обеспечить с заданным уровнем надежности передачу данных между прикладными программами, абстрагируя прикладные процессы от деталей сетевой инфраструктуры.

🌐 Используется, как минимум, два протокола:

UDP (Протокол пользовательских дейтаграмм) — без установки соединения и гарантии доставки, а также без учёта порядка дейтаграмм и пакетов.

TCP (Протокол управления передачей данных) — с установкой соединения, имеют гарантию передачи и сохранения порядка сегментов.

💡Оба этих протокола пользуются сокетом — это канал связи между приложением и его хостом или адрес приложения в сети (сетевой адрес устройства, то есть IP адрес, + адрес программы,то есть порт).

Порт — это просто 2 байтовое число (от 0 до 65535).

Они делятся на группы:
С 0 по 1023 — системные (для общедоступных сервисов, например, браузер на 80).
С 1024 по 49151 — пользовательские.
С 49152 по 65535 — динамические, автоматически назначаются системой.

На одном и том же порту может быть и UDP и TCP соединение одновременно, они не мешают работе друг друга (потому что это разные сокеты по определению).

‼️ Кратко про UDP

UDP не устанавливает и не гарантирует соединение между получателем и отправителем, не сохраняет порядок дейтаграмм и пакетов. Обычно его используют в TFTP, DNS, SNMP.

Протокол пользовательских дейтаграмм в качестве полезной нагрузки IP-пакета использует дейтаграмму.

📎Дейтаграмма имеет следующий формат (длина его варьируется от 8 до 65515 байт, это заголовок + данные):

1️⃣ 8 байт на заголовок.

2️⃣ 2 байта на порт отправителя.

3️⃣ 2 байта на порт получателя.

4️⃣ 2 байта на длину UDP-сообщения (длина всей дейтаграммы).

5️⃣ 2 байта на контрольную сумму UDP (повреждённые дейтаграммы отбрасываются).

6️⃣ И сами данные.
Напомню, что что MTU (максимальный передаваемый модуль данных) для большей части интернета 1500 байт, то и дейтаграммы чаще всего используются, когда данные помещаются в это ограничение.

‼️ Кратко о TCP

TCP устанавливает соединение и гарантирует доставку и порядок данных. В случае TCP  полезная нагрузка IP-пакета — это сегмент.

📎Сегмент имеет следующий формат:

1️⃣ По 2 байта на порт отправителя и получателя.

2️⃣ 4 байта на порядковый номер или же последовательный номер сегмента (номер первого байта в сегменте, каждому сегменту присваивается порядковый номер, чтобы указать как они упорядочены).

3️⃣ 4 байта на номер подтверждения, номер начального байта сегмента, который ожидается следующим. Тем самым подтверждает получение всех предыдущих.

4️⃣ 4 бита на длину заголовка (но считается длина в 32 битных словах и минимальное значение этого поля равно 5).

5️⃣ 6 бит резервных и 6 бит на 6 флагов по биту (URG — признак срочности; ACK — подтверждение; PSH — признак того, что нужно принять эти данные и отдать приложению или просто признак передачи; RST — жёсткий разрыв соединения; SYN — используется при начальной синхронизации; FIN — мягкий разрыв соединения).

6️⃣ 16 бит на размер окна (свободный размер буфера получателя). Если в необязательных модификаторах стоит модификатор WS, то размер буфера = размер окна*2^WS.

7️⃣ 16 бит на контрольную сумму.

8️⃣ 16 бит на указатель срочных данных.

9️⃣ 32 бита необязательных модификаторов и потом сами данные.
🥲К сожалению, в один пост всё не поместится, поэтому продолжу про установку соединения, отправку данных и разрыв соединения в следующем посте.
[2026-02-16 21:31]
#PXI_AI

🙂 Всем привет! Этот день настал и я наконец-то опубликовал ролик про голосового помощника, где показал полный код (v1.0) с моими, наверное, понятными объяснениями.

🥲Не судите слишком строго первый такой опыт, в следующий раз обязательно проведу работу по улучшению контента. Не буду долго тянуть, переходите по ссылке и смотрите ролик: тыкни_на_меня

✔️Для удобства тут тоже оставлю исходный код (v1.0).
Хочу подметить, что он не лишён ошибок.

🙂Скажу по секрету, что я уже добавил в качестве мозга qwen, но про это расскажу позже, когда исправлю все (или почти все) баги.
[2026-02-15 14:27]
#PXI_AI

🙂 Приветствую всех, сегодня я хочу отвлечься от голосового ассистента и рассказать про то, что такое модели, искусственный интеллект (ИИ), машинное (МО) и глубокое обучение (ГО), а также про виды этапа обучения моделей.

💡Модель — это функция, закодированная в виде программы (по сути слова синонимы).

❗️Искусственный интеллект — это раздел компьютерных наук, связанный с исследованиями в разных областях, включая доказательства математических теорем, анализ естественного языка, игры, компьютерные программы, способные обучаться на примерах, и нейронные сети. Он включает в себя все направления связанные и с МО, и с ГО, и с другими областями.

📎Машинное обучение — это одно из направлений ИИ, связанное с разработкой и оценивание алгоритмов, которые позволяют извлекать функции их набора данных (то есть учатся на этих примерах).

💡Алгоритм МО — это поисковый процесс, предназначенный для выбора функции из ряда потенциальных вариантов функций, которая лучше всего объясняет отношения между признаками набора данных.

Глубокое обучение — это подраздел МО, посвященный моделированию крупных нейронных сетей, которые способны принимать верные решения на основе данных, оно определяет и извлекает закономерности из огромных наборов данных, устанавливая правильные связи между цепочкой сложных входных значений и удачными решениями.

🔈После определения понятий перейдем к процессу машинного обучения.

Он состоит из двух этапов: собственно обучение модели (находим подходящую функцию) и формирование логического вывода (тестируем на новых ,может быть частью старых, наборах данных).

Сейчас я сосредоточусь на этапе обучения модели.

Здесь есть две проблемы: переобучение и недообучение.

⌛Переобучение встречается, когда используется слишком малый индуктивный сдвиг (предложение о функции, то есть даём большую свободу в выборе функции нашему алгоритму МО или нейронной сети , чтобы максимально соответствовать набору данных и больше акцентирует внимание на наборе данных). Из-за переобучения модель слишком сильно акцентирует на данный набор данных, может захватывать шумы данных, ошибки в данных, а также получиться слишком сложной.

📉Недообучение встречается с большим индуктивным сдвигом, алгоритм может проигнорировать важную информацию, и полученная функция не будет учитывать все нюансы закономерностей набора данных.

⚙️МО имеет три категории в зависимости от набора данных, функций-кандидатов и функции приспособленности или пригодности модели: обучение с учителем, обучение без учителя и обучение с подкреплением.

🥸Обучение с учителем самый распространенный. Он подразумевает, что набор данных имеет не только входные признаки, но и целевой параметр (то есть и входы и выход) для сравнения точности модели. Алгоритм МО будет сравнивать результат своей работы функции с целевым признаком и менять свои веса, если нужно для лучшей приспособленности модели.

🤡МО без учителя подходит для кластеризации данных (объединение образцов из набора данных в группы по схожим признакам). Здесь набор данных не имеет целевого выходного признака. Модель сама учиться составлять образцы в правильные группы по схожим признакам. А в качестве функции приспособленности может выступать функция, которая отдает предпочтение кандидатам, которые достигает высокого сходства внутри кластера.

ℹ️МО с подкреплением используется там, где невозможно или очень затратно составить набор данных (например, научить модель играть в змейку). Тогда модель учится непосредственно в среде. Для обучения и оценки приспособленности могут выступать очки(подкрепление).

Если что-то не совсем понятно, то пишите в комментарии — разберём вместе.
[2026-02-13 16:31]
#PXI_LEARN

🙂 Приветствую всех, продолжу про воспроизводимые сборки (приложения).

Как пользователи могут узнать, что созданная ими сборка успешно воспроизвела исходную?

Самый простой способ — убедиться, что выходные данные сборки всегда идентичны побайтно. Побайтовое сравнение — тривиальная операция, которую можно выполнить во многих различных средах.

⚡️Ещё одно преимущество наличия идентичных байтов заключается в возможности использования криптографических контрольных сумм. Такие контрольные суммы очень малы по сравнению с суммами, используемыми в полной сборке. Их легко обменивать даже в условиях очень низкой пропускной способности.

Например, это позволяет создавать релизы программного обеспечения как на сервере с хорошим (но ненадежным) подключением, так и на ноутбуке с плохим мобильным соединением.

✅Цифровая подпись может быть создана локально на ноутбуке. Поскольку результаты сборки будут идентичны, подпись будет действительна для файлов, созданных на сервере с хорошим подключением.

✏️Проблема со встроенными подписями.

Распространяемое программное обеспечение, использующее встроенные криптографические подписи, может создавать проблемы с воспроизведением пользователями идентичных результатов. По определению, они не смогут сгенерировать идентичную подпись. Эту проблему можно решить либо путем включения подписи в процесс сборки, либо путем предоставления инструментов для преобразования распространяемых бинарных файлов в безупречные результаты сборки.

1️⃣Один из способов обработки встроенных криптографических подписей — сделать подпись (необязательным) входным параметром процесса сборки. Если подпись доступна, она просто копируется в нужное место.

Это позволяет реализовать следующий рабочий процесс:

✅Первоначальную сборку выполняют разработчики, имеющие доступ к закрытому ключу.
✅Результат сборки записывается во внешний файл.
✅Подпись становится частью выпущенного исходного кода.
✅Распространяемая сборка создана на основе последнего источника.
Данный wireless-regdb пакет в Debian является примером того, как это можно реализовать .

‼️Ещё один пример — использование F-Droid для копирования подписей APK-файлов с помощью apksigcopier.

Можно разработать специальный инструмент сравнения, способный сравнивать сборки, в которых отсутствуют подписи. В идеале он также должен уметь генерировать криптографические контрольные суммы, чтобы не требовалось скачивать исходную сборку только для сравнения результатов.

Подобный инструмент должен быть очень прост в проверке и понимании. В противном случае трудно доверять тому, что скрипт не игнорирует байты, которые могли бы изменить его поведение.

2️⃣Другой вариант — предоставить инструмент, способный удалять подписи из официальных релизов. Полученный результат затем можно будет сравнивать побайтно с результатами, полученными от пользователя.

😡Недостатком этого метода является то, что для сравнения пользователю необходимо загрузить официальные релизы. Кроме того, сложнее гарантировать, что удаляемые данные не приведут к изменению поведения программного обеспечения.

Как пользователи могут убедиться в том, что сборка не была скомпрометирована, обмениваясь сертификатами, подтверждающими, что все они смогли получить одинаковые результаты сборки?

В Debian рассматривают возможность разрешить нескольким разработчикам Debian загружать подписи, подтверждающие, что им удалось воспроизвести сборку.

Этот вопрос также связан с работой Бена Лори по обеспечению прозрачности бинарных файлов. Идея состоит в создании журнала с возможностью добавления данных, аналогичного системе прозрачности сертификатов, который можно было бы использовать для аутентификации бинарных файлов.

↗️Для повышения эффективности воспроизводимых сборок и раннего выявления взломов необходимы дополнительные исследования в этой области.

⚡️На этом кончается небольшой экскурс в тему воспроизводимых сборок.
[2026-02-11 23:17]
#PXI_LEARN

🙂И вновь я всех приветствую! Сегодня я бы хотел рассказать о такой теме как воспроизводимые приложения/сборки.

🥲К сожалению, я не смогу поместить эту обширную тему в один пост, поэтому разобью её на две части.

‼️Одним из примеров воспроизводимых сборок является как раз сам телеграмм. Телеграмм является приложением с открытым исходным кодом, но есть ли гарантия, что вы скачиваете с магазина приложений именно его? Да, это и есть воспроизводимые сборки.

❗️Воспроизводимые сборки — это набор методов разработки программного обеспечения, которые создают независимо проверяемый путь от исходного кода к бинарному коду.

❓Почему важны воспроизводимые сборки?

Воспроизводимые сборки гарантируют, что программное обеспечение, которому вы доверяете, является безопасным и проверяемым. Это достигается путем проверки соответствия загружаемых вами бинарных файлов исходному коду, не подвергавшемся изменениям. Для инструментов, связанных с безопасностью, это означает высокую степень уверенности в том, что ваши данные и коммуникации защищены от скрытых бэкдоров или уязвимостей.

💭Работает это так:

Во-первых, система сборки должна быть полностью детерминированной: преобразование заданного исходного кода всегда должно приводить к одному и тому же результату. Например, текущая дата и время не должны записываться, а выходные данные всегда должны записываться в одном и том же порядке.

Во-вторых, набор инструментов, используемых для выполнения сборки, и, в более общем смысле, среда сборки должны быть либо зафиксированы, либо предварительно определены.

В-третьих, пользователям следует предоставить возможность воссоздать достаточно близкую к исходной среду сборки, выполнить процесс сборки и убедиться, что результат соответствует исходной сборке.

❓Какие проблемы решают воспроизводимые сборки?

Хотя любой может проверить исходный код свободного и открытого программного обеспечения на наличие вредоносных уязвимостей, большинство программ распространяется в предварительно скомпилированном виде, что не даёт возможности подтвердить соответствуют ли они друг другу.

💡Цель проекта «Воспроизводимые сборки» — обеспечить проверку отсутствия уязвимостей или бэкдоров в процессе компиляции. Обеспечивая получение идентичных результатов из исходного кода, проект позволяет нескольким сторонним разработчикам прийти к единому мнению о «правильном» результате, выявляя любые отклонения как подозрительные и заслуживающие тщательного изучения.

Возможность заметить, если система разработки или сборки была скомпрометирована, предотвращает подобные угрозы или атаки, поскольку любое нарушение безопасности может быть быстро обнаружено. В результате, сотрудников, работающих на передовой, нельзя принудить к использованию уязвимостей или раскрытию информации о своих коллегах.

📣Когда сборку можно воспроизвести?

Сборка считается воспроизводимой , если при наличии одного и того же исходного кода, среды сборки и инструкций по сборке любая сторона может побитово воссоздать идентичные копии всех указанных артефактов.

Соответствующие атрибуты среды сборки, инструкции по сборке и исходный код, а также ожидаемые воспроизводимые артефакты определяются авторами или распространителями. Артефакты сборки — это части результатов сборки, которые являются желаемым основным выходным результатом.

Исходный код обычно представляет собой копию, полученную из системы контроля версий в определенной ревизии, или архив исходного кода.

К соответствующим атрибутам среды сборки обычно относятся зависимости и их версии, флаги конфигурации сборки и переменные среды, если они используются системой сборки (например, локаль). Предпочтительнее сократить этот набор атрибутов.

К артефактам относятся исполняемые файлы, дистрибутивы или образы файловых систем. Обычно они не включают журналы сборки или аналогичные вспомогательные выходные данные.

Воспроизводимость артефактов проверяется побитовым сравнением. Обычно это выполняется с использованием криптографически защищенных хэш-функций.

💬На этом первая часть подошла к концу. В следующий раз расскажу более подробно как происходит сравнение.
[2026-02-10 19:42]
Здесь последний метод должен называться "эвристический метод и анализ семантики", почему-то слово "семантики" не отобразилось.
[2026-02-10 19:40]
#PXI_AI

🙂Приветствую всех, сегодня я защитил свою исследовательскую работу на тему: «Сравнительный анализ существующих средств определения текста, сгенерированного большими языковыми моделями (БЯМ/LLM)».

↗️Хочу поделиться с вами ключевыми результатами этой работы.

💭Во-первых, начну с того, что
сформировал тестовый набор из 40 статей:
    – 20 статей — оригинальные написанные человеком темы, на различные темы и изданные до 2017 года (меньше шанс, что в них есть следы БЯМ/ИИ).
    – 20 статей — сгенерированых БЯМ (в качестве промптов использовал названия оригинальных статей, БЯМ использованы разные от GPT до Яндекс Алисы).

❗️ Во-вторых, проанализировал существующие методы детекции, их можно условно разделить на 4 основные категории по лежащим в их основе методам(смотрите на таблицу ниже).

По совокупности условий предъявляемых к методам для тестирования был выбран метод машинного и глубокого обучения.

❓Среди множества моделей-детекторов выбраны три популярных детектора:
    – GPTZero
    – Copyleaks
    – Gigacheck (Gigachecker)

✅ Результаты тестирования отражены тоже в таблице ниже.

‼️В заключении, Gigacheck и Copyleaks показали очень даже хорошие результаты, но стоит учитывать, что мой тестовый набор скорее всего не является репрезентативным.
[2026-02-09 08:05]
Продолжение прошлого поста.
Множества

# Проверки
s1.issubset(s2)      # False - является ли подмножеством
s1.issuperset(s2)    # False - является ли надмножеством
s1.isdisjoint(s2)    # False - нет общих элементов?

📌 КОРТЕЖИ (tuple)

t = (1, 2, 3, 4, 2)

# Методы (их мало, т.к. кортежи неизменяемы)
t.count(2)           # 2 - количество вхождений
t.index(3)           # 2 - индекс элемента
len(t)               # 5 - длина кортежа
min(t), max(t)       # минимальный и максимальный

📌 ПРОЧИЕ ПОЛЕЗНОСТИ

Срезы (работают для строк, списков, кортежей):
data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
data[2:6]       # [2, 3, 4, 5] - с 2 по 5 индекс
data[:5]        # [0, 1, 2, 3, 4] - с начала до 4
data[5:]        # [5, 6, 7, 8, 9] - с 5 до конца
data[::2]       # [0, 2, 4, 6, 8] - каждый второй
data[::-1]      # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] - реверс

Проверка типа:
type(data) == list   # True
isinstance(data, list)  # True (лучше использовать это)

Преобразования типов:
str(123)        # "123"
int("456")      # 456
float("3.14")   # 3.14
list("hello")   # ['h', 'e', 'l', 'l', 'o']
tuple([1,2,3])  # (1, 2, 3)
set([1,2,2,3])  # {1, 2, 3}
dict([("a",1), ("b",2)])  # {'a': 1, 'b': 2}

Множественное присваивание:
a, b, c = [1, 2, 3]
x, *y, z = [1, 2, 3, 4, 5]  # x=1, y=[2,3,4], z=5

Генераторы списков/словарей/множеств:
[x**2 for x in range(5)]              # [0, 1, 4, 9, 16]
{x: x**2 for x in range(3)}           # {0: 0, 1: 1, 2: 4}
{x for x in "hello" if x != "l"}      # {'h', 'e', 'o'}

Сортировка сложных структур:
users = [{"name": "PXI", "age": 25}, {"name": "Bob", "age": 20}]
sorted(users, key=lambda x: x["age"])  # сортировка по возрасту
[2026-02-09 08:02]
#PXI_LEARN

ШПАРГАЛКА PYTHON: ВСЕ ГЛАВНЫЕ МЕТОДЫ ДЛЯ РАБОТЫ С ДАННЫМИ

Рекомендую смотреть на большом экране или горизонтально на телефоне

Сохраняйте себе — этот пост сэкономит вам часы поисков.

📌 СТРОКИ (str)

s = "Hello World"

# Основные методы
s.upper()           # "HELLO WORLD" - в верхний регистр
s.lower()           # "hello world" - в нижний регистр
s.capitalize()      # "Hello world" - первая буква заглавная
s.title()           # "Hello World" - каждое слово с заглавной

# Поиск и замена
s.find("World")     # 6 - индекс первого вхождения
s.rfind("l")        # 9 - индекс последнего вхождения
s.index("World")    # 6 (как find, но ValueError если нет)
s.replace("Hello", "Hi")  # "Hi World"
s.count("l")        # 3 - количество вхождений

# Проверки
s.startswith("Hello")  # True
s.endswith("World")    # True
s.isalpha()            # False (только буквы?)
s.isdigit()            # False (только цифры?)
s.isalnum()            # False (буквы/цифры?)
s.isspace()            # False (только пробелы?)

# Разделение и соединение
s.split()             # ['Hello', 'World'] - по пробелам
s.split("o")          # ['Hell', ' W', 'rld']
" ".join(["Hello", "World"])  # "Hello World"

# Форматирование
s.strip()             # удалить пробелы с двух сторон
s.lstrip()            # удалить пробелы слева
s.rstrip()            # удалить пробелы справа
s.center(20, "*")     # "****Hello World*****"
s.zfill(10)           # "0Hello World" - дополнить нулями

📌 СПИСКИ (list)

lst = [1, 2, 3, 4, 5]

# Добавление элементов
lst.append(6)        # [1, 2, 3, 4, 5, 6] - в конец
lst.insert(2, 99)    # [1, 2, 99, 3, 4, 5] - на позицию
lst.extend([7, 8])   # [1, 2, 3, 4, 5, 7, 8] - расширить

# Удаление элементов
lst.remove(3)        # удалить первое вхождение значения 3
lst.pop()            # удалить и вернуть последний элемент
lst.pop(2)           # удалить и вернуть элемент на позиции 2
lst.clear()          # [] - очистить весь список

# Поиск и информация
lst.index(4)         # 3 - индекс элемента
lst.count(2)         # 1 - количество вхождений
len(lst)             # 5 - длина списка
min(lst), max(lst)   # минимальный и максимальный

# Сортировка и изменение порядка
lst.sort()           # сортировка по возрастанию
lst.sort(reverse=True)  # по убыванию
lst.reverse()        # развернуть порядок
sorted(lst)          # вернуть отсортированную копию

# Копирование
lst.copy()           # поверхностная копия
lst[:]               # тоже копия (срез)

📌 СЛОВАРИ (dict)

d = {"name": "PXI", "age": 25, "city": "Moscow"}

# Основные операции
d["name"]            # "PXI" - получить значение
d.get("age")         # 25 - получить (без ошибки если нет)
d.get("country", "Russia")  # "Russia" - значение по умолчанию

# Добавление и изменение
d["email"] = "pxi@mail.com"  # добавить/изменить
d.update({"age": 26, "job": "Dev"})  # обновить несколько

# Удаление
d.pop("city")        # удалить ключ и вернуть значение
d.popitem()          # удалить и вернуть последнюю пару
del d["age"]         # удалить ключ
d.clear()            # {} - очистить словарь

# Получение представлений
d.keys()             # dict_keys(['name', 'age', 'city'])
d.values()           # dict_values(['PXI', 25, 'Moscow'])
d.items()            # dict_items([('name', 'PXI'), ...])

# Проверки
"name" in d          # True - проверка наличия ключа
len(d)               # 3 - количество пар

# Создание словарей
dict.fromkeys(["a", "b", "c"], 0)  # {'a': 0, 'b': 0, 'c': 0}

📌 МНОЖЕСТВА (set)

s1 = {1, 2, 3, 4, 5}
s2 = {4, 5, 6, 7, 8}

s1.add(6)            # добавить элемент
s1.remove(3)         # удалить элемент (ошибка если нет)
s1.discard(10)       # удалить (без ошибки если нет)
s1.pop()             # удалить и вернуть случайный элемент
s1.clear()           # очистить множество

s1.union(s2)  # {1, 2, 3, 4, 5, 6, 7, 8} - объединение
s1 | s2  # то же самое
s1.intersection(s2)  # {4, 5} - пересечение
s1 & s2  # то же самое
s1.difference(s2)    # {1, 2, 3} - разность
s1 - s2 # то же самое
s1.symmetric_difference(s2)  # {1, 2, 3, 6, 7, 8}
s1 ^ s2 # то же самое
[2026-02-08 08:03]
#PXI_LEARN

Слышали ли вы что-то про Google Colab?

Знакомьтесь — Google Colab (Colaboratory). Это не просто ещё один сервис Google, а ваш личный бесплатный облачный рабочий стол для Data Science и AI!

Что это такое? 
Это интерактивная среда на базе Jupyter Notebook, которая работает прямо в браузере. Пишете код на Python — получаете результат моментально. Никаких установок.

Зачем это вам?
Представьте: вы делаете голосового ассистента, но ваш ноутбук не справляется.

Решение — Colab:
✅ Бесплатный GPU/TPU — мощные видеокарты от Google для тренировки моделей 
✅ Тестирование и дообучение — можно «прокачать» свою модель без стресса для железа 
✅ Хранение и интеграция — все файлы на Google Диске, доступ с любого устройства 
✅ API из ноутбука — превращаете код в работающий сервис за 5 минут 

Что за «ноутбук»? 📓
Нет, это не ваш лэптоп! В мире Data Science ноутбук (notebook) — это интерактивный документ, где можно:
• Писать код
• Видеть результат выполнения
• Добавлять текст, формулы, картинки
• Делиться с коллегами одним файлом

Выглядит как гибрид текстового файла и консоли программиста — и это невероятно удобно!

Кому подходит?
• Начинающим Data Scientist’ам
• Студентам (лабораторные по ML/AI)
• Исследователям для экспериментов
• Разработчикам, тестирующим тяжёлые модели
• Всем, кто хочет работать с Python без головной боли

Магия интеграции
Самый крутой фишкой Colab можно считать возможность превратить ваш ноутбук в работающий API. Например:
1. Вы обучаете модель распознавания речи в Colab
2. Настраиваете веб-интерфейс прямо там же
3. Подключаете Telegram-бота к этому «облачному мозгу»
4. Бот использует мощь Colab, а ваш телефон просто отправляет запросы

Как начать?
1. Зайдите на colab.research.google.com
2. Нажмите «Новый блокнот»
3. Начните писать код — всё уже настроено!
4. В меню «Среда выполнения» → «Сменить среду выполнения» выберите GPU/TPU

Важный момент ⚠️ 
Бесплатный Colab имеет лимиты

Практический пример 
Хотите собрать голосового ассистента? В Colab вы можете:
• Обучить модель распознавания речи
• Настроить синтез голоса
• Связать всё воедино
• Протестировать на мощном железе
• И всё это — с обычного ноутбука.

Итог:
Google Colab — это демократизация AI. Теперь для работы с нейросетями не нужен дорогой компьютер. Нужен только браузер и идея.

А вы уже пробовали Colab? Делитесь опытом в комментариях! 👇

А какую задачу вы бы хотели решить с помощью Colab? 💬
[2026-02-07 21:00]
#PXI_AI

Продолжаем про голосового ассистента! В прошлый раз разобрали STT (распознавание речи) и TTS (синтез речи), а теперь свяжем это всё вместе в main.py.

Создадим максимально простого голосового ассистента, который умеет выполнять только заранее подготовленные команды. Это наш MVP (Minimum Viable Product) — основа, которую будем развивать дальше.

🔧 Первая функция — сравнение сказанного текста с командами

def equ(text: str, needed) -> bool:
    text_lower = text.lower().strip()
   
    if isinstance(needed, str):
        needed_lower = needed.lower()
       
        # 1. Полное совпадение (самое быстрое)
        if needed_lower == text_lower:
            return True
       
        # 2. Частичное вхождение (для длинных команд)
        if needed_lower in text_lower and len(needed_lower) > 3:
            return True
       
        # 3. Нечёткое сравнение (fuzzy matching)
        return fuzz.ratio(text_lower, needed_lower) >= 70
   
    elif isinstance(needed, list):
        # Для списка команд проверяем каждую
        for cmd in needed:
            cmd_lower = cmd.lower()
           
            # Быстрые проверки сначала
            if cmd_lower == text_lower:
                return True
            if cmd_lower in text_lower and len(cmd_lower) > 3:
                return True
       
        # Если не нашли точного совпадения — нечёткий поиск
        for cmd in needed:
            if fuzz.ratio(text_lower, cmd.lower()) >= 70:
                return True
       
        return False
   
    return False

Что здесь происходит:
text.lower().strip() — приводим текст к нижнему регистру и убираем пробелы
• Проверяем три уровня совпадения (от быстрого к медленному)
• Используем fuzzywuzzy для нечёткого сравнения (пользователь может сказать с ошибкой)
• Порог 70% — оптимальный баланс между точностью и гибкостью

🎯 Сам ассистент — функция execute()

def execute(text: str):
    global tts, stt  # Не забудьте ниже создать объекты наших классов
    text_lower = text.lower()
   
    # ========== ПРИВЕТСТВИЯ ==========
    if equ(text_lower, ["привет", "здравствуй", "добрый день", "хай", "ку"]):
        greetings = ["Приветствую!", "Здравствуйте!", "Привет!", "Рад вас слышать!"]
        tts.text2speech(random.choice(greetings))
   
    if equ(text_lower, ["как дела", "как ты", "как жизнь"]):
        responses = ["Всё отлично!", "Работаю в штатном режиме.", "Прекрасно! А у вас?"]
        tts.text2speech(random.choice(responses))
   
    if equ(text_lower, ["кто ты", "твое имя", "представься"]):
        tts.text2speech("Я ваш голосовой помощник. Можете называть меня пикс+а+й")
   
    # ========== УПРАВЛЕНИЕ ==========
    if equ(text_lower, ["выключись", "стоп", "отключи питание",
                        "вырубайся", "прячься", "отключайся", "пока"]):
        tts.text2speech("Выключаюсь")
        sys.exit(0)
   
    if equ(text_lower, ["перезагрузка", "рестарт", "перезапуск", "обновись"]):
        tts.text2speech("Перезагружаюсь через три секунды")
        sleep(3)
        os.execv(sys.executable, ["python", "-m", "src.main"])

Ключевые моменты:
random.choice() — добавляем вариативность ответов
sys.exit(0) — корректное завершение программы
os.execv() —  остановка старого процесса и запуска нового без закрытия терминала

📦 Полный код

Весь исходный код с подробными комментариями выложу в следующем посте с пометкой #PXI.AI

Также готовлю видео на YouTube с полным разбором кода и объяснением архитектуры (буду стараться с каждым роликом делать объяснения интереснее и понятнее — это моя цель на ближайшее время).

Ссылку на видео тоже добавлю в тот пост!

🚀 Что дальше?

Это лишь базовый набор команд — основа, на которую будем наращивать функционал. В планах сейчас это добавить математических операций, здесь надо будет поиграть с библиотеками num2words и text2num. А также в скором будущем надо добавить нашему голосовому помощнику brain.py в виде настоящей модели (в планах rugpt3-small или qwen, напишите свои предложения в комментариях).
[2026-02-06 07:02]
#PXI_LEARN

Приветствую, хочу немного сменить тему от нашего голосового ассистента и от области ИИ.
Даже в небольшом проекте (как наш голосовой ассистент) используются десятки внешних библиотек. 
Просто ставить их в систему — рискованно и неопрятно. Я рекомендую использовать виртуальное окружение или docker, чтобы изолировать зависимости.

🛡️ Виртуальное окружение (.venv)

Идеально для: быстрых задач, локальной разработки, особенно на Windows.

Создание:
python -m venv .venv


Активация:
Windows:
  .venv\Scripts\activate
 

Linux/macOS:
  source .venv/bin/activate
 

После активации:
• Устанавливаете библиотеки через pip — они попадут только в .venv.
• Проект не засоряет систему.
• Легко удалить (просто удалить папку .venv).

🐳 Docker — изоляция на уровне контейнера

Идеально для: воспроизводимости, деплоя, сложных зависимостей, мультиплатформенности.

Что это:
Контейнер — легковесная виртуальная среда, куда упаковывается приложение со всеми зависимостями.

Пример Dockerfile для Python-проекта:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]

Основные команды (не забудьте про sudo):
# Собрать образ с именем myapp
docker build -t myapp .

# Запустить контейнер в интерактивном режиме с автоудалением
docker run -it --rm myapp

#Запустить с работой в фоне, пробросом портов, папок и названием контейнера
docker run -d
  --name my-app
  -p 8080:8000
  -v "{PWD}/app":/app pxi

# Остановить все контейнеры
docker stop $(docker ps -aq)

Плюсы Docker:
✅ Одинаковая среда на любом компьютере 
✅ Нет конфликтов версий Python/библиотек 
✅ Легко передать проект коллеге или на сервер 
✅ Можно упаковать даже системные зависимости (например, для аудиообработки)

Совет: даже если начинаете с .venv, сохраняйте зависимости в requirements.txt:
pip freeze > requirements.txt

Это позволит в любой момент перейти к Docker или восстановить окружение.

💡 Итог: изоляция зависимостей — must-have для любого проекта. 
Начинайте с .venv, для серьёзных задач осваивайте Docker. 

Вопросы? Пишите — разберём подробнее!
[2026-02-05 16:17]
#PXI_AI

Продолжаю про голосового ассистента. Второй компонент — синтез речи (TTS).

Выбрал Silero за:
✅ Работу оффлайн
✅ Хороший баланс качества и скорости

Как работает:
1. Модель загружается через PyTorch Hub (кэшируется после первого раза).
2. Можно выбрать голос (спикера) и устройство (CPU/GPU).
3. Текст в речь — одна функция.

Основной код:

Инициализация объекта класса:
def __init__(
            self, speaker: str = SPEAKER_AIDAR,
            device: str = DEVICE_CPU,
            samplerate: int = 24_000
    ):
        self.__MODEL__, _ = torch.hub.load(
            repo_or_dir="snakers4/silero-models",
            model="silero_tts",
            language="ru",
            speaker="ru_v3"
        )
        self.__MODEL__.to(torch.device(device))

        self.__SPEAKER__ = speaker
        self.__SAMPLERATE__ = samplerate


Метод получения аудио из текста и отдельно воспроизведение этого аудио:
def text2speech(self, text: str):
        audio = self.__MODEL__.apply_tts(
            text=text,
            speaker=self.__SPEAKER__,
            sample_rate=self.__SAMPLERATE__,
            put_accent=True,
            put_yo=True
        )

# Воспроизведение
sd.play(audio, samplerate=24000)
sd.wait()

Итог: ассистент теперь и слышит, и говорит.

📌Что дальше?

В следующем посте свяжем всё вместе и добавим логику.
[2026-02-04 23:59]
#PXI_AI 
Приветствую всех, кто перешёл в этот канал после роликов про голосового ассистента! 🎙️

Сейчас проект находится на стадии активной разработки v1.0 — впереди ещё много работы по добавлению функционала и оптимизации. Хочу поделиться с вами текущими наработками.

🏗️ Архитектурная основа

Ядро приложения строится на двух ключевых компонентах:

1. Speech-to-Text (STT) — используется Vosk (о нём подробнее ниже)
2. Text-to-Speech (TTS)Silero (расскажу в отдельном посте)

Обе реализации достаточно просты, отдельное спасибо статьям с Хабра.

🔧 Ключевые моменты реализации Vosk

Инициализация модели

def __init__(self, modelpath: str = "./models/vosk-model-small-ru-0.22", 
             samplerate: int = 16000):
    self.__REC__ = vosk.KaldiRecognizer(vosk.Model(modelpath), samplerate)
    self.__Q__ = queue.Queue()
    self.__SAMPLERATE__ = samplerate

Примечание: Пока использую compact-модель (vosk-model-small-ru-0.22). Для текущих задач её достаточно, но в перспективе возможен переход на полноразмерную версию для повышения точности.

Основной метод listen()

def listen(self, executor: callable):
    with sd.RawInputStream(
        samplerate=self.__SAMPLERATE__,
        blocksize=4000,
        device=1,
        dtype='int16',
        channels=1,
        callback=self.q_callback
    ):
        while True:
            data = self.__Q__.get()
            if self.__REC__.AcceptWaveform(data):
                result = json.loads(self.__REC__.Result())["text"]
                executor(result)

Особенности:
executor — принимает вызываемый объект (функцию), которая будет обрабатывать распознанный текст
• Очередь (Queue) используется для буферизации аудиоданных
• Распознанный текст передаётся в main.py для дальнейшей обработки

Важно про callback

Callback-функция должна строго соответствовать сигнатуре, требуемой sounddevice:
def q_callback(self, indata, frames, time, status):
    # Обработка входящего аудиопотока
    self.__Q__.put(bytes(indata))

📌 Что дальше?

В следующем посте подробно расскажу про Text-to-Speech (Silero) и как он интегрирован в систему. А в видеоразборе покажу общую архитектуру проекта, взаимодействие компонентов и планы по развитию.

Вопросы и предложения по архитектуре приветствуются в комментариях!
[2026-02-04 17:10]
🚀 Добро пожаловать в Project X Intelligence (PXI)!

Привет! Можете называть меня Пиксай, и это — наша стартовая точка общего взаимодействия.

🤔 Что такое PXI?
Это открытый проект, построенный на кооперации и взаимопомощи. Моя цель — делиться с вами знаниями и готовыми проектами, чтобы помочь вам расти. И я, в свою очередь, надеюсь на вашу помощь, идеи и обратную связь.

🧠 Чем мы здесь занимаемся?
Всё, что интересно! Хотя отправная точка — искусственный интеллект и машинное обучение, PXI не ограничивается только этим. Мы будем исследовать любые темы, которые придут в голову мне или вам.

📂 Текущая структура проекта:
• PXI_AI — всё, что связано с ИИ, ML, Data Science.
• PXI_WEB — мир веб-разработки, от фронтенда до бэкенда.
• PXI_NET — всё, что связанно с сетями и не относится только к веб разработке.
• PXI_HACK — всё, что связанно с хакингом и смежными к нему областям.
• PXI_OS — погружение в мир операционных систем.
• PXI_LEARN — фундаментальные знания: основы программирования, архитектура ЭВМ и всё, что нужно для старта.
• PXI_PROJECT — готовые проекты.

💬 Как можно участвовать?
1. Предлагайте темы для исследований, статей или проектов.
2. Делитесь своим опытом и задавайте вопросы.
3. Участвуйте в обсуждениях. Ваше мнение важно для развития PXI.

👉 Давайте создавать что-то крутое вместе! Подписывайтесь, чтобы не пропустить обновления, и пишите в комментариях — с чего, по-вашему, нам стоит начать?

👩‍💻 Всё, что публикуется в канале, вы сможете найти ниже по тегам:

Навигация:
#PXI_AI — всё, что связано с ИИ, ML, Data Science.
#PXI_WEB — мир веб-разработки, от фронтенда до бэкенда.
#PXI_NET — всё, что связанно с сетями и не относится только к веб разработке.
#PXI_HACK — всё, что связанно с хакингом и смежными к нему областям.
#PXI_OS — погружение в мир операционных систем.
#PXI_LEARN — фундаментальные знания.
#PXI_PROJECT — готовые проекты.
_