= Fiber

Fiber is a flow-control primitive that enables cooperative scheduling. This is
in contrast to threads, which can be preemptively scheduled at any time. While
fibers and threads have similar memory profiles, the cost of context switching
between fibers can be significantly less than between threads, as it does not
involve a system call.
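
As a small illustration of cooperative scheduling, the sketch below uses only
the core +Fiber.new+, +Fiber#resume+ and +Fiber.yield+ methods. The method name
+cooperative_demo+ is made up for this example. Control moves between the
caller and the fiber only at the explicit +resume+ and +yield+ calls.

```ruby
# Returns the order in which the steps ran.
def cooperative_demo
  log = []

  worker = Fiber.new do
    log << :worker_start
    Fiber.yield            # explicit switch point: hand control back to the caller
    log << :worker_finish
  end

  worker.resume            # runs the fiber until it calls Fiber.yield
  log << :main             # the fiber stays paused until it is resumed again
  worker.resume            # runs the fiber to the end of its block
  log
end

p cooperative_demo
```

The fiber never loses control between its two log entries unless it yields,
which is the property that distinguishes it from a preemptively scheduled
thread.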

== Design

=== Scheduler

The per-thread fiber scheduler interface is used to intercept blocking
operations. A typical implementation would be a wrapper for a gem like
EventMachine or Async. This design provides separation of concerns between the
event loop implementation and application code. It also allows for layered
schedulers which can perform instrumentation.

  class Scheduler
    # Wait for the given file descriptor to become readable.
    def wait_readable(io)
    end

    # Wait for the given file descriptor to become writable.
    def wait_writable(io)
    end

    # Wait for the given file descriptor to match the specified events within
    # the specified timeout.
    # @param events [Integer] a bit mask of +IO::WAIT_READABLE+,
    #   +IO::WAIT_WRITABLE+ and +IO::WAIT_PRIORITY+.
    # @param timeout [#to_f] the amount of time to wait for the events.
    def wait_any(io, events, timeout)
    end

    # Sleep the current task for the specified duration, or forever if not
    # specified.
    # @param duration [#to_f] the amount of time to sleep.
    def wait_sleep(duration = nil)
    end

    # The Ruby virtual machine is going to enter a system level blocking
    # operation.
    def enter_blocking_region
    end

    # The Ruby virtual machine has completed the system level blocking
    # operation.
    def exit_blocking_region
    end

    # Intercept the creation of a non-blocking fiber.
    def fiber(&block)
      Fiber.new(blocking: false, &block)
    end

    # Invoked when the thread exits.
    def run
      # Implement event loop here.
    end
  end
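
To make the shape of an event loop more concrete, here is a minimal sketch of
a scheduler that implements only +fiber+, +wait_sleep+ and +run+ from the
interface above. The class name +ToyScheduler+ and the helper
+toy_scheduler_demo+ are hypothetical, and the example calls the hooks
explicitly instead of relying on the virtual machine to invoke them. It
assumes a Ruby version that accepts the +blocking:+ keyword of +Fiber.new+.

```ruby
require "fiber" # provides Fiber.current on older Rubies; harmless on newer ones

# Toy scheduler (hypothetical): the hooks are called by hand, not by the VM.
class ToyScheduler
  def initialize
    @ready = []     # fibers that have not been started yet
    @sleeping = []  # [wake_time, fiber] pairs
  end

  # Create a non-blocking fiber and queue it instead of starting it at once.
  def fiber(&block)
    new_fiber = Fiber.new(blocking: false, &block)
    @ready << new_fiber
    new_fiber
  end

  # Park the calling fiber until +duration+ seconds have passed.
  # A nil duration means "forever": the fiber is parked and never resumed.
  def wait_sleep(duration = nil)
    @sleeping << [now + duration.to_f, Fiber.current] if duration
    Fiber.yield
  end

  # Event loop: start queued fibers first, then wake sleepers in time order.
  def run
    until @ready.empty? && @sleeping.empty?
      if (task = @ready.shift)
        task.resume
      else
        @sleeping.sort_by!(&:first)
        wake_at, task = @sleeping.shift
        delay = wake_at - now
        sleep(delay) if delay > 0 # the loop itself blocks only when idle
        task.resume
      end
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

def toy_scheduler_demo
  scheduler = ToyScheduler.new
  log = []
  scheduler.fiber { log << :a_start; scheduler.wait_sleep(0.02); log << :a_done }
  scheduler.fiber { log << :b_start; scheduler.wait_sleep(0.01); log << :b_done }
  scheduler.run
  log
end

p toy_scheduler_demo
```

Both fibers start before either finishes, and the fiber with the shorter sleep
completes first, because each +wait_sleep+ call hands control back to the loop
in +run+.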

On CRuby, the following extra methods need to be implemented to handle the
public C interface:

  class Scheduler
    # Wrapper for rb_wait_readable(int) C function.
    def wait_readable_fd(fd)
      wait_readable(::IO.for_fd(fd, autoclose: false))
    end

    # Wrapper for rb_wait_writable(int) C function.
    def wait_writable_fd(fd)
      wait_writable(::IO.for_fd(fd, autoclose: false))
    end

    # Wrapper for rb_wait_for_single_fd(int) C function.
    def wait_for_single_fd(fd, events, duration)
      wait_any(::IO.for_fd(fd, autoclose: false), events, duration)
    end
  end
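
The wrappers above pass <tt>autoclose: false</tt> because the descriptor
belongs to the caller, not to the temporary +IO+ object. The following sketch
shows that behaviour with a pipe. The method name +for_fd_demo+ is made up for
this example.

```ruby
def for_fd_demo
  reader, writer = IO.pipe
  fd = reader.fileno                        # the bare integer a C extension would pass
  wrapper = IO.for_fd(fd, autoclose: false) # wrapper does not own the descriptor

  writer.write("ping")
  writer.close
  data = wrapper.read                       # reads through the same descriptor

  # The wrapper will not close the descriptor, and the original IO stays open.
  [data, wrapper.autoclose?, reader.closed?]
ensure
  writer.close if writer && !writer.closed?
  reader.close if reader && !reader.closed?
end

p for_fd_demo
```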

=== Non-blocking Fibers

By default fibers are blocking. Non-blocking fibers may invoke specific
scheduler hooks when a blocking operation occurs, and these hooks may introduce
context switching points.

  Fiber.new(blocking: false) do
    puts Fiber.current.blocking? # false

    # May invoke `Thread.current.scheduler&.wait_readable`.
    io.read(...)

    # May invoke `Thread.current.scheduler&.wait_writable`.
    io.write(...)

    # Will invoke `Thread.current.scheduler&.wait_sleep`.
    sleep(n)
  end.resume
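
The decision described above can be modelled in plain Ruby. In the sketch
below, +RecordingScheduler+, +dispatch_sleep+ and +dispatch_demo+ are
hypothetical names. The helper only imitates the choice the virtual machine
makes for <tt>sleep(n)</tt> and is not the actual implementation.

```ruby
require "fiber" # provides Fiber.current on older Rubies; harmless on newer ones

# Hypothetical stand-in for a scheduler: records hook calls instead of waiting.
class RecordingScheduler
  attr_reader :calls

  def initialize
    @calls = []
  end

  def wait_sleep(duration = nil)
    @calls << [:wait_sleep, duration]
  end
end

# Hypothetical helper: use the hook only when a scheduler exists and the
# current fiber is non-blocking, otherwise fall back to a blocking sleep.
def dispatch_sleep(scheduler, duration)
  if scheduler && !Fiber.current.blocking?
    scheduler.wait_sleep(duration)
    :scheduler
  else
    Kernel.sleep(duration)
    :kernel
  end
end

def dispatch_demo
  scheduler = RecordingScheduler.new
  from_blocking     = Fiber.new(blocking: true)  { dispatch_sleep(scheduler, 0.001) }.resume
  from_non_blocking = Fiber.new(blocking: false) { dispatch_sleep(scheduler, 0.001) }.resume
  [from_blocking, from_non_blocking, scheduler.calls]
end

p dispatch_demo
```

Only the call made from the non-blocking fiber reaches the scheduler. The
blocking fiber sleeps in the ordinary way.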

We also introduce a new method which simplifies the creation of these
non-blocking fibers:

  Fiber do
    puts Fiber.current.blocking? # false
  end

The purpose of this method is to allow the scheduler to internally decide the
policy for when to start the fiber, and whether to use symmetric or asymmetric
fibers.

=== Mutex

Locking a mutex causes the +Thread#scheduler+ to not be used while the mutex
is held by that thread. On +Mutex#lock+, fiber switching via the scheduler
is disabled and operations become blocking for all fibers of the same +Thread+.
On +Mutex#unlock+, the scheduler is enabled again.

  mutex = Mutex.new

  puts Thread.current.blocking? # 1 (true)

  Fiber.new(blocking: false) do
    puts Thread.current.blocking? # false
    mutex.synchronize do
      puts Thread.current.blocking? # 1 (true)
    end

    puts Thread.current.blocking? # false
  end.resume

=== Non-blocking I/O

By default, I/O is non-blocking. Not all operating systems support non-blocking
I/O. Windows is a notable example where socket I/O can be non-blocking but pipe
I/O is blocking. Provided that there *is* a scheduler and the current thread *is
non-blocking*, the operation will invoke the scheduler.
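
The point at which a scheduler would be consulted can be seen with a
non-blocking read on an empty pipe. This sketch assumes a platform where pipes
support non-blocking I/O (so not Windows, as noted above). The method name
+nonblocking_read_demo+ is made up for this example, and +IO.select+ stands in
for a call such as <tt>scheduler.wait_readable(reader)</tt>.

```ruby
def nonblocking_read_demo
  reader, writer = IO.pipe

  # Nothing has been written yet, so the read cannot make progress.
  # With exception: false the call reports this as a symbol instead of raising.
  first = reader.read_nonblock(16, exception: false)

  writer.write("data")

  # Stand-in for the scheduler hook: wait (up to 1 second) until readable.
  IO.select([reader], nil, nil, 1)
  second = reader.read_nonblock(16, exception: false)

  [first, second]
ensure
  writer.close if writer && !writer.closed?
  reader.close if reader && !reader.closed?
end

p nonblocking_read_demo
```

A scheduler would use the time between the first and second read to run other
fibers instead of blocking the whole thread.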