Интернет Програмиране с Java

1.10. Разработка на chat клиент/сървър

Светлин Наков

www.nakov.com

Многопотребителски сървър за разговори (multi-user chat server)

Нека сега си поставим една по-сложна задача – реализация на сървър за разговори (chat server). Чрез него ще демонстрираме силата на многонишковото програмиране при разработка на мрежови приложения. Да разгледаме първо една примерна реализация на многопотребителски сървър за разговори:

ChatServer.java
import java.io.*;
import java.net.*;
import java.util.Vector;
 
public class ChatServer {
    public static void main(String[] args)
    throws IOException {
        ServerSocket serverSocket = new ServerSocket(5555);
        System.out.println("Chat server started on port " +
            serverSocket.getLocalPort());
 
        ServerMsgDispatcher dispatcher =
            new ServerMsgDispatcher();
        dispatcher.start();
 
        while (true) {
            Socket clientSocket = serverSocket.accept();
            ClientListener clientListener =
                new ClientListener(clientSocket, dispatcher);
            dispatcher.addClient(clientSocket);
            clientListener.start();
        }
    }
}
 
class ClientListener extends Thread {
    private Socket mSocket;
    private ServerMsgDispatcher mDispatcher;
    private BufferedReader mSocketReader;
 
    public ClientListener(Socket aSocket,
        ServerMsgDispatcher aServerMsgDispatcher)
    throws IOException {
        mSocket = aSocket;
        mSocketReader = new BufferedReader(
            new InputStreamReader(
                mSocket.getInputStream()));
        mDispatcher = aServerMsgDispatcher;
    }
 
    public void run() {
        try {
            while (!isInterrupted()) {
                String msg = mSocketReader.readLine();
                if (msg == null)
                    break;
                mDispatcher.dispatchMsg(mSocket, msg);
            }
        } catch (IOException ioex) {
            System.err.println("Error communicating " +
                "with some of the clients.");
        }
        mDispatcher.deleteClient(mSocket);
    }
}
 
class ServerMsgDispatcher extends Thread {
    private Vector mClients = new Vector();
    private Vector mMsgQueue = new Vector();
 
    public synchronized void addClient(Socket aClientSocket) {
        mClients.add(aClientSocket);
    }
 
    public synchronized void deleteClient(Socket aClientSock) {
        int i = mClients.indexOf(aClientSock);
        if (i != -1) {
            mClients.removeElementAt(i);
            try {
                aClientSock.close();
            } catch (IOException ioe) {
                // Probably the socket already is closed
            }
        }
    }
 
    public synchronized void dispatchMsg(
            Socket aSocket, String aMsg) {
        String IP = aSocket.getInetAddress().getHostAddress();
        String port = "" + aSocket.getPort();
        aMsg = IP + ":" + port + " : " + aMsg + "\n\r";
        mMsgQueue.add(aMsg);
        notify();
    }
 
    private synchronized String getNextMsgFromQueue()
    throws InterruptedException {
        while (mMsgQueue.size() == 0)
            wait();
        String msg = (String) mMsgQueue.get(0);
        mMsgQueue.removeElementAt(0);
        return msg;
    }
 
    private synchronized void sendMsgToAllClients(String aMsg) { 
        for (int i=0; i<mClients.size(); i++) {
            Socket socket = (Socket) mClients.get(i);
            try {
                OutputStream out = socket.getOutputStream(); 
                out.write(aMsg.getBytes());
                out.flush();
            } catch (IOException ioe) {
                deleteClient(socket);
            }
        }
    }
 
    public void run() {
        try {
            while (true) {
                String msg = getNextMsgFromQueue();
                sendMsgToAllClients(msg);
            }
        } catch (InterruptedException ie) {
            // Thread interrupted. Do nothing
        }
    }
}

Как работи сървърът за разговори

Като функционалност сървърът не е много сложен. Единственото, което прави, е да приема съобщения от клиентите си и да изпраща всяко прието съобщение до всеки клиент, като отбелязва в него от кого го е получил. Можем да го изтестваме като отворим няколко telnet-сесии към порт 5555 по същия начин, както в предния пример с Date-сървъра.

Сървърът има две основни нишки. Едната е главната програма (ChatServer), която слуша на порт 5555 и приема нови клиенти, а другата е нишката-диспечер (ServerMsgDispatcher), която разпраща получените от клиентите съобщения до всички свързани към сървъра. За всеки клиент в сървъра се създава още една нишка (обект от класа ClientListener), която служи за получаване на съобщения от него. При стартирането си сървърът отваря за слушане порт 5555, създава диспечера за съобщения и го стартира. След това в безкраен цикъл започва да приема клиенти. При приемане на нов клиент той първо се добавя в списъка на диспечера, а след това се създава една нишка за получаване на съобщенията идващи от него и тази нишка се стартира.

Нишката за получаване на съобщения от клиент в основния си цикъл (метода run()) чете съобщения от клиента, добавя ги в опашката на диспечера (извиквайки метода DispatchMsg()), след което го събужда ако е заспал (като му вика notify() метода). Четенето на съобщение става с метода readLine() и е операция, която блокира нишката докато не пристигне съобщение или не настъпи грешка. При настъпване на грешка, клиентът се премахва от списъка на диспечера (чрез извикване на deleteClient()).

Нишката ServerMsgDispatcher е добър пример за приложение на модела “производител – потребител” в практиката. В основния си цикъл (в метода run()) нишката взема от опашката си поредното съобщение и го разпраща до всички клиенти. В този цикъл тя се явява потребител (консуматор) на съобщения. Ако опашката е празна, нишката чака (като извиква wait()) докато пристигне ново съобщение. Съобщенията пристигат асинхронно чрез извиквания от нишките за обслужване на клиент. Клиентите играят ролята на производител на съобщения. Диспечерът пази всички активни клиенти в един списък. За да поддържа списъка актуален, сървърът добавя в него всеки нов клиент при пристигането му и го премахва от там при първия неуспешен опит за изпращане или получаване на съобщение от него (т.е. когато връзката със клиента се разпадне). Така например, ако клиентът затвори сокета, той ще бъде премахнат от списъка, защото четенето на съобщение от него ще се провали.

Защо сървърът за разговори не е добре написан

Макар и всичко да изглежда добре, в този сървър има един сериозен проблем. Сървърът наистина е способен да обслужва много клиенти едновременно, но не може все още да твърди, че е добре написан. Проблемът е, че нишката-диспечер, която изпраща получените съобщения, работи последователно с един for-цикъл. Тя не преминава към изпращането на следващо съобщение от опашката докато не изпрати текущото съобщение на всички клиенти. Ако връзката с един от клиентите е много бавна, заради него ще се наложи всички да чакат преди да получат следващото изпратено съобщение. Следователно е необходимо диспечерът да разпраща съобщенията от опашката в някакъв смисъл паралелно.

Как може да се подобри chat сървъра

Единият вариант е да се създава по една нишка за всяко изпращане на съобщение до някой клиент. Това обаче означава, че ако сървърът има 1000 клиента и получи почти едновременно 100 съобщения, ще трябва да създаде 100000 нишки, за да разпрати съобщенията до клиентите. Създаването и изпълнението на толкова много нишки обаче, изисква огромно количество процесорно време, памет и други ресурси (особено при програмиране на Java), така че ще ни е необходим доста мощен компютър за да може така модифицираният chat-сървър да работи със задоволителна скорост.

Има и по-разумен вариант – при свързването на нов клиент за него да се създава още една нишка, която служи за разпращане на съобщенията, предназначени конкретно за него. Тази нишка трябва да поддържа опашка от съобщения, защото ако съобщенията пристигат по-бързо отколкото се могат да се изпратят, ще възникне проблем. Тя трябва да заспива, когато опашката е празна и да се събужда, когато в нея постъпи съобщение, за да започне изпращането му. Тази нишка може да се реализира по същия начин като класа ServerMsgDispatcher, защото има много сходна функционалност. Тя трябва само да чака в опашката й да постъпят някакви съобщения, след това да ги разпраща едно по едно. Да видим как можем да реализираме описаната идея.

Разработка на истински многопотребителски chat сървър

NakovChatServer.java
/** 
 * Nakov Chat Server 
 * (c) Svetlin Nakov, 2002 
 * http://www.nakov.com 
 * 
 * Nakov Chat Server is multithreaded chat server. It accepts 
 * multiple clients simultaneously and serves them. Clients are 
 * able to send messages to the server. When some client sends 
 * a message to the server, the message is dispatched to all 
 * the clients connected to the server. 
 * 
 * The server consists of two components - "server core" and 
 * "client handlers". 
 * 
 * The "server core" consists of two threads: 
 *   - NakovChatServer - accepts client connections, creates 
 * client threads to handle them and starts these threads 
 *   - ServerDispatcher - waits for messages and when some 
 * message arrive sends it to all the clients connected to 
 * the server 
 * 
 * The "client handlers" consist of two threads: 
 *   - ClientListener - listens for message arrivals from the 
 * socket and forwards them to the ServerDispatcher thread 
 *   - ClientSender - sends messages to the client 
 * 
 * For each accepted client, a ClientListener and ClientSender 
 * threads are created and started. A Client object is also 
 * created to maintain the information about the client and is 
 * added to the ServerDispatcher's clients list. When some 
 * client is disconnected, is it removed from the clients list 
 * and both its ClientListener and ClientSender threads are 
 * interrupted. 
 */ 
 
import java.net.*; 
import java.io.*; 
import java.util.Vector; 
 
/** 
 * NakovChatServer class is the entry point for the server. 
 * It opens a server socket, starts the dispatcher thread and 
 * infinitely accepts client connections, creates threads for 
 * handling them and starts these threads. 
 */ 
public class NakovChatServer { 
    public static final int LISTENING_PORT = 2002; 
    private static ServerSocket mServerSocket; 
 
    private static ServerDispatcher mServerDispatcher; 
 
    public static void main(String[] args) { 
        // Start listening on the server socket 
        bindServerSocket(); 
 
        // Start the ServerDispatcher thread 
        mServerDispatcher = new ServerDispatcher(); 
        mServerDispatcher.start(); 
 
        // Infinitely accept and handle client connections 
        handleClientConnections(); 
    } 
 
    private static void bindServerSocket() { 
        try { 
            mServerSocket = new ServerSocket(LISTENING_PORT); 
            System.out.println("NakovChatServer started on " + 
                "port " + LISTENING_PORT); 
        } catch (IOException ioe) { 
            System.err.println("Can not start listening on " + 
                "port " + LISTENING_PORT); 
            ioe.printStackTrace(); 
            System.exit(-1); 
        } 
    } 
 
    private static void handleClientConnections() { 
        while (true) { 
            try { 
                Socket socket = mServerSocket.accept(); 
                Client client = new Client(); 
                client.mSocket = socket; 
                ClientListener clientListener = new 
                    ClientListener(client, mServerDispatcher); 
                ClientSender clientSender = 
                    new ClientSender(client, mServerDispatcher); 
                client.mClientListener = clientListener; 
                clientListener.start(); 
                client.mClientSender = clientSender; 
                clientSender.start(); 
                mServerDispatcher.addClient(client); 
            } catch (IOException ioe) { 
                ioe.printStackTrace(); 
            } 
        } 
    } 
} 
 
/** 
 * ServerDispatcher class is purposed to listen for messages 
 * received from the clients and to dispatch them to all the 
 * clients connected to the chat server. 
 */ 
class ServerDispatcher extends Thread { 
    private Vector mMessageQueue = new Vector(); 
    private Vector mClients = new Vector(); 
 
    /** 
     * Adds given client to the server's client list. 
     */ 
    public synchronized void addClient(Client aClient) { 
        mClients.add(aClient); 
    } 
 
    /** 
     * Deletes given client from the server's client list if 
     * the client is in the list. 
     */ 
    public synchronized void deleteClient(Client aClient) { 
        int clientIndex = mClients.indexOf(aClient); 
        if (clientIndex != -1) 
            mClients.removeElementAt(clientIndex); 
    } 
 
    /** 
     * Adds given message to the dispatcher's message queue and 
     * notifies this thread to wake up the message queue reader 
     * (getNextMessageFromQueue method). dispatchMessage method 
     * is called by other threads (ClientListener) when a 
     * message is arrived. 
     */ 
    public synchronized void dispatchMessage( 
            Client aClient, String aMessage) { 
        Socket socket = aClient.mSocket; 
        String senderIP = 
            socket.getInetAddress().getHostAddress(); 
        String senderPort = "" + socket.getPort(); 
        aMessage = senderIP + ":" + senderPort + 
            " : " + aMessage; 
        mMessageQueue.add(aMessage); 
        notify(); 
    } 
 
    /** 
     * @return and deletes the next message from the message 
     * queue. If there is no messages in the queue, falls in 
     * sleep until notified by dispatchMessage method. 
     */ 
    private synchronized String getNextMessageFromQueue() 
    throws InterruptedException { 
        while (mMessageQueue.size()==0) 
            wait(); 
        String message = (String) mMessageQueue.get(0); 
        mMessageQueue.removeElementAt(0); 
        return message; 
    } 
 
    /** 
     * Sends given message to all clients in the client list. 
     * Actually the message is added to the client sender 
     * thread's message queue and this client sender thread 
     * is notified to process it. 
     */ 
    private void sendMessageToAllClients( 
            String aMessage) { 
        for (int i=0; i<mClients.size(); i++) { 
            Client client = (Client) mClients.get(i); 
            client.mClientSender.sendMessage(aMessage); 
        } 
    } 
 
    /** 
     * Infinitely reads messages from the queue and dispatches 
     * them to all clients connected to the server. 
     */ 
    public void run() { 
        try { 
            while (true) { 
                String message = getNextMessageFromQueue(); 
                sendMessageToAllClients(message); 
            } 
        } catch (InterruptedException ie) { 
            // Thread interrupted. Stop its execution 
        } 
    } 
} 
 
/** 
 * Client class contains information about a client, 
 * connected to the server. 
 */ 
class Client { 
    public Socket mSocket = null; 
    public ClientListener mClientListener = null; 
    public ClientSender mClientSender = null; 
} 
 
/** 
 * ClientListener class listens for client messages and 
 * forwards them to ServerDispatcher. 
 */ 
class ClientListener extends Thread { 
    private ServerDispatcher mServerDispatcher; 
    private Client mClient; 
    private BufferedReader mSocketReader; 
 
    public ClientListener(Client aClient, ServerDispatcher 
            aSrvDispatcher) throws IOException { 
        mClient = aClient; 
        mServerDispatcher = aSrvDispatcher; 
        Socket socket = aClient.mSocket; 
        mSocketReader = new BufferedReader( 
            new InputStreamReader(socket.getInputStream()) ); 
    } 
 
    /** 
     * Until interrupted, reads messages from the client 
     * socket, forwards them to the server dispatcher's 
     * queue and notifies the server dispatcher. 
     */ 
    public void run() { 
        try { 
            while (!isInterrupted()) { 
                String message = mSocketReader.readLine(); 
                if (message == null) 
                    break; 
                mServerDispatcher.dispatchMessage( 
                    mClient, message); 
            } 
        } catch (IOException ioex) { 
            // Problem reading from socket (broken connection) 
        } 
 
        // Communication is broken. Interrupt both listener and 
        // sender threads 
        mClient.mClientSender.interrupt(); 
        mServerDispatcher.deleteClient(mClient); 
    } 
} 
 
/** 
 * Sends messages to the client. Messages waiting to be sent 
 * are stored in a message queue. When the queue is empty, 
 * ClientSender falls in sleep until a new message is arrived 
 * in the queue. When the queue is not empty, ClientSender 
 * sends the messages from the queue to the client socket. 
 */ 
class ClientSender extends Thread { 
    private Vector mMessageQueue = new Vector(); 
 
    private ServerDispatcher mServerDispatcher; 
    private Client mClient; 
    private PrintWriter mOut; 
 
    public ClientSender(Client aClient, ServerDispatcher 
            aServerDispatcher) throws IOException { 
        mClient = aClient; 
        mServerDispatcher = aServerDispatcher; 
        Socket socket = aClient.mSocket; 
        mOut = new PrintWriter( 
            new OutputStreamWriter(socket.getOutputStream()) ); 
    } 
 
    /** 
     * Adds given message to the message queue and notifies 
     * this thread (actually getNextMessageFromQueue method) 
     * that a message is arrived. sendMessage is always called 
     * by other threads (ServerDispatcher). 
     */ 
    public synchronized void sendMessage(String aMessage) { 
        mMessageQueue.add(aMessage); 
        notify(); 
    } 
 
    /** 
     * @return and deletes the next message from the message 
     * queue. If the queue is empty, falls in sleep until 
     * notified for message arrival by sendMessage method. 
     */ 
    private synchronized String getNextMessageFromQueue() 
            throws InterruptedException { 
        while (mMessageQueue.size()==0) 
            wait(); 
        String message = (String) mMessageQueue.get(0); 
        mMessageQueue.removeElementAt(0); 
        return message; 
    } 
 
    /** 
     * Sends given message to the client's socket. 
     */ 
    private void sendMessageToClient(String aMessage) { 
        mOut.println(aMessage); 
        mOut.flush(); 
    } 
 
    /** 
     * Until interrupted, reads messages from the message queue 
     * and sends them to the client's socket. 
     */ 
    public void run() { 
        try { 
            while (!isInterrupted()) { 
                String message = getNextMessageFromQueue(); 
                sendMessageToClient(message); 
            } 
        } catch (Exception e) { 
            // Commuication problem 
        } 
 
        // Communication is broken. Interrupt both listener 
        // and sender threads 
        mClient.mClientListener.interrupt(); 
        mServerDispatcher.deleteClient(mClient); 
    } 
} 

Как работи истинският многопотребителски chat сървър

Примерът по-горе се състои от няколко класа, показани на диаграмата:

При стартиране на програмата главният клас на chat сървъра NakovChatServer създава една нишка ServerDispatcher, стартира я, отваря един сървърски TCP сокет и започва да слуша на него постоянно за нови клиенти.

При пристигане на нов клиент NakovChatServer създава за него един обект от класа Client, и две нишки – ClientListener и ClientSender съответно за получаване и изпращане на съобщения към този клиент. В Client обекта NakovChatServer записва сокета на клиента, както и двете нишки, които го обслужват и добавя този обект към списъка с клиентите на нишката ServerDispatcher. С това добавянето на нов клиент приключва.

Задачата на нишката ClientListener е постоянно да слуша за съобщения идващи от клиента, за който е създадена и при получаване на съобщение, да го изпраща към на ServerDispatcher нишката, която има грижата да го достави до всички клиенти. Нишката ClientListener прекарва основната част от времето си заспала очаквайки да прочете данни от клиентския сокет.

Нишката ClientSender служи да разпраща съобщения до даден клиент, за когото е създадена. През цялото време, когато не разпраща съобщения, тя спи в очакване в опашката й да бъде получено ново съобщение за изпращане към нейния клиент. Нишката ClientSender използва модела „производител-потребител” при достъпа до собствената си опашка.

Нишката ServerDispatcher служи да разпраща всички подадени й съобщения до всички клиенти, свързани към сървъра. Тя е реализирана с една опашка, в която натрупва всички получени съобщения, които все още не са разпратени. Когато опашката не е празна нишката разпраща съобщенията към диспечерите на всички клиенти, а когато опашката е празна, нишката заспива и чака събуждане. Отново се използва моделът „производител-потребител” при достъпа до опашката.

Във всеки един момент ServerDispatcher нишката поддържа списък от всички активни клиенти и следи да актуализирва списъка винаги, когато се прекъсне връзката с някой от клиентите.

Проблемът от предходната реализация на chat сървъра вече е решен ефективно. За всеки клиент в сървъра има отделени специално за него две нишки – една за получаване на съобщения и една за изпращане на съобщения, които работят само за него и спят, когато нямат работа. Всеки клиент се обслужва отделно, сякаш е сам на сървъра и същевременно благодарение на главния диспечер всяко получено съобщение се доставя до опашките за изпращане на всеки от клиентите.

Ако разгледаме внимателно сорс-кода, можем да забележим, че няма особена нужда от ServerDispatcher нишката. Единственото, което тя прави, е да поддържа списък от активните клиенти и когато получи съобщение, да го разпрати към техните диспечери. Това наистина е така, защото се очаква операцията изпращане на съобщение към диспечера на даден клиент да не е блокираща операция и да завършва веднага. Единствения смисъл от отделна нишка за разпращането на получените от клиентите съобщения е, че тази нишка позволява клиентът, който е получил съобщението, да си продължи работата веднага вместо да си губи времето да го до опашките на всички свързани клиенти. По този начин би могло леко се подобри скоростта на обслужване на клиента.

Клиент за нашия chat сървър

Нека сега се спрем на една по-проста задача – да създадем клиент за нашия сървър. Клиентското приложение, въпреки че обслужва само един клиент, също трябва да е многонишково, защото обслужването на един клиент реално включва два процеса – получаване на съобщения от сървъра, изпращане на съобщения от клиента към сървъра. И двата процеса в основната част от времето си спят в очакване да получат данни, които да обработят – единият блокира в очакване потребителят да въведе нещо, а другият блокира в очакване от сървъра да се получи нещо. Ето една примерна реализация на chat клиент:

NakovChatClient.java
/** 
 * Nakov Chat Client 
 * (c) Svetlin Nakov, 2002 
 * http://www.nakov.com 
 */ 
import java.io.*; 
import java.net.*; 
 
/** 
 * NakovChatClient is a client for Nakov Chat Server. After 
 * creating a socket connection to the chat server it starts 
 * two threads. The first one listens for data comming from 
 * the socket and transmits it to the console and the second 
 * one listens for data comming from the console and transmits 
 * it to the socket. After creating the two threads the main 
 * program's thread finishes its execution, but the two data 
 * transmitting threads stay running as long as the socket 
 * connection is not closed. When the socket connection is 
 * closed, the thread that reads it terminates the program 
 * execution. 
 */ 
public class NakovChatClient { 
    public static final String SERVER_HOSTNAME = "localhost"; 
    public static final int SERVER_PORT = 2002; 
 
    private static BufferedReader mSocketReader; 
    private static PrintWriter mSocketWriter; 
 
    public static void main(String[] args) { 
        // Connect to the chat server 
        try { 
            Socket socket = 
                new Socket(SERVER_HOSTNAME, SERVER_PORT); 
            mSocketReader = new BufferedReader(new 
                InputStreamReader(socket.getInputStream())); 
            mSocketWriter = new PrintWriter(new 
                OutputStreamWriter(socket.getOutputStream())); 
            System.out.println("Connected to server " + 
                    SERVER_HOSTNAME + ":" + SERVER_PORT); 
        } catch (IOException ioe) { 
            System.err.println("Can not connect to " + 
                SERVER_HOSTNAME + ":" + SERVER_PORT); 
            ioe.printStackTrace(); 
            System.exit(-1); 
        } 
 
        // Start socket --> console transmitter thread 
        PrintWriter consoleWriter = new PrintWriter(System.out); 
        TextDataTransmitter socketToConsoleTransmitter = new 
            TextDataTransmitter(mSocketReader, consoleWriter); 
        socketToConsoleTransmitter.setDaemon(false); 
        socketToConsoleTransmitter.start(); 
 
        // Start console --> socket transmitter thread 
        BufferedReader consoleReader = new BufferedReader( 
            new InputStreamReader(System.in)); 
        TextDataTransmitter consoleToSocketTransmitter = new 
            TextDataTransmitter(consoleReader, mSocketWriter); 
        consoleToSocketTransmitter.setDaemon(false); 
        consoleToSocketTransmitter.start(); 
    } 
} 
 
/** 
 * Transmits text data from the given reader to given writer 
 * and runs as a separete thread. 
 */ 
class TextDataTransmitter extends Thread { 
    private BufferedReader mReader; 
    private PrintWriter mWriter; 
 
    public TextDataTransmitter(BufferedReader aReader, 
            PrintWriter aWriter) { 
        mReader = aReader; 
        mWriter = aWriter; 
    } 
 
    /** 
     * Until interrupted reads a text line from the reader 
     * and sends it to the writer. 
     */ 
    public void run() { 
        try { 
            while (!isInterrupted()) { 
                String data = mReader.readLine(); 
                mWriter.println(data); 
                mWriter.flush(); 
            } 
        } catch (IOException ioe) { 
            System.err.println("Lost connection to server."); 
            System.exit(-1); 
        } 
    } 
}

Основната нишка NakovChatClient отваря сокет връзка към chat сървъра, създава 2 нишки, които да обслужват приемането и изпращането на съобщения и след това завършва изпълнението си. Едната от създадените нишки чете постоянно идващите от сървъра съобщения и ги печата на стандартния изход (на конзолата), а другата нишка чете постоянно идващите от стандартния вход (въведените от клавиатурата) съобщения и ги изпраща към сървъра. Ако по време на работа възникне входно-изходен проблем в някоя от двете нишки, това най-вероятно означава, че се е прекъснала връзката със сървъра и програмата завършва аварийно.