diff --git a/AWX_collector.py b/AWX_collector.py index c2bc9f0..3a24acb 100644 --- a/AWX_collector.py +++ b/AWX_collector.py @@ -13,7 +13,7 @@ import paho.mqtt.client as mqtt import numpy as np from multiprocessing import shared_memory import argparse - +import random ''' 전달해야할 데이터 들 @@ -26,8 +26,8 @@ tube 번호: 1 - 128 , chamber / slot 상태: 정상이냐, 초기화, 불량 -ER1: 1초동안 판별, 판별할 동안 counting 할 예정 2V 이하라면 CNT UP - if) er: 10이 되었음 (1초 유지) -> 0으로 변환, ER2 cnt up +ER1: 1초동안 판별, 판별할 동안 counting 할 예정 2V 이하라면 CNT UP + if) er: 10이 되었음 (1초 유지) -> 0으로 변환, ER2 cnt up if) 2v 이상인 경우가 발생했다 -> 0으로 변환 ER2: OFF 횟수 @@ -63,7 +63,7 @@ with open(f"{DEVICE_PATH}/config.yaml","r") as f: with open(f"{ROOT_PATH}/config.json","r") as f: #with open(f"{args.config}/config.json","r") as f: data = json.load(f) - + print(data) print(data["1"]) @@ -77,7 +77,7 @@ a = np.array(a) shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name="DO-shm") b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) -b[:] = a[:] +b[:] = a[:] ############################################################################### @@ -85,7 +85,7 @@ b[:] = a[:] ############################################################################### #MQTT_TOPIC = info['mqtt']['topic'] -MQTT_TOPIC = dev_info['assetcode'] +MQTT_TOPIC = f"/device-data/{dev_info['assetcode']}" MQTT_ID = info['mqtt']['id'] MQTT_PW = info['mqtt']['pw'] MQTT_HOST_IP = info['mqtt']['host_ip'] @@ -164,12 +164,15 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port prev_a_ai = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] prev_b_ai = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] - + + ai_array = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + random_index = random.randint(0, len(my_array) -1) + ai_array = [random.randint(1,50000) for _ in range(16)] #value list on -> do control data - #추후에 do power on 할 리스트는 어디선가 읽어서 와야 해요 + #추��에 do power on 할 리스트는 어디선가 읽어서 와야 해요 #do1 do2 나눠야 해요 - - + + ''' | AI1 | AI2 | DO1 | DO2 | @@ -178,14 +181,14 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port Success | 1 | 0 | x | x | Success | 1 | 1 | x | x | => 유효한 데이터 - + 최소 한개 이상 실패 했을 때 -> retry 해볼 것이냐 -> 데이터를 어떻게 할 것이냐 - -> 이전 데이터가 있다면, 이전 데이터를 가지고 더미 데이터로 사용하는 방법 => 안하기로 했었음 + -> 이전 데이터가 있다면, 이전 데이터를 가지고 더미 데이터로 사용하는 방법 => 안하기로 했었음 ''' - timenow_a = int(time.time()*1000) + timenow = int(time.time()*1000) isSuccess = True # PORT A @@ -197,27 +200,26 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port isSuccess = False if isSuccess: - prev_a_ai = a_ai_data + prev_a_ai = ai_array try: port_A_DO.write_bits(0, port_a_do_value) # suc_cnt_do = suc_cnt_do +1 except: None - - timenow_b = int(time.time()*1000) + # PORT B port_b_do_value = list(b[17:32]) try: b_ai_data=port_B_AI.read_registers(0x32, 32) - prev_b_ai = b_ai_data + prev_b_ai = ai_array # suc_cnt_ai = suc_cnt_ai +1 except: isSuccess = False if isSuccess: - prev_b_ai = b_ai_data - + prev_b_ai = ai_array + try: port_B_DO.write_bits(0, port_b_do_value) # suc_cnt_do = suc_cnt_do +1 @@ -225,15 +227,15 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port None if isSuccess: - queue_a.put([timenow_a,prev_a_ai, port_a_do_value]) - queue_b.put([timenow_b,prev_b_ai, port_b_do_value]) + queue_a.put([timenow,ai_array, port_a_do_value]) + queue_b.put([timenow,ai_array, port_b_do_value]) else: - queue_a.put([0,prev_a_ai, port_a_do_value]) - queue_b.put([0,prev_b_ai, port_b_do_value]) - + queue_a.put([0,ai_array, port_a_do_value]) + queue_b.put([0,ai_array, port_b_do_value]) + + - # lock.release() @@ -246,7 +248,7 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port ##데이터 생성 할 때 # ai1, ai2 # -> ai1 [32] -> [curr, volt, curr, volt ...] - + ''' data_for_list = { "slot" : 1, @@ -261,9 +263,9 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port } } lists.append(data_for_list) - ### 여기서 보냄 - # 10번 시그널 받으면 - # + ### 여기서 보냄 + # 10번 시그널 �으면 + # # list mqtt send # list clear if read_cnt_ch0 % 10 == 0: @@ -283,6 +285,10 @@ def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port # time.sleep(1) ''' +############################################################################### +# dummy data thread # +############################################################################### + ############################################################################### # data process thread # @@ -304,7 +310,7 @@ def data_process_thread(data_queue, ch, chamber): do_temp = [] for d in do: do_temp.append(int(d)) - do = do_temp.copy() + do = do_temp.copy() #================================================== @@ -323,7 +329,7 @@ def data_process_thread(data_queue, ch, chamber): else: voltage.append(int(i)) - + #전압값으로 error count check voltage_cnt = 0 for i in voltage: @@ -363,7 +369,7 @@ def data_process_thread(data_queue, ch, chamber): else: data_list_for_send_2.append(data) - + if queue_count == 10: queue_count = 0 mqtt_msg = {} @@ -373,8 +379,8 @@ def data_process_thread(data_queue, ch, chamber): "assetCode":"NQ-R04-TEST-003", #나중에는 config에서 셋팅되는 값, or model 값으로 변경 (asset) "dataType":"DATA", "data": { - "channel": ch, # 채널 입력 받자 - "chamber":chamber, #chamber 도 입력 받자 + "channel": ch, # 채널 입력 �자 + "chamber":chamber, #chamber 도 입력 �자 "slot": ch, #슬롯 값은 chamber와 ch 값으로 만든다 "list": data_list_for_send_1 } @@ -387,8 +393,8 @@ def data_process_thread(data_queue, ch, chamber): "assetCode":"NQ-R04-TEST-003", #나중에는 config에서 셋팅되는 값, or model 값으로 변경 (asset) "dataType":"DATA", "data": { - "channel": ch, # 채널 입력 받자 - "chamber":chamber, #chamber 도 입력 받자 + "channel": ch, # 채널 입력 �자 + "chamber":chamber, #chamber 도 입력 �자 "slot": ch, #슬롯 값은 chamber와 ch 값으로 만든다 "list": data_list_for_send_2 } @@ -396,7 +402,7 @@ def data_process_thread(data_queue, ch, chamber): queue_select_count = 0 data_list_for_send_1.clear() - + # print(type(mqtt_msg['data']['list'][0]['do'][0])) data_json_str = json.dumps(mqtt_msg, indent=4) # print(data_json_str) @@ -476,7 +482,7 @@ if __name__ == '__main__': chip0_evt = threading.Event() chip1_evt = threading.Event() - read_data_chip0 = threading.Thread(target=cwt_thread,args=(chip0_evt, 0, 1,2, 5,6, ch1_queue, ch2_queue)) #현재 4개씩 연결되어 있어서 테스트 용 + read_data_chip0 = threading.Thread(target=cwt_thread,args=(chip0_evt, 0, 1,2, 5,6, ch1_queue, ch2_queue)) #현재 4 개씩 연결되 어 있어서 테스트 용 read_data_chip1 = threading.Thread(target=cwt_thread,args=(chip1_evt, 1, 9,10, 13,14, ch3_queue, ch4_queue)) data_process_ch1 = threading.Thread(target=data_process_thread,args=(ch1_queue, 1, 1)) diff --git a/AWX_collector.py~ b/AWX_collector.py~ new file mode 100644 index 0000000..88b9715 --- /dev/null +++ b/AWX_collector.py~ @@ -0,0 +1,493 @@ + +import threading +import time +import logging +import serial +import minimalmodbus +import serial.rs485 +import subprocess +import json +import yaml +import queue +import paho.mqtt.client as mqtt +import numpy as np +from multiprocessing import shared_memory +import argparse + + +''' +전달해야할 데이터 들 +mes code | 상태 | 전압 | 전류 | 4pin (DO) | ER1 | ER2 +tube 번호| 상태 | volt | curr | do | cnt | cnt + +tube 번호: 1 - 128 , chamber / slot + - slot(n) - tube(n) + - 1-1 ~ 8-16 : 128개다 + +상태: 정상이냐, 초기화, 불량 + +ER1: 1초동안 판별, 판별할 동안 counting 할 예정 2V 이하라면 CNT UP + if) er: 10이 되었음 (1초 유지) -> 0으로 변환, ER2 cnt up + if) 2v 이상인 경우가 발생했다 -> 0으로 변환 + +ER2: OFF 횟수 + +''' + +parser = argparse.ArgumentParser() +parser.add_argument('-config',help='') +args = parser.parse_args() + +ROOT_PATH = f"/usr/local/sdt/app/{args.config}" +DEVICE_PATH = f"/etc/sdt/device-control" + +############################################################################### +# JSON FILE READ # +############################################################################### + +with open(f"{ROOT_PATH}/connect_info.json","r") as f: +#with open(f"{args.config}/connect_info.json","r") as f: + info = json.load(f) + +with open(f"{DEVICE_PATH}/config.yaml","r") as f: +#with open(f"{args.config}/connect_info.json","r") as f: + dev_info = yaml.load(f, Loader=yaml.FullLoader) + + +############################################################################### +# Shared memory setting # +############################################################################### + + + +with open(f"{ROOT_PATH}/config.json","r") as f: +#with open(f"{args.config}/config.json","r") as f: + data = json.load(f) + +print(data) +print(data["1"]) + +a = [ data[str(n+1)] for n in range(64)] +a = np.array(a) + + +############################################################################### +# Get Data from Shared Memory # +############################################################################### + +shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name="DO-shm") +b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) +b[:] = a[:] + + +############################################################################### +# MQTT Broker Setting # +############################################################################### + +#MQTT_TOPIC = info['mqtt']['topic'] +MQTT_TOPIC = dev_info['assetcode'] +MQTT_ID = info['mqtt']['id'] +MQTT_PW = info['mqtt']['pw'] +MQTT_HOST_IP = info['mqtt']['host_ip'] +MQTT_PORT = info['mqtt']['port'] + + +def publish_json_message(client, data): + topic = MQTT_TOPIC + client.publish(topic, payload=data) + + +client = mqtt.Client("python_pub") +client.username_pw_set(MQTT_ID, MQTT_PW) +client.connect(host=MQTT_HOST_IP,port=MQTT_PORT) + +client.loop(2) # timeout = 2초 + + + + +############################################################################### +# Serial setting # +############################################################################### + +_serial_port3 = serial.Serial("/dev/ttyMAX2", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) +_serial_port3.rs485_mode = serial.rs485.RS485Settings() +_serial_port4 = serial.Serial("/dev/ttyMAX3", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) +_serial_port4.rs485_mode = serial.rs485.RS485Settings() + + +######################################### thread 함수 작성 ################################################ + + + +############################################################################### +# Serial thread # +############################################################################### +isThreadRun = True + +def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port_b_do_id, queue_a, queue_b): + + print("index: ", chip_index) + print("a-ai: %d, a-do: %d, b-ai: %d, b-do: %d " % (port_a_ai_id, port_a_do_id, port_b_ai_id, port_b_do_id)) + if chip_index == 0: + serial_dev1 = "/dev/ttyMAX1" + serial_dev2 = "/dev/ttyMAX0" + else: + serial_dev1 = "/dev/ttyMAX2" + serial_dev2 = "/dev/ttyMAX3" + + print(serial_dev1) + print(serial_dev2) + _serial_port_A = serial.Serial(serial_dev1, 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) + _serial_port_A.rs485_mode = serial.rs485.RS485Settings() + _serial_port_B = serial.Serial(serial_dev2, 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) + _serial_port_B.rs485_mode = serial.rs485.RS485Settings() + + + port_A_AI = minimalmodbus.Instrument(serial_dev1, port_a_ai_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_A_AI.serial.baudrate = 115200 # baudrate + + port_A_DO = minimalmodbus.Instrument(serial_dev1, port_a_do_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_A_DO.serial.baudrate = 115200 # baudrate + + port_B_AI = minimalmodbus.Instrument(serial_dev2, port_b_ai_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_B_AI.serial.baudrate = 115200 # baudrate + + port_B_DO = minimalmodbus.Instrument(serial_dev2, port_b_do_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_B_DO.serial.baudrate = 115200 # baudrate + + + while isThreadRun: + evt.wait() + #print("read thread get even") + evt.clear() + + prev_a_ai = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + prev_b_ai = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + + #value list on -> do control data + #추후에 do power on 할 리스트는 어디선가 읽어서 와야 해요 + #do1 do2 나눠야 해요 + + + + ''' + | AI1 | AI2 | DO1 | DO2 | + Success | 0 | 0 | x | x | + Success | 0 | 1 | x | x | + Success | 1 | 0 | x | x | + Success | 1 | 1 | x | x | => 유효한 데이터 + + + 최소 한개 이상 실패 했을 때 + -> retry 해볼 것이냐 + + -> 데이터를 어떻게 할 것이냐 + -> 이전 데이터가 있다면, 이전 데이터를 가지고 더미 데이터로 사용하는 방법 => 안하기로 했었음 + ''' + timenow = int(time.time()*1000) + isSuccess = True + + # PORT A + port_a_do_value = list(b[:16]) + try: + a_ai_data=port_A_AI.read_registers(0x32, 32) + # suc_cnt_ai = suc_cnt_ai +1 + except: + isSuccess = False + + if isSuccess: + prev_a_ai = a_ai_data + + try: + port_A_DO.write_bits(0, port_a_do_value) + # suc_cnt_do = suc_cnt_do +1 + except: + None + + # PORT B + port_b_do_value = list(b[17:32]) + try: + b_ai_data=port_B_AI.read_registers(0x32, 32) + prev_b_ai = b_ai_data + # suc_cnt_ai = suc_cnt_ai +1 + except: + isSuccess = False + + if isSuccess: + prev_b_ai = b_ai_data + + try: + port_B_DO.write_bits(0, port_b_do_value) + # suc_cnt_do = suc_cnt_do +1 + except: + None + + if isSuccess: + queue_a.put([timenow,prev_a_ai, port_a_do_value]) + queue_b.put([timenow,prev_b_ai, port_b_do_value]) + else: + queue_a.put([0,prev_a_ai, port_a_do_value]) + queue_b.put([0,prev_b_ai, port_b_do_value]) + + + + + + + # lock.release() + + ''' + 리스트를 생성, 10개를 딱 잘라서 어떻게 해야 할ㅈ지 모르겠음 + + + ''' + ##데이터 생성 할 때 + # ai1, ai2 + # -> ai1 [32] -> [curr, volt, curr, volt ...] + + ''' + data_for_list = { + "slot" : 1, + "timestamp": int(timenow*1000), + "data":{ + "current": ai1, + "volatge": ai2, + "do1": value_list_ON, + "err1": value_list_ON, + "err2": value_list_ON + #err1, 2 + } + } + lists.append(data_for_list) + ### 여기서 보냄 + # 10번 시그널 받으면 + # + # list mqtt send + # list clear + if read_cnt_ch0 % 10 == 0: + data = { + "modelCode":"test4", + "assetCode":"NQ-R04-TEST-003", + "dataType":"DATA", + "data": { + "chamber":1, + "list": lists + } + } + + data_json_str = json.dumps(data, indent=0) + publish_json_message(client, data_json_str) + #print(data_json_str) + # time.sleep(1) + ''' + + +############################################################################### +# data process thread # +############################################################################### + + +def data_process_thread(data_queue, ch, chamber): + error_cnt_1 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + error_cnt_2 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + data_list_for_send_1=[] + data_list_for_send_2=[] + queue_count = 0 + queue_select_count = 0 + while isThreadRun: + # 0.1s 마다 발생할 것으로 예상 + timestamp, ai, do = data_queue.get() + + #================================================== + do_temp = [] + for d in do: + do_temp.append(int(d)) + do = do_temp.copy() + #================================================== + + + #print("timestamp: {timestamp}, \n ai: {ai}, \n do: {do}" .format(timestamp=timestamp, ai=ai, do=do)) + + #ai data -> current, voltage 전환 + queue_count = queue_count + 1 + current = [] + voltage = [] + ai_index = 0 + for i in ai: + #짝수 current (확인 필요) + ai_index = ai_index +1 + if (ai_index%2) == 0: + current.append(int(i)) + else: + voltage.append(int(i)) + + + #전압값으로 error count check + voltage_cnt = 0 + for i in voltage: + if i < 2: + #count up + error_cnt_1[voltage_cnt] = error_cnt_1[voltage_cnt] +1 + if error_cnt_1[voltage_cnt] == 10: + error_cnt_1[voltage_cnt] = 0 + error_cnt_2[voltage_cnt] = error_cnt_2[voltage_cnt] + 1 + #shared memory do control modify + #ch, tube number->off + else: + # count reset + error_cnt_1[voltage_cnt] = 0 + + + + voltage_cnt = voltage_cnt + 1 + + # 로깅 기능을 넣어야 겠다...? + # 파일 하나 열어서, 데이터를 쓴다 + + # 데이터 만들어서 전달 + if timestamp > 0: + data = { + "timestamp" : int(timestamp), + "data": { + "current" : current, + "voltage" : voltage, + "do" : do, + "er1" : error_cnt_1, + "er2" : error_cnt_2 + } + } + if(queue_select_count == 0): + data_list_for_send_1.append(data) + else: + data_list_for_send_2.append(data) + + + if queue_count == 10: + queue_count = 0 + mqtt_msg = {} + if (queue_select_count == 0): + mqtt_msg = { + "modelCode":"test4", + "assetCode":"NQ-R04-TEST-003", #나중에는 config에서 셋팅되는 값, or model 값으로 변경 (asset) + "dataType":"DATA", + "data": { + "channel": ch, # 채널 입력 받자 + "chamber":chamber, #chamber 도 입력 받자 + "slot": ch, #슬롯 값은 chamber와 ch 값으로 만든다 + "list": data_list_for_send_1 + } + } + queue_select_count = 1 + data_list_for_send_2.clear() + else: + mqtt_msg = { + "modelCode":"test4", + "assetCode":"NQ-R04-TEST-003", #나중에는 config에서 셋팅되는 값, or model 값으로 변경 (asset) + "dataType":"DATA", + "data": { + "channel": ch, # 채널 입력 받자 + "chamber":chamber, #chamber 도 입력 받자 + "slot": ch, #슬롯 값은 chamber와 ch 값으로 만든다 + "list": data_list_for_send_2 + } + } + queue_select_count = 0 + data_list_for_send_1.clear() + + +# print(type(mqtt_msg['data']['list'][0]['do'][0])) + data_json_str = json.dumps(mqtt_msg, indent=4) +# print(data_json_str) +# print(data_json_str) + publish_json_message(client, data_json_str) + + ##erro cnt check routine + + + + + + +######################################### thread 함수 작성 ################################################ + + +#event는 개별 생성을 해준다 th1, th2 + + +def timer_event_loop(e1, e2): + cnt = 0 + # global read_cnt + + while True: + # print("event set, meanning : periodic sensing") + e1.set() + e2.set() + time.sleep(0.1) + + #cnt = cnt +1 + #if cnt > 6000: + # print("chip0 read count:", read_cnt_ch0) + #print("\r\nchip1 read count:", read_cnt_ch0) + #print("\r\nai1:", suc_cnt_ai, "\r\nai2:", suc_cnt_ai2) + #print(ai2) + #break + + #print(cnt) +# print(read_cnt) + exit() +# timer = threading.Timer(3, lambda: print("test event")) +# timer.start() +# timer.join() +# # timer = threading.Timer(3, lambda: e.set()) + + +################################# 작성 중 ##################################### +# def publish_json_message(client, data): +# topic = "devicedata/axr/nodeq1" +# client.publish(topic, payload=data) + + +# client = mqtt.Client("python_pub") +# client.username_pw_set("sdt", "251327") +# client.connect(host="13.209.39.139",port=32259) + +# # get_value() + + +# data = json.dumps(dict_1) +# #publish_json_message(client, data) + +# client.loop(2) # timeout = 2초 + +################################# 작성 중 ###################################### + +if __name__ == '__main__': + ############################################################################### + # Queue x 4 max- infinite # + ############################################################################### + + ch1_queue = queue.Queue() + ch2_queue = queue.Queue() + ch3_queue = queue.Queue() + ch4_queue = queue.Queue() + + chip0_evt = threading.Event() + chip1_evt = threading.Event() + + read_data_chip0 = threading.Thread(target=cwt_thread,args=(chip0_evt, 0, 1,2, 5,6, ch1_queue, ch2_queue)) #현재 4개씩 연결되어 있어서 테스트 용 + read_data_chip1 = threading.Thread(target=cwt_thread,args=(chip1_evt, 1, 9,10, 13,14, ch3_queue, ch4_queue)) + + data_process_ch1 = threading.Thread(target=data_process_thread,args=(ch1_queue, 1, 1)) + data_process_ch2 = threading.Thread(target=data_process_thread,args=(ch2_queue, 2, 1)) + data_process_ch3 = threading.Thread(target=data_process_thread,args=(ch3_queue, 3, 1)) + data_process_ch4 = threading.Thread(target=data_process_thread,args=(ch4_queue, 4, 1)) + + event_thread = threading.Thread(target=timer_event_loop, args=(chip0_evt,chip1_evt,)) + read_data_chip0.start() + read_data_chip1.start() + data_process_ch1.start() + data_process_ch2.start() + data_process_ch3.start() + data_process_ch4.start() + event_thread.start() diff --git a/AWX_collector_1.py b/AWX_collector_1.py new file mode 100644 index 0000000..f276c68 --- /dev/null +++ b/AWX_collector_1.py @@ -0,0 +1,493 @@ + +import threading +import time +import logging +import serial +import minimalmodbus +import serial.rs485 +import subprocess +import json +import yaml +import queue +import paho.mqtt.client as mqtt +import numpy as np +from multiprocessing import shared_memory +import argparse + + +''' +전달해야할 데이터 들 +mes code | 상태 | 전압 | 전류 | 4pin (DO) | ER1 | ER2 +tube 번호| 상태 | volt | curr | do | cnt | cnt + +tube 번호: 1 - 128 , chamber / slot + - slot(n) - tube(n) + - 1-1 ~ 8-16 : 128개다 + +상태: 정상이냐, 초기화, 불량 + +ER1: 1초동안 판별, 판별할 동안 counting 할 예정 2V 이하라면 CNT UP + if) er: 10이 되었음 (1초 유지) -> 0으로 변환, ER2 cnt up + if) 2v 이상인 경우가 발생했다 -> 0으로 변환 + +ER2: OFF 횟수 + +''' + +parser = argparse.ArgumentParser() +parser.add_argument('-config',help='') +args = parser.parse_args() + +ROOT_PATH = f"/usr/local/sdt/app/{args.config}" +DEVICE_PATH = f"/etc/sdt/device-control" + +############################################################################### +# JSON FILE READ # +############################################################################### + +with open(f"{ROOT_PATH}/connect_info.json","r") as f: +#with open(f"{args.config}/connect_info.json","r") as f: + info = json.load(f) + +with open(f"{DEVICE_PATH}/config.yaml","r") as f: +#with open(f"{args.config}/connect_info.json","r") as f: + dev_info = yaml.load(f, Loader=yaml.FullLoader) + + +############################################################################### +# Shared memory setting # +############################################################################### + + + +with open(f"{ROOT_PATH}/config.json","r") as f: +#with open(f"{args.config}/config.json","r") as f: + data = json.load(f) + +print(data) +print(data["1"]) + +a = [ data[str(n+1)] for n in range(64)] +a = np.array(a) + + +############################################################################### +# Get Data from Shared Memory # +############################################################################### + +shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name="DO-shm") +b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) +b[:] = a[:] + + +############################################################################### +# MQTT Broker Setting # +############################################################################### + +#MQTT_TOPIC = info['mqtt']['topic'] +MQTT_TOPIC = f"/device-data/{dev_info['assetcode']}" +MQTT_ID = info['mqtt']['id'] +MQTT_PW = info['mqtt']['pw'] +MQTT_HOST_IP = info['mqtt']['host_ip'] +MQTT_PORT = info['mqtt']['port'] + + +def publish_json_message(client, data): + topic = MQTT_TOPIC + client.publish(topic, payload=data) + + +client = mqtt.Client("python_pub") +client.username_pw_set(MQTT_ID, MQTT_PW) +client.connect(host=MQTT_HOST_IP,port=MQTT_PORT) + +client.loop(2) # timeout = 2초 + + + + +############################################################################### +# Serial setting # +############################################################################### + +_serial_port3 = serial.Serial("/dev/ttyMAX2", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) +_serial_port3.rs485_mode = serial.rs485.RS485Settings() +_serial_port4 = serial.Serial("/dev/ttyMAX3", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) +_serial_port4.rs485_mode = serial.rs485.RS485Settings() + + +######################################### thread 함수 작성 ################################################ + + + +############################################################################### +# Serial thread # +############################################################################### +isThreadRun = True + +def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port_b_do_id, queue_a, queue_b): + + print("index: ", chip_index) + print("a-ai: %d, a-do: %d, b-ai: %d, b-do: %d " % (port_a_ai_id, port_a_do_id, port_b_ai_id, port_b_do_id)) + if chip_index == 0: + serial_dev1 = "/dev/ttyMAX1" + serial_dev2 = "/dev/ttyMAX0" + else: + serial_dev1 = "/dev/ttyMAX2" + serial_dev2 = "/dev/ttyMAX3" + + print(serial_dev1) + print(serial_dev2) + _serial_port_A = serial.Serial(serial_dev1, 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) + _serial_port_A.rs485_mode = serial.rs485.RS485Settings() + _serial_port_B = serial.Serial(serial_dev2, 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) + _serial_port_B.rs485_mode = serial.rs485.RS485Settings() + + + port_A_AI = minimalmodbus.Instrument(serial_dev1, port_a_ai_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_A_AI.serial.baudrate = 115200 # baudrate + + port_A_DO = minimalmodbus.Instrument(serial_dev1, port_a_do_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_A_DO.serial.baudrate = 115200 # baudrate + + port_B_AI = minimalmodbus.Instrument(serial_dev2, port_b_ai_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_B_AI.serial.baudrate = 115200 # baudrate + + port_B_DO = minimalmodbus.Instrument(serial_dev2, port_b_do_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_B_DO.serial.baudrate = 115200 # baudrate + + + while isThreadRun: + evt.wait() + #print("read thread get even") + evt.clear() + + prev_a_ai = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + prev_b_ai = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + + #value list on -> do control data + #추후에 do power on 할 리스트는 어디선가 읽어서 와야 해요 + #do1 do2 나눠야 해요 + + + + ''' + | AI1 | AI2 | DO1 | DO2 | + Success | 0 | 0 | x | x | + Success | 0 | 1 | x | x | + Success | 1 | 0 | x | x | + Success | 1 | 1 | x | x | => 유효한 데이터 + + + 최소 한개 이상 실패 했을 때 + -> retry 해볼 것이냐 + + -> 데이터를 어떻게 할 것이냐 + -> 이전 데이터가 있다면, 이전 데이터를 가지고 더미 데이터로 사용하는 방법 => 안하기로 했었음 + ''' + timenow = int(time.time()*1000) + isSuccess = True + + # PORT A + port_a_do_value = list(b[:16]) + try: + a_ai_data=port_A_AI.read_registers(0x32, 32) + # suc_cnt_ai = suc_cnt_ai +1 + except: + isSuccess = False + + if isSuccess: + prev_a_ai = a_ai_data + + try: + port_A_DO.write_bits(0, port_a_do_value) + # suc_cnt_do = suc_cnt_do +1 + except: + None + + # PORT B + port_b_do_value = list(b[17:32]) + try: + b_ai_data=port_B_AI.read_registers(0x32, 32) + prev_b_ai = b_ai_data + # suc_cnt_ai = suc_cnt_ai +1 + except: + isSuccess = False + + if isSuccess: + prev_b_ai = b_ai_data + + try: + port_B_DO.write_bits(0, port_b_do_value) + # suc_cnt_do = suc_cnt_do +1 + except: + None + + if isSuccess: + queue_a.put([timenow,prev_a_ai, port_a_do_value]) + queue_b.put([timenow,prev_b_ai, port_b_do_value]) + else: + queue_a.put([0,prev_a_ai, port_a_do_value]) + queue_b.put([0,prev_b_ai, port_b_do_value]) + + + + + + + # lock.release() + + ''' + 리스트를 생성, 10개를 딱 잘라서 어떻게 해야 할ㅈ지 모르겠음 + + + ''' + ##데이터 생성 할 때 + # ai1, ai2 + # -> ai1 [32] -> [curr, volt, curr, volt ...] + + ''' + data_for_list = { + "slot" : 1, + "timestamp": int(timenow*1000), + "data":{ + "current": ai1, + "volatge": ai2, + "do1": value_list_ON, + "err1": value_list_ON, + "err2": value_list_ON + #err1, 2 + } + } + lists.append(data_for_list) + ### 여기서 보냄 + # 10번 시그널 받으면 + # + # list mqtt send + # list clear + if read_cnt_ch0 % 10 == 0: + data = { + "modelCode":"test4", + "assetCode":"NQ-R04-TEST-003", + "dataType":"DATA", + "data": { + "chamber":1, + "list": lists + } + } + + data_json_str = json.dumps(data, indent=0) + publish_json_message(client, data_json_str) + #print(data_json_str) + # time.sleep(1) + ''' + + +############################################################################### +# data process thread # +############################################################################### + + +def data_process_thread(data_queue, ch, chamber): + error_cnt_1 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + error_cnt_2 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + data_list_for_send_1=[] + data_list_for_send_2=[] + queue_count = 0 + queue_select_count = 0 + while isThreadRun: + # 0.1s 마다 발생할 것으로 예상 + timestamp, ai, do = data_queue.get() + + #================================================== + do_temp = [] + for d in do: + do_temp.append(int(d)) + do = do_temp.copy() + #================================================== + + + #print("timestamp: {timestamp}, \n ai: {ai}, \n do: {do}" .format(timestamp=timestamp, ai=ai, do=do)) + + #ai data -> current, voltage 전환 + queue_count = queue_count + 1 + current = [] + voltage = [] + ai_index = 0 + for i in ai: + #짝수 current (확인 필요) + ai_index = ai_index +1 + if (ai_index%2) == 0: + current.append(int(i)) + else: + voltage.append(int(i)) + + + #전압값으로 error count check + voltage_cnt = 0 + for i in voltage: + if i < 2: + #count up + error_cnt_1[voltage_cnt] = error_cnt_1[voltage_cnt] +1 + if error_cnt_1[voltage_cnt] == 10: + error_cnt_1[voltage_cnt] = 0 + error_cnt_2[voltage_cnt] = error_cnt_2[voltage_cnt] + 1 + #shared memory do control modify + #ch, tube number->off + else: + # count reset + error_cnt_1[voltage_cnt] = 0 + + + + voltage_cnt = voltage_cnt + 1 + + # 로깅 기능을 넣어야 겠다...? + # 파일 하나 열어서, 데이터를 쓴다 + + # 데이터 만들어서 전달 + if timestamp > 0: + data = { + "timestamp" : int(timestamp), + "data": { + "current" : current, + "voltage" : voltage, + "do" : do, + "er1" : error_cnt_1, + "er2" : error_cnt_2 + } + } + if(queue_select_count == 0): + data_list_for_send_1.append(data) + else: + data_list_for_send_2.append(data) + + + if queue_count == 10: + queue_count = 0 + mqtt_msg = {} + if (queue_select_count == 0): + mqtt_msg = { + "modelCode":"test4", + "assetCode":"NQ-R04-TEST-003", #나중에는 config에서 셋팅되는 값, or model 값으로 변경 (asset) + "dataType":"DATA", + "data": { + "channel": ch, # 채널 입력 받자 + "chamber":chamber, #chamber 도 입력 받자 + "slot": ch, #슬롯 값은 chamber와 ch 값으로 만든다 + "list": data_list_for_send_1 + } + } + queue_select_count = 1 + data_list_for_send_2.clear() + else: + mqtt_msg = { + "modelCode":"test4", + "assetCode":"NQ-R04-TEST-003", #나중에는 config에서 셋팅되는 값, or model 값으로 변경 (asset) + "dataType":"DATA", + "data": { + "channel": ch, # 채널 입력 받자 + "chamber":chamber, #chamber 도 입력 받자 + "slot": ch, #슬롯 값은 chamber와 ch 값으로 만든다 + "list": data_list_for_send_2 + } + } + queue_select_count = 0 + data_list_for_send_1.clear() + + +# print(type(mqtt_msg['data']['list'][0]['do'][0])) + data_json_str = json.dumps(mqtt_msg, indent=4) +# print(data_json_str) +# print(data_json_str) + publish_json_message(client, data_json_str) + + ##erro cnt check routine + + + + + + +######################################### thread 함수 작성 ################################################ + + +#event는 개별 생성을 해준다 th1, th2 + + +def timer_event_loop(e1, e2): + cnt = 0 + # global read_cnt + + while True: + # print("event set, meanning : periodic sensing") + e1.set() + e2.set() + time.sleep(0.1) + + #cnt = cnt +1 + #if cnt > 6000: + # print("chip0 read count:", read_cnt_ch0) + #print("\r\nchip1 read count:", read_cnt_ch0) + #print("\r\nai1:", suc_cnt_ai, "\r\nai2:", suc_cnt_ai2) + #print(ai2) + #break + + #print(cnt) +# print(read_cnt) + exit() +# timer = threading.Timer(3, lambda: print("test event")) +# timer.start() +# timer.join() +# # timer = threading.Timer(3, lambda: e.set()) + + +################################# 작성 중 ##################################### +# def publish_json_message(client, data): +# topic = "devicedata/axr/nodeq1" +# client.publish(topic, payload=data) + + +# client = mqtt.Client("python_pub") +# client.username_pw_set("sdt", "251327") +# client.connect(host="13.209.39.139",port=32259) + +# # get_value() + + +# data = json.dumps(dict_1) +# #publish_json_message(client, data) + +# client.loop(2) # timeout = 2초 + +################################# 작성 중 ###################################### + +if __name__ == '__main__': + ############################################################################### + # Queue x 4 max- infinite # + ############################################################################### + + ch1_queue = queue.Queue() + ch2_queue = queue.Queue() + ch3_queue = queue.Queue() + ch4_queue = queue.Queue() + + chip0_evt = threading.Event() + chip1_evt = threading.Event() + + read_data_chip0 = threading.Thread(target=cwt_thread,args=(chip0_evt, 0, 1,2, 5,6, ch1_queue, ch2_queue)) #현재 4개씩 연결되어 있어서 테스트 용 + read_data_chip1 = threading.Thread(target=cwt_thread,args=(chip1_evt, 1, 9,10, 13,14, ch3_queue, ch4_queue)) + + data_process_ch1 = threading.Thread(target=data_process_thread,args=(ch1_queue, 1, 1)) + data_process_ch2 = threading.Thread(target=data_process_thread,args=(ch2_queue, 2, 1)) + data_process_ch3 = threading.Thread(target=data_process_thread,args=(ch3_queue, 3, 1)) + data_process_ch4 = threading.Thread(target=data_process_thread,args=(ch4_queue, 4, 1)) + + event_thread = threading.Thread(target=timer_event_loop, args=(chip0_evt,chip1_evt,)) + read_data_chip0.start() + read_data_chip1.start() + data_process_ch1.start() + data_process_ch2.start() + data_process_ch3.start() + data_process_ch4.start() + event_thread.start() diff --git a/AWX_collector_copy.py b/AWX_collector_copy.py new file mode 100644 index 0000000..f276c68 --- /dev/null +++ b/AWX_collector_copy.py @@ -0,0 +1,493 @@ + +import threading +import time +import logging +import serial +import minimalmodbus +import serial.rs485 +import subprocess +import json +import yaml +import queue +import paho.mqtt.client as mqtt +import numpy as np +from multiprocessing import shared_memory +import argparse + + +''' +전달해야할 데이터 들 +mes code | 상태 | 전압 | 전류 | 4pin (DO) | ER1 | ER2 +tube 번호| 상태 | volt | curr | do | cnt | cnt + +tube 번호: 1 - 128 , chamber / slot + - slot(n) - tube(n) + - 1-1 ~ 8-16 : 128개다 + +상태: 정상이냐, 초기화, 불량 + +ER1: 1초동안 판별, 판별할 동안 counting 할 예정 2V 이하라면 CNT UP + if) er: 10이 되었음 (1초 유지) -> 0으로 변환, ER2 cnt up + if) 2v 이상인 경우가 발생했다 -> 0으로 변환 + +ER2: OFF 횟수 + +''' + +parser = argparse.ArgumentParser() +parser.add_argument('-config',help='') +args = parser.parse_args() + +ROOT_PATH = f"/usr/local/sdt/app/{args.config}" +DEVICE_PATH = f"/etc/sdt/device-control" + +############################################################################### +# JSON FILE READ # +############################################################################### + +with open(f"{ROOT_PATH}/connect_info.json","r") as f: +#with open(f"{args.config}/connect_info.json","r") as f: + info = json.load(f) + +with open(f"{DEVICE_PATH}/config.yaml","r") as f: +#with open(f"{args.config}/connect_info.json","r") as f: + dev_info = yaml.load(f, Loader=yaml.FullLoader) + + +############################################################################### +# Shared memory setting # +############################################################################### + + + +with open(f"{ROOT_PATH}/config.json","r") as f: +#with open(f"{args.config}/config.json","r") as f: + data = json.load(f) + +print(data) +print(data["1"]) + +a = [ data[str(n+1)] for n in range(64)] +a = np.array(a) + + +############################################################################### +# Get Data from Shared Memory # +############################################################################### + +shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name="DO-shm") +b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) +b[:] = a[:] + + +############################################################################### +# MQTT Broker Setting # +############################################################################### + +#MQTT_TOPIC = info['mqtt']['topic'] +MQTT_TOPIC = f"/device-data/{dev_info['assetcode']}" +MQTT_ID = info['mqtt']['id'] +MQTT_PW = info['mqtt']['pw'] +MQTT_HOST_IP = info['mqtt']['host_ip'] +MQTT_PORT = info['mqtt']['port'] + + +def publish_json_message(client, data): + topic = MQTT_TOPIC + client.publish(topic, payload=data) + + +client = mqtt.Client("python_pub") +client.username_pw_set(MQTT_ID, MQTT_PW) +client.connect(host=MQTT_HOST_IP,port=MQTT_PORT) + +client.loop(2) # timeout = 2초 + + + + +############################################################################### +# Serial setting # +############################################################################### + +_serial_port3 = serial.Serial("/dev/ttyMAX2", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) +_serial_port3.rs485_mode = serial.rs485.RS485Settings() +_serial_port4 = serial.Serial("/dev/ttyMAX3", 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) +_serial_port4.rs485_mode = serial.rs485.RS485Settings() + + +######################################### thread 함수 작성 ################################################ + + + +############################################################################### +# Serial thread # +############################################################################### +isThreadRun = True + +def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port_b_do_id, queue_a, queue_b): + + print("index: ", chip_index) + print("a-ai: %d, a-do: %d, b-ai: %d, b-do: %d " % (port_a_ai_id, port_a_do_id, port_b_ai_id, port_b_do_id)) + if chip_index == 0: + serial_dev1 = "/dev/ttyMAX1" + serial_dev2 = "/dev/ttyMAX0" + else: + serial_dev1 = "/dev/ttyMAX2" + serial_dev2 = "/dev/ttyMAX3" + + print(serial_dev1) + print(serial_dev2) + _serial_port_A = serial.Serial(serial_dev1, 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) + _serial_port_A.rs485_mode = serial.rs485.RS485Settings() + _serial_port_B = serial.Serial(serial_dev2, 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True) + _serial_port_B.rs485_mode = serial.rs485.RS485Settings() + + + port_A_AI = minimalmodbus.Instrument(serial_dev1, port_a_ai_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_A_AI.serial.baudrate = 115200 # baudrate + + port_A_DO = minimalmodbus.Instrument(serial_dev1, port_a_do_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_A_DO.serial.baudrate = 115200 # baudrate + + port_B_AI = minimalmodbus.Instrument(serial_dev2, port_b_ai_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_B_AI.serial.baudrate = 115200 # baudrate + + port_B_DO = minimalmodbus.Instrument(serial_dev2, port_b_do_id, debug=False, close_port_after_each_call=False) # port name, slave address (in decimal) + port_B_DO.serial.baudrate = 115200 # baudrate + + + while isThreadRun: + evt.wait() + #print("read thread get even") + evt.clear() + + prev_a_ai = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + prev_b_ai = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + + #value list on -> do control data + #추후에 do power on 할 리스트는 어디선가 읽어서 와야 해요 + #do1 do2 나눠야 해요 + + + + ''' + | AI1 | AI2 | DO1 | DO2 | + Success | 0 | 0 | x | x | + Success | 0 | 1 | x | x | + Success | 1 | 0 | x | x | + Success | 1 | 1 | x | x | => 유효한 데이터 + + + 최소 한개 이상 실패 했을 때 + -> retry 해볼 것이냐 + + -> 데이터를 어떻게 할 것이냐 + -> 이전 데이터가 있다면, 이전 데이터를 가지고 더미 데이터로 사용하는 방법 => 안하기로 했었음 + ''' + timenow = int(time.time()*1000) + isSuccess = True + + # PORT A + port_a_do_value = list(b[:16]) + try: + a_ai_data=port_A_AI.read_registers(0x32, 32) + # suc_cnt_ai = suc_cnt_ai +1 + except: + isSuccess = False + + if isSuccess: + prev_a_ai = a_ai_data + + try: + port_A_DO.write_bits(0, port_a_do_value) + # suc_cnt_do = suc_cnt_do +1 + except: + None + + # PORT B + port_b_do_value = list(b[17:32]) + try: + b_ai_data=port_B_AI.read_registers(0x32, 32) + prev_b_ai = b_ai_data + # suc_cnt_ai = suc_cnt_ai +1 + except: + isSuccess = False + + if isSuccess: + prev_b_ai = b_ai_data + + try: + port_B_DO.write_bits(0, port_b_do_value) + # suc_cnt_do = suc_cnt_do +1 + except: + None + + if isSuccess: + queue_a.put([timenow,prev_a_ai, port_a_do_value]) + queue_b.put([timenow,prev_b_ai, port_b_do_value]) + else: + queue_a.put([0,prev_a_ai, port_a_do_value]) + queue_b.put([0,prev_b_ai, port_b_do_value]) + + + + + + + # lock.release() + + ''' + 리스트를 생성, 10개를 딱 잘라서 어떻게 해야 할ㅈ지 모르겠음 + + + ''' + ##데이터 생성 할 때 + # ai1, ai2 + # -> ai1 [32] -> [curr, volt, curr, volt ...] + + ''' + data_for_list = { + "slot" : 1, + "timestamp": int(timenow*1000), + "data":{ + "current": ai1, + "volatge": ai2, + "do1": value_list_ON, + "err1": value_list_ON, + "err2": value_list_ON + #err1, 2 + } + } + lists.append(data_for_list) + ### 여기서 보냄 + # 10번 시그널 받으면 + # + # list mqtt send + # list clear + if read_cnt_ch0 % 10 == 0: + data = { + "modelCode":"test4", + "assetCode":"NQ-R04-TEST-003", + "dataType":"DATA", + "data": { + "chamber":1, + "list": lists + } + } + + data_json_str = json.dumps(data, indent=0) + publish_json_message(client, data_json_str) + #print(data_json_str) + # time.sleep(1) + ''' + + +############################################################################### +# data process thread # +############################################################################### + + +def data_process_thread(data_queue, ch, chamber): + error_cnt_1 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + error_cnt_2 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + data_list_for_send_1=[] + data_list_for_send_2=[] + queue_count = 0 + queue_select_count = 0 + while isThreadRun: + # 0.1s 마다 발생할 것으로 예상 + timestamp, ai, do = data_queue.get() + + #================================================== + do_temp = [] + for d in do: + do_temp.append(int(d)) + do = do_temp.copy() + #================================================== + + + #print("timestamp: {timestamp}, \n ai: {ai}, \n do: {do}" .format(timestamp=timestamp, ai=ai, do=do)) + + #ai data -> current, voltage 전환 + queue_count = queue_count + 1 + current = [] + voltage = [] + ai_index = 0 + for i in ai: + #짝수 current (확인 필요) + ai_index = ai_index +1 + if (ai_index%2) == 0: + current.append(int(i)) + else: + voltage.append(int(i)) + + + #전압값으로 error count check + voltage_cnt = 0 + for i in voltage: + if i < 2: + #count up + error_cnt_1[voltage_cnt] = error_cnt_1[voltage_cnt] +1 + if error_cnt_1[voltage_cnt] == 10: + error_cnt_1[voltage_cnt] = 0 + error_cnt_2[voltage_cnt] = error_cnt_2[voltage_cnt] + 1 + #shared memory do control modify + #ch, tube number->off + else: + # count reset + error_cnt_1[voltage_cnt] = 0 + + + + voltage_cnt = voltage_cnt + 1 + + # 로깅 기능을 넣어야 겠다...? + # 파일 하나 열어서, 데이터를 쓴다 + + # 데이터 만들어서 전달 + if timestamp > 0: + data = { + "timestamp" : int(timestamp), + "data": { + "current" : current, + "voltage" : voltage, + "do" : do, + "er1" : error_cnt_1, + "er2" : error_cnt_2 + } + } + if(queue_select_count == 0): + data_list_for_send_1.append(data) + else: + data_list_for_send_2.append(data) + + + if queue_count == 10: + queue_count = 0 + mqtt_msg = {} + if (queue_select_count == 0): + mqtt_msg = { + "modelCode":"test4", + "assetCode":"NQ-R04-TEST-003", #나중에는 config에서 셋팅되는 값, or model 값으로 변경 (asset) + "dataType":"DATA", + "data": { + "channel": ch, # 채널 입력 받자 + "chamber":chamber, #chamber 도 입력 받자 + "slot": ch, #슬롯 값은 chamber와 ch 값으로 만든다 + "list": data_list_for_send_1 + } + } + queue_select_count = 1 + data_list_for_send_2.clear() + else: + mqtt_msg = { + "modelCode":"test4", + "assetCode":"NQ-R04-TEST-003", #나중에는 config에서 셋팅되는 값, or model 값으로 변경 (asset) + "dataType":"DATA", + "data": { + "channel": ch, # 채널 입력 받자 + "chamber":chamber, #chamber 도 입력 받자 + "slot": ch, #슬롯 값은 chamber와 ch 값으로 만든다 + "list": data_list_for_send_2 + } + } + queue_select_count = 0 + data_list_for_send_1.clear() + + +# print(type(mqtt_msg['data']['list'][0]['do'][0])) + data_json_str = json.dumps(mqtt_msg, indent=4) +# print(data_json_str) +# print(data_json_str) + publish_json_message(client, data_json_str) + + ##erro cnt check routine + + + + + + +######################################### thread 함수 작성 ################################################ + + +#event는 개별 생성을 해준다 th1, th2 + + +def timer_event_loop(e1, e2): + cnt = 0 + # global read_cnt + + while True: + # print("event set, meanning : periodic sensing") + e1.set() + e2.set() + time.sleep(0.1) + + #cnt = cnt +1 + #if cnt > 6000: + # print("chip0 read count:", read_cnt_ch0) + #print("\r\nchip1 read count:", read_cnt_ch0) + #print("\r\nai1:", suc_cnt_ai, "\r\nai2:", suc_cnt_ai2) + #print(ai2) + #break + + #print(cnt) +# print(read_cnt) + exit() +# timer = threading.Timer(3, lambda: print("test event")) +# timer.start() +# timer.join() +# # timer = threading.Timer(3, lambda: e.set()) + + +################################# 작성 중 ##################################### +# def publish_json_message(client, data): +# topic = "devicedata/axr/nodeq1" +# client.publish(topic, payload=data) + + +# client = mqtt.Client("python_pub") +# client.username_pw_set("sdt", "251327") +# client.connect(host="13.209.39.139",port=32259) + +# # get_value() + + +# data = json.dumps(dict_1) +# #publish_json_message(client, data) + +# client.loop(2) # timeout = 2초 + +################################# 작성 중 ###################################### + +if __name__ == '__main__': + ############################################################################### + # Queue x 4 max- infinite # + ############################################################################### + + ch1_queue = queue.Queue() + ch2_queue = queue.Queue() + ch3_queue = queue.Queue() + ch4_queue = queue.Queue() + + chip0_evt = threading.Event() + chip1_evt = threading.Event() + + read_data_chip0 = threading.Thread(target=cwt_thread,args=(chip0_evt, 0, 1,2, 5,6, ch1_queue, ch2_queue)) #현재 4개씩 연결되어 있어서 테스트 용 + read_data_chip1 = threading.Thread(target=cwt_thread,args=(chip1_evt, 1, 9,10, 13,14, ch3_queue, ch4_queue)) + + data_process_ch1 = threading.Thread(target=data_process_thread,args=(ch1_queue, 1, 1)) + data_process_ch2 = threading.Thread(target=data_process_thread,args=(ch2_queue, 2, 1)) + data_process_ch3 = threading.Thread(target=data_process_thread,args=(ch3_queue, 3, 1)) + data_process_ch4 = threading.Thread(target=data_process_thread,args=(ch4_queue, 4, 1)) + + event_thread = threading.Thread(target=timer_event_loop, args=(chip0_evt,chip1_evt,)) + read_data_chip0.start() + read_data_chip1.start() + data_process_ch1.start() + data_process_ch2.start() + data_process_ch3.start() + data_process_ch4.start() + event_thread.start() diff --git a/install.sh b/install.sh index f5f85f1..f0878bf 100644 --- a/install.sh +++ b/install.sh @@ -4,6 +4,7 @@ pip install -r requirements.txt sudo sed -i "s/appName/$1/g" /usr/local/sdt/app/$1/AWX_collector.service sudo cp /usr/local/sdt/app/$1/AWX_collector.service /etc/systemd/system/$1.service +sudo sed -i "s/appName/$1/g" /usr/local/sdt/app/$1/AWX_shm_updater.service sudo cp /usr/local/sdt/app/$1/AWX_shm_updater.service /etc/systemd/system/AWX_shm_updater.service sudo systemctl start $1 sudo systemctl enable $1