1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package sql
17
18 import (
19 "context"
20 "database/sql/driver"
21 "errors"
22 "fmt"
23 "io"
24 "reflect"
25 "runtime"
26 "sort"
27 "sync"
28 "sync/atomic"
29 "time"
30 )
31
32 var (
33 driversMu sync.RWMutex
34 drivers = make(map[string]driver.Driver)
35 )
36
37
38 var nowFunc = time.Now
39
40
41
42
43 func Register(name string, driver driver.Driver) {
44 driversMu.Lock()
45 defer driversMu.Unlock()
46 if driver == nil {
47 panic("sql: Register driver is nil")
48 }
49 if _, dup := drivers[name]; dup {
50 panic("sql: Register called twice for driver " + name)
51 }
52 drivers[name] = driver
53 }
54
55 func unregisterAllDrivers() {
56 driversMu.Lock()
57 defer driversMu.Unlock()
58
59 drivers = make(map[string]driver.Driver)
60 }
61
62
63 func Drivers() []string {
64 driversMu.RLock()
65 defer driversMu.RUnlock()
66 var list []string
67 for name := range drivers {
68 list = append(list, name)
69 }
70 sort.Strings(list)
71 return list
72 }
73
74
75
76
77
78
79
80 type NamedArg struct {
81 _Named_Fields_Required struct{}
82
83
84
85
86
87
88
89 Name string
90
91
92
93
94 Value interface{}
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108
109 func Named(name string, value interface{}) NamedArg {
110
111
112
113
114 return NamedArg{Name: name, Value: value}
115 }
116
117
118 type IsolationLevel int
119
120
121
122
123
124 const (
125 LevelDefault IsolationLevel = iota
126 LevelReadUncommitted
127 LevelReadCommitted
128 LevelWriteCommitted
129 LevelRepeatableRead
130 LevelSnapshot
131 LevelSerializable
132 LevelLinearizable
133 )
134
135
136 type TxOptions struct {
137
138
139 Isolation IsolationLevel
140 ReadOnly bool
141 }
142
143
144
145
146 type RawBytes []byte
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161 type NullString struct {
162 String string
163 Valid bool
164 }
165
166
167 func (ns *NullString) Scan(value interface{}) error {
168 if value == nil {
169 ns.String, ns.Valid = "", false
170 return nil
171 }
172 ns.Valid = true
173 return convertAssign(&ns.String, value)
174 }
175
176
177 func (ns NullString) Value() (driver.Value, error) {
178 if !ns.Valid {
179 return nil, nil
180 }
181 return ns.String, nil
182 }
183
184
185
186
187 type NullInt64 struct {
188 Int64 int64
189 Valid bool
190 }
191
192
193 func (n *NullInt64) Scan(value interface{}) error {
194 if value == nil {
195 n.Int64, n.Valid = 0, false
196 return nil
197 }
198 n.Valid = true
199 return convertAssign(&n.Int64, value)
200 }
201
202
203 func (n NullInt64) Value() (driver.Value, error) {
204 if !n.Valid {
205 return nil, nil
206 }
207 return n.Int64, nil
208 }
209
210
211
212
213 type NullFloat64 struct {
214 Float64 float64
215 Valid bool
216 }
217
218
219 func (n *NullFloat64) Scan(value interface{}) error {
220 if value == nil {
221 n.Float64, n.Valid = 0, false
222 return nil
223 }
224 n.Valid = true
225 return convertAssign(&n.Float64, value)
226 }
227
228
229 func (n NullFloat64) Value() (driver.Value, error) {
230 if !n.Valid {
231 return nil, nil
232 }
233 return n.Float64, nil
234 }
235
236
237
238
239 type NullBool struct {
240 Bool bool
241 Valid bool
242 }
243
244
245 func (n *NullBool) Scan(value interface{}) error {
246 if value == nil {
247 n.Bool, n.Valid = false, false
248 return nil
249 }
250 n.Valid = true
251 return convertAssign(&n.Bool, value)
252 }
253
254
255 func (n NullBool) Value() (driver.Value, error) {
256 if !n.Valid {
257 return nil, nil
258 }
259 return n.Bool, nil
260 }
261
262
263 type Scanner interface {
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278 Scan(src interface{}) error
279 }
280
281
282
283
284
285
286
287
288
289 type Out struct {
290 _Named_Fields_Required struct{}
291
292
293
294 Dest interface{}
295
296
297
298
299 In bool
300 }
301
302
303
304
305 var ErrNoRows = errors.New("sql: no rows in result set")
306
307
308
309
310
311
312
313
314
315
316
317
318
319 type DB struct {
320 connector driver.Connector
321
322
323
324 numClosed uint64
325
326 mu sync.Mutex
327 freeConn []*driverConn
328 connRequests map[uint64]chan connRequest
329 nextRequest uint64
330 numOpen int
331
332
333
334
335
336 openerCh chan struct{}
337 resetterCh chan *driverConn
338 closed bool
339 dep map[finalCloser]depSet
340 lastPut map[*driverConn]string
341 maxIdle int
342 maxOpen int
343 maxLifetime time.Duration
344 cleanerCh chan struct{}
345
346 stop func()
347 }
348
349
350 type connReuseStrategy uint8
351
352 const (
353
354 alwaysNewConn connReuseStrategy = iota
355
356
357
358 cachedOrNewConn
359 )
360
361
362
363
364
365 type driverConn struct {
366 db *DB
367 createdAt time.Time
368
369 sync.Mutex
370 ci driver.Conn
371 closed bool
372 finalClosed bool
373 openStmt map[*driverStmt]bool
374 lastErr error
375
376
377 inUse bool
378 onPut []func()
379 dbmuClosed bool
380 }
381
382 func (dc *driverConn) releaseConn(err error) {
383 dc.db.putConn(dc, err, true)
384 }
385
386 func (dc *driverConn) removeOpenStmt(ds *driverStmt) {
387 dc.Lock()
388 defer dc.Unlock()
389 delete(dc.openStmt, ds)
390 }
391
392 func (dc *driverConn) expired(timeout time.Duration) bool {
393 if timeout <= 0 {
394 return false
395 }
396 return dc.createdAt.Add(timeout).Before(nowFunc())
397 }
398
399
400
401 func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, query string) (*driverStmt, error) {
402 si, err := ctxDriverPrepare(ctx, dc.ci, query)
403 if err != nil {
404 return nil, err
405 }
406 ds := &driverStmt{Locker: dc, si: si}
407
408
409 if cg != nil {
410 return ds, nil
411 }
412
413
414
415
416
417 if dc.openStmt == nil {
418 dc.openStmt = make(map[*driverStmt]bool)
419 }
420 dc.openStmt[ds] = true
421 return ds, nil
422 }
423
424
425
426
427
428
429 func (dc *driverConn) resetSession(ctx context.Context) {
430 defer dc.Unlock()
431 if dc.closed {
432 return
433 }
434 dc.lastErr = dc.ci.(driver.SessionResetter).ResetSession(ctx)
435 }
436
437
438 func (dc *driverConn) closeDBLocked() func() error {
439 dc.Lock()
440 defer dc.Unlock()
441 if dc.closed {
442 return func() error { return errors.New("sql: duplicate driverConn close") }
443 }
444 dc.closed = true
445 return dc.db.removeDepLocked(dc, dc)
446 }
447
448 func (dc *driverConn) Close() error {
449 dc.Lock()
450 if dc.closed {
451 dc.Unlock()
452 return errors.New("sql: duplicate driverConn close")
453 }
454 dc.closed = true
455 dc.Unlock()
456
457
458 dc.db.mu.Lock()
459 dc.dbmuClosed = true
460 fn := dc.db.removeDepLocked(dc, dc)
461 dc.db.mu.Unlock()
462 return fn()
463 }
464
465 func (dc *driverConn) finalClose() error {
466 var err error
467
468
469
470 var openStmt []*driverStmt
471 withLock(dc, func() {
472 openStmt = make([]*driverStmt, 0, len(dc.openStmt))
473 for ds := range dc.openStmt {
474 openStmt = append(openStmt, ds)
475 }
476 dc.openStmt = nil
477 })
478 for _, ds := range openStmt {
479 ds.Close()
480 }
481 withLock(dc, func() {
482 dc.finalClosed = true
483 err = dc.ci.Close()
484 dc.ci = nil
485 })
486
487 dc.db.mu.Lock()
488 dc.db.numOpen--
489 dc.db.maybeOpenNewConnections()
490 dc.db.mu.Unlock()
491
492 atomic.AddUint64(&dc.db.numClosed, 1)
493 return err
494 }
495
496
497
498
499 type driverStmt struct {
500 sync.Locker
501 si driver.Stmt
502 closed bool
503 closeErr error
504 }
505
506
507
508 func (ds *driverStmt) Close() error {
509 ds.Lock()
510 defer ds.Unlock()
511 if ds.closed {
512 return ds.closeErr
513 }
514 ds.closed = true
515 ds.closeErr = ds.si.Close()
516 return ds.closeErr
517 }
518
519
520 type depSet map[interface{}]bool
521
522
523
524 type finalCloser interface {
525
526
527 finalClose() error
528 }
529
530
531
532 func (db *DB) addDep(x finalCloser, dep interface{}) {
533
534 db.mu.Lock()
535 defer db.mu.Unlock()
536 db.addDepLocked(x, dep)
537 }
538
539 func (db *DB) addDepLocked(x finalCloser, dep interface{}) {
540 if db.dep == nil {
541 db.dep = make(map[finalCloser]depSet)
542 }
543 xdep := db.dep[x]
544 if xdep == nil {
545 xdep = make(depSet)
546 db.dep[x] = xdep
547 }
548 xdep[dep] = true
549 }
550
551
552
553
554
555 func (db *DB) removeDep(x finalCloser, dep interface{}) error {
556 db.mu.Lock()
557 fn := db.removeDepLocked(x, dep)
558 db.mu.Unlock()
559 return fn()
560 }
561
562 func (db *DB) removeDepLocked(x finalCloser, dep interface{}) func() error {
563
564
565 xdep, ok := db.dep[x]
566 if !ok {
567 panic(fmt.Sprintf("unpaired removeDep: no deps for %T", x))
568 }
569
570 l0 := len(xdep)
571 delete(xdep, dep)
572
573 switch len(xdep) {
574 case l0:
575
576 panic(fmt.Sprintf("unpaired removeDep: no %T dep on %T", dep, x))
577 case 0:
578
579 delete(db.dep, x)
580 return x.finalClose
581 default:
582
583 return func() error { return nil }
584 }
585 }
586
587
588
589
590
591
592 var connectionRequestQueueSize = 1000000
593
594 type dsnConnector struct {
595 dsn string
596 driver driver.Driver
597 }
598
599 func (t dsnConnector) Connect(_ context.Context) (driver.Conn, error) {
600 return t.driver.Open(t.dsn)
601 }
602
603 func (t dsnConnector) Driver() driver.Driver {
604 return t.driver
605 }
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623 func OpenDB(c driver.Connector) *DB {
624 ctx, cancel := context.WithCancel(context.Background())
625 db := &DB{
626 connector: c,
627 openerCh: make(chan struct{}, connectionRequestQueueSize),
628 resetterCh: make(chan *driverConn, 50),
629 lastPut: make(map[*driverConn]string),
630 connRequests: make(map[uint64]chan connRequest),
631 stop: cancel,
632 }
633
634 go db.connectionOpener(ctx)
635 go db.connectionResetter(ctx)
636
637 return db
638 }
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657 func Open(driverName, dataSourceName string) (*DB, error) {
658 driversMu.RLock()
659 driveri, ok := drivers[driverName]
660 driversMu.RUnlock()
661 if !ok {
662 return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
663 }
664
665 if driverCtx, ok := driveri.(driver.DriverContext); ok {
666 connector, err := driverCtx.OpenConnector(dataSourceName)
667 if err != nil {
668 return nil, err
669 }
670 return OpenDB(connector), nil
671 }
672
673 return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
674 }
675
676 func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error {
677 var err error
678 if pinger, ok := dc.ci.(driver.Pinger); ok {
679 withLock(dc, func() {
680 err = pinger.Ping(ctx)
681 })
682 }
683 release(err)
684 return err
685 }
686
687
688
689 func (db *DB) PingContext(ctx context.Context) error {
690 var dc *driverConn
691 var err error
692
693 for i := 0; i < maxBadConnRetries; i++ {
694 dc, err = db.conn(ctx, cachedOrNewConn)
695 if err != driver.ErrBadConn {
696 break
697 }
698 }
699 if err == driver.ErrBadConn {
700 dc, err = db.conn(ctx, alwaysNewConn)
701 }
702 if err != nil {
703 return err
704 }
705
706 return db.pingDC(ctx, dc, dc.releaseConn)
707 }
708
709
710
711 func (db *DB) Ping() error {
712 return db.PingContext(context.Background())
713 }
714
715
716
717
718
719 func (db *DB) Close() error {
720 db.mu.Lock()
721 if db.closed {
722 db.mu.Unlock()
723 return nil
724 }
725 if db.cleanerCh != nil {
726 close(db.cleanerCh)
727 }
728 var err error
729 fns := make([]func() error, 0, len(db.freeConn))
730 for _, dc := range db.freeConn {
731 fns = append(fns, dc.closeDBLocked())
732 }
733 db.freeConn = nil
734 db.closed = true
735 for _, req := range db.connRequests {
736 close(req)
737 }
738 db.mu.Unlock()
739 for _, fn := range fns {
740 err1 := fn()
741 if err1 != nil {
742 err = err1
743 }
744 }
745 db.stop()
746 return err
747 }
748
749 const defaultMaxIdleConns = 2
750
751 func (db *DB) maxIdleConnsLocked() int {
752 n := db.maxIdle
753 switch {
754 case n == 0:
755
756 return defaultMaxIdleConns
757 case n < 0:
758 return 0
759 default:
760 return n
761 }
762 }
763
764
765
766
767
768
769
770
771 func (db *DB) SetMaxIdleConns(n int) {
772 db.mu.Lock()
773 if n > 0 {
774 db.maxIdle = n
775 } else {
776
777 db.maxIdle = -1
778 }
779
780 if db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen {
781 db.maxIdle = db.maxOpen
782 }
783 var closing []*driverConn
784 idleCount := len(db.freeConn)
785 maxIdle := db.maxIdleConnsLocked()
786 if idleCount > maxIdle {
787 closing = db.freeConn[maxIdle:]
788 db.freeConn = db.freeConn[:maxIdle]
789 }
790 db.mu.Unlock()
791 for _, c := range closing {
792 c.Close()
793 }
794 }
795
796
797
798
799
800
801
802
803
804 func (db *DB) SetMaxOpenConns(n int) {
805 db.mu.Lock()
806 db.maxOpen = n
807 if n < 0 {
808 db.maxOpen = 0
809 }
810 syncMaxIdle := db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen
811 db.mu.Unlock()
812 if syncMaxIdle {
813 db.SetMaxIdleConns(n)
814 }
815 }
816
817
818
819
820
821
822 func (db *DB) SetConnMaxLifetime(d time.Duration) {
823 if d < 0 {
824 d = 0
825 }
826 db.mu.Lock()
827
828 if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
829 select {
830 case db.cleanerCh <- struct{}{}:
831 default:
832 }
833 }
834 db.maxLifetime = d
835 db.startCleanerLocked()
836 db.mu.Unlock()
837 }
838
839
840 func (db *DB) startCleanerLocked() {
841 if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
842 db.cleanerCh = make(chan struct{}, 1)
843 go db.connectionCleaner(db.maxLifetime)
844 }
845 }
846
847 func (db *DB) connectionCleaner(d time.Duration) {
848 const minInterval = time.Second
849
850 if d < minInterval {
851 d = minInterval
852 }
853 t := time.NewTimer(d)
854
855 for {
856 select {
857 case <-t.C:
858 case <-db.cleanerCh:
859 }
860
861 db.mu.Lock()
862 d = db.maxLifetime
863 if db.closed || db.numOpen == 0 || d <= 0 {
864 db.cleanerCh = nil
865 db.mu.Unlock()
866 return
867 }
868
869 expiredSince := nowFunc().Add(-d)
870 var closing []*driverConn
871 for i := 0; i < len(db.freeConn); i++ {
872 c := db.freeConn[i]
873 if c.createdAt.Before(expiredSince) {
874 closing = append(closing, c)
875 last := len(db.freeConn) - 1
876 db.freeConn[i] = db.freeConn[last]
877 db.freeConn[last] = nil
878 db.freeConn = db.freeConn[:last]
879 i--
880 }
881 }
882 db.mu.Unlock()
883
884 for _, c := range closing {
885 c.Close()
886 }
887
888 if d < minInterval {
889 d = minInterval
890 }
891 t.Reset(d)
892 }
893 }
894
895
896 type DBStats struct {
897
898 OpenConnections int
899 }
900
901
902 func (db *DB) Stats() DBStats {
903 db.mu.Lock()
904 stats := DBStats{
905 OpenConnections: db.numOpen,
906 }
907 db.mu.Unlock()
908 return stats
909 }
910
911
912
913
914 func (db *DB) maybeOpenNewConnections() {
915 numRequests := len(db.connRequests)
916 if db.maxOpen > 0 {
917 numCanOpen := db.maxOpen - db.numOpen
918 if numRequests > numCanOpen {
919 numRequests = numCanOpen
920 }
921 }
922 for numRequests > 0 {
923 db.numOpen++
924 numRequests--
925 if db.closed {
926 return
927 }
928 db.openerCh <- struct{}{}
929 }
930 }
931
932
933 func (db *DB) connectionOpener(ctx context.Context) {
934 for {
935 select {
936 case <-ctx.Done():
937 return
938 case <-db.openerCh:
939 db.openNewConnection(ctx)
940 }
941 }
942 }
943
944
945
946 func (db *DB) connectionResetter(ctx context.Context) {
947 for {
948 select {
949 case <-ctx.Done():
950 close(db.resetterCh)
951 for dc := range db.resetterCh {
952 dc.Unlock()
953 }
954 return
955 case dc := <-db.resetterCh:
956 dc.resetSession(ctx)
957 }
958 }
959 }
960
961
962 func (db *DB) openNewConnection(ctx context.Context) {
963
964
965
966 ci, err := db.connector.Connect(ctx)
967 db.mu.Lock()
968 defer db.mu.Unlock()
969 if db.closed {
970 if err == nil {
971 ci.Close()
972 }
973 db.numOpen--
974 return
975 }
976 if err != nil {
977 db.numOpen--
978 db.putConnDBLocked(nil, err)
979 db.maybeOpenNewConnections()
980 return
981 }
982 dc := &driverConn{
983 db: db,
984 createdAt: nowFunc(),
985 ci: ci,
986 }
987 if db.putConnDBLocked(dc, err) {
988 db.addDepLocked(dc, dc)
989 } else {
990 db.numOpen--
991 ci.Close()
992 }
993 }
994
995
996
997
998 type connRequest struct {
999 conn *driverConn
1000 err error
1001 }
1002
1003 var errDBClosed = errors.New("sql: database is closed")
1004
1005
1006
1007 func (db *DB) nextRequestKeyLocked() uint64 {
1008 next := db.nextRequest
1009 db.nextRequest++
1010 return next
1011 }
1012
1013
1014 func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
1015 db.mu.Lock()
1016 if db.closed {
1017 db.mu.Unlock()
1018 return nil, errDBClosed
1019 }
1020
1021 select {
1022 default:
1023 case <-ctx.Done():
1024 db.mu.Unlock()
1025 return nil, ctx.Err()
1026 }
1027 lifetime := db.maxLifetime
1028
1029
1030 numFree := len(db.freeConn)
1031 if strategy == cachedOrNewConn && numFree > 0 {
1032 conn := db.freeConn[0]
1033 copy(db.freeConn, db.freeConn[1:])
1034 db.freeConn = db.freeConn[:numFree-1]
1035 conn.inUse = true
1036 db.mu.Unlock()
1037 if conn.expired(lifetime) {
1038 conn.Close()
1039 return nil, driver.ErrBadConn
1040 }
1041
1042 conn.Lock()
1043 err := conn.lastErr
1044 conn.Unlock()
1045 if err == driver.ErrBadConn {
1046 conn.Close()
1047 return nil, driver.ErrBadConn
1048 }
1049 return conn, nil
1050 }
1051
1052
1053
1054 if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
1055
1056
1057 req := make(chan connRequest, 1)
1058 reqKey := db.nextRequestKeyLocked()
1059 db.connRequests[reqKey] = req
1060 db.mu.Unlock()
1061
1062
1063 select {
1064 case <-ctx.Done():
1065
1066
1067 db.mu.Lock()
1068 delete(db.connRequests, reqKey)
1069 db.mu.Unlock()
1070 select {
1071 default:
1072 case ret, ok := <-req:
1073 if ok {
1074 db.putConn(ret.conn, ret.err, false)
1075 }
1076 }
1077 return nil, ctx.Err()
1078 case ret, ok := <-req:
1079 if !ok {
1080 return nil, errDBClosed
1081 }
1082 if ret.err == nil && ret.conn.expired(lifetime) {
1083 ret.conn.Close()
1084 return nil, driver.ErrBadConn
1085 }
1086 if ret.conn == nil {
1087 return nil, ret.err
1088 }
1089
1090 ret.conn.Lock()
1091 err := ret.conn.lastErr
1092 ret.conn.Unlock()
1093 if err == driver.ErrBadConn {
1094 ret.conn.Close()
1095 return nil, driver.ErrBadConn
1096 }
1097 return ret.conn, ret.err
1098 }
1099 }
1100
1101 db.numOpen++
1102 db.mu.Unlock()
1103 ci, err := db.connector.Connect(ctx)
1104 if err != nil {
1105 db.mu.Lock()
1106 db.numOpen--
1107 db.maybeOpenNewConnections()
1108 db.mu.Unlock()
1109 return nil, err
1110 }
1111 db.mu.Lock()
1112 dc := &driverConn{
1113 db: db,
1114 createdAt: nowFunc(),
1115 ci: ci,
1116 inUse: true,
1117 }
1118 db.addDepLocked(dc, dc)
1119 db.mu.Unlock()
1120 return dc, nil
1121 }
1122
1123
1124 var putConnHook func(*DB, *driverConn)
1125
1126
1127
1128
1129 func (db *DB) noteUnusedDriverStatement(c *driverConn, ds *driverStmt) {
1130 db.mu.Lock()
1131 defer db.mu.Unlock()
1132 if c.inUse {
1133 c.onPut = append(c.onPut, func() {
1134 ds.Close()
1135 })
1136 } else {
1137 c.Lock()
1138 fc := c.finalClosed
1139 c.Unlock()
1140 if !fc {
1141 ds.Close()
1142 }
1143 }
1144 }
1145
1146
1147
1148 const debugGetPut = false
1149
1150
1151
1152 func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
1153 db.mu.Lock()
1154 if !dc.inUse {
1155 if debugGetPut {
1156 fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
1157 }
1158 panic("sql: connection returned that was never out")
1159 }
1160 if debugGetPut {
1161 db.lastPut[dc] = stack()
1162 }
1163 dc.inUse = false
1164
1165 for _, fn := range dc.onPut {
1166 fn()
1167 }
1168 dc.onPut = nil
1169
1170 if err == driver.ErrBadConn {
1171
1172
1173
1174
1175 db.maybeOpenNewConnections()
1176 db.mu.Unlock()
1177 dc.Close()
1178 return
1179 }
1180 if putConnHook != nil {
1181 putConnHook(db, dc)
1182 }
1183 if db.closed {
1184
1185
1186 resetSession = false
1187 }
1188 if resetSession {
1189 if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
1190
1191
1192
1193
1194 dc.Lock()
1195 }
1196 }
1197 added := db.putConnDBLocked(dc, nil)
1198 db.mu.Unlock()
1199
1200 if !added {
1201 if resetSession {
1202 dc.Unlock()
1203 }
1204 dc.Close()
1205 return
1206 }
1207 if !resetSession {
1208 return
1209 }
1210 select {
1211 default:
1212
1213
1214 dc.lastErr = driver.ErrBadConn
1215 dc.Unlock()
1216 case db.resetterCh <- dc:
1217 }
1218 }
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229 func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
1230 if db.closed {
1231 return false
1232 }
1233 if db.maxOpen > 0 && db.numOpen > db.maxOpen {
1234 return false
1235 }
1236 if c := len(db.connRequests); c > 0 {
1237 var req chan connRequest
1238 var reqKey uint64
1239 for reqKey, req = range db.connRequests {
1240 break
1241 }
1242 delete(db.connRequests, reqKey)
1243 if err == nil {
1244 dc.inUse = true
1245 }
1246 req <- connRequest{
1247 conn: dc,
1248 err: err,
1249 }
1250 return true
1251 } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {
1252 db.freeConn = append(db.freeConn, dc)
1253 db.startCleanerLocked()
1254 return true
1255 }
1256 return false
1257 }
1258
1259
1260
1261
1262 const maxBadConnRetries = 2
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272 func (db *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
1273 var stmt *Stmt
1274 var err error
1275 for i := 0; i < maxBadConnRetries; i++ {
1276 stmt, err = db.prepare(ctx, query, cachedOrNewConn)
1277 if err != driver.ErrBadConn {
1278 break
1279 }
1280 }
1281 if err == driver.ErrBadConn {
1282 return db.prepare(ctx, query, alwaysNewConn)
1283 }
1284 return stmt, err
1285 }
1286
1287
1288
1289
1290
1291
1292 func (db *DB) Prepare(query string) (*Stmt, error) {
1293 return db.PrepareContext(context.Background(), query)
1294 }
1295
1296 func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) {
1297
1298
1299
1300
1301
1302
1303 dc, err := db.conn(ctx, strategy)
1304 if err != nil {
1305 return nil, err
1306 }
1307 return db.prepareDC(ctx, dc, dc.releaseConn, nil, query)
1308 }
1309
1310
1311
1312
1313 func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) {
1314 var ds *driverStmt
1315 var err error
1316 defer func() {
1317 release(err)
1318 }()
1319 withLock(dc, func() {
1320 ds, err = dc.prepareLocked(ctx, cg, query)
1321 })
1322 if err != nil {
1323 return nil, err
1324 }
1325 stmt := &Stmt{
1326 db: db,
1327 query: query,
1328 cg: cg,
1329 cgds: ds,
1330 }
1331
1332
1333
1334
1335 if cg == nil {
1336 stmt.css = []connStmt{{dc, ds}}
1337 stmt.lastNumClosed = atomic.LoadUint64(&db.numClosed)
1338 db.addDep(stmt, stmt)
1339 }
1340 return stmt, nil
1341 }
1342
1343
1344
1345 func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
1346 var res Result
1347 var err error
1348 for i := 0; i < maxBadConnRetries; i++ {
1349 res, err = db.exec(ctx, query, args, cachedOrNewConn)
1350 if err != driver.ErrBadConn {
1351 break
1352 }
1353 }
1354 if err == driver.ErrBadConn {
1355 return db.exec(ctx, query, args, alwaysNewConn)
1356 }
1357 return res, err
1358 }
1359
1360
1361
1362 func (db *DB) Exec(query string, args ...interface{}) (Result, error) {
1363 return db.ExecContext(context.Background(), query, args...)
1364 }
1365
1366 func (db *DB) exec(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (Result, error) {
1367 dc, err := db.conn(ctx, strategy)
1368 if err != nil {
1369 return nil, err
1370 }
1371 return db.execDC(ctx, dc, dc.releaseConn, query, args)
1372 }
1373
1374 func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []interface{}) (res Result, err error) {
1375 defer func() {
1376 release(err)
1377 }()
1378 execerCtx, ok := dc.ci.(driver.ExecerContext)
1379 var execer driver.Execer
1380 if !ok {
1381 execer, ok = dc.ci.(driver.Execer)
1382 }
1383 if ok {
1384 var nvdargs []driver.NamedValue
1385 var resi driver.Result
1386 withLock(dc, func() {
1387 nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
1388 if err != nil {
1389 return
1390 }
1391 resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
1392 })
1393 if err != driver.ErrSkip {
1394 if err != nil {
1395 return nil, err
1396 }
1397 return driverResult{dc, resi}, nil
1398 }
1399 }
1400
1401 var si driver.Stmt
1402 withLock(dc, func() {
1403 si, err = ctxDriverPrepare(ctx, dc.ci, query)
1404 })
1405 if err != nil {
1406 return nil, err
1407 }
1408 ds := &driverStmt{Locker: dc, si: si}
1409 defer ds.Close()
1410 return resultFromStatement(ctx, dc.ci, ds, args...)
1411 }
1412
1413
1414
1415 func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
1416 var rows *Rows
1417 var err error
1418 for i := 0; i < maxBadConnRetries; i++ {
1419 rows, err = db.query(ctx, query, args, cachedOrNewConn)
1420 if err != driver.ErrBadConn {
1421 break
1422 }
1423 }
1424 if err == driver.ErrBadConn {
1425 return db.query(ctx, query, args, alwaysNewConn)
1426 }
1427 return rows, err
1428 }
1429
1430
1431
1432 func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
1433 return db.QueryContext(context.Background(), query, args...)
1434 }
1435
1436 func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
1437 dc, err := db.conn(ctx, strategy)
1438 if err != nil {
1439 return nil, err
1440 }
1441
1442 return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
1443 }
1444
1445
1446
1447
1448
1449 func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
1450 queryerCtx, ok := dc.ci.(driver.QueryerContext)
1451 var queryer driver.Queryer
1452 if !ok {
1453 queryer, ok = dc.ci.(driver.Queryer)
1454 }
1455 if ok {
1456 var nvdargs []driver.NamedValue
1457 var rowsi driver.Rows
1458 var err error
1459 withLock(dc, func() {
1460 nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
1461 if err != nil {
1462 return
1463 }
1464 rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)
1465 })
1466 if err != driver.ErrSkip {
1467 if err != nil {
1468 releaseConn(err)
1469 return nil, err
1470 }
1471
1472
1473 rows := &Rows{
1474 dc: dc,
1475 releaseConn: releaseConn,
1476 rowsi: rowsi,
1477 }
1478 rows.initContextClose(ctx, txctx)
1479 return rows, nil
1480 }
1481 }
1482
1483 var si driver.Stmt
1484 var err error
1485 withLock(dc, func() {
1486 si, err = ctxDriverPrepare(ctx, dc.ci, query)
1487 })
1488 if err != nil {
1489 releaseConn(err)
1490 return nil, err
1491 }
1492
1493 ds := &driverStmt{Locker: dc, si: si}
1494 rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...)
1495 if err != nil {
1496 ds.Close()
1497 releaseConn(err)
1498 return nil, err
1499 }
1500
1501
1502
1503 rows := &Rows{
1504 dc: dc,
1505 releaseConn: releaseConn,
1506 rowsi: rowsi,
1507 closeStmt: ds,
1508 }
1509 rows.initContextClose(ctx, txctx)
1510 return rows, nil
1511 }
1512
1513
1514
1515
1516
1517
1518
1519 func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
1520 rows, err := db.QueryContext(ctx, query, args...)
1521 return &Row{rows: rows, err: err}
1522 }
1523
1524
1525
1526
1527
1528
1529
1530 func (db *DB) QueryRow(query string, args ...interface{}) *Row {
1531 return db.QueryRowContext(context.Background(), query, args...)
1532 }
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544 func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
1545 var tx *Tx
1546 var err error
1547 for i := 0; i < maxBadConnRetries; i++ {
1548 tx, err = db.begin(ctx, opts, cachedOrNewConn)
1549 if err != driver.ErrBadConn {
1550 break
1551 }
1552 }
1553 if err == driver.ErrBadConn {
1554 return db.begin(ctx, opts, alwaysNewConn)
1555 }
1556 return tx, err
1557 }
1558
1559
1560
1561 func (db *DB) Begin() (*Tx, error) {
1562 return db.BeginTx(context.Background(), nil)
1563 }
1564
1565 func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
1566 dc, err := db.conn(ctx, strategy)
1567 if err != nil {
1568 return nil, err
1569 }
1570 return db.beginDC(ctx, dc, dc.releaseConn, opts)
1571 }
1572
1573
1574 func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
1575 var txi driver.Tx
1576 withLock(dc, func() {
1577 txi, err = ctxDriverBegin(ctx, opts, dc.ci)
1578 })
1579 if err != nil {
1580 release(err)
1581 return nil, err
1582 }
1583
1584
1585
1586 ctx, cancel := context.WithCancel(ctx)
1587 tx = &Tx{
1588 db: db,
1589 dc: dc,
1590 releaseConn: release,
1591 txi: txi,
1592 cancel: cancel,
1593 ctx: ctx,
1594 }
1595 go tx.awaitDone()
1596 return tx, nil
1597 }
1598
1599
1600 func (db *DB) Driver() driver.Driver {
1601 return db.connector.Driver()
1602 }
1603
1604
1605
1606 var ErrConnDone = errors.New("database/sql: connection is already closed")
1607
1608
1609
1610
1611
1612
1613
1614
1615 func (db *DB) Conn(ctx context.Context) (*Conn, error) {
1616 var dc *driverConn
1617 var err error
1618 for i := 0; i < maxBadConnRetries; i++ {
1619 dc, err = db.conn(ctx, cachedOrNewConn)
1620 if err != driver.ErrBadConn {
1621 break
1622 }
1623 }
1624 if err == driver.ErrBadConn {
1625 dc, err = db.conn(ctx, cachedOrNewConn)
1626 }
1627 if err != nil {
1628 return nil, err
1629 }
1630
1631 conn := &Conn{
1632 db: db,
1633 dc: dc,
1634 }
1635 return conn, nil
1636 }
1637
1638 type releaseConn func(error)
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649 type Conn struct {
1650 db *DB
1651
1652
1653
1654
1655 closemu sync.RWMutex
1656
1657
1658
1659 dc *driverConn
1660
1661
1662
1663
1664 done int32
1665 }
1666
1667 func (c *Conn) grabConn(context.Context) (*driverConn, releaseConn, error) {
1668 if atomic.LoadInt32(&c.done) != 0 {
1669 return nil, nil, ErrConnDone
1670 }
1671 c.closemu.RLock()
1672 return c.dc, c.closemuRUnlockCondReleaseConn, nil
1673 }
1674
1675
1676 func (c *Conn) PingContext(ctx context.Context) error {
1677 dc, release, err := c.grabConn(ctx)
1678 if err != nil {
1679 return err
1680 }
1681 return c.db.pingDC(ctx, dc, release)
1682 }
1683
1684
1685
1686 func (c *Conn) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
1687 dc, release, err := c.grabConn(ctx)
1688 if err != nil {
1689 return nil, err
1690 }
1691 return c.db.execDC(ctx, dc, release, query, args)
1692 }
1693
1694
1695
1696 func (c *Conn) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
1697 dc, release, err := c.grabConn(ctx)
1698 if err != nil {
1699 return nil, err
1700 }
1701 return c.db.queryDC(ctx, nil, dc, release, query, args)
1702 }
1703
1704
1705
1706
1707
1708
1709
1710 func (c *Conn) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
1711 rows, err := c.QueryContext(ctx, query, args...)
1712 return &Row{rows: rows, err: err}
1713 }
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723 func (c *Conn) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
1724 dc, release, err := c.grabConn(ctx)
1725 if err != nil {
1726 return nil, err
1727 }
1728 return c.db.prepareDC(ctx, dc, release, c, query)
1729 }
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741 func (c *Conn) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
1742 dc, release, err := c.grabConn(ctx)
1743 if err != nil {
1744 return nil, err
1745 }
1746 return c.db.beginDC(ctx, dc, release, opts)
1747 }
1748
1749
1750
1751 func (c *Conn) closemuRUnlockCondReleaseConn(err error) {
1752 c.closemu.RUnlock()
1753 if err == driver.ErrBadConn {
1754 c.close(err)
1755 }
1756 }
1757
1758 func (c *Conn) txCtx() context.Context {
1759 return nil
1760 }
1761
1762 func (c *Conn) close(err error) error {
1763 if !atomic.CompareAndSwapInt32(&c.done, 0, 1) {
1764 return ErrConnDone
1765 }
1766
1767
1768
1769 c.closemu.Lock()
1770 defer c.closemu.Unlock()
1771
1772 c.dc.releaseConn(err)
1773 c.dc = nil
1774 c.db = nil
1775 return err
1776 }
1777
1778
1779
1780
1781
1782
1783 func (c *Conn) Close() error {
1784 return c.close(nil)
1785 }
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797 type Tx struct {
1798 db *DB
1799
1800
1801
1802
1803 closemu sync.RWMutex
1804
1805
1806
1807 dc *driverConn
1808 txi driver.Tx
1809
1810
1811
1812 releaseConn func(error)
1813
1814
1815
1816
1817
1818 done int32
1819
1820
1821
1822 stmts struct {
1823 sync.Mutex
1824 v []*Stmt
1825 }
1826
1827
1828 cancel func()
1829
1830
1831 ctx context.Context
1832 }
1833
1834
1835
1836 func (tx *Tx) awaitDone() {
1837
1838
1839 <-tx.ctx.Done()
1840
1841
1842
1843
1844
1845 tx.rollback(true)
1846 }
1847
1848 func (tx *Tx) isDone() bool {
1849 return atomic.LoadInt32(&tx.done) != 0
1850 }
1851
1852
1853
1854 var ErrTxDone = errors.New("sql: Transaction has already been committed or rolled back")
1855
1856
1857
1858 func (tx *Tx) close(err error) {
1859 tx.cancel()
1860
1861 tx.closemu.Lock()
1862 defer tx.closemu.Unlock()
1863
1864 tx.releaseConn(err)
1865 tx.dc = nil
1866 tx.txi = nil
1867 }
1868
1869
1870
1871 var hookTxGrabConn func()
1872
1873 func (tx *Tx) grabConn(ctx context.Context) (*driverConn, releaseConn, error) {
1874 select {
1875 default:
1876 case <-ctx.Done():
1877 return nil, nil, ctx.Err()
1878 }
1879
1880
1881
1882 tx.closemu.RLock()
1883 if tx.isDone() {
1884 tx.closemu.RUnlock()
1885 return nil, nil, ErrTxDone
1886 }
1887 if hookTxGrabConn != nil {
1888 hookTxGrabConn()
1889 }
1890 return tx.dc, tx.closemuRUnlockRelease, nil
1891 }
1892
1893 func (tx *Tx) txCtx() context.Context {
1894 return tx.ctx
1895 }
1896
1897
1898
1899
1900
1901 func (tx *Tx) closemuRUnlockRelease(error) {
1902 tx.closemu.RUnlock()
1903 }
1904
1905
1906 func (tx *Tx) closePrepared() {
1907 tx.stmts.Lock()
1908 defer tx.stmts.Unlock()
1909 for _, stmt := range tx.stmts.v {
1910 stmt.Close()
1911 }
1912 }
1913
1914
1915 func (tx *Tx) Commit() error {
1916
1917
1918
1919 select {
1920 default:
1921 case <-tx.ctx.Done():
1922 if atomic.LoadInt32(&tx.done) == 1 {
1923 return ErrTxDone
1924 }
1925 return tx.ctx.Err()
1926 }
1927 if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
1928 return ErrTxDone
1929 }
1930 var err error
1931 withLock(tx.dc, func() {
1932 err = tx.txi.Commit()
1933 })
1934 if err != driver.ErrBadConn {
1935 tx.closePrepared()
1936 }
1937 tx.close(err)
1938 return err
1939 }
1940
1941
1942
1943 func (tx *Tx) rollback(discardConn bool) error {
1944 if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
1945 return ErrTxDone
1946 }
1947 var err error
1948 withLock(tx.dc, func() {
1949 err = tx.txi.Rollback()
1950 })
1951 if err != driver.ErrBadConn {
1952 tx.closePrepared()
1953 }
1954 if discardConn {
1955 err = driver.ErrBadConn
1956 }
1957 tx.close(err)
1958 return err
1959 }
1960
1961
1962 func (tx *Tx) Rollback() error {
1963 return tx.rollback(false)
1964 }
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976 func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
1977 dc, release, err := tx.grabConn(ctx)
1978 if err != nil {
1979 return nil, err
1980 }
1981
1982 stmt, err := tx.db.prepareDC(ctx, dc, release, tx, query)
1983 if err != nil {
1984 return nil, err
1985 }
1986 tx.stmts.Lock()
1987 tx.stmts.v = append(tx.stmts.v, stmt)
1988 tx.stmts.Unlock()
1989 return stmt, nil
1990 }
1991
1992
1993
1994
1995
1996
1997
1998 func (tx *Tx) Prepare(query string) (*Stmt, error) {
1999 return tx.PrepareContext(context.Background(), query)
2000 }
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017 func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt {
2018 dc, release, err := tx.grabConn(ctx)
2019 if err != nil {
2020 return &Stmt{stickyErr: err}
2021 }
2022 defer release(nil)
2023
2024 if tx.db != stmt.db {
2025 return &Stmt{stickyErr: errors.New("sql: Tx.Stmt: statement from different database used")}
2026 }
2027 var si driver.Stmt
2028 var parentStmt *Stmt
2029 stmt.mu.Lock()
2030 if stmt.closed || stmt.cg != nil {
2031
2032
2033
2034
2035
2036
2037 stmt.mu.Unlock()
2038 withLock(dc, func() {
2039 si, err = ctxDriverPrepare(ctx, dc.ci, stmt.query)
2040 })
2041 if err != nil {
2042 return &Stmt{stickyErr: err}
2043 }
2044 } else {
2045 stmt.removeClosedStmtLocked()
2046
2047
2048 for _, v := range stmt.css {
2049 if v.dc == dc {
2050 si = v.ds.si
2051 break
2052 }
2053 }
2054
2055 stmt.mu.Unlock()
2056
2057 if si == nil {
2058 var ds *driverStmt
2059 withLock(dc, func() {
2060 ds, err = stmt.prepareOnConnLocked(ctx, dc)
2061 })
2062 if err != nil {
2063 return &Stmt{stickyErr: err}
2064 }
2065 si = ds.si
2066 }
2067 parentStmt = stmt
2068 }
2069
2070 txs := &Stmt{
2071 db: tx.db,
2072 cg: tx,
2073 cgds: &driverStmt{
2074 Locker: dc,
2075 si: si,
2076 },
2077 parentStmt: parentStmt,
2078 query: stmt.query,
2079 }
2080 if parentStmt != nil {
2081 tx.db.addDep(parentStmt, txs)
2082 }
2083 tx.stmts.Lock()
2084 tx.stmts.v = append(tx.stmts.v, txs)
2085 tx.stmts.Unlock()
2086 return txs
2087 }
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101 func (tx *Tx) Stmt(stmt *Stmt) *Stmt {
2102 return tx.StmtContext(context.Background(), stmt)
2103 }
2104
2105
2106
2107 func (tx *Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
2108 dc, release, err := tx.grabConn(ctx)
2109 if err != nil {
2110 return nil, err
2111 }
2112 return tx.db.execDC(ctx, dc, release, query, args)
2113 }
2114
2115
2116
2117 func (tx *Tx) Exec(query string, args ...interface{}) (Result, error) {
2118 return tx.ExecContext(context.Background(), query, args...)
2119 }
2120
2121
2122 func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
2123 dc, release, err := tx.grabConn(ctx)
2124 if err != nil {
2125 return nil, err
2126 }
2127
2128 return tx.db.queryDC(ctx, tx.ctx, dc, release, query, args)
2129 }
2130
2131
2132 func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error) {
2133 return tx.QueryContext(context.Background(), query, args...)
2134 }
2135
2136
2137
2138
2139
2140
2141
2142 func (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
2143 rows, err := tx.QueryContext(ctx, query, args...)
2144 return &Row{rows: rows, err: err}
2145 }
2146
2147
2148
2149
2150
2151
2152
2153 func (tx *Tx) QueryRow(query string, args ...interface{}) *Row {
2154 return tx.QueryRowContext(context.Background(), query, args...)
2155 }
2156
2157
2158 type connStmt struct {
2159 dc *driverConn
2160 ds *driverStmt
2161 }
2162
2163
2164
2165 type stmtConnGrabber interface {
2166
2167
2168 grabConn(context.Context) (*driverConn, releaseConn, error)
2169
2170
2171
2172
2173 txCtx() context.Context
2174 }
2175
2176 var (
2177 _ stmtConnGrabber = &Tx{}
2178 _ stmtConnGrabber = &Conn{}
2179 )
2180
2181
2182
2183 type Stmt struct {
2184
2185 db *DB
2186 query string
2187 stickyErr error
2188
2189 closemu sync.RWMutex
2190
2191
2192
2193
2194
2195
2196 cg stmtConnGrabber
2197 cgds *driverStmt
2198
2199
2200
2201
2202
2203
2204
2205 parentStmt *Stmt
2206
2207 mu sync.Mutex
2208 closed bool
2209
2210
2211
2212
2213
2214 css []connStmt
2215
2216
2217
2218 lastNumClosed uint64
2219 }
2220
2221
2222
2223 func (s *Stmt) ExecContext(ctx context.Context, args ...interface{}) (Result, error) {
2224 s.closemu.RLock()
2225 defer s.closemu.RUnlock()
2226
2227 var res Result
2228 strategy := cachedOrNewConn
2229 for i := 0; i < maxBadConnRetries+1; i++ {
2230 if i == maxBadConnRetries {
2231 strategy = alwaysNewConn
2232 }
2233 dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
2234 if err != nil {
2235 if err == driver.ErrBadConn {
2236 continue
2237 }
2238 return nil, err
2239 }
2240
2241 res, err = resultFromStatement(ctx, dc.ci, ds, args...)
2242 releaseConn(err)
2243 if err != driver.ErrBadConn {
2244 return res, err
2245 }
2246 }
2247 return nil, driver.ErrBadConn
2248 }
2249
2250
2251
2252 func (s *Stmt) Exec(args ...interface{}) (Result, error) {
2253 return s.ExecContext(context.Background(), args...)
2254 }
2255
2256 func resultFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...interface{}) (Result, error) {
2257 ds.Lock()
2258 defer ds.Unlock()
2259
2260 dargs, err := driverArgsConnLocked(ci, ds, args)
2261 if err != nil {
2262 return nil, err
2263 }
2264
2265
2266
2267
2268 if want := ds.si.NumInput(); want >= 0 && want != len(dargs) {
2269 return nil, fmt.Errorf("sql: statement expects %d inputs; got %d", want, len(dargs))
2270 }
2271
2272 resi, err := ctxDriverStmtExec(ctx, ds.si, dargs)
2273 if err != nil {
2274 return nil, err
2275 }
2276 return driverResult{ds.Locker, resi}, nil
2277 }
2278
2279
2280
2281
2282
2283 func (s *Stmt) removeClosedStmtLocked() {
2284 t := len(s.css)/2 + 1
2285 if t > 10 {
2286 t = 10
2287 }
2288 dbClosed := atomic.LoadUint64(&s.db.numClosed)
2289 if dbClosed-s.lastNumClosed < uint64(t) {
2290 return
2291 }
2292
2293 s.db.mu.Lock()
2294 for i := 0; i < len(s.css); i++ {
2295 if s.css[i].dc.dbmuClosed {
2296 s.css[i] = s.css[len(s.css)-1]
2297 s.css = s.css[:len(s.css)-1]
2298 i--
2299 }
2300 }
2301 s.db.mu.Unlock()
2302 s.lastNumClosed = dbClosed
2303 }
2304
2305
2306
2307
2308 func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
2309 if err = s.stickyErr; err != nil {
2310 return
2311 }
2312 s.mu.Lock()
2313 if s.closed {
2314 s.mu.Unlock()
2315 err = errors.New("sql: statement is closed")
2316 return
2317 }
2318
2319
2320
2321 if s.cg != nil {
2322 s.mu.Unlock()
2323 dc, releaseConn, err = s.cg.grabConn(ctx)
2324 if err != nil {
2325 return
2326 }
2327 return dc, releaseConn, s.cgds, nil
2328 }
2329
2330 s.removeClosedStmtLocked()
2331 s.mu.Unlock()
2332
2333 dc, err = s.db.conn(ctx, strategy)
2334 if err != nil {
2335 return nil, nil, nil, err
2336 }
2337
2338 s.mu.Lock()
2339 for _, v := range s.css {
2340 if v.dc == dc {
2341 s.mu.Unlock()
2342 return dc, dc.releaseConn, v.ds, nil
2343 }
2344 }
2345 s.mu.Unlock()
2346
2347
2348 withLock(dc, func() {
2349 ds, err = s.prepareOnConnLocked(ctx, dc)
2350 })
2351 if err != nil {
2352 dc.releaseConn(err)
2353 return nil, nil, nil, err
2354 }
2355
2356 return dc, dc.releaseConn, ds, nil
2357 }
2358
2359
2360
2361 func (s *Stmt) prepareOnConnLocked(ctx context.Context, dc *driverConn) (*driverStmt, error) {
2362 si, err := dc.prepareLocked(ctx, s.cg, s.query)
2363 if err != nil {
2364 return nil, err
2365 }
2366 cs := connStmt{dc, si}
2367 s.mu.Lock()
2368 s.css = append(s.css, cs)
2369 s.mu.Unlock()
2370 return cs.ds, nil
2371 }
2372
2373
2374
2375 func (s *Stmt) QueryContext(ctx context.Context, args ...interface{}) (*Rows, error) {
2376 s.closemu.RLock()
2377 defer s.closemu.RUnlock()
2378
2379 var rowsi driver.Rows
2380 strategy := cachedOrNewConn
2381 for i := 0; i < maxBadConnRetries+1; i++ {
2382 if i == maxBadConnRetries {
2383 strategy = alwaysNewConn
2384 }
2385 dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
2386 if err != nil {
2387 if err == driver.ErrBadConn {
2388 continue
2389 }
2390 return nil, err
2391 }
2392
2393 rowsi, err = rowsiFromStatement(ctx, dc.ci, ds, args...)
2394 if err == nil {
2395
2396
2397 rows := &Rows{
2398 dc: dc,
2399 rowsi: rowsi,
2400
2401 }
2402
2403
2404 s.db.addDep(s, rows)
2405
2406
2407
2408 rows.releaseConn = func(err error) {
2409 releaseConn(err)
2410 s.db.removeDep(s, rows)
2411 }
2412 var txctx context.Context
2413 if s.cg != nil {
2414 txctx = s.cg.txCtx()
2415 }
2416 rows.initContextClose(ctx, txctx)
2417 return rows, nil
2418 }
2419
2420 releaseConn(err)
2421 if err != driver.ErrBadConn {
2422 return nil, err
2423 }
2424 }
2425 return nil, driver.ErrBadConn
2426 }
2427
2428
2429
2430 func (s *Stmt) Query(args ...interface{}) (*Rows, error) {
2431 return s.QueryContext(context.Background(), args...)
2432 }
2433
2434 func rowsiFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...interface{}) (driver.Rows, error) {
2435 ds.Lock()
2436 defer ds.Unlock()
2437
2438 dargs, err := driverArgsConnLocked(ci, ds, args)
2439 if err != nil {
2440 return nil, err
2441 }
2442
2443
2444
2445
2446 if want := ds.si.NumInput(); want >= 0 && want != len(dargs) {
2447 return nil, fmt.Errorf("sql: statement expects %d inputs; got %d", want, len(dargs))
2448 }
2449
2450 rowsi, err := ctxDriverStmtQuery(ctx, ds.si, dargs)
2451 if err != nil {
2452 return nil, err
2453 }
2454 return rowsi, nil
2455 }
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468 func (s *Stmt) QueryRowContext(ctx context.Context, args ...interface{}) *Row {
2469 rows, err := s.QueryContext(ctx, args...)
2470 if err != nil {
2471 return &Row{err: err}
2472 }
2473 return &Row{rows: rows}
2474 }
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487 func (s *Stmt) QueryRow(args ...interface{}) *Row {
2488 return s.QueryRowContext(context.Background(), args...)
2489 }
2490
2491
2492 func (s *Stmt) Close() error {
2493 s.closemu.Lock()
2494 defer s.closemu.Unlock()
2495
2496 if s.stickyErr != nil {
2497 return s.stickyErr
2498 }
2499 s.mu.Lock()
2500 if s.closed {
2501 s.mu.Unlock()
2502 return nil
2503 }
2504 s.closed = true
2505 txds := s.cgds
2506 s.cgds = nil
2507
2508 s.mu.Unlock()
2509
2510 if s.cg == nil {
2511 return s.db.removeDep(s, s)
2512 }
2513
2514 if s.parentStmt != nil {
2515
2516
2517 return s.db.removeDep(s.parentStmt, s)
2518 }
2519 return txds.Close()
2520 }
2521
2522 func (s *Stmt) finalClose() error {
2523 s.mu.Lock()
2524 defer s.mu.Unlock()
2525 if s.css != nil {
2526 for _, v := range s.css {
2527 s.db.noteUnusedDriverStatement(v.dc, v.ds)
2528 v.dc.removeOpenStmt(v.ds)
2529 }
2530 s.css = nil
2531 }
2532 return nil
2533 }
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549 type Rows struct {
2550 dc *driverConn
2551 releaseConn func(error)
2552 rowsi driver.Rows
2553 cancel func()
2554 closeStmt *driverStmt
2555
2556
2557
2558
2559
2560
2561 closemu sync.RWMutex
2562 closed bool
2563 lasterr error
2564
2565
2566
2567 lastcols []driver.Value
2568 }
2569
2570 func (rs *Rows) initContextClose(ctx, txctx context.Context) {
2571 ctx, rs.cancel = context.WithCancel(ctx)
2572 go rs.awaitDone(ctx, txctx)
2573 }
2574
2575
2576
2577
2578
2579 func (rs *Rows) awaitDone(ctx, txctx context.Context) {
2580 var txctxDone <-chan struct{}
2581 if txctx != nil {
2582 txctxDone = txctx.Done()
2583 }
2584 select {
2585 case <-ctx.Done():
2586 case <-txctxDone:
2587 }
2588 rs.close(ctx.Err())
2589 }
2590
2591
2592
2593
2594
2595
2596
2597 func (rs *Rows) Next() bool {
2598 var doClose, ok bool
2599 withLock(rs.closemu.RLocker(), func() {
2600 doClose, ok = rs.nextLocked()
2601 })
2602 if doClose {
2603 rs.Close()
2604 }
2605 return ok
2606 }
2607
2608 func (rs *Rows) nextLocked() (doClose, ok bool) {
2609 if rs.closed {
2610 return false, false
2611 }
2612
2613
2614
2615 rs.dc.Lock()
2616 defer rs.dc.Unlock()
2617
2618 if rs.lastcols == nil {
2619 rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns()))
2620 }
2621
2622 rs.lasterr = rs.rowsi.Next(rs.lastcols)
2623 if rs.lasterr != nil {
2624
2625 if rs.lasterr != io.EOF {
2626 return true, false
2627 }
2628 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
2629 if !ok {
2630 return true, false
2631 }
2632
2633
2634
2635 if !nextResultSet.HasNextResultSet() {
2636 doClose = true
2637 }
2638 return doClose, false
2639 }
2640 return false, true
2641 }
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651 func (rs *Rows) NextResultSet() bool {
2652 var doClose bool
2653 defer func() {
2654 if doClose {
2655 rs.Close()
2656 }
2657 }()
2658 rs.closemu.RLock()
2659 defer rs.closemu.RUnlock()
2660
2661 if rs.closed {
2662 return false
2663 }
2664
2665 rs.lastcols = nil
2666 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
2667 if !ok {
2668 doClose = true
2669 return false
2670 }
2671
2672
2673
2674 rs.dc.Lock()
2675 defer rs.dc.Unlock()
2676
2677 rs.lasterr = nextResultSet.NextResultSet()
2678 if rs.lasterr != nil {
2679 doClose = true
2680 return false
2681 }
2682 return true
2683 }
2684
2685
2686
2687 func (rs *Rows) Err() error {
2688 rs.closemu.RLock()
2689 defer rs.closemu.RUnlock()
2690 if rs.lasterr == io.EOF {
2691 return nil
2692 }
2693 return rs.lasterr
2694 }
2695
2696
2697
2698
2699 func (rs *Rows) Columns() ([]string, error) {
2700 rs.closemu.RLock()
2701 defer rs.closemu.RUnlock()
2702 if rs.closed {
2703 return nil, errors.New("sql: Rows are closed")
2704 }
2705 if rs.rowsi == nil {
2706 return nil, errors.New("sql: no Rows available")
2707 }
2708 rs.dc.Lock()
2709 defer rs.dc.Unlock()
2710
2711 return rs.rowsi.Columns(), nil
2712 }
2713
2714
2715
2716 func (rs *Rows) ColumnTypes() ([]*ColumnType, error) {
2717 rs.closemu.RLock()
2718 defer rs.closemu.RUnlock()
2719 if rs.closed {
2720 return nil, errors.New("sql: Rows are closed")
2721 }
2722 if rs.rowsi == nil {
2723 return nil, errors.New("sql: no Rows available")
2724 }
2725 rs.dc.Lock()
2726 defer rs.dc.Unlock()
2727
2728 return rowsColumnInfoSetupConnLocked(rs.rowsi), nil
2729 }
2730
2731
2732 type ColumnType struct {
2733 name string
2734
2735 hasNullable bool
2736 hasLength bool
2737 hasPrecisionScale bool
2738
2739 nullable bool
2740 length int64
2741 databaseType string
2742 precision int64
2743 scale int64
2744 scanType reflect.Type
2745 }
2746
2747
2748 func (ci *ColumnType) Name() string {
2749 return ci.name
2750 }
2751
2752
2753
2754
2755
2756
2757 func (ci *ColumnType) Length() (length int64, ok bool) {
2758 return ci.length, ci.hasLength
2759 }
2760
2761
2762
2763 func (ci *ColumnType) DecimalSize() (precision, scale int64, ok bool) {
2764 return ci.precision, ci.scale, ci.hasPrecisionScale
2765 }
2766
2767
2768
2769
2770 func (ci *ColumnType) ScanType() reflect.Type {
2771 return ci.scanType
2772 }
2773
2774
2775
2776 func (ci *ColumnType) Nullable() (nullable, ok bool) {
2777 return ci.nullable, ci.hasNullable
2778 }
2779
2780
2781
2782
2783
2784
2785 func (ci *ColumnType) DatabaseTypeName() string {
2786 return ci.databaseType
2787 }
2788
2789 func rowsColumnInfoSetupConnLocked(rowsi driver.Rows) []*ColumnType {
2790 names := rowsi.Columns()
2791
2792 list := make([]*ColumnType, len(names))
2793 for i := range list {
2794 ci := &ColumnType{
2795 name: names[i],
2796 }
2797 list[i] = ci
2798
2799 if prop, ok := rowsi.(driver.RowsColumnTypeScanType); ok {
2800 ci.scanType = prop.ColumnTypeScanType(i)
2801 } else {
2802 ci.scanType = reflect.TypeOf(new(interface{})).Elem()
2803 }
2804 if prop, ok := rowsi.(driver.RowsColumnTypeDatabaseTypeName); ok {
2805 ci.databaseType = prop.ColumnTypeDatabaseTypeName(i)
2806 }
2807 if prop, ok := rowsi.(driver.RowsColumnTypeLength); ok {
2808 ci.length, ci.hasLength = prop.ColumnTypeLength(i)
2809 }
2810 if prop, ok := rowsi.(driver.RowsColumnTypeNullable); ok {
2811 ci.nullable, ci.hasNullable = prop.ColumnTypeNullable(i)
2812 }
2813 if prop, ok := rowsi.(driver.RowsColumnTypePrecisionScale); ok {
2814 ci.precision, ci.scale, ci.hasPrecisionScale = prop.ColumnTypePrecisionScale(i)
2815 }
2816 }
2817 return list
2818 }
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871 func (rs *Rows) Scan(dest ...interface{}) error {
2872 rs.closemu.RLock()
2873 if rs.closed {
2874 rs.closemu.RUnlock()
2875 return errors.New("sql: Rows are closed")
2876 }
2877 rs.closemu.RUnlock()
2878
2879 if rs.lastcols == nil {
2880 return errors.New("sql: Scan called without calling Next")
2881 }
2882 if len(dest) != len(rs.lastcols) {
2883 return fmt.Errorf("sql: expected %d destination arguments in Scan, not %d", len(rs.lastcols), len(dest))
2884 }
2885 for i, sv := range rs.lastcols {
2886 err := convertAssign(dest[i], sv)
2887 if err != nil {
2888 return fmt.Errorf("sql: Scan error on column index %d: %v", i, err)
2889 }
2890 }
2891 return nil
2892 }
2893
2894
2895
2896 var rowsCloseHook = func() func(*Rows, *error) { return nil }
2897
2898
2899
2900
2901
2902 func (rs *Rows) Close() error {
2903 return rs.close(nil)
2904 }
2905
2906 func (rs *Rows) close(err error) error {
2907 rs.closemu.Lock()
2908 defer rs.closemu.Unlock()
2909
2910 if rs.closed {
2911 return nil
2912 }
2913 rs.closed = true
2914
2915 if rs.lasterr == nil {
2916 rs.lasterr = err
2917 }
2918
2919 withLock(rs.dc, func() {
2920 err = rs.rowsi.Close()
2921 })
2922 if fn := rowsCloseHook(); fn != nil {
2923 fn(rs, &err)
2924 }
2925 if rs.cancel != nil {
2926 rs.cancel()
2927 }
2928
2929 if rs.closeStmt != nil {
2930 rs.closeStmt.Close()
2931 }
2932 rs.releaseConn(err)
2933 return err
2934 }
2935
2936
2937 type Row struct {
2938
2939 err error
2940 rows *Rows
2941 }
2942
2943
2944
2945
2946
2947
2948 func (r *Row) Scan(dest ...interface{}) error {
2949 if r.err != nil {
2950 return r.err
2951 }
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966 defer r.rows.Close()
2967 for _, dp := range dest {
2968 if _, ok := dp.(*RawBytes); ok {
2969 return errors.New("sql: RawBytes isn't allowed on Row.Scan")
2970 }
2971 }
2972
2973 if !r.rows.Next() {
2974 if err := r.rows.Err(); err != nil {
2975 return err
2976 }
2977 return ErrNoRows
2978 }
2979 err := r.rows.Scan(dest...)
2980 if err != nil {
2981 return err
2982 }
2983
2984 if err := r.rows.Close(); err != nil {
2985 return err
2986 }
2987
2988 return nil
2989 }
2990
2991
2992 type Result interface {
2993
2994
2995
2996
2997
2998 LastInsertId() (int64, error)
2999
3000
3001
3002
3003 RowsAffected() (int64, error)
3004 }
3005
3006 type driverResult struct {
3007 sync.Locker
3008 resi driver.Result
3009 }
3010
3011 func (dr driverResult) LastInsertId() (int64, error) {
3012 dr.Lock()
3013 defer dr.Unlock()
3014 return dr.resi.LastInsertId()
3015 }
3016
3017 func (dr driverResult) RowsAffected() (int64, error) {
3018 dr.Lock()
3019 defer dr.Unlock()
3020 return dr.resi.RowsAffected()
3021 }
3022
3023 func stack() string {
3024 var buf [2 << 10]byte
3025 return string(buf[:runtime.Stack(buf[:], false)])
3026 }
3027
3028
3029 func withLock(lk sync.Locker, fn func()) {
3030 lk.Lock()
3031 defer lk.Unlock()
3032 fn()
3033 }
3034
View as plain text