# Innovenergy_trunk/S3ExtractingTool/extractS3data.py

import os
import csv
import subprocess
import argparse
import zipfile
import base64
import shutil  # only referenced by the commented-out cleanup in download_and_process_files
import json

def extract_timestamp(filename):
    """Parse the leading 10-digit Unix timestamp from a filename like '1748802020.json'."""
    timestamp_str = filename[:10]
    try:
        return int(timestamp_str)
    except ValueError:
        return 0

def list_files_in_range(start_timestamp, end_timestamp, sampling_stepsize, product_type, bucket_number):
    # Note: sampling_stepsize is accepted here but not used when listing files.
    if product_type == "Salimax" or product_type == "SodistoreMax":
        bucket_hash = "3e5b3069-214a-43ee-8d85-57d72000c19d"
    elif product_type == "Salidomo":
        bucket_hash = "c0436b6a-d276-4cd8-9c44-1eae86cf5d0e"
    else:
        raise ValueError("Invalid product type option. Use Salimax, Salidomo or SodistoreMax")
    # Narrow the S3 listing with the longest common prefix of the two timestamps
    common_prefix = ""
    for s_char, e_char in zip(str(start_timestamp), str(end_timestamp)):
        if s_char == e_char:
            common_prefix += s_char
        else:
            break
    s3_path = f"s3://{bucket_number}-{bucket_hash}/{common_prefix}*"
    s3cmd_command = f"s3cmd ls {s3_path}"
    print(f"Running: {s3cmd_command}")
    try:
        output = subprocess.check_output(s3cmd_command, shell=True, text=True)
        files = [line.split()[-1] for line in output.strip().split("\n") if line.strip()]
        filenames = []
        for f in files:
            name = f.split("/")[-1]             # e.g., 1748802020.json
            timestamp_str = name.split(".")[0]  # extract '1748802020'
            if not timestamp_str.isdigit():
                continue  # skip non-timestamp object names instead of aborting the scan
            if int(timestamp_str) > int(end_timestamp):
                break  # the listing is sorted, so everything after this is out of range
            filenames.append(name)
        print(filenames)
        return filenames
    except subprocess.CalledProcessError:
        print(f"No files found for prefix {common_prefix}")
        return []

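# Example of the prefix narrowing above: a start of 1748802020 and an end of
# 1748809999 share the leading digits "174880", so the listing becomes
# s3://<bucket-number>-<hash>/174880* (illustrative timestamps, not real data).
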
def get_nested_value(data, key_path):
    """Walk a parsed JSON object along key_path; return None if any key is missing."""
    try:
        for key in key_path:
            data = data[key]
        return data
    except (KeyError, TypeError):
        return None

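# Illustrative only (hypothetical record, not data from this repository):
#   get_nested_value({"Battery": {"Soc": 87.5}}, ["Battery", "Soc"])  -> 87.5
#   get_nested_value({"Battery": {"Soc": 87.5}}, ["Battery", "Temp"]) -> None
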
def process_json_files_to_csv(output_directory, json_files, keys, start_timestamp, end_timestamp, bucket_number, booleans_as_numbers):
    # Generate the output file name from all requested keys
    keypath = '_'.join(get_last_component(k) for k in keys)
    output_csv_filename = f"{keypath}_from_{start_timestamp}_to_{end_timestamp}_bucket_{bucket_number}.csv"
    with open(output_csv_filename, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        # Write header: 'time' + the last component of each key path
        header = ['time'] + [k.split('/')[-1] for k in keys]
        csv_writer.writerow(header)
        for json_file in json_files:
            file_path = os.path.join(output_directory, json_file)
            with open(file_path, 'r') as f:
                lines = f.readlines()
            # Records come in pairs: a "Timestamp;<epoch>" line followed by a JSON line
            i = 0
            while i < len(lines) - 1:
                timestamp_line = lines[i].strip()
                json_line = lines[i + 1].strip()
                i += 2
                if not timestamp_line.startswith("Timestamp;"):
                    continue
                try:
                    timestamp = int(timestamp_line.split(';')[1])
                except ValueError:
                    continue
                if not (start_timestamp <= timestamp <= end_timestamp):
                    continue
                try:
                    data = json.loads(json_line)
                except json.JSONDecodeError:
                    print(f"❌ Failed to parse JSON in {json_file}, line {i}")
                    continue
                row = [timestamp]
                for key in keys:
                    value = get_nested_value(data, key.split('/'))
                    # JSON booleans arrive as Python bools; string booleans may also occur
                    if booleans_as_numbers and isinstance(value, bool):
                        value = int(value)
                    elif booleans_as_numbers and isinstance(value, str) and value.lower() in ["true", "false"]:
                        value = 1 if value.lower() == "true" else 0
                    if value is None:
                        value = "No value provided"
                    row.append(value)
                csv_writer.writerow(row)
    print(f"✅ Extracted data saved in '{output_csv_filename}'")

def download_files(bucket_number, filenames_to_download, product_type):
    if product_type == "Salimax" or product_type == "SodistoreMax":
        bucket_hash = "3e5b3069-214a-43ee-8d85-57d72000c19d"
    elif product_type == "Salidomo":
        bucket_hash = "c0436b6a-d276-4cd8-9c44-1eae86cf5d0e"
    else:
        raise ValueError("Invalid product type option. Use Salimax, Salidomo or SodistoreMax")
    output_directory = f"S3cmdData_{bucket_number}"
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
        print(f"Directory '{output_directory}' created.")
    for filename in filenames_to_download:
        print(filename)
        local_path = os.path.join(output_directory, filename)
        if not os.path.exists(local_path):
            s3cmd_command = f"s3cmd get s3://{bucket_number}-{bucket_hash}/{filename} {output_directory}/"
            try:
                subprocess.run(s3cmd_command, shell=True, check=True)
                downloaded_files = [file for file in os.listdir(output_directory) if file.startswith(filename)]
                if not downloaded_files:
                    print(f"No matching files found for prefix '{filename}'.")
                else:
                    print(f"Files with prefix '{filename}' downloaded successfully.")
                    decompress_file(os.path.join(output_directory, filename), output_directory)
            except subprocess.CalledProcessError:
                # Skip files that fail to download and carry on with the rest
                continue
        else:
            print(f"File '{filename}' already exists locally. Skipping download.")

def decompress_file(compressed_file, output_directory):
    base_name = os.path.splitext(os.path.basename(compressed_file))[0]
    with open(compressed_file, 'rb') as file:
        compressed_data = file.read()
    # The S3 objects hold a base64-encoded zip archive; decode it first
    decoded_data = base64.b64decode(compressed_data)
    zip_path = os.path.join(output_directory, 'temp.zip')
    with open(zip_path, 'wb') as zip_file:
        zip_file.write(decoded_data)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(output_directory)
    # Rename the extracted data.json file to the original timestamp-based name
    extracted_json_path = os.path.join(output_directory, 'data.json')
    if os.path.exists(extracted_json_path):
        new_json_path = os.path.join(output_directory, f"{base_name}.json")
        os.rename(extracted_json_path, new_json_path)
        print(f"Decompressed and renamed '{compressed_file}' to '{new_json_path}'.")
    os.remove(zip_path)

def get_last_component(path):
    # Note: despite the name, this flattens the whole key path by removing slashes,
    # e.g. 'Battery/Soc' -> 'BatterySoc'; the CSV file name depends on this.
    return path.replace('/', '')

def download_and_process_files(bucket_number, start_timestamp, end_timestamp, sampling_stepsize, keys, booleans_as_numbers, exact_match, product_type):
    # Note: exact_match is accepted but not currently used in this pipeline.
    output_directory = f"S3cmdData_{bucket_number}"
    #if os.path.exists(output_directory):
    #    shutil.rmtree(output_directory)
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
        print(f"Directory '{output_directory}' created.")
    filenames_to_check = list_files_in_range(start_timestamp, end_timestamp, sampling_stepsize, product_type, bucket_number)
    # Listed names already include the .json extension, so compare them directly
    existing_files = [filename for filename in filenames_to_check if os.path.exists(os.path.join(output_directory, filename))]
    files_to_download = set(filenames_to_check) - set(existing_files)
    print(files_to_download)
    if files_to_download:
        download_files(bucket_number, files_to_download, product_type)
    json_files = [file for file in os.listdir(output_directory) if file.endswith('.json')]
    json_files.sort(key=extract_timestamp)
    process_json_files_to_csv(
        output_directory=output_directory,
        json_files=json_files,
        keys=keys,
        start_timestamp=start_timestamp,
        end_timestamp=end_timestamp,
        bucket_number=bucket_number,
        booleans_as_numbers=booleans_as_numbers
    )

def parse_keys(input_string):
    """Split a comma-separated key list into stripped key paths."""
    return [key.strip() for key in input_string.split(',')]

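# Illustrative only (hypothetical key paths):
#   parse_keys("Battery/Soc, GridMeter/Power") -> ["Battery/Soc", "GridMeter/Power"]
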
def main():
    parser = argparse.ArgumentParser(description='Download files from S3 using s3cmd and extract specific values into a CSV file.')
    parser.add_argument('start_timestamp', type=int, help='The start timestamp for the range (even number)')
    parser.add_argument('end_timestamp', type=int, help='The end timestamp for the range (even number)')
    parser.add_argument('--keys', type=parse_keys, required=True, help='The key path(s) to extract from each JSON record; a single key or a comma-separated list of slash-separated paths')
    parser.add_argument('--bucket-number', type=int, required=True, help='The number of the bucket to download from')
    parser.add_argument('--sampling_stepsize', type=int, required=False, default=1, help='The number of 2-sec intervals defining the length of the sampling interval in S3 file retrieval')
    parser.add_argument('--booleans_as_numbers', action="store_true", required=False, help='If set, booleans are converted to numbers [0/1]; otherwise booleans are kept as text [False/True]')
    parser.add_argument('--exact_match', action="store_true", required=False, help='If set, the key has to match exactly ("="); otherwise it suffices that the key is found "in" the text')
    parser.add_argument('--product_name', required=True, help='Use Salimax, Salidomo or SodistoreMax')
    args = parser.parse_args()
    start_timestamp = args.start_timestamp
    end_timestamp = args.end_timestamp
    keys = args.keys
    bucket_number = args.bucket_number
    sampling_stepsize = args.sampling_stepsize
    booleans_as_numbers = args.booleans_as_numbers
    exact_match = args.exact_match
    product_type = args.product_name
    if start_timestamp >= end_timestamp:
        print("Error: start_timestamp must be smaller than end_timestamp.")
        return
    download_and_process_files(bucket_number, start_timestamp, end_timestamp, sampling_stepsize, keys, booleans_as_numbers, exact_match, product_type)

if __name__ == "__main__":
    main()
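
# Example invocation (a sketch; the bucket number, timestamps and key paths are
# hypothetical placeholders, not values confirmed by this repository):
#
#   python extractS3data.py 1748802000 1748809200 \
#       --keys "Battery/Soc,GridMeter/Power" \
#       --bucket-number 1 --product_name Salimax --booleans_as_numbers
#
# This would write a CSV named
# "BatterySoc_GridMeterPower_from_1748802000_to_1748809200_bucket_1.csv"
# with the columns: time, Soc, Power.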