Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Sources/CShim/include/linux_shim.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#if defined(__linux__)

#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/vfs.h>

#endif /* __linux__ */
Expand Down
212 changes: 102 additions & 110 deletions Sources/ContainerizationOS/Linux/Epoll.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
//===----------------------------------------------------------------------===//

#if os(Linux)
import Foundation

#if canImport(Musl)
import Musl
private let _write = Musl.write
#elseif canImport(Glibc)
import Glibc
private let _write = Glibc.write
#endif

import CShim
import Foundation
import Synchronization

// On glibc, epoll constants are EPOLL_EVENTS enum values. On musl they're
// plain UInt32. These helpers normalize them to UInt32/Int32.
Expand All @@ -35,166 +36,157 @@ private func epollMask(_ value: EPOLL_EVENTS) -> UInt32 { value.rawValue }
private func epollFlag(_ value: EPOLL_EVENTS) -> Int32 { Int32(bitPattern: value.rawValue) }
#endif

/// Register file descriptors to receive events via Linux's
/// epoll syscall surface.
/// A thin wrapper around the Linux epoll syscall surface.
public final class Epoll: Sendable {
public typealias Mask = Int32
public typealias Handler = (@Sendable (Mask) -> Void)
/// A set of epoll event flags.
public struct Mask: OptionSet, Sendable {
public let rawValue: UInt32

public init(rawValue: UInt32) {
self.rawValue = rawValue
}

public static let input = Mask(rawValue: epollMask(EPOLLIN))
public static let output = Mask(rawValue: epollMask(EPOLLOUT))

public static let maskIn: Int32 = Int32(bitPattern: epollMask(EPOLLIN))
public static let maskOut: Int32 = Int32(bitPattern: epollMask(EPOLLOUT))
public static let defaultMask: Int32 = maskIn | maskOut
public var isHangup: Bool {
!self.isDisjoint(with: Mask(rawValue: epollMask(EPOLLHUP) | epollMask(EPOLLERR)))
}

public var isRemoteHangup: Bool {
!self.isDisjoint(with: Mask(rawValue: epollMask(EPOLLRDHUP)))
}

public var readyToRead: Bool {
self.contains(.input)
}

public var readyToWrite: Bool {
self.contains(.output)
}
}

/// An event returned by `wait()`.
public struct Event: Sendable {
public let fd: Int32
public let mask: Mask
}

private let epollFD: Int32
private let handlers = SafeMap<Int32, Handler>()
private let pipe = Pipe() // to wake up a waiting epoll_wait
private let eventFD: Int32

public init() throws {
let efd = epoll_create1(Int32(EPOLL_CLOEXEC))
guard efd > 0 else {
guard efd >= 0 else {
throw POSIXError.fromErrno()
}

let evfd = eventfd(0, Int32(EFD_CLOEXEC | EFD_NONBLOCK))
guard evfd >= 0 else {
let evfdErrno = POSIXError.fromErrno()
close(efd)
throw evfdErrno
}

self.epollFD = efd
try self.add(pipe.fileHandleForReading.fileDescriptor) { _ in }
self.eventFD = evfd

// Register the eventfd with epoll for shutdown signaling.
var event = epoll_event()
event.events = epollMask(EPOLLIN)
event.data.fd = self.eventFD
let ctlResult = withUnsafeMutablePointer(to: &event) { ptr in
epoll_ctl(efd, EPOLL_CTL_ADD, self.eventFD, ptr)
}
guard ctlResult == 0 else {
let ctlErrno = POSIXError.fromErrno()
close(evfd)
close(efd)
throw ctlErrno
}
}

deinit {
close(epollFD)
close(eventFD)
}

public func add(
_ fd: Int32,
mask: Int32 = Epoll.defaultMask,
handler: @escaping Handler
) throws {
/// Register a file descriptor for edge-triggered monitoring.
public func add(_ fd: Int32, mask: Mask) throws {
guard fcntl(fd, F_SETFL, O_NONBLOCK) == 0 else {
throw POSIXError.fromErrno()
}

let events = epollMask(EPOLLET) | UInt32(bitPattern: mask)
let events = epollMask(EPOLLET) | mask.rawValue

var event = epoll_event()
event.events = events
event.data.fd = fd

try withUnsafeMutablePointer(to: &event) { ptr in
while true {
if epoll_ctl(self.epollFD, EPOLL_CTL_ADD, fd, ptr) == -1 {
if errno == EAGAIN || errno == EINTR {
continue
}
throw POSIXError.fromErrno()
}
break
if epoll_ctl(self.epollFD, EPOLL_CTL_ADD, fd, ptr) == -1 {
throw POSIXError.fromErrno()
}
}
}

self.handlers.set(fd, handler)
/// Remove a file descriptor from the monitored collection.
public func delete(_ fd: Int32) throws {
var event = epoll_event()
let result = withUnsafeMutablePointer(to: &event) { ptr in
epoll_ctl(self.epollFD, EPOLL_CTL_DEL, fd, ptr) as Int32
}
if result != 0 {
if !acceptableDeletionErrno() {
throw POSIXError.fromErrno()
}
}
}

/// Run the main epoll loop.
/// Wait for events.
///
/// max events to return in a single wait
/// timeout in ms.
/// -1 means block forever.
/// 0 means return immediately if no events.
public func run(maxEvents: Int = 128, timeout: Int32 = -1) throws {
var events: [epoll_event] = .init(
repeating: epoll_event(),
count: maxEvents
)
/// Returns ready events, an empty array on timeout, or `nil` on shutdown.
public func wait(maxEvents: Int = 128, timeout: Int32 = -1) -> [Event]? {
var events: [epoll_event] = .init(repeating: epoll_event(), count: maxEvents)

while true {
let n = epoll_wait(self.epollFD, &events, Int32(events.count), timeout)
guard n >= 0 else {
if n < 0 {
if errno == EINTR || errno == EAGAIN {
continue // go back to epoll_wait
continue
}
throw POSIXError.fromErrno()
preconditionFailure("epoll_wait failed unexpectedly: \(POSIXError.fromErrno())")
}

if n == 0 {
return // if epoll wait times out, then n will be 0
return []
}

var result: [Event] = []
result.reserveCapacity(Int(n))
for i in 0..<Int(n) {
let fd = events[i].data.fd
let mask = events[i].events

if fd == self.pipe.fileHandleForReading.fileDescriptor {
close(self.epollFD)
return // this is a shutdown message
}

guard let handler = handlers.get(fd) else {
continue
if fd == self.eventFD {
return nil
}
handler(Int32(bitPattern: mask))
result.append(Event(fd: fd, mask: Mask(rawValue: events[i].events)))
}
return result
}
}

/// Remove the provided fd from the monitored collection.
public func delete(_ fd: Int32) throws {
var event = epoll_event()
let result = withUnsafeMutablePointer(to: &event) { ptr in
epoll_ctl(self.epollFD, EPOLL_CTL_DEL, fd, ptr) as Int32
}
if result != 0 {
if !acceptableDeletionErrno() {
throw POSIXError.fromErrno()
}
}
self.handlers.del(fd)
/// Signal the epoll loop to stop waiting.
public func shutdown() {
var val: UInt64 = 1
let n = _write(eventFD, &val, MemoryLayout<UInt64>.size)
precondition(n == MemoryLayout<UInt64>.size, "eventfd write failed: \(POSIXError.fromErrno())")
}

// The errno's here are acceptable and can happen if the caller
// closed the underlying fd before calling delete().
private func acceptableDeletionErrno() -> Bool {
errno == ENOENT || errno == EBADF || errno == EPERM
}

/// Shutdown the epoll handler.
public func shutdown() throws {
// wakes up epoll_wait and triggers a shutdown
try self.pipe.fileHandleForWriting.close()
}

private final class SafeMap<Key: Hashable & Sendable, Value: Sendable>: Sendable {
let dict = Mutex<[Key: Value]>([:])

func set(_ key: Key, _ value: Value) {
dict.withLock { @Sendable in
$0[key] = value
}
}

func get(_ key: Key) -> Value? {
dict.withLock { @Sendable in
$0[key]
}
}

func del(_ key: Key) {
dict.withLock { @Sendable in
_ = $0.removeValue(forKey: key)
}
}
}
}

extension Epoll.Mask {
public var isHangup: Bool {
(self & Int32(bitPattern: epollMask(EPOLLHUP) | epollMask(EPOLLERR))) != 0
}

public var isRhangup: Bool {
(self & Int32(bitPattern: epollMask(EPOLLRDHUP))) != 0
}

public var readyToRead: Bool {
(self & Int32(bitPattern: epollMask(EPOLLIN))) != 0
}

public var readyToWrite: Bool {
(self & Int32(bitPattern: epollMask(EPOLLOUT))) != 0
}
}

#endif // os(Linux)
4 changes: 2 additions & 2 deletions vminitd/Sources/vminitd/IOPair.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ final class IOPair: Sendable {
// Remove the fd from our global epoll instance first.
let readFromFd = self.from.fileDescriptor
do {
try ProcessSupervisor.default.poller.delete(readFromFd)
try ProcessSupervisor.default.unregisterFd(readFromFd)
} catch {
logger?.error("failed to delete fd from epoll \(readFromFd): \(error)")
}
Expand Down Expand Up @@ -118,7 +118,7 @@ final class IOPair: Sendable {
let readFrom = OSFile(fd: readFromFd)
let writeTo = OSFile(fd: writeToFd)

try ProcessSupervisor.default.poller.add(readFromFd, mask: Epoll.maskIn) { mask in
try ProcessSupervisor.default.registerFd(readFromFd, mask: .input) { mask in
self.io.withLock { io in
if io.closed {
return
Expand Down
42 changes: 39 additions & 3 deletions vminitd/Sources/vminitd/ProcessSupervisor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import Logging
import Synchronization

final class ProcessSupervisor: Sendable {
let poller: Epoll
private let poller: Epoll
private let handlers = Mutex<[Int32: @Sendable (Epoll.Mask) -> Void]>([:])

private let queue: DispatchQueue
// `DispatchSourceSignal` is thread-safe.
Expand All @@ -47,11 +48,46 @@ final class ProcessSupervisor: Sendable {
self.poller = try! Epoll()
self.state = Mutex(State())
let t = Thread {
try! self.poller.run()
while true {
guard let events = self.poller.wait() else {
return
}
if events.isEmpty {
return
}
for event in events {
let handler = self.handlers.withLock { $0[event.fd] }
handler?(event.mask)
}
}
}
t.start()
}

/// Register a file descriptor for epoll monitoring with a handler.
///
/// The handler is stored before the fd is added to epoll, ensuring no
/// events are missed.
func registerFd(
_ fd: Int32,
mask: Epoll.Mask = [.input, .output],
handler: @escaping @Sendable (Epoll.Mask) -> Void
) throws {
self.handlers.withLock { $0[fd] = handler }
do {
try self.poller.add(fd, mask: mask)
} catch {
self.handlers.withLock { _ = $0.removeValue(forKey: fd) }
throw error
}
}

/// Remove a file descriptor from epoll monitoring and discard its handler.
func unregisterFd(_ fd: Int32) throws {
self.handlers.withLock { _ = $0.removeValue(forKey: fd) }
try self.poller.delete(fd)
}

func ready() {
self.source.setEventHandler {
self.handleSignal()
Expand Down Expand Up @@ -123,6 +159,6 @@ final class ProcessSupervisor: Sendable {

deinit {
source.cancel()
try? poller.shutdown()
poller.shutdown()
}
}
Loading
Loading