diff --git a/loader/loader.go b/loader/loader.go index 00821ba63ecc304b9fba7a5a7a69b7bc5aac1daf..b02024e63773cfc25b4820f86c152e482d08458b 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -156,7 +156,7 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) { } wg := new(sync.WaitGroup) - for i := 0; i < 100; i++ { + for i := 0; i < 3000; i++ { wg.Add(1) go s.handleNQuads(wg) // NQuads --> Posting list [slow]. } diff --git a/posting/list.go b/posting/list.go index f6c307cd0c5150a6c886c3c4ce5be6e111fdc656..638c59b76e30ed341f9298167ed9f22f08b3c9b5 100644 --- a/posting/list.go +++ b/posting/list.go @@ -54,6 +54,7 @@ type List struct { pstore *store.Store // postinglist store clog *commit.Logger lastCompact time.Time + wg sync.WaitGroup // Mutations mlayer map[int]types.Posting // stores only replace instructions. @@ -62,6 +63,13 @@ type List struct { mindex []*MutationLink } +func NewList() *List { + l := new(List) + l.wg.Add(1) + l.mlayer = make(map[int]types.Posting) + return l +} + type ByUid []*types.Posting func (pa ByUid) Len() int { return len(pa) } @@ -195,6 +203,7 @@ func ParseValue(i *interface{}, value []byte) error { func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) { l.Lock() defer l.Unlock() + defer l.wg.Done() if len(empty) == 0 { glog.Fatal("empty should have some bytes.") @@ -443,12 +452,16 @@ func (l *List) length() int { } func (l *List) Length() int { + l.wg.Wait() + l.RLock() defer l.RUnlock() return l.length() } func (l *List) Get(p *types.Posting, i int) bool { + l.wg.Wait() + l.RLock() defer l.RUnlock() return l.get(p, i) @@ -532,6 +545,7 @@ func (l *List) get(p *types.Posting, i int) bool { // BenchmarkAddMutations_SyncEvery1000LogEntry-6 30000 63544 ns/op // ok github.com/dgraph-io/dgraph/posting 10.291s func (l *List) AddMutation(t x.DirectedEdge, op byte) error { + l.wg.Wait() l.Lock() defer l.Unlock() @@ -572,25 +586,12 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error { } func (l *List) IsDirty() bool { + // We can avoid checking for init here. l.RLock() defer l.RUnlock() return len(l.mindex)+len(l.mlayer) > 0 } -func (l *List) DirtyRatio() float64 { - l.RLock() - defer l.RUnlock() - - d := len(l.mindex) + len(l.mlayer) - plist := types.GetRootAsPostingList(l.buffer, 0) - ln := plist.PostingsLength() - if ln == 0 { - return math.MaxFloat64 - } - - return float64(d) / float64(ln) -} - func (l *List) MergeIfDirty() error { if !l.IsDirty() { glog.WithField("dirty", false).Debug("Not Committing") @@ -602,6 +603,7 @@ func (l *List) MergeIfDirty() error { } func (l *List) merge() error { + l.wg.Wait() l.Lock() defer l.Unlock() @@ -648,6 +650,7 @@ func (l *List) LastCompactionTs() time.Time { } func (l *List) GetUids() []uint64 { + l.wg.Wait() l.RLock() defer l.RUnlock() @@ -664,6 +667,7 @@ func (l *List) GetUids() []uint64 { } func (l *List) Value() (result []byte, rerr error) { + l.wg.Wait() l.RLock() defer l.RUnlock() diff --git a/posting/list_test.go b/posting/list_test.go index a3d46bbfe1e47cfbef604c0bcc83e71fad5c7e84..42773a0565d00370aa02ce51f2bff8d555ee4b28 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -31,7 +31,7 @@ import ( "github.com/dgraph-io/dgraph/x" ) -func checkUids(t *testing.T, l List, uids ...uint64) error { +func checkUids(t *testing.T, l *List, uids ...uint64) error { if l.Length() != len(uids) { return fmt.Errorf("Length: %d", l.Length()) } @@ -49,7 +49,7 @@ func checkUids(t *testing.T, l List, uids ...uint64) error { func TestAddMutation(t *testing.T) { // logrus.SetLevel(logrus.DebugLevel) - var l List + l := NewList() key := Key(1, "name") dir, err := ioutil.TempDir("", "storetest_") if err != nil { @@ -175,7 +175,7 @@ func TestAddMutation(t *testing.T) { } */ // Try reading the same data in another PostingList. - var dl List + dl := NewList() dl.init(key, ps, clog) if err := checkUids(t, dl, uids...); err != nil { t.Error(err) @@ -192,7 +192,7 @@ func TestAddMutation(t *testing.T) { func TestAddMutation_Value(t *testing.T) { // logrus.SetLevel(logrus.DebugLevel) glog.Debug("Running init...") - var ol List + ol := NewList() key := Key(10, "value") dir, err := ioutil.TempDir("", "storetest_") if err != nil { @@ -273,7 +273,7 @@ func TestAddMutation_Value(t *testing.T) { func benchmarkAddMutations(n int, b *testing.B) { // logrus.SetLevel(logrus.DebugLevel) - var l List + l := NewList() key := Key(1, "name") dir, err := ioutil.TempDir("", "storetest_") if err != nil { diff --git a/posting/lists.go b/posting/lists.go index be5c27e0146ecf00bf246bb8cec02a6b10bd4009..40d8410c173c02cf87a42c9ec18a66c6d999d1aa 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -27,10 +27,6 @@ import ( "github.com/dgryski/go-farm" ) -type entry struct { - l *List -} - type counters struct { added uint64 merged uint64 @@ -50,18 +46,14 @@ func (c *counters) periodicLog() { } } -var lmutex sync.RWMutex -var lcache map[uint64]*entry +var lmap *Map var pstore *store.Store var clog *commit.Logger var ch chan uint64 var lc *lcounters func Init(posting *store.Store, log *commit.Logger) { - lmutex.Lock() - defer lmutex.Unlock() - - lcache = make(map[uint64]*entry) + lmap = NewMap(true) pstore = posting clog = log ch = make(chan uint64, 10000) @@ -69,84 +61,44 @@ func Init(posting *store.Store, log *commit.Logger) { go lc.periodicLog() } -func get(k uint64) *List { - lmutex.RLock() - defer lmutex.RUnlock() - if e, ok := lcache[k]; ok { - return e.l - } - return nil -} - type lcounters struct { - hit uint64 - miss uint64 - misshit uint64 + hit uint64 + miss uint64 } func (lc *lcounters) periodicLog() { for _ = range time.Tick(10 * time.Second) { glog.WithFields(logrus.Fields{ - "hit": atomic.LoadUint64(&lc.hit), - "miss": atomic.LoadUint64(&lc.miss), - "misshit": atomic.LoadUint64(&lc.misshit), + "hit": atomic.LoadUint64(&lc.hit), + "miss": atomic.LoadUint64(&lc.miss), }).Info("Lists counters") } } func Get(key []byte) *List { - // Acquire read lock and check if list is available. - lmutex.RLock() uid := farm.Fingerprint64(key) - if e, ok := lcache[uid]; ok { - lmutex.RUnlock() - atomic.AddUint64(&lc.hit, 1) - return e.l - } - lmutex.RUnlock() - - // Couldn't find it. Acquire write lock. - lmutex.Lock() - defer lmutex.Unlock() - // Check again after acquiring write lock. - if e, ok := lcache[uid]; ok { - atomic.AddUint64(&lc.misshit, 1) - return e.l - } - - atomic.AddUint64(&lc.miss, 1) - e := new(entry) - e.l = new(List) - e.l.init(key, pstore, clog) - lcache[uid] = e - return e.l -} + l, added := lmap.Get(uid) + if added { + atomic.AddUint64(&lc.miss, 1) + l.init(key, pstore, clog) -func queueForProcessing(c *counters) { - lmutex.RLock() - for eid, e := range lcache { - if len(ch) >= cap(ch) { - break - } - if e.l.IsDirty() { - ch <- eid - atomic.AddUint64(&c.added, 1) - } + } else { + atomic.AddUint64(&lc.hit, 1) } - lmutex.RUnlock() + return l } func periodicQueueForProcessing(c *counters) { ticker := time.NewTicker(time.Minute) for _ = range ticker.C { - queueForProcessing(c) + lmap.StreamUntilCap(ch) } } func process(c *counters, wg *sync.WaitGroup) { for eid := range ch { - l := get(eid) - if l == nil { + l, added := lmap.Get(eid) + if l == nil || added { continue } atomic.AddUint64(&c.merged, 1) @@ -167,13 +119,8 @@ func periodicProcess(c *counters) { } func queueAll(c *counters) { - lmutex.RLock() - for hid, _ := range lcache { - ch <- hid - atomic.AddUint64(&c.added, 1) - } + lmap.StreamAllKeys(ch) close(ch) - lmutex.RUnlock() } func StartPeriodicMerging() { diff --git a/posting/lmap.go b/posting/lmap.go new file mode 100644 index 0000000000000000000000000000000000000000..c9471ba81ff617aa2e16b74133badf4abeed463d --- /dev/null +++ b/posting/lmap.go @@ -0,0 +1,150 @@ +/* + * Copyright 2015 Manish R Jain <manishrjain@gmail.com> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package posting + +import ( + "math/rand" + "sync" + "sync/atomic" + "time" + + "github.com/Sirupsen/logrus" +) + +const NBUCKETS = 32 + +type latency struct { + n1 uint64 + u1 uint64 + u10 uint64 + u100 uint64 + m1 uint64 + s1 uint64 +} + +type bucket struct { + sync.RWMutex + m map[uint64]*List + lat *latency +} + +func (l *latency) update(s time.Time) { + e := time.Now().Sub(s) + micros := e.Nanoseconds() / 1000 + if micros > 1000000 { + atomic.AddUint64(&l.s1, 1) + } else if micros > 1000 { + atomic.AddUint64(&l.m1, 1) + } else if micros > 100 { + atomic.AddUint64(&l.u100, 1) + } else if micros > 10 { + atomic.AddUint64(&l.u10, 1) + } else if micros > 1 { + atomic.AddUint64(&l.u1, 1) + } else { + atomic.AddUint64(&l.n1, 1) + } +} + +func (l *latency) log() { + for _ = range time.Tick(5 * time.Second) { + glog.WithFields(logrus.Fields{ + "n1": atomic.LoadUint64(&l.n1), + "u1": atomic.LoadUint64(&l.u1), + "u10": atomic.LoadUint64(&l.u10), + "u100": atomic.LoadUint64(&l.u100), + "m1": atomic.LoadUint64(&l.m1), + "s1": atomic.LoadUint64(&l.s1), + }).Info("Lmap latency") + } +} + +func (b *bucket) get(key uint64) (*List, bool) { + if b.lat != nil { + n := time.Now() + defer b.lat.update(n) + } + + b.RLock() + if l, ok := b.m[key]; ok { + b.RUnlock() + return l, false + } + b.RUnlock() + + b.Lock() + defer b.Unlock() + if l, ok := b.m[key]; ok { + return l, false + } + + l := NewList() + b.m[key] = l + return l, true +} + +type Map struct { + buckets []*bucket +} + +func NewMap(withLog bool) *Map { + var lat *latency + if withLog { + lat = new(latency) + go lat.log() + } else { + lat = nil + } + + m := new(Map) + m.buckets = make([]*bucket, NBUCKETS) + for i := 0; i < NBUCKETS; i++ { + m.buckets[i] = new(bucket) + m.buckets[i].lat = lat + m.buckets[i].m = make(map[uint64]*List) + } + return m +} + +func (m *Map) Get(key uint64) (*List, bool) { + bi := key % NBUCKETS + return m.buckets[bi].get(key) +} + +func (m *Map) StreamUntilCap(ch chan uint64) { + bi := rand.Intn(NBUCKETS) + b := m.buckets[bi] + b.RLock() + defer b.RUnlock() + for u := range b.m { + if len(ch) >= cap(ch) { + break + } + ch <- u + } +} + +func (m *Map) StreamAllKeys(ch chan uint64) { + for i := 0; i < len(m.buckets); i++ { + b := m.buckets[i] + b.RLock() + for u := range b.m { + ch <- u + } + b.RUnlock() + } +} diff --git a/posting/lmap_test.go b/posting/lmap_test.go new file mode 100644 index 0000000000000000000000000000000000000000..d15caadd15353960f9e32b5309155ccf8d7d718e --- /dev/null +++ b/posting/lmap_test.go @@ -0,0 +1,55 @@ +/* + * Copyright 2015 Manish R Jain <manishrjain@gmail.com> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package posting + +import ( + "math/rand" + "testing" +) + +func BenchmarkGet(b *testing.B) { + // lmap := NewMap(false) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // i := uint64(rand.Int63()) + _ = uint64(rand.Int63()) + NewList() + // lmap.Get(i) + } + }) +} + +func BenchmarkGetLinear(b *testing.B) { + m := make(map[uint64]*List) + for i := 0; i < b.N; i++ { + k := uint64(i) + if l, ok := m[k]; !ok { + l = NewList() + m[k] = l + } + } +} + +func BenchmarkGetLinearBool(b *testing.B) { + m := make(map[uint64]bool) + for i := 0; i < b.N; i++ { + k := uint64(i) + if _, ok := m[k]; !ok { + m[k] = true + } + } +} diff --git a/posting/notes.txt b/posting/notes.txt new file mode 100644 index 0000000000000000000000000000000000000000..c3456c88b3e9b568721b0f6da1616a3c0d772ca1 --- /dev/null +++ b/posting/notes.txt @@ -0,0 +1,15 @@ +Using a map made up of 32 buckets, with each bucket holding it's own lock. + +INFO[0166] Lmap latency +s1=24100 +m1=37251 +u100=2017 +u10=2864 +u1=109856 +n1=55635 + +Based on benchmarks, a Get should take close to 1 microsecond. +In this case, u1 has 110K Gets, while u10 has only ~3000. So, that's good. +But we see a sudden jump to 37k Gets (16%), which took over 1 millisecond! +And 24k requests (~10%) took over a second to return. That's very erratic behaviour. +Overall, only 72% of Gets took under 10 microseconds.