How do I implement queues with Resgate?

Right now I’m falling back to the default NATS way of handling consumer groups, using queue groups.

So my user-identity-server emits a `consumer.identity.user.create` event, and my user-identity-consumer subscribes with a queue group to process the new user. This prevents two or three instances of user-identity-consumer from racing and causing anomalies.

However, I’m wondering if I can eliminate the user-identity-consumer and just instruct the user-identity-server to implement the queue group. If so, how would I do that?

By the way, I’m using the go-res SDK. I expect it looks something like this:

```go
func (uh *UserHandler) SetOption(rh *res.Handler) {
	rh.Option(
		res.Group("identity-user-workers"),
		// Set call method handler, for creating a user in vault.
		res.Call("create", uh.set),
	)
}
```

Or do I have the purpose of this wrong? If so, how would I implement the worker group?

res.Group

I am afraid that res.Group has a different purpose.

It only affects which internal goroutine handles the requests. All requests for the same group ID will be handled by the same goroutine.

This was meant as a means of synchronization for resources that share a common source. E.g.:

directory.user.42

```json
{
    "id": 42,
    "name": "Samuel Jirénius"
}
```

directory.user.42.details

```json
{
    "id": 42,
    "firstName": "Samuel",
    "lastName": "Jirénius",
    "phoneNumber": "+46 012 234 567",
    ...
}
```

Now, both above resources would come from the same database entry.
To avoid race conditions between get requests and updates for the two, the idea was to have them handled on the same goroutine, with no need for additional mutexes.

However, I would now not recommend that approach. It is better to put this synchronization in the database layer itself. For go-res, that could be done by the code implementing the store interface.

NATS queue groups

Currently, no. Though I do know one company that is using a go-res fork which does QueueSubscribe:

This was mainly to allow them to do horizontal scaling with many services in a large Kubernetes cluster. In their case, they send only a system.reset on resources whenever something changes, instead of add/remove/change events, to avoid race conditions.
It is a bit more costly, having to resend the data whenever something changes, but it allows for easier scaling. In a mail, he told me:

“It has been proved to work, I am using it in production and the performance is impressive!”

So that seems to work as well 🙂

Because of them, I will add the use of QueueSubscribeChan, based on an option that can be set on the handler.

But sorry to say, it is not implemented yet.

Best regards,
Samuel

That’s good to hear.

Thanks for the heads-up on the fork; I was going to do basically the same thing…

I’m running my services in Kubernetes as well, with 2 replicas for almost everything along with HPA. Without queuing, this introduces very noticeable race conditions. There are of course ways to fix it without queues, but the queues are much more reliable.

That should work fine with Resgate.

Just beware that Resgate (and the RES protocol) relies on order of delivery as a way to ensure that events are applied in the right order, instead of more complex versioning schemes. This design choice was made to simplify the creation of services. But it also means you can get race conditions between instances of the same service.

If ServiceA_1 responds to a get request, and ServiceA_2 sends a change event for the same resource, which one should Resgate handle first?

So, if you have more than one instance handling the same resource(s), you should use system.reset (which will not cause races) to notify Resgate on updates, instead of add/remove/change events.
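On the wire, a system.reset is an ordinary NATS publish to the `system.reset` subject, telling Resgate to re-fetch the listed resources. The resource pattern below is just an illustrative example:

```json
{
    "resources": ["identity.user.>"]
}
```

Since Resgate fetches the fresh data itself after a reset, there is no ordering conflict between instances, unlike with add/remove/change events.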

/Samuel

Yeah, the producer-consumer pattern can get quite complex. Since you advised that queues are going to be added as an option, I refactored my services where it made sense. I prefer the simplicity, that’s for sure.
