import os
import csv
import subprocess
import argparse
import zipfile
import base64
import json


def extract_timestamp(filename):
    """Return the 10-digit epoch timestamp encoded at the start of a filename, or 0."""
    timestamp_str = filename[:10]
    try:
        return int(timestamp_str)
    except ValueError:
        return 0


def list_files_in_range(start_timestamp, end_timestamp, sampling_stepsize, product_type, bucket_number):
    if product_type in ["Salimax", "SodistoreMax"]:
        bucket_hash = "3e5b3069-214a-43ee-8d85-57d72000c19d"
    elif product_type == "Salidomo":
        bucket_hash = "c0436b6a-d276-4cd8-9c44-1eae86cf5d0e"
    else:
        raise ValueError("Invalid product type option. Use Salimax, Salidomo or SodistoreMax.")

    # Narrow the s3cmd listing using the longest common prefix of the two timestamps.
    common_prefix = ""
    for s_char, e_char in zip(str(start_timestamp), str(end_timestamp)):
        if s_char == e_char:
            common_prefix += s_char
        else:
            break

    s3_path = f"s3://{bucket_number}-{bucket_hash}/{common_prefix}*"
    s3cmd_command = f"s3cmd ls {s3_path}"
    print(f"Running: {s3cmd_command}")
    try:
        output = subprocess.check_output(s3cmd_command, shell=True, text=True)
        files = [line.split()[-1] for line in output.strip().split("\n") if line.strip()]
        filenames = []
        count = 0
        for f in files:
            name = f.split("/")[-1]
            timestamp_str = name.split(".")[0]
            if timestamp_str.isdigit():
                timestamp = int(timestamp_str)
                if start_timestamp <= timestamp <= end_timestamp:
                    # Keep every sampling_stepsize-th file inside the requested range.
                    if count % sampling_stepsize == 0:
                        filenames.append(name)
                    count += 1
        print(filenames)
        return filenames
    except subprocess.CalledProcessError:
        print(f"No files found for prefix {common_prefix}")
        return []


def get_nested_value(data, key_path):
    """Walk a parsed JSON object along key_path; return None if any step is missing."""
    try:
        for key in key_path:
            data = data[key]
        return data
    except (KeyError, TypeError):
        return None


def process_json_files_to_csv(output_directory, json_files, keys, start_timestamp, end_timestamp, bucket_number, booleans_as_numbers):
    # Generate the output file name from all requested keys.
    keypath = '_'.join(get_last_component(k) for k in keys)
    output_csv_filename = f"{keypath}_from_{start_timestamp}_to_{end_timestamp}_bucket_{bucket_number}.csv"
    with open(output_csv_filename, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        # Write header: 'time' followed by the last component of each key path.
        header = ['time'] + [k.split('/')[-1] for k in keys]
        csv_writer.writerow(header)
        for json_file in json_files:
            file_path = os.path.join(output_directory, json_file)
            with open(file_path, 'r') as f:
                lines = f.readlines()
            # Records come in pairs: a "Timestamp;<epoch>" line followed by a JSON line.
            i = 0
            while i < len(lines) - 1:
                timestamp_line = lines[i].strip()
                json_line = lines[i + 1].strip()
                i += 2
                if not timestamp_line.startswith("Timestamp;"):
                    continue
                try:
                    timestamp = int(timestamp_line.split(';')[1])
                except ValueError:
                    continue
                if not (start_timestamp <= timestamp <= end_timestamp):
                    continue
                try:
                    data = json.loads(json_line)
                except json.JSONDecodeError:
                    print(f"❌ Failed to parse JSON in {json_file}, line {i}")
                    continue
                row = [timestamp]
                for key in keys:
                    value = get_nested_value(data, key.split('/'))
                    if booleans_as_numbers and isinstance(value, str) and value.lower() in ["true", "false"]:
                        value = 1 if value.lower() == "true" else 0
                    if value is None:
                        value = "No value provided"
                    row.append(value)
                csv_writer.writerow(row)
    print(f"✅ Extracted data saved in '{output_csv_filename}'")
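# The parser above expects each decompressed .json file to interleave one metadata
# line and one payload line per record. A sketch with hypothetical values (the real
# key names depend on the product's telemetry schema):
#
#   Timestamp;1700000000
#   {"Battery": {"Soc": "55.2", "Eoc": "False"}}
#   Timestamp;1700000060
#   {"Battery": {"Soc": "55.4", "Eoc": "True"}}
#
# With --keys "Battery/Soc,Battery/Eoc", each record yields one CSV row; with
# --booleans_as_numbers the "False"/"True" strings are written as 0/1.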
def download_files(bucket_number, filenames_to_download, product_type):
    if product_type in ["Salimax", "SodistoreMax"]:
        bucket_hash = "3e5b3069-214a-43ee-8d85-57d72000c19d"
    elif product_type == "Salidomo":
        bucket_hash = "c0436b6a-d276-4cd8-9c44-1eae86cf5d0e"
    else:
        raise ValueError("Invalid product type option. Use Salimax, Salidomo or SodistoreMax.")

    output_directory = f"S3cmdData_{bucket_number}"
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
        print(f"Directory '{output_directory}' created.")

    for filename in filenames_to_download:
        print(filename)
        local_path = os.path.join(output_directory, filename)
        if os.path.exists(local_path):
            print(f"File '{filename}.json' already exists locally. Skipping download.")
            continue
        s3cmd_command = f"s3cmd get s3://{bucket_number}-{bucket_hash}/{filename} {output_directory}/"
        try:
            subprocess.run(s3cmd_command, shell=True, check=True)
            downloaded_files = [file for file in os.listdir(output_directory) if file.startswith(filename)]
            if not downloaded_files:
                print(f"No matching files found for prefix '{filename}'.")
            else:
                print(f"Files with prefix '{filename}' downloaded successfully.")
                decompress_file(os.path.join(output_directory, filename), output_directory)
        except subprocess.CalledProcessError as e:
            print(f"Error downloading files: {e}")
            continue


def decompress_file(compressed_file, output_directory):
    """Decode a base64-encoded zip archive and extract its data.json under a timestamped name."""
    base_name = os.path.splitext(os.path.basename(compressed_file))[0]
    with open(compressed_file, 'rb') as file:
        compressed_data = file.read()

    # The S3 objects are base64-encoded zip archives: decode first, then unzip.
    decoded_data = base64.b64decode(compressed_data)
    zip_path = os.path.join(output_directory, 'temp.zip')
    with open(zip_path, 'wb') as zip_file:
        zip_file.write(decoded_data)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(output_directory)
    os.remove(zip_path)

    # Rename the extracted data.json to the original timestamp-based name.
    extracted_json_path = os.path.join(output_directory, 'data.json')
    if os.path.exists(extracted_json_path):
        new_json_path = os.path.join(output_directory, f"{base_name}.json")
        os.rename(extracted_json_path, new_json_path)
        print(f"Decompressed and renamed '{compressed_file}' to '{new_json_path}'.")
    else:
        print(f"No data.json found in '{compressed_file}'.")


def get_last_component(path):
    """Collapse a key path like 'a/b/c' into 'abc' for use in the output file name."""
    return path.replace('/', '')
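# For local testing without S3 access, a file in the format decompress_file()
# expects can be produced by zipping a data.json and base64-encoding the archive
# (a minimal sketch; the real uploader may differ):
#
#   import io, base64, zipfile
#   buf = io.BytesIO()
#   with zipfile.ZipFile(buf, 'w') as zf:
#       zf.writestr('data.json', 'Timestamp;1700000000\n{"Battery": {"Soc": "55.2"}}\n')
#   with open('1700000000', 'wb') as f:
#       f.write(base64.b64encode(buf.getvalue()))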
def download_and_process_files(bucket_number, start_timestamp, end_timestamp, sampling_stepsize, keys, booleans_as_numbers, product_type):
    output_directory = f"S3cmdData_{bucket_number}"
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
        print(f"Directory '{output_directory}' created.")

    filenames_to_check = list_files_in_range(start_timestamp, end_timestamp, sampling_stepsize, product_type, bucket_number)
    # Only download files whose decompressed .json counterpart is not already present.
    existing_files = [filename for filename in filenames_to_check
                      if os.path.exists(os.path.join(output_directory, f"{filename}.json"))]
    files_to_download = set(filenames_to_check) - set(existing_files)
    if files_to_download:
        download_files(bucket_number, files_to_download, product_type)

    json_files = [file for file in os.listdir(output_directory) if file.endswith('.json')]
    json_files.sort(key=extract_timestamp)
    process_json_files_to_csv(
        output_directory=output_directory,
        json_files=json_files,
        keys=keys,
        start_timestamp=start_timestamp,
        end_timestamp=end_timestamp,
        bucket_number=bucket_number,
        booleans_as_numbers=booleans_as_numbers,
    )


def parse_keys(input_string):
    """Split a comma-separated --keys argument into a list of key paths."""
    return [key.strip() for key in input_string.split(',')]


def main():
    parser = argparse.ArgumentParser(description='Download files from S3 using s3cmd and extract specific values from the decompressed JSON records into a CSV file.')
    parser.add_argument('start_timestamp', type=int, help='The start timestamp of the range (Unix epoch seconds)')
    parser.add_argument('end_timestamp', type=int, help='The end timestamp of the range (Unix epoch seconds)')
    parser.add_argument('--keys', type=parse_keys, required=True, help='The key path to extract from each record; either a single key or a comma-separated list of keys')
    parser.add_argument('--bucket-number', type=int, required=True, help='The number of the bucket to download from')
    parser.add_argument('--sampling_stepsize', type=int, default=1, help='The number of 1-minute intervals defining the length of the sampling interval for S3 file retrieval')
    parser.add_argument('--booleans_as_numbers', action='store_true', help='If set, booleans are converted to numbers [0/1]; otherwise they are kept as text [False/True]')
    parser.add_argument('--product_name', required=True, help='Use Salimax, Salidomo or SodistoreMax')
    args = parser.parse_args()

    if args.start_timestamp >= args.end_timestamp:
        print("Error: start_timestamp must be smaller than end_timestamp.")
        return

    download_and_process_files(args.bucket_number, args.start_timestamp, args.end_timestamp,
                               args.sampling_stepsize, args.keys, args.booleans_as_numbers,
                               args.product_name)


if __name__ == "__main__":
    main()
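# Example invocation (hypothetical script name, bucket number and keys):
#
#   python3 s3_extract.py 1700000000 1700003600 \
#       --keys "Battery/Soc,Battery/Eoc" \
#       --bucket-number 42 \
#       --sampling_stepsize 5 \
#       --booleans_as_numbers \
#       --product_name Salimax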