diff --git a/JChatClient/bin/client b/JChatClient/bin/client index 161f6b2..aa6f80a 100755 --- a/JChatClient/bin/client +++ b/JChatClient/bin/client diff --git a/JChatClient/src/client.cpp b/JChatClient/src/client.cpp index f78e5e6..0d2ddea 100644 --- a/JChatClient/src/client.cpp +++ b/JChatClient/src/client.cpp @@ -5,13 +5,7 @@ * * En este fichero se implementa un cliente para poder usar con el servidor creado, usando la clase Socket. */ -#include "Socket.h" -#include -#include "SocketException.h" -#include -#include -#include -#include +#include "client.h" using namespace std; @@ -22,24 +16,50 @@ bool connected, finished; el signal SIGPIPE. */ void exitClient(int signal/*!s)->Close(); + pthread_mutex_lock(t_arg->mutex); + if(t_arg->s != 0) + { + delete t_arg->s; + t_arg->s = 0; + } + if(t_arg != 0) + { + delete t_arg; + t_arg = 0; + } + pthread_exit(NULL); } bool connect(Socket& s) { - string host; + string host, nick; int port; - cout << "Hostname: "; + /*cout << "Hostname: "; cin >> host; cout << "Port: "; - cin >> port; + cin >> port;*/ + cout << "Nickname: "; + cin >> nick; cin.ignore(); + host = "localhost"; + port = 3001; + try { s.Connect(host,port); cout << "Connected" << endl; connected = true; + s << nick; } catch(SocketException& e) { @@ -47,32 +67,68 @@ bool connect(Socket& s) } } -void speedTest(Socket& s) +void* sendThread(void* args) { - string data = "", answer; - double size; - double start, duration; - cout << "Size in MB: "; - cin >> size; - cin.ignore(); - for(long int i = 0; i < size*1e6; i++ ) + string send; + struct thread_args *t_arg = (struct thread_args*)args; + while(true) { - data += (char)( 65 + i % 26); + cout << "> "; + getline(cin,send); + if(cin.eof()) + { + send = "/disconnect"; + } + try + { + *(t_arg->s) << send; + if(send == "/disconnect" || send == "/exit") + { + break; + } + } + catch(SocketException& e) + { + cout << e.description() << endl; + cout << "Exiting" << endl; + (t_arg->s)->Close(); + exit(-1); + } } - cout << "Data generated, commencing transfer" << endl; - struct timeval st, ed; - gettimeofday(&st, NULL); - s << data; - cout << "Data sent" << endl; - s >> answer; - gettimeofday(&ed, NULL); - start = (st.tv_sec) + (st.tv_usec) / 1e6; - duration = ((ed.tv_sec) + (ed.tv_usec) / 1e6) - start; - if(answer == "ACK") + killThread(t_arg); +} + +void* recvThread(void* args) +{ + string recv; + struct thread_args *t_arg = (struct thread_args*)args; + while(true) { - cout << "Transferred " << size << " MB in " << duration << " seconds" << endl; - cout << "Data rate: " << size/duration << " MB/s" << endl; + *(t_arg->s) >> recv; + if(recv == "DISC_OK") + { + cout << "Disconnecting" << endl; + (t_arg->s)->Close(); + connected = false; + pthread_cond_signal(t_arg->condition); + break; + } + else if(recv == "EXIT_OK") + { + cout << "Exiting" << endl; + (t_arg->s)->Close(); + connected = false; + finished = true; + pthread_cond_signal(t_arg->condition); + break; + } + else + { + *(t_arg->s) >> recv; + cout << recv << endl; + } } + killThread(t_arg); } //! Método principal del cliente @@ -80,71 +136,43 @@ void speedTest(Socket& s) int main() { signal(SIGPIPE, exitClient); - signal(SIGINT, exitClient); Socket s; connected = finished = false; - string send, recv; + pthread_mutex_t mutex; + pthread_mutex_init(&mutex,0); + pthread_cond_t condition; + pthread_cond_init(&condition,0); + thread_args *sArgs = new thread_args; + thread_args *rArgs = new thread_args; + pthread_t recv, send; s.Create(); - while (!exit) + while (!finished) { connect(s); + pthread_mutex_lock(&mutex); + sArgs->mutex = &mutex; + sArgs->condition = &condition; + sArgs->s = &s; + + rArgs->mutex = &mutex; + rArgs->condition = &condition; + rArgs->s = &s; + + pthread_create(&send,NULL,sendThread,(void *)sArgs); + pthread_create(&recv,NULL,recvThread,(void *)rArgs); + while(connected) { - cout << "> "; - getline(cin,send); - if(cin.eof()) - { - send = "/disconnect"; - } - try - { - s << send; - if(send == "/disconnect") - { - s >> recv; - if(recv == "OK") - { - cout << "Disconnecting" << endl; - s.Close(); - connected = false; - } - } - else if(send == "/exit") - { - s >> recv; - if(recv == "OK") - { - cout << "Exiting" << endl; - s.Close(); - connected = false; - finished = true; - } - } - else if(send == "/test") - { - speedTest(s); - } - else - { - s >> recv; - cout << "Received: " << recv << endl; - } - } - catch(SocketException& e) - { - cout << e.description() << endl; - cout << "Exiting" << endl; - s.Close(); - return -1; - } + pthread_cond_wait(&condition,&mutex); } + pthread_mutex_unlock(&mutex); } + pthread_cond_destroy(&condition); + pthread_mutex_destroy(&mutex); } -/* TO-DO +/* TODO * - * spawn 2 IO threads - * connect/disconnect commands * list nicks * unicast message */ \ No newline at end of file diff --git a/JChatClient/src/include/client.h b/JChatClient/src/include/client.h new file mode 100644 index 0000000..5d6182e --- /dev/null +++ b/JChatClient/src/include/client.h @@ -0,0 +1,36 @@ +#ifndef CLIENT_H_ +#define CLIENT_H_ + +#include "Socket.h" +#include +#include "SocketException.h" +#include +#include +#include +#include + +//!Argumentos de los threads +/** +\brief Este struct define los argumentos que recibe un thread abierto por la aplicación servidor al recibir una conexión entrante +\author Imanol Barba Sabariego +\date 11/06/2013 */ +struct thread_args +{ + //! Variable de control de la exclusión mútua entre threads + /** Esta variable se usa para bloquear otros threads en operaciones de exclusion mútua donde se modifican variables compartidas */ + pthread_mutex_t *mutex; + //! Variable de notificación a otros threads + /*! \brief Esta variable se usa para notificar a otros threads cuando deben realizar otras acciones. + + *Actualmente se usa para notificar al thread principal cuando el thread que lo invoca ha terminado, en caso de que el principal haya + quedado bloqueado y no admita más conexiones.* */ + pthread_cond_t *condition; + //! Puntero al socket + /*! \brief Esta variable representa el puntero al socket que proviene de la conexión entrante recibida por el servidor. Con este, el thread + puede recibir y enviar los datos. */ + Socket *s; +}; + + + +#endif /* CLIENT_H_ */ \ No newline at end of file diff --git a/JChatClient/src/include/server.h b/JChatClient/src/include/server.h deleted file mode 100644 index c73a6f0..0000000 --- a/JChatClient/src/include/server.h +++ /dev/null @@ -1,149 +0,0 @@ -/** @file -* \brief Header de la clase Server -* \author Imanol Barba Sabariego -* \date 11/06/2013 -* -* En este fichero se define la clase Server y algunos métodos globales usados por ésta para la gestión de threads y otros aspectos. -*/ - -#ifndef SERVER_H_ -#define SERVER_H_ - -#include "Socket.h" -#include "SocketException.h" -#include -#include -#include -#include -#include -#include - -//! Numero de conexiones permitidas activas (en espera o activas) -/*! Esta constante controla cuantas conexiones puede haber en espera o cuantas puede haber establecias en cualquier momento: habrá N activas -y N en espera como mucho, no N en espera o activas. */ -#define N 5 -//! Ruta al fichero de configuración -/*! Ruta relativa o absoluta al fichero de configuración, de no existir o ser inválido el programa no funcionará. */ -#define CONFFILE "socket.conf" -//! Nombre del socket del módulo de control -/*! Nombre y ruta del socket UNIX del módulo de control */ - -using namespace std; - -//! Clase de aplicación servidor -/** -Esta clase define un objeto con los métodos y atributos necesarios para lanzar una aplicación servidor y atender las conexiones. Para realizar -la comunicación con el cliente, usa un objeto de la clase Socket -*/ -class Server -{ - private: - //! Contador de threads - /*! Esta variable se encarga de mantener la cuenta de threads activos, por tanto, el número de conexiones que estan siendo antendidas - simultáneamente. */ - int nWorkers; - //! Variable de apagado - /*! Esta variable controla el apagado del servidor, al ponerla a true, la siguiente iteración del bucle que atiende las conexiones - no se producirá y el programa terminará. */ - bool shutdownServer; - //! Contador de ID de thread - /*! Esta variable contiene el ID del próximo thread que se creará, por tanto, indica el número de conexiones que han sido atendidas desde - el inicio del servidor */ - int workerID; - //! Pila de threads terminados - /*! Esta variable contiene una lista de threads que han finalizado su ejecución. A cada iteración del bucle que atiende conexiones, - se libera toda la memoria de los threads que hay almacenados aquí. */ - list stoppedThreads; - //! Pila de threads empezados - /*! \brief Esta variable contiene una lista de threads que han empezado su ejecución. Si el programa finalizara prematuramente, se liberarían los punteros - de los threads almacenados en esta pila. - - __NOTA: No se liberará la memoria asignada a los argumentos de los threads, dando lugar a memory leaks; sin embargo, esto se produciria al finalizar - el programa, por tanto no es relevante.__ */ - list startedThreads; - //! Socket de comunicación - /*! Esta variable contiene el objeto de la clase Socket que la aplicación servidor usa para poder atender las peticiones. Su función - es quedarse escuchando el el puerto e IP introducidas en el fichero de configuración y crear un objeto de la clase Socket para cada - petición de cada cliente nuevo, siendo este último objeto creado el que se usa para la comuncación. */ - Socket ss; - - public: - //! Constructor de la clase Server - /*! Incializa los argumentos inciales del servidor */ - Server() : nWorkers(0), workerID(0), shutdownServer(false) {} - //! Getter del número de threads activos - /*! Devuelve el número de threads activos en ese instante, por tanto, del número de conexiones que están siendo atendidas. */ - int getNWorkers(); - //! Setter del número de threads activos - /*! Establece el número de threads activos, para poder cambiarlo cuando alguno de los threads activos finaliza */ - void setNWorkers(int n /*!* getStartedThreads(); - //! Getter de la pila de threads terminados - /*! Devuelve un contenedor con la lista de threads que han terminado, para poder liberar la memoria que se le ha asignado */ - list* getStoppedThreads(); - //! Método de inicialización del servidor - /*! Incializa el servidor en el puerto e IP especificados para empezar a recibir conexiones entrantes */ - void startServer(string i /*! *threadList/*! activeConnections; + private boolean kill; + private Connection[] workerPool; + private int freeWorkers; + private Lock lock; + private Condition availableWorkers; + private ConcurrentHashMap activeConnections; public Server(String ip, int port, int rS) { try { ss = new MyServerSocket(); - stdout = new PrintWriter(System.out); - stdin = new BufferedReader(new InputStreamReader(System.in)); + lock = new ReentrantLock(); + activeConnections = new ConcurrentHashMap(); roomSize = rS; - rWorkerPool = new Worker[roomSize]; - wWorkerPool = new Worker[roomSize]; + freeWorkers = roomSize; + kill = false; + availableWorkers = lock.newCondition(); + workerPool = new Connection[roomSize]; + for(int i = 0; i < roomSize; i++) + { + workerPool[i] = new Connection(this); + workerPool[i].start(); + } + ss.bind(ip, port); } catch(IOException ioExc) { @@ -38,39 +44,96 @@ public class Server } } - public void startServer() + public void sendToChat(Connection origin, String message) { - + lock.lock(); + String nickname = activeConnections.get(origin); + Set connections = activeConnections.keySet(); + Iterator it = connections.iterator(); + Connection conn; + while(it.hasNext()) + { + conn = it.next(); + if(conn != origin) + { + conn.sendMessage(nickname + ": " + message); + } + } + lock.unlock(); } - public void startWorker(Worker r, Worker w, MySocket s) + public void startServer() { - if(r == null) + while(!kill) { - r = new Worker(s,null,stdout); + MySocket incoming = ss.accept(); + if(incoming != null) + { + System.out.println("Accepted connection from " + incoming.getInetAddress()); + + startWorker(incoming); + } } - r.awake(); - if(w == null) + + } + + public void startWorker(MySocket s) + { + lock.lock(); + while(freeWorkers == 0) { - w = new Worker(s,stdin,null); + try + { + lock.unlock(); + availableWorkers.await(); + lock.lock(); + } + catch(InterruptedException intExc) + { + intExc.printStackTrace(); + } } - w.awake(); + workerPool[roomSize - freeWorkers].setSock(s); + workerPool[roomSize - freeWorkers].awake(); + workerPool[roomSize - freeWorkers] = null; + freeWorkers--; + lock.unlock(); + } + + public String getNickname(Connection c) + { + return activeConnections.get(c); + } + + public void finishWorker(Connection c) + { + lock.lock(); + freeWorkers++; + workerPool[roomSize - freeWorkers] = c; + availableWorkers.signal(); + lock.unlock(); + } + + public void addToChatroom(Connection c, String nickName) + { + activeConnections.put(c, nickName); + System.out.println(nickName + " has entered the room"); } - public void finishWorkers() + public void finishConnections() { - Collection worerke= activeConnections.values(); - Iterator it = sockets.iterator(); + Set conns = activeConnections.keySet(); + Iterator it = conns.iterator(); while(it.hasNext()) { - it.next().close(); + it.next().finish(); } } public void killServer() { - run = false; + kill = true; ss.close(); - finishWorkers(); + finishConnections(); } } diff --git a/JChatServer/src/pad/prac2/Worker.java b/JChatServer/src/pad/prac2/Worker.java deleted file mode 100644 index 47daab0..0000000 --- a/JChatServer/src/pad/prac2/Worker.java +++ /dev/null @@ -1,95 +0,0 @@ -package pad.prac2; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.PrintWriter; - -public class Worker extends Thread -{ - private PrintWriter output; - private BufferedReader input; - private MySocket sock; - private boolean run; - private boolean kill; - private boolean sleep; - private Server serv; - - public Worker(Server srv, MySocket s, BufferedReader i, PrintWriter o) - { - sock = s; - output = o; - input = i; - run = false; - kill = false; - sleep = true; - serv = srv; - } - - public void finish() - { - kill = true; - run = false; - sleep = false; - sock.close(); - } - - public void run() - { - while(!kill) - { - while(sleep) - { - try - { - wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); - } - } - while(run) - { - if(output == null && input != null) - { - String str, length; - while(true) - { - try - { - str = input.readLine(); - length = new Integer(str.length()).toString(); - sock.write(length, length.length()); - sock.write(str, str.length()); - } - catch(IOException ioExc) - { - run = false; - sleep = true; - break; - } - } - } - else if(input == null && output != null) - { - String len, str; - while((len = sock.readLine()) != null) - { - int length = Integer.parseInt(len); - str = sock.read(length); - output.println(str); - } - run = false; - sleep = true; - } - } - } - } - - public void awake() - { - run = true; - sleep = false; - this.notify(); - } -}