summaryrefslogtreecommitdiffstats
path: root/sandbox/old/contrib/persistence/store.journal/src
diff options
context:
space:
mode:
Diffstat (limited to 'sandbox/old/contrib/persistence/store.journal/src')
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java33
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java150
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java99
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java33
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java67
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java33
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java595
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java33
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java80
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java67
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java224
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java42
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreExpireTestCase.java89
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreInsertTestCase.java134
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreOverflowTestCase.java91
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalTestCase.java78
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/MockSCAExternalizable.java57
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/RecordKeyTestCase.java77
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/SerializationHelperTestCase.java93
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/TestUtils.java47
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/Foo.java52
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalStoreThroughputTest.java117
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalThroughputTest.java146
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockMonitor.java58
-rw-r--r--sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockSCAObject.java66
25 files changed, 2561 insertions, 0 deletions
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java
new file mode 100644
index 0000000000..df6d3452e2
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+/**
+ * A listener interface for cache events
+ *
+ * @version $Rev$ $Date$
+ */
+public interface CacheEventListener {
+
+ /**
+ * Callback when an entry is evicted from the cache
+ */
+ void onEviction(RecordKey key, RecordEntry entry);
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java
new file mode 100644
index 0000000000..f2cae6218a
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+/**
+ * A header entry for a record written to the log. A header contains the following information:
+ * <pre>
+ * <ul>
+ * <li><strong>Owner id</strong> - the unique id of the owner, typically the SCAObject canonical name
+ * <li><strong>Operation</strong> - if the record is an INSERT, UPDATE, or DELETE
+ * <li><strong>Most Significant bits</strong> - the most signficant bits of the <code>java.util.UUID</code>
+ * representing the id of the persisted instance
+ * <li><strong>Least Significant bits</strong> - the least signficant bits of the <code>java.util.UUID</code>
+ * representing the id of the persisted instance
+ * <li><strong>Expiration</strong> - the expiration time in milliseconds when the record is set to expire
+ * <li><strong>Number of blocks</strong> - the number of blocks the record is written across
+ * <li><strong>Fields</strong> - any other data associated with the header
+ * </ul>
+ * </pre>
+ *
+ * @version $Rev$ $Date$
+ */
+public class Header {
+ public static final short INSERT = 0;
+ public static final short UPDATE = 1;
+ public static final short DELETE = 2;
+
+ protected byte[][] fields;
+ private short operation;
+ private String ownerId;
+ private String id;
+ private long expiration;
+ private int numBlocks;
+
+ public Header() {
+ }
+
+ /**
+ * Returns the record operation type
+ *
+ * @return the record operation type
+ */
+ public short getOperation() {
+ return operation;
+ }
+
+ /**
+ * Sets the record operation type
+ */
+ public void setOperation(short operation) {
+ this.operation = operation;
+ }
+
+ /**
+ * Returns the number of blocks the record is written across
+ *
+ * @return the number of blocks the record is written across
+ */
+ public int getNumBlocks() {
+ return numBlocks;
+ }
+
+ /**
+ * Sets the number of blocks the record is written across
+ */
+ public void setNumBlocks(int numBlocks) {
+ this.numBlocks = numBlocks;
+ }
+
+ /**
+ * Returns the unique owner id
+ *
+ * @return the unique owner id
+ */
+ public String getOwnerId() {
+ return ownerId;
+ }
+
+ /**
+ * Sets the unique owner id
+ */
+ public void setOwnerId(String ownerId) {
+ this.ownerId = ownerId;
+ }
+
+ /**
+ * Returns the record id
+ *
+ * @return the record id
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Sets the most significant bits of the record UUID
+ */
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /**
+ * Returns the time in milliseconds the record is set to expire
+ *
+ * @return the time in milliseconds the record is set to expire
+ */
+ public long getExpiration() {
+ return expiration;
+ }
+
+ /**
+ * Sets the time the record is set to expire in milliseconds
+ */
+ public void setExpiration(long expiration) {
+ this.expiration = expiration;
+ }
+
+ /**
+ * Returns additional header data
+ *
+ * @return additional header data
+ */
+ public byte[][] getFields() {
+ return fields;
+ }
+
+ /**
+ * Sets additional header data
+ */
+ public void setFields(byte[][] fields) {
+ this.fields = fields;
+ }
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java
new file mode 100644
index 0000000000..b8dfd072c5
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.IOException;
+
+import org.apache.tuscany.spi.services.store.StoreWriteException;
+
+import org.objectweb.howl.log.Configuration;
+import org.objectweb.howl.log.LogClosedException;
+import org.objectweb.howl.log.LogFileOverflowException;
+import org.objectweb.howl.log.LogRecordSizeException;
+import org.objectweb.howl.log.Logger;
+
+/**
+ * Extends the HOWL logger implementation adding convenience methods for processing records
+ *
+ * @version $Rev$ $Date$
+ */
+public class Journal extends Logger {
+ public static final short HEADER = 0;
+ public static final short RECORD = 1;
+
+ public Journal() throws IOException {
+ }
+
+ public Journal(Configuration config) throws IOException {
+ super(config);
+ }
+
+ /**
+ * Writes a header record to the log. The format of the header is defined by {@link
+ * SerializationHelper#createHeader(short, int, String, String, long)}
+ *
+ * @param bytes the record as a byte array
+ * @param force true if the disk write should be forced
+ * @return the log entry key
+ * @throws StoreWriteException
+ */
+ public long writeHeader(byte[] bytes, boolean force) throws StoreWriteException {
+ try {
+ return put(HEADER, new byte[][]{bytes}, force);
+ } catch (LogRecordSizeException e) {
+ throw new StoreWriteException(e);
+ } catch (LogFileOverflowException e) {
+ throw new StoreWriteException(e);
+ } catch (IOException e) {
+ throw new StoreWriteException(e);
+ } catch (LogClosedException e) {
+ throw new StoreWriteException(e);
+ } catch (InterruptedException e) {
+ throw new StoreWriteException(e);
+ }
+ }
+
+ /**
+ * Writes a record block to the log. The block may be the complete number of bytes or a chunked part of thereof if
+ * it does not fit within HOWL's block size limit.
+ *
+ * @param data the data to write
+ * @param id the unique record id consisting of the owner id and the UUID
+ * @param force true if the disk write should be force. For records that are written in multiple blocks, only the
+ * last write should be forced.
+ * @return the log entry key
+ * @throws StoreWriteException
+ */
+ public long writeBlock(byte[] data, byte[] id, boolean force) throws StoreWriteException {
+ try {
+ return put(RECORD, new byte[][]{id, data}, force);
+ } catch (LogRecordSizeException e) {
+ throw new StoreWriteException(e);
+ } catch (LogFileOverflowException e) {
+ throw new StoreWriteException(e);
+ } catch (IOException e) {
+ throw new StoreWriteException(e);
+ } catch (LogClosedException e) {
+ throw new StoreWriteException(e);
+ } catch (InterruptedException e) {
+ throw new StoreWriteException(e);
+ }
+ }
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java
new file mode 100644
index 0000000000..910598fc61
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import org.apache.tuscany.spi.services.store.StoreException;
+
+/**
+ * Denotes an error starting the journal
+ *
+ * @version $Rev$ $Date$
+ */
+public class JournalIinitializationException extends StoreException {
+
+ public JournalIinitializationException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java
new file mode 100644
index 0000000000..ec2e491c1d
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+/**
+ * Represents a record read from the log
+ *
+ * @version $Rev$ $Date$
+ */
+public class JournalRecord {
+
+ private Header header;
+ private byte[] data;
+
+ public JournalRecord(byte[] data) {
+ this.data = data;
+ }
+
+ /**
+ * Returns the record header
+ *
+ * @return the record header
+ */
+ public Header getHeader() {
+ return header;
+ }
+
+ /**
+ * Sets the record header
+ */
+ public void setHeader(Header header) {
+ this.header = header;
+ }
+
+ /**
+ * Returns the serialized data portion of the record which may be assembled from multiple blocks
+ *
+ * @return the serialized data portion of the record which may be assembled from multiple blocks
+ */
+ public byte[] getData() {
+ return data;
+ }
+
+ /**
+ * Sets the serialized data portion of the record which may be written across multiple blocks
+ */
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java
new file mode 100644
index 0000000000..945a77753a
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import org.apache.tuscany.spi.services.store.StoreException;
+
+/**
+ * Denotes an error shutting down the journal
+ *
+ * @version $Rev$ $Date$
+ */
+public class JournalShutdownException extends StoreException {
+
+ public JournalShutdownException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java
new file mode 100644
index 0000000000..422c074790
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import org.osoa.sca.annotations.Constructor;
+import org.osoa.sca.annotations.Destroy;
+import org.osoa.sca.annotations.EagerInit;
+import org.osoa.sca.annotations.Init;
+import org.osoa.sca.annotations.Property;
+import org.osoa.sca.annotations.Service;
+
+import org.apache.tuscany.spi.component.SCAObject;
+import org.apache.tuscany.spi.event.AbstractEventPublisher;
+import org.apache.tuscany.spi.services.store.RecoveryListener;
+import org.apache.tuscany.spi.services.store.Store;
+import org.apache.tuscany.spi.services.store.StoreExpirationEvent;
+import org.apache.tuscany.spi.services.store.StoreMonitor;
+import org.apache.tuscany.spi.services.store.StoreReadException;
+import org.apache.tuscany.spi.services.store.StoreWriteException;
+
+import org.apache.tuscany.api.annotation.Monitor;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.partition;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serialize;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeHeader;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeRecordId;
+import org.objectweb.howl.log.Configuration;
+import org.objectweb.howl.log.InvalidFileSetException;
+import org.objectweb.howl.log.InvalidLogBufferException;
+import org.objectweb.howl.log.InvalidLogKeyException;
+import org.objectweb.howl.log.LogClosedException;
+import org.objectweb.howl.log.LogConfigurationException;
+import org.objectweb.howl.log.LogEventListener;
+import org.objectweb.howl.log.LogException;
+import org.objectweb.howl.log.LogFileOverflowException;
+import org.objectweb.howl.log.LogRecord;
+import org.objectweb.howl.log.LogRecordSizeException;
+
+/**
+ * A journal-based store service that uses HOWL for reliable persistence and recovery of object instances. Insert,
+ * update, and delete operations, as well as object instances, are written to a binary log. Delete operations are
+ * written as a single header block as defined by {@link SerializationHelper#serializeHeader(short,int,String,String,
+ *long)}. Insert and update operations are written using multiple blocks consisting of at least one header and 1..n
+ * additional blocks containing the object byte array. If the byte array size is greater than the log block size (the
+ * HOWL default is 4K), it must be partitioned into smaller units using {@link SerializationHelper#partition(byte[],
+ *int)} and written across multiple blocks. The header contains the number of ensuing blocks a record occupies. Since
+ * block writes to the log may be interleaved, blocks for a given record may not be consecutive. In order to identify
+ * the record a block belongs to, the first byte array of written data for the block contains the serialized owner id
+ * and UUID associated with the record.
+ * <p/>
+ * A cache of all active persisted instances is maintained in memory and is used for read operations. When an instance
+ * is persisted, a log record is written and an entry is added to the cache containing the instance. When an instance is
+ * deleted, a delete record is written to the log and its corresponding cache entry is removed. During recovery, the
+ * instance cache is reconstituted by reading the log and deserializing active records.
+ * <p/>
+ * The journal store is designed for reliable, performant persistence and therefore avoids the overhead associated with
+ * using a relational database. This does, however, impose two configuration requirements. The first is that a log file
+ * must be larger than the size of all <bold>active</bold> instances in the runtime. As a log file begins to fill, the
+ * store will copy active records (expired records will be ignored) to a new file and the old log will be recycled. A
+ * second requirement is that all active instances must fit within the memory limits of the runtime JVM since the store
+ * does not perform passivation and a table of all active instances is maintained for querying.
+ *
+ * @version $Rev$ $Date$
+ */
+@Service(Store.class)
+@EagerInit
+public class JournalStore extends AbstractEventPublisher implements Store {
+ private static final int UNITIALIZED = -99;
+
+ // the cache of active records
+ private ConcurrentHashMap<RecordKey, RecordEntry> cache;
+ // private ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Object shutdownLock = new Object();
+ // TODO integrate with a core threading scheme
+ private ScheduledExecutorService scheduler;
+ private StoreMonitor monitor;
+ private Journal journal;
+ private long defaultExpirationOffset = 600000; // 10 minutes
+ private long reaperInterval = 1000;
+ private int blockSize = 4096; // 4k standard for HOWL
+
+ private Configuration config;
+ // HOWL properties - cf. <code>org.objectweb.howl.log.Configuration</code>
+ private boolean checksumEnabled;
+ private int bufferSize = UNITIALIZED;
+ private String bufferClassName;
+ private int maxBuffers = UNITIALIZED;
+ private int minBuffers = UNITIALIZED;
+ private int flushSleepTime = UNITIALIZED;
+ private boolean flushPartialBuffers;
+ private int threadsWaitingForceThreshold = UNITIALIZED;
+ private int maxBlocksPerFile = UNITIALIZED;
+ private int maxLogFiles = UNITIALIZED;
+ private String logFileDir = "../stores";
+ private String logFileExt = "store";
+ private String logFileName = "tuscany";
+ private String logFileMode;
+ // end HOWL properties
+
+ /**
+ * Creates a new store instance
+ *
+ * @param monitor the monitor for recording store events
+ */
+ @Constructor
+ public JournalStore(@Monitor StoreMonitor monitor) {
+ this.monitor = monitor;
+ config = new Configuration();
+ }
+
+ /**
+ * Protected constructor for unit testing
+ *
+ * @param monitor the monitor for recording store events
+ * @param journal the journal the store should write to, typically a mock
+ */
+ protected JournalStore(StoreMonitor monitor, Journal journal) {
+ this.monitor = monitor;
+ this.journal = journal;
+ config = new Configuration();
+ }
+
+ public int getBlockSize() {
+ return blockSize;
+ }
+
+ /**
+ * Returns the maximum default expiration offset for records in the store
+ *
+ * @return the maximum default expiration offset for records in the store
+ */
+ @Property
+ public long getExpirationOffset() {
+ return defaultExpirationOffset;
+ }
+
+ /**
+ * Sets the maximum default expiration offset for records in the store
+ */
+ public void setDefaultExpirationOffset(long defaultExpirationOffset) {
+ this.defaultExpirationOffset = defaultExpirationOffset;
+ }
+
+ @Property
+ public void setBlockSize(int blockSize) {
+ this.blockSize = blockSize;
+ }
+
+ public boolean isChecksumEnabled() {
+ return checksumEnabled;
+ }
+
+ @Property
+ public void setChecksumEnabled(boolean val) {
+ config.setChecksumEnabled(val);
+ this.checksumEnabled = val;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ @Property
+ public void setBufferSize(int bufferSize) throws JournalStorePropertyException {
+ try {
+ config.setBufferSize(bufferSize);
+ this.bufferSize = bufferSize;
+ } catch (LogConfigurationException e) {
+ throw new JournalStorePropertyException(e);
+ }
+ }
+
+ public String getBufferClassName() {
+ return bufferClassName;
+ }
+
+ @Property
+ public void setBufferClassName(String bufferClassName) {
+ config.setBufferClassName(bufferClassName);
+ this.bufferClassName = bufferClassName;
+ }
+
+ public int getMaxBuffers() {
+ return maxBuffers;
+ }
+
+ @Property
+ public void setMaxBuffers(int maxBuffers) throws JournalStorePropertyException {
+ try {
+ config.setMaxBuffers(maxBuffers);
+ this.maxBuffers = maxBuffers;
+ } catch (LogConfigurationException e) {
+ throw new JournalStorePropertyException(e);
+ }
+ }
+
+ public int getMinBuffers() {
+ return minBuffers;
+ }
+
+ @Property
+ public void setMinBuffers(int minBuffers) throws JournalStorePropertyException {
+ try {
+ config.setMinBuffers(minBuffers);
+ this.minBuffers = minBuffers;
+ } catch (LogConfigurationException e) {
+ throw new JournalStorePropertyException(e);
+ }
+ }
+
+ public int getFlushSleepTime() {
+ return flushSleepTime;
+ }
+
+ @Property
+ public void setFlushSleepTime(int time) {
+ config.setFlushSleepTime(time);
+ this.flushSleepTime = time;
+ }
+
+ public boolean isFlushPartialBuffers() {
+ return flushPartialBuffers;
+ }
+
+ @Property
+ public void setFlushPartialBuffers(boolean val) {
+ config.setFlushPartialBuffers(val);
+ this.flushPartialBuffers = val;
+ }
+
+ public int getThreadsWaitingForceThreshold() {
+ return threadsWaitingForceThreshold;
+ }
+
+ @Property
+ public void setThreadsWaitingForceThreshold(int threshold) {
+ config.setThreadsWaitingForceThreshold(threshold);
+ this.threadsWaitingForceThreshold = threshold;
+ }
+
+ public int getMaxBlocksPerFile() {
+ return maxBlocksPerFile;
+ }
+
+ @Property
+ public void setMaxBlocksPerFile(int maxBlocksPerFile) {
+ config.setMaxBlocksPerFile(maxBlocksPerFile);
+ this.maxBlocksPerFile = maxBlocksPerFile;
+ }
+
+ public int getMaxLogFiles() {
+ return maxLogFiles;
+ }
+
+ @Property
+ public void setMaxLogFiles(int maxLogFiles) {
+ config.setMaxLogFiles(maxLogFiles);
+ this.maxLogFiles = maxLogFiles;
+ }
+
+ public String getLogFileDir() {
+ return logFileDir;
+ }
+
+ @Property
+ public void setLogFileDir(String dir) {
+ config.setLogFileDir(dir);
+ this.logFileDir = dir;
+ }
+
+ public String getLogFileExt() {
+ return logFileExt;
+ }
+
+ @Property
+ public void setLogFileExt(String logFileExt) {
+ config.setLogFileExt(logFileExt);
+ this.logFileExt = logFileExt;
+ }
+
+ public String getLogFileName() {
+ return logFileName;
+ }
+
+ @Property
+ public void setLogFileName(String logFileName) {
+ config.setLogFileName(logFileName);
+ this.logFileName = logFileName;
+ }
+
+ public String getLogFileMode() {
+ return logFileMode;
+ }
+
+ public void setLogFileMode(String logFileMode) throws JournalStorePropertyException {
+ try {
+ config.setLogFileMode(logFileMode);
+ this.logFileMode = logFileMode;
+ } catch (LogConfigurationException e) {
+ throw new JournalStorePropertyException(e);
+ }
+ }
+
+ /**
+ * Initializes the store by opening the journal and starting the checkpoint daemon
+ *
+ * @throws JournalIinitializationException
+ *
+ */
+ @Init
+ public void init() throws JournalIinitializationException {
+ try {
+ cache = new ConcurrentHashMap<RecordKey, RecordEntry>();
+ if (journal == null) {
+ config.setLogFileName(logFileName);
+ config.setLogFileExt(logFileExt);
+ config.setLogFileDir(logFileDir);
+ journal = new Journal(config);
+ }
+ journal.setLogEventListener(new JournalLogEventListener());
+ journal.open();
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+ scheduler.scheduleWithFixedDelay(new Reaper(), reaperInterval, reaperInterval, MILLISECONDS);
+ monitor.start("Started journal store");
+ } catch (IOException e) {
+ throw new JournalIinitializationException(e);
+ } catch (LogConfigurationException e) {
+ throw new JournalIinitializationException(e);
+ } catch (InvalidLogBufferException e) {
+ throw new JournalIinitializationException(e);
+ } catch (InterruptedException e) {
+ throw new JournalIinitializationException(e);
+ } catch (InvalidFileSetException e) {
+ throw new JournalIinitializationException(e);
+ }
+ }
+
+ /**
+ * Performs an orderly close of the journal and checkpoint daemon
+ *
+ * @throws JournalShutdownException
+ */
+ @Destroy
+ public void destroy() throws JournalShutdownException {
+ try {
+ // avoid potential deadlock by acquiring write lock since the reaper thread can block when calling a
+ // synchronized journal method (e.g. journal.put) since journal.close() below holds a synchronization lock.
+ // This deadlock will prevent the scheduler.shutdown from completing, hanging this store shutdown
+ synchronized (shutdownLock) {
+ scheduler.shutdown();
+ journal.close();
+ monitor.stop("Stopped journal store");
+ }
+ } catch (IOException e) {
+ throw new JournalShutdownException(e);
+ } catch (InterruptedException e) {
+ throw new JournalShutdownException(e);
+ }
+ }
+
+ public void insertRecord(SCAObject owner, String id, Object object, long expiration) throws StoreWriteException {
+ write(owner, id, object, expiration, Header.INSERT);
+ }
+
+ public void updateRecord(SCAObject owner, String id, Object object, long expiration) throws StoreWriteException {
+ write(owner, id, object, expiration, Header.UPDATE);
+ }
+
+ public Object readRecord(SCAObject owner, String id) throws StoreReadException {
+ RecordEntry record;
+ RecordKey key = new RecordKey(id, owner);
+ record = cache.get(key);
+ if (record == null) {
+ return null;
+ }
+ if (record.getExpiration() != Store.NEVER && record.getExpiration() < System.currentTimeMillis()) {
+ cache.remove(key);
+ }
+ return record.getObject();
+ }
+
+ public void removeRecord(SCAObject owner, String id) throws StoreWriteException {
+ try {
+ journal.writeHeader(serializeHeader(Header.DELETE, 0, owner.getUri().toString(), id, NEVER), true);
+ RecordKey key = new RecordKey(id, owner);
+ // remove from the cache
+ cache.remove(key);
+ } catch (IOException e) {
+ throw new StoreWriteException(e);
+ }
+
+ }
+
+ public void removeRecords() throws StoreWriteException {
+
+ }
+
+ public void recover(RecoveryListener listener) throws StoreReadException {
+ }
+
+ /**
+ * Writes a record to the log. If a record needs to be written in multiple blocks, only the last block write is
+ * forced.
+ *
+ * @param owner
+ * @param id
+ * @param object
+ * @param expiration
+ * @param operation
+ * @throws StoreWriteException
+ */
+ private void write(SCAObject owner, String id, Object object, long expiration, short operation)
+ throws StoreWriteException {
+ if (!(object instanceof Serializable)) {
+ String name = object.getClass().getName();
+ throw new StoreWriteException("Type must implement serializable", name, id);
+ }
+ Serializable serializable = (Serializable) object;
+ String name = owner.getUri().toString();
+ List<byte[]> bytes;
+ try {
+ bytes = partition(serialize(serializable), blockSize);
+ } catch (IOException e) {
+ throw new StoreWriteException(e);
+ }
+ try {
+ byte[] header = serializeHeader(operation, bytes.size(), name, id, expiration);
+ long headerKey = journal.writeHeader(header, false);
+ // write chunked records in non-forced mode except last one
+ byte[] recordId = serializeRecordId(name, id);
+ long[] keys = new long[bytes.size() + 1];
+ keys[0] = headerKey;
+ for (int i = 0; i < bytes.size() - 1; i++) {
+ byte[] chunk = bytes.get(i);
+ keys[i + 1] = journal.writeBlock(chunk, recordId, false);
+ }
+ // add last record using a forced write
+ journal.writeBlock(bytes.get(bytes.size() - 1), recordId, true);
+ RecordKey key = new RecordKey(id, owner);
+ // add to the entry in the cache
+ cache.put(key, new RecordEntry(serializable, operation, keys, expiration));
+ } catch (IOException e) {
+ throw new StoreWriteException(e);
+ }
+ }
+
+ /**
+ * Handles log overflow events
+ */
+ private class JournalLogEventListener implements LogEventListener {
+
+ public void logOverflowNotification(long overflowFence) {
+ assert overflowFence != 0;
+ long newMark = Long.MAX_VALUE;
+ long now = System.currentTimeMillis();
+ try {
+ for (Map.Entry<RecordKey, RecordEntry> entry : cache.entrySet()) {
+ RecordEntry record = entry.getValue();
+ long[] journalKey = record.getJournalKeys();
+ if (journalKey[0] > overflowFence) {
+ // do not copy to new log but check if this is the new mark
+ if (journalKey[0] < newMark) {
+ newMark = journalKey[0];
+ }
+ continue;
+ }
+ if (entry.getValue().getExpiration() > now) {
+ // copy the blocks associayed with the record to the new journal only if it is not expired
+ for (long key : journalKey) {
+ LogRecord lrecord = journal.get(null, key);
+ journal.put(lrecord.getFields(), false);
+
+ }
+ }
+ }
+ if (newMark == Long.MAX_VALUE) {
+ newMark = overflowFence;
+ }
+ // force write
+ journal.mark(newMark, true);
+ } catch (LogRecordSizeException e) {
+ monitor.error(e);
+ } catch (LogFileOverflowException e) {
+ monitor.error(e);
+ } catch (LogConfigurationException e) {
+ monitor.error(e);
+ } catch (IOException e) {
+ monitor.error(e);
+ } catch (InvalidLogBufferException e) {
+ monitor.error(e);
+ } catch (LogClosedException e) {
+ monitor.error(e);
+ } catch (LogException e) {
+ monitor.error(e);
+ } catch (InterruptedException e) {
+ monitor.error(e);
+ }
+ }
+
+ public boolean isLoggable(int level) {
+ return false;
+ }
+
+ public void log(int level, String message) {
+
+ }
+
+ public void log(int level, String message, Throwable thrown) {
+
+ }
+ }
+
+ /**
+ * Periodically scans the instance cache, clearing out expired entries
+ */
+ private class Reaper implements Runnable {
+
+ public void run() {
+ try {
+ synchronized (shutdownLock) {
+ boolean force = false;
+ long now = System.currentTimeMillis();
+ long oldest = Long.MAX_VALUE;
+ for (Map.Entry<RecordKey, RecordEntry> entry : cache.entrySet()) {
+ RecordKey key = entry.getKey();
+ RecordEntry record = entry.getValue();
+ if (record.getExpiration() <= now) {
+ try {
+ String ownerName = key.getOwner().getUri().toString();
+ String id = key.getId();
+ byte[] header =
+ SerializationHelper.serializeHeader(Header.DELETE, 0, ownerName, id, Store.NEVER);
+ journal.writeHeader(header, false);
+ // notify listeners
+ SCAObject owner = key.getOwner();
+ Object instance = record.getObject();
+ // notify listeners of the expiration
+ StoreExpirationEvent event = new StoreExpirationEvent(this, owner, instance);
+ publish(event);
+ } catch (IOException e) {
+ monitor.error(e);
+ } catch (StoreWriteException e) {
+ monitor.error(e);
+ }
+ force = true;
+ cache.remove(entry.getKey()); // semantics of ConcurrentHashMap allow this
+ } else {
+ if (record.getJournalKeys()[0] < oldest) {
+ oldest = record.getJournalKeys()[0];
+ }
+ }
+ }
+ if (force) {
+ // perform a forced write and update the journal mark
+ journal.mark(oldest, true);
+ }
+ }
+ } catch (IOException e) {
+ monitor.error(e);
+ } catch (LogClosedException e) {
+ monitor.error(e);
+ } catch (InvalidLogKeyException e) {
+ monitor.error(e);
+ } catch (InterruptedException e) {
+ monitor.error(e);
+ }
+ }
+ }
+
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java
new file mode 100644
index 0000000000..275bad21f7
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import org.apache.tuscany.spi.services.store.StoreException;
+
+/**
+ * Denotes an error setting a property on the journal store
+ *
+ * @version $Rev$ $Date$
+ */
+public class JournalStorePropertyException extends StoreException {
+
+ public JournalStorePropertyException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java
new file mode 100644
index 0000000000..28fa9c43ac
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.Serializable;
+
+/**
+ * A journal store cache entry for a record
+ *
+ * @version $Rev$ $Date$
+ */
+public class RecordEntry {
+ private Serializable object;
+ private short operation;
+ private long expiration;
+ private long[] journalKeys;
+
+
+ /**
+ * Creates a new cached record
+ *
+ * @param object the object to serialize
+ * @param operation an <code>INSERT</code> or <code>UPDATE</code> operation
+ * @param journalKey the journal key for the block where the record is written
+ */
+ public RecordEntry(Serializable object, short operation, long[] journalKey, long expiration) {
+ assert object != null;
+ this.object = object;
+ this.operation = operation;
+ this.journalKeys = journalKey;
+ this.expiration = expiration;
+ }
+
+ /**
+ * Returns the object
+ *
+ * @return the object
+ */
+ public Serializable getObject() {
+ return object;
+ }
+
+ /**
+ * Returns the type of operation
+ *
+ * @return the type of operation
+ */
+ public short getOperation() {
+ return operation;
+ }
+
+ public long getExpiration() {
+ return expiration;
+ }
+
+ public long[] getJournalKeys() {
+ return journalKeys;
+ }
+
+ public void setJournalKeys(long[] journalKeys) {
+ this.journalKeys = journalKeys;
+ }
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java
new file mode 100644
index 0000000000..7293fddb55
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import org.apache.tuscany.spi.component.SCAObject;
+
+/**
+ * Used by the store cache to retrieve record entries
+ *
+ * @version $Rev$ $Date$
+ */
+public class RecordKey {
+
+ private String id;
+ private SCAObject owner;
+
+ public RecordKey(String id, SCAObject owner) {
+ this.id = id;
+ this.owner = owner;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public SCAObject getOwner() {
+ return owner;
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final RecordKey recordKey = (RecordKey) o;
+ if (id != null ? !id.equals(recordKey.id) : recordKey.id != null) {
+ return false;
+ }
+ return !(owner != null ? !owner.getUri().equals(recordKey.owner.getUri()) :
+ recordKey.owner != null);
+ }
+
+ public int hashCode() {
+ int result;
+ result = (id != null ? id.hashCode() : 0);
+ result = 31 * result + (owner != null ? owner.hashCode() : 0);
+ return result;
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java
new file mode 100644
index 0000000000..0a7197b014
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tuscany.spi.component.SCAExternalizable;
+import org.apache.tuscany.spi.component.WorkContext;
+
+/**
+ * Utility methods for processing journal records
+ *
+ * @version $Rev$ $Date$
+ */
+public final class SerializationHelper {
+
+ private SerializationHelper() {
+ }
+
+ /**
+ * Serializes and object
+ *
+ * @param serializable the object to serialize
+ * @throws IOException
+ */
+ public static byte[] serialize(Serializable serializable) throws IOException {
+ ByteArrayOutputStream bas = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bas);
+ out.writeObject(serializable);
+ return bas.toByteArray();
+ }
+
+ /**
+ * Deserializes an object using the TCCL
+ *
+ * @param bytes the serialized object byte array
+ * @param workContext the current work context
+ */
+ public static Object deserialize(byte[] bytes, WorkContext workContext) throws IOException, ClassNotFoundException {
+ Object o = new TCCLObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
+ if (o instanceof SCAExternalizable) {
+ SCAExternalizable externalizable = (SCAExternalizable) o;
+ externalizable.setWorkContext(workContext);
+ externalizable.reactivate();
+ }
+ return o;
+ }
+
+ /**
+ * Breaks a byte array into a series of blocks of the given size
+ *
+ * @param bytes the byte array to partition
+ * @param size the partition size
+ */
+ public static List<byte[]> partition(byte[] bytes, int size) {
+ assert size > 0;
+ List<byte[]> list = new ArrayList<byte[]>();
+ int pos = 0;
+ while (pos < bytes.length) {
+ if (pos + size > bytes.length) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ for (int i = pos; i < bytes.length; i++) {
+ stream.write(bytes[i]);
+ }
+ byte[] partition = stream.toByteArray();
+ list.add(partition);
+ pos = pos + partition.length;
+ } else {
+ byte[] partition = new byte[size];
+ for (int i = 0; i < size; i++) {
+ partition[i] = bytes[pos + i];
+ }
+ list.add(partition);
+ pos = pos + size;
+ }
+ }
+ return list;
+ }
+
+ /**
+ * Creates a serialized header entry that may be written to a log
+ *
+ * @param operation the operation type, i.e. {@link Header#INSERT}, {@link Header#UPDATE}, or {@link
+ * JournalStore#DELETE}
+ * @param numRecords the number of blocks that the record will be written two excluding the header block
+ * @param ownerId the id of the owner of the record
+ * @param id the id of the record unique to the owner
+ * @param expiration the record expirtation time in milliseconds
+ * @return a byte array containing the serialized header
+ * @throws IOException
+ */
+ public static byte[] serializeHeader(short operation, int numRecords, String ownerId, String id, long expiration)
+ throws IOException {
+ ByteArrayOutputStream stream = null;
+ ObjectOutputStream ostream = null;
+ try {
+ stream = new ByteArrayOutputStream();
+ ostream = new ObjectOutputStream(stream);
+ ostream.writeShort(operation);
+ ostream.writeInt(numRecords);
+ ostream.writeObject(ownerId);
+ ostream.writeObject(id);
+ ostream.writeLong(expiration);
+ ostream.flush();
+ return stream.toByteArray();
+ } finally {
+ if (ostream != null) {
+ try {
+ ostream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ if (stream != null) {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ /**
+ * Serializes a unique record id consisting of the owner id and the record's UUID
+ *
+ * @param ownerId the id of the owner, typically an SCAObject canonical name
+ * @param id the id associated with the record
+ * @return the serialized record byte array
+ * @throws IOException
+ */
+ public static byte[] serializeRecordId(String ownerId, String id)
+ throws IOException {
+ ByteArrayOutputStream stream = null;
+ ObjectOutputStream ostream = null;
+ try {
+ stream = new ByteArrayOutputStream();
+ ostream = new ObjectOutputStream(stream);
+ ostream.writeObject(ownerId);
+ ostream.writeObject(id);
+ ostream.flush();
+ return stream.toByteArray();
+ } finally {
+ if (ostream != null) {
+ try {
+ ostream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ if (stream != null) {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ /**
+ * Deserializes the given header. The first element from {@link Header#getFields()} is assumed to contain the header
+ * byte array to deserialize from
+ *
+ * @param header the header to deserialize
+ * @return the deserialized header
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ public static Header deserializeHeader(Header header) throws IOException, ClassNotFoundException {
+ ByteArrayInputStream bas = null;
+ ObjectInputStream stream = null;
+ try {
+ bas = new ByteArrayInputStream(header.getFields()[0]);
+ stream = new TCCLObjectInputStream(bas);
+ header.setOperation(stream.readShort());
+ header.setNumBlocks(stream.readInt());
+ header.setOwnerId((String) stream.readObject());
+ header.setId((String) stream.readObject());
+ header.setExpiration(stream.readLong());
+ return header;
+ } finally {
+ if (bas != null) {
+ try {
+ bas.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ if (stream != null) {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+
+ }
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java
new file mode 100644
index 0000000000..b341f9f9b9
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * Deserializes an object based on the thread context classloader
+ *
+ * @version $Rev$ $Date$
+ */
+public class TCCLObjectInputStream extends ObjectInputStream {
+ private final ClassLoader classLoader;
+
+ public TCCLObjectInputStream(InputStream in) throws IOException, SecurityException {
+ super(in);
+ this.classLoader = Thread.currentThread().getContextClassLoader();
+ }
+
+ protected Class resolveClass(ObjectStreamClass streamClass) throws IOException, ClassNotFoundException {
+ return classLoader.loadClass(streamClass.getName());
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreExpireTestCase.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreExpireTestCase.java
new file mode 100644
index 0000000000..4e23b3ca72
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreExpireTestCase.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.net.URI;
+
+import org.apache.tuscany.spi.component.SCAObject;
+import org.apache.tuscany.spi.event.RuntimeEventListener;
+import org.apache.tuscany.spi.services.store.StoreExpirationEvent;
+import org.apache.tuscany.spi.services.store.StoreMonitor;
+
+import junit.framework.TestCase;
+import org.easymock.IAnswer;
+import org.easymock.classextension.EasyMock;
+import org.objectweb.howl.log.LogEventListener;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class JournalStoreExpireTestCase extends TestCase {
+ @SuppressWarnings({"FieldCanBeLocal"})
+ private JournalStore store;
+ private SCAObject owner;
+
+ public void testNotifyOnExpire() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ StoreMonitor monitor = EasyMock.createMock(StoreMonitor.class);
+ Journal journal = EasyMock.createNiceMock(Journal.class);
+ journal.setLogEventListener(EasyMock.isA(LogEventListener.class));
+ EasyMock.replay(journal);
+
+ RuntimeEventListener listener = org.easymock.EasyMock.createMock(RuntimeEventListener.class);
+ listener.onEvent(org.easymock.EasyMock.isA(StoreExpirationEvent.class));
+ EasyMock.expectLastCall().andStubAnswer(new IAnswer<Object>() {
+ public Object answer() throws Throwable {
+ latch.countDown();
+ return null;
+ }
+ });
+ org.easymock.EasyMock.replay(listener);
+
+ store = new JournalStore(monitor, journal) {
+ };
+ store.addListener(listener);
+ store.init();
+ final String id = UUID.randomUUID().toString();
+ store.insertRecord(owner, id, "test", 1);
+ if (!latch.await(10000, TimeUnit.MILLISECONDS)) {
+ // failed to notify listener
+ fail();
+ }
+ store.destroy();
+ EasyMock.verify(listener);
+ }
+
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ TestUtils.cleanupLog();
+ owner = EasyMock.createMock(SCAObject.class);
+ URI uri = URI.create("foo");
+ EasyMock.expect(owner.getUri()).andReturn(uri).atLeastOnce();
+ EasyMock.replay(owner);
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ TestUtils.cleanupLog();
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreInsertTestCase.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreInsertTestCase.java
new file mode 100644
index 0000000000..a2cab56134
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreInsertTestCase.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.util.UUID;
+import java.net.URI;
+
+import org.apache.tuscany.spi.component.SCAObject;
+import org.apache.tuscany.spi.services.store.Store;
+import org.apache.tuscany.spi.services.store.StoreMonitor;
+
+import junit.framework.TestCase;
+import org.easymock.IAnswer;
+import org.easymock.classextension.EasyMock;
+import org.objectweb.howl.log.LogEventListener;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class JournalStoreInsertTestCase extends TestCase {
+ @SuppressWarnings({"FieldCanBeLocal"})
+ private JournalStore store;
+ private SCAObject owner;
+
+ public void testOrderedShutdown() throws Exception {
+ StoreMonitor monitor = EasyMock.createMock(StoreMonitor.class);
+ Journal journal = EasyMock.createMock(Journal.class);
+ journal.setLogEventListener(EasyMock.isA(LogEventListener.class));
+ journal.open();
+ journal.close();
+ EasyMock.replay(journal);
+ store = new JournalStore(monitor, journal) {
+ };
+ store.init();
+ store.destroy();
+ EasyMock.verify(journal);
+ }
+
+ public void testInsertRecord() throws Exception {
+ StoreMonitor monitor = EasyMock.createMock(StoreMonitor.class);
+ Journal journal = EasyMock.createMock(Journal.class);
+ journal.setLogEventListener(EasyMock.isA(LogEventListener.class));
+ journal.open();
+ final String id = UUID.randomUUID().toString();
+ EasyMock.expect(journal.writeHeader(EasyMock.isA(byte[].class), EasyMock.eq(false)))
+ .andStubAnswer(new IAnswer<Long>() {
+ public Long answer() throws Throwable {
+ Header header = new Header();
+ header.setFields(new byte[][]{(byte[]) EasyMock.getCurrentArguments()[0]});
+ // deserialize the message to test the format is correct
+ SerializationHelper.deserializeHeader(header);
+ assertTrue("Operation not INSERT", Header.INSERT == header.getOperation());
+ assertTrue("Expiration incorrect", Store.NEVER == header.getExpiration());
+ assertTrue("Least significant id incorrect",
+ id.equals(header.getId()));
+ assertTrue("Most significant id incorrect",
+ id.equals(header.getId()));
+ assertTrue("Records incorrect", 1 == header.getNumBlocks());
+ assertTrue("Owner id incorrect", "foo".equals(header.getOwnerId()));
+ return 1L;
+ }
+ });
+ EasyMock.expect(journal.writeBlock(EasyMock.isA(byte[].class), EasyMock.isA(byte[].class), EasyMock.eq(true)))
+ .andStubAnswer(new IAnswer<Long>() {
+ public Long answer() throws Throwable {
+ byte[] payload = (byte[]) EasyMock.getCurrentArguments()[0];
+ assertTrue("Block data incorrect", "test".equals(SerializationHelper.deserialize(payload, null)));
+ return 1L;
+ }
+ });
+ journal.close();
+ EasyMock.replay(journal);
+ store = new JournalStore(monitor, journal) {
+ };
+ store.init();
+ store.insertRecord(owner, id, "test", Store.NEVER);
+ store.destroy();
+ EasyMock.verify(journal);
+ }
+
+ /**
+ * Verifies that a written record will be cached. This is verified by the fact that long-term storage is never
+ * accessed.
+ */
+ public void testInsertRecordCache() throws Exception {
+ StoreMonitor monitor = EasyMock.createMock(StoreMonitor.class);
+ Journal journal = EasyMock.createMock(Journal.class);
+ journal.setLogEventListener(EasyMock.isA(LogEventListener.class));
+ journal.open();
+ final String id = UUID.randomUUID().toString();
+ EasyMock.expect(journal.writeHeader(EasyMock.isA(byte[].class), EasyMock.eq(false))).andReturn(1L);
+ EasyMock.expect(journal.writeBlock(EasyMock.isA(byte[].class), EasyMock.isA(byte[].class), EasyMock.eq(true)))
+ .andReturn(1L);
+ journal.close();
+ EasyMock.replay(journal);
+ store = new JournalStore(monitor, journal) {
+ };
+ store.init();
+ store.insertRecord(owner, id, "test", Store.NEVER);
+ assertEquals("test", store.readRecord(owner, id));
+ store.destroy();
+ EasyMock.verify(journal);
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ TestUtils.cleanupLog();
+ owner = EasyMock.createMock(SCAObject.class);
+ URI uri = URI.create("foo");
+ EasyMock.expect(owner.getUri()).andReturn(uri).atLeastOnce();
+ EasyMock.replay(owner);
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ TestUtils.cleanupLog();
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreOverflowTestCase.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreOverflowTestCase.java
new file mode 100644
index 0000000000..61a77869c1
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreOverflowTestCase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.util.UUID;
+import java.net.URI;
+
+import org.apache.tuscany.spi.component.SCAObject;
+import org.apache.tuscany.spi.services.store.StoreMonitor;
+
+import junit.framework.TestCase;
+import org.easymock.classextension.EasyMock;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class JournalStoreOverflowTestCase extends TestCase {
+ @SuppressWarnings({"FieldCanBeLocal"})
+ private JournalStore store;
+ private SCAObject owner;
+
+ /**
+ * Validates records are moved forward during a log overflow
+ *
+ * @throws Exception
+ */
+ public void testOverflow() throws Exception {
+ StoreMonitor monitor = EasyMock.createMock(StoreMonitor.class);
+ store = new JournalStore(monitor);
+ store.setMaxBlocksPerFile(3);
+ store.init();
+ long expire = System.currentTimeMillis() + 200;
+ store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire);
+ store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire);
+ store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire);
+ store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire);
+ store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire); //
+ store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire);
+ Thread.sleep(250);
+ store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire + 20000);
+ store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire + 20000);
+ store.destroy();
+ }
+
+
+ public void testOverflowAtInsertHeader() throws Exception {
+
+ }
+
+ public void testOverflowAtUpdateHeader() throws Exception {
+
+ }
+
+ public void testOverflowAtDeleteHeader() throws Exception {
+
+ }
+
+ public void testOverflowAtBlock() throws Exception {
+
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ TestUtils.cleanupLog();
+ owner = EasyMock.createMock(SCAObject.class);
+ EasyMock.expect(owner.getUri()).andReturn(URI.create("foo")).atLeastOnce();
+ EasyMock.replay(owner);
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ TestUtils.cleanupLog();
+ }
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalTestCase.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalTestCase.java
new file mode 100644
index 0000000000..2df83a0e74
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalTestCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.ByteArrayOutputStream;
+import java.util.UUID;
+
+import static org.apache.tuscany.spi.services.store.Store.NEVER;
+
+import junit.framework.TestCase;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeHeader;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeRecordId;
+import org.objectweb.howl.log.Configuration;
+import org.objectweb.howl.log.LogRecord;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class JournalTestCase extends TestCase {
+ private Journal journal;
+
+ public void testWriteHeader() throws Exception {
+ String id = UUID.randomUUID().toString();
+ long key = journal.writeHeader(serializeHeader(Header.INSERT, 10, "foo/bar", id, NEVER), false);
+ LogRecord record = journal.get(null, key);
+ Header header = new Header();
+ header.setFields(record.getFields());
+ SerializationHelper.deserializeHeader(header);
+ assertTrue(record.type == Journal.HEADER);
+ assertEquals(Header.INSERT, header.getOperation());
+ assertEquals(10, header.getNumBlocks());
+ assertEquals("foo/bar", header.getOwnerId());
+ assertEquals(id, header.getId());
+ assertEquals(NEVER, header.getExpiration());
+ }
+
+ public void testWriteRecord() throws Exception {
+ byte[] recordId = serializeRecordId("foo", UUID.randomUUID().toString());
+ long key = journal.writeBlock("this is a test".getBytes(), recordId, true);
+ LogRecord record = journal.get(null, key);
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ assertEquals(record.type, Journal.RECORD);
+ stream.write(record.getFields()[1]);
+ JournalRecord jrecord = new JournalRecord(stream.toByteArray());
+ assertEquals("this is a test", new String(jrecord.getData()));
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ TestUtils.cleanupLog();
+ Configuration config = new Configuration();
+ config.setLogFileDir("../stores");
+ journal = new Journal(config);
+ journal.open();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ journal.close();
+ TestUtils.cleanupLog();
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/MockSCAExternalizable.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/MockSCAExternalizable.java
new file mode 100644
index 0000000000..62e8a50635
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/MockSCAExternalizable.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.tuscany.spi.component.ReactivationException;
+import org.apache.tuscany.spi.component.SCAExternalizable;
+import org.apache.tuscany.spi.component.WorkContext;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class MockSCAExternalizable implements Externalizable, SCAExternalizable {
+ private WorkContext context;
+ private boolean reactivated;
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+
+ }
+
+ public boolean isReactivated() {
+ return reactivated;
+ }
+
+ public void setWorkContext(WorkContext context) {
+ this.context = context;
+ }
+
+ public void reactivate() throws ReactivationException {
+ assert context != null : "WorkContext not properly set";
+ reactivated = true;
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/RecordKeyTestCase.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/RecordKeyTestCase.java
new file mode 100644
index 0000000000..ddd69a1f54
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/RecordKeyTestCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.net.URI;
+
+import org.apache.tuscany.spi.component.SCAObject;
+
+import junit.framework.TestCase;
+import org.easymock.EasyMock;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class RecordKeyTestCase extends TestCase {
+
+ public void testEquals() throws Exception {
+ String id = "bar";
+ URI uri = URI.create("foo");
+ SCAObject owner1 = EasyMock.createMock(SCAObject.class);
+ EasyMock.expect(owner1.getUri()).andReturn(uri);
+ EasyMock.replay(owner1);
+ SCAObject owner2 = EasyMock.createMock(SCAObject.class);
+ EasyMock.expect(owner2.getUri()).andReturn(uri);
+ EasyMock.replay(owner2);
+
+ RecordKey key1 = new RecordKey(id, owner1);
+ RecordKey key2 = new RecordKey(id, owner2);
+ assertEquals(key1, key2);
+ }
+
+ public void testNotEqualsId() throws Exception {
+ String id = "bar";
+ SCAObject owner1 = EasyMock.createMock(SCAObject.class);
+ URI uri = URI.create("foo");
+ EasyMock.expect(owner1.getUri()).andReturn(uri);
+ EasyMock.replay(owner1);
+ SCAObject owner2 = EasyMock.createMock(SCAObject.class);
+ EasyMock.expect(owner2.getUri()).andReturn(uri);
+ EasyMock.replay(owner2);
+ RecordKey key1 = new RecordKey(id, owner1);
+ RecordKey key2 = new RecordKey("baz", owner2);
+ assertFalse(key1.equals(key2));
+ }
+
+ public void testNotEqualsOwner() throws Exception {
+ String id = "bar";
+ URI fooUri = URI.create("foo");
+ SCAObject owner1 = EasyMock.createMock(SCAObject.class);
+ EasyMock.expect(owner1.getUri()).andReturn(fooUri);
+ EasyMock.replay(owner1);
+ SCAObject owner2 = EasyMock.createMock(SCAObject.class);
+ URI barUri = URI.create("bar");
+ EasyMock.expect(owner2.getUri()).andReturn(barUri);
+ EasyMock.replay(owner2);
+ RecordKey key1 = new RecordKey(id, owner1);
+ RecordKey key2 = new RecordKey(id, owner2);
+ assertFalse(key1.equals(key2));
+ }
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/SerializationHelperTestCase.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/SerializationHelperTestCase.java
new file mode 100644
index 0000000000..160e4c7a23
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/SerializationHelperTestCase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.tuscany.spi.component.WorkContext;
+import static org.apache.tuscany.spi.services.store.Store.NEVER;
+
+import junit.framework.TestCase;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.deserialize;
+import org.easymock.EasyMock;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class SerializationHelperTestCase extends TestCase {
+
+ public void testTwoEvenChunks() throws Exception {
+ byte[] bytes = "this is a test".getBytes();
+ List<byte[]> chunks = SerializationHelper.partition(bytes, 7);
+ assertEquals(2, chunks.size());
+ assertEquals("this is", new String(chunks.get(0)));
+ assertEquals(" a test", new String(chunks.get(1)));
+ }
+
+ public void testUnevenChunks() throws Exception {
+ byte[] bytes = "this is a test123".getBytes();
+ List<byte[]> chunks = SerializationHelper.partition(bytes, 7);
+ assertEquals(3, chunks.size());
+ assertEquals("this is", new String(chunks.get(0)));
+ assertEquals(" a test", new String(chunks.get(1)));
+ assertEquals("123", new String(chunks.get(2)));
+ }
+
+ public void testChunkSizeGreater() throws Exception {
+ byte[] bytes = "this is a test".getBytes();
+ List<byte[]> chunks = SerializationHelper.partition(bytes, 512);
+ assertEquals(1, chunks.size());
+ byte[] chunk = chunks.get(0);
+ assertEquals(14, chunk.length);
+ assertEquals("this is a test", new String(chunk));
+ }
+
+ public void testSerializeDeserializeNonSCAExternalizable() throws Exception {
+ byte[] bytes = SerializationHelper.serialize("foo");
+ assertEquals("foo", deserialize(bytes, null));
+ }
+
+ public void testSerializeDeserializeSCAExternalizable() throws Exception {
+ byte[] bytes = SerializationHelper.serialize(new MockSCAExternalizable());
+ WorkContext context = EasyMock.createNiceMock(WorkContext.class);
+ MockSCAExternalizable externalized = (MockSCAExternalizable) deserialize(bytes, context);
+ assertTrue(externalized.isReactivated());
+ }
+
+ public void testDeserializeHeader() throws Exception {
+ String id = UUID.randomUUID().toString();
+ byte[] bytes = SerializationHelper.serializeHeader(Header.INSERT, 2, "foo", id, NEVER);
+ Header header = SerializationHelper.deserializeHeader(new MockHeader(bytes));
+ assertEquals(Header.INSERT, header.getOperation());
+ assertEquals(2, header.getNumBlocks());
+ assertEquals("foo", header.getOwnerId());
+ assertEquals(id, header.getId());
+ assertEquals(NEVER, header.getExpiration());
+ }
+
+ private class MockHeader extends Header {
+ public MockHeader(byte[] bytes) {
+ super();
+ fields = new byte[][]{bytes};
+ }
+ }
+
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/TestUtils.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/TestUtils.java
new file mode 100644
index 0000000000..5fb2ebeb82
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/TestUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.File;
+
+/**
+ * JournalThroughputTest case utilities
+ *
+ * @version $Rev$ $Date$
+ */
+public final class TestUtils {
+
+ private TestUtils() {
+
+ }
+
+ /**
+ * Removes log files from disk
+ */
+ public static void cleanupLog() {
+ File dir = new File("../stores");
+ if (!dir.exists()) {
+ return;
+ }
+ for (File file : dir.listFiles()) {
+ file.delete();
+ }
+ dir.delete();
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/Foo.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/Foo.java
new file mode 100644
index 0000000000..d9c7aee660
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/Foo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal.performance;
+
+import java.io.Serializable;
+
+/**
+ * @version $Rev$ $Date$
+ */
+@SuppressWarnings({"SerializableHasSerializationMethods"})
+public class Foo implements Serializable {
+ private static final long serialVersionUID = -1003664515513709814L;
+ protected String baz;
+ protected int bar;
+
+ public Foo(String baz, int bar) {
+ this.baz = baz;
+ this.bar = bar;
+ }
+
+ public String getBaz() {
+ return baz;
+ }
+
+ public void setBaz(String baz) {
+ this.baz = baz;
+ }
+
+ public int getBar() {
+ return bar;
+ }
+
+ public void setBar(int bar) {
+ this.bar = bar;
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalStoreThroughputTest.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalStoreThroughputTest.java
new file mode 100644
index 0000000000..eeb566f182
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalStoreThroughputTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal.performance;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.tuscany.spi.component.SCAObject;
+import org.apache.tuscany.spi.services.store.StoreWriteException;
+
+import org.apache.tuscany.persistence.store.journal.JournalShutdownException;
+import org.apache.tuscany.persistence.store.journal.JournalStore;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serialize;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeRecordId;
+import org.apache.tuscany.persistence.store.journal.TestUtils;
+
+/**
+ * Runs a basic throughput tests on JournalStore operations
+ * <p/>
+ * TODO this should be integrated with a Maven itest-based performance harness
+ *
+ * @version $Rev$ $Date$
+ */
+public class JournalStoreThroughputTest {
+ private static final int SIZE = 1000;
+ private CyclicBarrier barrier;
+ private JournalStore store;
+ private long now;
+ private SCAObject owner = new MockSCAObject();
+ private String id = UUID.randomUUID().toString();
+ private CountDownLatch latch = new CountDownLatch(1);
+ private long expire = System.currentTimeMillis() + 10000;
+ private Foo object = new Foo("this is a test", 1);
+
+ public static void main(String[] args) throws Exception {
+ JournalStoreThroughputTest test = new JournalStoreThroughputTest();
+ test.testAppend();
+ test.latch.await(5000, TimeUnit.MILLISECONDS);
+ }
+
+ public void testAppend() throws Exception {
+ TestUtils.cleanupLog();
+ store = new JournalStore(new MockMonitor());
+ store.init();
+ final Thread[] threads = new Thread[SIZE];
+ barrier = new CyclicBarrier(SIZE, new Runnable() {
+ public void run() {
+ try {
+ System.out.println("-----------------------------------------------------");
+ System.out.println("JournalStore.append()");
+ byte[] idBytes = serializeRecordId(owner.getUri().toString(), id);
+ byte[] bytes = serialize(object);
+ System.out.println("Approx record size :" + (bytes.length + idBytes.length));
+ System.out.println("Total threads :" + barrier.getNumberWaiting());
+ System.out.println("Forced writes :" + barrier.getNumberWaiting());
+ System.out.println("Time:" + (System.currentTimeMillis() - now));
+ store.destroy();
+ latch.countDown();
+ } catch (JournalShutdownException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ for (int i = 0; i < SIZE; i++) {
+ threads[i] = new Thread(new InsertWorker(true));
+ }
+ now = System.currentTimeMillis();
+ for (int i = 0; i < SIZE; i++) {
+ threads[i].start();
+ }
+ }
+
+ private class InsertWorker implements Runnable {
+ boolean forced;
+
+ public InsertWorker(boolean forced) {
+ this.forced = forced;
+ }
+
+ public void run() {
+ try {
+ store.insertRecord(owner, id, object, expire);
+ barrier.await();
+ } catch (StoreWriteException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalThroughputTest.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalThroughputTest.java
new file mode 100644
index 0000000000..7a3e8fa308
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalThroughputTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal.performance;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.tuscany.spi.services.store.StoreWriteException;
+
+import org.apache.tuscany.persistence.store.journal.Journal;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeRecordId;
+import org.apache.tuscany.persistence.store.journal.TestUtils;
+
+/**
+ * Runs a basic throughput tests on Journal operations
+ * <p/>
+ * TODO this should be integrated with a Maven itest-based performance harness
+ *
+ * @version $Rev$ $Date$
+ */
+public class JournalThroughputTest {
+ private static final int SIZE = 1000;
+ private CyclicBarrier barrier;
+ private Journal journal;
+ private byte[] bytes;
+ private byte[] recordId;
+ private long now;
+ private CountDownLatch latch = new CountDownLatch(1);
+
+ public static void main(String[] args) throws Exception {
+ JournalThroughputTest test = new JournalThroughputTest();
+ test.testForcedWrites();
+ test.latch.await(5000, TimeUnit.MILLISECONDS);
+ test.testNonForcedWrites();
+ }
+
+ public void testForcedWrites() throws Exception {
+ TestUtils.cleanupLog();
+ journal = new Journal();
+ journal.open();
+ recordId = serializeRecordId("foo", UUID.randomUUID().toString());
+ bytes = "this is a test".getBytes();
+ final Thread[] threads = new Thread[SIZE];
+ barrier = new CyclicBarrier(SIZE, new Runnable() {
+ public void run() {
+ System.out.println("-----------------------------------------------------");
+ System.out.println("Journal.writeBlock() using forced writes");
+ System.out.println("Approx record size :" + (recordId.length + bytes.length));
+ System.out.println("Total threads :" + barrier.getNumberWaiting());
+ System.out.println("Forced writes :" + barrier.getNumberWaiting());
+ System.out.println("Time:" + (System.currentTimeMillis() - now));
+ try {
+ journal.close();
+ latch.countDown();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ for (int i = 0; i < SIZE; i++) {
+ threads[i] = new Thread(new Worker(true));
+ }
+ now = System.currentTimeMillis();
+ for (int i = 0; i < SIZE; i++) {
+ threads[i].start();
+ }
+ }
+
+ public void testNonForcedWrites() throws Exception {
+ TestUtils.cleanupLog();
+ journal = new Journal();
+ journal.open();
+ recordId = serializeRecordId("foo", UUID.randomUUID().toString());
+ bytes = "this is a test".getBytes();
+ final Thread[] threads = new Thread[SIZE];
+ barrier = new CyclicBarrier(SIZE, new Runnable() {
+ public void run() {
+ System.out.println("-----------------------------------------------------");
+ System.out.println("Journal.writeBlock() using non-forced writes");
+ System.out.println("Approx record size :" + (recordId.length + bytes.length));
+ System.out.println("Total threads :" + barrier.getNumberWaiting());
+ System.out.println("Forced writes :" + barrier.getNumberWaiting());
+ System.out.println("Time:" + (System.currentTimeMillis() - now));
+ System.out.println("-----------------------------------------------------");
+ try {
+ journal.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ for (int i = 0; i < SIZE; i++) {
+ threads[i] = new Thread(new Worker(false));
+ }
+ now = System.currentTimeMillis();
+ for (int i = 0; i < SIZE; i++) {
+ threads[i].start();
+ }
+ }
+
+ private class Worker implements Runnable {
+ boolean forced;
+
+ public Worker(boolean forced) {
+ this.forced = forced;
+ }
+
+ public void run() {
+ try {
+ journal.writeBlock(bytes, recordId, forced);
+ barrier.await();
+ } catch (StoreWriteException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockMonitor.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockMonitor.java
new file mode 100644
index 0000000000..a62630ebe0
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockMonitor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal.performance;
+
+import org.apache.tuscany.spi.services.store.StoreMonitor;
+
+import org.apache.tuscany.api.annotation.LogLevel;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class MockMonitor implements StoreMonitor {
+ @LogLevel("DEBUG")
+ public void start(String msg) {
+
+ }
+
+ @LogLevel("DEBUG")
+ public void stop(String msg) {
+
+ }
+
+ @LogLevel("DEBUG")
+ public void beginRecover() {
+
+ }
+
+ @LogLevel("DEBUG")
+ public void endRecover() {
+
+ }
+
+ @LogLevel("DEBUG")
+ public void recover(Object recordId) {
+
+ }
+
+ @LogLevel("ERROR")
+ public void error(Throwable e) {
+
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockSCAObject.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockSCAObject.java
new file mode 100644
index 0000000000..e9ecdbab28
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockSCAObject.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal.performance;
+
+import java.net.URI;
+
+import org.apache.tuscany.spi.CoreRuntimeException;
+import org.apache.tuscany.spi.component.SCAObject;
+import org.apache.tuscany.spi.event.Event;
+import org.apache.tuscany.spi.event.EventFilter;
+import org.apache.tuscany.spi.event.RuntimeEventListener;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class MockSCAObject implements SCAObject {
+
+ public URI getUri() {
+ return null;
+ }
+
+ public void publish(Event object) {
+
+ }
+
+ public void addListener(RuntimeEventListener listener) {
+
+ }
+
+ public void addListener(EventFilter filter, RuntimeEventListener listener) {
+
+ }
+
+ public void removeListener(RuntimeEventListener listener) {
+
+ }
+
+ public int getLifecycleState() {
+ return 0;
+ }
+
+ public void start() throws CoreRuntimeException {
+
+ }
+
+ public void stop() throws CoreRuntimeException {
+
+ }
+
+}