From 41b7df709036d8b281afcd512924d7079d7d0ad2 Mon Sep 17 00:00:00 2001 From: Hykilpikonna Date: Wed, 24 Nov 2021 10:24:51 -0500 Subject: [PATCH] [O] Make directories constant --- src/process/twitter_process.py | 43 +++++++++------------------- src/process/twitter_visualization.py | 8 ++---- src/raw_collect/twitter.py | 30 +++++++------------ src/utils.py | 22 -------------- 4 files changed, 26 insertions(+), 77 deletions(-) diff --git a/src/process/twitter_process.py b/src/process/twitter_process.py index 5178c98..63e2245 100644 --- a/src/process/twitter_process.py +++ b/src/process/twitter_process.py @@ -7,6 +7,7 @@ from dataclasses import dataclass from py7zr import SevenZipFile +from main import data_dir, tweets_dir, user_dir from utils import * @@ -30,7 +31,7 @@ class ProcessedUser(NamedTuple): lang: str -def process_users(user_dir: str = './data/twitter/user/') -> None: +def process_users() -> None: """ After downloading a wide range of users using download_users_start in raw_collect/twitter.py, this function will read the user files, extract only relevant information defined in the @@ -38,11 +39,8 @@ def process_users(user_dir: str = './data/twitter/user/') -> None: This function will save the processed user data to /processed/users.json - :param user_dir: Download directory of users data, should be the same as the downloads dir in - download_user_start. (Default: "./data/twitter/user/") :return: None """ - user_dir = normalize_directory(user_dir) users = [] # Loop through all the files @@ -74,27 +72,23 @@ def process_users(user_dir: str = './data/twitter/user/') -> None: write(f'{user_dir}/processed/users.json', json_stringify(users)) -def load_users(user_dir: str = './data/twitter/user/') -> list[ProcessedUser]: +def load_users() -> list[ProcessedUser]: """ Load processed user data after process_users - :param user_dir: Download directory of users data, should be the same as the downloads dir in - download_user_start. (Default: "./data/twitter/user/") :return: List of processed users, sorted descending by popularity. """ - user_dir = normalize_directory(user_dir) return [ProcessedUser(*u) for u in json.loads(read(f'{user_dir}/processed/users.json'))] -def get_user_popularity_ranking(user: str, user_dir: str = './data/twitter/user/') -> int: +def get_user_popularity_ranking(user: str) -> int: """ Get a user's popularity ranking. This is not used in data analysis, just for curiosity. :param user: Username - :param user_dir: Download directory :return: User's popularity ranking """ - pop = load_users(user_dir) + pop = load_users() for i in range(len(pop)): if pop[i].username == user: return i + 1 @@ -110,7 +104,7 @@ class Sample: random: list[ProcessedUser] -def select_user_sample(user_dir: str = './data/twitter/user/') -> None: +def select_user_sample() -> None: """ Select our sample of 500 most popular users and 500 random users who meet the criteria. The criteria we use is that the user must have at least 150 followers, and must have a number of @@ -120,10 +114,8 @@ def select_user_sample(user_dir: str = './data/twitter/user/') -> None: The result will be stored in /processed/sample.json - :param user_dir: Download directory for users :return: None """ - user_dir = normalize_directory(user_dir) file = f'{user_dir}/processed/sample.json' # Exists @@ -133,7 +125,7 @@ def select_user_sample(user_dir: str = './data/twitter/user/') -> None: return # Load users - users = load_users(user_dir) + users = load_users() # Filter by language first users = [u for u in users if u.lang is not None and @@ -154,15 +146,12 @@ def select_user_sample(user_dir: str = './data/twitter/user/') -> None: write(file, json_stringify(Sample(most_popular, sample))) -def load_user_sample(user_dir: str = './data/twitter/user/') -> Sample: +def load_user_sample() -> Sample: """ Load the selected sample - :param user_dir: Download directory for users :return: None """ - user_dir = normalize_directory(user_dir) - j = json.loads(read(f'{user_dir}/processed/sample.json')) return Sample([ProcessedUser(*u) for u in j['most_popular']], [ProcessedUser(*u) for u in j['random']]) @@ -183,7 +172,7 @@ class Posting(NamedTuple): date: datetime -def process_tweets(tweets_dir: str = './data/twitter/user-tweets/') -> None: +def process_tweets() -> None: """ Process tweets, reduce the tweets data to only a few fields defined in the Posting class. These include whether or not the tweet is covid-related, how popular is the tweet, if it is a repost, @@ -191,13 +180,10 @@ def process_tweets(tweets_dir: str = './data/twitter/user-tweets/') -> None: If a user's tweets is already processed, this function will skip over that user's data. - This function will save the processed tweets data to /processed/.json + This function will save the processed tweets data to /processed/.json - :param tweets_dir: Raw tweets directory (Default: './data/twitter/user-tweets/') - :return: + :return: None """ - tweets_dir = normalize_directory(tweets_dir) - # Loop through all the files for filename in os.listdir(f'{tweets_dir}/user'): # Only check json files and ignore macos dot files @@ -219,11 +205,10 @@ def process_tweets(tweets_dir: str = './data/twitter/user-tweets/') -> None: debug(f'Processed: {filename}') -def load_tweets(tweets_dir: str, username: str) -> list[Posting]: +def load_tweets(username: str) -> list[Posting]: """ Load tweets for a specific user - :param tweets_dir: Tweets directory :param username: User's screen name :return: User's processed tweets """ @@ -257,14 +242,12 @@ def is_covid_related(text: str) -> bool: return any(k in text.lower() for k in keywords) -def pack_data(data_dir: str = './data/') -> None: +def pack_data() -> None: """ This function packs processed data and raw data separately. - :param data_dir: Root directory of all data. :return: None """ - data_dir = normalize_directory(data_dir) packed_dir = f'{data_dir}/packed' Path(packed_dir).mkdir(parents=True, exist_ok=True) diff --git a/src/process/twitter_visualization.py b/src/process/twitter_visualization.py index 7427228..37f233c 100644 --- a/src/process/twitter_visualization.py +++ b/src/process/twitter_visualization.py @@ -8,8 +8,7 @@ from process.twitter_process import * def view_covid_tweets_freq(users: list[ProcessedUser], - sample_name: str, - tweets_dir: str = './data/twitter/user-tweets/') -> None: + sample_name: str) -> None: """ Visualize the frequency that the sampled users post about COVID. For example, someone who posted every single tweet about COVID will have a frequency of 1, and someone who doesn't @@ -17,16 +16,13 @@ def view_covid_tweets_freq(users: list[ProcessedUser], :param users: Sample users :param sample_name: Name of the sample - :param tweets_dir: Data dir for tweets :return: None """ - tweets_dir = normalize_directory(tweets_dir) - # Load tweets, and get the frequency of covid tweets for each user user_frequency = [] for u in users: # Load processed tweet - tweets = load_tweets(tweets_dir, u.username) + tweets = load_tweets(u.username) # Get the frequency of COVID-related tweets freq = len([1 for t in tweets if t.covid_related]) / len(tweets) user_frequency.append((u.username, freq)) diff --git a/src/raw_collect/twitter.py b/src/raw_collect/twitter.py index cec5463..c5b95d9 100644 --- a/src/raw_collect/twitter.py +++ b/src/raw_collect/twitter.py @@ -9,6 +9,7 @@ from typing import List import tweepy from tweepy import API, TooManyRequests, User, Tweet, Unauthorized +from main import tweets_dir, user_dir from utils import * @@ -42,7 +43,6 @@ def get_tweets(api: API, name: str, rate_delay: float, max_id: Union[int, None]) def download_all_tweets(api: API, screen_name: str, - base_dir: str = './data/twitter/user-tweets/', download_if_exists: bool = False) -> None: """ Download all tweets from a specific individual to a local folder. @@ -61,13 +61,11 @@ def download_all_tweets(api: API, screen_name: str, :param api: Tweepy API object :param screen_name: Screen name of that individual - :param base_dir: The downloads folder (Default: "./data/twitter/user-tweets/") :param download_if_exists: Whether or not to download if it already exists (Default: False) :return: None """ # Ensure directories exist - base_dir = normalize_directory(base_dir) + '/user' - file = f'{base_dir}/{screen_name}.json' + file = f'{tweets_dir}/user/{screen_name}.json' # Check if user already exists if os.path.isfile(file): @@ -116,8 +114,7 @@ def download_all_tweets(api: API, screen_name: str, write(file, json_stringify([t._json for t in tweets])) -def download_users_start(api: API, start_point: str, n: float = math.inf, - base_dir: str = './data/twitter/user/') -> None: +def download_users_start(api: API, start_point: str, n: float = math.inf) -> None: """ This function downloads n twitter users by using a friends-chain. @@ -155,7 +152,6 @@ def download_users_start(api: API, start_point: str, n: float = math.inf, :param api: Tweepy's API object :param start_point: Starting user's screen name. :param n: How many users do you want to download? (Default: math.inf) - :param base_dir: The downloads folder (Default: "./data/twitter/user/") :return: None """ @@ -172,28 +168,27 @@ def download_users_start(api: API, start_point: str, n: float = math.inf, next_set = set() # Start download - download_users_execute(api, n, base_dir, downloaded, + download_users_execute(api, n, downloaded, done_set, current_set, next_set) -def download_users_resume_progress(api: API, base_dir: str = './data/twitter/user/') -> None: +def download_users_resume_progress(api: API) -> None: """ Resume from started progress :param api: Tweepy's API object - :param base_dir: The downloads folder :return: None """ # Open file and read - meta = json.loads(read(f'{base_dir}/meta/meta.json')) + meta = json.loads(read(f'{user_dir}/meta/meta.json')) # Resume - download_users_execute(api, meta['n'], base_dir, + download_users_execute(api, meta['n'], set(meta['downloaded']), set(meta['done_set']), set(meta['current_set']), set(meta['next_set'])) -def download_users_execute(api: API, n: float, base_dir: str, +def download_users_execute(api: API, n: float, downloaded: set[str], done_set: set[str], current_set: set[str], next_set: set[str]) -> None: """ @@ -207,15 +202,12 @@ def download_users_execute(api: API, n: float, base_dir: str, :param api: Tweepy's API object :param n: How many users do you want to download? - :param base_dir: The downloads folder :param downloaded: Set of all the downloaded users' screen names :param done_set: The set of starting users that are queried :param current_set: The set of starting users currently looping through :param next_set: The next set of starting users :return: None """ - base_dir = normalize_directory(base_dir) - # Rate limit for this API endpoint is 1 request per minute, and rate delay defines how many # seconds to sleep for each request. rate_delay = calculate_rate_delay(1) + 1 @@ -223,7 +215,7 @@ def download_users_execute(api: API, n: float, base_dir: str, print("Executing friends-chain download:") print(f"- n: {n}") print(f"- Requests per minute: 1") - print(f"- Directory: {base_dir}") + print(f"- Directory: {user_dir}") print(f"- Downloaded: {len(downloaded)}") print(f"- Current search set: {len(current_set)}") print(f"- Next search set: {len(next_set)}") @@ -249,7 +241,7 @@ def download_users_execute(api: API, n: float, base_dir: str, # This user was not saved, save the user. if user not in downloaded: # Save user json - write(f'{base_dir}/users/{user.screen_name}.json', json_stringify(user._json)) + write(f'{user_dir}/users/{user.screen_name}.json', json_stringify(user._json)) # Add to set downloaded.add(user.screen_name) @@ -289,7 +281,7 @@ def download_users_execute(api: API, n: float, base_dir: str, # Update meta info so that downloading can be continued meta = {'downloaded': downloaded, 'done_set': done_set, 'current_set': current_set, 'next_set': next_set, 'n': n} - write(f'{base_dir}/meta/meta.json', json_stringify(meta)) + write(f'{user_dir}/meta/meta.json', json_stringify(meta)) debug(f'Finished saving friends of {screen_name}') debug(f'============= Total {len(downloaded)} saved =============') diff --git a/src/utils.py b/src/utils.py index 6bc277f..304d199 100644 --- a/src/utils.py +++ b/src/utils.py @@ -59,28 +59,6 @@ def debug(msg: object) -> None: print(f'[DEBUG] {caller}: {msg}') -def normalize_directory(directory: str) -> str: - """ - Normalize a directory input: Ensure that the directory doesn't end with "/", and ensure that an - empty directory input will be relative (".") - - >>> normalize_directory('') - '.' - >>> normalize_directory('path/') - 'path' - >>> normalize_directory('path') - 'path' - - :param directory: Input directory - :return: Normalized directory - """ - if directory == '': - directory = '.' - if directory.endswith('/'): - directory = directory[:-1] - return directory - - def calculate_rate_delay(rate_limit: float) -> float: """ Calculate the rate delay for each request given rate limit in request per minute