Skip to content
Snippets Groups Projects
Unverified Commit b69edeba authored by Ashwin Ramesh's avatar Ashwin Ramesh
Browse files

Use IntersectWith in IntersectSorted

parent ef1628b1
No related branches found
No related tags found
No related merge requests found
......@@ -5,7 +5,6 @@ import (
"sort"
"github.com/dgraph-io/dgraph/protos/taskp"
"github.com/dgraph-io/dgraph/x"
)
const jump = 32 // Jump size in IntersectWithJump.
......@@ -21,83 +20,82 @@ func ApplyFilter(u *taskp.List, f func(uint64, int) bool) {
u.Uids = out
}
func IntersectWith(u, v *taskp.List) {
// IntersectWith intersects u with v and stores the result in o.
// u and v must be sorted. o may be the same list as u.
func IntersectWith(u, v, o *taskp.List) {
n := len(u.Uids)
m := len(v.Uids)
if n > m {
n, m = m, n
}
if o.Uids == nil {
o.Uids = make([]uint64, 0, n)
}
dst := o.Uids[:0]
if n == 0 {
n += 1
}
// Select appropriate function based on heuristics.
ratio := float64(m) / float64(n)
if ratio < 100 {
IntersectWithLin(u, v)
IntersectWithLin(u.Uids, v.Uids, &dst)
} else if ratio < 500 {
IntersectWithJump(u, v)
IntersectWithJump(u.Uids, v.Uids, &dst)
} else {
IntersectWithBin(u, v)
IntersectWithBin(u.Uids, v.Uids, &dst)
}
o.Uids = dst
}
// IntersectWith intersects u with v. The update is made to u.
// u, v should be sorted.
func IntersectWithLin(u, v *taskp.List) {
out := u.Uids[:0]
n := len(u.Uids)
m := len(v.Uids)
func IntersectWithLin(u, v []uint64, o *[]uint64) {
n := len(u)
m := len(v)
for i, k := 0, 0; i < n && k < m; {
uid := u.Uids[i]
vid := v.Uids[k]
uid := u[i]
vid := v[k]
if uid > vid {
for k = k + 1; k < m && v.Uids[k] < uid; k++ {
for k = k + 1; k < m && v[k] < uid; k++ {
}
} else if uid == vid {
out = append(out, uid)
*o = append(*o, uid)
k++
i++
} else {
for i = i + 1; i < n && u.Uids[i] < vid; i++ {
for i = i + 1; i < n && u[i] < vid; i++ {
}
}
}
u.Uids = out
}
func IntersectWithJump(u, v *taskp.List) {
out := u.Uids[:0]
n := len(u.Uids)
m := len(v.Uids)
func IntersectWithJump(u, v []uint64, o *[]uint64) {
n := len(u)
m := len(v)
for i, k := 0, 0; i < n && k < m; {
uid := u.Uids[i]
vid := v.Uids[k]
uid := u[i]
vid := v[k]
if uid == vid {
out = append(out, uid)
*o = append(*o, uid)
k++
i++
} else if k+jump < m && uid > v.Uids[k+jump] {
} else if k+jump < m && uid > v[k+jump] {
k = k + jump
} else if i+jump < n && vid > u.Uids[i+jump] {
} else if i+jump < n && vid > u[i+jump] {
i = i + jump
} else if uid > vid {
for k = k + 1; k < m && v.Uids[k] < uid; k++ {
for k = k + 1; k < m && v[k] < uid; k++ {
}
} else {
for i = i + 1; i < n && u.Uids[i] < vid; i++ {
for i = i + 1; i < n && u[i] < vid; i++ {
}
}
}
u.Uids = out
}
// IntersectWithBin is based on the paper
// "Fast Intersection Algorithms for Sorted Sequences"
// https://link.springer.com/chapter/10.1007/978-3-642-12476-1_3
func IntersectWithBin(u, v *taskp.List) {
d := u.Uids
q := v.Uids
func IntersectWithBin(d, q []uint64, o *[]uint64) {
ld := len(d)
lq := len(q)
......@@ -105,9 +103,7 @@ func IntersectWithBin(u, v *taskp.List) {
ld, lq = lq, ld
d, q = q, d
}
final := make([]uint64, 0, lq)
if ld == 0 || lq == 0 || d[ld-1] < q[0] || q[lq-1] < d[0] {
u.Uids = final
return
}
......@@ -121,8 +117,7 @@ func IntersectWithBin(u, v *taskp.List) {
return q[i] > val
})
binIntersect(d, q[minq:maxq], &final)
u.Uids = final
binIntersect(d, q[minq:maxq], o)
}
// binIntersect is the recursive function used.
......@@ -163,76 +158,43 @@ func binIntersect(d, q []uint64, final *[]uint64) {
}
}
// IntersectSorted intersects a list of UIDLists. An alternative is to do
// pairwise intersections n-1 times where n=number of lists. This is less
// efficient:
// Let p be length of shortest list. Let q be average length of lists. So
// nq = total number of elements.
// There are many possible cases. Consider the case where the shortest list
// is the answer (or close to the answer). The following method requires nq
// reads (each element is read only once) whereas pairwise intersections can
// require np + nq - p reads, which can be up to ~twice as many.
type listInfo struct {
l *taskp.List
length int
}
func IntersectSorted(lists []*taskp.List) *taskp.List {
if len(lists) == 0 {
return new(taskp.List)
return &taskp.List{}
}
// Scan through the smallest list. Denote as A.
// For each x in A,
// For each other list B,
// Keep popping elements until we get a y >= x.
// If y > x, mark x as "skipped". Break out of loop.
// If x is not marked as "skipped", append x to result.
var minLenIdx int
minLen := len(lists[0].Uids)
for i := 1; i < len(lists); i++ { // Start from 1.
l := lists[i]
n := len(l.Uids)
if n < minLen {
minLen = n
minLenIdx = i
}
ls := make([]listInfo, 0, len(lists))
for _, list := range lists {
ls = append(ls, listInfo{
l: list,
length: len(list.Uids),
})
}
// Sort the lists based on length.
sort.Slice(ls, func(i, j int) bool {
return ls[i].length < ls[j].length
})
out := &taskp.List{Uids: make([]uint64, ls[0].length)}
if len(ls) == 1 {
copy(out.Uids, ls[0].l.Uids)
return out
}
// Our final output. Give it some capacity.
output := make([]uint64, 0, minLen)
// lptrs[j] is the element we are looking at for lists[j].
lptrs := make([]int, len(lists))
shortList := lists[minLenIdx]
elemsLeft := true // If some list has no elems left, we can't intersect more.
for i := 0; i < len(shortList.Uids) && elemsLeft; i++ {
val := shortList.Uids[i]
if i > 0 && val == shortList.Uids[i-1] {
x.AssertTruef(false, "We shouldn't have duplicates in UIDLists")
}
var skip bool // Should we skip val in output?
for j := 0; j < len(lists); j++ { // For each other list in lists.
if j == minLenIdx {
// No point checking yourself.
continue
}
lj := lists[j]
ljp := lptrs[j]
lsz := len(lj.Uids)
for ; ljp < lsz && lj.Uids[ljp] < val; ljp++ {
}
lptrs[j] = ljp
if ljp >= lsz || lj.Uids[ljp] > val {
elemsLeft = ljp < lsz
skip = true
break
}
// Otherwise, lj.Get(ljp) = val and we continue checking other lists.
}
if !skip {
output = append(output, val)
IntersectWith(ls[0].l, ls[1].l, out)
// Intersect from smallest to largest.
for i := 2; i < len(ls); i++ {
IntersectWith(out, ls[i].l, out)
// Stop as soon as the intersection is empty: intersecting with
// further lists can never add an element.
if len(out.Uids) == 0 {
break
}
}
return &taskp.List{Uids: output}
return out
}
func Difference(u, v *taskp.List) {
......
......@@ -115,7 +115,7 @@ func TestIntersectSorted1(t *testing.T) {
newList([]uint64{1, 2, 3}),
newList([]uint64{2, 3, 4, 5}),
}
require.Equal(t, IntersectSorted(input).Uids, []uint64{2, 3})
require.Equal(t, []uint64{2, 3}, IntersectSorted(input).Uids)
}
func TestIntersectSorted2(t *testing.T) {
......@@ -176,14 +176,14 @@ func TestSubSorted6(t *testing.T) {
func TestUIDListIntersect1(t *testing.T) {
u := newList([]uint64{1, 2, 3})
v := newList([]uint64{})
IntersectWith(u, v)
IntersectWith(u, v, u)
require.Empty(t, u.Uids)
}
func TestUIDListIntersect2(t *testing.T) {
u := newList([]uint64{1, 2, 3})
v := newList([]uint64{1, 2, 3, 4, 5})
IntersectWith(u, v)
IntersectWith(u, v, u)
require.Equal(t, []uint64{1, 2, 3}, u.Uids)
require.Equal(t, []uint64{1, 2, 3, 4, 5}, v.Uids)
}
......@@ -191,7 +191,7 @@ func TestUIDListIntersect2(t *testing.T) {
func TestUIDListIntersect3(t *testing.T) {
u := newList([]uint64{1, 2, 3})
v := newList([]uint64{2})
IntersectWith(u, v)
IntersectWith(u, v, u)
require.Equal(t, []uint64{2}, u.Uids)
require.Equal(t, []uint64{2}, v.Uids)
}
......@@ -199,7 +199,7 @@ func TestUIDListIntersect3(t *testing.T) {
func TestUIDListIntersect4(t *testing.T) {
u := newList([]uint64{1, 2, 3})
v := newList([]uint64{0, 5})
IntersectWith(u, v)
IntersectWith(u, v, u)
require.Empty(t, u.Uids)
require.Equal(t, []uint64{0, 5}, v.Uids)
}
......@@ -207,28 +207,28 @@ func TestUIDListIntersect4(t *testing.T) {
func TestUIDListIntersect5(t *testing.T) {
u := newList([]uint64{1, 2, 3})
v := newList([]uint64{3, 5})
IntersectWith(u, v)
IntersectWith(u, v, u)
require.Equal(t, []uint64{3}, u.Uids)
}
func TestUIDListIntersectDupFirst(t *testing.T) {
u := newList([]uint64{1, 1, 2, 3})
v := newList([]uint64{1, 2})
IntersectWith(u, v)
IntersectWith(u, v, u)
require.Equal(t, []uint64{1, 2}, u.Uids)
}
func TestUIDListIntersectDupBoth(t *testing.T) {
u := newList([]uint64{1, 1, 2, 3, 5})
v := newList([]uint64{1, 1, 2, 4})
IntersectWith(u, v)
IntersectWith(u, v, u)
require.Equal(t, []uint64{1, 1, 2}, u.Uids)
}
func TestUIDListIntersectDupSecond(t *testing.T) {
u := newList([]uint64{1, 2, 3, 5})
v := newList([]uint64{1, 1, 2, 4})
IntersectWith(u, v)
IntersectWith(u, v, u)
require.Equal(t, []uint64{1, 2}, u.Uids)
}
......@@ -258,7 +258,7 @@ func runIntersectRandom(arrSz int, limit int64, b *testing.B) {
b.ResetTimer()
for k := 0; k < b.N; k++ {
IntersectWith(u, v)
IntersectWith(u, v, u)
u.Uids = u.Uids[:arrSz]
copy(u.Uids, ucopy)
}
......@@ -317,7 +317,7 @@ func BenchmarkListIntersectRatio(b *testing.B) {
for k := 0; k < b.N; k++ {
u.Uids = u.Uids[:sz1]
copy(u.Uids, ucopy)
IntersectWith(u, v)
IntersectWith(u, v, u)
}
})
}
......
......@@ -914,7 +914,7 @@ func populateVarMap(sg *SubGraph, doneVars map[string]*taskp.List, isCascade boo
// Intersect the UidMatrix with the DestUids as some UIDs might have been removed
// by other operations. So we need to apply it on the UidMatrix.
for _, l := range child.uidMatrix {
algo.IntersectWith(l, child.DestUIDs)
algo.IntersectWith(l, child.DestUIDs, l)
}
}
......@@ -994,7 +994,7 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
// If it's an id() filter, we just have to intersect the SrcUIDs with DestUIDs
// and return.
sg.fillVars(sg.Params.ParentVars)
algo.IntersectWith(sg.DestUIDs, sg.SrcUIDs)
algo.IntersectWith(sg.DestUIDs, sg.SrcUIDs, sg.DestUIDs)
rch <- nil
return
}
......@@ -1119,7 +1119,7 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
for i, ul := range sg.uidMatrix {
// A possible optimization is to return the size of the intersection
// without forming the intersection.
algo.IntersectWith(ul, sg.DestUIDs)
algo.IntersectWith(ul, sg.DestUIDs, ul)
sg.counts[i] = uint32(len(ul.Uids))
}
rch <- nil
......@@ -1184,7 +1184,7 @@ func (sg *SubGraph) applyPagination(ctx context.Context) error {
return nil
}
for i := 0; i < len(sg.uidMatrix); i++ { //_, l := range sg.uidMatrix {
algo.IntersectWith(sg.uidMatrix[i], sg.DestUIDs)
algo.IntersectWith(sg.uidMatrix[i], sg.DestUIDs, sg.uidMatrix[i])
start, end := pageRange(&sg.Params, len(sg.uidMatrix[i].Uids))
sg.uidMatrix[i].Uids = sg.uidMatrix[i].Uids[start:end]
}
......
......@@ -78,7 +78,7 @@ func (start *SubGraph) expandRecurse(ctx context.Context,
for mIdx, fromUID := range sg.SrcUIDs.Uids {
if len(sg.Filters) > 0 {
// We need to do this in case we had some filters.
algo.IntersectWith(sg.uidMatrix[mIdx], sg.DestUIDs)
algo.IntersectWith(sg.uidMatrix[mIdx], sg.DestUIDs, sg.uidMatrix[mIdx])
}
algo.ApplyFilter(sg.uidMatrix[mIdx], func(uid uint64, i int) bool {
key := fmt.Sprintf("%s|%d|%d", sg.Attr, fromUID, uid)
......
......@@ -350,7 +350,7 @@ func processTask(q *taskp.Query, gid uint32) (*taskp.Result, error) {
filtered := types.FilterGeoUids(uids, values, srcFn.geoQuery)
for i := 0; i < len(out.UidMatrix); i++ {
algo.IntersectWith(out.UidMatrix[i], filtered)
algo.IntersectWith(out.UidMatrix[i], filtered, out.UidMatrix[i])
}
}
out.IntersectDest = srcFn.intersectDest
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment