实现HTTP部分功能

This commit is contained in:
GoEdgeLab
2020-09-26 08:07:07 +08:00
parent d44fd0073c
commit 320ed5a1b6
39 changed files with 1960 additions and 230 deletions

View File

@@ -0,0 +1,90 @@
package utils
import (
"time"
)
// pool for get byte slice
type BytePool struct {
c chan []byte
length int
ticker *Ticker
lastSize int
}
// 创建新对象
func NewBytePool(maxSize, length int) *BytePool {
if maxSize <= 0 {
maxSize = 1024
}
if length <= 0 {
length = 128
}
pool := &BytePool{
c: make(chan []byte, maxSize),
length: length,
}
pool.start()
return pool
}
func (this *BytePool) start() {
// 清除Timer
this.ticker = NewTicker(1 * time.Minute)
go func() {
for this.ticker.Next() {
currentSize := len(this.c)
if currentSize <= 32 || this.lastSize == 0 || this.lastSize != currentSize {
this.lastSize = currentSize
continue
}
i := 0
For:
for {
select {
case _ = <-this.c:
i++
if i >= currentSize/2 {
break For
}
default:
break For
}
}
}
}()
}
// 获取一个新的byte slice
func (this *BytePool) Get() (b []byte) {
select {
case b = <-this.c:
default:
b = make([]byte, this.length)
}
return
}
// 放回一个使用过的byte slice
func (this *BytePool) Put(b []byte) {
if cap(b) != this.length {
return
}
select {
case this.c <- b:
default:
// 已达最大容量,则抛弃
}
}
// 当前的数量
func (this *BytePool) Size() int {
return len(this.c)
}
// 销毁
func (this *BytePool) Destroy() {
this.ticker.Stop()
}

View File

@@ -0,0 +1,41 @@
package utils
import (
"github.com/iwind/TeaGo/assert"
"runtime"
"testing"
)
func TestNewBytePool(t *testing.T) {
a := assert.NewAssertion(t)
pool := NewBytePool(5, 8)
buf := pool.Get()
a.IsTrue(len(buf) == 8)
a.IsTrue(len(pool.c) == 0)
pool.Put(buf)
a.IsTrue(len(pool.c) == 1)
pool.Get()
a.IsTrue(len(pool.c) == 0)
for i := 0; i < 10; i++ {
pool.Put(buf)
}
t.Log(len(pool.c))
a.IsTrue(len(pool.c) == 5)
}
func BenchmarkBytePool_Get(b *testing.B) {
runtime.GOMAXPROCS(1)
pool := NewBytePool(1024, 1)
for i := 0; i < b.N; i++ {
buf := pool.Get()
_ = buf
pool.Put(buf)
}
b.Log(pool.Size())
}

53
internal/utils/http.go Normal file
View File

@@ -0,0 +1,53 @@
package utils
import (
"crypto/tls"
"io/ioutil"
"net/http"
"net/http/httputil"
"sync"
"time"
)
// HTTP请求客户端管理
var timeoutClientMap = map[time.Duration]*http.Client{} // timeout => Client
var timeoutClientLocker = sync.Mutex{}
// 导出响应
func DumpResponse(resp *http.Response) (header []byte, body []byte, err error) {
header, err = httputil.DumpResponse(resp, false)
body, err = ioutil.ReadAll(resp.Body)
return
}
// 获取一个新的Client
func NewHTTPClient(timeout time.Duration) *http.Client {
return &http.Client{
Timeout: timeout,
Transport: &http.Transport{
MaxIdleConns: 4096,
MaxIdleConnsPerHost: 32,
MaxConnsPerHost: 32,
IdleConnTimeout: 2 * time.Minute,
ExpectContinueTimeout: 1 * time.Second,
TLSHandshakeTimeout: 0,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
}
// 获取一个公用的Client
func SharedHttpClient(timeout time.Duration) *http.Client {
timeoutClientLocker.Lock()
defer timeoutClientLocker.Unlock()
client, ok := timeoutClientMap[timeout]
if ok {
return client
}
client = NewHTTPClient(timeout)
timeoutClientMap[timeout] = client
return client
}

View File

@@ -0,0 +1,32 @@
package utils
import (
"github.com/iwind/TeaGo/assert"
"testing"
"time"
)
func TestNewHTTPClient(t *testing.T) {
a := assert.NewAssertion(t)
client := NewHTTPClient(1 * time.Second)
a.IsTrue(client.Timeout == 1*time.Second)
client2 := NewHTTPClient(1 * time.Second)
a.IsTrue(client != client2)
}
func TestSharedHTTPClient(t *testing.T) {
a := assert.NewAssertion(t)
_ = SharedHttpClient(2 * time.Second)
_ = SharedHttpClient(3 * time.Second)
client := SharedHttpClient(1 * time.Second)
a.IsTrue(client.Timeout == 1*time.Second)
client2 := SharedHttpClient(1 * time.Second)
a.IsTrue(client == client2)
t.Log(timeoutClientMap)
}

47
internal/utils/ticker.go Normal file
View File

@@ -0,0 +1,47 @@
package utils
import (
"time"
)
// 类似于time.Ticker但能够真正地停止
type Ticker struct {
raw *time.Ticker
S chan bool
C <-chan time.Time
isStopped bool
}
// 创建新Ticker
func NewTicker(duration time.Duration) *Ticker {
raw := time.NewTicker(duration)
return &Ticker{
raw: raw,
C: raw.C,
S: make(chan bool, 1),
}
}
// 查找下一个Tick
func (this *Ticker) Next() bool {
select {
case <-this.raw.C:
return true
case <-this.S:
return false
}
}
// 停止
func (this *Ticker) Stop() {
if this.isStopped {
return
}
this.isStopped = true
this.raw.Stop()
this.S <- true
}

View File

@@ -0,0 +1,52 @@
package utils
import (
"github.com/iwind/TeaGo/logs"
"sync"
"testing"
"time"
)
func TestTicker(t *testing.T) {
ticker := NewTicker(3 * time.Second)
go func() {
time.Sleep(10 * time.Second)
ticker.Stop()
}()
for ticker.Next() {
logs.Println("tick")
}
t.Log("finished")
}
func TestTicker2(t *testing.T) {
ticker := NewTicker(1 * time.Second)
go func() {
time.Sleep(5 * time.Second)
ticker.Stop()
}()
for {
logs.Println("loop")
select {
case <-ticker.C:
logs.Println("tick")
case <-ticker.S:
return
}
}
}
func TestTickerEvery(t *testing.T) {
i := 0
wg := &sync.WaitGroup{}
wg.Add(1)
Every(2*time.Second, func(ticker *Ticker) {
i++
logs.Println("TestTickerEvery i:", i)
if i >= 4 {
ticker.Stop()
wg.Done()
}
})
wg.Wait()
}

View File

@@ -0,0 +1,15 @@
package utils
import "time"
// 定时运行某个函数
func Every(duration time.Duration, f func(ticker *Ticker)) *Ticker {
ticker := NewTicker(duration)
go func() {
for ticker.Next() {
f(ticker)
}
}()
return ticker
}