WebSockets通过TCP连接提供客户端与服务器之间的双向即时通信。这意味着,我们可以维护一个TCP连接,然后发送和监听该连接上的消息,而不是不断地通过新建TCP连接去轮询web服务器的更新。在Go的生态中,WebSocket协议有几个不同的实现。有些库是协议的纯实现。另外一些则选择在WebSocket协议的基础上构建,为他们特定的用例创建更好的抽象。
下面是一个不全的Go WebSocket协议实现列表:x/net/websocket
(from Go sub-repository packages)sacOO7/gowebsocket
本文我们将使用gorilla/websocket库来实现websocket协议。你会发现websocket单测与HTTP服务的并没有很大不同。尽管如此,我们在测试时还是要考虑WebSockets的一些特殊方面。
WebSocket在线物品拍卖示例
在线拍卖是以实时通信为核心的行业之一。在一场拍卖中,几秒钟的时间就决定了你是赢了还是失去了一件你一直想要的收藏品。让我们以gorilla/websocket库实现的简单拍卖应用程序作为本文的示例。
首先,我们将定义两个非常简单的结构体Bid和Auction,我们将在WebSocket处理程序中使用它们。Auction
有一个Bid方法,我们将使用该方法接收客户端发送来的竞价请求。结构体定义
type Bid struct {
UserID int `json:"user_id"`
Amount float64 `json:"amount"`
}
type Auction struct {
ItemID int `json:"item_id"`
EndTime int64 `json:"end_time"`
Bids []*Bid
}
func NewAuction(d time.Duration, itemID int, b []*Bid) Auction {
return Auction{
ItemID: itemID,
EndTime: time.Now().Add(d).Unix(),
Bids: b,
}
}
这两种类型都相当简单,包含的字段非常少。NewAuction构造函数创建了一个带有duration拍卖持续时间、itemID和*Bids的Aution实例。竞拍
func (a *Auction) Bid(amount float64, userID int) (*Bid, error) {
if len(a.Bids) > 0 {
largestBid := a.Bids[len(a.Bids)-1]
if largestBid.Amount >= amount {
return nil, fmt.Errorf("竞拍价必须大于 %.2f", largestBid.Amount)
}
}
if a.EndTime < time.Now().Unix() {
return nil, fmt.Errorf("拍卖已结束")
}
bid := Bid{
Amount: amount,
UserID: userID,
}
// Mutex lock
a.Bids = append(a.Bids, &bid)
// Mutex unlock
return &bid, nil
}
Auction的Bid方法就是物品竞拍发生的地方。它接收一个amount
和userID
作为参数,并向Auction
对象中添加Bid实例。而且它会检查竞拍是否结束以及客户端发送的竞拍价格是否大于已有的最大竞价。如果这些条件中的任何一个不满足,它将向客户端返回适当的错误。有了结构体定义和Bid方法,让我们深入到WebSockets机制。WebSocket连接处理
想象一下,一个可以在拍卖中实时出价的网站。它通过WebSockets发送的每一条JSON消息都会包含用户的标识符(UserID
)和出价的金额(amount
)。一旦服务器接收了消息,它将参与竞价并向客户端返回一个竞拍结果。在服务器端,此通信将由net/http
处理程序完成。它将处理所有WebSocket的业务逻辑,有几个值得注意的步骤:
1、将接收到的HTTP连接升级为WebSocket连接。
2、接收来自客户端的消息。
3、从消息中解码出bid对象。
4、参与竞价。
5、 向客户端发送竞拍结果。
下面我们来实现这个处理程序。首先定义inbound
和outbound
消息类型,用于接收和发送客户端消息。type inbound struct {
UserID int `json:"user_id"`
Amount float64 `json:"amount"`
}
type outbound struct {
Body string `json:"body"`
}
它们都分别表示入站/出站消息,这就是在客户端和服务器之间的交互数据。inbound
入站消息将表示一个竞标内容,而outbound
类型表示一个简单的返回消息,其Body中包含一些文本。接下来定义bidsHandler
,包含ServeHTTP方法实现HTTP连接的升级:var upgrader = websocket.Upgrader{}
type bidsHandler struct {
auction *Auction
}
func (bh bidsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("upgrade:", err)
return
}
defer ws.Close()
// 剩余代码在后面
}
首先定义websocket.Upgrader
,接收处理程序的http.ResponseWriter
和*http.Resquest
并升级连接。因为这只是一个应用程序示例upgrader.CheckOrigin
方法将只返回true,而不检查传入请求的来源。一旦upgrader
完成连接的升级,将返回*websocket.Conn
对象保存在ws
变量中。*websocket.Conn
将接收所有客户端发送来的消息,也是处理程序读取请求内容的地方。同样,处理程序将会向*websocket.Conn
写入消息,它将向客户端发送响应消息。func (bh bidsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 前面的代码...
for {
_, m, err := ws.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
return
}
var in inbound
err = json.Unmarshal(m, &in)
if err != nil {
handleError(ws, err)
continue
}
bid, err := bh.auction.Bid(in.Amount, in.UserID)
if err != nil {
handleError(ws, err)
continue
}
out, err := json.Marshal(outbound{Body: fmt.Sprintf("Bid placed: %.2f", bid.Amount)})
if err != nil {
handleError(ws, err)
continue
}
err = ws.WriteMessage(websocket.BinaryMessage, out)
if err != nil {
handleError(ws, err)
continue
}
}
}
for
循环做了几件事:首先,使用ws.ReadMessage()
读取websocket消息,该函数返回消息类型(二进制或文本)和消息内容(m
)以及可能发生的错误(err
)。然后,检查客户端是否意外地关闭了连接。错误处理完成并读取到消息,我们将使用json.Unmarshal
对其进行解码。接着调Bid方法参与竞拍。然后使用json.Marshal
对返回内容进行序列化,使用ws.WriteMessage
方法发送给客户端。测试WebSockets处理函数
尽管编写WebSocket处理程序比普通HTTP处理程序要复杂得多,但测试它们很简单。事实上,测试WebSockets处理程序就像测试HTTP处理程序一样简单。这是因为WebSockets是在HTTP上构建的,所以测试WebSockets使用的工具与测试HTTP服务器相同。func TestBidsHandler(t *testing.T) {
tcs := []struct {
name string
bids []*Bid
duration time.Duration
message inbound
reply outbound
}{
{
name: "with good bid",
bids: []*Bid{},
duration: time.Hour * 1,
message: inbound{UserID: 1, Amount: 10},
reply: outbound{Body: "Bid placed: 10.00"},
},
{
name: "with bad bid",
bids: []*Bid{
&Bid{
UserID: 1,
Amount: 20,
},
},
duration: time.Hour * 1,
message: inbound{UserID: 1, Amount: 10},
reply: outbound{Body: "amount must be larger than 20.00"},
},
{
name: "good bid on expired auction",
bids: []*Bid{
&Bid{
UserID: 1,
Amount: 20,
},
},
duration: time.Hour * -1,
message: inbound{UserID: 1, Amount: 30},
reply: outbound{Body: "auction already closed"},
},
}
for _, tt := range tcs {
t.Run(tt.name, func(t *testing.T) {
a := NewAuction(tt.duration, 1, tt.bids)
h := bidsHandler{&a}
// 剩余代码在后面
})
}
}
首先,我们从定义测试用例开始。每个用例有一个name
,这是测试用例的可读名称。此外,每个测试用例都有一个bids
切片和一个duration持续时间,用于创建一个测试拍卖对象Auction
。测试用例还有一个入站消息inbound
和一个出站回复outbound
—这是测试用例将发送给处理程序并期望从处理程序返回的消息。在TestBidsHandler中我们添加三种不同的测试用例——一个是客户端发起了错误的报价,低于目前最大报价,另一个测试用例,客户端添加了一个正常的报价,第三个客户端参与的拍卖已结束。
下面完成测试函数:func TestBidsHandler(t *testing.T) {
// 测试用例和其他内容在前面...
for _, tt := range tcs {
t.Run(tt.name, func(t *testing.T) {
a := NewAuction(tt.duration, 1, tt.bids)
h := bidsHandler{&a}
s, ws := newWSServer(t, h)
defer s.Close()
defer ws.Close()
sendMessage(t, ws, tt.message)
reply := receiveWSMessage(t, ws)
if reply != tt.reply {
t.Fatalf("Expected '%+v', got '%+v'", tt.reply, reply)
}
})
}
}
我们在subtest函数体中添加了一些新函数。newWSServer
将创建一个测试服务器并将其升级为WebSocket连接,同时返回服务器和WebSocket连接。然后,sendMessage
函数通过WebSocket连接将消息从测试用例发送到测试服务器。之后,通过receiveWSMessage
,我们将从服务器读取响应,并通过将其与测试用例的进行比较来断言其正确性。那么,这些新的函数的作用是什么呢?让我们逐一分析。func newWSServer(t *testing.T, h http.Handler) (*httptest.Server, *websocket.Conn) {
t.Helper()
s := httptest.NewServer(h)
wsURL := httpToWs(t, s.URL)
ws, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatal(err)
}
return s, ws
}
newWSServer
函数使用httptest.NewServer
函数将处理程序挂载到测试HTTP服务器上。通过httpToWS
,实现了将服务器的URL
转为websocket URL (它只是将URL中的http
协议替换为ws
,或将https
替换为wss
协议)。为了建立WebSocket连接,我们使用WebSocket.DefaultDialer
,它是一个所有字段都设置为默认值的dialer。调用Dial
方法通过WebSocket服务器URL (wsURL)返回WebSocket连接。func sendMessage(t *testing.T, ws *websocket.Conn, msg inbound) {
t.Helper()
m, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
if err := ws.WriteMessage(websocket.BinaryMessage, m); err != nil {
t.Fatalf("%v", err)
}
}
sendMessage
函数接收一个WebSocket连接和inbound
消息作为参数。将消息序列化成json以二进制格式在websocket连接中发送。func receiveWSMessage(t *testing.T, ws *websocket.Conn) outbound {
t.Helper()
_, m, err := ws.ReadMessage()
if err != nil {
t.Fatalf("%v", err)
}
var reply outbound
err = json.Unmarshal(m, &reply)
if err != nil {
t.Fatal(err)
}
return reply
}
receiveWSMessage
函数以ws
WebSocket连接为参数,通过ws.ReadMessage()
读取请求消息,然后反序列化成outbound
类型返回。
如果我们运行测试,我们将看到它们通过:$ go test ./... -v
=== RUN TestBidsHandler
=== RUN TestBidsHandler/with_good_bid
=== RUN TestBidsHandler/with_bad_bid
=== RUN TestBidsHandler/good_bid_on_expired_auction
--- PASS: TestBidsHandler (0.00s)
--- PASS: TestBidsHandler/with_good_bid (0.00s)
--- PASS: TestBidsHandler/with_bad_bid (0.00s)
--- PASS: TestBidsHandler/good_bid_on_expired_auction (0.00s)
PASS
ok github.com/fteem/go-playground/testing-in-go-web-sockets 0.013s