1 // Ryzom - MMORPG Framework <http://dev.ryzom.com/projects/ryzom/>
2 // Copyright (C) 2010 Winch Gate Property Limited
4 // This program is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU Affero General Public License as
6 // published by the Free Software Foundation, either version 3 of the
7 // License, or (at your option) any later version.
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU Affero General Public License for more details.
14 // You should have received a copy of the GNU Affero General Public License
15 // along with this program. If not, see <http://www.gnu.org/licenses/>.
17 #include "pds_table_buffer.h"
19 #include "nel/misc/time_nl.h"
26 uint32
CTableBuffer::_MaxRefFileSize
= 32768; //128*1024*1024;
29 uint32
CTableBuffer::_CommonStamp
= 0;
// Constructor.
// NOTE(review): the body is not visible in this listing.
34 CTableBuffer::CTableBuffer()
// Destructor.
// NOTE(review): the body is not visible in this listing.
42 CTableBuffer::~CTableBuffer()
// Clear the buffer.
// NOTE(review): the body is not visible in this listing; confirm what state is reset.
50 void CTableBuffer::clear()
// Initialise the buffer from a table id, a row size and a mapped flag.
// NOTE(review): only part of the body is visible here; how tableId, rowSize
// and mapped are stored is not shown.
70 void CTableBuffer::init(uint32 tableId
, uint32 rowSize
, bool mapped
)
// the in-memory row is the user row plus the header (getHeaderSize())
77 _InternalRowSize
= _RowSize
+ getHeaderSize();
78 // compute maximum number of rows a ref file will contain
79 if (!initRowsPerFile())
// Initialise the buffer from a table id and reference paths.
// The visible part of the body stores refRootPath in _RefRootPath.
// NOTE(review): the handling of tableId and refPath is not visible in this listing.
88 void CTableBuffer::init(uint32 tableId
, const std::string
& refRootPath
, const std::string
& refPath
) //CRefIndex& ref)
94 _RefRootPath
= refRootPath
;
// Point the buffer at a reference index: copies the reference's root path and
// path, and records its timestamp (converted with toTime()) as _ReferenceStamp.
// NOTE(review): the statements that follow the two trailing comments are not
// visible in this listing.
103 void CTableBuffer::setupRef(CRefIndex
& ref
)
107 _RefRootPath
= ref
.getRootPath();
108 _RefPath
= ref
.getPath();
// truncated to 32 bits by the uint32 conversion
109 _ReferenceStamp
= uint32(ref
.Timestamp
.toTime());
111 // clear all ref files
114 // release all rows that can be released
122 * Return a pointer to straight row data
123 * If row is not loaded, data are loaded from table file
// Return an accessor on the given row.
// The row is looked up in _RowMap; when it is not there, a buffer of
// _InternalRowSize bytes is allocated with new[], inserted in the map and
// filled by loadRow(). The accessor is built from the map iterator and _Mapped.
// NOTE(review): the branch taken when the row is already in the map is not
// visible in this listing.
125 CTableBuffer::CAccessor
CTableBuffer::getRow(RY_PDS::TRowIndex row
)
129 TRowData rowData
= NULL
;
130 TRowMap::iterator it
= _RowMap
.find(row
);
132 PDS_FULL_DEBUG("getRow(): row '%d'", row
);
// row not cached yet
134 if (it
== _RowMap
.end())
// allocated with new[] -- must be freed with delete[]
136 rowData
= new uint8
[_InternalRowSize
];
138 it
= _RowMap
.insert(TRowMap::value_type(row
, rowData
)).first
;
// NOTE(review): the return value of loadRow() is not used on the visible lines
139 loadRow(row
, rowData
);
144 return CAccessor(it
, _Mapped
);
148 * Return a pointer to straight new row data
149 * Row is filled with blank, not loaded from file
// Return an accessor on a blank row.
// When the row is not in _RowMap, a buffer of _InternalRowSize bytes is
// allocated, inserted in the map and zero-filled (nothing is read from file).
// A warning is logged for a row that already exists (presumably in the else
// branch, which is not fully visible in this listing).
151 CTableBuffer::CAccessor
CTableBuffer::getEmptyRow(RY_PDS::TRowIndex row
)
155 TRowData rowData
= NULL
;
156 TRowMap::iterator it
= _RowMap
.find(row
);
158 if (it
== _RowMap
.end())
// allocated with new[] -- must be freed with delete[]
160 rowData
= new uint8
[_InternalRowSize
];
161 it
= _RowMap
.insert(TRowMap::value_type(row
, rowData
)).first
;
// blank the whole internal row, header included
162 memset(rowData
, 0, _InternalRowSize
);
166 PDS_WARNING("getEmptyRow(): row '%d' already exists! data might be lost if buffer modified!", row
);
171 return CAccessor(it
, _Mapped
);
176 * Row is marked as non releasable until it is released enough time
// Acquire the row designated by an accessor: forwards the accessor's map
// iterator to the iterator-based acquireRow() overload and returns its result.
178 bool CTableBuffer::acquireRow(CAccessor accessor
)
182 return acquireRow(accessor
._MapIt
);
187 * Row is marked as purgeable, and will be purged as soon as possible
188 * unless it is reactivated before purge
// Release the row designated by an accessor: forwards the accessor's map
// iterator to the iterator-based releaseRow() overload and returns its result.
190 bool CTableBuffer::releaseRow(CAccessor accessor
)
194 return releaseRow(accessor
._MapIt
);
199 * Row is marked as purgeable, and will be purged as soon as possible
200 * unless it is reactivated before purge
// Release a row given its index: looks the row up in _RowMap and forwards the
// iterator to the iterator-based releaseRow() overload.
// NOTE(review): what happens when the row is not in the map is not visible here.
202 bool CTableBuffer::releaseRow(RY_PDS::TRowIndex row
)
206 TRowMap::iterator it
= _RowMap
.find(row
);
// row is not loaded
208 if (it
== _RowMap
.end())
213 return releaseRow(it
);
// Release every row held in _RowMap, calling releaseRow() with its second
// argument set to true.
// NOTE(review): the loop has no increment clause; the statement that advances
// `it` and sets `itr` is not visible in this listing.
219 bool CTableBuffer::releaseAll()
221 TRowMap::iterator it
, itr
;
223 for (it
=_RowMap
.begin(); it
!=_RowMap
.end(); )
225 // force all rows as unacquired
227 releaseRow(itr
, true);
239 * Load a row from the appropriate reference file
// Load a row from its reference file into rowData.
// The reference file index is row / _RowsPerFile; the row is read through
// _RefFileMap[file], then the header's dirt stamp and acquire count are cleared.
// NOTE(review): the "check reference is ready" step and the failure handling
// are not visible in this listing.
241 bool CTableBuffer::loadRow(RY_PDS::TRowIndex row
, TRowData rowData
)
245 // locate reference file
246 uint32 file
= row
/ _RowsPerFile
;
248 // check reference is ready
252 CDBReferenceFile
& refFile
= *(_RefFileMap
[file
]);
254 if (!refFile
.read(row
, rowData
))
257 // row comes from reference
258 // clear Dirty and AcquireCount, as row was loaded from reference
// the row buffer starts with its CHeader
259 CHeader
* hdr
= (CHeader
*)rowData
;
261 hdr
->clearDirtStamp();
262 hdr
->clearAcquireCount();
271 * Update in file the whole row
// Update a whole row from an accessor: forwards the accessor's row index and
// full row buffer to the index-based overload, forcing the write to disk.
273 bool CTableBuffer::updateRow(CAccessor accessor
)
275 return updateRow(accessor
.row(), accessor
.fullRow(), true);
// Update a row from a raw buffer.
// When the row is not cached in _RowMap, or when forceWriteToDisk is set, the
// row is written to its reference file (index row / _RowsPerFile). When the row
// is cached, its dirt stamp is cleared in rowData and rowData is copied over
// the cached buffer (_InternalRowSize bytes).
// NOTE(review): failure handling and the final return are not visible here.
281 bool CTableBuffer::updateRow(RY_PDS::TRowIndex row
, const TRowData rowData
, bool forceWriteToDisk
)
285 TRowMap::iterator it
= _RowMap
.find(row
);
287 // if row is not mapped in ram or if disk write is forced
288 if (it
== _RowMap
.end() || forceWriteToDisk
)
290 // locate reference file
291 uint32 file
= row
/ _RowsPerFile
;
292 // check reference is ready
297 CDBReferenceFile
& refFile
= *(_RefFileMap
[file
]);
298 if (!refFile
.update(row
, rowData
))
302 // if row is mapped in ram
303 if (it
!= _RowMap
.end())
305 // get row data buffer
306 TRowData dest
= (*it
).second
;
308 // row is clean and warm from reference file
// note: this writes into the caller's rowData buffer before the copy
309 ((CHeader
*)rowData
)->clearDirtStamp();
312 memcpy(dest
, rowData
, _InternalRowSize
);
319 * Mark a Row as being dirty for later delta save
// Mark a row as dirty so it is included in a later delta.
// The accessor is appended to _DirtyList (presumably only when the header is
// not already dirty -- the lines between the test and the push_back are not
// visible in this listing).
321 bool CTableBuffer::dirtyRow(CAccessor accessor
)
325 CHeader
* header
= (CHeader
*)accessor
.fullRow();
327 // check row not dirty already
328 if (!header
->dirty())
332 _DirtyList
.push_back(accessor
);
// Write all rows of _DirtyList into a delta file.
// The file name comes from CDBDeltaFile::getDeltaFileName(_TableId, endtimestamp)
// and the file is set up under _RefRootPath + "seconds". Each dirty row has its
// Dirty flag cleared before being written.
// NOTE(review): the declarations of `delta` and `i`, the early-out and error
// paths, and any cleanup of _DirtyList are not visible in this listing.
341 bool CTableBuffer::buildDelta(const CTimestamp
& starttimestamp
, const CTimestamp
& endtimestamp
)
345 // check there is something to write
346 if (_DirtyList
.empty())
351 std::string deltaFilename
= CDBDeltaFile::getDeltaFileName(_TableId
, endtimestamp
);
352 delta
.setup(deltaFilename
, _RefRootPath
+"seconds", _InternalRowSize
, starttimestamp
, endtimestamp
);
// start and end delta ids are both the current delta id
355 delta
.setDeltaIds(_CurrentDeltaId
, _CurrentDeltaId
);
359 // go through all dirty rows
360 for (i
=0; i
<_DirtyList
.size(); ++i
)
362 RY_PDS::TRowIndex row
= _DirtyList
[i
].row();
363 TRowData data
= _DirtyList
[i
].fullRow();
// row is written to the delta without its Dirty flag
366 ((CHeader
*)data
)->clearFlags(CHeader::Dirty
);
368 if (!delta
.write(row
, data
))
// full path is rebuilt only for the debug message
376 std::string filename
= _RefRootPath
+"seconds/"+deltaFilename
;
377 PDS_DEBUG("buildDelta(): built table delta '%s', %d rows written", filename
.c_str(), i
);
383 * Apply delta changes from a file
// Apply the rows stored in a delta file to the reference data.
// The delta is preloaded, its row size is adopted when _InternalRowSize is 0
// (or checked against it otherwise), its delta ids are propagated through
// updateDeltaIds(), then rows are read one by one and written with
// updateRow(index, data, true). An index of 0xffffffff is tested after each
// read (presumably the end-of-file marker -- confirm).
// NOTE(review): the declarations of `delta` and `index`, the loop header and
// the return statements are not visible in this listing.
385 bool CTableBuffer::applyDeltaChanges(const std::string
& filename
)
389 delta
.setup(filename
, _InternalRowSize
, CTimestamp(), CTimestamp());
391 if (!delta
.preload())
393 PDS_WARNING("applyDeltaChanges(): failed to preload file '%s'", filename
.c_str());
397 // internal row size not set, get it from delta file
398 if (_InternalRowSize
== 0)
400 // get row size from delta
401 _InternalRowSize
= delta
.getRowSize();
402 // recompute good number of rows per file
405 else if (_InternalRowSize
!= delta
.getRowSize())
407 PDS_WARNING("applyDeltaChanges(): delta file '%s' has mismatching row size (%d bytes expected, %d bytes found)", filename
.c_str(), _InternalRowSize
, delta
.getRowSize());
411 uint32 startDeltaId
, endDeltaId
;
412 delta
.getDeltaIds(startDeltaId
, endDeltaId
);
414 if (!updateDeltaIds(startDeltaId
, endDeltaId
))
416 PDS_WARNING("applyDeltaChanges(): failed to update delta ids from file '%s'", filename
.c_str());
// scratch buffer holding one internal row
421 std::vector
<uint8
> buffer(_InternalRowSize
);
422 uint8
* data
= &(buffer
[0]);
424 // read data from delta till the end
427 // read data from delta file
428 if (!delta
.read(index
, data
))
430 PDS_WARNING("applyDeltaChanges(): failed to load next row from delta file '%s'", filename
.c_str());
434 if (index
== 0xffffffff)
437 // update disk data with read buffer
438 if (!updateRow(index
, data
, true))
440 PDS_WARNING("applyDeltaChanges(): failed to update row '%d' data from file '%s'", index
, filename
.c_str());
450 * Flush Released Rows from memory
// Flush released rows from memory.
// Walks _ReleaseSet; for each entry:
//  - a row no longer present in _RowMap is only removed from the set;
//  - a row that is still (or again) acquired is bypassed with a warning and
//    removed from the set;
//  - a row that is dirty, or whose dirt stamp is >= _ReferenceStamp, is tested
//    separately (the handling of that case is not visible in this listing);
//  - otherwise the row buffer is freed and the entry removed from the set.
// Fixes applied on the visible lines:
//  - the "really released" test was inverted (it bypassed released rows and let
//    acquired rows reach the delete);
//  - row buffers are allocated with new uint8[...], so they are freed with
//    delete[] instead of scalar delete.
// NOTE(review): braces, `continue` statements and the _RowMap erase are not
// visible in this listing.
452 void CTableBuffer::flushReleased()
454 TReleaseSet::iterator it
;
456 // go through all released rows
457 for (it
=_ReleaseSet
.begin(); it
!=_ReleaseSet
.end(); )
460 TRowMap::iterator rit
= _RowMap
.find(*it
);
461 if (rit
== _RowMap
.end())
463 PDS_WARNING("flushReleased(): row '%d' not present, already released?", *it
);
464 // anyway, remove from released set
// advance before erasing so `it` stays valid
465 TReleaseSet::iterator itr
= (it
++);
466 _ReleaseSet
.erase(itr
);
470 // check row has really been released
471 CHeader
* header
= (CHeader
*)(*rit
).second
;
// a row that is acquired must not be flushed
472 if (header
->isAcquired())
474 PDS_WARNING("flushReleased(): try to release row '%d' not flagged as being released, bypassed", *it
);
475 // remove from release set
476 TReleaseSet::iterator itr
= (it
++);
477 _ReleaseSet
.erase(itr
);
481 // the row may have been released and dirtied at the same time
482 // don't flush it, since it has not been deltaed...
483 if (header
->dirty() || header
->getDirtStamp() >= _ReferenceStamp
)
// buffer comes from new uint8[_InternalRowSize]
490 delete[] (*rit
).second
;
494 TReleaseSet::iterator itr
= (it
++);
495 _ReleaseSet
.erase(itr
);
// Clear the Dirty flag in the header of every row currently held in _RowMap.
// NOTE(review): statements before the loop (if any) are not visible here.
502 void CTableBuffer::resetDirty()
506 TRowMap::iterator it
;
507 for (it
=_RowMap
.begin(); it
!=_RowMap
.end(); ++it
)
// each row buffer starts with its CHeader
509 CHeader
* header
= (CHeader
*)((*it
).second
);
510 header
->clearFlags(CHeader::Dirty
);
515 * Flush Reference files
// Flush every reference file of _RefFileMap that is non-null and initialised.
// NOTE(review): the declaration of `i` is not visible in this listing.
517 void CTableBuffer::flushRefFiles()
520 for (i
=0; i
<_RefFileMap
.size(); ++i
)
521 if (_RefFileMap
[i
] != NULL
&& _RefFileMap
[i
]->initialised())
522 _RefFileMap
[i
]->flush();
526 * Purge all references
// Purge all references: each non-null, initialised reference file is flushed,
// deleted, and its slot in _RefFileMap reset to NULL.
// NOTE(review): the declaration of `i`, the return value and anything done
// with non-initialised entries are not visible in this listing.
528 bool CTableBuffer::purgeReferences()
531 for (i
=0; i
<_RefFileMap
.size(); ++i
)
533 if (_RefFileMap
[i
] != NULL
&& _RefFileMap
[i
]->initialised())
// flush pending data before destroying the file object
535 _RefFileMap
[i
]->flush();
536 delete _RefFileMap
[i
];
537 _RefFileMap
[i
] = NULL
;
547 * Open all reference files in reference directory
// Open all reference files of this table for writing.
// For each file returned by getReferenceFilesList(): verifies it is a reference
// file of _TableId, runs checkRef() on its file id, then calls prewrite() on
// the matching _RefFileMap entry. Each failure logs a warning.
// NOTE(review): the declarations of `i`, `tableId`, `refFileId`, the handling
// of empty entries and the return statements are not visible in this listing.
549 bool CTableBuffer::openAllRefFilesWrite()
551 std::vector
<std::string
> refs
;
553 getReferenceFilesList(refs
);
556 for (i
=0; i
<refs
.size(); ++i
)
561 // check file is a reference of the table
562 if (!CDBReferenceFile::isRefFile(refs
[i
], tableId
, refFileId
) || tableId
!= _TableId
)
564 PDS_WARNING("openAllRefFilesWrite(): file '%s' is not a reference file for table '%d'", refs
[i
].c_str(), _TableId
);
568 if (!checkRef(refFileId
))
570 PDS_WARNING("openAllRefFilesWrite(): failed to check reference file '%s' for table '%d'", refs
[i
].c_str(), _TableId
);
574 if (!_RefFileMap
[refFileId
]->prewrite())
576 PDS_WARNING("openAllRefFilesWrite(): failed to check prewrite reference file '%s' for table '%d'", refs
[i
].c_str(), _TableId
);
585 * Open all reference files in reference directory
// Open all reference files of this table for reading.
// For each file returned by getReferenceFilesList(): verifies it is a reference
// file of _TableId, runs checkRef(), preloads it, then reads its update delta
// ids. The end id of each file is compared with the one already recorded and a
// mismatch is reported. Finally _CurrentDeltaId is set to endId + 1.
// NOTE(review): the declarations of `i`, `tableId`, `refFileId`, `endIdCheck`,
// the assignment done when endId is still 0xffffffff, and the return statements
// are not visible in this listing.
587 bool CTableBuffer::openAllRefFilesRead()
589 std::vector
<std::string
> refs
;
591 getReferenceFilesList(refs
);
// 0xffffffff is used as the "not set yet" value (see the test below)
593 uint32 startId
= 0xffffffff;
594 uint32 endId
= 0xffffffff;
597 for (i
=0; i
<refs
.size(); ++i
)
602 // check file is a reference of the table
603 if (!CDBReferenceFile::isRefFile(refs
[i
], tableId
, refFileId
) || tableId
!= _TableId
)
605 PDS_WARNING("openAllRefFilesRead(): file '%s' is not a reference file for table '%d'", refs
[i
].c_str(), _TableId
);
609 if (!checkRef(refFileId
))
611 PDS_WARNING("openAllRefFilesRead(): failed to check reference file '%s' for table '%d'", refs
[i
].c_str(), _TableId
);
615 if (!_RefFileMap
[refFileId
]->preload())
617 PDS_WARNING("openAllRefFilesRead(): failed to check preload reference file '%s' for table '%d'", refs
[i
].c_str(), _TableId
);
622 _RefFileMap
[refFileId
]->getUpdateDeltaIds(startId
, endIdCheck
);
624 if (endId
== 0xffffffff)
628 else if (endId
!= endIdCheck
)
630 PDS_WARNING("openAllRefFilesRead(): expected endId '%d', found '%d' in file '%s'", endId
, endIdCheck
, refs
[i
].c_str());
// NOTE(review): if endId is still 0xffffffff here, the uint32 addition wraps to 0 -- confirm this is intended
635 _CurrentDeltaId
= endId
+ 1;
642 * Update delta ids for all references
// Update the delta ids of all reference files.
// Every entry of _RefFileMap is expected to be non-null and initialised; a
// warning is logged otherwise. updateDeltaIds(start, end) is then called on
// the file and a warning is logged when it fails.
// NOTE(review): the declaration of `i` and the return statements are not
// visible in this listing.
644 bool CTableBuffer::updateDeltaIds(uint32 start
, uint32 end
)
647 for (i
=0; i
<_RefFileMap
.size(); ++i
)
649 if (_RefFileMap
[i
] == NULL
|| !_RefFileMap
[i
]->initialised())
651 PDS_WARNING("updateDeltaIds(): for table '%d', file %d not initialised", _TableId
, i
);
655 if (!_RefFileMap
[i
]->updateDeltaIds(start
, end
))
657 PDS_WARNING("updateDeltaIds(): failed to update delta ids for table %d, file %d", _TableId
, i
);
668 * Get Reference Files list
// Build the list of reference files of this table found in _RefPath.
// `result` is indexed by reference file id: it is grown as needed and
// result[refFileId] receives the matching file name, so ids with no file keep
// an empty string.
// NOTE(review): the declarations of `i`, `tableId`, `refFileId` and the
// statement run for non-matching files are not visible in this listing.
670 void CTableBuffer::getReferenceFilesList(std::vector
<std::string
> &result
)
673 std::vector
<std::string
> files
;
674 NLMISC::CPath::getPathContent(_RefPath
, false, false, true, files
);
677 // check all files in reference directory
678 for (i
=0; i
<files
.size(); ++i
)
683 // check file is a reference of the table
684 if (!CDBReferenceFile::isRefFile(files
[i
], tableId
, refFileId
) || tableId
!= _TableId
)
// make room so the file id can be used as an index
687 if (result
.size() <= refFileId
)
688 result
.resize(refFileId
+1);
690 result
[refFileId
] = files
[i
];
// Build the row mapper from the reference files.
// For each non-empty entry of getReferenceFilesList(), a temporary
// CDBReferenceFile is set up and preloaded to obtain its base and end row
// indices; every row in [base, end) is fetched with getRow(), passed to
// processRow() and then released.
// NOTE(review): the declaration of `i`, the use of the local `buffer`/`rowData`
// and the return statements are not visible in this listing.
698 bool CTableBuffer::buildRowMapper()
700 std::vector
<std::string
> files
;
701 getReferenceFilesList(files
);
703 std::vector
<uint8
> buffer(_InternalRowSize
);
704 uint8
* rowData
= &(buffer
[0]);
709 // check all files in reference directory
710 for (i
=0; i
<files
.size(); ++i
)
// ids with no reference file are left as empty strings in the list
712 if (files
[i
].empty())
715 // get row indices in reference file
716 RY_PDS::TRowIndex base
, end
, row
;
718 CDBReferenceFile refFile
;
720 // init and preload file
721 refFile
.setup(NLMISC::CFile::getFilename(files
[i
]), _RefPath
, 0, 0, _InternalRowSize
);
722 if (!refFile
.preload())
724 PDS_WARNING("buildRowMapper(): failed to preload() '%s'", files
[i
].c_str());
728 // get row indices in reference file
729 base
= refFile
.getBaseIndex();
730 end
= refFile
.getEndIndex();
734 for (row
=base
; row
<end
; ++row
)
736 CAccessor accessor
= getRow(row
);
738 if (!processRow(accessor
))
740 PDS_WARNING("buildRowMapper(): failed to process row '%d' in file '%s'", row
, files
[i
].c_str());
// release on the failure path as well
741 releaseRow(accessor
);
745 releaseRow(accessor
);
753 * Process row for RowMapper
// Process one row for the row mapper.
// Visible steps: test the header's allocated flag and allocate the row in
// _RowMapper; test the header's mapped flag (reported as an error unless
// RY_PDS::ResolveUnmappedRows is set); detect a key already present in
// _RowMapper (fatal unless RY_PDS::ResolveDoubleMappedRows is set, otherwise
// resolved according to RY_PDS::ResolveDoubleMappedKeepOlder, possibly through
// unmapRow() on the previous mapping); finally map the key to this row.
// NOTE(review): braces, early returns and several branches are not visible in
// this listing, so the exact nesting of these steps must be confirmed.
755 bool CTableBuffer::processRow(CAccessor
& accessor
)
757 CMappedHeader
* header
= (CMappedHeader
*)accessor
.fullRow();
759 // allocate row if needed
760 if (!header
->allocated())
763 if (!_RowMapper
.allocate(accessor
.row()))
765 PDS_WARNING("processRow(): failed to allocate row '%d'", accessor
.row());
770 // unallocated rows shouldn't be mapped...
774 if (!header
->mapped())
776 if (!RY_PDS::ResolveUnmappedRows
)
778 PDS_WARNING("processRow(): failed, row '%d' not mapped", accessor
.row());
784 if (_RowMapper
.isMapped(header
->getKey()))
786 // check key not yet mapped
787 RY_PDS::CObjectIndex prevMap
= _RowMapper
.get(header
->getKey());
790 if (!RY_PDS::ResolveDoubleMappedRows
)
792 PDS_WARNING("processRow(): key '%016" NL_I64
"X' already mapped to '%s', failed", header
->getKey(), prevMap
.toString().c_str());
796 PDS_WARNING("processRow(): key '%016" NL_I64
"X' already mapped to '%s'", header
->getKey(), prevMap
.toString().c_str());
798 if (RY_PDS::ResolveDoubleMappedKeepOlder
)
806 unmapRow(prevMap
, header
->getKey());
809 if (!_RowMapper
.map(header
->getKey(), RY_PDS::CObjectIndex((RY_PDS::TTableIndex
)_TableId
, accessor
.row())))
811 PDS_WARNING("processRow(): failed to map row '%d'", accessor
.row());
821 * Process Rows, apply external row processing after rows are loaded
// Apply an external row processor to the rows of the table.
// Iterates row indices from 0 up to maxRowIndex(); each row is tested against
// _RowMapper.allocated(), fetched with getRow(), handed to
// processor->processRow() and released afterwards (also on failure).
// NOTE(review): the statement run for non-allocated rows and the return
// statements are not visible in this listing.
823 bool CTableBuffer::processRows(IRowProcessor
* processor
)
825 RY_PDS::TRowIndex row
;
827 for (row
=0; row
<maxRowIndex(); ++row
)
829 // process only allocated rows
830 if (!_RowMapper
.allocated(row
))
833 CAccessor accessor
= getRow(row
);
835 if (!processor
->processRow((RY_PDS::TTableIndex
)_TableId
, accessor
))
837 PDS_WARNING("processRows(): failed to process row '%d'", row
);
838 releaseRow(accessor
);
842 releaseRow(accessor
);
851 * Allocate a row in a table
852 * \param row is the row to allocate
853 * Return true if succeeded
// Allocate a row in the table.
// A row already allocated in _RowMapper is reported with a warning. Otherwise
// the row is fetched into `accessor`, its header gets the Allocated flag, the
// row is registered in _RowMapper and finally marked dirty; the result of
// dirtyRow() is returned.
// NOTE(review): the early return after the warning and the body of the
// "clear map key" branch are not visible in this listing.
855 bool CTableBuffer::allocate(RY_PDS::TRowIndex row
, CAccessor
& accessor
)
858 if (_RowMapper
.allocated(row
))
860 PDS_WARNING("allocate(): row '%d' already allocated", row
);
864 accessor
= getRow(row
);
865 CMappedHeader
*header
= (CMappedHeader
*)accessor
.fullRow();
866 header
->setFlags(CHeader::Allocated
);
// NOTE(review): the return value of _RowMapper.allocate() is not checked here
867 _RowMapper
.allocate(row
);
869 // just in case, clear map key
870 if (_Mapped
&& header
->getKey() != 0)
875 return dirtyRow(accessor
);
879 * Deallocate a row in a table
880 * \param row is the row to deallocate
881 * Return true if succeeded
// Deallocate a row in the table.
// A row not allocated in _RowMapper is reported with a warning. Otherwise the
// row is fetched, its key is unmapped when the table is mapped and the key is
// non-zero, its acquire count and Allocated flag are cleared, the row is
// deallocated in _RowMapper and marked dirty; the result of dirtyRow() is
// returned.
// NOTE(review): the early return after the warning is not visible here.
883 bool CTableBuffer::deallocate(RY_PDS::TRowIndex row
)
885 // check row is allocated
886 if (!_RowMapper
.allocated(row
))
888 PDS_WARNING("deallocate(): row '%d' not yet allocated", row
);
892 CAccessor accessor
= getRow(row
);
893 CMappedHeader
*header
= (CMappedHeader
*)accessor
.fullRow();
895 // unmap row if was previously mapped -- just in case unmap not called
896 if (_Mapped
&& header
->getKey() != 0)
898 _RowMapper
.unmap(header
->getKey());
902 header
->clearAcquireCount();
903 header
->clearFlags(CHeader::Allocated
);
904 _RowMapper
.deallocate(row
);
906 return dirtyRow(accessor
);
911 * \param row is the row to allocate
912 * \param key is the 64 bits row key
913 * Return true if succeeded
// Map a row to a 64-bit key.
// Warns when the table is not mapped or when the row is not allocated; then
// registers key -> index in _RowMapper and unmaps the key previously stored in
// the row header when it is non-zero.
// NOTE(review): the condition guarding the first warning, the early returns and
// the statements that store the new key in the header are not visible in this
// listing.
// NOTE(review): on the visible lines the new key is mapped before the previous
// key is unmapped; if both keys are equal this order looks like it could drop
// the mapping -- confirm.
915 bool CTableBuffer::mapRow(const RY_PDS::CObjectIndex
&index
, uint64 key
)
919 PDS_WARNING("mapRow(): table not mapped");
923 // check row is allocated
924 if (!_RowMapper
.allocated(index
.row()))
926 PDS_WARNING("mapRow(): row '%d' not yet allocated", index
.row());
930 CAccessor accessor
= getRow(index
.row());
931 CMappedHeader
*header
= (CMappedHeader
*)accessor
.fullRow();
933 if (!_RowMapper
.map(key
, index
))
935 PDS_WARNING("mapRow(): failed to map row '%d' to key '%016" NL_I64
"X'", index
.row(), key
);
939 // unmap row if was previously mapped
940 if (header
->getKey() != 0)
942 _RowMapper
.unmap(header
->getKey());
952 * Unmap a row in a table
953 * \param tableIndex is the table to find row
954 * \param key is the 64 bits row key
955 * Return true if succeeded
// Unmap a row from a 64-bit key.
// Warns when the table is not mapped or when the row is not allocated; removes
// the key from _RowMapper; then fetches the row and warns when the key stored
// in its header differs from the key being unmapped.
// Fixes applied on the visible lines:
//  - the "failed to unmap" format string was missing its closing quote;
//  - the mismatch warning now prints the key found in the row header
//    (header->getKey()) instead of repeating the requested key.
// NOTE(review): the condition guarding the first warning, the early returns and
// the statements that reset the header key are not visible in this listing.
957 bool CTableBuffer::unmapRow(const RY_PDS::CObjectIndex
&index
, uint64 key
)
961 PDS_WARNING("unmapRow(): table not mapped");
965 // check row is allocated
966 if (!_RowMapper
.allocated(index
.row()))
968 PDS_WARNING("unmapRow(): row '%d' not yet allocated", index
.row());
972 if (!_RowMapper
.unmap(key
))
974 PDS_WARNING("unmapRow(): failed to unmap row '%d' from key '%016" NL_I64
"X'", index
.row(), key
);
978 CAccessor accessor
= getRow(index
.row());
979 CMappedHeader
*header
= (CMappedHeader
*)accessor
.fullRow();
// the row header is expected to hold the key being unmapped
981 if (header
->getKey() != key
)
983 PDS_WARNING("unmapRow(): row '%d' is mapped to '%016" NL_I64
"X', unmap row anyway, system may not recover object", index
.row(), header
->getKey()
);
994 * \param key is the 64 bits row key
995 * Return a valid TRowIndex if success
// Look up the row mapped to a 64-bit key.
// Returns RY_PDS::CObjectIndex::null() after a warning on the "table not
// mapped" path, otherwise the result of _RowMapper.get(key).
// Fix applied: the warning named mapRow() instead of this function.
// NOTE(review): the condition guarding the warning is not visible in this listing.
997 RY_PDS::CObjectIndex
CTableBuffer::getMappedRow(uint64 key
) const
1001 PDS_WARNING("getMappedRow(): table not mapped");
1002 return RY_PDS::CObjectIndex::null();
1005 return _RowMapper
.get(key
);
1011 * Update common Timestamp
// Refresh the shared _CommonStamp with the current time, expressed in seconds
// since 1970 (NLMISC::CTime::getSecondsSince1970()).
1013 void CTableBuffer::updateCommonStamp()
1015 _CommonStamp
= NLMISC::CTime::getSecondsSince1970();
1027 * Setup debug delta file
// Set up a delta file object for debugging: configures `delta` on `filename`
// with the buffer's internal row size and default timestamps, then preloads it.
// A warning is logged when the preload fails.
// NOTE(review): the return statements are not visible in this listing.
1029 bool CTableBuffer::setupDebugDeltaFile(const std::string
& filename
, CDBDeltaFile
& delta
) const
1031 delta
.setup(filename
, _InternalRowSize
, CTimestamp(), CTimestamp());
1033 if (!delta
.preload())
1035 PDS_WARNING("setupDebugDeltaFile(): failed to preload file '%s'", filename
.c_str());
1043 * Get Delta file Row
// Read the next row of a delta file.
// `row` receives the row index read from `delta`. On the visible success path
// the function returns a pointer just past the row header (data +
// getHeaderSize()).
// The data lives in a function-local static buffer, so the returned pointer is
// overwritten by the next call and the function is not safe for concurrent use.
// NOTE(review): the values returned on read failure and when row == 0xffffffff
// are not visible in this listing.
1045 uint8
* CTableBuffer::getDeltaRow(uint32
& row
, CDBDeltaFile
& delta
) const
1047 static std::vector
<uint8
> buffer
;
// NOTE(review): &buffer[0] below assumes _InternalRowSize is non-zero -- confirm
1049 buffer
.resize(_InternalRowSize
);
1050 uint8
* data
= &(buffer
[0]);
1052 // read data from delta file
1053 if (!delta
.read(row
, data
))
1055 PDS_WARNING("getDeltaRow(): failed to load next row from delta file");
1059 if (row
== 0xffffffff)
// skip the header so the caller sees only the user data
1062 return data
+ getHeaderSize();