mirror of
https://github.com/netbirdio/gvisor.git
synced 2026-05-22 17:12:49 -07:00
3b26d2121e
Complements cl/315991648. PiperOrigin-RevId: 319327853
386 lines
12 KiB
Go
386 lines
12 KiB
Go
// Copyright 2020 The gVisor Authors.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package host
|
|
|
|
import (
|
|
"fmt"
|
|
"syscall"
|
|
|
|
"gvisor.dev/gvisor/pkg/abi/linux"
|
|
"gvisor.dev/gvisor/pkg/context"
|
|
"gvisor.dev/gvisor/pkg/fdnotifier"
|
|
"gvisor.dev/gvisor/pkg/log"
|
|
"gvisor.dev/gvisor/pkg/refs"
|
|
"gvisor.dev/gvisor/pkg/sentry/socket/control"
|
|
"gvisor.dev/gvisor/pkg/sentry/socket/unix/transport"
|
|
"gvisor.dev/gvisor/pkg/sentry/uniqueid"
|
|
"gvisor.dev/gvisor/pkg/sync"
|
|
"gvisor.dev/gvisor/pkg/syserr"
|
|
"gvisor.dev/gvisor/pkg/syserror"
|
|
"gvisor.dev/gvisor/pkg/tcpip"
|
|
"gvisor.dev/gvisor/pkg/unet"
|
|
"gvisor.dev/gvisor/pkg/waiter"
|
|
)
|
|
|
|
// Create a new host-backed endpoint from the given fd and its corresponding
|
|
// notification queue.
|
|
func newEndpoint(ctx context.Context, hostFD int, queue *waiter.Queue) (transport.Endpoint, error) {
|
|
// Set up an external transport.Endpoint using the host fd.
|
|
addr := fmt.Sprintf("hostfd:[%d]", hostFD)
|
|
e, err := NewConnectedEndpoint(ctx, hostFD, addr, true /* saveable */)
|
|
if err != nil {
|
|
return nil, err.ToError()
|
|
}
|
|
ep := transport.NewExternal(ctx, e.stype, uniqueid.GlobalProviderFromContext(ctx), queue, e, e)
|
|
return ep, nil
|
|
}
|
|
|
|
// ConnectedEndpoint is an implementation of transport.ConnectedEndpoint and
// transport.Receiver. It is backed by a host fd that was imported at sentry
// startup. This fd is shared with a hostfs inode, which retains ownership of
// it; releasing a ConnectedEndpoint therefore never closes the fd.
//
// ConnectedEndpoint is saveable, since we expect that the host will provide
// the same fd upon restore.
//
// As of this writing, we only allow Unix sockets to be imported.
//
// +stateify savable
type ConnectedEndpoint struct {
	// ref keeps track of references to a ConnectedEndpoint. The
	// constructors leave it at two: one for the transport.ConnectedEndpoint
	// role and one for the transport.Receiver role.
	ref refs.AtomicRefCount

	// mu protects fd below.
	mu sync.RWMutex `state:"nosave"`

	// fd is the host fd backing this endpoint. It is set to -1 once the
	// last reference has been released.
	fd int

	// addr is the address at which this endpoint is bound. It is reported
	// by GetLocalAddress and as the source address of received data.
	addr string

	// sndbuf is the size of the send buffer, read from SO_SNDBUF by init
	// (it is not saved; init runs again on restore). It also serves as the
	// receive limit, since Unix sockets do not use a separate receive
	// buffer.
	//
	// N.B. When this is smaller than the host size, we present it via
	// GetSockOpt and message splitting/rejection in SendMsg, but do not
	// prevent lots of small messages from filling the real send buffer
	// size on the host.
	sndbuf int64 `state:"nosave"`

	// stype is the type of Unix socket, read from SO_TYPE by init.
	stype linux.SockType
}
|
|
|
|
// init performs initialization required for creating new ConnectedEndpoints and
|
|
// for restoring them.
|
|
func (c *ConnectedEndpoint) init() *syserr.Error {
|
|
family, err := syscall.GetsockoptInt(c.fd, syscall.SOL_SOCKET, syscall.SO_DOMAIN)
|
|
if err != nil {
|
|
return syserr.FromError(err)
|
|
}
|
|
|
|
if family != syscall.AF_UNIX {
|
|
// We only allow Unix sockets.
|
|
return syserr.ErrInvalidEndpointState
|
|
}
|
|
|
|
stype, err := syscall.GetsockoptInt(c.fd, syscall.SOL_SOCKET, syscall.SO_TYPE)
|
|
if err != nil {
|
|
return syserr.FromError(err)
|
|
}
|
|
|
|
if err := syscall.SetNonblock(c.fd, true); err != nil {
|
|
return syserr.FromError(err)
|
|
}
|
|
|
|
sndbuf, err := syscall.GetsockoptInt(c.fd, syscall.SOL_SOCKET, syscall.SO_SNDBUF)
|
|
if err != nil {
|
|
return syserr.FromError(err)
|
|
}
|
|
|
|
c.stype = linux.SockType(stype)
|
|
c.sndbuf = int64(sndbuf)
|
|
|
|
return nil
|
|
}
|
|
|
|
// NewConnectedEndpoint creates a new ConnectedEndpoint backed by a host fd
|
|
// imported at sentry startup,
|
|
//
|
|
// The caller is responsible for calling Init(). Additionaly, Release needs to
|
|
// be called twice because ConnectedEndpoint is both a transport.Receiver and
|
|
// transport.ConnectedEndpoint.
|
|
func NewConnectedEndpoint(ctx context.Context, hostFD int, addr string, saveable bool) (*ConnectedEndpoint, *syserr.Error) {
|
|
e := ConnectedEndpoint{
|
|
fd: hostFD,
|
|
addr: addr,
|
|
}
|
|
|
|
if err := e.init(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// AtomicRefCounters start off with a single reference. We need two.
|
|
e.ref.IncRef()
|
|
e.ref.EnableLeakCheck("host.ConnectedEndpoint")
|
|
return &e, nil
|
|
}
|
|
|
|
// Send implements transport.ConnectedEndpoint.Send.
|
|
func (c *ConnectedEndpoint) Send(data [][]byte, controlMessages transport.ControlMessages, from tcpip.FullAddress) (int64, bool, *syserr.Error) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
if !controlMessages.Empty() {
|
|
return 0, false, syserr.ErrInvalidEndpointState
|
|
}
|
|
|
|
// Since stream sockets don't preserve message boundaries, we can write
|
|
// only as much of the message as fits in the send buffer.
|
|
truncate := c.stype == linux.SOCK_STREAM
|
|
|
|
n, totalLen, err := fdWriteVec(c.fd, data, c.sndbuf, truncate)
|
|
if n < totalLen && err == nil {
|
|
// The host only returns a short write if it would otherwise
|
|
// block (and only for stream sockets).
|
|
err = syserror.EAGAIN
|
|
}
|
|
if n > 0 && err != syserror.EAGAIN {
|
|
// The caller may need to block to send more data, but
|
|
// otherwise there isn't anything that can be done about an
|
|
// error with a partial write.
|
|
err = nil
|
|
}
|
|
|
|
// There is no need for the callee to call SendNotify because fdWriteVec
|
|
// uses the host's sendmsg(2) and the host kernel's queue.
|
|
return n, false, syserr.FromError(err)
|
|
}
|
|
|
|
// SendNotify implements transport.ConnectedEndpoint.SendNotify.
|
|
func (c *ConnectedEndpoint) SendNotify() {}
|
|
|
|
// CloseSend implements transport.ConnectedEndpoint.CloseSend.
|
|
func (c *ConnectedEndpoint) CloseSend() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if err := syscall.Shutdown(c.fd, syscall.SHUT_WR); err != nil {
|
|
// A well-formed UDS shutdown can't fail. See
|
|
// net/unix/af_unix.c:unix_shutdown.
|
|
panic(fmt.Sprintf("failed write shutdown on host socket %+v: %v", c, err))
|
|
}
|
|
}
|
|
|
|
// CloseNotify implements transport.ConnectedEndpoint.CloseNotify.
|
|
func (c *ConnectedEndpoint) CloseNotify() {}
|
|
|
|
// Writable implements transport.ConnectedEndpoint.Writable.
|
|
func (c *ConnectedEndpoint) Writable() bool {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
return fdnotifier.NonBlockingPoll(int32(c.fd), waiter.EventOut)&waiter.EventOut != 0
|
|
}
|
|
|
|
// Passcred implements transport.ConnectedEndpoint.Passcred.
|
|
func (c *ConnectedEndpoint) Passcred() bool {
|
|
// We don't support credential passing for host sockets.
|
|
return false
|
|
}
|
|
|
|
// GetLocalAddress implements transport.ConnectedEndpoint.GetLocalAddress.
|
|
func (c *ConnectedEndpoint) GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) {
|
|
return tcpip.FullAddress{Addr: tcpip.Address(c.addr)}, nil
|
|
}
|
|
|
|
// EventUpdate implements transport.ConnectedEndpoint.EventUpdate.
|
|
func (c *ConnectedEndpoint) EventUpdate() {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
if c.fd != -1 {
|
|
fdnotifier.UpdateFD(int32(c.fd))
|
|
}
|
|
}
|
|
|
|
// Recv implements transport.Receiver.Recv.
|
|
func (c *ConnectedEndpoint) Recv(data [][]byte, creds bool, numRights int, peek bool) (int64, int64, transport.ControlMessages, bool, tcpip.FullAddress, bool, *syserr.Error) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
var cm unet.ControlMessage
|
|
if numRights > 0 {
|
|
cm.EnableFDs(int(numRights))
|
|
}
|
|
|
|
// N.B. Unix sockets don't have a receive buffer, the send buffer
|
|
// serves both purposes.
|
|
rl, ml, cl, cTrunc, err := fdReadVec(c.fd, data, []byte(cm), peek, c.sndbuf)
|
|
if rl > 0 && err != nil {
|
|
// We got some data, so all we need to do on error is return
|
|
// the data that we got. Short reads are fine, no need to
|
|
// block.
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
return 0, 0, transport.ControlMessages{}, false, tcpip.FullAddress{}, false, syserr.FromError(err)
|
|
}
|
|
|
|
// There is no need for the callee to call RecvNotify because fdReadVec uses
|
|
// the host's recvmsg(2) and the host kernel's queue.
|
|
|
|
// Trim the control data if we received less than the full amount.
|
|
if cl < uint64(len(cm)) {
|
|
cm = cm[:cl]
|
|
}
|
|
|
|
// Avoid extra allocations in the case where there isn't any control data.
|
|
if len(cm) == 0 {
|
|
return rl, ml, transport.ControlMessages{}, cTrunc, tcpip.FullAddress{Addr: tcpip.Address(c.addr)}, false, nil
|
|
}
|
|
|
|
fds, err := cm.ExtractFDs()
|
|
if err != nil {
|
|
return 0, 0, transport.ControlMessages{}, false, tcpip.FullAddress{}, false, syserr.FromError(err)
|
|
}
|
|
|
|
if len(fds) == 0 {
|
|
return rl, ml, transport.ControlMessages{}, cTrunc, tcpip.FullAddress{Addr: tcpip.Address(c.addr)}, false, nil
|
|
}
|
|
return rl, ml, control.NewVFS2(nil, nil, newSCMRights(fds)), cTrunc, tcpip.FullAddress{Addr: tcpip.Address(c.addr)}, false, nil
|
|
}
|
|
|
|
// RecvNotify implements transport.Receiver.RecvNotify.
|
|
func (c *ConnectedEndpoint) RecvNotify() {}
|
|
|
|
// CloseRecv implements transport.Receiver.CloseRecv.
|
|
func (c *ConnectedEndpoint) CloseRecv() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if err := syscall.Shutdown(c.fd, syscall.SHUT_RD); err != nil {
|
|
// A well-formed UDS shutdown can't fail. See
|
|
// net/unix/af_unix.c:unix_shutdown.
|
|
panic(fmt.Sprintf("failed read shutdown on host socket %+v: %v", c, err))
|
|
}
|
|
}
|
|
|
|
// Readable implements transport.Receiver.Readable.
|
|
func (c *ConnectedEndpoint) Readable() bool {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
return fdnotifier.NonBlockingPoll(int32(c.fd), waiter.EventIn)&waiter.EventIn != 0
|
|
}
|
|
|
|
// SendQueuedSize implements transport.Receiver.SendQueuedSize.
|
|
func (c *ConnectedEndpoint) SendQueuedSize() int64 {
|
|
// TODO(gvisor.dev/issue/273): SendQueuedSize isn't supported for host
|
|
// sockets because we don't allow the sentry to call ioctl(2).
|
|
return -1
|
|
}
|
|
|
|
// RecvQueuedSize implements transport.Receiver.RecvQueuedSize.
|
|
func (c *ConnectedEndpoint) RecvQueuedSize() int64 {
|
|
// TODO(gvisor.dev/issue/273): RecvQueuedSize isn't supported for host
|
|
// sockets because we don't allow the sentry to call ioctl(2).
|
|
return -1
|
|
}
|
|
|
|
// SendMaxQueueSize implements transport.Receiver.SendMaxQueueSize.
|
|
func (c *ConnectedEndpoint) SendMaxQueueSize() int64 {
|
|
return int64(c.sndbuf)
|
|
}
|
|
|
|
// RecvMaxQueueSize implements transport.Receiver.RecvMaxQueueSize.
|
|
func (c *ConnectedEndpoint) RecvMaxQueueSize() int64 {
|
|
// N.B. Unix sockets don't use the receive buffer. We'll claim it is
|
|
// the same size as the send buffer.
|
|
return int64(c.sndbuf)
|
|
}
|
|
|
|
func (c *ConnectedEndpoint) destroyLocked() {
|
|
c.fd = -1
|
|
}
|
|
|
|
// Release implements transport.ConnectedEndpoint.Release and
|
|
// transport.Receiver.Release.
|
|
func (c *ConnectedEndpoint) Release() {
|
|
c.ref.DecRefWithDestructor(func() {
|
|
c.mu.Lock()
|
|
c.destroyLocked()
|
|
c.mu.Unlock()
|
|
})
|
|
}
|
|
|
|
// CloseUnread implements transport.ConnectedEndpoint.CloseUnread.
|
|
func (c *ConnectedEndpoint) CloseUnread() {}
|
|
|
|
// SCMConnectedEndpoint represents an endpoint backed by a host fd that was
// passed through a gofer Unix socket. It resembles ConnectedEndpoint, with the
// following differences:
// - SCMConnectedEndpoint is not saveable, because the host cannot guarantee
// the same descriptor number across S/R.
// - SCMConnectedEndpoint holds ownership of its fd and notification queue;
// its final Release closes the fd.
type SCMConnectedEndpoint struct {
	ConnectedEndpoint

	// queue is the notification queue that Init registers the host fd with
	// (via fdnotifier.AddFD).
	queue *waiter.Queue
}
|
|
|
|
// Init will do the initialization required without holding other locks.
|
|
func (e *SCMConnectedEndpoint) Init() error {
|
|
return fdnotifier.AddFD(int32(e.fd), e.queue)
|
|
}
|
|
|
|
// Release implements transport.ConnectedEndpoint.Release and
|
|
// transport.Receiver.Release.
|
|
func (e *SCMConnectedEndpoint) Release() {
|
|
e.ref.DecRefWithDestructor(func() {
|
|
e.mu.Lock()
|
|
if err := syscall.Close(e.fd); err != nil {
|
|
log.Warningf("Failed to close host fd %d: %v", err)
|
|
}
|
|
fdnotifier.RemoveFD(int32(e.fd))
|
|
e.destroyLocked()
|
|
e.mu.Unlock()
|
|
})
|
|
}
|
|
|
|
// NewSCMEndpoint creates a new SCMConnectedEndpoint backed by a host fd that
|
|
// was passed through a Unix socket.
|
|
//
|
|
// The caller is responsible for calling Init(). Additionaly, Release needs to
|
|
// be called twice because ConnectedEndpoint is both a transport.Receiver and
|
|
// transport.ConnectedEndpoint.
|
|
func NewSCMEndpoint(ctx context.Context, hostFD int, queue *waiter.Queue, addr string) (*SCMConnectedEndpoint, *syserr.Error) {
|
|
e := SCMConnectedEndpoint{
|
|
ConnectedEndpoint: ConnectedEndpoint{
|
|
fd: hostFD,
|
|
addr: addr,
|
|
},
|
|
queue: queue,
|
|
}
|
|
|
|
if err := e.init(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// AtomicRefCounters start off with a single reference. We need two.
|
|
e.ref.IncRef()
|
|
e.ref.EnableLeakCheck("host.SCMConnectedEndpoint")
|
|
return &e, nil
|
|
}
|