mirror of
https://github.com/encounter/engine.git
synced 2026-03-30 11:09:55 -07:00
Add trace event slices for the duration in which the engine produces a pipeline item. (#2985)
* Add trace event slices for the duration in which the engine produces a pipeline item. * Trace the entire lifetime of an item in the pipeline.
This commit is contained in:
+30
-13
@@ -34,28 +34,32 @@ class Pipeline : public ftl::RefCountedThreadSafe<Pipeline<R>> {
|
||||
/// preparing a completed pipeline resource.
|
||||
class ProducerContinuation {
|
||||
public:
|
||||
ProducerContinuation() = default;
|
||||
ProducerContinuation() : trace_id_(0) {}
|
||||
|
||||
ProducerContinuation(ProducerContinuation&& other)
|
||||
: continuation_(other.continuation_) {
|
||||
: continuation_(other.continuation_), trace_id_(other.trace_id_) {
|
||||
other.continuation_ = nullptr;
|
||||
other.trace_id_ = 0;
|
||||
}
|
||||
|
||||
ProducerContinuation& operator=(ProducerContinuation&& other) {
|
||||
std::swap(continuation_, other.continuation_);
|
||||
std::swap(trace_id_, other.trace_id_);
|
||||
return *this;
|
||||
}
|
||||
|
||||
~ProducerContinuation() {
|
||||
if (continuation_) {
|
||||
continuation_(nullptr);
|
||||
continuation_(nullptr, trace_id_);
|
||||
TRACE_EVENT_ASYNC_END0("flutter", "PipelineProduce", trace_id_);
|
||||
}
|
||||
}
|
||||
|
||||
void Complete(ResourcePtr resource) {
|
||||
if (continuation_) {
|
||||
continuation_(std::move(resource));
|
||||
continuation_(std::move(resource), trace_id_);
|
||||
continuation_ = nullptr;
|
||||
TRACE_EVENT_ASYNC_END0("flutter", "PipelineProduce", trace_id_);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,16 +67,22 @@ class Pipeline : public ftl::RefCountedThreadSafe<Pipeline<R>> {
|
||||
|
||||
private:
|
||||
friend class Pipeline;
|
||||
using Continuation = std::function<void(ResourcePtr, size_t)>;
|
||||
|
||||
std::function<void(ResourcePtr)> continuation_;
|
||||
Continuation continuation_;
|
||||
size_t trace_id_;
|
||||
|
||||
ProducerContinuation(std::function<void(ResourcePtr)> continuation)
|
||||
: continuation_(continuation) {}
|
||||
ProducerContinuation(Continuation continuation, size_t trace_id)
|
||||
: continuation_(continuation), trace_id_(trace_id) {
|
||||
TRACE_EVENT_ASYNC_BEGIN0("flutter", "PipelineItem", trace_id_);
|
||||
TRACE_EVENT_ASYNC_BEGIN0("flutter", "PipelineProduce", trace_id_);
|
||||
}
|
||||
|
||||
FTL_DISALLOW_COPY_AND_ASSIGN(ProducerContinuation);
|
||||
};
|
||||
|
||||
explicit Pipeline(uint32_t depth) : empty_(depth), available_(0) {}
|
||||
explicit Pipeline(uint32_t depth)
|
||||
: empty_(depth), available_(0), last_trace_id_(0) {}
|
||||
|
||||
~Pipeline() = default;
|
||||
|
||||
@@ -83,7 +93,10 @@ class Pipeline : public ftl::RefCountedThreadSafe<Pipeline<R>> {
|
||||
return {};
|
||||
}
|
||||
|
||||
return {std::bind(&Pipeline::ProducerCommit, this, std::placeholders::_1)};
|
||||
return ProducerContinuation{
|
||||
std::bind(&Pipeline::ProducerCommit, this, std::placeholders::_1,
|
||||
std::placeholders::_2), // continuation
|
||||
++last_trace_id_}; // trace id
|
||||
}
|
||||
|
||||
using Consumer = std::function<void(ResourcePtr)>;
|
||||
@@ -99,11 +112,12 @@ class Pipeline : public ftl::RefCountedThreadSafe<Pipeline<R>> {
|
||||
}
|
||||
|
||||
ResourcePtr resource;
|
||||
size_t trace_id = 0;
|
||||
size_t items_count = 0;
|
||||
|
||||
{
|
||||
ftl::MutexLocker lock(&queue_mutex_);
|
||||
resource = std::move(queue_.front());
|
||||
std::tie(resource, trace_id) = std::move(queue_.front());
|
||||
queue_.pop();
|
||||
items_count = queue_.size();
|
||||
}
|
||||
@@ -115,6 +129,8 @@ class Pipeline : public ftl::RefCountedThreadSafe<Pipeline<R>> {
|
||||
|
||||
empty_.Signal();
|
||||
|
||||
TRACE_EVENT_ASYNC_END0("flutter", "PipelineItem", trace_id);
|
||||
|
||||
return items_count > 0 ? PipelineConsumeResult::MoreAvailable
|
||||
: PipelineConsumeResult::Done;
|
||||
}
|
||||
@@ -123,12 +139,13 @@ class Pipeline : public ftl::RefCountedThreadSafe<Pipeline<R>> {
|
||||
Semaphore empty_;
|
||||
Semaphore available_;
|
||||
ftl::Mutex queue_mutex_;
|
||||
std::queue<ResourcePtr> queue_;
|
||||
std::queue<std::pair<ResourcePtr, size_t>> queue_;
|
||||
std::atomic_size_t last_trace_id_;
|
||||
|
||||
void ProducerCommit(ResourcePtr resource) {
|
||||
void ProducerCommit(ResourcePtr resource, size_t trace_id) {
|
||||
{
|
||||
ftl::MutexLocker lock(&queue_mutex_);
|
||||
queue_.emplace(std::move(resource));
|
||||
queue_.emplace(std::move(resource), trace_id);
|
||||
}
|
||||
|
||||
// Ensure the queue mutex is not held as that would be a pessimization.
|
||||
|
||||
Reference in New Issue
Block a user