"""Upload files from a local folder to S3/MinIO and publish JSON files to AMQP.

The script polls the folder named by ``file_save_path`` in the config file.
Every regular file found there is uploaded to the configured bucket; when
``send_json_flag`` is set, files ending in ``.json`` are additionally
published to the configured AMQP queue; when ``delete_sended_file_flag`` is
set, each file is removed locally after it has been handled.
"""
import os
import sys
import json
import time
import datetime
import traceback

import pika
import boto3
from botocore.client import Config

################################################################################
#                                    Config                                    #
################################################################################

with open('/home/sdt/Workspace/gseps/send_and_pub/config/send_and_pub_config.json', 'r') as f:
    info = json.load(f)

################################################################################
#                                  S3 Set up                                   #
################################################################################

s3 = boto3.resource(
    's3',
    endpoint_url=info['Minio_url'],
    aws_access_key_id=info['AccessKey'],
    aws_secret_access_key=info['SecretKey'],
    config=Config(signature_version=info['Boto3SignatureVersion']),
    region_name=info['Boto3RegionName'],
)


class Publisher:
    """Publish messages to the AMQP queue described by the loaded config."""

    def __init__(self):
        """Read the AMQP connection settings from the module-level ``info``."""
        self.__url = info['amqp_url']
        self.__port = info['amqp_port']
        self.__vhost = info['amqp_vhost']
        self.__cred = pika.PlainCredentials(info['amqp_id'], info['amqp_pw'])
        self.__queue = info['amqp_queue']

    def pub(self, body: dict):
        """Publish ``body`` (JSON-encoded) to the configured queue.

        A fresh connection is opened for every call and is closed again even
        when creating the channel or publishing fails. Any exception is
        caught and its traceback printed; nothing is raised to the caller.

        Args:
            body: Payload passed through ``json.dumps`` before publishing.
                NOTE(review): ``main`` passes the raw file text (a ``str``),
                so the message on the wire is a JSON-encoded string. This is
                kept as-is because consumers may depend on it -- confirm.

        Returns:
            None.
        """
        try:
            conn = pika.BlockingConnection(
                pika.ConnectionParameters(
                    self.__url, self.__port, self.__vhost, self.__cred
                )
            )
            try:
                chan = conn.channel()
                chan.basic_publish(
                    exchange='',
                    routing_key=self.__queue,
                    body=json.dumps(body),
                )
            finally:
                # Always release the connection; previously it leaked when
                # channel() or basic_publish() raised.
                if conn.is_open:
                    conn.close()
        except Exception:
            print(traceback.format_exc())


def main():
    """Poll the save folder forever, uploading and publishing its files.

    When the folder holds no regular files, the function prints a notice and
    sleeps for ``send_interval`` seconds before looking again.

    NOTE(review): when ``delete_sended_file_flag`` is not set, the same files
    are uploaded again on every pass without any pause -- confirm that this
    is intended.
    """
    publisher = Publisher()
    save_dir = info['file_save_path']

    while True:
        # exist_ok avoids the race between an existence check and creation.
        os.makedirs(save_dir, exist_ok=True)

        # Only regular files can be uploaded; sub-directories are ignored
        # instead of making upload_file fail.
        file_list = [
            name for name in os.listdir(save_dir)
            if os.path.isfile(os.path.join(save_dir, name))
        ]

        if not file_list:
            print(f"Folder is empty! \nsend data after {info['send_interval']} sec")
            time.sleep(info['send_interval'])
            continue

        for file_name in file_list:
            local_file_path = os.path.join(save_dir, file_name)
            print(s3.Bucket(info['BucketName']).upload_file(local_file_path, file_name))

            # Flags are compared with == True on purpose so that config
            # values keep the exact semantics they had before.
            if info['send_json_flag'] == True:
                # endswith() cannot raise for names without a dot and also
                # handles names that contain more than one dot.
                if file_name.endswith(".json"):
                    with open(local_file_path, "r") as f:
                        publisher.pub(f.read())

            if info['delete_sended_file_flag'] == True:
                os.remove(local_file_path)
                print(f"Remove {local_file_path}")


if __name__ == '__main__':
    main()