검색과 추천 - (사이드 플젝) 데이터 다운로드 및 전처리 micro app 개발 (2)
데이터 다운로드 및 전처리 micro app 개발
데이터를 효과적으로 처리하기 위해서는 데이터를 다운로드하고, 압축을 해제하며, 필요한 정보를 추출해 저장하는 일련의 과정이 필요합니다. 이러한 과정은 수동으로 처리하기에는 번거롭고, 오류를 피하기도 어렵습니다. 그래서 Python을 사용해 데이터 처리 파이프라인을 자동화하고, logging
을 통해 진행 상황을 추적하는 방법을 소개해 보려 합니다.
STEP 1: 데이터를 다운로드하기
첫 번째 단계는 데이터를 다운로드하는 작업입니다. requests
라이브러리를 사용해서 웹에서 데이터를 가져오고, 이를 로컬 파일로 저장합니다. 이 단계에서 우리는 logging
을 통해 다운로드의 시작과 완료 시점을 기록하게 됩니다. 만약 다운로드 중 오류가 발생한다면, 이를 로깅하여 문제를 쉽게 추적할 수 있도록 합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def download_abo_dataset(url: str, output_path: str):
"""
Downloads the ABO dataset tar file from the specified URL and saves it to the given output path.
"""
logger.info("[STEP 1] Starting download from %s", url)
response = requests.get(url, stream=True)
if response.status_code == 200:
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
logger.info("[STEP 1] Downloaded dataset to %s", output_path)
else:
logger.error("[STEP 1] Failed to download dataset. Status code: %s", response.status_code)
raise Exception(f"Failed to download dataset. Status code: {response.status_code}")
이 함수는 주어진 URL에서 데이터를 스트리밍 방식으로 다운로드하고, 이를 지정한 파일 경로에 저장합니다. requests.get()
메서드를 사용해 데이터를 가져오고, 각 데이터를 작은 청크(chunk) 단위로 나누어 파일에 저장함으로써 큰 파일도 쉽게 처리할 수 있습니다. 다운로드 성공 여부는 logging
을 통해 기록되며, 만약 문제가 발생하면 이를 예외로 처리하여 로그 파일과 콘솔에 메시지를 남깁니다. 이렇게 하면 다운로드 실패 시 문제를 바로 확인할 수 있습니다.
STEP 2: 압축 파일을 해제하기
데이터를 다운로드한 후에는 압축을 해제해야 합니다. 여기서는 tarfile
모듈을 사용하여 .tar
형식의 파일을 풀어내고, 그 내용을 특정 폴더에 저장합니다. 이 과정 역시 logging
을 활용해 진행 상황을 기록합니다.
1
2
3
4
5
6
7
8
9
10
11
def extract_tar_file(tar_path: str, extract_to: str):
"""
Extracts the contents of a tar file to a specified directory.
"""
logger.info("[STEP 2] Extracting %s to %s", tar_path, extract_to)
if not os.path.exists(tar_path):
logger.error("[STEP 2] Tar file %s does not exist.", tar_path)
raise FileNotFoundError(f"Tar file {tar_path} does not exist.")
with tarfile.open(tar_path, 'r') as tar:
tar.extractall(path=extract_to)
logger.info("[STEP 2] Extracted tar file to %s", extract_to)
이 함수는 .tar
파일을 지정한 디렉토리로 풀어줍니다. 만약 파일이 존재하지 않으면 FileNotFoundError
를 발생시키고, 이 오류도 로그에 기록하게 됩니다. 압축 해제 과정을 로깅함으로써 작업이 제대로 수행되었는지 쉽게 추적할 수 있으며, 예상치 못한 문제 발생 시 그 원인을 파악하는 데 도움을 줍니다.
STEP 3: JSON 데이터를 로드하고 탐색하기
압축 해제 후에는 데이터를 로드하고, 이를 탐색할 수 있어야 합니다. 여기서는 gzip
모듈을 사용해 압축된 JSON 파일을 읽고, 각 파일의 데이터를 pandas
DataFrame으로 변환합니다. 그리고 데이터 샘플을 로깅하여 실제로 어떤 데이터가 로드되었는지 확인할 수 있도록 했습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def load_and_explore_gzipped_json(directory: str, sample_size: int = 5):
"""
Loads gzipped JSON files from a specified directory and combines them into a DataFrame.
"""
logger.info("[STEP 3] Loading gzipped JSON files from %s", directory)
all_data = []
json_files = [file for file in os.listdir(directory) if file.endswith('.json.gz')]
if not json_files:
logger.error("[STEP 3] No gzipped JSON files found in directory %s", directory)
raise FileNotFoundError(f"No gzipped JSON files found in directory {directory}")
for file_name in json_files:
file_path = os.path.join(directory, file_name)
logger.info("[STEP 3] Processing file: %s", file_path)
with gzip.open(file_path, 'rt', encoding='utf-8') as f:
for line in f:
try:
data = json.loads(line)
all_data.append(data)
except json.JSONDecodeError as e:
logger.error("[STEP 3] Error decoding JSON in %s: %s", file_path, e)
if not all_data:
logger.error("[STEP 3] No data loaded from the JSON files.")
raise ValueError("No data loaded from the JSON files.")
df = pd.DataFrame(all_data)
logger.info("[STEP 3] Loaded data with %d records.", len(df))
logger.info("[STEP 3] Sample data: %s", df.head(sample_size).to_dict(orient='records'))
return df
이 함수는 디렉토리 내에 있는 모든 .json.gz
파일을 찾아 데이터를 로드합니다. 각 파일을 한 줄씩 읽어 JSON 객체로 파싱한 후, 이를 리스트에 저장하고 pandas
DataFrame으로 변환합니다. 로드된 데이터의 샘플은 로그로 기록하여 데이터의 형태와 내용물을 쉽게 확인할 수 있습니다. 만약 파일이 없거나 데이터를 읽는 중 오류가 발생하면, 이에 대한 정보를 로그에 남겨 문제를 해결할 단서를 제공합니다.
STEP 4: 필요한 데이터 필드 추출 및 전처리
데이터를 로드한 후에는 필요한 정보를 추출하고, 이를 정리하는 전처리 작업이 필요합니다. 여기서는 분석에 필요한 필드만 추출하고, 텍스트 데이터를 소문자로 정규화하는 작업을 수행합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def extract_relevant_fields(df: pd.DataFrame):
"""
Extracts relevant fields from a DataFrame and preprocesses the data.
"""
logger.info("[STEP 4] Extracting relevant fields and preprocessing data.")
fields_to_keep = ['item_id', 'item_keywords', 'product_description', 'product_type', 'brand']
available_fields = [field for field in fields_to_keep if field in df.columns]
if not available_fields:
logger.error("[STEP 4] None of the specified fields are available in the DataFrame.")
raise ValueError("None of the specified fields are available in the DataFrame.")
filtered_df = df[available_fields]
for field in ['item_keywords', 'product_description', 'brand']:
if field in filtered_df.columns:
filtered_df[field] = filtered_df[field].apply(preprocess_field)
logger.info("[STEP 4] Filtered fields: %s", available_fields)
logger.info("[STEP 4] Filtered data sample: %s", filtered_df.head().to_dict(orient='records'))
return filtered_df
이 함수는 필요한 필드만 추출해 새로운 DataFrame을 생성하고, 텍스트 필드에 대해 소문자로 변환하는 전처리 작업을 수행합니다. 각 필드의 존재 여부를 검사해 데이터의 일관성을 유지하고, 전처리 과정 중 발생할 수 있는 문제를 로깅해 쉽게 추적할 수 있습니다. 필터링한 데이터의 샘플을 로그로 남겨 전처리 결과를 확인할 수 있습니다.
STEP 5: 데이터를 CSV 파일로 저장하기
마지막으로 전처리된 데이터를 CSV 파일로 저장합니다. 이 단계는 데이터 분석 또는 Elasticsearch와 같은 검색 엔진에 데이터를 인덱싱할 때 유용합니다.
1
2
3
4
5
6
7
def save_to_csv(df: pd.DataFrame, output_path: str):
"""
Saves a DataFrame to a CSV file.
"""
logger.info("[STEP 5] Saving preprocessed data to %s", output_path)
df.to_csv(output_path, index=False)
logger.info("[STEP 5] Saved preprocessed data to %s", output_path)
이 함수는 전처리된 데이터를 CSV 파일로 저장합니다. to_csv()
메서드를 사용해 DataFrame의 데이터를 지정한 경로에 저장하며, 저장 성공 여부를 logging
을 통해 기록합니다.
이렇게 하면 저장된 파일의 위치를 쉽게 파악할 수 있으며, 파일 저장 과정에서 문제가 발생했을 때 로그를 통해 그 원인을 추적할 수 있습니다.