삶 가운데 남긴 기록 AACII.TISTORY.COM
NIO TCP 서버/클라이언트 채팅 프로그램 본문
TCP 블로킹 채널
NIO에서는 non-blocking과 blocking 동기와 비동기 모두 제공하고 있습니다.
ServerSocketChannel은 클라이언트의 SockeChannel의 연결 요청을 수락하고 SocketChannel을 생성 한 후에 서버와 클라이언트간 통신을 수행합니다.
소켓 채널 연결 및 데이터 전송
서버측 예제
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class ServerEx {
public static void main(String[] args) {
ServerSocketChannel serverSocketChannel = null;
try {
//서버 소켓 채널 오픈
serverSocketChannel = ServerSocketChannel.open();
//블로킹 모드
serverSocketChannel.configureBlocking(true);
//포트 바인딩
serverSocketChannel.bind(new InetSocketAddress(5001));
//연결대기
while(true) {
System.out.println("[연결 대기]");
SocketChannel socketChannel = serverSocketChannel.accept();
InetSocketAddress isa = (InetSocketAddress)socketChannel.getRemoteAddress();
System.out.println("[연결 수락]:"+isa.getHostName());
ByteBuffer byteBuffer = null;
Charset charset = Charset.forName("UTF-8");
byteBuffer = ByteBuffer.allocate(100);
int byteCount = socketChannel.read(byteBuffer);
byteBuffer.flip();
String message = charset.decode(byteBuffer).toString();
System.out.println("[받은 데이터]:"+message);
byteBuffer = charset.encode("서버에서 클라이언트로 전송되는 메시지");
socketChannel.write(byteBuffer);
System.out.println("[데이터 전송 완료]");
}
}catch(Exception e) {
e.printStackTrace();
}
//ServerSocketChannel 닫기
if(serverSocketChannel.isOpen()) {
try {
serverSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
클라이언트측 예제
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class ClientEx {
public static void main(String[] args) {
SocketChannel socketChannel = null;
try {
//소켓 채널 열기
socketChannel = SocketChannel.open();
//블로킹 모드
socketChannel.configureBlocking(true);
System.out.println("[연결 요청]");
socketChannel.connect(new InetSocketAddress("localhost", 5001));
System.out.println("[연결 성공]");
ByteBuffer byteBuffer = null;
Charset charset = Charset.forName("UTF-8");
byteBuffer = charset.encode("클라이언트에서 서버로 전송되는 메시지");
socketChannel.write(byteBuffer);
System.out.println("[데이터 전송 성공]");
byteBuffer = ByteBuffer.allocate(100);
int byteCount = socketChannel.read(byteBuffer);
byteBuffer.flip();
String message = charset.decode(byteBuffer).toString();
System.out.println("[받은 데이터]:" + message);
} catch (IOException e) {
e.printStackTrace();
}
//연결 종료
if(socketChannel.isOpen()) {
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
서버측 예제를 먼저 실행시키고 클라이언트를 나중에 실행시켜야 합니다.
스레드 병렬 처리
블로킹 방식은 입출력 작업시 블록되기 때문에 클라이언트 연결마다 작업 스레드를 할당해서 병렬 처리 해야 합니다.
채팅 서버 구현
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javafx.application.Application;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.TextArea;
import javafx.scene.layout.BorderPane;
import javafx.stage.Stage;
public class ServerExample extends Application {
//스레드 풀
ExecutorService executorService;
//서버소켓 채널
ServerSocketChannel serverSocketChannel;
//멀티스레드에 안전한 벡터로 커넥션 리스트를 구성
List<Client> connections = new Vector<Client>();
//서버 시작 메소드
void startServer() {
//CPU 코어수에 맞는 스레드를 생성해서 관리하는 스레드 풀
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
try {
//서버 소켓 채널을 생성
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(true); //블로킹 모드
serverSocketChannel.bind(new InetSocketAddress(5001)); //통신 포트 5001 바인딩
} catch (IOException e) {
if(serverSocketChannel.isOpen()) {
stopServer();
}
e.printStackTrace();
}
Runnable runnable = new Runnable() {
@Override
public void run() {
Platform.runLater(()->{
displayText("[서버시작]");
btnStartStop.setText("stop");
});
while(true) {
try {
//연결 수락
SocketChannel socketChannel = serverSocketChannel.accept();
String message = "[연결 수락: "+socketChannel.getRemoteAddress()+" : "+Thread.currentThread().getName() + "]";
Platform.runLater(()->displayText(message));
//클라이언트 인스턴스 객체
Client client = new Client(socketChannel);
connections.add(client);
Platform.runLater(()->displayText("[연결 개수: "+connections.size() + "]"));
}catch(Exception e) {
if(serverSocketChannel.isOpen()) {
stopServer();
}
break;
}
}
}
};
//스레드 풀에 스레드를 제출
executorService.submit(runnable);
}
//서버 종료 메소드
void stopServer() {
try {
Iterator<Client> iterator = connections.iterator();
while(iterator.hasNext()) {
Client client = iterator.next();
client.socketChannel.close();
iterator.remove();
}
if(serverSocketChannel != null && serverSocketChannel.isOpen()) {
serverSocketChannel.close();
}
if(executorService != null && !executorService.isShutdown()) {
executorService.shutdown();
}
Platform.runLater(()->{
displayText("[서버 멈춤]");
btnStartStop.setText("start");
});
}catch(Exception e) {
}
}
//데이터 통신 코드
class Client{
SocketChannel socketChannel;
public Client(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
receive();
}
//클라이언트로부터 데이터 받기
void receive() {
Runnable runnable = new Runnable() {
@Override
public void run() {
while(true) {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
//클라이언트가 비정상 종료 했을 경우 IOException 발생
int readByteCount = socketChannel.read(byteBuffer); //받은 데이터 읽기
//클라이언트가 정상적으로 close()를 호출한 경우
if(readByteCount == -1) {
throw new IOException();
}
String message = "[요청 처리: " + socketChannel.getRemoteAddress()+" : "+Thread.currentThread().getName()+"]";
Platform.runLater(()->displayText(message));
//버퍼 내용을 문자열로 변환
byteBuffer.flip();
Charset charset = Charset.forName("UTF-8");
String data = charset.decode(byteBuffer).toString();
for(Client client : connections) {
//모든 클라이언트에게 전송
client.send(data);
}
}catch(Exception e) {
try {
connections.remove(Client.this);
String message = "[클라이언트 통신 안됨: "+socketChannel.getRemoteAddress() + " : "+ Thread.currentThread().getName() + "]";
Platform.runLater(()->displayText(message));
socketChannel.close();
}catch(IOException e2) {
e2.printStackTrace();
}
break;
}
}
}
};
//스레드 풀에 제출
executorService.submit(runnable);
}
//클라이언트로 데이터 전송
void send(String data) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
//버퍼 내용을 클라이언트에 보내기
Charset charset = Charset.forName("UTF-8");
ByteBuffer byteBuffer = charset.encode(data);
socketChannel.write(byteBuffer);
}catch(Exception e) {
try{
String message = "[클라이언트 통신 안됨: "+socketChannel.getRemoteAddress() + " : "+ Thread.currentThread().getName() + "]";
Platform.runLater(()->displayText(message));
connections.remove(Client.this);
socketChannel.close();
}catch(IOException e2) {
e2.printStackTrace();
}
}
}
};
//스레드 풀에 제출
executorService.submit(runnable);
}
}
//UI 생성 코드
TextArea txtDisplay;
Button btnStartStop;
@Override
public void start(Stage primaryStage) throws Exception {
BorderPane root = new BorderPane();
root.setPrefSize(500, 300);
txtDisplay = new TextArea();
txtDisplay.setEditable(false);
BorderPane.setMargin(txtDisplay, new Insets(0,0,2,0));
root.setCenter(txtDisplay);
btnStartStop = new Button("start");
btnStartStop.setPrefHeight(30);
btnStartStop.setMaxWidth(Double.MAX_VALUE);
btnStartStop.setOnAction(e->{
if(btnStartStop.getText().equals("start")) {
startServer();
}else {
stopServer();
}
});
root.setBottom(btnStartStop);
Scene scene = new Scene(root);
scene.getStylesheets().add(getClass().getResource("app.css").toString());
primaryStage.setScene(scene);
primaryStage.setTitle("Server");
//닫기 버튼 이벤트 처리
primaryStage.setOnCloseRequest(event->stopServer());
primaryStage.show();
}
void displayText(String text) {
txtDisplay.appendText(text + "\n");
}
public static void main(String[] args) {
launch(args);
}
}
javafx 를 위한 app.css
/*text-area 배경색*/
.text-area{
-fx-background-color: gold;
}
/*scroll-pane 배경색*/
.text-area .scroll-pane{
-fx-background-color: transparent;
}
/*viewport 배경색*/
.text-area .scroll-pane .viewport{
-fx-background-color: transparent;
}
/*content 배경색*/
.text-area .scroll-pane .content{
-fx-background-color: transparent;
}
채팅 클라이언트
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import javafx.application.Application;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.TextArea;
import javafx.scene.control.TextField;
import javafx.scene.layout.BorderPane;
import javafx.stage.Stage;
public class ClientExample extends Application {
SocketChannel socketChannel;
//연결 시작
void startClient() {
//스레드 생성
Thread thread = new Thread() {
public void run() {
try {
socketChannel = SocketChannel.open(); //소켓 생성
socketChannel.configureBlocking(true); //블로킹 모드
socketChannel.connect(new InetSocketAddress("localhost", 5001)); //소켓 연결 요청
Platform.runLater(()->{
try {
displayText("[연결 완료: " + socketChannel.getRemoteAddress()+"]");
btnConn.setText("stop");
btnSend.setDisable(false);
}catch(Exception e) {
e.printStackTrace();
}
});
}catch(Exception e) {
Platform.runLater(()->displayText("[서버 통신 안됨]"));
if(socketChannel.isOpen()) {
stopClient();
}
return;
}
//서버에서 보낸 데이터 받기
receive();
}
};
thread.start();
}
//연결 끊기
void stopClient() {
try {
Platform.runLater(()->{
displayText("[연결 끊음]");
btnConn.setText("start");
btnSend.setDisable(true);
});
if(socketChannel != null && socketChannel.isOpen()) {
socketChannel.close();
}
}catch(IOException e) {
e.printStackTrace();
}
}
//서버에서 데이터 받기
void receive() {
while(true) {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
//서버가 비정상 종료인경우 IOException 발생
int readByteCount = socketChannel.read(byteBuffer); //데이터 받기
//서버가 정성 종료 close()를 호출한경우
if(readByteCount == -1) {
throw new IOException();
}
//버퍼의 내용을 문자열로 변환
byteBuffer.flip();
Charset charset = Charset.forName("UTF-8");
String data = charset.decode(byteBuffer).toString();
Platform.runLater(()->displayText("[받기 완료]"+data));
}catch(Exception e) {
Platform.runLater(()->displayText("[서버 통신 안됨]"));
stopClient();
break;
}
}
}
//서버로 데이터 보내기
void send(String data) {
//스레드 생성
Thread thread = new Thread() {
public void run() {
try {
Charset charset = Charset.forName("UTF-8");
ByteBuffer byteBuffer = charset.encode(data);
socketChannel.write(byteBuffer); //서버로 데이터 전송
Platform.runLater(()->displayText("[보내기 완료]"));
}catch(Exception e) {
Platform.runLater(()->displayText("[서버 통신 안됨]"));
stopClient();
}
}
};
thread.start();
}
// UI생성 코드
TextArea txtDisplay;
TextField txtInput;
Button btnConn, btnSend;
@Override
public void start(Stage primaryStage) throws Exception {
BorderPane root = new BorderPane();
root.setPrefSize(500, 300);
txtDisplay = new TextArea();
txtDisplay.setEditable(false);
BorderPane.setMargin(txtDisplay, new Insets(0,0,2,0));
root.setCenter(txtDisplay);
BorderPane bottom = new BorderPane();
txtInput = new TextField();
txtInput.setPrefSize(60, 30);
BorderPane.setMargin(txtInput, new Insets(0,1,1,1));
btnConn = new Button("start");
btnConn.setPrefSize(60, 30);
//start stop 버튼이벤트 처리
btnConn.setOnAction(e->{
if(btnConn.getText().equals("start")) {
startClient();
}else if(btnConn.getText().equals("stop")) {
stopClient();
}
});
btnSend = new Button("send");
btnSend.setPrefSize(60, 30);
btnSend.setDisable(true);
//send 버튼 이벤트 처리
btnSend.setOnAction(e->send(txtInput.getText()));
bottom.setCenter(txtInput);
bottom.setLeft(btnConn);
bottom.setRight(btnSend);
root.setBottom(bottom);
Scene scene = new Scene(root);
scene.getStylesheets().add(getClass().getResource("app.css").toString());
primaryStage.setScene(scene);
primaryStage.setTitle("Client");
primaryStage.setOnCloseRequest(event->stopClient());
primaryStage.show();
}
void displayText(String text) {
txtDisplay.appendText(text+"\n");
}
public static void main(String[] args) {
launch(args);
}
}
블로킹과 인터럽트
IO의 Socket에서는 read(), wirte()에 의해 스레드가 블로킹되었을 때 다른 스레드가 interrupt()를 호출해도 블로킹 상태가 풀리지 않아서 Socket의 close()메소드를 호출해서 SocketException을 발생시켜야 합니다.
그러나 NIO의 SocketChannel의 경우 스레드가 블로킹 되었을 때 다른 스레드가 interrupt()를 호출하면 ClosedByInterruptException이 발생하고 SocketChannel이 닫히면서 블로킹 상태가 풀립니다.
728x90
'DEV&OPS > Java' 카테고리의 다른 글
NIO TCP 비동기 채널방식 채팅 서버/클라이언트 (0) | 2022.05.26 |
---|---|
NIO TCP Non-Blocking Channel (0) | 2022.05.26 |
NIO 파일 채널 (0) | 2022.05.25 |
NIO Buffer (0) | 2022.05.24 |
NIO Path (0) | 2022.05.23 |