From f9af91c688a71afc7bfa596684a45314d9d7ac51 Mon Sep 17 00:00:00 2001 From: hyunjujeong Date: Wed, 30 Aug 2023 13:37:31 +0900 Subject: [PATCH] =?UTF-8?q?-=20=EC=95=B1=20=EB=B0=B0=ED=8F=AC=20MQTT=20?= =?UTF-8?q?=EB=A9=94=EC=8B=9C=EC=A7=80=20=ED=8E=98=EC=9D=B4=EB=A1=9C?= =?UTF-8?q?=EB=93=9C=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../application/DefaultDeployerService.java | 57 +++++++++++++++---- .../devicedeployer/domain/AssetApp.java | 38 +++++++++++-- .../devicedeployer/domain/CommandType.java | 8 +++ .../domain/OutboundMessage.java | 53 ++++++++++++++--- .../mqtt/InboundDeployMessagePayload.java | 20 ++++--- .../mqtt/OutboundMessagePayload.java | 14 ++--- .../relational/AssetAppEntity.java | 20 ++++++- .../AssetAppRelationalRepository.java | 16 ++++-- .../presentation/AssetAppResource.java | 2 +- .../AssetAppResourceConverter.java | 10 ---- .../presentation/MqttMessageHandler.java | 17 +++--- .../presentation/OutboundMessageResource.java | 5 +- .../OutboundMessageResourceConverter.java | 17 ++++-- src/main/resources/application-dev.yml | 3 +- src/main/resources/application-local.yml | 7 +-- 15 files changed, 210 insertions(+), 77 deletions(-) create mode 100644 src/main/java/inc/sdt/blokworks/devicedeployer/domain/CommandType.java diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/application/DefaultDeployerService.java b/src/main/java/inc/sdt/blokworks/devicedeployer/application/DefaultDeployerService.java index aa91e21..1eda8d6 100644 --- a/src/main/java/inc/sdt/blokworks/devicedeployer/application/DefaultDeployerService.java +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/application/DefaultDeployerService.java @@ -2,10 +2,7 @@ package inc.sdt.blokworks.devicedeployer.application; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import inc.sdt.blokworks.devicedeployer.domain.AssetApp; -import inc.sdt.blokworks.devicedeployer.domain.DeployRequest; -import inc.sdt.blokworks.devicedeployer.domain.OutboundMessage; -import inc.sdt.blokworks.devicedeployer.domain.OperationType; +import inc.sdt.blokworks.devicedeployer.domain.*; import inc.sdt.blokworks.devicedeployer.infrastructure.mqtt.InboundDeployMessagePayload; import inc.sdt.blokworks.devicedeployer.infrastructure.mqtt.OutboundMessagePayload; import org.eclipse.paho.client.mqttv3.IMqttClient; @@ -18,6 +15,8 @@ import org.springframework.data.domain.Page; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; +import java.util.HashMap; +import java.util.List; import java.util.Optional; @Service @@ -46,13 +45,43 @@ public class DefaultDeployerService implements DeployerService{ public void publish(OutboundMessage deployMessage, String assetCode) { log.info("[publish] deployMessage = {}, assetCode = {}", deployMessage, assetCode); final String url = filePath + deployMessage.getFileId(); + final String deviceType = "ecn"; + HashMap cmdInfo = new HashMap<>(); + String cmdType = ""; try { + + /*switch (deployMessage.getCommandType()) { + case bash: + break; + case systemd: + break; + case docker: + break; + case deploy: + break; + }*/ + + if(!deployMessage.getCommand().isEmpty()) { + cmdInfo.put("Cmd", deployMessage.getCommand()); + cmdInfo.put("FileUrl", url); + cmdInfo.put("FileType", "python"); + cmdType = String.valueOf(CommandType.deploy); + } + + if(!deployMessage.getEnv().isEmpty()) { + cmdInfo.put("Cmd", "run"); + cmdInfo.put("Image", url); + cmdInfo.put("Name", deployMessage.getName()); + cmdInfo.put("Env", convertToJson(deployMessage.getEnv())); + cmdInfo.put("Ports", convertToJson(deployMessage.getPorts())); + cmdType = String.valueOf(CommandType.docker); + } + OutboundMessagePayload payload = new OutboundMessagePayload( - url, - deployMessage.getName(), - deployMessage.getCommand(), - deployMessage.getEnv(), - OperationType.DEPLOY, + cmdInfo, + cmdType, + assetCode, + deviceType, deployMessage.getRequestId() ); @@ -60,7 +89,7 @@ public class DefaultDeployerService implements DeployerService{ MqttMessage message = new MqttMessage(); message.setPayload(bytes); - mqttClient.publish("/assets/"+assetCode+"/apps/deploy", message); + mqttClient.publish("/devicecontrol/"+deviceType+"/"+assetCode, message); log.info("[publish] message = {}", message); }catch (JsonProcessingException | MqttException e) { throw new IllegalArgumentException(); @@ -93,4 +122,12 @@ public class DefaultDeployerService implements DeployerService{ return deployerRepositoryDelegate.findAllByAssetCode(assetCode, page, size); } + private String convertToJson(Object obj) { + try { + return objectMapper.writeValueAsString(obj); + }catch (JsonProcessingException e) { + throw new IllegalArgumentException(); + } + } + } diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/domain/AssetApp.java b/src/main/java/inc/sdt/blokworks/devicedeployer/domain/AssetApp.java index 9b65da1..7b12815 100644 --- a/src/main/java/inc/sdt/blokworks/devicedeployer/domain/AssetApp.java +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/domain/AssetApp.java @@ -3,18 +3,22 @@ package inc.sdt.blokworks.devicedeployer.domain; public class AssetApp { private String assetCode; private String name; - private Long size; + private int size; + private int pid; private Long releaseAt; private Long updatedAt; + private Status status; protected AssetApp() {} - public AssetApp(String assetCode, String name, Long size, Long releaseAt, Long updatedAt) { + public AssetApp(String assetCode, String name, int size, int pid, Long releaseAt, Long updatedAt, Status status) { this.assetCode = assetCode; this.name = name; this.size = size; + this.pid = pid; this.releaseAt = releaseAt; this.updatedAt = updatedAt; + this.status = status; } public String getAssetCode() { @@ -25,10 +29,14 @@ public class AssetApp { return name; } - public Long getSize() { + public int getSize() { return size; } + public int getPid() { + return pid; + } + public Long getReleaseAt() { return releaseAt; } @@ -37,14 +45,20 @@ public class AssetApp { return updatedAt; } + public Status getStatus() { + return status; + } + @Override public String toString() { return "AssetApp{" + "assetCode='" + assetCode + '\'' + ", name='" + name + '\'' + ", size=" + size + + ", pid=" + pid + ", releaseAt=" + releaseAt + ", updatedAt=" + updatedAt + + ", status=" + status + '}'; } @@ -55,9 +69,11 @@ public class AssetApp { public static final class Builder { private String assetCode; private String name; - private Long size; + private int size; + private int pid; private Long releasedAt; private Long updatedAt; + private Status status; public Builder assetCode(String assetCode) { this.assetCode = assetCode; @@ -69,11 +85,16 @@ public class AssetApp { return this; } - public Builder size(Long size) { + public Builder size(int size) { this.size = size; return this; } + public Builder pid(int pid) { + this.pid = pid; + return this; + } + public Builder releasedAt(Long releasedAt) { this.releasedAt = releasedAt; return this; @@ -84,13 +105,20 @@ public class AssetApp { return this; } + public Builder status(Status status) { + this.status = status; + return this; + } + public AssetApp build() { AssetApp assetApp = new AssetApp(); assetApp.assetCode = this.assetCode; assetApp.name = this.name; assetApp.size = this.size; + assetApp.pid = this.pid; assetApp.releaseAt = this.releasedAt; assetApp.updatedAt = this.updatedAt; + assetApp.status = this.status; return assetApp; } } diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/domain/CommandType.java b/src/main/java/inc/sdt/blokworks/devicedeployer/domain/CommandType.java new file mode 100644 index 0000000..ee4306b --- /dev/null +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/domain/CommandType.java @@ -0,0 +1,8 @@ +package inc.sdt.blokworks.devicedeployer.domain; + +public enum CommandType { + bash, + systemd, + docker, + deploy +} diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/domain/OutboundMessage.java b/src/main/java/inc/sdt/blokworks/devicedeployer/domain/OutboundMessage.java index 2e25ec4..7b4f5f6 100644 --- a/src/main/java/inc/sdt/blokworks/devicedeployer/domain/OutboundMessage.java +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/domain/OutboundMessage.java @@ -1,14 +1,17 @@ package inc.sdt.blokworks.devicedeployer.domain; -import java.util.HashMap; -import java.util.Set; +import java.util.List; +import java.util.Map; public class OutboundMessage { private String fileId; private String name; - private HashMap env; + private Map env; + private List> ports; private String command; private String requestId; + private String deviceType; + private CommandType commandType; protected OutboundMessage() {} @@ -20,10 +23,14 @@ public class OutboundMessage { return name; } - public HashMap getEnv() { + public Map getEnv() { return env; } + public List> getPorts() { + return ports; + } + public String getCommand() { return command; } @@ -36,14 +43,25 @@ public class OutboundMessage { return requestId; } + public String getDeviceType() { + return deviceType; + } + + public CommandType getCommandType() { + return commandType; + } + @Override public String toString() { - return "AssetApp{" + + return "OutboundMessage{" + "fileId='" + fileId + '\'' + ", name='" + name + '\'' + ", env=" + env + + ", ports=" + ports + ", command='" + command + '\'' + ", requestId='" + requestId + '\'' + + ", deviceType='" + deviceType + '\'' + + ", commandType=" + commandType + '}'; } @@ -54,9 +72,12 @@ public class OutboundMessage { public static final class Builder { private String fileId; private String name; - private HashMap env; + private Map env; + private List> ports; private String command; private String requestId; + private String deviceType; + private CommandType commandType; public Builder fileId(String fileId) { this.fileId = fileId; @@ -68,11 +89,16 @@ public class OutboundMessage { return this; } - public Builder env(HashMap env) { + public Builder env(Map env) { this.env = env; return this; } + public Builder ports(List> ports) { + this.ports = ports; + return this; + } + public Builder command(String command) { this.command = command; return this; @@ -83,6 +109,16 @@ public class OutboundMessage { return this; } + public Builder deviceType(String deviceType) { + this.deviceType = deviceType; + return this; + } + + public Builder commandType(CommandType commandType) { + this.commandType = commandType; + return this; + } + public OutboundMessage build() { OutboundMessage deployMessage = new OutboundMessage(); deployMessage.fileId = this.fileId; @@ -90,6 +126,9 @@ public class OutboundMessage { deployMessage.env = this.env; deployMessage.command = this.command; deployMessage.requestId = this.requestId; + deployMessage.ports = this.ports; + deployMessage.deviceType = this.deviceType; + deployMessage.commandType = this.commandType; return deployMessage; } } diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/mqtt/InboundDeployMessagePayload.java b/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/mqtt/InboundDeployMessagePayload.java index c94d153..3e7473d 100644 --- a/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/mqtt/InboundDeployMessagePayload.java +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/mqtt/InboundDeployMessagePayload.java @@ -3,12 +3,18 @@ package inc.sdt.blokworks.devicedeployer.infrastructure.mqtt; import inc.sdt.blokworks.devicedeployer.domain.Status; public record InboundDeployMessagePayload( - Status status, String assetCode, - String requestId, - String name, - Long size, - Long releasedAt, - Long updatedAt + String deviceType, + Status status, + Result result, + String requestId ) { -} + public record Result( + String name, + int pid, + int size, + String message, + long releasedAt, + long updatedAt + ){} +} \ No newline at end of file diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/mqtt/OutboundMessagePayload.java b/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/mqtt/OutboundMessagePayload.java index b7bff03..307de6b 100644 --- a/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/mqtt/OutboundMessagePayload.java +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/mqtt/OutboundMessagePayload.java @@ -1,15 +1,13 @@ package inc.sdt.blokworks.devicedeployer.infrastructure.mqtt; -import inc.sdt.blokworks.devicedeployer.domain.OperationType; - import java.util.HashMap; public record OutboundMessagePayload( - String url, - String name, - String command, - HashMap env, - OperationType operationType, - String requestId + HashMap CmdInfo, + String CmdType, + + String AssetCode, + String DeviceType, + String RequestId ) { } diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/relational/AssetAppEntity.java b/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/relational/AssetAppEntity.java index 05b002f..ceff864 100644 --- a/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/relational/AssetAppEntity.java +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/relational/AssetAppEntity.java @@ -1,5 +1,6 @@ package inc.sdt.blokworks.devicedeployer.infrastructure.relational; +import inc.sdt.blokworks.devicedeployer.domain.Status; import jakarta.persistence.*; @Entity(name = "asset_app") @@ -13,20 +14,27 @@ class AssetAppEntity { @Column(name = "app_name", length = 255) private String name; @Column(name = "size") - private Long size; + private int size; @Column(name = "released_at") private Long releasedAt; @Column(name = "updated_at") private Long updatedAt; + @Column(name = "pid") + private int pid; + @Column(name = "status") + @Enumerated(EnumType.STRING) + private Status status; protected AssetAppEntity() {} - public AssetAppEntity(String assetCode, String name, long size, Long releasedAt, Long updatedAt) { + public AssetAppEntity(String assetCode, String name, int size, Long releasedAt, Long updatedAt, int pid, Status status) { this.assetCode = assetCode; this.name = name; this.size = size; this.releasedAt = releasedAt; this.updatedAt = updatedAt; + this.pid = pid; + this.status = status; } public String getId() { @@ -41,7 +49,7 @@ class AssetAppEntity { return name; } - public long getSize() { + public int getSize() { return size; } @@ -53,5 +61,11 @@ class AssetAppEntity { return updatedAt; } + public int getPid() { + return pid; + } + public Status getStatus() { + return status; + } } diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/relational/AssetAppRelationalRepository.java b/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/relational/AssetAppRelationalRepository.java index 1422ffc..5415321 100644 --- a/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/relational/AssetAppRelationalRepository.java +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/infrastructure/relational/AssetAppRelationalRepository.java @@ -50,7 +50,9 @@ public class AssetAppRelationalRepository implements DeployerRepositoryDelegate assetApp.getName(), assetApp.getSize(), assetApp.getReleaseAt(), - assetApp.getUpdatedAt() + assetApp.getUpdatedAt(), + assetApp.getPid(), + assetApp.getStatus() ); } @@ -59,18 +61,22 @@ public class AssetAppRelationalRepository implements DeployerRepositoryDelegate .assetCode(entity.getAssetCode()) .name(entity.getName()) .size(entity.getSize()) + .pid(entity.getPid()) .releasedAt(entity.getReleasedAt()) .updatedAt(entity.getUpdatedAt()) + .status(entity.getStatus()) .build(); } private AssetApp fromMessage(InboundDeployMessagePayload payload) { return AssetApp.builder() .assetCode(payload.assetCode()) - .name(payload.name()) - .size(payload.size()) - .releasedAt(payload.releasedAt()) - .updatedAt(payload.updatedAt()) + .name(payload.result().name()) + .size(payload.result().size()) + .pid(payload.result().pid()) + .releasedAt(payload.result().releasedAt()) + .updatedAt(payload.result().updatedAt()) + .status(payload.status()) .build(); } } diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/AssetAppResource.java b/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/AssetAppResource.java index 3e94a53..6176a67 100644 --- a/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/AssetAppResource.java +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/AssetAppResource.java @@ -3,7 +3,7 @@ package inc.sdt.blokworks.devicedeployer.presentation; record AssetAppResource( String assetCode, String name, - Long size, + int size, Long releaseAt, Long updatedAt ) { diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/AssetAppResourceConverter.java b/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/AssetAppResourceConverter.java index ecf4af6..934d07f 100644 --- a/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/AssetAppResourceConverter.java +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/AssetAppResourceConverter.java @@ -6,16 +6,6 @@ import org.springframework.stereotype.Component; @Component public class AssetAppResourceConverter { - public AssetApp fromResource(AssetAppResource resource) { - return AssetApp.builder() - .assetCode(resource.assetCode()) - .name(resource.name()) - .size(resource.size()) - .releasedAt(resource.releaseAt()) - .updatedAt(resource.updatedAt()) - .build(); - } - public AssetAppResource toResource(AssetApp assetApp) { return new AssetAppResource( assetApp.getAssetCode(), diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/MqttMessageHandler.java b/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/MqttMessageHandler.java index b337d73..c929a43 100644 --- a/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/MqttMessageHandler.java +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/MqttMessageHandler.java @@ -35,20 +35,19 @@ public class MqttMessageHandler { log.info("[handleMessage] message={}", message); if(!message.getPayload().contains("pid")) { - String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); - String id = topic.split("/")[4]; + //String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); + //String id = topic.split("/")[4]; - log.info("[handleMessage] topic = {}, id = {}", topic, id); + //log.info("[handleMessage] topic = {}, id = {}", topic, id); deployMessagePayloadConverter.convertFromByte(message.getPayload(), InboundDeployMessagePayload.class) .map(p -> new InboundDeployMessagePayload( - p.status(), p.assetCode(), - id, - p.name(), - p.size(), - p.releasedAt(), - p.updatedAt())) + p.deviceType(), + p.status(), + p.result(), + p.requestId() + )) .flatMap(deployerService) .subscribe(); }else { diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/OutboundMessageResource.java b/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/OutboundMessageResource.java index 46089ec..32cd632 100644 --- a/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/OutboundMessageResource.java +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/OutboundMessageResource.java @@ -1,5 +1,6 @@ package inc.sdt.blokworks.devicedeployer.presentation; +import inc.sdt.blokworks.devicedeployer.domain.CommandType; import org.wildfly.common.annotation.NotNull; import java.util.HashMap; @@ -10,7 +11,9 @@ record OutboundMessageResource( @NotNull String name, String command, - String env + String env, + String deviceType, + CommandType commandType ) { } diff --git a/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/OutboundMessageResourceConverter.java b/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/OutboundMessageResourceConverter.java index 50e6d21..12a050c 100644 --- a/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/OutboundMessageResourceConverter.java +++ b/src/main/java/inc/sdt/blokworks/devicedeployer/presentation/OutboundMessageResourceConverter.java @@ -1,38 +1,45 @@ package inc.sdt.blokworks.devicedeployer.presentation; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import inc.sdt.blokworks.devicedeployer.domain.CommandType; import inc.sdt.blokworks.devicedeployer.domain.OutboundMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; @Component public class OutboundMessageResourceConverter { private final ObjectMapper objectMapper; - private final Logger log; public OutboundMessageResourceConverter() { this.objectMapper = new ObjectMapper(); - this.log = LoggerFactory.getLogger(this.getClass()); } public OutboundMessage fromResource(OutboundMessageResource resource) { + final Map cmdInfo = resource.env().isEmpty() ? new HashMap<>() : convertToMap(resource.env()); + return OutboundMessage.builder() .fileId(resource.fileId()) .name(resource.name()) .command(resource.command()) - .env(resource.env().isEmpty() ? new HashMap<>() : (HashMap) convertToMap(resource.env())) + .env(cmdInfo.isEmpty() ? new HashMap<>() : (Map) cmdInfo.get("env")) + .ports(cmdInfo.isEmpty() ? new ArrayList<>() : (List>) cmdInfo.get("ports")) + .deviceType(resource.deviceType()) + .commandType(resource.commandType() == null ? CommandType.deploy : resource.commandType()) .build(); } - private Map convertToMap(String env) { + private Map convertToMap(String env) { try { - return objectMapper.readValue(env, Map.class); + return objectMapper.readValue(env, new TypeReference<>() {}); }catch (IOException e) { throw new IllegalArgumentException(); } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 02a79fe..a872951 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -16,8 +16,7 @@ inbound: username: ${INBOUND_MQTT_CREDENTIALS_USERNAME} password: ${INBOUND_MQTT_CREDENTIALS_PASSWORD} topics: - - /assets/+/command-req/+ - - /assets/+/apps/process + - /devicecontrol/result/+/+/+ iam: enabled: ${IAM_REGISTER_ENABLED} diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index 146e4d7..0c1ca25 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -12,13 +12,12 @@ spring: inbound: mqtt: - #url: tcp://localhost:1883 - url: tcp://13.209.39.139:32259 + url: tcp://localhost:1883 + #url: tcp://13.209.39.139:32259 username: sdt password: 251327 topics: - - /assets/+/command-req/+ - - /assets/+/apps/process + - /devicecontrol/result/+/+/+ stackbase: api: