Upload file to S3

The snippet below writes the DataFrame to local disk as a single Parquet part file, locates the part file Spark produced, and uploads it to S3 with boto3 without renaming it.

import boto3
import os
import glob
# Your existing S3 path variable
FILE_OUTPUT_LOCATION = "s3://outputbucket/data/"  # Example value

# Define local path
LOCAL_DIR = "/tmp/output.parquet"  # Spark writes this as a directory

# Ensure FILE_OUTPUT_LOCATION ends with '/'
if not FILE_OUTPUT_LOCATION.endswith("/"):
    FILE_OUTPUT_LOCATION += "/"

# Step 1: Write the DataFrame (df) to local disk as a single part file
df.coalesce(1).write.mode("overwrite").parquet(LOCAL_DIR)

# Step 2: Find the actual Parquet file (Spark gives it a random name)
files = glob.glob(f"{LOCAL_DIR}/part-*.parquet")  # Matches files like part-00000-xyz.parquet
if files:
    actual_file = files[0]  # Get the exact file path
    # Extract the filename from the path
    original_filename = os.path.basename(actual_file)
    # Construct the full S3 path with the original filename
    S3_FULL_PATH = FILE_OUTPUT_LOCATION + original_filename
    # Extract bucket and key for boto3
    S3_BUCKET = S3_FULL_PATH.split("/")[2]  # e.g. 'outputbucket'
    S3_KEY = "/".join(S3_FULL_PATH.split("/")[3:])  # e.g. 'data/part-00000-xyz.parquet'
    # Step 3: Upload to S3 without renaming
    s3 = boto3.client("s3")
    s3.upload_file(actual_file, S3_BUCKET, S3_KEY)
    print(f"✅ Uploaded {actual_file} to {S3_FULL_PATH}")
else:
    print("❌ No .parquet file found in the directory!")