Skip to content
Snippets Groups Projects
Commit 3c60a5e6 authored by Manish R Jain's avatar Manish R Jain
Browse files

Moved lcache to lmap, a bucket based concurrent hash map. But, the behaviour...

Moved lcache to lmap, a bucket based concurrent hash map. But, the behaviour is still erratic, with 28% of Gets (with inserts) taking over 10us.
parent 192be4c2
No related branches found
No related tags found
No related merge requests found
......@@ -156,7 +156,7 @@ func HandleRdfReader(reader io.Reader, mod uint64) (uint64, error) {
}
wg := new(sync.WaitGroup)
for i := 0; i < 100; i++ {
for i := 0; i < 3000; i++ {
wg.Add(1)
go s.handleNQuads(wg) // NQuads --> Posting list [slow].
}
......
......@@ -54,6 +54,7 @@ type List struct {
pstore *store.Store // postinglist store
clog *commit.Logger
lastCompact time.Time
wg sync.WaitGroup
// Mutations
mlayer map[int]types.Posting // stores only replace instructions.
......@@ -62,6 +63,13 @@ type List struct {
mindex []*MutationLink
}
// NewList allocates a List whose mlayer map is ready for writes.
//
// It also adds one to the list's WaitGroup. init calls Done on that group
// when it returns, and methods such as Length, Get and AddMutation call Wait
// on it first, so those methods block until init has run on this list.
func NewList() *List {
	l := &List{mlayer: make(map[int]types.Posting)}
	l.wg.Add(1)
	return l
}
type ByUid []*types.Posting
func (pa ByUid) Len() int { return len(pa) }
......@@ -195,6 +203,7 @@ func ParseValue(i *interface{}, value []byte) error {
func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
l.Lock()
defer l.Unlock()
defer l.wg.Done()
if len(empty) == 0 {
glog.Fatal("empty should have some bytes.")
......@@ -443,12 +452,16 @@ func (l *List) length() int {
}
// Length returns the value of the unexported length() while holding the
// list's read lock.
//
// It first waits on l.wg, which NewList increments and init releases, so the
// call blocks until init has finished for this list.
func (l *List) Length() int {
	l.wg.Wait()
	l.RLock()
	defer l.RUnlock()
	return l.length()
}
func (l *List) Get(p *types.Posting, i int) bool {
l.wg.Wait()
l.RLock()
defer l.RUnlock()
return l.get(p, i)
......@@ -532,6 +545,7 @@ func (l *List) get(p *types.Posting, i int) bool {
// BenchmarkAddMutations_SyncEvery1000LogEntry-6 30000 63544 ns/op
// ok github.com/dgraph-io/dgraph/posting 10.291s
func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
l.wg.Wait()
l.Lock()
defer l.Unlock()
......@@ -572,25 +586,12 @@ func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
}
// IsDirty reports whether the list currently holds any entries in mindex or
// mlayer, read under the list's read lock.
//
// Unlike Length, Get or AddMutation it does not wait on l.wg before locking.
func (l *List) IsDirty() bool {
	// We can avoid checking for init here.
	l.RLock()
	defer l.RUnlock()
	return len(l.mindex)+len(l.mlayer) > 0
}
func (l *List) DirtyRatio() float64 {
l.RLock()
defer l.RUnlock()
d := len(l.mindex) + len(l.mlayer)
plist := types.GetRootAsPostingList(l.buffer, 0)
ln := plist.PostingsLength()
if ln == 0 {
return math.MaxFloat64
}
return float64(d) / float64(ln)
}
func (l *List) MergeIfDirty() error {
if !l.IsDirty() {
glog.WithField("dirty", false).Debug("Not Committing")
......@@ -602,6 +603,7 @@ func (l *List) MergeIfDirty() error {
}
func (l *List) merge() error {
l.wg.Wait()
l.Lock()
defer l.Unlock()
......@@ -648,6 +650,7 @@ func (l *List) LastCompactionTs() time.Time {
}
func (l *List) GetUids() []uint64 {
l.wg.Wait()
l.RLock()
defer l.RUnlock()
......@@ -664,6 +667,7 @@ func (l *List) GetUids() []uint64 {
}
func (l *List) Value() (result []byte, rerr error) {
l.wg.Wait()
l.RLock()
defer l.RUnlock()
......
......@@ -31,7 +31,7 @@ import (
"github.com/dgraph-io/dgraph/x"
)
func checkUids(t *testing.T, l List, uids ...uint64) error {
func checkUids(t *testing.T, l *List, uids ...uint64) error {
if l.Length() != len(uids) {
return fmt.Errorf("Length: %d", l.Length())
}
......@@ -49,7 +49,7 @@ func checkUids(t *testing.T, l List, uids ...uint64) error {
func TestAddMutation(t *testing.T) {
// logrus.SetLevel(logrus.DebugLevel)
var l List
l := NewList()
key := Key(1, "name")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
......@@ -175,7 +175,7 @@ func TestAddMutation(t *testing.T) {
}
*/
// Try reading the same data in another PostingList.
var dl List
dl := NewList()
dl.init(key, ps, clog)
if err := checkUids(t, dl, uids...); err != nil {
t.Error(err)
......@@ -192,7 +192,7 @@ func TestAddMutation(t *testing.T) {
func TestAddMutation_Value(t *testing.T) {
// logrus.SetLevel(logrus.DebugLevel)
glog.Debug("Running init...")
var ol List
ol := NewList()
key := Key(10, "value")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
......@@ -273,7 +273,7 @@ func TestAddMutation_Value(t *testing.T) {
func benchmarkAddMutations(n int, b *testing.B) {
// logrus.SetLevel(logrus.DebugLevel)
var l List
l := NewList()
key := Key(1, "name")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
......
......@@ -27,10 +27,6 @@ import (
"github.com/dgryski/go-farm"
)
type entry struct {
l *List
}
type counters struct {
added uint64
merged uint64
......@@ -50,18 +46,14 @@ func (c *counters) periodicLog() {
}
}
var lmutex sync.RWMutex
var lcache map[uint64]*entry
var lmap *Map
var pstore *store.Store
var clog *commit.Logger
var ch chan uint64
var lc *lcounters
func Init(posting *store.Store, log *commit.Logger) {
lmutex.Lock()
defer lmutex.Unlock()
lcache = make(map[uint64]*entry)
lmap = NewMap(true)
pstore = posting
clog = log
ch = make(chan uint64, 10000)
......@@ -69,84 +61,44 @@ func Init(posting *store.Store, log *commit.Logger) {
go lc.periodicLog()
}
func get(k uint64) *List {
lmutex.RLock()
defer lmutex.RUnlock()
if e, ok := lcache[k]; ok {
return e.l
}
return nil
}
type lcounters struct {
hit uint64
miss uint64
misshit uint64
hit uint64
miss uint64
}
func (lc *lcounters) periodicLog() {
for _ = range time.Tick(10 * time.Second) {
glog.WithFields(logrus.Fields{
"hit": atomic.LoadUint64(&lc.hit),
"miss": atomic.LoadUint64(&lc.miss),
"misshit": atomic.LoadUint64(&lc.misshit),
"hit": atomic.LoadUint64(&lc.hit),
"miss": atomic.LoadUint64(&lc.miss),
}).Info("Lists counters")
}
}
func Get(key []byte) *List {
// Acquire read lock and check if list is available.
lmutex.RLock()
uid := farm.Fingerprint64(key)
if e, ok := lcache[uid]; ok {
lmutex.RUnlock()
atomic.AddUint64(&lc.hit, 1)
return e.l
}
lmutex.RUnlock()
// Couldn't find it. Acquire write lock.
lmutex.Lock()
defer lmutex.Unlock()
// Check again after acquiring write lock.
if e, ok := lcache[uid]; ok {
atomic.AddUint64(&lc.misshit, 1)
return e.l
}
atomic.AddUint64(&lc.miss, 1)
e := new(entry)
e.l = new(List)
e.l.init(key, pstore, clog)
lcache[uid] = e
return e.l
}
l, added := lmap.Get(uid)
if added {
atomic.AddUint64(&lc.miss, 1)
l.init(key, pstore, clog)
func queueForProcessing(c *counters) {
lmutex.RLock()
for eid, e := range lcache {
if len(ch) >= cap(ch) {
break
}
if e.l.IsDirty() {
ch <- eid
atomic.AddUint64(&c.added, 1)
}
} else {
atomic.AddUint64(&lc.hit, 1)
}
lmutex.RUnlock()
return l
}
func periodicQueueForProcessing(c *counters) {
ticker := time.NewTicker(time.Minute)
for _ = range ticker.C {
queueForProcessing(c)
lmap.StreamUntilCap(ch)
}
}
func process(c *counters, wg *sync.WaitGroup) {
for eid := range ch {
l := get(eid)
if l == nil {
l, added := lmap.Get(eid)
if l == nil || added {
continue
}
atomic.AddUint64(&c.merged, 1)
......@@ -167,13 +119,8 @@ func periodicProcess(c *counters) {
}
func queueAll(c *counters) {
lmutex.RLock()
for hid, _ := range lcache {
ch <- hid
atomic.AddUint64(&c.added, 1)
}
lmap.StreamAllKeys(ch)
close(ch)
lmutex.RUnlock()
}
func StartPeriodicMerging() {
......
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package posting
import (
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/Sirupsen/logrus"
)
// NBUCKETS is the fixed number of buckets a Map is split into. NewMap
// allocates this many buckets and Map.Get picks one with key % NBUCKETS.
const NBUCKETS = 32
// latency is a coarse histogram of call durations. update increments exactly
// one counter per sample and log prints all of them; both use sync/atomic to
// access the fields. The exact cut-offs between counters are decided in
// update; the ranges below are approximate.
type latency struct {
	n1   uint64 // fastest samples, around a microsecond or less
	u1   uint64 // samples in the microsecond range, up to about 10µs
	u10  uint64 // samples from about 10µs up to about 100µs
	u100 uint64 // samples from about 100µs up to about 1ms
	m1   uint64 // samples from about 1ms up to about 1s
	s1   uint64 // samples of about a second or longer
}
// bucket is one shard of a Map: a plain Go map from key to *List guarded by
// the embedded read-write mutex.
type bucket struct {
	sync.RWMutex
	m   map[uint64]*List // entries of this shard; get and the Map.Stream* methods touch it only with the lock held
	lat *latency         // optional histogram; get records its duration here and skips timing when this is nil
}
// update records the time elapsed since s by incrementing exactly one
// histogram counter.
//
// The counter is chosen by the largest threshold the elapsed time reaches:
// s1 for >= 1s, m1 for >= 1ms, u100 for >= 100µs, u10 for >= 10µs, u1 for
// >= 1µs, and n1 for anything shorter.
//
// The comparison is made on the full-resolution duration. Truncating to whole
// microseconds and comparing with a strict ">" would shift every boundary:
// a 1.5µs sample would be counted as sub-microsecond and a 1000.9µs sample
// as under a millisecond.
//
// Counters are incremented atomically, so update may be called from several
// goroutines at once.
func (l *latency) update(s time.Time) {
	elapsed := time.Since(s)
	switch {
	case elapsed >= time.Second:
		atomic.AddUint64(&l.s1, 1)
	case elapsed >= time.Millisecond:
		atomic.AddUint64(&l.m1, 1)
	case elapsed >= 100*time.Microsecond:
		atomic.AddUint64(&l.u100, 1)
	case elapsed >= 10*time.Microsecond:
		atomic.AddUint64(&l.u10, 1)
	case elapsed >= time.Microsecond:
		atomic.AddUint64(&l.u1, 1)
	default:
		atomic.AddUint64(&l.n1, 1)
	}
}
// log writes the current value of every histogram counter through glog once
// every five seconds, under the message "Lmap latency". It never returns, so
// it has to run on its own goroutine (NewMap starts it with `go`).
func (l *latency) log() {
	ticks := time.Tick(5 * time.Second)
	for {
		<-ticks
		snapshot := logrus.Fields{
			"n1":   atomic.LoadUint64(&l.n1),
			"u1":   atomic.LoadUint64(&l.u1),
			"u10":  atomic.LoadUint64(&l.u10),
			"u100": atomic.LoadUint64(&l.u100),
			"m1":   atomic.LoadUint64(&l.m1),
			"s1":   atomic.LoadUint64(&l.s1),
		}
		glog.WithFields(snapshot).Info("Lmap latency")
	}
}
// get returns the List stored under key, creating one with NewList and
// storing it when the key is absent. The boolean is true only for the call
// that created the entry.
//
// The lookup is first tried under the read lock. On a miss the write lock is
// taken and the key is checked again, because another goroutine may have
// inserted it between the two locks.
//
// When b.lat is set, the duration of the whole call, lock waits included, is
// recorded in it on return.
func (b *bucket) get(key uint64) (*List, bool) {
	if b.lat != nil {
		// time.Now() is evaluated here, at defer time, so it marks the start.
		defer b.lat.update(time.Now())
	}

	b.RLock()
	found, hit := b.m[key]
	b.RUnlock()
	if hit {
		return found, false
	}

	b.Lock()
	defer b.Unlock()
	if existing, ok := b.m[key]; ok {
		return existing, false
	}
	fresh := NewList()
	b.m[key] = fresh
	return fresh, true
}
// Map is a hash map from uint64 keys to *List that is split across a fixed
// set of buckets, each guarded by its own lock.
type Map struct {
	buckets []*bucket // NewMap allocates NBUCKETS of these; Get indexes them with key % NBUCKETS
}
// NewMap builds a Map with NBUCKETS empty buckets.
//
// When withLog is true, all buckets share one latency histogram and a
// goroutine is started that logs it periodically. Otherwise the buckets
// carry no histogram and nothing is started.
func NewMap(withLog bool) *Map {
	var hist *latency
	if withLog {
		hist = new(latency)
		go hist.log()
	}

	shards := make([]*bucket, NBUCKETS)
	for i := range shards {
		shards[i] = &bucket{
			m:   make(map[uint64]*List),
			lat: hist,
		}
	}
	return &Map{buckets: shards}
}
// Get returns the List for key, looking it up in (and if necessary adding it
// to) the bucket selected by key % NBUCKETS. The boolean is whatever that
// bucket's get reports: true when this call created the entry.
func (m *Map) Get(key uint64) (*List, bool) {
	shard := m.buckets[key%NBUCKETS]
	return shard.get(key)
}
// StreamUntilCap picks one bucket at random and sends its keys on ch until
// either the bucket is exhausted or ch cannot accept another key without
// blocking (buffer full, or for an unbuffered channel no receiver ready).
//
// The bucket's read lock is held for the whole iteration, so the send must
// never block: a blocked send would keep the lock and stall every writer on
// this bucket. Checking len(ch) against cap(ch) before a plain send is not
// enough, because another producer can fill the channel between the check
// and the send. A non-blocking send closes that window.
func (m *Map) StreamUntilCap(ch chan uint64) {
	b := m.buckets[rand.Intn(NBUCKETS)]
	b.RLock()
	defer b.RUnlock()
	for u := range b.m {
		select {
		case ch <- u:
		default:
			// No room left; stop rather than wait with the lock held.
			return
		}
	}
}
// StreamAllKeys sends every key of every bucket on ch, visiting the buckets
// in index order. Sends are blocking, and each bucket's read lock is held
// while that bucket's keys are being sent.
func (m *Map) StreamAllKeys(ch chan uint64) {
	for _, shard := range m.buckets {
		shard.RLock()
		for key := range shard.m {
			ch <- key
		}
		shard.RUnlock()
	}
}
/*
* Copyright 2015 Manish R Jain <manishrjain@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package posting
import (
"math/rand"
"testing"
)
// BenchmarkGet runs its loop body from parallel goroutines via b.RunParallel.
//
// As written it does not call Map.Get: the map construction and the lookup
// are commented out, so each iteration only draws a random number (which is
// discarded) and calls NewList.
func BenchmarkGet(b *testing.B) {
	// lmap := NewMap(false)
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			// i := uint64(rand.Int63())
			_ = uint64(rand.Int63())
			NewList()
			// lmap.Get(i)
		}
	})
}
// BenchmarkGetLinear inserts b.N sequential keys into a plain, unlocked
// map[uint64]*List from a single goroutine, calling NewList for every key
// that is not yet present.
// NOTE(review): presumably the single-threaded baseline for Map.Get; confirm.
func BenchmarkGetLinear(b *testing.B) {
	m := make(map[uint64]*List)
	for i := 0; i < b.N; i++ {
		k := uint64(i)
		if l, ok := m[k]; !ok {
			l = NewList()
			m[k] = l
		}
	}
}
// BenchmarkGetLinearBool is the same sequential check-then-insert loop as
// BenchmarkGetLinear, but over a map[uint64]bool, so no List is allocated
// and only the map operations themselves are timed.
func BenchmarkGetLinearBool(b *testing.B) {
	m := make(map[uint64]bool)
	for i := 0; i < b.N; i++ {
		k := uint64(i)
		if _, ok := m[k]; !ok {
			m[k] = true
		}
	}
}
Using a map made up of 32 buckets, with each bucket holding its own lock.
INFO[0166] Lmap latency
s1=24100
m1=37251
u100=2017
u10=2864
u1=109856
n1=55635
Based on benchmarks, a Get should take close to 1 microsecond.
In this case, u1 has 110K Gets, while u10 has only ~3000. So, that's good.
But we see a sudden jump to 37k Gets (16%), which took over 1 millisecond!
And 24k requests (~10%) took over a second to return. That's very erratic behaviour.
Overall, only 72% of Gets took under 10 microseconds.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment