file_transfer_interval/send_and_pub.py

116 lines
3.8 KiB
Python
Executable File

import os
import sys
import json
import time
import datetime
import traceback
import logging
import logging.handlers
import pika
import boto3
from botocore.client import Config
###############################################
# Logger Setting #
###############################################
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log_fileHandler = logging.handlers.RotatingFileHandler(
filename=os.path.join(os.getcwd(), "send_and_pub.log"),
maxBytes=1024000,
backupCount=3,
mode='a')
log_fileHandler.setFormatter(formatter)
logger.addHandler(log_fileHandler)
logging.getLogger("pika").setLevel(logging.WARNING)
################################################################################
# Config #
################################################################################
HOME_DIR = os.getcwd()
with open(os.path.join(HOME_DIR,'config/send_and_pub_config.json'), 'r') as f:
info = json.load(f)
################################################################################
# S3 Set up #
################################################################################
s3 = boto3.resource('s3',
endpoint_url = info['Minio_url'],
aws_access_key_id=info['AccessKey'],
aws_secret_access_key=info['SecretKey'],
config=Config(signature_version=info['Boto3SignatureVersion']),
region_name=info['Boto3RegionName']
)
class Publisher:
def __init__(self):
self.__url = info['amqp_url']
self.__port = info['amqp_port']
self.__vhost = info['amqp_vhost']
self.__cred = pika.PlainCredentials(info['amqp_id'], info['amqp_pw'])
self.__queue = info['amqp_queue']
def pub(self, body: dict):
try:
conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url,
self.__port,
self.__vhost,
self.__cred))
chan = conn.channel()
chan.basic_publish(exchange='',
routing_key=self.__queue,
body=json.dumps(body))
conn.close()
return
except Exception as e:
print(traceback.format_exc())
def main():
publisher = Publisher()
logger.info(f"Config Check {info}")
while True:
if not os.path.exists(info['file_save_path']):
os.makedirs(info['file_save_path'])
logger.info(f"Make_directory {info['file_save_path']}")
file_list = os.listdir(info['file_save_path'])
if(len(file_list) == 0):
logger.info(f"Folder is empty! send data after {info['send_interval']} sec")
time.sleep(info['send_interval'])
continue
for file_name in file_list:
local_file_path = os.path.join(info['file_save_path'],file_name)
remote_save_path = os.path.join(info['remote_save_path'],file_name)
s3.Bucket(info['BucketName']).upload_file(local_file_path,remote_save_path)
send_info={
"timestamp":int(time.time()*1000),
"from":local_file_path,
"to": {"Bucket":info['BucketName'],"filename":remote_save_path}
}
publisher.pub(send_info)
logger.info("Send Message")
if(info['delete_sended_file_flag']==True):
os.remove(local_file_path)
print(f"Remove {local_file_path}")
if __name__ == '__main__':
main()