From d7455305428c20e94ae443323e99c623c743fff6 Mon Sep 17 00:00:00 2001 From: YujinChu Date: Fri, 8 Sep 2023 07:06:29 +0000 Subject: [PATCH] add shm updater --- AWX_collector.py | 402 ++++++++++++++++++++++++++++++++--------------- install.sh~ | 6 - 2 files changed, 273 insertions(+), 135 deletions(-) delete mode 100644 install.sh~ diff --git a/AWX_collector.py b/AWX_collector.py index 393518d..992e139 100644 --- a/AWX_collector.py +++ b/AWX_collector.py @@ -8,8 +8,10 @@ import serial.rs485 import subprocess import json import queue -import argparse import paho.mqtt.client as mqtt +import numpy as np +from multiprocessing import shared_memory +import argparse ''' @@ -29,7 +31,6 @@ ER1: 1초동안 판별, 판별할 동안 counting 할 예정 2V 이하라면 CNT ER2: OFF 횟수 - ''' parser = argparse.ArgumentParser() @@ -46,6 +47,33 @@ with open(f"{args.config}/connect_info.json","r") as f: info = json.load(f) +############################################################################### +# Shared memory setting # +############################################################################### + + + +with open(f"{args.config}/config.json","r") as f: + data = json.load(f) + +print(data) +print(data["1"]) + +a = [ data[str(n+1)] for n in range(64)] +a = np.array(a) + + + + +############################################################################### +# Get Data from Shared Memory # +############################################################################### + +shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name="DO-shm") +b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) +b[:] = a[:] + + ############################################################################### # MQTT Broker Setting # ############################################################################### @@ -69,80 +97,72 @@ client.connect(host=MQTT_HOST_IP,port=MQTT_PORT) client.loop(2) # timeout = 2초 -serial_dev1 = "" -serial_dev2 = "" -# chip0_lock = threading.Lock() -# chip1_lock = threading.Lock() -_serial_port1 = serial.Serial("/dev/ttyMAX1", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) -_serial_port1.rs485_mode = serial.rs485.RS485Settings() -_serial_port2 = serial.Serial("/dev/ttyMAX0", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) -_serial_port2.rs485_mode = serial.rs485.RS485Settings() -#_serial_port3 = serial.Serial("/dev/ttyMAX2", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) -#_serial_port3.rs485_mode = serial.rs485.RS485Settings() -#_serial_port4 = serial.Serial("/dev/ttyMAX3", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) -#_serial_port4.rs485_mode = serial.rs485.RS485Settings() +############################################################################### +# Serial setting # +############################################################################### + +_serial_port3 = serial.Serial("/dev/ttyMAX2", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) +_serial_port3.rs485_mode = serial.rs485.RS485Settings() +_serial_port4 = serial.Serial("/dev/ttyMAX3", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) +_serial_port4.rs485_mode = serial.rs485.RS485Settings() + -#data_queue = queue.Queue() ######################################### thread 함수 작성 ################################################ -dict_1={} - -def read_cihp0(e,port_num1,port_num2,id1,id2,id3,id4,id5,id6): - - serial_dev1 = "/dev/ttyMAX1" - serial_dev2 = "/dev/ttyMAX0" - # lock = chip1_lock - #_serial_port3 = serial.Serial("/dev/ttyMAX2", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) - #_serial_port3.rs485_mode = serial.rs485.RS485Settings() - #_serial_port4 = serial.Serial("/dev/ttyMAX3", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) - #_serial_port4.rs485_mode = serial.rs485.RS485Settings() +############################################################################### +# Serial thread # +############################################################################### +isThreadRun = True - port1_AI1 = minimalmodbus.Instrument(serial_dev1, id1, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) - port1_AI1.serial.baudrate = 115200 # baudrate +def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port_b_do_id, queue_a, queue_b): - port1_DO = minimalmodbus.Instrument(serial_dev1, id2, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) - port1_DO.serial.baudrate = 115200 # baudrate + print("index: ", chip_index) + print("a-ai: %d, a-do: %d, b-ai: %d, b-do: %d " % (port_a_ai_id, port_a_do_id, port_b_ai_id, port_b_do_id)) + if chip_index == 0: + serial_dev1 = "/dev/ttyMAX1" + serial_dev2 = "/dev/ttyMAX0" + else: + serial_dev1 = "/dev/ttyMAX2" + serial_dev2 = "/dev/ttyMAX3" - port1_AI2 = minimalmodbus.Instrument(serial_dev1, id3, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) - port1_AI2.serial.baudrate = 115200 # baudrate + print(serial_dev1) + print(serial_dev2) + _serial_port_A = serial.Serial(serial_dev1, 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) + _serial_port_A.rs485_mode = serial.rs485.RS485Settings() + _serial_port_B = serial.Serial(serial_dev2, 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) + _serial_port_B.rs485_mode = serial.rs485.RS485Settings() - port2_AI1 = minimalmodbus.Instrument(serial_dev2, id4, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) - port2_AI1.serial.baudrate = 115200 # baudrate - port2_DO = minimalmodbus.Instrument(serial_dev2, id5, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) - port2_DO.serial.baudrate = 115200 # baudrate + port_A_AI = minimalmodbus.Instrument(serial_dev1, port_a_ai_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_A_AI.serial.baudrate = 115200 # baudrate - port2_AI2 = minimalmodbus.Instrument(serial_dev2, id6, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) - port2_AI2.serial.baudrate = 115200 # baudrate + port_A_DO = minimalmodbus.Instrument(serial_dev1, port_a_do_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_A_DO.serial.baudrate = 115200 # baudrate - global read_cnt_ch0 - read_cnt_ch0 = 0 - global suc_cnt_ai - global suc_cnt_do - global suc_cnt_ai2 - global suc_cnt_do2 - suc_cnt_ai = 0 - suc_cnt_do = 0 - suc_cnt_ai2 = 0 - suc_cnt_do2 = 0 - global lists - lists = [] - while True: - e.wait() + port_B_AI = minimalmodbus.Instrument(serial_dev2, port_b_ai_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_B_AI.serial.baudrate = 115200 # baudrate + + port_B_DO = minimalmodbus.Instrument(serial_dev2, port_b_do_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_B_DO.serial.baudrate = 115200 # baudrate + + + while isThreadRun: + evt.wait() #print("read thread get even") - e.clear() - read_cnt_ch0 = read_cnt_ch0 +1 + evt.clear() + + prev_a_ai = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + prev_b_ai = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] #value list on -> do control data #추후에 do power on 할 리스트는 어디선가 읽어서 와야 해요 #do1 do2 나눠야 해요 - value_list_ON = [1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0] - # lock.acquire() - timenow = time.time() + + ''' | AI1 | AI2 | DO1 | DO2 | @@ -158,54 +178,55 @@ def read_cihp0(e,port_num1,port_num2,id1,id2,id3,id4,id5,id6): -> 데이터를 어떻게 할 것이냐 -> 이전 데이터가 있다면, 이전 데이터를 가지고 더미 데이터로 사용하는 방법 => 안하기로 했었음 ''' + timenow = int(time.time()*1000) + isSuccess = True + + # PORT A + port_a_do_value = list(b[:16]) + try: + a_ai_data=port_A_AI.read_registers(0x32, 32) + # suc_cnt_ai = suc_cnt_ai +1 + except: + isSuccess = False + + if isSuccess: + prev_a_ai = a_ai_data try: - ai1=port1_AI1.read_registers(0x32, 32) - suc_cnt_ai = suc_cnt_ai +1 + port_A_DO.write_bits(0, port_a_do_value) + # suc_cnt_do = suc_cnt_do +1 except: -# pass None - # print("Port3 AI1 Fail") - + + # PORT B + port_b_do_value = list(b[17:32]) try: - #print("try") - port1_DO.write_bits(0, value_list_ON) - suc_cnt_do = suc_cnt_do +1 - #print("do1") + b_ai_data=port_B_AI.read_registers(0x32, 32) + prev_b_ai = b_ai_data + # suc_cnt_ai = suc_cnt_ai +1 + except: + isSuccess = False + + if isSuccess: + prev_b_ai = b_ai_data + + try: + port_B_DO.write_bits(0, port_b_do_value) + # suc_cnt_do = suc_cnt_do +1 except: None - # print("Port3 DO Fail") -# try: -# ai2 = port3_AI2.read_registers(0x32, 32) -# except: -# print("Port3 AI2 Fail") - #print(ai1) - #print(ai2) + if isSuccess: + queue_a.put([timenow,prev_a_ai, port_a_do_value]) + queue_b.put([timenow,prev_b_ai, port_b_do_value]) + else: + queue_a.put([0,prev_a_ai, port_a_do_value]) + queue_b.put([0,prev_b_ai, port_b_do_value]) + - try: - global ai2 - ai2=port2_AI1.read_registers(0x32, 32) - suc_cnt_ai2 = suc_cnt_ai2 +1 - except: - pass - # print("Port4 AI1 Fail") - try: - do2 = port2_DO.write_bits(0, value_list_ON) - suc_cnt_do2 = suc_cnt_do2 +1 - except: - pass - # print("Port4 DO Fail, ") - -# try: -# ai2 = port4_AI2.read_registers(0x32, 32) -# except: -# print("Port4 AI2 Fail") - #print(ai1) - #print(ai2) # lock.release() @@ -214,12 +235,11 @@ def read_cihp0(e,port_num1,port_num2,id1,id2,id3,id4,id5,id6): ''' - ##데이터 생성 할 때 # ai1, ai2 # -> ai1 [32] -> [curr, volt, curr, volt ...] - + ''' data_for_list = { "slot" : 1, "timestamp": int(timenow*1000), @@ -248,33 +268,154 @@ def read_cihp0(e,port_num1,port_num2,id1,id2,id3,id4,id5,id6): "list": lists } } - global dict_1 + data_json_str = json.dumps(data, indent=0) publish_json_message(client, data_json_str) #print(data_json_str) # time.sleep(1) - - - # PRINT("CHip1 read count:", read_cnt) + ''' + + +############################################################################### +# data process thread # +############################################################################### + + +def data_process_thread(data_queue, ch, chamber): + error_cnt_1 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + error_cnt_2 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + data_list_for_send_1=[] + data_list_for_send_2=[] + queue_count = 0 + queue_select_count = 0 + while isThreadRun: + # 0.1s 마다 발생할 것으로 예상 + timestamp, ai, do = data_queue.get() + + #================================================== + do_temp = [] + for d in do: + do_temp.append(int(d)) + do = do_temp.copy() + #================================================== + + + #print("timestamp: {timestamp}, \n ai: {ai}, \n do: {do}" .format(timestamp=timestamp, ai=ai, do=do)) + + #ai data -> current, voltage 전환 + queue_count = queue_count + 1 + current = [] + voltage = [] + ai_index = 0 + for i in ai: + #짝수 current (확인 필요) + ai_index = ai_index +1 + if (ai_index%2) == 0: + current.append(int(i)) + else: + voltage.append(int(i)) + + + #전압값으로 error count check + voltage_cnt = 0 + for i in voltage: + if i < 2: + #count up + error_cnt_1[voltage_cnt] = error_cnt_1[voltage_cnt] +1 + if error_cnt_1[voltage_cnt] == 10: + error_cnt_1[voltage_cnt] = 0 + error_cnt_2[voltage_cnt] = error_cnt_2[voltage_cnt] + 1 + #shared memory do control modify + #ch, tube number->off + else: + # count reset + error_cnt_1[voltage_cnt] = 0 + + + + voltage_cnt = voltage_cnt + 1 + + # 로깅 기능을 넣어야 겠다...? + # 파일 하나 열어서, 데이터를 쓴다 + + # 데이터 만들어서 전달 + if timestamp > 0: + data = { + "timestamp" : int(timestamp), + "data": { + "current" : current, + "voltage" : voltage, + "do" : do, + "er1" : error_cnt_1, + "er2" : error_cnt_2 + } + } + if(queue_select_count == 0): + data_list_for_send_1.append(data) + else: + data_list_for_send_2.append(data) + + + if queue_count == 10: + queue_count = 0 + mqtt_msg = {} + if (queue_select_count == 0): + mqtt_msg = { + "modelCode":"test4", + "assetCode":"NQ-R04-TEST-003", #나중에는 config에서 셋팅되는 값, or model 값으로 변경 (asset) + "dataType":"DATA", + "data": { + "channel": ch, # 채널 입력 받자 + "chamber":chamber, #chamber 도 입력 받자 + "slot": ch, #슬롯 값은 chamber와 ch 값으로 만든다 + "list": data_list_for_send_1 + } + } + queue_select_count = 1 + data_list_for_send_2.clear() + else: + mqtt_msg = { + "modelCode":"test4", + "assetCode":"NQ-R04-TEST-003", #나중에는 config에서 셋팅되는 값, or model 값으로 변경 (asset) + "dataType":"DATA", + "data": { + "channel": ch, # 채널 입력 받자 + "chamber":chamber, #chamber 도 입력 받자 + "slot": ch, #슬롯 값은 chamber와 ch 값으로 만든다 + "list": data_list_for_send_2 + } + } + queue_select_count = 0 + data_list_for_send_1.clear() + + +# print(type(mqtt_msg['data']['list'][0]['do'][0])) + data_json_str = json.dumps(mqtt_msg, indent=4) +# print(data_json_str) +# print(data_json_str) + publish_json_message(client, data_json_str) + + ##erro cnt check routine + + + ######################################### thread 함수 작성 ################################################ -def event_trigger_thread(): - print("inpuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuut\r\n") - if input("Enter to exit(): "): - e.set() - #exit() - return +#event는 개별 생성을 해준다 th1, th2 -def timer_event_loop(e): + +def timer_event_loop(e1, e2): cnt = 0 # global read_cnt + while True: # print("event set, meanning : periodic sensing") - e.set() + e1.set() + e2.set() time.sleep(0.1) #cnt = cnt +1 @@ -315,28 +456,31 @@ def timer_event_loop(e): ################################# 작성 중 ###################################### if __name__ == '__main__': - e = threading.Event() - read_data_chip0 = threading.Thread(target=read_cihp0,args=(e,1,2,1,2,3,5,6,7)) - # read_data_chip1 = threading.Thread(target=read_cihp1,args=(e,2,3,9,10,11,13,14,15)) - # read_data_ch3 = threading.Thread(target=read_thread,args=(e,3,9,10,11)) - # read_data_ch4 = threading.Thread(target=read_thread,args=(e,4,13,14,15)) - event_trigger = threading.Thread(target=event_trigger_thread) - event_thread = threading.Thread(target=timer_event_loop, args=(e,)) -# timer = threading.Timer(3,lambda: e.set()) # 1 time + ############################################################################### + # Queue x 4 max- infinite # + ############################################################################### - # read_data_ch1.start() - # read_data_ch2.start() - # read_data_chip1.start() + ch1_queue = queue.Queue() + ch2_queue = queue.Queue() + ch3_queue = queue.Queue() + ch4_queue = queue.Queue() + + chip0_evt = threading.Event() + chip1_evt = threading.Event() + + read_data_chip0 = threading.Thread(target=cwt_thread,args=(chip0_evt, 0, 1,2, 5,6, ch1_queue, ch2_queue)) #현재 4개씩 연결되어 있어서 테스트 용 + read_data_chip1 = threading.Thread(target=cwt_thread,args=(chip1_evt, 1, 9,10, 13,14, ch3_queue, ch4_queue)) + + data_process_ch1 = threading.Thread(target=data_process_thread,args=(ch1_queue, 1, 1)) + data_process_ch2 = threading.Thread(target=data_process_thread,args=(ch2_queue, 2, 1)) + data_process_ch3 = threading.Thread(target=data_process_thread,args=(ch3_queue, 3, 1)) + data_process_ch4 = threading.Thread(target=data_process_thread,args=(ch4_queue, 4, 1)) + + event_thread = threading.Thread(target=timer_event_loop, args=(chip0_evt,chip1_evt,)) read_data_chip0.start() -# timer.start() - event_trigger.start() + read_data_chip1.start() + data_process_ch1.start() + data_process_ch2.start() + data_process_ch3.start() + data_process_ch4.start() event_thread.start() - # wait_and_timer.start() - - - -# while True: -# input("Enter to trigger the event: ") -# exit(): -# e.set() - # time.sleep(600 diff --git a/install.sh~ b/install.sh~ deleted file mode 100644 index 4a47564..0000000 --- a/install.sh~ +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash - -sudo sed -i "s/appName/$1/g" /usr/local/sdt/app/$1/AWX_collector.service -sudo cp /usr/local/sdt/app/$1/AWX_collector.service /etc/systemd/system/$1.service -sudo systemctl start $1 -sudo systemctl enable $1