diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/database/Neo4jConnection.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/database/Neo4jConnection.java index 6c17f5475d..acd32d69b2 100644 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/database/Neo4jConnection.java +++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/database/Neo4jConnection.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Collections; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; import org.neo4j.driver.Config; @@ -54,6 +55,10 @@ public Neo4jConnection(ConnectionParams settings, String templateVersion) { settings.asAuthToken(), Config.builder() .withLogging(Logging.slf4j()) + .withMaxConnectionLifetime( + maxConnectionLifetimeMillis(settings), TimeUnit.MILLISECONDS) + .withConnectionLivenessCheckTimeout( + connectionLivenessCheckTimeoutMillis(settings), TimeUnit.MILLISECONDS) .withUserAgent(Neo4jTelemetry.userAgent(templateVersion)) .build())); } @@ -239,4 +244,20 @@ private static TransactionConfig databaseResetMetadata(String resetMethod) { Neo4jTelemetry.transactionMetadata(Map.of("sink", "neo4j", "step", resetMethod))) .build(); } + + private static long maxConnectionLifetimeMillis(ConnectionParams settings) { + Long maxConnectionLifetimeMillis = settings.getMaxConnectionLifetimeMillis(); + if (maxConnectionLifetimeMillis == null) { + return Config.defaultConfig().maxConnectionLifetimeMillis(); + } + return maxConnectionLifetimeMillis; + } + + private static Long connectionLivenessCheckTimeoutMillis(ConnectionParams settings) { + Long connectionLivenessCheckTimeoutMillis = settings.getConnectionLivenessCheckTimeoutMillis(); + if (connectionLivenessCheckTimeoutMillis == null) { + return Config.defaultConfig().idleTimeBeforeConnectionTest(); + } + return connectionLivenessCheckTimeoutMillis; + } } diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/BasicConnectionParams.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/BasicConnectionParams.java index 07a1a87fa4..82c87265b8 100644 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/BasicConnectionParams.java +++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/BasicConnectionParams.java @@ -31,8 +31,11 @@ public BasicConnectionParams( @JsonProperty("server_url") String serverUrl, @JsonProperty("database") String database, @JsonProperty("username") String username, - @JsonProperty("pwd") String password) { - super(serverUrl, database); + @JsonProperty("pwd") String password, + @JsonProperty("max_connection_lifetime_millis") Long maxConnectionLifetimeMillis, + @JsonProperty("connection_liveness_check_timeout_millis") + Long connectionLivenessCheckTimeoutMillis) { + super(serverUrl, database, maxConnectionLifetimeMillis, connectionLivenessCheckTimeoutMillis); this.username = username; this.password = password; } diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/BearerConnectionParams.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/BearerConnectionParams.java index 4aa507b472..1257ecdbb4 100644 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/BearerConnectionParams.java +++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/BearerConnectionParams.java @@ -28,8 +28,11 @@ class BearerConnectionParams extends ConnectionParams { public BearerConnectionParams( @JsonProperty("server_url") String serverUrl, @JsonProperty("database") String database, - @JsonProperty("token") String token) { - super(serverUrl, database); + @JsonProperty("token") String token, + @JsonProperty("max_connection_lifetime_millis") Long maxConnectionLifetimeMillis, + @JsonProperty("connection_liveness_check_timeout_millis") + Long connectionLivenessCheckTimeoutMillis) { + super(serverUrl, database, maxConnectionLifetimeMillis, connectionLivenessCheckTimeoutMillis); this.token = token; } diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/ConnectionParams.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/ConnectionParams.java index 222ca7c829..afa6bf2335 100644 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/ConnectionParams.java +++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/ConnectionParams.java @@ -34,10 +34,18 @@ public abstract class ConnectionParams implements Serializable { private final String serverUrl; private final String database; + private final Long maxConnectionLifetimeMillis; + private final Long connectionLivenessCheckTimeoutMillis; - public ConnectionParams(String serverUrl, String database) { + public ConnectionParams( + String serverUrl, + String database, + Long maxConnectionLifetimeMillis, + Long connectionLivenessCheckTimeoutMillis) { this.serverUrl = serverUrl; this.database = database == null ? "neo4j" : database; + this.maxConnectionLifetimeMillis = maxConnectionLifetimeMillis; + this.connectionLivenessCheckTimeoutMillis = connectionLivenessCheckTimeoutMillis; } public String getServerUrl() { @@ -48,23 +56,31 @@ public String getDatabase() { return database; } + public Long getMaxConnectionLifetimeMillis() { + return maxConnectionLifetimeMillis; + } + + public Long getConnectionLivenessCheckTimeoutMillis() { + return connectionLivenessCheckTimeoutMillis; + } + public abstract AuthToken asAuthToken(); @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { + if (!(o instanceof ConnectionParams that)) { return false; } - - ConnectionParams that = (ConnectionParams) o; - return Objects.equals(serverUrl, that.serverUrl) && Objects.equals(database, that.database); + return Objects.equals(serverUrl, that.serverUrl) + && Objects.equals(database, that.database) + && Objects.equals(maxConnectionLifetimeMillis, that.maxConnectionLifetimeMillis) + && Objects.equals( + connectionLivenessCheckTimeoutMillis, that.connectionLivenessCheckTimeoutMillis); } @Override public int hashCode() { - return Objects.hash(serverUrl, database); + return Objects.hash( + serverUrl, database, maxConnectionLifetimeMillis, connectionLivenessCheckTimeoutMillis); } } diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/CustomConnectionParams.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/CustomConnectionParams.java index af9097958c..fd8ef61d40 100644 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/CustomConnectionParams.java +++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/CustomConnectionParams.java @@ -37,8 +37,11 @@ public CustomConnectionParams( @JsonProperty("credentials") String credentials, @JsonProperty("realm") String realm, @JsonProperty("scheme") String scheme, - @JsonProperty("parameters") Map parameters) { - super(serverUrl, database); + @JsonProperty("parameters") Map parameters, + @JsonProperty("max_connection_lifetime_millis") Long maxConnectionLifetimeMillis, + @JsonProperty("connection_liveness_check_timeout_millis") + Long connectionLivenessCheckTimeoutMillis) { + super(serverUrl, database, maxConnectionLifetimeMillis, connectionLivenessCheckTimeoutMillis); this.principal = principal; this.credentials = credentials; this.realm = realm; diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/KerberosConnectionParams.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/KerberosConnectionParams.java index c38b8c0f15..184005bcfd 100644 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/KerberosConnectionParams.java +++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/KerberosConnectionParams.java @@ -28,8 +28,11 @@ class KerberosConnectionParams extends ConnectionParams { public KerberosConnectionParams( @JsonProperty("server_url") String serverUrl, @JsonProperty("database") String database, - @JsonProperty("ticket") String ticket) { - super(serverUrl, database); + @JsonProperty("ticket") String ticket, + @JsonProperty("max_connection_lifetime_millis") Long maxConnectionLifetimeMillis, + @JsonProperty("connection_liveness_check_timeout_millis") + Long connectionLivenessCheckTimeoutMillis) { + super(serverUrl, database, maxConnectionLifetimeMillis, connectionLivenessCheckTimeoutMillis); this.ticket = ticket; } diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/NoAuthConnectionParams.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/NoAuthConnectionParams.java index 6f0657b8e0..8bc5ac92b6 100644 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/NoAuthConnectionParams.java +++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/connection/NoAuthConnectionParams.java @@ -24,8 +24,12 @@ class NoAuthConnectionParams extends ConnectionParams { @JsonCreator public NoAuthConnectionParams( - @JsonProperty("server_url") String serverUrl, @JsonProperty("database") String database) { - super(serverUrl, database); + @JsonProperty("server_url") String serverUrl, + @JsonProperty("database") String database, + @JsonProperty("max_connection_lifetime_millis") Long maxConnectionLifetimeMillis, + @JsonProperty("connection_liveness_check_timeout_millis") + Long connectionLivenessCheckTimeoutMillis) { + super(serverUrl, database, maxConnectionLifetimeMillis, connectionLivenessCheckTimeoutMillis); } @Override diff --git a/v2/googlecloud-to-neo4j/src/main/resources/schemas/connection.v1.0.json b/v2/googlecloud-to-neo4j/src/main/resources/schemas/connection.v1.0.json index 9c8873f8bb..aaad3455be 100644 --- a/v2/googlecloud-to-neo4j/src/main/resources/schemas/connection.v1.0.json +++ b/v2/googlecloud-to-neo4j/src/main/resources/schemas/connection.v1.0.json @@ -24,6 +24,14 @@ "basic", "none", "kerberos", "bearer", "custom" ], "default": "basic" + }, + "max_connection_lifetime_millis": { + "description": "Max connection lifetime in milliseconds", + "type": "integer" + }, + "connection_liveness_check_timeout_millis": { + "description": "Max idle time for pool connections without liveness check upon acquisition", + "type": "integer" } }, "required": [ diff --git a/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/model/connection/ConnectionParamsTest.java b/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/model/connection/ConnectionParamsTest.java index fb214f3565..a6f4d012fe 100644 --- a/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/model/connection/ConnectionParamsTest.java +++ b/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/model/connection/ConnectionParamsTest.java @@ -39,7 +39,8 @@ public void parsesMinimalBasicAuthInformation() throws Exception { assertThat(connectionParams).isInstanceOf(BasicConnectionParams.class); assertThat(connectionParams) - .isEqualTo(new BasicConnectionParams("bolt://example.com", null, "neo4j", "password")); + .isEqualTo( + new BasicConnectionParams("bolt://example.com", null, "neo4j", "password", null, null)); } @Test @@ -47,11 +48,12 @@ public void parsesFullBasicAuthInformation() throws Exception { ConnectionParams connectionParams = Json.map( json( - "{\"auth_type\": \"basic\", \"server_url\": \"bolt://example.com\", \"database\": \"db\", \"username\": \"neo4j\", \"pwd\": \"password\"}"), + "{\"auth_type\": \"basic\", \"server_url\": \"bolt://example.com\", \"database\": \"db\", \"username\": \"neo4j\", \"pwd\": \"password\", \"max_connection_lifetime_millis\": 42, \"connection_liveness_check_timeout_millis\": 4242}"), ConnectionParams.class); assertThat(connectionParams) - .isEqualTo(new BasicConnectionParams("bolt://example.com", "db", "neo4j", "password")); + .isEqualTo( + new BasicConnectionParams("bolt://example.com", "db", "neo4j", "password", 42L, 4242L)); } @Test @@ -62,7 +64,8 @@ public void parsesMinimalNoAuthInformation() throws Exception { ConnectionParams.class); assertThat(connectionParams).isInstanceOf(NoAuthConnectionParams.class); - assertThat(connectionParams).isEqualTo(new NoAuthConnectionParams("bolt://example.com", null)); + assertThat(connectionParams) + .isEqualTo(new NoAuthConnectionParams("bolt://example.com", null, null, null)); } @Test @@ -70,10 +73,11 @@ public void parsesFullNoAuthInformation() throws Exception { ConnectionParams connectionParams = Json.map( json( - "{\"auth_type\": \"none\", \"server_url\": \"bolt://example.com\", \"database\": \"db\"}"), + "{\"auth_type\": \"none\", \"server_url\": \"bolt://example.com\", \"database\": \"db\", \"max_connection_lifetime_millis\": 42, \"connection_liveness_check_timeout_millis\": 4242}"), ConnectionParams.class); - assertThat(connectionParams).isEqualTo(new NoAuthConnectionParams("bolt://example.com", "db")); + assertThat(connectionParams) + .isEqualTo(new NoAuthConnectionParams("bolt://example.com", "db", 42L, 4242L)); } @Test @@ -87,7 +91,8 @@ public void parsesMinimalKerberosInformation() throws Exception { assertThat(connectionParams).isInstanceOf(KerberosConnectionParams.class); assertThat(connectionParams) .isEqualTo( - new KerberosConnectionParams("bolt://example.com", null, "dGhpcyBpcyBhIHRpY2tldA==")); + new KerberosConnectionParams( + "bolt://example.com", null, "dGhpcyBpcyBhIHRpY2tldA==", null, null)); } @Test @@ -95,12 +100,13 @@ public void parsesFullKerberosInformation() throws Exception { ConnectionParams connectionParams = Json.map( json( - "{\"auth_type\": \"kerberos\", \"server_url\": \"bolt://example.com\", \"database\": \"db\", \"ticket\": \"dGhpcyBpcyBhIHRpY2tldA==\"}"), + "{\"auth_type\": \"kerberos\", \"server_url\": \"bolt://example.com\", \"database\": \"db\", \"ticket\": \"dGhpcyBpcyBhIHRpY2tldA==\", \"max_connection_lifetime_millis\": 42, \"connection_liveness_check_timeout_millis\": 4242}"), ConnectionParams.class); assertThat(connectionParams) .isEqualTo( - new KerberosConnectionParams("bolt://example.com", "db", "dGhpcyBpcyBhIHRpY2tldA==")); + new KerberosConnectionParams( + "bolt://example.com", "db", "dGhpcyBpcyBhIHRpY2tldA==", 42L, 4242L)); } @Test @@ -113,7 +119,7 @@ public void parsesMinimalBearerInformation() throws Exception { assertThat(connectionParams).isInstanceOf(BearerConnectionParams.class); assertThat(connectionParams) - .isEqualTo(new BearerConnectionParams("bolt://example.com", null, "a-token")); + .isEqualTo(new BearerConnectionParams("bolt://example.com", null, "a-token", null, null)); } @Test @@ -121,11 +127,11 @@ public void parsesFullBearerInformation() throws Exception { ConnectionParams connectionParams = Json.map( json( - "{\"auth_type\": \"bearer\", \"server_url\": \"bolt://example.com\", \"database\": \"db\", \"token\": \"a-token\"}"), + "{\"auth_type\": \"bearer\", \"server_url\": \"bolt://example.com\", \"database\": \"db\", \"token\": \"a-token\", \"max_connection_lifetime_millis\": 42, \"connection_liveness_check_timeout_millis\": 4242}"), ConnectionParams.class); assertThat(connectionParams) - .isEqualTo(new BearerConnectionParams("bolt://example.com", "db", "a-token")); + .isEqualTo(new BearerConnectionParams("bolt://example.com", "db", "a-token", 42L, 4242L)); } @Test @@ -146,6 +152,8 @@ public void parsesMinimalCustomInformation() throws Exception { "some-credentials", null, "a-scheme", + null, + null, null)); } @@ -154,7 +162,7 @@ public void parsesFullCustomInformation() throws Exception { ConnectionParams connectionParams = Json.map( json( - "{\"auth_type\": \"custom\", \"server_url\": \"bolt://example.com\", \"database\": \"db\", \"principal\": \"a-principal\", \"credentials\": \"some-credentials\", \"realm\": \"a-realm\", \"scheme\": \"a-scheme\", \"parameters\": {\"foo\": \"bar\", \"baz\": true}}"), + "{\"auth_type\": \"custom\", \"server_url\": \"bolt://example.com\", \"database\": \"db\", \"principal\": \"a-principal\", \"credentials\": \"some-credentials\", \"realm\": \"a-realm\", \"scheme\": \"a-scheme\", \"parameters\": {\"foo\": \"bar\", \"baz\": true}, \"max_connection_lifetime_millis\": 42, \"connection_liveness_check_timeout_millis\": 4242}"), ConnectionParams.class); assertThat(connectionParams) @@ -166,7 +174,9 @@ public void parsesFullCustomInformation() throws Exception { "some-credentials", "a-realm", "a-scheme", - Map.of("foo", "bar", "baz", true))); + Map.of("foo", "bar", "baz", true), + 42L, + 4242L)); } @NotNull