Diffstat (limited to 'sandbox/old/contrib/persistence/store.journal/src/main/java/org')
-rw-r--r--  sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java   33
-rw-r--r--  sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java   150
-rw-r--r--  sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java   99
-rw-r--r--  sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java   33
-rw-r--r--  sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java   67
-rw-r--r--  sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java   33
-rw-r--r--  sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java   595
-rw-r--r--  sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java   33
-rw-r--r--  sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java   80
-rw-r--r--  sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java   67
-rw-r--r--  sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java   224
-rw-r--r--  sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java   42
12 files changed, 1456 insertions, 0 deletions
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java
new file mode 100644
index 0000000000..df6d3452e2
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+/**
+ * A listener interface for cache events
+ *
+ * @version $Rev$ $Date$
+ */
+public interface CacheEventListener {
+
+ /**
+ * Callback when an entry is evicted from the cache
+ */
+ void onEviction(RecordKey key, RecordEntry entry);
+
+}
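
A minimal sketch of an implementation, assuming it sits in the same package as the RecordKey and RecordEntry types added later in this diff (the class name MonitoringCacheEventListener is illustrative only):

    public class MonitoringCacheEventListener implements CacheEventListener {

        // report the owner URI and record id of the evicted entry
        public void onEviction(RecordKey key, RecordEntry entry) {
            System.err.println("Evicted record " + key.getId()
                + " for owner " + key.getOwner().getUri());
        }
    }
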
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java
new file mode 100644
index 0000000000..f2cae6218a
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+/**
+ * A header entry for a record written to the log. A header contains the following information:
+ * <ul>
+ * <li><strong>Owner id</strong> - the unique id of the owner, typically the SCAObject canonical name
+ * <li><strong>Operation</strong> - whether the record is an INSERT, UPDATE, or DELETE
+ * <li><strong>Id</strong> - the <code>java.util.UUID</code> string identifying the persisted instance
+ * <li><strong>Expiration</strong> - the expiration time in milliseconds when the record is set to expire
+ * <li><strong>Number of blocks</strong> - the number of blocks the record is written across
+ * <li><strong>Fields</strong> - any other data associated with the header
+ * </ul>
+ *
+ * @version $Rev$ $Date$
+ */
+public class Header {
+ public static final short INSERT = 0;
+ public static final short UPDATE = 1;
+ public static final short DELETE = 2;
+
+ protected byte[][] fields;
+ private short operation;
+ private String ownerId;
+ private String id;
+ private long expiration;
+ private int numBlocks;
+
+ public Header() {
+ }
+
+ /**
+ * Returns the record operation type
+ *
+ * @return the record operation type
+ */
+ public short getOperation() {
+ return operation;
+ }
+
+ /**
+ * Sets the record operation type
+ */
+ public void setOperation(short operation) {
+ this.operation = operation;
+ }
+
+ /**
+ * Returns the number of blocks the record is written across
+ *
+ * @return the number of blocks the record is written across
+ */
+ public int getNumBlocks() {
+ return numBlocks;
+ }
+
+ /**
+ * Sets the number of blocks the record is written across
+ */
+ public void setNumBlocks(int numBlocks) {
+ this.numBlocks = numBlocks;
+ }
+
+ /**
+ * Returns the unique owner id
+ *
+ * @return the unique owner id
+ */
+ public String getOwnerId() {
+ return ownerId;
+ }
+
+ /**
+ * Sets the unique owner id
+ */
+ public void setOwnerId(String ownerId) {
+ this.ownerId = ownerId;
+ }
+
+ /**
+ * Returns the record id
+ *
+ * @return the record id
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Sets the record id
+ */
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /**
+ * Returns the time in milliseconds the record is set to expire
+ *
+ * @return the time in milliseconds the record is set to expire
+ */
+ public long getExpiration() {
+ return expiration;
+ }
+
+ /**
+ * Sets the time the record is set to expire in milliseconds
+ */
+ public void setExpiration(long expiration) {
+ this.expiration = expiration;
+ }
+
+ /**
+ * Returns additional header data
+ *
+ * @return additional header data
+ */
+ public byte[][] getFields() {
+ return fields;
+ }
+
+ /**
+ * Sets additional header data
+ */
+ public void setFields(byte[][] fields) {
+ this.fields = fields;
+ }
+
+}
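
A hedged sketch of how a header round-trips through the helpers added later in this change (SerializationHelper.serializeHeader and deserializeHeader); the owner id and expiration values are made up, and the snippet is assumed to run in a method declaring IOException and ClassNotFoundException:

    String ownerId = "ExampleComponent";
    String id = java.util.UUID.randomUUID().toString();
    long expiration = System.currentTimeMillis() + 600000;

    // serialize a header describing an INSERT written across two blocks
    byte[] bytes = SerializationHelper.serializeHeader(Header.INSERT, 2, ownerId, id, expiration);

    // deserializeHeader expects the serialized header bytes in fields[0]
    Header header = new Header();
    header.setFields(new byte[][]{bytes});
    header = SerializationHelper.deserializeHeader(header);

    assert header.getOperation() == Header.INSERT;
    assert header.getNumBlocks() == 2;
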
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java
new file mode 100644
index 0000000000..b8dfd072c5
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.IOException;
+
+import org.apache.tuscany.spi.services.store.StoreWriteException;
+
+import org.objectweb.howl.log.Configuration;
+import org.objectweb.howl.log.LogClosedException;
+import org.objectweb.howl.log.LogFileOverflowException;
+import org.objectweb.howl.log.LogRecordSizeException;
+import org.objectweb.howl.log.Logger;
+
+/**
+ * Extends the HOWL logger implementation adding convenience methods for processing records
+ *
+ * @version $Rev$ $Date$
+ */
+public class Journal extends Logger {
+ public static final short HEADER = 0;
+ public static final short RECORD = 1;
+
+ public Journal() throws IOException {
+ }
+
+ public Journal(Configuration config) throws IOException {
+ super(config);
+ }
+
+ /**
+ * Writes a header record to the log. The format of the header is defined by {@link
+ * SerializationHelper#serializeHeader(short, int, String, String, long)}
+ *
+ * @param bytes the record as a byte array
+ * @param force true if the disk write should be forced
+ * @return the log entry key
+ * @throws StoreWriteException
+ */
+ public long writeHeader(byte[] bytes, boolean force) throws StoreWriteException {
+ try {
+ return put(HEADER, new byte[][]{bytes}, force);
+ } catch (LogRecordSizeException e) {
+ throw new StoreWriteException(e);
+ } catch (LogFileOverflowException e) {
+ throw new StoreWriteException(e);
+ } catch (IOException e) {
+ throw new StoreWriteException(e);
+ } catch (LogClosedException e) {
+ throw new StoreWriteException(e);
+ } catch (InterruptedException e) {
+ throw new StoreWriteException(e);
+ }
+ }
+
+ /**
+ * Writes a record block to the log. The block may contain the complete record data or a chunked part thereof
+ * if the data does not fit within HOWL's block size limit.
+ *
+ * @param data the data to write
+ * @param id the unique record id consisting of the owner id and the UUID
+ * @param force true if the disk write should be forced. For records that are written in multiple blocks, only the
+ * last write should be forced.
+ * @return the log entry key
+ * @throws StoreWriteException
+ */
+ public long writeBlock(byte[] data, byte[] id, boolean force) throws StoreWriteException {
+ try {
+ return put(RECORD, new byte[][]{id, data}, force);
+ } catch (LogRecordSizeException e) {
+ throw new StoreWriteException(e);
+ } catch (LogFileOverflowException e) {
+ throw new StoreWriteException(e);
+ } catch (IOException e) {
+ throw new StoreWriteException(e);
+ } catch (LogClosedException e) {
+ throw new StoreWriteException(e);
+ } catch (InterruptedException e) {
+ throw new StoreWriteException(e);
+ }
+ }
+
+}
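
For reference, a sketch of the write sequence JournalStore (later in this diff) drives through these two methods: the header and all intermediate blocks are written unforced and only the final block forces a disk sync. The journal, blocks, ownerId, id and expiration variables are assumed to be in scope:

    byte[] header = SerializationHelper.serializeHeader(Header.INSERT, blocks.size(), ownerId, id, expiration);
    byte[] recordId = SerializationHelper.serializeRecordId(ownerId, id);

    long headerKey = journal.writeHeader(header, false);                // unforced
    for (int i = 0; i < blocks.size() - 1; i++) {
        journal.writeBlock(blocks.get(i), recordId, false);             // unforced
    }
    journal.writeBlock(blocks.get(blocks.size() - 1), recordId, true);  // forced
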
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java
new file mode 100644
index 0000000000..910598fc61
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import org.apache.tuscany.spi.services.store.StoreException;
+
+/**
+ * Denotes an error starting the journal
+ *
+ * @version $Rev$ $Date$
+ */
+public class JournalIinitializationException extends StoreException {
+
+ public JournalIinitializationException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java
new file mode 100644
index 0000000000..ec2e491c1d
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+/**
+ * Represents a record read from the log
+ *
+ * @version $Rev$ $Date$
+ */
+public class JournalRecord {
+
+ private Header header;
+ private byte[] data;
+
+ public JournalRecord(byte[] data) {
+ this.data = data;
+ }
+
+ /**
+ * Returns the record header
+ *
+ * @return the record header
+ */
+ public Header getHeader() {
+ return header;
+ }
+
+ /**
+ * Sets the record header
+ */
+ public void setHeader(Header header) {
+ this.header = header;
+ }
+
+ /**
+ * Returns the serialized data portion of the record which may be assembled from multiple blocks
+ *
+ * @return the serialized data portion of the record which may be assembled from multiple blocks
+ */
+ public byte[] getData() {
+ return data;
+ }
+
+ /**
+ * Sets the serialized data portion of the record which may be written across multiple blocks
+ */
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java
new file mode 100644
index 0000000000..945a77753a
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import org.apache.tuscany.spi.services.store.StoreException;
+
+/**
+ * Denotes an error shutting down the journal
+ *
+ * @version $Rev$ $Date$
+ */
+public class JournalShutdownException extends StoreException {
+
+ public JournalShutdownException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java
new file mode 100644
index 0000000000..422c074790
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import org.osoa.sca.annotations.Constructor;
+import org.osoa.sca.annotations.Destroy;
+import org.osoa.sca.annotations.EagerInit;
+import org.osoa.sca.annotations.Init;
+import org.osoa.sca.annotations.Property;
+import org.osoa.sca.annotations.Service;
+
+import org.apache.tuscany.spi.component.SCAObject;
+import org.apache.tuscany.spi.event.AbstractEventPublisher;
+import org.apache.tuscany.spi.services.store.RecoveryListener;
+import org.apache.tuscany.spi.services.store.Store;
+import org.apache.tuscany.spi.services.store.StoreExpirationEvent;
+import org.apache.tuscany.spi.services.store.StoreMonitor;
+import org.apache.tuscany.spi.services.store.StoreReadException;
+import org.apache.tuscany.spi.services.store.StoreWriteException;
+
+import org.apache.tuscany.api.annotation.Monitor;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.partition;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serialize;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeHeader;
+import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeRecordId;
+import org.objectweb.howl.log.Configuration;
+import org.objectweb.howl.log.InvalidFileSetException;
+import org.objectweb.howl.log.InvalidLogBufferException;
+import org.objectweb.howl.log.InvalidLogKeyException;
+import org.objectweb.howl.log.LogClosedException;
+import org.objectweb.howl.log.LogConfigurationException;
+import org.objectweb.howl.log.LogEventListener;
+import org.objectweb.howl.log.LogException;
+import org.objectweb.howl.log.LogFileOverflowException;
+import org.objectweb.howl.log.LogRecord;
+import org.objectweb.howl.log.LogRecordSizeException;
+
+/**
+ * A journal-based store service that uses HOWL for reliable persistence and recovery of object instances. Insert,
+ * update, and delete operations, as well as object instances, are written to a binary log. Delete operations are
+ * written as a single header block as defined by {@link SerializationHelper#serializeHeader(short,int,String,String,
+ *long)}. Insert and update operations are written using multiple blocks consisting of at least one header and 1..n
+ * additional blocks containing the object byte array. If the byte array size is greater than the log block size (the
+ * HOWL default is 4K), it must be partitioned into smaller units using {@link SerializationHelper#partition(byte[],
+ *int)} and written across multiple blocks. The header contains the number of ensuing blocks a record occupies. Since
+ * block writes to the log may be interleaved, blocks for a given record may not be consecutive. In order to identify
+ * the record a block belongs to, the first byte array of written data for the block contains the serialized owner id
+ * and UUID associated with the record.
+ * <p/>
+ * A cache of all active persisted instances is maintained in memory and is used for read operations. When an instance
+ * is persisted, a log record is written and an entry is added to the cache containing the instance. When an instance is
+ * deleted, a delete record is written to the log and its corresponding cache entry is removed. During recovery, the
+ * instance cache is reconstituted by reading the log and deserializing active records.
+ * <p/>
+ * The journal store is designed for reliable, performant persistence and therefore avoids the overhead associated with
+ * using a relational database. This does, however, impose two configuration requirements. The first is that a log file
+ * must be larger than the size of all <strong>active</strong> instances in the runtime. As a log file begins to fill, the
+ * store will copy active records (expired records will be ignored) to a new file and the old log will be recycled. A
+ * second requirement is that all active instances must fit within the memory limits of the runtime JVM since the store
+ * does not perform passivation and a table of all active instances is maintained for querying.
+ *
+ * @version $Rev$ $Date$
+ */
+@Service(Store.class)
+@EagerInit
+public class JournalStore extends AbstractEventPublisher implements Store {
+ private static final int UNITIALIZED = -99;
+
+ // the cache of active records
+ private ConcurrentHashMap<RecordKey, RecordEntry> cache;
+ // private ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Object shutdownLock = new Object();
+ // TODO integrate with a core threading scheme
+ private ScheduledExecutorService scheduler;
+ private StoreMonitor monitor;
+ private Journal journal;
+ private long defaultExpirationOffset = 600000; // 10 minutes
+ private long reaperInterval = 1000;
+ private int blockSize = 4096; // 4k standard for HOWL
+
+ private Configuration config;
+ // HOWL properties - cf. <code>org.objectweb.howl.log.Configuration</code>
+ private boolean checksumEnabled;
+ private int bufferSize = UNITIALIZED;
+ private String bufferClassName;
+ private int maxBuffers = UNITIALIZED;
+ private int minBuffers = UNITIALIZED;
+ private int flushSleepTime = UNITIALIZED;
+ private boolean flushPartialBuffers;
+ private int threadsWaitingForceThreshold = UNITIALIZED;
+ private int maxBlocksPerFile = UNITIALIZED;
+ private int maxLogFiles = UNITIALIZED;
+ private String logFileDir = "../stores";
+ private String logFileExt = "store";
+ private String logFileName = "tuscany";
+ private String logFileMode;
+ // end HOWL properties
+
+ /**
+ * Creates a new store instance
+ *
+ * @param monitor the monitor for recording store events
+ */
+ @Constructor
+ public JournalStore(@Monitor StoreMonitor monitor) {
+ this.monitor = monitor;
+ config = new Configuration();
+ }
+
+ /**
+ * Protected constructor for unit testing
+ *
+ * @param monitor the monitor for recording store events
+ * @param journal the journal the store should write to, typically a mock
+ */
+ protected JournalStore(StoreMonitor monitor, Journal journal) {
+ this.monitor = monitor;
+ this.journal = journal;
+ config = new Configuration();
+ }
+
+ public int getBlockSize() {
+ return blockSize;
+ }
+
+ /**
+ * Returns the maximum default expiration offset for records in the store
+ *
+ * @return the maximum default expiration offset for records in the store
+ */
+ @Property
+ public long getExpirationOffset() {
+ return defaultExpirationOffset;
+ }
+
+ /**
+ * Sets the maximum default expiration offset for records in the store
+ */
+ public void setDefaultExpirationOffset(long defaultExpirationOffset) {
+ this.defaultExpirationOffset = defaultExpirationOffset;
+ }
+
+ @Property
+ public void setBlockSize(int blockSize) {
+ this.blockSize = blockSize;
+ }
+
+ public boolean isChecksumEnabled() {
+ return checksumEnabled;
+ }
+
+ @Property
+ public void setChecksumEnabled(boolean val) {
+ config.setChecksumEnabled(val);
+ this.checksumEnabled = val;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ @Property
+ public void setBufferSize(int bufferSize) throws JournalStorePropertyException {
+ try {
+ config.setBufferSize(bufferSize);
+ this.bufferSize = bufferSize;
+ } catch (LogConfigurationException e) {
+ throw new JournalStorePropertyException(e);
+ }
+ }
+
+ public String getBufferClassName() {
+ return bufferClassName;
+ }
+
+ @Property
+ public void setBufferClassName(String bufferClassName) {
+ config.setBufferClassName(bufferClassName);
+ this.bufferClassName = bufferClassName;
+ }
+
+ public int getMaxBuffers() {
+ return maxBuffers;
+ }
+
+ @Property
+ public void setMaxBuffers(int maxBuffers) throws JournalStorePropertyException {
+ try {
+ config.setMaxBuffers(maxBuffers);
+ this.maxBuffers = maxBuffers;
+ } catch (LogConfigurationException e) {
+ throw new JournalStorePropertyException(e);
+ }
+ }
+
+ public int getMinBuffers() {
+ return minBuffers;
+ }
+
+ @Property
+ public void setMinBuffers(int minBuffers) throws JournalStorePropertyException {
+ try {
+ config.setMinBuffers(minBuffers);
+ this.minBuffers = minBuffers;
+ } catch (LogConfigurationException e) {
+ throw new JournalStorePropertyException(e);
+ }
+ }
+
+ public int getFlushSleepTime() {
+ return flushSleepTime;
+ }
+
+ @Property
+ public void setFlushSleepTime(int time) {
+ config.setFlushSleepTime(time);
+ this.flushSleepTime = time;
+ }
+
+ public boolean isFlushPartialBuffers() {
+ return flushPartialBuffers;
+ }
+
+ @Property
+ public void setFlushPartialBuffers(boolean val) {
+ config.setFlushPartialBuffers(val);
+ this.flushPartialBuffers = val;
+ }
+
+ public int getThreadsWaitingForceThreshold() {
+ return threadsWaitingForceThreshold;
+ }
+
+ @Property
+ public void setThreadsWaitingForceThreshold(int threshold) {
+ config.setThreadsWaitingForceThreshold(threshold);
+ this.threadsWaitingForceThreshold = threshold;
+ }
+
+ public int getMaxBlocksPerFile() {
+ return maxBlocksPerFile;
+ }
+
+ @Property
+ public void setMaxBlocksPerFile(int maxBlocksPerFile) {
+ config.setMaxBlocksPerFile(maxBlocksPerFile);
+ this.maxBlocksPerFile = maxBlocksPerFile;
+ }
+
+ public int getMaxLogFiles() {
+ return maxLogFiles;
+ }
+
+ @Property
+ public void setMaxLogFiles(int maxLogFiles) {
+ config.setMaxLogFiles(maxLogFiles);
+ this.maxLogFiles = maxLogFiles;
+ }
+
+ public String getLogFileDir() {
+ return logFileDir;
+ }
+
+ @Property
+ public void setLogFileDir(String dir) {
+ config.setLogFileDir(dir);
+ this.logFileDir = dir;
+ }
+
+ public String getLogFileExt() {
+ return logFileExt;
+ }
+
+ @Property
+ public void setLogFileExt(String logFileExt) {
+ config.setLogFileExt(logFileExt);
+ this.logFileExt = logFileExt;
+ }
+
+ public String getLogFileName() {
+ return logFileName;
+ }
+
+ @Property
+ public void setLogFileName(String logFileName) {
+ config.setLogFileName(logFileName);
+ this.logFileName = logFileName;
+ }
+
+ public String getLogFileMode() {
+ return logFileMode;
+ }
+
+ public void setLogFileMode(String logFileMode) throws JournalStorePropertyException {
+ try {
+ config.setLogFileMode(logFileMode);
+ this.logFileMode = logFileMode;
+ } catch (LogConfigurationException e) {
+ throw new JournalStorePropertyException(e);
+ }
+ }
+
+ /**
+ * Initializes the store by opening the journal and starting the checkpoint daemon
+ *
+ * @throws JournalIinitializationException if an error occurs opening the journal or starting the reaper
+ */
+ @Init
+ public void init() throws JournalIinitializationException {
+ try {
+ cache = new ConcurrentHashMap<RecordKey, RecordEntry>();
+ if (journal == null) {
+ config.setLogFileName(logFileName);
+ config.setLogFileExt(logFileExt);
+ config.setLogFileDir(logFileDir);
+ journal = new Journal(config);
+ }
+ journal.setLogEventListener(new JournalLogEventListener());
+ journal.open();
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+ scheduler.scheduleWithFixedDelay(new Reaper(), reaperInterval, reaperInterval, MILLISECONDS);
+ monitor.start("Started journal store");
+ } catch (IOException e) {
+ throw new JournalIinitializationException(e);
+ } catch (LogConfigurationException e) {
+ throw new JournalIinitializationException(e);
+ } catch (InvalidLogBufferException e) {
+ throw new JournalIinitializationException(e);
+ } catch (InterruptedException e) {
+ throw new JournalIinitializationException(e);
+ } catch (InvalidFileSetException e) {
+ throw new JournalIinitializationException(e);
+ }
+ }
+
+ /**
+ * Performs an orderly close of the journal and checkpoint daemon
+ *
+ * @throws JournalShutdownException if an error occurs closing the journal or stopping the reaper
+ */
+ @Destroy
+ public void destroy() throws JournalShutdownException {
+ try {
+ // Hold the shutdown lock so the reaper cannot be mid-run: a reaper thread blocked in a synchronized
+ // journal method (e.g. journal.put()) while journal.close() below holds the journal monitor would
+ // deadlock, preventing scheduler.shutdown() from completing and hanging this store shutdown
+ synchronized (shutdownLock) {
+ scheduler.shutdown();
+ journal.close();
+ monitor.stop("Stopped journal store");
+ }
+ } catch (IOException e) {
+ throw new JournalShutdownException(e);
+ } catch (InterruptedException e) {
+ throw new JournalShutdownException(e);
+ }
+ }
+
+ public void insertRecord(SCAObject owner, String id, Object object, long expiration) throws StoreWriteException {
+ write(owner, id, object, expiration, Header.INSERT);
+ }
+
+ public void updateRecord(SCAObject owner, String id, Object object, long expiration) throws StoreWriteException {
+ write(owner, id, object, expiration, Header.UPDATE);
+ }
+
+ public Object readRecord(SCAObject owner, String id) throws StoreReadException {
+ RecordEntry record;
+ RecordKey key = new RecordKey(id, owner);
+ record = cache.get(key);
+ if (record == null) {
+ return null;
+ }
+ if (record.getExpiration() != Store.NEVER && record.getExpiration() < System.currentTimeMillis()) {
+ // the record has expired: evict it and report that no instance was found
+ cache.remove(key);
+ return null;
+ }
+ return record.getObject();
+ }
+
+ public void removeRecord(SCAObject owner, String id) throws StoreWriteException {
+ try {
+ journal.writeHeader(serializeHeader(Header.DELETE, 0, owner.getUri().toString(), id, NEVER), true);
+ RecordKey key = new RecordKey(id, owner);
+ // remove from the cache
+ cache.remove(key);
+ } catch (IOException e) {
+ throw new StoreWriteException(e);
+ }
+
+ }
+
+ public void removeRecords() throws StoreWriteException {
+
+ }
+
+ public void recover(RecoveryListener listener) throws StoreReadException {
+ }
+
+ /**
+ * Writes a record to the log. If a record needs to be written in multiple blocks, only the last block write is
+ * forced.
+ *
+ * @param owner the owner of the record
+ * @param id the record id, unique to the owner
+ * @param object the instance to persist, which must implement Serializable
+ * @param expiration the time in milliseconds when the record expires
+ * @param operation the operation type, either {@link Header#INSERT} or {@link Header#UPDATE}
+ * @throws StoreWriteException if an error occurs writing to the log
+ */
+ private void write(SCAObject owner, String id, Object object, long expiration, short operation)
+ throws StoreWriteException {
+ if (!(object instanceof Serializable)) {
+ String name = object.getClass().getName();
+ throw new StoreWriteException("Type must implement serializable", name, id);
+ }
+ Serializable serializable = (Serializable) object;
+ String name = owner.getUri().toString();
+ List<byte[]> bytes;
+ try {
+ bytes = partition(serialize(serializable), blockSize);
+ } catch (IOException e) {
+ throw new StoreWriteException(e);
+ }
+ try {
+ byte[] header = serializeHeader(operation, bytes.size(), name, id, expiration);
+ long headerKey = journal.writeHeader(header, false);
+ // write chunked records in non-forced mode except last one
+ byte[] recordId = serializeRecordId(name, id);
+ long[] keys = new long[bytes.size() + 1];
+ keys[0] = headerKey;
+ for (int i = 0; i < bytes.size() - 1; i++) {
+ byte[] chunk = bytes.get(i);
+ keys[i + 1] = journal.writeBlock(chunk, recordId, false);
+ }
+ // add the last block using a forced write and retain its journal key
+ keys[bytes.size()] = journal.writeBlock(bytes.get(bytes.size() - 1), recordId, true);
+ RecordKey key = new RecordKey(id, owner);
+ // add to the entry in the cache
+ cache.put(key, new RecordEntry(serializable, operation, keys, expiration));
+ } catch (IOException e) {
+ throw new StoreWriteException(e);
+ }
+ }
+
+ /**
+ * Handles log overflow events
+ */
+ private class JournalLogEventListener implements LogEventListener {
+
+ public void logOverflowNotification(long overflowFence) {
+ assert overflowFence != 0;
+ long newMark = Long.MAX_VALUE;
+ long now = System.currentTimeMillis();
+ try {
+ for (Map.Entry<RecordKey, RecordEntry> entry : cache.entrySet()) {
+ RecordEntry record = entry.getValue();
+ long[] journalKey = record.getJournalKeys();
+ if (journalKey[0] > overflowFence) {
+ // do not copy to new log but check if this is the new mark
+ if (journalKey[0] < newMark) {
+ newMark = journalKey[0];
+ }
+ continue;
+ }
+ if (entry.getValue().getExpiration() > now) {
+ // copy the blocks associated with the record to the new journal only if it is not expired
+ for (long key : journalKey) {
+ LogRecord lrecord = journal.get(null, key);
+ journal.put(lrecord.getFields(), false);
+
+ }
+ }
+ }
+ if (newMark == Long.MAX_VALUE) {
+ newMark = overflowFence;
+ }
+ // force write
+ journal.mark(newMark, true);
+ } catch (LogRecordSizeException e) {
+ monitor.error(e);
+ } catch (LogFileOverflowException e) {
+ monitor.error(e);
+ } catch (LogConfigurationException e) {
+ monitor.error(e);
+ } catch (IOException e) {
+ monitor.error(e);
+ } catch (InvalidLogBufferException e) {
+ monitor.error(e);
+ } catch (LogClosedException e) {
+ monitor.error(e);
+ } catch (LogException e) {
+ monitor.error(e);
+ } catch (InterruptedException e) {
+ monitor.error(e);
+ }
+ }
+
+ public boolean isLoggable(int level) {
+ return false;
+ }
+
+ public void log(int level, String message) {
+
+ }
+
+ public void log(int level, String message, Throwable thrown) {
+
+ }
+ }
+
+ /**
+ * Periodically scans the instance cache, clearing out expired entries
+ */
+ private class Reaper implements Runnable {
+
+ public void run() {
+ try {
+ synchronized (shutdownLock) {
+ boolean force = false;
+ long now = System.currentTimeMillis();
+ long oldest = Long.MAX_VALUE;
+ for (Map.Entry<RecordKey, RecordEntry> entry : cache.entrySet()) {
+ RecordKey key = entry.getKey();
+ RecordEntry record = entry.getValue();
+ if (record.getExpiration() <= now) {
+ try {
+ String ownerName = key.getOwner().getUri().toString();
+ String id = key.getId();
+ byte[] header =
+ SerializationHelper.serializeHeader(Header.DELETE, 0, ownerName, id, Store.NEVER);
+ journal.writeHeader(header, false);
+ SCAObject owner = key.getOwner();
+ Object instance = record.getObject();
+ // notify listeners of the expiration
+ StoreExpirationEvent event = new StoreExpirationEvent(this, owner, instance);
+ publish(event);
+ } catch (IOException e) {
+ monitor.error(e);
+ } catch (StoreWriteException e) {
+ monitor.error(e);
+ }
+ force = true;
+ cache.remove(entry.getKey()); // semantics of ConcurrentHashMap allow this
+ } else {
+ if (record.getJournalKeys()[0] < oldest) {
+ oldest = record.getJournalKeys()[0];
+ }
+ }
+ }
+ if (force) {
+ // perform a forced write and update the journal mark
+ journal.mark(oldest, true);
+ }
+ }
+ } catch (IOException e) {
+ monitor.error(e);
+ } catch (LogClosedException e) {
+ monitor.error(e);
+ } catch (InvalidLogKeyException e) {
+ monitor.error(e);
+ } catch (InterruptedException e) {
+ monitor.error(e);
+ }
+ }
+ }
+
+
+}
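
A hedged usage sketch of the store outside a container; the monitor (StoreMonitor) and owner (SCAObject) values are assumed to be supplied by the runtime, and the persisted object only needs to implement Serializable:

    JournalStore store = new JournalStore(monitor);
    store.setLogFileDir("target/stores");   // property setters feed the HOWL Configuration
    store.init();

    String id = java.util.UUID.randomUUID().toString();
    long expiration = System.currentTimeMillis() + 600000;
    store.insertRecord(owner, id, "some serializable state", expiration);

    Object state = store.readRecord(owner, id);    // served from the in-memory cache
    store.updateRecord(owner, id, "updated state", expiration);
    store.removeRecord(owner, id);                 // writes a DELETE header and evicts the cache entry

    store.destroy();
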
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java
new file mode 100644
index 0000000000..275bad21f7
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import org.apache.tuscany.spi.services.store.StoreException;
+
+/**
+ * Denotes an error setting a property on the journal store
+ *
+ * @version $Rev$ $Date$
+ */
+public class JournalStorePropertyException extends StoreException {
+
+ public JournalStorePropertyException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java
new file mode 100644
index 0000000000..28fa9c43ac
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.Serializable;
+
+/**
+ * A journal store cache entry for a record
+ *
+ * @version $Rev$ $Date$
+ */
+public class RecordEntry {
+ private Serializable object;
+ private short operation;
+ private long expiration;
+ private long[] journalKeys;
+
+
+ /**
+ * Creates a new cached record
+ *
+ * @param object the object to serialize
+ * @param operation an <code>INSERT</code> or <code>UPDATE</code> operation
+ * @param journalKey the journal keys for the blocks where the record is written
+ * @param expiration the time in milliseconds when the record expires
+ */
+ public RecordEntry(Serializable object, short operation, long[] journalKey, long expiration) {
+ assert object != null;
+ this.object = object;
+ this.operation = operation;
+ this.journalKeys = journalKey;
+ this.expiration = expiration;
+ }
+
+ /**
+ * Returns the object
+ *
+ * @return the object
+ */
+ public Serializable getObject() {
+ return object;
+ }
+
+ /**
+ * Returns the type of operation
+ *
+ * @return the type of operation
+ */
+ public short getOperation() {
+ return operation;
+ }
+
+ public long getExpiration() {
+ return expiration;
+ }
+
+ public long[] getJournalKeys() {
+ return journalKeys;
+ }
+
+ public void setJournalKeys(long[] journalKeys) {
+ this.journalKeys = journalKeys;
+ }
+
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java
new file mode 100644
index 0000000000..7293fddb55
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import org.apache.tuscany.spi.component.SCAObject;
+
+/**
+ * Used by the store cache to retrieve record entries
+ *
+ * @version $Rev$ $Date$
+ */
+public class RecordKey {
+
+ private String id;
+ private SCAObject owner;
+
+ public RecordKey(String id, SCAObject owner) {
+ this.id = id;
+ this.owner = owner;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public SCAObject getOwner() {
+ return owner;
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final RecordKey recordKey = (RecordKey) o;
+ if (id != null ? !id.equals(recordKey.id) : recordKey.id != null) {
+ return false;
+ }
+ return !(owner != null ? !owner.getUri().equals(recordKey.owner.getUri()) :
+ recordKey.owner != null);
+ }
+
+ public int hashCode() {
+ int result;
+ result = (id != null ? id.hashCode() : 0);
+ // hash on the owner URI to stay consistent with equals(), which compares owner URIs
+ result = 31 * result + (owner != null ? owner.getUri().hashCode() : 0);
+ return result;
+ }
+}
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java
new file mode 100644
index 0000000000..0a7197b014
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tuscany.spi.component.SCAExternalizable;
+import org.apache.tuscany.spi.component.WorkContext;
+
+/**
+ * Utility methods for processing journal records
+ *
+ * @version $Rev$ $Date$
+ */
+public final class SerializationHelper {
+
+ private SerializationHelper() {
+ }
+
+ /**
+ * Serializes an object
+ *
+ * @param serializable the object to serialize
+ * @return the serialized byte array
+ * @throws IOException if an error occurs serializing the object
+ */
+ public static byte[] serialize(Serializable serializable) throws IOException {
+ ByteArrayOutputStream bas = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bas);
+ out.writeObject(serializable);
+ // flush so buffered object stream data is pushed to the underlying byte array
+ out.flush();
+ return bas.toByteArray();
+ }
+
+ /**
+ * Deserializes an object using the TCCL
+ *
+ * @param bytes the serialized object byte array
+ * @param workContext the current work context
+ */
+ public static Object deserialize(byte[] bytes, WorkContext workContext) throws IOException, ClassNotFoundException {
+ Object o = new TCCLObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
+ if (o instanceof SCAExternalizable) {
+ SCAExternalizable externalizable = (SCAExternalizable) o;
+ externalizable.setWorkContext(workContext);
+ externalizable.reactivate();
+ }
+ return o;
+ }
+
+ /**
+ * Breaks a byte array into a series of blocks of the given size
+ *
+ * @param bytes the byte array to partition
+ * @param size the partition size
+ */
+ public static List<byte[]> partition(byte[] bytes, int size) {
+ assert size > 0;
+ List<byte[]> list = new ArrayList<byte[]>();
+ int pos = 0;
+ while (pos < bytes.length) {
+ if (pos + size > bytes.length) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ for (int i = pos; i < bytes.length; i++) {
+ stream.write(bytes[i]);
+ }
+ byte[] partition = stream.toByteArray();
+ list.add(partition);
+ pos = pos + partition.length;
+ } else {
+ byte[] partition = new byte[size];
+ for (int i = 0; i < size; i++) {
+ partition[i] = bytes[pos + i];
+ }
+ list.add(partition);
+ pos = pos + size;
+ }
+ }
+ return list;
+ }
+
+ /**
+ * Creates a serialized header entry that may be written to a log
+ *
+ * @param operation the operation type, i.e. {@link Header#INSERT}, {@link Header#UPDATE}, or {@link
+ * Header#DELETE}
+ * @param numRecords the number of blocks the record will be written to, excluding the header block
+ * @param ownerId the id of the owner of the record
+ * @param id the id of the record unique to the owner
+ * @param expiration the record expiration time in milliseconds
+ * @return a byte array containing the serialized header
+ * @throws IOException
+ */
+ public static byte[] serializeHeader(short operation, int numRecords, String ownerId, String id, long expiration)
+ throws IOException {
+ ByteArrayOutputStream stream = null;
+ ObjectOutputStream ostream = null;
+ try {
+ stream = new ByteArrayOutputStream();
+ ostream = new ObjectOutputStream(stream);
+ ostream.writeShort(operation);
+ ostream.writeInt(numRecords);
+ ostream.writeObject(ownerId);
+ ostream.writeObject(id);
+ ostream.writeLong(expiration);
+ ostream.flush();
+ return stream.toByteArray();
+ } finally {
+ if (ostream != null) {
+ try {
+ ostream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ if (stream != null) {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ /**
+ * Serializes a unique record id consisting of the owner id and the record's UUID
+ *
+ * @param ownerId the id of the owner, typically an SCAObject canonical name
+ * @param id the id associated with the record
+ * @return the serialized record byte array
+ * @throws IOException
+ */
+ public static byte[] serializeRecordId(String ownerId, String id)
+ throws IOException {
+ ByteArrayOutputStream stream = null;
+ ObjectOutputStream ostream = null;
+ try {
+ stream = new ByteArrayOutputStream();
+ ostream = new ObjectOutputStream(stream);
+ ostream.writeObject(ownerId);
+ ostream.writeObject(id);
+ ostream.flush();
+ return stream.toByteArray();
+ } finally {
+ if (ostream != null) {
+ try {
+ ostream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ if (stream != null) {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ /**
+ * Deserializes the given header. The first element from {@link Header#getFields()} is assumed to contain the header
+ * byte array to deserialize from
+ *
+ * @param header the header to deserialize
+ * @return the deserialized header
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ public static Header deserializeHeader(Header header) throws IOException, ClassNotFoundException {
+ ByteArrayInputStream bas = null;
+ ObjectInputStream stream = null;
+ try {
+ bas = new ByteArrayInputStream(header.getFields()[0]);
+ stream = new TCCLObjectInputStream(bas);
+ header.setOperation(stream.readShort());
+ header.setNumBlocks(stream.readInt());
+ header.setOwnerId((String) stream.readObject());
+ header.setId((String) stream.readObject());
+ header.setExpiration(stream.readLong());
+ return header;
+ } finally {
+ if (bas != null) {
+ try {
+ bas.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ if (stream != null) {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+
+ }
+
+}
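
A sketch of chunking and reassembly using only the helpers above; the 4096 block size matches the HOWL default noted in JournalStore, and workContext is assumed to be the current WorkContext:

    byte[] data = SerializationHelper.serialize("an example serializable value");
    java.util.List<byte[]> blocks = SerializationHelper.partition(data, 4096);

    // concatenate the blocks back into the original byte array
    java.io.ByteArrayOutputStream assembled = new java.io.ByteArrayOutputStream();
    for (byte[] block : blocks) {
        assembled.write(block, 0, block.length);
    }
    Object copy = SerializationHelper.deserialize(assembled.toByteArray(), workContext);
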
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java
new file mode 100644
index 0000000000..b341f9f9b9
--- /dev/null
+++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.persistence.store.journal;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * Deserializes an object based on the thread context classloader
+ *
+ * @version $Rev$ $Date$
+ */
+public class TCCLObjectInputStream extends ObjectInputStream {
+ private final ClassLoader classLoader;
+
+ public TCCLObjectInputStream(InputStream in) throws IOException, SecurityException {
+ super(in);
+ this.classLoader = Thread.currentThread().getContextClassLoader();
+ }
+
+ protected Class resolveClass(ObjectStreamClass streamClass) throws IOException, ClassNotFoundException {
+ return classLoader.loadClass(streamClass.getName());
+ }
+}
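
A sketch of using the stream with an extension classloader: the constructor captures the thread context classloader, so it must be set before the stream is created. The extensionClassLoader and bytes variables are assumed to be in scope, and the snippet runs in a method declaring IOException and ClassNotFoundException:

    ClassLoader previous = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(extensionClassLoader);
    try {
        Object o = new TCCLObjectInputStream(new java.io.ByteArrayInputStream(bytes)).readObject();
    } finally {
        Thread.currentThread().setContextClassLoader(previous);
    }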