diff options
Diffstat (limited to 'sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache')
8 files changed, 994 insertions, 0 deletions
diff --git a/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/Converter.java b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/Converter.java new file mode 100644 index 0000000000..b37ae6cbe5 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/Converter.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.persistence.store.jdbc; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; + +import org.apache.tuscany.spi.services.store.StoreReadException; +import org.apache.tuscany.spi.services.store.StoreWriteException; + +/** + * Converters mediate the particularities of specific databases and JDBC drivers such as data types during read and + * write operations + * + * @version $Rev$ $Date$ + */ +public interface Converter { + + /** + * Returns the SQL statement to select a record for update + * + * @return the SQL statement to select a record for update + */ + String getSelectUpdateSql(); + + /** + * Returns the SQL statement to add a record + * + * @return the SQL statement to add a record + */ + String getInsertSql(); + + /** + * Returns the SQL statement to update a record + * + * @return the SQL statement to update a record + */ + String getUpdateSql(); + + /** + * Returns the SQL statement to retrieve a record + * + * @return the SQL statement to retrieve a record + */ + String getFindSql(); + + /** + * Returns the SQL statement to remove a record + * + * @return the SQL statement to remove a record + */ + String getDeleteSql(); + + /** + * Returns the SQL statement to remove expired records + * + * @return the SQL statement to remove expired records + */ + String getDeleteExpiredSql(); + + String getDeleteRecordSql(); + + /** + * Writes a new record to the underlying store using batch semantics. That is, the insert will be added as a batch + * operation to the prepared statment. It is the responsibility of the client (i.e. the prepared statement "owner") + * to exectute the statement when the batch threshold is reached. Note implementations must assume auto commit is + * false. + * + * @param stmt + * @param ownerId + * @param id + * @param expiration + * @param object + * @throws org.apache.tuscany.spi.services.store.StoreWriteException + * + */ + void insert(PreparedStatement stmt, String ownerId, String id, long expiration, Serializable object) + throws StoreWriteException; + + /** + * @param stmt + * @param ownerId + * @param id + * @param object + * @throws StoreWriteException + */ + void update(PreparedStatement stmt, String ownerId, String id, Serializable object) throws StoreWriteException; + + /** + * @param stmt + * @param ownerId + * @param id + * @return + * @throws StoreWriteException + */ + boolean findAndLock(PreparedStatement stmt, String ownerId, String id) throws StoreWriteException; + + /** + * Reads a record from the underlying store. Note implementations must assume auto commit is false. + * + * @param conn + * @param ownerId + * @param id + * @return + * @throws StoreReadException + */ + Object read(Connection conn, String ownerId, String id) throws StoreReadException; + + /** + * @param stmt + * @param ownerId + * @param id + * @throws StoreWriteException + */ + void delete(PreparedStatement stmt, String ownerId, String id) throws StoreWriteException; + +} diff --git a/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/JDBCStore.java b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/JDBCStore.java new file mode 100644 index 0000000000..5bd04e98da --- /dev/null +++ b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/JDBCStore.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.persistence.store.jdbc; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.sql.DataSource; + +import org.osoa.sca.annotations.Destroy; +import org.osoa.sca.annotations.EagerInit; +import org.osoa.sca.annotations.Init; +import org.osoa.sca.annotations.Property; +import org.osoa.sca.annotations.Resource; +import org.osoa.sca.annotations.Service; + +import org.apache.tuscany.spi.annotation.Autowire; +import org.apache.tuscany.spi.component.SCAObject; +import org.apache.tuscany.spi.event.AbstractEventPublisher; +import org.apache.tuscany.spi.services.store.RecoveryListener; +import org.apache.tuscany.spi.services.store.Store; +import org.apache.tuscany.spi.services.store.StoreMonitor; +import org.apache.tuscany.spi.services.store.StoreReadException; +import org.apache.tuscany.spi.services.store.StoreWriteException; + +import org.apache.tuscany.api.annotation.Monitor; + +/** + * A store implementation that uses a relational database to persist records transactionally. + * <p/> + * Note this implementation does not yet support destruction callbacks for expired. In order to support this, expired + * records must be rehydrated and deleted individually. + * + * @version $Rev$ $Date$ + */ +@Service(Store.class) +@EagerInit +public class JDBCStore extends AbstractEventPublisher implements Store { + private DataSource dataSource; + private StoreMonitor monitor; + private Converter converter; + // TODO integrate with a core threading scheme + @SuppressWarnings({"FieldCanBeLocal"}) + private ScheduledExecutorService scheduler; + private long reaperInterval = 300000; + private long defaultExpirationOffset = 600000; // 10 minutes + + //private + public JDBCStore(@Resource(mappedName = "StoreDS")DataSource dataSource, + @Autowire Converter converter, + @Monitor StoreMonitor monitor) { + this.dataSource = dataSource; + this.converter = converter; + this.monitor = monitor; + } + + /** + * Returns the maximum default expiration offset for records in the store + * + * @return the maximum default expiration offset for records in the store + */ + @Property + public long getExpirationOffset() { + return defaultExpirationOffset; + } + + /** + * Sets the maximum default expiration offset for records in the store + */ + public void setDefaultExpirationOffset(long defaultExpirationOffset) { + this.defaultExpirationOffset = defaultExpirationOffset; + } + + /** + * Sets the interval for expired entry scanning to be performed + */ + @Property + public void setReaperInterval(long reaperInterval) { + this.reaperInterval = reaperInterval; + } + + /** + * Returns the interval for expired entry scanning to be performed + */ + public long getReaperInterval() { + return reaperInterval; + } + + @Init + public void init() { + this.scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler.scheduleWithFixedDelay(new Reaper(), reaperInterval, reaperInterval, TimeUnit.MILLISECONDS); + monitor.start("JDBC store started"); + } + + @Destroy + public void destroy() { + scheduler.shutdown(); + monitor.stop("JDBC store stopped"); + } + + public void insertRecord(SCAObject owner, String id, Object object, long expiration) throws StoreWriteException { + if (!(object instanceof Serializable)) { + throw new NonSerializableTypeException("Type must implement serializable", owner.getCanonicalName(), id); + } + Serializable serializable = (Serializable) object; + String canonicalName = owner.getCanonicalName(); + Connection conn = null; + PreparedStatement insertStmt = null; + PreparedStatement updateStmt = null; + long now = System.currentTimeMillis(); + try { + conn = dataSource.getConnection(); + if (now < expiration || expiration == NEVER) { + PreparedStatement stmt = conn.prepareStatement(converter.getSelectUpdateSql()); + if (converter.findAndLock(stmt, canonicalName, id)) { + updateStmt = conn.prepareStatement(converter.getUpdateSql()); + converter.update(updateStmt, canonicalName, id, serializable); + } else { + insertStmt = conn.prepareStatement(converter.getInsertSql()); + converter.insert(insertStmt, canonicalName, id, expiration, serializable); + } + } + try { + if (insertStmt != null) { + insertStmt.executeUpdate(); + } + if (updateStmt != null) { + updateStmt.executeUpdate(); + } + conn.commit(); + } catch (SQLException e) { + try { + conn.rollback(); + } catch (SQLException e2) { + monitor.error(e2); + } + throw new StoreWriteException(owner.getCanonicalName(), id, e); + } + } catch (SQLException e) { + throw new StoreWriteException(owner.getCanonicalName(), id, e); + } finally { + close(insertStmt); + close(updateStmt); + close(conn); + } + } + + public void updateRecord(SCAObject owner, String id, Object object, long expiration) throws StoreWriteException { + if (!(object instanceof Serializable)) { + throw new NonSerializableTypeException("Type must implement serializable", owner.getCanonicalName(), id); + } + Serializable serializable = (Serializable) object; + String canonicalName = owner.getCanonicalName(); + Connection conn = null; + PreparedStatement insertStmt = null; + PreparedStatement updateStmt = null; + try { + conn = dataSource.getConnection(); + conn = dataSource.getConnection(); + conn.setAutoCommit(false); + updateStmt = conn.prepareStatement(converter.getUpdateSql()); + converter.update(updateStmt, canonicalName, id, serializable); + try { + if (updateStmt != null) { + updateStmt.executeUpdate(); + } + conn.commit(); + } catch (SQLException e) { + try { + conn.rollback(); + } catch (SQLException e2) { + monitor.error(e2); + } + throw new StoreWriteException(owner.getCanonicalName(), id, e); + } + } catch (SQLException e) { + throw new StoreWriteException(owner.getCanonicalName(), id, e); + } finally { + close(insertStmt); + close(updateStmt); + close(conn); + } + + } + + public Object readRecord(SCAObject owner, String id) throws StoreReadException { + Connection conn = null; + try { + conn = dataSource.getConnection(); + Object object = converter.read(conn, owner.getCanonicalName(), id); + conn.commit(); + return object; + } catch (SQLException e) { + try { + if (conn != null) { + conn.rollback(); + } + } catch (SQLException e2) { + monitor.error(e2); + } + throw new StoreReadException(owner.getName(), id, e); + } finally { + close(conn); + } + } + + public void removeRecord(SCAObject owner, String id) throws StoreWriteException { + Connection conn = null; + try { + conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(converter.getDeleteRecordSql()); + converter.delete(stmt, owner.getCanonicalName(), id); + stmt.executeUpdate(); + conn.commit(); + } catch (SQLException e) { + try { + if (conn != null) { + conn.rollback(); + } + } catch (SQLException e2) { + monitor.error(e2); + } + throw new StoreWriteException(owner.getCanonicalName(), id, e); + } finally { + close(conn); + } + } + + public void removeRecords() throws StoreWriteException { + Connection conn = null; + PreparedStatement stmt = null; + try { + conn = dataSource.getConnection(); + stmt = conn.prepareStatement(converter.getDeleteSql()); + stmt.executeUpdate(); + } catch (SQLException e) { + throw new StoreWriteException(e); + } finally { + close(stmt, conn); + } + } + + public void recover(RecoveryListener listener) { + throw new UnsupportedOperationException(); + } + + private void close(Connection conn) { + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + monitor.error(e); + } + } + } + + private void close(PreparedStatement stmt) { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + monitor.error(e); + } + } + } + + private void close(PreparedStatement stmt, Connection conn) { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + monitor.error(e); + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + monitor.error(e); + } + } + } + + /** + * Inner class responsible for clearing out expired entries + */ + private class Reaper implements Runnable { + public void run() { + long now = System.currentTimeMillis(); + Connection conn = null; + PreparedStatement stmt = null; + try { + conn = dataSource.getConnection(); + stmt = conn.prepareStatement(converter.getDeleteExpiredSql()); + stmt.setLong(1, now); + stmt.executeUpdate(); + conn.commit(); + } catch (SQLException e) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException e1) { + // ignore + } + } + monitor.error(e); + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // ingnore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + // ignore + } + } + } + } + } + +} diff --git a/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/NonSerializableTypeException.java b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/NonSerializableTypeException.java new file mode 100644 index 0000000000..75fdde42d0 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/NonSerializableTypeException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.persistence.store.jdbc; + +import org.apache.tuscany.spi.services.store.StoreWriteException; + +/** + * @version $Rev$ $Date$ + */ +public class NonSerializableTypeException extends StoreWriteException { + + public NonSerializableTypeException(String message, String owner, String identifier) { + super(message, owner, identifier); + } +} diff --git a/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/Record.java b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/Record.java new file mode 100644 index 0000000000..e568481e17 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/Record.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.persistence.store.jdbc; + +import java.io.Serializable; + +/** + * Represents a persistent object and its metadata. + * <p/> + * Note this class has a natural ordering that is inconsistent with equals. + * + * @version $Rev$ $Date$ + */ +public class Record implements Comparable { + public static final int INSERT = 0; + public static final int UPDATE = 1; + + private String ownerId; + private String id; + private Serializable object; + private long expiration = JDBCStore.NEVER; + private int operation; + + /** + * Creates a new record + * + * @param ownerId + * @param id the unique id of the record + * @param object the object to serialize + * @param expiration the expirary time, {@link org.apache.tuscany.spi.services.store.Store.NEVER} if there is no + * expiration + * @param operation an <code>INSERT</code> or <code>UPDATE</code> operation + */ + public Record(String ownerId, String id, Serializable object, long expiration, int operation) { + this.id = id; + this.object = object; + this.expiration = expiration; + this.operation = operation; + this.ownerId = ownerId; + } + + /** + * Returns the unique object id + * + * @return the unique object id + */ + public String getId() { + return id; + } + + /** + * Returns the object + * + * @return the object + */ + public Serializable getObject() { + return object; + } + + /** + * Sets the object to serialize + * + * @param object the object + */ + public void setObject(Serializable object) { + this.object = object; + } + + /** + * Returns the expiration time + * + * @return the expiration time + */ + public long getExpiration() { + return expiration; + } + + /** + * Returns the type of operation + * + * @return the type of operation + */ + public int getOperation() { + return operation; + } + + public String getOwnerId() { + return ownerId; + } + + public void setOwnerId(String ownerId) { + this.ownerId = ownerId; + } + + public int compareTo(Object o) { + assert o instanceof Record; + Record record = (Record) o; + if (record.getOperation() == operation) { + return 0; + } else if (record.getOperation() == INSERT) { + return 1; + } else if (record.getOperation() == UPDATE) { + return -1; + } else { + throw new AssertionError(); + } + } +} diff --git a/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/TCCLObjectInputStream.java b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/TCCLObjectInputStream.java new file mode 100644 index 0000000000..1cd18c1b24 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/TCCLObjectInputStream.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.persistence.store.jdbc; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; + +/** + * Deserializes an object based on the thread context classloader + * + * @version $Rev$ $Date$ + */ +public class TCCLObjectInputStream extends ObjectInputStream { + private final ClassLoader classLoader; + + public TCCLObjectInputStream(InputStream in) throws IOException, SecurityException { + super(in); + this.classLoader = Thread.currentThread().getContextClassLoader(); + } + + protected Class resolveClass(ObjectStreamClass streamClass) throws IOException, ClassNotFoundException { + return classLoader.loadClass(streamClass.getName()); + } +}
\ No newline at end of file diff --git a/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/converter/AbstractConverter.java b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/converter/AbstractConverter.java new file mode 100644 index 0000000000..1419c5a9fe --- /dev/null +++ b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/converter/AbstractConverter.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.persistence.store.jdbc.converter; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import org.apache.tuscany.spi.services.store.StoreWriteException; + +import org.apache.tuscany.service.persistence.store.jdbc.Converter; +import org.apache.tuscany.service.persistence.store.jdbc.TCCLObjectInputStream; + +/** + * Base class for <code>Converter</code> implementations + * + * @version $Rev$ $Date$ + */ +public abstract class AbstractConverter implements Converter { + public static final int OWNER = 1; + public static final int ID = 2; + public static final int EXPIRATION = 3; + public static final int DATA = 4; + public static final int OBJECT_UPDATE = 1; + public static final int OWNER_UPDATE = 2; + public static final int ID_UPDATE = 3; + + protected String findSql = "SELECT * FROM CONVERSATION_STATE WHERE OWNER = ? AND ID = ?"; + protected String insertSql = + "INSERT INTO CONVERSATION_STATE (OWNER, ID, EXPIRATION, OBJECT) VALUES (?, ?, ?, ?)"; + protected String updateSql = "UPDATE CONVERSATION_STATE SET OBJECT = ? WHERE OWNER = ? AND ID = ?"; + protected String selectUpdateSql = + "SELECT ID FROM CONVERSATION_STATE WHERE OWNER = ? AND ID = ? FOR UPDATE"; + protected String deleteSql = "DELETE FROM CONVERSATION_STATE WHERE ID = ?"; + protected String deleteExpiredSql = "DELETE FROM CONVERSATION_STATE WHERE EXPIRATION <= ?"; + protected String deleteRecordSql = "DELETE FROM CONVERSATION_STATE WHERE OWNER = ? AND ID = ?"; + + public String getInsertSql() { + return insertSql; + } + + public String getUpdateSql() { + return updateSql; + } + + public String getFindSql() { + return findSql; + } + + public String getDeleteSql() { + return deleteSql; + } + + public String getDeleteExpiredSql() { + return deleteExpiredSql; + } + + public String getSelectUpdateSql() { + return selectUpdateSql; + } + + public String getDeleteRecordSql() { + return deleteRecordSql; + } + + public boolean findAndLock(PreparedStatement stmt, String ownerId, String id) + throws StoreWriteException { + try { + stmt.setString(OWNER, ownerId); + stmt.setString(ID, id); + return stmt.executeQuery().next(); + } catch (SQLException e) { + throw new StoreWriteException(e); + } + } + + public void delete(PreparedStatement stmt, String ownerId, String id) throws StoreWriteException { + try { + stmt.setString(OWNER, ownerId); + stmt.setString(ID, id); + } catch (SQLException e) { + throw new StoreWriteException(e); + } + } + + protected byte[] serialize(Serializable serializable) throws IOException { + ByteArrayOutputStream bas = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bas); + out.writeObject(serializable); + return bas.toByteArray(); + } + + protected Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException { + return new TCCLObjectInputStream(new ByteArrayInputStream(bytes)).readObject(); + } + +} diff --git a/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/converter/HSQLDBConverter.java b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/converter/HSQLDBConverter.java new file mode 100644 index 0000000000..a85ab5abfa --- /dev/null +++ b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/converter/HSQLDBConverter.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.persistence.store.jdbc.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.tuscany.spi.services.store.StoreReadException; + +import org.apache.tuscany.spi.services.store.StoreWriteException; + +/** + * Performs writing and reading operations to HSQLDB + * + * @version $Rev$ $Date$ + */ +public class HSQLDBConverter extends AbstractConverter { + + // HSQLDB does not support SELECT FOR UPDATE + protected String selectUpdateSql = "SELECT ID FROM CONVERSATION_STATE WHERE OWNER = ? AND ID = ?"; + + public String getSelectUpdateSql() { + return selectUpdateSql; + } + + public void insert(PreparedStatement stmt, String ownerId, String id, long expiration, Serializable object) + throws StoreWriteException { + try { + stmt.setString(OWNER, ownerId); + stmt.setString(ID, id); + stmt.setLong(EXPIRATION, expiration); + stmt.setBytes(DATA, serialize(object)); + } catch (SQLException e) { + throw new StoreWriteException(e); + } catch (IOException e) { + throw new StoreWriteException(e); + } + } + + public void update(PreparedStatement stmt, String ownerId, String id, Serializable object) + throws StoreWriteException { + try { + stmt.setBytes(OBJECT_UPDATE, serialize(object)); + stmt.setString(OWNER_UPDATE, ownerId); + stmt.setString(ID_UPDATE, id); + } catch (SQLException e) { + throw new StoreWriteException(e); + } catch (IOException e) { + throw new StoreWriteException(e); + } + } + + public Object read(Connection conn, String ownerId, String id) throws StoreReadException { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(findSql); + stmt.setString(OWNER, ownerId); + stmt.setString(ID, id); + ResultSet rs = stmt.executeQuery(); + boolean more = rs.next(); + if (!more) { + return null; + } + return deserialize(rs.getBytes(DATA)); + } catch (SQLException e) { + throw new StoreReadException(e); + } catch (IOException e) { + throw new StoreReadException(e); + } catch (ClassNotFoundException e) { + throw new StoreReadException(e); + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // ignore + } + } + } + } +} diff --git a/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/converter/JDBCConverter.java b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/converter/JDBCConverter.java new file mode 100644 index 0000000000..4ab9ed6749 --- /dev/null +++ b/sandbox/old/contrib/persistence/store.jdbc/src/main/java/org/apache/tuscany/service/persistence/store/jdbc/converter/JDBCConverter.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.persistence.store.jdbc.converter; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.tuscany.spi.services.store.StoreReadException; +import org.apache.tuscany.spi.services.store.StoreWriteException; + +/** + * Performs writing and reading operations to a JDBC driver/database combination that supports Blobs + * + * @version $Rev$ $Date$ + */ +public class JDBCConverter extends AbstractConverter { + + public void insert(PreparedStatement stmt, String ownerId, String id, long expiration, Serializable object) + throws StoreWriteException { + try { + stmt.setString(OWNER, ownerId); + stmt.setString(ID, id); + stmt.setLong(EXPIRATION, expiration); + byte[] data = serialize(object); + InputStream in = new ByteArrayInputStream(data); + stmt.setBinaryStream(DATA, in, data.length); + } catch (SQLException e) { + throw new StoreWriteException(e); + } catch (IOException e) { + throw new StoreWriteException(e); + } + } + + public void update(PreparedStatement stmt, String ownerId, String id, Serializable object) + throws StoreWriteException { + throw new UnsupportedOperationException(); + } + + public Object read(Connection conn, String ownerId, String id) throws StoreReadException { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(findSql); + stmt.setString(OWNER, ownerId); + stmt.setString(ID, id); + ResultSet rs = stmt.executeQuery(); + rs.next(); + Blob blob = rs.getBlob(DATA); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + BufferedInputStream in = new BufferedInputStream(blob.getBinaryStream()); + int b; + byte[] buffer = new byte[1024]; + while ((b = in.read(buffer, 0, 1024)) != -1) { + out.write(buffer, 0, b); + } + return deserialize(out.toByteArray()); + } catch (SQLException e) { + throw new StoreReadException(e); + } catch (IOException e) { + throw new StoreReadException(e); + } catch (ClassNotFoundException e) { + throw new StoreReadException(e); + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // ignore + } + } + } + } + +}
\ No newline at end of file |