/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.tuscany.persistence.store.journal; import java.io.IOException; import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import static java.util.concurrent.TimeUnit.MILLISECONDS; import org.osoa.sca.annotations.Constructor; import org.osoa.sca.annotations.Destroy; import org.osoa.sca.annotations.EagerInit; import org.osoa.sca.annotations.Init; import org.osoa.sca.annotations.Property; import org.osoa.sca.annotations.Service; import org.apache.tuscany.spi.component.SCAObject; import org.apache.tuscany.spi.event.AbstractEventPublisher; import org.apache.tuscany.spi.services.store.RecoveryListener; import org.apache.tuscany.spi.services.store.Store; import org.apache.tuscany.spi.services.store.StoreExpirationEvent; import org.apache.tuscany.spi.services.store.StoreMonitor; import org.apache.tuscany.spi.services.store.StoreReadException; import org.apache.tuscany.spi.services.store.StoreWriteException; import org.apache.tuscany.api.annotation.Monitor; import static 
org.apache.tuscany.persistence.store.journal.SerializationHelper.partition; import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serialize; import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeHeader; import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeRecordId; import org.objectweb.howl.log.Configuration; import org.objectweb.howl.log.InvalidFileSetException; import org.objectweb.howl.log.InvalidLogBufferException; import org.objectweb.howl.log.InvalidLogKeyException; import org.objectweb.howl.log.LogClosedException; import org.objectweb.howl.log.LogConfigurationException; import org.objectweb.howl.log.LogEventListener; import org.objectweb.howl.log.LogException; import org.objectweb.howl.log.LogFileOverflowException; import org.objectweb.howl.log.LogRecord; import org.objectweb.howl.log.LogRecordSizeException; /** * A journal-based store service that uses HOWL for reliable persistence and recovery of object instances. Insert, * update, and delete operations, as well as object instances, are written to a binary log. Delete operations are * written as a single header block as defined by {@link SerializationHelper#serializeHeader(short,int,String,String, *long)}. Insert and update operations are written using multiple blocks consisting of at least one header and 1..n * additional blocks containing the object byte array. If the byte array size is greater than the log block size (the * HOWL default is 4K), it must be partitioned into smaller units using {@link SerializationHelper#partition(byte[], *int)} and written across multiple blocks. The header contains the number of ensuing blocks a record occupies. Since * block writes to the log may be interleaved, blocks for a given record may not be consecutive. 
In order to identify * the record a block belongs to, the first byte array of written data for the block contains the serialized owner id * and UUID associated with the record. *
* A cache of all active persisted instances is maintained in memory and is used for read operations. When an instance * is persisted, a log record is written and an entry is added to the cache containing the instance. When an instance is * deleted, a delete record is written to the log and its corresponding cache entry is removed. During recovery, the * instance cache is reconstituted by reading the log and deserializing active records. * * The journal store is designed for reliable, performant persistence and therefore avoids the overhead associated with * using a relational database. This does, however, impose two configuration requirements. The first is that a log file * must be larger than the size of allorg.objectweb.howl.log.Configuration
// --- HOWL journal configuration properties (mirrored into org.objectweb.howl.log.Configuration) ---
// Numeric fields default to UNITIALIZED (declared elsewhere in this class) so that only values
// explicitly supplied via @Property setters are recorded; boolean/String fields carry defaults below.

// whether HOWL computes a checksum for each log record
private boolean checksumEnabled;
// size of each log buffer; UNITIALIZED until configured
private int bufferSize = UNITIALIZED;
// fully-qualified class name of the HOWL buffer implementation, if overridden
private String bufferClassName;
// maximum number of log buffers HOWL may allocate
private int maxBuffers = UNITIALIZED;
// minimum number of log buffers HOWL keeps allocated
private int minBuffers = UNITIALIZED;
// time (ms) the flush manager sleeps between forced flushes
private int flushSleepTime = UNITIALIZED;
// whether partially-filled buffers are flushed rather than waiting to fill
private boolean flushPartialBuffers;
// number of waiting threads that triggers an immediate force
private int threadsWaitingForceThreshold = UNITIALIZED;
// maximum number of blocks written to a single log file before rollover
private int maxBlocksPerFile = UNITIALIZED;
// number of log files in the circular file set
private int maxLogFiles = UNITIALIZED;
// directory where log files are created (relative default)
private String logFileDir = "../stores";
// log file extension
private String logFileExt = "store";
// base name for log files
private String logFileName = "tuscany";
// file open mode passed to HOWL (e.g. "rw" or "rwd"); null means HOWL's default
private String logFileMode;
// end HOWL properties
/**
 * Creates a new store instance with a fresh HOWL {@link Configuration}; the journal itself
 * is opened later in {@link #init()}.
 *
 * @param monitor the monitor for recording store events
 */
@Constructor
public JournalStore(@Monitor StoreMonitor monitor) {
this.monitor = monitor;
config = new Configuration();
}
/**
 * Protected constructor for unit testing: injects a pre-built journal (typically a mock)
 * instead of opening one during {@link #init()}.
 *
 * @param monitor the monitor for recording store events
 * @param journal the journal the store should write to, typically a mock
 */
protected JournalStore(StoreMonitor monitor, Journal journal) {
this.monitor = monitor;
this.journal = journal;
config = new Configuration();
}
/**
 * Returns the log block size in bytes used when partitioning serialized records.
 *
 * @return the configured block size
 */
public int getBlockSize() {
    return this.blockSize;
}
/**
 * Returns the maximum default expiration offset for records in the store
 *
 * @return the maximum default expiration offset for records in the store
 */
// NOTE(review): unlike every other property in this class, @Property is placed on the getter
// rather than the setter, and the accessor names disagree ("ExpirationOffset" vs
// "DefaultExpirationOffset"). Confirm which property name the SCA container binds before
// normalizing — moving the annotation may change the injected property name.
@Property
public long getExpirationOffset() {
return defaultExpirationOffset;
}
/**
 * Sets the maximum default expiration offset for records in the store
 *
 * @param defaultExpirationOffset the offset in milliseconds added to a record's timestamp
 */
public void setDefaultExpirationOffset(long defaultExpirationOffset) {
this.defaultExpirationOffset = defaultExpirationOffset;
}
/**
 * Sets the log block size in bytes used when partitioning serialized records.
 *
 * @param blockSize the block size to use
 */
@Property
public void setBlockSize(int blockSize) {
    this.blockSize = blockSize;
}
/**
 * Returns whether record checksumming is enabled in the journal.
 *
 * @return true if checksums are written for log records
 */
public boolean isChecksumEnabled() {
    return this.checksumEnabled;
}

/**
 * Enables or disables record checksumming, propagating the value to the HOWL configuration.
 *
 * @param val true to write checksums for log records
 */
@Property
public void setChecksumEnabled(boolean val) {
    config.setChecksumEnabled(val);
    this.checksumEnabled = val;
}
/**
 * Returns the configured HOWL log buffer size.
 *
 * @return the buffer size, or UNITIALIZED if never set
 */
public int getBufferSize() {
    return this.bufferSize;
}

/**
 * Sets the HOWL log buffer size. The local field is updated only if HOWL accepts the value.
 *
 * @param bufferSize the buffer size to use
 * @throws JournalStorePropertyException if HOWL rejects the value
 */
@Property
public void setBufferSize(int bufferSize) throws JournalStorePropertyException {
    try {
        config.setBufferSize(bufferSize);
    } catch (LogConfigurationException lce) {
        throw new JournalStorePropertyException(lce);
    }
    // reached only on success, matching HOWL's accepted state
    this.bufferSize = bufferSize;
}
/**
 * Returns the HOWL buffer implementation class name, if one was configured.
 *
 * @return the buffer class name, or null if not set
 */
public String getBufferClassName() {
    return this.bufferClassName;
}

/**
 * Sets the HOWL buffer implementation class name, propagating it to the HOWL configuration.
 *
 * @param bufferClassName fully-qualified class name of the buffer implementation
 */
@Property
public void setBufferClassName(String bufferClassName) {
    config.setBufferClassName(bufferClassName);
    this.bufferClassName = bufferClassName;
}
/**
 * Returns the maximum number of log buffers HOWL may allocate.
 *
 * @return the maximum buffer count, or UNITIALIZED if never set
 */
public int getMaxBuffers() {
    return this.maxBuffers;
}

/**
 * Sets the maximum number of log buffers. The local field is updated only if HOWL accepts it.
 *
 * @param maxBuffers the maximum buffer count
 * @throws JournalStorePropertyException if HOWL rejects the value
 */
@Property
public void setMaxBuffers(int maxBuffers) throws JournalStorePropertyException {
    try {
        config.setMaxBuffers(maxBuffers);
    } catch (LogConfigurationException lce) {
        throw new JournalStorePropertyException(lce);
    }
    this.maxBuffers = maxBuffers;
}
/**
 * Returns the minimum number of log buffers HOWL keeps allocated.
 *
 * @return the minimum buffer count, or UNITIALIZED if never set
 */
public int getMinBuffers() {
    return this.minBuffers;
}

/**
 * Sets the minimum number of log buffers. The local field is updated only if HOWL accepts it.
 *
 * @param minBuffers the minimum buffer count
 * @throws JournalStorePropertyException if HOWL rejects the value
 */
@Property
public void setMinBuffers(int minBuffers) throws JournalStorePropertyException {
    try {
        config.setMinBuffers(minBuffers);
    } catch (LogConfigurationException lce) {
        throw new JournalStorePropertyException(lce);
    }
    this.minBuffers = minBuffers;
}
/**
 * Returns the interval the HOWL flush manager sleeps between forced flushes.
 *
 * @return the flush sleep time in milliseconds, or UNITIALIZED if never set
 */
public int getFlushSleepTime() {
    return this.flushSleepTime;
}

/**
 * Sets the flush manager sleep interval, propagating it to the HOWL configuration.
 *
 * @param time the sleep time in milliseconds
 */
@Property
public void setFlushSleepTime(int time) {
    config.setFlushSleepTime(time);
    this.flushSleepTime = time;
}
/**
 * Returns whether partially-filled buffers are flushed rather than waiting to fill.
 *
 * @return true if partial buffers are flushed
 */
public boolean isFlushPartialBuffers() {
    return this.flushPartialBuffers;
}

/**
 * Enables or disables flushing of partially-filled buffers, propagating the value to HOWL.
 *
 * @param val true to flush partial buffers
 */
@Property
public void setFlushPartialBuffers(boolean val) {
    config.setFlushPartialBuffers(val);
    this.flushPartialBuffers = val;
}
/**
 * Returns the number of waiting threads that triggers an immediate buffer force.
 *
 * @return the threshold, or UNITIALIZED if never set
 */
public int getThreadsWaitingForceThreshold() {
    return this.threadsWaitingForceThreshold;
}

/**
 * Sets the waiting-thread force threshold, propagating it to the HOWL configuration.
 *
 * @param threshold number of waiting threads that triggers a force
 */
@Property
public void setThreadsWaitingForceThreshold(int threshold) {
    config.setThreadsWaitingForceThreshold(threshold);
    this.threadsWaitingForceThreshold = threshold;
}
/**
 * Returns the maximum number of blocks written to a single log file before rollover.
 *
 * @return the maximum blocks per file, or UNITIALIZED if never set
 */
public int getMaxBlocksPerFile() {
    return this.maxBlocksPerFile;
}

/**
 * Sets the maximum blocks per log file, propagating the value to the HOWL configuration.
 *
 * @param maxBlocksPerFile the maximum number of blocks per file
 */
@Property
public void setMaxBlocksPerFile(int maxBlocksPerFile) {
    config.setMaxBlocksPerFile(maxBlocksPerFile);
    this.maxBlocksPerFile = maxBlocksPerFile;
}
/**
 * Returns the number of log files in the circular file set.
 *
 * @return the maximum number of log files, or UNITIALIZED if never set
 */
public int getMaxLogFiles() {
    return this.maxLogFiles;
}

/**
 * Sets the number of log files in the file set, propagating the value to HOWL.
 *
 * @param maxLogFiles the number of log files
 */
@Property
public void setMaxLogFiles(int maxLogFiles) {
    config.setMaxLogFiles(maxLogFiles);
    this.maxLogFiles = maxLogFiles;
}
/**
 * Returns the directory where log files are created.
 *
 * @return the log file directory (defaults to "../stores")
 */
public String getLogFileDir() {
    return this.logFileDir;
}

/**
 * Sets the log file directory, propagating the value to the HOWL configuration.
 *
 * @param dir the directory in which to create log files
 */
@Property
public void setLogFileDir(String dir) {
    config.setLogFileDir(dir);
    this.logFileDir = dir;
}
/**
 * Returns the log file extension.
 *
 * @return the extension (defaults to "store")
 */
public String getLogFileExt() {
    return this.logFileExt;
}

/**
 * Sets the log file extension, propagating the value to the HOWL configuration.
 *
 * @param logFileExt the file extension to use
 */
@Property
public void setLogFileExt(String logFileExt) {
    config.setLogFileExt(logFileExt);
    this.logFileExt = logFileExt;
}
/**
 * Returns the base name used for log files.
 *
 * @return the log file base name (defaults to "tuscany")
 */
public String getLogFileName() {
    return this.logFileName;
}

/**
 * Sets the log file base name, propagating the value to the HOWL configuration.
 *
 * @param logFileName the base name to use
 */
@Property
public void setLogFileName(String logFileName) {
    config.setLogFileName(logFileName);
    this.logFileName = logFileName;
}
/**
 * Returns the file open mode passed to HOWL.
 *
 * @return the log file mode (e.g. "rw" or "rwd"), or null if not set
 */
public String getLogFileMode() {
    return logFileMode;
}

/**
 * Sets the file open mode passed to HOWL. The local field is updated only if HOWL accepts
 * the value.
 *
 * @param logFileMode the file mode, e.g. "rw" or "rwd"
 * @throws JournalStorePropertyException if HOWL rejects the value
 */
// FIX: added @Property — this was the only configuration setter in the class without it,
// so the log file mode could not be injected by the SCA container like its siblings.
@Property
public void setLogFileMode(String logFileMode) throws JournalStorePropertyException {
    try {
        config.setLogFileMode(logFileMode);
        this.logFileMode = logFileMode;
    } catch (LogConfigurationException e) {
        throw new JournalStorePropertyException(e);
    }
}
/**
* Initializes the store by opening the journal and starting the checkpoint daemon
*
* @throws JournalIinitializationException
*
*/
@Init
public void init() throws JournalIinitializationException {
try {
cache = new ConcurrentHashMap