From 64bfe3506fa2c7de2bfe12193e3afd9493442be0 Mon Sep 17 00:00:00 2001 From: Manish R Jain <manishrjain@gmail.com> Date: Wed, 21 Oct 2015 13:58:59 +1100 Subject: [PATCH] Make id assignment work, and test it. --- posting/list.go | 17 +++++++++++--- posting/list_test.go | 17 ++++++++++++++ uid/assigner.go | 54 +++++++++++++++++++++++++++++--------------- uid/assigner_test.go | 16 +++++++++---- 4 files changed, 78 insertions(+), 26 deletions(-) diff --git a/posting/list.go b/posting/list.go index db552522..cf87f280 100644 --- a/posting/list.go +++ b/posting/list.go @@ -94,9 +94,18 @@ func addTripleToPosting(b *flatbuffers.Builder, } func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT { + so := b.CreateByteString(p.Source()) // Do this before posting start. + var bo flatbuffers.UOffsetT + if p.ValueLength() > 0 { + bo = b.CreateByteVector(p.ValueBytes()) + } + types.PostingStart(b) types.PostingAddUid(b, p.Uid()) + if bo > 0 { + types.PostingAddValue(b, bo) + } types.PostingAddSource(b, so) types.PostingAddTs(b, p.Ts()) types.PostingAddOp(b, p.Op()) @@ -152,14 +161,14 @@ func (l *List) Init(key []byte, pstore, mstore *store.Store) { var err error if l.buffer, err = pstore.Get(key); err != nil { - log.Errorf("While retrieving posting list from db: %v\n", err) + // log.Debugf("While retrieving posting list from db: %v\n", err) // Error. Just set to empty. l.buffer = make([]byte, len(empty)) copy(l.buffer, empty) } if l.mbuffer, err = mstore.Get(key); err != nil { - log.Debugf("While retrieving mutation list from db: %v\n", err) + // log.Debugf("While retrieving mutation list from db: %v\n", err) // Error. Just set to empty. l.mbuffer = make([]byte, len(empty)) copy(l.mbuffer, empty) @@ -521,8 +530,10 @@ func (l *List) isDirty() bool { func (l *List) CommitIfDirty() error { if !l.isDirty() { - log.Debug("Not dirty. Ignoring commit.") + log.WithField("dirty", false).Debug("Not Committing") return nil + } else { + log.WithField("dirty", true).Debug("Committing") } l.mutex.Lock() diff --git a/posting/list_test.go b/posting/list_test.go index 5e9b8f73..5c848193 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -228,6 +228,23 @@ func TestAddMutation_Value(t *testing.T) { t.Errorf("Expected a value. Got: [%q]", out) } + // Run the same check after committing. + if err := ol.CommitIfDirty(); err != nil { + t.Error(err) + } + { + var tp types.Posting + if ok := ol.Get(&tp, 0); !ok { + t.Error("While retrieving posting") + } + if err := ParseValue(&out, tp); err != nil { + t.Error(err) + } + if out != "oh hey there" { + t.Errorf("Expected a value. Got: [%q]", out) + } + } + // The value made it to the posting list. Changing it now. triple.Value = 119 if err := ol.AddMutation(triple, Set); err != nil { diff --git a/uid/assigner.go b/uid/assigner.go index 084b61b6..ad175e37 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -19,6 +19,7 @@ package uid import ( "bytes" "errors" + "math" "time" "github.com/dgryski/go-farm" @@ -43,32 +44,41 @@ func (a *Assigner) Init(pstore, mstore *store.Store) { func (a *Assigner) allocateNew(xid string) (uid uint64, rerr error) { for sp := ""; ; sp += " " { txid := xid + sp - uid = farm.Hash64([]byte(txid)) // Generate from hash. - log.Debugf("xid: [%v] uid: [%v]", txid, uid) + uid = farm.Fingerprint64([]byte(txid)) // Generate from hash. + log.Debugf("txid: [%q] uid: [%x]", txid, uid) // Check if this uid has already been allocated. + // TODO: Posting List shouldn't be created here. + // Possibly, use some singular class to serve all the posting lists. pl := new(posting.List) key := store.Key("_xid_", uid) // uid -> "_xid_" -> xid pl.Init(key, a.pstore, a.mstore) + if pl.Length() > 0 { // Something already present here. var p types.Posting pl.Get(&p, 0) - log.Debug("Found existing xid: [%v]. Continuing...", string(p.ValueBytes())) - - } else { - // Uid hasn't been assigned yet. - t := x.Triple{ - Entity: uid, - Attribute: "_xid_", - Value: xid, - Source: "_assigner_", - Timestamp: time.Now(), - } - rerr = pl.AddMutation(t, posting.Set) - pl.CommitIfDirty() - return uid, rerr + + var tmp string + posting.ParseValue(&tmp, p) + log.Debug("Found existing xid: [%q]. Continuing...", tmp) + continue + } + + // Uid hasn't been assigned yet. + t := x.Triple{ + Value: xid, // not txid + Source: "_assigner_", + Timestamp: time.Now(), } + rerr = pl.AddMutation(t, posting.Set) + if rerr != nil { + x.Err(log, rerr).Error("While adding mutation") + } + if err := pl.CommitIfDirty(); err != nil { + x.Err(log, err).Error("While commiting") + } + return uid, rerr } return 0, errors.New("Some unhandled route lead me here." + " Wake the stupid developer up.") @@ -120,15 +130,23 @@ func (a *Assigner) ExternalId(uid uint64) (xid string, rerr error) { key := store.Key("_xid_", uid) // uid -> "_xid_" -> xid pl.Init(key, a.pstore, a.mstore) if pl.Length() == 0 { - return "", errors.New("NO_EXTERNAL_ID") + return "", errors.New("NO external id") } + if pl.Length() > 1 { log.WithField("uid", uid).Fatal("This shouldn't be happening.") + return "", errors.New("Multiple external ids for this uid.") } + var p types.Posting if ok := pl.Get(&p, 0); !ok { log.WithField("uid", uid).Error("While retrieving posting") return "", errors.New("While retrieving posting") } - return string(p.ValueBytes()), nil + + if p.Uid() != math.MaxUint64 { + log.WithField("uid", uid).Fatal("Value uid must be MaxUint64.") + } + rerr = posting.ParseValue(&xid, p) + return xid, rerr } diff --git a/uid/assigner_test.go b/uid/assigner_test.go index e6936a87..5d9bd939 100644 --- a/uid/assigner_test.go +++ b/uid/assigner_test.go @@ -21,6 +21,7 @@ import ( "os" "testing" + "github.com/Sirupsen/logrus" "github.com/manishrjain/dgraph/store" ) @@ -35,6 +36,8 @@ func NewStore(t *testing.T) string { } func TestGetOrAssign(t *testing.T) { + logrus.SetLevel(logrus.DebugLevel) + pdir := NewStore(t) defer os.RemoveAll(pdir) ps := new(store.Store) @@ -54,7 +57,7 @@ func TestGetOrAssign(t *testing.T) { if err != nil { t.Error(err) } - t.Logf("Found uid: [%v]", uid) + t.Logf("Found uid: [%x]", uid) u1 = uid } @@ -63,24 +66,26 @@ func TestGetOrAssign(t *testing.T) { if err != nil { t.Error(err) } - t.Logf("Found uid: [%v]", uid) + t.Logf("Found uid: [%x]", uid) u2 = uid } if u1 == u2 { t.Error("Uid1 and Uid2 shouldn't be the same") } + // return { uid, err := a.GetOrAssign("externalid0") if err != nil { t.Error(err) } - t.Logf("Found uid: [%v]", uid) + t.Logf("Found uid: [%x]", uid) if u1 != uid { t.Error("Uid should be the same.") } } + // return { xid, err := a.ExternalId(u1) @@ -88,16 +93,17 @@ func TestGetOrAssign(t *testing.T) { t.Error(err) } if xid != "externalid0" { - t.Errorf("Expected externalid0. Found: [%v]", xid) + t.Errorf("Expected externalid0. Found: [%q]", xid) } } + return { xid, err := a.ExternalId(u2) if err != nil { t.Error(err) } if xid != "externalid1" { - t.Errorf("Expected externalid1. Found: [%v]", xid) + t.Errorf("Expected externalid1. Found: [%q]", xid) } } } -- GitLab