import os
import sys
import json
import time
import datetime
import traceback
import logging
import logging.handlers

import pika
import boto3
from botocore.client import Config

###############################################
#               Logger Setting                #
###############################################
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log_fileHandler = logging.handlers.RotatingFileHandler(
    filename=os.path.join(os.getcwd(), "send_and_pub.log"),
    maxBytes=1024000,
    backupCount=3,
    mode='a')
log_fileHandler.setFormatter(formatter)
logger.addHandler(log_fileHandler)
# pika logs connection chatter at INFO; keep only warnings and above.
logging.getLogger("pika").setLevel(logging.WARNING)

################################################################################
#                                   Config                                     #
################################################################################
HOME_DIR = os.getcwd()
with open(os.path.join(HOME_DIR, 'config/send_and_pub_config.json'), 'r') as f:
    info = json.load(f)

################################################################################
#                                  S3 Set up                                   #
################################################################################
# MinIO is S3-compatible; boto3 reaches it through a custom endpoint URL.
s3 = boto3.resource('s3',
                    endpoint_url=info['Minio_url'],
                    aws_access_key_id=info['AccessKey'],
                    aws_secret_access_key=info['SecretKey'],
                    config=Config(signature_version=info['Boto3SignatureVersion']),
                    region_name=info['Boto3RegionName']
                    )


class Publisher:
    """Publish JSON messages to a RabbitMQ queue.

    Opens a fresh blocking connection per message (pub is called at most
    once per uploaded file, so connection churn is acceptable here).
    """

    def __init__(self):
        self.__url = info['amqp_url']
        self.__port = info['amqp_port']
        self.__vhost = info['amqp_vhost']
        self.__cred = pika.PlainCredentials(info['amqp_id'], info['amqp_pw'])
        self.__queue = info['amqp_queue']

    def pub(self, body: dict) -> None:
        """Publish *body* as JSON to the configured queue.

        Best-effort by design: failures are logged and swallowed so the
        upload loop in main() keeps running.
        """
        conn = None
        try:
            conn = pika.BlockingConnection(
                pika.ConnectionParameters(self.__url, self.__port,
                                          self.__vhost, self.__cred))
            chan = conn.channel()
            chan.basic_publish(exchange='',
                               routing_key=self.__queue,
                               body=json.dumps(body))
        except Exception:
            # FIX: original did print(traceback.format_exc()); route the
            # traceback into the rotating log file like every other event.
            logger.exception("Failed to publish message")
        finally:
            # FIX: original leaked the connection when basic_publish raised;
            # close it on every path.
            if conn is not None and conn.is_open:
                conn.close()


def main():
    """Poll file_save_path forever: upload each file to the S3 bucket,
    publish a notification message, and optionally delete the local copy."""
    publisher = Publisher()
    logger.info(f"Config Check {info}")
    while True:
        if not os.path.exists(info['file_save_path']):
            os.makedirs(info['file_save_path'])
            logger.info(f"Make_directory {info['file_save_path']}")

        file_list = os.listdir(info['file_save_path'])
        if not file_list:  # FIX: was len(file_list) == 0
            logger.info(f"Folder is empty! send data after {info['send_interval']} sec")
            time.sleep(info['send_interval'])
            continue

        for file_name in file_list:
            local_file_path = os.path.join(info['file_save_path'], file_name)
            # NOTE(review): os.path.join yields backslashes on Windows, which
            # become literal characters in an S3 object key — confirm this
            # runs on POSIX only, or build the key with '/' explicitly.
            remote_save_path = os.path.join(info['remote_save_path'], file_name)
            s3.Bucket(info['BucketName']).upload_file(local_file_path, remote_save_path)

            send_info = {
                "timestamp": int(time.time() * 1000),  # epoch milliseconds
                "from": local_file_path,
                "to": {"Bucket": info['BucketName'], "filename": remote_save_path},
            }
            publisher.pub(send_info)
            logger.info("Send Message")

            if info['delete_sended_file_flag']:  # FIX: was == True comparison
                os.remove(local_file_path)
                # FIX: was a bare print(); log like every other event.
                logger.info(f"Remove {local_file_path}")


if __name__ == '__main__':
    main()