From a2f6eb97b6cd34dbce307e2e40b50286fa8967d3 Mon Sep 17 00:00:00 2001 From: ssung Date: Sat, 9 Sep 2023 12:29:53 +0900 Subject: [PATCH] gseps_test_version --- config/inference_config.json | 8 +- inference.py | 139 ++++++++++++++++++++++------------- 2 files changed, 91 insertions(+), 56 deletions(-) diff --git a/config/inference_config.json b/config/inference_config.json index 89ca767..e1ebd77 100644 --- a/config/inference_config.json +++ b/config/inference_config.json @@ -1,6 +1,6 @@ { "slack_url": "https://hooks.slack.com/services/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", - "slack_message": "Queue is Empty! Restart after 100 second", + "slack_message": "Queue is Empty! Restart after 250 second", "amqp_url": "13.209.39.139", "amqp_port": 30747, "amqp_vhost": "/", @@ -9,6 +9,7 @@ "amqp_TaskQ": "gseps-mq", "amqp_ResultQ": "gseps-ResultQ", "amqp_message_expire_time": "300000", + "amqp_Q_check_interval": 250, "Minio_url": "http://13.209.39.139:31191", "AccessKey":"VV2gooVNevRAIg7HrXQr", "SecretKey":"epJmFWxwfzUUgYeyDqLa8ouitHZaWTwAvPfPNUBL", @@ -20,7 +21,7 @@ "points_per_side": 36, "pred_iou_thresh": 0.86, "stability_score_thresh": 0.9, - "crop_n_layers": 1, + "crop_n_layers": 2, "crop_n_points_downscale_factor": 1, "box_nms_thresh": 0.8, "min_mask_region_area": 10, @@ -32,5 +33,6 @@ "remote_server_id": "sdt", "remote_server_pw": "251327", "copied_image_path_from_remote_server": "/home/sdt/Workspace/gseps/inference/image_bucket", - "inference_result_path": "/home/sdt/Workspace/gseps/inference/result/" + "inference_result_path": "/home/sdt/Workspace/gseps/inference/result/", + "n_bins_for_histogram":20 } diff --git a/inference.py b/inference.py index 2f4b7fe..11cfc25 100644 --- a/inference.py +++ b/inference.py @@ -55,8 +55,8 @@ s3 = boto3.resource('s3', ) -def send_message_to_slack(): - data = {"text": info['slack_message']} +def send_message_to_slack(message): + data = {"text": message} req = requests.post( url=info['slack_url'], data=json.dumps(data) @@ -149,6 +149,7 @@ class Consumer: box_nms_thresh=self.cfg['box_nms_thresh'], min_mask_region_area=self.cfg['min_mask_region_area']) end = time.time() + logger.info(f"Initialize {str(info['model_config'])}") print(f'Initialize time: {(end - start) // 60}m {(end - start) % 60:4f}s') def image_upload_to_ncp(self, path): @@ -209,63 +210,95 @@ class Consumer: save_path = info['inference_result_path'] while True: - method, properties, body = chan.basic_get(queue=self.__TaskQ, - auto_ack=True) + try: + method, properties, body = chan.basic_get(queue=self.__TaskQ, + auto_ack=True) + + amqp_message_properties = pika.BasicProperties(expiration=info['amqp_message_expire_time']) + + # if Queue is empty + if not method: + send_message_to_slack("Empty Queue") + logger.info(f"Empty Queue sleep for {info['amqp_Q_check_interval']}") + time.sleep(info['amqp_Q_check_interval']) + + if method: + logger.info(f" [x] Received {body}") + Task_data = json.loads(body) + if(Task_data.get("to")): + download_path = os.path.join(os.getcwd(),info['download_data_path'],Task_data['to']['filename']) + s3.Bucket(Task_data['to']['bucket']).download_file(Task_data['to']['filename'],download_path) + else: + logger.info("Check Message Data. key 'to' is missing") + continue + + # read image file + image = cv2.imread(download_path) + + # get file name + image_name = download_path.split('/')[-1].split('.')[0] + + # run inference + result_image, count, sizes = self.inference(image) - amqp_message_properties = pika.BasicProperties(expiration=info['amqp_message_expire_time']) - - # if Queue is empty - if not method: - send_message_to_slack() - logger.info("Empty Queue sleep for 100s") - time.sleep(100) - - if method: - logger.info(f" [x] Received {body}") - Task_data = json.loads(body) - if(Task_data.get("to")): - download_path = os.path.join(os.getcwd(),info['download_data_path'],Task_data['to']['filename']) - s3.Bucket(Task_data['to']['Bucket']).download_file(Task_data['to']['filename'],download_path) + # delete original file + os.remove(download_path) + logger.info(f" len(sizes) : {len(sizes)}") + + if(len(sizes) < 30): + logger.info("PASS") + continue + + + # save reulst image + result_filename = f'result_{image_name}.jpg' + plt.imsave(os.path.join(save_path, result_filename), result_image) + + # calculation for histogram + fig, axs = plt.subplots(1,1) + n_bins = info['n_bins_for_histogram'] + xy = axs.hist(sizes, bins=n_bins) + print(xy) + y = xy[0].astype(np.int32) + x = xy[1] + hist_y = y.tolist() + hist_x = x.tolist() + str_x = [] + for single_x in hist_x: + str_x.append(str(single_x)) + hist_x = str_x.copy() + # message contents set-up + Task_data['type']="inference_result" + Task_data['result']={ + 'timestamp':int(time.time()*1000), + 'count':count, + 'sizes':sizes, + 'filename':result_filename, + 'hist':{ + 'x':hist_x, + 'y':hist_y + } + } + Task_data['from']=Task_data['to'] + Task_data.pop('to',None) + + # send message to AMQP Result Queue + self.result_publish(chan, amqp_message_properties,Task_data) + time.sleep(1) else: - logger.info("Check Message Data. key 'to' is missing") - continue - - # read image file - image = cv2.imread(download_path) - - # get file name - image_name = download_path.split('/')[-1].split('.')[0] - - # run inference - result_image, count, sizes = self.inference(image) - - # delete original file - os.remove(download_path) - - # save reulst image - result_filename = f'result_{image_name}.jpg' - plt.imsave(os.path.join(save_path, result_filename), result_image) - - # message contents set-up - Task_data['Type']="inference_result" - Task_data['result']={ - 'timestamp':int(time.time()*1000), - 'count':count, - 'sizes':sizes, - 'filename':result_filename - } - Task_data['from']=Task_data['to'] - Task_data.pop('to',None) - - # send message to AMQP Result Queue - self.result_publish(chan, amqp_message_properties,Task_data) - time.sleep(1) - else: - time.sleep(0.5) + time.sleep(0.5) + except Exception as e: + print(traceback.format_exc()) + send_message_to_slack(f"who : inference_server // error {str(e)}") + logger.error(str(e)) + continue + except Exception as e: print(e) print(traceback.format_exc()) + send_message_to_slack(f"who : inference_server // error {str(e)}") + logger.error(str(e)) conn.close()