Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/libwebrtc/net/dcsctp/rx/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 9 kB image not shown  

Quelle  reassembly_queue.cc   Sprache: C

 
/*
 *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#include "net/dcsctp/rx/reassembly_queue.h"

#include <stddef.h>

#include <algorithm>
#include <cstdint>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "absl/strings/string_view.h"
#include "api/array_view.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
#include "net/dcsctp/public/dcsctp_handover_state.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
#include "net/dcsctp/rx/reassembly_streams.h"
#include "net/dcsctp/rx/traditional_reassembly_streams.h"
#include "rtc_base/logging.h"
#include "rtc_base/strings/str_join.h"

namespace dcsctp {
namespace {
std::unique_ptr<ReassemblyStreams> CreateStreams(
    absl::string_view log_prefix,
    ReassemblyStreams::OnAssembledMessage on_assembled_message,
    bool use_message_interleaving) {
  if (use_message_interleaving) {
    return std::make_unique<InterleavedReassemblyStreams>(
        log_prefix, std::move(on_assembled_message));
  }
  return std::make_unique<TraditionalReassemblyStreams>(
      log_prefix, std::move(on_assembled_message));
}
}  // namespace

ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
                                 size_t max_size_bytes,
                                 bool use_message_interleaving)
    : log_prefix_(log_prefix),
      max_size_bytes_(max_size_bytes),
      watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
      streams_(CreateStreams(
          log_prefix_,
          [this](rtc::ArrayView<const UnwrappedTSN> tsns,
                 DcSctpMessage message) {
            AddReassembledMessage(tsns, std::move(message));
          },
          use_message_interleaving)) {}

void ReassemblyQueue::Add(TSN tsn, Data data) {
  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
                       << ", stream=" << *data.stream_id << ":" << *data.mid
                       << ":" << *data.fsn << ", type="
                       << (data.is_beginning && data.is_end ? "complete"
                           : data.is_beginning              ? "first"
                           : data.is_end                    ? "last"
                                                            : "middle");

  UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn);

  // If a stream reset has been received with a "sender's last assigned tsn" in
  // the future, the socket is in "deferred reset processing" mode and must
  // buffer chunks until it's exited.
  if (deferred_reset_streams_.has_value() &&
      unwrapped_tsn > deferred_reset_streams_->sender_last_assigned_tsn &&
      deferred_reset_streams_->streams.contains(data.stream_id)) {
    RTC_DLOG(LS_VERBOSE)
        << log_prefix_ << "Deferring chunk with tsn=" << *tsn
        << ", sid=" << *data.stream_id << " until tsn="
        << *deferred_reset_streams_->sender_last_assigned_tsn.Wrap();
    // https://tools.ietf.org/html/rfc6525#section-5.2.2
    // "In this mode, any data arriving with a TSN larger than the
    // Sender's Last Assigned TSN for the affected stream(s) MUST be queued
    // locally and held until the cumulative acknowledgment point reaches the
    // Sender's Last Assigned TSN."
    queued_bytes_ += data.size();
    deferred_reset_streams_->deferred_actions.push_back(
        [this, tsn, data = std::move(data)]() mutable {
          queued_bytes_ -= data.size();
          Add(tsn, std::move(data));
        });
  } else {
    queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data));
  }

  // https://tools.ietf.org/html/rfc4960#section-6.9
  // "Note: If the data receiver runs out of buffer space while still
  // waiting for more fragments to complete the reassembly of the message, it
  // should dispatch part of its inbound message through a partial delivery
  // API (see Section 10), freeing some of its receive buffer space so that
  // the rest of the message may be received."

  // TODO(boivie): Support EOR flag and partial delivery?
  RTC_DCHECK(IsConsistent());
}

void ReassemblyQueue::ResetStreamsAndLeaveDeferredReset(
    rtc::ArrayView<const StreamID> stream_ids) {
  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Resetting streams: ["
                       << webrtc::StrJoin(stream_ids, ",",
                                          [](rtc::StringBuilder& sb,
                                             StreamID sid) { sb << *sid; })
                       << "]";

  // https://tools.ietf.org/html/rfc6525#section-5.2.2
  // "... streams MUST be reset to 0 as the next expected SSN."
  streams_->ResetStreams(stream_ids);

  if (deferred_reset_streams_.has_value()) {
    RTC_DLOG(LS_VERBOSE) << log_prefix_
                         << "Leaving deferred reset processing, feeding back "
                         << deferred_reset_streams_->deferred_actions.size()
                         << " actions";
    // https://tools.ietf.org/html/rfc6525#section-5.2.2
    // "Any queued TSNs (queued at step E2) MUST now be released and processed
    // normally."
    auto deferred_actions =
        std::move(deferred_reset_streams_->deferred_actions);
    deferred_reset_streams_ = std::nullopt;

    for (auto& action : deferred_actions) {
      action();
    }
  }

  RTC_DCHECK(IsConsistent());
}

void ReassemblyQueue::EnterDeferredReset(
    TSN sender_last_assigned_tsn,
    rtc::ArrayView<const StreamID> streams) {
  if (!deferred_reset_streams_.has_value()) {
    RTC_DLOG(LS_VERBOSE) << log_prefix_
                         << "Entering deferred reset; sender_last_assigned_tsn="
                         << *sender_last_assigned_tsn;
    deferred_reset_streams_ = std::make_optional<DeferredResetStreams>(
        tsn_unwrapper_.Unwrap(sender_last_assigned_tsn),
        webrtc::flat_set<StreamID>(streams.begin(), streams.end()));
  }
  RTC_DCHECK(IsConsistent());
}

std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() {
  std::vector<DcSctpMessage> ret;
  reassembled_messages_.swap(ret);
  return ret;
}

void ReassemblyQueue::AddReassembledMessage(
    rtc::ArrayView<const UnwrappedTSN> tsns,
    DcSctpMessage message) {
  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=["
                       << webrtc::StrJoin(
                              tsns, ",",
                              [](rtc::StringBuilder& sb, UnwrappedTSN tsn) {
                                sb << *tsn.Wrap();
                              })
                       << "], message; stream_id=" << *message.stream_id()
                       << ", ppid=" << *message.ppid()
                       << ", payload=" << message.payload().size() << " bytes";

  reassembled_messages_.emplace_back(std::move(message));
}

void ReassemblyQueue::HandleForwardTsn(
    TSN new_cumulative_tsn,
    rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
  UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(new_cumulative_tsn);

  if (deferred_reset_streams_.has_value() &&
      tsn > deferred_reset_streams_->sender_last_assigned_tsn) {
    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap()
                         << "- deferring.";
    deferred_reset_streams_->deferred_actions.emplace_back(
        [this, new_cumulative_tsn,
         streams = std::vector<AnyForwardTsnChunk::SkippedStream>(
             skipped_streams.begin(), skipped_streams.end())] {
          HandleForwardTsn(new_cumulative_tsn, streams);
        });
    RTC_DCHECK(IsConsistent());
    return;
  }

  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap()
                       << " - performing.";
  queued_bytes_ -= streams_->HandleForwardTsn(tsn, skipped_streams);
  RTC_DCHECK(IsConsistent());
}

bool ReassemblyQueue::IsConsistent() const {
  // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively
  // enforced in this class. But in case it wraps around (becomes negative, but
  // as it's unsigned, that would wrap to very big), this would trigger.
  return (queued_bytes_ <= 2 * max_size_bytes_);
}

HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
  HandoverReadinessStatus status = streams_->GetHandoverReadiness();
  if (deferred_reset_streams_.has_value()) {
    status.Add(HandoverUnreadinessReason::kStreamResetDeferred);
  }
  return status;
}

void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
  streams_->AddHandoverState(state);
}

void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
  streams_->RestoreFromState(state);
}
}  // namespace dcsctp

Messung V0.5
C=94 H=96 G=94

¤ Dauer der Verarbeitung: 0.4 Sekunden  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.