[+] db
This commit is contained in:
@@ -0,0 +1,66 @@
|
||||
from peewee import Model, CharField, ForeignKeyField, IntegerField, PostgresqlDatabase
|
||||
|
||||
from utils import CONFIG
|
||||
|
||||
# Database configuration
|
||||
db = PostgresqlDatabase('tgtree', user='cat',
|
||||
password=CONFIG["db-pass"], host=CONFIG["db-host"], port=CONFIG["db-port"])
|
||||
|
||||
|
||||
class BaseModel(Model):
|
||||
class Meta:
|
||||
database = db
|
||||
|
||||
|
||||
class Channel(BaseModel):
|
||||
username = CharField(primary_key=True) # Unique username as primary key
|
||||
name = CharField() # Channel name
|
||||
parent = ForeignKeyField('self', null=True, backref='children', on_delete='CASCADE', field='username')
|
||||
height = IntegerField(default=0) # Tree height (depth)
|
||||
|
||||
|
||||
with db:
|
||||
db.create_tables([Channel])
|
||||
|
||||
|
||||
def channel_info(username: str) -> Channel | None:
|
||||
"""Fetch channel information using the username as an identifier."""
|
||||
try:
|
||||
return Channel.get(Channel.username == username)
|
||||
except Channel.DoesNotExist:
|
||||
return None
|
||||
|
||||
|
||||
def register(username: str, name: str, parent_username: str = None):
|
||||
"""Register a channel using its username and assign the correct height."""
|
||||
if parent_username:
|
||||
try:
|
||||
parent = Channel.get(Channel.username == parent_username)
|
||||
height = parent.height + 1 # Child height is parent's height + 1
|
||||
except Channel.DoesNotExist:
|
||||
raise ValueError("Parent channel does not exist.")
|
||||
else:
|
||||
parent = None
|
||||
height = 0 # Root nodes have height 0
|
||||
|
||||
Channel.create(username=username, name=name, parent=parent, height=height)
|
||||
return height
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
with db:
|
||||
# db.drop_tables([Channel])
|
||||
db.create_tables([Channel])
|
||||
|
||||
register('hykilp', '小桂桂的回忆录 📒')
|
||||
|
||||
# Test the register function
|
||||
print(register('test', 'Test Channel'))
|
||||
print(register('test2', 'Test Channel 2', 'test'))
|
||||
print(register('test3', 'Test Channel 3', 'test2'))
|
||||
|
||||
# Test the channel_info function
|
||||
print(channel_info('test'))
|
||||
print(channel_info('test2'))
|
||||
print(channel_info('test3'))
|
||||
print(channel_info('nonexistent'))
|
||||
@@ -0,0 +1,8 @@
|
||||
peewee~=3.17.9
|
||||
playhouse~=0.1.0
|
||||
uvicorn~=0.23.2
|
||||
fastapi~=0.103.2
|
||||
starlette~=0.27.0
|
||||
python-telegram-bot~=21.1.1
|
||||
hypy_utils~=1.0.27
|
||||
requests~=2.32.3
|
||||
@@ -0,0 +1,30 @@
|
||||
import hashlib
|
||||
import re
|
||||
import tomllib
|
||||
from pathlib import Path
|
||||
from typing import NamedTuple
|
||||
|
||||
|
||||
CONFIG = tomllib.loads((Path(__file__).parent / "config.toml").read_text())
|
||||
|
||||
|
||||
def gen_sha(channel: str, uid: int, parent: str):
|
||||
# 生成验证码
|
||||
sha = hashlib.sha1()
|
||||
sha.update(f"{channel}/{uid}/{parent}".encode())
|
||||
return sha.hexdigest()[:6]
|
||||
|
||||
|
||||
class PageMeta(NamedTuple):
|
||||
title: str
|
||||
description: str
|
||||
|
||||
|
||||
def extract_meta_tags(html):
|
||||
title_pattern = re.search(r'<meta property="og:title" content="(.*?)">', html)
|
||||
description_pattern = re.search(r'<meta property="og:description" content="(.*?)">', html)
|
||||
|
||||
title = title_pattern.group(1) if title_pattern else None
|
||||
description = description_pattern.group(1) if description_pattern else None
|
||||
|
||||
return PageMeta(title, description)
|
||||
Reference in New Issue
Block a user