Refactored file assemble and send code #4601

This commit is contained in:
Jerry (Xinyu Hou) 2015-05-22 11:27:57 -07:00
parent 1df566d241
commit a81b88c730
12 changed files with 140 additions and 209 deletions

View File

@ -780,25 +780,6 @@ Client::writeToDropDirThread(void*)
m_receivedFileData); m_receivedFileData);
} }
void
Client::clearReceivedFileData()
{
m_receivedFileData.clear();
}
void
Client::setExpectedFileSize(String data)
{
std::istringstream iss(data);
iss >> m_expectedFileSize;
}
void
Client::fileChunkReceived(String data)
{
m_receivedFileData += data;
}
void void
Client::dragInfoReceived(UInt32 fileNum, String data) Client::dragInfoReceived(UInt32 fileNum, String data)
{ {

View File

@ -85,15 +85,6 @@ public:
*/ */
virtual void handshakeComplete(); virtual void handshakeComplete();
//! Clears the file buffer
void clearReceivedFileData();
//! Set the expected size of receiving file
void setExpectedFileSize(String data);
//! Received a chunk of file data
void fileChunkReceived(String data);
//! Received drag information //! Received drag information
void dragInfoReceived(UInt32 fileNum, String data); void dragInfoReceived(UInt32 fileNum, String data);
@ -131,7 +122,10 @@ public:
bool isReceivedFileSizeValid(); bool isReceivedFileSizeValid();
//! Return expected file size //! Return expected file size
size_t getExpectedFileSize() { return m_expectedFileSize; } size_t& getExpectedFileSize() { return m_expectedFileSize; }
//! Return received file data
String& getReceivedFileData() { return m_receivedFileData; }
//@} //@}

View File

@ -19,6 +19,7 @@
#include "client/ServerProxy.h" #include "client/ServerProxy.h"
#include "client/Client.h" #include "client/Client.h"
#include "synergy/FileChunk.h"
#include "synergy/ClipboardChunk.h" #include "synergy/ClipboardChunk.h"
#include "synergy/StreamChunker.h" #include "synergy/StreamChunker.h"
#include "synergy/Clipboard.h" #include "synergy/Clipboard.h"
@ -37,8 +38,6 @@
// ServerProxy // ServerProxy
// //
const UInt16 ServerProxy::m_intervalThreshold = 1;
ServerProxy::ServerProxy(Client* client, synergy::IStream* stream, IEventQueue* events) : ServerProxy::ServerProxy(Client* client, synergy::IStream* stream, IEventQueue* events) :
m_client(client), m_client(client),
m_stream(stream), m_stream(stream),
@ -53,10 +52,7 @@ ServerProxy::ServerProxy(Client* client, synergy::IStream* stream, IEventQueue*
m_keepAliveAlarm(0.0), m_keepAliveAlarm(0.0),
m_keepAliveAlarmTimer(NULL), m_keepAliveAlarmTimer(NULL),
m_parser(&ServerProxy::parseHandshakeMessage), m_parser(&ServerProxy::parseHandshakeMessage),
m_events(events), m_events(events)
m_stopwatch(true),
m_elapsedTime(0),
m_receivedDataSize(0)
{ {
assert(m_client != NULL); assert(m_client != NULL);
assert(m_stream != NULL); assert(m_stream != NULL);
@ -860,50 +856,13 @@ ServerProxy::infoAcknowledgment()
void void
ServerProxy::fileChunkReceived() ServerProxy::fileChunkReceived()
{ {
// parse int result = FileChunk::assemble(
UInt8 mark = 0; m_stream,
String content; m_client->getReceivedFileData(),
ProtocolUtil::readf(m_stream, kMsgDFileTransfer + 4, &mark, &content); m_client->getExpectedFileSize());
switch (mark) { if (result == kFinish) {
case kDataStart:
m_client->clearReceivedFileData();
m_client->setExpectedFileSize(content);
if (CLOG->getFilter() >= kDEBUG2) {
LOG((CLOG_DEBUG2 "recv file data from server: size=%s", content.c_str()));
m_stopwatch.start();
}
break;
case kDataChunk:
m_client->fileChunkReceived(content);
if (CLOG->getFilter() >= kDEBUG2) {
LOG((CLOG_DEBUG2 "recv file data from server: size=%i", content.size()));
double interval = m_stopwatch.getTime();
LOG((CLOG_DEBUG2 "recv file data from server: interval=%f s", interval));
m_receivedDataSize += content.size();
if (interval >= m_intervalThreshold) {
double averageSpeed = m_receivedDataSize / interval / 1000;
LOG((CLOG_DEBUG2 "recv file data from server: average speed=%f kb/s", averageSpeed));
m_receivedDataSize = 0;
m_elapsedTime += interval;
m_stopwatch.reset();
}
}
break;
case kDataEnd:
m_events->addEvent(Event(m_events->forIScreen().fileRecieveCompleted(), m_client)); m_events->addEvent(Event(m_events->forIScreen().fileRecieveCompleted(), m_client));
if (CLOG->getFilter() >= kDEBUG2) {
LOG((CLOG_DEBUG2 "file data transfer finished"));
m_elapsedTime += m_stopwatch.getTime();
double averageSpeed = m_client->getExpectedFileSize() / m_elapsedTime / 1000;
LOG((CLOG_DEBUG2 "file data transfer finished: total time consumed=%f s", m_elapsedTime));
LOG((CLOG_DEBUG2 "file data transfer finished: total data received=%i kb", m_client->getExpectedFileSize() / 1000));
LOG((CLOG_DEBUG2 "file data transfer finished: total average speed=%f kb/s", averageSpeed));
}
break;
} }
} }
@ -927,23 +886,7 @@ ServerProxy::handleClipboardSendingEvent(const Event& event, void*)
void void
ServerProxy::fileChunkSending(UInt8 mark, char* data, size_t dataSize) ServerProxy::fileChunkSending(UInt8 mark, char* data, size_t dataSize)
{ {
String chunk(data, dataSize); FileChunk::send(m_stream, mark, data, dataSize);
switch (mark) {
case kDataStart:
LOG((CLOG_DEBUG2 "file sending start: size=%s", data));
break;
case kDataChunk:
LOG((CLOG_DEBUG2 "file chunk sending: size=%i", chunk.size()));
break;
case kDataEnd:
LOG((CLOG_DEBUG2 "file sending finished"));
break;
}
ProtocolUtil::writef(m_stream, kMsgDFileTransfer, mark, &chunk);
} }
void void

View File

@ -130,9 +130,4 @@ private:
MessageParser m_parser; MessageParser m_parser;
IEventQueue* m_events; IEventQueue* m_events;
Stopwatch m_stopwatch;
double m_elapsedTime;
size_t m_receivedDataSize;
static const UInt16 m_intervalThreshold;
}; };

View File

@ -18,6 +18,7 @@
#include "server/ClientProxy1_5.h" #include "server/ClientProxy1_5.h"
#include "server/Server.h" #include "server/Server.h"
#include "synergy/FileChunk.h"
#include "synergy/StreamChunker.h" #include "synergy/StreamChunker.h"
#include "synergy/ProtocolUtil.h" #include "synergy/ProtocolUtil.h"
#include "io/IStream.h" #include "io/IStream.h"
@ -29,14 +30,9 @@
// ClientProxy1_5 // ClientProxy1_5
// //
const UInt16 ClientProxy1_5::m_intervalThreshold = 1;
ClientProxy1_5::ClientProxy1_5(const String& name, synergy::IStream* stream, Server* server, IEventQueue* events) : ClientProxy1_5::ClientProxy1_5(const String& name, synergy::IStream* stream, Server* server, IEventQueue* events) :
ClientProxy1_4(name, stream, server, events), ClientProxy1_4(name, stream, server, events),
m_events(events), m_events(events)
m_stopwatch(true),
m_elapsedTime(0),
m_receivedDataSize(0)
{ {
} }
@ -55,23 +51,7 @@ ClientProxy1_5::sendDragInfo(UInt32 fileCount, const char* info, size_t size)
void void
ClientProxy1_5::fileChunkSending(UInt8 mark, char* data, size_t dataSize) ClientProxy1_5::fileChunkSending(UInt8 mark, char* data, size_t dataSize)
{ {
String chunk(data, dataSize); FileChunk::send(getStream(), mark, data, dataSize);
switch (mark) {
case kDataStart:
LOG((CLOG_DEBUG2 "file sending start: size=%s", data));
break;
case kDataChunk:
LOG((CLOG_DEBUG2 "file chunk sending: size=%i", chunk.size()));
break;
case kDataEnd:
LOG((CLOG_DEBUG2 "file sending finished"));
break;
}
ProtocolUtil::writef(getStream(), kMsgDFileTransfer, mark, &chunk);
} }
bool bool
@ -93,51 +73,15 @@ ClientProxy1_5::parseMessage(const UInt8* code)
void void
ClientProxy1_5::fileChunkReceived() ClientProxy1_5::fileChunkReceived()
{ {
// parse
UInt8 mark = 0;
String content;
ProtocolUtil::readf(getStream(), kMsgDFileTransfer + 4, &mark, &content);
Server* server = getServer(); Server* server = getServer();
switch (mark) { int result = FileChunk::assemble(
case kDataStart: getStream(),
server->clearReceivedFileData(); server->getReceivedFileData(),
server->setExpectedFileSize(content); server->getExpectedFileSize());
if (CLOG->getFilter() >= kDEBUG2) {
LOG((CLOG_DEBUG2 "recv file data from client: file size=%s", content.c_str()));
m_stopwatch.start();
}
break;
case kDataChunk:
server->fileChunkReceived(content);
if (CLOG->getFilter() >= kDEBUG2) {
LOG((CLOG_DEBUG2 "recv file data from client: chunck size=%i", content.size()));
double interval = m_stopwatch.getTime();
m_receivedDataSize += content.size();
LOG((CLOG_DEBUG2 "recv file data from client: interval=%f s", interval));
if (interval >= m_intervalThreshold) {
double averageSpeed = m_receivedDataSize / interval / 1000;
LOG((CLOG_DEBUG2 "recv file data from client: average speed=%f kb/s", averageSpeed));
m_receivedDataSize = 0; if (result == kFinish) {
m_elapsedTime += interval;
m_stopwatch.reset();
}
}
break;
case kDataEnd:
m_events->addEvent(Event(m_events->forIScreen().fileRecieveCompleted(), server)); m_events->addEvent(Event(m_events->forIScreen().fileRecieveCompleted(), server));
if (CLOG->getFilter() >= kDEBUG2) {
LOG((CLOG_DEBUG2 "file data transfer finished"));
m_elapsedTime += m_stopwatch.getTime();
double averageSpeed = getServer()->getExpectedFileSize() / m_elapsedTime / 1000;
LOG((CLOG_DEBUG2 "file data transfer finished: total time consumed=%f s", m_elapsedTime));
LOG((CLOG_DEBUG2 "file data transfer finished: total data received=%i kb", getServer()->getExpectedFileSize() / 1000));
LOG((CLOG_DEBUG2 "file data transfer finished: total average speed=%f kb/s", averageSpeed));
}
break;
} }
} }

View File

@ -38,9 +38,4 @@ public:
private: private:
IEventQueue* m_events; IEventQueue* m_events;
Stopwatch m_stopwatch;
double m_elapsedTime;
size_t m_receivedDataSize;
static const UInt16 m_intervalThreshold;
}; };

View File

@ -2338,25 +2338,6 @@ Server::KeyboardBroadcastInfo::alloc(State state, const String& screens)
return info; return info;
} }
void
Server::clearReceivedFileData()
{
m_receivedFileData.clear();
}
void
Server::setExpectedFileSize(String data)
{
std::istringstream iss(data);
iss >> m_expectedFileSize;
}
void
Server::fileChunkReceived(String data)
{
m_receivedFileData += data;
}
bool bool
Server::isReceivedFileSizeValid() Server::isReceivedFileSizeValid()
{ {

View File

@ -141,15 +141,6 @@ public:
*/ */
void disconnect(); void disconnect();
//! Clears the file buffer
void clearReceivedFileData();
//! Set the expected size of receiving file
void setExpectedFileSize(String data);
//! Received a chunk of file data
void fileChunkReceived(String data);
//! Create a new thread and use it to send file to client //! Create a new thread and use it to send file to client
void sendFileToClient(const char* filename); void sendFileToClient(const char* filename);
@ -178,8 +169,11 @@ public:
//! Return true if recieved file size is valid //! Return true if recieved file size is valid
bool isReceivedFileSizeValid(); bool isReceivedFileSizeValid();
//! Return expected file size //! Return expected file data size
size_t getExpectedFileSize() { return m_expectedFileSize; } size_t& getExpectedFileSize() { return m_expectedFileSize; }
//! Return received file data
String& getReceivedFileData() { return m_receivedFileData; }
//@} //@}

View File

@ -24,12 +24,6 @@
#define CLIPBOARD_CHUNK_META_SIZE 7 #define CLIPBOARD_CHUNK_META_SIZE 7
enum EAssembleResult {
kNotFinish,
kFinish,
kError
};
namespace synergy { namespace synergy {
class IStream; class IStream;
}; };

View File

@ -17,7 +17,13 @@
#include "synergy/FileChunk.h" #include "synergy/FileChunk.h"
#include "synergy/ProtocolUtil.h"
#include "synergy/protocol_types.h" #include "synergy/protocol_types.h"
#include "io/IStream.h"
#include "base/Stopwatch.h"
#include "base/Log.h"
static const UInt16 kIntervalThreshold = 1;
FileChunk::FileChunk(size_t size) : FileChunk::FileChunk(size_t size) :
Chunk(size) Chunk(size)
@ -60,3 +66,87 @@ FileChunk::end()
return end; return end;
} }
int
FileChunk::assemble(synergy::IStream* stream, String& dataReceived, size_t& expectedSize)
{
// parse
UInt8 mark = 0;
String content;
static size_t receivedDataSize;
static double elapsedTime;
static Stopwatch stopwatch;
if (!ProtocolUtil::readf(stream, kMsgDFileTransfer + 4, &mark, &content)) {
return kError;
}
switch (mark) {
case kDataStart:
dataReceived.clear();
expectedSize = synergy::string::stringToSizeType(content);
receivedDataSize = 0;
elapsedTime = 0;
stopwatch.reset();
if (CLOG->getFilter() >= kDEBUG2) {
LOG((CLOG_DEBUG2 "recv file data from client: file size=%s", content.c_str()));
stopwatch.start();
}
return kNotFinish;
case kDataChunk:
dataReceived.append(content);
if (CLOG->getFilter() >= kDEBUG2) {
LOG((CLOG_DEBUG2 "recv file data from client: chunck size=%i", content.size()));
double interval = stopwatch.getTime();
receivedDataSize += content.size();
LOG((CLOG_DEBUG2 "recv file data from client: interval=%f s", interval));
if (interval >= kIntervalThreshold) {
double averageSpeed = receivedDataSize / interval / 1000;
LOG((CLOG_DEBUG2 "recv file data from client: average speed=%f kb/s", averageSpeed));
receivedDataSize = 0;
elapsedTime += interval;
stopwatch.reset();
}
}
return kNotFinish;
case kDataEnd:
//m_events->addEvent(Event(m_events->forIScreen().fileRecieveCompleted(), server));
if (CLOG->getFilter() >= kDEBUG2) {
LOG((CLOG_DEBUG2 "file data transfer finished"));
elapsedTime += stopwatch.getTime();
double averageSpeed = expectedSize / elapsedTime / 1000;
LOG((CLOG_DEBUG2 "file data transfer finished: total time consumed=%f s", elapsedTime));
LOG((CLOG_DEBUG2 "file data transfer finished: total data received=%i kb", expectedSize / 1000));
LOG((CLOG_DEBUG2 "file data transfer finished: total average speed=%f kb/s", averageSpeed));
}
return kFinish;
}
return kError;
}
void
FileChunk::send(synergy::IStream* stream, UInt8 mark, char* data, size_t dataSize)
{
String chunk(data, dataSize);
switch (mark) {
case kDataStart:
LOG((CLOG_DEBUG2 "sending file chunk start: size=%s", data));
break;
case kDataChunk:
LOG((CLOG_DEBUG2 "sending file chunk: size=%i", chunk.size()));
break;
case kDataEnd:
LOG((CLOG_DEBUG2 "sending file finished"));
break;
}
ProtocolUtil::writef(stream, kMsgDFileTransfer, mark, &chunk);
}

View File

@ -23,11 +23,24 @@
#define FILE_CHUNK_META_SIZE 2 #define FILE_CHUNK_META_SIZE 2
namespace synergy {
class IStream;
};
class FileChunk : public Chunk { class FileChunk : public Chunk {
public: public:
FileChunk(size_t size); FileChunk(size_t size);
static FileChunk* start(const String& size); static FileChunk* start(const String& size);
static FileChunk* data(UInt8* data, size_t dataSize); static FileChunk* data(UInt8* data, size_t dataSize);
static FileChunk* end(); static FileChunk* end();
static int assemble(
synergy::IStream* stream,
String& dataCached,
size_t& expectedSize);
static void send(
synergy::IStream* stream,
UInt8 mark,
char* data,
size_t dataSize);
}; };

View File

@ -70,13 +70,20 @@ enum EDirectionMask {
kBottomMask = 1 << kBottom kBottomMask = 1 << kBottom
}; };
// file transfer constants // Data transfer constants
enum EDataTransfer { enum EDataTransfer {
kDataStart = 1, kDataStart = 1,
kDataChunk = 2, kDataChunk = 2,
kDataEnd = 3 kDataEnd = 3
}; };
// Data received constants
enum EDataReceived {
kNotFinish,
kFinish,
kError
};
// //
// message codes (trailing NUL is not part of code). in comments, $n // message codes (trailing NUL is not part of code). in comments, $n
// refers to the n'th argument (counting from one). message codes are // refers to the n'th argument (counting from one). message codes are