network.cpp

00001 // network.cpp - written and placed in the public domain by Wei Dai
00002 
00003 #include "pch.h"
00004 #include "network.h"
00005 #include "wait.h"
00006 
00007 #define CRYPTOPP_TRACE_NETWORK 0
00008 
00009 NAMESPACE_BEGIN(CryptoPP)
00010 
00011 lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
00012 {
00013         if (!m_maxBytesPerSecond)
00014                 return ULONG_MAX;
00015 
00016         double curTime = GetCurTimeAndCleanUp();
00017         lword total = 0;
00018         for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
00019                 total += m_ops[i].second;
00020         return SaturatingSubtract(m_maxBytesPerSecond, total);
00021 }
00022 
00023 double LimitedBandwidth::TimeToNextTransceive()
00024 {
00025         if (!m_maxBytesPerSecond)
00026                 return 0;
00027 
00028         if (!m_nextTransceiveTime)
00029                 ComputeNextTransceiveTime();
00030 
00031         return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble());
00032 }
00033 
00034 void LimitedBandwidth::NoteTransceive(lword size)
00035 {
00036         if (m_maxBytesPerSecond)
00037         {
00038                 double curTime = GetCurTimeAndCleanUp();
00039                 m_ops.push_back(std::make_pair(curTime, size));
00040                 m_nextTransceiveTime = 0;
00041         }
00042 }
00043 
00044 void LimitedBandwidth::ComputeNextTransceiveTime()
00045 {
00046         double curTime = GetCurTimeAndCleanUp();
00047         lword total = 0;
00048         for (unsigned int i=0; i!=m_ops.size(); ++i)
00049                 total += m_ops[i].second;
00050         m_nextTransceiveTime =
00051                 (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
00052 }
00053 
00054 double LimitedBandwidth::GetCurTimeAndCleanUp()
00055 {
00056         if (!m_maxBytesPerSecond)
00057                 return 0;
00058 
00059         double curTime = m_timer.ElapsedTimeAsDouble();
00060         while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
00061                 m_ops.pop_front();
00062         return curTime;
00063 }
00064 
00065 void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
00066 {
00067         double nextTransceiveTime = TimeToNextTransceive();
00068         if (nextTransceiveTime)
00069                 container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack));
00070 }
00071 
00072 // *************************************************************
00073 
00074 size_t NonblockingSource::GeneralPump2(
00075         lword& byteCount, bool blockingOutput,
00076         unsigned long maxTime, bool checkDelimiter, byte delimiter)
00077 {
00078         m_blockedBySpeedLimit = false;
00079 
00080         if (!GetMaxBytesPerSecond())
00081         {
00082                 size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
00083                 m_doPumpBlocked = (ret != 0);
00084                 return ret;
00085         }
00086 
00087         bool forever = (maxTime == INFINITE_TIME);
00088         unsigned long timeToGo = maxTime;
00089         Timer timer(Timer::MILLISECONDS, forever);
00090         lword maxSize = byteCount;
00091         byteCount = 0;
00092 
00093         timer.StartTimer();
00094 
00095         while (true)
00096         {
00097                 lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
00098 
00099                 if (curMaxSize || m_doPumpBlocked)
00100                 {
00101                         if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00102                         size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
00103                         m_doPumpBlocked = (ret != 0);
00104                         if (curMaxSize)
00105                         {
00106                                 NoteTransceive(curMaxSize);
00107                                 byteCount += curMaxSize;
00108                         }
00109                         if (ret)
00110                                 return ret;
00111                 }
00112 
00113                 if (maxSize != ULONG_MAX && byteCount >= maxSize)
00114                         break;
00115 
00116                 if (!forever)
00117                 {
00118                         timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00119                         if (!timeToGo)
00120                                 break;
00121                 }
00122 
00123                 double waitTime = TimeToNextTransceive();
00124                 if (!forever && waitTime > timeToGo)
00125                 {
00126                         m_blockedBySpeedLimit = true;
00127                         break;
00128                 }
00129 
00130                 WaitObjectContainer container;
00131                 LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0));
00132                 container.Wait((unsigned long)waitTime);
00133         }
00134 
00135         return 0;
00136 }
00137 
00138 size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
00139 {
00140         if (messageCount == 0)
00141                 return 0;
00142 
00143         messageCount = 0;
00144 
00145         lword byteCount;
00146         do {
00147                 byteCount = LWORD_MAX;
00148                 RETURN_IF_NONZERO(Pump2(byteCount, blocking));
00149         } while(byteCount == LWORD_MAX);
00150 
00151         if (!m_messageEndSent && SourceExhausted())
00152         {
00153                 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
00154                 m_messageEndSent = true;
00155                 messageCount = 1;
00156         }
00157         return 0;
00158 }
00159 
00160 lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize)
00161 {
00162         m_blockedBySpeedLimit = false;
00163 
00164         size_t curBufSize = GetCurrentBufferSize();
00165         if (curBufSize <= targetSize && (targetSize || !EofPending()))
00166                 return 0;
00167 
00168         if (!GetMaxBytesPerSecond())
00169                 return DoFlush(maxTime, targetSize);
00170 
00171         bool forever = (maxTime == INFINITE_TIME);
00172         unsigned long timeToGo = maxTime;
00173         Timer timer(Timer::MILLISECONDS, forever);
00174         lword totalFlushed = 0;
00175 
00176         timer.StartTimer();
00177 
00178         while (true)
00179         {       
00180                 size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
00181                 if (flushSize || EofPending())
00182                 {
00183                         if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00184                         size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
00185                         if (ret)
00186                         {
00187                                 NoteTransceive(ret);
00188                                 curBufSize -= ret;
00189                                 totalFlushed += ret;
00190                         }
00191                 }
00192 
00193                 if (curBufSize <= targetSize && (targetSize || !EofPending()))
00194                         break;
00195 
00196                 if (!forever)
00197                 {
00198                         timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00199                         if (!timeToGo)
00200                                 break;
00201                 }
00202 
00203                 double waitTime = TimeToNextTransceive();
00204                 if (!forever && waitTime > timeToGo)
00205                 {
00206                         m_blockedBySpeedLimit = true;
00207                         break;
00208                 }
00209 
00210                 WaitObjectContainer container;
00211                 LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0));
00212                 container.Wait((unsigned long)waitTime);
00213         }
00214 
00215         return totalFlushed;
00216 }
00217 
00218 bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
00219 {
00220         TimedFlush(blocking ? INFINITE_TIME : 0);
00221         return hardFlush && (!!GetCurrentBufferSize() || EofPending());
00222 }
00223 
00224 // *************************************************************
00225 
00226 #ifdef HIGHRES_TIMER_AVAILABLE
00227 
00228 NetworkSource::NetworkSource(BufferedTransformation *attachment)
00229         : NonblockingSource(attachment), m_buf(1024*16)
00230         , m_waitingForResult(false), m_outputBlocked(false)
00231         , m_dataBegin(0), m_dataEnd(0)
00232 {
00233 }
00234 
00235 unsigned int NetworkSource::GetMaxWaitObjectCount() const
00236 {
00237         return LimitedBandwidth::GetMaxWaitObjectCount()
00238                 + GetReceiver().GetMaxWaitObjectCount()
00239                 + AttachedTransformation()->GetMaxWaitObjectCount();
00240 }
00241 
00242 void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
00243 {
00244         if (BlockedBySpeedLimit())
00245                 LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack));
00246         else if (!m_outputBlocked)
00247         {
00248                 if (m_dataBegin == m_dataEnd)
00249                         AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack)); 
00250                 else
00251                         container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack));
00252         }
00253 
00254         AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack));
00255 }
00256 
00257 size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
00258 {
00259         NetworkReceiver &receiver = AccessReceiver();
00260 
00261         lword maxSize = byteCount;
00262         byteCount = 0;
00263         bool forever = maxTime == INFINITE_TIME;
00264         Timer timer(Timer::MILLISECONDS, forever);
00265         BufferedTransformation *t = AttachedTransformation();
00266 
00267         if (m_outputBlocked)
00268                 goto DoOutput;
00269 
00270         while (true)
00271         {
00272                 if (m_dataBegin == m_dataEnd)
00273                 {
00274                         if (receiver.EofReceived())
00275                                 break;
00276 
00277                         if (m_waitingForResult)
00278                         {
00279                                 if (receiver.MustWaitForResult() &&
00280                                         !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00281                                                 CallStack("NetworkSource::DoPump() - wait receive result", 0)))
00282                                         break;
00283 
00284                                 unsigned int recvResult = receiver.GetReceiveResult();
00285 #if CRYPTOPP_TRACE_NETWORK
00286                                 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00287 #endif
00288                                 m_dataEnd += recvResult;
00289                                 m_waitingForResult = false;
00290 
00291                                 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
00292                                         goto ReceiveNoWait;
00293                         }
00294                         else
00295                         {
00296                                 m_dataEnd = m_dataBegin = 0;
00297 
00298                                 if (receiver.MustWaitToReceive())
00299                                 {
00300                                         if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00301                                                         CallStack("NetworkSource::DoPump() - wait receive", 0)))
00302                                                 break;
00303 
00304                                         receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
00305                                         m_waitingForResult = true;
00306                                 }
00307                                 else
00308                                 {
00309 ReceiveNoWait:
00310                                         m_waitingForResult = true;
00311                                         // call Receive repeatedly as long as data is immediately available,
00312                                         // because some receivers tend to return data in small pieces
00313 #if CRYPTOPP_TRACE_NETWORK
00314                                         OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
00315 #endif
00316                                         while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
00317                                         {
00318                                                 unsigned int recvResult = receiver.GetReceiveResult();
00319 #if CRYPTOPP_TRACE_NETWORK
00320                                                 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00321 #endif
00322                                                 m_dataEnd += recvResult;
00323                                                 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
00324                                                 {
00325                                                         m_waitingForResult = false;
00326                                                         break;
00327                                                 }
00328                                         }
00329                                 }
00330                         }
00331                 }
00332                 else
00333                 {
00334                         m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
00335 
00336                         if (checkDelimiter)
00337                                 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
00338 
00339 DoOutput:
00340                         size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
00341                         if (result)
00342                         {
00343                                 if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00344                                                 CallStack("NetworkSource::DoPump() - wait attachment", 0)))
00345                                         goto DoOutput;
00346                                 else
00347                                 {
00348                                         m_outputBlocked = true;
00349                                         return result;
00350                                 }
00351                         }
00352                         m_outputBlocked = false;
00353 
00354                         byteCount += m_putSize;
00355                         m_dataBegin += m_putSize;
00356                         if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
00357                                 break;
00358                         if (maxSize != ULONG_MAX && byteCount == maxSize)
00359                                 break;
00360                         // once time limit is reached, return even if there is more data waiting
00361                         // but make 0 a special case so caller can request a large amount of data to be
00362                         // pumped as long as it is immediately available
00363                         if (maxTime > 0 && timer.ElapsedTime() > maxTime)
00364                                 break;
00365                 }
00366         }
00367 
00368         return 0;
00369 }
00370 
00371 // *************************************************************
00372 
00373 NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
00374         : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
00375         , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE)
00376         , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0) 
00377         , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
00378         , m_currentSpeed(0), m_maxObservedSpeed(0)
00379 {
00380 }
00381 
00382 float NetworkSink::ComputeCurrentSpeed()
00383 {
00384         if (m_speedTimer.ElapsedTime() > 1000)
00385         {
00386                 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
00387                 m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
00388                 m_byteCountSinceLastTimerReset = 0;
00389                 m_speedTimer.StartTimer();
00390 //              OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str());
00391         }
00392         return m_currentSpeed;
00393 }
00394 
00395 float NetworkSink::GetMaxObservedSpeed() const
00396 {
00397         lword m = GetMaxBytesPerSecond();
00398         return m ? STDMIN(m_maxObservedSpeed, float(m)) : m_maxObservedSpeed;
00399 }
00400 
00401 unsigned int NetworkSink::GetMaxWaitObjectCount() const
00402 {
00403         return LimitedBandwidth::GetMaxWaitObjectCount() + GetSender().GetMaxWaitObjectCount();
00404 }
00405 
00406 void NetworkSink::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
00407 {
00408         if (BlockedBySpeedLimit())
00409                 LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack));
00410         else if (m_wasBlocked)
00411                 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack));
00412         else if (!m_buffer.IsEmpty())
00413                 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
00414         else if (EofPending())
00415                 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack));
00416 }
00417 
00418 size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
00419 {
00420         if (m_eofState == EOF_DONE)
00421         {
00422                 if (length || messageEnd)
00423                         throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent");
00424 
00425                 return 0;
00426         }
00427 
00428         if (m_eofState > EOF_NONE)
00429                 goto EofSite;
00430 
00431         {
00432                 if (m_skipBytes)
00433                 {
00434                         assert(length >= m_skipBytes);
00435                         inString += m_skipBytes;
00436                         length -= m_skipBytes;
00437                 }
00438 
00439                 m_buffer.Put(inString, length);
00440 
00441                 if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
00442                         TimedFlush(0, 0);
00443 
00444                 size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
00445                 if (blocking)
00446                         TimedFlush(INFINITE_TIME, targetSize);
00447 
00448                 if (m_buffer.CurrentSize() > targetSize)
00449                 {
00450                         assert(!blocking);
00451                         m_wasBlocked = true;
00452                         m_skipBytes += length;
00453                         size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
00454                         return STDMAX<size_t>(blockedBytes, 1);
00455                 }
00456 
00457                 m_wasBlocked = false;
00458                 m_skipBytes = 0;
00459         }
00460 
00461         if (messageEnd)
00462         {
00463                 m_eofState = EOF_PENDING_SEND;
00464 
00465         EofSite:
00466                 TimedFlush(blocking ? INFINITE_TIME : 0, 0);
00467                 if (m_eofState != EOF_DONE)
00468                         return 1;
00469         }
00470 
00471         return 0;
00472 }
00473 
00474 lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize)
00475 {
00476         NetworkSender &sender = AccessSender();
00477 
00478         bool forever = maxTime == INFINITE_TIME;
00479         Timer timer(Timer::MILLISECONDS, forever);
00480         unsigned int totalFlushSize = 0;
00481 
00482         while (true)
00483         {
00484                 if (m_buffer.CurrentSize() <= targetSize)
00485                         break;
00486                 
00487                 if (m_needSendResult)
00488                 {
00489                         if (sender.MustWaitForResult() &&
00490                                 !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00491                                         CallStack("NetworkSink::DoFlush() - wait send result", 0)))
00492                                 break;
00493 
00494                         unsigned int sendResult = sender.GetSendResult();
00495 #if CRYPTOPP_TRACE_NETWORK
00496                         OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
00497 #endif
00498                         m_buffer.Skip(sendResult);
00499                         totalFlushSize += sendResult;
00500                         m_needSendResult = false;
00501 
00502                         if (!m_buffer.AnyRetrievable())
00503                                 break;
00504                 }
00505 
00506                 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
00507                 if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0)))
00508                         break;
00509 
00510                 size_t contiguousSize = 0;
00511                 const byte *block = m_buffer.Spy(contiguousSize);
00512 
00513 #if CRYPTOPP_TRACE_NETWORK
00514                 OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
00515 #endif
00516                 sender.Send(block, contiguousSize);
00517                 m_needSendResult = true;
00518 
00519                 if (maxTime > 0 && timeOut == 0)
00520                         break;  // once time limit is reached, return even if there is more data waiting
00521         }
00522 
00523         m_byteCountSinceLastTimerReset += totalFlushSize;
00524         ComputeCurrentSpeed();
00525         
00526         if (m_buffer.IsEmpty() && !m_needSendResult)
00527         {
00528                 if (m_eofState == EOF_PENDING_SEND)
00529                 {
00530                         sender.SendEof();
00531                         m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
00532                 }
00533 
00534                 while (m_eofState == EOF_PENDING_DELIVERY)
00535                 {
00536                         unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
00537                         if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0)))
00538                                 break;
00539 
00540                         if (sender.EofSent())
00541                                 m_eofState = EOF_DONE;
00542                 }
00543         }
00544 
00545         return totalFlushSize;
00546 }
00547 
00548 #endif  // #ifdef HIGHRES_TIMER_AVAILABLE
00549 
00550 NAMESPACE_END

Generated on Thu Nov 23 15:57:42 2006 for Crypto++ by  doxygen 1.5.1-p1