http1/2: Removed usage of Option
This commit is contained in:
parent
d7c80876f6
commit
04709748ec
http1/main/scala/org/omegazero/proxy/http1
http2/main/scala/org/omegazero/proxy/http2
@ -23,7 +23,7 @@ object HTTP1 {
|
||||
class HTTP1(downstreamConnection: SocketConnection, proxy: Proxy, config: HTTPEngineConfig)
|
||||
extends AbstractHTTPEngine(downstreamConnection, proxy, config, new ProxyHTTP1Server(downstreamConnection, config)) {
|
||||
|
||||
this.httpServer.asInstanceOf[ProxyHTTP1Server].onError = Some(this.respondError(_, _, _));
|
||||
this.httpServer.asInstanceOf[ProxyHTTP1Server].onError = this.respondError(_, _, _);
|
||||
|
||||
override def getRequestLogger(): Logger = HTTP1.logger;
|
||||
override def getHTTPVersionName(): String = HTTP1.VERSION_NAME;
|
||||
|
@ -39,11 +39,11 @@ class ProxyHTTP1Client(private val connection: SocketConnection, private val use
|
||||
private val transmitter = new HTTP1MessageTransmitter(this.connectionWS);
|
||||
private val responseReceiver = new HTTP1ResponseReceiver(this.config.getMaxHeaderSize());
|
||||
|
||||
private var currentRequestStream: Option[OutgoingRequestStream] = None;
|
||||
private var currentRequestStream: OutgoingRequestStream = null;
|
||||
|
||||
this.connection.on("writable", () => {
|
||||
if(this.currentRequestStream.isDefined)
|
||||
this.currentRequestStream.get.callOnWritable();
|
||||
if(this.currentRequestStream != null)
|
||||
this.currentRequestStream.callOnWritable();
|
||||
});
|
||||
|
||||
|
||||
@ -55,34 +55,34 @@ class ProxyHTTP1Client(private val connection: SocketConnection, private val use
|
||||
|
||||
override def close(): Unit = {
|
||||
this.connection.destroy();
|
||||
if(this.currentRequestStream.isDefined)
|
||||
this.currentRequestStream.get.close();
|
||||
if(this.currentRequestStream != null)
|
||||
this.currentRequestStream.close();
|
||||
}
|
||||
|
||||
|
||||
override def newRequest(request: HTTPRequest): HTTPClientStream = {
|
||||
if(this.currentRequestStream.isDefined || this.connection.hasDisconnected())
|
||||
if(this.currentRequestStream != null || this.connection.hasDisconnected())
|
||||
return null;
|
||||
if(request.isChunkedTransfer())
|
||||
request.setHeader("transfer-encoding", "chunked");
|
||||
this.currentRequestStream = Some(new OutgoingRequestStream(request));
|
||||
return this.currentRequestStream.get;
|
||||
this.currentRequestStream = new OutgoingRequestStream(request);
|
||||
return this.currentRequestStream;
|
||||
}
|
||||
|
||||
override def getActiveRequests(): Collection[HTTPClientStream] = if this.currentRequestStream.isDefined then Collections.singleton(this.currentRequestStream.get) else Collections.emptySet();
|
||||
override def getActiveRequests(): Collection[HTTPClientStream] = if this.currentRequestStream != null then Collections.singleton(this.currentRequestStream) else Collections.emptySet();
|
||||
|
||||
override def getMaxConcurrentRequestCount(): Int = 1;
|
||||
|
||||
|
||||
private def processResponseData(data: Array[Byte]): Unit = {
|
||||
var remainingData = data;
|
||||
if(this.currentRequestStream.isEmpty){
|
||||
if(this.currentRequestStream == null){
|
||||
logger.debug(this.remoteName, " Received unexpected data on connection");
|
||||
this.close();
|
||||
return;
|
||||
}
|
||||
|
||||
if(this.currentRequestStream.get.getResponse() == null){
|
||||
if(this.currentRequestStream.getResponse() == null){
|
||||
var offset = this.responseReceiver.receive(remainingData, 0);
|
||||
if(offset < 0)
|
||||
return;
|
||||
@ -91,28 +91,28 @@ class ProxyHTTP1Client(private val connection: SocketConnection, private val use
|
||||
this.responseReceiver.reset();
|
||||
|
||||
if(response.isIntermediateMessage()){
|
||||
this.currentRequestStream.get.responseReceived(response);
|
||||
this.currentRequestStream.responseReceived(response);
|
||||
return;
|
||||
}
|
||||
|
||||
var dechunker = new MessageBodyDechunker(response, (resdata) => {
|
||||
var last = resdata.length == 0;
|
||||
this.currentRequestStream.get.callOnResponseData(new HTTPResponseData(response, last, resdata));
|
||||
this.currentRequestStream.callOnResponseData(new HTTPResponseData(response, last, resdata));
|
||||
if(last){
|
||||
this.currentRequestStream.get.callOnResponseEnded(null);
|
||||
this.currentRequestStream.callOnResponseEnded(null);
|
||||
response.setAttachment(ProxyHTTP1Client.ATTACHMENT_KEY_DECHUNKER, null);
|
||||
this.currentRequestStream = None;
|
||||
this.currentRequestStream = null;
|
||||
if("close".equals(response.getHeader("connection")))
|
||||
this.connection.close();
|
||||
}
|
||||
});
|
||||
response.setAttachment(ProxyHTTP1Client.ATTACHMENT_KEY_DECHUNKER, dechunker);
|
||||
|
||||
this.currentRequestStream.get.responseReceived(response);
|
||||
this.currentRequestStream.responseReceived(response);
|
||||
|
||||
remainingData = Arrays.copyOfRange(remainingData, offset, remainingData.length);
|
||||
}
|
||||
this.currentRequestStream.get.getResponse().getAttachment(ProxyHTTP1Client.ATTACHMENT_KEY_DECHUNKER).asInstanceOf[MessageBodyDechunker].addData(remainingData);
|
||||
this.currentRequestStream.getResponse().getAttachment(ProxyHTTP1Client.ATTACHMENT_KEY_DECHUNKER).asInstanceOf[MessageBodyDechunker].addData(remainingData);
|
||||
}
|
||||
|
||||
|
||||
|
@ -38,19 +38,19 @@ class ProxyHTTP1Server(private val connection: SocketConnection, private val con
|
||||
private val connectionWS: WritableSocket = new SocketConnectionWritable(this.connection);
|
||||
private val remoteName = this.connectionWS.getRemoteName();
|
||||
|
||||
private var onNewRequest: Option[Consumer[HTTPServerStream]] = None;
|
||||
var onError: Option[(HTTPRequest, Int, String) => Unit] = None;
|
||||
private var onNewRequest: Consumer[HTTPServerStream] = null;
|
||||
var onError: (HTTPRequest, Int, String) => Unit = null;
|
||||
|
||||
private val transmitter = new HTTP1MessageTransmitter(this.connectionWS);
|
||||
private val requestReceiver = new HTTP1RequestReceiver(this.config.getMaxHeaderSize(), this.connection.isInstanceOf[org.omegazero.net.socket.TLSConnection]);
|
||||
|
||||
private var currentRequestTimeoutRef: Object = null;
|
||||
private var currentRequestStream: Option[IncomingRequestStream] = None;
|
||||
private def currentRequestOrNull = if this.currentRequestStream.isDefined then this.currentRequestStream.get.getRequest() else null;
|
||||
private var currentRequestStream: IncomingRequestStream = null;
|
||||
private def currentRequestOrNull = if this.currentRequestStream != null then this.currentRequestStream.getRequest() else null;
|
||||
|
||||
this.connection.on("writable", () => {
|
||||
if(this.currentRequestStream.isDefined)
|
||||
this.currentRequestStream.get.callOnWritable();
|
||||
if(this.currentRequestStream != null)
|
||||
this.currentRequestStream.callOnWritable();
|
||||
});
|
||||
|
||||
|
||||
@ -61,13 +61,13 @@ class ProxyHTTP1Server(private val connection: SocketConnection, private val con
|
||||
case e: InvalidHTTPMessageException => {
|
||||
if(logger.debug())
|
||||
logger.debug(this.remoteName, " HTTP error: ", if NetCommon.PRINT_STACK_TRACES then e else e.toString());
|
||||
this.onError.get (this.currentRequestOrNull, HTTPStatus.STATUS_BAD_REQUEST, if e.isMsgUserVisible() then e.getMessage() else HTTPCommon.MSG_BAD_REQUEST);
|
||||
if(this.currentRequestStream.isDefined)
|
||||
this.currentRequestStream.get.callOnError(e);
|
||||
this.onError(this.currentRequestOrNull, HTTPStatus.STATUS_BAD_REQUEST, if e.isMsgUserVisible() then e.getMessage() else HTTPCommon.MSG_BAD_REQUEST);
|
||||
if(this.currentRequestStream != null)
|
||||
this.currentRequestStream.callOnError(e);
|
||||
}
|
||||
case e: Exception => {
|
||||
if(this.currentRequestStream.isDefined && !this.currentRequestStream.get.isClosed()){
|
||||
this.onError.get (this.currentRequestOrNull, HTTPStatus.STATUS_INTERNAL_SERVER_ERROR, HTTPCommon.MSG_SERVER_ERROR);
|
||||
if(this.currentRequestStream != null && !this.currentRequestStream.isClosed()){
|
||||
this.onError(this.currentRequestOrNull, HTTPStatus.STATUS_INTERNAL_SERVER_ERROR, HTTPCommon.MSG_SERVER_ERROR);
|
||||
logger.error(this.remoteName, " Error processing packet: ", e);
|
||||
}else
|
||||
throw e;
|
||||
@ -79,15 +79,15 @@ class ProxyHTTP1Server(private val connection: SocketConnection, private val con
|
||||
|
||||
override def close(): Unit = {
|
||||
this.connection.destroy();
|
||||
if(this.currentRequestStream.isDefined)
|
||||
this.currentRequestStream.get.close();
|
||||
if(this.currentRequestStream != null)
|
||||
this.currentRequestStream.close();
|
||||
}
|
||||
|
||||
|
||||
override def onNewRequest(callback: Consumer[HTTPServerStream]): Unit = this.onNewRequest = Some(callback);
|
||||
override def onNewRequest(callback: Consumer[HTTPServerStream]): Unit = this.onNewRequest = callback;
|
||||
|
||||
override def getActiveRequests(): Collection[HTTPServerStream] =
|
||||
if this.currentRequestStream.isDefined then Collections.singleton(this.currentRequestStream.get) else Collections.emptySet();
|
||||
if this.currentRequestStream != null then Collections.singleton(this.currentRequestStream) else Collections.emptySet();
|
||||
|
||||
|
||||
override def respond(request: HTTPRequest, responsedata: HTTPResponseData): Unit = {
|
||||
@ -112,11 +112,11 @@ class ProxyHTTP1Server(private val connection: SocketConnection, private val con
|
||||
var data = HTTPCommon.prepareHTTPResponse(request, response, responsedata.getData());
|
||||
if(request != null){
|
||||
request.synchronized {
|
||||
if(this.currentRequestStream.get.requestEnded){ // request incl data fully received
|
||||
this.currentRequestStream.get.startResponse(response);
|
||||
this.currentRequestStream.get.sendResponseData(data, true);
|
||||
if(this.currentRequestStream.requestEnded){ // request incl data fully received
|
||||
this.currentRequestStream.startResponse(response);
|
||||
this.currentRequestStream.sendResponseData(data, true);
|
||||
}else
|
||||
this.currentRequestStream.get.pendingResponse = Some(new HTTPResponseData(response, data));
|
||||
this.currentRequestStream.pendingResponse = new HTTPResponseData(response, data);
|
||||
}
|
||||
}else{
|
||||
if(this.writeHTTPMsg(response))
|
||||
@ -130,7 +130,7 @@ class ProxyHTTP1Server(private val connection: SocketConnection, private val con
|
||||
|
||||
private def processData(data: Array[Byte]): Unit = {
|
||||
var remainingData = data;
|
||||
if(this.currentRequestStream.isEmpty){
|
||||
if(this.currentRequestStream == null){
|
||||
if(this.currentRequestTimeoutRef == null)
|
||||
this.currentRequestTimeoutRef = Tasks.I.timeout(this.handleRequestTimeout _, this.config.getRequestTimeout()).daemon();
|
||||
var offset = this.requestReceiver.receive(remainingData, 0);
|
||||
@ -147,7 +147,7 @@ class ProxyHTTP1Server(private val connection: SocketConnection, private val con
|
||||
|
||||
var reqstream = new IncomingRequestStream(request);
|
||||
reqstream.setReceiveData(true);
|
||||
this.currentRequestStream = Some(reqstream);
|
||||
this.currentRequestStream = reqstream;
|
||||
|
||||
var dechunker = new MessageBodyDechunker(request, (reqdata) => {
|
||||
var last = reqdata.length == 0;
|
||||
@ -155,19 +155,19 @@ class ProxyHTTP1Server(private val connection: SocketConnection, private val con
|
||||
if(last){
|
||||
reqstream.callOnRequestEnded(null);
|
||||
request.setAttachment(ProxyHTTP1Server.ATTACHMENT_KEY_DECHUNKER, null);
|
||||
if(reqstream.pendingResponse.isDefined){
|
||||
reqstream.startResponse(reqstream.pendingResponse.get.getHttpMessage());
|
||||
reqstream.sendResponseData(reqstream.pendingResponse.get.getData(), true);
|
||||
if(reqstream.pendingResponse != null){
|
||||
reqstream.startResponse(reqstream.pendingResponse.getHttpMessage());
|
||||
reqstream.sendResponseData(reqstream.pendingResponse.getData(), true);
|
||||
}
|
||||
}
|
||||
});
|
||||
request.setAttachment(ProxyHTTP1Server.ATTACHMENT_KEY_DECHUNKER, dechunker);
|
||||
|
||||
this.onNewRequest.get.accept(reqstream);
|
||||
this.onNewRequest.accept(reqstream);
|
||||
|
||||
remainingData = Arrays.copyOfRange(remainingData, offset, remainingData.length);
|
||||
}
|
||||
if(this.currentRequestStream.get.requestEnded){
|
||||
if(this.currentRequestStream.requestEnded){
|
||||
if(logger.debug())
|
||||
logger.debug(this.remoteName, " Received data after request ended");
|
||||
this.close();
|
||||
@ -187,8 +187,8 @@ class ProxyHTTP1Server(private val connection: SocketConnection, private val con
|
||||
private def handleRequestTimeout(): Unit = {
|
||||
logger.debug(this.remoteName, " Request timeout");
|
||||
try{
|
||||
assert(this.currentRequestStream.isEmpty);
|
||||
this.onError.get (null, HTTPStatus.STATUS_REQUEST_TIMEOUT, HTTPCommon.MSG_REQUEST_TIMEOUT);
|
||||
assert(this.currentRequestStream == null);
|
||||
this.onError(null, HTTPStatus.STATUS_REQUEST_TIMEOUT, HTTPCommon.MSG_REQUEST_TIMEOUT);
|
||||
this.requestReceiver.reset();
|
||||
}catch{
|
||||
case e: Exception => {
|
||||
@ -214,7 +214,7 @@ class ProxyHTTP1Server(private val connection: SocketConnection, private val con
|
||||
|
||||
private var chunkedTransfer = false;
|
||||
|
||||
var pendingResponse: Option[HTTPResponseData] = None;
|
||||
var pendingResponse: HTTPResponseData = null;
|
||||
def requestEnded = !this.request.hasAttachment(ProxyHTTP1Server.ATTACHMENT_KEY_DECHUNKER);
|
||||
|
||||
override def close(reason: MessageStreamClosedException.CloseReason): Unit = {
|
||||
@ -248,7 +248,7 @@ class ProxyHTTP1Server(private val connection: SocketConnection, private val con
|
||||
if(this.chunkedTransfer)
|
||||
ProxyHTTP1Server.this.connection.write(ProxyHTTP1Server.EMPTY_CHUNK);
|
||||
this.closed = true;
|
||||
ProxyHTTP1Server.this.currentRequestStream = None;
|
||||
ProxyHTTP1Server.this.currentRequestStream = null;
|
||||
}
|
||||
return ProxyHTTP1Server.this.connection.isWritable();
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ object HTTP2 {
|
||||
class HTTP2(downstreamConnection: SocketConnection, proxy: Proxy, config: HTTPEngineConfig)
|
||||
extends AbstractHTTPEngine(downstreamConnection, proxy, config, new ProxyHTTP2Server(downstreamConnection, config)) {
|
||||
|
||||
this.httpServer.asInstanceOf[ProxyHTTP2Server].onError = Some(this.respondError(_, _, _));
|
||||
this.httpServer.asInstanceOf[ProxyHTTP2Server].onError = this.respondError(_, _, _);
|
||||
|
||||
override def getRequestLogger(): Logger = HTTP2.logger;
|
||||
override def getHTTPVersionName(): String = HTTP2.VERSION_NAME;
|
||||
|
@ -43,8 +43,8 @@ class ProxyHTTP2Server(private val dsConnection: SocketConnection, private val c
|
||||
|
||||
private val disablePromiseRequestLog = config.optBoolean("disablePromiseRequestLog", config.isDisableDefaultRequestLog());
|
||||
|
||||
private var onNewRequest: Option[Consumer[HTTPServerStream]] = None;
|
||||
var onError: Option[(HTTPRequest, Int, String) => Unit] = None;
|
||||
private var onNewRequest: Consumer[HTTPServerStream] = null;
|
||||
var onError: (HTTPRequest, Int, String) => Unit = null;
|
||||
|
||||
private var prefaceReceived = false;
|
||||
private var nextStreamId = 2;
|
||||
@ -130,7 +130,7 @@ class ProxyHTTP2Server(private val dsConnection: SocketConnection, private val c
|
||||
override def isServerPushEnabled(): Boolean = this.getControlStream().getRemoteSettings().get(HTTP2Constants.SETTINGS_ENABLE_PUSH) == 1;
|
||||
|
||||
|
||||
override def onNewRequest(callback: Consumer[HTTPServerStream]): Unit = this.onNewRequest = Some(callback);
|
||||
override def onNewRequest(callback: Consumer[HTTPServerStream]): Unit = this.onNewRequest = callback;
|
||||
|
||||
override def getActiveRequests(): Collection[HTTPServerStream] = Collections.unmodifiableCollection(this.requestStreams.values());
|
||||
|
||||
@ -156,7 +156,7 @@ class ProxyHTTP2Server(private val dsConnection: SocketConnection, private val c
|
||||
reqstream.startResponse(response);
|
||||
reqstream.sendResponseData(data, true);
|
||||
}else
|
||||
reqstream.pendingResponse = Some(new HTTPResponseData(response, data));
|
||||
reqstream.pendingResponse = new HTTPResponseData(response, data);
|
||||
}
|
||||
}
|
||||
|
||||
@ -186,7 +186,7 @@ class ProxyHTTP2Server(private val dsConnection: SocketConnection, private val c
|
||||
|
||||
this.requestStreams.put(clientStream.getStreamId(), reqstream);
|
||||
|
||||
this.onNewRequest.get.accept(reqstream);
|
||||
this.onNewRequest.accept(reqstream);
|
||||
if(endStream)
|
||||
reqstream.callOnRequestEnded(null);
|
||||
}
|
||||
@ -200,16 +200,16 @@ class ProxyHTTP2Server(private val dsConnection: SocketConnection, private val c
|
||||
|
||||
class IncomingRequestStream(request: HTTPRequest, val clientStream: MessageStream) extends AbstractHTTPServerStream(request, ProxyHTTP2Server.this) {
|
||||
|
||||
var pendingResponse: Option[HTTPResponseData] = None;
|
||||
var pendingResponse: HTTPResponseData = null;
|
||||
var requestEnded = false;
|
||||
|
||||
|
||||
override def callOnRequestEnded(trailers: HTTPMessageTrailers) = {
|
||||
super.callOnRequestEnded(trailers);
|
||||
this.requestEnded = true;
|
||||
if(this.pendingResponse.isDefined && this.clientStream.isExpectingResponse()){
|
||||
this.startResponse(this.pendingResponse.get.getHttpMessage());
|
||||
this.sendResponseData(this.pendingResponse.get.getData(), true);
|
||||
if(this.pendingResponse != null && this.clientStream.isExpectingResponse()){
|
||||
this.startResponse(this.pendingResponse.getHttpMessage());
|
||||
this.sendResponseData(this.pendingResponse.getData(), true);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user