[O] Make directories constant

This commit is contained in:
Hykilpikonna
2021-11-24 10:24:51 -05:00
parent 20e9805c6c
commit 41b7df7090
4 changed files with 26 additions and 77 deletions
+13 -30
View File
@@ -7,6 +7,7 @@ from dataclasses import dataclass
from py7zr import SevenZipFile
from main import data_dir, tweets_dir, user_dir
from utils import *
@@ -30,7 +31,7 @@ class ProcessedUser(NamedTuple):
lang: str
def process_users(user_dir: str = './data/twitter/user/') -> None:
def process_users() -> None:
"""
After downloading a wide range of users using download_users_start in raw_collect/twitter.py,
this function will read the user files, extract only relevant information defined in the
@@ -38,11 +39,8 @@ def process_users(user_dir: str = './data/twitter/user/') -> None:
This function will save the processed user data to <user_dir>/processed/users.json
:param user_dir: Download directory of users data, should be the same as the downloads dir in
download_user_start. (Default: "./data/twitter/user/")
:return: None
"""
user_dir = normalize_directory(user_dir)
users = []
# Loop through all the files
@@ -74,27 +72,23 @@ def process_users(user_dir: str = './data/twitter/user/') -> None:
write(f'{user_dir}/processed/users.json', json_stringify(users))
def load_users(user_dir: str = './data/twitter/user/') -> list[ProcessedUser]:
def load_users() -> list[ProcessedUser]:
"""
Load processed user data after process_users
:param user_dir: Download directory of users data, should be the same as the downloads dir in
download_user_start. (Default: "./data/twitter/user/")
:return: List of processed users, sorted descending by popularity.
"""
user_dir = normalize_directory(user_dir)
return [ProcessedUser(*u) for u in json.loads(read(f'{user_dir}/processed/users.json'))]
def get_user_popularity_ranking(user: str, user_dir: str = './data/twitter/user/') -> int:
def get_user_popularity_ranking(user: str) -> int:
"""
Get a user's popularity ranking. This is not used in data analysis, just for curiosity.
:param user: Username
:param user_dir: Download directory
:return: User's popularity ranking
"""
pop = load_users(user_dir)
pop = load_users()
for i in range(len(pop)):
if pop[i].username == user:
return i + 1
@@ -110,7 +104,7 @@ class Sample:
random: list[ProcessedUser]
def select_user_sample(user_dir: str = './data/twitter/user/') -> None:
def select_user_sample() -> None:
"""
Select our sample of 500 most popular users and 500 random users who meet the criteria. The
criteria we use is that the user must have at least 150 followers, and must have a number of
@@ -120,10 +114,8 @@ def select_user_sample(user_dir: str = './data/twitter/user/') -> None:
The result will be stored in <user_dir>/processed/sample.json
:param user_dir: Download directory for users
:return: None
"""
user_dir = normalize_directory(user_dir)
file = f'{user_dir}/processed/sample.json'
# Exists
@@ -133,7 +125,7 @@ def select_user_sample(user_dir: str = './data/twitter/user/') -> None:
return
# Load users
users = load_users(user_dir)
users = load_users()
# Filter by language first
users = [u for u in users if u.lang is not None and
@@ -154,15 +146,12 @@ def select_user_sample(user_dir: str = './data/twitter/user/') -> None:
write(file, json_stringify(Sample(most_popular, sample)))
def load_user_sample(user_dir: str = './data/twitter/user/') -> Sample:
def load_user_sample() -> Sample:
"""
Load the selected sample
:param user_dir: Download directory for users
:return: None
"""
user_dir = normalize_directory(user_dir)
j = json.loads(read(f'{user_dir}/processed/sample.json'))
return Sample([ProcessedUser(*u) for u in j['most_popular']],
[ProcessedUser(*u) for u in j['random']])
@@ -183,7 +172,7 @@ class Posting(NamedTuple):
date: datetime
def process_tweets(tweets_dir: str = './data/twitter/user-tweets/') -> None:
def process_tweets() -> None:
"""
Process tweets, reduce the tweets data to only a few fields defined in the Posting class. These
include whether or not the tweet is covid-related, how popular is the tweet, if it is a repost,
@@ -191,13 +180,10 @@ def process_tweets(tweets_dir: str = './data/twitter/user-tweets/') -> None:
If a user's tweets is already processed, this function will skip over that user's data.
This function will save the processed tweets data to <user_dir>/processed/<username>.json
This function will save the processed tweets data to <tweets_dir>/processed/<username>.json
:param tweets_dir: Raw tweets directory (Default: './data/twitter/user-tweets/')
:return:
:return: None
"""
tweets_dir = normalize_directory(tweets_dir)
# Loop through all the files
for filename in os.listdir(f'{tweets_dir}/user'):
# Only check json files and ignore macos dot files
@@ -219,11 +205,10 @@ def process_tweets(tweets_dir: str = './data/twitter/user-tweets/') -> None:
debug(f'Processed: {filename}')
def load_tweets(tweets_dir: str, username: str) -> list[Posting]:
def load_tweets(username: str) -> list[Posting]:
"""
Load tweets for a specific user
:param tweets_dir: Tweets directory
:param username: User's screen name
:return: User's processed tweets
"""
@@ -257,14 +242,12 @@ def is_covid_related(text: str) -> bool:
return any(k in text.lower() for k in keywords)
def pack_data(data_dir: str = './data/') -> None:
def pack_data() -> None:
"""
This function packs processed data and raw data separately.
:param data_dir: Root directory of all data.
:return: None
"""
data_dir = normalize_directory(data_dir)
packed_dir = f'{data_dir}/packed'
Path(packed_dir).mkdir(parents=True, exist_ok=True)
+2 -6
View File
@@ -8,8 +8,7 @@ from process.twitter_process import *
def view_covid_tweets_freq(users: list[ProcessedUser],
sample_name: str,
tweets_dir: str = './data/twitter/user-tweets/') -> None:
sample_name: str) -> None:
"""
Visualize the frequency that the sampled users post about COVID. For example, someone who
posted every single tweet about COVID will have a frequency of 1, and someone who doesn't
@@ -17,16 +16,13 @@ def view_covid_tweets_freq(users: list[ProcessedUser],
:param users: Sample users
:param sample_name: Name of the sample
:param tweets_dir: Data dir for tweets
:return: None
"""
tweets_dir = normalize_directory(tweets_dir)
# Load tweets, and get the frequency of covid tweets for each user
user_frequency = []
for u in users:
# Load processed tweet
tweets = load_tweets(tweets_dir, u.username)
tweets = load_tweets(u.username)
# Get the frequency of COVID-related tweets
freq = len([1 for t in tweets if t.covid_related]) / len(tweets)
user_frequency.append((u.username, freq))
+11 -19
View File
@@ -9,6 +9,7 @@ from typing import List
import tweepy
from tweepy import API, TooManyRequests, User, Tweet, Unauthorized
from main import tweets_dir, user_dir
from utils import *
@@ -42,7 +43,6 @@ def get_tweets(api: API, name: str, rate_delay: float, max_id: Union[int, None])
def download_all_tweets(api: API, screen_name: str,
base_dir: str = './data/twitter/user-tweets/',
download_if_exists: bool = False) -> None:
"""
Download all tweets from a specific individual to a local folder.
@@ -61,13 +61,11 @@ def download_all_tweets(api: API, screen_name: str,
:param api: Tweepy API object
:param screen_name: Screen name of that individual
:param base_dir: The downloads folder (Default: "./data/twitter/user-tweets/")
:param download_if_exists: Whether or not to download if it already exists (Default: False)
:return: None
"""
# Ensure directories exist
base_dir = normalize_directory(base_dir) + '/user'
file = f'{base_dir}/{screen_name}.json'
file = f'{tweets_dir}/user/{screen_name}.json'
# Check if user already exists
if os.path.isfile(file):
@@ -116,8 +114,7 @@ def download_all_tweets(api: API, screen_name: str,
write(file, json_stringify([t._json for t in tweets]))
def download_users_start(api: API, start_point: str, n: float = math.inf,
base_dir: str = './data/twitter/user/') -> None:
def download_users_start(api: API, start_point: str, n: float = math.inf) -> None:
"""
This function downloads n twitter users by using a friends-chain.
@@ -155,7 +152,6 @@ def download_users_start(api: API, start_point: str, n: float = math.inf,
:param api: Tweepy's API object
:param start_point: Starting user's screen name.
:param n: How many users do you want to download? (Default: math.inf)
:param base_dir: The downloads folder (Default: "./data/twitter/user/")
:return: None
"""
@@ -172,28 +168,27 @@ def download_users_start(api: API, start_point: str, n: float = math.inf,
next_set = set()
# Start download
download_users_execute(api, n, base_dir, downloaded,
download_users_execute(api, n, downloaded,
done_set, current_set, next_set)
def download_users_resume_progress(api: API, base_dir: str = './data/twitter/user/') -> None:
def download_users_resume_progress(api: API) -> None:
"""
Resume from started progress
:param api: Tweepy's API object
:param base_dir: The downloads folder
:return: None
"""
# Open file and read
meta = json.loads(read(f'{base_dir}/meta/meta.json'))
meta = json.loads(read(f'{user_dir}/meta/meta.json'))
# Resume
download_users_execute(api, meta['n'], base_dir,
download_users_execute(api, meta['n'],
set(meta['downloaded']), set(meta['done_set']),
set(meta['current_set']), set(meta['next_set']))
def download_users_execute(api: API, n: float, base_dir: str,
def download_users_execute(api: API, n: float,
downloaded: set[str], done_set: set[str],
current_set: set[str], next_set: set[str]) -> None:
"""
@@ -207,15 +202,12 @@ def download_users_execute(api: API, n: float, base_dir: str,
:param api: Tweepy's API object
:param n: How many users do you want to download?
:param base_dir: The downloads folder
:param downloaded: Set of all the downloaded users' screen names
:param done_set: The set of starting users that are queried
:param current_set: The set of starting users currently looping through
:param next_set: The next set of starting users
:return: None
"""
base_dir = normalize_directory(base_dir)
# Rate limit for this API endpoint is 1 request per minute, and rate delay defines how many
# seconds to sleep for each request.
rate_delay = calculate_rate_delay(1) + 1
@@ -223,7 +215,7 @@ def download_users_execute(api: API, n: float, base_dir: str,
print("Executing friends-chain download:")
print(f"- n: {n}")
print(f"- Requests per minute: 1")
print(f"- Directory: {base_dir}")
print(f"- Directory: {user_dir}")
print(f"- Downloaded: {len(downloaded)}")
print(f"- Current search set: {len(current_set)}")
print(f"- Next search set: {len(next_set)}")
@@ -249,7 +241,7 @@ def download_users_execute(api: API, n: float, base_dir: str,
# This user was not saved, save the user.
if user not in downloaded:
# Save user json
write(f'{base_dir}/users/{user.screen_name}.json', json_stringify(user._json))
write(f'{user_dir}/users/{user.screen_name}.json', json_stringify(user._json))
# Add to set
downloaded.add(user.screen_name)
@@ -289,7 +281,7 @@ def download_users_execute(api: API, n: float, base_dir: str,
# Update meta info so that downloading can be continued
meta = {'downloaded': downloaded, 'done_set': done_set,
'current_set': current_set, 'next_set': next_set, 'n': n}
write(f'{base_dir}/meta/meta.json', json_stringify(meta))
write(f'{user_dir}/meta/meta.json', json_stringify(meta))
debug(f'Finished saving friends of {screen_name}')
debug(f'============= Total {len(downloaded)} saved =============')
-22
View File
@@ -59,28 +59,6 @@ def debug(msg: object) -> None:
print(f'[DEBUG] {caller}: {msg}')
def normalize_directory(directory: str) -> str:
    """
    Normalize a directory input: ensure that the directory doesn't end with "/", and ensure that
    an empty directory input will be relative (".")

    Every trailing "/" is removed, not only the last one, so the result can be joined with
    f'{directory}/name' without producing a doubled separator.

    >>> normalize_directory('')
    '.'
    >>> normalize_directory('path/')
    'path'
    >>> normalize_directory('path')
    'path'
    >>> normalize_directory('path//')
    'path'

    :param directory: Input directory
    :return: Normalized directory
    """
    # Only a literally empty input means "current directory". This check runs before stripping,
    # so '/' still normalizes to '' and f'{directory}/name' remains an absolute path.
    if directory == '':
        return '.'

    # rstrip removes the whole run of trailing slashes (the previous slice removed just one).
    return directory.rstrip('/')
def calculate_rate_delay(rate_limit: float) -> float:
"""
Calculate the rate delay for each request given rate limit in request per minute