"""Device-control data collector: imports and start-up configuration.

Design notes (translated from the original author's notes).

Data to be delivered:

    mes code | status | voltage | current | 4pin (DO) | ER1 | ER2
    tube no. | status | volt    | curr    | do        | cnt | cnt

tube number: 1 - 128, chamber / slot - slot(n) - tube(n) - 1-1 ~ 8-16
             (128 tubes in total)
status:      normal, initialising, or defective
ER1:         judged over one second, counting while judging;
             count up while the reading is 2 V or below
               - when ER1 reaches 10 (held for 1 s) -> reset to 0 and
                 count ER2 up
               - when a reading of 2 V or more occurs -> reset to 0
ER2:         number of OFF events
"""
import argparse
import json
import logging
import queue
import random
import subprocess
import threading
import time
from multiprocessing import shared_memory

import minimalmodbus
import numpy as np
import paho.mqtt.client as mqtt
import serial
import serial.rs485
import yaml

parser = argparse.ArgumentParser()
# The value is interpolated into ROOT_PATH below; without it the path would
# silently become ".../app/None", so the option is mandatory.
parser.add_argument(
    '-config',
    required=True,
    help='application directory name under /usr/local/sdt/app',
)
args = parser.parse_args()

ROOT_PATH = f"/usr/local/sdt/app/{args.config}"
DEVICE_PATH = "/etc/sdt/device-control"

###############################################################################
#                              JSON FILE READ                                 #
###############################################################################
# Connection settings; the 'mqtt' section is read later in this file.
with open(f"{ROOT_PATH}/connect_info.json", "r", encoding="utf-8") as f:
    info = json.load(f)

# Device description; 'assetcode' is read later in this file.
# NOTE(review): yaml.FullLoader can construct more than plain data types.
# If this file only holds plain mappings/scalars, yaml.safe_load would be the
# safer choice - confirm before switching.
with open(f"{DEVICE_PATH}/config.yaml", "r", encoding="utf-8") as f:
    dev_info = yaml.load(f, Loader=yaml.FullLoader)

###############################################################################
#                          Shared memory setting                              #
###############################################################################
with open(f"{ROOT_PATH}/config.json", "r", encoding="utf-8") as f:
    data = json.load(f)

print(data)
print(data["1"])

# config.json is indexed by the string keys "1" .. "64"; collect the values in
# key order into one array.
a = np.array([data[str(n + 1)] for n in range(64)])

###############################################################################
#                     Publish the values in shared memory                     #
###############################################################################
# NOTE(review): create=True raises FileExistsError when a segment named
# "DO-shm" already exists (for example a second instance, or a segment left
# behind by an earlier run) - confirm whether attaching to the existing
# segment would be preferable in that case.
shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name="DO-shm")
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
b[:] = a[:]
###############################################################################
#                           MQTT Broker Setting                               #
###############################################################################
_log = logging.getLogger(__name__)

# The topic is built from the device asset code (info['mqtt']['topic'] is not
# used).
MQTT_TOPIC = f"/device-data/{dev_info['assetcode']}"
MQTT_ID = info['mqtt']['id']
MQTT_PW = info['mqtt']['pw']
MQTT_HOST_IP = info['mqtt']['host_ip']
MQTT_PORT = info['mqtt']['port']


def publish_json_message(client, data):
    """Publish *data* as the payload of one message on MQTT_TOPIC."""
    client.publish(MQTT_TOPIC, payload=data)


client = mqtt.Client("python_pub")
client.username_pw_set(MQTT_ID, MQTT_PW)
client.connect(host=MQTT_HOST_IP, port=MQTT_PORT)
# NOTE(review): the network loop is run only once here (2 s timeout) and is
# never run again; paho's documentation recommends loop_start() or
# loop_forever() so incoming packets, keep-alives and reconnects are handled.
# Confirm whether a background loop is wanted.
client.loop(2)  # timeout = 2 s

###############################################################################
#                              Serial setting                                 #
###############################################################################


def _open_rs485_port(device):
    """Open *device* at 115200 8N1 with RTS/CTS and apply RS485 settings."""
    port = serial.Serial(
        device,
        115200,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
        timeout=1,
        rtscts=True,
    )
    port.rs485_mode = serial.rs485.RS485Settings()
    return port


def _make_instrument(device, slave_id):
    """Create a minimalmodbus instrument on *device* running at 115200 baud."""
    instrument = minimalmodbus.Instrument(
        device,  # port name
        slave_id,  # slave address (in decimal)
        debug=False,
        close_port_after_each_call=False,
    )
    instrument.serial.baudrate = 115200
    return instrument


# NOTE(review): these two devices are opened again inside cwt_thread() when
# chip_index != 0 - presumably this early open exists only to apply the RS485
# settings; confirm it is still needed.
_serial_port3 = _open_rs485_port("/dev/ttyMAX2")
_serial_port4 = _open_rs485_port("/dev/ttyMAX3")

############################## thread functions ###############################
###############################################################################
#                               Serial thread                                 #
###############################################################################
isThreadRun = True


def _poll_port(ai_instrument, do_instrument, do_values, label):
    """Read the AI registers and write the DO bits of one port.

    Returns True when the AI read succeeded.  A failed DO write is logged but
    does not affect the return value, matching the original best-effort
    behaviour.
    """
    ai_ok = True
    try:
        # NOTE(review): the register values are read but not used; the
        # caller queues random dummy values instead.
        ai_instrument.read_registers(0x32, 32)
    except Exception as exc:
        ai_ok = False
        _log.warning("AI read failed on port %s: %s", label, exc)
    try:
        do_instrument.write_bits(0, do_values)
    except Exception as exc:
        _log.warning("DO write failed on port %s: %s", label, exc)
    return ai_ok


def cwt_thread(evt, chip_index, port_a_ai_id, port_a_do_id,
               port_b_ai_id, port_b_do_id, queue_a, queue_b):
    """Poll the two serial ports of one chip each time *evt* is set.

    Args:
        evt: threading.Event that triggers one polling cycle; it is cleared
            at the start of every cycle.
        chip_index: 0 selects /dev/ttyMAX1 and /dev/ttyMAX0, any other value
            selects /dev/ttyMAX2 and /dev/ttyMAX3.
        port_a_ai_id, port_a_do_id: Modbus slave addresses on the first port.
        port_b_ai_id, port_b_do_id: Modbus slave addresses on the second port.
        queue_a, queue_b: queues receiving ``[timestamp_ms, ai, do]`` per
            cycle.  The timestamp is 0 when any AI read failed in the cycle.
    """
    print("index: ", chip_index)
    print("a-ai: %d, a-do: %d, b-ai: %d, b-do: %d "
          % (port_a_ai_id, port_a_do_id, port_b_ai_id, port_b_do_id))

    if chip_index == 0:
        serial_dev1 = "/dev/ttyMAX1"
        serial_dev2 = "/dev/ttyMAX0"
    else:
        serial_dev1 = "/dev/ttyMAX2"
        serial_dev2 = "/dev/ttyMAX3"
    print(serial_dev1)
    print(serial_dev2)

    # Kept referenced for the lifetime of the thread so the ports stay open.
    _serial_port_A = _open_rs485_port(serial_dev1)
    _serial_port_B = _open_rs485_port(serial_dev2)

    port_A_AI = _make_instrument(serial_dev1, port_a_ai_id)
    port_A_DO = _make_instrument(serial_dev1, port_a_do_id)
    port_B_AI = _make_instrument(serial_dev2, port_b_ai_id)
    port_B_DO = _make_instrument(serial_dev2, port_b_do_id)

    while isThreadRun:
        evt.wait()
        evt.clear()

        # NOTE(review): the queued AI values are random dummy data, not the
        # registers read from the device - confirm before production use.
        # (The previous code also evaluated len(my_array) here; my_array is
        # not defined anywhere, so the thread died with NameError on its
        # first cycle.)
        ai_array = [random.randint(1, 50000) for _ in range(16)]

        # Original design notes: the list of DOs to power on should later be
        # read from somewhere, and DO1 / DO2 should be split.  Re-using the
        # previous sample as dummy data after a failed read was considered
        # and rejected.
        timenow = int(time.time() * 1000)

        # NOTE(review): every chip reads the same b[0:32] region although the
        # shared array holds 64 entries - presumably the second chip should
        # use b[32:64]; confirm.
        # PORT A
        port_a_do_value = list(b[:16])
        a_ok = _poll_port(port_A_AI, port_A_DO, port_a_do_value, "A")

        # PORT B - the slice previously started at 17, which skipped index 16
        # and produced only 15 values.
        port_b_do_value = list(b[16:32])
        b_ok = _poll_port(port_B_AI, port_B_DO, port_b_do_value, "B")

        # A timestamp of 0 marks the whole cycle as invalid for the consumer.
        stamp = timenow if (a_ok and b_ok) else 0
        queue_a.put([stamp, ai_array, port_a_do_value])
        queue_b.put([stamp, ai_array, port_b_do_value])


###############################################################################
#                            data process thread                              #
###############################################################################
def _split_ai(ai):
    """Split interleaved AI values into (current, voltage) integer lists.

    Values at 1-based even positions are treated as current and the others as
    voltage (the original author marked this ordering as needing
    verification).
    """
    voltage = [int(value) for value in ai[0::2]]
    current = [int(value) for value in ai[1::2]]
    return current, voltage


def _update_error_counters(voltage, error_cnt_1, error_cnt_2):
    """Update the per-tube error counters in place from *voltage*.

    A reading below 2 increments ER1; when ER1 reaches 10 it is reset and ER2
    is incremented.  Any other reading resets ER1.
    """
    # NOTE(review): the threshold is compared against the raw integer value -
    # confirm the unit matches the "2 V" rule in the design notes.
    for tube, value in enumerate(voltage):
        if value < 2:
            error_cnt_1[tube] += 1
            if error_cnt_1[tube] == 10:
                error_cnt_1[tube] = 0
                error_cnt_2[tube] += 1
                # TODO: modify the DO control in shared memory here
                # (ch, tube number -> off).
        else:
            error_cnt_1[tube] = 0


def data_process_thread(data_queue, ch, chamber):
    """Consume samples from *data_queue* and publish them in batches.

    Each queue item is ``[timestamp, ai, do]``.  Samples with a timestamp
    greater than 0 are collected; after every 10 received items (valid or
    not) the collected samples are published as one JSON message.

    Args:
        data_queue: queue filled by cwt_thread().
        ch: channel number, reported as both "channel" and "slot".
        chamber: chamber number reported in the message.
    """
    error_cnt_1 = [0] * 16
    error_cnt_2 = [0] * 16
    pending = []
    queue_count = 0

    while isThreadRun:
        # An item is expected roughly every 0.1 s.
        timestamp, ai, do = data_queue.get()
        do = [int(d) for d in do]  # plain ints so json can serialise them
        queue_count += 1

        current, voltage = _split_ai(ai)
        _update_error_counters(voltage, error_cnt_1, error_cnt_2)

        # TODO (original note): add logging of the data to a file.
        if timestamp > 0:
            pending.append({
                "timestamp": int(timestamp),
                "data": {
                    "current": current,
                    "voltage": voltage,
                    "do": do,
                    # Snapshot the counters: storing the live lists made every
                    # sample in a batch report the latest counter values.
                    "er1": list(error_cnt_1),
                    "er2": list(error_cnt_2),
                },
            })

        if queue_count == 10:
            queue_count = 0
            mqtt_msg = {
                "modelCode": "test4",
                # To be replaced later by a value from config or the model.
                "assetCode": "NQ-R04-TEST-003",
                "dataType": "DATA",
                "data": {
                    "channel": ch,
                    "chamber": chamber,
                    # The slot is meant to be derived from chamber and ch.
                    "slot": ch,
                    "list": pending,
                },
            }
            data_json_str = json.dumps(mqtt_msg, indent=4)
            publish_json_message(client, data_json_str)
            # Serialisation is complete, so the same list can be reused.
            pending.clear()


############################## thread functions ###############################
# One event per reader thread (th1, th2).
def timer_event_loop(e1, e2):
    """Set both events every 0.1 s to trigger periodic sensing."""
    while isThreadRun:
        e1.set()
        e2.set()
        time.sleep(0.1)


def main():
    """Create the queues, events and threads, then start every thread."""
    # Four unbounded queues, one per channel.
    ch1_queue = queue.Queue()
    ch2_queue = queue.Queue()
    ch3_queue = queue.Queue()
    ch4_queue = queue.Queue()

    chip0_evt = threading.Event()
    chip1_evt = threading.Event()

    # Original note: four devices are connected per chip for now (test setup).
    read_data_chip0 = threading.Thread(
        target=cwt_thread,
        args=(chip0_evt, 0, 1, 2, 5, 6, ch1_queue, ch2_queue))
    read_data_chip1 = threading.Thread(
        target=cwt_thread,
        args=(chip1_evt, 1, 9, 10, 13, 14, ch3_queue, ch4_queue))

    data_process_ch1 = threading.Thread(
        target=data_process_thread, args=(ch1_queue, 1, 1))
    data_process_ch2 = threading.Thread(
        target=data_process_thread, args=(ch2_queue, 2, 1))
    data_process_ch3 = threading.Thread(
        target=data_process_thread, args=(ch3_queue, 3, 1))
    data_process_ch4 = threading.Thread(
        target=data_process_thread, args=(ch4_queue, 4, 1))

    event_thread = threading.Thread(
        target=timer_event_loop, args=(chip0_evt, chip1_evt,))

    read_data_chip0.start()
    read_data_chip1.start()
    data_process_ch1.start()
    data_process_ch2.start()
    data_process_ch3.start()
    data_process_ch4.start()
    event_thread.start()


if __name__ == '__main__':
    main()