Vert.X + Ignite 로 클러스터 처리를 해보자.(1) 흔한 전산쟁이의 삽질일기

어느날. 누가 말해서 해보는거긴 한데 생각보다 쓸만한 곳이 많을거 같아서. 삽질해본 결과임


일단 vert.x라는게 찾아보면 


버텍스(Vert.X) 는 이벤트 드리븐(영어: event-driven) 방식의 자바 버추얼 머신(영어: Java Virtual Machine) 위에서 동작하는 어플리케이션 프레임워크이다.
자바스크립트의 Node.js, 파이선의 Twisted, 펄의 Perl Objected Environment, C의 libevent, PHP의 reactPHP 나 amphp 그리고 루비의 EventMachine 등과 비슷하다.


라고 되어있다. 걍 Node.js처럼 비동기 소켓 쓰는건데, netty기반이고 http,tcp등 여러가지 지원을 해준다. 단순 통신할때 좋다.
Node.js는 싱글스레드이고 얘도 싱글스레드인데 단지 동시에 여러개의 인스턴스를 켤 수잇으므로, 한곳에서 여러개를 연결해서 써먹을 수있겠다.

성능도 좀 받쳐주는거 같다. 

네이버쪽 블로그 보면 이런식으로 성능이 좀 된다 라는거고, 나름 나쁘진 않다.
그래서 뭐 이벤트 푸시받던지, 단순 소켓통신할때라던지 이래저래 쓰면 구조도 간단하고 쓰기도 편하다 라고한다.
싱글스레드이긴 하지만, 그런걸 여러개 동시에 켤 수 있으니 나쁘지는 않다.

관련 설명도 좀 많다. 
자 나는 이걸 어떻게 쓰고싶은가. 
1. TCP로 주고받고 싶다.
2. JSON으로 데이터를 왔다갔다 하고싶다.
3. 설치/실행이 아니라 embeded 로 소스상에 넣고 굴리고 싶다. 

우선 단위는 Verticle이라는 놈인데 이게 하나의 인스턴스라고 생각하면 된다. 즉 샘플을 만들어봐야지.

Vertx는 기본적으로 AbstractVerticle를 상속받아 쓰는데.

여기에는 start(),stop()이 비어있다

여기다가 각 구현체를 구현해 주는데, 
NetServer를 생성하고 내부에 수신 및 송신 받고 처리할 부분을 지정하고 listen을 실행한다.
그리고 accept처럼 아래식으로 connect이후 
handler. buffer쪽에 데이터를 수신받아 무언가를 하면 된다.



그러면 서버쪽은 이걸로 끝.

Client는 NetClient를 사용하는데, vertx.createNetClient()를 써서 생성이후 connect(ip,port)로 연결한다.
그리고 수신성공시 뭔가 소켓에서 버퍼를 수신받아 처리하는데, 서버처럼 NetSocket에 read(),write()로 메시지를 송 수신할 수있다.

이렇게 동작시킨 결과는 다음과 같다.


단순 Socket통신용으로 쓸 생각이었던 기본구조다. 이제 여기에 Event를 넣어서 아까 여러개의 인스턴스끼리 통신한다고 했으니,
그 방법을 다음에 알아보도록 하자.



서버 풀 소스

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package com.scblood.test.vertx.cs;
 
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
 
 
/**
 * Vert.x TCP 서버
 * 각 이벤트를 받아서 응답을 넘긴다.
 * JSON기준으로 만들어 넘길것.
 * JDK 8기준
 * http://tutorials.jenkov.com/vert.x/tcp-server.html
 * 
 * @author scblood.egloos.com
 *
 */
public class VertXSampleServer extends AbstractVerticle  {
    private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
    
    private NetServer m_server;
    private int m_port =10000;
    private String m_serverName;
    
    public VertXSampleServer(String name,int port){
        /**
         * VertxOptions 이 꽤 많이 있는데 걍 무시함. 알아서 찾아볼것
         */
        vertx = Vertx.vertx();
        this.m_port = port;
        m_serverName = name;
    }
    
    /*
     * 굳이 안써도 됨. 비동기 응답받고 싶다면 써도 되고. 어차피 start호출하기때문에 오버라이딩 할 필요없음
     * @see io.vertx.core.AbstractVerticle#start(io.vertx.core.Future)
     */
    /*@Override
     public void start(Future<Void> startFuture) throws Exception{
        start();
        startFuture.complete();
    }*/
    
    
    /*
     * 실제 동작 시키는 부분 위 메서드를 갖다 써도 무방 
     * @see io.vertx.core.AbstractVerticle#start()
     */
    @Override
    public void start() throws Exception {
        System.out.println(m_serverName+" START");
        m_server = vertx.createNetServer();
        setNetServer();
        m_server.listen(m_port);
    }
      
    
    /*
     * 여기서 뭔가 수신받아 처리하고 관련 리턴 해주면 됨
     */
    public DataRes doSomething(DataReq bean){
        DataRes res = new DataRes();
        res.setResCd(200);
        res.setResMsg("OK");
        res.setData("200 OK Data");
        return res;
    }
    
    /*
     * 데이터 수신받고 전송하는 부분
     */
    public void setNetServer() {
        m_server.connectHandler(socket -> {
                        
            socket.closeHandler(v->{
                System.out.println("Socket Closed");
            });
            
            
            socket.exceptionHandler(e->{
                //printLog
                e.printStackTrace();
            });
            
            socket.handler(buff -> {
                
                DataReq bean = GSON.fromJson(buff.getString(0, buff.length()), DataReq.class);
                System.out.println("Recieve Data \n"+GSON.toJson(bean));
                Buffer outBuffer = Buffer.buffer();
                
                DataRes res = doSomething(bean);
                
                outBuffer.appendString(GSON.toJson(res));
                socket.write(outBuffer);                
            });
        });
    }
    
    /**
     * 람다 없는 버전
     */
/*    public void setNetServer() {
        m_server.connectHandler(new Handler<NetSocket>() {
            @Override
            public void handle(NetSocket socket) {
                socket.closeHandler(new Handler<Void>() {
                    @Override
                    public void handle(Void aVoid) {
                        System.out.println("Socket Closed");
                        //뭔가 하면 됨
                    }
                });
                socket.exceptionHandler(new Handler<Throwable>() {
                    @Override
                    public void handle(Throwable event) {
                        event.printStackTrace();
                    }
                });
                
                System.out.println("Incoming connection!");
                socket.handler(new Handler<Buffer>() {
                    @Override
                    public void handle(Buffer buff) {
                        System.out.println("incoming data: " + buff.length());
                        DataBean bean = GSON.fromJson(buff.getString(0, buff.length()), DataBean.class);
                        System.out.println(GSON.toJson(bean));
                        Buffer outBuffer = Buffer.buffer();
                        outBuffer.appendString("RESPONSE DATA");
                        socket.write(outBuffer);
                    }
                });
            }
        });
    }*/
    
    @Override
    public void stop() {
        m_server.close(asyncResult -> {
            if (asyncResult.succeeded()) {
                System.out.println(m_serverName + " NetServer close");
            }
        });
        /*
         * 시스템 완전종료를 위함 이거 안닫으면 자동 exit가 안됨
         */
        vertx.close();
        System.out.println(m_serverName + " vertx close");
    }
    
    /**
     * 람다 없는 버전
     */
/*    @Override
    public void stop(){
        m_server.close(new Handler<AsyncResult<Void>>() {
            @Override
            public void handle(AsyncResult result) {
                if(result.succeeded()){
                    System.out.println(m_serverName + " NetServer close");
                }
            }
        });
        vertx.close();
        System.out.println(m_serverName + " vertx close");
    }*/
    
    public static void main(String[] args) throws Exception {
        VertXSampleServer test = new VertXSampleServer("VertxServer "10000);
        test.start();
 
    }
}
 
cs


클라이언트 소스

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package com.scblood.test.vertx.cs;
 
import java.util.concurrent.TimeUnit;
 
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
 
 
public class VertXSampleClient extends AbstractVerticle{
    private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
    
    private NetClient m_client;
    private NetSocket m_netSocket;
    private int m_port =10000;
    private String m_ip;
    private boolean isConnect = false;
    public VertXSampleClient(String ip, int port){
        vertx = Vertx.vertx();
        m_ip = ip;
        m_port =port;
    }
     
    @Override
     public void start() throws Exception {
        doConnnect();
     }
 
    public void doConnnect(){
        NetClientOptions options = new NetClientOptions().setConnectTimeout(1000);
        setClientClose();
        
        m_client = vertx.createNetClient(options);
        m_client.connect(m_port, m_ip,asyncResult->{
            isConnect = asyncResult.succeeded();
            if(isConnect){
                System.out.println("connect");
                NetSocket socket = asyncResult.result();
                setConnectSocket(socket);
            }else{
                //fail
                System.out.println("connectFail");
                //뭔가 하면 됨 재접속 한다 던지 등
                
            }
        });
    }
 
    private void setConnectSocket(NetSocket socket) {
        /*
         * Socket에서 BatchStream(ReadStream<Buffer> rs, WriteStream<Buffer> ws) 처럼 버퍼나, 핸들러만 따로 빼서 
         * 클래스 만들수있기도 한데. 걍 현재 이렇게 해놓음. 굳이 그럴필요있나....
         */
        
        m_netSocket = socket;
        socket.closeHandler(v->{
            //뭔가하면됨
            System.out.println("Socket Closed");
            doConnnect();
        });
 
        m_netSocket.exceptionHandler(e->{
            //printLog
            e.printStackTrace();
        });
            
    }
    
    public void setSocketRead(){
        m_netSocket.handler(buffer->{
             System.out.println("Received data: " + buffer.length());
             System.out.println(buffer.getString(0, buffer.length()));
        });
    }
    
    public void doSendMsg(){
         DataReq req = new DataReq();
         req.setReqType(1);
         req.setUserId("test");
         m_netSocket.write(GSON.toJson(req));    
         setSocketRead();
    }
    
    
    public void doDemoStart(){
        doConnnect();
        while(!isConnect){
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
        }
 
        doSendMsg();
    }
    
    public void setClientClose(){
        if(m_netSocket!=null){
            m_netSocket.close();    
        }
        if(m_client!=null){
            m_client.close();    
        }
            
        
    }
    
    public void setVertxClose(){
        vertx.close();
    }
    
    
    
 
    public static void main(String[] args){
        VertXSampleClient a = new VertXSampleClient("127.0.0.1",10000);
        a.doDemoStart();
        
    }
}
 
cs


메이븐

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
 
  <groupId>com.scblood.test</groupId>
  <artifactId>vertx</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
 
  <name>vertx</name>
  <url>http://maven.apache.org</url>
 
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
 
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
     <dependency>
          <groupId>io.vertx</groupId>
          <artifactId>vertx-core</artifactId>
          <version>3.5.4</version>
    </dependency>
        
    
    <dependency>
           <groupId>com.google.code.gson</groupId>
           <artifactId>gson</artifactId>
           <version>2.7</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <optional>true</optional>
    </dependency>
  </dependencies>
  
  <build>
        <plugins>
            <plugin>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.9</version>
                <configuration>
 
                    <downloadSources>true</downloadSources>
                    <downloadJavadocs>true</downloadJavadocs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <compilerArgument>-Xlint:all</compilerArgument>
                    <showWarnings>true</showWarnings>
                    <showDeprecation>true</showDeprecation>
 
                </configuration>
            </plugin>
            <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <executions>
         <execution>
            <id>copy-dependencies</id>
            <phase>prepare-package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
               <outputDirectory>${project.build.directory}/lib</outputDirectory>
               <overWriteReleases>false</overWriteReleases>
               <overWriteSnapshots>false</overWriteSnapshots>
               <overWriteIfNewer>true</overWriteIfNewer>
            </configuration>
        </execution>
      </executions>
   </plugin>
   <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <configuration>
        <archive>
           <manifest>
            <addClasspath>true</addClasspath>
            <classpathPrefix>lib/</classpathPrefix>
            <mainClass>theMainClass</mainClass>
           </manifest>
         </archive>
       </configuration>
   </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <configuration>
                    <mainClass>org.test.int1.Main</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
 
cs


송수신 Object

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.scblood.test.vertx.cs;
 
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.google.gson.annotations.SerializedName;
 
import lombok.Data;
 
/**
 * Request 데이터 빈
 * 롬복으로 세팅
 * @author scblood.egloos.com
 *
 */
@Data
public class DataReq {
    
    @JsonProperty("req_type")
    @SerializedName("req_type")        
    private int reqType;
    
    @JsonProperty("user_id")
    @SerializedName("user_id")        
    private String userId;
    
    @JsonProperty("ext_data")
    @SerializedName("ext_data")    
    @JsonInclude(Include.NON_NULL)
    private String extData;
    
}
 
cs



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.scblood.test.vertx.cs;
 
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.annotations.SerializedName;
 
import lombok.Data;
 
@Data
public class DataRes {
 
    @JsonProperty("res_code")
    @SerializedName("res_code")    
    private int resCd;
    
    @JsonProperty("res_msg")
    @SerializedName("res_msg")        
    private String resMsg;
    
    @JsonProperty("data")
    private Object data;
}
 
cs

덧글

  • 은이 2018/11/02 13:30 # 답글

    뒤에꺼 보고 이걸 봤군.
    이벤트 처리 방식이 택시면 이건 대중교통 같은건가..
    택시같은 이벤트 리스너대신 버스를 상시 굴려서 효율화를 추구한다는 사상같기도 하고.. 인터레스뜅!
  • 떠리 2018/11/02 14:10 #

    클러스터 안붙이고 단순 TCP통신할라고 쓰는거지 Node.js 말고 걍 임베디드로 소켓쓰기 싫어서 하는거임. 이벤트리스너는 서버끼리 통신할때는 유용할듯.
댓글 입력 영역