Compare commits

...

9 Commits

2 changed files with 160 additions and 168 deletions

View File

@ -3,111 +3,166 @@ title: Reddit
author: @nathanwindisch
author_url: https://git.wnd.sh/owui-tools/reddit
funding_url: https://patreon.com/NathanWindisch
version: 0.0.1
version: 0.0.4
changelog:
- 0.0.1 - Initial upload to openwebui community.
- 0.0.2 - Renamed from "Reddit Feeds" to just "Reddit".
- 0.0.3 - Updated author_url in docstring to point to
git repo.
- 0.0.4 - Updated to use a class instead of a series of
functions (RedditPage), and merged the user
and subreddit functions into a single method
(RedditPage#get_page).
"""
import re
import json
import requests
from enum import Enum
from typing import Awaitable, Callable
from pydantic import BaseModel, Field
from requests.models import Response
class RedditPageType(Enum):
SUBREDDIT = "r"
USER = "u"
SUBREDDIT_COMMENTS = "r"
USER_COMMENTS = "u"
class RedditPageData:
def __init__(self, uri: str = "", posts: list = [], comments: list = [], after: str = ""):
self.uri = uri
self.posts = posts
self.comments = comments
self.after = after
def __str__(self): return json.dumps({ "uri": self.uri, "posts": self.posts, "comments": self.comments, "after": self.after })
def parse_reddit_page(response: Response):
data = json.loads(response.content)
output = []
if "data" not in data: return output
if "children" not in data["data"]: return output
for item in data["data"]["children"]: output.append(item)
return output
class RedditPage:
def __init__(self, id: str, page_type: RedditPageType, after: str | None = None):
self.base_uri = "https://old.reddit.com"
self.id = id
self.page_type = page_type
self.after = ""
self.children = []
self.posts = []
self.comments = []
self.after = after
def __str__(self): return json.dumps(RedditPageData(uri=self.get_uri(), posts=self.posts, comments=self.comments, after=self.after))
def get_uri(self):
uri = f"{self.base_uri}/{self.page_type.value}/{self.id}.json"
if self.after: uri += f"?after={self.after}"
return uri
def get_data(self): return { "posts": self.posts, "comments": self.comments, "after": self.after }
def get_page(self):
response = requests.get(self.get_uri())
if not response.ok: return RedditPageData({ "posts": [], "comments": [], "after": "" })
raw_data = json.loads(response.content)
is_comments = self.page_type.name.endswith("_COMMENTS")
if is_comments:
for i in range(0, 1): self.extract_children(raw_data[i])
self.after = None
else:
self.extract_children(raw_data)
try: self.after = raw_data["data"]["after"]
except: None
self.parse_posts()
self.parse_comments()
return RedditPageData(posts=self.posts, comments=self.comments, after=self.after)
def parse_posts(data: list):
posts = []
for item in data:
if item["kind"] != "t3": continue # skip non-post items
item = item["data"]
posts.append({
"id": item["name"],
"title": item["title"],
"description": item["selftext"],
"link": item["url"],
def extract_children(self, data):
if "data" in data and "children" in data["data"]:
for item in data["data"]["children"]: self.children.append(item)
"author_username": item["author"],
"author_id": item["author_fullname"],
"subreddit_name": item["subreddit"],
"subreddit_id": item["subreddit_id"],
"subreddit_subscribers": item["subreddit_subscribers"],
def parse_posts(self):
for item in self.children:
if item["kind"] != "t3": continue # skip non-post items
item = item["data"]
self.posts.append({
# General information
"id": item["name"],
"title": item["title"],
"description": item["selftext"],
"link": item["url"],
"score": item["score"],
"upvotes": item["ups"],
"downvotes": item["downs"],
"upvote_ratio": item["upvote_ratio"],
"total_comments": item["num_comments"],
"total_crossposts": item["num_crossposts"],
"total_awards": item["total_awards_received"],
"domain": item["domain"],
"flair_text": item["link_flair_text"],
"media_embed": item["media_embed"],
# Author & subreddit information
"author_username": item["author"],
"author_id": item["author_fullname"],
"subreddit_name": item["subreddit"],
"subreddit_id": item["subreddit_id"],
"subreddit_subscribers": item["subreddit_subscribers"],
"is_pinned": item["pinned"],
"is_self": item["is_self"],
"is_video": item["is_video"],
"is_media_only": item["media_only"],
"is_over_18": item["over_18"],
"is_edited": item["edited"],
"is_hidden": item["hidden"],
"is_archived": item["archived"],
"is_locked": item["locked"],
"is_quarantined": item["quarantine"],
"is_spoiler": item["spoiler"],
"is_stickied": item["stickied"],
"is_send_replies": item["send_replies"],
# Post information
"score": item["score"],
"upvotes": item["ups"],
"downvotes": item["downs"],
"upvote_ratio": item["upvote_ratio"],
"total_comments": item["num_comments"],
"total_crossposts": item["num_crossposts"],
"total_awards": item["total_awards_received"],
"domain": item["domain"],
"flair_text": item["link_flair_text"],
"media_embed": item["media_embed"],
"published_at": item["created_utc"],
})
return posts
# Post flags
"is_pinned": item["pinned"],
"is_self": item["is_self"],
"is_video": item["is_video"],
"is_media_only": item["media_only"],
"is_over_18": item["over_18"],
"is_edited": item["edited"],
"is_hidden": item["hidden"],
"is_archived": item["archived"],
"is_locked": item["locked"],
"is_quarantined": item["quarantine"],
"is_spoiler": item["spoiler"],
"is_stickied": item["stickied"],
"is_send_replies": item["send_replies"],
"created_at": item["created_utc"],
})
def parse_comments(data: list):
comments = []
for item in data:
if item["kind"] != "t1": continue # skip non-comment items
item = item["data"]
comments.append({
"id": item["name"],
"body": item["body"],
"link": item["permalink"],
"post_id": item["link_id"],
"post_title": item["link_title"],
"post_link": item["link_permalink"],
def parse_comments(self):
for item in self.children:
if item["kind"] != "t1": continue
item = item["data"]
self.comments.append({
# General information
"id": item["name"],
"body": item["body"],
"link": item["permalink"],
"post_id": item["link_id"],
"post_title": item["link_title"],
"post_link": item["link_permalink"],
"author_username": item["author"],
"author_id": item["author_fullname"],
"subreddit_name": item["subreddit"],
"subreddit_id": item["subreddit_id"],
# Author & subreddit information
"author_username": item["author"],
"author_id": item["author_fullname"],
"subreddit_name": item["subreddit"],
"subreddit_id": item["subreddit_id"],
# Comment information
"score": item["score"],
"upvotes": item["ups"],
"downvotes": item["downs"],
"total_comments": item["num_comments"],
"total_awards": item["total_awards_received"],
# Comment flags
"is_edited": item["edited"],
"is_archived": item["archived"],
"is_locked": item["locked"],
"is_quarantined": item["quarantine"],
"is_stickied": item["stickied"],
"is_send_replies": item["send_replies"],
# Comment date
"published_at": item["created_utc"],
})
"score": item["score"],
"upvotes": item["ups"],
"downvotes": item["downs"],
"total_comments": item["num_comments"],
"total_awards": item["total_awards_received"],
"is_edited": item["edited"],
"is_archived": item["archived"],
"is_locked": item["locked"],
"is_quarantined": item["quarantine"],
"is_stickied": item["stickied"],
"is_send_replies": item["send_replies"],
"published_at": item["created_utc"],
})
return comments
class Tools:
@ -119,95 +174,31 @@ class Tools:
description="The user agent to use when making requests to Reddit."
)
async def get_subreddit_feed(
async def get_reddit_feed(
self,
subreddit: str,
id: str,
page_type: RedditPageType,
__event_emitter__: Callable[[dict], Awaitable[None]],
__user__: dict = {},
) -> str:
"""
Get the latest posts from a subreddit, as an array of JSON objects with the following properties: 'id', 'title', 'description', 'link', 'author_username', 'author_id', 'subreddit_name', 'subreddit_id', 'subreddit_subscribers', 'score', 'upvotes', 'downvotes', 'upvote_ratio', 'total_comments', 'total_crossposts', 'total_awards', 'domain', 'flair_text', 'media_embed', 'is_pinned', 'is_self', 'is_video', 'is_media_only', 'is_over_18', 'is_edited', 'is_hidden', 'is_archived', 'is_locked', 'is_quarantined', 'is_spoiler', 'is_stickied', 'is_send_replies', 'published_at'.
:param subreddit: The subreddit to get the latest posts from.
:return: A list of posts with the previously mentioned properties, or an error message.
Retrieves the popular posts from a specific feed, either a /u/username or /r/subreddit feed, or the comments on either.
:param id: The ID of the feed to retrieve, such as a username or a subreddit name. Additionally, a post ID can be appended to either to retrieve comments from a specific post.
:param page_type: The type of page to retrieve, must be 'USER', 'SUBREDDIT', 'USER_COMMENTS' or 'SUBREDDIT_COMMENTS'.
:return: An object containing a list of posts, comments, and the next ID to use under the 'after' key, or an error message.
Note: The 'USER' page_type will retrieve both posts and comments, while the 'SUBREDDIT' page_type will only retrieve posts (unless a post id is provided as well, and the page_type is 'SUBREDDIT_COMMENTS').
"""
headers = { "User-Agent": __user__["valves"].USER_AGENT }
await __event_emitter__({ "data": { "description": f"Starting retrieval for r/{subreddit}'s Reddit Feed...", "status": "in_progress", "done": False }, "type": "status" })
id = id.replace("/r/", "").replace("/u/", "").replace("u/", "").replace("r/", "") # Strip any /r/ or /u/ from the ID
if subreddit == "":
await __event_emitter__({ "data": { "description": f"Error: No subreddit provided.", "status": "complete", "done": True }, "type": "status" })
return "Error: No subreddit provided"
subreddit = subreddit.replace("/r/", "").replace("r/", "")
# This accounts for the type being dropped by OpenWebUI
if not isinstance(page_type, RedditPageType):
try:
page_type = RedditPageType[page_type]
except ValueError:
await __event_emitter__({ "data": { "description": f"Error: Invalid page type '{page_type}', try 'USER', 'SUBREDDIT', 'USER_COMMENTS' or 'SUBREDDIT_COMMENTS'.", "status": "complete", "done": True }, "type": "status" })
return f"Error: Invalid page type '{page_type}', try either 'USER', 'SUBREDDIT', 'USER_COMMENTS' or 'SUBREDDIT_COMMENTS'."
if not re.match(r"^[A-Za-z0-9_]{2,21}$", subreddit):
await __event_emitter__({ "data": { "description": f"Error: Invalid subreddit name '{subreddit}' (either too long or two short).", "status": "complete", "done": True }, "type": "status" })
return "Error: Invalid subreddit name"
try:
response = requests.get(f"https://reddit.com/r/{subreddit}.json", headers=headers)
if not response.ok:
await __event_emitter__({ "data": { "description": f"Error: Failed to retrieve r/{subreddit}'s Reddit Feed: {response.status_code}.", "status": "complete", "done": True }, "type": "status" })
return f"Error: {response.status_code}"
else:
output = parse_posts(parse_reddit_page(response))
await __event_emitter__({ "data": { "description": f"Retrieved {len(output)} posts from r/{subreddit}'s Reddit Feed.", "status": "complete", "done": True }, "type": "status" })
return json.dumps(output)
except Exception as e:
await __event_emitter__({ "data": { "description": f"Failed to retrieve any posts from r/{subreddit}'s Reddit Feed: {e}.", "status": "complete", "done": True }, "type": "status" })
return f"Error: {e}"
async def get_user_feed(
self,
username: str,
__event_emitter__: Callable[[dict], Awaitable[None]],
__user__: dict = {},
) -> str:
"""
Get the latest posts from a given user, as a JSON object with an array of 'post' objects with the following properties: 'id', 'title', 'description', 'link', 'author_username', 'author_id', 'subreddit_name', 'subreddit_id', 'subreddit_subscribers', 'score', 'upvotes', 'downvotes', 'upvote_ratio', 'total_comments', 'total_crossposts', 'total_awards', 'domain', 'flair_text', 'media_embed', 'is_pinned', 'is_self', 'is_video', 'is_media_only', 'is_over_18', 'is_edited', 'is_hidden', 'is_archived', 'is_locked', 'is_quarantined', 'is_spoiler', 'is_stickied', 'is_send_replies', 'published_at'.
Additionally, the resultant object will also contain an array of 'comment' objects with the following properties: 'id', 'body', 'link', 'post_id', 'post_title', 'post_link', 'author_id', 'post_author_username', 'subreddit_name', 'subreddit_id', 'subreddit_subscribers', 'score', 'upvotes', 'downvotes', 'total_comments', 'total_awards', 'is_edited', 'is_archived', 'is_locked', 'is_quarantined', 'is_stickied', 'is_send_replies', 'published_at'.
:param username: The username to get the latest posts from.
:return: A object with list of posts and a list of comments (both with the previously mentioned properties), or an error message.
"""
headers = { "User-Agent": __user__["valves"].USER_AGENT }
await __event_emitter__({ "data": { "description": f"Starting retrieval for u/{username}'s Reddit Feed...", "status": "in_progress", "done": False }, "type": "status" })
if username == "":
await __event_emitter__({ "data": { "description": f"Error: No username provided.", "status": "complete", "done": True }, "type": "status" })
return "Error: No username provided."
username = username.replace("/u/", "").replace("u/", "")
if not re.match(r"^[A-Za-z0-9_]{3,20}$", username):
await __event_emitter__({ "data": { "description": f"Error: Invalid username '{username}' (either too long or two short).", "status": "complete", "done": True }, "type": "status" })
return "Error: Invalid username."
try:
response = requests.get(f"https://reddit.com/u/{username}.json", headers=headers)
if not response.ok:
await __event_emitter__({ "data": { "description": f"Error: Failed to retrieve u/{username}'s Reddit Feed: {response.status_code}.", "status": "complete", "done": True }, "type": "status" })
return f"Error: {response.status_code}"
else:
page = parse_reddit_page(response) # user pages can have both posts and comments.
posts = parse_posts(page)
comments = parse_comments(page)
await __event_emitter__({ "data": { "description": f"Retrieved {len(posts)} posts and {len(comments)} comments from u/{username}'s Reddit Feed.", "status": "complete", "done": True }, "type": "status" })
return json.dumps({ "posts": posts, "comments": comments })
except Exception as e:
await __event_emitter__({ "data": { "description": f"Failed to retrieve any posts from u/{username}'s Reddit Feed: {e}.", "status": "complete", "done": True }, "type": "status" })
return f"Error: {e}"
async def main():
__user__ = {
"valves": Tools.UserValves(
USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
)
}
async def mock_event_emitter(event: dict): print("Event Emitted:", event["data"])
await Tools().get_subreddit_feed("sysadmin", mock_event_emitter, __user__)
await Tools().get_user_feed("natfan", mock_event_emitter, __user__)
import asyncio
if __name__ == "__main__": asyncio.run(main())
await __event_emitter__({ "data": { "description": f"Starting retrieval for {page_type.value}/{id}...", "status": "in_progress", "done": False }, "type": "status" })
page = RedditPage(id, page_type).get_page()
await __event_emitter__({ "data": { "description": f"Retrieved {len(page.posts)} posts and {len(page.comments)} comments from {page_type.value}/{id}.", "status": "complete", "done": True }, "type": "status" })
return str(page)

View File

@ -2,7 +2,7 @@
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src")))
# Unfortunately, we don't get any type hinting, but this does work :)
from reddit import Tools
from reddit import RedditPageType, Tools
async def main():
__user__ = {
@ -11,8 +11,9 @@ async def main():
)
}
async def mock_event_emitter(event: dict): print("Event Emitted:", event["data"])
await Tools().get_subreddit_feed("sysadmin", mock_event_emitter, __user__)
await Tools().get_user_feed("NathanWindisch", mock_event_emitter, __user__)
await Tools().get_reddit_feed("sysadmin", RedditPageType.SUBREDDIT, mock_event_emitter, __user__)
await Tools().get_reddit_feed("sysadmin/1eb73j6", RedditPageType.SUBREDDIT_COMMENTS, mock_event_emitter, __user__)
await Tools().get_reddit_feed("natfan", RedditPageType.USER, mock_event_emitter, __user__)
import asyncio
if __name__ == "__main__": asyncio.run(main())