diff options
Diffstat (limited to 'sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/JDBCStore.java')
-rw-r--r-- | sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/JDBCStore.java | 347 |
1 files changed, 347 insertions, 0 deletions
diff --git a/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/JDBCStore.java b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/JDBCStore.java new file mode 100644 index 0000000000..5bd04e98da --- /dev/null +++ b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/JDBCStore.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.persistence.store.jdbc; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.sql.DataSource; + +import org.osoa.sca.annotations.Destroy; +import org.osoa.sca.annotations.EagerInit; +import org.osoa.sca.annotations.Init; +import org.osoa.sca.annotations.Property; +import org.osoa.sca.annotations.Resource; +import org.osoa.sca.annotations.Service; + +import org.apache.tuscany.spi.annotation.Autowire; +import org.apache.tuscany.spi.component.SCAObject; +import org.apache.tuscany.spi.event.AbstractEventPublisher; +import org.apache.tuscany.spi.services.store.RecoveryListener; +import org.apache.tuscany.spi.services.store.Store; +import org.apache.tuscany.spi.services.store.StoreMonitor; +import org.apache.tuscany.spi.services.store.StoreReadException; +import org.apache.tuscany.spi.services.store.StoreWriteException; + +import org.apache.tuscany.api.annotation.Monitor; + +/** + * A store implementation that uses a relational database to persist records transactionally. + * <p/> + * Note this implementation does not yet support destruction callbacks for expired. In order to support this, expired + * records must be rehydrated and deleted individually. + * + * @version $Rev$ $Date$ + */ +@Service(Store.class) +@EagerInit +public class JDBCStore extends AbstractEventPublisher implements Store { + private DataSource dataSource; + private StoreMonitor monitor; + private Converter converter; + // TODO integrate with a core threading scheme + @SuppressWarnings({"FieldCanBeLocal"}) + private ScheduledExecutorService scheduler; + private long reaperInterval = 300000; + private long defaultExpirationOffset = 600000; // 10 minutes + + //private + public JDBCStore(@Resource(mappedName = "StoreDS")DataSource dataSource, + @Autowire Converter converter, + @Monitor StoreMonitor monitor) { + this.dataSource = dataSource; + this.converter = converter; + this.monitor = monitor; + } + + /** + * Returns the maximum default expiration offset for records in the store + * + * @return the maximum default expiration offset for records in the store + */ + @Property + public long getExpirationOffset() { + return defaultExpirationOffset; + } + + /** + * Sets the maximum default expiration offset for records in the store + */ + public void setDefaultExpirationOffset(long defaultExpirationOffset) { + this.defaultExpirationOffset = defaultExpirationOffset; + } + + /** + * Sets the interval for expired entry scanning to be performed + */ + @Property + public void setReaperInterval(long reaperInterval) { + this.reaperInterval = reaperInterval; + } + + /** + * Returns the interval for expired entry scanning to be performed + */ + public long getReaperInterval() { + return reaperInterval; + } + + @Init + public void init() { + this.scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler.scheduleWithFixedDelay(new Reaper(), reaperInterval, reaperInterval, TimeUnit.MILLISECONDS); + monitor.start("JDBC store started"); + } + + @Destroy + public void destroy() { + scheduler.shutdown(); + monitor.stop("JDBC store stopped"); + } + + public void insertRecord(SCAObject owner, String id, Object object, long expiration) throws StoreWriteException { + if (!(object instanceof Serializable)) { + throw new NonSerializableTypeException("Type must implement serializable", owner.getCanonicalName(), id); + } + Serializable serializable = (Serializable) object; + String canonicalName = owner.getCanonicalName(); + Connection conn = null; + PreparedStatement insertStmt = null; + PreparedStatement updateStmt = null; + long now = System.currentTimeMillis(); + try { + conn = dataSource.getConnection(); + if (now < expiration || expiration == NEVER) { + PreparedStatement stmt = conn.prepareStatement(converter.getSelectUpdateSql()); + if (converter.findAndLock(stmt, canonicalName, id)) { + updateStmt = conn.prepareStatement(converter.getUpdateSql()); + converter.update(updateStmt, canonicalName, id, serializable); + } else { + insertStmt = conn.prepareStatement(converter.getInsertSql()); + converter.insert(insertStmt, canonicalName, id, expiration, serializable); + } + } + try { + if (insertStmt != null) { + insertStmt.executeUpdate(); + } + if (updateStmt != null) { + updateStmt.executeUpdate(); + } + conn.commit(); + } catch (SQLException e) { + try { + conn.rollback(); + } catch (SQLException e2) { + monitor.error(e2); + } + throw new StoreWriteException(owner.getCanonicalName(), id, e); + } + } catch (SQLException e) { + throw new StoreWriteException(owner.getCanonicalName(), id, e); + } finally { + close(insertStmt); + close(updateStmt); + close(conn); + } + } + + public void updateRecord(SCAObject owner, String id, Object object, long expiration) throws StoreWriteException { + if (!(object instanceof Serializable)) { + throw new NonSerializableTypeException("Type must implement serializable", owner.getCanonicalName(), id); + } + Serializable serializable = (Serializable) object; + String canonicalName = owner.getCanonicalName(); + Connection conn = null; + PreparedStatement insertStmt = null; + PreparedStatement updateStmt = null; + try { + conn = dataSource.getConnection(); + conn = dataSource.getConnection(); + conn.setAutoCommit(false); + updateStmt = conn.prepareStatement(converter.getUpdateSql()); + converter.update(updateStmt, canonicalName, id, serializable); + try { + if (updateStmt != null) { + updateStmt.executeUpdate(); + } + conn.commit(); + } catch (SQLException e) { + try { + conn.rollback(); + } catch (SQLException e2) { + monitor.error(e2); + } + throw new StoreWriteException(owner.getCanonicalName(), id, e); + } + } catch (SQLException e) { + throw new StoreWriteException(owner.getCanonicalName(), id, e); + } finally { + close(insertStmt); + close(updateStmt); + close(conn); + } + + } + + public Object readRecord(SCAObject owner, String id) throws StoreReadException { + Connection conn = null; + try { + conn = dataSource.getConnection(); + Object object = converter.read(conn, owner.getCanonicalName(), id); + conn.commit(); + return object; + } catch (SQLException e) { + try { + if (conn != null) { + conn.rollback(); + } + } catch (SQLException e2) { + monitor.error(e2); + } + throw new StoreReadException(owner.getName(), id, e); + } finally { + close(conn); + } + } + + public void removeRecord(SCAObject owner, String id) throws StoreWriteException { + Connection conn = null; + try { + conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(converter.getDeleteRecordSql()); + converter.delete(stmt, owner.getCanonicalName(), id); + stmt.executeUpdate(); + conn.commit(); + } catch (SQLException e) { + try { + if (conn != null) { + conn.rollback(); + } + } catch (SQLException e2) { + monitor.error(e2); + } + throw new StoreWriteException(owner.getCanonicalName(), id, e); + } finally { + close(conn); + } + } + + public void removeRecords() throws StoreWriteException { + Connection conn = null; + PreparedStatement stmt = null; + try { + conn = dataSource.getConnection(); + stmt = conn.prepareStatement(converter.getDeleteSql()); + stmt.executeUpdate(); + } catch (SQLException e) { + throw new StoreWriteException(e); + } finally { + close(stmt, conn); + } + } + + public void recover(RecoveryListener listener) { + throw new UnsupportedOperationException(); + } + + private void close(Connection conn) { + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + monitor.error(e); + } + } + } + + private void close(PreparedStatement stmt) { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + monitor.error(e); + } + } + } + + private void close(PreparedStatement stmt, Connection conn) { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + monitor.error(e); + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + monitor.error(e); + } + } + } + + /** + * Inner class responsible for clearing out expired entries + */ + private class Reaper implements Runnable { + public void run() { + long now = System.currentTimeMillis(); + Connection conn = null; + PreparedStatement stmt = null; + try { + conn = dataSource.getConnection(); + stmt = conn.prepareStatement(converter.getDeleteExpiredSql()); + stmt.setLong(1, now); + stmt.executeUpdate(); + conn.commit(); + } catch (SQLException e) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException e1) { + // ignore + } + } + monitor.error(e); + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // ingnore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + // ignore + } + } + } + } + } + +} |