AWX_collector/AWX_collector_1.py

494 lines
16 KiB
Python

import threading
import time
import logging
import serial
import minimalmodbus
import serial.rs485
import subprocess
import json
import yaml
import queue
import paho.mqtt.client as mqtt
import numpy as np
from multiprocessing import shared_memory
import argparse
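'''
Data that has to be delivered
mes code | status | voltage | current | 4pin (DO) | ER1 | ER2
tube no. | status | volt | curr | do | cnt | cnt
tube number: 1 - 128, chamber / slot
- slot(n) - tube(n)
- 1-1 ~ 8-16 : 128 tubes in total
status: normal, initializing, or defective
ER1: evaluated over 1 second; counts up while evaluating, CNT UP when the voltage is 2 V or below
if) er reaches 10 (held for 1 second) -> reset to 0, ER2 cnt up
if) a reading of 2 V or more occurs -> reset to 0
ER2: number of OFF events
'''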
# Command-line: "-config" names the application directory under /usr/local/sdt/app.
parser = argparse.ArgumentParser()
parser.add_argument('-config', help='')
args = parser.parse_args()
ROOT_PATH = f"/usr/local/sdt/app/{args.config}"
DEVICE_PATH = "/etc/sdt/device-control"

###############################################################################
#                          Configuration file read                            #
###############################################################################
# MQTT connection parameters.
with open(f"{ROOT_PATH}/connect_info.json", "r") as f:
    info = json.load(f)

# Device description; only 'assetcode' is read below.
# NOTE(review): FullLoader can construct arbitrary Python objects; if this file
# can ever come from an untrusted source, switch to yaml.safe_load.
with open(f"{DEVICE_PATH}/config.yaml", "r") as f:
    dev_info = yaml.load(f, Loader=yaml.FullLoader)

###############################################################################
#                          Shared memory setting                              #
###############################################################################
# config.json maps the keys "1".."64" to the initial DO values.
with open(f"{ROOT_PATH}/config.json", "r") as f:
    data = json.load(f)
print(data)
print(data["1"])
a = np.array([data[str(n + 1)] for n in range(64)])

###############################################################################
#               Publish the DO table through shared memory                    #
###############################################################################
try:
    shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name="DO-shm")
except FileExistsError:
    # A previous run that did not exit cleanly leaves "DO-shm" behind, and
    # create=True then refuses to start. Reuse the stale segment when it is
    # large enough; otherwise drop it and create a fresh one.
    shm = shared_memory.SharedMemory(create=False, name="DO-shm")
    if shm.size < a.nbytes:
        shm.close()
        shm.unlink()
        shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name="DO-shm")

# 'b' is the live view over the shared segment; it is (re)initialised from the
# values just loaded from config.json.
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
b[:] = a[:]

###############################################################################
#                          MQTT Broker Setting                                #
###############################################################################
# The topic is derived from the device asset code rather than from
# connect_info.json.
MQTT_TOPIC = f"/device-data/{dev_info['assetcode']}"
MQTT_ID = info['mqtt']['id']
MQTT_PW = info['mqtt']['pw']
MQTT_HOST_IP = info['mqtt']['host_ip']
MQTT_PORT = info['mqtt']['port']
def publish_json_message(client, data):
    """Publish *data* as the payload of one message on the module-wide MQTT_TOPIC.

    Args:
        client: object exposing ``publish(topic, payload=...)``.
        data: payload handed to ``client.publish`` unchanged.
    """
    client.publish(MQTT_TOPIC, payload=data)
# Connect the module-wide MQTT client used by publish_json_message callers.
client = mqtt.Client("python_pub")
client.username_pw_set(MQTT_ID, MQTT_PW)
client.connect(host=MQTT_HOST_IP, port=MQTT_PORT)
client.loop(2)  # timeout = 2 seconds

###############################################################################
#                              Serial setting                                 #
###############################################################################


def _open_rs485_port(device):
    """Open *device* at 115200 8N1 with RTS/CTS and attach default RS485 settings."""
    port = serial.Serial(
        device,
        115200,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
        timeout=1,
        rtscts=True,
    )
    port.rs485_mode = serial.rs485.RS485Settings()
    return port


_serial_port3 = _open_rs485_port("/dev/ttyMAX2")
_serial_port4 = _open_rs485_port("/dev/ttyMAX3")

###############################################################################
#                              Thread functions                               #
###############################################################################
# Loop flag read by the worker threads; they keep running while it is true.
isThreadRun = True
def _poll_port(ai_dev, do_dev, do_values):
    """Read the AI registers and push the DO bits for one port.

    Returns:
        ``(ok, ai_data)`` where ``ok`` tells whether the AI read succeeded and
        ``ai_data`` is the register list, or 16 zeros when the read failed.
        A failed DO write is best-effort and does not affect ``ok``.
    """
    try:
        ai_data = ai_dev.read_registers(0x32, 32)
        ok = True
    except Exception as err:
        # Failed reads are reported to the consumer through a zero timestamp;
        # the placeholder keeps the queue item shape stable.
        logging.debug("AI read failed: %s", err)
        ai_data = [0] * 16
        ok = False
    try:
        do_dev.write_bits(0, do_values)
    except Exception as err:
        # Deliberately best-effort: the next cycle writes the outputs again.
        logging.debug("DO write failed: %s", err)
    return ok, ai_data


def cwt_thread(evt, chip_index, port_a_ai_id , port_a_do_id, port_b_ai_id , port_b_do_id, queue_a, queue_b):
    """Poll the AI modules and drive the DO modules on one chip's two serial ports.

    Each time ``evt`` is set, the thread reads 32 registers from the AI slave
    of port A and of port B, writes the DO bits taken from the shared DO table
    ``b`` to the matching DO slaves, and puts ``[timestamp, ai_data, do_values]``
    on ``queue_a`` / ``queue_b``.  When either AI read fails, both items carry
    timestamp 0; a port whose read failed carries 16 zeros as its data (reusing
    the previous sample was considered and rejected).

    Args:
        evt: event that triggers one polling cycle (``wait``/``clear`` are used).
        chip_index: 0 selects /dev/ttyMAX1 + /dev/ttyMAX0, anything else
            /dev/ttyMAX2 + /dev/ttyMAX3.
        port_a_ai_id, port_a_do_id: Modbus slave addresses on port A.
        port_b_ai_id, port_b_do_id: Modbus slave addresses on port B.
        queue_a, queue_b: queues receiving the port A / port B samples.
    """
    print("index: ", chip_index)
    print("a-ai: %d, a-do: %d, b-ai: %d, b-do: %d " % (port_a_ai_id, port_a_do_id, port_b_ai_id, port_b_do_id))
    if chip_index == 0:
        serial_dev1 = "/dev/ttyMAX1"
        serial_dev2 = "/dev/ttyMAX0"
    else:
        serial_dev1 = "/dev/ttyMAX2"
        serial_dev2 = "/dev/ttyMAX3"
    print(serial_dev1)
    print(serial_dev2)

    _serial_port_A = serial.Serial(serial_dev1, 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True)
    _serial_port_A.rs485_mode = serial.rs485.RS485Settings()
    _serial_port_B = serial.Serial(serial_dev2, 115200, parity= serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout = 1, rtscts=True)
    _serial_port_B.rs485_mode = serial.rs485.RS485Settings()

    # Instrument(port name, slave address in decimal)
    port_A_AI = minimalmodbus.Instrument(serial_dev1, port_a_ai_id, debug=False, close_port_after_each_call=False)
    port_A_AI.serial.baudrate = 115200
    port_A_DO = minimalmodbus.Instrument(serial_dev1, port_a_do_id, debug=False, close_port_after_each_call=False)
    port_A_DO.serial.baudrate = 115200
    port_B_AI = minimalmodbus.Instrument(serial_dev2, port_b_ai_id, debug=False, close_port_after_each_call=False)
    port_B_AI.serial.baudrate = 115200
    port_B_DO = minimalmodbus.Instrument(serial_dev2, port_b_do_id, debug=False, close_port_after_each_call=False)
    port_B_DO.serial.baudrate = 115200

    while isThreadRun:
        evt.wait()
        evt.clear()
        timenow = int(time.time()*1000)

        # NOTE(review): both chips take their DO values from b[0:32]; if the 64
        # configured values are meant to be split per chip, chip 1 presumably
        # needs b[32:64] - confirm before changing.
        # PORT A: DO values 0..15
        port_a_do_value = list(b[:16])
        a_ok, a_ai = _poll_port(port_A_AI, port_A_DO, port_a_do_value)

        # PORT B: DO values 16..31 (the slice previously started at 17, which
        # dropped one value and sent only 15 bits).
        port_b_do_value = list(b[16:32])
        b_ok, b_ai = _poll_port(port_B_AI, port_B_DO, port_b_do_value)

        # A sample is only valid when both AI reads succeeded; timestamp 0
        # marks the cycle as invalid for the consumer.
        stamp = timenow if (a_ok and b_ok) else 0
        queue_a.put([stamp, a_ai, port_a_do_value])
        queue_b.put([stamp, b_ai, port_b_do_value])
###############################################################################
# data process thread #
###############################################################################
def _split_ai_samples(ai):
    """Split interleaved AI readings into ``(current, voltage)`` integer lists.

    The 1st, 3rd, 5th... readings go to ``voltage`` and the 2nd, 4th, 6th...
    to ``current``.
    NOTE(review): an older note in this file describes the layout as
    [curr, volt, curr, volt, ...]; confirm which order the hardware uses.
    """
    voltage = [int(v) for v in ai[0::2]]
    current = [int(v) for v in ai[1::2]]
    return current, voltage


def _update_error_counts(voltage, error_cnt_1, error_cnt_2):
    """Update the per-tube error counters in place from one voltage sample.

    A reading below 2 increments ER1; when ER1 reaches 10 it is reset and ER2
    is incremented.  Any reading of 2 or more resets ER1.
    """
    for idx, volt in enumerate(voltage):
        if volt < 2:
            error_cnt_1[idx] += 1
            if error_cnt_1[idx] == 10:
                error_cnt_1[idx] = 0
                error_cnt_2[idx] += 1
        else:
            error_cnt_1[idx] = 0


def data_process_thread(data_queue, ch, chamber):
    """Turn raw port samples into MQTT messages, one message per 10 samples.

    Each queue item is ``[timestamp, ai, do]``.  Every item updates the error
    counters; only items with a positive timestamp are added to the outgoing
    batch.  After every 10th item the batch is serialised to JSON and sent with
    ``publish_json_message``.

    Args:
        data_queue: queue yielding ``[timestamp, ai, do]`` items.
        ch: channel number reported as "channel" and "slot".
        chamber: chamber number reported as "chamber".
    """
    error_cnt_1 = [0] * 16
    error_cnt_2 = [0] * 16
    batch = []
    queue_count = 0
    while isThreadRun:
        # Items are expected roughly every 0.1 s.
        timestamp, ai, do = data_queue.get()
        # Plain ints so the values are JSON serialisable.
        do = [int(d) for d in do]
        queue_count += 1

        current, voltage = _split_ai_samples(ai)
        # NOTE(review): items with timestamp 0 still pass through the counters,
        # so placeholder zero readings count as low-voltage samples - confirm
        # this is intended.
        _update_error_counts(voltage, error_cnt_1, error_cnt_2)

        if timestamp > 0:
            batch.append({
                "timestamp" : int(timestamp),
                "data": {
                    "current" : current,
                    "voltage" : voltage,
                    "do" : do,
                    # Snapshot the counters: storing the live lists would make
                    # every record in the batch show the final counter values.
                    "er1" : list(error_cnt_1),
                    "er2" : list(error_cnt_2)
                }
            })

        if queue_count == 10:
            queue_count = 0
            mqtt_msg = {
                "modelCode":"test4",
                # To be replaced later by a value from the config / asset model.
                "assetCode":"NQ-R04-TEST-003",
                "dataType":"DATA",
                "data": {
                    "channel": ch,
                    "chamber":chamber,
                    # The slot is meant to be derived from chamber and channel.
                    "slot": ch,
                    "list": batch
                }
            }
            data_json_str = json.dumps(mqtt_msg, indent=4)
            # Start a fresh batch before publishing; the payload is already
            # serialised, so the old list is no longer needed.
            batch = []
            publish_json_message(client, data_json_str)
######################################### thread 함수 작성 ################################################
#event는 개별 생성을 해준다 th1, th2
def timer_event_loop(e1, e2):
    """Trigger periodic sensing by setting both events, then sleeping 0.1 s, forever.

    Args:
        e1: event set at the start of every cycle.
        e2: event set right after ``e1`` in every cycle.

    The loop never terminates on its own; the unused counter and the
    unreachable ``exit()`` that used to follow it have been removed.
    """
    while True:
        e1.set()
        e2.set()
        time.sleep(0.1)
# timer = threading.Timer(3, lambda: print("test event"))
# timer.start()
# timer.join()
# # timer = threading.Timer(3, lambda: e.set())
################################# 작성 중 #####################################
# def publish_json_message(client, data):
# topic = "devicedata/axr/nodeq1"
# client.publish(topic, payload=data)
# client = mqtt.Client("python_pub")
# client.username_pw_set("sdt", "251327")
# client.connect(host="13.209.39.139",port=32259)
# # get_value()
# data = json.dumps(dict_1)
# #publish_json_message(client, data)
# client.loop(2) # timeout = 2초
################################# 작성 중 ######################################
if __name__ == '__main__':
    # One unbounded queue per channel and one trigger event per chip.
    ch1_queue, ch2_queue, ch3_queue, ch4_queue = (queue.Queue() for _ in range(4))
    chip0_evt = threading.Event()
    chip1_evt = threading.Event()

    # Start order: the two readers, the four processors, then the timer.
    workers = [
        # Currently wired with 4 devices each, for testing.
        threading.Thread(target=cwt_thread, args=(chip0_evt, 0, 1, 2, 5, 6, ch1_queue, ch2_queue)),
        threading.Thread(target=cwt_thread, args=(chip1_evt, 1, 9, 10, 13, 14, ch3_queue, ch4_queue)),
        threading.Thread(target=data_process_thread, args=(ch1_queue, 1, 1)),
        threading.Thread(target=data_process_thread, args=(ch2_queue, 2, 1)),
        threading.Thread(target=data_process_thread, args=(ch3_queue, 3, 1)),
        threading.Thread(target=data_process_thread, args=(ch4_queue, 4, 1)),
        threading.Thread(target=timer_event_loop, args=(chip0_evt, chip1_evt)),
    ]
    for worker in workers:
        worker.start()