Yes, that is as expected.
What you see is a pre-response. Basically, it is the service saying:
“This is not the real response, but I’ve received the request. It may take a while for me to process, but you can expect the real response within xxxxx time. So, don’t time out before then, ok?”
It is the odd one out in the RES Service protocol: the only response that is not JSON. It would be better if the NATS client supported something like this, but it doesn’t, so it had to be included in the RES protocol.
It also means that you cannot use nats.go’s func (*Conn) Request if the responding service uses a pre-response (i.e. sets a Timeout). Instead, you have to subscribe manually, and then unsubscribe once the actual response has been handled.
Erm, I might have gone a bit overboard with this morning exercise, and made a full example.
You can just reuse the RESRequest function in the example below. It behaves the same as nats.go’s func (*Conn) Request, but handles pre-responses to allow extending the timeout.
package main

import (
	"errors"
	"fmt"
	"reflect"
	"strconv"
	"time"

	"github.com/jirenius/go-res"
	"github.com/nats-io/go-nats"
)

// RESRequest sends a request over NATS, but can handle extending timeout
// through a Timeout pre-response.
func RESRequest(nc *nats.Conn, subject string, data []byte, timeout time.Duration) (*nats.Msg, error) {
	// Manually create a response inbox
	inbox := nats.NewInbox()

	// Subscribe to response inbox
	ch := make(chan *nats.Msg, 1)
	sub, err := nc.ChanSubscribe(inbox, ch)
	if err != nil {
		return nil, err
	}
	defer sub.Unsubscribe()

	// Publish request
	err = nc.PublishRequest(subject, inbox, data)
	if err != nil {
		return nil, err
	}

	// Set timeout timer
	timer := time.NewTimer(timeout)
	for {
		select {
		case <-timer.C:
			return nil, errors.New("request timeout")
		case msg := <-ch:
			// Is the first character a-z or A-Z?
			// Then it is a pre-response.
			if len(msg.Data) == 0 || (msg.Data[0]|32) < 'a' || (msg.Data[0]|32) > 'z' {
				return msg, nil
			}

			// Parse pre-response using reflect.StructTag
			// as it uses the same format.
			tag := reflect.StructTag(msg.Data)
			if v, ok := tag.Lookup("timeout"); ok {
				if ms, err := strconv.Atoi(v); err == nil {
					// Stop previous timeout and make a new one.
					timer.Stop()
					timer = time.NewTimer(time.Duration(ms) * time.Millisecond)
				}
			}
		}
	}
}

func main() {
	// Send a ping request to the service after 2 seconds from start.
	go func() {
		time.Sleep(2 * time.Second)
		nc, err := nats.Connect("nats://127.0.0.1")
		if err != nil {
			fmt.Println("Error: ", err)
			return
		}

		fmt.Println("Sending ping at", time.Now())
		// Send a request which by default will timeout after 1 second,
		// unless a pre-response is received that extends it.
		msg, err := RESRequest(nc, "call.example.model.ping", []byte(`{}`), time.Second)
		if err != nil {
			fmt.Println("Error: ", err)
			return
		}
		fmt.Println("Received response: ", string(msg.Data))
	}()

	// Create a service that handles ping requests.
	s := res.NewService("example")
	s.Handle("model",
		res.Call("ping", func(r res.CallRequest) {
			r.Timeout(10 * time.Second)
			time.Sleep(4 * time.Second)
			r.OK(struct {
				Timestamp time.Time `json:"timestamp"`
			}{time.Now()})
		}),
	)
	s.ListenAndServe("nats://localhost:4222")
}
That was fun.
Maybe I should work now.
Best regards,
Samuel