Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/gzip"
15 "container/list"
16 "context"
17 "crypto/tls"
18 "errors"
19 "fmt"
20 "io"
21 "log"
22 "net"
23 "net/http/httptrace"
24 "net/url"
25 "os"
26 "strings"
27 "sync"
28 "sync/atomic"
29 "time"
30
31 "golang_org/x/net/lex/httplex"
32 "golang_org/x/net/proxy"
33 )
34
35
36
37
38
39
// DefaultTransport is the package-level RoundTripper value. It is a
// *Transport that takes its proxy from ProxyFromEnvironment, dials through
// a net.Dialer with 30-second Timeout and KeepAlive settings, keeps at most
// 100 idle connections for up to 90 seconds each, and uses a 10-second TLS
// handshake timeout and a 1-second ExpectContinueTimeout.
var DefaultTransport RoundTripper = &Transport{
	Proxy: ProxyFromEnvironment,
	DialContext: (&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
		DualStack: true,
	}).DialContext,
	MaxIdleConns:          100,
	IdleConnTimeout:       90 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 1 * time.Second,
}
52
53
54
// DefaultMaxIdleConnsPerHost is the per-host idle connection limit that a
// Transport uses when its MaxIdleConnsPerHost field is zero.
const DefaultMaxIdleConnsPerHost = 2
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// Transport is this package's RoundTripper implementation for the "http"
// and "https" schemes, plus any scheme added through RegisterProtocol. It
// dials connections (optionally through a proxy) and keeps idle persistent
// connections around for reuse by later requests.
type Transport struct {
	// idleMu guards wantIdle, idleConn, idleConnCh and idleLRU.
	idleMu sync.Mutex
	// wantIdle is set by CloseIdleConnections and cleared by getIdleConnCh;
	// while it is set, tryPutIdleConn refuses to park connections.
	wantIdle bool
	// idleConn holds the parked connections per connect method; the most
	// recently parked connection is at the end of each slice.
	idleConn map[connectMethodKey][]*persistConn
	// idleConnCh holds, per connect method, the channel on which
	// tryPutIdleConn hands a freed connection directly to a waiting getConn.
	idleConnCh map[connectMethodKey]chan *persistConn
	// idleLRU tracks all idle connections so the oldest can be evicted
	// when MaxIdleConns is exceeded.
	idleLRU connLRU

	// reqMu guards reqCanceler.
	reqMu sync.Mutex
	// reqCanceler maps an in-flight request to the function that cancels it.
	reqCanceler map[*Request]func(error)

	// altMu serializes writers in RegisterProtocol.
	altMu sync.Mutex
	// altProto stores a map[string]RoundTripper keyed by URL scheme; it is
	// replaced wholesale on registration and read without altMu.
	altProto atomic.Value

	// Proxy returns the proxy URL to use for a request. A nil Proxy, or a
	// nil *url.URL result with a nil error, means the request is sent
	// without a proxy. A non-nil error aborts the request.
	Proxy func(*Request) (*url.URL, error)

	// DialContext, when non-nil, is used to create unencrypted TCP
	// connections. It takes precedence over Dial.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// Dial is used to create unencrypted TCP connections when DialContext
	// is nil. If both are nil a zero-value net.Dialer is used. Setting Dial
	// also disables the automatic HTTP/2 setup.
	Dial func(network, addr string) (net.Conn, error)

	// DialTLS, when non-nil, is used to create the connection for requests
	// whose first hop is "https". If the returned connection is a *tls.Conn
	// the transport runs the handshake itself; TLSClientConfig and
	// TLSHandshakeTimeout are not applied on this path. Setting DialTLS
	// also disables the automatic HTTP/2 setup.
	DialTLS func(network, addr string) (net.Conn, error)

	// TLSClientConfig is cloned for each TLS client connection the
	// transport creates itself. Setting it disables the automatic HTTP/2
	// setup.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout bounds the TLS handshake performed by the
	// transport. Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// DisableKeepAlives, if true, prevents connections from being parked
	// for reuse after a request completes.
	DisableKeepAlives bool

	// DisableCompression is not consulted in the code visible here.
	// NOTE(review): presumably disables the transparent
	// "Accept-Encoding: gzip" handling; confirm against the request path.
	DisableCompression bool

	// MaxIdleConns caps the number of idle connections across all hosts.
	// Zero means no limit.
	MaxIdleConns int

	// MaxIdleConnsPerHost caps the idle connections kept per connect
	// method. Zero selects DefaultMaxIdleConnsPerHost; a negative value
	// disables parking entirely.
	MaxIdleConnsPerHost int

	// IdleConnTimeout is how long a parked connection may stay idle before
	// it is closed. Values <= 0 disable the idle timer.
	IdleConnTimeout time.Duration

	// ResponseHeaderTimeout is not consulted in the code visible here.
	// NOTE(review): presumably bounds the wait for response headers after
	// the request is written; confirm against persistConn.roundTrip.
	ResponseHeaderTimeout time.Duration

	// ExpectContinueTimeout is not consulted in the code visible here.
	// NOTE(review): presumably bounds the wait for a "100 Continue"
	// response; confirm against persistConn.roundTrip.
	ExpectContinueTimeout time.Duration

	// TLSNextProto maps a negotiated TLS application protocol name to a
	// constructor that takes over the connection and returns the
	// RoundTripper that will serve requests on it. A non-nil map (even if
	// empty) disables the automatic HTTP/2 setup.
	TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper

	// ProxyConnectHeader supplies the headers sent to the proxy with the
	// CONNECT request used for "https" targets.
	ProxyConnectHeader Header

	// MaxResponseHeaderBytes limits how many response bytes may be read
	// while waiting for a response header. Zero selects a 10 MiB default.
	MaxResponseHeaderBytes int64

	// nextProtoOnce guards the one-time call to onceSetNextProtoDefaults.
	nextProtoOnce sync.Once
	// h2transport is the HTTP/2 transport configured by
	// onceSetNextProtoDefaults, or nil when HTTP/2 was not enabled.
	h2transport *http2Transport
}
216
217
218
219 func (t *Transport) onceSetNextProtoDefaults() {
220 if strings.Contains(os.Getenv("GODEBUG"), "http2client=0") {
221 return
222 }
223 if t.TLSNextProto != nil {
224
225
226 return
227 }
228 if t.TLSClientConfig != nil || t.Dial != nil || t.DialTLS != nil {
229
230
231
232
233
234 return
235 }
236 t2, err := http2configureTransport(t)
237 if err != nil {
238 log.Printf("Error enabling Transport HTTP/2 support: %v", err)
239 return
240 }
241 t.h2transport = t2
242
243
244
245
246
247
248
249 if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
250 const h2max = 1<<32 - 1
251 if limit1 >= h2max {
252 t2.MaxHeaderListSize = h2max
253 } else {
254 t2.MaxHeaderListSize = uint32(limit1)
255 }
256 }
257 }
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
276 var proxy string
277 if req.URL.Scheme == "https" {
278 proxy = httpsProxyEnv.Get()
279 }
280 if proxy == "" {
281 proxy = httpProxyEnv.Get()
282 if proxy != "" && os.Getenv("REQUEST_METHOD") != "" {
283 return nil, errors.New("net/http: refusing to use HTTP_PROXY value in CGI environment; see golang.org/s/cgihttpproxy")
284 }
285 }
286 if proxy == "" {
287 return nil, nil
288 }
289 if !useProxy(canonicalAddr(req.URL)) {
290 return nil, nil
291 }
292 proxyURL, err := url.Parse(proxy)
293 if err != nil ||
294 (proxyURL.Scheme != "http" &&
295 proxyURL.Scheme != "https" &&
296 proxyURL.Scheme != "socks5") {
297
298
299
300 if proxyURL, err := url.Parse("http://" + proxy); err == nil {
301 return proxyURL, nil
302 }
303
304 }
305 if err != nil {
306 return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
307 }
308 return proxyURL, nil
309 }
310
311
312
313 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
314 return func(*Request) (*url.URL, error) {
315 return fixedURL, nil
316 }
317 }
318
319
320
321
// transportRequest wraps a *Request with the per-attempt state the
// Transport needs while performing a round trip.
type transportRequest struct {
	*Request                        // the request being sent
	extra    Header                 // extra headers to send; created lazily by extraHeaders
	trace    *httptrace.ClientTrace // trace hooks taken from the request context; may be nil

	mu  sync.Mutex // guards err
	err error      // first error recorded via setError
}
330
331 func (tr *transportRequest) extraHeaders() Header {
332 if tr.extra == nil {
333 tr.extra = make(Header)
334 }
335 return tr.extra
336 }
337
338 func (tr *transportRequest) setError(err error) {
339 tr.mu.Lock()
340 if tr.err == nil {
341 tr.err = err
342 }
343 tr.mu.Unlock()
344 }
345
346
347
348
349
350 func (t *Transport) RoundTrip(req *Request) (*Response, error) {
351 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
352 ctx := req.Context()
353 trace := httptrace.ContextClientTrace(ctx)
354
355 if req.URL == nil {
356 req.closeBody()
357 return nil, errors.New("http: nil Request.URL")
358 }
359 if req.Header == nil {
360 req.closeBody()
361 return nil, errors.New("http: nil Request.Header")
362 }
363 scheme := req.URL.Scheme
364 isHTTP := scheme == "http" || scheme == "https"
365 if isHTTP {
366 for k, vv := range req.Header {
367 if !httplex.ValidHeaderFieldName(k) {
368 return nil, fmt.Errorf("net/http: invalid header field name %q", k)
369 }
370 for _, v := range vv {
371 if !httplex.ValidHeaderFieldValue(v) {
372 return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
373 }
374 }
375 }
376 }
377
378 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
379 if altRT := altProto[scheme]; altRT != nil {
380 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
381 return resp, err
382 }
383 }
384 if !isHTTP {
385 req.closeBody()
386 return nil, &badStringError{"unsupported protocol scheme", scheme}
387 }
388 if req.Method != "" && !validMethod(req.Method) {
389 return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
390 }
391 if req.URL.Host == "" {
392 req.closeBody()
393 return nil, errors.New("http: no Host in request URL")
394 }
395
396 for {
397
398 treq := &transportRequest{Request: req, trace: trace}
399 cm, err := t.connectMethodForRequest(treq)
400 if err != nil {
401 req.closeBody()
402 return nil, err
403 }
404
405
406
407
408
409 pconn, err := t.getConn(treq, cm)
410 if err != nil {
411 t.setReqCanceler(req, nil)
412 req.closeBody()
413 return nil, err
414 }
415
416 var resp *Response
417 if pconn.alt != nil {
418
419 t.setReqCanceler(req, nil)
420 resp, err = pconn.alt.RoundTrip(req)
421 } else {
422 resp, err = pconn.roundTrip(treq)
423 }
424 if err == nil {
425 return resp, nil
426 }
427 if !pconn.shouldRetryRequest(req, err) {
428
429
430 if e, ok := err.(transportReadFromServerError); ok {
431 err = e.err
432 }
433 return nil, err
434 }
435 testHookRoundTripRetried()
436
437
438
439 if req.GetBody != nil && pconn.alt == nil {
440 newReq := *req
441 var err error
442 newReq.Body, err = req.GetBody()
443 if err != nil {
444 return nil, err
445 }
446 req = &newReq
447 }
448 }
449 }
450
451
452
453
454 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
455 if http2isNoCachedConnError(err) {
456
457
458
459
460
461
462 return true
463 }
464 if err == errMissingHost {
465
466 return false
467 }
468 if !pc.isReused() {
469
470
471
472
473
474
475
476 return false
477 }
478 if _, ok := err.(nothingWrittenError); ok {
479
480
481 return req.outgoingLength() == 0 || req.GetBody != nil
482 }
483 if !req.isReplayable() {
484
485 return false
486 }
487 if _, ok := err.(transportReadFromServerError); ok {
488
489
490 return true
491 }
492 if err == errServerClosedIdle {
493
494
495
496 return true
497 }
498 return false
499 }
500
501
// ErrSkipAltProtocol is the sentinel an alternate-protocol RoundTripper
// (registered via Transport.RegisterProtocol) returns from RoundTrip to
// decline a request; Transport.RoundTrip then handles the request itself.
var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
503
504
505
506
507
508
509
510
511
512
513
514 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
515 t.altMu.Lock()
516 defer t.altMu.Unlock()
517 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
518 if _, exists := oldMap[scheme]; exists {
519 panic("protocol " + scheme + " already registered")
520 }
521 newMap := make(map[string]RoundTripper)
522 for k, v := range oldMap {
523 newMap[k] = v
524 }
525 newMap[scheme] = rt
526 t.altProto.Store(newMap)
527 }
528
529
530
531
532
533 func (t *Transport) CloseIdleConnections() {
534 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
535 t.idleMu.Lock()
536 m := t.idleConn
537 t.idleConn = nil
538 t.idleConnCh = nil
539 t.wantIdle = true
540 t.idleLRU = connLRU{}
541 t.idleMu.Unlock()
542 for _, conns := range m {
543 for _, pconn := range conns {
544 pconn.close(errCloseIdleConns)
545 }
546 }
547 if t2 := t.h2transport; t2 != nil {
548 t2.CloseIdleConnections()
549 }
550 }
551
552
553
554
555
556
557
// CancelRequest cancels the in-flight request req by invoking the
// canceler registered for it, if any, with errRequestCanceled. It is a
// no-op when no canceler is registered for req.
func (t *Transport) CancelRequest(req *Request) {
	t.cancelRequest(req, errRequestCanceled)
}
561
562
563 func (t *Transport) cancelRequest(req *Request, err error) {
564 t.reqMu.Lock()
565 cancel := t.reqCanceler[req]
566 delete(t.reqCanceler, req)
567 t.reqMu.Unlock()
568 if cancel != nil {
569 cancel(err)
570 }
571 }
572
573
574
575
576
// Lazily read, cached views of the proxy-related environment variables.
// For each, the uppercase name is tried before the lowercase one.
var (
	// httpProxyEnv is the proxy used for "http" requests and the fallback
	// for "https" requests (see ProxyFromEnvironment).
	httpProxyEnv = &envOnce{
		names: []string{"HTTP_PROXY", "http_proxy"},
	}
	// httpsProxyEnv is the preferred proxy for "https" requests.
	httpsProxyEnv = &envOnce{
		names: []string{"HTTPS_PROXY", "https_proxy"},
	}
	// noProxyEnv is the comma-separated exclusion list consulted by
	// useProxy.
	noProxyEnv = &envOnce{
		names: []string{"NO_PROXY", "no_proxy"},
	}
)
588
589
590
591
// envOnce looks up an environment variable under a list of candidate
// names, caching the first non-empty value found on first use.
type envOnce struct {
	names []string  // candidate variable names, in priority order
	once  sync.Once // guards the single call to init
	val   string    // cached result; valid after once has fired
}

// Get returns the cached value, performing the environment lookup on the
// first call.
func (e *envOnce) Get() string {
	e.once.Do(e.init)
	return e.val
}

// init scans e.names in order and stops at the first variable with a
// non-empty value. If none is set, val ends up empty.
func (e *envOnce) init() {
	for i := 0; i < len(e.names); i++ {
		if e.val = os.Getenv(e.names[i]); e.val != "" {
			return
		}
	}
}

// reset discards the cached value so the next Get re-reads the
// environment.
func (e *envOnce) reset() {
	e.val = ""
	e.once = sync.Once{}
}
617
618 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
619 if port := treq.URL.Port(); !validPort(port) {
620 return cm, fmt.Errorf("invalid URL port %q", port)
621 }
622 cm.targetScheme = treq.URL.Scheme
623 cm.targetAddr = canonicalAddr(treq.URL)
624 if t.Proxy != nil {
625 cm.proxyURL, err = t.Proxy(treq.Request)
626 if err == nil && cm.proxyURL != nil {
627 if port := cm.proxyURL.Port(); !validPort(port) {
628 return cm, fmt.Errorf("invalid proxy URL port %q", port)
629 }
630 }
631 }
632 return cm, err
633 }
634
635
636
637 func (cm *connectMethod) proxyAuth() string {
638 if cm.proxyURL == nil {
639 return ""
640 }
641 if u := cm.proxyURL.User; u != nil {
642 username := u.Username()
643 password, _ := u.Password()
644 return "Basic " + basicAuth(username, password)
645 }
646 return ""
647 }
648
649
// Reasons a connection was not parked in, or was removed from, the idle
// pool. They are passed to persistConn.close and to trace hooks.
var (
	// Returned by tryPutIdleConn.
	errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
	errConnBroken         = errors.New("http: putIdleConn: connection is in bad state")
	errWantIdle           = errors.New("http: putIdleConn: CloseIdleConnections was called")
	errTooManyIdle        = errors.New("http: putIdleConn: too many idle connections")
	errTooManyIdleHost    = errors.New("http: putIdleConn: too many idle connections for host")
	// Used by CloseIdleConnections when closing parked connections.
	errCloseIdleConns = errors.New("http: CloseIdleConnections called")
	// Default close reason when readLoop returns.
	errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
	// Used by closeConnIfStillIdle when the idle timer fires.
	errIdleConnTimeout = errors.New("http: idle connection timeout")
	// Returned by tryPutIdleConn for alternate-protocol connections.
	errNotCachingH2Conn = errors.New("http: not caching alternate protocol's connections")

	// errServerClosedIdle is treated as retryable by shouldRetryRequest
	// and passed through unchanged by mapRoundTripError.
	// NOTE(review): it is produced outside the code visible here,
	// presumably when the server closes a connection that had no request
	// outstanding; confirm at the producing site.
	errServerClosedIdle = errors.New("http: server closed idle connection")
)
667
668
669
670
671
672
673
674
675
// transportReadFromServerError wraps an error encountered while reading
// from the server. It is an internal marker: shouldRetryRequest treats it
// as retryable and RoundTrip unwraps it before returning to the caller.
type transportReadFromServerError struct {
	err error // the underlying read error
}

// Error implements the error interface, prefixing the wrapped error's text.
func (e transportReadFromServerError) Error() string {
	const format = "net/http: Transport failed to read from server: %v"
	return fmt.Sprintf(format, e.err)
}
683
684 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
685 if err := t.tryPutIdleConn(pconn); err != nil {
686 pconn.close(err)
687 }
688 }
689
690 func (t *Transport) maxIdleConnsPerHost() int {
691 if v := t.MaxIdleConnsPerHost; v != 0 {
692 return v
693 }
694 return DefaultMaxIdleConnsPerHost
695 }
696
697
698
699
700
701
// tryPutIdleConn makes pconn available for reuse: it either hands pconn
// directly to a getConn call waiting for this connect method, or parks it
// in the idle pool. It returns nil on success and otherwise an error
// explaining why the connection was not kept; the caller is then
// responsible for closing pconn.
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		return errKeepAlivesDisabled
	}
	if pconn.isBroken() {
		return errConnBroken
	}
	if pconn.alt != nil {
		// Alternate-protocol connections are not pooled here.
		return errNotCachingH2Conn
	}
	pconn.markReused()
	key := pconn.cacheKey

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// waitingDialer is nil when no getConn registered a channel for this
	// key; a send on a nil channel never proceeds, so the default branch
	// is taken in that case.
	waitingDialer := t.idleConnCh[key]
	select {
	case waitingDialer <- pconn:
		// A getConn call was blocked receiving on the channel and now owns
		// pconn; its own pending dial is dealt with on that side.
		return nil
	default:
		if waitingDialer != nil {
			// A channel exists but nobody is receiving on it right now;
			// drop the stale entry.
			delete(t.idleConnCh, key)
		}
	}
	if t.wantIdle {
		// CloseIdleConnections ran and no getConn has started since.
		return errWantIdle
	}
	if t.idleConn == nil {
		t.idleConn = make(map[connectMethodKey][]*persistConn)
	}
	idles := t.idleConn[key]
	if len(idles) >= t.maxIdleConnsPerHost() {
		return errTooManyIdleHost
	}
	// Parking the same connection twice would be an internal invariant
	// violation; it terminates the process.
	for _, exist := range idles {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	t.idleConn[key] = append(idles, pconn)
	t.idleLRU.add(pconn)
	// Enforce the global cap by evicting the least recently used
	// connection (zero means unlimited).
	if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
		oldest := t.idleLRU.removeOldest()
		oldest.close(errTooManyIdle)
		t.removeIdleConnLocked(oldest)
	}
	// Arm (or re-arm) the idle timeout for this connection.
	if t.IdleConnTimeout > 0 {
		if pconn.idleTimer != nil {
			pconn.idleTimer.Reset(t.IdleConnTimeout)
		} else {
			pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
		}
	}
	pconn.idleAt = time.Now()
	return nil
}
766
767
768
769
770 func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn {
771 if t.DisableKeepAlives {
772 return nil
773 }
774 key := cm.key()
775 t.idleMu.Lock()
776 defer t.idleMu.Unlock()
777 t.wantIdle = false
778 if t.idleConnCh == nil {
779 t.idleConnCh = make(map[connectMethodKey]chan *persistConn)
780 }
781 ch, ok := t.idleConnCh[key]
782 if !ok {
783 ch = make(chan *persistConn)
784 t.idleConnCh[key] = ch
785 }
786 return ch
787 }
788
// getIdleConn removes and returns a usable idle connection for cm,
// together with the time it became idle. It returns (nil, zero time) when
// the pool holds no usable connection for cm.
func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince time.Time) {
	key := cm.key()
	t.idleMu.Lock()
	defer t.idleMu.Unlock()
	for {
		pconns, ok := t.idleConn[key]
		if !ok {
			return nil, time.Time{}
		}
		if len(pconns) == 1 {
			pconn = pconns[0]
			delete(t.idleConn, key)
		} else {
			// Take the most recently parked connection (the pool is used
			// as a stack).
			pconn = pconns[len(pconns)-1]
			t.idleConn[key] = pconns[:len(pconns)-1]
		}
		t.idleLRU.remove(pconn)
		if pconn.isBroken() {
			// The connection was closed while it sat in the pool (for
			// example by readLoop) but has not been removed yet; skip it
			// and try the next one.
			continue
		}
		if pconn.idleTimer != nil && !pconn.idleTimer.Stop() {
			// Stop reported that the idle timer already fired, so
			// closeConnIfStillIdle is running or about to; treat the
			// connection as gone.
			continue
		}
		return pconn, pconn.idleAt
	}
}
825
826
// removeIdleConn removes pconn from the idle pool, if present. It takes
// idleMu itself; callers already holding the lock must use
// removeIdleConnLocked instead.
func (t *Transport) removeIdleConn(pconn *persistConn) {
	t.idleMu.Lock()
	defer t.idleMu.Unlock()
	t.removeIdleConnLocked(pconn)
}
832
833
834 func (t *Transport) removeIdleConnLocked(pconn *persistConn) {
835 if pconn.idleTimer != nil {
836 pconn.idleTimer.Stop()
837 }
838 t.idleLRU.remove(pconn)
839 key := pconn.cacheKey
840 pconns := t.idleConn[key]
841 switch len(pconns) {
842 case 0:
843
844 case 1:
845 if pconns[0] == pconn {
846 delete(t.idleConn, key)
847 }
848 default:
849 for i, v := range pconns {
850 if v != pconn {
851 continue
852 }
853
854
855 copy(pconns[i:], pconns[i+1:])
856 t.idleConn[key] = pconns[:len(pconns)-1]
857 break
858 }
859 }
860 }
861
862 func (t *Transport) setReqCanceler(r *Request, fn func(error)) {
863 t.reqMu.Lock()
864 defer t.reqMu.Unlock()
865 if t.reqCanceler == nil {
866 t.reqCanceler = make(map[*Request]func(error))
867 }
868 if fn != nil {
869 t.reqCanceler[r] = fn
870 } else {
871 delete(t.reqCanceler, r)
872 }
873 }
874
875
876
877
878
879 func (t *Transport) replaceReqCanceler(r *Request, fn func(error)) bool {
880 t.reqMu.Lock()
881 defer t.reqMu.Unlock()
882 _, ok := t.reqCanceler[r]
883 if !ok {
884 return false
885 }
886 if fn != nil {
887 t.reqCanceler[r] = fn
888 } else {
889 delete(t.reqCanceler, r)
890 }
891 return true
892 }
893
// zeroDialer is the zero-value net.Dialer that Transport.dial falls back
// to when neither DialContext nor Dial is set.
var zeroDialer net.Dialer
895
896 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
897 if t.DialContext != nil {
898 return t.DialContext(ctx, network, addr)
899 }
900 if t.Dial != nil {
901 c, err := t.Dial(network, addr)
902 if c == nil && err == nil {
903 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
904 }
905 return c, err
906 }
907 return zeroDialer.DialContext(ctx, network, addr)
908 }
909
910
911
912
913
// getConn returns a connection for treq: an idle one from the pool if
// available, otherwise whichever arrives first of a freshly dialed
// connection and a connection freed by another request. It also registers
// a request canceler so the wait can be interrupted.
//
// On cancellation it returns errRequestCanceledConn, the context's error,
// or the error passed to the canceler.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}
	if pc, idleSince := t.getIdleConn(cm); pc != nil {
		if trace != nil && trace.GotConn != nil {
			trace.GotConn(pc.gotIdleConnTrace(idleSince))
		}
		// Register a placeholder canceler so the request counts as tracked;
		// replaceReqCanceler only succeeds for tracked requests.
		t.setReqCanceler(req, func(error) {})
		return pc, nil
	}

	type dialRes struct {
		pc  *persistConn
		err error
	}
	dialc := make(chan dialRes)

	// Copy the hooks into locals so the goroutine below uses the values
	// that were current when getConn was called.
	testHookPrePendingDial := testHookPrePendingDial
	testHookPostPendingDial := testHookPostPendingDial

	// handlePendingDial is called when getConn returns before the dial
	// completes. It waits for the dial in the background and parks (or
	// closes) the resulting connection so it is not leaked, and so the
	// dialing goroutine is not left blocked on the unbuffered dialc.
	handlePendingDial := func() {
		testHookPrePendingDial()
		go func() {
			if v := <-dialc; v.err == nil {
				t.putOrCloseIdleConn(v.pc)
			}
			testHookPostPendingDial()
		}()
	}

	// Buffered so the canceler never blocks if getConn has already returned.
	cancelc := make(chan error, 1)
	t.setReqCanceler(req, func(err error) { cancelc <- err })

	go func() {
		pc, err := t.dialConn(ctx, cm)
		dialc <- dialRes{pc, err}
	}()

	idleConnCh := t.getIdleConnCh(cm)
	select {
	case v := <-dialc:
		// Our dial finished.
		if v.pc != nil {
			if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
				trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
			}
			return v.pc, nil
		}
		// The dial failed. If the request was canceled in the meantime,
		// report the cancellation rather than the dial error.
		select {
		case <-req.Cancel:
			return nil, errRequestCanceledConn
		case <-req.Context().Done():
			return nil, req.Context().Err()
		case err := <-cancelc:
			if err == errRequestCanceled {
				err = errRequestCanceledConn
			}
			return nil, err
		default:
			// Not canceled: return the dial error itself.
			return nil, v.err
		}
	case pc := <-idleConnCh:
		// Another request finished and handed over its connection before
		// our dial completed. Use it, and let the pending dial finish in
		// the background.
		handlePendingDial()
		if trace != nil && trace.GotConn != nil {
			trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
		}
		return pc, nil
	case <-req.Cancel:
		handlePendingDial()
		return nil, errRequestCanceledConn
	case <-req.Context().Done():
		handlePendingDial()
		return nil, req.Context().Err()
	case err := <-cancelc:
		handlePendingDial()
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}
}
1015
1016 type oneConnDialer <-chan net.Conn
1017
1018 func newOneConnDialer(c net.Conn) proxy.Dialer {
1019 ch := make(chan net.Conn, 1)
1020 ch <- c
1021 return oneConnDialer(ch)
1022 }
1023
1024 func (d oneConnDialer) Dial(network, addr string) (net.Conn, error) {
1025 select {
1026 case c := <-d:
1027 return c, nil
1028 default:
1029 return nil, io.EOF
1030 }
1031 }
1032
1033
1034
1035 func chooseTLSHost(cm connectMethod, t *Transport) string {
1036 tlsHost := ""
1037 if t.TLSClientConfig != nil {
1038 tlsHost = t.TLSClientConfig.ServerName
1039 }
1040 if tlsHost == "" {
1041 tlsHost = cm.tlsHost()
1042 }
1043 return tlsHost
1044 }
1045
1046
1047
1048
// addTLS upgrades pconn.conn to TLS: it runs the client handshake over
// the existing connection, subject to TLSHandshakeTimeout, and verifies
// the peer hostname unless InsecureSkipVerify is set. name is used as the
// server name when the transport's TLS config does not specify one.
//
// On success pconn.conn is replaced by the TLS connection and
// pconn.tlsState is populated. On failure the underlying connection is
// closed and the error returned.
func (pconn *persistConn) addTLS(name string, trace *httptrace.ClientTrace) error {
	// Work on a copy so the user's config is never modified.
	cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
	if cfg.ServerName == "" {
		cfg.ServerName = name
	}
	plainConn := pconn.conn
	tlsConn := tls.Client(plainConn, cfg)
	// Capacity 2: both the timeout timer and the handshake goroutine may
	// send, and neither may block once this function has returned.
	errc := make(chan error, 2)
	var timer *time.Timer // nil when no timeout is configured
	if d := pconn.t.TLSHandshakeTimeout; d != 0 {
		timer = time.AfterFunc(d, func() {
			errc <- tlsHandshakeTimeoutError{}
		})
	}
	go func() {
		if trace != nil && trace.TLSHandshakeStart != nil {
			trace.TLSHandshakeStart()
		}
		err := tlsConn.Handshake()
		if timer != nil {
			timer.Stop()
		}
		errc <- err
	}()
	// The first value wins: either the handshake result or the timeout.
	if err := <-errc; err != nil {
		// Closing the plain connection also unblocks a handshake that is
		// still in progress after a timeout.
		plainConn.Close()
		if trace != nil && trace.TLSHandshakeDone != nil {
			trace.TLSHandshakeDone(tls.ConnectionState{}, err)
		}
		return err
	}
	if !cfg.InsecureSkipVerify {
		// NOTE(review): this failure path returns without invoking
		// trace.TLSHandshakeDone, unlike the handshake failure above;
		// confirm whether that is intended.
		if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
			plainConn.Close()
			return err
		}
	}
	cs := tlsConn.ConnectionState()
	if trace != nil && trace.TLSHandshakeDone != nil {
		trace.TLSHandshakeDone(cs, nil)
	}
	pconn.tlsState = &cs
	pconn.conn = tlsConn
	return nil
}
1095
1096 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
1097 pconn := &persistConn{
1098 t: t,
1099 cacheKey: cm.key(),
1100 reqch: make(chan requestAndChan, 1),
1101 writech: make(chan writeRequest, 1),
1102 closech: make(chan struct{}),
1103 writeErrCh: make(chan error, 1),
1104 writeLoopDone: make(chan struct{}),
1105 }
1106 trace := httptrace.ContextClientTrace(ctx)
1107 wrapErr := func(err error) error {
1108 if cm.proxyURL != nil {
1109
1110 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
1111 }
1112 return err
1113 }
1114 if cm.scheme() == "https" && t.DialTLS != nil {
1115 var err error
1116 pconn.conn, err = t.DialTLS("tcp", cm.addr())
1117 if err != nil {
1118 return nil, wrapErr(err)
1119 }
1120 if pconn.conn == nil {
1121 return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)"))
1122 }
1123 if tc, ok := pconn.conn.(*tls.Conn); ok {
1124
1125
1126 if trace != nil && trace.TLSHandshakeStart != nil {
1127 trace.TLSHandshakeStart()
1128 }
1129 if err := tc.Handshake(); err != nil {
1130 go pconn.conn.Close()
1131 if trace != nil && trace.TLSHandshakeDone != nil {
1132 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1133 }
1134 return nil, err
1135 }
1136 cs := tc.ConnectionState()
1137 if trace != nil && trace.TLSHandshakeDone != nil {
1138 trace.TLSHandshakeDone(cs, nil)
1139 }
1140 pconn.tlsState = &cs
1141 }
1142 } else {
1143 conn, err := t.dial(ctx, "tcp", cm.addr())
1144 if err != nil {
1145 return nil, wrapErr(err)
1146 }
1147 pconn.conn = conn
1148 if cm.scheme() == "https" {
1149 var firstTLSHost string
1150 if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
1151 return nil, wrapErr(err)
1152 }
1153 if err = pconn.addTLS(firstTLSHost, trace); err != nil {
1154 return nil, wrapErr(err)
1155 }
1156 }
1157 }
1158
1159
1160 switch {
1161 case cm.proxyURL == nil:
1162
1163 case cm.proxyURL.Scheme == "socks5":
1164 conn := pconn.conn
1165 var auth *proxy.Auth
1166 if u := cm.proxyURL.User; u != nil {
1167 auth = &proxy.Auth{}
1168 auth.User = u.Username()
1169 auth.Password, _ = u.Password()
1170 }
1171 p, err := proxy.SOCKS5("", cm.addr(), auth, newOneConnDialer(conn))
1172 if err != nil {
1173 conn.Close()
1174 return nil, err
1175 }
1176 if _, err := p.Dial("tcp", cm.targetAddr); err != nil {
1177 conn.Close()
1178 return nil, err
1179 }
1180 case cm.targetScheme == "http":
1181 pconn.isProxy = true
1182 if pa := cm.proxyAuth(); pa != "" {
1183 pconn.mutateHeaderFunc = func(h Header) {
1184 h.Set("Proxy-Authorization", pa)
1185 }
1186 }
1187 case cm.targetScheme == "https":
1188 conn := pconn.conn
1189 hdr := t.ProxyConnectHeader
1190 if hdr == nil {
1191 hdr = make(Header)
1192 }
1193 connectReq := &Request{
1194 Method: "CONNECT",
1195 URL: &url.URL{Opaque: cm.targetAddr},
1196 Host: cm.targetAddr,
1197 Header: hdr,
1198 }
1199 if pa := cm.proxyAuth(); pa != "" {
1200 connectReq.Header.Set("Proxy-Authorization", pa)
1201 }
1202 connectReq.Write(conn)
1203
1204
1205
1206
1207 br := bufio.NewReader(conn)
1208 resp, err := ReadResponse(br, connectReq)
1209 if err != nil {
1210 conn.Close()
1211 return nil, err
1212 }
1213 if resp.StatusCode != 200 {
1214 f := strings.SplitN(resp.Status, " ", 2)
1215 conn.Close()
1216 if len(f) < 2 {
1217 return nil, errors.New("unknown status code")
1218 }
1219 return nil, errors.New(f[1])
1220 }
1221 }
1222
1223 if cm.proxyURL != nil && cm.targetScheme == "https" {
1224 if err := pconn.addTLS(cm.tlsHost(), trace); err != nil {
1225 return nil, err
1226 }
1227 }
1228
1229 if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
1230 if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
1231 return &persistConn{alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil
1232 }
1233 }
1234
1235 pconn.br = bufio.NewReader(pconn)
1236 pconn.bw = bufio.NewWriter(persistConnWriter{pconn})
1237 go pconn.readLoop()
1238 go pconn.writeLoop()
1239 return pconn, nil
1240 }
1241
1242
1243
1244
1245
1246
1247
1248 type persistConnWriter struct {
1249 pc *persistConn
1250 }
1251
1252 func (w persistConnWriter) Write(p []byte) (n int, err error) {
1253 n, err = w.pc.conn.Write(p)
1254 w.pc.nwrite += int64(n)
1255 return
1256 }
1257
1258
1259
1260
1261 func useProxy(addr string) bool {
1262 if len(addr) == 0 {
1263 return true
1264 }
1265 host, _, err := net.SplitHostPort(addr)
1266 if err != nil {
1267 return false
1268 }
1269 if host == "localhost" {
1270 return false
1271 }
1272 if ip := net.ParseIP(host); ip != nil {
1273 if ip.IsLoopback() {
1274 return false
1275 }
1276 }
1277
1278 noProxy := noProxyEnv.Get()
1279 if noProxy == "*" {
1280 return false
1281 }
1282
1283 addr = strings.ToLower(strings.TrimSpace(addr))
1284 if hasPort(addr) {
1285 addr = addr[:strings.LastIndex(addr, ":")]
1286 }
1287
1288 for _, p := range strings.Split(noProxy, ",") {
1289 p = strings.ToLower(strings.TrimSpace(p))
1290 if len(p) == 0 {
1291 continue
1292 }
1293 if hasPort(p) {
1294 p = p[:strings.LastIndex(p, ":")]
1295 }
1296 if addr == p {
1297 return false
1298 }
1299 if len(p) == 0 {
1300
1301 continue
1302 }
1303 if p[0] == '.' && (strings.HasSuffix(addr, p) || addr == p[1:]) {
1304
1305 return false
1306 }
1307 if p[0] != '.' && strings.HasSuffix(addr, p) && addr[len(addr)-len(p)-1] == '.' {
1308
1309 return false
1310 }
1311 }
1312 return true
1313 }
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
// connectMethod describes how to reach a request's target: directly, or
// through a proxy. Its key method derives the map key under which
// connections made this way are pooled.
type connectMethod struct {
	proxyURL     *url.URL // nil when no proxy is used
	targetScheme string   // scheme of the request URL, e.g. "http" or "https"

	// targetAddr is the "host:port" of the request's destination, as
	// produced by canonicalAddr. It is not the proxy's address.
	targetAddr string
}
1339
1340 func (cm *connectMethod) key() connectMethodKey {
1341 proxyStr := ""
1342 targetAddr := cm.targetAddr
1343 if cm.proxyURL != nil {
1344 proxyStr = cm.proxyURL.String()
1345 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
1346 targetAddr = ""
1347 }
1348 }
1349 return connectMethodKey{
1350 proxy: proxyStr,
1351 scheme: cm.targetScheme,
1352 addr: targetAddr,
1353 }
1354 }
1355
1356
1357 func (cm *connectMethod) scheme() string {
1358 if cm.proxyURL != nil {
1359 return cm.proxyURL.Scheme
1360 }
1361 return cm.targetScheme
1362 }
1363
1364
1365 func (cm *connectMethod) addr() string {
1366 if cm.proxyURL != nil {
1367 return canonicalAddr(cm.proxyURL)
1368 }
1369 return cm.targetAddr
1370 }
1371
1372
1373
1374 func (cm *connectMethod) tlsHost() string {
1375 h := cm.targetAddr
1376 if hasPort(h) {
1377 h = h[:strings.LastIndex(h, ":")]
1378 }
1379 return h
1380 }
1381
1382
1383
1384
// connectMethodKey is the comparable form of a connectMethod, used as the
// key of the idle connection maps. Its fields are the proxy URL string,
// the target scheme, and the target address.
type connectMethodKey struct {
	proxy, scheme, addr string
}

// String renders the key as "proxy|scheme|addr". It is intended for
// debugging output.
func (k connectMethodKey) String() string {
	// All operands are strings, so Sprint inserts no separators of its own.
	return fmt.Sprint(k.proxy, "|", k.scheme, "|", k.addr)
}
1393
1394
1395
// persistConn wraps a single connection to a server or proxy, typically a
// persistent (keep-alive) one that is reused for several requests.
type persistConn struct {
	// alt, when non-nil, is the alternate-protocol RoundTripper that took
	// over this connection after TLS protocol negotiation. Such a
	// persistConn has no other fields set and is used only to carry alt
	// back to RoundTrip.
	alt RoundTripper

	t         *Transport             // owning transport
	cacheKey  connectMethodKey       // key under which the connection is pooled
	conn      net.Conn               // underlying (possibly TLS) connection
	tlsState  *tls.ConnectionState   // nil unless a TLS handshake was performed
	br        *bufio.Reader          // reads via persistConn.Read
	bw        *bufio.Writer          // writes via persistConnWriter
	nwrite    int64                  // bytes written to conn so far
	reqch     chan requestAndChan    // requests handed to readLoop
	writech   chan writeRequest      // NOTE(review): presumably consumed by writeLoop; confirm
	closech   chan struct{}          // NOTE(review): presumably closed when the connection closes; confirm
	isProxy   bool                   // true for "http" targets sent via an HTTP proxy
	sawEOF    bool                   // set by Read when conn.Read returns io.EOF
	readLimit int64                  // bytes Read may still return; maintained by readLoop

	// writeErrCh has capacity 1.
	// NOTE(review): presumably carries the result of writing a request
	// from writeLoop to the round-trip code; confirm there.
	writeErrCh chan error

	// writeLoopDone is waited on by mapRoundTripError before it inspects
	// nwrite. NOTE(review): presumably closed when writeLoop exits.
	writeLoopDone chan struct{}

	// Both idle fields are maintained by the Transport under t.idleMu.
	idleAt    time.Time   // time the connection was last parked as idle
	idleTimer *time.Timer // fires closeConnIfStillIdle; nil until first armed

	mu                   sync.Mutex // guards the following fields
	numExpectedResponses int        // responses readLoop is still expecting
	closed               error      // non-nil once closed; isBroken tests this
	canceledErr          error      // set by cancelRequest
	broken               bool       // not referenced in the code visible here
	reused               bool       // reported by isReused

	// mutateHeaderFunc, when non-nil, is applied to outgoing request
	// headers; dialConn sets it to add Proxy-Authorization for plain HTTP
	// via a proxy.
	mutateHeaderFunc func(Header)
}
1438
1439 func (pc *persistConn) maxHeaderResponseSize() int64 {
1440 if v := pc.t.MaxResponseHeaderBytes; v != 0 {
1441 return v
1442 }
1443 return 10 << 20
1444 }
1445
1446 func (pc *persistConn) Read(p []byte) (n int, err error) {
1447 if pc.readLimit <= 0 {
1448 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
1449 }
1450 if int64(len(p)) > pc.readLimit {
1451 p = p[:pc.readLimit]
1452 }
1453 n, err = pc.conn.Read(p)
1454 if err == io.EOF {
1455 pc.sawEOF = true
1456 }
1457 pc.readLimit -= int64(n)
1458 return
1459 }
1460
1461
1462 func (pc *persistConn) isBroken() bool {
1463 pc.mu.Lock()
1464 b := pc.closed != nil
1465 pc.mu.Unlock()
1466 return b
1467 }
1468
1469
1470
1471 func (pc *persistConn) canceled() error {
1472 pc.mu.Lock()
1473 defer pc.mu.Unlock()
1474 return pc.canceledErr
1475 }
1476
1477
1478 func (pc *persistConn) isReused() bool {
1479 pc.mu.Lock()
1480 r := pc.reused
1481 pc.mu.Unlock()
1482 return r
1483 }
1484
1485 func (pc *persistConn) gotIdleConnTrace(idleAt time.Time) (t httptrace.GotConnInfo) {
1486 pc.mu.Lock()
1487 defer pc.mu.Unlock()
1488 t.Reused = pc.reused
1489 t.Conn = pc.conn
1490 t.WasIdle = true
1491 if !idleAt.IsZero() {
1492 t.IdleTime = time.Since(idleAt)
1493 }
1494 return
1495 }
1496
// cancelRequest records err as the cancellation reason (later returned by
// canceled) and closes the connection with errRequestCanceled. Both steps
// happen under pc.mu.
func (pc *persistConn) cancelRequest(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.canceledErr = err
	pc.closeLocked(errRequestCanceled)
}
1503
1504
1505
1506
1507 func (pc *persistConn) closeConnIfStillIdle() {
1508 t := pc.t
1509 t.idleMu.Lock()
1510 defer t.idleMu.Unlock()
1511 if _, ok := t.idleLRU.m[pc]; !ok {
1512
1513 return
1514 }
1515 t.removeIdleConnLocked(pc)
1516 pc.close(errIdleConnTimeout)
1517 }
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527 func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
1528 if err == nil {
1529 return nil
1530 }
1531
1532
1533
1534
1535 if cerr := pc.canceled(); cerr != nil {
1536 return cerr
1537 }
1538
1539
1540 req.mu.Lock()
1541 reqErr := req.err
1542 req.mu.Unlock()
1543 if reqErr != nil {
1544 return reqErr
1545 }
1546
1547 if err == errServerClosedIdle {
1548
1549 return err
1550 }
1551
1552 if _, ok := err.(transportReadFromServerError); ok {
1553
1554 return err
1555 }
1556 if pc.isBroken() {
1557 <-pc.writeLoopDone
1558 if pc.nwrite == startBytesWritten {
1559 return nothingWrittenError{err}
1560 }
1561 return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %v", err)
1562 }
1563 return err
1564 }
1565
// readLoop reads responses from the connection for the requests that
// roundTrip hands over on pc.reqch, and delivers each result on the
// request's rc.ch. It runs until the connection can no longer be reused;
// on exit it closes pc with closeErr and removes pc from the transport's
// idle pool.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default; overwritten below when a more specific cause is known
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
	}()

	// tryPutIdleConn offers pc back to the transport's idle pool and
	// reports whether that succeeded. On failure the error becomes
	// closeErr. The trace's PutIdleConn hook, if set, is told the
	// outcome, except that errKeepAlivesDisabled is not reported.
	tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc blocks the body callbacks below (which receive from it) until
	// this goroutine has had the chance to return the connection to the
	// idle pool. Closing it on exit unblocks any callback still waiting.
	eofc := make(chan struct{})
	defer close(eofc)

	// Snapshot the hook once, under testHookMu, before the loop starts.
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			// Peek returned although no response is expected: let
			// readLoopPeekFailLocked close the connection and stop.
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := httptrace.ContextClientTrace(rc.req.Context())

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			// The Peek itself failed; tag the error as a read-from-server
			// failure and remember it as the reason for closing.
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			// A non-positive readLimit means the header size limit set
			// at the top of the loop was exhausted; report that instead.
			if pc.readLimit <= 0 {
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			// Deliver the error unless the caller has already gone away;
			// the loop ends in both cases.
			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // lift the header limit while the body is read

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.req.Close || resp.StatusCode <= 199 {
			// Either side asked for the connection to be closed, or the
			// status is still in the 1xx range: do not reuse the
			// connection after this response.
			alive = false
		}

		if !hasBody {
			pc.t.setReqCanceler(rc.req, nil)

			// Try to return the connection to the idle pool BEFORE
			// handing the response to the caller. The && chain
			// short-circuits, so the pool is only tried when the
			// connection is still alive, no EOF was seen, and the
			// request was written successfully.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(trace)

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			// rc.ch is unbuffered (see roundTrip), so the caller has
			// received the response by now; continuing re-checks alive.
			testHookReadLoopBeforeNextRead()
			continue
		}

		// waitForBodyRead receives false on an early Close or a non-EOF
		// read error, and true when the body was read to io.EOF.
		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				waitForBodyRead <- false
				<-eofc // released by the send below or the deferred close(eofc)
				return nil
			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // wait until readLoop has dealt with the idle pool
				} else if err != nil {
					// Prefer the cancellation error, if any, over the raw read error.
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		// Decompress transparently only when the transport itself asked
		// for gzip (rc.addedGzip) and the server actually used it.
		if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Before peeking at the connection again, wait until the caller
		// is done with the body, the request is canceled, or the
		// connection is closed.
		select {
		case bodyEOF := <-waitForBodyRead:
			pc.t.setReqCanceler(rc.req, nil) // cleared before pc may go back to the idle pool
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(trace)
			if bodyEOF {
				// Release the body callback that is blocked on eofc.
				eofc <- struct{}{}
			}
		case <-rc.req.Cancel:
			alive = false
			pc.t.CancelRequest(rc.req)
		case <-rc.req.Context().Done():
			alive = false
			pc.t.cancelRequest(rc.req, rc.req.Context().Err())
		case <-pc.closech:
			alive = false
		}

		testHookReadLoopBeforeNextRead()
	}
}
1741
1742 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
1743 if pc.closed != nil {
1744 return
1745 }
1746 if n := pc.br.Buffered(); n > 0 {
1747 buf, _ := pc.br.Peek(n)
1748 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
1749 }
1750 if peekErr == io.EOF {
1751
1752 pc.closeLocked(errServerClosedIdle)
1753 } else {
1754 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %v", peekErr))
1755 }
1756 }
1757
1758
1759
1760
1761 func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
1762 if trace != nil && trace.GotFirstResponseByte != nil {
1763 if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
1764 trace.GotFirstResponseByte()
1765 }
1766 }
1767 resp, err = ReadResponse(pc.br, rc.req)
1768 if err != nil {
1769 return
1770 }
1771 if rc.continueCh != nil {
1772 if resp.StatusCode == 100 {
1773 if trace != nil && trace.Got100Continue != nil {
1774 trace.Got100Continue()
1775 }
1776 rc.continueCh <- struct{}{}
1777 } else {
1778 close(rc.continueCh)
1779 }
1780 }
1781 if resp.StatusCode == 100 {
1782 pc.readLimit = pc.maxHeaderResponseSize()
1783 resp, err = ReadResponse(pc.br, rc.req)
1784 if err != nil {
1785 return
1786 }
1787 }
1788 resp.TLS = pc.tlsState
1789 return
1790 }
1791
1792
1793
1794
1795 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
1796 if continueCh == nil {
1797 return nil
1798 }
1799 return func() bool {
1800 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
1801 defer timer.Stop()
1802
1803 select {
1804 case _, ok := <-continueCh:
1805 return ok
1806 case <-timer.C:
1807 return true
1808 case <-pc.closech:
1809 return false
1810 }
1811 }
1812 }
1813
1814
// nothingWrittenError wraps an error from a write attempt during which
// the connection's byte counter (pc.nwrite) did not advance, i.e. nothing
// was written. It is produced by writeLoop and mapRoundTripError.
type nothingWrittenError struct {
	error
}
1818
1819 func (pc *persistConn) writeLoop() {
1820 defer close(pc.writeLoopDone)
1821 for {
1822 select {
1823 case wr := <-pc.writech:
1824 startBytesWritten := pc.nwrite
1825 err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
1826 if bre, ok := err.(requestBodyReadError); ok {
1827 err = bre.error
1828
1829
1830
1831
1832
1833
1834
1835 wr.req.setError(err)
1836 }
1837 if err == nil {
1838 err = pc.bw.Flush()
1839 }
1840 if err != nil {
1841 wr.req.Request.closeBody()
1842 if pc.nwrite == startBytesWritten {
1843 err = nothingWrittenError{err}
1844 }
1845 }
1846 pc.writeErrCh <- err
1847 wr.ch <- err
1848 if err != nil {
1849 pc.close(err)
1850 return
1851 }
1852 case <-pc.closech:
1853 return
1854 }
1855 }
1856 }
1857
1858
1859
1860 func (pc *persistConn) wroteRequest() bool {
1861 select {
1862 case err := <-pc.writeErrCh:
1863
1864
1865 return err == nil
1866 default:
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877 select {
1878 case err := <-pc.writeErrCh:
1879 return err == nil
1880 case <-time.After(50 * time.Millisecond):
1881 return false
1882 }
1883 }
1884 }
1885
1886
1887
// responseAndError is the value readLoop sends to roundTrip over a
// request's reply channel. Exactly one of res and err is expected to be
// set; roundTrip panics otherwise.
type responseAndError struct {
	res *Response // the response, when reading succeeded
	err error     // the failure, when it did not
}
1892
// requestAndChan is what roundTrip sends to readLoop over pc.reqch: the
// request whose response should be read, plus the channels used to
// coordinate delivery of the result.
type requestAndChan struct {
	req *Request
	ch  chan responseAndError // unbuffered (see roundTrip); readLoop sends the result here

	// addedGzip records that the transport itself set
	// "Accept-Encoding: gzip" on the request. readLoop decompresses a
	// gzip-encoded response body transparently only when this is true.
	addedGzip bool

	// continueCh, when non-nil, is signalled by readResponse: a value is
	// sent when the server answers with status 100, and the channel is
	// closed when it answers with anything else.
	continueCh chan<- struct{}

	callerGone <-chan struct{} // closed when roundTrip returns
}
1910
1911
1912
1913
1914
// writeRequest is what roundTrip sends to writeLoop over pc.writech: the
// request to write and the channel on which the write's outcome is
// reported.
type writeRequest struct {
	req *transportRequest
	ch  chan<- error // receives the write result (nil on success)

	// continueCh, when non-nil, is passed to waitForContinue, whose
	// returned function writeLoop hands to Request.write.
	continueCh <-chan struct{}
}
1924
// httpError is an error carrying a fixed message and a flag saying
// whether it represents a timeout. It always reports itself as temporary.
type httpError struct {
	err     string
	timeout bool
}

// Error returns the stored message.
func (e *httpError) Error() string {
	return e.err
}

// Timeout reports whether the error was constructed as a timeout.
func (e *httpError) Timeout() bool {
	return e.timeout
}

// Temporary always reports true.
func (e *httpError) Temporary() bool {
	return true
}
1933
// errTimeout is returned by roundTrip when the response header timer
// fires before a response arrives. Its Timeout method reports true.
var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true}

// errRequestCanceled is returned by roundTrip when the request's
// canceler could not be replaced at the start of the round trip.
var errRequestCanceled = errors.New("net/http: request canceled")

// errRequestCanceledConn reports a cancellation that occurred while the
// request was still waiting for a connection (per its message; the use
// site is outside this part of the file).
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")
1937
// nop does nothing. It is the default value of the test hooks below.
func nop() {}
1939
1940
// Hooks that are called at fixed points in the transport. They all
// default to nop; presumably tests replace them to observe or
// synchronize with the transport.
var (
	testHookEnterRoundTrip   = nop // called on entry to persistConn.roundTrip
	testHookWaitResLoop      = nop // called at the top of each iteration of roundTrip's wait loop
	testHookRoundTripRetried = nop
	testHookPrePendingDial   = nop
	testHookPostPendingDial  = nop

	// testHookMu is held by readLoop while it reads
	// testHookReadLoopBeforeNextRead. The default fakeLocker makes that a
	// no-op.
	testHookMu                     sync.Locker = fakeLocker{}
	testHookReadLoopBeforeNextRead             = nop // called by readLoop before each new loop iteration
)
1951
// roundTrip sends req over this connection and waits for the response
// headers. It hands the request to writeLoop and readLoop, then waits
// for the first of: a write error, the connection closing, the response
// header timeout, a result from readLoop, or cancellation of the
// request. Errors that originate on the connection are passed through
// mapRoundTripError before being returned.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()
	if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
		// The canceler could not be installed; give the connection back
		// (or close it) and report the request as canceled.
		pc.t.putOrCloseIdleConn(pc)
		return nil, errRequestCanceled
	}
	pc.mu.Lock()
	pc.numExpectedResponses++
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask the server for gzip, and remember that we did, only when
	// compression is enabled on the transport, the caller set no
	// Accept-Encoding of its own, no Range is requested, and the method
	// is not HEAD. readLoop uses the flag (as addedGzip) to decide
	// whether to decompress the response transparently.
	requestedGzip := false
	if !pc.t.DisableCompression &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	// continueCh is created only for HTTP/1.1+ requests that have a body
	// and expect a 100-continue. The buffer of one lets readResponse
	// signal without blocking on the writer.
	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1)
	}

	if pc.t.DisableKeepAlives {
		req.extraHeaders().Set("Connection", "close")
	}

	// gone is closed when roundTrip returns; readLoop sees it as
	// rc.callerGone and stops trying to deliver a result.
	gone := make(chan struct{})
	defer close(gone)

	// On any error return, clear the request's canceler.
	defer func() {
		if err != nil {
			pc.t.setReqCanceler(req.Request, nil)
		}
	}()

	const debugRoundTrip = false

	// Remember the byte count before the write so mapRoundTripError can
	// tell whether anything was written for this request, then queue the
	// write with writeLoop.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	// resc is unbuffered: readLoop's send completes only once this
	// goroutine has received the result.
	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		req:        req.Request,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	var respHeaderTimer <-chan time.Time // nil (never ready) until the write has succeeded
	cancelChan := req.Request.Cancel
	ctxDoneChan := req.Context().Done()
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if debugRoundTrip {
				req.logf("writeErrCh resv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %v", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// The request was written; start the response header timer
			// if a timeout is configured.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop()
				respHeaderTimer = timer.C
			}
		case <-pc.closech:
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer:
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			// readLoop must deliver exactly one of a response or an error.
			if (re.res == nil) == (re.err == nil) {
				panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
			}
			if debugRoundTrip {
				req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
			}
			if re.err != nil {
				return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
			}
			return re.res, nil
		case <-cancelChan:
			// Cancel, then keep looping with this case disabled (a nil
			// channel is never ready) until one of the other cases
			// produces the final result.
			pc.t.CancelRequest(req.Request)
			cancelChan = nil
		case <-ctxDoneChan:
			pc.t.cancelRequest(req.Request, req.Context().Err())
			cancelChan = nil
			ctxDoneChan = nil
		}
	}
}
2082
2083
2084
2085 type tLogKey struct{}
2086
2087 func (tr *transportRequest) logf(format string, args ...interface{}) {
2088 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...interface{})); ok {
2089 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2090 }
2091 }
2092
2093
2094
2095 func (pc *persistConn) markReused() {
2096 pc.mu.Lock()
2097 pc.reused = true
2098 pc.mu.Unlock()
2099 }
2100
2101
2102
2103
2104
2105
// close marks the connection as broken and, if it has not been closed
// yet, closes it with err as the recorded reason. err must be non-nil
// (closeLocked panics otherwise). It takes pc.mu and delegates to
// closeLocked.
func (pc *persistConn) close(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.closeLocked(err)
}
2111
2112 func (pc *persistConn) closeLocked(err error) {
2113 if err == nil {
2114 panic("nil error")
2115 }
2116 pc.broken = true
2117 if pc.closed == nil {
2118 pc.closed = err
2119 if pc.alt != nil {
2120
2121
2122
2123
2124
2125
2126
2127 } else {
2128 pc.conn.Close()
2129 close(pc.closech)
2130 }
2131 }
2132 pc.mutateHeaderFunc = nil
2133 }
2134
// portMap gives the default port for each URL scheme that canonicalAddr
// knows about.
var portMap = map[string]string{
	"http":   "80",
	"https":  "443",
	"socks5": "1080",
}
2140
2141
2142 func canonicalAddr(url *url.URL) string {
2143 addr := url.Hostname()
2144 if v, err := idnaASCII(addr); err == nil {
2145 addr = v
2146 }
2147 port := url.Port()
2148 if port == "" {
2149 port = portMap[url.Scheme]
2150 }
2151 return net.JoinHostPort(addr, port)
2152 }
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
// bodyEOFSignal wraps a response body and runs callbacks when the body
// is finished with:
//
//   - fn is called at most once, with the first error returned by the
//     underlying Read (io.EOF included), or with the result of closing
//     the underlying body. Its return value replaces that error.
//   - earlyCloseFn, if set, is called instead of closing the underlying
//     body when Close happens before a Read has returned io.EOF.
//
// Both callbacks are invoked with mu held.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex        // guards the fields below
	closed       bool              // set once Close has been called
	rerr         error             // first error returned by body.Read; sticky
	fn           func(error) error // one-shot completion callback; nil once used
	earlyCloseFn func() error      // optional replacement for closing before EOF
}

// errReadOnClosedResBody is returned by Read after Close has been called.
var errReadOnClosedResBody = errors.New("http: read on closed response body")

// Read reads from the underlying body. After Close it fails with
// errReadOnClosedResBody; after an earlier read error it keeps returning
// that same error. The first error from the underlying body is recorded
// and passed through fn, whose result is returned to the caller.
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
	es.mu.Lock()
	alreadyClosed, stickyErr := es.closed, es.rerr
	es.mu.Unlock()

	switch {
	case alreadyClosed:
		return 0, errReadOnClosedResBody
	case stickyErr != nil:
		return 0, stickyErr
	}

	n, err = es.body.Read(p)
	if err == nil {
		return n, nil
	}

	es.mu.Lock()
	defer es.mu.Unlock()
	if es.rerr == nil {
		es.rerr = err
	}
	return n, es.condfn(err)
}

// Close marks the body closed; second and later calls return nil. If
// earlyCloseFn is set and no Read has returned io.EOF, its result is
// returned and the underlying body is not closed here. Otherwise the
// underlying body is closed and the close error is passed through fn.
func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()

	if es.closed {
		return nil
	}
	es.closed = true

	reachedEOF := es.rerr == io.EOF
	if !reachedEOF && es.earlyCloseFn != nil {
		return es.earlyCloseFn()
	}

	closeErr := es.body.Close()
	return es.condfn(closeErr)
}

// condfn runs fn with err and returns its result, then clears fn so it
// never runs again. If fn is already nil, err is returned unchanged. The
// callers in this type hold es.mu when calling it.
func (es *bodyEOFSignal) condfn(err error) error {
	hook := es.fn
	if hook == nil {
		return err
	}
	result := hook(err)
	es.fn = nil
	return result
}
2222
2223
2224
// gzipReader wraps a response body so that it is decompressed as it is
// read. readLoop installs it when the transport itself requested gzip
// and the server replied with a gzip Content-Encoding.
type gzipReader struct {
	body *bodyEOFSignal // underlying compressed body
	zr   *gzip.Reader   // created lazily on the first Read
	zerr error          // error from creating zr; returned by every Read once set
}
2230
2231 func (gz *gzipReader) Read(p []byte) (n int, err error) {
2232 if gz.zr == nil {
2233 if gz.zerr == nil {
2234 gz.zr, gz.zerr = gzip.NewReader(gz.body)
2235 }
2236 if gz.zerr != nil {
2237 return 0, gz.zerr
2238 }
2239 }
2240
2241 gz.body.mu.Lock()
2242 if gz.body.closed {
2243 err = errReadOnClosedResBody
2244 }
2245 gz.body.mu.Unlock()
2246
2247 if err != nil {
2248 return 0, err
2249 }
2250 return gz.zr.Read(p)
2251 }
2252
// Close closes the underlying body. The gzip reader itself is not
// closed here.
func (gz *gzipReader) Close() error {
	return gz.body.Close()
}
2256
// readerAndCloser combines an independent io.Reader and io.Closer into
// a single value that provides both Read and Close.
type readerAndCloser struct {
	io.Reader
	io.Closer
}
2261
// tlsHandshakeTimeoutError is the error value for a TLS handshake that
// timed out. It reports itself as both a timeout and temporary.
type tlsHandshakeTimeoutError struct{}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}

// Error returns the fixed handshake-timeout message.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}
2267
2268
2269
2270
// fakeLocker is a sync.Locker whose methods do nothing. It is the
// default value of testHookMu.
type fakeLocker struct{}

// Lock does nothing.
func (fakeLocker) Lock() {}

// Unlock does nothing.
func (fakeLocker) Unlock() {}
2275
2276
2277
2278
// cloneTLSConfig returns a copy of cfg made with cfg.Clone, or a new
// zero-valued tls.Config when cfg is nil. The result is never nil.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
	if cfg != nil {
		return cfg.Clone()
	}
	return new(tls.Config)
}
2285
2286 type connLRU struct {
2287 ll *list.List
2288 m map[*persistConn]*list.Element
2289 }
2290
2291
2292 func (cl *connLRU) add(pc *persistConn) {
2293 if cl.ll == nil {
2294 cl.ll = list.New()
2295 cl.m = make(map[*persistConn]*list.Element)
2296 }
2297 ele := cl.ll.PushFront(pc)
2298 if _, ok := cl.m[pc]; ok {
2299 panic("persistConn was already in LRU")
2300 }
2301 cl.m[pc] = ele
2302 }
2303
2304 func (cl *connLRU) removeOldest() *persistConn {
2305 ele := cl.ll.Back()
2306 pc := ele.Value.(*persistConn)
2307 cl.ll.Remove(ele)
2308 delete(cl.m, pc)
2309 return pc
2310 }
2311
2312
2313 func (cl *connLRU) remove(pc *persistConn) {
2314 if ele, ok := cl.m[pc]; ok {
2315 cl.ll.Remove(ele)
2316 delete(cl.m, pc)
2317 }
2318 }
2319
2320
2321 func (cl *connLRU) len() int {
2322 return len(cl.m)
2323 }
2324
2325
2326
2327
// validPort reports whether p consists solely of the ASCII digits '0'
// through '9'. The empty string is considered valid, and no range check
// is applied to the number.
func validPort(p string) bool {
	for i := 0; i < len(p); i++ {
		if c := p[i]; c < '0' || c > '9' {
			return false
		}
	}
	return true
}
2336
View as plain text