[+] 树洞?

This commit is contained in:
2026-03-12 01:03:47 -04:00
parent 8af1661b44
commit b420252a40
2 changed files with 238 additions and 6 deletions
+162 -2
View File
@@ -1,4 +1,5 @@
import re
import time
import tomllib
import urllib.parse
from pathlib import Path
@@ -10,7 +11,7 @@ from hypy_utils.logging_utils import setup_logger
from starlette.responses import HTMLResponse
from starlette.staticfiles import StaticFiles
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters
import db
import utils
@@ -29,6 +30,14 @@ channels_dir = ensure_dir(data_dir / "channels")
validating = set()
# State tracking for tree hole conversations
# user_id -> {"action": "treehole"|"reply", "channel": str, "sender_id": int (for reply)}
user_states: dict[int, dict] = {}
# Rate limiting for tree hole messages: user_id -> last send timestamp
treehole_rate_limit: dict[int, float] = {}
TREEHOLE_COOLDOWN = 30 # seconds
def user_info(update: Update):
return (f"{update.message.from_user.id} {update.message.from_user.username or ''} "
@@ -87,13 +96,14 @@ async def plant(update: Update, context: ContextTypes.DEFAULT_TYPE):
info = utils.extract_meta_tags(text)
if sha in text:
logger.info(f"> 🌿 Registering channel {channel} with parent {parent}.")
height = db.register(channel, info.title, parent)
height = db.register(channel, info.title, parent, owner_id=uid)
await update.message.reply_text(f"""频道 {channel} 上树成功!把下面这条转发到频道里吧~""".strip())
url_enc = urllib.parse.quote_plus(f"https://tree.aza.moe/c/{channel}")
leaf_text = urllib.parse.quote(f"/leaf {channel} ")
leaf_btn = InlineKeyboardMarkup([
[InlineKeyboardButton("🌿 成为树叶", url=f"https://t.me/{BOT_NAME}?text={leaf_text}")],
[InlineKeyboardButton("💧 浇水", callback_data=f"water:{channel}")],
[InlineKeyboardButton("🕳️ 树洞", callback_data=f"th:{channel}")],
])
return await update.message.reply_html(f"""
今天是植树节,想试试和大家一起种一颗 tgcn 频道树 🌳 qwq
@@ -145,10 +155,160 @@ async def water_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
await query.answer(f"你已经浇过水了哦~ 这个树枝已经被浇了 {votes} 次水~", show_alert=False)
async def treehole_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle the 树洞 button press — initiate anonymous messaging."""
query = update.callback_query
data = query.data
if not data.startswith("th:"):
return
channel = data[3:]
user_id = query.from_user.id
logger.info(f"🕳️ Tree hole from {user_id} {query.from_user.username or ''} for {channel}")
# Check rate limit
last_time = treehole_rate_limit.get(user_id, 0)
if time.time() - last_time < TREEHOLE_COOLDOWN:
remaining = int(TREEHOLE_COOLDOWN - (time.time() - last_time))
return await query.answer(f"发送太频繁了,请 {remaining} 秒后再试~", show_alert=True)
# Check if blocked
if db.is_blocked(user_id, channel):
return await query.answer("你已经被这个频道的主人屏蔽了哦~", show_alert=True)
# Check if channel has an owner
owner_id = db.get_channel_owner(channel)
if not owner_id:
return await query.answer("这个频道还没有设置主人哦~", show_alert=False)
# Try to DM the user
try:
await context.bot.send_message(
chat_id=user_id,
text=f"🕳️ 树洞模式\n\n想对频道 @{channel} 的主人说什么呢?(发送文字消息即可,消息将会匿名发送)"
)
user_states[user_id] = {"action": "treehole", "channel": channel}
await query.answer("请查看 bot 的私聊~", show_alert=False)
except Exception:
await query.answer("请先私聊 bot 发送 /start 才能使用树洞功能哦~", show_alert=True)
async def reply_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle the channel owner clicking reply to a tree hole message."""
query = update.callback_query
data = query.data
# Format: reply:{sender_id}:{channel}
parts = data.split(":", 2)
if len(parts) != 3:
return
_, sender_id_str, channel = parts
sender_id = int(sender_id_str)
owner_id = query.from_user.id
# Verify the person clicking is the channel owner
actual_owner = db.get_channel_owner(channel)
if actual_owner != owner_id:
return await query.answer("只有频道主人才能回复哦~", show_alert=False)
user_states[owner_id] = {"action": "reply", "sender_id": sender_id, "channel": channel}
await query.answer()
await context.bot.send_message(
chat_id=owner_id,
text="💬 回复模式\n\n请输入你想回复的内容(发送文字消息即可)"
)
async def block_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle the channel owner clicking block on a tree hole message sender."""
query = update.callback_query
data = query.data
# Format: block:{sender_id}:{channel}
parts = data.split(":", 2)
if len(parts) != 3:
return
_, sender_id_str, channel = parts
sender_id = int(sender_id_str)
owner_id = query.from_user.id
# Verify the person clicking is the channel owner
actual_owner = db.get_channel_owner(channel)
if actual_owner != owner_id:
return await query.answer("只有频道主人才能屏蔽哦~", show_alert=False)
if db.block_user(sender_id, channel):
await query.answer("✅ 已屏蔽该发送者", show_alert=True)
else:
await query.answer("该发送者已经被屏蔽了", show_alert=False)
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle text messages for tree hole composing and owner replies."""
if update.message.chat.type != "private":
return
user_id = update.message.from_user.id
state = user_states.get(user_id)
if not state:
return
if state["action"] == "treehole":
channel = state["channel"]
del user_states[user_id]
# Update rate limit
treehole_rate_limit[user_id] = time.time()
# Get channel owner
owner_id = db.get_channel_owner(channel)
if not owner_id:
return await update.message.reply_text("这个频道还没有设置主人哦~")
# Send to owner anonymously
reply_btn = InlineKeyboardMarkup([
[InlineKeyboardButton("💬 回复", callback_data=f"reply:{user_id}:{channel}")],
[InlineKeyboardButton("🚫 屏蔽发送者", callback_data=f"block:{user_id}:{channel}")],
])
try:
await context.bot.send_message(
chat_id=owner_id,
text=f"🕳️ 树洞消息\n\n有人匿名对你的频道 @{channel} 说:\n\n{update.message.text}",
reply_markup=reply_btn
)
await update.message.reply_text("✅ 消息已匿名发送~")
except Exception:
await update.message.reply_text("发送失败了... 频道主人可能还没有启用 bot")
elif state["action"] == "reply":
sender_id = state["sender_id"]
channel = state["channel"]
del user_states[user_id]
try:
await context.bot.send_message(
chat_id=sender_id,
text=f"💬 频道 @{channel} 的主人回复了你的树洞消息:\n\n{update.message.text}"
)
await update.message.reply_text("✅ 回复已发送~")
except Exception:
await update.message.reply_text("回复发送失败了...")
# Add handlers
bot.add_handler(CommandHandler("start", start))
bot.add_handler(CommandHandler("leaf", plant))
bot.add_handler(CallbackQueryHandler(water_callback, pattern=r"^water:"))
bot.add_handler(CallbackQueryHandler(treehole_callback, pattern=r"^th:"))
bot.add_handler(CallbackQueryHandler(reply_callback, pattern=r"^reply:"))
bot.add_handler(CallbackQueryHandler(block_callback, pattern=r"^block:"))
bot.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
@app.on_event("startup")
+76 -4
View File
@@ -1,4 +1,4 @@
from peewee import Model, CharField, ForeignKeyField, IntegerField, BigIntegerField, CompositeKey, PostgresqlDatabase
from peewee import Model, CharField, ForeignKeyField, IntegerField, BigIntegerField, BooleanField, CompositeKey, PostgresqlDatabase
from utils import CONFIG
@@ -17,6 +17,7 @@ class Channel(BaseModel):
name = CharField() # Channel name
parent = ForeignKeyField('self', null=True, backref='children', on_delete='CASCADE', field='username')
height = IntegerField(default=0) # Tree height (depth)
owner_id = BigIntegerField(null=True) # Telegram user ID of the channel owner
class Vote(BaseModel):
@@ -27,8 +28,22 @@ class Vote(BaseModel):
primary_key = CompositeKey('user_id', 'channel')
class Block(BaseModel):
user_id = BigIntegerField() # Blocked Telegram user ID
channel = ForeignKeyField(Channel, backref='blocks', on_delete='CASCADE', field='username')
class Meta:
primary_key = CompositeKey('user_id', 'channel')
class OwnerPref(BaseModel):
user_id = BigIntegerField(primary_key=True) # Owner's Telegram user ID
treehole_optout = BooleanField(default=False) # Whether the owner opted out of tree hole messages
treehole_notified = BooleanField(default=False) # Whether the owner has been sent the intro notice
with db:
db.create_tables([Channel, Vote])
db.create_tables([Channel, Vote, Block, OwnerPref])
def channel_info(username: str) -> Channel | None:
@@ -39,7 +54,7 @@ def channel_info(username: str) -> Channel | None:
return None
def register(username: str, name: str, parent_username: str = None):
def register(username: str, name: str, parent_username: str = None, owner_id: int = None):
"""Register a channel using its username and assign the correct height."""
if parent_username:
try:
@@ -51,7 +66,7 @@ def register(username: str, name: str, parent_username: str = None):
parent = None
height = 0 # Root nodes have height 0
Channel.create(username=username, name=name, parent=parent, height=height)
Channel.create(username=username, name=name, parent=parent, height=height, owner_id=owner_id)
return height
@@ -76,6 +91,63 @@ def has_voted(user_id: int, channel_username: str) -> bool:
).exists()
def get_channel_owner(channel_username: str) -> int | None:
"""Get the owner's Telegram user ID for a channel."""
try:
ch = Channel.get(Channel.username == channel_username)
return ch.owner_id
except Channel.DoesNotExist:
return None
def block_user(user_id: int, channel_username: str) -> bool:
"""Block a user from sending tree hole messages to a channel. Returns True if newly blocked."""
try:
Block.create(user_id=user_id, channel=channel_username)
return True
except Exception:
return False
def is_blocked(user_id: int, channel_username: str) -> bool:
"""Check if a user is blocked from sending tree hole messages to a channel."""
return Block.select().where(
(Block.user_id == user_id) & (Block.channel == channel_username)
).exists()
def is_treehole_opted_out(owner_id: int) -> bool:
"""Check if an owner has opted out of receiving tree hole messages."""
try:
pref = OwnerPref.get(OwnerPref.user_id == owner_id)
return pref.treehole_optout
except OwnerPref.DoesNotExist:
return False
def set_treehole_optout(owner_id: int, opted_out: bool):
"""Set whether an owner has opted out of tree hole messages."""
pref, _ = OwnerPref.get_or_create(user_id=owner_id)
pref.treehole_optout = opted_out
pref.save()
def is_treehole_notified(owner_id: int) -> bool:
"""Check if the owner has received the first-time tree hole intro notice."""
try:
pref = OwnerPref.get(OwnerPref.user_id == owner_id)
return pref.treehole_notified
except OwnerPref.DoesNotExist:
return False
def set_treehole_notified(owner_id: int):
"""Mark the owner as having received the tree hole intro notice."""
pref, _ = OwnerPref.get_or_create(user_id=owner_id)
pref.treehole_notified = True
pref.save()
if __name__ == '__main__':
with db:
# db.drop_tables([Channel])