From e9cc977949d06536643a46ef7baeaf8295120655 Mon Sep 17 00:00:00 2001 From: yjchu Date: Wed, 18 Oct 2023 10:16:31 +0900 Subject: [PATCH] =?UTF-8?q?=EC=B5=9C=EC=8B=A0=20=EC=BD=94=EB=93=9C3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AWX_collector.py | 197 ++++++++++++++++++++++++----------------------- 1 file changed, 102 insertions(+), 95 deletions(-) diff --git a/AWX_collector.py b/AWX_collector.py index 3275a21..4d4854a 100644 --- a/AWX_collector.py +++ b/AWX_collector.py @@ -97,7 +97,7 @@ c = np.array(c) # Get Data from Shared Memory_DO # ############################################################################### -shm_DO = shared_memory.SharedMemory(create=True, size=a.nbytes, name="DO-shm2") +shm_DO = shared_memory.SharedMemory(create=True, size=a.nbytes, name="DO-shm") do_value = np.ndarray(a.shape, dtype=a.dtype, buffer=shm_DO.buf) do_value[:] = a[:] @@ -108,7 +108,7 @@ do_value[:] = a[:] # Get Data from Shared Memory_CORR # ############################################################################### -shm_CORR = shared_memory.SharedMemory(create=True, size=c.nbytes, name="Correction_Test2") +shm_CORR = shared_memory.SharedMemory(create=True, size=c.nbytes, name="Correction_Test") correction_value = np.ndarray(c.shape, dtype=c.dtype, buffer=shm_CORR.buf) correction_value[:] = c[:] @@ -165,17 +165,17 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port _serial_port_B.rs485_mode = serial.rs485.RS485Settings() - port_A_AI = minimalmodbus.Instrument(serial_dev1, port_a_ai_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) - port_A_AI.serial.baudrate = 115200 + port_A_AI = minimalmodbus.Instrument(serial_dev1, port_a_ai_id, debug=True, close_port_after_each_call=False) # port name, slave address (in decimal) + port_A_AI.serial.baudrate = 115200 - port_A_DO = minimalmodbus.Instrument(serial_dev1, port_a_do_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) - port_A_DO.serial.baudrate = 115200 + port_A_DO = minimalmodbus.Instrument(serial_dev1, port_a_do_id, debug=True, close_port_after_each_call=False) # port name, slave address (in decimal) + port_A_DO.serial.baudrate = 115200 - port_B_AI = minimalmodbus.Instrument(serial_dev2, port_b_ai_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) - port_B_AI.serial.baudrate = 115200 + port_B_AI = minimalmodbus.Instrument(serial_dev2, port_b_ai_id, debug=True, close_port_after_each_call=False) # port name, slave address (in decimal) + port_B_AI.serial.baudrate = 115200 - port_B_DO = minimalmodbus.Instrument(serial_dev2, port_b_do_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) - port_B_DO.serial.baudrate = 115200 + port_B_DO = minimalmodbus.Instrument(serial_dev2, port_b_do_id, debug=True, close_port_after_each_call=False) # port name, slave address (in decimal) + port_B_DO.serial.baudrate = 115200 while isThreadRun: @@ -189,8 +189,8 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port prev_a_ai = ai_array prev_b_ai = ai_array - - + + ''' | AI1 | AI2 | DO1 | DO2 | Success | 0 | 0 | x | x | @@ -205,7 +205,7 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port -> 데이터를 어떻게 할 것이냐 -> 이전 데이터가 있다면, 이전 데이터를 가지고 더미 데이터로 사용하는 방법 => 안하기로 했었음 ''' - + timenow = int(time.time()*1000) a_isSuccess = True b_isSuccess = True @@ -214,17 +214,19 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port port_a_do_value = [] for i in list(do_value[:16]): port_a_do_value.append(int(i)) - port_a_do_value = list(do_value[:16]) + #print("port a do: ", port_a_do_value) #print("do type: {}".format(type(port_a_do_value[0]))) - + try: a_ai_data=port_A_AI.read_registers(0x32, 32) prev_a_ai = a_ai_data + print("a_ai ;", a_ai_data) # suc_cnt_ai = suc_cnt_ai +1 - + except: a_isSuccess = False + print("fail") #if isSuccess: # prev_a_ai = ai_array @@ -232,8 +234,9 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port try: port_A_DO.write_bits(0, port_a_do_value) # suc_cnt_do = suc_cnt_do +1 - + except: + print("fail") None # PORT B @@ -245,9 +248,10 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port b_ai_data=port_B_AI.read_registers(0x32, 32) prev_b_ai = b_ai_data # suc_cnt_ai = suc_cnt_ai +1 - + except: b_isSuccess = False + print("fial") #if isSuccess: # prev_b_ai = ai_array @@ -255,19 +259,20 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port try: port_B_DO.write_bits(0, port_b_do_value) # suc_cnt_do = suc_cnt_do +1 - + except: None - + print("fail") + if a_isSuccess: - queue_a.put([timenow,ai_array, port_a_do_value]) + queue_a.put([timenow,a_ai_data, port_a_do_value]) else: - queue_a.put([0,ai_array, port_a_do_value]) + queue_a.put([0,prev_a_ai, port_a_do_value]) if b_isSuccess: - queue_b.put([timenow,ai_array, port_b_do_value]) + queue_b.put([timenow,b_ai_data, port_b_do_value]) else: - queue_b.put([0,ai_array, port_b_do_value]) + queue_b.put([0,prev_b_ai, port_b_do_value]) @@ -290,8 +295,8 @@ def data_process_thread(data_queue, ch, chamber): below_20 = 0 element_counts = {} current_cycle_elements = 0 - cycle_counts = 0 - + cycle_counts = 0 + while isThreadRun: # 0.1s 마다 발생할 것으로 예상 timestamp, ai, do = data_queue.get() @@ -306,10 +311,10 @@ def data_process_thread(data_queue, ch, chamber): #print("timestamp: {timestamp}, \n ai: {ai}, \n do: {do}" .format(timestamp=timestamp, ai=ai, do=do)) - - - + + + ############################################################################### # Distribute AI data, Calculate value with correction # ############################################################################### @@ -320,40 +325,42 @@ def data_process_thread(data_queue, ch, chamber): voltage = [] ai_index = 0 correction_value_array = [] - - if data_queue == ch1_queue: + + if data_queue == ch1_queue: correction_value_array = np.array(correction_value[:16]) #key_to_change = str(index+1) - elif data_queue == ch2_queue: + elif data_queue == ch2_queue: correction_value_array = np.array(correction_value[16:32]) #key_to_change = str(index+17) - elif data_queue == ch3_queue: + elif data_queue == ch3_queue: correction_value_array = np.array(correction_value[32:48]) #key_to_change = str(index+33) - elif data_queue == ch4_queue: + elif data_queue == ch4_queue: correction_value_array = np.array(correction_value[48:64]) #key_to_change = str(index+49) - + for i in ai: #짝수 current (확인 필요) ai_index = ai_index +1 + print("ai list :", ai) if (ai_index%2) == 0: current.append(int(i)) + print("current :", current) else: voltage.append(int(i)) - #print(voltage) + print("voltage :", voltage) + - calculated_voltage = voltage + np.array(correction_value_array) calculated_voltage_list = calculated_voltage.tolist() #print("be calculated: ", calculated_voltage) - + #print("do list : ", do) #print("calculated_voltage list :", calculated_voltage_list) #mult_voltage = [do[i] * calculated_voltage_list[i] for i in range(len(do))] #print("multed value :", mult_voltage) #print(type(mult_voltage)) - + ############################################################################### # HJ ORG CODE # ############################################################################### @@ -373,13 +380,13 @@ def data_process_thread(data_queue, ch, chamber): else: # count reset error_cnt_1[voltage_cnt] = 0 - + voltage_cnt = voltage_cnt + 1 print("voltage count:",voltage_cnt) ''' - - + + ############################################################################### # YJ TEST CODE # ############################################################################### @@ -393,73 +400,73 @@ def data_process_thread(data_queue, ch, chamber): element_counts = [x + 1 if y == 1 else x for x, y in zip(element_counts, do)] #element_counts[i] = element_counts.get(i,0) + 1 print(element_counts) - + #mult_element_count = [element_count[i] * do[i] for i in range(len(do))] #이건 아닌거 같다 카운트는 계속 올라갈 거고 10일 때 DO가 켜지면 또 에러 발생임 cycle_counts += 1 - + for index, count in element_counts.items(): if cycle_counts % 10 == 0: if count == 10: #print(f"Error elements: {index}") - error_cnt_1[index] = error_cnt_1[index] +1 + error_cnt_1[index] = error_cnt_1[index] +1 #error_cnt_1 = [error_cnt_1[index] +1 * do[index] for index in range(len(error_cnt_1))] mult_error_cnt_1 = [error_cnt_1[i] * do[i] for i in range(len(do))] #mult_error_cnt_1 = [x + 1 * y for x,y in zip(error_cnt_1, do)] - + print("check error_cnt_1 : ", error_cnt_1) print("check multed value to do index : ", mult_error_cnt_1) - + global do_dict - + #key_to_change = str(index+1) - if data_queue == ch1_queue: + if data_queue == ch1_queue: key_to_change = str(index+1) - elif data_queue == ch2_queue: + elif data_queue == ch2_queue: key_to_change = str(index+17) - elif data_queue == ch3_queue: + elif data_queue == ch3_queue: key_to_change = str(index+33) - elif data_queue == ch4_queue: + elif data_queue == ch4_queue: key_to_change = str(index+49) - + if key_to_change in do_dict: - do_dict[key_to_change] = "0" + do_dict[key_to_change] = "0" with open(f"{ROOT_PATH}/config.json","w") as f: json.dump(do_dict, f) - - #print("check do off from config.json: ",do_dict.values()) - + + #print("check do off from config.json: ",do_dict.values()) + element_counts[index] = 0 #error_cnt_1 = 0 #mult_error_cnt_1 = 0 else: - element_counts[index] = 0 + element_counts[index] = 0 ''' ERR1 = 0 #count = 0 #cycle_counts = 0 #element_counts = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] - + for i, value in enumerate(calculated_voltage_list): if value <= 20000 and do[i] == 1: #print(f"Updating count for index {i}") #element_counts = [x + 1 if y == 1 else x for x, y in zip(element_counts, do)] - + #element_counts[i] += 1 element_counts[i] = element_counts.get(i,0) + 1 - + #print("check do : " ,do) #print("voltage list : ", calculated_voltage_list) - + #mult_element_count = [element_count[i] * do[i] for i in range(len(do))] #이건 아닌거 같다 카운트는 계속 올라갈 거고 10일 때 DO가 켜지면 또 에러 발생임 - + #print("check do : " ,do) - #print("voltage list : ", calculated_voltage_list) + #print("voltage list : ", calculated_voltage_list) #print("element cosunts : ", element_counts,"\r\n") cycle_counts += 1 - + error_cnt_1 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] for index, count in element_counts.items(): if cycle_counts % 10 == 0: @@ -468,49 +475,49 @@ def data_process_thread(data_queue, ch, chamber): #print(f"Error elements: {index}") error_cnt_1[index] += 1 #error_cnt_1 = [error_cnt_1[index] +1 * do[index] for index in range(len(error_cnt_1))] - + #mult_error_cnt_1 = [error_cnt_1[i] * do[i] for i in range(len(do))] - + #mult_error_cnt_1 = [x + 1 * y for x,y in zip(error_cnt_1, do)] //zip 함수 사용법 - + #print("check error_cnt_1 : ", error_cnt_1) #print("check multed value to do index : ", mult_error_cnt_1) - + global do_dict - + #key_to_change = str(index+1) - if data_queue == ch1_queue: + if data_queue == ch1_queue: key_to_change = str(index+1) - elif data_queue == ch2_queue: + elif data_queue == ch2_queue: key_to_change = str(index+17) - elif data_queue == ch3_queue: + elif data_queue == ch3_queue: key_to_change = str(index+33) - elif data_queue == ch4_queue: + elif data_queue == ch4_queue: key_to_change = str(index+49) - + if key_to_change in do_dict: - do_dict[key_to_change] = "0" + do_dict[key_to_change] = "0" with open(f"{ROOT_PATH}/config.json","w") as f: json.dump(do_dict, f) - - #print("check do off from config.json: ",do_dict.values()) - + + #print("check do off from config.json: ",do_dict.values()) + element_counts[index] = 0 - + #mult_error_cnt_1 = 0 else: element_counts[index] = 0 - + # 로깅 기능을 넣어야 겠다...? # 파일 하나 열어서, 데이터를 쓴다 - - - + + + ############################################################################### # Make data form to json and Publish message # ############################################################################### - + #데이터 만들어서 전달 if timestamp > 0: #if timestamp == 0: @@ -528,16 +535,16 @@ def data_process_thread(data_queue, ch, chamber): else: data_list_for_send_2.append(data) - + ''' - ############## Error Do 데이터 잘려오는 증상 추정 원인 개선 방안 중 하나 ################ + ############## Error Do 데이터 잘려오는 증상 추정 원인 개선 방안 중 하나 ################ # -> string 만들어서 전달하자 json_obj1 = json.loads(data) if(queue_select_count == 0): data_list_for_send_1.append(data) else: data_list_for_send_2.append(data) - + ''' if queue_count == 10: @@ -551,8 +558,8 @@ def data_process_thread(data_queue, ch, chamber): "assetCode":"S0NQR0423090001", #나중에는 config에서 셋팅되는 값, or model 값으로 변경 (asset) "dataType":"DATA", "data": { - "channel": ch, # 채널 입력 �자 - "chamber":chamber, #chamber 도 입력 �자 + "channel": ch, # 채널 입력 ?자 + "chamber":chamber, #chamber 도 입력 ?자 "slot": ch, #슬롯 값은 chamber와 ch 값으로 만든다 "list": data_list_for_send_1 } @@ -565,8 +572,8 @@ def data_process_thread(data_queue, ch, chamber): "assetCode":"S0NQR0423090001", #나중에는 config에서 셋팅되는 값, or model 값으로 변경 (asset) "dataType":"DATA", "data": { - "channel": ch, # 채널 입력 �자 - "chamber":chamber, #chamber 도 입력 �자 + "channel": ch, # 채널 입력 ?자 + "chamber":chamber, #chamber 도 입력 ?자 "slot": ch, #슬롯 값은 chamber와 ch 값으로 만든다 "list": data_list_for_send_2 } @@ -578,11 +585,11 @@ def data_process_thread(data_queue, ch, chamber): # print(type(mqtt_msg['data']['list'][0]['do'][0])) data_json_str = json.dumps(mqtt_msg, indent=4) - #print(data_json_str) + print(data_json_str) publish_json_message(client, data_json_str) - + ############################################################################### @@ -620,7 +627,7 @@ if __name__ == '__main__': chip0_evt = threading.Event() chip1_evt = threading.Event() - read_data_chip0 = threading.Thread(target=cwt_thread,args=(chip0_evt, 0, 1,2, 5,6, ch1_queue, ch2_queue)) #현재 4개씩 연결되 어 있어서 테스트 용 + read_data_chip0 = threading.Thread(target=cwt_thread,args=(chip0_evt, 0, 1,2, 5,6, ch1_queue, ch2_queue)) #현재 4 개씩 연결되 어 있어서 테스트 용 read_data_chip1 = threading.Thread(target=cwt_thread,args=(chip1_evt, 1, 9,10, 13,14, ch3_queue, ch4_queue)) data_process_ch1 = threading.Thread(target=data_process_thread,args=(ch1_queue, 1, 1))