116 lines
3.8 KiB
Python
Executable File
116 lines
3.8 KiB
Python
Executable File
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import datetime
|
|
import traceback
|
|
import logging
|
|
import logging.handlers
|
|
|
|
import pika
|
|
import boto3
|
|
from botocore.client import Config
|
|
|
|
|
|
|
|
###############################################
|
|
# Logger Setting #
|
|
###############################################
|
|
logger = logging.getLogger()
|
|
logger.setLevel(logging.INFO)
|
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
|
|
log_fileHandler = logging.handlers.RotatingFileHandler(
|
|
filename=os.path.join(os.getcwd(), "send_and_pub.log"),
|
|
maxBytes=1024000,
|
|
backupCount=3,
|
|
mode='a')
|
|
|
|
log_fileHandler.setFormatter(formatter)
|
|
logger.addHandler(log_fileHandler)
|
|
|
|
logging.getLogger("pika").setLevel(logging.WARNING)
|
|
|
|
|
|
################################################################################
|
|
# Config #
|
|
################################################################################
|
|
|
|
HOME_DIR = os.getcwd()
|
|
|
|
with open(os.path.join(HOME_DIR,'config/send_and_pub_config.json'), 'r') as f:
|
|
info = json.load(f)
|
|
|
|
################################################################################
|
|
# S3 Set up #
|
|
################################################################################
|
|
|
|
s3 = boto3.resource('s3',
|
|
endpoint_url = info['Minio_url'],
|
|
aws_access_key_id=info['AccessKey'],
|
|
aws_secret_access_key=info['SecretKey'],
|
|
config=Config(signature_version=info['Boto3SignatureVersion']),
|
|
region_name=info['Boto3RegionName']
|
|
)
|
|
|
|
|
|
class Publisher:
|
|
def __init__(self):
|
|
self.__url = info['amqp_url']
|
|
self.__port = info['amqp_port']
|
|
self.__vhost = info['amqp_vhost']
|
|
self.__cred = pika.PlainCredentials(info['amqp_id'], info['amqp_pw'])
|
|
self.__queue = info['amqp_queue']
|
|
|
|
|
|
def pub(self, body: dict):
|
|
try:
|
|
conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url,
|
|
self.__port,
|
|
self.__vhost,
|
|
self.__cred))
|
|
chan = conn.channel()
|
|
chan.basic_publish(exchange='',
|
|
routing_key=self.__queue,
|
|
body=json.dumps(body))
|
|
conn.close()
|
|
return
|
|
except Exception as e:
|
|
print(traceback.format_exc())
|
|
|
|
|
|
|
|
|
|
def main():
|
|
publisher = Publisher()
|
|
logger.info(f"Config Check {info}")
|
|
while True:
|
|
if not os.path.exists(info['file_save_path']):
|
|
os.makedirs(info['file_save_path'])
|
|
logger.info(f"Make_directory {info['file_save_path']}")
|
|
|
|
file_list = os.listdir(info['file_save_path'])
|
|
if(len(file_list) == 0):
|
|
logger.info(f"Folder is empty! send data after {info['send_interval']} sec")
|
|
time.sleep(info['send_interval'])
|
|
continue
|
|
|
|
for file_name in file_list:
|
|
local_file_path = os.path.join(info['file_save_path'],file_name)
|
|
remote_save_path = os.path.join(info['remote_save_path'],file_name)
|
|
s3.Bucket(info['BucketName']).upload_file(local_file_path,remote_save_path)
|
|
send_info={
|
|
"timestamp":int(time.time()*1000),
|
|
"from":local_file_path,
|
|
"to": {"Bucket":info['BucketName'],"filename":remote_save_path}
|
|
}
|
|
publisher.pub(send_info)
|
|
logger.info("Send Message")
|
|
if(info['delete_sended_file_flag']==True):
|
|
os.remove(local_file_path)
|
|
print(f"Remove {local_file_path}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|