Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Adding stream listener to stream changes in child nodes #183

Merged
merged 22 commits into from
Aug 8, 2018
Merged

[WIP] Adding stream listener to stream changes in child nodes #183

merged 22 commits into from
Aug 8, 2018

Conversation

the-c0d3r
Copy link
Contributor

@the-c0d3r the-c0d3r commented Jul 21, 2018

Hi,

This PR's original intent is to add stream listener function to fire a callback function when changes in the database is detected. It is a revival of this #50 1-year-old PR and also based on the code contributed by @rizasif and @Aqsa-K in that PR. This PR is mainly to serve 3 purposes.

  1. Update the old PR with a rebase on master
  2. Clean up, refactor & fix lint errors
  3. Write tests

For now, I have completed the first and second of the above. Please advise me on how to properly write the test in PyTest. I tested the code manually by defining callback and manually modifiying data in my firebase demo project. And it seems to be fired everytime I made changes. I read through the existing test suite and I cannot think of a way to write into it yet.

Changes:

  • lint.sh : Ignore directive for "protected-access"
  • firebase_admin/sseclient.py : SSE client module
  • firebase_admin/db.py : Streaming functionality
  • New API call: database.reference('/xxx').stream(callbackFunc)

Regards

@hiranya911
Copy link
Contributor

hiranya911 commented Jul 23, 2018

@the-c0d3r thanks for putting in the time and effort into fixing this. Some comments and answers:

Please revert the changes to lint.sh. Instead add the directives to the lines in the source files where the lint errors should be ignored.

Rename sseclient.py to _sseclient.py so it will be considered an internal module.

For unit testing, you can mount an adapter on the requests.Session object and have it return some fake events (I think). See how we do something similar in the existing tests:

def instrument(self, ref, payload, status=200):
recorder = []
adapter = MockAdapter(payload, status, recorder)
ref._client.session.mount(self.test_url, adapter)
return recorder

The MockAdapter used in the above example responds to requests made by the SDK with a fake response. You will probably end up writing a similar adapter that responds with a fake stream of events.

@@ -155,10 +155,10 @@ def parent(self):

def build_headers(self, token=None):
headers = {'content-type' : 'application/json; charset=UTF-8'}
if not token and self._client._session.credentials:
if not token and self._client._session.credentials: # pylint: disable=protected-access
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think self._client.session should work here. session is exposed from the HttpClient:

@property
def session(self):
return self._session

That way you won't need the linter directives at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I didn't catch that earlier. I have changed it, thanks. I still have yet to write the tests. I'll finish the tests in a few days.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great work @the-c0d3r . I am glad that this is finally progressing. Let me know if any help is required.

@the-c0d3r
Copy link
Contributor Author

Hi, I have added the test for SSEClient code. I'm not sure if I did it correctly or not. I added MockSSEClientAdapter() which is a subclass of MockAdapter. And I captured the actual event raw response and put it in payload. But somehow when I run the test, the MockSSEClient actually sends out Bytes object which is expected, but inside SSEClient code's response.iter_content returns bytes instead of string. I compared it with the my program to actually communicate with the server, and somehow iter_content returns string. Therefore for now, I added a check for the type in SSEClient, to call decode() if the nextchar from response.iter_content is a bytes object.

                nextchar = next(self.resp_iterator)
                if isinstance(nextchar, bytes):
                    nextchar = nextchar.decode()
                self.buf += nextchar

Python will raise TypeError: Can't convert 'bytes' object to str implicitly if executed the test withouth this check. Is there anyway to modify the MockSSEClientAdapter or the parent class, to make it's response.iter_content return string instead of bytes?
But I'm not sure about this check yet so I haven't committed this change. Please comment.

@hiranya911
Copy link
Contributor

hiranya911 commented Jul 26, 2018

Hi @the-c0d3r. I think this is due to the way iter_content() is implemented in requests:

http://docs.python-requests.org/en/master/_modules/requests/models/#Response.iter_content

There are couple of things you can try out.

  1. Try setting resp.raw.stream = True before returning from send().
  2. Try setting an encoding on the mock response as it is expected here.

One of these is likely to fix the issue.

Update:

On a second look, resp.raw.stream is expected to be a function, not a boolean. So option 2 is probably the only viable option.

@the-c0d3r
Copy link
Contributor Author

A quick test shows that the second fix you suggested works. I'll patch it in and commit that later today. And I also found the CI build failed for python3's syntax super() call in python2 environment. I'll change it back to support both python3 and python2.

Are there any additional tasks left to be completed for this PR to be merged?
PS: Does Stream class needs to have testing? I noticed that the SSE client's testing code more or less does the same thing as the Stream class.

@hiranya911
Copy link
Contributor

Thanks @the-c0d3r. This still needs to go through our internal API review process (which I have already initiated). We might get some feedback from that, which will need to be implemented -- but we can do that in steps, after merging this one. I will also take another closer look at all the code here, and post some feedback.

Copy link
Contributor

@hiranya911 hiranya911 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@the-c0d3r A few code review comments to address when you get a chance. But overall this looks pretty good. Thanks again for putting in the effort.

@@ -0,0 +1,198 @@
"""SSEClient module to handle streaming of realtime changes on the database
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the Apache license header here.

# Optional support for passing in a requests.Session()
self.session = session
# function for building auth header when token expires
self.build_headers = build_headers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this? The session we get from the db module is an instance of google.auth.transport.requests.AuthorizedSession, which is guaranteed to add the required Authorization header. So this shouldn't be required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_header() and all subsequent function arguments removed.

self.requests_kwargs['headers']['Cache-Control'] = 'no-cache'

# The 'Accept' header is not required, but explicit > implicit
self.requests_kwargs['headers']['Accept'] = 'text/event-stream'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above can be written as:

headers = self.request_kwargs.get('headers', {})
# add the required values to headers
self.request_kwargs['headers'] = headers
self.should_connect = False
self.retry = 0
self.resp.close()
# self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the commented out lines?

On a side note, I was testing this branch the other day, and I noticed that calling close() here does not immediately release the underlying socket. As a result the Stream remains active for a while even after calling stream.close(). So perhaps we do need the commented out lines?

Copy link
Contributor Author

@the-c0d3r the-c0d3r Jul 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also noticed the delay in calling stream.close(). I was calling it in the python shell, and it takes a few seconds to close and return. I'll test it out a bit more after I followed your other suggestions first.


def _connect(self):
"""connects to the server using requests"""
if self.should_connect:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also check for self.running.


import firebase_admin
from firebase_admin import _http_client
from firebase_admin import _utils
from firebase_admin._sseclient import SSEClient, KeepAuthSession
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a matter of style, we don't import individual classes and members. Please import the module here as from firebase_admin import _sseclient, and change the code below accordingly.


def start_stream(self):
"""Streaming function for the spawned thread to run"""
self.sse = SSEClient(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this to the start() method above, before the thread is started? Then I think you will be able to drop the sleep() call in close().

@@ -101,6 +153,23 @@ def parent(self):
return Reference(client=self._client, segments=self._segments[:-1])
return None

def build_headers(self, token=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See if we can remove this altogether (see my comment on _sseclient). If we must keep this, please rename to _build_headers() so it's treated as an internal method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This build_headers() is tested to be unnecessary just like your comment on _sseclient. So I will remove it from all subsequent code.

@@ -523,7 +523,7 @@ def test_range_query(self):
assert recorder[0].headers['User-Agent'] == db._USER_AGENT


class TestDatabseInitialization(object):
class TestDatabaseInitialization(object):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch 👍

sseclient = self.init_sse()
for msg in sseclient:
event = json.loads(msg.data)
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the break necessary? Shouldn't it exit on its own since the mock response only has one event?

@the-c0d3r
Copy link
Contributor Author

Hi @hiranya911, I believe I have addressed all of your comments with my latest commits, except the part about the closing of the stream. I wrote a simple python code to just setup stream and close it and record the timing. It returned within 100 ms. But somehow when executing the code line by line manually in python shell, it would take over 10 seconds to close the stream (aka time taken for python to return control to user). It will take me some time to debug this since I don't know much about requests module's internals. If you can shed some light on it, that would be great.

Copy link
Contributor

@hiranya911 hiranya911 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks pretty good. I just had one comment for you.

We can investigate the socket closing issue later. Once we get the internal approval for this, I can merge it, but I'll probably make a few more changes based on the feedback we get.

last_id: optional id
retry: the interval in ms
**kwargs: extra kwargs will be sent to requests.get
"""
self.should_connect = True
self.url = url
self.last_id = last_id
self.retry = retry
self.running = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this as it gets set to False below.

@roboflank
Copy link

+1

@hiranya911
Copy link
Contributor

@the-c0d3r the internal review process for this is chugging along. I will have an update for you next week.

Copy link
Contributor

@hiranya911 hiranya911 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Please sync this PR with the latest master when you get a chance.

@the-c0d3r
Copy link
Contributor Author

Hi @hiranya911 thanks for the update. I have synced the PR with the latest master. If there are any further tasks, please let me know.

@hiranya911
Copy link
Contributor

@the-c0d3r I have some feedback. Once they are implemented, I can merge this PR:

  1. Rename the stream() method to listen().
  2. Rename the Stream type to ListenerRegistration.
  3. Remove the stream_id parameter from the listen() method.

Let me know if you have any questions.

…emoved 'stream_id' and added more documentation
@the-c0d3r
Copy link
Contributor Author

Hi, I think I have addressed those feedbacks. Please take a look.

@hiranya911
Copy link
Contributor

Thanks @the-c0d3r. This looks pretty solid. I'll merge it shortly. I'll intend to do a bit of work on top it over the next few days, before release.

@hiranya911 hiranya911 merged commit 3f1190d into firebase:master Aug 8, 2018
@hiranya911
Copy link
Contributor

Upon taking another look, this might have a problem. I don't see how the credentials are passed into the underlying session.

@hiranya911
Copy link
Contributor

Ok, managed to fix the issue. I'll send a separate PR with the fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
4 participants