/*
 * Copyright 2015 DGraph Labs, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * 		http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package posting

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/Sirupsen/logrus"
	"github.com/dgraph-io/dgraph/commit"
	"github.com/dgraph-io/dgraph/posting/types"
	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/x"
	"github.com/dgryski/go-farm"
	"github.com/google/flatbuffers/go"
	"github.com/zond/gotomic"
)

var glog = x.Log("posting")
var E_TMP_ERROR = fmt.Errorf("Temporary Error. Please retry.")

const Set = 0x01
const Del = 0x02

type buffer struct {
	d []byte
}

type MutationLink struct {
	idx     int
	moveidx int
	posting *types.Posting
}

type List struct {
	sync.RWMutex
	key         []byte
	ghash       gotomic.Hashable
	hash        uint32
	pbuffer     unsafe.Pointer
	pstore      *store.Store // postinglist store
	clog        *commit.Logger
	lastCompact time.Time
	wg          sync.WaitGroup
	deleteMe    bool

	// Mutations
	mlayer        map[int]types.Posting // stores only replace instructions.
	mdelta        int                   // len(plist) + mdelta = final length.
	maxMutationTs int64                 // Track maximum mutation ts.
	mindex        []*MutationLink
	dirtyTs       int64 // Use atomics for this.
}

func NewList() *List {
	l := new(List)
	l.wg.Add(1)
	l.mlayer = make(map[int]types.Posting)
	return l
}

type ByUid []*types.Posting

func (pa ByUid) Len() int           { return len(pa) }
func (pa ByUid) Swap(i, j int)      { pa[i], pa[j] = pa[j], pa[i] }
func (pa ByUid) Less(i, j int) bool { return pa[i].Uid() < pa[j].Uid() }

func samePosting(a *types.Posting, b *types.Posting) bool {
	if a.Uid() != b.Uid() {
		return false
	}
	if a.ValueLength() != b.ValueLength() {
		return false
	}
	if !bytes.Equal(a.ValueBytes(), b.ValueBytes()) {
		return false
	}
	if !bytes.Equal(a.Source(), b.Source()) {
		return false
	}
	return true
}

// key = (entity uid, attribute)
func Key(uid uint64, attr string) []byte {
	buf := bytes.NewBufferString(attr)
	buf.WriteRune('|')
	if err := binary.Write(buf, binary.LittleEndian, uid); err != nil {
		glog.Fatalf("Error while creating key with attr: %v uid: %v\n", attr, uid)
	}
	return buf.Bytes()
}

func newPosting(t x.DirectedEdge, op byte) []byte {
	b := flatbuffers.NewBuilder(0)
	var bo flatbuffers.UOffsetT
	if t.Value != nil {
		if t.ValueId != math.MaxUint64 {
			glog.Fatal("This should have already been set by the caller.")
		}
		bytes, err := json.Marshal(t.Value)
		if err != nil {
			glog.WithError(err).Fatal("Unable to marshal value")
			return []byte{}
		}
		bo = b.CreateByteVector(bytes)
	}
	so := b.CreateString(t.Source)
	types.PostingStart(b)
	if bo > 0 {
		types.PostingAddValue(b, bo)
	}
	types.PostingAddUid(b, t.ValueId)
	types.PostingAddSource(b, so)
	types.PostingAddTs(b, t.Timestamp.UnixNano())
	types.PostingAddOp(b, op)
	vend := types.PostingEnd(b)
	b.Finish(vend)

	return b.Bytes[b.Head():]
}

func addEdgeToPosting(b *flatbuffers.Builder,
	t x.DirectedEdge, op byte) flatbuffers.UOffsetT {

	var bo flatbuffers.UOffsetT
	if t.Value != nil {
		if t.ValueId != math.MaxUint64 {
			glog.Fatal("This should have already been set by the caller.")
		}
		bytes, err := json.Marshal(t.Value)
		if err != nil {
			glog.WithError(err).Fatal("Unable to marshal value")
			return 0
		}
		bo = b.CreateByteVector(bytes)
	}
	so := b.CreateString(t.Source) // Do this before posting start.

	types.PostingStart(b)
	if bo > 0 {
		types.PostingAddValue(b, bo)
	}
	types.PostingAddUid(b, t.ValueId)
	types.PostingAddSource(b, so)
	types.PostingAddTs(b, t.Timestamp.UnixNano())
	types.PostingAddOp(b, op)
	return types.PostingEnd(b)
}

func addPosting(b *flatbuffers.Builder, p types.Posting) flatbuffers.UOffsetT {
	so := b.CreateByteString(p.Source()) // Do this before posting start.
	var bo flatbuffers.UOffsetT
	if p.ValueLength() > 0 {
		bo = b.CreateByteVector(p.ValueBytes())
	}

	types.PostingStart(b)
	types.PostingAddUid(b, p.Uid())
	if bo > 0 {
		types.PostingAddValue(b, bo)
	}
	types.PostingAddSource(b, so)
	types.PostingAddTs(b, p.Ts())
	types.PostingAddOp(b, p.Op())
	return types.PostingEnd(b)
}

var empty []byte
var emptyPosting []byte

// package level init
func init() {
	{
		b := flatbuffers.NewBuilder(0)
		types.PostingListStart(b)
		of := types.PostingListEnd(b)
		b.Finish(of)
		empty = b.Bytes[b.Head():]
	}

	{
		b := flatbuffers.NewBuilder(0)
		types.PostingStart(b)
		types.PostingAddUid(b, 0)
		of := types.PostingEnd(b)
		b.Finish(of)
		emptyPosting = b.Bytes[b.Head():]
	}

	glog.Infof("Empty size: [%d] EmptyPosting size: [%d]",
		len(empty), len(emptyPosting))
}

func ParseValue(i *interface{}, value []byte) error {
	if len(value) == 0 {
		return errors.New("No value found in posting")
	}

	if len(value) == 1 && value[0] == 0x00 {
		i = nil
		return nil
	}

	return json.Unmarshal(value, i)
}

func (l *List) init(key []byte, pstore *store.Store, clog *commit.Logger) {
	l.Lock()
	defer l.Unlock()
	defer l.wg.Done()

	if len(empty) == 0 {
		glog.Fatal("empty should have some bytes.")
	}
	l.key = key
	l.pstore = pstore
	l.clog = clog

	posting := l.getPostingList()
	l.maxMutationTs = posting.CommitTs()
	l.hash = farm.Fingerprint32(key)
	l.ghash = gotomic.IntKey(farm.Fingerprint64(key))
	l.mlayer = make(map[int]types.Posting)

	if clog == nil {
		return
	}

	err := clog.StreamEntries(posting.CommitTs()+1, l.hash,
		func(hdr commit.Header, buffer []byte) {

			uo := flatbuffers.GetUOffsetT(buffer)
			m := new(types.Posting)
			m.Init(buffer, uo)
			if m.Ts() > l.maxMutationTs {
				l.maxMutationTs = m.Ts()
			}
			glog.WithFields(logrus.Fields{
				"uid":    m.Uid(),
				"source": string(m.Source()),
				"ts":     m.Ts(),
			}).Debug("Got entry from log")
			l.mergeMutation(m)
		})
	if err != nil {
		glog.WithError(err).Error("While streaming entries.")
	}
}

// There's no need for lock acquisition for this.
func (l *List) getPostingList() *types.PostingList {
	pb := atomic.LoadPointer(&l.pbuffer)
	buf := (*buffer)(pb)

	if buf == nil || len(buf.d) == 0 {
		nbuf := new(buffer)
		var err error
		if nbuf.d, err = l.pstore.Get(l.key); err != nil {
			// glog.Debugf("While retrieving posting list from db: %v\n", err)
			// Error. Just set to empty.
			nbuf.d = make([]byte, len(empty))
			copy(nbuf.d, empty)
		}
		if atomic.CompareAndSwapPointer(&l.pbuffer, pb, unsafe.Pointer(nbuf)) {
			return types.GetRootAsPostingList(nbuf.d, 0)

		} else {
			// Someone else replaced the pointer in the meantime. Retry recursively.
			return l.getPostingList()
		}
	}
	return types.GetRootAsPostingList(buf.d, 0)
}

// Caller must hold at least a read lock.
func (l *List) lePostingIndex(maxUid uint64) (int, uint64) {
	posting := l.getPostingList()
	left, right := 0, posting.PostingsLength()-1
	sofar := -1
	p := new(types.Posting)

	for left <= right {
		pos := (left + right) / 2
		if ok := posting.Postings(p, pos); !ok {
			glog.WithField("idx", pos).Fatal("Unable to parse posting from list.")
		}
		val := p.Uid()
		if val > maxUid {
			right = pos - 1
			continue
		}
		if val == maxUid {
			return pos, val
		}
		sofar = pos
		left = pos + 1
	}
	if sofar == -1 {
		return -1, 0
	}
	if ok := posting.Postings(p, sofar); !ok {
		glog.WithField("idx", sofar).Fatal("Unable to parse posting from list.")
	}
	return sofar, p.Uid()
}

func (l *List) leMutationIndex(maxUid uint64) (int, uint64) {
	left, right := 0, len(l.mindex)-1
	sofar := -1
	for left <= right {
		pos := (left + right) / 2
		m := l.mindex[pos]
		val := m.posting.Uid()
		if val > maxUid {
			right = pos - 1
			continue
		}
		if val == maxUid {
			return pos, val
		}
		sofar = pos
		left = pos + 1
	}
	if sofar == -1 {
		return -1, 0
	}
	return sofar, l.mindex[sofar].posting.Uid()
}

func (l *List) mindexInsertAt(mlink *MutationLink, mi int) {
	l.mindex = append(l.mindex, nil)
	copy(l.mindex[mi+1:], l.mindex[mi:])
	l.mindex[mi] = mlink
	for i := mi + 1; i < len(l.mindex); i++ {
		l.mindex[i].idx += 1
	}
}

func (l *List) mindexDeleteAt(mi int) {
	glog.WithField("mi", mi).WithField("size", len(l.mindex)).
		Debug("mindexDeleteAt")
	l.mindex = append(l.mindex[:mi], l.mindex[mi+1:]...)
	for i := mi; i < len(l.mindex); i++ {
		l.mindex[i].idx -= 1
	}
}

// mutationIndex (mindex) is useful to avoid having to parse the entire
// postinglist upto idx, for every Get(*types.Posting, idx), which has a
// complexity of O(idx). Iteration over N size posting list would this push
// us into O(N^2) territory, without this technique.
//
// Using this technique,
// we can overlay mutation layers over immutable posting list, to allow for
// O(m) lookups, where m = size of mutation list. Obviously, the size of
// mutation list should be much smaller than the size of posting list, except
// in tiny posting lists, where performance wouldn't be such a concern anyways.
//
// Say we have this data:
// Posting List (plist, immutable):
// idx:   0  1  2  3  4  5
// value: 2  5  9 10 13 15
//
// Mutation List (mlist):
// idx:          0   1   2
// value:        7  10  13' // posting uid is 13 but other values vary.
// Op:         SET DEL SET
// Effective:  ADD DEL REP  (REP = replace)
//
// ----------------------------------------------------------------------------
// regenerateIndex would generate these:
// mlayer (layer just above posting list contains only replace instructions)
// idx:          4
// value:       13'
// Op:       	 SET
// Effective:  REP  (REP = replace)
//
// mindex:
// idx:          2   4
// value:        7  10
// moveidx:     -1  +1
// Effective:  ADD DEL
//
// Now, let's see how the access would work:
// idx: get --> calculation [idx, served from, value]
// idx: 0 --> 0   [0, plist, 2]
// idx: 1 --> 1   [1, plist, 5]
// idx: 2 --> ADD from mindex
//        -->     [2, mindex, 7] // also has moveidx = -1
// idx: 3 --> 3 + moveidx=-1 = 2 [2, plist, 9]
// idx: 4 --> DEL from mindex
//        --> 4 + moveidx=-1 + moveidx=+1 = 4 [4, mlayer, 13']
// idx: 5 --> 5 + moveidx=-1 + moveidx=+1 = 5 [5, plist, 15]
//
// Thus we can provide mutation layers over immutable posting list, while
// still ensuring fast lookup access.
//
// NOTE: This function expects the caller to hold a RW Lock.
// Update: With mergeMutation function, we're adding mutations with a cost
// of O(log M + log N), where M = number of previous mutations, and N =
// number of postings in the immutable posting list.
func (l *List) mergeMutation(mp *types.Posting) {
	curUid := mp.Uid()
	pi, puid := l.lePostingIndex(curUid)  // O(log N)
	mi, muid := l.leMutationIndex(curUid) // O(log M)
	inPlist := puid == curUid

	// O(1) follows, but any additions or deletions from mindex would
	// be O(M) due to element shifting. In terms of benchmarks, this performs
	// a LOT better than when I was running O(N + M), re-generating mutation
	// flatbuffers, linked lists etc.
	mlink := new(MutationLink)
	mlink.posting = mp

	if mp.Op() == Del {
		if muid == curUid { // curUid found in mindex.
			if inPlist { // In plist, so replace previous instruction in mindex.
				mlink.moveidx = 1
				mlink.idx = pi + mi
				l.mindex[mi] = mlink

			} else { // Not in plist, so delete previous instruction in mindex.
				l.mdelta -= 1
				l.mindexDeleteAt(mi)
			}

		} else { // curUid not found in mindex.
			if inPlist { // In plist, so insert in mindex.
				mlink.moveidx = 1
				l.mdelta -= 1
				mlink.idx = pi + mi + 1
				l.mindexInsertAt(mlink, mi+1)

			} else {
				// Not found in plist, and not found in mindex. So, ignore.
			}
		}

	} else if mp.Op() == Set {
		if muid == curUid { // curUid found in mindex.
			if inPlist { // In plist, so delete previous instruction, set in mlayer.
				l.mindexDeleteAt(mi)
				l.mlayer[pi] = *mp

			} else { // Not in plist, so replace previous set instruction in mindex.
				// NOTE: This prev instruction couldn't have been a Del instruction.
				mlink.idx = pi + 1 + mi
				mlink.moveidx = -1
				l.mindex[mi] = mlink
			}

		} else { // curUid not found in mindex.
			if inPlist { // In plist, so just set it in mlayer.
				// If this posting matches what we already have in posting list,
				// we don't need to `dirty` this by adding to mlayer.
				plist := l.getPostingList()
				var cp types.Posting
				if ok := plist.Postings(&cp, pi); ok {
					if samePosting(&cp, mp) {
						return // do nothing.
					}
				}
				l.mlayer[pi] = *mp

			} else { // not in plist, not in mindex, so insert in mindex.
				mlink.moveidx = -1
				l.mdelta += 1
				mlink.idx = pi + 1 + mi + 1 // right of pi, and right of mi.
				l.mindexInsertAt(mlink, mi+1)
			}
		}

	} else {
		glog.WithField("op", mp.Op()).Fatal("Invalid operation.")
	}
}

// Caller must hold at least a read lock.
func (l *List) length() int {
	plist := l.getPostingList()
	return plist.PostingsLength() + l.mdelta
}

func (l *List) Length() int {
	l.wg.Wait()

	l.RLock()
	defer l.RUnlock()
	return l.length()
}

func (l *List) Get(p *types.Posting, i int) bool {
	l.wg.Wait()

	l.RLock()
	defer l.RUnlock()
	return l.get(p, i)
}

// Caller must hold at least a read lock.
func (l *List) get(p *types.Posting, i int) bool {
	plist := l.getPostingList()
	if len(l.mindex) == 0 {
		if val, ok := l.mlayer[i]; ok {
			*p = val
			return true
		}
		return plist.Postings(p, i)
	}

	// Iterate over mindex, and see if we have instructions
	// for the given index. Otherwise, sum up the move indexes
	// uptil the given index, so we know where to look in
	// mlayer and/or the main posting list.
	move := 0
	for _, mlink := range l.mindex {
		if mlink.idx > i {
			break

		} else if mlink.idx == i {
			// Found an instruction. Check what is says.
			if mlink.posting.Op() == Set {
				// ADD
				glog.WithField("idx", i).
					WithField("uid", mlink.posting.Uid()).
					WithField("source", string(mlink.posting.Source())).
					Debug("Returning from mlink")
				*p = *mlink.posting
				return true

			} else if mlink.posting.Op() == Del {
				// DELETE
				// The loop will break in the next iteration, after updating the move
				// variable.

			} else {
				glog.Fatal("Someone, I mean you, forgot to tackle" +
					" this operation. Stop drinking.")
			}
		}
		move += mlink.moveidx
	}
	newidx := i + move
	glog.WithFields(logrus.Fields{
		"newidx": newidx,
		"idx":    i,
		"move":   move,
	}).Debug("Final Indices")

	// Check if we have any replace instruction in mlayer.
	if val, ok := l.mlayer[newidx]; ok {
		*p = val
		return true
	}
	// Hit the main posting list.
	if newidx >= plist.PostingsLength() {
		return false
	}
	return plist.Postings(p, newidx)
}

func (l *List) SetForDeletion() {
	l.wg.Wait()
	l.Lock()
	defer l.Unlock()
	l.deleteMe = true
}

// In benchmarks, the time taken per AddMutation before was
// plateauing at 2.5 ms with sync per 10 log entries, and increasing
// for sync per 100 log entries (to 3 ms per AddMutation), largely because
// of how index generation was being done.
//
// With this change, the benchmarks perform as good as benchmarks for
// commit.Logger, where the less frequently file sync happens, the faster
// AddMutations run.
//
// PASS
// BenchmarkAddMutations_SyncEveryLogEntry-6    	     100	  24712455 ns/op
// BenchmarkAddMutations_SyncEvery10LogEntry-6  	     500	   2485961 ns/op
// BenchmarkAddMutations_SyncEvery100LogEntry-6 	   10000	    298352 ns/op
// BenchmarkAddMutations_SyncEvery1000LogEntry-6	   30000	     63544 ns/op
// ok  	github.com/dgraph-io/dgraph/posting	10.291s
func (l *List) AddMutation(t x.DirectedEdge, op byte) error {
	l.wg.Wait()
	l.Lock()
	defer l.Unlock()
	if l.deleteMe {
		return E_TMP_ERROR
	}

	if t.Timestamp.UnixNano() < l.maxMutationTs {
		return fmt.Errorf("Mutation ts lower than committed ts.")
	}

	// Mutation arrives:
	// - Check if we had any(SET/DEL) before this, stored in the mutation list.
	//		- If yes, then replace that mutation. Jump to a)
	// a)		check if the entity exists in main posting list.
	// 				- If yes, store the mutation.
	// 				- If no, disregard this mutation.

	// All edges with a value set, have the same uid. In other words,
	// an (entity, attribute) can only have one interface{} value.
	if t.Value != nil {
		t.ValueId = math.MaxUint64
	}
	if t.ValueId == 0 {
		return fmt.Errorf("ValueId cannot be zero.")
	}

	mbuf := newPosting(t, op)
	uo := flatbuffers.GetUOffsetT(mbuf)
	mpost := new(types.Posting)
	mpost.Init(mbuf, uo)

	glog.WithFields(logrus.Fields{
		"uid":    mpost.Uid(),
		"source": string(mpost.Source()),
		"ts":     mpost.Ts(),
	}).Debug("Add mutation")

	l.mergeMutation(mpost)
	l.maxMutationTs = t.Timestamp.UnixNano()
	if len(l.mindex)+len(l.mlayer) > 0 {
		atomic.StoreInt64(&l.dirtyTs, time.Now().UnixNano())
		if dirtymap != nil {
			dirtymap.Put(l.ghash, true)
		}
	}
	if l.clog == nil {
		return nil
	}
	return l.clog.AddLog(t.Timestamp.UnixNano(), l.hash, mbuf)
}

func (l *List) MergeIfDirty() (merged bool, err error) {
	if atomic.LoadInt64(&l.dirtyTs) == 0 {
		glog.WithField("dirty", false).Debug("Not Committing")
		return false, nil
	} else {
		glog.WithField("dirty", true).Debug("Committing")
	}
	return l.merge()
}

func (l *List) merge() (merged bool, rerr error) {
	l.wg.Wait()
	l.Lock()
	defer l.Unlock()

	if len(l.mindex)+len(l.mlayer) == 0 {
		atomic.StoreInt64(&l.dirtyTs, 0)
		return false, nil
	}

	var p types.Posting
	sz := l.length()
	b := flatbuffers.NewBuilder(0)
	offsets := make([]flatbuffers.UOffsetT, sz)
	for i := 0; i < sz; i++ {
		if ok := l.get(&p, i); !ok {
			glog.WithField("idx", i).Fatal("Unable to parse posting.")
		}
		offsets[i] = addPosting(b, p)
	}
	types.PostingListStartPostingsVector(b, sz)
	for i := len(offsets) - 1; i >= 0; i-- {
		b.PrependUOffsetT(offsets[i])
	}
	vend := b.EndVector(sz)

	types.PostingListStart(b)
	types.PostingListAddPostings(b, vend)
	types.PostingListAddCommitTs(b, l.maxMutationTs)
	end := types.PostingListEnd(b)
	b.Finish(end)

	if err := l.pstore.SetOne(l.key, b.Bytes[b.Head():]); err != nil {
		glog.WithField("error", err).Fatal("While storing posting list")
		return true, err
	}

	// Now reset the mutation variables.
	atomic.StorePointer(&l.pbuffer, nil) // Make prev buffer eligible for GC.
	atomic.StoreInt64(&l.dirtyTs, 0)     // Set as clean.
	l.lastCompact = time.Now()
	l.mlayer = make(map[int]types.Posting)
	l.mdelta = 0
	l.mindex = nil
	return true, nil
}

func (l *List) LastCompactionTs() time.Time {
	l.RLock()
	defer l.RUnlock()
	return l.lastCompact
}

func (l *List) GetUids(offset, count int) []uint64 {
	l.wg.Wait()
	l.RLock()
	defer l.RUnlock()

	if offset < 0 {
		glog.WithField("offset", offset).Fatal("Unexpected offset")
		return make([]uint64, 0)
	}

	if count < 0 {
		count = 0 - count
		offset = l.length() - count
	}

	if count == 0 {
		count = l.length() - offset

	} else if count > l.length()-offset {
		count = l.length() - offset
	}
	if count < 0 {
		count = 0
	}

	result := make([]uint64, count)
	result = result[:0]
	var p types.Posting
	for i := offset; i < count+offset && i < l.length(); i++ {
		if ok := l.get(&p, i); !ok || p.Uid() == math.MaxUint64 {
			break
		}
		result = append(result, p.Uid())
	}
	return result
}

func (l *List) Value() (result []byte, rerr error) {
	l.wg.Wait()
	l.RLock()
	defer l.RUnlock()

	if l.length() == 0 {
		return result, fmt.Errorf("No value found")
	}

	var p types.Posting
	if ok := l.get(&p, l.length()-1); !ok {
		return result, fmt.Errorf("Unable to get last posting")
	}
	if p.Uid() != math.MaxUint64 {
		return result, fmt.Errorf("No value found")
	}
	return p.ValueBytes(), nil
}