Compare commits
9 Commits
7999cb488f
...
4fea6ee8d6
Author | SHA1 | Date | |
---|---|---|---|
4fea6ee8d6 | |||
4bcc5ab62c | |||
194ee6272d | |||
c656a67961 | |||
8e6c1549df | |||
5c9e36de31 | |||
8ccd04fca2 | |||
6072d70aea | |||
3020e015b5 |
321
src/reddit.py
321
src/reddit.py
@ -3,111 +3,166 @@ title: Reddit
|
||||
author: @nathanwindisch
|
||||
author_url: https://git.wnd.sh/owui-tools/reddit
|
||||
funding_url: https://patreon.com/NathanWindisch
|
||||
version: 0.0.1
|
||||
version: 0.0.4
|
||||
changelog:
|
||||
- 0.0.1 - Initial upload to openwebui community.
|
||||
- 0.0.2 - Renamed from "Reddit Feeds" to just "Reddit".
|
||||
- 0.0.3 - Updated author_url in docstring to point to
|
||||
git repo.
|
||||
- 0.0.4 - Updated to use a class instead of a series of
|
||||
functions (RedditPage), and merged the user
|
||||
and subreddit functions into a single method
|
||||
(RedditPage#get_page).
|
||||
"""
|
||||
|
||||
|
||||
import re
|
||||
import json
|
||||
import requests
|
||||
from enum import Enum
|
||||
from typing import Awaitable, Callable
|
||||
from pydantic import BaseModel, Field
|
||||
from requests.models import Response
|
||||
|
||||
class RedditPageType(Enum):
|
||||
SUBREDDIT = "r"
|
||||
USER = "u"
|
||||
SUBREDDIT_COMMENTS = "r"
|
||||
USER_COMMENTS = "u"
|
||||
|
||||
class RedditPageData:
|
||||
def __init__(self, uri: str = "", posts: list = [], comments: list = [], after: str = ""):
|
||||
self.uri = uri
|
||||
self.posts = posts
|
||||
self.comments = comments
|
||||
self.after = after
|
||||
def __str__(self): return json.dumps({ "uri": self.uri, "posts": self.posts, "comments": self.comments, "after": self.after })
|
||||
|
||||
|
||||
def parse_reddit_page(response: Response):
|
||||
data = json.loads(response.content)
|
||||
output = []
|
||||
if "data" not in data: return output
|
||||
if "children" not in data["data"]: return output
|
||||
for item in data["data"]["children"]: output.append(item)
|
||||
return output
|
||||
class RedditPage:
|
||||
def __init__(self, id: str, page_type: RedditPageType, after: str | None = None):
|
||||
self.base_uri = "https://old.reddit.com"
|
||||
self.id = id
|
||||
self.page_type = page_type
|
||||
self.after = ""
|
||||
self.children = []
|
||||
self.posts = []
|
||||
self.comments = []
|
||||
self.after = after
|
||||
|
||||
def __str__(self): return json.dumps(RedditPageData(uri=self.get_uri(), posts=self.posts, comments=self.comments, after=self.after))
|
||||
def get_uri(self):
|
||||
uri = f"{self.base_uri}/{self.page_type.value}/{self.id}.json"
|
||||
if self.after: uri += f"?after={self.after}"
|
||||
return uri
|
||||
def get_data(self): return { "posts": self.posts, "comments": self.comments, "after": self.after }
|
||||
def get_page(self):
|
||||
response = requests.get(self.get_uri())
|
||||
if not response.ok: return RedditPageData({ "posts": [], "comments": [], "after": "" })
|
||||
raw_data = json.loads(response.content)
|
||||
is_comments = self.page_type.name.endswith("_COMMENTS")
|
||||
|
||||
if is_comments:
|
||||
for i in range(0, 1): self.extract_children(raw_data[i])
|
||||
self.after = None
|
||||
else:
|
||||
self.extract_children(raw_data)
|
||||
try: self.after = raw_data["data"]["after"]
|
||||
except: None
|
||||
|
||||
self.parse_posts()
|
||||
self.parse_comments()
|
||||
return RedditPageData(posts=self.posts, comments=self.comments, after=self.after)
|
||||
|
||||
|
||||
def parse_posts(data: list):
|
||||
posts = []
|
||||
for item in data:
|
||||
if item["kind"] != "t3": continue # skip non-post items
|
||||
item = item["data"]
|
||||
posts.append({
|
||||
"id": item["name"],
|
||||
"title": item["title"],
|
||||
"description": item["selftext"],
|
||||
"link": item["url"],
|
||||
def extract_children(self, data):
|
||||
if "data" in data and "children" in data["data"]:
|
||||
for item in data["data"]["children"]: self.children.append(item)
|
||||
|
||||
"author_username": item["author"],
|
||||
"author_id": item["author_fullname"],
|
||||
"subreddit_name": item["subreddit"],
|
||||
"subreddit_id": item["subreddit_id"],
|
||||
"subreddit_subscribers": item["subreddit_subscribers"],
|
||||
def parse_posts(self):
|
||||
for item in self.children:
|
||||
if item["kind"] != "t3": continue # skip non-post items
|
||||
item = item["data"]
|
||||
self.posts.append({
|
||||
# General information
|
||||
"id": item["name"],
|
||||
"title": item["title"],
|
||||
"description": item["selftext"],
|
||||
"link": item["url"],
|
||||
|
||||
"score": item["score"],
|
||||
"upvotes": item["ups"],
|
||||
"downvotes": item["downs"],
|
||||
"upvote_ratio": item["upvote_ratio"],
|
||||
"total_comments": item["num_comments"],
|
||||
"total_crossposts": item["num_crossposts"],
|
||||
"total_awards": item["total_awards_received"],
|
||||
"domain": item["domain"],
|
||||
"flair_text": item["link_flair_text"],
|
||||
"media_embed": item["media_embed"],
|
||||
# Author & subreddit information
|
||||
"author_username": item["author"],
|
||||
"author_id": item["author_fullname"],
|
||||
"subreddit_name": item["subreddit"],
|
||||
"subreddit_id": item["subreddit_id"],
|
||||
"subreddit_subscribers": item["subreddit_subscribers"],
|
||||
|
||||
"is_pinned": item["pinned"],
|
||||
"is_self": item["is_self"],
|
||||
"is_video": item["is_video"],
|
||||
"is_media_only": item["media_only"],
|
||||
"is_over_18": item["over_18"],
|
||||
"is_edited": item["edited"],
|
||||
"is_hidden": item["hidden"],
|
||||
"is_archived": item["archived"],
|
||||
"is_locked": item["locked"],
|
||||
"is_quarantined": item["quarantine"],
|
||||
"is_spoiler": item["spoiler"],
|
||||
"is_stickied": item["stickied"],
|
||||
"is_send_replies": item["send_replies"],
|
||||
# Post information
|
||||
"score": item["score"],
|
||||
"upvotes": item["ups"],
|
||||
"downvotes": item["downs"],
|
||||
"upvote_ratio": item["upvote_ratio"],
|
||||
"total_comments": item["num_comments"],
|
||||
"total_crossposts": item["num_crossposts"],
|
||||
"total_awards": item["total_awards_received"],
|
||||
"domain": item["domain"],
|
||||
"flair_text": item["link_flair_text"],
|
||||
"media_embed": item["media_embed"],
|
||||
|
||||
"published_at": item["created_utc"],
|
||||
})
|
||||
return posts
|
||||
# Post flags
|
||||
"is_pinned": item["pinned"],
|
||||
"is_self": item["is_self"],
|
||||
"is_video": item["is_video"],
|
||||
"is_media_only": item["media_only"],
|
||||
"is_over_18": item["over_18"],
|
||||
"is_edited": item["edited"],
|
||||
"is_hidden": item["hidden"],
|
||||
"is_archived": item["archived"],
|
||||
"is_locked": item["locked"],
|
||||
"is_quarantined": item["quarantine"],
|
||||
"is_spoiler": item["spoiler"],
|
||||
"is_stickied": item["stickied"],
|
||||
"is_send_replies": item["send_replies"],
|
||||
|
||||
"created_at": item["created_utc"],
|
||||
})
|
||||
|
||||
def parse_comments(data: list):
|
||||
comments = []
|
||||
for item in data:
|
||||
if item["kind"] != "t1": continue # skip non-comment items
|
||||
item = item["data"]
|
||||
comments.append({
|
||||
"id": item["name"],
|
||||
"body": item["body"],
|
||||
"link": item["permalink"],
|
||||
"post_id": item["link_id"],
|
||||
"post_title": item["link_title"],
|
||||
"post_link": item["link_permalink"],
|
||||
def parse_comments(self):
|
||||
for item in self.children:
|
||||
if item["kind"] != "t1": continue
|
||||
item = item["data"]
|
||||
self.comments.append({
|
||||
# General information
|
||||
"id": item["name"],
|
||||
"body": item["body"],
|
||||
"link": item["permalink"],
|
||||
"post_id": item["link_id"],
|
||||
"post_title": item["link_title"],
|
||||
"post_link": item["link_permalink"],
|
||||
|
||||
"author_username": item["author"],
|
||||
"author_id": item["author_fullname"],
|
||||
"subreddit_name": item["subreddit"],
|
||||
"subreddit_id": item["subreddit_id"],
|
||||
# Author & subreddit information
|
||||
"author_username": item["author"],
|
||||
"author_id": item["author_fullname"],
|
||||
"subreddit_name": item["subreddit"],
|
||||
"subreddit_id": item["subreddit_id"],
|
||||
|
||||
# Comment information
|
||||
"score": item["score"],
|
||||
"upvotes": item["ups"],
|
||||
"downvotes": item["downs"],
|
||||
"total_comments": item["num_comments"],
|
||||
"total_awards": item["total_awards_received"],
|
||||
|
||||
# Comment flags
|
||||
"is_edited": item["edited"],
|
||||
"is_archived": item["archived"],
|
||||
"is_locked": item["locked"],
|
||||
"is_quarantined": item["quarantine"],
|
||||
"is_stickied": item["stickied"],
|
||||
"is_send_replies": item["send_replies"],
|
||||
|
||||
# Comment date
|
||||
"published_at": item["created_utc"],
|
||||
})
|
||||
|
||||
"score": item["score"],
|
||||
"upvotes": item["ups"],
|
||||
"downvotes": item["downs"],
|
||||
"total_comments": item["num_comments"],
|
||||
"total_awards": item["total_awards_received"],
|
||||
"is_edited": item["edited"],
|
||||
"is_archived": item["archived"],
|
||||
"is_locked": item["locked"],
|
||||
"is_quarantined": item["quarantine"],
|
||||
"is_stickied": item["stickied"],
|
||||
"is_send_replies": item["send_replies"],
|
||||
"published_at": item["created_utc"],
|
||||
})
|
||||
return comments
|
||||
|
||||
|
||||
class Tools:
|
||||
@ -119,95 +174,31 @@ class Tools:
|
||||
description="The user agent to use when making requests to Reddit."
|
||||
)
|
||||
|
||||
async def get_subreddit_feed(
|
||||
async def get_reddit_feed(
|
||||
self,
|
||||
subreddit: str,
|
||||
id: str,
|
||||
page_type: RedditPageType,
|
||||
__event_emitter__: Callable[[dict], Awaitable[None]],
|
||||
__user__: dict = {},
|
||||
) -> str:
|
||||
"""
|
||||
Get the latest posts from a subreddit, as an array of JSON objects with the following properties: 'id', 'title', 'description', 'link', 'author_username', 'author_id', 'subreddit_name', 'subreddit_id', 'subreddit_subscribers', 'score', 'upvotes', 'downvotes', 'upvote_ratio', 'total_comments', 'total_crossposts', 'total_awards', 'domain', 'flair_text', 'media_embed', 'is_pinned', 'is_self', 'is_video', 'is_media_only', 'is_over_18', 'is_edited', 'is_hidden', 'is_archived', 'is_locked', 'is_quarantined', 'is_spoiler', 'is_stickied', 'is_send_replies', 'published_at'.
|
||||
:param subreddit: The subreddit to get the latest posts from.
|
||||
:return: A list of posts with the previously mentioned properties, or an error message.
|
||||
Retrieves the popular posts from a specific feed, either a /u/username or /r/subreddit feed, or the comments on either.
|
||||
:param id: The ID of the feed to retrieve, such as a username or a subreddit name. Additionally, a post ID can be appended to either to retrieve comments from a specific post.
|
||||
:param page_type: The type of page to retrieve, must be 'USER', 'SUBREDDIT', 'USER_COMMENTS' or 'SUBREDDIT_COMMENTS'.
|
||||
:return: An object containing a list of posts, comments, and the next ID to use under the 'after' key, or an error message.
|
||||
Note: The 'USER' page_type will retrieve both posts and comments, while the 'SUBREDDIT' page_type will only retrieve posts (unless a post id is provided as well, and the page_type is 'SUBREDDIT_COMMENTS').
|
||||
"""
|
||||
headers = { "User-Agent": __user__["valves"].USER_AGENT }
|
||||
await __event_emitter__({ "data": { "description": f"Starting retrieval for r/{subreddit}'s Reddit Feed...", "status": "in_progress", "done": False }, "type": "status" })
|
||||
id = id.replace("/r/", "").replace("/u/", "").replace("u/", "").replace("r/", "") # Strip any /r/ or /u/ from the ID
|
||||
|
||||
if subreddit == "":
|
||||
await __event_emitter__({ "data": { "description": f"Error: No subreddit provided.", "status": "complete", "done": True }, "type": "status" })
|
||||
return "Error: No subreddit provided"
|
||||
subreddit = subreddit.replace("/r/", "").replace("r/", "")
|
||||
# This accounts for the type being dropped by OpenWebUI
|
||||
if not isinstance(page_type, RedditPageType):
|
||||
try:
|
||||
page_type = RedditPageType[page_type]
|
||||
except ValueError:
|
||||
await __event_emitter__({ "data": { "description": f"Error: Invalid page type '{page_type}', try 'USER', 'SUBREDDIT', 'USER_COMMENTS' or 'SUBREDDIT_COMMENTS'.", "status": "complete", "done": True }, "type": "status" })
|
||||
return f"Error: Invalid page type '{page_type}', try either 'USER', 'SUBREDDIT', 'USER_COMMENTS' or 'SUBREDDIT_COMMENTS'."
|
||||
|
||||
if not re.match(r"^[A-Za-z0-9_]{2,21}$", subreddit):
|
||||
await __event_emitter__({ "data": { "description": f"Error: Invalid subreddit name '{subreddit}' (either too long or two short).", "status": "complete", "done": True }, "type": "status" })
|
||||
return "Error: Invalid subreddit name"
|
||||
|
||||
try:
|
||||
response = requests.get(f"https://reddit.com/r/{subreddit}.json", headers=headers)
|
||||
|
||||
if not response.ok:
|
||||
await __event_emitter__({ "data": { "description": f"Error: Failed to retrieve r/{subreddit}'s Reddit Feed: {response.status_code}.", "status": "complete", "done": True }, "type": "status" })
|
||||
return f"Error: {response.status_code}"
|
||||
else:
|
||||
output = parse_posts(parse_reddit_page(response))
|
||||
await __event_emitter__({ "data": { "description": f"Retrieved {len(output)} posts from r/{subreddit}'s Reddit Feed.", "status": "complete", "done": True }, "type": "status" })
|
||||
return json.dumps(output)
|
||||
except Exception as e:
|
||||
await __event_emitter__({ "data": { "description": f"Failed to retrieve any posts from r/{subreddit}'s Reddit Feed: {e}.", "status": "complete", "done": True }, "type": "status" })
|
||||
return f"Error: {e}"
|
||||
|
||||
|
||||
async def get_user_feed(
|
||||
self,
|
||||
username: str,
|
||||
__event_emitter__: Callable[[dict], Awaitable[None]],
|
||||
__user__: dict = {},
|
||||
) -> str:
|
||||
"""
|
||||
Get the latest posts from a given user, as a JSON object with an array of 'post' objects with the following properties: 'id', 'title', 'description', 'link', 'author_username', 'author_id', 'subreddit_name', 'subreddit_id', 'subreddit_subscribers', 'score', 'upvotes', 'downvotes', 'upvote_ratio', 'total_comments', 'total_crossposts', 'total_awards', 'domain', 'flair_text', 'media_embed', 'is_pinned', 'is_self', 'is_video', 'is_media_only', 'is_over_18', 'is_edited', 'is_hidden', 'is_archived', 'is_locked', 'is_quarantined', 'is_spoiler', 'is_stickied', 'is_send_replies', 'published_at'.
|
||||
Additionally, the resultant object will also contain an array of 'comment' objects with the following properties: 'id', 'body', 'link', 'post_id', 'post_title', 'post_link', 'author_id', 'post_author_username', 'subreddit_name', 'subreddit_id', 'subreddit_subscribers', 'score', 'upvotes', 'downvotes', 'total_comments', 'total_awards', 'is_edited', 'is_archived', 'is_locked', 'is_quarantined', 'is_stickied', 'is_send_replies', 'published_at'.
|
||||
:param username: The username to get the latest posts from.
|
||||
:return: A object with list of posts and a list of comments (both with the previously mentioned properties), or an error message.
|
||||
"""
|
||||
headers = { "User-Agent": __user__["valves"].USER_AGENT }
|
||||
await __event_emitter__({ "data": { "description": f"Starting retrieval for u/{username}'s Reddit Feed...", "status": "in_progress", "done": False }, "type": "status" })
|
||||
|
||||
if username == "":
|
||||
await __event_emitter__({ "data": { "description": f"Error: No username provided.", "status": "complete", "done": True }, "type": "status" })
|
||||
return "Error: No username provided."
|
||||
username = username.replace("/u/", "").replace("u/", "")
|
||||
|
||||
if not re.match(r"^[A-Za-z0-9_]{3,20}$", username):
|
||||
await __event_emitter__({ "data": { "description": f"Error: Invalid username '{username}' (either too long or two short).", "status": "complete", "done": True }, "type": "status" })
|
||||
return "Error: Invalid username."
|
||||
|
||||
try:
|
||||
response = requests.get(f"https://reddit.com/u/{username}.json", headers=headers)
|
||||
|
||||
if not response.ok:
|
||||
await __event_emitter__({ "data": { "description": f"Error: Failed to retrieve u/{username}'s Reddit Feed: {response.status_code}.", "status": "complete", "done": True }, "type": "status" })
|
||||
return f"Error: {response.status_code}"
|
||||
else:
|
||||
page = parse_reddit_page(response) # user pages can have both posts and comments.
|
||||
posts = parse_posts(page)
|
||||
comments = parse_comments(page)
|
||||
await __event_emitter__({ "data": { "description": f"Retrieved {len(posts)} posts and {len(comments)} comments from u/{username}'s Reddit Feed.", "status": "complete", "done": True }, "type": "status" })
|
||||
return json.dumps({ "posts": posts, "comments": comments })
|
||||
except Exception as e:
|
||||
await __event_emitter__({ "data": { "description": f"Failed to retrieve any posts from u/{username}'s Reddit Feed: {e}.", "status": "complete", "done": True }, "type": "status" })
|
||||
return f"Error: {e}"
|
||||
|
||||
|
||||
|
||||
async def main():
|
||||
__user__ = {
|
||||
"valves": Tools.UserValves(
|
||||
USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
|
||||
)
|
||||
}
|
||||
async def mock_event_emitter(event: dict): print("Event Emitted:", event["data"])
|
||||
await Tools().get_subreddit_feed("sysadmin", mock_event_emitter, __user__)
|
||||
await Tools().get_user_feed("natfan", mock_event_emitter, __user__)
|
||||
|
||||
import asyncio
|
||||
if __name__ == "__main__": asyncio.run(main())
|
||||
await __event_emitter__({ "data": { "description": f"Starting retrieval for {page_type.value}/{id}...", "status": "in_progress", "done": False }, "type": "status" })
|
||||
page = RedditPage(id, page_type).get_page()
|
||||
await __event_emitter__({ "data": { "description": f"Retrieved {len(page.posts)} posts and {len(page.comments)} comments from {page_type.value}/{id}.", "status": "complete", "done": True }, "type": "status" })
|
||||
return str(page)
|
@ -2,7 +2,7 @@
|
||||
import sys, os
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src")))
|
||||
# Unfortunately, we don't get any type hinting, but this does work :)
|
||||
from reddit import Tools
|
||||
from reddit import RedditPageType, Tools
|
||||
|
||||
async def main():
|
||||
__user__ = {
|
||||
@ -11,8 +11,9 @@ async def main():
|
||||
)
|
||||
}
|
||||
async def mock_event_emitter(event: dict): print("Event Emitted:", event["data"])
|
||||
await Tools().get_subreddit_feed("sysadmin", mock_event_emitter, __user__)
|
||||
await Tools().get_user_feed("NathanWindisch", mock_event_emitter, __user__)
|
||||
await Tools().get_reddit_feed("sysadmin", RedditPageType.SUBREDDIT, mock_event_emitter, __user__)
|
||||
await Tools().get_reddit_feed("sysadmin/1eb73j6", RedditPageType.SUBREDDIT_COMMENTS, mock_event_emitter, __user__)
|
||||
await Tools().get_reddit_feed("natfan", RedditPageType.USER, mock_event_emitter, __user__)
|
||||
|
||||
import asyncio
|
||||
if __name__ == "__main__": asyncio.run(main())
|
Loading…
Reference in New Issue
Block a user