Recent Posts
Recent Comments
Link
10-15 06:21
Today
Total
관리 메뉴

삶 가운데 남긴 기록 AACII.TISTORY.COM

NIO TCP Non-Blocking Channel 본문

DEV&OPS/Java

NIO TCP Non-Blocking Channel

ALEPH.GEM 2022. 5. 26. 15:02

Non-Blocking

블로킹 방식은 accept()에서 블로킹 되고, read() 메소드도 데이터를 읽을 준비를 위해 블로킹 됩니다.

그래서 연결된 SocketChannel당 하나의 스레드가 할당되어야 합니다.

그래서 연결된 클라이언트 수가 많으면 스레드 풀(ExecutorService)를 사용했었습니다.

그런데 non-blocking 방식에서는 connect(), accept(), read(), write() 메소드에서 블로킹을 하지 않습니다.

non-blocking방식에서는 요청이 없거나 데이터를 보내지 않으면 null이나 0을 즉시 리턴하며 버퍼에는 아무 데이터도 저장되지 않습니다. 그래서 Selector를 사용해서 이벤트 리스너 역할을 합니다.

 

Non-blocking 채팅 서버

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Vector;

import javafx.application.Application;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.TextArea;
import javafx.scene.layout.BorderPane;
import javafx.stage.Stage;

public class NonBlockingServerEx extends Application {
	//셀렉터
	Selector selector;
	//서버소켓 채널
	ServerSocketChannel serverSocketChannel;
	//멀티스레드에 안전한 벡터로 커넥션 리스트를 구성
	List<Client> connections = new Vector<Client>();
	
	//서버 시작 메소드
	void startServer() {
		
		try {
			//selector 생성
			selector = Selector.open();
			//서버 소켓 채널을 생성
			serverSocketChannel = ServerSocketChannel.open();
			serverSocketChannel.configureBlocking(false);	//non-blocking 모드
			serverSocketChannel.bind(new InetSocketAddress(5001));	//통신 포트 5001 바인딩
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);	//selector에 작업 유형(accept)을 등록
		} catch (IOException e) {
			if(serverSocketChannel.isOpen()) {
				stopServer();
			}
			e.printStackTrace();
			return;
		}
		
		Thread thread = new Thread() {

			@Override
			public void run() {
				while(true) {
					try {
						int keyCount = selector.select(); //작업 처리 준비가 된 채널이 생길때까지 대기. SelectionKey의 통보를 받기 위해 블로킹 대기
						if(keyCount == 0) {
							continue; //wakeup()되면 selector.select()는 0을 리턴
						}
						Set<SelectionKey> selectedKeys = selector.selectedKeys(); //셀렉터는 작업 처리 준비가 된 키를 얻고 Set으로 리턴
						Iterator<SelectionKey> iterator = selectedKeys.iterator();
						
						while(iterator.hasNext()) {
							SelectionKey selectionKey = iterator.next();
							if(selectionKey.isAcceptable()) {
								accept(selectionKey);
							}else if(selectionKey.isReadable()) {
								Client client = (Client)selectionKey.attachment();
								client.receive(selectionKey);
							}else if(selectionKey.isWritable()) {
								Client client = (Client)selectionKey.attachment();
								client.send(selectionKey);
							}
							iterator.remove(); //처리완료된 SelectionKey를 제거
						}
						
					}catch(Exception e) {
						//예외 발생시 소켓채널이 열려있으면 stopServer()를 호출해 종료.
						if(serverSocketChannel.isOpen()) {
							stopServer();
						}
						break;
					}
					
				}
			}
			
		};
		thread.start();
		
		Platform.runLater(()->{
			displayText("[서버 시작]");
			btnStartStop.setText("stop");
		});
	}
	
	//서버 종료 메소드
	void stopServer() {
		try {
			Iterator<Client> iterator = connections.iterator();
			while(iterator.hasNext()) {
				Client client = iterator.next();
				client.socketChannel.close();
				iterator.remove();
			}
			if(serverSocketChannel != null && serverSocketChannel.isOpen()) {
				serverSocketChannel.close();
			}
			if(selector != null && selector.isOpen()) {
				selector.close();
			}
			Platform.runLater(()->{
				displayText("[서버 멈춤]");
				btnStartStop.setText("start");
			});
		}catch(Exception e) {

		}
	}
	
	//연결 수락 동작
	void accept(SelectionKey selectionKey) {
		try {
			ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
			//연결 수락
			SocketChannel socketChannel = serverSocketChannel.accept();
			String message = "[연결 수락: "+socketChannel.getRemoteAddress()+ " : "+Thread.currentThread().getName()+"]";
			Platform.runLater(()->displayText(message));
			
			//클라이언트 객체 생성
			Client client = new Client(socketChannel);
			connections.add(client);
			
			Platform.runLater(()->displayText("[연결 개수: "+connections.size() + "]"));
		}catch(Exception e) {
			if(serverSocketChannel.isOpen()) {
				stopServer();
			}
		}
	}
	
	//데이터 통신 코드
	class Client{
		SocketChannel socketChannel;
		String sendData; //클라이언트로 보낼 데이터를 저장하는 필드

		public Client(SocketChannel socketChannel) throws IOException {
			this.socketChannel = socketChannel;
			socketChannel.configureBlocking(false);
			SelectionKey selectionKey = socketChannel.register(selector,  SelectionKey.OP_READ);
			selectionKey.attach(this);
		}
		
		//클라이언트로부터 데이터 받기
		void receive(SelectionKey selectionKey) {
			try {
				ByteBuffer byteBuffer = ByteBuffer.allocate(100);
							
				//클라이언트가 비정상 종료 했을 경우 IOException 발생
				int byteCount = socketChannel.read(byteBuffer);	//받은 데이터 읽기
							
				//클라이언트가 정상적으로 close()를 호출한 경우
				if(byteCount == -1) {
					throw new IOException();
				}
							
				String message = "[요청 처리: " + socketChannel.getRemoteAddress()+" : "+Thread.currentThread().getName()+"]";
				Platform.runLater(()->displayText(message));
							
				//버퍼 내용을 문자열로 변환
				byteBuffer.flip();
				Charset charset = Charset.forName("UTF-8");
				String data = charset.decode(byteBuffer).toString();
							
				for(Client client : connections) {
					//모든 클라이언트에게 전송
					client.sendData = data;
					SelectionKey key = client.socketChannel.keyFor(selector);
					key.interestOps(SelectionKey.OP_WRITE); //작업 유형 변경
				}
				
				selector.wakeup();
			}catch(Exception e) {
				try {
					connections.remove(Client.this);
					String message = "[클라이언트 통신 안됨: "+socketChannel.getRemoteAddress() + " : "+ Thread.currentThread().getName() + "]";
					Platform.runLater(()->displayText(message));		
					socketChannel.close();
				}catch(IOException e2) {
					
				}
			}
		}

		//클라이언트로 데이터 전송
		void send(SelectionKey selectionKey) {
			try {
				//버퍼 내용을 클라이언트에 보내기
				Charset charset = Charset.forName("UTF-8");
				ByteBuffer byteBuffer = charset.encode(sendData);
				socketChannel.write(byteBuffer); //데이터 보내기
				selectionKey.interestOps(SelectionKey.OP_READ); //작업 유형 변경
				selector.wakeup(); //변경된 작업 유형을 감지하도록 selector의 selec()블로킹을 해제
			}catch(Exception e) {
				try{
					String message = "[클라이언트 통신 안됨: "+socketChannel.getRemoteAddress() + " : "+ Thread.currentThread().getName() + "]";
					Platform.runLater(()->displayText(message));
					connections.remove(Client.this);
					socketChannel.close();
				}catch(IOException e2) {
					
				}
			}				
		}
	}
	
	//UI 생성 코드
	TextArea txtDisplay;
	Button btnStartStop;
	@Override
	public void start(Stage primaryStage) throws Exception {
		BorderPane root = new BorderPane();
		root.setPrefSize(500, 300);
		
		txtDisplay = new TextArea();
		txtDisplay.setEditable(false);
		BorderPane.setMargin(txtDisplay, new Insets(0,0,2,0));
		root.setCenter(txtDisplay);
		
		btnStartStop = new Button("start");
		btnStartStop.setPrefHeight(30);
		btnStartStop.setMaxWidth(Double.MAX_VALUE);
		btnStartStop.setOnAction(e->{
			if(btnStartStop.getText().equals("start")) {
				startServer();
			}else {
				stopServer();
			}
		});
		root.setBottom(btnStartStop);
		
		Scene scene = new Scene(root);
		scene.getStylesheets().add(getClass().getResource("app.css").toString());
		primaryStage.setScene(scene);
		primaryStage.setTitle("Server");
		//닫기 버튼 이벤트 처리
		primaryStage.setOnCloseRequest(event->stopServer());
		primaryStage.show();
	}	
	
	void displayText(String text) {
		txtDisplay.appendText(text + "\n");
	}

	public static void main(String[] args) {
		launch(args);
	}



}

 

Non-blocking 방식은 서버를 구현할 때 사용하는 방법이므로 채팅 클라이언트는 이전 게시글의 채팅 클라이언트 예제를 사용하면됩니다.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

import javafx.application.Application;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.TextArea;
import javafx.scene.control.TextField;
import javafx.scene.layout.BorderPane;
import javafx.stage.Stage;

public class ClientExample extends Application {
	SocketChannel socketChannel;
	
	//연결 시작
	void startClient() {
		//스레드 생성
		Thread thread = new Thread() {
			public void run() {
				try {
					socketChannel = SocketChannel.open(); //소켓 생성
					socketChannel.configureBlocking(true); //블로킹 모드
					socketChannel.connect(new InetSocketAddress("localhost", 5001)); //소켓 연결 요청
					Platform.runLater(()->{
						try {
							displayText("[연결 완료: " + socketChannel.getRemoteAddress()+"]");
							btnConn.setText("stop");
							btnSend.setDisable(false);							
						}catch(Exception e) {
							e.printStackTrace();
						}
					});
				}catch(Exception e) {
					Platform.runLater(()->displayText("[서버 통신 안됨]"));
					if(socketChannel.isOpen()) {
						stopClient();
					}
					return;
				}
				//서버에서 보낸 데이터 받기
				receive();
			}
		};
		thread.start();
	}
	
	//연결 끊기
	void stopClient() {
		try {
			Platform.runLater(()->{
				displayText("[연결 끊음]");
				btnConn.setText("start");
				btnSend.setDisable(true);
			});
			if(socketChannel != null && socketChannel.isOpen()) {
				socketChannel.close();
			}
		}catch(IOException e) {
			e.printStackTrace();
		}
	}
	
	//서버에서 데이터 받기
	void receive() {
		while(true) {
			try {
				ByteBuffer byteBuffer = ByteBuffer.allocate(100);
				
				//서버가 비정상 종료인경우 IOException 발생
				int readByteCount = socketChannel.read(byteBuffer); //데이터 받기
				
				//서버가 정성 종료 close()를 호출한경우
				if(readByteCount == -1) {
					throw new IOException();
				}
				
				//버퍼의 내용을 문자열로 변환
				byteBuffer.flip();
				Charset charset = Charset.forName("UTF-8");
				String data = charset.decode(byteBuffer).toString();
				
				Platform.runLater(()->displayText("[받기 완료]"+data));
			}catch(Exception e) {
				Platform.runLater(()->displayText("[서버 통신 안됨]"));
				stopClient();
				break;
			}
		}
	}
	
	//서버로 데이터 보내기
	void send(String data) {
		//스레드 생성
		Thread thread = new Thread() {
			public void run() {
				try {
					Charset charset = Charset.forName("UTF-8");
					ByteBuffer byteBuffer = charset.encode(data);
					socketChannel.write(byteBuffer); 	//서버로 데이터 전송
					Platform.runLater(()->displayText("[보내기 완료]"));
				}catch(Exception e) {
					Platform.runLater(()->displayText("[서버 통신 안됨]"));
					stopClient();
				}
			}
		};
		thread.start();
	}

	// UI생성 코드
	TextArea txtDisplay;
	TextField txtInput;
	Button btnConn, btnSend;
	
	@Override
	public void start(Stage primaryStage) throws Exception {
		BorderPane root = new BorderPane();
		root.setPrefSize(500, 300);
		
		txtDisplay = new TextArea();
		txtDisplay.setEditable(false);
		BorderPane.setMargin(txtDisplay, new Insets(0,0,2,0));
		root.setCenter(txtDisplay);
		
		BorderPane bottom = new BorderPane();
		txtInput = new TextField();
		txtInput.setPrefSize(60, 30);
		BorderPane.setMargin(txtInput, new Insets(0,1,1,1));
		
		btnConn = new Button("start");
		btnConn.setPrefSize(60, 30);
		//start stop 버튼이벤트 처리
		btnConn.setOnAction(e->{
			if(btnConn.getText().equals("start")) {
				startClient();
			}else if(btnConn.getText().equals("stop")) {
				stopClient();
			}
		});
		
		btnSend = new Button("send");
		btnSend.setPrefSize(60, 30);
		btnSend.setDisable(true);
		//send 버튼 이벤트 처리
		btnSend.setOnAction(e->send(txtInput.getText()));
		
		bottom.setCenter(txtInput);
		bottom.setLeft(btnConn);
		bottom.setRight(btnSend);
		root.setBottom(bottom);
		
		Scene scene = new Scene(root);
		scene.getStylesheets().add(getClass().getResource("app.css").toString());
		primaryStage.setScene(scene);
		primaryStage.setTitle("Client");
		primaryStage.setOnCloseRequest(event->stopClient());
		primaryStage.show();
	}
	
	void displayText(String text) {
		txtDisplay.appendText(text+"\n");
	}

	public static void main(String[] args) {
		launch(args);
	}

}

 

 

 

 

 

 

 

 

 

 

 

728x90

'DEV&OPS > Java' 카테고리의 다른 글

BASE64 (64진법)  (2) 2022.05.26
NIO TCP 비동기 채널방식 채팅 서버/클라이언트  (0) 2022.05.26
NIO TCP 서버/클라이언트 채팅 프로그램  (0) 2022.05.26
NIO 파일 채널  (0) 2022.05.25
NIO Buffer  (0) 2022.05.24