diff options
Diffstat (limited to 'sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany')
12 files changed, 1456 insertions, 0 deletions
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java new file mode 100644 index 0000000000..df6d3452e2 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +/** + * A listener interface for cache events + * + * @version $Rev$ $Date$ + */ +public interface CacheEventListener { + + /** + * Callback when an entry is evicted from the cache + */ + void onEviction(RecordKey key, RecordEntry entry); + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java new file mode 100644 index 0000000000..f2cae6218a --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +/** + * A header entry for a record written to the log. A header contains the following information: + * <pre> + * <ul> + * <li><strong>Owner id</strong> - the unique id of the owner, typically the SCAObject canonical name + * <li><strong>Operation</strong> - if the record is an INSERT, UPDATE, or DELETE + * <li><strong>Most Significant bits</strong> - the most signficant bits of the <code>java.util.UUID</code> + * representing the id of the persisted instance + * <li><strong>Least Significant bits</strong> - the least signficant bits of the <code>java.util.UUID</code> + * representing the id of the persisted instance + * <li><strong>Expiration</strong> - the expiration time in milliseconds when the record is set to expire + * <li><strong>Number of blocks</strong> - the number of blocks the record is written across + * <li><strong>Fields</strong> - any other data associated with the header + * </ul> + * </pre> + * + * @version $Rev$ $Date$ + */ +public class Header { + public static final short INSERT = 0; + public static final short UPDATE = 1; + public static final short DELETE = 2; + + protected byte[][] fields; + private short operation; + private String ownerId; + private String id; + private long expiration; + private int numBlocks; + + public Header() { + } + + /** + * Returns the record operation type + * + * @return the record operation type + */ + public short getOperation() { + return operation; + } + + /** + * Sets the record operation type + */ + public void setOperation(short operation) { + this.operation = operation; + } + + /** + * Returns the number of blocks the record is written across + * + * @return the number of blocks the record is written across + */ + public int getNumBlocks() { + return numBlocks; + } + + /** + * Sets the number of blocks the record is written across + */ + public void setNumBlocks(int numBlocks) { + this.numBlocks = numBlocks; + } + + /** + * Returns the unique owner id + * + * @return the unique owner id + */ + public String getOwnerId() { + return ownerId; + } + + /** + * Sets the unique owner id + */ + public void setOwnerId(String ownerId) { + this.ownerId = ownerId; + } + + /** + * Returns the record id + * + * @return the record id + */ + public String getId() { + return id; + } + + /** + * Sets the most significant bits of the record UUID + */ + public void setId(String id) { + this.id = id; + } + + /** + * Returns the time in milliseconds the record is set to expire + * + * @return the time in milliseconds the record is set to expire + */ + public long getExpiration() { + return expiration; + } + + /** + * Sets the time the record is set to expire in milliseconds + */ + public void setExpiration(long expiration) { + this.expiration = expiration; + } + + /** + * Returns additional header data + * + * @return additional header data + */ + public byte[][] getFields() { + return fields; + } + + /** + * Sets additional header data + */ + public void setFields(byte[][] fields) { + this.fields = fields; + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java new file mode 100644 index 0000000000..b8dfd072c5 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.IOException; + +import org.apache.tuscany.spi.services.store.StoreWriteException; + +import org.objectweb.howl.log.Configuration; +import org.objectweb.howl.log.LogClosedException; +import org.objectweb.howl.log.LogFileOverflowException; +import org.objectweb.howl.log.LogRecordSizeException; +import org.objectweb.howl.log.Logger; + +/** + * Extends the HOWL logger implementation adding convenience methods for processing records + * + * @version $Rev$ $Date$ + */ +public class Journal extends Logger { + public static final short HEADER = 0; + public static final short RECORD = 1; + + public Journal() throws IOException { + } + + public Journal(Configuration config) throws IOException { + super(config); + } + + /** + * Writes a header record to the log. The format of the header is defined by {@link + * SerializationHelper#createHeader(short, int, String, String, long)} + * + * @param bytes the record as a byte array + * @param force true if the disk write should be forced + * @return the log entry key + * @throws StoreWriteException + */ + public long writeHeader(byte[] bytes, boolean force) throws StoreWriteException { + try { + return put(HEADER, new byte[][]{bytes}, force); + } catch (LogRecordSizeException e) { + throw new StoreWriteException(e); + } catch (LogFileOverflowException e) { + throw new StoreWriteException(e); + } catch (IOException e) { + throw new StoreWriteException(e); + } catch (LogClosedException e) { + throw new StoreWriteException(e); + } catch (InterruptedException e) { + throw new StoreWriteException(e); + } + } + + /** + * Writes a record block to the log. The block may be the complete number of bytes or a chunked part of thereof if + * it does not fit within HOWL's block size limit. + * + * @param data the data to write + * @param id the unique record id consisting of the owner id and the UUID + * @param force true if the disk write should be force. For records that are written in multiple blocks, only the + * last write should be forced. + * @return the log entry key + * @throws StoreWriteException + */ + public long writeBlock(byte[] data, byte[] id, boolean force) throws StoreWriteException { + try { + return put(RECORD, new byte[][]{id, data}, force); + } catch (LogRecordSizeException e) { + throw new StoreWriteException(e); + } catch (LogFileOverflowException e) { + throw new StoreWriteException(e); + } catch (IOException e) { + throw new StoreWriteException(e); + } catch (LogClosedException e) { + throw new StoreWriteException(e); + } catch (InterruptedException e) { + throw new StoreWriteException(e); + } + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java new file mode 100644 index 0000000000..910598fc61 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import org.apache.tuscany.spi.services.store.StoreException; + +/** + * Denotes an error starting the journal + * + * @version $Rev$ $Date$ + */ +public class JournalIinitializationException extends StoreException { + + public JournalIinitializationException(Throwable cause) { + super(cause); + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java new file mode 100644 index 0000000000..ec2e491c1d --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +/** + * Represents a record read from the log + * + * @version $Rev$ $Date$ + */ +public class JournalRecord { + + private Header header; + private byte[] data; + + public JournalRecord(byte[] data) { + this.data = data; + } + + /** + * Returns the record header + * + * @return the record header + */ + public Header getHeader() { + return header; + } + + /** + * Sets the record header + */ + public void setHeader(Header header) { + this.header = header; + } + + /** + * Returns the serialized data portion of the record which may be assembled from multiple blocks + * + * @return the serialized data portion of the record which may be assembled from multiple blocks + */ + public byte[] getData() { + return data; + } + + /** + * Sets the serialized data portion of the record which may be written across multiple blocks + */ + public void setData(byte[] data) { + this.data = data; + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java new file mode 100644 index 0000000000..945a77753a --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import org.apache.tuscany.spi.services.store.StoreException; + +/** + * Denotes an error shutting down the journal + * + * @version $Rev$ $Date$ + */ +public class JournalShutdownException extends StoreException { + + public JournalShutdownException(Throwable cause) { + super(cause); + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java new file mode 100644 index 0000000000..422c074790 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java @@ -0,0 +1,595 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import org.osoa.sca.annotations.Constructor; +import org.osoa.sca.annotations.Destroy; +import org.osoa.sca.annotations.EagerInit; +import org.osoa.sca.annotations.Init; +import org.osoa.sca.annotations.Property; +import org.osoa.sca.annotations.Service; + +import org.apache.tuscany.spi.component.SCAObject; +import org.apache.tuscany.spi.event.AbstractEventPublisher; +import org.apache.tuscany.spi.services.store.RecoveryListener; +import org.apache.tuscany.spi.services.store.Store; +import org.apache.tuscany.spi.services.store.StoreExpirationEvent; +import org.apache.tuscany.spi.services.store.StoreMonitor; +import org.apache.tuscany.spi.services.store.StoreReadException; +import org.apache.tuscany.spi.services.store.StoreWriteException; + +import org.apache.tuscany.api.annotation.Monitor; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.partition; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serialize; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeHeader; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeRecordId; +import org.objectweb.howl.log.Configuration; +import org.objectweb.howl.log.InvalidFileSetException; +import org.objectweb.howl.log.InvalidLogBufferException; +import org.objectweb.howl.log.InvalidLogKeyException; +import org.objectweb.howl.log.LogClosedException; +import org.objectweb.howl.log.LogConfigurationException; +import org.objectweb.howl.log.LogEventListener; +import org.objectweb.howl.log.LogException; +import org.objectweb.howl.log.LogFileOverflowException; +import org.objectweb.howl.log.LogRecord; +import org.objectweb.howl.log.LogRecordSizeException; + +/** + * A journal-based store service that uses HOWL for reliable persistence and recovery of object instances. Insert, + * update, and delete operations, as well as object instances, are written to a binary log. Delete operations are + * written as a single header block as defined by {@link SerializationHelper#serializeHeader(short,int,String,String, + *long)}. Insert and update operations are written using multiple blocks consisting of at least one header and 1..n + * additional blocks containing the object byte array. If the byte array size is greater than the log block size (the + * HOWL default is 4K), it must be partitioned into smaller units using {@link SerializationHelper#partition(byte[], + *int)} and written across multiple blocks. The header contains the number of ensuing blocks a record occupies. Since + * block writes to the log may be interleaved, blocks for a given record may not be consecutive. In order to identify + * the record a block belongs to, the first byte array of written data for the block contains the serialized owner id + * and UUID associated with the record. + * <p/> + * A cache of all active persisted instances is maintained in memory and is used for read operations. When an instance + * is persisted, a log record is written and an entry is added to the cache containing the instance. When an instance is + * deleted, a delete record is written to the log and its corresponding cache entry is removed. During recovery, the + * instance cache is reconstituted by reading the log and deserializing active records. + * <p/> + * The journal store is designed for reliable, performant persistence and therefore avoids the overhead associated with + * using a relational database. This does, however, impose two configuration requirements. The first is that a log file + * must be larger than the size of all <bold>active</bold> instances in the runtime. As a log file begins to fill, the + * store will copy active records (expired records will be ignored) to a new file and the old log will be recycled. A + * second requirement is that all active instances must fit within the memory limits of the runtime JVM since the store + * does not perform passivation and a table of all active instances is maintained for querying. + * + * @version $Rev$ $Date$ + */ +@Service(Store.class) +@EagerInit +public class JournalStore extends AbstractEventPublisher implements Store { + private static final int UNITIALIZED = -99; + + // the cache of active records + private ConcurrentHashMap<RecordKey, RecordEntry> cache; + // private ReadWriteLock lock = new ReentrantReadWriteLock(); + private final Object shutdownLock = new Object(); + // TODO integrate with a core threading scheme + private ScheduledExecutorService scheduler; + private StoreMonitor monitor; + private Journal journal; + private long defaultExpirationOffset = 600000; // 10 minutes + private long reaperInterval = 1000; + private int blockSize = 4096; // 4k standard for HOWL + + private Configuration config; + // HOWL properties - cf. <code>org.objectweb.howl.log.Configuration</code> + private boolean checksumEnabled; + private int bufferSize = UNITIALIZED; + private String bufferClassName; + private int maxBuffers = UNITIALIZED; + private int minBuffers = UNITIALIZED; + private int flushSleepTime = UNITIALIZED; + private boolean flushPartialBuffers; + private int threadsWaitingForceThreshold = UNITIALIZED; + private int maxBlocksPerFile = UNITIALIZED; + private int maxLogFiles = UNITIALIZED; + private String logFileDir = "../stores"; + private String logFileExt = "store"; + private String logFileName = "tuscany"; + private String logFileMode; + // end HOWL properties + + /** + * Creates a new store instance + * + * @param monitor the monitor for recording store events + */ + @Constructor + public JournalStore(@Monitor StoreMonitor monitor) { + this.monitor = monitor; + config = new Configuration(); + } + + /** + * Protected constructor for unit testing + * + * @param monitor the monitor for recording store events + * @param journal the journal the store should write to, typically a mock + */ + protected JournalStore(StoreMonitor monitor, Journal journal) { + this.monitor = monitor; + this.journal = journal; + config = new Configuration(); + } + + public int getBlockSize() { + return blockSize; + } + + /** + * Returns the maximum default expiration offset for records in the store + * + * @return the maximum default expiration offset for records in the store + */ + @Property + public long getExpirationOffset() { + return defaultExpirationOffset; + } + + /** + * Sets the maximum default expiration offset for records in the store + */ + public void setDefaultExpirationOffset(long defaultExpirationOffset) { + this.defaultExpirationOffset = defaultExpirationOffset; + } + + @Property + public void setBlockSize(int blockSize) { + this.blockSize = blockSize; + } + + public boolean isChecksumEnabled() { + return checksumEnabled; + } + + @Property + public void setChecksumEnabled(boolean val) { + config.setChecksumEnabled(val); + this.checksumEnabled = val; + } + + public int getBufferSize() { + return bufferSize; + } + + @Property + public void setBufferSize(int bufferSize) throws JournalStorePropertyException { + try { + config.setBufferSize(bufferSize); + this.bufferSize = bufferSize; + } catch (LogConfigurationException e) { + throw new JournalStorePropertyException(e); + } + } + + public String getBufferClassName() { + return bufferClassName; + } + + @Property + public void setBufferClassName(String bufferClassName) { + config.setBufferClassName(bufferClassName); + this.bufferClassName = bufferClassName; + } + + public int getMaxBuffers() { + return maxBuffers; + } + + @Property + public void setMaxBuffers(int maxBuffers) throws JournalStorePropertyException { + try { + config.setMaxBuffers(maxBuffers); + this.maxBuffers = maxBuffers; + } catch (LogConfigurationException e) { + throw new JournalStorePropertyException(e); + } + } + + public int getMinBuffers() { + return minBuffers; + } + + @Property + public void setMinBuffers(int minBuffers) throws JournalStorePropertyException { + try { + config.setMinBuffers(minBuffers); + this.minBuffers = minBuffers; + } catch (LogConfigurationException e) { + throw new JournalStorePropertyException(e); + } + } + + public int getFlushSleepTime() { + return flushSleepTime; + } + + @Property + public void setFlushSleepTime(int time) { + config.setFlushSleepTime(time); + this.flushSleepTime = time; + } + + public boolean isFlushPartialBuffers() { + return flushPartialBuffers; + } + + @Property + public void setFlushPartialBuffers(boolean val) { + config.setFlushPartialBuffers(val); + this.flushPartialBuffers = val; + } + + public int getThreadsWaitingForceThreshold() { + return threadsWaitingForceThreshold; + } + + @Property + public void setThreadsWaitingForceThreshold(int threshold) { + config.setThreadsWaitingForceThreshold(threshold); + this.threadsWaitingForceThreshold = threshold; + } + + public int getMaxBlocksPerFile() { + return maxBlocksPerFile; + } + + @Property + public void setMaxBlocksPerFile(int maxBlocksPerFile) { + config.setMaxBlocksPerFile(maxBlocksPerFile); + this.maxBlocksPerFile = maxBlocksPerFile; + } + + public int getMaxLogFiles() { + return maxLogFiles; + } + + @Property + public void setMaxLogFiles(int maxLogFiles) { + config.setMaxLogFiles(maxLogFiles); + this.maxLogFiles = maxLogFiles; + } + + public String getLogFileDir() { + return logFileDir; + } + + @Property + public void setLogFileDir(String dir) { + config.setLogFileDir(dir); + this.logFileDir = dir; + } + + public String getLogFileExt() { + return logFileExt; + } + + @Property + public void setLogFileExt(String logFileExt) { + config.setLogFileExt(logFileExt); + this.logFileExt = logFileExt; + } + + public String getLogFileName() { + return logFileName; + } + + @Property + public void setLogFileName(String logFileName) { + config.setLogFileName(logFileName); + this.logFileName = logFileName; + } + + public String getLogFileMode() { + return logFileMode; + } + + public void setLogFileMode(String logFileMode) throws JournalStorePropertyException { + try { + config.setLogFileMode(logFileMode); + this.logFileMode = logFileMode; + } catch (LogConfigurationException e) { + throw new JournalStorePropertyException(e); + } + } + + /** + * Initializes the store by opening the journal and starting the checkpoint daemon + * + * @throws JournalIinitializationException + * + */ + @Init + public void init() throws JournalIinitializationException { + try { + cache = new ConcurrentHashMap<RecordKey, RecordEntry>(); + if (journal == null) { + config.setLogFileName(logFileName); + config.setLogFileExt(logFileExt); + config.setLogFileDir(logFileDir); + journal = new Journal(config); + } + journal.setLogEventListener(new JournalLogEventListener()); + journal.open(); + scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler.scheduleWithFixedDelay(new Reaper(), reaperInterval, reaperInterval, MILLISECONDS); + monitor.start("Started journal store"); + } catch (IOException e) { + throw new JournalIinitializationException(e); + } catch (LogConfigurationException e) { + throw new JournalIinitializationException(e); + } catch (InvalidLogBufferException e) { + throw new JournalIinitializationException(e); + } catch (InterruptedException e) { + throw new JournalIinitializationException(e); + } catch (InvalidFileSetException e) { + throw new JournalIinitializationException(e); + } + } + + /** + * Performs an orderly close of the journal and checkpoint daemon + * + * @throws JournalShutdownException + */ + @Destroy + public void destroy() throws JournalShutdownException { + try { + // avoid potential deadlock by acquiring write lock since the reaper thread can block when calling a + // synchronized journal method (e.g. journal.put) since journal.close() below holds a synchronization lock. + // This deadlock will prevent the scheduler.shutdown from completing, hanging this store shutdown + synchronized (shutdownLock) { + scheduler.shutdown(); + journal.close(); + monitor.stop("Stopped journal store"); + } + } catch (IOException e) { + throw new JournalShutdownException(e); + } catch (InterruptedException e) { + throw new JournalShutdownException(e); + } + } + + public void insertRecord(SCAObject owner, String id, Object object, long expiration) throws StoreWriteException { + write(owner, id, object, expiration, Header.INSERT); + } + + public void updateRecord(SCAObject owner, String id, Object object, long expiration) throws StoreWriteException { + write(owner, id, object, expiration, Header.UPDATE); + } + + public Object readRecord(SCAObject owner, String id) throws StoreReadException { + RecordEntry record; + RecordKey key = new RecordKey(id, owner); + record = cache.get(key); + if (record == null) { + return null; + } + if (record.getExpiration() != Store.NEVER && record.getExpiration() < System.currentTimeMillis()) { + cache.remove(key); + } + return record.getObject(); + } + + public void removeRecord(SCAObject owner, String id) throws StoreWriteException { + try { + journal.writeHeader(serializeHeader(Header.DELETE, 0, owner.getUri().toString(), id, NEVER), true); + RecordKey key = new RecordKey(id, owner); + // remove from the cache + cache.remove(key); + } catch (IOException e) { + throw new StoreWriteException(e); + } + + } + + public void removeRecords() throws StoreWriteException { + + } + + public void recover(RecoveryListener listener) throws StoreReadException { + } + + /** + * Writes a record to the log. If a record needs to be written in multiple blocks, only the last block write is + * forced. + * + * @param owner + * @param id + * @param object + * @param expiration + * @param operation + * @throws StoreWriteException + */ + private void write(SCAObject owner, String id, Object object, long expiration, short operation) + throws StoreWriteException { + if (!(object instanceof Serializable)) { + String name = object.getClass().getName(); + throw new StoreWriteException("Type must implement serializable", name, id); + } + Serializable serializable = (Serializable) object; + String name = owner.getUri().toString(); + List<byte[]> bytes; + try { + bytes = partition(serialize(serializable), blockSize); + } catch (IOException e) { + throw new StoreWriteException(e); + } + try { + byte[] header = serializeHeader(operation, bytes.size(), name, id, expiration); + long headerKey = journal.writeHeader(header, false); + // write chunked records in non-forced mode except last one + byte[] recordId = serializeRecordId(name, id); + long[] keys = new long[bytes.size() + 1]; + keys[0] = headerKey; + for (int i = 0; i < bytes.size() - 1; i++) { + byte[] chunk = bytes.get(i); + keys[i + 1] = journal.writeBlock(chunk, recordId, false); + } + // add last record using a forced write + journal.writeBlock(bytes.get(bytes.size() - 1), recordId, true); + RecordKey key = new RecordKey(id, owner); + // add to the entry in the cache + cache.put(key, new RecordEntry(serializable, operation, keys, expiration)); + } catch (IOException e) { + throw new StoreWriteException(e); + } + } + + /** + * Handles log overflow events + */ + private class JournalLogEventListener implements LogEventListener { + + public void logOverflowNotification(long overflowFence) { + assert overflowFence != 0; + long newMark = Long.MAX_VALUE; + long now = System.currentTimeMillis(); + try { + for (Map.Entry<RecordKey, RecordEntry> entry : cache.entrySet()) { + RecordEntry record = entry.getValue(); + long[] journalKey = record.getJournalKeys(); + if (journalKey[0] > overflowFence) { + // do not copy to new log but check if this is the new mark + if (journalKey[0] < newMark) { + newMark = journalKey[0]; + } + continue; + } + if (entry.getValue().getExpiration() > now) { + // copy the blocks associayed with the record to the new journal only if it is not expired + for (long key : journalKey) { + LogRecord lrecord = journal.get(null, key); + journal.put(lrecord.getFields(), false); + + } + } + } + if (newMark == Long.MAX_VALUE) { + newMark = overflowFence; + } + // force write + journal.mark(newMark, true); + } catch (LogRecordSizeException e) { + monitor.error(e); + } catch (LogFileOverflowException e) { + monitor.error(e); + } catch (LogConfigurationException e) { + monitor.error(e); + } catch (IOException e) { + monitor.error(e); + } catch (InvalidLogBufferException e) { + monitor.error(e); + } catch (LogClosedException e) { + monitor.error(e); + } catch (LogException e) { + monitor.error(e); + } catch (InterruptedException e) { + monitor.error(e); + } + } + + public boolean isLoggable(int level) { + return false; + } + + public void log(int level, String message) { + + } + + public void log(int level, String message, Throwable thrown) { + + } + } + + /** + * Periodically scans the instance cache, clearing out expired entries + */ + private class Reaper implements Runnable { + + public void run() { + try { + synchronized (shutdownLock) { + boolean force = false; + long now = System.currentTimeMillis(); + long oldest = Long.MAX_VALUE; + for (Map.Entry<RecordKey, RecordEntry> entry : cache.entrySet()) { + RecordKey key = entry.getKey(); + RecordEntry record = entry.getValue(); + if (record.getExpiration() <= now) { + try { + String ownerName = key.getOwner().getUri().toString(); + String id = key.getId(); + byte[] header = + SerializationHelper.serializeHeader(Header.DELETE, 0, ownerName, id, Store.NEVER); + journal.writeHeader(header, false); + // notify listeners + SCAObject owner = key.getOwner(); + Object instance = record.getObject(); + // notify listeners of the expiration + StoreExpirationEvent event = new StoreExpirationEvent(this, owner, instance); + publish(event); + } catch (IOException e) { + monitor.error(e); + } catch (StoreWriteException e) { + monitor.error(e); + } + force = true; + cache.remove(entry.getKey()); // semantics of ConcurrentHashMap allow this + } else { + if (record.getJournalKeys()[0] < oldest) { + oldest = record.getJournalKeys()[0]; + } + } + } + if (force) { + // perform a forced write and update the journal mark + journal.mark(oldest, true); + } + } + } catch (IOException e) { + monitor.error(e); + } catch (LogClosedException e) { + monitor.error(e); + } catch (InvalidLogKeyException e) { + monitor.error(e); + } catch (InterruptedException e) { + monitor.error(e); + } + } + } + + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java new file mode 100644 index 0000000000..275bad21f7 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import org.apache.tuscany.spi.services.store.StoreException; + +/** + * Denotes an error setting a property on the journal store + * + * @version $Rev$ $Date$ + */ +public class JournalStorePropertyException extends StoreException { + + public JournalStorePropertyException(Throwable cause) { + super(cause); + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java new file mode 100644 index 0000000000..28fa9c43ac --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.Serializable; + +/** + * A journal store cache entry for a record + * + * @version $Rev$ $Date$ + */ +public class RecordEntry { + private Serializable object; + private short operation; + private long expiration; + private long[] journalKeys; + + + /** + * Creates a new cached record + * + * @param object the object to serialize + * @param operation an <code>INSERT</code> or <code>UPDATE</code> operation + * @param journalKey the journal key for the block where the record is written + */ + public RecordEntry(Serializable object, short operation, long[] journalKey, long expiration) { + assert object != null; + this.object = object; + this.operation = operation; + this.journalKeys = journalKey; + this.expiration = expiration; + } + + /** + * Returns the object + * + * @return the object + */ + public Serializable getObject() { + return object; + } + + /** + * Returns the type of operation + * + * @return the type of operation + */ + public short getOperation() { + return operation; + } + + public long getExpiration() { + return expiration; + } + + public long[] getJournalKeys() { + return journalKeys; + } + + public void setJournalKeys(long[] journalKeys) { + this.journalKeys = journalKeys; + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java new file mode 100644 index 0000000000..7293fddb55 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import org.apache.tuscany.spi.component.SCAObject; + +/** + * Used by the store cache to retrieve record entries + * + * @version $Rev$ $Date$ + */ +public class RecordKey { + + private String id; + private SCAObject owner; + + public RecordKey(String id, SCAObject owner) { + this.id = id; + this.owner = owner; + } + + public String getId() { + return id; + } + + public SCAObject getOwner() { + return owner; + } + + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final RecordKey recordKey = (RecordKey) o; + if (id != null ? !id.equals(recordKey.id) : recordKey.id != null) { + return false; + } + return !(owner != null ? !owner.getUri().equals(recordKey.owner.getUri()) : + recordKey.owner != null); + } + + public int hashCode() { + int result; + result = (id != null ? id.hashCode() : 0); + result = 31 * result + (owner != null ? owner.hashCode() : 0); + return result; + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java new file mode 100644 index 0000000000..0a7197b014 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.tuscany.spi.component.SCAExternalizable; +import org.apache.tuscany.spi.component.WorkContext; + +/** + * Utility methods for processing journal records + * + * @version $Rev$ $Date$ + */ +public final class SerializationHelper { + + private SerializationHelper() { + } + + /** + * Serializes and object + * + * @param serializable the object to serialize + * @throws IOException + */ + public static byte[] serialize(Serializable serializable) throws IOException { + ByteArrayOutputStream bas = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bas); + out.writeObject(serializable); + return bas.toByteArray(); + } + + /** + * Deserializes an object using the TCCL + * + * @param bytes the serialized object byte array + * @param workContext the current work context + */ + public static Object deserialize(byte[] bytes, WorkContext workContext) throws IOException, ClassNotFoundException { + Object o = new TCCLObjectInputStream(new ByteArrayInputStream(bytes)).readObject(); + if (o instanceof SCAExternalizable) { + SCAExternalizable externalizable = (SCAExternalizable) o; + externalizable.setWorkContext(workContext); + externalizable.reactivate(); + } + return o; + } + + /** + * Breaks a byte array into a series of blocks of the given size + * + * @param bytes the byte array to partition + * @param size the partition size + */ + public static List<byte[]> partition(byte[] bytes, int size) { + assert size > 0; + List<byte[]> list = new ArrayList<byte[]>(); + int pos = 0; + while (pos < bytes.length) { + if (pos + size > bytes.length) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + for (int i = pos; i < bytes.length; i++) { + stream.write(bytes[i]); + } + byte[] partition = stream.toByteArray(); + list.add(partition); + pos = pos + partition.length; + } else { + byte[] partition = new byte[size]; + for (int i = 0; i < size; i++) { + partition[i] = bytes[pos + i]; + } + list.add(partition); + pos = pos + size; + } + } + return list; + } + + /** + * Creates a serialized header entry that may be written to a log + * + * @param operation the operation type, i.e. {@link Header#INSERT}, {@link Header#UPDATE}, or {@link + * JournalStore#DELETE} + * @param numRecords the number of blocks that the record will be written two excluding the header block + * @param ownerId the id of the owner of the record + * @param id the id of the record unique to the owner + * @param expiration the record expirtation time in milliseconds + * @return a byte array containing the serialized header + * @throws IOException + */ + public static byte[] serializeHeader(short operation, int numRecords, String ownerId, String id, long expiration) + throws IOException { + ByteArrayOutputStream stream = null; + ObjectOutputStream ostream = null; + try { + stream = new ByteArrayOutputStream(); + ostream = new ObjectOutputStream(stream); + ostream.writeShort(operation); + ostream.writeInt(numRecords); + ostream.writeObject(ownerId); + ostream.writeObject(id); + ostream.writeLong(expiration); + ostream.flush(); + return stream.toByteArray(); + } finally { + if (ostream != null) { + try { + ostream.close(); + } catch (IOException e) { + // ignore + } + } + if (stream != null) { + try { + stream.close(); + } catch (IOException e) { + // ignore + } + } + } + } + + /** + * Serializes a unique record id consisting of the owner id and the record's UUID + * + * @param ownerId the id of the owner, typically an SCAObject canonical name + * @param id the id associated with the record + * @return the serialized record byte array + * @throws IOException + */ + public static byte[] serializeRecordId(String ownerId, String id) + throws IOException { + ByteArrayOutputStream stream = null; + ObjectOutputStream ostream = null; + try { + stream = new ByteArrayOutputStream(); + ostream = new ObjectOutputStream(stream); + ostream.writeObject(ownerId); + ostream.writeObject(id); + ostream.flush(); + return stream.toByteArray(); + } finally { + if (ostream != null) { + try { + ostream.close(); + } catch (IOException e) { + // ignore + } + } + if (stream != null) { + try { + stream.close(); + } catch (IOException e) { + // ignore + } + } + } + } + + /** + * Deserializes the given header. The first element from {@link Header#getFields()} is assumed to contain the header + * byte array to deserialize from + * + * @param header the header to deserialize + * @return the deserialized header + * @throws IOException + * @throws ClassNotFoundException + */ + public static Header deserializeHeader(Header header) throws IOException, ClassNotFoundException { + ByteArrayInputStream bas = null; + ObjectInputStream stream = null; + try { + bas = new ByteArrayInputStream(header.getFields()[0]); + stream = new TCCLObjectInputStream(bas); + header.setOperation(stream.readShort()); + header.setNumBlocks(stream.readInt()); + header.setOwnerId((String) stream.readObject()); + header.setId((String) stream.readObject()); + header.setExpiration(stream.readLong()); + return header; + } finally { + if (bas != null) { + try { + bas.close(); + } catch (IOException e) { + // ignore + } + } + if (stream != null) { + try { + stream.close(); + } catch (IOException e) { + // ignore + } + } + } + + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java new file mode 100644 index 0000000000..b341f9f9b9 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; + +/** + * Deserializes an object based on the thread context classloader + * + * @version $Rev$ $Date$ + */ +public class TCCLObjectInputStream extends ObjectInputStream { + private final ClassLoader classLoader; + + public TCCLObjectInputStream(InputStream in) throws IOException, SecurityException { + super(in); + this.classLoader = Thread.currentThread().getContextClassLoader(); + } + + protected Class resolveClass(ObjectStreamClass streamClass) throws IOException, ClassNotFoundException { + return classLoader.loadClass(streamClass.getName()); + } +} |