Source file
src/runtime/chan.go
Documentation: runtime
1
2
3
4
5 package runtime
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import (
21 "runtime/internal/atomic"
22 "unsafe"
23 )
24
// Channel-wide constants used by makechan and the debug print statements.
25 const (
26 maxAlign = 8 // largest element alignment makechan accepts; hchanSize is kept a multiple of it
27 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) // sizeof(hchan) rounded up to a multiple of maxAlign
28 debugChan = false // when true, makechan, chansend and chanrecv print a trace line
29 )
30
// hchan is the runtime representation of a channel. makechan allocates
// it; chansend, chanrecv and closechan operate on it.
// NOTE(review): field order may be relied upon outside this file — confirm before reordering.
31 type hchan struct {
32 qcount uint // number of elements currently stored in the buffer
33 dataqsiz uint // buffer capacity in elements, as passed to makechan (0 means unbuffered)
34 buf unsafe.Pointer // base address used by chanbuf to locate buffer slots
35 elemsize uint16 // size of one element in bytes
36 closed uint32 // set to 1 by closechan; 0 while the channel is open
37 elemtype *_type // element type, used for typed copies and clears
38 sendx uint // buffer slot the next buffered send writes to
39 recvx uint // buffer slot the next buffered receive reads from
40 recvq waitq // receivers parked in chanrecv waiting for a value
41 sendq waitq // senders parked in chansend waiting for room or a receiver
42
43 // lock is held by chansend, chanrecv and closechan while they
44 // modify the fields above and the two wait queues.
45 //
46 // The non-blocking fast paths in chansend and chanrecv read a few
47 // of these fields without holding it.
48
49 lock mutex
50 }
51
// waitq is a doubly linked FIFO queue of sudogs. enqueue appends at
// last and dequeue removes from first.
52 type waitq struct {
53 first *sudog // head of the queue; nil when the queue is empty
54 last *sudog // tail of the queue; nil when the queue is empty
55 }
56
57
// reflect_makechan forwards its arguments to makechan unchanged and
// returns the new channel.
// NOTE(review): the name suggests this is the entry point used by
// package reflect — confirm how it is linked.
58 func reflect_makechan(t *chantype, size int) *hchan {
59 return makechan(t, size)
60 }
61
62 func makechan64(t *chantype, size int64) *hchan {
63 if int64(int(size)) != size {
64 panic(plainError("makechan: size out of range"))
65 }
66
67 return makechan(t, int(size))
68 }
69
70 func makechan(t *chantype, size int) *hchan {
71 elem := t.elem
72
73
74 if elem.size >= 1<<16 {
75 throw("makechan: invalid channel element type")
76 }
77 if hchanSize%maxAlign != 0 || elem.align > maxAlign {
78 throw("makechan: bad alignment")
79 }
80
81 if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > _MaxMem-hchanSize {
82 panic(plainError("makechan: size out of range"))
83 }
84
85
86
87
88
89 var c *hchan
90 switch {
91 case size == 0 || elem.size == 0:
92
93 c = (*hchan)(mallocgc(hchanSize, nil, true))
94
95 c.buf = unsafe.Pointer(c)
96 case elem.kind&kindNoPointers != 0:
97
98
99 c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
100 c.buf = add(unsafe.Pointer(c), hchanSize)
101 default:
102
103 c = new(hchan)
104 c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
105 }
106
107 c.elemsize = uint16(elem.size)
108 c.elemtype = elem
109 c.dataqsiz = uint(size)
110
111 if debugChan {
112 print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
113 }
114 return c
115 }
116
117
118 func chanbuf(c *hchan, i uint) unsafe.Pointer {
119 return add(c.buf, uintptr(i)*uintptr(c.elemsize))
120 }
121
122
123
// chansend1 performs a blocking send (block = true) of the value at elem
// on c, passing the PC obtained from getcallerpc to chansend. The result
// of chansend is discarded.
// NOTE(review): presumably the entry point for a plain `c <- x`
// statement — confirm against the compiler.
124 func chansend1(c *hchan, elem unsafe.Pointer) {
125 chansend(c, elem, true, getcallerpc())
126 }
127
128
// chansend sends the value pointed to by ep on channel c.
//
// block selects what happens when the send cannot complete immediately:
// when true the calling goroutine parks until a receiver takes the value
// or the channel is closed; when false the function gives up and returns
// false. callerpc is used only for the race-detector annotation on c.
//
// It returns true once the value has been handed to a receiver or copied
// into the buffer. It panics with "send on closed channel" if c is
// closed, either on entry or while the sender is parked. A blocking send
// on a nil channel parks the goroutine and is not expected to resume.
140 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
141 if c == nil {
142 if !block {
143 return false
144 }
145 gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
146 throw("unreachable")
147 }
148
149 if debugChan {
150 print("chansend: chan=", c, "\n")
151 }
152
153 if raceenabled {
154 racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
155 }
156
157 // Fast path: fail a non-blocking send without taking c.lock when the
158 // channel is not closed and is either unbuffered with no receiver
159 // queued, or buffered and full. The fields are read here without the
160 // lock; every other case is decided under the lock below.
161
162
163
164
165
166
167
168
169
170
171 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
172 (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
173 return false
174 }
175
176 var t0 int64 // start timestamp; stays 0 unless block profiling is enabled
177 if blockprofilerate > 0 {
178 t0 = cputicks()
179 }
180
181 lock(&c.lock)
182
183 if c.closed != 0 {
184 unlock(&c.lock)
185 panic(plainError("send on closed channel"))
186 }
187
188 if sg := c.recvq.dequeue(); sg != nil {
189 // A receiver is already waiting: hand the value to it via send,
190 // which also runs the unlock closure passed here.
191 send(c, sg, ep, func() { unlock(&c.lock) }, 3)
192 return true
193 }
194
195 if c.qcount < c.dataqsiz {
196 // The buffer has room: copy the value into slot sendx and advance it, wrapping at dataqsiz.
197 qp := chanbuf(c, c.sendx)
198 if raceenabled {
199 raceacquire(qp)
200 racerelease(qp)
201 }
202 typedmemmove(c.elemtype, qp, ep)
203 c.sendx++
204 if c.sendx == c.dataqsiz {
205 c.sendx = 0
206 }
207 c.qcount++
208 unlock(&c.lock)
209 return true
210 }
211
212 if !block {
213 unlock(&c.lock)
214 return false
215 }
216
217 // Block: describe this goroutine in a sudog, queue it on sendq and park.
218 gp := getg()
219 mysg := acquireSudog()
220 mysg.releasetime = 0
221 if t0 != 0 {
222 mysg.releasetime = -1 // non-zero asks the waker to store a cputicks timestamp here
223 }
224 // mysg.elem points at the caller's value so that a receiver can copy
225 // from it while this goroutine is parked (see recv and recvDirect).
226 mysg.elem = ep
227 mysg.waitlink = nil
228 mysg.g = gp
229 mysg.isSelect = false
230 mysg.c = c
231 gp.waiting = mysg
232 gp.param = nil
233 c.sendq.enqueue(mysg)
234 goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
235
236 // Woken up. Check that the wait record is still the one installed above.
237 if mysg != gp.waiting {
238 throw("G waiting list is corrupted")
239 }
240 gp.waiting = nil
241 if gp.param == nil { // recv stores the sudog in gp.param; closechan leaves it nil
242 if c.closed == 0 {
243 throw("chansend: spurious wakeup")
244 }
245 panic(plainError("send on closed channel"))
246 }
247 gp.param = nil
248 if mysg.releasetime > 0 {
249 blockevent(mysg.releasetime-t0, 2)
250 }
251 mysg.c = nil
252 releaseSudog(mysg)
253 return true
254 }
255
256
257
258
259
260
261
// send delivers the value at ep to the waiting receiver described by sg
// and makes that receiver's goroutine runnable.
//
// chansend calls it with c.lock held, after dequeuing sg from c.recvq,
// and passes an unlockf that releases the lock. skip is forwarded to
// goready as skip+1.
262 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
263 if raceenabled {
264 if c.dataqsiz == 0 {
265 racesync(c, sg)
266 } else {
267 // Buffered channel: annotate slot recvx for both goroutines and
268 // advance recvx (wrapping at dataqsiz), then set sendx to match,
269 // as if the value had passed through the buffer.
270 qp := chanbuf(c, c.recvx)
271 raceacquire(qp)
272 racerelease(qp)
273 raceacquireg(sg.g, qp)
274 racereleaseg(sg.g, qp)
275 c.recvx++
276 if c.recvx == c.dataqsiz {
277 c.recvx = 0
278 }
279 c.sendx = c.recvx
280 }
281 }
282 if sg.elem != nil { // a nil elem means the receiver does not want the value stored
283 sendDirect(c.elemtype, sg, ep)
284 sg.elem = nil
285 }
286 gp := sg.g
287 unlockf()
288 gp.param = unsafe.Pointer(sg) // non-nil param tells the woken receiver that a value was delivered
289 if sg.releasetime != 0 {
290 sg.releasetime = cputicks()
291 }
292 goready(gp, skip+1)
293 }
294
295
296
297
298
299
300
301
302
303
304
305 func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
306
307
308
309
310
311 dst := sg.elem
312 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
313 memmove(dst, src, t.size)
314 }
315
316 func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
317
318
319
320 src := sg.elem
321 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
322 memmove(dst, src, t.size)
323 }
324
// closechan marks c as closed and wakes every goroutine parked on it.
//
// It panics with "close of nil channel" if c is nil and with
// "close of closed channel" if c was already closed. Waiters are woken
// with gp.param == nil, which chansend and chanrecv treat as "woken by
// close".
325 func closechan(c *hchan) {
326 if c == nil {
327 panic(plainError("close of nil channel"))
328 }
329
330 lock(&c.lock)
331 if c.closed != 0 {
332 unlock(&c.lock)
333 panic(plainError("close of closed channel"))
334 }
335
336 if raceenabled {
337 callerpc := getcallerpc()
338 racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
339 racerelease(unsafe.Pointer(c))
340 }
341
342 c.closed = 1
343
344 var glist *g // goroutines to wake, linked through schedlink
345
346 // Collect every waiting receiver; the memory it waits on is cleared.
347 for {
348 sg := c.recvq.dequeue()
349 if sg == nil {
350 break
351 }
352 if sg.elem != nil {
353 typedmemclr(c.elemtype, sg.elem)
354 sg.elem = nil
355 }
356 if sg.releasetime != 0 {
357 sg.releasetime = cputicks()
358 }
359 gp := sg.g
360 gp.param = nil
361 if raceenabled {
362 raceacquireg(gp, unsafe.Pointer(c))
363 }
364 gp.schedlink.set(glist)
365 glist = gp
366 }
367
368 // Collect every waiting sender; chansend panics when it resumes with a nil param.
369 for {
370 sg := c.sendq.dequeue()
371 if sg == nil {
372 break
373 }
374 sg.elem = nil
375 if sg.releasetime != 0 {
376 sg.releasetime = cputicks()
377 }
378 gp := sg.g
379 gp.param = nil
380 if raceenabled {
381 raceacquireg(gp, unsafe.Pointer(c))
382 }
383 gp.schedlink.set(glist)
384 glist = gp
385 }
386 unlock(&c.lock)
387
388 // Make the collected goroutines runnable only after c.lock has been released.
389 for glist != nil {
390 gp := glist
391 glist = glist.schedlink.ptr()
392 gp.schedlink = 0
393 goready(gp, 3)
394 }
395 }
396
397
398
// chanrecv1 performs a blocking receive (block = true) from c into elem
// and discards both results of chanrecv.
// NOTE(review): presumably the entry point for a plain `<-c` / `v := <-c`
// expression — confirm against the compiler.
399 func chanrecv1(c *hchan, elem unsafe.Pointer) {
400 chanrecv(c, elem, true)
401 }
402
403
404 func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
405 _, received = chanrecv(c, elem, true)
406 return
407 }
408
409
410
411
412
413
414
// chanrecv receives a value from channel c and stores it at ep.
//
// ep may be nil, in which case the received value is discarded. block
// selects what happens when nothing is available: when true the calling
// goroutine parks until a sender or closechan wakes it; when false the
// function returns (false, false).
//
// Results:
//   - (false, false): non-blocking receive and nothing was available.
//   - (true, false): the channel is closed and empty; *ep is zeroed.
//   - (true, true): a value was received into *ep.
//
// A blocking receive on a nil channel parks the goroutine and is not
// expected to resume.
415 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
416
417
418
419 if debugChan {
420 print("chanrecv: chan=", c, "\n")
421 }
422
423 if c == nil {
424 if !block {
425 return
426 }
427 gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
428 throw("unreachable")
429 }
430
431 // Fast path: fail a non-blocking receive without taking c.lock when
432 // the channel is either unbuffered with no sender queued or buffered
433 // and empty, and it is not closed. qcount and closed are read with
434 // atomic loads; every other case is decided under the lock below.
435
436
437
438
439
440
441
442
443 if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
444 c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
445 atomic.Load(&c.closed) == 0 {
446 return
447 }
448
449 var t0 int64 // start timestamp; stays 0 unless block profiling is enabled
450 if blockprofilerate > 0 {
451 t0 = cputicks()
452 }
453
454 lock(&c.lock)
455
456 if c.closed != 0 && c.qcount == 0 { // closed and drained: yield a zero value
457 if raceenabled {
458 raceacquire(unsafe.Pointer(c))
459 }
460 unlock(&c.lock)
461 if ep != nil {
462 typedmemclr(c.elemtype, ep)
463 }
464 return true, false
465 }
466
467 if sg := c.sendq.dequeue(); sg != nil {
468 // A sender is already waiting. recv takes the value either
469 // straight from the sender (unbuffered) or from the head of the
470 // buffer while moving the sender's value into the buffer, and it
471 // runs the unlock closure passed here.
472 recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
473 return true, true
474 }
475
476 if c.qcount > 0 {
477 // The buffer holds data: copy out slot recvx, clear it and advance, wrapping at dataqsiz.
478 qp := chanbuf(c, c.recvx)
479 if raceenabled {
480 raceacquire(qp)
481 racerelease(qp)
482 }
483 if ep != nil {
484 typedmemmove(c.elemtype, ep, qp)
485 }
486 typedmemclr(c.elemtype, qp)
487 c.recvx++
488 if c.recvx == c.dataqsiz {
489 c.recvx = 0
490 }
491 c.qcount--
492 unlock(&c.lock)
493 return true, true
494 }
495
496 if !block {
497 unlock(&c.lock)
498 return false, false
499 }
500
501 // Block: describe this goroutine in a sudog, queue it on recvq and park.
502 gp := getg()
503 mysg := acquireSudog()
504 mysg.releasetime = 0
505 if t0 != 0 {
506 mysg.releasetime = -1 // non-zero asks the waker to store a cputicks timestamp here
507 }
508 // mysg.elem points at the caller's destination so that a sender can
509 // copy into it while this goroutine is parked (see send and sendDirect).
510 mysg.elem = ep
511 mysg.waitlink = nil
512 gp.waiting = mysg
513 mysg.g = gp
514 mysg.isSelect = false
515 mysg.c = c
516 gp.param = nil
517 c.recvq.enqueue(mysg)
518 goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
519
520 // Woken up. Check that the wait record is still the one installed above.
521 if mysg != gp.waiting {
522 throw("G waiting list is corrupted")
523 }
524 gp.waiting = nil
525 if mysg.releasetime > 0 {
526 blockevent(mysg.releasetime-t0, 2)
527 }
528 closed := gp.param == nil // send stores the sudog in gp.param; closechan leaves it nil
529 gp.param = nil
530 mysg.c = nil
531 releaseSudog(mysg)
532 return true, !closed
533 }
534
535
536
537
538
539
540
541
542
543
544
545
546
547
// recv completes a receive on c against the waiting sender described by
// sg and makes that sender's goroutine runnable.
//
// chanrecv calls it with c.lock held, after dequeuing sg from c.sendq,
// and passes an unlockf that releases the lock. ep is the receiver's
// destination and may be nil, in which case no value is copied out. skip
// is forwarded to goready as skip+1.
548 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
549 if c.dataqsiz == 0 {
550 if raceenabled {
551 racesync(c, sg)
552 }
553 if ep != nil {
554 // Unbuffered channel: copy the value directly from the sender.
555 recvDirect(c.elemtype, sg, ep)
556 }
557 } else {
558 // Buffered channel. chansend only queues a sender when the buffer
559 // has no room, so the buffer is full here: hand the receiver the
560 // value in slot recvx, then store the sender's value in that same
561 // slot and advance both indexes together.
562 qp := chanbuf(c, c.recvx)
563 if raceenabled {
564 raceacquire(qp)
565 racerelease(qp)
566 raceacquireg(sg.g, qp)
567 racereleaseg(sg.g, qp)
568 }
569 // Copy the buffered value out to the receiver.
570 if ep != nil {
571 typedmemmove(c.elemtype, ep, qp)
572 }
573 // Copy the sender's value into the slot just vacated.
574 typedmemmove(c.elemtype, qp, sg.elem)
575 c.recvx++
576 if c.recvx == c.dataqsiz {
577 c.recvx = 0
578 }
579 c.sendx = c.recvx
580 }
581 sg.elem = nil
582 gp := sg.g
583 unlockf()
584 gp.param = unsafe.Pointer(sg) // non-nil param tells the woken sender that its value was taken
585 if sg.releasetime != 0 {
586 sg.releasetime = cputicks()
587 }
588 goready(gp, skip+1)
589 }
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
// selectnbsend attempts a non-blocking send (block = false) of the value
// at elem on c and returns chansend's result: true if the value was
// sent, false if the send could not proceed immediately.
// NOTE(review): presumably the lowering of a select with a single send
// case and a default — confirm against the compiler.
608 func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
609 return chansend(c, elem, false, getcallerpc())
610 }
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629 func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
630 selected, _ = chanrecv(c, elem, false)
631 return
632 }
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651 func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
652
653 selected, *received = chanrecv(c, elem, false)
654 return
655 }
656
657
658 func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
659 return chansend(c, elem, !nb, getcallerpc())
660 }
661
662
663 func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
664 return chanrecv(c, elem, !nb)
665 }
666
667
668 func reflect_chanlen(c *hchan) int {
669 if c == nil {
670 return 0
671 }
672 return int(c.qcount)
673 }
674
675
676 func reflect_chancap(c *hchan) int {
677 if c == nil {
678 return 0
679 }
680 return int(c.dataqsiz)
681 }
682
683
// reflect_chanclose closes c by calling closechan, so it panics in the
// same cases: a nil channel or an already closed one.
// NOTE(review): the name suggests this is the entry point used by
// package reflect — confirm how it is linked.
684 func reflect_chanclose(c *hchan) {
685 closechan(c)
686 }
687
688 func (q *waitq) enqueue(sgp *sudog) {
689 sgp.next = nil
690 x := q.last
691 if x == nil {
692 sgp.prev = nil
693 q.first = sgp
694 q.last = sgp
695 return
696 }
697 sgp.prev = x
698 x.next = sgp
699 q.last = sgp
700 }
701
702 func (q *waitq) dequeue() *sudog {
703 for {
704 sgp := q.first
705 if sgp == nil {
706 return nil
707 }
708 y := sgp.next
709 if y == nil {
710 q.first = nil
711 q.last = nil
712 } else {
713 y.prev = nil
714 q.first = y
715 sgp.next = nil
716 }
717
718
719
720
721
722
723
724
725
726 if sgp.isSelect {
727 if !atomic.Cas(&sgp.g.selectDone, 0, 1) {
728 continue
729 }
730 }
731
732 return sgp
733 }
734 }
735
736 func racesync(c *hchan, sg *sudog) {
737 racerelease(chanbuf(c, 0))
738 raceacquireg(sg.g, chanbuf(c, 0))
739 racereleaseg(sg.g, chanbuf(c, 0))
740 raceacquire(chanbuf(c, 0))
741 }
742
View as plain text