diff options
Diffstat (limited to 'sandbox/old/contrib/persistence/store.journal/src')
25 files changed, 2561 insertions, 0 deletions
diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java new file mode 100644 index 0000000000..df6d3452e2 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/CacheEventListener.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +/** + * A listener interface for cache events + * + * @version $Rev$ $Date$ + */ +public interface CacheEventListener { + + /** + * Callback when an entry is evicted from the cache + */ + void onEviction(RecordKey key, RecordEntry entry); + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java new file mode 100644 index 0000000000..f2cae6218a --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Header.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +/** + * A header entry for a record written to the log. A header contains the following information: + * <pre> + * <ul> + * <li><strong>Owner id</strong> - the unique id of the owner, typically the SCAObject canonical name + * <li><strong>Operation</strong> - if the record is an INSERT, UPDATE, or DELETE + * <li><strong>Most Significant bits</strong> - the most signficant bits of the <code>java.util.UUID</code> + * representing the id of the persisted instance + * <li><strong>Least Significant bits</strong> - the least signficant bits of the <code>java.util.UUID</code> + * representing the id of the persisted instance + * <li><strong>Expiration</strong> - the expiration time in milliseconds when the record is set to expire + * <li><strong>Number of blocks</strong> - the number of blocks the record is written across + * <li><strong>Fields</strong> - any other data associated with the header + * </ul> + * </pre> + * + * @version $Rev$ $Date$ + */ +public class Header { + public static final short INSERT = 0; + public static final short UPDATE = 1; + public static final short DELETE = 2; + + protected byte[][] fields; + private short operation; + private String ownerId; + private String id; + private long expiration; + private int numBlocks; + + public Header() { + } + + /** + * Returns the record operation type + * + * @return the record operation type + */ + public short getOperation() { + return operation; + } + + /** + * Sets the record operation type + */ + public void setOperation(short operation) { + this.operation = operation; + } + + /** + * Returns the number of blocks the record is written across + * + * @return the number of blocks the record is written across + */ + public int getNumBlocks() { + return numBlocks; + } + + /** + * Sets the number of blocks the record is written across + */ + public void setNumBlocks(int numBlocks) { + this.numBlocks = numBlocks; + } + + /** + * Returns the unique owner id + * + * @return the unique owner id + */ + public String getOwnerId() { + return ownerId; + } + + /** + * Sets the unique owner id + */ + public void setOwnerId(String ownerId) { + this.ownerId = ownerId; + } + + /** + * Returns the record id + * + * @return the record id + */ + public String getId() { + return id; + } + + /** + * Sets the most significant bits of the record UUID + */ + public void setId(String id) { + this.id = id; + } + + /** + * Returns the time in milliseconds the record is set to expire + * + * @return the time in milliseconds the record is set to expire + */ + public long getExpiration() { + return expiration; + } + + /** + * Sets the time the record is set to expire in milliseconds + */ + public void setExpiration(long expiration) { + this.expiration = expiration; + } + + /** + * Returns additional header data + * + * @return additional header data + */ + public byte[][] getFields() { + return fields; + } + + /** + * Sets additional header data + */ + public void setFields(byte[][] fields) { + this.fields = fields; + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java new file mode 100644 index 0000000000..b8dfd072c5 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/Journal.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.IOException; + +import org.apache.tuscany.spi.services.store.StoreWriteException; + +import org.objectweb.howl.log.Configuration; +import org.objectweb.howl.log.LogClosedException; +import org.objectweb.howl.log.LogFileOverflowException; +import org.objectweb.howl.log.LogRecordSizeException; +import org.objectweb.howl.log.Logger; + +/** + * Extends the HOWL logger implementation adding convenience methods for processing records + * + * @version $Rev$ $Date$ + */ +public class Journal extends Logger { + public static final short HEADER = 0; + public static final short RECORD = 1; + + public Journal() throws IOException { + } + + public Journal(Configuration config) throws IOException { + super(config); + } + + /** + * Writes a header record to the log. The format of the header is defined by {@link + * SerializationHelper#createHeader(short, int, String, String, long)} + * + * @param bytes the record as a byte array + * @param force true if the disk write should be forced + * @return the log entry key + * @throws StoreWriteException + */ + public long writeHeader(byte[] bytes, boolean force) throws StoreWriteException { + try { + return put(HEADER, new byte[][]{bytes}, force); + } catch (LogRecordSizeException e) { + throw new StoreWriteException(e); + } catch (LogFileOverflowException e) { + throw new StoreWriteException(e); + } catch (IOException e) { + throw new StoreWriteException(e); + } catch (LogClosedException e) { + throw new StoreWriteException(e); + } catch (InterruptedException e) { + throw new StoreWriteException(e); + } + } + + /** + * Writes a record block to the log. The block may be the complete number of bytes or a chunked part of thereof if + * it does not fit within HOWL's block size limit. + * + * @param data the data to write + * @param id the unique record id consisting of the owner id and the UUID + * @param force true if the disk write should be force. For records that are written in multiple blocks, only the + * last write should be forced. + * @return the log entry key + * @throws StoreWriteException + */ + public long writeBlock(byte[] data, byte[] id, boolean force) throws StoreWriteException { + try { + return put(RECORD, new byte[][]{id, data}, force); + } catch (LogRecordSizeException e) { + throw new StoreWriteException(e); + } catch (LogFileOverflowException e) { + throw new StoreWriteException(e); + } catch (IOException e) { + throw new StoreWriteException(e); + } catch (LogClosedException e) { + throw new StoreWriteException(e); + } catch (InterruptedException e) { + throw new StoreWriteException(e); + } + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java new file mode 100644 index 0000000000..910598fc61 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalIinitializationException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import org.apache.tuscany.spi.services.store.StoreException; + +/** + * Denotes an error starting the journal + * + * @version $Rev$ $Date$ + */ +public class JournalIinitializationException extends StoreException { + + public JournalIinitializationException(Throwable cause) { + super(cause); + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java new file mode 100644 index 0000000000..ec2e491c1d --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalRecord.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +/** + * Represents a record read from the log + * + * @version $Rev$ $Date$ + */ +public class JournalRecord { + + private Header header; + private byte[] data; + + public JournalRecord(byte[] data) { + this.data = data; + } + + /** + * Returns the record header + * + * @return the record header + */ + public Header getHeader() { + return header; + } + + /** + * Sets the record header + */ + public void setHeader(Header header) { + this.header = header; + } + + /** + * Returns the serialized data portion of the record which may be assembled from multiple blocks + * + * @return the serialized data portion of the record which may be assembled from multiple blocks + */ + public byte[] getData() { + return data; + } + + /** + * Sets the serialized data portion of the record which may be written across multiple blocks + */ + public void setData(byte[] data) { + this.data = data; + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java new file mode 100644 index 0000000000..945a77753a --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalShutdownException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import org.apache.tuscany.spi.services.store.StoreException; + +/** + * Denotes an error shutting down the journal + * + * @version $Rev$ $Date$ + */ +public class JournalShutdownException extends StoreException { + + public JournalShutdownException(Throwable cause) { + super(cause); + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java new file mode 100644 index 0000000000..422c074790 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStore.java @@ -0,0 +1,595 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import org.osoa.sca.annotations.Constructor; +import org.osoa.sca.annotations.Destroy; +import org.osoa.sca.annotations.EagerInit; +import org.osoa.sca.annotations.Init; +import org.osoa.sca.annotations.Property; +import org.osoa.sca.annotations.Service; + +import org.apache.tuscany.spi.component.SCAObject; +import org.apache.tuscany.spi.event.AbstractEventPublisher; +import org.apache.tuscany.spi.services.store.RecoveryListener; +import org.apache.tuscany.spi.services.store.Store; +import org.apache.tuscany.spi.services.store.StoreExpirationEvent; +import org.apache.tuscany.spi.services.store.StoreMonitor; +import org.apache.tuscany.spi.services.store.StoreReadException; +import org.apache.tuscany.spi.services.store.StoreWriteException; + +import org.apache.tuscany.api.annotation.Monitor; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.partition; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serialize; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeHeader; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeRecordId; +import org.objectweb.howl.log.Configuration; +import org.objectweb.howl.log.InvalidFileSetException; +import org.objectweb.howl.log.InvalidLogBufferException; +import org.objectweb.howl.log.InvalidLogKeyException; +import org.objectweb.howl.log.LogClosedException; +import org.objectweb.howl.log.LogConfigurationException; +import org.objectweb.howl.log.LogEventListener; +import org.objectweb.howl.log.LogException; +import org.objectweb.howl.log.LogFileOverflowException; +import org.objectweb.howl.log.LogRecord; +import org.objectweb.howl.log.LogRecordSizeException; + +/** + * A journal-based store service that uses HOWL for reliable persistence and recovery of object instances. Insert, + * update, and delete operations, as well as object instances, are written to a binary log. Delete operations are + * written as a single header block as defined by {@link SerializationHelper#serializeHeader(short,int,String,String, + *long)}. Insert and update operations are written using multiple blocks consisting of at least one header and 1..n + * additional blocks containing the object byte array. If the byte array size is greater than the log block size (the + * HOWL default is 4K), it must be partitioned into smaller units using {@link SerializationHelper#partition(byte[], + *int)} and written across multiple blocks. The header contains the number of ensuing blocks a record occupies. Since + * block writes to the log may be interleaved, blocks for a given record may not be consecutive. In order to identify + * the record a block belongs to, the first byte array of written data for the block contains the serialized owner id + * and UUID associated with the record. + * <p/> + * A cache of all active persisted instances is maintained in memory and is used for read operations. When an instance + * is persisted, a log record is written and an entry is added to the cache containing the instance. When an instance is + * deleted, a delete record is written to the log and its corresponding cache entry is removed. During recovery, the + * instance cache is reconstituted by reading the log and deserializing active records. + * <p/> + * The journal store is designed for reliable, performant persistence and therefore avoids the overhead associated with + * using a relational database. This does, however, impose two configuration requirements. The first is that a log file + * must be larger than the size of all <bold>active</bold> instances in the runtime. As a log file begins to fill, the + * store will copy active records (expired records will be ignored) to a new file and the old log will be recycled. A + * second requirement is that all active instances must fit within the memory limits of the runtime JVM since the store + * does not perform passivation and a table of all active instances is maintained for querying. + * + * @version $Rev$ $Date$ + */ +@Service(Store.class) +@EagerInit +public class JournalStore extends AbstractEventPublisher implements Store { + private static final int UNITIALIZED = -99; + + // the cache of active records + private ConcurrentHashMap<RecordKey, RecordEntry> cache; + // private ReadWriteLock lock = new ReentrantReadWriteLock(); + private final Object shutdownLock = new Object(); + // TODO integrate with a core threading scheme + private ScheduledExecutorService scheduler; + private StoreMonitor monitor; + private Journal journal; + private long defaultExpirationOffset = 600000; // 10 minutes + private long reaperInterval = 1000; + private int blockSize = 4096; // 4k standard for HOWL + + private Configuration config; + // HOWL properties - cf. <code>org.objectweb.howl.log.Configuration</code> + private boolean checksumEnabled; + private int bufferSize = UNITIALIZED; + private String bufferClassName; + private int maxBuffers = UNITIALIZED; + private int minBuffers = UNITIALIZED; + private int flushSleepTime = UNITIALIZED; + private boolean flushPartialBuffers; + private int threadsWaitingForceThreshold = UNITIALIZED; + private int maxBlocksPerFile = UNITIALIZED; + private int maxLogFiles = UNITIALIZED; + private String logFileDir = "../stores"; + private String logFileExt = "store"; + private String logFileName = "tuscany"; + private String logFileMode; + // end HOWL properties + + /** + * Creates a new store instance + * + * @param monitor the monitor for recording store events + */ + @Constructor + public JournalStore(@Monitor StoreMonitor monitor) { + this.monitor = monitor; + config = new Configuration(); + } + + /** + * Protected constructor for unit testing + * + * @param monitor the monitor for recording store events + * @param journal the journal the store should write to, typically a mock + */ + protected JournalStore(StoreMonitor monitor, Journal journal) { + this.monitor = monitor; + this.journal = journal; + config = new Configuration(); + } + + public int getBlockSize() { + return blockSize; + } + + /** + * Returns the maximum default expiration offset for records in the store + * + * @return the maximum default expiration offset for records in the store + */ + @Property + public long getExpirationOffset() { + return defaultExpirationOffset; + } + + /** + * Sets the maximum default expiration offset for records in the store + */ + public void setDefaultExpirationOffset(long defaultExpirationOffset) { + this.defaultExpirationOffset = defaultExpirationOffset; + } + + @Property + public void setBlockSize(int blockSize) { + this.blockSize = blockSize; + } + + public boolean isChecksumEnabled() { + return checksumEnabled; + } + + @Property + public void setChecksumEnabled(boolean val) { + config.setChecksumEnabled(val); + this.checksumEnabled = val; + } + + public int getBufferSize() { + return bufferSize; + } + + @Property + public void setBufferSize(int bufferSize) throws JournalStorePropertyException { + try { + config.setBufferSize(bufferSize); + this.bufferSize = bufferSize; + } catch (LogConfigurationException e) { + throw new JournalStorePropertyException(e); + } + } + + public String getBufferClassName() { + return bufferClassName; + } + + @Property + public void setBufferClassName(String bufferClassName) { + config.setBufferClassName(bufferClassName); + this.bufferClassName = bufferClassName; + } + + public int getMaxBuffers() { + return maxBuffers; + } + + @Property + public void setMaxBuffers(int maxBuffers) throws JournalStorePropertyException { + try { + config.setMaxBuffers(maxBuffers); + this.maxBuffers = maxBuffers; + } catch (LogConfigurationException e) { + throw new JournalStorePropertyException(e); + } + } + + public int getMinBuffers() { + return minBuffers; + } + + @Property + public void setMinBuffers(int minBuffers) throws JournalStorePropertyException { + try { + config.setMinBuffers(minBuffers); + this.minBuffers = minBuffers; + } catch (LogConfigurationException e) { + throw new JournalStorePropertyException(e); + } + } + + public int getFlushSleepTime() { + return flushSleepTime; + } + + @Property + public void setFlushSleepTime(int time) { + config.setFlushSleepTime(time); + this.flushSleepTime = time; + } + + public boolean isFlushPartialBuffers() { + return flushPartialBuffers; + } + + @Property + public void setFlushPartialBuffers(boolean val) { + config.setFlushPartialBuffers(val); + this.flushPartialBuffers = val; + } + + public int getThreadsWaitingForceThreshold() { + return threadsWaitingForceThreshold; + } + + @Property + public void setThreadsWaitingForceThreshold(int threshold) { + config.setThreadsWaitingForceThreshold(threshold); + this.threadsWaitingForceThreshold = threshold; + } + + public int getMaxBlocksPerFile() { + return maxBlocksPerFile; + } + + @Property + public void setMaxBlocksPerFile(int maxBlocksPerFile) { + config.setMaxBlocksPerFile(maxBlocksPerFile); + this.maxBlocksPerFile = maxBlocksPerFile; + } + + public int getMaxLogFiles() { + return maxLogFiles; + } + + @Property + public void setMaxLogFiles(int maxLogFiles) { + config.setMaxLogFiles(maxLogFiles); + this.maxLogFiles = maxLogFiles; + } + + public String getLogFileDir() { + return logFileDir; + } + + @Property + public void setLogFileDir(String dir) { + config.setLogFileDir(dir); + this.logFileDir = dir; + } + + public String getLogFileExt() { + return logFileExt; + } + + @Property + public void setLogFileExt(String logFileExt) { + config.setLogFileExt(logFileExt); + this.logFileExt = logFileExt; + } + + public String getLogFileName() { + return logFileName; + } + + @Property + public void setLogFileName(String logFileName) { + config.setLogFileName(logFileName); + this.logFileName = logFileName; + } + + public String getLogFileMode() { + return logFileMode; + } + + public void setLogFileMode(String logFileMode) throws JournalStorePropertyException { + try { + config.setLogFileMode(logFileMode); + this.logFileMode = logFileMode; + } catch (LogConfigurationException e) { + throw new JournalStorePropertyException(e); + } + } + + /** + * Initializes the store by opening the journal and starting the checkpoint daemon + * + * @throws JournalIinitializationException + * + */ + @Init + public void init() throws JournalIinitializationException { + try { + cache = new ConcurrentHashMap<RecordKey, RecordEntry>(); + if (journal == null) { + config.setLogFileName(logFileName); + config.setLogFileExt(logFileExt); + config.setLogFileDir(logFileDir); + journal = new Journal(config); + } + journal.setLogEventListener(new JournalLogEventListener()); + journal.open(); + scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler.scheduleWithFixedDelay(new Reaper(), reaperInterval, reaperInterval, MILLISECONDS); + monitor.start("Started journal store"); + } catch (IOException e) { + throw new JournalIinitializationException(e); + } catch (LogConfigurationException e) { + throw new JournalIinitializationException(e); + } catch (InvalidLogBufferException e) { + throw new JournalIinitializationException(e); + } catch (InterruptedException e) { + throw new JournalIinitializationException(e); + } catch (InvalidFileSetException e) { + throw new JournalIinitializationException(e); + } + } + + /** + * Performs an orderly close of the journal and checkpoint daemon + * + * @throws JournalShutdownException + */ + @Destroy + public void destroy() throws JournalShutdownException { + try { + // avoid potential deadlock by acquiring write lock since the reaper thread can block when calling a + // synchronized journal method (e.g. journal.put) since journal.close() below holds a synchronization lock. + // This deadlock will prevent the scheduler.shutdown from completing, hanging this store shutdown + synchronized (shutdownLock) { + scheduler.shutdown(); + journal.close(); + monitor.stop("Stopped journal store"); + } + } catch (IOException e) { + throw new JournalShutdownException(e); + } catch (InterruptedException e) { + throw new JournalShutdownException(e); + } + } + + public void insertRecord(SCAObject owner, String id, Object object, long expiration) throws StoreWriteException { + write(owner, id, object, expiration, Header.INSERT); + } + + public void updateRecord(SCAObject owner, String id, Object object, long expiration) throws StoreWriteException { + write(owner, id, object, expiration, Header.UPDATE); + } + + public Object readRecord(SCAObject owner, String id) throws StoreReadException { + RecordEntry record; + RecordKey key = new RecordKey(id, owner); + record = cache.get(key); + if (record == null) { + return null; + } + if (record.getExpiration() != Store.NEVER && record.getExpiration() < System.currentTimeMillis()) { + cache.remove(key); + } + return record.getObject(); + } + + public void removeRecord(SCAObject owner, String id) throws StoreWriteException { + try { + journal.writeHeader(serializeHeader(Header.DELETE, 0, owner.getUri().toString(), id, NEVER), true); + RecordKey key = new RecordKey(id, owner); + // remove from the cache + cache.remove(key); + } catch (IOException e) { + throw new StoreWriteException(e); + } + + } + + public void removeRecords() throws StoreWriteException { + + } + + public void recover(RecoveryListener listener) throws StoreReadException { + } + + /** + * Writes a record to the log. If a record needs to be written in multiple blocks, only the last block write is + * forced. + * + * @param owner + * @param id + * @param object + * @param expiration + * @param operation + * @throws StoreWriteException + */ + private void write(SCAObject owner, String id, Object object, long expiration, short operation) + throws StoreWriteException { + if (!(object instanceof Serializable)) { + String name = object.getClass().getName(); + throw new StoreWriteException("Type must implement serializable", name, id); + } + Serializable serializable = (Serializable) object; + String name = owner.getUri().toString(); + List<byte[]> bytes; + try { + bytes = partition(serialize(serializable), blockSize); + } catch (IOException e) { + throw new StoreWriteException(e); + } + try { + byte[] header = serializeHeader(operation, bytes.size(), name, id, expiration); + long headerKey = journal.writeHeader(header, false); + // write chunked records in non-forced mode except last one + byte[] recordId = serializeRecordId(name, id); + long[] keys = new long[bytes.size() + 1]; + keys[0] = headerKey; + for (int i = 0; i < bytes.size() - 1; i++) { + byte[] chunk = bytes.get(i); + keys[i + 1] = journal.writeBlock(chunk, recordId, false); + } + // add last record using a forced write + journal.writeBlock(bytes.get(bytes.size() - 1), recordId, true); + RecordKey key = new RecordKey(id, owner); + // add to the entry in the cache + cache.put(key, new RecordEntry(serializable, operation, keys, expiration)); + } catch (IOException e) { + throw new StoreWriteException(e); + } + } + + /** + * Handles log overflow events + */ + private class JournalLogEventListener implements LogEventListener { + + public void logOverflowNotification(long overflowFence) { + assert overflowFence != 0; + long newMark = Long.MAX_VALUE; + long now = System.currentTimeMillis(); + try { + for (Map.Entry<RecordKey, RecordEntry> entry : cache.entrySet()) { + RecordEntry record = entry.getValue(); + long[] journalKey = record.getJournalKeys(); + if (journalKey[0] > overflowFence) { + // do not copy to new log but check if this is the new mark + if (journalKey[0] < newMark) { + newMark = journalKey[0]; + } + continue; + } + if (entry.getValue().getExpiration() > now) { + // copy the blocks associayed with the record to the new journal only if it is not expired + for (long key : journalKey) { + LogRecord lrecord = journal.get(null, key); + journal.put(lrecord.getFields(), false); + + } + } + } + if (newMark == Long.MAX_VALUE) { + newMark = overflowFence; + } + // force write + journal.mark(newMark, true); + } catch (LogRecordSizeException e) { + monitor.error(e); + } catch (LogFileOverflowException e) { + monitor.error(e); + } catch (LogConfigurationException e) { + monitor.error(e); + } catch (IOException e) { + monitor.error(e); + } catch (InvalidLogBufferException e) { + monitor.error(e); + } catch (LogClosedException e) { + monitor.error(e); + } catch (LogException e) { + monitor.error(e); + } catch (InterruptedException e) { + monitor.error(e); + } + } + + public boolean isLoggable(int level) { + return false; + } + + public void log(int level, String message) { + + } + + public void log(int level, String message, Throwable thrown) { + + } + } + + /** + * Periodically scans the instance cache, clearing out expired entries + */ + private class Reaper implements Runnable { + + public void run() { + try { + synchronized (shutdownLock) { + boolean force = false; + long now = System.currentTimeMillis(); + long oldest = Long.MAX_VALUE; + for (Map.Entry<RecordKey, RecordEntry> entry : cache.entrySet()) { + RecordKey key = entry.getKey(); + RecordEntry record = entry.getValue(); + if (record.getExpiration() <= now) { + try { + String ownerName = key.getOwner().getUri().toString(); + String id = key.getId(); + byte[] header = + SerializationHelper.serializeHeader(Header.DELETE, 0, ownerName, id, Store.NEVER); + journal.writeHeader(header, false); + // notify listeners + SCAObject owner = key.getOwner(); + Object instance = record.getObject(); + // notify listeners of the expiration + StoreExpirationEvent event = new StoreExpirationEvent(this, owner, instance); + publish(event); + } catch (IOException e) { + monitor.error(e); + } catch (StoreWriteException e) { + monitor.error(e); + } + force = true; + cache.remove(entry.getKey()); // semantics of ConcurrentHashMap allow this + } else { + if (record.getJournalKeys()[0] < oldest) { + oldest = record.getJournalKeys()[0]; + } + } + } + if (force) { + // perform a forced write and update the journal mark + journal.mark(oldest, true); + } + } + } catch (IOException e) { + monitor.error(e); + } catch (LogClosedException e) { + monitor.error(e); + } catch (InvalidLogKeyException e) { + monitor.error(e); + } catch (InterruptedException e) { + monitor.error(e); + } + } + } + + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java new file mode 100644 index 0000000000..275bad21f7 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/JournalStorePropertyException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import org.apache.tuscany.spi.services.store.StoreException; + +/** + * Denotes an error setting a property on the journal store + * + * @version $Rev$ $Date$ + */ +public class JournalStorePropertyException extends StoreException { + + public JournalStorePropertyException(Throwable cause) { + super(cause); + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java new file mode 100644 index 0000000000..28fa9c43ac --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordEntry.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.Serializable; + +/** + * A journal store cache entry for a record + * + * @version $Rev$ $Date$ + */ +public class RecordEntry { + private Serializable object; + private short operation; + private long expiration; + private long[] journalKeys; + + + /** + * Creates a new cached record + * + * @param object the object to serialize + * @param operation an <code>INSERT</code> or <code>UPDATE</code> operation + * @param journalKey the journal key for the block where the record is written + */ + public RecordEntry(Serializable object, short operation, long[] journalKey, long expiration) { + assert object != null; + this.object = object; + this.operation = operation; + this.journalKeys = journalKey; + this.expiration = expiration; + } + + /** + * Returns the object + * + * @return the object + */ + public Serializable getObject() { + return object; + } + + /** + * Returns the type of operation + * + * @return the type of operation + */ + public short getOperation() { + return operation; + } + + public long getExpiration() { + return expiration; + } + + public long[] getJournalKeys() { + return journalKeys; + } + + public void setJournalKeys(long[] journalKeys) { + this.journalKeys = journalKeys; + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java new file mode 100644 index 0000000000..7293fddb55 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/RecordKey.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import org.apache.tuscany.spi.component.SCAObject; + +/** + * Used by the store cache to retrieve record entries + * + * @version $Rev$ $Date$ + */ +public class RecordKey { + + private String id; + private SCAObject owner; + + public RecordKey(String id, SCAObject owner) { + this.id = id; + this.owner = owner; + } + + public String getId() { + return id; + } + + public SCAObject getOwner() { + return owner; + } + + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final RecordKey recordKey = (RecordKey) o; + if (id != null ? !id.equals(recordKey.id) : recordKey.id != null) { + return false; + } + return !(owner != null ? !owner.getUri().equals(recordKey.owner.getUri()) : + recordKey.owner != null); + } + + public int hashCode() { + int result; + result = (id != null ? id.hashCode() : 0); + result = 31 * result + (owner != null ? owner.hashCode() : 0); + return result; + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java new file mode 100644 index 0000000000..0a7197b014 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/SerializationHelper.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.tuscany.spi.component.SCAExternalizable; +import org.apache.tuscany.spi.component.WorkContext; + +/** + * Utility methods for processing journal records + * + * @version $Rev$ $Date$ + */ +public final class SerializationHelper { + + private SerializationHelper() { + } + + /** + * Serializes and object + * + * @param serializable the object to serialize + * @throws IOException + */ + public static byte[] serialize(Serializable serializable) throws IOException { + ByteArrayOutputStream bas = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bas); + out.writeObject(serializable); + return bas.toByteArray(); + } + + /** + * Deserializes an object using the TCCL + * + * @param bytes the serialized object byte array + * @param workContext the current work context + */ + public static Object deserialize(byte[] bytes, WorkContext workContext) throws IOException, ClassNotFoundException { + Object o = new TCCLObjectInputStream(new ByteArrayInputStream(bytes)).readObject(); + if (o instanceof SCAExternalizable) { + SCAExternalizable externalizable = (SCAExternalizable) o; + externalizable.setWorkContext(workContext); + externalizable.reactivate(); + } + return o; + } + + /** + * Breaks a byte array into a series of blocks of the given size + * + * @param bytes the byte array to partition + * @param size the partition size + */ + public static List<byte[]> partition(byte[] bytes, int size) { + assert size > 0; + List<byte[]> list = new ArrayList<byte[]>(); + int pos = 0; + while (pos < bytes.length) { + if (pos + size > bytes.length) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + for (int i = pos; i < bytes.length; i++) { + stream.write(bytes[i]); + } + byte[] partition = stream.toByteArray(); + list.add(partition); + pos = pos + partition.length; + } else { + byte[] partition = new byte[size]; + for (int i = 0; i < size; i++) { + partition[i] = bytes[pos + i]; + } + list.add(partition); + pos = pos + size; + } + } + return list; + } + + /** + * Creates a serialized header entry that may be written to a log + * + * @param operation the operation type, i.e. {@link Header#INSERT}, {@link Header#UPDATE}, or {@link + * JournalStore#DELETE} + * @param numRecords the number of blocks that the record will be written two excluding the header block + * @param ownerId the id of the owner of the record + * @param id the id of the record unique to the owner + * @param expiration the record expirtation time in milliseconds + * @return a byte array containing the serialized header + * @throws IOException + */ + public static byte[] serializeHeader(short operation, int numRecords, String ownerId, String id, long expiration) + throws IOException { + ByteArrayOutputStream stream = null; + ObjectOutputStream ostream = null; + try { + stream = new ByteArrayOutputStream(); + ostream = new ObjectOutputStream(stream); + ostream.writeShort(operation); + ostream.writeInt(numRecords); + ostream.writeObject(ownerId); + ostream.writeObject(id); + ostream.writeLong(expiration); + ostream.flush(); + return stream.toByteArray(); + } finally { + if (ostream != null) { + try { + ostream.close(); + } catch (IOException e) { + // ignore + } + } + if (stream != null) { + try { + stream.close(); + } catch (IOException e) { + // ignore + } + } + } + } + + /** + * Serializes a unique record id consisting of the owner id and the record's UUID + * + * @param ownerId the id of the owner, typically an SCAObject canonical name + * @param id the id associated with the record + * @return the serialized record byte array + * @throws IOException + */ + public static byte[] serializeRecordId(String ownerId, String id) + throws IOException { + ByteArrayOutputStream stream = null; + ObjectOutputStream ostream = null; + try { + stream = new ByteArrayOutputStream(); + ostream = new ObjectOutputStream(stream); + ostream.writeObject(ownerId); + ostream.writeObject(id); + ostream.flush(); + return stream.toByteArray(); + } finally { + if (ostream != null) { + try { + ostream.close(); + } catch (IOException e) { + // ignore + } + } + if (stream != null) { + try { + stream.close(); + } catch (IOException e) { + // ignore + } + } + } + } + + /** + * Deserializes the given header. The first element from {@link Header#getFields()} is assumed to contain the header + * byte array to deserialize from + * + * @param header the header to deserialize + * @return the deserialized header + * @throws IOException + * @throws ClassNotFoundException + */ + public static Header deserializeHeader(Header header) throws IOException, ClassNotFoundException { + ByteArrayInputStream bas = null; + ObjectInputStream stream = null; + try { + bas = new ByteArrayInputStream(header.getFields()[0]); + stream = new TCCLObjectInputStream(bas); + header.setOperation(stream.readShort()); + header.setNumBlocks(stream.readInt()); + header.setOwnerId((String) stream.readObject()); + header.setId((String) stream.readObject()); + header.setExpiration(stream.readLong()); + return header; + } finally { + if (bas != null) { + try { + bas.close(); + } catch (IOException e) { + // ignore + } + } + if (stream != null) { + try { + stream.close(); + } catch (IOException e) { + // ignore + } + } + } + + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java new file mode 100644 index 0000000000..b341f9f9b9 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/main/java/org/apache/tuscany/persistence/store/journal/TCCLObjectInputStream.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; + +/** + * Deserializes an object based on the thread context classloader + * + * @version $Rev$ $Date$ + */ +public class TCCLObjectInputStream extends ObjectInputStream { + private final ClassLoader classLoader; + + public TCCLObjectInputStream(InputStream in) throws IOException, SecurityException { + super(in); + this.classLoader = Thread.currentThread().getContextClassLoader(); + } + + protected Class resolveClass(ObjectStreamClass streamClass) throws IOException, ClassNotFoundException { + return classLoader.loadClass(streamClass.getName()); + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreExpireTestCase.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreExpireTestCase.java new file mode 100644 index 0000000000..4e23b3ca72 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreExpireTestCase.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.net.URI; + +import org.apache.tuscany.spi.component.SCAObject; +import org.apache.tuscany.spi.event.RuntimeEventListener; +import org.apache.tuscany.spi.services.store.StoreExpirationEvent; +import org.apache.tuscany.spi.services.store.StoreMonitor; + +import junit.framework.TestCase; +import org.easymock.IAnswer; +import org.easymock.classextension.EasyMock; +import org.objectweb.howl.log.LogEventListener; + +/** + * @version $Rev$ $Date$ + */ +public class JournalStoreExpireTestCase extends TestCase { + @SuppressWarnings({"FieldCanBeLocal"}) + private JournalStore store; + private SCAObject owner; + + public void testNotifyOnExpire() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + StoreMonitor monitor = EasyMock.createMock(StoreMonitor.class); + Journal journal = EasyMock.createNiceMock(Journal.class); + journal.setLogEventListener(EasyMock.isA(LogEventListener.class)); + EasyMock.replay(journal); + + RuntimeEventListener listener = org.easymock.EasyMock.createMock(RuntimeEventListener.class); + listener.onEvent(org.easymock.EasyMock.isA(StoreExpirationEvent.class)); + EasyMock.expectLastCall().andStubAnswer(new IAnswer<Object>() { + public Object answer() throws Throwable { + latch.countDown(); + return null; + } + }); + org.easymock.EasyMock.replay(listener); + + store = new JournalStore(monitor, journal) { + }; + store.addListener(listener); + store.init(); + final String id = UUID.randomUUID().toString(); + store.insertRecord(owner, id, "test", 1); + if (!latch.await(10000, TimeUnit.MILLISECONDS)) { + // failed to notify listener + fail(); + } + store.destroy(); + EasyMock.verify(listener); + } + + + protected void setUp() throws Exception { + super.setUp(); + TestUtils.cleanupLog(); + owner = EasyMock.createMock(SCAObject.class); + URI uri = URI.create("foo"); + EasyMock.expect(owner.getUri()).andReturn(uri).atLeastOnce(); + EasyMock.replay(owner); + } + + protected void tearDown() throws Exception { + super.tearDown(); + TestUtils.cleanupLog(); + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreInsertTestCase.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreInsertTestCase.java new file mode 100644 index 0000000000..a2cab56134 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreInsertTestCase.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.util.UUID; +import java.net.URI; + +import org.apache.tuscany.spi.component.SCAObject; +import org.apache.tuscany.spi.services.store.Store; +import org.apache.tuscany.spi.services.store.StoreMonitor; + +import junit.framework.TestCase; +import org.easymock.IAnswer; +import org.easymock.classextension.EasyMock; +import org.objectweb.howl.log.LogEventListener; + +/** + * @version $Rev$ $Date$ + */ +public class JournalStoreInsertTestCase extends TestCase { + @SuppressWarnings({"FieldCanBeLocal"}) + private JournalStore store; + private SCAObject owner; + + public void testOrderedShutdown() throws Exception { + StoreMonitor monitor = EasyMock.createMock(StoreMonitor.class); + Journal journal = EasyMock.createMock(Journal.class); + journal.setLogEventListener(EasyMock.isA(LogEventListener.class)); + journal.open(); + journal.close(); + EasyMock.replay(journal); + store = new JournalStore(monitor, journal) { + }; + store.init(); + store.destroy(); + EasyMock.verify(journal); + } + + public void testInsertRecord() throws Exception { + StoreMonitor monitor = EasyMock.createMock(StoreMonitor.class); + Journal journal = EasyMock.createMock(Journal.class); + journal.setLogEventListener(EasyMock.isA(LogEventListener.class)); + journal.open(); + final String id = UUID.randomUUID().toString(); + EasyMock.expect(journal.writeHeader(EasyMock.isA(byte[].class), EasyMock.eq(false))) + .andStubAnswer(new IAnswer<Long>() { + public Long answer() throws Throwable { + Header header = new Header(); + header.setFields(new byte[][]{(byte[]) EasyMock.getCurrentArguments()[0]}); + // deserialize the message to test the format is correct + SerializationHelper.deserializeHeader(header); + assertTrue("Operation not INSERT", Header.INSERT == header.getOperation()); + assertTrue("Expiration incorrect", Store.NEVER == header.getExpiration()); + assertTrue("Least significant id incorrect", + id.equals(header.getId())); + assertTrue("Most significant id incorrect", + id.equals(header.getId())); + assertTrue("Records incorrect", 1 == header.getNumBlocks()); + assertTrue("Owner id incorrect", "foo".equals(header.getOwnerId())); + return 1L; + } + }); + EasyMock.expect(journal.writeBlock(EasyMock.isA(byte[].class), EasyMock.isA(byte[].class), EasyMock.eq(true))) + .andStubAnswer(new IAnswer<Long>() { + public Long answer() throws Throwable { + byte[] payload = (byte[]) EasyMock.getCurrentArguments()[0]; + assertTrue("Block data incorrect", "test".equals(SerializationHelper.deserialize(payload, null))); + return 1L; + } + }); + journal.close(); + EasyMock.replay(journal); + store = new JournalStore(monitor, journal) { + }; + store.init(); + store.insertRecord(owner, id, "test", Store.NEVER); + store.destroy(); + EasyMock.verify(journal); + } + + /** + * Verifies that a written record will be cached. This is verified by the fact that long-term storage is never + * accessed. + */ + public void testInsertRecordCache() throws Exception { + StoreMonitor monitor = EasyMock.createMock(StoreMonitor.class); + Journal journal = EasyMock.createMock(Journal.class); + journal.setLogEventListener(EasyMock.isA(LogEventListener.class)); + journal.open(); + final String id = UUID.randomUUID().toString(); + EasyMock.expect(journal.writeHeader(EasyMock.isA(byte[].class), EasyMock.eq(false))).andReturn(1L); + EasyMock.expect(journal.writeBlock(EasyMock.isA(byte[].class), EasyMock.isA(byte[].class), EasyMock.eq(true))) + .andReturn(1L); + journal.close(); + EasyMock.replay(journal); + store = new JournalStore(monitor, journal) { + }; + store.init(); + store.insertRecord(owner, id, "test", Store.NEVER); + assertEquals("test", store.readRecord(owner, id)); + store.destroy(); + EasyMock.verify(journal); + } + + protected void setUp() throws Exception { + super.setUp(); + TestUtils.cleanupLog(); + owner = EasyMock.createMock(SCAObject.class); + URI uri = URI.create("foo"); + EasyMock.expect(owner.getUri()).andReturn(uri).atLeastOnce(); + EasyMock.replay(owner); + } + + protected void tearDown() throws Exception { + super.tearDown(); + TestUtils.cleanupLog(); + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreOverflowTestCase.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreOverflowTestCase.java new file mode 100644 index 0000000000..61a77869c1 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalStoreOverflowTestCase.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.util.UUID; +import java.net.URI; + +import org.apache.tuscany.spi.component.SCAObject; +import org.apache.tuscany.spi.services.store.StoreMonitor; + +import junit.framework.TestCase; +import org.easymock.classextension.EasyMock; + +/** + * @version $Rev$ $Date$ + */ +public class JournalStoreOverflowTestCase extends TestCase { + @SuppressWarnings({"FieldCanBeLocal"}) + private JournalStore store; + private SCAObject owner; + + /** + * Validates records are moved forward during a log overflow + * + * @throws Exception + */ + public void testOverflow() throws Exception { + StoreMonitor monitor = EasyMock.createMock(StoreMonitor.class); + store = new JournalStore(monitor); + store.setMaxBlocksPerFile(3); + store.init(); + long expire = System.currentTimeMillis() + 200; + store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire); + store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire); + store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire); + store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire); + store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire); // + store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire); + Thread.sleep(250); + store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire + 20000); + store.insertRecord(owner, UUID.randomUUID().toString(), "test", expire + 20000); + store.destroy(); + } + + + public void testOverflowAtInsertHeader() throws Exception { + + } + + public void testOverflowAtUpdateHeader() throws Exception { + + } + + public void testOverflowAtDeleteHeader() throws Exception { + + } + + public void testOverflowAtBlock() throws Exception { + + } + + protected void setUp() throws Exception { + super.setUp(); + TestUtils.cleanupLog(); + owner = EasyMock.createMock(SCAObject.class); + EasyMock.expect(owner.getUri()).andReturn(URI.create("foo")).atLeastOnce(); + EasyMock.replay(owner); + } + + protected void tearDown() throws Exception { + super.tearDown(); + TestUtils.cleanupLog(); + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalTestCase.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalTestCase.java new file mode 100644 index 0000000000..2df83a0e74 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/JournalTestCase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.ByteArrayOutputStream; +import java.util.UUID; + +import static org.apache.tuscany.spi.services.store.Store.NEVER; + +import junit.framework.TestCase; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeHeader; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeRecordId; +import org.objectweb.howl.log.Configuration; +import org.objectweb.howl.log.LogRecord; + +/** + * @version $Rev$ $Date$ + */ +public class JournalTestCase extends TestCase { + private Journal journal; + + public void testWriteHeader() throws Exception { + String id = UUID.randomUUID().toString(); + long key = journal.writeHeader(serializeHeader(Header.INSERT, 10, "foo/bar", id, NEVER), false); + LogRecord record = journal.get(null, key); + Header header = new Header(); + header.setFields(record.getFields()); + SerializationHelper.deserializeHeader(header); + assertTrue(record.type == Journal.HEADER); + assertEquals(Header.INSERT, header.getOperation()); + assertEquals(10, header.getNumBlocks()); + assertEquals("foo/bar", header.getOwnerId()); + assertEquals(id, header.getId()); + assertEquals(NEVER, header.getExpiration()); + } + + public void testWriteRecord() throws Exception { + byte[] recordId = serializeRecordId("foo", UUID.randomUUID().toString()); + long key = journal.writeBlock("this is a test".getBytes(), recordId, true); + LogRecord record = journal.get(null, key); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + assertEquals(record.type, Journal.RECORD); + stream.write(record.getFields()[1]); + JournalRecord jrecord = new JournalRecord(stream.toByteArray()); + assertEquals("this is a test", new String(jrecord.getData())); + } + + protected void setUp() throws Exception { + super.setUp(); + TestUtils.cleanupLog(); + Configuration config = new Configuration(); + config.setLogFileDir("../stores"); + journal = new Journal(config); + journal.open(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + journal.close(); + TestUtils.cleanupLog(); + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/MockSCAExternalizable.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/MockSCAExternalizable.java new file mode 100644 index 0000000000..62e8a50635 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/MockSCAExternalizable.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +import org.apache.tuscany.spi.component.ReactivationException; +import org.apache.tuscany.spi.component.SCAExternalizable; +import org.apache.tuscany.spi.component.WorkContext; + +/** + * @version $Rev$ $Date$ + */ +public class MockSCAExternalizable implements Externalizable, SCAExternalizable { + private WorkContext context; + private boolean reactivated; + + public void writeExternal(ObjectOutput out) throws IOException { + + } + + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + + } + + public boolean isReactivated() { + return reactivated; + } + + public void setWorkContext(WorkContext context) { + this.context = context; + } + + public void reactivate() throws ReactivationException { + assert context != null : "WorkContext not properly set"; + reactivated = true; + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/RecordKeyTestCase.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/RecordKeyTestCase.java new file mode 100644 index 0000000000..ddd69a1f54 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/RecordKeyTestCase.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.net.URI; + +import org.apache.tuscany.spi.component.SCAObject; + +import junit.framework.TestCase; +import org.easymock.EasyMock; + +/** + * @version $Rev$ $Date$ + */ +public class RecordKeyTestCase extends TestCase { + + public void testEquals() throws Exception { + String id = "bar"; + URI uri = URI.create("foo"); + SCAObject owner1 = EasyMock.createMock(SCAObject.class); + EasyMock.expect(owner1.getUri()).andReturn(uri); + EasyMock.replay(owner1); + SCAObject owner2 = EasyMock.createMock(SCAObject.class); + EasyMock.expect(owner2.getUri()).andReturn(uri); + EasyMock.replay(owner2); + + RecordKey key1 = new RecordKey(id, owner1); + RecordKey key2 = new RecordKey(id, owner2); + assertEquals(key1, key2); + } + + public void testNotEqualsId() throws Exception { + String id = "bar"; + SCAObject owner1 = EasyMock.createMock(SCAObject.class); + URI uri = URI.create("foo"); + EasyMock.expect(owner1.getUri()).andReturn(uri); + EasyMock.replay(owner1); + SCAObject owner2 = EasyMock.createMock(SCAObject.class); + EasyMock.expect(owner2.getUri()).andReturn(uri); + EasyMock.replay(owner2); + RecordKey key1 = new RecordKey(id, owner1); + RecordKey key2 = new RecordKey("baz", owner2); + assertFalse(key1.equals(key2)); + } + + public void testNotEqualsOwner() throws Exception { + String id = "bar"; + URI fooUri = URI.create("foo"); + SCAObject owner1 = EasyMock.createMock(SCAObject.class); + EasyMock.expect(owner1.getUri()).andReturn(fooUri); + EasyMock.replay(owner1); + SCAObject owner2 = EasyMock.createMock(SCAObject.class); + URI barUri = URI.create("bar"); + EasyMock.expect(owner2.getUri()).andReturn(barUri); + EasyMock.replay(owner2); + RecordKey key1 = new RecordKey(id, owner1); + RecordKey key2 = new RecordKey(id, owner2); + assertFalse(key1.equals(key2)); + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/SerializationHelperTestCase.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/SerializationHelperTestCase.java new file mode 100644 index 0000000000..160e4c7a23 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/SerializationHelperTestCase.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.util.List; +import java.util.UUID; + +import org.apache.tuscany.spi.component.WorkContext; +import static org.apache.tuscany.spi.services.store.Store.NEVER; + +import junit.framework.TestCase; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.deserialize; +import org.easymock.EasyMock; + +/** + * @version $Rev$ $Date$ + */ +public class SerializationHelperTestCase extends TestCase { + + public void testTwoEvenChunks() throws Exception { + byte[] bytes = "this is a test".getBytes(); + List<byte[]> chunks = SerializationHelper.partition(bytes, 7); + assertEquals(2, chunks.size()); + assertEquals("this is", new String(chunks.get(0))); + assertEquals(" a test", new String(chunks.get(1))); + } + + public void testUnevenChunks() throws Exception { + byte[] bytes = "this is a test123".getBytes(); + List<byte[]> chunks = SerializationHelper.partition(bytes, 7); + assertEquals(3, chunks.size()); + assertEquals("this is", new String(chunks.get(0))); + assertEquals(" a test", new String(chunks.get(1))); + assertEquals("123", new String(chunks.get(2))); + } + + public void testChunkSizeGreater() throws Exception { + byte[] bytes = "this is a test".getBytes(); + List<byte[]> chunks = SerializationHelper.partition(bytes, 512); + assertEquals(1, chunks.size()); + byte[] chunk = chunks.get(0); + assertEquals(14, chunk.length); + assertEquals("this is a test", new String(chunk)); + } + + public void testSerializeDeserializeNonSCAExternalizable() throws Exception { + byte[] bytes = SerializationHelper.serialize("foo"); + assertEquals("foo", deserialize(bytes, null)); + } + + public void testSerializeDeserializeSCAExternalizable() throws Exception { + byte[] bytes = SerializationHelper.serialize(new MockSCAExternalizable()); + WorkContext context = EasyMock.createNiceMock(WorkContext.class); + MockSCAExternalizable externalized = (MockSCAExternalizable) deserialize(bytes, context); + assertTrue(externalized.isReactivated()); + } + + public void testDeserializeHeader() throws Exception { + String id = UUID.randomUUID().toString(); + byte[] bytes = SerializationHelper.serializeHeader(Header.INSERT, 2, "foo", id, NEVER); + Header header = SerializationHelper.deserializeHeader(new MockHeader(bytes)); + assertEquals(Header.INSERT, header.getOperation()); + assertEquals(2, header.getNumBlocks()); + assertEquals("foo", header.getOwnerId()); + assertEquals(id, header.getId()); + assertEquals(NEVER, header.getExpiration()); + } + + private class MockHeader extends Header { + public MockHeader(byte[] bytes) { + super(); + fields = new byte[][]{bytes}; + } + } + + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/TestUtils.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/TestUtils.java new file mode 100644 index 0000000000..5fb2ebeb82 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/TestUtils.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal; + +import java.io.File; + +/** + * JournalThroughputTest case utilities + * + * @version $Rev$ $Date$ + */ +public final class TestUtils { + + private TestUtils() { + + } + + /** + * Removes log files from disk + */ + public static void cleanupLog() { + File dir = new File("../stores"); + if (!dir.exists()) { + return; + } + for (File file : dir.listFiles()) { + file.delete(); + } + dir.delete(); + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/Foo.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/Foo.java new file mode 100644 index 0000000000..d9c7aee660 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/Foo.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal.performance; + +import java.io.Serializable; + +/** + * @version $Rev$ $Date$ + */ +@SuppressWarnings({"SerializableHasSerializationMethods"}) +public class Foo implements Serializable { + private static final long serialVersionUID = -1003664515513709814L; + protected String baz; + protected int bar; + + public Foo(String baz, int bar) { + this.baz = baz; + this.bar = bar; + } + + public String getBaz() { + return baz; + } + + public void setBaz(String baz) { + this.baz = baz; + } + + public int getBar() { + return bar; + } + + public void setBar(int bar) { + this.bar = bar; + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalStoreThroughputTest.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalStoreThroughputTest.java new file mode 100644 index 0000000000..eeb566f182 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalStoreThroughputTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal.performance; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; + +import org.apache.tuscany.spi.component.SCAObject; +import org.apache.tuscany.spi.services.store.StoreWriteException; + +import org.apache.tuscany.persistence.store.journal.JournalShutdownException; +import org.apache.tuscany.persistence.store.journal.JournalStore; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serialize; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeRecordId; +import org.apache.tuscany.persistence.store.journal.TestUtils; + +/** + * Runs a basic throughput tests on JournalStore operations + * <p/> + * TODO this should be integrated with a Maven itest-based performance harness + * + * @version $Rev$ $Date$ + */ +public class JournalStoreThroughputTest { + private static final int SIZE = 1000; + private CyclicBarrier barrier; + private JournalStore store; + private long now; + private SCAObject owner = new MockSCAObject(); + private String id = UUID.randomUUID().toString(); + private CountDownLatch latch = new CountDownLatch(1); + private long expire = System.currentTimeMillis() + 10000; + private Foo object = new Foo("this is a test", 1); + + public static void main(String[] args) throws Exception { + JournalStoreThroughputTest test = new JournalStoreThroughputTest(); + test.testAppend(); + test.latch.await(5000, TimeUnit.MILLISECONDS); + } + + public void testAppend() throws Exception { + TestUtils.cleanupLog(); + store = new JournalStore(new MockMonitor()); + store.init(); + final Thread[] threads = new Thread[SIZE]; + barrier = new CyclicBarrier(SIZE, new Runnable() { + public void run() { + try { + System.out.println("-----------------------------------------------------"); + System.out.println("JournalStore.append()"); + byte[] idBytes = serializeRecordId(owner.getUri().toString(), id); + byte[] bytes = serialize(object); + System.out.println("Approx record size :" + (bytes.length + idBytes.length)); + System.out.println("Total threads :" + barrier.getNumberWaiting()); + System.out.println("Forced writes :" + barrier.getNumberWaiting()); + System.out.println("Time:" + (System.currentTimeMillis() - now)); + store.destroy(); + latch.countDown(); + } catch (JournalShutdownException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + for (int i = 0; i < SIZE; i++) { + threads[i] = new Thread(new InsertWorker(true)); + } + now = System.currentTimeMillis(); + for (int i = 0; i < SIZE; i++) { + threads[i].start(); + } + } + + private class InsertWorker implements Runnable { + boolean forced; + + public InsertWorker(boolean forced) { + this.forced = forced; + } + + public void run() { + try { + store.insertRecord(owner, id, object, expire); + barrier.await(); + } catch (StoreWriteException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalThroughputTest.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalThroughputTest.java new file mode 100644 index 0000000000..7a3e8fa308 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/JournalThroughputTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal.performance; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; + +import org.apache.tuscany.spi.services.store.StoreWriteException; + +import org.apache.tuscany.persistence.store.journal.Journal; +import static org.apache.tuscany.persistence.store.journal.SerializationHelper.serializeRecordId; +import org.apache.tuscany.persistence.store.journal.TestUtils; + +/** + * Runs a basic throughput tests on Journal operations + * <p/> + * TODO this should be integrated with a Maven itest-based performance harness + * + * @version $Rev$ $Date$ + */ +public class JournalThroughputTest { + private static final int SIZE = 1000; + private CyclicBarrier barrier; + private Journal journal; + private byte[] bytes; + private byte[] recordId; + private long now; + private CountDownLatch latch = new CountDownLatch(1); + + public static void main(String[] args) throws Exception { + JournalThroughputTest test = new JournalThroughputTest(); + test.testForcedWrites(); + test.latch.await(5000, TimeUnit.MILLISECONDS); + test.testNonForcedWrites(); + } + + public void testForcedWrites() throws Exception { + TestUtils.cleanupLog(); + journal = new Journal(); + journal.open(); + recordId = serializeRecordId("foo", UUID.randomUUID().toString()); + bytes = "this is a test".getBytes(); + final Thread[] threads = new Thread[SIZE]; + barrier = new CyclicBarrier(SIZE, new Runnable() { + public void run() { + System.out.println("-----------------------------------------------------"); + System.out.println("Journal.writeBlock() using forced writes"); + System.out.println("Approx record size :" + (recordId.length + bytes.length)); + System.out.println("Total threads :" + barrier.getNumberWaiting()); + System.out.println("Forced writes :" + barrier.getNumberWaiting()); + System.out.println("Time:" + (System.currentTimeMillis() - now)); + try { + journal.close(); + latch.countDown(); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + for (int i = 0; i < SIZE; i++) { + threads[i] = new Thread(new Worker(true)); + } + now = System.currentTimeMillis(); + for (int i = 0; i < SIZE; i++) { + threads[i].start(); + } + } + + public void testNonForcedWrites() throws Exception { + TestUtils.cleanupLog(); + journal = new Journal(); + journal.open(); + recordId = serializeRecordId("foo", UUID.randomUUID().toString()); + bytes = "this is a test".getBytes(); + final Thread[] threads = new Thread[SIZE]; + barrier = new CyclicBarrier(SIZE, new Runnable() { + public void run() { + System.out.println("-----------------------------------------------------"); + System.out.println("Journal.writeBlock() using non-forced writes"); + System.out.println("Approx record size :" + (recordId.length + bytes.length)); + System.out.println("Total threads :" + barrier.getNumberWaiting()); + System.out.println("Forced writes :" + barrier.getNumberWaiting()); + System.out.println("Time:" + (System.currentTimeMillis() - now)); + System.out.println("-----------------------------------------------------"); + try { + journal.close(); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + for (int i = 0; i < SIZE; i++) { + threads[i] = new Thread(new Worker(false)); + } + now = System.currentTimeMillis(); + for (int i = 0; i < SIZE; i++) { + threads[i].start(); + } + } + + private class Worker implements Runnable { + boolean forced; + + public Worker(boolean forced) { + this.forced = forced; + } + + public void run() { + try { + journal.writeBlock(bytes, recordId, forced); + barrier.await(); + } catch (StoreWriteException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockMonitor.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockMonitor.java new file mode 100644 index 0000000000..a62630ebe0 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockMonitor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal.performance; + +import org.apache.tuscany.spi.services.store.StoreMonitor; + +import org.apache.tuscany.api.annotation.LogLevel; + +/** + * @version $Rev$ $Date$ + */ +public class MockMonitor implements StoreMonitor { + @LogLevel("DEBUG") + public void start(String msg) { + + } + + @LogLevel("DEBUG") + public void stop(String msg) { + + } + + @LogLevel("DEBUG") + public void beginRecover() { + + } + + @LogLevel("DEBUG") + public void endRecover() { + + } + + @LogLevel("DEBUG") + public void recover(Object recordId) { + + } + + @LogLevel("ERROR") + public void error(Throwable e) { + + } +} diff --git a/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockSCAObject.java b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockSCAObject.java new file mode 100644 index 0000000000..e9ecdbab28 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.journal/src/test/java/org/apache/tuscany/persistence/store/journal/performance/MockSCAObject.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.persistence.store.journal.performance; + +import java.net.URI; + +import org.apache.tuscany.spi.CoreRuntimeException; +import org.apache.tuscany.spi.component.SCAObject; +import org.apache.tuscany.spi.event.Event; +import org.apache.tuscany.spi.event.EventFilter; +import org.apache.tuscany.spi.event.RuntimeEventListener; + +/** + * @version $Rev$ $Date$ + */ +public class MockSCAObject implements SCAObject { + + public URI getUri() { + return null; + } + + public void publish(Event object) { + + } + + public void addListener(RuntimeEventListener listener) { + + } + + public void addListener(EventFilter filter, RuntimeEventListener listener) { + + } + + public void removeListener(RuntimeEventListener listener) { + + } + + public int getLifecycleState() { + return 0; + } + + public void start() throws CoreRuntimeException { + + } + + public void stop() throws CoreRuntimeException { + + } + +} |