Добрый день!
Я пишу класс на java, который будет использоваться как хранимая процедура в oracle 11g для перекачки данных из mysql в oracle (через гетерогенные сервисы работать сложнее и проблемнее).
Столкнулся с проблемой - не смотря на то, что для соединений с базами всегда авто-коммит устанавливается в false, после записи данных в oracle и вызова далее rollback, данные в БД остаются, то есть коммит происходит.
Адекватного ответа на просторах интернета, к сожалению, не нашел.
Ниже код класса. Проблема проявляется так:
- в приватном методе connect() устанавливается setAutoCommit(false), соединение сохраняется в переменную экземпляра класса
- возникает ошибка при вызове updateSmtpStatus() => устанавливается mysqlSuccess = false
- в вызове finalizeImport() вызывается rollback на оба соединения - и oracle и mysql
- захожу в oracle - данные, записанные туда при вызове saveData(), присутствуют в полном объеме
public class SmtpData {
private final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
private final String TRANSACT_DB = "...";
private final String BULK_DB = "...";
private final String ORACLE = "jdbc:default:connection:";
private final int BULK_SIZE = 500;
private Connection mysql;
private Connection orcl;
private Connection orclLog;
private String type;
private int importId;
private ResultSet smtpData;
private Boolean mysqlSuccess;
private Boolean orclSuccess;
enum ExecutionResult {
SUCCESS,
ERROR_LOCAL_CONNECTION_FAILED,
ERROR_MYSQL_CONNECTION_FAILED,
ERROR_MYSQL_SELECT_FAILED,
ERROR_MYSQL_NO_DATA,
ERROR_ORACLE_SAVE_FAILED,
ERROR_MYSQL_UPDATE_FAILED
}
public SmtpData(String type, int importId) {
this.type = type;
this.importId = importId;
}
public static String transferData(String type, String db_user, String db_pass, int importId) {
SmtpData sd = new SmtpData(type, importId);
ExecutionResult result;
result = sd.setConnections(db_user, db_pass);
if (result.equals(ExecutionResult.SUCCESS)) {
result = sd.getSmtpData();
if (result.equals(ExecutionResult.SUCCESS)) {
result = sd.saveData();
}
if (result.equals(ExecutionResult.SUCCESS)) {
result = sd.updateSmtpStatus();
}
sd.finalizeImport();
}
return result.toString();
}
public ExecutionResult setConnections(String db_user, String db_pass) {
try {
this.orcl = this.connect(this.ORACLE, null, null);
this.orclLog = this.connect(this.ORACLE, null, null);
} catch (SQLException e) {
this.log(this.traceToString(e));
return ExecutionResult.ERROR_LOCAL_CONNECTION_FAILED;
} catch (ClassNotFoundException e) {
this.log(this.traceToString(e));
return ExecutionResult.ERROR_LOCAL_CONNECTION_FAILED;
}
try {
if (this.type.equals("transact")) {
this.mysql = this.connect(this.TRANSACT_DB, db_user, db_pass);
} else if (this.type.equals("bulk")) {
this.mysql = this.connect(this.BULK_DB, db_user, db_pass);
}
} catch (SQLException e) {
this.log(this.traceToString(e));
return ExecutionResult.ERROR_MYSQL_CONNECTION_FAILED;
} catch (ClassNotFoundException e) {
this.log(this.traceToString(e));
return ExecutionResult.ERROR_MYSQL_CONNECTION_FAILED;
}
return ExecutionResult.SUCCESS;
}
public ExecutionResult getSmtpData() {
this.log("Start getSmtpData()");
String sql = "select ...";
try {
CallableStatement stmt = this.mysql.prepareCall(sql);
this.smtpData = stmt.executeQuery();
if (!this.smtpData.isBeforeFirst()) {
this.mysqlSuccess = false;
return ExecutionResult.ERROR_MYSQL_NO_DATA;
}
} catch (SQLException e) {
this.mysqlSuccess = false;
this.log(this.traceToString(e));
return ExecutionResult.ERROR_MYSQL_SELECT_FAILED;
}
this.mysqlSuccess = true;
this.log("End getSmtpData()");
return ExecutionResult.SUCCESS;
}
public ExecutionResult saveData() {
this.log("Start saveData()");
String sql = "insert ...";
try {
CallableStatement stmt = this.orcl.prepareCall(sql);
int bulkSize = BULK_SIZE;
while (this.smtpData.next()) {
stmt.setInt(1, this.importId);
stmt.setInt(2, this.smtpData.getInt("..."));
stmt.setString(3, this.smtpData.getString("..."));
String smtpCode = this.smtpData.getString("...");
stmt.setString(4, defineEvent(smtpCode));
stmt.setTimestamp(5, this.smtpData.getTimestamp("..."));
stmt.setString(6, smtpCode);
stmt.setString(7, this.smtpData.getString("..."));
stmt.setInt(8, this.smtpData.getInt("..."));
stmt.addBatch();
if (--bulkSize <= 0) {
stmt.executeBatch();
bulkSize = this.BULK_SIZE;
}
}
if (bulkSize < this.BULK_SIZE) {
stmt.executeBatch();
}
} catch (SQLException e) {
this.orclSuccess = false;
this.log(this.traceToString(e));
return ExecutionResult.ERROR_ORACLE_SAVE_FAILED;
}
this.orclSuccess = true;
this.log("End saveData()");
return ExecutionResult.SUCCESS;
}
public ExecutionResult updateSmtpStatus() {
this.log("Start updateSmtpStatus()");
String sql = "update ...";
try {
CallableStatement stmt = this.mysql.prepareCall(sql);
this.smtpData.first();
int minId = this.smtpData.getInt("nid");
this.smtpData.last();
int maxId = this.smtpData.getInt("nid");
this.log("Min ID: " + minId + "; max ID: " + maxId);
stmt.setInt(1, 2);
stmt.setInt(2, minId);
stmt.setInt(2, maxId);
stmt.execute();
} catch (SQLException e) {
this.mysqlSuccess = false;
this.log(this.traceToString(e));
return ExecutionResult.ERROR_MYSQL_UPDATE_FAILED;
}
this.mysqlSuccess = true;
this.log("End updateSmtpStatus()");
return ExecutionResult.SUCCESS;
}
public void finalizeImport() {
this.log("Start finalizeImport()");
try {
if (this.mysqlSuccess && this.orclSuccess) {
this.log("Commit in finalizeImport()");
this.mysql.commit();
this.orcl.commit();
} else {
this.log("Rollback in finalizeImport()");
this.mysql.rollback();
this.orcl.rollback();
}
this.mysql.close();
this.orcl.close();
this.log("End finalizeImport()");
this.orclLog.close();
} catch (SQLException e) {
this.log(this.traceToString(e));
}
}
private Connection connect(String db, String db_user, String db_pass) throws SQLException, ClassNotFoundException {
Connection conn;
if (db.equals(this.ORACLE)) {
conn = DriverManager.getConnection(db);
} else {
Class.forName(MYSQL_DRIVER);
conn = DriverManager.getConnection(db, db_user, db_pass);
}
conn.setAutoCommit(false);
return conn;
}
private void log(String message) {
String sql = "insert ...";
try {
CallableStatement stmt = this.orclLog.prepareCall(sql);
stmt.setString(1, message);
stmt.execute();
stmt.close();
this.orclLog.commit();
} catch (SQLException e) {
}
}
private String traceToString(Throwable exception) {
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw, true);
exception.printStackTrace(pw);
return sw.getBuffer().toString();
}
private String defineEvent(String smtpCode) {
String event = "";
switch (smtpCode.charAt(0)) {
case '2':
event = "sent";
break;
case '4':
event = "soft_bounce";
break;
case '5':
event = "hard_bounce";
break;
}
return event;
}
}