file_transfer_interval/send_and_pub.py

84 lines
2.9 KiB
Python
Executable File

import os
import sys
import json
import time
import datetime
import traceback
import pika
import boto3
from botocore.client import Config
################################################################################
# Config #
################################################################################
with open('/home/sdt/Workspace/gseps/send_and_pub/config/send_and_pub_config.json', 'r') as f:
info = json.load(f)
################################################################################
# S3 Set up #
################################################################################
s3 = boto3.resource('s3',
endpoint_url = info['Minio_url'],
aws_access_key_id=info['AccessKey'],
aws_secret_access_key=info['SecretKey'],
config=Config(signature_version=info['Boto3SignatureVersion']),
region_name=info['Boto3RegionName']
)
class Publisher:
def __init__(self):
self.__url = info['amqp_url']
self.__port = info['amqp_port']
self.__vhost = info['amqp_vhost']
self.__cred = pika.PlainCredentials(info['amqp_id'], info['amqp_pw'])
self.__queue = info['amqp_queue']
def pub(self, body: dict):
try:
conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url,
self.__port,
self.__vhost,
self.__cred))
chan = conn.channel()
chan.basic_publish(exchange='',
routing_key=self.__queue,
body=json.dumps(body))
conn.close()
return
except Exception as e:
print(traceback.format_exc())
def main():
publisher = Publisher()
while True:
if not os.path.exists(info['file_save_path']):
os.makedirs(info['file_save_path'])
file_list = os.listdir(info['file_save_path'])
if(len(file_list) == 0):
print(f"Folder is empty! send data after {info['send_interval']} sec")
time.sleep(info['send_interval'])
continue
for file_name in file_list:
local_file_path = os.path.join(info['file_save_path'],file_name)
print(s3.Bucket(info['BucketName']).upload_file(local_file_path,file_name))
if(info['send_json_flag']==True):
if(file_name.split(".")[1] == "json"):
with open(local_file_path,"r") as f:
publisher.pub(f.read())
if(info['delete_sended_file_flag']==True):
os.remove(local_file_path)
print(f"Remove {local_file_path}")
if __name__ == '__main__':
main()