- 앱 배포 MQTT 메시지 페이로드 수정

This commit is contained in:
hyunjujeong 2023-08-30 13:37:31 +09:00
parent b74398813e
commit f9af91c688
15 changed files with 210 additions and 77 deletions

View File

@ -2,10 +2,7 @@ package inc.sdt.blokworks.devicedeployer.application;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import inc.sdt.blokworks.devicedeployer.domain.AssetApp;
import inc.sdt.blokworks.devicedeployer.domain.DeployRequest;
import inc.sdt.blokworks.devicedeployer.domain.OutboundMessage;
import inc.sdt.blokworks.devicedeployer.domain.OperationType;
import inc.sdt.blokworks.devicedeployer.domain.*;
import inc.sdt.blokworks.devicedeployer.infrastructure.mqtt.InboundDeployMessagePayload;
import inc.sdt.blokworks.devicedeployer.infrastructure.mqtt.OutboundMessagePayload;
import org.eclipse.paho.client.mqttv3.IMqttClient;
@ -18,6 +15,8 @@ import org.springframework.data.domain.Page;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
@Service
@ -46,13 +45,43 @@ public class DefaultDeployerService implements DeployerService{
public void publish(OutboundMessage deployMessage, String assetCode) {
log.info("[publish] deployMessage = {}, assetCode = {}", deployMessage, assetCode);
final String url = filePath + deployMessage.getFileId();
final String deviceType = "ecn";
HashMap<String, String> cmdInfo = new HashMap<>();
String cmdType = "";
try {
/*switch (deployMessage.getCommandType()) {
case bash:
break;
case systemd:
break;
case docker:
break;
case deploy:
break;
}*/
if(!deployMessage.getCommand().isEmpty()) {
cmdInfo.put("Cmd", deployMessage.getCommand());
cmdInfo.put("FileUrl", url);
cmdInfo.put("FileType", "python");
cmdType = String.valueOf(CommandType.deploy);
}
if(!deployMessage.getEnv().isEmpty()) {
cmdInfo.put("Cmd", "run");
cmdInfo.put("Image", url);
cmdInfo.put("Name", deployMessage.getName());
cmdInfo.put("Env", convertToJson(deployMessage.getEnv()));
cmdInfo.put("Ports", convertToJson(deployMessage.getPorts()));
cmdType = String.valueOf(CommandType.docker);
}
OutboundMessagePayload payload = new OutboundMessagePayload(
url,
deployMessage.getName(),
deployMessage.getCommand(),
deployMessage.getEnv(),
OperationType.DEPLOY,
cmdInfo,
cmdType,
assetCode,
deviceType,
deployMessage.getRequestId()
);
@ -60,7 +89,7 @@ public class DefaultDeployerService implements DeployerService{
MqttMessage message = new MqttMessage();
message.setPayload(bytes);
mqttClient.publish("/assets/"+assetCode+"/apps/deploy", message);
mqttClient.publish("/devicecontrol/"+deviceType+"/"+assetCode, message);
log.info("[publish] message = {}", message);
}catch (JsonProcessingException | MqttException e) {
throw new IllegalArgumentException();
@ -93,4 +122,12 @@ public class DefaultDeployerService implements DeployerService{
return deployerRepositoryDelegate.findAllByAssetCode(assetCode, page, size);
}
private String convertToJson(Object obj) {
try {
return objectMapper.writeValueAsString(obj);
}catch (JsonProcessingException e) {
throw new IllegalArgumentException();
}
}
}

View File

@ -3,18 +3,22 @@ package inc.sdt.blokworks.devicedeployer.domain;
public class AssetApp {
private String assetCode;
private String name;
private Long size;
private int size;
private int pid;
private Long releaseAt;
private Long updatedAt;
private Status status;
protected AssetApp() {}
public AssetApp(String assetCode, String name, Long size, Long releaseAt, Long updatedAt) {
public AssetApp(String assetCode, String name, int size, int pid, Long releaseAt, Long updatedAt, Status status) {
this.assetCode = assetCode;
this.name = name;
this.size = size;
this.pid = pid;
this.releaseAt = releaseAt;
this.updatedAt = updatedAt;
this.status = status;
}
public String getAssetCode() {
@ -25,10 +29,14 @@ public class AssetApp {
return name;
}
public Long getSize() {
public int getSize() {
return size;
}
public int getPid() {
return pid;
}
public Long getReleaseAt() {
return releaseAt;
}
@ -37,14 +45,20 @@ public class AssetApp {
return updatedAt;
}
public Status getStatus() {
return status;
}
@Override
public String toString() {
return "AssetApp{" +
"assetCode='" + assetCode + '\'' +
", name='" + name + '\'' +
", size=" + size +
", pid=" + pid +
", releaseAt=" + releaseAt +
", updatedAt=" + updatedAt +
", status=" + status +
'}';
}
@ -55,9 +69,11 @@ public class AssetApp {
public static final class Builder {
private String assetCode;
private String name;
private Long size;
private int size;
private int pid;
private Long releasedAt;
private Long updatedAt;
private Status status;
public Builder assetCode(String assetCode) {
this.assetCode = assetCode;
@ -69,11 +85,16 @@ public class AssetApp {
return this;
}
public Builder size(Long size) {
public Builder size(int size) {
this.size = size;
return this;
}
public Builder pid(int pid) {
this.pid = pid;
return this;
}
public Builder releasedAt(Long releasedAt) {
this.releasedAt = releasedAt;
return this;
@ -84,13 +105,20 @@ public class AssetApp {
return this;
}
public Builder status(Status status) {
this.status = status;
return this;
}
public AssetApp build() {
AssetApp assetApp = new AssetApp();
assetApp.assetCode = this.assetCode;
assetApp.name = this.name;
assetApp.size = this.size;
assetApp.pid = this.pid;
assetApp.releaseAt = this.releasedAt;
assetApp.updatedAt = this.updatedAt;
assetApp.status = this.status;
return assetApp;
}
}

View File

@ -0,0 +1,8 @@
package inc.sdt.blokworks.devicedeployer.domain;
public enum CommandType {
bash,
systemd,
docker,
deploy
}

View File

@ -1,14 +1,17 @@
package inc.sdt.blokworks.devicedeployer.domain;
import java.util.HashMap;
import java.util.Set;
import java.util.List;
import java.util.Map;
public class OutboundMessage {
private String fileId;
private String name;
private HashMap<String, String> env;
private Map<String, Object> env;
private List<Map<String, String>> ports;
private String command;
private String requestId;
private String deviceType;
private CommandType commandType;
protected OutboundMessage() {}
@ -20,10 +23,14 @@ public class OutboundMessage {
return name;
}
public HashMap<String, String> getEnv() {
public Map<String, Object> getEnv() {
return env;
}
public List<Map<String, String>> getPorts() {
return ports;
}
public String getCommand() {
return command;
}
@ -36,14 +43,25 @@ public class OutboundMessage {
return requestId;
}
public String getDeviceType() {
return deviceType;
}
public CommandType getCommandType() {
return commandType;
}
@Override
public String toString() {
return "AssetApp{" +
return "OutboundMessage{" +
"fileId='" + fileId + '\'' +
", name='" + name + '\'' +
", env=" + env +
", ports=" + ports +
", command='" + command + '\'' +
", requestId='" + requestId + '\'' +
", deviceType='" + deviceType + '\'' +
", commandType=" + commandType +
'}';
}
@ -54,9 +72,12 @@ public class OutboundMessage {
public static final class Builder {
private String fileId;
private String name;
private HashMap<String, String> env;
private Map<String, Object> env;
private List<Map<String, String>> ports;
private String command;
private String requestId;
private String deviceType;
private CommandType commandType;
public Builder fileId(String fileId) {
this.fileId = fileId;
@ -68,11 +89,16 @@ public class OutboundMessage {
return this;
}
public Builder env(HashMap<String, String> env) {
public Builder env(Map<String, Object> env) {
this.env = env;
return this;
}
public Builder ports(List<Map<String, String>> ports) {
this.ports = ports;
return this;
}
public Builder command(String command) {
this.command = command;
return this;
@ -83,6 +109,16 @@ public class OutboundMessage {
return this;
}
public Builder deviceType(String deviceType) {
this.deviceType = deviceType;
return this;
}
public Builder commandType(CommandType commandType) {
this.commandType = commandType;
return this;
}
public OutboundMessage build() {
OutboundMessage deployMessage = new OutboundMessage();
deployMessage.fileId = this.fileId;
@ -90,6 +126,9 @@ public class OutboundMessage {
deployMessage.env = this.env;
deployMessage.command = this.command;
deployMessage.requestId = this.requestId;
deployMessage.ports = this.ports;
deployMessage.deviceType = this.deviceType;
deployMessage.commandType = this.commandType;
return deployMessage;
}
}

View File

@ -3,12 +3,18 @@ package inc.sdt.blokworks.devicedeployer.infrastructure.mqtt;
import inc.sdt.blokworks.devicedeployer.domain.Status;
public record InboundDeployMessagePayload(
Status status,
String assetCode,
String requestId,
String name,
Long size,
Long releasedAt,
Long updatedAt
String deviceType,
Status status,
Result result,
String requestId
) {
public record Result(
String name,
int pid,
int size,
String message,
long releasedAt,
long updatedAt
){}
}

View File

@ -1,15 +1,13 @@
package inc.sdt.blokworks.devicedeployer.infrastructure.mqtt;
import inc.sdt.blokworks.devicedeployer.domain.OperationType;
import java.util.HashMap;
public record OutboundMessagePayload(
String url,
String name,
String command,
HashMap<String, String> env,
OperationType operationType,
String requestId
HashMap<String, String> CmdInfo,
String CmdType,
String AssetCode,
String DeviceType,
String RequestId
) {
}

View File

@ -1,5 +1,6 @@
package inc.sdt.blokworks.devicedeployer.infrastructure.relational;
import inc.sdt.blokworks.devicedeployer.domain.Status;
import jakarta.persistence.*;
@Entity(name = "asset_app")
@ -13,20 +14,27 @@ class AssetAppEntity {
@Column(name = "app_name", length = 255)
private String name;
@Column(name = "size")
private Long size;
private int size;
@Column(name = "released_at")
private Long releasedAt;
@Column(name = "updated_at")
private Long updatedAt;
@Column(name = "pid")
private int pid;
@Column(name = "status")
@Enumerated(EnumType.STRING)
private Status status;
protected AssetAppEntity() {}
public AssetAppEntity(String assetCode, String name, long size, Long releasedAt, Long updatedAt) {
public AssetAppEntity(String assetCode, String name, int size, Long releasedAt, Long updatedAt, int pid, Status status) {
this.assetCode = assetCode;
this.name = name;
this.size = size;
this.releasedAt = releasedAt;
this.updatedAt = updatedAt;
this.pid = pid;
this.status = status;
}
public String getId() {
@ -41,7 +49,7 @@ class AssetAppEntity {
return name;
}
public long getSize() {
public int getSize() {
return size;
}
@ -53,5 +61,11 @@ class AssetAppEntity {
return updatedAt;
}
public int getPid() {
return pid;
}
public Status getStatus() {
return status;
}
}

View File

@ -50,7 +50,9 @@ public class AssetAppRelationalRepository implements DeployerRepositoryDelegate
assetApp.getName(),
assetApp.getSize(),
assetApp.getReleaseAt(),
assetApp.getUpdatedAt()
assetApp.getUpdatedAt(),
assetApp.getPid(),
assetApp.getStatus()
);
}
@ -59,18 +61,22 @@ public class AssetAppRelationalRepository implements DeployerRepositoryDelegate
.assetCode(entity.getAssetCode())
.name(entity.getName())
.size(entity.getSize())
.pid(entity.getPid())
.releasedAt(entity.getReleasedAt())
.updatedAt(entity.getUpdatedAt())
.status(entity.getStatus())
.build();
}
private AssetApp fromMessage(InboundDeployMessagePayload payload) {
return AssetApp.builder()
.assetCode(payload.assetCode())
.name(payload.name())
.size(payload.size())
.releasedAt(payload.releasedAt())
.updatedAt(payload.updatedAt())
.name(payload.result().name())
.size(payload.result().size())
.pid(payload.result().pid())
.releasedAt(payload.result().releasedAt())
.updatedAt(payload.result().updatedAt())
.status(payload.status())
.build();
}
}

View File

@ -3,7 +3,7 @@ package inc.sdt.blokworks.devicedeployer.presentation;
record AssetAppResource(
String assetCode,
String name,
Long size,
int size,
Long releaseAt,
Long updatedAt
) {

View File

@ -6,16 +6,6 @@ import org.springframework.stereotype.Component;
@Component
public class AssetAppResourceConverter {
public AssetApp fromResource(AssetAppResource resource) {
return AssetApp.builder()
.assetCode(resource.assetCode())
.name(resource.name())
.size(resource.size())
.releasedAt(resource.releaseAt())
.updatedAt(resource.updatedAt())
.build();
}
public AssetAppResource toResource(AssetApp assetApp) {
return new AssetAppResource(
assetApp.getAssetCode(),

View File

@ -35,20 +35,19 @@ public class MqttMessageHandler {
log.info("[handleMessage] message={}", message);
if(!message.getPayload().contains("pid")) {
String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
String id = topic.split("/")[4];
//String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
//String id = topic.split("/")[4];
log.info("[handleMessage] topic = {}, id = {}", topic, id);
//log.info("[handleMessage] topic = {}, id = {}", topic, id);
deployMessagePayloadConverter.convertFromByte(message.getPayload(), InboundDeployMessagePayload.class)
.map(p -> new InboundDeployMessagePayload(
p.status(),
p.assetCode(),
id,
p.name(),
p.size(),
p.releasedAt(),
p.updatedAt()))
p.deviceType(),
p.status(),
p.result(),
p.requestId()
))
.flatMap(deployerService)
.subscribe();
}else {

View File

@ -1,5 +1,6 @@
package inc.sdt.blokworks.devicedeployer.presentation;
import inc.sdt.blokworks.devicedeployer.domain.CommandType;
import org.wildfly.common.annotation.NotNull;
import java.util.HashMap;
@ -10,7 +11,9 @@ record OutboundMessageResource(
@NotNull
String name,
String command,
String env
String env,
String deviceType,
CommandType commandType
) {
}

View File

@ -1,38 +1,45 @@
package inc.sdt.blokworks.devicedeployer.presentation;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import inc.sdt.blokworks.devicedeployer.domain.CommandType;
import inc.sdt.blokworks.devicedeployer.domain.OutboundMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class OutboundMessageResourceConverter {
private final ObjectMapper objectMapper;
private final Logger log;
public OutboundMessageResourceConverter() {
this.objectMapper = new ObjectMapper();
this.log = LoggerFactory.getLogger(this.getClass());
}
public OutboundMessage fromResource(OutboundMessageResource resource) {
final Map<String, Object> cmdInfo = resource.env().isEmpty() ? new HashMap<>() : convertToMap(resource.env());
return OutboundMessage.builder()
.fileId(resource.fileId())
.name(resource.name())
.command(resource.command())
.env(resource.env().isEmpty() ? new HashMap<>() : (HashMap<String, String>) convertToMap(resource.env()))
.env(cmdInfo.isEmpty() ? new HashMap<>() : (Map<String, Object>) cmdInfo.get("env"))
.ports(cmdInfo.isEmpty() ? new ArrayList<>() : (List<Map<String, String>>) cmdInfo.get("ports"))
.deviceType(resource.deviceType())
.commandType(resource.commandType() == null ? CommandType.deploy : resource.commandType())
.build();
}
private Map convertToMap(String env) {
private Map<String, Object> convertToMap(String env) {
try {
return objectMapper.readValue(env, Map.class);
return objectMapper.readValue(env, new TypeReference<>() {});
}catch (IOException e) {
throw new IllegalArgumentException();
}

View File

@ -16,8 +16,7 @@ inbound:
username: ${INBOUND_MQTT_CREDENTIALS_USERNAME}
password: ${INBOUND_MQTT_CREDENTIALS_PASSWORD}
topics:
- /assets/+/command-req/+
- /assets/+/apps/process
- /devicecontrol/result/+/+/+
iam:
enabled: ${IAM_REGISTER_ENABLED}

View File

@ -12,13 +12,12 @@ spring:
inbound:
mqtt:
#url: tcp://localhost:1883
url: tcp://13.209.39.139:32259
url: tcp://localhost:1883
#url: tcp://13.209.39.139:32259
username: sdt
password: 251327
topics:
- /assets/+/command-req/+
- /assets/+/apps/process
- /devicecontrol/result/+/+/+
stackbase:
api: