diff --git a/src/PS5TrackingBot.py b/src/PS5TrackingBot.py index 20a3161..c23a2f8 100644 --- a/src/PS5TrackingBot.py +++ b/src/PS5TrackingBot.py @@ -1,33 +1,24 @@ import os -import re import time -import traceback -import requests -from selenium.common.exceptions import StaleElementReferenceException from selenium.webdriver import Chrome from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By +from utils import parse_retry, TelegramReporter, CSS + # Config # Telegram chat ID that receives update messages (could be a channel in @channel_id format) # TG_RECEIVER = 1770239825 TG_RECEIVER = '@toronto_ps5_bestbuy' # Telegram bot token TG_TOKEN = os.environ['TG_TOKEN'] +# Alert receiver telegram chat ID +ALERT_RECEIVER = -1001655384423 # Constants -CSS = By.CSS_SELECTOR AVAIL_TABLE: dict[str, bool] = {} -IGNORED = [] - - -def send_message(msg: str): - r = requests.get(f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage', - params={'chat_id': TG_RECEIVER, 'parse_mode': 'Markdown', 'text': msg}) - - if r.status_code != 200: - print('Request not OK:', r.status_code, r.text) +TG = TelegramReporter(TG_TOKEN, TG_RECEIVER, ALERT_RECEIVER) def parse_page(browser: Chrome): @@ -35,17 +26,13 @@ def parse_page(browser: Chrome): for item in browser.find_elements(By.CLASS_NAME, 'x-productListItem'): title = item.find_element(CSS, 'div[data-automation="productItemName"]').get_attribute('innerHTML') - # Ignored - if title in IGNORED: - continue - # Check availability avail = item.find_elements(CSS, 'div[data-automation="store-availability-messages"] span[data-automation="store-availability-checkmark"]') # Not available, check if it was previously available if len(avail) == 0: if title in AVAIL_TABLE: - send_message(f'Sold out: `{title}`') + TG.send(f'Sold out: `{title}`') del AVAIL_TABLE[title] continue @@ -62,8 +49,11 @@ def parse_page(browser: Chrome): # Available and meets threshold criteria, notify user AVAIL_TABLE[title] = True - send_message(f'PS5 Became Available!\n' - f'- [{title}]({link})') + TG.send(f'PS5 Became Available!\n' + f'- [{title}]({link}) ${price:.2f}') + + # Check alert + TG.alert() if __name__ == '__main__': @@ -73,23 +63,14 @@ if __name__ == '__main__': browser = Chrome(options=web_options) browser.get('https://www.bestbuy.ca/en-ca/category/ps5-consoles/17583383') - send_message('Bot started') + TG.send('Bot restarted') # parse_page(browser) # browser.close() - def parse(tries: int = 0): - try: - parse_page(browser) - except StaleElementReferenceException: - if tries < 3: - parse(tries + 1) - except Exception as e: - traceback.print_exc() - # Refresh indefinitely while True: time.sleep(5) - parse() + parse_retry(parse_page, browser) browser.refresh() time.sleep(2) diff --git a/src/StockTrackingBot.py b/src/StockTrackingBot.py index a720b19..65e9bd8 100644 --- a/src/StockTrackingBot.py +++ b/src/StockTrackingBot.py @@ -1,25 +1,30 @@ import os import re import time -import traceback import requests -from selenium.common.exceptions import StaleElementReferenceException from selenium.webdriver import Chrome from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By +from bestbuy_utils import get_availability, avail_str, get_avail_v2 +from utils import parse_retry, TelegramReporter, CSS + # Config # Price increase ratio threshold (ignore everything higher than this ratio) INCR_MAX = 0.2 +ALERT_MODELS = ['3060 ti', '3070', '3070 ti', '3080'] +ALERT_MIN_AVAILABLE = 2 + # Telegram chat ID that receives update messages (could be a channel in @channel_id format) # TG_RECEIVER = 1770239825 TG_RECEIVER = '@toronto_bestbuy_gpu' # Telegram bot token TG_TOKEN = os.environ['TG_TOKEN'] +# Alert receiver telegram chat ID +ALERT_RECEIVER = -1001655384423 # Constants -CSS = By.CSS_SELECTOR MODELS = [ ['3060 ti', 400, 132], ['3070 ti', 600, 167], @@ -34,6 +39,7 @@ USD_TO_CAD = 1.27 AVAIL_TABLE: dict[str, bool] = {} IGNORED = [] TITLE_SHORTEN = re.compile('(rtx|nvidia|geforce|edition|gddr[56]x*|video|card)', flags=re.IGNORECASE) +TG = TelegramReporter(TG_TOKEN, TG_RECEIVER, ALERT_RECEIVER) def shorten_title(title: str): @@ -43,15 +49,9 @@ def shorten_title(title: str): return short_title.strip() -def send_message(msg: str): - r = requests.get(f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage', - params={'chat_id': TG_RECEIVER, 'parse_mode': 'Markdown', 'text': msg}) - - if r.status_code != 200: - print('Request not OK:', r.status_code, r.text) - - def parse_page(browser: Chrome): + become_available = [] + # Parse page for item in browser.find_elements(By.CLASS_NAME, 'x-productListItem'): title = item.find_element(CSS, 'div[data-automation="productItemName"]').get_attribute('innerHTML') @@ -67,7 +67,7 @@ def parse_page(browser: Chrome): # Not available, check if it was previously available if len(avail) == 0: if title in AVAIL_TABLE: - send_message(f'Sold out: `{title}`') + TG.send(f'Sold out: `{title}`') del AVAIL_TABLE[title] continue @@ -103,10 +103,30 @@ def parse_page(browser: Chrome): # Available and meets threshold criteria, notify user AVAIL_TABLE[title] = True - send_message(f'{model[0].upper()} Became Available!\n' - f'\n' - f'${price:.0f} | {price_incr * 100:.0f}% Incr | Value: {value:.0f}\n' - f'- [{title}]({link})') + msg = TG.send(f'{model[0].upper()} Became Available!\n' + f'\n' + f'${price:.0f} | {price_incr * 100:.0f}% Incr | Value: {value:.0f}\n' + f'- [{title}]({link})') + become_available.append((title, model, link, msg, price, price_incr, value)) + + # Notify user + for tup in become_available: + title, model, link, msg, price, price_incr, value = tup + + # Get availability + avail = get_avail_v2(link) + + # Edit message + msg.edit_text(f'{model[0].upper()} (${price:.0f} {price_incr * 100:.0f}% {value:.0f}) Became Available!\n' + f'\n' + f'[{title}]({link})\n' + f'{avail_str(avail[:5])}\n', + parse_mode='Markdown') + + # Check alert + if model[0] in ALERT_MODELS: + if sum(a.n for a in avail) >= ALERT_MIN_AVAILABLE: + TG.alert() if __name__ == '__main__': @@ -120,18 +140,11 @@ if __name__ == '__main__': # parse_page(browser) # browser.close() - def parse(tries: int = 0): - try: - parse_page(browser) - except StaleElementReferenceException: - if tries < 3: - parse(tries + 1) - except Exception as e: - traceback.print_exc() + TG.send('Bot restarted') # Refresh indefinitely while True: time.sleep(5) - parse() + parse_retry(parse_page, browser) browser.refresh() time.sleep(2) diff --git a/src/bestbuy_utils.py b/src/bestbuy_utils.py new file mode 100644 index 0000000..924b8d9 --- /dev/null +++ b/src/bestbuy_utils.py @@ -0,0 +1,91 @@ +import math +from dataclasses import dataclass +from html import unescape +from urllib.parse import unquote + +import requests +from selenium.webdriver import Chrome +from selenium.webdriver.chrome.webdriver import WebDriver +from selenium.webdriver.common.by import By + +from utils import CSS + + +@dataclass() +class AvailableStore: + loc: str + avail: str + n: int + + +@dataclass() +class AvailableStoreV2(AvailableStore): + loc_key: str + fulfillment_key: str + + +def get_avail_v2(url: str, postal='M5H1N1') -> list[AvailableStore]: + sku = int(url.split('/')[-1]) + params = { + 'accept': 'application/vnd.bestbuy.standardproduct.v1+json', + 'accept-language': 'en-CA', + 'locations': '977|203|931|62|617|927|965|57|938|237|943|932|956|202|200|937|926|795|916|233|544|910|954|207|930|622|223|245|925|985|990|959|949|942', + 'postalCode': postal, + 'skus': sku + } + + headers = { + "accept": "*/*", + "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7", + "sec-ch-ua": "\" Not A;Brand\";v=\"99\", \"Chromium\";v=\"98\", \"Google Chrome\";v=\"98\"", + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": "\"macOS\"", + "sec-fetch-dest": "empty", + "sec-fetch-mode": "cors", + "sec-fetch-site": "same-origin", + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.109 Safari/537.36', + "referer": url, + } + + r = requests.get('https://www.bestbuy.ca/ecomm-api/availability/products', params=params, headers=headers) + o = r.json()['availabilities'][0]['pickup']['locations'] + return [AvailableStoreV2(loc['name'], str(loc['quantityOnHand']), loc['quantityOnHand'], loc['locationKey'], loc['fulfillmentKey']) + for loc in o if loc['quantityOnHand'] > 0] + + +def get_availability(browser: WebDriver, url: str) -> list[AvailableStore]: + browser.get(url) + + # Get stores element + stores = browser.find_elements(CSS, '.x-nearby-stores > *') + + # Filter stores, not 'check other stores' button + stores = [s for s in stores if not s.get_attribute('data-automation')] + + # Filter stores that are available + stores = [s for s in stores if len(s.find_elements(CSS, '[data-automation="store-availability-checkmark"]'))] + + if not stores: + return [] + + # Get availability + result = [] + for store in stores: + loc = unescape(store.find_element(CSS, '[data-automation="pickup-store-list-item-store-name"]').get_attribute('innerHTML').strip()) + avail = store.find_elements(By.XPATH, './/*[@data-automation="pickup-store-list-item-reserve-button"]/following-sibling::span/following-sibling::span') + + if not avail: + avail = 'Available' + n = 100 + else: + avail = unescape(avail[0].get_attribute('innerHTML').strip()) + n = [int(s) for s in avail.split() if s.isdigit()][0] + + result.append(AvailableStore(loc, avail, n)) + + return result + + +def avail_str(avail: list[AvailableStore]) -> str: + return '\n'.join([f'- {a.loc}: {"∞" if a.n == 100 else a.n}' for a in avail]) + diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..7573fc8 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +import os +import time +import traceback +from typing import Callable, Optional + +from selenium.common.exceptions import StaleElementReferenceException +from selenium.webdriver.chrome.webdriver import WebDriver +from selenium.webdriver.common.by import By +from telegram import Bot, Message + +CSS = By.CSS_SELECTOR + + +def parse_retry(parser: Callable, browser: WebDriver, tries: int = 0): + try: + parser(browser) + except StaleElementReferenceException: + if tries < 3: + parse_retry(parser, browser, tries + 1) + except Exception as e: + traceback.print_exc() + + +class TelegramReporter: + bot: Bot + receiver: str | int + alert_receiver: Optional[str | int] + + def __init__(self, token: str, receiver: str | int, alert_receiver: Optional[str | int]): + self.bot = Bot(token) + self.receiver = receiver + self.alert_receiver = alert_receiver + + def send(self, msg: str, rec: Optional[str | int] = None) -> Message: + """ + Send a message + + :param msg: Message string + :param rec: Receiver + :return: Edit code or None if failed + """ + if rec is None: + rec = self.receiver + + return self.bot.send_message(chat_id=rec, parse_mode='Markdown', text=msg) + + def alert(self) -> Message: + if self.alert_receiver: + return self.send('/alarm', self.alert_receiver)