167 lines
5.6 KiB
Python
167 lines
5.6 KiB
Python
from peewee import Model, CharField, ForeignKeyField, IntegerField, BigIntegerField, BooleanField, CompositeKey, PostgresqlDatabase
|
|
|
|
from utils import CONFIG
|
|
|
|
# Database configuration
|
|
db = PostgresqlDatabase('tgtree', user='cat',
|
|
password=CONFIG["db-pass"], host=CONFIG["db-host"], port=CONFIG["db-port"])
|
|
|
|
|
|
class BaseModel(Model):
|
|
class Meta:
|
|
database = db
|
|
|
|
|
|
class Channel(BaseModel):
|
|
username = CharField(primary_key=True) # Unique username as primary key
|
|
name = CharField() # Channel name
|
|
parent = ForeignKeyField('self', null=True, backref='children', on_delete='CASCADE', field='username')
|
|
height = IntegerField(default=0) # Tree height (depth)
|
|
owner_id = BigIntegerField(null=True) # Telegram user ID of the channel owner
|
|
|
|
|
|
class Vote(BaseModel):
|
|
user_id = BigIntegerField() # Telegram user ID
|
|
channel = ForeignKeyField(Channel, backref='votes', on_delete='CASCADE', field='username')
|
|
|
|
class Meta:
|
|
primary_key = CompositeKey('user_id', 'channel')
|
|
|
|
|
|
class Block(BaseModel):
|
|
user_id = BigIntegerField() # Blocked Telegram user ID
|
|
channel = ForeignKeyField(Channel, backref='blocks', on_delete='CASCADE', field='username')
|
|
|
|
class Meta:
|
|
primary_key = CompositeKey('user_id', 'channel')
|
|
|
|
|
|
class OwnerPref(BaseModel):
|
|
user_id = BigIntegerField(primary_key=True) # Owner's Telegram user ID
|
|
treehole_optout = BooleanField(default=False) # Whether the owner opted out of tree hole messages
|
|
treehole_notified = BooleanField(default=False) # Whether the owner has been sent the intro notice
|
|
|
|
|
|
with db:
|
|
db.create_tables([Channel, Vote, Block, OwnerPref])
|
|
|
|
|
|
def channel_info(username: str) -> Channel | None:
|
|
"""Fetch channel information using the username as an identifier."""
|
|
try:
|
|
return Channel.get(Channel.username == username)
|
|
except Channel.DoesNotExist:
|
|
return None
|
|
|
|
|
|
def register(username: str, name: str, parent_username: str = None, owner_id: int = None):
|
|
"""Register a channel using its username and assign the correct height."""
|
|
if parent_username:
|
|
try:
|
|
parent = Channel.get(Channel.username == parent_username)
|
|
height = parent.height + 1 # Child height is parent's height + 1
|
|
except Channel.DoesNotExist:
|
|
raise ValueError("Parent channel does not exist.")
|
|
else:
|
|
parent = None
|
|
height = 0 # Root nodes have height 0
|
|
|
|
Channel.create(username=username, name=name, parent=parent, height=height, owner_id=owner_id)
|
|
return height
|
|
|
|
|
|
def add_vote(user_id: int, channel_username: str) -> bool:
|
|
"""Add a vote for a channel. Returns True if the vote was added, False if already voted."""
|
|
try:
|
|
Vote.create(user_id=user_id, channel=channel_username)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def get_votes(channel_username: str) -> int:
|
|
"""Get the total number of votes for a channel."""
|
|
return Vote.select().where(Vote.channel == channel_username).count()
|
|
|
|
|
|
def has_voted(user_id: int, channel_username: str) -> bool:
|
|
"""Check if a user has already voted for a channel."""
|
|
return Vote.select().where(
|
|
(Vote.user_id == user_id) & (Vote.channel == channel_username)
|
|
).exists()
|
|
|
|
|
|
def get_channel_owner(channel_username: str) -> int | None:
|
|
"""Get the owner's Telegram user ID for a channel."""
|
|
try:
|
|
ch = Channel.get(Channel.username == channel_username)
|
|
return ch.owner_id
|
|
except Channel.DoesNotExist:
|
|
return None
|
|
|
|
|
|
def block_user(user_id: int, channel_username: str) -> bool:
|
|
"""Block a user from sending tree hole messages to a channel. Returns True if newly blocked."""
|
|
try:
|
|
Block.create(user_id=user_id, channel=channel_username)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def is_blocked(user_id: int, channel_username: str) -> bool:
|
|
"""Check if a user is blocked from sending tree hole messages to a channel."""
|
|
return Block.select().where(
|
|
(Block.user_id == user_id) & (Block.channel == channel_username)
|
|
).exists()
|
|
|
|
|
|
def is_treehole_opted_out(owner_id: int) -> bool:
|
|
"""Check if an owner has opted out of receiving tree hole messages."""
|
|
try:
|
|
pref = OwnerPref.get(OwnerPref.user_id == owner_id)
|
|
return pref.treehole_optout
|
|
except OwnerPref.DoesNotExist:
|
|
return False
|
|
|
|
|
|
def set_treehole_optout(owner_id: int, opted_out: bool):
|
|
"""Set whether an owner has opted out of tree hole messages."""
|
|
pref, _ = OwnerPref.get_or_create(user_id=owner_id)
|
|
pref.treehole_optout = opted_out
|
|
pref.save()
|
|
|
|
|
|
def is_treehole_notified(owner_id: int) -> bool:
|
|
"""Check if the owner has received the first-time tree hole intro notice."""
|
|
try:
|
|
pref = OwnerPref.get(OwnerPref.user_id == owner_id)
|
|
return pref.treehole_notified
|
|
except OwnerPref.DoesNotExist:
|
|
return False
|
|
|
|
|
|
def set_treehole_notified(owner_id: int):
|
|
"""Mark the owner as having received the tree hole intro notice."""
|
|
pref, _ = OwnerPref.get_or_create(user_id=owner_id)
|
|
pref.treehole_notified = True
|
|
pref.save()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
with db:
|
|
# db.drop_tables([Channel])
|
|
db.create_tables([Channel])
|
|
|
|
register('hykilp', '小桂桂的回忆录 📒')
|
|
|
|
# Test the register function
|
|
print(register('test', 'Test Channel'))
|
|
print(register('test2', 'Test Channel 2', 'test'))
|
|
print(register('test3', 'Test Channel 3', 'test2'))
|
|
|
|
# Test the channel_info function
|
|
print(channel_info('test'))
|
|
print(channel_info('test2'))
|
|
print(channel_info('test3'))
|
|
print(channel_info('nonexistent')) |