diff --git a/concurrent/map.go b/concurrent/map.go
deleted file mode 100644
index 05844df911e7551737ea4cd908c910850b35cd28..0000000000000000000000000000000000000000
--- a/concurrent/map.go
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package concurrent
-
-import (
-    "log"
-    "math/rand"
-    "sync/atomic"
-    "unsafe"
-)
-
-type kv struct {
-    k uint64
-    v unsafe.Pointer
-}
-
-type bucket struct {
-    elems [8]kv
-}
-
-const (
-    MUTABLE = iota
-    IMMUTABLE
-)
-
-type container struct {
-    status   int32
-    sz       uint64
-    list     []*bucket
-    numElems uint32
-}
-
-type Map struct {
-    cs   [2]unsafe.Pointer
-    size uint32
-}
-
-func powOf2(sz int) bool {
-    return sz > 0 && (sz&(sz-1)) == 0
-}
-
-func initContainer(cs *container, sz uint64) {
-    cs.status = MUTABLE
-    cs.sz = sz
-    cs.list = make([]*bucket, sz)
-    for i := range cs.list {
-        cs.list[i] = new(bucket)
-    }
-}
-
-func NewMap(sz int) *Map {
-    if !powOf2(sz) {
-        log.Fatal("Map can only be created for a power of 2.")
-    }
-
-    c := new(container)
-    initContainer(c, uint64(sz))
-
-    m := new(Map)
-    m.cs[MUTABLE] = unsafe.Pointer(c)
-    m.cs[IMMUTABLE] = nil
-    return m
-}
-
-func (c *container) get(k uint64) unsafe.Pointer {
-    bi := k & (c.sz - 1)
-    b := c.list[bi]
-    for i := range b.elems {
-        e := &b.elems[i]
-        if ek := atomic.LoadUint64(&e.k); ek == k {
-            return e.v
-        }
-    }
-    return nil
-}
-
-func (c *container) getOrInsert(k uint64, v unsafe.Pointer) unsafe.Pointer {
-    bi := k & (c.sz - 1)
-    b := c.list[bi]
-    for i := range b.elems {
-        e := &b.elems[i]
-        // Once allocated a valid key, it would never change. So, first check if
-        // it's allocated. If not, then allocate it. If can't, or not allocated,
-        // then check if it's k. If it is, then replace value. Otherwise continue.
-        // This sequence could be problematic, if this happens:
-        // Main thread runs Step 1. Check
-        if atomic.CompareAndSwapUint64(&e.k, 0, k) { // Step 1.
-            atomic.AddUint32(&c.numElems, 1)
-            if atomic.CompareAndSwapPointer(&e.v, nil, v) {
-                return v
-            }
-            return atomic.LoadPointer(&e.v)
-        }
-
-        if atomic.LoadUint64(&e.k) == k {
-            // Swap if previous pointer is nil.
-            if atomic.CompareAndSwapPointer(&e.v, nil, v) {
-                return v
-            }
-            return atomic.LoadPointer(&e.v)
-        }
-    }
-    return nil
-}
-
-func (m *Map) GetOrInsert(k uint64, v unsafe.Pointer) unsafe.Pointer {
-    if v == nil {
-        log.Fatal("GetOrInsert doesn't allow setting nil pointers.")
-        return nil
-    }
-
-    // Check immutable first.
-    cval := atomic.LoadPointer(&m.cs[IMMUTABLE])
-    if cval != nil {
-        c := (*container)(cval)
-        if pv := c.get(k); pv != nil {
-            return pv
-        }
-    }
-
-    // Okay, deal with mutable container now.
-    cval = atomic.LoadPointer(&m.cs[MUTABLE])
-    if cval == nil {
-        log.Fatal("This is disruptive in a bad way.")
-    }
-    c := (*container)(cval)
-    if pv := c.getOrInsert(k, v); pv != nil {
-        return pv
-    }
-
-    // We still couldn't insert the key. Time to grow.
-    // TODO: Handle this case.
-    return nil
-}
-
-func (m *Map) SetNilIfPresent(k uint64) bool {
-    for _, c := range m.cs {
-        if atomic.LoadInt32(&c.status) == 0 {
-            continue
-        }
-        bi := k & (c.sz - 1)
-        b := c.list[bi]
-        for i := range b.elems {
-            e := &b.elems[i]
-            if atomic.LoadUint64(&e.k) == k {
-                // Set to nil.
-                atomic.StorePointer(&e.v, nil)
-                return true
-            }
-        }
-    }
-    return false
-}
-
-func (m *Map) StreamUntilCap(ch chan uint64) {
-    for {
-        ci := rand.Intn(2)
-        c := m.cs[ci]
-        if atomic.LoadInt32(&c.status) == 0 {
-            ci += 1
-            c = m.cs[ci%2] // use the other.
-        }
-        bi := rand.Intn(int(c.sz))
-
-        for _, e := range c.list[bi].elems {
-            if len(ch) >= cap(ch) {
-                return
-            }
-            if k := atomic.LoadUint64(&e.k); k > 0 {
-                ch <- k
-            }
-        }
-    }
-}
-
-func (m *Map) StreamAll(ch chan uint64) {
-    for _, c := range m.cs {
-        if atomic.LoadInt32(&c.status) == 0 {
-            continue
-        }
-        for i := 0; i < int(c.sz); i++ {
-            for _, e := range c.list[i].elems {
-                if k := atomic.LoadUint64(&e.k); k > 0 {
-                    ch <- k
-                }
-            }
-        }
-    }
-}
diff --git a/concurrent/map_test.go b/concurrent/map_test.go
deleted file mode 100644
index 7090eff5a09a4fc76b1b69998a06e4bf487851f3..0000000000000000000000000000000000000000
--- a/concurrent/map_test.go
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2015 Manish R Jain <manishrjain@gmail.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package concurrent
-
-import (
-    "math/rand"
-    "testing"
-    "unsafe"
-
-    "github.com/dgraph-io/dgraph/posting"
-    "github.com/zond/gotomic"
-)
-
-func TestGetAndPut(t *testing.T) {
-    m := NewMap(1024)
-    var i uint64
-    for i = 1; i < 100; i++ {
-        v := new(uint64)
-        *v = i
-        b := unsafe.Pointer(v)
-        if ok := m.Put(i, b); !ok {
-            t.Errorf("Couldn't put key: %v", i)
-        }
-    }
-    for i = 1; i < 100; i++ {
-        p := m.Get(i)
-        v := (*uint64)(p)
-        if v == nil {
-            t.Errorf("Didn't expect nil for i: %v", i)
-            return
-        }
-        if *v != i {
-            t.Errorf("Expected: %v. Got: %v", i, *v)
-        }
-    }
-}
-
-func BenchmarkGetAndPut(b *testing.B) {
-    m := NewMap(1 << 16)
-    b.RunParallel(func(pb *testing.PB) {
-        for pb.Next() {
-            key := uint64(rand.Int63())
-            p := m.Get(key)
-            if p == nil {
-                l := posting.NewList()
-                m.Put(key, unsafe.Pointer(l))
-            }
-        }
-    })
-}
-
-func BenchmarkGotomic(b *testing.B) {
-    h := gotomic.NewHash()
-    b.RunParallel(func(pb *testing.PB) {
-        for pb.Next() {
-            key := uint64(rand.Int63())
-            _, has := h.Get(gotomic.IntKey(key))
-            if !has {
-                l := posting.NewList()
-                h.Put(gotomic.IntKey(key), l)
-            }
-        }
-    })
-}
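Reviewer note on the deletion above: each bucket of the custom map holds exactly eight slots and the grow path was never implemented (the "Time to grow. TODO: Handle this case" in GetOrInsert), so a ninth key hashing into a full bucket is silently dropped. A minimal sketch of that failure mode, written as a test that could have lived in the deleted package; the key choice is mine, picked so every insert collides on bucket 0:

```go
package concurrent

import (
	"testing"
	"unsafe"
)

// With NewMap(8) there are 8 buckets of 8 slots each, and the bucket index
// is k & (sz-1). Keys that are multiples of 8 all land in bucket 0, so the
// 9th insert exhausts the bucket and GetOrInsert returns nil: the key is
// dropped, because the grow path is an unhandled TODO.
func TestBucketOverflow(t *testing.T) {
	m := NewMap(8)
	v := new(uint64)
	for i := uint64(1); i <= 9; i++ {
		k := i * 8 // k & 7 == 0 for every key.
		if p := m.GetOrInsert(k, unsafe.Pointer(v)); p == nil {
			t.Logf("key %d dropped: bucket full, no grow path", k)
		}
	}
}
```

The gotomic.Hash that replaces it in posting/lists.go below grows on demand and supports deletion, which the new merge-and-evict path (processOne) depends on.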
Got: %v", i, *v) - } - } -} - -func BenchmarkGetAndPut(b *testing.B) { - m := NewMap(1 << 16) - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - key := uint64(rand.Int63()) - p := m.Get(key) - if p == nil { - l := posting.NewList() - m.Put(key, unsafe.Pointer(l)) - } - } - }) -} - -func BenchmarkGotomic(b *testing.B) { - h := gotomic.NewHash() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - key := uint64(rand.Int63()) - _, has := h.Get(gotomic.IntKey(key)) - if !has { - l := posting.NewList() - h.Put(gotomic.IntKey(key), l) - } - } - }) -} diff --git a/loader/loader.go b/loader/loader.go index b02024e63773cfc25b4820f86c152e482d08458b..0ff5353e1592edafdb5339d3bc33f5cf3955f6d6 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -50,10 +50,15 @@ type state struct { } func (s *state) printCounters(ticker *time.Ticker) { + var prev uint64 for _ = range ticker.C { + processed := atomic.LoadUint64(&s.ctr.processed) + if prev == processed { + continue + } + prev = processed parsed := atomic.LoadUint64(&s.ctr.parsed) ignored := atomic.LoadUint64(&s.ctr.ignored) - processed := atomic.LoadUint64(&s.ctr.processed) pending := parsed - ignored - processed glog.WithFields(logrus.Fields{ "read": atomic.LoadUint64(&s.ctr.read), @@ -123,13 +128,21 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) { } edge, err := nq.ToEdge() - if err != nil { - glog.WithError(err).WithField("nq", nq).Error("While converting to edge") - return + for err != nil { + // Just put in a retry loop to tackle temporary errors. + if err == posting.E_TMP_ERROR { + time.Sleep(time.Microsecond) + + } else { + glog.WithError(err).WithField("nq", nq). + Error("While converting to edge") + return + } + edge, err = nq.ToEdge() } key := posting.Key(edge.Entity, edge.Attribute) - plist := posting.Get(key) + plist := posting.GetOrCreate(key) plist.AddMutation(edge, posting.Set) atomic.AddUint64(&s.ctr.processed, 1) } diff --git a/posting/list.go b/posting/list.go index 638c59b76e30ed341f9298167ed9f22f08b3c9b5..bbca733f44f1ca25bb4f4782e2fe2f9661163545 100644 --- a/posting/list.go +++ b/posting/list.go @@ -24,7 +24,9 @@ import ( "fmt" "math" "sync" + "sync/atomic" "time" + "unsafe" "github.com/Sirupsen/logrus" "github.com/dgraph-io/dgraph/commit" @@ -40,6 +42,12 @@ var glog = x.Log("posting") const Set = 0x01 const Del = 0x02 +var E_TMP_ERROR = errors.New("Temporary Error. Please retry.") + +type buffer struct { + d []byte +} + type MutationLink struct { idx int moveidx int @@ -50,11 +58,12 @@ type List struct { sync.RWMutex key []byte hash uint32 - buffer []byte + pbuffer unsafe.Pointer pstore *store.Store // postinglist store clog *commit.Logger lastCompact time.Time wg sync.WaitGroup + deleteMe bool // Mutations mlayer map[int]types.Posting // stores only replace instructions. @@ -212,15 +221,7 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { l.pstore = pstore l.clog = clog - var err error - if l.buffer, err = pstore.Get(key); err != nil { - // glog.Debugf("While retrieving posting list from db: %v\n", err) - // Error. Just set to empty. 
diff --git a/posting/list.go b/posting/list.go
index 638c59b76e30ed341f9298167ed9f22f08b3c9b5..bbca733f44f1ca25bb4f4782e2fe2f9661163545 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -24,7 +24,9 @@ import (
     "fmt"
     "math"
     "sync"
+    "sync/atomic"
     "time"
+    "unsafe"
 
     "github.com/Sirupsen/logrus"
     "github.com/dgraph-io/dgraph/commit"
@@ -40,6 +42,12 @@ var glog = x.Log("posting")
 const Set = 0x01
 const Del = 0x02
 
+var E_TMP_ERROR = errors.New("Temporary Error. Please retry.")
+
+type buffer struct {
+    d []byte
+}
+
 type MutationLink struct {
     idx     int
     moveidx int
@@ -50,11 +58,12 @@ type List struct {
     sync.RWMutex
     key         []byte
     hash        uint32
-    buffer      []byte
+    pbuffer     unsafe.Pointer
     pstore      *store.Store // postinglist store
     clog        *commit.Logger
     lastCompact time.Time
     wg          sync.WaitGroup
+    deleteMe    bool
 
     // Mutations
     mlayer map[int]types.Posting // stores only replace instructions.
@@ -212,15 +221,7 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
     l.pstore = pstore
     l.clog = clog
 
-    var err error
-    if l.buffer, err = pstore.Get(key); err != nil {
-        // glog.Debugf("While retrieving posting list from db: %v\n", err)
-        // Error. Just set to empty.
-        l.buffer = make([]byte, len(empty))
-        copy(l.buffer, empty)
-    }
-
-    posting := types.GetRootAsPostingList(l.buffer, 0)
+    posting := l.getPostingList()
     l.maxMutationTs = posting.CommitTs()
     l.hash = farm.Fingerprint32(key)
     l.mlayer = make(map[int]types.Posting)
@@ -248,12 +249,36 @@ func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
         glog.WithError(err).Error("While streaming entries.")
     }
     glog.Debug("Done streaming entries.")
-    // l.regenerateIndex()
+}
+
+// There's no need for lock acquisition for this.
+func (l *List) getPostingList() *types.PostingList {
+    pb := atomic.LoadPointer(&l.pbuffer)
+    buf := (*buffer)(pb)
+
+    if buf == nil || len(buf.d) == 0 {
+        nbuf := new(buffer)
+        var err error
+        if nbuf.d, err = l.pstore.Get(l.key); err != nil {
+            // glog.Debugf("While retrieving posting list from db: %v\n", err)
+            // Error. Just set to empty.
+            nbuf.d = make([]byte, len(empty))
+            copy(nbuf.d, empty)
+        }
+        if atomic.CompareAndSwapPointer(&l.pbuffer, pb, unsafe.Pointer(nbuf)) {
+            return types.GetRootAsPostingList(nbuf.d, 0)
+
+        } else {
+            // Someone else replaced the pointer in the meantime. Retry recursively.
+            return l.getPostingList()
+        }
+    }
+    return types.GetRootAsPostingList(buf.d, 0)
 }
 
 // Caller must hold at least a read lock.
 func (l *List) lePostingIndex(maxUid uint64) (int, uint64) {
-    posting := types.GetRootAsPostingList(l.buffer, 0)
+    posting := l.getPostingList()
     left, right := 0, posting.PostingsLength()-1
     sofar := -1
     p := new(types.Posting)
@@ -447,7 +472,7 @@ func (l *List) mergeMutation(mp *types.Posting) {
 
 // Caller must hold at least a read lock.
 func (l *List) length() int {
-    plist := types.GetRootAsPostingList(l.buffer, 0)
+    plist := l.getPostingList()
     return plist.PostingsLength() + l.mdelta
 }
 
@@ -469,7 +494,7 @@ func (l *List) Get(p *types.Posting, i int) bool {
 
 // Caller must hold at least a read lock.
 func (l *List) get(p *types.Posting, i int) bool {
-    plist := types.GetRootAsPostingList(l.buffer, 0)
+    plist := l.getPostingList()
     if len(l.mindex) == 0 {
         if val, ok := l.mlayer[i]; ok {
             *p = val
@@ -529,6 +554,13 @@ func (l *List) get(p *types.Posting, i int) bool {
     return plist.Postings(p, newidx)
 }
 
+func (l *List) SetForDeletion() {
+    l.wg.Wait()
+    l.Lock()
+    defer l.Unlock()
+    l.deleteMe = true
+}
+
 // In benchmarks, the time taken per AddMutation before was
 // plateauing at 2.5 ms with sync per 10 log entries, and increasing
 // for sync per 100 log entries (to 3 ms per AddMutation), largely because
@@ -548,6 +580,9 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
     l.wg.Wait()
     l.Lock()
     defer l.Unlock()
+    if l.deleteMe {
+        return E_TMP_ERROR
+    }
 
     if t.Timestamp.UnixNano() < l.maxMutationTs {
         return fmt.Errorf("Mutation ts lower than committed ts.")
@@ -629,13 +664,13 @@ func (l *List) merge() error {
     end := types.PostingListEnd(b)
     b.Finish(end)
 
-    l.buffer = b.Bytes[b.Head():]
-    if err := l.pstore.SetOne(l.key, l.buffer); err != nil {
+    if err := l.pstore.SetOne(l.key, b.Bytes[b.Head():]); err != nil {
         glog.WithField("error", err).Errorf("While storing posting list")
         return err
     }
 
     // Now reset the mutation variables.
+    atomic.StorePointer(&l.pbuffer, nil) // Make prev buffer eligible for GC.
    l.lastCompact = time.Now()
    l.mlayer = make(map[int]types.Posting)
    l.mdelta = 0
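getPostingList above is the core of this change: the serialized posting list moves from a plain buffer []byte field to an atomically swapped unsafe.Pointer, so readers take no lock and merge() can release the old bytes by storing nil. Here is the same load-or-fill pattern in isolation, with an iterative retry in place of the recursive one; lazyList and fetch are illustrative names, not this codebase's API:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

type buffer struct{ d []byte }

type lazyList struct {
	pbuffer unsafe.Pointer // *buffer, read and replaced atomically.
}

// load reads the pointer without locks, fills it via fetch on a miss, and
// CASes it in so concurrent readers converge on one buffer. A failed CAS
// means another goroutine won the race; loop and reread its buffer.
func (l *lazyList) load(fetch func() []byte) []byte {
	for {
		pb := atomic.LoadPointer(&l.pbuffer)
		if buf := (*buffer)(pb); buf != nil && len(buf.d) > 0 {
			return buf.d
		}
		nbuf := &buffer{d: fetch()}
		if atomic.CompareAndSwapPointer(&l.pbuffer, pb, unsafe.Pointer(nbuf)) {
			return nbuf.d
		}
	}
}

func main() {
	var l lazyList
	d := l.load(func() []byte { return []byte("bytes from the store") })
	fmt.Println(len(d))
}
```

Storing nil back into the pointer, as the merge() hunk does, simply forces the next reader through the fetch path again.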
diff --git a/posting/list_test.go b/posting/list_test.go
index 42773a0565d00370aa02ce51f2bff8d555ee4b28..7c608107a271ca3de0ddc160637891b0428416b4 100644
--- a/posting/list_test.go
+++ b/posting/list_test.go
@@ -33,7 +33,7 @@ import (
 
 func checkUids(t *testing.T, l *List, uids ...uint64) error {
     if l.Length() != len(uids) {
-        return fmt.Errorf("Length: %d", l.Length())
+        return fmt.Errorf("Expected: %d. Length: %d", len(uids), l.Length())
     }
     for i := 0; i < len(uids); i++ {
         var p types.Posting
diff --git a/posting/lists.go b/posting/lists.go
index 0b7a79a1ae1b90641949af5461c9106e01218acf..0edf382024b33b79b726dd4219cd0ffa09740981 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -17,25 +17,28 @@
 package posting
 
 import (
+    "math/rand"
+    "runtime"
     "sync"
     "sync/atomic"
     "time"
-    "unsafe"
 
     "github.com/Sirupsen/logrus"
     "github.com/dgraph-io/dgraph/commit"
-    "github.com/dgraph-io/dgraph/concurrent"
     "github.com/dgraph-io/dgraph/store"
     "github.com/dgryski/go-farm"
+    "github.com/zond/gotomic"
 )
 
 type counters struct {
+    ticker *time.Ticker
     added  uint64
     merged uint64
 }
 
 func (c *counters) periodicLog() {
-    for _ = range time.Tick(time.Second) {
+    for _ = range c.ticker.C {
+        mapSize := lhmap.Size()
         added := atomic.LoadUint64(&c.added)
         merged := atomic.LoadUint64(&c.merged)
         pending := added - merged
@@ -44,23 +47,59 @@ func (c *counters) periodicLog() {
             "added":   added,
             "merged":  merged,
             "pending": pending,
-        }).Info("Merge counters")
+            "mapsize": mapSize,
+        }).Info("List Merge counters")
     }
 }
 
-var lcmap *concurrent.Map
+var MAX_MEMORY uint64
+var MIB uint64
+
+func checkMemoryUsage() {
+    MIB = 1 << 20
+    MAX_MEMORY = 2 * (1 << 30)
+
+    for _ = range time.Tick(5 * time.Second) {
+        var ms runtime.MemStats
+        runtime.ReadMemStats(&ms)
+        if ms.Alloc < MAX_MEMORY {
+            continue
+        }
+
+        // Okay, we exceed the max memory threshold.
+        // Stop the world, and deal with this first.
+        stopTheWorld.Lock()
+        megs := ms.Alloc / MIB
+        glog.WithField("allocated_MB", megs).
+            Info("Memory usage over threshold. STOPPED THE WORLD!")
+
+        glog.Info("Calling merge on all lists.")
+        MergeLists(100 * runtime.GOMAXPROCS(-1))
+
+        glog.Info("Merged lists. Calling GC.")
+        runtime.GC() // Call GC to do some cleanup.
+
+        runtime.ReadMemStats(&ms)
+        megs = ms.Alloc / MIB
+        glog.WithField("allocated_MB", megs).
+            Info("Memory usage after calling GC.")
+        stopTheWorld.Unlock()
+    }
+}
+
+var stopTheWorld sync.RWMutex
+var lhmap *gotomic.Hash
 var pstore *store.Store
 var clog *commit.Logger
-var ch chan uint64
 var lc *lcounters
 
 func Init(posting *store.Store, log *commit.Logger) {
-    lcmap = concurrent.NewMap(1 << 20)
+    lhmap = gotomic.NewHash()
     pstore = posting
     clog = log
-    ch = make(chan uint64, 10000)
     lc = new(lcounters)
     go lc.periodicLog()
+    go checkMemoryUsage()
 }
 
 type lcounters struct {
@@ -77,73 +116,124 @@ func (lc *lcounters) periodicLog() {
     }
 }
 
-func Get(key []byte) *List {
+func GetOrCreate(key []byte) *List {
+    stopTheWorld.RLock()
+    defer stopTheWorld.RUnlock()
+
     uid := farm.Fingerprint64(key)
-    lp := lcmap.Get(uid)
-    if lp == nil {
-        l := NewList()
-        l.init(key, pstore, clog)
-        lcmap.Put(uid, unsafe.Pointer(l))
+    ukey := gotomic.IntKey(uid)
+    lp, _ := lhmap.Get(ukey)
+    if lp != nil {
+        return lp.(*List)
+    }
+
+    l := NewList()
+    l.init(key, pstore, clog)
+    if inserted := lhmap.PutIfMissing(ukey, l); inserted {
         return l
+    } else {
+        lp, _ = lhmap.Get(ukey)
+        return lp.(*List)
     }
-    return (*List)(lp)
 }
 
-/*
-func periodicQueueForProcessing(c *counters) {
-    ticker := time.NewTicker(time.Minute)
-    for _ = range ticker.C {
-        lmap.StreamUntilCap(ch)
+func processOne(k gotomic.Hashable, c *counters) {
+    ret, _ := lhmap.Delete(k)
+    l, _ := ret.(*List)
+    if l == nil {
+        return
     }
+
+    l.SetForDeletion() // No more AddMutation.
+    if err := l.MergeIfDirty(); err != nil {
+        glog.WithError(err).Error("While committing dirty list.")
+    }
+    atomic.AddUint64(&c.merged, 1)
 }
-*/
 
-func process(c *counters, wg *sync.WaitGroup) {
-    for eid := range ch {
-        lp := lcmap.Get(eid)
-        if lp == nil {
-            continue
-        }
-        atomic.AddUint64(&c.merged, 1)
-        l := (*List)(lp)
-        if err := l.MergeIfDirty(); err != nil {
-            glog.WithError(err).Error("While commiting dirty list.")
-        }
+// For on-demand merging of all lists.
+func process(ch chan gotomic.Hashable, c *counters, wg *sync.WaitGroup) {
+    for l := range ch {
+        processOne(l, c)
     }
     if wg != nil {
         wg.Done()
     }
 }
 
-func periodicProcess(c *counters) {
-    ticker := time.NewTicker(100 * time.Millisecond)
-    for _ = range ticker.C {
-        process(c, nil)
-    }
-}
-
-func queueAll(c *counters) {
-    lcmap.StreamAll(ch)
+func queueAll(ch chan gotomic.Hashable, c *counters) {
+    lhmap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool {
+        ch <- k
+        atomic.AddUint64(&c.added, 1)
+        return false // If this returns true, Each would break.
+    })
     close(ch)
 }
 
-/*
-func StartPeriodicMerging() {
-    ctr := new(counters)
-    go periodicQueueForProcessing(ctr)
-    go periodicProcess(ctr)
-}
-*/
-
 func MergeLists(numRoutines int) {
+    ch := make(chan gotomic.Hashable, 10000)
     c := new(counters)
+    c.ticker = time.NewTicker(time.Second)
     go c.periodicLog()
-    go queueAll(c)
+    go queueAll(ch, c)
 
     wg := new(sync.WaitGroup)
     for i := 0; i < numRoutines; i++ {
         wg.Add(1)
-        go process(c, wg)
+        go process(ch, c, wg)
     }
     wg.Wait()
+    c.ticker.Stop()
+}
+
+// For periodic merging of lists.
+func queueRandomLists(ch chan gotomic.Hashable, c *counters) {
+    var buf []gotomic.Hashable
+    var count int
+    needed := cap(ch) - len(ch)
+    if needed < 100 {
+        return
+    }
+
+    // Generate a random list of keys to merge, reservoir-sampling style.
+    lhmap.Each(func(k gotomic.Hashable, v gotomic.Thing) bool {
+        if count < needed {
+            buf = append(buf, k)
+
+        } else {
+            j := rand.Intn(count)
+            if j < len(buf) {
+                buf[j] = k
+            }
+        }
+        count += 1
+        return false
+    })
+
+    for _, k := range buf {
+        ch <- k
+        atomic.AddUint64(&c.added, 1)
+    }
+}
+
+func periodicQueueForProcessing(ch chan gotomic.Hashable, c *counters) {
+    ticker := time.NewTicker(time.Minute)
+    for _ = range ticker.C {
+        queueRandomLists(ch, c)
+    }
+}
+
+func periodicProcess(ch chan gotomic.Hashable, c *counters) {
+    ticker := time.NewTicker(100 * time.Millisecond)
+    for _ = range ticker.C {
+        hid := <-ch
+        processOne(hid, c)
+    }
+}
+
+func StartPeriodicMerging() {
+    ctr := new(counters)
+    ch := make(chan gotomic.Hashable, 10000)
+    go periodicQueueForProcessing(ch, ctr)
+    go periodicProcess(ch, ctr)
 }
diff --git a/posting/worker.go b/posting/worker.go
index 873779518b5b8bce6bc0361f9b480f93035bf53e..77987671d2308428fe3b3273c61e4b4a0b692a7a 100644
--- a/posting/worker.go
+++ b/posting/worker.go
@@ -19,7 +19,7 @@ func ProcessTask(query []byte) (result []byte, rerr error) {
     for i := 0; i < q.UidsLength(); i++ {
         uid := q.Uids(i)
         key := Key(uid, attr)
-        pl := Get(key)
+        pl := GetOrCreate(key)
 
         task.ValueStart(b)
         var valoffset flatbuffers.UOffsetT
diff --git a/posting/worker_test.go b/posting/worker_test.go
index 25b1c2345de73b28995704e98ce796e3a3fa91ba..2aabe52f05dd08c8c0b3900aae1d4e5517a4250d 100644
--- a/posting/worker_test.go
+++ b/posting/worker_test.go
@@ -63,22 +63,22 @@ func TestProcessTask(t *testing.T) {
         Source:    "author0",
         Timestamp: time.Now(),
     }
-    addEdge(t, edge, Get(Key(10, "friend")))
-    addEdge(t, edge, Get(Key(11, "friend")))
-    addEdge(t, edge, Get(Key(12, "friend")))
+    addEdge(t, edge, GetOrCreate(Key(10, "friend")))
+    addEdge(t, edge, GetOrCreate(Key(11, "friend")))
+    addEdge(t, edge, GetOrCreate(Key(12, "friend")))
 
     edge.ValueId = 25
-    addEdge(t, edge, Get(Key(12, "friend")))
+    addEdge(t, edge, GetOrCreate(Key(12, "friend")))
 
     edge.ValueId = 26
-    addEdge(t, edge, Get(Key(12, "friend")))
+    addEdge(t, edge, GetOrCreate(Key(12, "friend")))
 
     edge.ValueId = 31
-    addEdge(t, edge, Get(Key(10, "friend")))
-    addEdge(t, edge, Get(Key(12, "friend")))
+    addEdge(t, edge, GetOrCreate(Key(10, "friend")))
+    addEdge(t, edge, GetOrCreate(Key(12, "friend")))
 
     edge.Value = "photon"
-    addEdge(t, edge, Get(Key(12, "friend")))
+    addEdge(t, edge, GetOrCreate(Key(12, "friend")))
 
     query := NewQuery("friend", []uint64{10, 11, 12})
     result, err := ProcessTask(query)
diff --git a/query/query_test.go b/query/query_test.go
index 85671af9ef768681fd51aafc482f67e5e9fa7519..e60c308489b6878971d2a9a7f05d202e830bc0ca 100644
--- a/query/query_test.go
+++ b/query/query_test.go
@@ -214,42 +214,42 @@ func populateGraph(t *testing.T) {
         Source:    "testing",
         Timestamp: time.Now(),
     }
-    addEdge(t, edge, posting.Get(posting.Key(1, "friend")))
+    addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
 
     edge.ValueId = 24
-    addEdge(t, edge, posting.Get(posting.Key(1, "friend")))
+    addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
 
     edge.ValueId = 25
-    addEdge(t, edge, posting.Get(posting.Key(1, "friend")))
+    addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
 
     edge.ValueId = 31
-    addEdge(t, edge, posting.Get(posting.Key(1, "friend")))
+    addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
 
     edge.ValueId = 101
-    addEdge(t, edge, posting.Get(posting.Key(1, "friend")))
+    addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend")))
 
     // Now let's add a few properties for the main user.
     edge.Value = "Michonne"
-    addEdge(t, edge, posting.Get(posting.Key(1, "name")))
+    addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "name")))
 
     edge.Value = "female"
-    addEdge(t, edge, posting.Get(posting.Key(1, "gender")))
+    addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "gender")))
 
     edge.Value = "alive"
-    addEdge(t, edge, posting.Get(posting.Key(1, "status")))
+    addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "status")))
 
     // Now let's add a name for each of the friends, except 101.
     edge.Value = "Rick Grimes"
-    addEdge(t, edge, posting.Get(posting.Key(23, "name")))
+    addEdge(t, edge, posting.GetOrCreate(posting.Key(23, "name")))
 
     edge.Value = "Glenn Rhee"
-    addEdge(t, edge, posting.Get(posting.Key(24, "name")))
+    addEdge(t, edge, posting.GetOrCreate(posting.Key(24, "name")))
 
     edge.Value = "Daryl Dixon"
-    addEdge(t, edge, posting.Get(posting.Key(25, "name")))
+    addEdge(t, edge, posting.GetOrCreate(posting.Key(25, "name")))
 
     edge.Value = "Andrea"
-    addEdge(t, edge, posting.Get(posting.Key(31, "name")))
+    addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name")))
 }
 
 func TestProcessGraph(t *testing.T) {
diff --git a/server/loader/main.go b/server/loader/main.go
index 03ddb90c402368dbe7f5a3397d9cdb36d36898e6..70f662deb3484c407095689cd3c646ebde17db6b 100644
--- a/server/loader/main.go
+++ b/server/loader/main.go
@@ -58,7 +58,8 @@ func main() {
     }
     logrus.SetLevel(logrus.InfoLevel)
 
-    glog.WithField("gomaxprocs", runtime.GOMAXPROCS(-1)).Info("Number of CPUs")
+    numCpus := runtime.GOMAXPROCS(-1)
+    glog.WithField("gomaxprocs", numCpus).Info("Number of CPUs")
 
     if len(*rdfGzips) == 0 {
         glog.Fatal("No RDF GZIP files specified")
@@ -98,5 +99,5 @@ func main() {
         f.Close()
     }
     glog.Info("Calling merge lists")
-    posting.MergeLists(10000)
+    posting.MergeLists(100 * numCpus) // 100 per core.
 }
diff --git a/uid/assigner.go b/uid/assigner.go
index c2b85fe8605f7f8c4aae6875c86c0378813db523..85ad39500040d0946f048c404130d9849269b85c 100644
--- a/uid/assigner.go
+++ b/uid/assigner.go
@@ -104,7 +104,7 @@ func allocateUniqueUid(xid string) (uid uint64, rerr error) {
 
         // Check if this uid has already been allocated.
         key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
-        pl := posting.Get(key)
+        pl := posting.GetOrCreate(key)
 
         if pl.Length() > 0 {
             // Something already present here.
@@ -175,7 +175,7 @@ func stringKey(xid string) []byte {
 
 func GetOrAssign(xid string) (uid uint64, rerr error) {
     key := stringKey(xid)
-    pl := posting.Get(key)
+    pl := posting.GetOrCreate(key)
 
     if pl.Length() == 0 {
         return assignNew(pl, xid)
@@ -196,7 +196,7 @@ func GetOrAssign(xid string) (uid uint64, rerr error) {
 func ExternalId(uid uint64) (xid string, rerr error) {
     key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
-    pl := posting.Get(key)
+    pl := posting.GetOrCreate(key)
 
     if pl.Length() == 0 {
         return "", errors.New("NO external id")
     }