From 879a33290d5ddfdef201d6db4213f5fc9a463d17 Mon Sep 17 00:00:00 2001 From: Manish R Jain <manishrjain@gmail.com> Date: Thu, 19 Nov 2015 18:49:13 +1100 Subject: [PATCH] Loads of fixes and tweaks to make data loading work. Largely, the system is really fast in inserting data, but memory becomes a problem. So, running continuous memory checks, and if the db exceeds set memory, then stopping in world in batch load mode to deal with it first. In server mode, same code runs but without stopping the world. --- concurrent/map.go | 203 ----------------------------------------- concurrent/map_test.go | 78 ---------------- loader/loader.go | 23 ++++- posting/list.go | 67 ++++++++++---- posting/list_test.go | 2 +- posting/lists.go | 192 +++++++++++++++++++++++++++----------- posting/worker.go | 2 +- posting/worker_test.go | 16 ++-- query/query_test.go | 24 ++--- server/loader/main.go | 5 +- uid/assigner.go | 6 +- 11 files changed, 238 insertions(+), 380 deletions(-) delete mode 100644 concurrent/map.go delete mode 100644 concurrent/map_test.go diff --git a/concurrent/map.go b/concurrent/map.go deleted file mode 100644 index 05844df9..00000000 --- a/concurrent/map.go +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Copyright 2015 Manish R Jain <manishrjain@gmail.com> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package concurrent - -import ( - "log" - "math/rand" - "sync/atomic" - "unsafe" -) - -type kv struct { - k uint64 - v unsafe.Pointer -} - -type bucket struct { - elems [8]kv -} - -const ( - MUTABLE = iota - IMMUTABLE -) - -type container struct { - status int32 - sz uint64 - list []*bucket - numElems uint32 -} - -type Map struct { - cs [2]unsafe.Pointer - size uint32 -} - -func powOf2(sz int) bool { - return sz > 0 && (sz&(sz-1)) == 0 -} - -func initContainer(cs *container, sz uint64) { - cs.status = MUTABLE - cs.sz = sz - cs.list = make([]*bucket, sz) - for i := range cs.list { - cs.list[i] = new(bucket) - } -} - -func NewMap(sz int) *Map { - if !powOf2(sz) { - log.Fatal("Map can only be created for a power of 2.") - } - - c := new(container) - initContainer(c, uint64(sz)) - - m := new(Map) - m.cs[MUTABLE] = unsafe.Pointer(c) - m.cs[IMMUTABLE] = nil - return m -} - -func (c *container) get(k uint64) unsafe.Pointer { - bi := k & (c.sz - 1) - b := c.list[bi] - for i := range b.elems { - e := &b.elems[i] - if ek := atomic.LoadUint64(&e.k); ek == k { - return e.v - } - } - return nil -} - -func (c *container) getOrInsert(k uint64, v unsafe.Pointer) unsafe.Pointer { - bi := k & (c.sz - 1) - b := c.list[bi] - for i := range b.elems { - e := &b.elems[i] - // Once allocated a valid key, it would never change. So, first check if - // it's allocated. If not, then allocate it. If can't, or not allocated, - // then check if it's k. If it is, then replace value. Otherwise continue. - // This sequence could be problematic, if this happens: - // Main thread runs Step 1. Check - if atomic.CompareAndSwapUint64(&e.k, 0, k) { // Step 1. - atomic.AddUint32(&c.numElems, 1) - if atomic.CompareAndSwapPointer(&e.v, nil, v) { - return v - } - return atomic.LoadPointer(&e.v) - } - - if atomic.LoadUint64(&e.k) == k { - // Swap if previous pointer is nil. - if atomic.CompareAndSwapPointer(&e.v, nil, v) { - return v - } - return atomic.LoadPointer(&e.v) - } - } - return nil -} - -func (m *Map) GetOrInsert(k uint64, v unsafe.Pointer) unsafe.Pointer { - if v == nil { - log.Fatal("GetOrInsert doesn't allow setting nil pointers.") - return nil - } - - // Check immutable first. - cval := atomic.LoadPointer(&m.cs[IMMUTABLE]) - if cval != nil { - c := (*container)(cval) - if pv := c.get(k); pv != nil { - return pv - } - } - - // Okay, deal with mutable container now. - cval = atomic.LoadPointer(&m.cs[MUTABLE]) - if cval == nil { - log.Fatal("This is disruptive in a bad way.") - } - c := (*container)(cval) - if pv := c.getOrInsert(k, v); pv != nil { - return pv - } - - // We still couldn't insert the key. Time to grow. - // TODO: Handle this case. - return nil -} - -func (m *Map) SetNilIfPresent(k uint64) bool { - for _, c := range m.cs { - if atomic.LoadInt32(&c.status) == 0 { - continue - } - bi := k & (c.sz - 1) - b := c.list[bi] - for i := range b.elems { - e := &b.elems[i] - if atomic.LoadUint64(&e.k) == k { - // Set to nil. - atomic.StorePointer(&e.v, nil) - return true - } - } - } - return false -} - -func (m *Map) StreamUntilCap(ch chan uint64) { - for { - ci := rand.Intn(2) - c := m.cs[ci] - if atomic.LoadInt32(&c.status) == 0 { - ci += 1 - c = m.cs[ci%2] // use the other. - } - bi := rand.Intn(int(c.sz)) - - for _, e := range c.list[bi].elems { - if len(ch) >= cap(ch) { - return - } - if k := atomic.LoadUint64(&e.k); k > 0 { - ch <- k - } - } - } -} - -func (m *Map) StreamAll(ch chan uint64) { - for _, c := range m.cs { - if atomic.LoadInt32(&c.status) == 0 { - continue - } - for i := 0; i < int(c.sz); i++ { - for _, e := range c.list[i].elems { - if k := atomic.LoadUint64(&e.k); k > 0 { - ch <- k - } - } - } - } -} diff --git a/concurrent/map_test.go b/concurrent/map_test.go deleted file mode 100644 index 7090eff5..00000000 --- a/concurrent/map_test.go +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2015 Manish R Jain <manishrjain@gmail.com> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package concurrent - -import ( - "math/rand" - "testing" - "unsafe" - - "github.com/dgraph-io/dgraph/posting" - "github.com/zond/gotomic" -) - -func TestGetAndPut(t *testing.T) { - m := NewMap(1024) - var i uint64 - for i = 1; i < 100; i++ { - v := new(uint64) - *v = i - b := unsafe.Pointer(v) - if ok := m.Put(i, b); !ok { - t.Errorf("Couldn't put key: %v", i) - } - } - for i = 1; i < 100; i++ { - p := m.Get(i) - v := (*uint64)(p) - if v == nil { - t.Errorf("Didn't expect nil for i: %v", i) - return - } - if *v != i { - t.Errorf("Expected: %v. Got: %v", i, *v) - } - } -} - -func BenchmarkGetAndPut(b *testing.B) { - m := NewMap(1 << 16) - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - key := uint64(rand.Int63()) - p := m.Get(key) - if p == nil { - l := posting.NewList() - m.Put(key, unsafe.Pointer(l)) - } - } - }) -} - -func BenchmarkGotomic(b *testing.B) { - h := gotomic.NewHash() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - key := uint64(rand.Int63()) - _, has := h.Get(gotomic.IntKey(key)) - if !has { - l := posting.NewList() - h.Put(gotomic.IntKey(key), l) - } - } - }) -} diff --git a/loader/loader.go b/loader/loader.go index b02024e6..0ff5353e 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -50,10 +50,15 @@ type state struct { } func (s *state) printCounters(ticker *time.Ticker) { + var prev uint64 for _ = range ticker.C { + processed := atomic.LoadUint64(&s.ctr.processed) + if prev == processed { + continue + } + prev = processed parsed := atomic.LoadUint64(&s.ctr.parsed) ignored := atomic.LoadUint64(&s.ctr.ignored) - processed := atomic.LoadUint64(&s.ctr.processed) pending := parsed - ignored - processed glog.WithFields(logrus.Fields{ "read": atomic.LoadUint64(&s.ctr.read), @@ -123,13 +128,21 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { } edge, err := nq.ToEdge() - if err != nil { - glog.WithError(err).WithField("nq", nq).Error("While converting to edge") - return + for err != nil { + // Just put in a retry loop to tackle temporary errors. + if err == posting.E_TMP_ERROR { + time.Sleep(time.Microsecond) + + } else { + glog.WithError(err).WithField("nq", nq). + Error("While converting to edge") + return + } + edge, err = nq.ToEdge() } key := posting.Key(edge.Entity, edge.Attribute) - plist := posting.Get(key) + plist := posting.GetOrCreate(key) plist.AddMutation(edge, posting.Set) atomic.AddUint64(&s.ctr.processed, 1) } diff --git a/posting/list.go b/posting/list.go index 638c59b7..bbca733f 100644 --- a/posting/list.go +++ b/posting/list.go @@ -24,7 +24,9 @@ import ( "fmt" "math" "sync" + "sync/atomic" "time" + "unsafe" "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/commit" @@ -40,6 +42,12 @@ var glog = x.Log("posting") const Set = 0x01 const Del = 0x02 +var E_TMP_ERROR = errors.New("Temporary Error. Please retry.") + +type buffer struct { + d []byte +} + type MutationLink struct { idx int moveidx int @@ -50,11 +58,12 @@ type List struct { sync.RWMutex key []byte hash uint32 - buffer []byte + pbuffer unsafe.Pointer pstore *store.Store // postinglist store clog *commit.Logger lastCompact time.Time wg sync.WaitGroup + deleteMe bool // Mutations mlayer map[int]types.Posting // stores only replace instructions. @@ -212,15 +221,7 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { l.pstore = pstore l.clog = clog - var err error - if l.buffer, err = pstore.Get(key); err != nil { - // glog.Debugf("While retrieving posting list from db: %v\n", err) - // Error. Just set to empty. - l.buffer = make([]byte, len(empty)) - copy(l.buffer, empty) - } - - posting := types.GetRootAsPostingList(l.buffer, 0) + posting := l.getPostingList() l.maxMutationTs = posting.CommitTs() l.hash = farm.Fingerprint32(key) l.mlayer = make(map[int]types.Posting) @@ -248,12 +249,36 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { glog.WithError(err).Error("While streaming entries.") } glog.Debug("Done streaming entries.") - // l.regenerateIndex() +} + +// There's no need for lock acquisition for this. +func (l *List) getPostingList() *types.PostingList { + pb := atomic.LoadPointer(&l.pbuffer) + buf := (*buffer)(pb) + + if buf == nil || len(buf.d) == 0 { + nbuf := new(buffer) + var err error + if nbuf.d, err = l.pstore.Get(l.key); err != nil { + // glog.Debugf("While retrieving posting list from db: %v\n", err) + // Error. Just set to empty. + nbuf.d = make([]byte, len(empty)) + copy(nbuf.d, empty) + } + if atomic.CompareAndSwapPointer(&l.pbuffer, pb, unsafe.Pointer(nbuf)) { + return types.GetRootAsPostingList(nbuf.d, 0) + + } else { + // Someone else replaced the pointer in the meantime. Retry recursively. + return l.getPostingList() + } + } + return types.GetRootAsPostingList(buf.d, 0) } // Caller must hold at least a read lock. func (l *List) lePostingIndex(maxUid uint64) (int, uint64) { - posting := types.GetRootAsPostingList(l.buffer, 0) + posting := l.getPostingList() left, right := 0, posting.PostingsLength()-1 sofar := -1 p := new(types.Posting) @@ -447,7 +472,7 @@ func (l *List) mergeMutation(mp *types.Posting) { // Caller must hold at least a read lock. func (l *List) length() int { - plist := types.GetRootAsPostingList(l.buffer, 0) + plist := l.getPostingList() return plist.PostingsLength() + l.mdelta } @@ -469,7 +494,7 @@ func (l *List) Get(p *types.Posting, i int) bool { // Caller must hold at least a read lock. func (l *List) get(p *types.Posting, i int) bool { - plist := types.GetRootAsPostingList(l.buffer, 0) + plist := l.getPostingList() if len(l.mindex) == 0 { if val, ok := l.mlayer[i]; ok { *p = val @@ -529,6 +554,13 @@ func (l *List) get(p *types.Posting, i int) bool { return plist.Postings(p, newidx) } +func (l *List) SetForDeletion() { + l.wg.Wait() + l.Lock() + defer l.Unlock() + l.deleteMe = true +} + // In benchmarks, the time taken per AddMutation before was // plateauing at 2.5 ms with sync per 10 log entries, and increasing // for sync per 100 log entries (to 3 ms per AddMutation), largely because @@ -548,6 +580,9 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error { l.wg.Wait() l.Lock() defer l.Unlock() + if l.deleteMe { + return E_TMP_ERROR + } if t.Timestamp.UnixNano() < l.maxMutationTs { return fmt.Errorf("Mutation ts lower than committed ts.") @@ -629,13 +664,13 @@ func (l *List) merge() error { end := types.PostingListEnd(b) b.Finish(end) - l.buffer = b.Bytes[b.Head():] - if err := l.pstore.SetOne(l.key, l.buffer); err != nil { + if err := l.pstore.SetOne(l.key, b.Bytes[b.Head():]); err != nil { glog.WithField("error", err).Errorf("While storing posting list") return err } // Now reset the mutation variables. + atomic.StorePointer(&l.pbuffer, nil) // Make prev buffer eligible for GC. l.lastCompact = time.Now() l.mlayer = make(map[int]types.Posting) l.mdelta = 0 diff --git a/posting/list_test.go b/posting/list_test.go index 42773a05..7c608107 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -33,7 +33,7 @@ import ( func checkUids(t *testing.T, l *List, uids ...uint64) error { if l.Length() != len(uids) { - return fmt.Errorf("Length: %d", l.Length()) + return fmt.Errorf("Expected: %d. Length: %d", len(uids), l.Length()) } for i := 0; i < len(uids); i++ { var p types.Posting diff --git a/posting/lists.go b/posting/lists.go index 0b7a79a1..0edf3820 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -17,25 +17,28 @@ package posting import ( + "math/rand" + "runtime" "sync" "sync/atomic" "time" - "unsafe" "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/commit" - "github.com/dgraph-io/dgraph/concurrent" "github.com/dgraph-io/dgraph/store" "github.com/dgryski/go-farm" + "github.com/zond/gotomic" ) type counters struct { + ticker *time.Ticker added uint64 merged uint64 } func (c *counters) periodicLog() { - for _ = range time.Tick(time.Second) { + for _ = range c.ticker.C { + mapSize := lhmap.Size() added := atomic.LoadUint64(&c.added) merged := atomic.LoadUint64(&c.merged) pending := added - merged @@ -44,23 +47,59 @@ func (c *counters) periodicLog() { "added": added, "merged": merged, "pending": pending, - }).Info("Merge counters") + "mapsize": mapSize, + }).Info("List Merge counters") } } -var lcmap *concurrent.Map +var MAX_MEMORY uint64 +var MIB uint64 + +func checkMemoryUsage() { + MIB = 1 << 20 + MAX_MEMORY = 2 * (1 << 30) + + for _ = range time.Tick(5 * time.Second) { + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + if ms.Alloc < MAX_MEMORY { + continue + } + + // Okay, we exceed the max memory threshold. + // Stop the world, and deal with this first. + stopTheWorld.Lock() + megs := ms.Alloc / MIB + glog.WithField("allocated_MB", megs). + Info("Memory usage over threshold. STOPPED THE WORLD!") + + glog.Info("Calling merge on all lists.") + MergeLists(100 * runtime.GOMAXPROCS(-1)) + + glog.Info("Merged lists. Calling GC.") + runtime.GC() // Call GC to do some cleanup. + + runtime.ReadMemStats(&ms) + megs = ms.Alloc / MIB + glog.WithField("allocated_MB", megs). + Info("Memory Usage after calling GC.") + stopTheWorld.Unlock() + } +} + +var stopTheWorld sync.RWMutex +var lhmap *gotomic.Hash var pstore *store.Store var clog *commit.Logger -var ch chan uint64 var lc *lcounters func Init(posting *store.Store, log *commit.Logger) { - lcmap = concurrent.NewMap(1 << 20) + lhmap = gotomic.NewHash() pstore = posting clog = log - ch = make(chan uint64, 10000) lc = new(lcounters) go lc.periodicLog() + go checkMemoryUsage() } type lcounters struct { @@ -77,73 +116,124 @@ func (lc *lcounters) periodicLog() { } } -func Get(key []byte) *List { +func GetOrCreate(key []byte) *List { + stopTheWorld.RLock() + defer stopTheWorld.RUnlock() + uid := farm.Fingerprint64(key) - lp := lcmap.Get(uid) - if lp == nil { - l := NewList() - l.init(key, pstore, clog) - lcmap.Put(uid, unsafe.Pointer(l)) + ukey := gotomic.IntKey(uid) + lp, _ := lhmap.Get(ukey) + if lp != nil { + return lp.(*List) + } + + l := NewList() + l.init(key, pstore, clog) + if inserted := lhmap.PutIfMissing(ukey, l); inserted { return l + } else { + lp, _ = lhmap.Get(ukey) + return lp.(*List) } - return (*List)(lp) } -/* -func periodicQueueForProcessing(c *counters) { - ticker := time.NewTicker(time.Minute) - for _ = range ticker.C { - lmap.StreamUntilCap(ch) +func processOne(k gotomic.Hashable, c *counters) { + ret, _ := lhmap.Delete(k) + l := ret.(*List) + if l == nil { + return } + + l.SetForDeletion() // No more AddMutation. + if err := l.MergeIfDirty(); err != nil { + glog.WithError(err).Error("While commiting dirty list.") + } + atomic.AddUint64(&c.merged, 1) } -*/ -func process(c *counters, wg *sync.WaitGroup) { - for eid := range ch { - lp := lcmap.Get(eid) - if lp == nil { - continue - } - atomic.AddUint64(&c.merged, 1) - l := (*List)(lp) - if err := l.MergeIfDirty(); err != nil { - glog.WithError(err).Error("While commiting dirty list.") - } +// For on-demand merging of all lists. +func process(ch chan gotomic.Hashable, c *counters, wg *sync.WaitGroup) { + for l := range ch { + processOne(l, c) } if wg != nil { wg.Done() } } -func periodicProcess(c *counters) { - ticker := time.NewTicker(100 * time.Millisecond) - for _ = range ticker.C { - process(c, nil) - } -} - -func queueAll(c *counters) { - lcmap.StreamAll(ch) +func queueAll(ch chan gotomic.Hashable, c *counters) { + lhmap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool { + ch <- k + atomic.AddUint64(&c.added, 1) + return false // If this returns true, Each would break. + }) close(ch) } -/* -func StartPeriodicMerging() { - ctr := new(counters) - go periodicQueueForProcessing(ctr) - go periodicProcess(ctr) -} -*/ - func MergeLists(numRoutines int) { + ch := make(chan gotomic.Hashable, 10000) c := new(counters) + c.ticker = time.NewTicker(time.Second) go c.periodicLog() - go queueAll(c) + go queueAll(ch, c) wg := new(sync.WaitGroup) for i := 0; i < numRoutines; i++ { wg.Add(1) - go process(c, wg) + go process(ch, c, wg) } wg.Wait() + c.ticker.Stop() +} + +// For periodic merging of lists. +func queueRandomLists(ch chan gotomic.Hashable, c *counters) { + var buf []gotomic.Hashable + var count int + needed := cap(ch) - len(ch) + if needed < 100 { + return + } + + // Generate a random list of + lhmap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool { + if count < needed { + buf = append(buf, k) + + } else { + j := rand.Intn(count) + if j < len(buf) { + buf[j] = k + } + } + count += 1 + return false + }) + + for _, k := range buf { + ch <- k + atomic.AddUint64(&c.added, 1) + } +} + +func periodicQueueForProcessing(ch chan gotomic.Hashable, c *counters) { + ticker := time.NewTicker(time.Minute) + for _ = range ticker.C { + queueRandomLists(ch, c) + } +} + +func periodicProcess(ch chan gotomic.Hashable, c *counters) { + ticker := time.NewTicker(100 * time.Millisecond) + for _ = range ticker.C { + hid := <-ch + processOne(hid, c) + } +} + +func StartPeriodicMerging() { + ctr := new(counters) + ch := make(chan gotomic.Hashable, 10000) + go periodicQueueForProcessing(ch, ctr) + go periodicProcess(ch, ctr) } diff --git a/posting/worker.go b/posting/worker.go index 87377951..77987671 100644 --- a/posting/worker.go +++ b/posting/worker.go @@ -19,7 +19,7 @@ func ProcessTask(query []byte) (result []byte, rerr error) { for i := 0; i < q.UidsLength(); i++ { uid := q.Uids(i) key := Key(uid, attr) - pl := Get(key) + pl := GetOrCreate(key) task.ValueStart(b) var valoffset flatbuffers.UOffsetT diff --git a/posting/worker_test.go b/posting/worker_test.go index 25b1c234..2aabe52f 100644 --- a/posting/worker_test.go +++ b/posting/worker_test.go @@ -63,22 +63,22 @@ func TestProcessTask(t *testing.T) { Source: "author0", Timestamp: time.Now(), } - addEdge(t, edge, Get(Key(10, "friend"))) - addEdge(t, edge, Get(Key(11, "friend"))) - addEdge(t, edge, Get(Key(12, "friend"))) + addEdge(t, edge, GetOrCreate(Key(10, "friend"))) + addEdge(t, edge, GetOrCreate(Key(11, "friend"))) + addEdge(t, edge, GetOrCreate(Key(12, "friend"))) edge.ValueId = 25 - addEdge(t, edge, Get(Key(12, "friend"))) + addEdge(t, edge, GetOrCreate(Key(12, "friend"))) edge.ValueId = 26 - addEdge(t, edge, Get(Key(12, "friend"))) + addEdge(t, edge, GetOrCreate(Key(12, "friend"))) edge.ValueId = 31 - addEdge(t, edge, Get(Key(10, "friend"))) - addEdge(t, edge, Get(Key(12, "friend"))) + addEdge(t, edge, GetOrCreate(Key(10, "friend"))) + addEdge(t, edge, GetOrCreate(Key(12, "friend"))) edge.Value = "photon" - addEdge(t, edge, Get(Key(12, "friend"))) + addEdge(t, edge, GetOrCreate(Key(12, "friend"))) query := NewQuery("friend", []uint64{10, 11, 12}) result, err := ProcessTask(query) diff --git a/query/query_test.go b/query/query_test.go index 85671af9..e60c3084 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -214,42 +214,42 @@ func populateGraph(t *testing.T) { Source: "testing", Timestamp: time.Now(), } - addEdge(t, edge, posting.Get(posting.Key(1, "friend"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"))) edge.ValueId = 24 - addEdge(t, edge, posting.Get(posting.Key(1, "friend"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"))) edge.ValueId = 25 - addEdge(t, edge, posting.Get(posting.Key(1, "friend"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"))) edge.ValueId = 31 - addEdge(t, edge, posting.Get(posting.Key(1, "friend"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"))) edge.ValueId = 101 - addEdge(t, edge, posting.Get(posting.Key(1, "friend"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"))) // Now let's add a few properties for the main user. edge.Value = "Michonne" - addEdge(t, edge, posting.Get(posting.Key(1, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "name"))) edge.Value = "female" - addEdge(t, edge, posting.Get(posting.Key(1, "gender"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "gender"))) edge.Value = "alive" - addEdge(t, edge, posting.Get(posting.Key(1, "status"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "status"))) // Now let's add a name for each of the friends, except 101. edge.Value = "Rick Grimes" - addEdge(t, edge, posting.Get(posting.Key(23, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(23, "name"))) edge.Value = "Glenn Rhee" - addEdge(t, edge, posting.Get(posting.Key(24, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(24, "name"))) edge.Value = "Daryl Dixon" - addEdge(t, edge, posting.Get(posting.Key(25, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(25, "name"))) edge.Value = "Andrea" - addEdge(t, edge, posting.Get(posting.Key(31, "name"))) + addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name"))) } func TestProcessGraph(t *testing.T) { diff --git a/server/loader/main.go b/server/loader/main.go index 03ddb90c..70f662de 100644 --- a/server/loader/main.go +++ b/server/loader/main.go @@ -58,7 +58,8 @@ func main() { } logrus.SetLevel(logrus.InfoLevel) - glog.WithField("gomaxprocs", runtime.GOMAXPROCS(-1)).Info("Number of CPUs") + numCpus := runtime.GOMAXPROCS(-1) + glog.WithField("gomaxprocs", numCpus).Info("Number of CPUs") if len(*rdfGzips) == 0 { glog.Fatal("No RDF GZIP files specified") @@ -98,5 +99,5 @@ func main() { f.Close() } glog.Info("Calling merge lists") - posting.MergeLists(10000) + posting.MergeLists(100 * numCpus) // 100 per core. } diff --git a/uid/assigner.go b/uid/assigner.go index c2b85fe8..85ad3950 100644 --- a/uid/assigner.go +++ b/uid/assigner.go @@ -104,7 +104,7 @@ func allocateUniqueUid(xid string) (uid uint64, rerr error) { // Check if this uid has already been allocated. key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid - pl := posting.Get(key) + pl := posting.GetOrCreate(key) if pl.Length() > 0 { // Something already present here. @@ -175,7 +175,7 @@ func stringKey(xid string) []byte { func GetOrAssign(xid string) (uid uint64, rerr error) { key := stringKey(xid) - pl := posting.Get(key) + pl := posting.GetOrCreate(key) if pl.Length() == 0 { return assignNew(pl, xid) @@ -196,7 +196,7 @@ func GetOrAssign(xid string) (uid uint64, rerr error) { func ExternalId(uid uint64) (xid string, rerr error) { key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid - pl := posting.Get(key) + pl := posting.GetOrCreate(key) if pl.Length() == 0 { return "", errors.New("NO external id") } -- GitLab