"""Poll Modbus AI/DO modules over RS-485 and publish the readings via MQTT.

Data delivered per tube (translated from the original design notes):

    mes code | status | voltage | current | 4pin (DO) | ER1 | ER2
    tube no. | status | volt    | curr    | do        | cnt | cnt

* tube number: 1 - 128, addressed as chamber / slot(n) / tube(n),
  i.e. 1-1 ~ 8-16, 128 tubes in total.
* status: normal, initialising, defective.
* ER1: evaluated over one second (10 samples). Counts up while the voltage
  reading is below the threshold.
    - when ER1 reaches 10 (held for 1 s) -> reset to 0 and count ER2 up
    - when a reading at/above the threshold occurs -> reset to 0
* ER2: number of OFF events.
"""
import argparse
import json
import logging
import queue
import subprocess
import threading
import time
from multiprocessing import shared_memory

import minimalmodbus
import numpy as np
import paho.mqtt.client as mqtt
import serial
import serial.rs485

logger = logging.getLogger(__name__)

# First AI holding register and number of registers read per cycle.
AI_START_REGISTER = 0x32
AI_REGISTER_COUNT = 32
# Number of digital outputs (tubes) handled per port / channel.
DO_PER_PORT = 16
# Raw voltage readings strictly below this value count as "low".
LOW_VOLTAGE_THRESHOLD = 2
# Consecutive low readings that turn into one ER2 event.
ER1_LIMIT = 10
# Queue items consumed per published MQTT message.
SAMPLES_PER_MESSAGE = 10
# Period of the sampling trigger, in seconds.
TICK_SECONDS = 0.1

parser = argparse.ArgumentParser()
# Without -config the original built the path "None/connect_info.json";
# fail early with a clear argparse error instead.
parser.add_argument(
    '-config',
    required=True,
    help='directory containing connect_info.json and config.json',
)
args = parser.parse_args()
# NOTE(review): ROOT_PATH is computed but never used; the JSON files below are
# opened relative to args.config itself. Confirm which location is intended.
ROOT_PATH = f"/usr/local/sdt/app/{args.config}"

###############################################################################
#                               JSON FILE READ                                #
###############################################################################
with open(f"{args.config}/connect_info.json", "r") as f:
    info = json.load(f)

###############################################################################
#                            Shared memory setting                            #
###############################################################################
with open(f"{args.config}/config.json", "r") as f:
    data = json.load(f)
print(data)
print(data["1"])
# Keys "1".."64" of config.json hold the initial DO control values.
a = [data[str(n + 1)] for n in range(64)]
a = np.array(a)

# Publish the DO control table through a named shared-memory segment.
# create=True raises FileExistsError if a segment named "DO-shm" already
# exists.
shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name="DO-shm")
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
b[:] = a[:]

###############################################################################
#                             MQTT Broker Setting                             #
###############################################################################
MQTT_TOPIC = info['mqtt']['topic']
MQTT_ID = info['mqtt']['id']
MQTT_PW = info['mqtt']['pw']
MQTT_HOST_IP = info['mqtt']['host_ip']
MQTT_PORT = info['mqtt']['port']


def publish_json_message(client, data):
    """Publish ``data`` (a JSON string) on the configured ``MQTT_TOPIC``.

    Args:
        client: connected MQTT client exposing ``publish(topic, payload=...)``.
        data: payload to send.
    """
    client.publish(MQTT_TOPIC, payload=data)


client = mqtt.Client("python_pub")
client.username_pw_set(MQTT_ID, MQTT_PW)
client.connect(host=MQTT_HOST_IP, port=MQTT_PORT)
# Process network traffic once, waiting up to 2 s.
# NOTE(review): no background network loop is started afterwards, so nothing
# services keep-alives or reconnects; consider client.loop_start().
client.loop(2)


###############################################################################
#                               Serial setting                                #
###############################################################################
def _open_rs485_port(device):
    """Open ``device`` at 115200 8N1 with RTS/CTS and enable RS-485 mode."""
    port = serial.Serial(
        device,
        115200,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
        timeout=1,
        rtscts=True,
    )
    port.rs485_mode = serial.rs485.RS485Settings()
    return port


def _make_instrument(device, slave_id):
    """Create a Modbus instrument for ``slave_id`` on ``device`` at 115200 baud."""
    instrument = minimalmodbus.Instrument(
        device, slave_id, debug=False, close_port_after_each_call=False
    )
    instrument.serial.baudrate = 115200
    return instrument


_serial_port3 = _open_rs485_port("/dev/ttyMAX2")
_serial_port4 = _open_rs485_port("/dev/ttyMAX3")

###############################################################################
#                               Serial thread                                 #
###############################################################################
# Cleared on shutdown so that every worker loop terminates.
isThreadRun = True


def _read_ai(instrument, label):
    """Read the AI register block; return the register list or None on failure."""
    try:
        return instrument.read_registers(AI_START_REGISTER, AI_REGISTER_COUNT)
    except Exception:
        # Best effort: a failed read only invalidates this cycle. Logged at
        # debug level because a dead bus would otherwise log on every tick.
        logger.debug("AI read failed on %s", label, exc_info=True)
        return None


def _write_do(instrument, values, label):
    """Write the DO bits starting at coil 0; failures are logged and ignored."""
    try:
        instrument.write_bits(0, values)
    except Exception:
        logger.debug("DO write failed on %s", label, exc_info=True)


def cwt_thread(evt, chip_index, port_a_ai_id, port_a_do_id,
               port_b_ai_id, port_b_do_id, queue_a, queue_b):
    """Poll the AI/DO modules of one chip each time ``evt`` is set.

    On every trigger the thread reads 32 AI registers from each port, writes
    the DO values taken from the shared table ``b`` to each port, and pushes
    ``[timestamp_ms, ai_registers, do_values]`` to ``queue_a`` / ``queue_b``.
    The cycle is valid only when both AI reads succeed (DO write results are
    not considered); otherwise both items carry timestamp 0. Substituting
    previous readings for a failed cycle was deliberately ruled out in the
    original design notes.

    Args:
        evt: threading.Event that triggers one sampling cycle.
        chip_index: 0 selects /dev/ttyMAX1 + /dev/ttyMAX0, any other value
            selects /dev/ttyMAX2 + /dev/ttyMAX3.
        port_a_ai_id: Modbus slave address of the AI module on port A.
        port_a_do_id: Modbus slave address of the DO module on port A.
        port_b_ai_id: Modbus slave address of the AI module on port B.
        port_b_do_id: Modbus slave address of the DO module on port B.
        queue_a: queue receiving the port A samples.
        queue_b: queue receiving the port B samples.
    """
    print("index: ", chip_index)
    print("a-ai: %d, a-do: %d, b-ai: %d, b-do: %d " % (port_a_ai_id, port_a_do_id, port_b_ai_id, port_b_do_id))

    if chip_index == 0:
        serial_dev1 = "/dev/ttyMAX1"
        serial_dev2 = "/dev/ttyMAX0"
    else:
        serial_dev1 = "/dev/ttyMAX2"
        serial_dev2 = "/dev/ttyMAX3"
    print(serial_dev1)
    print(serial_dev2)

    # Held for the lifetime of the thread, as in the original; the Modbus
    # instruments below open the same device names themselves.
    _serial_port_A = _open_rs485_port(serial_dev1)
    _serial_port_B = _open_rs485_port(serial_dev2)

    port_A_AI = _make_instrument(serial_dev1, port_a_ai_id)
    port_A_DO = _make_instrument(serial_dev1, port_a_do_id)
    port_B_AI = _make_instrument(serial_dev2, port_b_ai_id)
    port_B_DO = _make_instrument(serial_dev2, port_b_do_id)

    # Placeholder payload for a failed read; consumers skip it (timestamp 0).
    empty_ai = [0] * AI_REGISTER_COUNT

    while isThreadRun:
        evt.wait()
        evt.clear()

        timenow = int(time.time() * 1000)

        # NOTE(review): every chip takes its DO values from b[0:32], so config
        # entries 33-64 are never used. Presumably chip 1 should read
        # b[32:64] - confirm before changing what the hardware is driven with.

        # PORT A
        port_a_do_value = b[:DO_PER_PORT].tolist()
        a_ai_data = _read_ai(port_A_AI, serial_dev1)
        _write_do(port_A_DO, port_a_do_value, serial_dev1)

        # PORT B (the original sliced b[17:32]: 15 values, skipping index 16)
        port_b_do_value = b[DO_PER_PORT:2 * DO_PER_PORT].tolist()
        b_ai_data = _read_ai(port_B_AI, serial_dev2)
        _write_do(port_B_DO, port_b_do_value, serial_dev2)

        both_ok = a_ai_data is not None and b_ai_data is not None
        stamp = timenow if both_ok else 0
        queue_a.put([stamp, empty_ai if a_ai_data is None else a_ai_data, port_a_do_value])
        queue_b.put([stamp, empty_ai if b_ai_data is None else b_ai_data, port_b_do_value])


###############################################################################
#                            data process thread                              #
###############################################################################
def _split_ai(ai):
    """Split interleaved AI registers into ``(current, voltage)`` lists.

    Registers at even 0-based positions are treated as voltage and odd ones as
    current, exactly as the original loop did.
    NOTE(review): the original comments mark this mapping as unconfirmed.
    """
    voltage = [int(value) for value in ai[0::2]]
    current = [int(value) for value in ai[1::2]]
    return current, voltage


def _update_error_counters(voltage, error_cnt_1, error_cnt_2):
    """Update the ER1/ER2 counters in place from one set of voltage readings.

    A low reading increments ER1; when ER1 reaches ``ER1_LIMIT`` it is reset
    and ER2 is incremented. Any other reading resets ER1.
    """
    for tube, volts in enumerate(voltage[:len(error_cnt_1)]):
        if volts < LOW_VOLTAGE_THRESHOLD:
            error_cnt_1[tube] += 1
            if error_cnt_1[tube] == ER1_LIMIT:
                error_cnt_1[tube] = 0
                error_cnt_2[tube] += 1
                # TODO: switch this tube off through the shared DO table.
        else:
            error_cnt_1[tube] = 0


def _build_mqtt_message(ch, chamber, samples):
    """Wrap ``samples`` in the MQTT envelope for channel ``ch``."""
    return {
        "modelCode": "test4",
        # TODO: take the asset/model code from the configuration.
        "assetCode": "NQ-R04-TEST-003",
        "dataType": "DATA",
        "data": {
            "channel": ch,
            "chamber": chamber,
            # TODO: derive the slot from chamber and channel.
            "slot": ch,
            "list": samples,
        },
    }


def data_process_thread(data_queue, ch, chamber):
    """Consume samples of one channel and publish them in batches via MQTT.

    Every queue item is ``[timestamp_ms, ai_registers, do_values]``. Valid
    items (timestamp > 0) update the ER1/ER2 counters and are collected; items
    with timestamp 0 mark a failed read cycle and are skipped. After every
    ``SAMPLES_PER_MESSAGE`` items, valid or not, the collected samples are
    published as one JSON message, even when the list is empty.

    Args:
        data_queue: queue filled by ``cwt_thread``.
        ch: channel number reported in the message.
        chamber: chamber number reported in the message.
    """
    error_cnt_1 = [0] * DO_PER_PORT
    error_cnt_2 = [0] * DO_PER_PORT
    pending = []
    queue_count = 0

    while isThreadRun:
        # One item is expected roughly every 0.1 s.
        timestamp, ai, do = data_queue.get()
        queue_count += 1

        if timestamp > 0:
            current, voltage = _split_ai(ai)
            _update_error_counters(voltage, error_cnt_1, error_cnt_2)
            pending.append({
                "timestamp": int(timestamp),
                "data": {
                    "current": current,
                    "voltage": voltage,
                    "do": [int(d) for d in do],
                    # Snapshot the counters: the live lists keep changing, so
                    # sharing them would make every sample in the batch report
                    # the latest values.
                    "er1": list(error_cnt_1),
                    "er2": list(error_cnt_2),
                },
            })

        if queue_count == SAMPLES_PER_MESSAGE:
            queue_count = 0
            mqtt_msg = _build_mqtt_message(ch, chamber, pending)
            data_json_str = json.dumps(mqtt_msg, indent=4)
            publish_json_message(client, data_json_str)
            pending = []


###############################################################################
#                             timer event thread                              #
###############################################################################
def timer_event_loop(e1, e2):
    """Set both events every ``TICK_SECONDS`` to trigger the reader threads.

    Each reader thread gets its own event. The loop ends once ``isThreadRun``
    is cleared.
    """
    while isThreadRun:
        e1.set()
        e2.set()
        time.sleep(TICK_SECONDS)


def main():
    """Start the reader, processing and timer threads and run until Ctrl-C."""
    global isThreadRun

    # One unbounded queue per channel.
    ch1_queue = queue.Queue()
    ch2_queue = queue.Queue()
    ch3_queue = queue.Queue()
    ch4_queue = queue.Queue()

    chip0_evt = threading.Event()
    chip1_evt = threading.Event()

    # Slave addresses reflect the current test wiring (four modules per chip).
    read_data_chip0 = threading.Thread(
        target=cwt_thread,
        args=(chip0_evt, 0, 1, 2, 5, 6, ch1_queue, ch2_queue),
        daemon=True,
    )
    read_data_chip1 = threading.Thread(
        target=cwt_thread,
        args=(chip1_evt, 1, 9, 10, 13, 14, ch3_queue, ch4_queue),
        daemon=True,
    )
    readers = [read_data_chip0, read_data_chip1]

    processors = [
        threading.Thread(target=data_process_thread, args=(channel_queue, channel, 1), daemon=True)
        for channel, channel_queue in enumerate(
            (ch1_queue, ch2_queue, ch3_queue, ch4_queue), start=1
        )
    ]

    event_thread = threading.Thread(
        target=timer_event_loop, args=(chip0_evt, chip1_evt), daemon=True
    )

    for worker in readers + processors + [event_thread]:
        worker.start()

    try:
        # Daemon workers do not keep the process alive, so park the main
        # thread here; joining with a timeout keeps Ctrl-C responsive.
        while event_thread.is_alive():
            event_thread.join(timeout=1.0)
    except KeyboardInterrupt:
        logger.info("interrupted, shutting down")
    finally:
        isThreadRun = False
        # Wake the readers so they notice the stop flag, and give them time to
        # finish the Modbus transaction in flight.
        chip0_evt.set()
        chip1_evt.set()
        for reader in readers:
            reader.join(timeout=5.0)
        try:
            shm.unlink()
        except FileNotFoundError:
            pass


if __name__ == '__main__':
    main()