Files
Jamie Liu da3eb80271 Fix #10046
See updated comment in sentry/kernel/pipe/vfs.go.

PiperOrigin-RevId: 610516821
2024-02-26 13:56:42 -08:00

465 lines
13 KiB
Go

// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package pipe provides a pipe implementation.
package pipe
import (
"fmt"
"io"
"golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/atomicbitops"
"gvisor.dev/gvisor/pkg/errors/linuxerr"
"gvisor.dev/gvisor/pkg/hostarch"
"gvisor.dev/gvisor/pkg/safemem"
"gvisor.dev/gvisor/pkg/waiter"
)
const (
// MinimumPipeSize is the hard lower limit on the size of a pipe, in bytes.
// It corresponds to fs/pipe.c:pipe_min_size.
//
// Requested sizes below this value are raised to it by initPipe and
// SetFifoSize.
MinimumPipeSize = hostarch.PageSize
// MaximumPipeSize is the hard upper limit on the size of a pipe, in bytes.
// It corresponds to fs/pipe.c:pipe_max_size.
//
// initPipe clamps larger requests to this value; SetFifoSize rejects them.
MaximumPipeSize = 1048576
// DefaultPipeSize is the system-wide default size of a pipe in bytes
// (16 pages).
// It corresponds to pipe_fs_i.h:PIPE_DEF_BUFFERS.
DefaultPipeSize = 16 * hostarch.PageSize
// atomicIOBytes is the maximum number of bytes for which the pipe
// guarantees atomic reads or writes. writeLocked refuses, rather than
// shortens, a write of at most this many bytes that does not fit.
// It corresponds to limits.h:PIPE_BUF.
atomicIOBytes = 4096
)
// waitReaders is a wrapper around Pipe.
//
// This is used for ctx.Block operations that require the synchronization of
// readers and writers, along with the careful grabbing and releasing of locks.
//
// Its methods implement waiter.Waitable on top of the pipe's wait queue; its
// EventRegister additionally notifies the new entry right away when the pipe
// already has a reader.
type waitReaders Pipe
// Readiness implements waiter.Waitable.Readiness.
//
// It reports the pipe's combined read and write readiness, restricted to the
// events selected by mask.
func (wq *waitReaders) Readiness(mask waiter.EventMask) waiter.EventMask {
	p := (*Pipe)(wq)
	ready := p.rwReadiness()
	return mask & ready
}
// EventRegister implements waiter.Waitable.EventRegister.
//
// The entry is added to the pipe's wait queue. If the pipe already has a
// reader at that point, the entry is notified immediately with
// waiter.EventInternal. EventRegister always returns nil.
func (wq *waitReaders) EventRegister(e *waiter.Entry) error {
	p := (*Pipe)(wq)
	p.queue.EventRegister(e)

	if !p.HasReaders() {
		return nil
	}

	// A reader is already present: notify synchronously instead of waiting
	// for a later open to do it.
	e.NotifyEvent(waiter.EventInternal)
	return nil
}
// EventUnregister implements waiter.Waitable.EventUnregister.
//
// It removes e from the underlying pipe's wait queue.
func (wq *waitReaders) EventUnregister(e *waiter.Entry) {
	p := (*Pipe)(wq)
	p.queue.EventUnregister(e)
}
// waitWriters is a wrapper around Pipe.
//
// This is used for ctx.Block operations that require the synchronization of
// readers and writers, along with the careful grabbing and releasing of locks.
//
// Its methods implement waiter.Waitable on top of the pipe's wait queue; its
// EventRegister additionally notifies the new entry right away when the pipe
// already has a writer.
type waitWriters Pipe
// Readiness implements waiter.Waitable.Readiness.
//
// It reports the pipe's combined read and write readiness, restricted to the
// events selected by mask.
func (wq *waitWriters) Readiness(mask waiter.EventMask) waiter.EventMask {
	p := (*Pipe)(wq)
	ready := p.rwReadiness()
	return mask & ready
}
// EventRegister implements waiter.Waitable.EventRegister.
//
// The entry is added to the pipe's wait queue. If the pipe already has a
// writer at that point, the entry is notified immediately with
// waiter.EventInternal. EventRegister always returns nil.
func (wq *waitWriters) EventRegister(e *waiter.Entry) error {
	p := (*Pipe)(wq)
	p.queue.EventRegister(e)

	if !p.HasWriters() {
		return nil
	}

	// A writer is already present: notify synchronously instead of waiting
	// for a later open to do it.
	e.NotifyEvent(waiter.EventInternal)
	return nil
}
// EventUnregister implements waiter.Waitable.EventUnregister.
//
// It removes e from the underlying pipe's wait queue.
func (wq *waitWriters) EventUnregister(e *waiter.Entry) {
	p := (*Pipe)(wq)
	p.queue.EventUnregister(e)
}
// Pipe is an encapsulation of a platform-independent pipe.
// It manages a buffered byte queue shared between a reader/writer
// pair.
//
// The reader and writer counts are atomics that are read without holding mu
// (see HasReaders and HasWriters); every field declared after mu is protected
// by it.
//
// +stateify savable
type Pipe struct {
// queue is the waiter queue. It is notified with waiter.EventInternal
// whenever a reader or writer opens the pipe (see rOpen and wOpen).
queue waiter.Queue
// isNamed indicates whether this is a named pipe.
//
// This value is immutable.
isNamed bool
// The number of active readers for this pipe. rOpen increments it and
// rClose decrements it.
readers atomicbitops.Int32
// The total number of readers for this pipe. rOpen increments it; rClose
// leaves it unchanged.
totalReaders atomicbitops.Int32
// The number of active writers for this pipe. wOpen increments it and
// wClose decrements it.
writers atomicbitops.Int32
// The total number of writers for this pipe. wOpen increments it; wClose
// leaves it unchanged.
totalWriters atomicbitops.Int32
// mu protects all pipe internal state below.
mu pipeMutex `state:"nosave"`
// buf holds the pipe's data. buf is a circular buffer; the first valid
// byte in buf is at offset off, and the pipe contains size valid bytes.
// bufBlocks contains two identical safemem.Blocks representing buf; this
// avoids needing to heap-allocate a new safemem.Block slice when buf is
// resized. bufBlockSeq is a safemem.BlockSeq representing bufBlocks.
//
// buf is allocated lazily: neither NewPipe nor initPipe allocates it, and
// writeLocked grows it on demand.
//
// These fields are protected by mu.
buf []byte
bufBlocks [2]safemem.Block `state:"nosave"`
bufBlockSeq safemem.BlockSeq `state:"nosave"`
off int64
size int64
// max is the maximum size of the pipe in bytes. When this max has been
// reached, writers will get EWOULDBLOCK.
//
// initPipe sets it to the requested size clamped to
// [MinimumPipeSize, MaximumPipeSize]; SetFifoSize may change it later.
//
// This is protected by mu.
max int64
// hadWriter indicates if this pipe ever had a writer. Note that this
// does not necessarily indicate there is *currently* a writer, just
// that there has been a writer at some point since the pipe was
// created.
//
// wOpen sets it; rReadinessLocked consults it to suppress EventHUp until
// the pipe has had a writer.
//
// This is protected by mu.
hadWriter bool
}
// NewPipe allocates a Pipe, initializes it via initPipe and returns it.
//
// N.B. sizeBytes is bounded: initPipe clamps it to the permitted range.
func NewPipe(isNamed bool, sizeBytes int64) *Pipe {
	p := new(Pipe)
	initPipe(p, isNamed, sizeBytes)
	return p
}
// initPipe initializes pipe in place: it records whether the pipe is named and
// sets its maximum size to sizeBytes clamped to
// [MinimumPipeSize, MaximumPipeSize].
func initPipe(pipe *Pipe, isNamed bool, sizeBytes int64) {
	limit := sizeBytes
	switch {
	case limit < MinimumPipeSize:
		limit = MinimumPipeSize
	case limit > MaximumPipeSize:
		limit = MaximumPipeSize
	}
	pipe.max = limit
	pipe.isNamed = isNamed
}
// peekLocked hands f a view of up to count bytes of pipe data, beginning off
// bytes past the first valid byte, and returns whatever f reports. When fewer
// than count bytes are available the view is correspondingly shorter.
//
// Special cases, checked in this order:
//   - count == 0 returns (0, nil) without calling f, even on an empty pipe.
//   - If nothing is available past off, f is not called; the result is io.EOF
//     when the pipe has no writers and ErrWouldBlock otherwise.
//
// peekLocked never mutates the pipe. A caller whose read consumes bytes must
// itself call p.consumeLocked() and, with p.mu unlocked,
// p.queue.Notify(waiter.WritableEvents).
//
// Preconditions:
//   - p.mu must be locked.
//   - This pipe must have readers.
//   - off <= p.size.
func (p *Pipe) peekLocked(off, count int64, f func(safemem.BlockSeq) (uint64, error)) (int64, error) {
	// A zero-length read never blocks.
	if count == 0 {
		return 0, nil
	}

	// Clamp the request to what is actually buffered past off.
	if avail := p.size - off; avail < count {
		if avail == 0 {
			if p.HasWriters() {
				return 0, linuxerr.ErrWouldBlock
			}
			return 0, io.EOF
		}
		count = avail
	}

	// Translate the logical offset into a position inside the circular
	// buffer, wrapping around its end if necessary.
	start := p.off + off
	if bufLen := int64(len(p.buf)); start >= bufLen {
		start -= bufLen
	}
	view := p.bufBlockSeq.DropFirst64(uint64(start)).TakeFirst64(uint64(count))

	// Let the caller perform the actual read.
	n, err := f(view)
	return int64(n), err
}
// consumeLocked discards the first n bytes held in the pipe so that later
// reads no longer see them: the read offset advances by n, wrapping around the
// end of the circular buffer, and the stored size shrinks by n.
//
// Preconditions:
//   - p.mu must be locked.
//   - The pipe must contain at least n bytes.
func (p *Pipe) consumeLocked(n int64) {
	p.size -= n

	next := p.off + n
	if bufLen := int64(len(p.buf)); next >= bufLen {
		next -= bufLen
	}
	p.off = next
}
// writeLocked passes a safemem.BlockSeq representing the first count bytes of
// unused space in the pipe to f and returns the result. If fewer than count
// bytes are free, the safemem.BlockSeq passed to f will be less than count
// bytes in length. If the pipe is full or otherwise cannot accommodate a write
// of any number of bytes up to count, writeLocked returns ErrWouldBlock
// without calling f.
//
// Unlike peekLocked, writeLocked assumes that f returns the number of bytes
// written to the pipe, and increases the number of bytes stored in the pipe
// accordingly. Callers are still responsible for calling
// p.queue.Notify(waiter.ReadableEvents) with p.mu unlocked.
//
// Return values:
//   - (0, unix.EPIPE) if the pipe has no readers.
//   - (0, ErrWouldBlock) if the pipe is full, or if count exceeds the free
//     space but is at most atomicIOBytes (such writes are never split).
//   - (done, err) as reported by f if f wrote fewer bytes than offered or
//     returned an error.
//   - (done, ErrWouldBlock) if the write was shortened to the free space and
//     f wrote all of the shortened amount.
//   - (done, nil) otherwise.
//
// NOTE(review): the "pipe is full" check runs before count is examined, so a
// zero-length write to a full pipe also yields ErrWouldBlock; confirm that
// callers filter out count == 0 if that is not intended.
//
// Preconditions:
// - p.mu must be locked.
func (p *Pipe) writeLocked(count int64, f func(safemem.BlockSeq) (uint64, error)) (int64, error) {
// Can't write to a pipe with no readers.
if !p.HasReaders() {
return 0, unix.EPIPE
}
// avail is the number of bytes that may still be stored before the pipe
// reaches its maximum size.
avail := p.max - p.size
if avail == 0 {
return 0, linuxerr.ErrWouldBlock
}
// short records that the request was truncated to the free space, so that
// a fully successful truncated write still reports ErrWouldBlock.
short := false
if count > avail {
// POSIX requires that a write smaller than atomicIOBytes
// (PIPE_BUF) be atomic, but requires no atomicity for writes
// larger than this.
if count <= atomicIOBytes {
return 0, linuxerr.ErrWouldBlock
}
count = avail
short = true
}
// Ensure that the buffer is big enough.
if newLen, oldCap := p.size+count, int64(len(p.buf)); newLen > oldCap {
// Allocate a new buffer. Its length doubles (starting from 8) until it
// can hold newLen bytes, and is then capped at p.max.
newCap := oldCap * 2
if oldCap == 0 {
newCap = 8 // arbitrary; sending individual integers across pipes is relatively common
}
for newLen > newCap {
newCap *= 2
}
if newCap > p.max {
newCap = p.max
}
newBuf := make([]byte, newCap)
// Copy the old buffer's contents to the beginning of the new one.
// The source view starts at p.off in the old block sequence and is
// p.size bytes long.
safemem.CopySeq(
safemem.BlockSeqOf(safemem.BlockFromSafeSlice(newBuf)),
p.bufBlockSeq.DropFirst64(uint64(p.off)).TakeFirst64(uint64(p.size)))
// Switch to the new buffer. Both entries of bufBlocks refer to the
// same new slice, and the data now starts at offset 0.
p.buf = newBuf
p.bufBlocks[0] = safemem.BlockFromSafeSlice(newBuf)
p.bufBlocks[1] = p.bufBlocks[0]
p.bufBlockSeq = safemem.BlockSeqFromSlice(p.bufBlocks[:])
p.off = 0
}
// Prepare the view of the space to be written. woff is the position just
// past the last valid byte, wrapped around the end of the circular buffer.
woff := p.off + p.size
if woff >= int64(len(p.buf)) {
woff -= int64(len(p.buf))
}
bs := p.bufBlockSeq.DropFirst64(uint64(woff)).TakeFirst64(uint64(count))
// Perform the write.
doneU64, err := f(bs)
done := int64(doneU64)
// Account for whatever f wrote, even if it also returned an error.
p.size += done
if done < count || err != nil {
return done, err
}
// If we shortened the write, adjust the returned error appropriately.
if short {
return done, linuxerr.ErrWouldBlock
}
return done, nil
}
// rOpen signals a new reader of the pipe.
//
// It increments both the active and the total reader counts and then notifies
// the pipe's wait queue with waiter.EventInternal.
func (p *Pipe) rOpen() {
p.readers.Add(1)
p.totalReaders.Add(1)
// Notify for blocking openers.
p.queue.Notify(waiter.EventInternal)
}
// wOpen signals a new writer of the pipe.
//
// With p.mu held it records that the pipe has had a writer (hadWriter) and
// increments both the active and the total writer counts. After releasing
// p.mu it notifies the pipe's wait queue with waiter.EventInternal.
func (p *Pipe) wOpen() {
p.mu.Lock()
p.hadWriter = true
p.writers.Add(1)
p.totalWriters.Add(1)
p.mu.Unlock()
// Notify for blocking openers.
p.queue.Notify(waiter.EventInternal)
}
// rClose signals that a reader has closed their end of the pipe by
// decrementing the active reader count.
//
// It panics if the count drops below zero, since that indicates a
// reference-counting bug.
func (p *Pipe) rClose() {
	remaining := p.readers.Add(-1)
	if remaining >= 0 {
		return
	}
	panic(fmt.Sprintf("Refcounting bug, pipe has negative readers: %v", remaining))
}
// wClose signals that a writer has closed their end of the pipe by
// decrementing the active writer count.
//
// It panics if the count drops below zero, since that indicates a
// reference-counting bug.
func (p *Pipe) wClose() {
	remaining := p.writers.Add(-1)
	if remaining >= 0 {
		return
	}
	panic(fmt.Sprintf("Refcounting bug, pipe has negative writers: %v.", remaining))
}
// HasReaders returns whether the pipe has any active readers.
//
// It loads the active reader count without taking p.mu, so the result
// reflects the count only at the moment of the load.
func (p *Pipe) HasReaders() bool {
return p.readers.Load() > 0
}
// HasWriters returns whether the pipe has any active writers.
//
// It loads the active writer count without taking p.mu, so the result
// reflects the count only at the moment of the load.
func (p *Pipe) HasWriters() bool {
return p.writers.Load() > 0
}
// rReadinessLocked computes the readiness mask for the read end of the pipe:
//   - waiter.ReadableEvents when the pipe holds data and has a reader;
//   - waiter.EventHUp when no writer remains but one has existed before.
//
// Precondition: mu must be held.
func (p *Pipe) rReadinessLocked() waiter.EventMask {
	var ready waiter.EventMask

	if p.size != 0 && p.HasReaders() {
		ready |= waiter.ReadableEvents
	}

	// POLLHUP must be suppressed until the pipe has had at least one writer
	// at some point. Otherwise a reader thread may poll and immediately get
	// a POLLHUP before the writer ever opens the pipe, which the reader may
	// interpret as the writer opening then closing the pipe.
	if p.hadWriter && !p.HasWriters() {
		ready |= waiter.EventHUp
	}

	return ready
}
// rReadiness returns the readiness mask of the read end of the pipe. It takes
// p.mu for the duration of the computation.
func (p *Pipe) rReadiness() waiter.EventMask {
	p.mu.Lock()
	ready := p.rReadinessLocked()
	p.mu.Unlock()
	return ready
}
// wReadinessLocked computes the readiness mask for the write end of the pipe:
//   - waiter.WritableEvents when the pipe is below its maximum size and has a
//     writer;
//   - waiter.EventErr when the pipe has no reader.
//
// Precondition: mu must be held.
func (p *Pipe) wReadinessLocked() waiter.EventMask {
	var ready waiter.EventMask

	if p.size < p.max && p.HasWriters() {
		ready |= waiter.WritableEvents
	}
	if !p.HasReaders() {
		ready |= waiter.EventErr
	}

	return ready
}
// wReadiness returns the readiness mask of the write end of the pipe. It takes
// p.mu for the duration of the computation.
func (p *Pipe) wReadiness() waiter.EventMask {
	p.mu.Lock()
	ready := p.wReadinessLocked()
	p.mu.Unlock()
	return ready
}
// rwReadiness returns the readiness mask for a read-write handle to the pipe:
// the union of the read-end and write-end masks, both computed under a single
// acquisition of p.mu.
func (p *Pipe) rwReadiness() waiter.EventMask {
	p.mu.Lock()
	ready := p.rReadinessLocked()
	ready |= p.wReadinessLocked()
	p.mu.Unlock()
	return ready
}
// EventRegister implements waiter.Waitable.EventRegister.
//
// It adds e to the pipe's wait queue and always returns nil. Unlike the
// waitReaders and waitWriters variants, it never notifies e synchronously.
func (p *Pipe) EventRegister(e *waiter.Entry) error {
p.queue.EventRegister(e)
return nil
}
// EventUnregister implements waiter.Waitable.EventUnregister.
//
// It removes e from the pipe's wait queue.
func (p *Pipe) EventUnregister(e *waiter.Entry) {
p.queue.EventUnregister(e)
}
// queued returns the number of bytes currently stored in the pipe. It takes
// p.mu while reading the value.
func (p *Pipe) queued() int64 {
	p.mu.Lock()
	n := p.queuedLocked()
	p.mu.Unlock()
	return n
}
// queuedLocked returns the amount of queued data, i.e. the number of valid
// bytes currently stored in the pipe.
//
// Precondition: p.mu must be locked (p.size is protected by it).
func (p *Pipe) queuedLocked() int64 {
return p.size
}
// SetFifoSize implements fs.FifoSizer.SetFifoSize.
//
// It changes the pipe's maximum size and returns the size actually applied:
//   - a negative size is rejected with EINVAL;
//   - a size above MaximumPipeSize is rejected with EPERM;
//   - a size below MinimumPipeSize is raised to MinimumPipeSize;
//   - a size smaller than the amount of data currently stored in the pipe is
//     rejected with EBUSY.
//
// On failure the pipe is left unchanged and the returned size is 0.
func (p *Pipe) SetFifoSize(size int64) (int64, error) {
	switch {
	case size < 0:
		return 0, linuxerr.EINVAL
	case size > MaximumPipeSize:
		return 0, linuxerr.EPERM
	case size < MinimumPipeSize:
		size = MinimumPipeSize // Per spec.
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	// Never shrink below the data already buffered.
	if p.size > size {
		return 0, linuxerr.EBUSY
	}
	p.max = size
	return size, nil
}