-
Notifications
You must be signed in to change notification settings - Fork 323
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Adding stream listener to stream changes in child nodes #183
Conversation
@the-c0d3r thanks for putting in the time and effort into fixing this. Some comments and answers: Please revert the changes to Rename For unit testing, you can mount an adapter on the firebase-admin-python/tests/test_db.py Lines 128 to 132 in 70d9817
The |
…-python into stream_listener
firebase_admin/db.py
Outdated
@@ -155,10 +155,10 @@ def parent(self): | |||
|
|||
def build_headers(self, token=None): | |||
headers = {'content-type' : 'application/json; charset=UTF-8'} | |||
if not token and self._client._session.credentials: | |||
if not token and self._client._session.credentials: # pylint: disable=protected-access |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think self._client.session
should work here. session
is exposed from the HttpClient
:
firebase-admin-python/firebase_admin/_http_client.py
Lines 53 to 55 in c420b4f
@property | |
def session(self): | |
return self._session |
That way you won't need the linter directives at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I didn't catch that earlier. I have changed it, thanks. I still have yet to write the tests. I'll finish the tests in a few days.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great work @the-c0d3r . I am glad that this is finally progressing. Let me know if any help is required.
Hi, I have added the test for SSEClient code. I'm not sure if I did it correctly or not. I added
Python will raise |
Hi @the-c0d3r. I think this is due to the way http://docs.python-requests.org/en/master/_modules/requests/models/#Response.iter_content There are couple of things you can try out.
One of these is likely to fix the issue. Update:On a second look, |
A quick test shows that the second fix you suggested works. I'll patch it in and commit that later today. And I also found the CI build failed for python3's syntax Are there any additional tasks left to be completed for this PR to be merged? |
Thanks @the-c0d3r. This still needs to go through our internal API review process (which I have already initiated). We might get some feedback from that, which will need to be implemented -- but we can do that in steps, after merging this one. I will also take another closer look at all the code here, and post some feedback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@the-c0d3r A few code review comments to address when you get a chance. But overall this looks pretty good. Thanks again for putting in the effort.
@@ -0,0 +1,198 @@ | |||
"""SSEClient module to handle streaming of realtime changes on the database |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add the Apache license header here.
firebase_admin/_sseclient.py
Outdated
# Optional support for passing in a requests.Session() | ||
self.session = session | ||
# function for building auth header when token expires | ||
self.build_headers = build_headers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this? The session we get from the db module is an instance of google.auth.transport.requests.AuthorizedSession
, which is guaranteed to add the required Authorization header. So this shouldn't be required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
build_header()
and all subsequent function arguments removed.
firebase_admin/_sseclient.py
Outdated
self.requests_kwargs['headers']['Cache-Control'] = 'no-cache' | ||
|
||
# The 'Accept' header is not required, but explicit > implicit | ||
self.requests_kwargs['headers']['Accept'] = 'text/event-stream' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The above can be written as:
headers = self.request_kwargs.get('headers', {})
# add the required values to headers
self.request_kwargs['headers'] = headers
self.should_connect = False | ||
self.retry = 0 | ||
self.resp.close() | ||
# self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the commented out lines?
On a side note, I was testing this branch the other day, and I noticed that calling close() here does not immediately release the underlying socket. As a result the Stream remains active for a while even after calling stream.close()
. So perhaps we do need the commented out lines?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also noticed the delay in calling stream.close()
. I was calling it in the python shell, and it takes a few seconds to close and return. I'll test it out a bit more after I followed your other suggestions first.
|
||
def _connect(self): | ||
"""connects to the server using requests""" | ||
if self.should_connect: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should also check for self.running
.
firebase_admin/db.py
Outdated
|
||
import firebase_admin | ||
from firebase_admin import _http_client | ||
from firebase_admin import _utils | ||
from firebase_admin._sseclient import SSEClient, KeepAuthSession |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a matter of style, we don't import individual classes and members. Please import the module here as from firebase_admin import _sseclient
, and change the code below accordingly.
firebase_admin/db.py
Outdated
|
||
def start_stream(self): | ||
"""Streaming function for the spawned thread to run""" | ||
self.sse = SSEClient( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you move this to the start() method above, before the thread is started? Then I think you will be able to drop the sleep() call in close().
firebase_admin/db.py
Outdated
@@ -101,6 +153,23 @@ def parent(self): | |||
return Reference(client=self._client, segments=self._segments[:-1]) | |||
return None | |||
|
|||
def build_headers(self, token=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See if we can remove this altogether (see my comment on _sseclient). If we must keep this, please rename to _build_headers() so it's treated as an internal method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This build_headers()
is tested to be unnecessary just like your comment on _sseclient
. So I will remove it from all subsequent code.
@@ -523,7 +523,7 @@ def test_range_query(self): | |||
assert recorder[0].headers['User-Agent'] == db._USER_AGENT | |||
|
|||
|
|||
class TestDatabseInitialization(object): | |||
class TestDatabaseInitialization(object): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch 👍
tests/test_sseclient.py
Outdated
sseclient = self.init_sse() | ||
for msg in sseclient: | ||
event = json.loads(msg.data) | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the break necessary? Shouldn't it exit on its own since the mock response only has one event?
Hi @hiranya911, I believe I have addressed all of your comments with my latest commits, except the part about the closing of the stream. I wrote a simple python code to just setup stream and close it and record the timing. It returned within 100 ms. But somehow when executing the code line by line manually in python shell, it would take over 10 seconds to close the stream (aka time taken for python to return control to user). It will take me some time to debug this since I don't know much about requests module's internals. If you can shed some light on it, that would be great. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks pretty good. I just had one comment for you.
We can investigate the socket closing issue later. Once we get the internal approval for this, I can merge it, but I'll probably make a few more changes based on the feedback we get.
firebase_admin/_sseclient.py
Outdated
last_id: optional id | ||
retry: the interval in ms | ||
**kwargs: extra kwargs will be sent to requests.get | ||
""" | ||
self.should_connect = True | ||
self.url = url | ||
self.last_id = last_id | ||
self.retry = retry | ||
self.running = True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this as it gets set to False below.
+1 |
@the-c0d3r the internal review process for this is chugging along. I will have an update for you next week. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Please sync this PR with the latest master when you get a chance.
Hi @hiranya911 thanks for the update. I have synced the PR with the latest master. If there are any further tasks, please let me know. |
@the-c0d3r I have some feedback. Once they are implemented, I can merge this PR:
Let me know if you have any questions. |
…emoved 'stream_id' and added more documentation
Hi, I think I have addressed those feedbacks. Please take a look. |
Thanks @the-c0d3r. This looks pretty solid. I'll merge it shortly. I'll intend to do a bit of work on top it over the next few days, before release. |
Upon taking another look, this might have a problem. I don't see how the credentials are passed into the underlying session. |
Ok, managed to fix the issue. I'll send a separate PR with the fix. |
Hi,
This PR's original intent is to add stream listener function to fire a callback function when changes in the database is detected. It is a revival of this #50 1-year-old PR and also based on the code contributed by @rizasif and @Aqsa-K in that PR. This PR is mainly to serve 3 purposes.
For now, I have completed the first and second of the above. Please advise me on how to properly write the test in PyTest. I tested the code manually by defining callback and manually modifiying data in my firebase demo project. And it seems to be fired everytime I made changes. I read through the existing test suite and I cannot think of a way to write into it yet.
Changes:
lint.sh
: Ignore directive for "protected-access"firebase_admin/sseclient.py
: SSE client modulefirebase_admin/db.py
: Streaming functionalitydatabase.reference('/xxx').stream(callbackFunc)
Regards