实现Cassandra数据自动失效功能
一、Cassandra数据失效机制
现在使用Cassandra的地方很多,由于Cassandra的写的性能很好,所以有一部分使用Cassandra做为类似于日志功能来使用,所
以一个需求就提出来了,那就是希望Cassandra能提供一个自动失效功能,希望Cassandra能保留一定天数后,能自动删除数据。
这种需求的确很常见,但是遗憾的是Cassandra目前仍然不能满足这个需求,虽然Cassandra已经提供了实现这个功能的基础,下面详细看一下Cassandra是怎么删除数据的:在我写的另一篇文章中也介绍了Cassandra删除数据的规则《Cassandra分布式数据库详解》系列文档。
Cassandra判断数据是否有效有三个地方,分别是下面代码片段:
第一段代码
longmaxChange=column.mostRecentLiveChangeAt();
return(!column.isMarkedForDelete()||column.getLocalDeletionTime()>
gcBefore||maxChange>column.getMarkedForDeleteAt())//(1)
&&(!container.isMarkedForDelete()||maxChange>
container.getMarkedForDeleteAt());//(2)
这段代码判断这个列是否应该被关联,关联有两个条件
(1)列没有被删除或者删除的时间在有效期以内或者删除的时间在最后修改数据的数据之前
(2)列所在的容器没有被删除或者列的修改时间在容器删除时间之后
第二段代码
for(byte[]cname:cf.getColumnNames())
{
IColumnc=cf.getColumnsMap().get(cname);
longminTimestamp=Math.max(c.getMarkedForDeleteAt(),
cf.getMarkedForDeleteAt());
for(IColumnsubColumn:c.getSubColumns())
{
if(subColumn.timestamp()<=minTimestamp
||(subColumn.isMarkedForDelete()&&subColumn.getLocalDeletionTime()<=gcBefore)){
((SuperColumn)c).remove(subColumn.name());
}}
if(c.getSubColumns().isEmpty()&&c.getLocalDeletionTime()<=gcBefore){
cf.remove(c.name());
}}
或
for(byte[]cname:cf.getColumnNames())
{IColumnc=cf.getColumnsMap().get(cname);
if((c.isMarkedForDelete()&&c.getLocalDeletionTime()<=gcBefore)||c.timestamp()<=cf.getMarkedForDeleteAt()){
cf.remove(cname);
}}
if(cf.getColumnCount()==0&&cf.getLocalDeletionTime()<=gcBefore){
returnnull;
}
第二段代码是判断数据是否应该被删除,也有两个或条件
(1)列已经被删除了并且数据已经过了时效期
(2)数据的修改时间在容器删除时间之前
当所有列都删除并且容器已失效,这个key就会被删除,key的删除是在SSTable合并的时候完成的
第三段代码是在客户端中
for(IColumncolumn:columns)
{
if(column.isMarkedForDelete())
{
continue;
}
Columnthrift_column=newColumn(column.name(),column.value(),
column.timestamp());
thriftColumns.add(createColumnOrSuperColumn_Column(thrift_column));
}
这段代码清楚的说明只要列被删除,客户端将取不到数据
关于gcBefore是在配置文件中设置的864000,很多人以为这个时间就是数据的失效时间,以为在这个时间段内数据可以被使用,从上面的三段代码来看,Cassandra在设计数据失效机制,在实际应用中几乎没有利用价值,也就是数据失效对使用者来说没有任何好处
二、改造Cassandra实现真正的自动失效功能
从上分析Cassandra判断数据是否失效主要根据三个标识
(1)Column是否被删除标识:isMarkedForDelete
(2)Column的getLocalDeletionTime(),这个时间就是个失效时间GCGraceSeconds比较,判断该Column是否已经失效,这个和前面的条件是并集
(3)Column和所在ColumnFamily的删除时间markedForDeleteAt比较如果小于这个时间,改Column就会被删除
所以我们要实现自动失效数据,不用通过调用remove接口,就能实现。也就是当数据在写到数据库之前,就标识这个数据在GCGraceSeconds时间内有效,一旦超过这个时间,数据被被自动删除,而且是物理删除。
需要改造的地方如下:
A.改造org.apache.cassandra.thrift.ColumnPath类,增加一个
publicbooleanis_delete;
属性,当我们在调用insert接口是标识这个Column数据是自动失效的。如下所示:
ColumnPathcol=new
ColumnPath(columnFamily,superColumn,column.getBytes(“UTF-8″),true);
client.insert(keyspace,key,col,value.getBytes(),System.currentTimeMillis(),
ConsistencyLevel.ONE);
B.改造org.apache.cassandra.thrift.Column类
也增加同样增加is_delete属性,当我们调用batch_insert或batch_mutate接口是同样可以设置Column时支持自动失效的。
A和B是客户端接口需要修改的地方,下面是服务器端要修改的地方:
C.org.apache.cassandra.db.filter.QueryPath类
也增加同样增加is_delete属性,用来保存客户端传过来的is_delete值。并增加一个结构体:
publicQueryPath(StringcolumnFamilyName,byte[]superColumnName,byte[]columnName,booleanis_delete){
this.columnFamilyName=columnFamilyName;
this.superColumnName=superColumnName;
this.columnName=columnName;
this.is_delete=is_delete;
}
在insert、batch_insert和batch_mutate三个接口创建QueryPath对象的地方改成上面这个结构体创建对象
D.org.apache.cassandra.db.ColumnFamily类
修改addColumn方法,将false改为path.is_delete
publicvoidaddColumn(QueryPathpath,byte[]value,longtimestamp)
{
addColumn(path,value,timestamp,path.is_delete);
//addColumn(path,value,timestamp,false);
}
E.org.apache.cassandra.db.Column类
修改getLocalDeletionTime方法,直接去timestamp时间
publicintgetLocalDeletionTime()
{
assertisMarkedForDelete;
//returnByteBuffer.wrap(value).getInt();
return(int)(timestamp/1000);
}
同时修改comparePriority方法,改变Column替换规则
publiclongcomparePriority(Columno)
{
if(o.timestamp==-1){
return-1;
}
if(this.timestamp==-1){
return1;
}
if(isMarkedForDelete)
{
//tombstonealwayswinsties.
returntimestamp<o.timestamp?-1:1;
}
returntimestamp–o.timestamp;
}
F.
org.apache.cassandra.db.RowMutation类
修改delete方法
publicvoiddelete(QueryPathpath,longtimestamp)
{
assertpath.columnFamilyName!=null;
StringcfName=path.columnFamilyName;
intlocalDeleteTime=(int)(System.currentTimeMillis()/1000);
ColumnFamilycolumnFamily=modifications_.get(cfName);
if(columnFamily==null)
columnFamily=ColumnFamily.create(table_,cfName);
if(path.superColumnName==
null&&path.columnName==null)
{
columnFamily.delete(localDeleteTime,timestamp);
}
else
if(path.columnName==null)
{
SuperColumnsc=newSuperColumn(path.superColumnName,DatabaseDescriptor.getSubComparator(table_,cfName));
sc.markForDeleteAt(localDeleteTime,timestamp);
columnFamily.addColumn(sc);
}
else
{
ByteBufferbytes=ByteBuffer.allocate(4);
bytes.putInt(localDeleteTime);
longdeleteTime=-1;
//columnFamily.addColumn(path,bytes.array(),timestamp,true);
columnFamily.addColumn(path,
bytes.array(),deleteTime,true);
}
modifications_.put(cfName,columnFamily);
}
调用删除接口时将删除时间设为-1,这个和前面的修改的comparePriority方法相适应。
G.修改CassandraServer类的thriftifyColumns和thriftifySubColumns
却掉isMarkedForDelete检查
publicList<ColumnOrSuperColumn>
thriftifyColumns(Collection<IColumn>columns,booleanreverseOrder)
{
ArrayList<ColumnOrSuperColumn>thriftColumns=new
ArrayList<ColumnOrSuperColumn>(columns.size());
for
(IColumncolumn:columns)
{
/*if(column.isMarkedForDelete())
{
continue;
}*/
Columnthrift_column=newColumn(column.name(),column.value(),column.timestamp());
thriftColumns.add(createColumnOrSuperColumn_Column(thrift_column));
}
//
wehavetodothereversinghere,sinceinternallywepassresultsaroundin
ColumnFamily
//
objects,whichalwayssorttheircolumnsinthe“natural”order
//
TODOthisisinconvenientfordirectusersofStorageProxy
if(reverseOrder)
Collections.reverse(thriftColumns);
returnthriftColumns;
}
数据有效时间可以通过GCGraceSeconds配置项来设置,这样超过gcBefore时间,客户端就会取不到数据,并且Cassandra在执行SSTable合并的时候会执行物理删除,如果想立即删除数据可以调用remove接口,数据将会立即被删除,但是在有效时间内被删除的数据客户端仍然能够取到数据。这样就真正实现了数据自动失效和删除的功能。