
A queue

GoshawkDB supports retry, which is a very flexible form of trigger. Within a transaction, if you retry, what you're saying is: "Look at all the objects I've read in this transaction. Put me to sleep until any of those objects are modified. When they are modified, tell me about them and restart this transaction." In this howto, we use retry to consume from a queue. The use of transactions means that we can have many processes appending to the queue and many processes consuming from it at the same time, and we can guarantee that no item on the queue will be lost or duplicated.
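Before we get to the queue itself, here is the shape of the retry pattern in isolation. This is only a sketch: waitForValue is a hypothetical helper (it isn't part of this howto's code), but it uses the same goshawkdb.io/client API as the code below, and it simply blocks until the given object has a non-empty value.

func waitForValue(conn *client.Connection, objRef client.ObjectRef) ([]byte, error) {
	result, _, err := conn.RunTransaction(func(txn *client.Txn) (interface{}, error) {
		obj, err := txn.GetObject(objRef)
		if err != nil {
			return nil, err
		}
		value, err := obj.Value()
		if err != nil {
			return nil, err
		}
		if len(value) == 0 {
			// We have read obj in this transaction, so returning
			// client.Retry puts us to sleep until obj is modified; the
			// transaction function is then run again from the top.
			return client.Retry, nil
		}
		return value, nil
	})
	if err != nil {
		return nil, err
	}
	return result.([]byte), nil
}

Because the transaction reads the object before returning client.Retry, GoshawkDB knows exactly which modifications should wake it up.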

The code in this Howto, along with the other Howtos, is available in the examples repository.

Our queue is going to have a head and a tail and is going to be structured like a (singly) linked list. New items are appended at the tail, which involves 1) creating a new object, the cell, containing a reference to the value and a reference (initially absent) to the next cell; 2) setting the current tail's next-cell reference to our new cell; and 3) setting the tail of our queue to point to the new cell. Diagrammatically, we can think of the queue like this:
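  queue:  [ head | tail ]
             |      |
             |      +------------------------------+
             v                                     v
  [ value | next ] --> [ value | next ] --> [ value | next ]
      |                    |                    |
      v                    v                    v
    item 0               item 1               item 2

The last cell has no next reference until another item is appended, and when the queue is empty the queue object carries no references at all. Here's the queue package, with its constructor and the Append operation: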

package queue

import (
	"errors"
	"goshawkdb.io/client"
)

type Queue struct {
	conn   *client.Connection
	objRef client.ObjectRef
}

const (
	queueHead = 0
	queueTail = 1

	cellValue = 0
	cellNext  = 1
)

func NewQueue(conn *client.Connection) (*Queue, error) {
	result, _, err := conn.RunTransaction(func(txn *client.Txn) (interface{}, error) {
		roots, err := txn.GetRootObjects()
		if err != nil {
			return nil, err
		}
		rootObj, found := roots["myRoot1"]
		if !found {
			return nil, errors.New("No root 'myRoot1' found")
		}
		// reset the root: an empty value with no references represents the
		// empty queue (head and tail references are added on first append)
		err = rootObj.Set([]byte{})
		if err != nil {
			return nil, err
		}
		return rootObj, nil
	})
	if err != nil {
		return nil, err
	}
	return &Queue{
		conn:   conn,
		objRef: result.(client.ObjectRef),
	}, nil
}

func (q *Queue) Append(item client.ObjectRef) error {
	_, _, err := q.conn.RunTransaction(func(txn *client.Txn) (interface{}, error) {
		queue, err := txn.GetObject(q.objRef)
		if err != nil {
			return nil, err
		}
		// create a new cell that only has a ref to the value being appended.
		cell, err := txn.CreateObject([]byte{}, item)
		if err != nil {
			return nil, err
		}
		queueReferences, err := queue.References()
		if err != nil {
			return nil, err
		}
		if len(queueReferences) == 0 {
			// queue is completely empty: set both head and tail at once.
			return nil, queue.Set([]byte{}, cell, cell)

		} else {
			tailCell := queueReferences[queueTail]
			tailCellReferences, err := tailCell.References()
			if err != nil {
				return nil, err
			}
			// append our new cell to the refs of the current tail
			err = tailCell.Set([]byte{}, append(tailCellReferences, cell)...)
			if err != nil {
				return nil, err
			}
			// update the queue tail to point at the new tail.
			queueReferences[queueTail] = cell
			return nil, queue.Set([]byte{}, queueReferences...)
		}
	})
	return err
}

So we have an overall queue object that carries no value, just references to the head and tail cells, and then we have the cells, each of which has a reference to its item in the queue and a reference to the next cell. Because of the use of transactions and the fact that GoshawkDB implements strong serializability, if multiple clients run this same code at the same time, their appends will be safely serialized.

Consuming from the queue is almost as simple. The only complexity is that if the queue is empty, we return client.Retry.

func (q *Queue) Dequeue() (*client.ObjectRef, error) {
	result, _, err := q.conn.RunTransaction(func(txn *client.Txn) (interface{}, error) {
		queue, err := txn.GetObject(q.objRef)
		if err != nil {
			return nil, err
		}
		queueReferences, err := queue.References()
		if err != nil {
			return nil, err
		}
		if len(queueReferences) == 0 {
			// queue is completely empty; Let's wait until it's not!
			return client.Retry, nil

		} else {
			headCell := queueReferences[queueHead]
			headCellReferences, err := headCell.References()
			if err != nil {
				return nil, err
			}
			item := headCellReferences[cellValue]
			if len(headCellReferences) == 1 {
				// there's only one item in the queue and we've just
				// consumed it, so remove all references from the queue
				return item, queue.Set([]byte{})

			} else {
				// the queue head should point to the next cell. The queue
				// tail doesn't change.
				queueReferences[queueHead] = headCellReferences[cellNext]
				return item, queue.Set([]byte{}, queueReferences...)
			}
		}
	})
	if err != nil {
		return nil, err
	}
	itemRef := result.(client.ObjectRef)
	return &itemRef, nil
}

And that is all there is to it: short and simple. Let's try it out! First we need to add a Clone method to Queue so that we can use the same queue from multiple concurrent connections.

func (q *Queue) Clone(conn *client.Connection) *Queue {
	return &Queue{
		conn:   conn,
		objRef: q.objRef,
	}
}

Now we can write our main. We're going to have a flexible number of producers and consumers.

package main

import (
	"fmt"
	"goshawkdb.io/client"
	"goshawkdb.io/examples/howto/queue"
	"log"
)

const (
	clusterCertPEM      = `...`
	clientCertAndKeyPEM = `...`
)

func main() {
	consumerCount := 2
	producerCount := 3

	productionLimit := 10

	connections := make([]*client.Connection, consumerCount+producerCount)
	consumers := connections[:consumerCount]
	producers := connections[consumerCount:]

	for i := range connections {
		conn, err := client.NewConnection("localhost", []byte(clientCertAndKeyPEM), []byte(clusterCertPEM))
		if err != nil {
			log.Fatal(err)
		}
		connections[i] = conn
		defer conn.Shutdown()
	}

	q, err := queue.NewQueue(connections[0])
	if err != nil {
		log.Fatal(err)
	}

	for i, conn := range consumers {
		consumer := i
		connection := conn
		go consume(consumer, connection, q)
	}

	for i, conn := range producers {
		producer := i
		connection := conn
		go produce(producer, connection, q, productionLimit)
	}

	// wait forever
	c := make(chan struct{})
	<-c
}

func consume(consumerId int, conn *client.Connection, q *queue.Queue) {
	q = q.Clone(conn)
	for {
		result, _, err := conn.RunTransaction(func(txn *client.Txn) (interface{}, error) {
			item, err := q.Dequeue()
			if err != nil {
				return nil, err
			}
			itemValue, err := item.Value()
			if err != nil {
				return nil, err
			}
			return itemValue, nil
		})
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("Consumer %v dequeued '%s'\n", consumerId, result)
	}
}

func produce(producerId int, conn *client.Connection, q *queue.Queue, limit int) {
	q = q.Clone(conn)
	for i := 0; i < limit; i++ {
		_, _, err := conn.RunTransaction(func(txn *client.Txn) (interface{}, error) {
			itemValue := []byte(fmt.Sprintf("producer %v item %v", producerId, i))
			item, err := txn.CreateObject(itemValue)
			if err != nil {
				return nil, err
			}
			return nil, q.Append(item)
		})
		if err != nil {
			log.Fatal(err)
		}
	}
}

Once again we make use of nested transactions to show how APIs can be composed safely. In this case, though, nesting isn't strictly necessary: there's no intrinsic reason why the item value should be created in the same transaction in which it's appended to the queue, and equally no reason why the item value should be read in the same transaction in which it's dequeued.
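For example, the consumer could call Dequeue (which runs its own transaction) and then read the item's value in a second, separate transaction. The sketch below shows that variant; consumeUnnested is hypothetical and not part of the examples repository, and it assumes, as the Queue code does with its root ObjectRef, that an ObjectRef returned from one transaction can be looked up again in a later one.

func consumeUnnested(consumerId int, conn *client.Connection, q *queue.Queue) {
	q = q.Clone(conn)
	for {
		// First transaction (inside Dequeue): unlink the head item from the queue.
		item, err := q.Dequeue()
		if err != nil {
			log.Fatal(err)
		}
		// Second transaction: read the item's value.
		result, _, err := conn.RunTransaction(func(txn *client.Txn) (interface{}, error) {
			obj, err := txn.GetObject(*item)
			if err != nil {
				return nil, err
			}
			return obj.Value()
		})
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("Consumer %v dequeued '%s'\n", consumerId, result)
	}
}

Running the program as written: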

> ./main
2015/12/20 20:06:06 Connection established to localhost:10001 (RM:1e2492f8)
2015/12/20 20:06:06 Connection established to localhost:10001 (RM:1e2492f8)
2015/12/20 20:06:06 Connection established to localhost:10001 (RM:1e2492f8)
2015/12/20 20:06:06 Connection established to localhost:10001 (RM:1e2492f8)
2015/12/20 20:06:07 Connection established to localhost:10001 (RM:1e2492f8)
Consumer 0 dequeued 'producer 1 item 0'
Consumer 0 dequeued 'producer 1 item 1'
Consumer 0 dequeued 'producer 1 item 2'
Consumer 0 dequeued 'producer 1 item 3'
Consumer 1 dequeued 'producer 2 item 0'
Consumer 1 dequeued 'producer 2 item 1'
Consumer 1 dequeued 'producer 2 item 2'
Consumer 1 dequeued 'producer 2 item 3'
Consumer 1 dequeued 'producer 2 item 4'
Consumer 1 dequeued 'producer 2 item 5'
Consumer 0 dequeued 'producer 2 item 6'
Consumer 1 dequeued 'producer 2 item 7'
Consumer 1 dequeued 'producer 2 item 8'
Consumer 1 dequeued 'producer 2 item 9'
Consumer 1 dequeued 'producer 1 item 4'
Consumer 1 dequeued 'producer 0 item 0'
Consumer 1 dequeued 'producer 0 item 1'
Consumer 1 dequeued 'producer 0 item 2'
Consumer 0 dequeued 'producer 0 item 3'
Consumer 1 dequeued 'producer 0 item 4'
Consumer 1 dequeued 'producer 0 item 5'
Consumer 1 dequeued 'producer 0 item 6'
Consumer 1 dequeued 'producer 0 item 7'
Consumer 1 dequeued 'producer 0 item 8'
Consumer 1 dequeued 'producer 0 item 9'
Consumer 1 dequeued 'producer 1 item 5'
Consumer 1 dequeued 'producer 1 item 6'
Consumer 1 dequeued 'producer 1 item 7'
Consumer 1 dequeued 'producer 1 item 8'
Consumer 1 dequeued 'producer 1 item 9'

We see that all 30 items were dequeued and they're all different, which is good: nothing was duplicated and nothing went missing. Both consumers dequeued some of the items, and every item was dequeued in an order that matches our expectations: whilst items from different producers can be interleaved in any order, items from the same producer are dequeued in the order in which they were appended.

If you ramp up the number of producers and consumers you'll probably see that performance tails off. This is expected and shows the effect of contention: the producers are all fighting to commit a transaction that writes to the tail of the queue, and if several such transactions arrive at the same time then only one can commit; the others have to be restarted. Similarly, the consumers are all attempting to modify the head of the queue.

Needless to say, GoshawkDB was not designed as a queuing middleware product: indeed no database is going to match a dedicated message queuing product. However, the retry functionality is very useful and powerful, and means you can safely signal between different clients and avoid having to poll.