Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members | Related Pages

MPNLBase.h

Go to the documentation of this file.
00001 /***************************************************************************
00002                           mpnlbase.h  -  description
00003                              -------------------
00004     begin                : Sat Feb 7 2004
00005     copyright            : (C) 2004 by Mickael Faivre-Macon
00006     email                : mickael@easyplay.com.tw
00007  ***************************************************************************/
00008 
00009 /***************************************************************************
00010  *                                                                         *
00011  *   This program is free software; you can redistribute it and/or modify  *
00012  *   it under the terms of the GNU General Public License as published by  *
00013  *   the Free Software Foundation; either version 2 of the License, or     *
00014  *   (at your option) any later version.                                   *
00015  *                                                                         *
00016  ***************************************************************************/
00017 
00018 #ifndef MPNLBASEH
00019 #define MPNLBASEH
00020 
00024 #include "MThread.h"
00025 #include <exception>
00026 #include <string>
00027 #include <list>
00028 #include <queue>
00029 #include <sstream>
00030 #include "MUtils.h"
00031 
00032 #ifdef WIN32 // Windows build: map the portable socket names onto Winsock
00033    #include<winsock2.h>
00034 
00035    #define MYSOCKET SOCKET // socket handle type
00036    #define MYINVALID_SOCKET INVALID_SOCKET // "no socket" sentinel
00037    #define MYSOCKET_ERROR SOCKET_ERROR // error return value of socket calls
00038    #define MYCLOSE(s) closesocket(s) // close socket s
00039    #define SLEEP(x) Sleep(x) // pause for x milliseconds
00040 #else // any other platform: plain int descriptors and -1 sentinels
00041    #include <netinet/in.h>
00042    #include <arpa/inet.h> // inet_ntoa
00043    #include <sys/time.h>
00044    #include <netdb.h>
00045 
00046    #define MYSOCKET int // socket descriptor type
00047    #define MYINVALID_SOCKET -1 // "no socket" sentinel
00048    #define MYSOCKET_ERROR -1 // error return value of socket calls
00049    #define MYCLOSE(s) close(s) // NOTE(review): close()/usleep() are declared in <unistd.h>, not included here - presumably pulled in by another header; confirm
00050    #define SLEEP(x) usleep((x)*1000) // x is in milliseconds; usleep() takes microseconds, hence *1000
00051 #endif
00052 
00053 namespace MPNL {
00054 
00055 //---------------------------------------------------------------------------
00056 //---------------------------------------------------------------------------
00057 // Global Functions
00058 
00059 std::string GetLocalIP(); // free function returning a string; presumably the local IP address - defined elsewhere, confirm
00060 std::string GetHostName(int socket); // returns a string for the given socket; presumably a host name - confirm. NOTE(review): parameter is int, not MYSOCKET
00061 
00062 //---------------------------------------------------------------------------
00063 //---------------------------------------------------------------------------
00064 // Types
00065 enum MMode {MODE_LENPREFIXED, MODE_LINE, MODE_PACKET}; // mode value stored in MSocket::Mode and MPNLBase::Mode; presumably length-prefixed / line-based / raw-packet framing - confirm
00066 //class error : public std::exception {};
00067 class MPNLBase; // forward declaration: MSocket keeps an MPNLBase* owner
00068 class MSocket; // forward declaration: MClient keeps an MSocket*
00069 struct MClient { // element type of MClientList: one client socket plus bookkeeping fields
00070    MClient () : Socket(NULL), /*Thread(NULL),*/ ToBeDeleted(false), ID(0) {} // every member gets a defined value; ID used to be left indeterminate
00071    MSocket*       Socket; // socket of this client; NULL until assigned
00072    //boost::thread* Thread;
00073    bool ToBeDeleted; // deletion flag; starts false
00074    unsigned long ID; // numeric identifier; starts at 0
00075    };
00076 typedef std::list<MClient> MClientList; // list holding MClient entries by value
00077 
00078 class MTransferManager; // forward declaration: MTransfer declares it as a friend
00079 
00080 //---------------------------------------------------------------------------
00081 //---------------------------------------------------------------------------
00082 //---------------------------------------------------------------------------
00083 
00088 class MSocket : public MThread { // socket wrapper that is also an MThread; used by the servers and clients in this header
00089 
00090 public:
00091    void*          Data; // untyped pointer slot; purpose not visible in this header
00092    MMode          Mode; // mode for this socket (the TCP/UDP clients write it from SetMode)
00093    int            PrefixLen; // presumably the prefix size used with MODE_LENPREFIXED - confirm
00094    struct sockaddr_in ServAddr; 
00095    std::string    PeerIP, LocalIP; 
00096    int            PeerPort;
00097    std::iostream*  Stream; // stream pointer installed by SetStream()
00098 
00099    MSocket(int type, MPNLBase* owner); // type presumably selects the socket kind (see Type); owner is kept as Owner - confirm in the .cpp
00100    virtual ~MSocket();
00101 
00102    void           operator()(); // thread function
00103    bool           IsInvalid();
00104    bool           Read(std::string& msg); // output parameter msg; bool result
00105    bool           Read(std::string& ip, std::string& msg); // for UDP: also outputs an ip string
00106    virtual void   Stop();
00107    int            Send(const std::string& msg);
00108    unsigned long  SendStream(); // presumably sends the content of Stream - confirm
00109    void           SetStream(std::iostream* s) {Stream = s;} // stores the pointer only; ownership is not taken here
00110    int            SendTo(const std::string& ip, int port, const std::string& msg);
00111    MYSOCKET       GetS() const {return S;} // raw socket handle
00112    void           SetS(MYSOCKET s) {S=s;} // replaces the raw socket handle
00113    bool           Listen(int port);
00114    //bool           IsActive();
00115    bool           HasMsg();
00116    std::string    GetLocalIP();
00117 
00118 protected:
00119    typedef std::queue<std::string> MMsgQueue; // FIFO of strings
00120 
00121    //boost::mutex   Mutex;
00122    MYSOCKET       S; // underlying socket handle
00123    int            Type; // presumably the constructor's type argument - confirm
00124    MPNLBase*      Owner; // presumably the constructor's owner argument - confirm
00125    //bool           Active;
00126    MMsgQueue      Messages; // queued message strings
00127    MMsgQueue      IP; // for UDP
00128    TIME           LastMsgTime; // TIME is declared outside this header
00129    boost::mutex   MsgMutex; // presumably guards Messages/IP - confirm
00130 
00132    //   boost::mutex::scoped_lock lock(Mutex);
00133    //   Active = a;
00134    //   }
00135    void PushMsg(const std::string& msg);
00136    void PushMsg(const std::string& ip, const std::string& msg); // for UDP
00137    int Send(const char* str, int len, int flags); // low-level overload taking a buffer, its length and flags
00138 
00139    };
00140 
00141 //---------------------------------------------------------------------------
00142 //---------------------------------------------------------------------------
00143 //---------------------------------------------------------------------------
00144 class MPNLBase { // abstract base class of the TCP/UDP servers and clients declared in this header
00145 
00146 public:
00147    int         Port; // port number of the endpoint
00148 
00149    MPNLBase();
00150    virtual ~MPNLBase();
00151 
00152    virtual void SetMode(MMode m)=0; // pure virtual: every concrete class must implement it
00153    virtual void SetPrefixLen(int len)=0; // pure virtual
00154    virtual void SetStream(std::iostream* s)=0; // pure virtual
00155    virtual bool OnConnection(MSocket* s)=0; // pure virtual connection hook
00156    virtual void OnDisconnection(MSocket* s)=0; // pure virtual disconnection hook
00157    virtual void PushMsg(MSocket*) {};   // default does nothing; public because MSocket calls this. Maybe change this.
00158    void SetLastError(const std::string& a) { // stores the error text while holding Mutex
00159       boost::mutex::scoped_lock lock(Mutex);
00160       LastError = a;
00161       }
00162    std::string GetLastError() { // returns a copy of the error text, taken while holding Mutex
00163       boost::mutex::scoped_lock lock(Mutex);
00164       return LastError;
00165       }
00166 
00167 protected:
00168    typedef std::deque<MSocket*> MMsgQueue; // deque of socket pointers
00169    MMsgQueue      Messages; // presumably sockets that have pending messages - confirm
00170    MMode          Mode; // mode for this endpoint (derived SetMode implementations write it)
00171    int            PrefixLen;
00172 
00173 private:
00174    std::string    LastError; // accessed only through SetLastError/GetLastError
00175    boost::mutex   Mutex; // guards LastError
00176 
00177    };
00178 
00179 //---------------------------------------------------------------------------
00180 //---------------------------------------------------------------------------
00181 //---------------------------------------------------------------------------
00182 class MTCPServer : public MPNLBase, public MThread /*accepting connections*/ { // TCP server: implements the MPNLBase interface and runs as an MThread
00183 
00184 public:
00185    struct MStats { // three unsigned counters, all zeroed on construction
00186       MStats() {Total = CurCon = MaxCon = 0;}
00187       unsigned long Total, CurCon, MaxCon; // presumably total / current / peak connection counts - confirm
00188       };
00189 
00190    MTCPServer();
00191    virtual ~MTCPServer();
00192 
00193    void     operator()(); // thread function accepting connections
00194    bool     Listen();
00195    bool     Read(MSocket*& s); // outputs a socket pointer through s
00196    void     Disconnect(MSocket* s);
00197    virtual bool OnConnection(MSocket* s); // return false to reject the connection
00198    virtual void OnDisconnection(MSocket* s);
00199    virtual void SetMode(MMode m);
00200    virtual void SetPrefixLen(int len);
00201    virtual void SetStream(std::iostream* s);
00202    virtual void Stop();
00203 /*
00204    void     SetActive(bool a) {
00205       boost::mutex::scoped_lock lock(Mutex);
00206       Active = a;
00207       }
00208    bool     IsActive() {
00209       boost::mutex::scoped_lock lock(Mutex);
00210       return Active;
00211       }
00212 */
00213    void     GetStats(MStats& s); // fills the caller's MStats
00214    void     BroadcastMsg(const std::string& msg);
00215    int      Send(const std::string& ip, const std::string& msg);
00216    bool     IsConnected(const std::string& ip);
00217    int      NbClients() { // or use getstats; size of Clients read under ClientMutex (size_t narrowed to int)
00218       boost::mutex::scoped_lock lock(ClientMutex);
00219       return Clients.size();
00220       }
00222    MClientList* LockClients() { // NULL if ClientLock is already locked, else locks and returns &Clients; pair with UnlockClients()
00223       if(ClientLock->locked()) return NULL; // NOTE(review): locked() and lock() are two separate steps - confirm this is safe with concurrent callers
00224       ClientLock->lock();
00225       return &Clients;
00226       }
00227    void UnlockClients() { // releases the lock taken by LockClients()
00228       ClientLock->unlock();
00229       }
00230 //*/
00231 
00232 protected:
00233    boost::mutex         ClientMutex; // locked by NbClients(); also used by MTransferTCPServer around Stats
00234    MStats               Stats;
00235 private:
00236    MClientList          Clients; // client entries
00237    boost::mutex::scoped_lock* ClientLock; // presumably a lock object on ClientMutex created in the constructor - confirm
00238    boost::mutex         MsgMutex;
00239    MSocket*             Socket; // presumably the listening socket - confirm
00240    //boost::thread*       Thread;
00241    //boost::thread_group  ClientThreads; // reading threads
00242    //bool                 Active;
00243    std::iostream*       Stream;
00244 
00245    virtual void         PushMsg(MSocket* s); // overrides MPNLBase::PushMsg
00246    bool                 SocketHasMsg(MSocket* s);
00247 
00248    };
00249 
00250 //---------------------------------------------------------------------------
00251 //---------------------------------------------------------------------------
00252 //---------------------------------------------------------------------------
00253 class MTCPClient : public MPNLBase { // TCP client built around a single MSocket
00254 
00255 public:
00256    std::string Host; // presumably the server to connect to - confirm
00257 
00258    MTCPClient();
00259    virtual ~MTCPClient();
00260 
00261    bool   Connect();
00262    void   Disconnect();
00263 
00264 
00268    int    Send(const std::string& msg);
00269    unsigned long  SendStream();
00270 
00271    bool   Read(std::string& msg); // outputs a message through msg
00272    bool   OnConnection(MSocket* s);
00273    void   OnDisconnection(MSocket* s);
00274    bool   IsConnected() {return Socket->IsActive();} // IsActive() is not declared on MSocket in this header - presumably inherited from MThread; confirm
00275    void   SetMode(MMode m) {Mode = m;Socket->Mode = m;} // updates both this object and the socket; dereferences Socket unchecked
00276    void   SetPrefixLen(int len);
00277    virtual void SetStream(std::iostream* s);
00278    std::string GetLocalIP() {return Socket->GetLocalIP();} // forwards to the socket
00279 
00280 private:
00281    //boost::mutex         Mutex;
00282    MSocket*             Socket; // the client's socket; presumably created in the constructor - confirm
00283    //boost::thread*       Thread; // reading socket thread
00284    struct sockaddr_in LocalAddr;
00285    struct sockaddr_in ServAddr;
00286 
00287 
00288    };
00289 
00290 //---------------------------------------------------------------------------
00291 //---------------------------------------------------------------------------
00292 //---------------------------------------------------------------------------
00293 class MUDPServer : public MPNLBase, public MThread { // UDP server running as an MThread. NOTE(review): only SetMode is overridden here; MPNLBase's SetPrefixLen, SetStream, OnConnection and OnDisconnection stay pure, so as declared this class is abstract - confirm
00294 
00295 public:
00296    MUDPServer();
00297    virtual ~MUDPServer();
00298 
00299    void           operator()(); // thread function
00300    virtual bool   Listen(int port);
00301    virtual bool   Read(MSocket*& s); // outputs a socket pointer through s
00302    //virtual void   Stop();
00303    //virtual bool   OnConnection(MSocket* s); // return false to reject the connection
00304    //virtual void   OnDisconnection(MSocket* s);
00305    virtual void   SetMode(MMode m);
00306    //void           SetActive(bool a) {
00307    //   boost::mutex::scoped_lock lock(Mutex);
00308    //   Active = a;
00309    //   }
00310    //void           GetStats(MStats& s);
00311 
00312 private:
00313    //boost::mutex         Mutex;
00314    boost::mutex         MsgMutex;
00315    MSocket*             Socket; // the server's socket
00316    //boost::thread*       Thread; // Socket thread
00317    bool                 Active; // own flag; its setter above is commented out
00318    //MStats               Stats;
00319 
00320    void                 PushMsg(MSocket* s); // overrides MPNLBase::PushMsg
00321    bool                 SocketHasMsg();
00322 
00323    };
00324 
00325 //---------------------------------------------------------------------------
00326 //---------------------------------------------------------------------------
00327 //---------------------------------------------------------------------------
00328 class MUDPClient : public MPNLBase { // UDP sender built around a single MSocket. NOTE(review): only SetMode is overridden; the other MPNLBase pure virtuals are not, so as declared this class is abstract - confirm
00329 
00330 public:
00331    std::string Host; // presumably the destination host - confirm
00332 
00333    MUDPClient();
00334    virtual ~MUDPClient();
00335 
00343    bool PrepareToSend(); // presumably must succeed before Send() - confirm
00344    
00348    virtual int    Send(const std::string& msg);
00349 
00350    virtual void   SetMode(MMode m) {Mode = m;Socket->Mode = m;} // updates both this object and the socket; dereferences Socket unchecked
00351 
00352 private:
00353    MSocket*             Socket; // the client's socket
00354 
00355    };
00356 
00357 
00358 //---------------------------------------------------------------------------
00359 //---------------------------------------------------------------------------
00360 //---------------------------------------------------------------------------
00361 enum MTransferDirection {tdDOWNLOAD=0, tdUPLOAD=1}; // direction of a transfer
00362 enum MTransferMode {tmCLIENT, tmSERVER, tmBOTH}; // role of a transfer; MTransfer::Mode2String() prints these as "Client"/"Server"/"Both"
00363 class MTransfer; // forward declaration: the transfer server/client classes keep an MTransfer*
00364 typedef std::list<MTransfer*> MTransferList; // list of transfer pointers
00365 
00366 //---------------------------------------------------------------------------
00367 class MTransferTCPServer : public MTCPServer { // MTCPServer specialisation serving one MTransfer
00368 
00369 public:
00370    MTransferDirection Direction; // read by OnConnection(); MTransfer::SetDirection() writes it
00371    MTransfer* Transfer; // transfer told about disconnections
00372 
00373    MTransferTCPServer(MTransfer* t) : MTCPServer(), Direction(tdDOWNLOAD), Transfer(t) {} // Direction now has a defined default (tdDOWNLOAD) so OnConnection() never reads an indeterminate value
00374    ~MTransferTCPServer() {}
00375 
00376 
00377    bool OnConnection(MSocket* s) { // download: accept (true); upload: count it, send the stream, deactivate, reject (false)
00378       if(Direction==tdDOWNLOAD) return true;
00379       boost::mutex::scoped_lock lock(ClientMutex); // Stats is updated under ClientMutex
00380       Stats.Total++; // we depend on it
00381       lock.unlock(); // released before sending
00382       s->SendStream();  // TODO: should test the return of send stream with the len of the file
00383       SetActive(false);
00384       return false;
00385       }
00386    void OnDisconnection(MSocket* s); // defined inline after the MTransfer class in this header
00387    };
00388 
00389 //---------------------------------------------------------------------------
00390 class MTransferTCPClient : public MTCPClient { // MTCPClient specialisation tied to one MTransfer
00391 
00392 public:
00393    MTransfer* Transfer; // transfer told about disconnections
00394 
00395    MTransferTCPClient(MTransfer* t) : MTCPClient(), Transfer(t) {} // remembers the transfer pointer
00396    ~MTransferTCPClient() {}
00397 
00398    void OnDisconnection(MSocket* s); // defined inline after the MTransfer class in this header
00399 
00400    };
00401 
00402 //---------------------------------------------------------------------------
00411 class MTransfer : public MThread { // one transfer: owns a server and a client endpoint pointer and a queue of MUpdate notifications
00412    friend class MTransferManager;
00413 public:
00414    class MUpdate { // notification record about a transfer; holds a mutex, so it is handled by pointer
00415       public:
00416          MUpdate() : Type(utADD), Result(urOK), ID(0), Direction(tdDOWNLOAD), Stamp(0) {} // all members get defined defaults (first enumerators / zero); callers overwrite what they need
00417          MUpdate(MTransfer* t) : Type(utADD), Result(urOK), PeerIP(t->PeerIP), ID(t->ID), Direction(tdDOWNLOAD), Stamp(0) {} // copies PeerIP and ID from t; initialisers listed in declaration order
00418 /*
00419          MUpdate(const MUpdate& u) {
00420             boost::mutex::scoped_lock lock(u.Mutex);
00421             Type  = u.Type;
00422             Msg   = u.Msg;
00423             Stamp = u.Stamp;
00424             }
00425          MUpdate& operator=(const MUpdate& u) {
00426             if (this == &u) return *this;
00427             boost::mutex::scoped_lock lock1(&Mutex < &u.Mutex ? Mutex : u.Mutex);
00428             boost::mutex::scoped_lock lock2(&Mutex > &u.Mutex ? Mutex : u.Mutex);
00429             Type  = u.Type;
00430             Msg   = u.Msg;
00431             Stamp = u.Stamp;
00432             // Progress
00433             return *this;
00434             }
00435 */
00436          enum MType {utADD, utPROGRESS, utMSG, utREMOVE} Type; // kind of update
00437          enum MResult {urOK, urFAILED} Result; // outcome carried by the update
00438          std::string Msg, PeerIP;
00439          int ID; // transfer ID (copied from MTransfer::ID by the second constructor)
00440          MTransferDirection Direction;
00441          unsigned char GetStamp() { // reads Stamp under Mutex
00442             boost::mutex::scoped_lock lock(Mutex);
00443             return Stamp;
00444             }
00445          void SetStamp(unsigned char s) { // ORs s into Stamp under Mutex: bits accumulate, nothing is cleared
00446             boost::mutex::scoped_lock lock(Mutex);
00447             Stamp |= s;
00448             }
00449          //unsigned long Progress;
00450 
00451       private:
00452          /*mutable*/ boost::mutex   Mutex; // guards Stamp
00453          unsigned char Stamp; // when == Manager::ClearStamp, it is erased by the manager
00454       };
00455    typedef std::deque<MUpdate*> MUpdateList; // queue of update pointers
00456 
00457    MTransferMode  Mode; // role; see Mode2String()
00458    std::string    PeerIP;
00459    std::string    LocalPath;
00460    std::string    MD5;
00461    std::string    URL;
00462    int            Port;
00464    int            ID;
00465    bool           Dead;
00466 
00467    MTransfer();
00468    ~MTransfer();
00469 
00470    void Stop(void);  
00471    MTransferDirection GetDirection() {return Direction;}
00472    void SetDirection(MTransferDirection dir) { // keeps the server's Direction in step; dereferences Server unchecked
00473       Direction = dir;
00474       Server->Direction = dir;
00475       }
00476    //bool IsActive() {
00477    //   boost::mutex::scoped_lock lock(Mutex);
00478    //   return Active;
00479    //   }
00480    bool IsPending() { // reads Pending under AccessMutex
00481       boost::mutex::scoped_lock lock(AccessMutex);
00482       return Pending;
00483       }
00484    void SetPending(bool b) { // writes Pending under AccessMutex
00485       boost::mutex::scoped_lock lock(AccessMutex);
00486       Pending = b;
00487       }
00488    /*const std::string& GetError() {
00489       boost::mutex::scoped_lock lock(Mutex);
00490       return Error;
00491       }*/
00492    void AddUpdate(MUpdate* u) { // appends u to Updates under UpdateMutex
00493       boost::mutex::scoped_lock lock(UpdateMutex);
00494       Updates.push_back(u);
00495       }
00496 
00497    MUpdateList* LockUpdates() { // locks through UpdateLock and exposes the queue; pair with UnlockUpdates()
00498       UpdateLock->lock();
00499       return &Updates;
00500       }
00501    void UnlockUpdates() { // releases the lock taken by LockUpdates()
00502       UpdateLock->unlock();
00503       }
00504       
00505    int NbUpdates() { // queue length read under UpdateMutex (size_t narrowed to int)
00506       boost::mutex::scoped_lock lock(UpdateMutex);
00507       return Updates.size();
00508       }
00509 
00510    std::string Mode2String() { // text form of Mode; "Unknown" for any other value
00511       switch(Mode) {
00512          case tmCLIENT: return "Client";
00513          case tmSERVER: return "Server";
00514          case tmBOTH: return "Both";
00515          default: return "Unknown";
00516          }
00517       }
00518    std::string Dir2String() { // text form of Direction; "Unknown" for any other value
00519       switch(Direction) {
00520          case tdUPLOAD: return "Upload";
00521          case tdDOWNLOAD: return "Download";
00522          default: return "Unknown";
00523          }
00524       }
00525    void Disconnected() {SetActive(false);} // called by the transfer server/client OnDisconnection(); SetActive is presumably inherited from MThread - confirm
00526    void EraseUpdate(MUpdateList::iterator& ite);
00527 
00528    void operator()(); 
00529 
00530 private:
00531    //bool                 Active;
00532    bool                 Pending; 
00533    MTransferTCPServer*  Server; // server endpoint; SetDirection() writes its Direction
00534    MTransferTCPClient*  Client; // client endpoint
00535    //boost::thread*       Thread;
00536    MTransferDirection   Direction;
00537    //std::string          Error;
00538    MUpdateList          Updates; // pending updates
00539    boost::mutex::scoped_lock* UpdateLock; // presumably a lock object on UpdateMutex created in the constructor - confirm
00540    boost::mutex         UpdateMutex; // taken by AddUpdate()/NbUpdates()
00541    boost::mutex         AccessMutex; // guards Pending
00542 
00543    //void Start(void); ///< Start negociations (thread). Moved to private recently.
00544 
00545    //void SetActive(bool b) {
00546    //   boost::mutex::scoped_lock lock(Mutex);
00547    //   Active = b;
00548    //   }
00549    /*void SetError(const std::string& s) {
00550       boost::mutex::scoped_lock lock(Mutex);
00551       Error = s;
00552       }*/
00553 
00554    };
00555 
00556 inline void MTransferTCPServer::OnDisconnection(MSocket* /*s*/) { // the socket itself is not used
00557    Transfer->Disconnected(); // just forward the event to the transfer
00558    }
00559 inline void MTransferTCPClient::OnDisconnection(MSocket* /*s*/) { // the socket itself is not used
00560    Transfer->Disconnected(); // just forward the event to the transfer
00561    }
00562 
00563 //---------------------------------------------------------------------------
00564 //---------------------------------------------------------------------------
00565 //---------------------------------------------------------------------------
00566 
00577 class MTransferManager : public MThread { // keeps the list of MTransfer pointers and runs as an MThread
00578 
00579 public:
00580 
00581    struct MStats { // five unsigned counters, all zeroed on construction
00582       MStats() : Total(0),Cur(0),Max(0),DL(0),UP(0) {} // DL and UP used to be left indeterminate
00583       unsigned int Total, Cur, Max, DL, UP; // presumably total / current / peak transfers and download / upload counts - confirm
00584       } Stats;
00585 
00586    unsigned char ClearStamp; // stamp value at which an MTransfer::MUpdate gets erased (see the note on MUpdate::Stamp)
00587 
00588    MTransferManager();
00589    ~MTransferManager();
00590 
00594    int StartTransfer(MTransfer* t);
00595 
00596    void Cancel(int id, MTransfer::MUpdate& u); // id selects the transfer; u is passed by non-const reference
00597 
00598    bool SameTransfer(MTransfer* t1, MTransfer* t2);
00599 
00600    MTransferList* LockTransfers() { // locks through TransferLock and exposes the list; pair with UnlockTransfers()
00601       TransferLock->lock();
00602       return &Transfers;
00603       }
00604    void UnlockTransfers() { // releases the lock taken by LockTransfers()
00605       TransferLock->unlock();
00606       }
00607 
00608    void operator()();
00609 
00610 private:
00611    MTransferList  Transfers; // managed transfers
00612    boost::mutex::scoped_lock* TransferLock; // presumably a lock object on TransferMutex created in the constructor - confirm
00613    boost::mutex         TransferMutex;
00614    boost::mutex         StatMutex; // presumably guards Stats - confirm
00615 
00616 
00617 /*
00618    void           SetActive(bool a) {
00619       boost::mutex::scoped_lock lock(Mutex);
00620       Active = a;
00621       }
00622 */
00623 
00624    };
00625 
00626 //---------------------------------------------------------------------------
00627 //---------------------------------------------------------------------------
00628 //---------------------------------------------------------------------------
00629 } // namespace
00630 
00631 #endif



MDFS SourceForge project page and download

SourceForge.net Logo

Generated on Sat Jan 3 03:14:20 2004 for MDFS by doxygen 1.3.5