Source file
src/runtime/netpoll.go
package runtime

import (
	"runtime/internal/atomic"
	"unsafe"
)
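// Integrated network poller (platform-independent part).
// A platform-specific implementation (epoll, kqueue, ...) provides
// netpollinit, netpollopen, netpollclose, netpolldescriptor and netpollarm,
// and calls netpollready when a descriptor becomes ready.
//
// pollDesc contains two binary semaphores, rg and wg, used to park the
// reader and writer goroutines respectively. A semaphore can be in one of
// the following states:
//
//	pdReady   - an I/O readiness notification is pending; a goroutine
//	            consumes the notification by resetting the state to 0.
//	pdWait    - a goroutine is preparing to park on the semaphore but has
//	            not parked yet; it commits by CASing in its g pointer, or
//	            a concurrent readiness notification moves the state to
//	            pdReady, or a concurrent timeout/close moves it to 0.
//	G pointer - the goroutine is parked on the semaphore; a readiness
//	            notification sets the state to pdReady, a timeout/close
//	            sets it to 0, and in both cases the goroutine is unparked.
//	0         - none of the above.
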
const (
	pdReady uintptr = 1
	pdWait  uintptr = 2
)

const pollBlockSize = 4 * 1024

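// Network poller descriptor.
//
// pollDesc contains no heap pointers: rg and wg store g pointers as raw
// uintptrs, and descriptors live in persistently allocated, non-GC memory
// (see pollCache.alloc).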
type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and
	// deadlineimpl operations; pollReset, pollWait, pollWaitCanceled and
	// netpollready proceed without it, so closing, rg, rd, wg and wd are
	// manipulated in a lock-free way by all operations.
	lock    mutex // protects the following fields
	fd      uintptr
	closing bool
	seq     uintptr // protects from stale timers and ready notifications
	rg      uintptr // pdReady, pdWait, G waiting for read or 0
	rt      timer   // read deadline timer (set if rt.f != nil)
	rd      int64   // read deadline
	wg      uintptr // pdReady, pdWait, G waiting for write or 0
	wt      timer   // write deadline timer
	wd      int64   // write deadline
	user    uint32  // user settable cookie
}

type pollCache struct {
	lock  mutex
	first *pollDesc
	// pollDesc objects must be type-stable, because we can get a ready
	// notification from epoll/kqueue after the descriptor is closed or
	// reused. Stale notifications are detected using the seq field, which
	// is incremented when deadlines are changed or a descriptor is reused.
}

var (
	netpollInited  uint32
	pollcache      pollCache
	netpollWaiters uint32
)

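// The poll_runtime_* functions below are the entry points reached from the
// standard library's poller code via go:linkname.

// poll_runtime_pollServerInit initializes the platform-specific poller and
// marks it as initialized.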
func poll_runtime_pollServerInit() {
	netpollinit()
	atomic.Store(&netpollInited, 1)
}

func netpollinited() bool {
	return atomic.Load(&netpollInited) != 0
}

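// poll_runtime_pollServerDescriptor returns the descriptor being used by
// the poller, as reported by the platform-specific netpolldescriptor.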
func poll_runtime_pollServerDescriptor() uintptr {
	return netpolldescriptor()
}

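// poll_runtime_pollOpen takes a pollDesc from the cache, resets it for
// reuse and registers fd with the platform poller. It returns the
// descriptor and the errno from netpollopen (0 on success).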
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
	pd.fd = fd
	pd.closing = false
	pd.seq++
	pd.rg = 0
	pd.rd = 0
	pd.wg = 0
	pd.wd = 0
	unlock(&pd.lock)

	var errno int32
	errno = netpollopen(fd, pd)
	return pd, int(errno)
}

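// poll_runtime_pollClose unregisters pd.fd from the platform poller and
// returns pd to the cache. The descriptor must already be unblocked
// (pd.closing set) and must have no goroutines parked on it.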
func poll_runtime_pollClose(pd *pollDesc) {
	if !pd.closing {
		throw("runtime: close polldesc w/o unblock")
	}
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on closing polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on closing polldesc")
	}
	netpollclose(pd.fd)
	pollcache.free(pd)
}

func (c *pollCache) free(pd *pollDesc) {
	lock(&c.lock)
	pd.link = c.first
	c.first = pd
	unlock(&c.lock)
}

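// poll_runtime_pollReset clears a previous readiness notification for the
// given mode ('r' or 'w') so that the descriptor can be waited on again.
// It returns the same error codes as netpollcheckerr.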
func poll_runtime_pollReset(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	if mode == 'r' {
		pd.rg = 0
	} else if mode == 'w' {
		pd.wg = 0
	}
	return 0
}

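// poll_runtime_pollWait parks the calling goroutine until pd is ready for
// the given mode. It returns 0 on readiness, or a netpollcheckerr code if
// the descriptor is closed or its deadline expires while waiting.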
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	// As for now only Solaris uses level-triggered IO.
	if GOOS == "solaris" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		err = netpollcheckerr(pd, int32(mode))
		if err != 0 {
			return err
		}
		// Can happen if a timeout fired and unblocked us,
		// but before we had a chance to run the timeout was reset.
		// Pretend it has not happened and retry.
	}
	return 0
}

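// poll_runtime_pollWaitCanceled waits until I/O for the given mode has
// completed, ignoring closing and deadlines. It is used on Windows after a
// failed attempt to cancel a pending asynchronous operation.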
func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
	// Wait for ioready, ignore closing or timeouts.
	for !netpollblock(pd, int32(mode), true) {
	}
}

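// poll_runtime_pollSetDeadline sets the read and/or write deadline for pd
// (an absolute nanotime value; 0 means no deadline), rearming the deadline
// timers and waking any parked goroutines whose deadline is already past.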
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	lock(&pd.lock)
	if pd.closing {
		unlock(&pd.lock)
		return
	}
	pd.seq++ // invalidate current timers

	// Reset current timers.
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}

	// Setup new timers.
	if d != 0 && d <= nanotime() {
		d = -1
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
	if pd.rd > 0 && pd.rd == pd.wd {
		pd.rt.f = netpollDeadline
		pd.rt.when = pd.rd
		// Copy current seq into the timer arg.
		// The timer func will check the seq against the current descriptor
		// seq; if they differ the descriptor was reused or timers were reset.
		pd.rt.arg = pd
		pd.rt.seq = pd.seq
		addtimer(&pd.rt)
	} else {
		if pd.rd > 0 {
			pd.rt.f = netpollReadDeadline
			pd.rt.when = pd.rd
			pd.rt.arg = pd
			pd.rt.seq = pd.seq
			addtimer(&pd.rt)
		}
		if pd.wd > 0 {
			pd.wt.f = netpollWriteDeadline
			pd.wt.when = pd.wd
			pd.wt.arg = pd
			pd.wt.seq = pd.seq
			addtimer(&pd.wt)
		}
	}

	// If we set the new deadline in the past, unblock currently pending IO if any.
	var rg, wg *g
	atomicstorep(unsafe.Pointer(&wg), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
	if pd.rd < 0 {
		rg = netpollunblock(pd, 'r', false)
	}
	if pd.wd < 0 {
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

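// poll_runtime_pollUnblock marks pd as closing, stops its deadline timers
// and wakes any goroutines parked on it. Subsequent waits on pd fail, and
// the descriptor can then be passed to poll_runtime_pollClose.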
func poll_runtime_pollUnblock(pd *pollDesc) {
	lock(&pd.lock)
	if pd.closing {
		throw("runtime: unblock on closing polldesc")
	}
	pd.closing = true
	pd.seq++
	var rg, wg *g
	atomicstorep(unsafe.Pointer(&rg), nil) // full memory barrier between store to closing and load of rg/wg in netpollunblock
	rg = netpollunblock(pd, 'r', false)
	wg = netpollunblock(pd, 'w', false)
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

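// netpollready is called by the platform-specific poller when pd becomes
// ready for the given mode ('r', 'w' or 'r'+'w'). Goroutines that were
// parked on pd are pushed onto the gpp list so the caller can make them
// runnable in a batch.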
func netpollready(gpp *guintptr, pd *pollDesc, mode int32) {
	var rg, wg guintptr
	if mode == 'r' || mode == 'r'+'w' {
		rg.set(netpollunblock(pd, 'r', true))
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg.set(netpollunblock(pd, 'w', true))
	}
	if rg != 0 {
		rg.ptr().schedlink = *gpp
		*gpp = rg
	}
	if wg != 0 {
		wg.ptr().schedlink = *gpp
		*gpp = wg
	}
}

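// netpollcheckerr reports whether I/O on pd may proceed in the given mode:
// 0 means it may, 1 means the descriptor is closing, and 2 means the
// deadline for that mode has expired.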
func netpollcheckerr(pd *pollDesc, mode int32) int {
	if pd.closing {
		return 1
	}
	if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
		return 2
	}
	return 0
}

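// netpollblockcommit is the gopark commit function used by netpollblock: it
// publishes the parked goroutine in the rg/wg semaphore, provided the
// semaphore is still in the pdWait state.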
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
	if r {
		// Bump the count of goroutines waiting for the poller.
		// The scheduler uses this to decide whether to block
		// waiting for the poller if there is nothing else to do.
		atomic.Xadd(&netpollWaiters, 1)
	}
	return r
}

func netpollgoready(gp *g, traceskip int) {
	atomic.Xadd(&netpollWaiters, -1)
	goready(gp, traceskip+1)
}

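// netpollblock parks the current goroutine on pd's semaphore for the given
// mode and reports whether I/O became ready (true) rather than the wait
// being cut short by a timeout or close (false). If waitio is set, it waits
// for readiness even if the descriptor is closing or past its deadline.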
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// Set the gpp semaphore to pdWait.
	for {
		old := *gpp
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("runtime: double wait")
		}
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}

	// Need to recheck error states after setting gpp to pdWait.
	// This is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, membarrier, then load of rg/wg.
	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
	}
	// Be careful to not lose a concurrent pdReady notification.
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}

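// netpollunblock takes pd's semaphore for the given mode out of the waiting
// state and returns the goroutine that was parked on it, if any. If ioready
// is set, the semaphore is left in pdReady so a subsequent netpollblock
// returns immediately.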
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set pdReady for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if atomic.Casuintptr(gpp, old, new) {
			if old == pdReady || old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

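// netpolldeadlineimpl is the common body of the deadline timer callbacks.
// If the timer is not stale (seq still matches pd.seq), it marks the read
// and/or write deadline as expired and wakes the corresponding goroutines.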
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
	lock(&pd.lock)
	// The seq arg is the descriptor's seq when the timer was set.
	// If it's stale, ignore the timer event.
	if seq != pd.seq {
		// The descriptor was reused or timers were reset.
		unlock(&pd.lock)
		return
	}
	var rg *g
	if read {
		if pd.rd <= 0 || pd.rt.f == nil {
			throw("runtime: inconsistent read deadline")
		}
		pd.rd = -1
		atomicstorep(unsafe.Pointer(&pd.rt.f), nil) // full memory barrier between store to rd and load of rg in netpollunblock
		rg = netpollunblock(pd, 'r', false)
	}
	var wg *g
	if write {
		if pd.wd <= 0 || pd.wt.f == nil && !read {
			throw("runtime: inconsistent write deadline")
		}
		pd.wd = -1
		atomicstorep(unsafe.Pointer(&pd.wt.f), nil) // full memory barrier between store to wd and load of wg in netpollunblock
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 0)
	}
	if wg != nil {
		netpollgoready(wg, 0)
	}
}

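// netpollDeadline, netpollReadDeadline and netpollWriteDeadline are
// installed by poll_runtime_pollSetDeadline as timer callbacks; they funnel
// into netpolldeadlineimpl with the appropriate read/write flags.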
func netpollDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

func netpollReadDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

func netpollWriteDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}

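// alloc returns a pollDesc from the cache, carving a new block of
// pollBlockSize bytes out of persistently allocated, non-GC memory when the
// free list is empty.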
func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
		// Must be in non-GC memory because it can be referenced
		// only from epoll/kqueue internals.
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	unlock(&c.lock)
	return pd
}