Skip to content
Snippets Groups Projects
Commit d47d5c69 authored by Manish R Jain's avatar Manish R Jain
Browse files

Posting List can store posting lists and mutations in/from different stores.

parent 84697103
No related branches found
No related tags found
No related merge requests found
......@@ -21,6 +21,7 @@ import (
"github.com/google/flatbuffers/go"
"github.com/manishrjain/dgraph/posting/types"
"github.com/manishrjain/dgraph/store"
"github.com/manishrjain/dgraph/x"
linked "container/list"
......@@ -32,9 +33,12 @@ const Set = 0x01
const Del = 0x02
type List struct {
key []byte
mutex sync.RWMutex
buffer []byte
mutations []byte
pstore *store.Store // postinglist store
mstore *store.Store // mutation store
}
func addTripleToPosting(b *flatbuffers.Builder,
......@@ -61,21 +65,38 @@ func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT {
var empty []byte
func (l *List) Init() {
// package level init
func init() {
b := flatbuffers.NewBuilder(0)
types.PostingListStart(b)
of := types.PostingListEnd(b)
b.Finish(of)
empty = b.Bytes[b.Head():]
}
func (l *List) Init(key []byte, pstore, mstore *store.Store) {
l.mutex.Lock()
defer l.mutex.Unlock()
if len(empty) == 0 {
b := flatbuffers.NewBuilder(0)
types.PostingListStart(b)
of := types.PostingListEnd(b)
b.Finish(of)
empty = b.Bytes[b.Head():]
log.Fatal("empty should have some bytes.")
}
l.key = key
l.pstore = pstore
l.mstore = mstore
var err error
if l.buffer, err = pstore.Get(key); err != nil {
log.Errorf("While retrieving posting list from db: %v\n", err)
l.buffer = make([]byte, len(empty))
copy(l.buffer, empty)
}
if l.mutations, err = mstore.Get(key); err != nil {
log.Debugf("While retrieving mutation list from db: %v\n", err)
l.mutations = make([]byte, len(empty))
copy(l.mutations, empty)
}
l.buffer = make([]byte, len(empty))
l.mutations = make([]byte, len(empty))
copy(l.buffer, empty)
copy(l.mutations, empty)
}
func (l *List) Root() *types.PostingList {
......@@ -85,7 +106,7 @@ func (l *List) Root() *types.PostingList {
return types.GetRootAsPostingList(l.buffer, 0)
}
func (l *List) AddMutation(t x.Triple, op byte) {
func (l *List) AddMutation(t x.Triple, op byte) error {
l.mutex.Lock()
defer l.mutex.Unlock()
......@@ -114,6 +135,7 @@ func (l *List) AddMutation(t x.Triple, op byte) {
b.Finish(end)
l.mutations = b.Bytes[b.Head():]
return l.mstore.SetOne(l.key, l.mutations)
}
func addOrSet(ll *linked.List, p *types.Posting) {
......@@ -180,7 +202,7 @@ func (l *List) generateLinkedList() *linked.List {
return ll
}
func (l *List) Commit() {
func (l *List) Commit() error {
l.mutex.Lock()
defer l.mutex.Unlock()
......@@ -206,6 +228,16 @@ func (l *List) Commit() {
b.Finish(end)
l.buffer = b.Bytes[b.Head():]
if err := l.pstore.SetOne(l.key, l.buffer); err != nil {
log.WithField("error", err).Errorf("While storing posting list")
return err
}
if err := l.mstore.Delete(l.key); err != nil {
log.WithField("error", err).Errorf("While deleting mutation list")
return err
}
l.mutations = make([]byte, len(empty))
copy(l.mutations, empty)
return nil
}
......@@ -17,10 +17,13 @@
package posting
import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/manishrjain/dgraph/posting/types"
"github.com/manishrjain/dgraph/store"
"github.com/manishrjain/dgraph/x"
)
......@@ -40,17 +43,42 @@ func checkUids(t *testing.T, l List, uids ...uint64) {
}
}
func NewStore(t *testing.T) string {
path, err := ioutil.TempDir("", "storetest_")
if err != nil {
t.Error(err)
t.Fail()
return ""
}
return path
}
func TestAddTriple(t *testing.T) {
var l List
l.Init()
key := store.Key("name", 1)
pdir := NewStore(t)
defer os.RemoveAll(pdir)
ps := new(store.Store)
ps.Init(pdir)
mdir := NewStore(t)
defer os.RemoveAll(mdir)
ms := new(store.Store)
ms.Init(mdir)
l.Init(key, ps, ms)
triple := x.Triple{
ValueId: 9,
Source: "testing",
Timestamp: time.Now(),
}
l.AddMutation(triple, Set)
l.Commit()
if err := l.AddMutation(triple, Set); err != nil {
t.Error(err)
}
if err := l.Commit(); err != nil {
t.Error(err)
}
if l.Root().PostingsLength() != 1 {
t.Error("Unable to find added elements in posting list")
......@@ -93,21 +121,33 @@ func TestAddTriple(t *testing.T) {
9, 49, 81,
}
triple.ValueId = 49
l.AddMutation(triple, Set)
l.Commit()
if err := l.AddMutation(triple, Set); err != nil {
t.Error(err)
}
if err := l.Commit(); err != nil {
t.Error(err)
}
checkUids(t, l, uids...)
// Delete a triple, add a triple, replace a triple
triple.ValueId = 49
l.AddMutation(triple, Del)
if err := l.AddMutation(triple, Del); err != nil {
t.Error(err)
}
triple.ValueId = 69
l.AddMutation(triple, Set)
if err := l.AddMutation(triple, Set); err != nil {
t.Error(err)
}
triple.ValueId = 9
triple.Source = "anti-testing"
l.AddMutation(triple, Set)
l.Commit()
if err := l.AddMutation(triple, Set); err != nil {
t.Error(err)
}
if err := l.Commit(); err != nil {
t.Error(err)
}
uids = []uint64{9, 69, 81}
checkUids(t, l, uids...)
......@@ -116,4 +156,9 @@ func TestAddTriple(t *testing.T) {
if string(p.Source()) != "anti-testing" {
t.Errorf("Expected: anti-testing. Got: %v", p.Source())
}
// Try reading the same data in another PostingList.
var dl List
dl.Init(key, ps, ms)
checkUids(t, dl, uids...)
}
......@@ -47,33 +47,29 @@ func (s *Store) IsNew(id uint64) bool {
}
// key = (attribute, entity id)
func key(attr string, eid uint64) (ret []byte, rerr error) {
func Key(attr string, eid uint64) []byte {
buf := new(bytes.Buffer)
buf.WriteString(attr)
if err := binary.Write(buf, binary.LittleEndian, eid); err != nil {
return ret, err
log.Fatalf("Error while creating key with attr: %v eid: %v\n", attr, eid)
}
return buf.Bytes(), nil
return buf.Bytes()
}
func (s *Store) Get(attr string, eid uint64) (val []byte, rerr error) {
k, err := key(attr, eid)
if err != nil {
return val, err
}
func (s *Store) Get(k []byte) (val []byte, rerr error) {
return s.db.Get(k, nil)
}
func (s *Store) SetOne(attr string, eid uint64, val []byte) error {
k, err := key(attr, eid)
if err != nil {
return err
}
func (s *Store) SetOne(k []byte, val []byte) error {
wb := new(leveldb.Batch)
wb.Put(k, val)
return s.db.Write(wb, nil)
}
func (s *Store) Delete(k []byte) error {
return s.db.Delete(k, nil)
}
func (s *Store) Close() error {
return s.db.Close()
}
......@@ -33,24 +33,25 @@ func TestGet(t *testing.T) {
var s Store
s.Init(path)
if err := s.SetOne("name", 1, []byte("neo")); err != nil {
k := Key("name", 1)
if err := s.SetOne(k, []byte("neo")); err != nil {
t.Error(err)
t.Fail()
}
if val, err := s.Get("name", 1); err != nil {
if val, err := s.Get(k); err != nil {
t.Error(err)
t.Fail()
} else if string(val) != "neo" {
t.Errorf("Expected 'neo'. Found: %s", string(val))
}
if err := s.SetOne("name", 1, []byte("the one")); err != nil {
if err := s.SetOne(k, []byte("the one")); err != nil {
t.Error(err)
t.Fail()
}
if val, err := s.Get("name", 1); err != nil {
if val, err := s.Get(k); err != nil {
t.Error(err)
t.Fail()
} else if string(val) != "the one" {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment