Listening for events

In the go-res SDK, is there a recommended way to listen for events from another service?

e.g.

I have a user service that emits event.user.created with a payload of the user created

r.Event("created", payload)

is there a way to subscribe to that event in another service, other than importing nats.go and subscribing to it with the nats.go sdk?

it seems like I should either

  1. have access to the nats connection in the go-res sdk after doing the s := res.NewService("user") maybe s.NConn

  2. have a res.Event handler that lets me listen to custom events and do stuff

  3. have a method on either Resource or the CallRequest that allows me to call another method directly r.Call("call.group.create", payload)

go-res currently doesn’t have anything to help with inter-service communication, such as calling other services or listening to events. It is on the roadmap for after the summer.

You can listen to internal events (emitted by the same service), using func (*Mux) AddListener (and Service has the same promoted method from the embedded Mux). But for interservice, you would do it manually using the nats.Conn instance.

  1. I agree fully with you. I am still not sure why it isn’t there yet! Currently, you can create a nats.Conn instance yourself, and pass it to func (*Service) Serve (instead of ListenAndServe).

  2. Yes, something like that.

  3. Yes, but it should be on the Service instance instead, so you would go r.Service().Call("call.group.create", payload)

Thanks for great feedback! :slight_smile:

/Samuel

ah yes, i didn’t see that you can pass the connection in… that’ll work for now.

I tried calling a call method directly. It seems to require a more complex payload when called from a service rather than from the browser, with params, cid and such…

There are other fields, yes, but nothing magic. It is still a simple NATS request. You can ignore most of them:

  • cid - The client connection ID. Most likely you can ignore this. If the request is done on behalf of a client, and if the receiving service needs it, then you might want to set it. But in your case, no.
  • token - Not needed, unless your receiving service uses information in the token. You would know if you need it, as you are the one both setting the token (with connection token event), and handling the call.
  • query - Unlike when calling from the client, where the query is part of the resource ID (e.g. user.list?limit=10), in the RES Service protocol the query part of the resource is put here instead (e.g. { "query": "limit=10" }). But this is only needed if it is a query resource. And query resources with call methods are quite uncommon :slight_smile: . So, you should skip this too.
  • params - This is your call parameters. May be null/omitted if your call takes no parameters. You decide.

So, all in all, you can make an interservice call request without any payload at all.
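To illustrate the query field described above, here is a minimal sketch of splitting a client-style resource ID into the resource name and the query string that would go into the request payload. The helper name splitRID is hypothetical and not part of go-res; only the user.list?limit=10 example comes from the discussion.

```go
package main

import (
	"fmt"
	"strings"
)

// splitRID is a hypothetical helper (not part of go-res). It splits a
// client-style resource ID into the resource name and the query string,
// without the '?' separator. The query is empty if there is none.
func splitRID(rid string) (name, query string) {
	if i := strings.IndexByte(rid, '?'); i >= 0 {
		return rid[:i], rid[i+1:]
	}
	return rid, ""
}

func main() {
	name, query := splitRID("user.list?limit=10")
	// The name goes into the NATS subject, the query into the payload.
	fmt.Printf("resource name: %s\n", name)
	fmt.Printf("payload query: %s\n", query)
}
```

For a resource ID without a query part, the second return value is empty and the query field can simply be left out of the payload.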

That’s great information, thank you. It seems like it complained when I tried it; I’ll try again… Currently I’m chaining events in the browser, and I’d rather have some of that stuff in the events as much as possible… ty again

You are right. I just tried it myself, and go-res requires the payload to be a valid json object. So, you need to send at least an empty object: {}
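As a rough sketch of the payload discussed above, the request can be modelled as a struct where every field is optional, so that an empty request encodes to {}. The type and function names (callRequest, encodeCall) are made up for illustration and are not part of go-res; only the JSON field names cid, token, query and params come from the discussion.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// callRequest is a hypothetical helper type (not part of go-res) that
// mirrors the request fields listed above. Every field is optional and
// is left out of the JSON when it is empty.
type callRequest struct {
	CID    string          `json:"cid,omitempty"`
	Token  json.RawMessage `json:"token,omitempty"`
	Query  string          `json:"query,omitempty"`
	Params interface{}     `json:"params,omitempty"`
}

// encodeCall marshals the request into the payload to send over NATS.
// With no fields set, the result is the empty object "{}".
func encodeCall(req callRequest) ([]byte, error) {
	return json.Marshal(req)
}

func main() {
	empty, err := encodeCall(callRequest{})
	if err != nil {
		fmt.Println("Error: ", err)
		return
	}
	fmt.Println(string(empty))

	withParams, err := encodeCall(callRequest{
		Params: map[string]string{"name": "admins"},
	})
	if err != nil {
		fmt.Println("Error: ", err)
		return
	}
	fmt.Println(string(withParams))
}
```

The resulting byte slice is what you would pass as the data of the NATS request, for example on the call.group.create subject mentioned earlier.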

And I agree. Unless there is a good reason to have the client do chained requests, better to let the services do them :slight_smile:

ok, i have done some more testing, for the most part I have things working fairly well.

I am, however, finding that if I set r.Timeout(12 * time.Second) in a service that is called directly rather than from the browser client, the r.OK("somecooldatahere") response ends up being timeout:"12000" rather than somecooldatahere. If I don’t change the timeout, the response is the expected somecooldatahere.

Yes, that is as expected.
What you see is a pre-response. Basically, it is the service saying:

“This is not the real response, but I’ve received the request. It may take a while for me to process, but you can expect the real response within xxxxx time. So, don’t timeout before then, ok?”

It is the odd one in the RES Service protocol, the only response that is not JSON. It would be better if the NATS client supported something like this, but it doesn’t, and it had to be included in the RES protocol.

It also means that you cannot use nats.go’s func (*Conn) Request if the responding service uses a pre-response (a.k.a. sets a Timeout). Instead you have to subscribe manually, and then unsubscribe once the actual response is handled.

Erm, I might have gone a bit overboard with this morning exercise, and made a full example.
You can just reuse the RESRequest function in the example below. It behaves the same as nats.go’s func (*Conn) Request, but handles pre-responses to allow extending the timeout.

package main

import (
	"errors"
	"fmt"
	"reflect"
	"strconv"
	"time"

	"github.com/jirenius/go-res"
	"github.com/nats-io/go-nats"
)

// RESRequest sends a request over NATS, but can handle extending timeout
// through a Timeout pre-response.
func RESRequest(nc *nats.Conn, subject string, data []byte, timeout time.Duration) (*nats.Msg, error) {
	// Manually create a response inbox
	inbox := nats.NewInbox()

	// Subscribe to response inbox
	ch := make(chan *nats.Msg, 1)
	sub, err := nc.ChanSubscribe(inbox, ch)
	if err != nil {
		return nil, err
	}
	defer sub.Unsubscribe()

	// Publish request
	err = nc.PublishRequest(subject, inbox, data)
	if err != nil {
		return nil, err
	}

	// Set timeout timer, and make sure the active timer is stopped on return
	timer := time.NewTimer(timeout)
	defer func() { timer.Stop() }()

	for {
		select {
		case <-timer.C:
			return nil, errors.New("request timeout")
		case msg := <-ch:
			// Is the first character a-z or A-Z?
			// Then it is a pre-response.
			if len(msg.Data) == 0 || (msg.Data[0]|32) < 'a' || (msg.Data[0]|32) > 'z' {
				return msg, nil
			}

			// Parse pre-response using reflect.StructTag
			// as it uses the same format.
			tag := reflect.StructTag(msg.Data)

			if v, ok := tag.Lookup("timeout"); ok {
				if ms, err := strconv.Atoi(v); err == nil {
					// Stop previous timeout and make a new one.
					timer.Stop()
					timer = time.NewTimer(time.Duration(ms) * time.Millisecond)
				}
			}
		}
	}
}

func main() {
	// Send a ping request to the service after 2 seconds from start.
	go func() {
		time.Sleep(2 * time.Second)
		nc, err := nats.Connect("nats://127.0.0.1")
		if err != nil {
			fmt.Println("Error: ", err)
			return
		}
		fmt.Println("Sending ping at", time.Now())
		// Send a request which by default will timeout after 1 second,
		// unless a pre-response is received that extends it.
		msg, err := RESRequest(nc, "call.example.model.ping", []byte(`{}`), time.Second)
		if err != nil {
			fmt.Println("Error: ", err)
			return
		}

		fmt.Println("Received response: ", string(msg.Data))
	}()

	// Create a service that handles ping requests.
	s := res.NewService("example")
	s.Handle("model",
		res.Call("ping", func(r res.CallRequest) {
			r.Timeout(10 * time.Second)
			time.Sleep(4 * time.Second)
			r.OK(struct {
				Timestamp time.Time `json:"timestamp"`
			}{time.Now()})
		}),
	)
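	// ListenAndServe blocks until the service stops; report any error.
	if err := s.ListenAndServe("nats://localhost:4222"); err != nil {
		fmt.Println("Error: ", err)
	}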
}

That was fun. :grin:
Maybe I should work now.

Best regards,
Samuel