00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef MPNLBASEH
00019 #define MPNLBASEH
00020
00024 #include "MThread.h"
00025 #include <exception>
00026 #include <string>
00027 #include <list>
00028 #include <queue>
00029 #include <sstream>
00030 #include "MUtils.h"
00031
// Platform abstraction for the socket API. The MY* macros give the rest of
// the library one spelling for the socket handle type, its error sentinels
// and the close call; SLEEP(x) takes its argument in milliseconds.
#ifdef WIN32
#include <winsock2.h>

#define MYSOCKET SOCKET
#define MYINVALID_SOCKET INVALID_SOCKET
#define MYSOCKET_ERROR SOCKET_ERROR
#define MYCLOSE(s) closesocket(s)
#define SLEEP(x) Sleep(x)
#else
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <netdb.h>
#include <unistd.h>  // close() and usleep(), which MYCLOSE and SLEEP expand to

#define MYSOCKET int
// Parenthesised so the sentinels stay a single operand wherever they expand.
#define MYINVALID_SOCKET (-1)
#define MYSOCKET_ERROR (-1)
#define MYCLOSE(s) close(s)
// usleep() takes microseconds, hence the factor of 1000.
#define SLEEP(x) usleep((x)*1000)
#endif
00052
00053 namespace MPNL {
00054
00055
00056
00057
00058
/// Returns a string describing the local IP.
/// NOTE(review): the implementation is not in this header; presumably this is
/// the dotted address of the local host — confirm against the definition.
std::string GetLocalIP();

/// Returns a host name for the given socket descriptor.
/// NOTE(review): the parameter is a plain int while the rest of this header
/// uses MYSOCKET (SOCKET on WIN32) — confirm the narrowing is intended.
std::string GetHostName(int socket);
00061
00062
00063
00064
/// Mode selector stored on sockets (MSocket::Mode) and endpoints
/// (MPNLBase::Mode) and changed through the SetMode() functions.
/// NOTE(review): the names suggest message-framing strategies (length prefix,
/// line oriented, raw packet) — confirm against the implementation.
enum MMode {
    MODE_LENPREFIXED,
    MODE_LINE,
    MODE_PACKET
};
00066
class MPNLBase;
class MSocket;

/// One entry of an MClientList: a socket pointer plus bookkeeping fields.
struct MClient {
    /// Builds an empty record. Every field is initialised, including ID, so a
    /// freshly constructed record never exposes an indeterminate value.
    MClient() : Socket(NULL), ToBeDeleted(false), ID(0) {}

    MSocket* Socket;     ///< Socket for this client; NULL until one is assigned.

    bool ToBeDeleted;    ///< Deletion marker; starts out false.
    unsigned long ID;    ///< Numeric identifier; starts out 0.
};
typedef std::list<MClient> MClientList;

class MTransferManager;
00079
00080
00081
00082
00083
00088 class MSocket : public MThread {
00089
00090 public:
00091 void* Data;
00092 MMode Mode;
00093 int PrefixLen;
00094 struct sockaddr_in ServAddr;
00095 std::string PeerIP, LocalIP;
00096 int PeerPort;
00097 std::iostream* Stream;
00098
00099 MSocket(int type, MPNLBase* owner);
00100 virtual ~MSocket();
00101
00102 void operator()();
00103 bool IsInvalid();
00104 bool Read(std::string& msg);
00105 bool Read(std::string& ip, std::string& msg);
00106 virtual void Stop();
00107 int Send(const std::string& msg);
00108 unsigned long SendStream();
00109 void SetStream(std::iostream* s) {Stream = s;}
00110 int SendTo(const std::string& ip, int port, const std::string& msg);
00111 MYSOCKET GetS() const {return S;}
00112 void SetS(MYSOCKET s) {S=s;}
00113 bool Listen(int port);
00114
00115 bool HasMsg();
00116 std::string GetLocalIP();
00117
00118 protected:
00119 typedef std::queue<std::string> MMsgQueue;
00120
00121
00122 MYSOCKET S;
00123 int Type;
00124 MPNLBase* Owner;
00125
00126 MMsgQueue Messages;
00127 MMsgQueue IP;
00128 TIME LastMsgTime;
00129 boost::mutex MsgMutex;
00130
00132
00133
00134
00135 void PushMsg(const std::string& msg);
00136 void PushMsg(const std::string& ip, const std::string& msg);
00137 int Send(const char* str, int len, int flags);
00138
00139 };
00140
00141
00142
00143
00144 class MPNLBase {
00145
00146 public:
00147 int Port;
00148
00149 MPNLBase();
00150 virtual ~MPNLBase();
00151
00152 virtual void SetMode(MMode m)=0;
00153 virtual void SetPrefixLen(int len)=0;
00154 virtual void SetStream(std::iostream* s)=0;
00155 virtual bool OnConnection(MSocket* s)=0;
00156 virtual void OnDisconnection(MSocket* s)=0;
00157 virtual void PushMsg(MSocket*) {};
00158 void SetLastError(const std::string& a) {
00159 boost::mutex::scoped_lock lock(Mutex);
00160 LastError = a;
00161 }
00162 std::string GetLastError() {
00163 boost::mutex::scoped_lock lock(Mutex);
00164 return LastError;
00165 }
00166
00167 protected:
00168 typedef std::deque<MSocket*> MMsgQueue;
00169 MMsgQueue Messages;
00170 MMode Mode;
00171 int PrefixLen;
00172
00173 private:
00174 std::string LastError;
00175 boost::mutex Mutex;
00176
00177 };
00178
00179
00180
00181
00182 class MTCPServer : public MPNLBase, public MThread {
00183
00184 public:
00185 struct MStats {
00186 MStats() {Total = CurCon = MaxCon = 0;}
00187 unsigned long Total, CurCon, MaxCon;
00188 };
00189
00190 MTCPServer();
00191 virtual ~MTCPServer();
00192
00193 void operator()();
00194 bool Listen();
00195 bool Read(MSocket*& s);
00196 void Disconnect(MSocket* s);
00197 virtual bool OnConnection(MSocket* s);
00198 virtual void OnDisconnection(MSocket* s);
00199 virtual void SetMode(MMode m);
00200 virtual void SetPrefixLen(int len);
00201 virtual void SetStream(std::iostream* s);
00202 virtual void Stop();
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213 void GetStats(MStats& s);
00214 void BroadcastMsg(const std::string& msg);
00215 int Send(const std::string& ip, const std::string& msg);
00216 bool IsConnected(const std::string& ip);
00217 int NbClients() {
00218 boost::mutex::scoped_lock lock(ClientMutex);
00219 return Clients.size();
00220 }
00222 MClientList* LockClients() {
00223 if(ClientLock->locked()) return NULL;
00224 ClientLock->lock();
00225 return &Clients;
00226 }
00227 void UnlockClients() {
00228 ClientLock->unlock();
00229 }
00230
00231
00232 protected:
00233 boost::mutex ClientMutex;
00234 MStats Stats;
00235 private:
00236 MClientList Clients;
00237 boost::mutex::scoped_lock* ClientLock;
00238 boost::mutex MsgMutex;
00239 MSocket* Socket;
00240
00241
00242
00243 std::iostream* Stream;
00244
00245 virtual void PushMsg(MSocket* s);
00246 bool SocketHasMsg(MSocket* s);
00247
00248 };
00249
00250
00251
00252
00253 class MTCPClient : public MPNLBase {
00254
00255 public:
00256 std::string Host;
00257
00258 MTCPClient();
00259 virtual ~MTCPClient();
00260
00261 bool Connect();
00262 void Disconnect();
00263
00264
00268 int Send(const std::string& msg);
00269 unsigned long SendStream();
00270
00271 bool Read(std::string& msg);
00272 bool OnConnection(MSocket* s);
00273 void OnDisconnection(MSocket* s);
00274 bool IsConnected() {return Socket->IsActive();}
00275 void SetMode(MMode m) {Mode = m;Socket->Mode = m;}
00276 void SetPrefixLen(int len);
00277 virtual void SetStream(std::iostream* s);
00278 std::string GetLocalIP() {return Socket->GetLocalIP();}
00279
00280 private:
00281
00282 MSocket* Socket;
00283
00284 struct sockaddr_in LocalAddr;
00285 struct sockaddr_in ServAddr;
00286
00287
00288 };
00289
00290
00291
00292
00293 class MUDPServer : public MPNLBase, public MThread {
00294
00295 public:
00296 MUDPServer();
00297 virtual ~MUDPServer();
00298
00299 void operator()();
00300 virtual bool Listen(int port);
00301 virtual bool Read(MSocket*& s);
00302
00303
00304
00305 virtual void SetMode(MMode m);
00306
00307
00308
00309
00310
00311
00312 private:
00313
00314 boost::mutex MsgMutex;
00315 MSocket* Socket;
00316
00317 bool Active;
00318
00319
00320 void PushMsg(MSocket* s);
00321 bool SocketHasMsg();
00322
00323 };
00324
00325
00326
00327
00328 class MUDPClient : public MPNLBase {
00329
00330 public:
00331 std::string Host;
00332
00333 MUDPClient();
00334 virtual ~MUDPClient();
00335
00343 bool PrepareToSend();
00344
00348 virtual int Send(const std::string& msg);
00349
00350 virtual void SetMode(MMode m) {Mode = m;Socket->Mode = m;}
00351
00352 private:
00353 MSocket* Socket;
00354
00355 };
00356
00357
00358
00359
00360
/// Direction of a transfer; the numeric values are fixed explicitly.
enum MTransferDirection {
    tdDOWNLOAD = 0,
    tdUPLOAD = 1
};

/// Role selector of a transfer (see MTransfer::Mode).
enum MTransferMode {
    tmCLIENT,
    tmSERVER,
    tmBOTH
};

class MTransfer;

/// List of transfer pointers, as kept by MTransferManager.
typedef std::list<MTransfer*> MTransferList;
00365
00366
00367 class MTransferTCPServer : public MTCPServer {
00368
00369 public:
00370 MTransferDirection Direction;
00371 MTransfer* Transfer;
00372
00373 MTransferTCPServer(MTransfer* t) : MTCPServer() {Transfer=t;}
00374 ~MTransferTCPServer() {}
00375
00376
00377 bool OnConnection(MSocket* s) {
00378 if(Direction==tdDOWNLOAD) return true;
00379 boost::mutex::scoped_lock lock(ClientMutex);
00380 Stats.Total++;
00381 lock.unlock();
00382 s->SendStream();
00383 SetActive(false);
00384 return false;
00385 }
00386 void OnDisconnection(MSocket* s);
00387 };
00388
00389
00390 class MTransferTCPClient : public MTCPClient {
00391
00392 public:
00393 MTransfer* Transfer;
00394
00395 MTransferTCPClient(MTransfer* t) : MTCPClient() {Transfer=t;}
00396 ~MTransferTCPClient() {}
00397
00398 void OnDisconnection(MSocket* s);
00399
00400 };
00401
00402
00411 class MTransfer : public MThread {
00412 friend class MTransferManager;
00413 public:
00414 class MUpdate {
00415 public:
00416 MUpdate() : Stamp(0) {}
00417 MUpdate(MTransfer* t) : Stamp(0), PeerIP(t->PeerIP), ID(t->ID) {}
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436 enum MType {utADD, utPROGRESS, utMSG, utREMOVE} Type;
00437 enum MResult {urOK, urFAILED} Result;
00438 std::string Msg, PeerIP;
00439 int ID;
00440 MTransferDirection Direction;
00441 unsigned char GetStamp() {
00442 boost::mutex::scoped_lock lock(Mutex);
00443 return Stamp;
00444 }
00445 void SetStamp(unsigned char s) {
00446 boost::mutex::scoped_lock lock(Mutex);
00447 Stamp |= s;
00448 }
00449
00450
00451 private:
00452 boost::mutex Mutex;
00453 unsigned char Stamp;
00454 };
00455 typedef std::deque<MUpdate*> MUpdateList;
00456
00457 MTransferMode Mode;
00458 std::string PeerIP;
00459 std::string LocalPath;
00460 std::string MD5;
00461 std::string URL;
00462 int Port;
00464 int ID;
00465 bool Dead;
00466
00467 MTransfer();
00468 ~MTransfer();
00469
00470 void Stop(void);
00471 MTransferDirection GetDirection() {return Direction;}
00472 void SetDirection(MTransferDirection dir) {
00473 Direction = dir;
00474 Server->Direction = dir;
00475 }
00476
00477
00478
00479
00480 bool IsPending() {
00481 boost::mutex::scoped_lock lock(AccessMutex);
00482 return Pending;
00483 }
00484 void SetPending(bool b) {
00485 boost::mutex::scoped_lock lock(AccessMutex);
00486 Pending = b;
00487 }
00488
00489
00490
00491
00492 void AddUpdate(MUpdate* u) {
00493 boost::mutex::scoped_lock lock(UpdateMutex);
00494 Updates.push_back(u);
00495 }
00496
00497 MUpdateList* LockUpdates() {
00498 UpdateLock->lock();
00499 return &Updates;
00500 }
00501 void UnlockUpdates() {
00502 UpdateLock->unlock();
00503 }
00504
00505 int NbUpdates() {
00506 boost::mutex::scoped_lock lock(UpdateMutex);
00507 return Updates.size();
00508 }
00509
00510 std::string Mode2String() {
00511 switch(Mode) {
00512 case tmCLIENT: return "Client";
00513 case tmSERVER: return "Server";
00514 case tmBOTH: return "Both";
00515 default: return "Unknown";
00516 }
00517 }
00518 std::string Dir2String() {
00519 switch(Direction) {
00520 case tdUPLOAD: return "Upload";
00521 case tdDOWNLOAD: return "Download";
00522 default: return "Unknown";
00523 }
00524 }
00525 void Disconnected() {SetActive(false);}
00526 void EraseUpdate(MUpdateList::iterator& ite);
00527
00528 void operator()();
00529
00530 private:
00531
00532 bool Pending;
00533 MTransferTCPServer* Server;
00534 MTransferTCPClient* Client;
00535
00536 MTransferDirection Direction;
00537
00538 MUpdateList Updates;
00539 boost::mutex::scoped_lock* UpdateLock;
00540 boost::mutex UpdateMutex;
00541 boost::mutex AccessMutex;
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554 };
00555
00556 inline void MTransferTCPServer::OnDisconnection(MSocket* s) {
00557 Transfer->Disconnected();
00558 }
00559 inline void MTransferTCPClient::OnDisconnection(MSocket* s) {
00560 Transfer->Disconnected();
00561 }
00562
00563
00564
00565
00566
00577 class MTransferManager : public MThread {
00578
00579 public:
00580
00581 struct MStats {
00582 MStats() : Total(0),Cur(0),Max(0) {}
00583 unsigned int Total, Cur, Max, DL, UP;
00584 } Stats;
00585
00586 unsigned char ClearStamp;
00587
00588 MTransferManager();
00589 ~MTransferManager();
00590
00594 int StartTransfer(MTransfer* t);
00595
00596 void Cancel(int id, MTransfer::MUpdate& u);
00597
00598 bool SameTransfer(MTransfer* t1, MTransfer* t2);
00599
00600 MTransferList* LockTransfers() {
00601 TransferLock->lock();
00602 return &Transfers;
00603 }
00604 void UnlockTransfers() {
00605 TransferLock->unlock();
00606 }
00607
00608 void operator()();
00609
00610 private:
00611 MTransferList Transfers;
00612 boost::mutex::scoped_lock* TransferLock;
00613 boost::mutex TransferMutex;
00614 boost::mutex StatMutex;
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624 };
00625
00626
00627
00628
00629 }
00630
00631 #endif